Home | History | Annotate | Download | only in web-page-replay
      1 #!/usr/bin/env python
      2 # Copyright 2011 Google Inc. All Rights Reserved.
      3 #
      4 # Licensed under the Apache License, Version 2.0 (the "License");
      5 # you may not use this file except in compliance with the License.
      6 # You may obtain a copy of the License at
      7 #
      8 #      http://www.apache.org/licenses/LICENSE-2.0
      9 #
     10 # Unless required by applicable law or agreed to in writing, software
     11 # distributed under the License is distributed on an "AS IS" BASIS,
     12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13 # See the License for the specific language governing permissions and
     14 # limitations under the License.
     15 
     16 import calendar
     17 import email.utils
     18 import httparchive
     19 import unittest
     20 
     21 
     22 def create_request(headers):
     23   return httparchive.ArchivedHttpRequest(
     24       'GET', 'www.test.com', '/', None, headers)
     25 
     26 def create_response(headers):
     27   return httparchive.ArchivedHttpResponse(
     28       11, 200, 'OK', headers, '')
     29 
     30 
     31 class HttpArchiveTest(unittest.TestCase):
     32 
     33   REQUEST_HEADERS = {}
     34   REQUEST = create_request(REQUEST_HEADERS)
     35 
     36   # Used for if-(un)modified-since checks
     37   DATE_PAST = 'Wed, 13 Jul 2011 03:58:08 GMT'
     38   DATE_PRESENT = 'Wed, 20 Jul 2011 04:58:08 GMT'
     39   DATE_FUTURE = 'Wed, 27 Jul 2011 05:58:08 GMT'
     40   DATE_INVALID = 'This is an invalid date!!'
     41 
     42   # etag values
     43   ETAG_VALID = 'etag'
     44   ETAG_INVALID = 'This is an invalid etag value!!'
     45 
     46   RESPONSE_HEADERS = [('last-modified', DATE_PRESENT), ('etag', ETAG_VALID)]
     47   RESPONSE = create_response(RESPONSE_HEADERS)
     48 
     49   def setUp(self):
     50     self.archive = httparchive.HttpArchive()
     51     self.archive[self.REQUEST] = self.RESPONSE
     52 
     53     # Also add an identical POST request for testing
     54     request = httparchive.ArchivedHttpRequest(
     55         'POST', 'www.test.com', '/', None, self.REQUEST_HEADERS)
     56     self.archive[request] = self.RESPONSE
     57 
     58   def tearDown(self):
     59     pass
     60 
     61   def test_init(self):
     62     archive = httparchive.HttpArchive()
     63     self.assertEqual(len(archive), 0)
     64 
     65   def test_request__TrimHeaders(self):
     66     request = httparchive.ArchivedHttpRequest
     67     header1 = {'accept-encoding': 'gzip,deflate'}
     68     self.assertEqual(request._TrimHeaders(header1),
     69                      [(k, v) for k, v in header1.items()])
     70 
     71     header2 = {'referer': 'www.google.com'}
     72     self.assertEqual(request._TrimHeaders(header2), [])
     73 
     74     header3 = {'referer': 'www.google.com', 'cookie': 'cookie_monster!',
     75                'hello': 'world'}
     76     self.assertEqual(request._TrimHeaders(header3), [('hello', 'world')])
     77 
     78     # Tests that spaces and trailing comma get stripped.
     79     header4 = {'accept-encoding': 'gzip, deflate,, '}
     80     self.assertEqual(request._TrimHeaders(header4),
     81                      [('accept-encoding', 'gzip,deflate')])
     82 
     83     # Tests that 'lzma' gets stripped.
     84     header5 = {'accept-encoding': 'gzip, deflate, lzma'}
     85     self.assertEqual(request._TrimHeaders(header5),
     86                      [('accept-encoding', 'gzip,deflate')])
     87 
     88     # Tests that x-client-data gets stripped.
     89     header6 = {'x-client-data': 'testdata'}
     90     self.assertEqual(request._TrimHeaders(header6), [])
     91 
     92   def test_matches(self):
     93     headers = {}
     94     request1 = httparchive.ArchivedHttpRequest(
     95         'GET', 'www.test.com', '/index.html?hello=world', None, headers)
     96     request2 = httparchive.ArchivedHttpRequest(
     97         'GET', 'www.test.com', '/index.html?foo=bar', None, headers)
     98 
     99     self.assert_(not request1.matches(
    100         request2.command, request2.host, request2.full_path, use_query=True))
    101     self.assert_(request1.matches(
    102         request2.command, request2.host, request2.full_path, use_query=False))
    103 
    104     self.assert_(request1.matches(
    105         request2.command, request2.host, None, use_query=True))
    106     self.assert_(request1.matches(
    107         request2.command, None, request2.full_path, use_query=False))
    108 
    109     empty_request = httparchive.ArchivedHttpRequest(
    110         None, None, None, None, headers)
    111     self.assert_(not empty_request.matches(
    112         request2.command, request2.host, None, use_query=True))
    113     self.assert_(not empty_request.matches(
    114         request2.command, None, request2.full_path, use_query=False))
    115 
    116   def setup_find_closest_request(self):
    117     headers = {}
    118     request1 = httparchive.ArchivedHttpRequest(
    119         'GET', 'www.test.com', '/a?hello=world', None, headers)
    120     request2 = httparchive.ArchivedHttpRequest(
    121         'GET', 'www.test.com', '/a?foo=bar', None, headers)
    122     request3 = httparchive.ArchivedHttpRequest(
    123         'GET', 'www.test.com', '/b?hello=world', None, headers)
    124     request4 = httparchive.ArchivedHttpRequest(
    125         'GET', 'www.test.com', '/c?hello=world', None, headers)
    126 
    127     archive = httparchive.HttpArchive()
    128     # Add requests 2 and 3 and find closest match with request1
    129     archive[request2] = self.RESPONSE
    130     archive[request3] = self.RESPONSE
    131 
    132     return archive, request1, request2, request3, request4
    133 
    134   def test_find_closest_request(self):
    135     archive, request1, request2, request3, request4 = (
    136       self.setup_find_closest_request())
    137 
    138     # Always favor requests with same paths, even if use_path=False.
    139     self.assertEqual(
    140         request2, archive.find_closest_request(request1, use_path=False))
    141     # If we match strictly on path, request2 is the only match
    142     self.assertEqual(
    143         request2, archive.find_closest_request(request1, use_path=True))
    144     # request4 can be matched with request3, if use_path=False
    145     self.assertEqual(
    146         request3, archive.find_closest_request(request4, use_path=False))
    147     # ...but None, if use_path=True
    148     self.assertEqual(
    149         None, archive.find_closest_request(request4, use_path=True))
    150 
    151   def test_find_closest_request_delete_simple(self):
    152     archive, request1, request2, request3, request4 = (
    153       self.setup_find_closest_request())
    154 
    155     del archive[request3]
    156     self.assertEqual(
    157         request2, archive.find_closest_request(request1, use_path=False))
    158     self.assertEqual(
    159         request2, archive.find_closest_request(request1, use_path=True))
    160 
    161   def test_find_closest_request_delete_complex(self):
    162     archive, request1, request2, request3, request4 = (
    163       self.setup_find_closest_request())
    164 
    165     del archive[request2]
    166     self.assertEqual(
    167         request3, archive.find_closest_request(request1, use_path=False))
    168     self.assertEqual(
    169         None, archive.find_closest_request(request1, use_path=True))
    170 
    171   def test_find_closest_request_timestamp(self):
    172     headers = {}
    173     request1 = httparchive.ArchivedHttpRequest(
    174         'GET', 'www.test.com', '/index.html?time=100000000&important=true',
    175         None, headers)
    176     request2 = httparchive.ArchivedHttpRequest(
    177         'GET', 'www.test.com', '/index.html?time=99999999&important=true',
    178         None, headers)
    179     request3 = httparchive.ArchivedHttpRequest(
    180         'GET', 'www.test.com', '/index.html?time=10000000&important=false',
    181         None, headers)
    182     archive = httparchive.HttpArchive()
    183     # Add requests 2 and 3 and find closest match with request1
    184     archive[request2] = self.RESPONSE
    185     archive[request3] = self.RESPONSE
    186 
    187     # Although request3 is lexicographically closer, request2 is semantically
    188     # more similar.
    189     self.assertEqual(
    190         request2, archive.find_closest_request(request1, use_path=True))
    191 
    192   def test_get_cmp_seq(self):
    193     # The order of key-value pairs in query and header respectively should not
    194     # matter.
    195     headers = {'k2': 'v2', 'k1': 'v1'}
    196     request = httparchive.ArchivedHttpRequest(
    197         'GET', 'www.test.com', '/a?c=d&a=b;e=f', None, headers)
    198     self.assertEqual([('a', 'b'), ('c', 'd'), ('e', 'f'),
    199                       ('k1', 'v1'), ('k2', 'v2')],
    200                      request._GetCmpSeq('c=d&a=b;e=f'))
    201 
    202   def test_get_simple(self):
    203     request = self.REQUEST
    204     response = self.RESPONSE
    205     archive = self.archive
    206 
    207     self.assertEqual(archive.get(request), response)
    208 
    209     false_request_headers = {'foo': 'bar'}
    210     false_request = create_request(false_request_headers)
    211     self.assertEqual(archive.get(false_request, default=None), None)
    212 
    213   def test_get_modified_headers(self):
    214     request = self.REQUEST
    215     response = self.RESPONSE
    216     archive = self.archive
    217     not_modified_response = httparchive.create_response(304)
    218 
    219     # Fail check and return response again
    220     request_headers = {'if-modified-since': self.DATE_PAST}
    221     request = create_request(request_headers)
    222     self.assertEqual(archive.get(request), response)
    223 
    224     # Succeed check and return 304 Not Modified
    225     request_headers = {'if-modified-since': self.DATE_FUTURE}
    226     request = create_request(request_headers)
    227     self.assertEqual(archive.get(request), not_modified_response)
    228 
    229     # Succeed check and return 304 Not Modified
    230     request_headers = {'if-modified-since': self.DATE_PRESENT}
    231     request = create_request(request_headers)
    232     self.assertEqual(archive.get(request), not_modified_response)
    233 
    234     # Invalid date, fail check and return response again
    235     request_headers = {'if-modified-since': self.DATE_INVALID}
    236     request = create_request(request_headers)
    237     self.assertEqual(archive.get(request), response)
    238 
    239     # fail check since the request is not a GET or HEAD request (as per RFC)
    240     request_headers = {'if-modified-since': self.DATE_FUTURE}
    241     request = httparchive.ArchivedHttpRequest(
    242         'POST', 'www.test.com', '/', None, request_headers)
    243     self.assertEqual(archive.get(request), response)
    244 
    245   def test_get_unmodified_headers(self):
    246     request = self.REQUEST
    247     response = self.RESPONSE
    248     archive = self.archive
    249     not_modified_response = httparchive.create_response(304)
    250 
    251     # Succeed check
    252     request_headers = {'if-unmodified-since': self.DATE_PAST}
    253     request = create_request(request_headers)
    254     self.assertEqual(archive.get(request), not_modified_response)
    255 
    256     # Fail check
    257     request_headers = {'if-unmodified-since': self.DATE_FUTURE}
    258     request = create_request(request_headers)
    259     self.assertEqual(archive.get(request), response)
    260 
    261     # Succeed check
    262     request_headers = {'if-unmodified-since': self.DATE_PRESENT}
    263     request = create_request(request_headers)
    264     self.assertEqual(archive.get(request), not_modified_response)
    265 
    266     # Fail check
    267     request_headers = {'if-unmodified-since': self.DATE_INVALID}
    268     request = create_request(request_headers)
    269     self.assertEqual(archive.get(request), response)
    270 
    271     # Fail check since the request is not a GET or HEAD request (as per RFC)
    272     request_headers = {'if-modified-since': self.DATE_PAST}
    273     request = httparchive.ArchivedHttpRequest(
    274         'POST', 'www.test.com', '/', None, request_headers)
    275     self.assertEqual(archive.get(request), response)
    276 
    277   def test_get_etags(self):
    278     request = self.REQUEST
    279     response = self.RESPONSE
    280     archive = self.archive
    281     not_modified_response = httparchive.create_response(304)
    282     precondition_failed_response = httparchive.create_response(412)
    283 
    284     # if-match headers
    285     request_headers = {'if-match': self.ETAG_VALID}
    286     request = create_request(request_headers)
    287     self.assertEqual(archive.get(request), response)
    288 
    289     request_headers = {'if-match': self.ETAG_INVALID}
    290     request = create_request(request_headers)
    291     self.assertEqual(archive.get(request), precondition_failed_response)
    292 
    293     # if-none-match headers
    294     request_headers = {'if-none-match': self.ETAG_VALID}
    295     request = create_request(request_headers)
    296     self.assertEqual(archive.get(request), not_modified_response)
    297 
    298     request_headers = {'if-none-match': self.ETAG_INVALID}
    299     request = create_request(request_headers)
    300     self.assertEqual(archive.get(request), response)
    301 
    302   def test_get_multiple_match_headers(self):
    303     request = self.REQUEST
    304     response = self.RESPONSE
    305     archive = self.archive
    306     not_modified_response = httparchive.create_response(304)
    307     precondition_failed_response = httparchive.create_response(412)
    308 
    309     # if-match headers
    310     # If the request would, without the If-Match header field,
    311     # result in anything other than a 2xx or 412 status,
    312     # then the If-Match header MUST be ignored.
    313 
    314     request_headers = {
    315         'if-match': self.ETAG_VALID,
    316         'if-modified-since': self.DATE_PAST,
    317     }
    318     request = create_request(request_headers)
    319     self.assertEqual(archive.get(request), response)
    320 
    321     # Invalid etag, precondition failed
    322     request_headers = {
    323         'if-match': self.ETAG_INVALID,
    324         'if-modified-since': self.DATE_PAST,
    325     }
    326     request = create_request(request_headers)
    327     self.assertEqual(archive.get(request), precondition_failed_response)
    328 
    329     # 304 response; ignore if-match header
    330     request_headers = {
    331         'if-match': self.ETAG_VALID,
    332         'if-modified-since': self.DATE_FUTURE,
    333     }
    334     request = create_request(request_headers)
    335     self.assertEqual(archive.get(request), not_modified_response)
    336 
    337     # 304 response; ignore if-match header
    338     request_headers = {
    339         'if-match': self.ETAG_INVALID,
    340         'if-modified-since': self.DATE_PRESENT,
    341     }
    342     request = create_request(request_headers)
    343     self.assertEqual(archive.get(request), not_modified_response)
    344 
    345     # Invalid etag, precondition failed
    346     request_headers = {
    347         'if-match': self.ETAG_INVALID,
    348         'if-modified-since': self.DATE_INVALID,
    349     }
    350     request = create_request(request_headers)
    351     self.assertEqual(archive.get(request), precondition_failed_response)
    352 
    353   def test_get_multiple_none_match_headers(self):
    354     request = self.REQUEST
    355     response = self.RESPONSE
    356     archive = self.archive
    357     not_modified_response = httparchive.create_response(304)
    358     precondition_failed_response = httparchive.create_response(412)
    359 
    360     # if-none-match headers
    361     # If the request would, without the If-None-Match header field,
    362     # result in anything other than a 2xx or 304 status,
    363     # then the If-None-Match header MUST be ignored.
    364 
    365     request_headers = {
    366         'if-none-match': self.ETAG_VALID,
    367         'if-modified-since': self.DATE_PAST,
    368     }
    369     request = create_request(request_headers)
    370     self.assertEqual(archive.get(request), response)
    371 
    372     request_headers = {
    373         'if-none-match': self.ETAG_INVALID,
    374         'if-modified-since': self.DATE_PAST,
    375     }
    376     request = create_request(request_headers)
    377     self.assertEqual(archive.get(request), response)
    378 
    379     # etag match, precondition failed
    380     request_headers = {
    381         'if-none-match': self.ETAG_VALID,
    382         'if-modified-since': self.DATE_FUTURE,
    383     }
    384     request = create_request(request_headers)
    385     self.assertEqual(archive.get(request), not_modified_response)
    386 
    387     request_headers = {
    388         'if-none-match': self.ETAG_INVALID,
    389         'if-modified-since': self.DATE_PRESENT,
    390     }
    391     request = create_request(request_headers)
    392     self.assertEqual(archive.get(request), not_modified_response)
    393 
    394     request_headers = {
    395         'if-none-match': self.ETAG_INVALID,
    396         'if-modified-since': self.DATE_INVALID,
    397     }
    398     request = create_request(request_headers)
    399     self.assertEqual(archive.get(request), response)
    400 
    401   def test_response__TrimHeaders(self):
    402     response = httparchive.ArchivedHttpResponse
    403     header1 = [('access-control-allow-origin', '*'),
    404                ('content-type', 'image/jpeg'),
    405                ('content-length', 2878)]
    406     self.assertEqual(response._TrimHeaders(header1), header1)
    407 
    408     header2 = [('content-type', 'text/javascript; charset=utf-8'),
    409                ('connection', 'keep-alive'),
    410                ('cache-control', 'private, must-revalidate, max-age=0'),
    411                ('content-encoding', 'gzip')]
    412     self.assertEqual(response._TrimHeaders(header2), header2)
    413 
    414     header3 = [('content-security-policy', """\
    415 default-src 'self' http://*.cnn.com:* https://*.cnn.com:* \
    416 *.cnn.net:* *.turner.com:* *.ugdturner.com:* *.vgtf.net:*; \
    417 script-src 'unsafe-inline' 'unsafe-eval' 'self' *; \
    418 style-src 'unsafe-inline' 'self' *; frame-src 'self' *; \
    419 object-src 'self' *; img-src 'self' * data:; media-src 'self' *; \
    420 font-src 'self' *; connect-src 'self' *"""),
    421                ('access-control-allow-origin', '*'),
    422                ('content-type', 'text/html; charset=utf-8'),
    423                ('content-encoding', 'gzip')]
    424     self.assertEqual(response._TrimHeaders(header3), [
    425         ('access-control-allow-origin', '*'),
    426         ('content-type', 'text/html; charset=utf-8'),
    427         ('content-encoding', 'gzip')
    428     ])
    429 
    430     header4 = [('content-security-policy', """\
    431 default-src * data: blob:;script-src *.facebook.com *.fbcdn.net \
    432 *.facebook.net *.google-analytics.com *.virtualearth.net *.google.com \
    433 127.0.0.1:* *.spotilocal.com:* 'unsafe-inline' 'unsafe-eval' \
    434 fbstatic-a.akamaihd.net fbcdn-static-b-a.akamaihd.net *.atlassolutions.com \
    435 blob: chrome-extension://lifbcibllhkdhoafpjfnlhfpfgnpldfl \
    436 *.liverail.com;style-src * 'unsafe-inline' data:;connect-src *.facebook.com \
    437 *.fbcdn.net *.facebook.net *.spotilocal.com:* *.akamaihd.net \
    438 wss://*.facebook.com:* https://fb.scanandcleanlocal.com:* \
    439 *.atlassolutions.com attachment.fbsbx.com ws://localhost:* \
    440 blob: 127.0.0.1:* *.liverail.com""")]
    441     self.assertEqual(response._TrimHeaders(header4), [])
    442 
    443 
    444 class ArchivedHttpResponse(unittest.TestCase):
    445   PAST_DATE_A = 'Tue, 13 Jul 2010 03:47:07 GMT'
    446   PAST_DATE_B = 'Tue, 13 Jul 2010 02:47:07 GMT'  # PAST_DATE_A -1 hour
    447   PAST_DATE_C = 'Tue, 13 Jul 2010 04:47:07 GMT'  # PAST_DATE_A +1 hour
    448   NOW_DATE_A = 'Wed, 20 Jul 2011 04:58:08 GMT'
    449   NOW_DATE_B = 'Wed, 20 Jul 2011 03:58:08 GMT'  # NOW_DATE_A -1 hour
    450   NOW_DATE_C = 'Wed, 20 Jul 2011 05:58:08 GMT'  # NOW_DATE_A +1 hour
    451   NOW_SECONDS = calendar.timegm(email.utils.parsedate(NOW_DATE_A))
    452 
    453   def setUp(self):
    454     self.response = create_response([('date', self.PAST_DATE_A)])
    455 
    456   def test_update_date_same_date(self):
    457     self.assertEqual(
    458         self.response.update_date(self.PAST_DATE_A, now=self.NOW_SECONDS),
    459         self.NOW_DATE_A)
    460 
    461   def test_update_date_before_date(self):
    462     self.assertEqual(
    463         self.response.update_date(self.PAST_DATE_B, now=self.NOW_SECONDS),
    464         self.NOW_DATE_B)
    465 
    466   def test_update_date_after_date(self):
    467     self.assertEqual(
    468         self.response.update_date(self.PAST_DATE_C, now=self.NOW_SECONDS),
    469         self.NOW_DATE_C)
    470 
    471   def test_update_date_bad_date_param(self):
    472     self.assertEqual(
    473         self.response.update_date('garbage date', now=self.NOW_SECONDS),
    474         'garbage date')
    475 
    476   def test_update_date_bad_date_header(self):
    477     self.response.set_header('date', 'garbage date')
    478     self.assertEqual(
    479         self.response.update_date(self.PAST_DATE_B, now=self.NOW_SECONDS),
    480         self.PAST_DATE_B)
    481 
    482 
# Run every TestCase in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()
    485