Home | History | Annotate | Download | only in leveldb_proto
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #ifndef COMPONENTS_LEVELDB_PROTO_PROTO_DATABASE_IMPL_H_
      6 #define COMPONENTS_LEVELDB_PROTO_PROTO_DATABASE_IMPL_H_
      7 
      8 #include <string>
      9 #include <vector>
     10 
     11 #include "base/bind.h"
     12 #include "base/files/file_path.h"
     13 #include "base/memory/scoped_ptr.h"
     14 #include "base/message_loop/message_loop.h"
     15 #include "base/sequenced_task_runner.h"
     16 #include "base/strings/string_util.h"
     17 #include "base/threading/sequenced_worker_pool.h"
     18 #include "base/threading/thread_checker.h"
     19 #include "components/leveldb_proto/leveldb_database.h"
     20 #include "components/leveldb_proto/proto_database.h"
     21 
     22 namespace leveldb_proto {
     23 
     24 typedef std::vector<std::pair<std::string, std::string> > KeyValueVector;
     25 typedef std::vector<std::string> KeyVector;
     26 
     27 // When the ProtoDatabaseImpl instance is deleted, in-progress asynchronous
     28 // operations will be completed and the corresponding callbacks will be called.
     29 // Construction/calls/destruction should all happen on the same thread.
     30 template <typename T>
     31 class ProtoDatabaseImpl : public ProtoDatabase<T> {
     32  public:
     33   // All blocking calls/disk access will happen on the provided |task_runner|.
     34   explicit ProtoDatabaseImpl(
     35       scoped_refptr<base::SequencedTaskRunner> task_runner);
     36 
     37   virtual ~ProtoDatabaseImpl();
     38 
     39   // ProtoDatabase implementation.
     40   // TODO(cjhopman): Perhaps Init() shouldn't be exposed to users and not just
     41   //     part of the constructor
     42   virtual void Init(const base::FilePath& database_dir,
     43                     typename ProtoDatabase<T>::InitCallback callback) OVERRIDE;
     44   virtual void UpdateEntries(
     45       scoped_ptr<typename ProtoDatabase<T>::KeyEntryVector> entries_to_save,
     46       scoped_ptr<KeyVector> keys_to_remove,
     47       typename ProtoDatabase<T>::UpdateCallback callback) OVERRIDE;
     48   virtual void LoadEntries(
     49       typename ProtoDatabase<T>::LoadCallback callback) OVERRIDE;
     50 
     51   // Allow callers to provide their own Database implementation.
     52   void InitWithDatabase(scoped_ptr<LevelDB> database,
     53                         const base::FilePath& database_dir,
     54                         typename ProtoDatabase<T>::InitCallback callback);
     55 
     56  private:
     57   base::ThreadChecker thread_checker_;
     58 
     59   // Used to run blocking tasks in-order.
     60   scoped_refptr<base::SequencedTaskRunner> task_runner_;
     61 
     62   scoped_ptr<LevelDB> db_;
     63 
     64   DISALLOW_COPY_AND_ASSIGN(ProtoDatabaseImpl);
     65 };
     66 
     67 namespace {
     68 
     69 template <typename T>
     70 void RunInitCallback(typename ProtoDatabase<T>::InitCallback callback,
     71                      const bool* success) {
     72   callback.Run(*success);
     73 }
     74 
     75 template <typename T>
     76 void RunUpdateCallback(typename ProtoDatabase<T>::UpdateCallback callback,
     77                        const bool* success) {
     78   callback.Run(*success);
     79 }
     80 
     81 template <typename T>
     82 void RunLoadCallback(typename ProtoDatabase<T>::LoadCallback callback,
     83                      const bool* success, scoped_ptr<std::vector<T> > entries) {
     84   callback.Run(*success, entries.Pass());
     85 }
     86 
     87 void InitFromTaskRunner(LevelDB* database, const base::FilePath& database_dir,
     88                         bool* success) {
     89   DCHECK(success);
     90 
     91   // TODO(cjhopman): Histogram for database size.
     92   *success = database->Init(database_dir);
     93 }
     94 
     95 template <typename T>
     96 void UpdateEntriesFromTaskRunner(
     97     LevelDB* database,
     98     scoped_ptr<typename ProtoDatabase<T>::KeyEntryVector> entries_to_save,
     99     scoped_ptr<KeyVector> keys_to_remove, bool* success) {
    100   DCHECK(success);
    101   // Serialize the values from Proto to string before passing on to database.
    102   KeyValueVector pairs_to_save;
    103   for (typename ProtoDatabase<T>::KeyEntryVector::iterator it =
    104            entries_to_save->begin();
    105        it != entries_to_save->end(); ++it) {
    106     pairs_to_save.push_back(
    107         std::make_pair(it->first, it->second.SerializeAsString()));
    108   }
    109   *success = database->Save(pairs_to_save, *keys_to_remove);
    110 }
    111 
    112 template <typename T>
    113 void LoadEntriesFromTaskRunner(LevelDB* database, std::vector<T>* entries,
    114                                bool* success) {
    115   DCHECK(success);
    116   DCHECK(entries);
    117 
    118   entries->clear();
    119   std::vector<std::string> loaded_entries;
    120   *success = database->Load(&loaded_entries);
    121   for (std::vector<std::string>::iterator it = loaded_entries.begin();
    122        it != loaded_entries.end(); ++it) {
    123     T entry;
    124     if (!entry.ParseFromString(*it)) {
    125       DLOG(WARNING) << "Unable to parse leveldb_proto entry " << *it;
    126       // TODO(cjhopman): Decide what to do about un-parseable entries.
    127     }
    128     entries->push_back(entry);
    129   }
    130 }
    131 
    132 }  // namespace
    133 
    134 template <typename T>
    135 ProtoDatabaseImpl<T>::ProtoDatabaseImpl(
    136     scoped_refptr<base::SequencedTaskRunner> task_runner)
    137     : task_runner_(task_runner) {}
    138 
    139 template <typename T>
    140 ProtoDatabaseImpl<T>::~ProtoDatabaseImpl() {
    141   DCHECK(thread_checker_.CalledOnValidThread());
    142   if (!task_runner_->DeleteSoon(FROM_HERE, db_.release())) {
    143     DLOG(WARNING) << "DOM distiller database will not be deleted.";
    144   }
    145 }
    146 
    147 template <typename T>
    148 void ProtoDatabaseImpl<T>::Init(
    149     const base::FilePath& database_dir,
    150     typename ProtoDatabase<T>::InitCallback callback) {
    151   DCHECK(thread_checker_.CalledOnValidThread());
    152   InitWithDatabase(scoped_ptr<LevelDB>(new LevelDB()), database_dir, callback);
    153 }
    154 
    155 template <typename T>
    156 void ProtoDatabaseImpl<T>::InitWithDatabase(
    157     scoped_ptr<LevelDB> database, const base::FilePath& database_dir,
    158     typename ProtoDatabase<T>::InitCallback callback) {
    159   DCHECK(thread_checker_.CalledOnValidThread());
    160   DCHECK(!db_);
    161   DCHECK(database);
    162   db_.reset(database.release());
    163   bool* success = new bool(false);
    164   task_runner_->PostTaskAndReply(
    165       FROM_HERE, base::Bind(InitFromTaskRunner, base::Unretained(db_.get()),
    166                             database_dir, success),
    167       base::Bind(RunInitCallback<T>, callback, base::Owned(success)));
    168 }
    169 
    170 template <typename T>
    171 void ProtoDatabaseImpl<T>::UpdateEntries(
    172     scoped_ptr<typename ProtoDatabase<T>::KeyEntryVector> entries_to_save,
    173     scoped_ptr<KeyVector> keys_to_remove,
    174     typename ProtoDatabase<T>::UpdateCallback callback) {
    175   DCHECK(thread_checker_.CalledOnValidThread());
    176   bool* success = new bool(false);
    177   task_runner_->PostTaskAndReply(
    178       FROM_HERE,
    179       base::Bind(UpdateEntriesFromTaskRunner<T>, base::Unretained(db_.get()),
    180                  base::Passed(&entries_to_save), base::Passed(&keys_to_remove),
    181                  success),
    182       base::Bind(RunUpdateCallback<T>, callback, base::Owned(success)));
    183 }
    184 
    185 template <typename T>
    186 void ProtoDatabaseImpl<T>::LoadEntries(
    187     typename ProtoDatabase<T>::LoadCallback callback) {
    188   DCHECK(thread_checker_.CalledOnValidThread());
    189   bool* success = new bool(false);
    190 
    191   scoped_ptr<std::vector<T> > entries(new std::vector<T>());
    192   // Get this pointer before entries is base::Passed() so we can use it below.
    193   std::vector<T>* entries_ptr = entries.get();
    194 
    195   task_runner_->PostTaskAndReply(
    196       FROM_HERE, base::Bind(LoadEntriesFromTaskRunner<T>,
    197                             base::Unretained(db_.get()), entries_ptr, success),
    198       base::Bind(RunLoadCallback<T>, callback, base::Owned(success),
    199                  base::Passed(&entries)));
    200 }
    201 
    202 }  // namespace leveldb_proto
    203 
    204 #endif  // COMPONENTS_LEVELDB_PROTO_PROTO_DATABASE_IMPL_H_
    205