Home | History | Annotate | Download | only in io
      1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
      2 
      3 Licensed under the Apache License, Version 2.0 (the "License");
      4 you may not use this file except in compliance with the License.
      5 You may obtain a copy of the License at
      6 
      7     http://www.apache.org/licenses/LICENSE-2.0
      8 
      9 Unless required by applicable law or agreed to in writing, software
     10 distributed under the License is distributed on an "AS IS" BASIS,
     11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 See the License for the specific language governing permissions and
     13 limitations under the License.
     14 ==============================================================================*/
     15 
     16 // Decodes the blocks generated by block_builder.cc.
     17 
     18 #include "tensorflow/core/lib/io/block.h"
     19 
     20 #include <algorithm>
     21 #include "tensorflow/core/lib/core/coding.h"
     22 #include "tensorflow/core/lib/core/errors.h"
     23 #include "tensorflow/core/lib/io/format.h"
     24 #include "tensorflow/core/platform/logging.h"
     25 
     26 namespace tensorflow {
     27 namespace table {
     28 
     29 inline uint32 Block::NumRestarts() const {
     30   assert(size_ >= sizeof(uint32));
     31   return core::DecodeFixed32(data_ + size_ - sizeof(uint32));
     32 }
     33 
     34 Block::Block(const BlockContents& contents)
     35     : data_(contents.data.data()),
     36       size_(contents.data.size()),
     37       owned_(contents.heap_allocated) {
     38   if (size_ < sizeof(uint32)) {
     39     size_ = 0;  // Error marker
     40   } else {
     41     size_t max_restarts_allowed = (size_ - sizeof(uint32)) / sizeof(uint32);
     42     if (NumRestarts() > max_restarts_allowed) {
     43       // The size is too small for NumRestarts()
     44       size_ = 0;
     45     } else {
     46       restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32);
     47     }
     48   }
     49 }
     50 
     51 Block::~Block() {
     52   if (owned_) {
     53     delete[] data_;
     54   }
     55 }
     56 
     57 // Helper routine: decode the next block entry starting at "p",
     58 // storing the number of shared key bytes, non_shared key bytes,
     59 // and the length of the value in "*shared", "*non_shared", and
     60 // "*value_length", respectively.  Will not dereference past "limit".
     61 //
     62 // If any errors are detected, returns NULL.  Otherwise, returns a
     63 // pointer to the key delta (just past the three decoded values).
     64 static inline const char* DecodeEntry(const char* p, const char* limit,
     65                                       uint32* shared, uint32* non_shared,
     66                                       uint32* value_length) {
     67   if (limit - p < 3) return nullptr;
     68   *shared = reinterpret_cast<const unsigned char*>(p)[0];
     69   *non_shared = reinterpret_cast<const unsigned char*>(p)[1];
     70   *value_length = reinterpret_cast<const unsigned char*>(p)[2];
     71   if ((*shared | *non_shared | *value_length) < 128) {
     72     // Fast path: all three values are encoded in one byte each
     73     p += 3;
     74   } else {
     75     if ((p = core::GetVarint32Ptr(p, limit, shared)) == nullptr) return nullptr;
     76     if ((p = core::GetVarint32Ptr(p, limit, non_shared)) == nullptr)
     77       return nullptr;
     78     if ((p = core::GetVarint32Ptr(p, limit, value_length)) == nullptr)
     79       return nullptr;
     80   }
     81 
     82   if (static_cast<uint32>(limit - p) < (*non_shared + *value_length)) {
     83     return nullptr;
     84   }
     85   return p;
     86 }
     87 
     88 class Block::Iter : public Iterator {
     89  private:
     90   const char* const data_;     // underlying block contents
     91   uint32 const restarts_;      // Offset of restart array (list of fixed32)
     92   uint32 const num_restarts_;  // Number of uint32 entries in restart array
     93 
     94   // current_ is offset in data_ of current entry.  >= restarts_ if !Valid
     95   uint32 current_;
     96   uint32 restart_index_;  // Index of restart block in which current_ falls
     97   string key_;
     98   StringPiece value_;
     99   Status status_;
    100 
    101   inline int Compare(const StringPiece& a, const StringPiece& b) const {
    102     return a.compare(b);
    103   }
    104 
    105   // Return the offset in data_ just past the end of the current entry.
    106   inline uint32 NextEntryOffset() const {
    107     return (value_.data() + value_.size()) - data_;
    108   }
    109 
    110   uint32 GetRestartPoint(uint32 index) {
    111     assert(index < num_restarts_);
    112     return core::DecodeFixed32(data_ + restarts_ + index * sizeof(uint32));
    113   }
    114 
    115   void SeekToRestartPoint(uint32 index) {
    116     key_.clear();
    117     restart_index_ = index;
    118     // current_ will be fixed by ParseNextKey();
    119 
    120     // ParseNextKey() starts at the end of value_, so set value_ accordingly
    121     uint32 offset = GetRestartPoint(index);
    122     value_ = StringPiece(data_ + offset, 0);
    123   }
    124 
    125  public:
    126   Iter(const char* data, uint32 restarts, uint32 num_restarts)
    127       : data_(data),
    128         restarts_(restarts),
    129         num_restarts_(num_restarts),
    130         current_(restarts_),
    131         restart_index_(num_restarts_) {
    132     assert(num_restarts_ > 0);
    133   }
    134 
    135   bool Valid() const override { return current_ < restarts_; }
    136   Status status() const override { return status_; }
    137   StringPiece key() const override {
    138     assert(Valid());
    139     return key_;
    140   }
    141   StringPiece value() const override {
    142     assert(Valid());
    143     return value_;
    144   }
    145 
    146   void Next() override {
    147     assert(Valid());
    148     ParseNextKey();
    149   }
    150 
    151   void Seek(const StringPiece& target) override {
    152     // Binary search in restart array to find the last restart point
    153     // with a key < target
    154     uint32 left = 0;
    155     uint32 right = num_restarts_ - 1;
    156     while (left < right) {
    157       uint32 mid = (left + right + 1) / 2;
    158       uint32 region_offset = GetRestartPoint(mid);
    159       uint32 shared, non_shared, value_length;
    160       const char* key_ptr =
    161           DecodeEntry(data_ + region_offset, data_ + restarts_, &shared,
    162                       &non_shared, &value_length);
    163       if (key_ptr == nullptr || (shared != 0)) {
    164         CorruptionError();
    165         return;
    166       }
    167       StringPiece mid_key(key_ptr, non_shared);
    168       if (Compare(mid_key, target) < 0) {
    169         // Key at "mid" is smaller than "target".  Therefore all
    170         // blocks before "mid" are uninteresting.
    171         left = mid;
    172       } else {
    173         // Key at "mid" is >= "target".  Therefore all blocks at or
    174         // after "mid" are uninteresting.
    175         right = mid - 1;
    176       }
    177     }
    178 
    179     // Linear search (within restart block) for first key >= target
    180     SeekToRestartPoint(left);
    181     while (true) {
    182       if (!ParseNextKey()) {
    183         return;
    184       }
    185       if (Compare(key_, target) >= 0) {
    186         return;
    187       }
    188     }
    189   }
    190 
    191   void SeekToFirst() override {
    192     SeekToRestartPoint(0);
    193     ParseNextKey();
    194   }
    195 
    196  private:
    197   void CorruptionError() {
    198     current_ = restarts_;
    199     restart_index_ = num_restarts_;
    200     status_ = errors::DataLoss("bad entry in block");
    201     key_.clear();
    202     value_ = StringPiece();
    203   }
    204 
    205   bool ParseNextKey() {
    206     current_ = NextEntryOffset();
    207     const char* p = data_ + current_;
    208     const char* limit = data_ + restarts_;  // Restarts come right after data
    209     if (p >= limit) {
    210       // No more entries to return.  Mark as invalid.
    211       current_ = restarts_;
    212       restart_index_ = num_restarts_;
    213       return false;
    214     }
    215 
    216     // Decode next entry
    217     uint32 shared, non_shared, value_length;
    218     p = DecodeEntry(p, limit, &shared, &non_shared, &value_length);
    219     if (p == nullptr || key_.size() < shared) {
    220       CorruptionError();
    221       return false;
    222     } else {
    223       key_.resize(shared);
    224       key_.append(p, non_shared);
    225       value_ = StringPiece(p + non_shared, value_length);
    226       while (restart_index_ + 1 < num_restarts_ &&
    227              GetRestartPoint(restart_index_ + 1) < current_) {
    228         ++restart_index_;
    229       }
    230       return true;
    231     }
    232   }
    233 };
    234 
    235 Iterator* Block::NewIterator() {
    236   if (size_ < sizeof(uint32)) {
    237     return NewErrorIterator(errors::DataLoss("bad block contents"));
    238   }
    239   const uint32 num_restarts = NumRestarts();
    240   if (num_restarts == 0) {
    241     return NewEmptyIterator();
    242   } else {
    243     return new Iter(data_, restart_offset_, num_restarts);
    244   }
    245 }
    246 
    247 }  // namespace table
    248 }  // namespace tensorflow
    249