# Copyright (C) 2010 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import math
import logging

from google.appengine.ext import blobstore
from google.appengine.ext import db

MAX_DATA_ENTRY_PER_FILE = 30
MAX_ENTRY_LEN = 1000 * 1000
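# Together these constants cap a single DataStoreFile at
# MAX_DATA_ENTRY_PER_FILE * MAX_ENTRY_LEN = 30,000,000 bytes; save_data()
# rejects anything larger.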


class ChunkData:
    """Bookkeeping for a single chunk of file data while it is being saved."""

    def __init__(self):
        self.reused_key = None    # Existing DataEntry key to reuse, if any.
        self.data_entry = None    # DataEntry that will hold this chunk's bytes.
        self.entry_future = None  # Async get of the reused DataEntry.
        self.index = None         # Byte offset of this chunk within the file data.


class DataEntry(db.Model):
    """Datastore entry that stores one segment of file data
       (at most 1000*1000 bytes).
    """

    data = db.BlobProperty()

    @classmethod
    def get(cls, key):
        return db.get(key)

    @classmethod
    def get_async(cls, key):
        return db.get_async(key)

    @classmethod
    def delete_async(cls, key):
        return db.delete_async(key)


class DataStoreFile(db.Model):
    """This class stores a file in the datastore.
       If a file is oversize (>1000*1000 bytes), it is split into
       multiple segments and stored in multiple datastore entries.
    """

    name = db.StringProperty()
    data_keys = db.ListProperty(db.Key)
    # Keys to the datastore entries that can be reused for new data.
    # If it is empty, create a new DataEntry.
    new_data_keys = db.ListProperty(db.Key)
    date = db.DateTimeProperty(auto_now_add=True)

    # In-memory copy of the file contents, set by save_data() and load_data().
    data = None

    def _get_chunk_indices(self, data_length):
        nchunks = math.ceil(float(data_length) / MAX_ENTRY_LEN)
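        # For example: with MAX_ENTRY_LEN = 1000 * 1000, a data_length of
        # 2,500,000 yields nchunks = 3, so the chunk start offsets are
        # 0, 1000000 and 2000000.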
        return xrange(0, int(nchunks) * MAX_ENTRY_LEN, MAX_ENTRY_LEN)

    def _convert_blob_keys(self, keys):
        converted_keys = []
        for key in keys:
            new_key = blobstore.BlobMigrationRecord.get_new_blob_key(key)
            if new_key:
                converted_keys.append(new_key)
            else:
                converted_keys.append(key)
        return converted_keys

    def delete_data(self, keys=None):
        if not keys:
            keys = self._convert_blob_keys(self.data_keys)
        logging.info('Doing async delete of keys: %s', keys)

        get_futures = [DataEntry.get_async(k) for k in keys]
        delete_futures = []
        for get_future in get_futures:
            result = get_future.get_result()
            if result:
                delete_futures.append(DataEntry.delete_async(result.key()))

        for delete_future in delete_futures:
            delete_future.get_result()

    def save_data(self, data):
        if not data:
            logging.warning("No data to save.")
            return False

        if len(data) > (MAX_DATA_ENTRY_PER_FILE * MAX_ENTRY_LEN):
            logging.error("File too big, can't save to datastore: %dK",
                len(data) / 1024)
            return False

        # Use new_data_keys to store the new data. If all of the new data is
        # saved successfully, swap new_data_keys and data_keys so the
        # data_keys entries can be reused on the next run. If saving fails
        # for any reason, only the data pointed to by new_data_keys may be
        # corrupted; the existing data_keys data remains untouched. The
        # corrupted data in new_data_keys will be overwritten by the next
        # update.
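        # Illustration: if data_keys is [A, B] and new_data_keys is [C, D],
        # this call writes the new chunks into C and D (creating extra
        # entries if needed); on success data_keys becomes the freshly
        # written keys and new_data_keys becomes [A, B] for the next run.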
        keys = self._convert_blob_keys(self.new_data_keys)
        self.new_data_keys = []

        chunk_indices = self._get_chunk_indices(len(data))
        logging.info('Saving file in %s chunks', len(chunk_indices))

        chunk_data = []
        for chunk_index in chunk_indices:
            chunk = ChunkData()
            chunk.index = chunk_index
            if keys:
                chunk.reused_key = keys.pop()
                chunk.entry_future = DataEntry.get_async(chunk.reused_key)
            else:
                chunk.data_entry = DataEntry()
            chunk_data.append(chunk)

        put_futures = []
        for chunk in chunk_data:
            if chunk.entry_future:
                data_entry = chunk.entry_future.get_result()
                if not data_entry:
                    logging.warning("Found key, but no data entry: %s", chunk.reused_key)
                    data_entry = DataEntry()
                chunk.data_entry = data_entry

            chunk.data_entry.data = db.Blob(data[chunk.index: chunk.index + MAX_ENTRY_LEN])
            put_futures.append(db.put_async(chunk.data_entry))

        for future in put_futures:
            key = None
            try:
                key = future.get_result()
                self.new_data_keys.append(key)
            except Exception, err:
                logging.error("Failed to save data store entry: %s", err)
                self.delete_data(keys)
                return False

        if keys:
            self.delete_data(keys)

        temp_keys = self._convert_blob_keys(self.data_keys)
        self.data_keys = self.new_data_keys
        self.new_data_keys = temp_keys
        self.data = data

        return True

    def load_data(self):
        if not self.data_keys:
            logging.warning("No data to load.")
            return None

        data_futures = [(k, DataEntry.get_async(k)) for k in self._convert_blob_keys(self.data_keys)]

        data = []
        for key, future in data_futures:
            result = future.get_result()
            if not result:
                logging.error("No data found for key: %s.", key)
                return None
            data.append(result)

        self.data = "".join([d.data for d in data])

        return self.data
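

# A minimal usage sketch (hypothetical caller; the file name and payload
# below are made up, not part of this module):
#
#   store_file = DataStoreFile(name='results.json')
#   if store_file.save_data(json_bytes):
#       store_file.put()          # persist name, data_keys and new_data_keys
#   ...
#   reloaded = DataStoreFile.all().filter('name =', 'results.json').get()
#   contents = reloaded.load_data()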
    195