# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper class for streaming resumable uploads."""

import collections
import os

from gslib.exception import CommandException
from gslib.util import GetJsonResumableChunkSize


class ResumableStreamingJsonUploadWrapper(object):
  """Wraps an input stream in a buffer for resumable uploads.

  This class takes a non-seekable input stream, buffers it, and exposes it
  as a stream with limited seek capabilities such that it can be used in a
  resumable JSON API upload.

  max_buffer_size bytes of buffering is supported.
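
  Example (illustrative sketch only; io.BytesIO stands in for a real
  non-seekable stream, and test_small_buffer=True skips the production
  check that the buffer is at least one JSON resumable chunk):

    import io

    wrapper = ResumableStreamingJsonUploadWrapper(
        io.BytesIO(b'payload'), max_buffer_size=16, test_small_buffer=True)
    wrapper.read(4)   # b'payl', read from the wrapped stream and buffered.
    wrapper.seek(0)   # Allowed: offset 0 is still within the buffer.
    wrapper.read(4)   # b'payl' again, replayed from the buffer.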
  """

  def __init__(self, stream, max_buffer_size, test_small_buffer=False):
    """Initializes the wrapper.

    Args:
      stream: Input stream.
      max_buffer_size: Maximum size of internal buffer; should be >= the chunk
          size of the resumable upload API to ensure that at least one full
          chunk write can be replayed in the event of a server error.
      test_small_buffer: Skip check for buffer size vs. chunk size, for testing.
    """
    self._orig_fp = stream

    if not test_small_buffer and max_buffer_size < GetJsonResumableChunkSize():
      raise CommandException('Resumable streaming upload created with buffer '
                             'size %s, JSON resumable upload chunk size %s. '
                             'Buffer size must be >= JSON resumable upload '
                             'chunk size to ensure that uploads can be '
                             'resumed.' % (max_buffer_size,
                                           GetJsonResumableChunkSize()))

    self._max_buffer_size = max_buffer_size
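    # self._buffer holds the most recently read data, covering logical stream
    # positions [self._buffer_start, self._buffer_end). self._position is the
    # caller-visible stream position; after a backwards seek it lags behind
    # self._buffer_end until the buffered bytes are re-read.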
    self._buffer = collections.deque()
    self._buffer_start = 0
    self._buffer_end = 0
    self._position = 0

  def read(self, size=-1):  # pylint: disable=invalid-name
    """Reads from the wrapped stream.

    Args:
      size: The number of bytes to read. If omitted or negative, the entire
          remaining contents of the stream will be read and returned.

    Returns:
      Bytes from the wrapped stream.
    """
    read_all_bytes = size is None or size < 0
    if read_all_bytes:
      bytes_remaining = self._max_buffer_size
    else:
      bytes_remaining = size
    data = b''
    buffered_data = []
    if self._position < self._buffer_end:
      # There was a backwards seek, so read from the buffer first.

      # TODO: Performance test to validate if it is worth re-aligning
      # the buffers in this case.  Also, seeking through the buffer for
      # each read on a long catch-up is probably not performant, but we'd
      # need a more complex data structure than a deque to get around this.
      pos_in_buffer = self._buffer_start
      buffer_index = 0
      # First, find the start position in the buffer.
      while pos_in_buffer + len(self._buffer[buffer_index]) < self._position:
        # When this loop exits, buffer_index will refer to a buffer that
        # has at least some overlap with self._position, and pos_in_buffer
        # will be <= self._position (the start offset of that buffer).
        pos_in_buffer += len(self._buffer[buffer_index])
        buffer_index += 1

      # Read until we've read enough or we're out of buffer.
      while pos_in_buffer < self._buffer_end and bytes_remaining > 0:
        buffer_len = len(self._buffer[buffer_index])
        # This describes how far into the current buffer self._position is.
        offset_from_position = self._position - pos_in_buffer
        bytes_available_this_buffer = buffer_len - offset_from_position
        read_size = min(bytes_available_this_buffer, bytes_remaining)
        buffered_data.append(
            self._buffer[buffer_index]
            [offset_from_position:offset_from_position + read_size])
        bytes_remaining -= read_size
        pos_in_buffer += buffer_len
        buffer_index += 1
        self._position += read_size

    # At this point we're guaranteed that if there are any bytes left to read,
    # then self._position == self._buffer_end, and we can read from the
    # wrapped stream if needed.
    if read_all_bytes:
      # TODO: The user is requesting a read to the end of an arbitrary-length
      # stream, which is risky because we'll need to return the data with no
      # size limit; if the stream is sufficiently long, we could run out of
      # memory. We could break this down into smaller reads and buffer it as
      # we go, but we're still left returning the data all at once to the
      # caller.  We could raise, but for now trust the caller to be sane and
      # have enough memory to hold the remaining stream contents.
      new_data = self._orig_fp.read(size)
      data_len = len(new_data)
      if not buffered_data:
        data = new_data
      else:
        buffered_data.append(new_data)
        data = b''.join(buffered_data)
      self._position += data_len
    elif bytes_remaining:
      new_data = self._orig_fp.read(bytes_remaining)
      if not buffered_data:
        data = new_data
      else:
        buffered_data.append(new_data)
        data = b''.join(buffered_data)
      data_len = len(new_data)
      if data_len:
        self._position += data_len
        self._buffer.append(new_data)
        self._buffer_end += data_len
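        # Trim the oldest buffered data so that at most max_buffer_size bytes
        # stay buffered. If the last block popped straddles the limit, its
        # tail is pushed back so exactly max_buffer_size bytes remain
        # replayable for a backwards seek.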
        oldest_data = None
        while self._buffer_end - self._buffer_start > self._max_buffer_size:
          oldest_data = self._buffer.popleft()
          self._buffer_start += len(oldest_data)
        if oldest_data:
          refill_amount = self._max_buffer_size - (self._buffer_end -
                                                   self._buffer_start)
          if refill_amount:
            self._buffer.appendleft(oldest_data[-refill_amount:])
            self._buffer_start -= refill_amount
    else:
      data = b''.join(buffered_data) if buffered_data else b''

    return data

  def tell(self):  # pylint: disable=invalid-name
    """Returns the current stream position."""
    return self._position

  def seekable(self):  # pylint: disable=invalid-name
    """Returns true since limited seek support exists."""
    return True

  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
    """Seeks on the buffered stream.

    Args:
      offset: The offset to seek to. For os.SEEK_SET it must be within the
          buffered byte range; for os.SEEK_END it must not exceed
          max_buffer_size.
      whence: os.SEEK_SET or os.SEEK_END (os.SEEK_CUR is not supported).

    Raises:
      CommandException if an unsupported seek mode or position is used.
    """
    if whence == os.SEEK_SET:
      if offset < self._buffer_start or offset > self._buffer_end:
        raise CommandException('Unable to resume upload because of limited '
                               'buffering available for streaming uploads. '
                               'Offset %s was requested, but only data from '
                               '%s to %s is buffered.' %
                               (offset, self._buffer_start, self._buffer_end))
      # Move to a position within the buffer.
      self._position = offset
    elif whence == os.SEEK_END:
      if offset > self._max_buffer_size:
        raise CommandException('Invalid SEEK_END offset %s on streaming '
                               'upload. Only %s can be buffered.' %
                               (offset, self._max_buffer_size))
      # Read to the end and rely on buffering to handle the offset.
      while self.read(self._max_buffer_size):
        pass
      # Now we're at the end.
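      # read() keeps (up to) the last max_buffer_size bytes buffered, so a
      # position at most max_buffer_size before the end can still be replayed
      # from the buffer.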
      self._position -= offset
    else:
      raise CommandException('Invalid seek mode on streaming upload. '
                             '(mode %s, offset %s)' % (whence, offset))

  def close(self):  # pylint: disable=invalid-name
    return self._orig_fp.close()