# -*- coding: utf-8 -*-
# Copyright 2010 Google Inc. All Rights Reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
"""Boto translation layer for resumable uploads.

See http://code.google.com/apis/storage/docs/developer-guide.html#resumable
for details.

Resumable uploads will retry interrupted uploads, resuming at the byte
count completed by the last upload attempt. If too many retries happen with
no progress (per configurable num_retries param), the upload will be
aborted in the current process.

Unlike the boto implementation of resumable upload handler, this class does
not directly interact with tracker files.

Originally Google wrote and contributed this code to the boto project,
then copied that code back into gsutil on the release of gsutil 4.0 which
supports both boto and non-boto codepaths for resumable uploads. Any bug
fixes made to this file should also be integrated into
resumable_upload_handler.py in boto, where applicable.

TODO: gsutil-beta: Add a similar comment to the boto code.
"""

from __future__ import absolute_import

import errno
import httplib
import random
import re
import socket
import time
import urlparse
from boto import UserAgent
from boto.connection import AWSAuthConnection
from boto.exception import ResumableTransferDisposition
from boto.exception import ResumableUploadException
from gslib.exception import InvalidUrlError
from gslib.util import GetMaxRetryDelay
from gslib.util import GetNumRetries
from gslib.util import XML_PROGRESS_CALLBACKS


class BotoResumableUpload(object):
  """Upload helper class for resumable uploads via boto."""

  BUFFER_SIZE = 8192
  RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error,
                          socket.gaierror)

  # (start, end) response indicating service has nothing (upload protocol uses
  # inclusive numbering).
  SERVICE_HAS_NOTHING = (0, -1)

  def __init__(self, tracker_callback, logger,
               resume_url=None, num_retries=None):
    """Constructor. Instantiate once for each uploaded file.

    Args:
      tracker_callback: Callback function that takes a string argument. Used
                        by caller to track this upload across upload
                        interruption.
      logger: logging.logger instance to use for debug messages.
      resume_url: If present, attempt to resume the upload at this URL.
      num_retries: Number of times to retry the upload making no progress.
                   This count resets every time we make progress, so the upload
                   can span many more than this number of retries.

    Raises:
      InvalidUrlError if resume_url is present but syntactically invalid.
    """
    if resume_url:
      self._SetUploadUrl(resume_url)
    else:
      self.upload_url = None
    self.num_retries = num_retries
    self.service_has_bytes = 0  # Byte count at last service check.
    # Save upload_start_point in instance state so caller can find how
    # much was transferred by this ResumableUploadHandler (across retries).
    self.upload_start_point = None
    self.tracker_callback = tracker_callback
    self.logger = logger
    # Initialized here so TrackProgressLessIterations never sees an unset
    # attribute; SendFile resets it at the start of every transfer.
    self.progress_less_iterations = 0

  def _SetUploadUrl(self, url):
    """Saves URL and resets upload state.

    Called when we start a new resumable upload or get a new tracker
    URL for the upload.

    Args:
      url: URL string for the upload.

    Raises:
      InvalidUrlError if URL is syntactically invalid.
    """
    parse_result = urlparse.urlparse(url)
    if (parse_result.scheme.lower() not in ['http', 'https'] or
        not parse_result.netloc):
      raise InvalidUrlError('Invalid upload URL (%s)' % url)
    self.upload_url = url
    self.upload_url_host = parse_result.netloc
    self.upload_url_path = '%s?%s' % (
        parse_result.path, parse_result.query)
    self.service_has_bytes = 0

  def _BuildContentRangeHeader(self, range_spec='*', length_spec='*'):
    """Returns a Content-Range header value: 'bytes <range>/<length>'."""
    return 'bytes %s/%s' % (range_spec, length_spec)

  def _QueryServiceState(self, conn, file_length):
    """Queries service to find out state of given upload.

    Note that this method really just makes special case use of the
    fact that the upload service always returns the current start/end
    state whenever a PUT doesn't complete.

    Args:
      conn: Boto connection (key.bucket.connection) used to send the query.
      file_length: Total length of the file.

    Returns:
      HTTP response from sending request. Exceptions raised by the
      underlying request propagate to the caller.
    """
    # Send an empty PUT so that service replies with this resumable
    # transfer's state.
    put_headers = {}
    put_headers['Content-Range'] = (
        self._BuildContentRangeHeader('*', file_length))
    put_headers['Content-Length'] = '0'
    return AWSAuthConnection.make_request(
        conn, 'PUT', path=self.upload_url_path, auth_path=self.upload_url_path,
        headers=put_headers, host=self.upload_url_host)

  def _QueryServicePos(self, conn, file_length):
    """Queries service to find out what bytes it currently has.

    Args:
      conn: Boto connection (key.bucket.connection) used to send the query.
      file_length: Total length of the file.

    Returns:
      (service_start, service_end), where the values are inclusive.
      For example, (0, 2) would mean that the service has bytes 0, 1, *and* 2.

    Raises:
      ResumableUploadException if problem querying service.
    """
    resp = self._QueryServiceState(conn, file_length)
    if resp.status == 200:
      # To handle the boundary condition where the service has the complete
      # file, we return (service_start, file_length-1). That way the
      # calling code can always simply read up through service_end. (If we
      # didn't handle this boundary condition here, the caller would have
      # to check whether service_end == file_length and read one fewer byte
      # in that case.)
      return (0, file_length - 1)  # Completed upload.
    if resp.status != 308:
      # This means the service didn't have any state for the given
      # upload ID, which can happen (for example) if the caller saved
      # the upload URL to a file and then tried to restart the transfer
      # after that upload ID has gone stale. In that case we need to
      # start a new transfer (and the caller will then save the new
      # upload URL to the tracker file).
      raise ResumableUploadException(
          'Got non-308 response (%s) from service state query' %
          resp.status, ResumableTransferDisposition.START_OVER)
    got_valid_response = False
    range_spec = resp.getheader('range')
    if range_spec:
      # Parse 'bytes=<from>-<to>' range_spec.
      m = re.search(r'bytes=(\d+)-(\d+)', range_spec)
      if m:
        service_start = long(m.group(1))
        service_end = long(m.group(2))
        got_valid_response = True
    else:
      # No Range header, which means the service does not yet have
      # any bytes. Note that the Range header uses inclusive 'from'
      # and 'to' values. Since Range 0-0 would mean that the service
      # has byte 0, omitting the Range header is used to indicate that
      # the service doesn't have any bytes.
      return self.SERVICE_HAS_NOTHING
    if not got_valid_response:
      raise ResumableUploadException(
          'Couldn\'t parse upload service state query response (%s)' %
          str(resp.getheaders()), ResumableTransferDisposition.START_OVER)
    if conn.debug >= 1:
      self.logger.debug('Service has: Range: %d - %d.', service_start,
                        service_end)
    return (service_start, service_end)

  def _StartNewResumableUpload(self, key, headers=None):
    """Starts a new resumable upload.

    On success, records the new upload URL via _SetUploadUrl and reports it
    to tracker_callback.

    Args:
      key: Boto Key representing the object to upload.
      headers: Headers to use in the upload requests (may be None).

    Raises:
      ResumableUploadException if any errors occur.
    """
    headers = headers or {}
    conn = key.bucket.connection
    if conn.debug >= 1:
      self.logger.debug('Starting new resumable upload.')
    self.service_has_bytes = 0

    # Start a new resumable upload by sending a POST request with an
    # empty body and the "X-Goog-Resumable: start" header. Include any
    # caller-provided headers (e.g., Content-Type) EXCEPT Content-Length
    # (and raise an exception if they tried to pass one, since it's
    # a semantic error to specify it at this point, and if we were to
    # include one now it would cause the service to expect that many
    # bytes; the POST doesn't include the actual file bytes). We set
    # the Content-Length in the subsequent PUT, based on the uploaded
    # file size.
    post_headers = {}
    for k, v in headers.items():
      if k.lower() == 'content-length':
        raise ResumableUploadException(
            'Attempt to specify Content-Length header (disallowed)',
            ResumableTransferDisposition.ABORT)
      post_headers[k] = v
    post_headers[conn.provider.resumable_upload_header] = 'start'

    resp = conn.make_request(
        'POST', key.bucket.name, key.name, post_headers)
    # Get upload URL from response 'Location' header.
    body = resp.read()

    # Check for various status conditions.
    if resp.status in [429, 500, 503]:
      # Retry after a delay.
      raise ResumableUploadException(
          'Got status %d from attempt to start resumable upload. '
          'Will wait/retry' % resp.status,
          ResumableTransferDisposition.WAIT_BEFORE_RETRY)
    elif resp.status != 200 and resp.status != 201:
      raise ResumableUploadException(
          'Got status %d from attempt to start resumable upload. '
          'Aborting' % resp.status,
          ResumableTransferDisposition.ABORT)

    # Else we got 200 or 201 response code, indicating the resumable
    # upload was created.
    upload_url = resp.getheader('Location')
    if not upload_url:
      raise ResumableUploadException(
          'No resumable upload URL found in resumable initiation '
          'POST response (%s)' % body,
          ResumableTransferDisposition.WAIT_BEFORE_RETRY)
    self._SetUploadUrl(upload_url)
    self.tracker_callback(upload_url)

  def _UploadFileBytes(self, conn, http_conn, fp, file_length,
                       total_bytes_uploaded, cb, num_cb, headers):
    """Attempts to upload file bytes.

    Makes a single attempt using an existing resumable upload connection.

    Args:
      conn: Boto connection from the Key (key.bucket.connection); used to
            build the request and to read the configured debug level.
      http_conn: Separate HTTPConnection for the transfer.
      fp: File pointer containing bytes to upload; read from its current
          position.
      file_length: Total length of the file.
      total_bytes_uploaded: Number of bytes already uploaded; this PUT sends
                            the range [total_bytes_uploaded, file_length).
      cb: Progress callback function that takes (progress, total_size).
      num_cb: Granularity of the callback (maximum number of times the
              callback will be called during the file transfer). If negative,
              perform callback with each buffer read.
      headers: Headers to be used in the upload requests.

    Returns:
      (etag, generation, metageneration) from service upon success.

    Raises:
      ResumableUploadException if any problems occur.
    """
    buf = fp.read(self.BUFFER_SIZE)
    if cb:
      # The cb_count represents the number of full buffers to send between
      # cb executions. Floor division keeps it an integer count.
      if num_cb > 2:
        cb_count = file_length // self.BUFFER_SIZE // (num_cb - 2)
      elif num_cb < 0:
        cb_count = -1
      else:
        cb_count = 0
      i = 0
      cb(total_bytes_uploaded, file_length)

    # Build resumable upload headers for the transfer. Don't send a
    # Content-Range header if the file is 0 bytes long, because the
    # resumable upload protocol uses an *inclusive* end-range (so, sending
    # 'bytes 0-0/1' would actually mean you're sending a 1-byte file).
    put_headers = headers.copy() if headers else {}
    if file_length:
      if total_bytes_uploaded == file_length:
        range_header = self._BuildContentRangeHeader(
            '*', file_length)
      else:
        range_header = self._BuildContentRangeHeader(
            '%d-%d' % (total_bytes_uploaded, file_length - 1),
            file_length)
      put_headers['Content-Range'] = range_header
    # Set Content-Length to the total bytes we'll send with this PUT.
    put_headers['Content-Length'] = str(file_length - total_bytes_uploaded)
    http_request = AWSAuthConnection.build_base_http_request(
        conn, 'PUT', path=self.upload_url_path, auth_path=None,
        headers=put_headers, host=self.upload_url_host)
    http_conn.putrequest('PUT', http_request.path)
    for k in put_headers:
      http_conn.putheader(k, put_headers[k])
    http_conn.endheaders()

    # Turn off debug on http connection so upload content isn't included
    # in debug stream.
    http_conn.set_debuglevel(0)
    while buf:
      http_conn.send(buf)
      total_bytes_uploaded += len(buf)
      if cb:
        i += 1
        if i == cb_count or cb_count == -1:
          cb(total_bytes_uploaded, file_length)
          i = 0
      buf = fp.read(self.BUFFER_SIZE)
    # Restore http connection debug level before reading the response.
    http_conn.set_debuglevel(conn.debug)
    if cb:
      cb(total_bytes_uploaded, file_length)
    if total_bytes_uploaded != file_length:
      # Abort (and delete the tracker file) so if the user retries
      # they'll start a new resumable upload rather than potentially
      # attempting to pick back up later where we left off.
      raise ResumableUploadException(
          'File changed during upload: EOF at %d bytes of %d byte file.' %
          (total_bytes_uploaded, file_length),
          ResumableTransferDisposition.ABORT)
    resp = http_conn.getresponse()

    if resp.status == 200:
      # Success.
      return (resp.getheader('etag'),
              resp.getheader('x-goog-generation'),
              resp.getheader('x-goog-metageneration'))
    # Retry timeout (408) and status 429, 500 and 503 errors after a delay.
    elif resp.status in [408, 429, 500, 503]:
      disposition = ResumableTransferDisposition.WAIT_BEFORE_RETRY
    else:
      # Catch all for any other error codes.
      disposition = ResumableTransferDisposition.ABORT
    raise ResumableUploadException('Got response code %d while attempting '
                                   'upload (%s)' %
                                   (resp.status, resp.reason), disposition)

  def _AttemptResumableUpload(self, key, fp, file_length, headers, cb,
                              num_cb):
    """Attempts a resumable upload.

    Resumes the existing upload if upload_url is set and the service still
    knows about it; otherwise starts a new resumable upload.

    Args:
      key: Boto key representing object to upload.
      fp: File pointer containing upload bytes.
      file_length: Total length of the upload.
      headers: Headers to be used in upload requests.
      cb: Progress callback function that takes (progress, total_size).
      num_cb: Granularity of the callback (maximum number of times the
              callback will be called during the file transfer). If negative,
              perform callback with each buffer read.

    Returns:
      (etag, generation, metageneration) from service upon success.

    Raises:
      ResumableUploadException if any problems occur.
    """
    (service_start, service_end) = self.SERVICE_HAS_NOTHING
    conn = key.bucket.connection
    if self.upload_url:
      # Try to resume existing resumable upload.
      try:
        (service_start, service_end) = (
            self._QueryServicePos(conn, file_length))
        # Record the byte count the service holds (ranges are inclusive, so
        # bytes 0..service_end is service_end + 1 bytes). service_start is
        # the start of the range and is not a progress measure.
        self.service_has_bytes = service_end + 1
        if conn.debug >= 1:
          self.logger.debug('Resuming transfer.')
      except ResumableUploadException as e:
        if conn.debug >= 1:
          self.logger.debug('Unable to resume transfer (%s).', e.message)
        self._StartNewResumableUpload(key, headers)
    else:
      self._StartNewResumableUpload(key, headers)

    # upload_start_point allows the code that instantiated the
    # ResumableUploadHandler to find out the point from which it started
    # uploading (e.g., so it can correctly compute throughput).
    if self.upload_start_point is None:
      self.upload_start_point = service_end

    total_bytes_uploaded = service_end + 1

    # Start reading from the file based upon the number of bytes that the
    # server has so far.
    if total_bytes_uploaded < file_length:
      fp.seek(total_bytes_uploaded)

    # Get a new HTTP connection (vs conn.get_http_connection(), which reuses
    # pool connections) because httplib requires a new HTTP connection per
    # transaction. (Without this, calling http_conn.getresponse() would get
    # "ResponseNotReady".)
    http_conn = conn.new_http_connection(self.upload_url_host, conn.port,
                                         conn.is_secure)
    http_conn.set_debuglevel(conn.debug)

    # Make sure to close http_conn at end so if a local file read
    # failure occurs partway through service will terminate current upload
    # and can report that progress on next attempt.
    try:
      return self._UploadFileBytes(conn, http_conn, fp, file_length,
                                   total_bytes_uploaded, cb, num_cb,
                                   headers)
    except (ResumableUploadException, socket.error) as e:
      try:
        resp = self._QueryServiceState(conn, file_length)
      except self.RETRYABLE_EXCEPTIONS:
        # The follow-up state query failed as well (e.g., the network is
        # still down). Surface the original upload error rather than the
        # query error so its disposition (e.g., ABORT) is preserved.
        raise e
      if resp.status == 400:
        raise ResumableUploadException(
            'Got 400 response from service state query after failed resumable '
            'upload attempt. This can happen for various reasons, including '
            'specifying an invalid request (e.g., an invalid canned ACL) or '
            'if the file size changed between upload attempts',
            ResumableTransferDisposition.ABORT)
      raise
    finally:
      http_conn.close()

  def HandleResumableUploadException(self, e, debug):
    """Re-raises non-retryable ResumableUploadExceptions; logs retryable ones.

    Must be called from inside the except clause that caught e, because the
    non-retryable paths use a bare `raise` to re-raise the active exception.

    Args:
      e: The ResumableUploadException that was caught.
      debug: Debug level 0..3; messages are logged when >= 1.
    """
    if e.disposition == ResumableTransferDisposition.ABORT_CUR_PROCESS:
      if debug >= 1:
        self.logger.debug('Caught non-retryable ResumableUploadException (%s); '
                          'aborting but retaining tracker file', e.message)
      raise
    elif e.disposition == ResumableTransferDisposition.ABORT:
      if debug >= 1:
        self.logger.debug('Caught non-retryable ResumableUploadException (%s); '
                          'aborting and removing tracker file', e.message)
      raise
    elif e.disposition == ResumableTransferDisposition.START_OVER:
      raise
    else:
      if debug >= 1:
        self.logger.debug(
            'Caught ResumableUploadException (%s) - will retry', e.message)

  def TrackProgressLessIterations(self, service_had_bytes_before_attempt,
                                  debug=0):
    """Tracks the number of iterations without progress.

    Performs randomized exponential backoff.

    Args:
      service_had_bytes_before_attempt: Number of bytes the service had prior
                                        to this upload attempt.
      debug: debug level 0..3

    Raises:
      ResumableUploadException (ABORT_CUR_PROCESS) once more than num_retries
      consecutive attempts have made no progress.
    """
    # At this point we had a re-tryable failure; see if made progress.
    if self.service_has_bytes > service_had_bytes_before_attempt:
      self.progress_less_iterations = 0  # If progress, reset counter.
    else:
      self.progress_less_iterations += 1

    if self.progress_less_iterations > self.num_retries:
      # Don't retry any longer in the current process.
      raise ResumableUploadException(
          'Too many resumable upload attempts failed without '
          'progress. You might try this upload again later',
          ResumableTransferDisposition.ABORT_CUR_PROCESS)

    # Use binary exponential backoff to desynchronize client requests.
    sleep_time_secs = min(random.random() * (2**self.progress_less_iterations),
                          GetMaxRetryDelay())
    if debug >= 1:
      self.logger.debug('Got retryable failure (%d progress-less in a row).\n'
                        'Sleeping %3.1f seconds before re-trying',
                        self.progress_less_iterations, sleep_time_secs)
    time.sleep(sleep_time_secs)

  def SendFile(self, key, fp, size, headers, canned_acl=None, cb=None,
               num_cb=XML_PROGRESS_CALLBACKS):
    """Upload a file to a key into a bucket on GS, resumable upload protocol.

    Args:
      key: `boto.s3.key.Key` or subclass representing the upload destination.
      fp: File pointer to upload
      size: Size of the file to upload.
      headers: The headers to pass along with the PUT request. The caller's
               dict is not modified.
      canned_acl: Optional canned ACL to apply to object.
      cb: Callback function that will be called to report progress on
          the upload. The callback should accept two integer parameters, the
          first representing the number of bytes that have been successfully
          transmitted to GS, and the second representing the total number of
          bytes that need to be transmitted.
      num_cb: (optional) If a callback is specified with the cb parameter, this
              parameter determines the granularity of the callback by defining
              the maximum number of times the callback will be called during the
              file transfer. Providing a negative integer will cause your
              callback to be called with each buffer read.

    Raises:
      ResumableUploadException if a problem occurs during the transfer.
    """
    # Work on a copy so the caller's headers dict is not mutated below.
    headers = dict(headers) if headers else {}
    # If Content-Type header is present and set to None, remove it.
    # This is gsutil's way of asking boto to refrain from auto-generating
    # that header.
    content_type = 'Content-Type'
    if content_type in headers and headers[content_type] is None:
      del headers[content_type]

    if canned_acl:
      headers[key.provider.acl_header] = canned_acl

    headers['User-Agent'] = UserAgent

    file_length = size
    debug = key.bucket.connection.debug

    # Use num-retries from constructor if one was provided; else fall back
    # to GetNumRetries().
    if self.num_retries is None:
      self.num_retries = GetNumRetries()
    self.progress_less_iterations = 0

    while True:  # Retry as long as we're making progress.
      service_had_bytes_before_attempt = self.service_has_bytes
      try:
        # Save generation and metageneration in class state so caller
        # can find these values, for use in preconditions of future
        # operations on the uploaded object.
        (_, self.generation, self.metageneration) = (
            self._AttemptResumableUpload(key, fp, file_length,
                                         headers, cb, num_cb))

        key.generation = self.generation
        if debug >= 1:
          self.logger.debug('Resumable upload complete.')
        return
      except self.RETRYABLE_EXCEPTIONS as e:
        if debug >= 1:
          self.logger.debug('Caught exception (%s)', e.__repr__())
        if isinstance(e, IOError) and e.errno == errno.EPIPE:
          # Broken pipe error causes httplib to immediately
          # close the socket (http://bugs.python.org/issue5542),
          # so we need to close the connection before we resume
          # the upload (which will cause a new connection to be
          # opened the next time an HTTP request is sent).
          key.bucket.connection.connection.close()
      except ResumableUploadException as e:
        self.HandleResumableUploadException(e, debug)

      self.TrackProgressLessIterations(service_had_bytes_before_attempt,
                                       debug=debug)