Home | History | Annotate | Download | only in engine
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "google_apis/gcm/engine/rmq_store.h"
      6 
      7 #include "base/basictypes.h"
      8 #include "base/bind.h"
      9 #include "base/callback.h"
     10 #include "base/files/file_path.h"
     11 #include "base/logging.h"
     12 #include "base/message_loop/message_loop_proxy.h"
     13 #include "base/sequenced_task_runner.h"
     14 #include "base/stl_util.h"
     15 #include "base/strings/string_number_conversions.h"
     16 #include "base/strings/string_piece.h"
     17 #include "base/tracked_objects.h"
     18 #include "components/webdata/encryptor/encryptor.h"
     19 #include "google_apis/gcm/base/mcs_message.h"
     20 #include "google_apis/gcm/base/mcs_util.h"
     21 #include "google_apis/gcm/protocol/mcs.pb.h"
     22 #include "third_party/leveldatabase/src/include/leveldb/db.h"
     23 
     24 namespace gcm {
     25 
     26 namespace {
     27 
     28 // ---- LevelDB keys. ----
     29 // Key for this device's android id.
     30 const char kDeviceAIDKey[] = "device_aid_key";
     31 // Key for this device's android security token.
     32 const char kDeviceTokenKey[] = "device_token_key";
     33 // Lowest lexicographically ordered incoming message key.
     34 // Used for prefixing messages.
     35 const char kIncomingMsgKeyStart[] = "incoming1-";
     36 // Key guaranteed to be higher than all incoming message keys.
     37 // Used for limiting iteration.
     38 const char kIncomingMsgKeyEnd[] = "incoming2-";
     39 // Lowest lexicographically ordered outgoing message key.
     40 // Used for prefixing outgoing messages.
     41 const char kOutgoingMsgKeyStart[] = "outgoing1-";
     42 // Key guaranteed to be higher than all outgoing message keys.
     43 // Used for limiting iteration.
     44 const char kOutgoingMsgKeyEnd[] = "outgoing2-";
     45 
     46 std::string MakeIncomingKey(const std::string& persistent_id) {
     47   return kIncomingMsgKeyStart + persistent_id;
     48 }
     49 
     50 std::string MakeOutgoingKey(const std::string& persistent_id) {
     51   return kOutgoingMsgKeyStart + persistent_id;
     52 }
     53 
     54 std::string ParseOutgoingKey(const std::string& key) {
     55   return key.substr(arraysize(kOutgoingMsgKeyStart) - 1);
     56 }
     57 
     58 leveldb::Slice MakeSlice(const base::StringPiece& s) {
     59   return leveldb::Slice(s.begin(), s.size());
     60 }
     61 
     62 }  // namespace
     63 
     64 class RMQStore::Backend : public base::RefCountedThreadSafe<RMQStore::Backend> {
     65  public:
     66   Backend(const base::FilePath& path,
     67           scoped_refptr<base::SequencedTaskRunner> foreground_runner);
     68 
     69   // Blocking implementations of RMQStore methods.
     70   void Load(const LoadCallback& callback);
     71   void Destroy(const UpdateCallback& callback);
     72   void SetDeviceCredentials(uint64 device_android_id,
     73                             uint64 device_security_token,
     74                             const UpdateCallback& callback);
     75   void AddIncomingMessage(const std::string& persistent_id,
     76                           const UpdateCallback& callback);
     77   void RemoveIncomingMessages(const PersistentIdList& persistent_ids,
     78                               const UpdateCallback& callback);
     79   void AddOutgoingMessage(const std::string& persistent_id,
     80                           const MCSMessage& message,
     81                           const UpdateCallback& callback);
     82   void RemoveOutgoingMessages(const PersistentIdList& persistent_ids,
     83                               const UpdateCallback& callback);
     84 
     85  private:
     86   friend class base::RefCountedThreadSafe<Backend>;
     87   ~Backend();
     88 
     89   bool LoadDeviceCredentials(uint64* android_id, uint64* security_token);
     90   bool LoadIncomingMessages(std::vector<std::string>* incoming_messages);
     91   bool LoadOutgoingMessages(
     92       std::map<std::string, google::protobuf::MessageLite*>* outgoing_messages);
     93 
     94   const base::FilePath path_;
     95   scoped_refptr<base::SequencedTaskRunner> foreground_task_runner_;
     96 
     97   scoped_ptr<leveldb::DB> db_;
     98 };
     99 
    100 RMQStore::Backend::Backend(
    101     const base::FilePath& path,
    102     scoped_refptr<base::SequencedTaskRunner> foreground_task_runner)
    103     : path_(path),
    104       foreground_task_runner_(foreground_task_runner) {
    105 }
    106 
    107 RMQStore::Backend::~Backend() {
    108 }
    109 
    110 void RMQStore::Backend::Load(const LoadCallback& callback) {
    111   LoadResult result;
    112 
    113   leveldb::Options options;
    114   options.create_if_missing = true;
    115   leveldb::DB* db;
    116   leveldb::Status status = leveldb::DB::Open(options,
    117                                              path_.AsUTF8Unsafe(),
    118                                              &db);
    119   if (!status.ok()) {
    120     LOG(ERROR) << "Failed to open database " << path_.value()
    121                << ": " << status.ToString();
    122     foreground_task_runner_->PostTask(FROM_HERE,
    123                                       base::Bind(callback, result));
    124     return;
    125   }
    126   db_.reset(db);
    127 
    128   if (!LoadDeviceCredentials(&result.device_android_id,
    129                              &result.device_security_token) ||
    130       !LoadIncomingMessages(&result.incoming_messages) ||
    131       !LoadOutgoingMessages(&result.outgoing_messages)) {
    132     result.device_android_id = 0;
    133     result.device_security_token = 0;
    134     result.incoming_messages.clear();
    135     STLDeleteContainerPairSecondPointers(result.outgoing_messages.begin(),
    136                                          result.outgoing_messages.end());
    137     result.outgoing_messages.clear();
    138     foreground_task_runner_->PostTask(FROM_HERE,
    139                                       base::Bind(callback, result));
    140     return;
    141   }
    142 
    143   DVLOG(1) << "Succeeded in loading " << result.incoming_messages.size()
    144           << " unacknowledged incoming messages and "
    145           << result.outgoing_messages.size()
    146           << " unacknowledged outgoing messages.";
    147   result.success = true;
    148   foreground_task_runner_->PostTask(FROM_HERE,
    149                                     base::Bind(callback, result));
    150   return;
    151 }
    152 
    153 void RMQStore::Backend::Destroy(const UpdateCallback& callback) {
    154   DVLOG(1) << "Destroying RMQ store.";
    155   const leveldb::Status s =
    156       leveldb::DestroyDB(path_.AsUTF8Unsafe(),
    157                          leveldb::Options());
    158   if (s.ok()) {
    159     foreground_task_runner_->PostTask(FROM_HERE,
    160                                       base::Bind(callback, true));
    161     return;
    162   }
    163   LOG(ERROR) << "Destroy failed.";
    164   foreground_task_runner_->PostTask(FROM_HERE,
    165                                     base::Bind(callback, false));
    166 }
    167 
    168 void RMQStore::Backend::SetDeviceCredentials(uint64 device_android_id,
    169                                              uint64 device_security_token,
    170                                              const UpdateCallback& callback) {
    171   DVLOG(1) << "Saving device credentials with AID " << device_android_id;
    172   leveldb::WriteOptions write_options;
    173   write_options.sync = true;
    174 
    175   std::string encrypted_token;
    176   Encryptor::EncryptString(base::Uint64ToString(device_security_token),
    177                            &encrypted_token);
    178   leveldb::Status s =
    179       db_->Put(write_options,
    180                MakeSlice(kDeviceAIDKey),
    181                MakeSlice(base::Uint64ToString(device_android_id)));
    182   if (s.ok()) {
    183     s = db_->Put(write_options,
    184                  MakeSlice(kDeviceTokenKey),
    185                  MakeSlice(encrypted_token));
    186   }
    187   if (s.ok()) {
    188     foreground_task_runner_->PostTask(FROM_HERE,
    189                                       base::Bind(callback, true));
    190     return;
    191   }
    192   LOG(ERROR) << "LevelDB put failed: " << s.ToString();
    193   foreground_task_runner_->PostTask(FROM_HERE,
    194                                     base::Bind(callback, false));
    195 }
    196 
    197 void RMQStore::Backend::AddIncomingMessage(const std::string& persistent_id,
    198                                            const UpdateCallback& callback) {
    199   DVLOG(1) << "Saving incoming message with id " << persistent_id;
    200   leveldb::WriteOptions write_options;
    201   write_options.sync = true;
    202 
    203   const leveldb::Status s =
    204       db_->Put(write_options,
    205                MakeSlice(MakeIncomingKey(persistent_id)),
    206                MakeSlice(persistent_id));
    207   if (s.ok()) {
    208     foreground_task_runner_->PostTask(FROM_HERE,
    209                                       base::Bind(callback, true));
    210     return;
    211   }
    212   LOG(ERROR) << "LevelDB put failed: " << s.ToString();
    213   foreground_task_runner_->PostTask(FROM_HERE,
    214                                     base::Bind(callback, false));
    215 }
    216 
    217 void RMQStore::Backend::RemoveIncomingMessages(
    218     const PersistentIdList& persistent_ids,
    219     const UpdateCallback& callback) {
    220   leveldb::WriteOptions write_options;
    221   write_options.sync = true;
    222 
    223   leveldb::Status s;
    224   for (PersistentIdList::const_iterator iter = persistent_ids.begin();
    225        iter != persistent_ids.end(); ++iter){
    226     DVLOG(1) << "Removing incoming message with id " << *iter;
    227     s = db_->Delete(write_options,
    228                     MakeSlice(MakeIncomingKey(*iter)));
    229     if (!s.ok())
    230       break;
    231   }
    232   if (s.ok()) {
    233     foreground_task_runner_->PostTask(FROM_HERE,
    234                                       base::Bind(callback, true));
    235     return;
    236   }
    237   LOG(ERROR) << "LevelDB remove failed: " << s.ToString();
    238   foreground_task_runner_->PostTask(FROM_HERE,
    239                                     base::Bind(callback, false));
    240 }
    241 
    242 void RMQStore::Backend::AddOutgoingMessage(
    243    const std::string& persistent_id,
    244    const MCSMessage& message,
    245    const UpdateCallback& callback) {
    246   DVLOG(1) << "Saving outgoing message with id " << persistent_id;
    247   leveldb::WriteOptions write_options;
    248   write_options.sync = true;
    249 
    250   std::string data = static_cast<char>(message.tag()) +
    251       message.SerializeAsString();
    252   const leveldb::Status s =
    253       db_->Put(write_options,
    254                MakeSlice(MakeOutgoingKey(persistent_id)),
    255                MakeSlice(data));
    256   if (s.ok()) {
    257     foreground_task_runner_->PostTask(FROM_HERE,
    258                                       base::Bind(callback, true));
    259     return;
    260   }
    261   LOG(ERROR) << "LevelDB put failed: " << s.ToString();
    262   foreground_task_runner_->PostTask(FROM_HERE,
    263                                     base::Bind(callback, false));
    264 
    265 }
    266 
    267 void RMQStore::Backend::RemoveOutgoingMessages(
    268     const PersistentIdList& persistent_ids,
    269     const UpdateCallback& callback) {
    270   leveldb::WriteOptions write_options;
    271   write_options.sync = true;
    272 
    273   leveldb::Status s;
    274   for (PersistentIdList::const_iterator iter = persistent_ids.begin();
    275        iter != persistent_ids.end(); ++iter){
    276     DVLOG(1) << "Removing outgoing message with id " << *iter;
    277     s = db_->Delete(write_options,
    278                     MakeSlice(MakeOutgoingKey(*iter)));
    279     if (!s.ok())
    280       break;
    281   }
    282   if (s.ok()) {
    283     foreground_task_runner_->PostTask(FROM_HERE,
    284                                       base::Bind(callback, true));
    285     return;
    286   }
    287   LOG(ERROR) << "LevelDB remove failed: " << s.ToString();
    288   foreground_task_runner_->PostTask(FROM_HERE,
    289                                     base::Bind(callback, false));
    290 }
    291 
    292 bool RMQStore::Backend::LoadDeviceCredentials(uint64* android_id,
    293                                               uint64* security_token) {
    294   leveldb::ReadOptions read_options;
    295   read_options.verify_checksums = true;
    296 
    297   std::string result;
    298   leveldb::Status s = db_->Get(read_options,
    299                                MakeSlice(kDeviceAIDKey),
    300                                &result);
    301   if (s.ok()) {
    302     if (!base::StringToUint64(result, android_id)) {
    303       LOG(ERROR) << "Failed to restore device id.";
    304       return false;
    305     }
    306     result.clear();
    307     s = db_->Get(read_options,
    308                  MakeSlice(kDeviceTokenKey),
    309                  &result);
    310   }
    311   if (s.ok()) {
    312     std::string decrypted_token;
    313     Encryptor::DecryptString(result, &decrypted_token);
    314     if (!base::StringToUint64(decrypted_token, security_token)) {
    315       LOG(ERROR) << "Failed to restore security token.";
    316       return false;
    317     }
    318     return true;
    319   }
    320 
    321   if (s.IsNotFound()) {
    322     DVLOG(1) << "No credentials found.";
    323     return true;
    324   }
    325 
    326   LOG(ERROR) << "Error reading credentials from store.";
    327   return false;
    328 }
    329 
    330 bool RMQStore::Backend::LoadIncomingMessages(
    331     std::vector<std::string>* incoming_messages) {
    332   leveldb::ReadOptions read_options;
    333   read_options.verify_checksums = true;
    334 
    335   scoped_ptr<leveldb::Iterator> iter(db_->NewIterator(read_options));
    336   for (iter->Seek(MakeSlice(kIncomingMsgKeyStart));
    337        iter->Valid() && iter->key().ToString() < kIncomingMsgKeyEnd;
    338        iter->Next()) {
    339     leveldb::Slice s = iter->value();
    340     if (s.empty()) {
    341       LOG(ERROR) << "Error reading incoming message with key "
    342                  << iter->key().ToString();
    343       return false;
    344     }
    345     DVLOG(1) << "Found incoming message with id " << s.ToString();
    346     incoming_messages->push_back(s.ToString());
    347   }
    348 
    349   return true;
    350 }
    351 
    352 bool RMQStore::Backend::LoadOutgoingMessages(
    353     std::map<std::string, google::protobuf::MessageLite*>*
    354         outgoing_messages) {
    355   leveldb::ReadOptions read_options;
    356   read_options.verify_checksums = true;
    357 
    358   scoped_ptr<leveldb::Iterator> iter(db_->NewIterator(read_options));
    359   for (iter->Seek(MakeSlice(kOutgoingMsgKeyStart));
    360        iter->Valid() && iter->key().ToString() < kOutgoingMsgKeyEnd;
    361        iter->Next()) {
    362     leveldb::Slice s = iter->value();
    363     if (s.size() <= 1) {
    364       LOG(ERROR) << "Error reading incoming message with key " << s.ToString();
    365       return false;
    366     }
    367     uint8 tag = iter->value().data()[0];
    368     std::string id = ParseOutgoingKey(iter->key().ToString());
    369     scoped_ptr<google::protobuf::MessageLite> message(
    370         BuildProtobufFromTag(tag));
    371     if (!message.get() ||
    372         !message->ParseFromString(iter->value().ToString().substr(1))) {
    373       LOG(ERROR) << "Failed to parse outgoing message with id "
    374                  << id << " and tag " << tag;
    375       return false;
    376     }
    377     DVLOG(1) << "Found outgoing message with id " << id << " of type "
    378              << base::IntToString(tag);
    379     (*outgoing_messages)[id] = message.release();
    380   }
    381 
    382   return true;
    383 }
    384 
    385 RMQStore::LoadResult::LoadResult()
    386     : success(false),
    387       device_android_id(0),
    388       device_security_token(0) {
    389 }
    390 RMQStore::LoadResult::~LoadResult() {}
    391 
    392 RMQStore::RMQStore(
    393     const base::FilePath& path,
    394     scoped_refptr<base::SequencedTaskRunner> blocking_task_runner)
    395   : backend_(new Backend(path, base::MessageLoopProxy::current())),
    396     blocking_task_runner_(blocking_task_runner) {
    397 }
    398 
    399 RMQStore::~RMQStore() {
    400 }
    401 
    402 void RMQStore::Load(const LoadCallback& callback) {
    403   blocking_task_runner_->PostTask(FROM_HERE,
    404                                   base::Bind(&RMQStore::Backend::Load,
    405                                              backend_,
    406                                              callback));
    407 }
    408 
    409 void RMQStore::Destroy(const UpdateCallback& callback) {
    410   blocking_task_runner_->PostTask(
    411       FROM_HERE,
    412       base::Bind(&RMQStore::Backend::Destroy,
    413                  backend_,
    414                  callback));
    415 }
    416 
    417 void RMQStore::SetDeviceCredentials(uint64 device_android_id,
    418                                     uint64 device_security_token,
    419                                     const UpdateCallback& callback) {
    420   blocking_task_runner_->PostTask(
    421       FROM_HERE,
    422       base::Bind(&RMQStore::Backend::SetDeviceCredentials,
    423                  backend_,
    424                  device_android_id,
    425                  device_security_token,
    426                  callback));
    427 }
    428 
    429 void RMQStore::AddIncomingMessage(const std::string& persistent_id,
    430                                   const UpdateCallback& callback) {
    431   blocking_task_runner_->PostTask(
    432       FROM_HERE,
    433       base::Bind(&RMQStore::Backend::AddIncomingMessage,
    434                  backend_,
    435                  persistent_id,
    436                  callback));
    437 }
    438 
    439 void RMQStore::RemoveIncomingMessage(const std::string& persistent_id,
    440                                      const UpdateCallback& callback) {
    441   blocking_task_runner_->PostTask(
    442       FROM_HERE,
    443       base::Bind(&RMQStore::Backend::RemoveIncomingMessages,
    444                  backend_,
    445                  PersistentIdList(1, persistent_id),
    446                  callback));
    447 }
    448 
    449 void RMQStore::RemoveIncomingMessages(const PersistentIdList& persistent_ids,
    450                                       const UpdateCallback& callback) {
    451   blocking_task_runner_->PostTask(
    452       FROM_HERE,
    453       base::Bind(&RMQStore::Backend::RemoveIncomingMessages,
    454                  backend_,
    455                  persistent_ids,
    456                  callback));
    457 }
    458 
    459 void RMQStore::AddOutgoingMessage(const std::string& persistent_id,
    460                                   const MCSMessage& message,
    461                                   const UpdateCallback& callback) {
    462   blocking_task_runner_->PostTask(
    463       FROM_HERE,
    464       base::Bind(&RMQStore::Backend::AddOutgoingMessage,
    465                  backend_,
    466                  persistent_id,
    467                  message,
    468                  callback));
    469 }
    470 
    471 void RMQStore::RemoveOutgoingMessage(const std::string& persistent_id,
    472                                      const UpdateCallback& callback) {
    473   blocking_task_runner_->PostTask(
    474       FROM_HERE,
    475       base::Bind(&RMQStore::Backend::RemoveOutgoingMessages,
    476                  backend_,
    477                  PersistentIdList(1, persistent_id),
    478                  callback));
    479 }
    480 
    481 void RMQStore::RemoveOutgoingMessages(const PersistentIdList& persistent_ids,
    482                                       const UpdateCallback& callback) {
    483   blocking_task_runner_->PostTask(
    484       FROM_HERE,
    485       base::Bind(&RMQStore::Backend::RemoveOutgoingMessages,
    486                  backend_,
    487                  persistent_ids,
    488                  callback));
    489 }
    490 
    491 }  // namespace gcm
    492