Home | History | Annotate | Download | only in table
      1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
      4 
      5 #include "table/merger.h"
      6 
      7 #include "leveldb/comparator.h"
      8 #include "leveldb/iterator.h"
      9 #include "table/iterator_wrapper.h"
     10 
     11 namespace leveldb {
     12 
     13 namespace {
     14 class MergingIterator : public Iterator {
     15  public:
     16   MergingIterator(const Comparator* comparator, Iterator** children, int n)
     17       : comparator_(comparator),
     18         children_(new IteratorWrapper[n]),
     19         n_(n),
     20         current_(NULL),
     21         direction_(kForward) {
     22     for (int i = 0; i < n; i++) {
     23       children_[i].Set(children[i]);
     24     }
     25   }
     26 
     27   virtual ~MergingIterator() {
     28     delete[] children_;
     29   }
     30 
     31   virtual bool Valid() const {
     32     return (current_ != NULL);
     33   }
     34 
     35   virtual void SeekToFirst() {
     36     for (int i = 0; i < n_; i++) {
     37       children_[i].SeekToFirst();
     38     }
     39     FindSmallest();
     40     direction_ = kForward;
     41   }
     42 
     43   virtual void SeekToLast() {
     44     for (int i = 0; i < n_; i++) {
     45       children_[i].SeekToLast();
     46     }
     47     FindLargest();
     48     direction_ = kReverse;
     49   }
     50 
     51   virtual void Seek(const Slice& target) {
     52     for (int i = 0; i < n_; i++) {
     53       children_[i].Seek(target);
     54     }
     55     FindSmallest();
     56     direction_ = kForward;
     57   }
     58 
     59   virtual void Next() {
     60     assert(Valid());
     61 
     62     // Ensure that all children are positioned after key().
     63     // If we are moving in the forward direction, it is already
     64     // true for all of the non-current_ children since current_ is
     65     // the smallest child and key() == current_->key().  Otherwise,
     66     // we explicitly position the non-current_ children.
     67     if (direction_ != kForward) {
     68       for (int i = 0; i < n_; i++) {
     69         IteratorWrapper* child = &children_[i];
     70         if (child != current_) {
     71           child->Seek(key());
     72           if (child->Valid() &&
     73               comparator_->Compare(key(), child->key()) == 0) {
     74             child->Next();
     75           }
     76         }
     77       }
     78       direction_ = kForward;
     79     }
     80 
     81     current_->Next();
     82     FindSmallest();
     83   }
     84 
     85   virtual void Prev() {
     86     assert(Valid());
     87 
     88     // Ensure that all children are positioned before key().
     89     // If we are moving in the reverse direction, it is already
     90     // true for all of the non-current_ children since current_ is
     91     // the largest child and key() == current_->key().  Otherwise,
     92     // we explicitly position the non-current_ children.
     93     if (direction_ != kReverse) {
     94       for (int i = 0; i < n_; i++) {
     95         IteratorWrapper* child = &children_[i];
     96         if (child != current_) {
     97           child->Seek(key());
     98           if (child->Valid()) {
     99             // Child is at first entry >= key().  Step back one to be < key()
    100             child->Prev();
    101           } else {
    102             // Child has no entries >= key().  Position at last entry.
    103             child->SeekToLast();
    104           }
    105         }
    106       }
    107       direction_ = kReverse;
    108     }
    109 
    110     current_->Prev();
    111     FindLargest();
    112   }
    113 
    114   virtual Slice key() const {
    115     assert(Valid());
    116     return current_->key();
    117   }
    118 
    119   virtual Slice value() const {
    120     assert(Valid());
    121     return current_->value();
    122   }
    123 
    124   virtual Status status() const {
    125     Status status;
    126     for (int i = 0; i < n_; i++) {
    127       status = children_[i].status();
    128       if (!status.ok()) {
    129         break;
    130       }
    131     }
    132     return status;
    133   }
    134 
    135  private:
    136   void FindSmallest();
    137   void FindLargest();
    138 
    139   // We might want to use a heap in case there are lots of children.
    140   // For now we use a simple array since we expect a very small number
    141   // of children in leveldb.
    142   const Comparator* comparator_;
    143   IteratorWrapper* children_;
    144   int n_;
    145   IteratorWrapper* current_;
    146 
    147   // Which direction is the iterator moving?
    148   enum Direction {
    149     kForward,
    150     kReverse
    151   };
    152   Direction direction_;
    153 };
    154 
    155 void MergingIterator::FindSmallest() {
    156   IteratorWrapper* smallest = NULL;
    157   for (int i = 0; i < n_; i++) {
    158     IteratorWrapper* child = &children_[i];
    159     if (child->Valid()) {
    160       if (smallest == NULL) {
    161         smallest = child;
    162       } else if (comparator_->Compare(child->key(), smallest->key()) < 0) {
    163         smallest = child;
    164       }
    165     }
    166   }
    167   current_ = smallest;
    168 }
    169 
    170 void MergingIterator::FindLargest() {
    171   IteratorWrapper* largest = NULL;
    172   for (int i = n_-1; i >= 0; i--) {
    173     IteratorWrapper* child = &children_[i];
    174     if (child->Valid()) {
    175       if (largest == NULL) {
    176         largest = child;
    177       } else if (comparator_->Compare(child->key(), largest->key()) > 0) {
    178         largest = child;
    179       }
    180     }
    181   }
    182   current_ = largest;
    183 }
    184 }  // namespace
    185 
    186 Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n) {
    187   assert(n >= 0);
    188   if (n == 0) {
    189     return NewEmptyIterator();
    190   } else if (n == 1) {
    191     return list[0];
    192   } else {
    193     return new MergingIterator(cmp, list, n);
    194   }
    195 }
    196 
    197 }  // namespace leveldb
    198