1 // Copyright 2015 The Weave Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "src/commands/cloud_command_proxy.h" 6 7 #include <base/bind.h> 8 #include <weave/enum_to_string.h> 9 #include <weave/provider/task_runner.h> 10 11 #include "src/commands/command_instance.h" 12 #include "src/commands/schema_constants.h" 13 #include "src/utils.h" 14 15 namespace weave { 16 17 CloudCommandProxy::CloudCommandProxy( 18 CommandInstance* command_instance, 19 CloudCommandUpdateInterface* cloud_command_updater, 20 ComponentManager* component_manager, 21 std::unique_ptr<BackoffEntry> backoff_entry, 22 provider::TaskRunner* task_runner) 23 : command_instance_{command_instance}, 24 cloud_command_updater_{cloud_command_updater}, 25 component_manager_{component_manager}, 26 task_runner_{task_runner}, 27 cloud_backoff_entry_{std::move(backoff_entry)} { 28 callback_token_ = component_manager_->AddServerStateUpdatedCallback( 29 base::Bind(&CloudCommandProxy::OnDeviceStateUpdated, 30 weak_ptr_factory_.GetWeakPtr())); 31 observer_.Add(command_instance); 32 } 33 34 void CloudCommandProxy::OnErrorChanged() { 35 std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue}; 36 patch->Set(commands::attributes::kCommand_Error, 37 command_instance_->GetError() 38 ? ErrorInfoToJson(*command_instance_->GetError()).release() 39 : base::Value::CreateNullValue().release()); 40 QueueCommandUpdate(std::move(patch)); 41 } 42 43 void CloudCommandProxy::OnResultsChanged() { 44 std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue}; 45 patch->Set(commands::attributes::kCommand_Results, 46 command_instance_->GetResults().CreateDeepCopy()); 47 QueueCommandUpdate(std::move(patch)); 48 } 49 50 void CloudCommandProxy::OnStateChanged() { 51 std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue}; 52 patch->SetString(commands::attributes::kCommand_State, 53 EnumToString(command_instance_->GetState())); 54 QueueCommandUpdate(std::move(patch)); 55 } 56 57 void CloudCommandProxy::OnProgressChanged() { 58 std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue}; 59 patch->Set(commands::attributes::kCommand_Progress, 60 command_instance_->GetProgress().CreateDeepCopy()); 61 QueueCommandUpdate(std::move(patch)); 62 } 63 64 void CloudCommandProxy::OnCommandDestroyed() { 65 delete this; 66 } 67 68 void CloudCommandProxy::QueueCommandUpdate( 69 std::unique_ptr<base::DictionaryValue> patch) { 70 ComponentManager::UpdateID id = component_manager_->GetLastStateChangeId(); 71 if (update_queue_.empty() || update_queue_.back().first != id) { 72 // If queue is currently empty or the device state has changed since the 73 // last patch request queued, add a new request to the queue. 74 update_queue_.push_back(std::make_pair(id, std::move(patch))); 75 } else { 76 // Device state hasn't changed since the last time this command update 77 // was queued. We can coalesce the command update patches, unless the 78 // current request is already in flight to the server. 79 if (update_queue_.size() == 1 && command_update_in_progress_) { 80 // Can't update the request which is being sent to the server. 81 // Queue a new update. 82 update_queue_.push_back(std::make_pair(id, std::move(patch))); 83 } else { 84 // Coalesce the patches. 85 update_queue_.back().second->MergeDictionary(patch.get()); 86 } 87 } 88 // Send out an update request to the server, if needed. 89 90 // Post to accumulate more changes during the current message loop task run. 91 task_runner_->PostDelayedTask( 92 FROM_HERE, base::Bind(&CloudCommandProxy::SendCommandUpdate, 93 backoff_weak_ptr_factory_.GetWeakPtr()), 94 {}); 95 } 96 97 void CloudCommandProxy::SendCommandUpdate() { 98 if (command_update_in_progress_ || update_queue_.empty()) 99 return; 100 101 // Check if we have any pending updates ready to be sent to the server. 102 // We can only send updates for which the device state at the time the 103 // requests have been queued were successfully propagated to the server. 104 // That is, if the pending device state updates that we recorded while the 105 // command update was queued haven't been acknowledged by the server, we 106 // will hold the corresponding command updates until the related device state 107 // has been successfully updated on the server. 108 if (update_queue_.front().first > last_state_update_id_) 109 return; 110 111 backoff_weak_ptr_factory_.InvalidateWeakPtrs(); 112 if (cloud_backoff_entry_->ShouldRejectRequest()) { 113 VLOG(1) << "Cloud request delayed for " 114 << cloud_backoff_entry_->GetTimeUntilRelease() 115 << " due to backoff policy"; 116 task_runner_->PostDelayedTask( 117 FROM_HERE, base::Bind(&CloudCommandProxy::SendCommandUpdate, 118 backoff_weak_ptr_factory_.GetWeakPtr()), 119 cloud_backoff_entry_->GetTimeUntilRelease()); 120 return; 121 } 122 123 // Coalesce any pending updates that were queued prior to the current device 124 // state known to be propagated to the server successfully. 125 auto iter = update_queue_.begin(); 126 auto start = ++iter; 127 while (iter != update_queue_.end()) { 128 if (iter->first > last_state_update_id_) 129 break; 130 update_queue_.front().first = iter->first; 131 update_queue_.front().second->MergeDictionary(iter->second.get()); 132 ++iter; 133 } 134 // Remove all the intermediate items that have been merged into the first 135 // entry. 136 update_queue_.erase(start, iter); 137 command_update_in_progress_ = true; 138 cloud_command_updater_->UpdateCommand( 139 command_instance_->GetID(), *update_queue_.front().second, 140 base::Bind(&CloudCommandProxy::OnUpdateCommandDone, 141 weak_ptr_factory_.GetWeakPtr())); 142 } 143 144 void CloudCommandProxy::ResendCommandUpdate() { 145 command_update_in_progress_ = false; 146 SendCommandUpdate(); 147 } 148 149 void CloudCommandProxy::OnUpdateCommandDone(ErrorPtr error) { 150 command_update_in_progress_ = false; 151 cloud_backoff_entry_->InformOfRequest(!error); 152 if (!error) { 153 // Remove the succeeded update from the queue. 154 update_queue_.pop_front(); 155 } 156 // If we have more pending updates, send a new request to the server 157 // immediately, if possible. 158 SendCommandUpdate(); 159 } 160 161 void CloudCommandProxy::OnDeviceStateUpdated( 162 ComponentManager::UpdateID update_id) { 163 last_state_update_id_ = update_id; 164 // Try to send out any queued command updates that could be performed after 165 // a device state is updated. 166 SendCommandUpdate(); 167 } 168 169 } // namespace weave 170