# -*- coding: utf-8 -*-
# Copyright 2010 Google Inc. All Rights Reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
"""Boto translation layer for resumable uploads.

See http://code.google.com/apis/storage/docs/developer-guide.html#resumable
for details.

Resumable uploads will retry interrupted uploads, resuming at the byte
count completed by the last upload attempt. If too many retries happen with
no progress (per the configurable num_retries param), the upload is aborted
in the current process.

Unlike boto's resumable upload handler, this class does not directly
interact with tracker files.

Originally Google wrote and contributed this code to the boto project,
then copied it back into gsutil on the release of gsutil 4.0, which supports
both boto and non-boto codepaths for resumable uploads. Any bug fixes made
to this file should also be integrated into resumable_upload_handler.py in
boto, where applicable.

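Below is an illustrative usage sketch (not part of the public API). It
assumes an existing boto Key ('key'), an open file object ('fp') of known
'size', a logging.Logger ('logger'), and a caller-supplied function that
persists the upload URL so a later invocation can resume:

  def SaveUploadUrl(url):
    ...  # Write url to a tracker file of your choosing.

  uploader = BotoResumableUpload(SaveUploadUrl, logger)
  uploader.SendFile(key, fp, size, headers={})
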
TODO: gsutil-beta: Add a similar comment to the boto code.
"""

from __future__ import absolute_import

import errno
import httplib
import random
import re
import socket
import time
import urlparse
from boto import UserAgent
from boto.connection import AWSAuthConnection
from boto.exception import ResumableTransferDisposition
from boto.exception import ResumableUploadException
from gslib.exception import InvalidUrlError
from gslib.util import GetMaxRetryDelay
from gslib.util import GetNumRetries
from gslib.util import XML_PROGRESS_CALLBACKS


class BotoResumableUpload(object):
  """Upload helper class for resumable uploads via boto."""

  BUFFER_SIZE = 8192
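  # Low-level network errors that the retry loop in SendFile treats as
  # transient (retryable) failures.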
  RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error,
                          socket.gaierror)

  # (start, end) response indicating service has nothing (upload protocol uses
  # inclusive numbering).
  SERVICE_HAS_NOTHING = (0, -1)

  def __init__(self, tracker_callback, logger,
               resume_url=None, num_retries=None):
    """Constructor. Instantiate once for each uploaded file.

    Args:
      tracker_callback: Callback function that takes a string argument.  Used
                        by caller to track this upload across upload
                        interruption.
      logger: logging.Logger instance to use for debug messages.
      resume_url: If present, attempt to resume the upload at this URL.
      num_retries: Number of times to retry the upload while making no
                   progress. This count resets every time we make progress,
                   so the upload can span many more than this number of
                   retries.
    """
    if resume_url:
      self._SetUploadUrl(resume_url)
    else:
      self.upload_url = None
    self.num_retries = num_retries
    self.service_has_bytes = 0  # Byte count at last service check.
    # Save upload_start_point in instance state so the caller can find how
    # much was transferred by this upload handler (across retries).
    self.upload_start_point = None
    self.tracker_callback = tracker_callback
    self.logger = logger

  def _SetUploadUrl(self, url):
    """Saves URL and resets upload state.

    Called when we start a new resumable upload or get a new tracker
    URL for the upload.

    Args:
      url: URL string for the upload.

    Raises:
      InvalidUrlError: if URL is syntactically invalid.
    """
    parse_result = urlparse.urlparse(url)
    if (parse_result.scheme.lower() not in ['http', 'https'] or
        not parse_result.netloc):
      raise InvalidUrlError('Invalid upload URL (%s)' % url)
    self.upload_url = url
    self.upload_url_host = parse_result.netloc
    self.upload_url_path = '%s?%s' % (
        parse_result.path, parse_result.query)
    self.service_has_bytes = 0

  def _BuildContentRangeHeader(self, range_spec='*', length_spec='*'):
    """Returns a Content-Range header value, e.g. 'bytes 0-99/100'."""
    return 'bytes %s/%s' % (range_spec, length_spec)

  def _QueryServiceState(self, conn, file_length):
    """Queries service to find out state of given upload.

    Note that this method really just makes special case use of the
    fact that the upload service always returns the current start/end
    state whenever a PUT doesn't complete.

    Args:
      conn: HTTPConnection to use for the query.
      file_length: Total length of the file.

    Returns:
      HTTP response from sending request.

    Raises:
      ResumableUploadException if problem querying service.
    """
    # Send an empty PUT so that service replies with this resumable
    # transfer's state.
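    # For a 100-byte file (illustrative size), the probe carries headers like
    # 'Content-Range: bytes */100' and 'Content-Length: 0'.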
    put_headers = {}
    put_headers['Content-Range'] = (
        self._BuildContentRangeHeader('*', file_length))
    put_headers['Content-Length'] = '0'
    return AWSAuthConnection.make_request(
        conn, 'PUT', path=self.upload_url_path, auth_path=self.upload_url_path,
        headers=put_headers, host=self.upload_url_host)

  def _QueryServicePos(self, conn, file_length):
    """Queries service to find out what bytes it currently has.

    Args:
      conn: HTTPConnection to use for the query.
      file_length: Total length of the file.

    Returns:
      (service_start, service_end), where the values are inclusive.
      For example, (0, 2) would mean that the service has bytes 0, 1, *and* 2.

    Raises:
      ResumableUploadException if problem querying service.
    """
    resp = self._QueryServiceState(conn, file_length)
    if resp.status == 200:
      # To handle the boundary condition where the service has the complete
      # file, we return (service_start, file_length-1). That way the
      # calling code can always simply read up through service_end. (If we
      # didn't handle this boundary condition here, the caller would have
      # to check whether service_end == file_length and read one fewer byte
      # in that case.)
      return (0, file_length - 1)  # Completed upload.
    if resp.status != 308:
      # This means the service didn't have any state for the given
      # upload ID, which can happen (for example) if the caller saved
      # the upload URL to a file and then tried to restart the transfer
      # after that upload ID has gone stale. In that case we need to
      # start a new transfer (and the caller will then save the new
      # upload URL to the tracker file).
      raise ResumableUploadException(
          'Got non-308 response (%s) from service state query' %
          resp.status, ResumableTransferDisposition.START_OVER)
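    # When the service already has some bytes, the 308 response includes a
    # header like 'Range: bytes=0-42' (illustrative value), meaning it has the
    # first 43 bytes; if no bytes have been received, the header is omitted.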
    got_valid_response = False
    range_spec = resp.getheader('range')
    if range_spec:
      # Parse 'bytes=<from>-<to>' range_spec.
      m = re.search(r'bytes=(\d+)-(\d+)', range_spec)
      if m:
        service_start = long(m.group(1))
        service_end = long(m.group(2))
        got_valid_response = True
    else:
      # No Range header, which means the service does not yet have
      # any bytes. Note that the Range header uses inclusive 'from'
      # and 'to' values. Since Range 0-0 would mean that the service
      # has byte 0, omitting the Range header is used to indicate that
      # the service doesn't have any bytes.
      return self.SERVICE_HAS_NOTHING
    if not got_valid_response:
      raise ResumableUploadException(
          'Couldn\'t parse upload service state query response (%s)' %
          str(resp.getheaders()), ResumableTransferDisposition.START_OVER)
    if conn.debug >= 1:
      self.logger.debug('Service has: Range: %d - %d.', service_start,
                        service_end)
    return (service_start, service_end)

  def _StartNewResumableUpload(self, key, headers=None):
    """Starts a new resumable upload.

    Args:
      key: Boto Key representing the object to upload.
      headers: Headers to use in the upload requests.

    Raises:
      ResumableUploadException if any errors occur.
    """
    conn = key.bucket.connection
    if conn.debug >= 1:
      self.logger.debug('Starting new resumable upload.')
    self.service_has_bytes = 0

    # Start a new resumable upload by sending a POST request with an
    # empty body and the "X-Goog-Resumable: start" header. Include any
    # caller-provided headers (e.g., Content-Type) EXCEPT Content-Length
    # (and raise an exception if the caller tried to pass one, since it's
    # a semantic error to specify it at this point, and if we were to
    # include one now it would cause the service to expect that many
    # bytes; the POST doesn't include the actual file bytes). We set
    # the Content-Length in the subsequent PUT, based on the uploaded
    # file size.
    post_headers = {}
    for k in headers:
      if k.lower() == 'content-length':
        raise ResumableUploadException(
            'Attempt to specify Content-Length header (disallowed)',
            ResumableTransferDisposition.ABORT)
      post_headers[k] = headers[k]
    post_headers[conn.provider.resumable_upload_header] = 'start'

    resp = conn.make_request(
        'POST', key.bucket.name, key.name, post_headers)
    body = resp.read()

    # Check for various status conditions.
    if resp.status in [429, 500, 503]:
      # Retry after a delay.
      raise ResumableUploadException(
          'Got status %d from attempt to start resumable upload. '
          'Will wait/retry' % resp.status,
          ResumableTransferDisposition.WAIT_BEFORE_RETRY)
    elif resp.status != 200 and resp.status != 201:
      raise ResumableUploadException(
          'Got status %d from attempt to start resumable upload. '
          'Aborting' % resp.status,
          ResumableTransferDisposition.ABORT)

    # Else we got a 200 or 201 response code, indicating the resumable
    # upload was created. Get the upload URL from the response 'Location'
    # header.
    upload_url = resp.getheader('Location')
    if not upload_url:
      raise ResumableUploadException(
          'No resumable upload URL found in resumable initiation '
          'POST response (%s)' % body,
          ResumableTransferDisposition.WAIT_BEFORE_RETRY)
    self._SetUploadUrl(upload_url)
    self.tracker_callback(upload_url)

  def _UploadFileBytes(self, conn, http_conn, fp, file_length,
                       total_bytes_uploaded, cb, num_cb, headers):
    """Attempts to upload file bytes.

    Makes a single attempt using an existing resumable upload connection.

    Args:
      conn: HTTPConnection from the boto Key.
      http_conn: Separate HTTPConnection for the transfer.
      fp: File pointer containing bytes to upload.
      file_length: Total length of the file.
      total_bytes_uploaded: Number of bytes already uploaded before this
                            attempt (the offset at which to resume sending).
      cb: Progress callback function that takes (progress, total_size).
      num_cb: Granularity of the callback (maximum number of times the
              callback will be called during the file transfer). If negative,
              perform callback with each buffer read.
      headers: Headers to be used in the upload requests.

    Returns:
      (etag, generation, metageneration) from service upon success.

    Raises:
      ResumableUploadException if any problems occur.
    """
    buf = fp.read(self.BUFFER_SIZE)
    if cb:
      # The cb_count represents the number of full buffers to send between
      # cb executions.
      if num_cb > 2:
        cb_count = file_length / self.BUFFER_SIZE / (num_cb-2)
      elif num_cb < 0:
        cb_count = -1
      else:
        cb_count = 0
      i = 0
      cb(total_bytes_uploaded, file_length)

    # Build resumable upload headers for the transfer. Don't send a
    # Content-Range header if the file is 0 bytes long, because the
    # resumable upload protocol uses an *inclusive* end-range (so, sending
    # 'bytes 0-0/1' would actually mean you're sending a 1-byte file).
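    # For example (illustrative numbers): resuming a 100-byte file after the
    # first 50 bytes sends 'Content-Range: bytes 50-99/100' and
    # 'Content-Length: 50'.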
    put_headers = headers.copy() if headers else {}
    if file_length:
      if total_bytes_uploaded == file_length:
        range_header = self._BuildContentRangeHeader(
            '*', file_length)
      else:
        range_header = self._BuildContentRangeHeader(
            '%d-%d' % (total_bytes_uploaded, file_length - 1),
            file_length)
      put_headers['Content-Range'] = range_header
    # Set Content-Length to the total bytes we'll send with this PUT.
    put_headers['Content-Length'] = str(file_length - total_bytes_uploaded)
    http_request = AWSAuthConnection.build_base_http_request(
        conn, 'PUT', path=self.upload_url_path, auth_path=None,
        headers=put_headers, host=self.upload_url_host)
    http_conn.putrequest('PUT', http_request.path)
    for k in put_headers:
      http_conn.putheader(k, put_headers[k])
    http_conn.endheaders()

    # Turn off debug on http connection so upload content isn't included
    # in debug stream.
    http_conn.set_debuglevel(0)
    while buf:
      http_conn.send(buf)
      total_bytes_uploaded += len(buf)
      if cb:
        i += 1
        if i == cb_count or cb_count == -1:
          cb(total_bytes_uploaded, file_length)
          i = 0
      buf = fp.read(self.BUFFER_SIZE)
    http_conn.set_debuglevel(conn.debug)
    if cb:
      cb(total_bytes_uploaded, file_length)
    if total_bytes_uploaded != file_length:
      # Abort (and delete the tracker file) so if the user retries
      # they'll start a new resumable upload rather than potentially
      # attempting to pick back up later where we left off.
      raise ResumableUploadException(
          'File changed during upload: EOF at %d bytes of %d byte file.' %
          (total_bytes_uploaded, file_length),
          ResumableTransferDisposition.ABORT)
    resp = http_conn.getresponse()
    # Restore http connection debug level.
    http_conn.set_debuglevel(conn.debug)

    if resp.status == 200:
      # Success.
      return (resp.getheader('etag'),
              resp.getheader('x-goog-generation'),
              resp.getheader('x-goog-metageneration'))
    # Retry timeout (408) and status 429, 500 and 503 errors after a delay.
    elif resp.status in [408, 429, 500, 503]:
      disposition = ResumableTransferDisposition.WAIT_BEFORE_RETRY
    else:
      # Catch all for any other error codes.
      disposition = ResumableTransferDisposition.ABORT
    raise ResumableUploadException('Got response code %d while attempting '
                                   'upload (%s)' %
                                   (resp.status, resp.reason), disposition)

  def _AttemptResumableUpload(self, key, fp, file_length, headers, cb,
                              num_cb):
    """Attempts a resumable upload.

    Args:
      key: Boto key representing object to upload.
      fp: File pointer containing upload bytes.
      file_length: Total length of the upload.
      headers: Headers to be used in upload requests.
      cb: Progress callback function that takes (progress, total_size).
      num_cb: Granularity of the callback (maximum number of times the
              callback will be called during the file transfer). If negative,
              perform callback with each buffer read.

    Returns:
      (etag, generation, metageneration) from service upon success.

    Raises:
      ResumableUploadException if any problems occur.
    """
    (service_start, service_end) = self.SERVICE_HAS_NOTHING
    conn = key.bucket.connection
    if self.upload_url:
      # Try to resume existing resumable upload.
      try:
        (service_start, service_end) = (
            self._QueryServicePos(conn, file_length))
        self.service_has_bytes = service_start
        if conn.debug >= 1:
          self.logger.debug('Resuming transfer.')
      except ResumableUploadException, e:
        if conn.debug >= 1:
          self.logger.debug('Unable to resume transfer (%s).', e.message)
        self._StartNewResumableUpload(key, headers)
    else:
      self._StartNewResumableUpload(key, headers)

    # upload_start_point allows the code that instantiated this upload
    # handler to find out the point from which it started uploading
    # (e.g., so it can correctly compute throughput).
    if self.upload_start_point is None:
      self.upload_start_point = service_end

    total_bytes_uploaded = service_end + 1

    # Start reading from the file based upon the number of bytes that the
    # server has so far.
    if total_bytes_uploaded < file_length:
      fp.seek(total_bytes_uploaded)

    conn = key.bucket.connection

    # Get a new HTTP connection (vs conn.get_http_connection(), which reuses
    # pool connections) because httplib requires a new HTTP connection per
    # transaction. (Without this, calling http_conn.getresponse() would get
    # "ResponseNotReady".)
    http_conn = conn.new_http_connection(self.upload_url_host, conn.port,
                                         conn.is_secure)
    http_conn.set_debuglevel(conn.debug)

    # Make sure to close http_conn at the end so that if a local file read
    # failure occurs partway through, the service will terminate the current
    # upload and can report that progress on the next attempt.
    try:
      return self._UploadFileBytes(conn, http_conn, fp, file_length,
                                   total_bytes_uploaded, cb, num_cb,
                                   headers)
    except (ResumableUploadException, socket.error):
      resp = self._QueryServiceState(conn, file_length)
      if resp.status == 400:
        raise ResumableUploadException(
            'Got 400 response from service state query after failed resumable '
            'upload attempt. This can happen for various reasons, including '
            'specifying an invalid request (e.g., an invalid canned ACL) or '
            'if the file size changed between upload attempts',
            ResumableTransferDisposition.ABORT)
      else:
        raise
    finally:
      http_conn.close()

  def HandleResumableUploadException(self, e, debug):
    """Logs the exception; re-raises unless its disposition allows a retry.

    Args:
      e: ResumableUploadException to handle.
      debug: debug level 0..3.
    """
    if e.disposition == ResumableTransferDisposition.ABORT_CUR_PROCESS:
      if debug >= 1:
        self.logger.debug('Caught non-retryable ResumableUploadException (%s); '
                          'aborting but retaining tracker file', e.message)
      raise
    elif e.disposition == ResumableTransferDisposition.ABORT:
      if debug >= 1:
        self.logger.debug('Caught non-retryable ResumableUploadException (%s); '
                          'aborting and removing tracker file', e.message)
      raise
    elif e.disposition == ResumableTransferDisposition.START_OVER:
      raise
    else:
      if debug >= 1:
        self.logger.debug(
            'Caught ResumableUploadException (%s) - will retry', e.message)

  def TrackProgressLessIterations(self, service_had_bytes_before_attempt,
                                  debug=0):
    """Tracks the number of iterations without progress.

    Performs randomized exponential backoff.

    Args:
      service_had_bytes_before_attempt: Number of bytes the service had prior
                                        to this upload attempt.
      debug: debug level 0..3.

    Raises:
      ResumableUploadException: if more than num_retries consecutive attempts
          have been made with no progress.
    """
    # At this point we had a retryable failure; see if we made progress.
    if self.service_has_bytes > service_had_bytes_before_attempt:
      self.progress_less_iterations = 0   # If progress, reset counter.
    else:
      self.progress_less_iterations += 1

    if self.progress_less_iterations > self.num_retries:
      # Don't retry any longer in the current process.
      raise ResumableUploadException(
          'Too many resumable upload attempts failed without '
          'progress. You might try this upload again later',
          ResumableTransferDisposition.ABORT_CUR_PROCESS)

    # Use binary exponential backoff to desynchronize client requests.
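    # For example (illustrative numbers), after three consecutive
    # progress-less attempts the sleep time is drawn uniformly from
    # [0, 8) seconds, capped at GetMaxRetryDelay().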
    sleep_time_secs = min(random.random() * (2**self.progress_less_iterations),
                          GetMaxRetryDelay())
    if debug >= 1:
      self.logger.debug('Got retryable failure (%d progress-less in a row).\n'
                        'Sleeping %3.1f seconds before re-trying',
                        self.progress_less_iterations, sleep_time_secs)
    time.sleep(sleep_time_secs)

  def SendFile(self, key, fp, size, headers, canned_acl=None, cb=None,
               num_cb=XML_PROGRESS_CALLBACKS):
    """Uploads a file to a GS key using the resumable upload protocol.

    Args:
      key: `boto.s3.key.Key` or subclass representing the upload destination.
      fp: File pointer to upload.
      size: Size of the file to upload.
      headers: The headers to pass along with the PUT request.
      canned_acl: Optional canned ACL to apply to object.
      cb: Callback function that will be called to report progress on
          the upload.  The callback should accept two integer parameters, the
          first representing the number of bytes that have been successfully
          transmitted to GS, and the second representing the total number of
          bytes that need to be transmitted.
      num_cb: (optional) If a callback is specified with the cb parameter, this
              parameter determines the granularity of the callback by defining
              the maximum number of times the callback will be called during the
              file transfer. Providing a negative integer will cause your
              callback to be called with each buffer read.

    Raises:
      ResumableUploadException if a problem occurs during the transfer.
    """

    if not headers:
      headers = {}
    # If Content-Type header is present and set to None, remove it.
    # This is gsutil's way of asking boto to refrain from auto-generating
    # that header.
    content_type = 'Content-Type'
    if content_type in headers and headers[content_type] is None:
      del headers[content_type]

    if canned_acl:
      headers[key.provider.acl_header] = canned_acl

    headers['User-Agent'] = UserAgent

    file_length = size
    debug = key.bucket.connection.debug

    # Use num-retries from constructor if one was provided; else check
    # for a value specified in the boto config file; else default to 5.
    if self.num_retries is None:
      self.num_retries = GetNumRetries()
    self.progress_less_iterations = 0

    while True:  # Retry as long as we're making progress.
      service_had_bytes_before_attempt = self.service_has_bytes
      try:
        # Save generation and metageneration in class state so caller
        # can find these values, for use in preconditions of future
        # operations on the uploaded object.
        (_, self.generation, self.metageneration) = (
            self._AttemptResumableUpload(key, fp, file_length,
                                         headers, cb, num_cb))

        key.generation = self.generation
        if debug >= 1:
          self.logger.debug('Resumable upload complete.')
        return
      except self.RETRYABLE_EXCEPTIONS, e:
        if debug >= 1:
          self.logger.debug('Caught exception (%s)', e.__repr__())
        if isinstance(e, IOError) and e.errno == errno.EPIPE:
          # Broken pipe error causes httplib to immediately
          # close the socket (http://bugs.python.org/issue5542),
          # so we need to close the connection before we resume
          # the upload (which will cause a new connection to be
          # opened the next time an HTTP request is sent).
          key.bucket.connection.connection.close()
      except ResumableUploadException, e:
        self.HandleResumableUploadException(e, debug)

      self.TrackProgressLessIterations(service_had_bytes_before_attempt,
                                       debug=debug)
    583