Home | History | Annotate | Download | only in commands
      1 // Copyright 2015 The Weave Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "src/commands/cloud_command_proxy.h"
      6 
      7 #include <base/bind.h>
      8 #include <weave/enum_to_string.h>
      9 #include <weave/provider/task_runner.h>
     10 
     11 #include "src/commands/command_instance.h"
     12 #include "src/commands/schema_constants.h"
     13 #include "src/utils.h"
     14 
     15 namespace weave {
     16 
     17 CloudCommandProxy::CloudCommandProxy(
     18     CommandInstance* command_instance,
     19     CloudCommandUpdateInterface* cloud_command_updater,
     20     ComponentManager* component_manager,
     21     std::unique_ptr<BackoffEntry> backoff_entry,
     22     provider::TaskRunner* task_runner)
     23     : command_instance_{command_instance},
     24       cloud_command_updater_{cloud_command_updater},
     25       component_manager_{component_manager},
     26       task_runner_{task_runner},
     27       cloud_backoff_entry_{std::move(backoff_entry)} {
     28   callback_token_ = component_manager_->AddServerStateUpdatedCallback(
     29       base::Bind(&CloudCommandProxy::OnDeviceStateUpdated,
     30                  weak_ptr_factory_.GetWeakPtr()));
     31   observer_.Add(command_instance);
     32 }
     33 
     34 void CloudCommandProxy::OnErrorChanged() {
     35   std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue};
     36   patch->Set(commands::attributes::kCommand_Error,
     37              command_instance_->GetError()
     38                  ? ErrorInfoToJson(*command_instance_->GetError()).release()
     39                  : base::Value::CreateNullValue().release());
     40   QueueCommandUpdate(std::move(patch));
     41 }
     42 
     43 void CloudCommandProxy::OnResultsChanged() {
     44   std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue};
     45   patch->Set(commands::attributes::kCommand_Results,
     46              command_instance_->GetResults().CreateDeepCopy());
     47   QueueCommandUpdate(std::move(patch));
     48 }
     49 
     50 void CloudCommandProxy::OnStateChanged() {
     51   std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue};
     52   patch->SetString(commands::attributes::kCommand_State,
     53                    EnumToString(command_instance_->GetState()));
     54   QueueCommandUpdate(std::move(patch));
     55 }
     56 
     57 void CloudCommandProxy::OnProgressChanged() {
     58   std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue};
     59   patch->Set(commands::attributes::kCommand_Progress,
     60              command_instance_->GetProgress().CreateDeepCopy());
     61   QueueCommandUpdate(std::move(patch));
     62 }
     63 
     64 void CloudCommandProxy::OnCommandDestroyed() {
     65   delete this;
     66 }
     67 
     68 void CloudCommandProxy::QueueCommandUpdate(
     69     std::unique_ptr<base::DictionaryValue> patch) {
     70   ComponentManager::UpdateID id = component_manager_->GetLastStateChangeId();
     71   if (update_queue_.empty() || update_queue_.back().first != id) {
     72     // If queue is currently empty or the device state has changed since the
     73     // last patch request queued, add a new request to the queue.
     74     update_queue_.push_back(std::make_pair(id, std::move(patch)));
     75   } else {
     76     // Device state hasn't changed since the last time this command update
     77     // was queued. We can coalesce the command update patches, unless the
     78     // current request is already in flight to the server.
     79     if (update_queue_.size() == 1 && command_update_in_progress_) {
     80       // Can't update the request which is being sent to the server.
     81       // Queue a new update.
     82       update_queue_.push_back(std::make_pair(id, std::move(patch)));
     83     } else {
     84       // Coalesce the patches.
     85       update_queue_.back().second->MergeDictionary(patch.get());
     86     }
     87   }
     88   // Send out an update request to the server, if needed.
     89 
     90   // Post to accumulate more changes during the current message loop task run.
     91   task_runner_->PostDelayedTask(
     92       FROM_HERE, base::Bind(&CloudCommandProxy::SendCommandUpdate,
     93                             backoff_weak_ptr_factory_.GetWeakPtr()),
     94       {});
     95 }
     96 
     97 void CloudCommandProxy::SendCommandUpdate() {
     98   if (command_update_in_progress_ || update_queue_.empty())
     99     return;
    100 
    101   // Check if we have any pending updates ready to be sent to the server.
    102   // We can only send updates for which the device state at the time the
    103   // requests have been queued were successfully propagated to the server.
    104   // That is, if the pending device state updates that we recorded while the
    105   // command update was queued haven't been acknowledged by the server, we
    106   // will hold the corresponding command updates until the related device state
    107   // has been successfully updated on the server.
    108   if (update_queue_.front().first > last_state_update_id_)
    109     return;
    110 
    111   backoff_weak_ptr_factory_.InvalidateWeakPtrs();
    112   if (cloud_backoff_entry_->ShouldRejectRequest()) {
    113     VLOG(1) << "Cloud request delayed for "
    114             << cloud_backoff_entry_->GetTimeUntilRelease()
    115             << " due to backoff policy";
    116     task_runner_->PostDelayedTask(
    117         FROM_HERE, base::Bind(&CloudCommandProxy::SendCommandUpdate,
    118                               backoff_weak_ptr_factory_.GetWeakPtr()),
    119         cloud_backoff_entry_->GetTimeUntilRelease());
    120     return;
    121   }
    122 
    123   // Coalesce any pending updates that were queued prior to the current device
    124   // state known to be propagated to the server successfully.
    125   auto iter = update_queue_.begin();
    126   auto start = ++iter;
    127   while (iter != update_queue_.end()) {
    128     if (iter->first > last_state_update_id_)
    129       break;
    130     update_queue_.front().first = iter->first;
    131     update_queue_.front().second->MergeDictionary(iter->second.get());
    132     ++iter;
    133   }
    134   // Remove all the intermediate items that have been merged into the first
    135   // entry.
    136   update_queue_.erase(start, iter);
    137   command_update_in_progress_ = true;
    138   cloud_command_updater_->UpdateCommand(
    139       command_instance_->GetID(), *update_queue_.front().second,
    140       base::Bind(&CloudCommandProxy::OnUpdateCommandDone,
    141                  weak_ptr_factory_.GetWeakPtr()));
    142 }
    143 
    144 void CloudCommandProxy::ResendCommandUpdate() {
    145   command_update_in_progress_ = false;
    146   SendCommandUpdate();
    147 }
    148 
    149 void CloudCommandProxy::OnUpdateCommandDone(ErrorPtr error) {
    150   command_update_in_progress_ = false;
    151   cloud_backoff_entry_->InformOfRequest(!error);
    152   if (!error) {
    153     // Remove the succeeded update from the queue.
    154     update_queue_.pop_front();
    155   }
    156   // If we have more pending updates, send a new request to the server
    157   // immediately, if possible.
    158   SendCommandUpdate();
    159 }
    160 
    161 void CloudCommandProxy::OnDeviceStateUpdated(
    162     ComponentManager::UpdateID update_id) {
    163   last_state_update_id_ = update_id;
    164   // Try to send out any queued command updates that could be performed after
    165   // a device state is updated.
    166   SendCommandUpdate();
    167 }
    168 
    169 }  // namespace weave
    170