Home | History | Annotate | Download | only in gslib
      1 # -*- coding: utf-8 -*-
      2 # Copyright 2014 Google Inc. All Rights Reserved.
      3 #
      4 # Licensed under the Apache License, Version 2.0 (the "License");
      5 # you may not use this file except in compliance with the License.
      6 # You may obtain a copy of the License at
      7 #
      8 #     http://www.apache.org/licenses/LICENSE-2.0
      9 #
     10 # Unless required by applicable law or agreed to in writing, software
     11 # distributed under the License is distributed on an "AS IS" BASIS,
     12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13 # See the License for the specific language governing permissions and
     14 # limitations under the License.
     15 """Wrapper for use in daisy-chained copies."""
     16 
     17 from collections import deque
     18 import os
     19 import threading
     20 import time
     21 
     22 from gslib.cloud_api import BadRequestException
     23 from gslib.cloud_api import CloudApi
     24 from gslib.util import CreateLock
     25 from gslib.util import TRANSFER_BUFFER_SIZE
     26 
     27 
     28 # This controls the amount of bytes downloaded per download request.
     29 # We do not buffer this many bytes in memory at a time - that is controlled by
     30 # DaisyChainWrapper.max_buffer_size. This is the upper bound of bytes that may
     31 # be unnecessarily downloaded if there is a break in the resumable upload.
     32 _DEFAULT_DOWNLOAD_CHUNK_SIZE = 1024*1024*100
     33 
     34 
     35 class BufferWrapper(object):
     36   """Wraps the download file pointer to use our in-memory buffer."""
     37 
     38   def __init__(self, daisy_chain_wrapper):
     39     """Provides a buffered write interface for a file download.
     40 
     41     Args:
     42       daisy_chain_wrapper: DaisyChainWrapper instance to use for buffer and
     43                            locking.
     44     """
     45     self.daisy_chain_wrapper = daisy_chain_wrapper
     46 
     47   def write(self, data):  # pylint: disable=invalid-name
     48     """Waits for space in the buffer, then writes data to the buffer."""
     49     while True:
     50       with self.daisy_chain_wrapper.lock:
     51         if (self.daisy_chain_wrapper.bytes_buffered <
     52             self.daisy_chain_wrapper.max_buffer_size):
     53           break
     54       # Buffer was full, yield thread priority so the upload can pull from it.
     55       time.sleep(0)
     56     data_len = len(data)
     57     if data_len:
     58       with self.daisy_chain_wrapper.lock:
     59         self.daisy_chain_wrapper.buffer.append(data)
     60         self.daisy_chain_wrapper.bytes_buffered += data_len
     61 
     62 
class DaisyChainWrapper(object):
  """Wrapper class for daisy-chaining a cloud download to an upload.

  This class instantiates a BufferWrapper object to buffer the download into
  memory, consuming a maximum of max_buffer_size. It implements intelligent
  behavior around read and seek that allow for all of the operations necessary
  to copy a file.

  This class is coupled with the XML and JSON implementations in that it
  expects that small buffers (maximum of TRANSFER_BUFFER_SIZE) in size will be
  used.
  """

  def __init__(self, src_url, src_obj_size, gsutil_api, progress_callback=None,
               download_chunk_size=_DEFAULT_DOWNLOAD_CHUNK_SIZE):
    """Initializes the daisy chain wrapper.

    Args:
      src_url: Source CloudUrl to copy from.
      src_obj_size: Size of source object.
      gsutil_api: gsutil Cloud API to use for the copy.
      progress_callback: Optional callback function for progress notifications
          for the download thread. Receives calls with arguments
          (bytes_transferred, total_size).
      download_chunk_size: Integer number of bytes to download per
          GetObjectMedia request. This is the upper bound of bytes that may be
          unnecessarily downloaded if there is a break in the resumable upload.

    """
    # Current read position for the upload file pointer.
    self.position = 0
    # FIFO of downloaded chunks (each at most TRANSFER_BUFFER_SIZE bytes);
    # the download thread appends and the upload thread pops from the left.
    self.buffer = deque()

    self.bytes_buffered = 0
    # Maximum amount of bytes in memory at a time.
    self.max_buffer_size = 1024 * 1024  # 1 MiB

    self._download_chunk_size = download_chunk_size

    # We save one buffer's worth of data as a special case for boto,
    # which seeks back one buffer and rereads to compute hashes. This is
    # unnecessary because we can just compare cloud hash digests at the end,
    # but it allows this to work without modifying boto.
    self.last_position = 0
    self.last_data = None

    # Protects buffer, position, bytes_buffered, last_position, and last_data.
    self.lock = CreateLock()

    # Protects download_exception.
    self.download_exception_lock = CreateLock()

    self.src_obj_size = src_obj_size
    self.src_url = src_url

    # It is safe to share gsutil_api between the upload and download thread
    # because the download thread calls only GetObjectMedia, which creates a
    # new HTTP connection independent of gsutil_api. Thus, it will not share
    # an HTTP connection with the upload.
    self.gsutil_api = gsutil_api

    # If self.download_thread dies due to an exception, it is saved here so
    # that it can also be raised in the upload thread.
    self.download_exception = None
    self.download_thread = None
    self.progress_callback = progress_callback
    # Signals the download thread to exit after its current chunk so that it
    # can be restarted from a new byte offset (see seek).
    self.stop_download = threading.Event()
    self.StartDownloadThread(progress_callback=self.progress_callback)

  def StartDownloadThread(self, start_byte=0, progress_callback=None):
    """Starts the download thread for the source object (from start_byte)."""

    def PerformDownload(start_byte, progress_callback):
      """Downloads the source object in chunks.

      This function checks the stop_download event and exits early if it is set.
      It should be set when there is an error during the daisy-chain upload,
      then this function can be called again with the upload's current position
      as start_byte.

      Args:
        start_byte: Byte from which to begin the download.
        progress_callback: Optional callback function for progress
            notifications. Receives calls with arguments
            (bytes_transferred, total_size).
      """
      # TODO: Support resumable downloads. This would require the BufferWrapper
      # object to support seek() and tell() which requires coordination with
      # the upload.
      try:
        # Download all full-sized chunks; the final (possibly partial) chunk
        # is requested after the loop with no end_byte.
        while start_byte + self._download_chunk_size < self.src_obj_size:
          self.gsutil_api.GetObjectMedia(
              self.src_url.bucket_name, self.src_url.object_name,
              BufferWrapper(self), start_byte=start_byte,
              end_byte=start_byte + self._download_chunk_size - 1,
              generation=self.src_url.generation, object_size=self.src_obj_size,
              download_strategy=CloudApi.DownloadStrategy.ONE_SHOT,
              provider=self.src_url.scheme, progress_callback=progress_callback)
          if self.stop_download.is_set():
            # Download thread needs to be restarted, so exit.
            self.stop_download.clear()
            return
          start_byte += self._download_chunk_size
        self.gsutil_api.GetObjectMedia(
            self.src_url.bucket_name, self.src_url.object_name,
            BufferWrapper(self), start_byte=start_byte,
            generation=self.src_url.generation, object_size=self.src_obj_size,
            download_strategy=CloudApi.DownloadStrategy.ONE_SHOT,
            provider=self.src_url.scheme, progress_callback=progress_callback)
      # We catch all exceptions here because we want to store them.
      except Exception, e:  # pylint: disable=broad-except
        # Save the exception so that it can be seen in the upload thread.
        with self.download_exception_lock:
          self.download_exception = e
          raise

    # TODO: If we do gzip encoding transforms mid-transfer, this will fail.
    self.download_thread = threading.Thread(
        target=PerformDownload,
        args=(start_byte, progress_callback))
    self.download_thread.start()

  def read(self, amt=None):  # pylint: disable=invalid-name
    """Exposes a stream from the in-memory buffer to the upload.

    Args:
      amt: Integer maximum number of bytes to read; must be at most
          TRANSFER_BUFFER_SIZE (and not None), since the buffer holds chunks
          of at most that size. amt == 0 is allowed and returns ''.

    Returns:
      The next buffered chunk (as produced by the download), or '' at EOF.

    Raises:
      BadRequestException: If amt is None or exceeds TRANSFER_BUFFER_SIZE, or
          if the popped chunk is larger than amt.
      Exception: Re-raises whatever exception killed the download thread.
    """
    if self.position == self.src_obj_size or amt == 0:
      # If there is no data left or 0 bytes were requested, return an empty
      # string so callers can still call len() and read(0).
      return ''
    if amt is None or amt > TRANSFER_BUFFER_SIZE:
      raise BadRequestException(
          'Invalid HTTP read size %s during daisy chain operation, '
          'expected <= %s.' % (amt, TRANSFER_BUFFER_SIZE))

    # Busy-wait (with sleep(0) yields) until the download thread has buffered
    # at least one chunk, or has died with an exception.
    while True:
      with self.lock:
        if self.buffer:
          break
        with self.download_exception_lock:
          if self.download_exception:
            # Download thread died, so we will never recover. Raise the
            # exception that killed it.
            raise self.download_exception  # pylint: disable=raising-bad-type
      # Buffer was empty, yield thread priority so the download thread can fill.
      time.sleep(0)
    with self.lock:
      # TODO: Need to handle the caller requesting less than a
      # transfer_buffer_size worth of data.
      data = self.buffer.popleft()
      # Remember this chunk so a subsequent seek back by one buffer (boto's
      # hash-recomputation behavior) can be served without re-downloading.
      self.last_position = self.position
      self.last_data = data
      data_len = len(data)
      self.position += data_len
      self.bytes_buffered -= data_len
    if data_len > amt:
      raise BadRequestException(
          'Invalid read during daisy chain operation, got data of size '
          '%s, expected size %s.' % (data_len, amt))
    return data

  def tell(self):  # pylint: disable=invalid-name
    """Returns the current read position of the upload file pointer."""
    with self.lock:
      return self.position

  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
    """Repositions the upload file pointer, restarting the download if needed.

    Supports only the seek patterns the upload implementations perform:
    SEEK_END with offset 0 (used to probe object size), SEEK_SET to the
    current or immediately-previous position (boto rereading one buffer), and
    SEEK_SET to an arbitrary offset, which discards the buffer and restarts
    the download thread from that byte.

    Args:
      offset: Target byte offset (interpretation depends on whence).
      whence: One of os.SEEK_SET or os.SEEK_END (os.SEEK_CUR unsupported).

    Raises:
      IOError: For a non-zero SEEK_END offset or an unsupported whence mode.
    """
    restart_download = False
    if whence == os.SEEK_END:
      if offset:
        raise IOError(
            'Invalid seek during daisy chain operation. Non-zero offset %s '
            'from os.SEEK_END is not supported' % offset)
      with self.lock:
        self.last_position = self.position
        self.last_data = None
        # Safe because we check position against src_obj_size in read.
        self.position = self.src_obj_size
    elif whence == os.SEEK_SET:
      with self.lock:
        if offset == self.position:
          pass
        elif offset == self.last_position:
          self.position = self.last_position
          if self.last_data:
            # If we seek to end and then back, we won't have last_data; we'll
            # get it on the next call to read.
            self.buffer.appendleft(self.last_data)
            self.bytes_buffered += len(self.last_data)
        else:
          # Once a download is complete, boto seeks to 0 and re-reads to
          # compute the hash if an md5 isn't already present (for example a GCS
          # composite object), so we have to re-download the whole object.
          # Also, when daisy-chaining to a resumable upload, on error the
          # service may have received any number of the bytes; the download
          # needs to be restarted from that point.
          restart_download = True

      if restart_download:
        self.stop_download.set()

        # Consume any remaining bytes in the download thread so that
        # the thread can exit, then restart the thread at the desired position.
        while self.download_thread.is_alive():
          with self.lock:
            while self.bytes_buffered:
              self.bytes_buffered -= len(self.buffer.popleft())
          time.sleep(0)

        # Thread has exited; reset all shared state before restarting.
        with self.lock:
          self.position = offset
          self.buffer = deque()
          self.bytes_buffered = 0
          self.last_position = 0
          self.last_data = None
        self.StartDownloadThread(start_byte=offset,
                                 progress_callback=self.progress_callback)
    else:
      raise IOError('Daisy-chain download wrapper does not support '
                    'seek mode %s' % whence)

  def seekable(self):  # pylint: disable=invalid-name
    """Returns True; seek/tell are supported (within the limits of seek)."""
    return True
    283