1 // Copyright 2015 The Weave Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "src/commands/command_queue.h" 6 7 #include <base/bind.h> 8 #include <base/time/time.h> 9 10 namespace weave { 11 12 namespace { 13 const int kRemoveCommandDelayMin = 5; 14 15 std::string GetCommandHandlerKey(const std::string& component_path, 16 const std::string& command_name) { 17 return component_path + ":" + command_name; 18 } 19 } 20 21 CommandQueue::CommandQueue(provider::TaskRunner* task_runner, 22 base::Clock* clock) 23 : task_runner_{task_runner}, clock_{clock} {} 24 25 void CommandQueue::AddCommandAddedCallback(const CommandCallback& callback) { 26 on_command_added_.push_back(callback); 27 // Send all pre-existed commands. 28 for (const auto& command : map_) 29 callback.Run(command.second.get()); 30 } 31 32 void CommandQueue::AddCommandRemovedCallback(const CommandCallback& callback) { 33 on_command_removed_.push_back(callback); 34 } 35 36 void CommandQueue::AddCommandHandler( 37 const std::string& component_path, 38 const std::string& command_name, 39 const Device::CommandHandlerCallback& callback) { 40 if (!command_name.empty()) { 41 CHECK(default_command_callback_.is_null()) 42 << "Commands specific handler are not allowed after default one"; 43 44 for (const auto& command : map_) { 45 if (command.second->GetState() == Command::State::kQueued && 46 command.second->GetName() == command_name && 47 command.second->GetComponent() == component_path) { 48 callback.Run(command.second); 49 } 50 } 51 52 std::string key = GetCommandHandlerKey(component_path, command_name); 53 CHECK(command_callbacks_.insert(std::make_pair(key, callback)).second) 54 << command_name << " already has handler"; 55 56 } else { 57 CHECK(component_path.empty()) 58 << "Default handler must not be component-specific"; 59 for (const auto& command : map_) { 60 std::string key = GetCommandHandlerKey(command.second->GetComponent(), 61 command.second->GetName()); 62 if (command.second->GetState() == Command::State::kQueued && 63 command_callbacks_.find(key) == command_callbacks_.end()) { 64 callback.Run(command.second); 65 } 66 } 67 68 CHECK(default_command_callback_.is_null()) << "Already has default handler"; 69 default_command_callback_ = callback; 70 } 71 } 72 73 void CommandQueue::Add(std::unique_ptr<CommandInstance> instance) { 74 std::string id = instance->GetID(); 75 LOG_IF(FATAL, id.empty()) << "Command has no ID"; 76 instance->AttachToQueue(this); 77 auto pair = map_.insert(std::make_pair(id, std::move(instance))); 78 LOG_IF(FATAL, !pair.second) << "Command with ID '" << id 79 << "' is already in the queue"; 80 for (const auto& cb : on_command_added_) 81 cb.Run(pair.first->second.get()); 82 83 std::string key = GetCommandHandlerKey(pair.first->second->GetComponent(), 84 pair.first->second->GetName()); 85 auto it_handler = command_callbacks_.find(key); 86 87 if (it_handler != command_callbacks_.end()) 88 it_handler->second.Run(pair.first->second); 89 else if (!default_command_callback_.is_null()) 90 default_command_callback_.Run(pair.first->second); 91 } 92 93 void CommandQueue::RemoveLater(const std::string& id) { 94 auto p = map_.find(id); 95 if (p == map_.end()) 96 return; 97 auto remove_delay = base::TimeDelta::FromMinutes(kRemoveCommandDelayMin); 98 remove_queue_.push(std::make_pair(clock_->Now() + remove_delay, id)); 99 if (remove_queue_.size() == 1) { 100 // The queue was empty, this is the first command to be removed, schedule 101 // a clean-up task. 102 ScheduleCleanup(remove_delay); 103 } 104 } 105 106 bool CommandQueue::Remove(const std::string& id) { 107 auto p = map_.find(id); 108 if (p == map_.end()) 109 return false; 110 std::shared_ptr<CommandInstance> instance = p->second; 111 instance->DetachFromQueue(); 112 map_.erase(p); 113 for (const auto& cb : on_command_removed_) 114 cb.Run(instance.get()); 115 return true; 116 } 117 118 void CommandQueue::Cleanup(const base::Time& cutoff_time) { 119 while (!remove_queue_.empty() && remove_queue_.top().first <= cutoff_time) { 120 Remove(remove_queue_.top().second); 121 remove_queue_.pop(); 122 } 123 } 124 125 void CommandQueue::ScheduleCleanup(base::TimeDelta delay) { 126 task_runner_->PostDelayedTask( 127 FROM_HERE, 128 base::Bind(&CommandQueue::PerformScheduledCleanup, 129 weak_ptr_factory_.GetWeakPtr()), 130 delay); 131 } 132 133 void CommandQueue::PerformScheduledCleanup() { 134 base::Time now = clock_->Now(); 135 Cleanup(now); 136 if (!remove_queue_.empty()) 137 ScheduleCleanup(remove_queue_.top().first - now); 138 } 139 140 CommandInstance* CommandQueue::Find(const std::string& id) const { 141 auto p = map_.find(id); 142 return (p != map_.end()) ? p->second.get() : nullptr; 143 } 144 145 } // namespace weave 146