# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper class for streaming resumable uploads."""

import collections
import os

from gslib.exception import CommandException
from gslib.util import GetJsonResumableChunkSize


class ResumableStreamingJsonUploadWrapper(object):
  """Wraps an input stream in a buffer for resumable uploads.

  This class takes a non-seekable input stream, buffers it, and exposes it
  as a stream with limited seek capabilities such that it can be used in a
  resumable JSON API upload.

  max_buffer_size bytes of buffering is supported.
  """

  def __init__(self, stream, max_buffer_size, test_small_buffer=False):
    """Initializes the wrapper.

    Args:
      stream: Input stream.
      max_buffer_size: Maximum size of internal buffer; should be >= the chunk
          size of the resumable upload API to ensure that at least one full
          chunk write can be replayed in the event of a server error.
      test_small_buffer: Skip check for buffer size vs. chunk size, for testing.

    Raises:
      CommandException: If max_buffer_size is smaller than the JSON resumable
          upload chunk size and test_small_buffer is not set.
    """
    self._orig_fp = stream

    if not test_small_buffer and max_buffer_size < GetJsonResumableChunkSize():
      raise CommandException('Resumable streaming upload created with buffer '
                             'size %s, JSON resumable upload chunk size %s. '
                             'Buffer size must be >= JSON resumable upload '
                             'chunk size to ensure that uploads can be '
                             'resumed.' % (max_buffer_size,
                                           GetJsonResumableChunkSize()))

    self._max_buffer_size = max_buffer_size
    # Chunks of previously read data, oldest first.
    self._buffer = collections.deque()
    # Stream offsets of the first buffered byte and one past the last one.
    self._buffer_start = 0
    self._buffer_end = 0
    # Current logical position as seen by the caller.
    self._position = 0

  def read(self, size=-1):  # pylint: disable=invalid-name
    """Reads from the wrapped stream.

    Data is served from the internal buffer first (after a backwards seek)
    and then from the wrapped stream. Data read from the wrapped stream by a
    sized read is retained in the buffer, which is trimmed to at most
    max_buffer_size bytes. Data returned by a read-everything call is not
    added to the buffer.

    Args:
      size: The amount of bytes to read. If omitted or negative, the entire
          contents of the stream will be read and returned.

    Returns:
      Bytes from the wrapped stream.
    """
    read_all_bytes = size is None or size < 0
    if read_all_bytes:
      # The buffer never holds more than this, so it is enough to drain it.
      bytes_remaining = self._max_buffer_size
    else:
      bytes_remaining = size
    data = b''
    buffered_data = []
    if self._position < self._buffer_end:
      # There was a backwards seek, so read from the buffer first.

      # TODO: Performance test to validate if it is worth re-aligning
      # the buffers in this case.  Also, seeking through the buffer for
      # each read on a long catch-up is probably not performant, but we'd
      # need a more complex data structure than a deque to get around this.
      pos_in_buffer = self._buffer_start
      buffer_index = 0
      # First, find the start position in the buffer.
      while pos_in_buffer + len(self._buffer[buffer_index]) <= self._position:
        # When this loop exits, buffer_index will refer to the buffer chunk
        # that contains self._position, and pos_in_buffer (the stream offset
        # at which that chunk starts) will be <= self._position.
        pos_in_buffer += len(self._buffer[buffer_index])
        buffer_index += 1

      # Read until we've read enough or we're out of buffer.
      while pos_in_buffer < self._buffer_end and bytes_remaining > 0:
        buffer_len = len(self._buffer[buffer_index])
        # This describes how far into the current buffer self._position is.
        offset_from_position = self._position - pos_in_buffer
        bytes_available_this_buffer = buffer_len - offset_from_position
        read_size = min(bytes_available_this_buffer, bytes_remaining)
        buffered_data.append(
            self._buffer[buffer_index]
            [offset_from_position:offset_from_position + read_size])
        bytes_remaining -= read_size
        pos_in_buffer += buffer_len
        buffer_index += 1
        self._position += read_size

    # At this point we're guaranteed that if there are any bytes left to read,
    # then self._position == self._buffer_end, and we can read from the
    # wrapped stream if needed.
    if read_all_bytes:
      # TODO: The user is requesting reading until the end of an
      # arbitrary length stream, which is bad: we'll need to return data
      # with no size limits; if the stream is sufficiently long, we could run
      # out of memory. We could break this down into smaller reads and
      # buffer it as we go, but we're still left returning the data all at
      # once to the caller.  We could raise, but for now trust the caller to
      # be sane and have enough memory to hold the remaining stream contents.
      new_data = self._orig_fp.read(size)
      data_len = len(new_data)
      if not buffered_data:
        data = new_data
      else:
        buffered_data.append(new_data)
        data = b''.join(buffered_data)
      self._position += data_len
    elif bytes_remaining:
      new_data = self._orig_fp.read(bytes_remaining)
      if not buffered_data:
        data = new_data
      else:
        buffered_data.append(new_data)
        data = b''.join(buffered_data)
      data_len = len(new_data)
      if data_len:
        self._position += data_len
        self._buffer.append(new_data)
        self._buffer_end += data_len
        # Drop whole chunks from the front until we are within the limit.
        oldest_data = None
        while self._buffer_end - self._buffer_start > self._max_buffer_size:
          oldest_data = self._buffer.popleft()
          self._buffer_start += len(oldest_data)
        if oldest_data:
          # Dropping a whole chunk may have freed more than necessary; put
          # the tail of the last dropped chunk back so that exactly
          # max_buffer_size bytes stay available for replay.
          refill_amount = self._max_buffer_size - (self._buffer_end -
                                                   self._buffer_start)
          if refill_amount:
            self._buffer.appendleft(oldest_data[-refill_amount:])
            self._buffer_start -= refill_amount
    else:
      # The request was satisfied entirely from the buffer (or size was 0).
      data = b''.join(buffered_data) if buffered_data else b''

    return data

  def tell(self):  # pylint: disable=invalid-name
    """Returns the current stream position."""
    return self._position

  def seekable(self):  # pylint: disable=invalid-name
    """Returns true since limited seek support exists."""
    return True

  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
    """Seeks on the buffered stream.

    Args:
      offset: The offset to seek to; must be within the buffer bounds. With
          os.SEEK_END this is the number of bytes to move back from the end
          of the stream (it is subtracted from the end position).
      whence: os.SEEK_SET or os.SEEK_END. With os.SEEK_END the wrapped stream
          is first read to its end.

    Raises:
      CommandException if an unsupported seek mode or position is used.
    """
    if whence == os.SEEK_SET:
      if offset < self._buffer_start or offset > self._buffer_end:
        raise CommandException('Unable to resume upload because of limited '
                               'buffering available for streaming uploads. '
                               'Offset %s was requested, but only data from '
                               '%s to %s is buffered.' %
                               (offset, self._buffer_start, self._buffer_end))
      # Move to a position within the buffer.
      self._position = offset
    elif whence == os.SEEK_END:
      if offset > self._max_buffer_size:
        raise CommandException('Invalid SEEK_END offset %s on streaming '
                               'upload. Only %s can be buffered.' %
                               (offset, self._max_buffer_size))
      # Read to the end and rely on buffering to handle the offset.
      while self.read(self._max_buffer_size):
        pass
      # Now we're at the end.
      target_position = self._position - offset
      if target_position < self._buffer_start:
        # The stream was shorter than the requested offset, so the target
        # lies before any data we hold; reading from there would slice the
        # buffer with a negative index and return the wrong bytes.
        raise CommandException('Unable to resume upload because of limited '
                               'buffering available for streaming uploads. '
                               'Offset %s was requested, but only data from '
                               '%s to %s is buffered.' %
                               (target_position, self._buffer_start,
                                self._buffer_end))
      self._position = target_position
    else:
      raise CommandException('Invalid seek mode on streaming upload. '
                             '(mode %s, offset %s)' % (whence, offset))

  def close(self):  # pylint: disable=invalid-name
    """Closes the wrapped stream and returns the result of its close()."""
    return self._orig_fp.close()