Home | History | Annotate | Download | only in tests
      1 # -*- coding: utf-8 -*-
      2 # Copyright 2014 Google Inc. All Rights Reserved.
      3 #
      4 # Licensed under the Apache License, Version 2.0 (the "License");
      5 # you may not use this file except in compliance with the License.
      6 # You may obtain a copy of the License at
      7 #
      8 #     http://www.apache.org/licenses/LICENSE-2.0
      9 #
     10 # Unless required by applicable law or agreed to in writing, software
     11 # distributed under the License is distributed on an "AS IS" BASIS,
     12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13 # See the License for the specific language governing permissions and
     14 # limitations under the License.
     15 """Unit tests for resumable streaming upload functions and classes."""
     16 
     17 from __future__ import absolute_import
     18 
     19 from hashlib import md5
     20 import os
     21 import pkgutil
     22 
     23 from gslib.exception import CommandException
     24 from gslib.hashing_helper import CalculateHashesFromContents
     25 from gslib.hashing_helper import CalculateMd5FromContents
     26 from gslib.resumable_streaming_upload import ResumableStreamingJsonUploadWrapper
     27 import gslib.tests.testcase as testcase
     28 from gslib.util import GetJsonResumableChunkSize
     29 from gslib.util import TRANSFER_BUFFER_SIZE
     30 
     31 
# Name of the test data file. _GetTestFile loads it from the gslib package
# (under tests/test_data/) and copies it into a temporary file.
_TEST_FILE = 'test.txt'
     33 
     34 
     35 class TestResumableStreamingJsonUploadWrapper(testcase.GsUtilUnitTestCase):
     36   """Unit tests for the TestResumableStreamingJsonUploadWrapper class."""
     37 
     38   _temp_test_file = None
     39   _temp_test_file_contents = None
     40   _temp_test_file_len = None
     41 
     42   def _GetTestFile(self):
     43     if not self._temp_test_file:
     44       self._temp_test_file_contents = pkgutil.get_data(
     45           'gslib', 'tests/test_data/%s' % _TEST_FILE)
     46       self._temp_test_file = self.CreateTempFile(
     47           file_name=_TEST_FILE, contents=self._temp_test_file_contents)
     48       self._temp_test_file_len = len(self._temp_test_file_contents)
     49     return self._temp_test_file
     50 
     51   def testReadInChunks(self):
     52     tmp_file = self._GetTestFile()
     53     with open(tmp_file, 'rb') as stream:
     54       wrapper = ResumableStreamingJsonUploadWrapper(
     55           stream, TRANSFER_BUFFER_SIZE, test_small_buffer=True)
     56       hash_dict = {'md5': md5()}
     57       # CalculateHashesFromContents reads in chunks, but does not seek.
     58       CalculateHashesFromContents(wrapper, hash_dict)
     59     with open(tmp_file, 'rb') as stream:
     60       actual = CalculateMd5FromContents(stream)
     61     self.assertEqual(actual, hash_dict['md5'].hexdigest())
     62 
     63   def testReadInChunksWithSeekToBeginning(self):
     64     """Reads one buffer, then seeks to 0 and reads chunks until the end."""
     65     tmp_file = self._GetTestFile()
     66     for initial_read in (TRANSFER_BUFFER_SIZE - 1,
     67                          TRANSFER_BUFFER_SIZE,
     68                          TRANSFER_BUFFER_SIZE + 1,
     69                          TRANSFER_BUFFER_SIZE * 2 - 1,
     70                          TRANSFER_BUFFER_SIZE * 2,
     71                          TRANSFER_BUFFER_SIZE * 2 + 1,
     72                          TRANSFER_BUFFER_SIZE * 3 - 1,
     73                          TRANSFER_BUFFER_SIZE * 3,
     74                          TRANSFER_BUFFER_SIZE * 3 + 1):
     75       for buffer_size in (TRANSFER_BUFFER_SIZE - 1,
     76                           TRANSFER_BUFFER_SIZE,
     77                           TRANSFER_BUFFER_SIZE + 1,
     78                           self._temp_test_file_len - 1,
     79                           self._temp_test_file_len,
     80                           self._temp_test_file_len + 1):
     81         # Can't seek to 0 if the buffer is too small, so we expect an
     82         # exception.
     83         expect_exception = buffer_size < self._temp_test_file_len
     84         with open(tmp_file, 'rb') as stream:
     85           wrapper = ResumableStreamingJsonUploadWrapper(
     86               stream, buffer_size, test_small_buffer=True)
     87           wrapper.read(initial_read)
     88           # CalculateMd5FromContents seeks to 0, reads in chunks, then seeks
     89           # to 0 again.
     90           try:
     91             hex_digest = CalculateMd5FromContents(wrapper)
     92             if expect_exception:
     93               self.fail('Did not get expected CommandException for '
     94                         'initial read size %s, buffer size %s' %
     95                         (initial_read, buffer_size))
     96           except CommandException, e:
     97             if not expect_exception:
     98               self.fail('Got unexpected CommandException "%s" for '
     99                         'initial read size %s, buffer size %s' %
    100                         (str(e), initial_read, buffer_size))
    101         if not expect_exception:
    102           with open(tmp_file, 'rb') as stream:
    103             actual = CalculateMd5FromContents(stream)
    104           self.assertEqual(
    105               actual, hex_digest,
    106               'Digests not equal for initial read size %s, buffer size %s' %
    107               (initial_read, buffer_size))
    108 
    109   def _testSeekBack(self, initial_reads, buffer_size, seek_back_amount):
    110     """Tests reading then seeking backwards.
    111 
    112     This function simulates an upload that is resumed after a connection break.
    113     It reads one transfer buffer at a time until it reaches initial_position,
    114     then seeks backwards (as if the server did not receive some of the bytes)
    115     and reads to the end of the file, ensuring the data read after the seek
    116     matches the original file.
    117 
    118     Args:
    119       initial_reads: List of integers containing read sizes to perform
    120           before seek.
    121       buffer_size: Maximum buffer size for the wrapper.
    122       seek_back_amount: Number of bytes to seek backward.
    123 
    124     Raises:
    125       AssertionError on wrong data returned by the wrapper.
    126     """
    127     tmp_file = self._GetTestFile()
    128     initial_position = 0
    129     for read_size in initial_reads:
    130       initial_position += read_size
    131     self.assertGreaterEqual(
    132         buffer_size, seek_back_amount,
    133         'seek_back_amount must be less than initial position %s '
    134         '(but was actually: %s)' % (buffer_size, seek_back_amount))
    135     self.assertLess(
    136         initial_position, self._temp_test_file_len,
    137         'initial_position must be less than test file size %s '
    138         '(but was actually: %s)' % (self._temp_test_file_len, initial_position))
    139 
    140     with open(tmp_file, 'rb') as stream:
    141       wrapper = ResumableStreamingJsonUploadWrapper(
    142           stream, buffer_size, test_small_buffer=True)
    143       position = 0
    144       for read_size in initial_reads:
    145         data = wrapper.read(read_size)
    146         self.assertEqual(
    147             self._temp_test_file_contents[position:position + read_size],
    148             data, 'Data from position %s to %s did not match file contents.' %
    149             (position, position + read_size))
    150         position += len(data)
    151       wrapper.seek(initial_position - seek_back_amount)
    152       self.assertEqual(wrapper.tell(),
    153                        initial_position - seek_back_amount)
    154       data = wrapper.read()
    155       self.assertEqual(
    156           self._temp_test_file_len - (initial_position - seek_back_amount),
    157           len(data),
    158           'Unexpected data length with initial pos %s seek_back_amount %s. '
    159           'Expected: %s, actual: %s.' %
    160           (initial_position, seek_back_amount,
    161            self._temp_test_file_len - (initial_position - seek_back_amount),
    162            len(data)))
    163       self.assertEqual(
    164           self._temp_test_file_contents[-len(data):], data,
    165           'Data from position %s to EOF did not match file contents.' %
    166           position)
    167 
    168   def testReadSeekAndReadToEOF(self):
    169     """Tests performing reads on the wrapper, seeking, then reading to EOF."""
    170     for initial_reads in ([1],
    171                           [TRANSFER_BUFFER_SIZE - 1],
    172                           [TRANSFER_BUFFER_SIZE],
    173                           [TRANSFER_BUFFER_SIZE + 1],
    174                           [1, TRANSFER_BUFFER_SIZE - 1],
    175                           [1, TRANSFER_BUFFER_SIZE],
    176                           [1, TRANSFER_BUFFER_SIZE + 1],
    177                           [TRANSFER_BUFFER_SIZE - 1, 1],
    178                           [TRANSFER_BUFFER_SIZE, 1],
    179                           [TRANSFER_BUFFER_SIZE + 1, 1],
    180                           [TRANSFER_BUFFER_SIZE - 1, TRANSFER_BUFFER_SIZE - 1],
    181                           [TRANSFER_BUFFER_SIZE - 1, TRANSFER_BUFFER_SIZE],
    182                           [TRANSFER_BUFFER_SIZE - 1, TRANSFER_BUFFER_SIZE + 1],
    183                           [TRANSFER_BUFFER_SIZE, TRANSFER_BUFFER_SIZE - 1],
    184                           [TRANSFER_BUFFER_SIZE, TRANSFER_BUFFER_SIZE],
    185                           [TRANSFER_BUFFER_SIZE, TRANSFER_BUFFER_SIZE + 1],
    186                           [TRANSFER_BUFFER_SIZE + 1, TRANSFER_BUFFER_SIZE - 1],
    187                           [TRANSFER_BUFFER_SIZE + 1, TRANSFER_BUFFER_SIZE],
    188                           [TRANSFER_BUFFER_SIZE + 1, TRANSFER_BUFFER_SIZE + 1],
    189                           [TRANSFER_BUFFER_SIZE, TRANSFER_BUFFER_SIZE,
    190                            TRANSFER_BUFFER_SIZE]):
    191       initial_position = 0
    192       for read_size in initial_reads:
    193         initial_position += read_size
    194       for buffer_size in (initial_position,
    195                           initial_position + 1,
    196                           initial_position * 2 - 1,
    197                           initial_position * 2):
    198         for seek_back_amount in (
    199             min(TRANSFER_BUFFER_SIZE - 1, initial_position),
    200             min(TRANSFER_BUFFER_SIZE, initial_position),
    201             min(TRANSFER_BUFFER_SIZE + 1, initial_position),
    202             min(TRANSFER_BUFFER_SIZE * 2 - 1, initial_position),
    203             min(TRANSFER_BUFFER_SIZE * 2, initial_position),
    204             min(TRANSFER_BUFFER_SIZE * 2 + 1, initial_position)):
    205           self._testSeekBack(initial_reads, buffer_size, seek_back_amount)
    206 
    207   def testBufferSizeLessThanChunkSize(self):
    208     ResumableStreamingJsonUploadWrapper(None, GetJsonResumableChunkSize())
    209     try:
    210       ResumableStreamingJsonUploadWrapper(None, GetJsonResumableChunkSize() - 1)
    211       self.fail('Did not get expected CommandException')
    212     except CommandException, e:
    213       self.assertIn('Buffer size must be >= JSON resumable upload', str(e))
    214 
    215   def testSeekPartialBuffer(self):
    216     """Tests seeking back partially within the buffer."""
    217     tmp_file = self._GetTestFile()
    218     read_size = TRANSFER_BUFFER_SIZE
    219     with open(tmp_file, 'rb') as stream:
    220       wrapper = ResumableStreamingJsonUploadWrapper(
    221           stream, TRANSFER_BUFFER_SIZE * 3, test_small_buffer=True)
    222       position = 0
    223       for _ in xrange(3):
    224         data = wrapper.read(read_size)
    225         self.assertEqual(
    226             self._temp_test_file_contents[position:position + read_size],
    227             data, 'Data from position %s to %s did not match file contents.' %
    228             (position, position + read_size))
    229         position += len(data)
    230 
    231       data = wrapper.read(read_size / 2)
    232       # Buffer contents should now be have contents from:
    233       # read_size/2 through 7*read_size/2.
    234       position = read_size / 2
    235       wrapper.seek(position)
    236       data = wrapper.read()
    237       self.assertEqual(
    238           self._temp_test_file_contents[-len(data):], data,
    239           'Data from position %s to EOF did not match file contents.' %
    240           position)
    241 
    242   def testSeekEnd(self):
    243     tmp_file = self._GetTestFile()
    244     for buffer_size in (TRANSFER_BUFFER_SIZE - 1,
    245                         TRANSFER_BUFFER_SIZE,
    246                         TRANSFER_BUFFER_SIZE + 1):
    247       for seek_back in (TRANSFER_BUFFER_SIZE - 1,
    248                         TRANSFER_BUFFER_SIZE,
    249                         TRANSFER_BUFFER_SIZE + 1):
    250         expect_exception = seek_back > buffer_size
    251         with open(tmp_file, 'rb') as stream:
    252           wrapper = ResumableStreamingJsonUploadWrapper(
    253               stream, buffer_size, test_small_buffer=True)
    254           # Read to the end.
    255           while wrapper.read(TRANSFER_BUFFER_SIZE):
    256             pass
    257           try:
    258             wrapper.seek(seek_back, whence=os.SEEK_END)
    259             if expect_exception:
    260               self.fail('Did not get expected CommandException for '
    261                         'seek_back size %s, buffer size %s' %
    262                         (seek_back, buffer_size))
    263           except CommandException, e:
    264             if not expect_exception:
    265               self.fail('Got unexpected CommandException "%s" for '
    266                         'seek_back size %s, buffer size %s' %
    267                         (str(e), seek_back, buffer_size))
    268