Home | History | Annotate | Download | only in commands
      1 // Copyright 2015 The Weave Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "src/commands/command_queue.h"
      6 
      7 #include <base/bind.h>
      8 #include <base/time/time.h>
      9 
     10 namespace weave {
     11 
     12 namespace {
     13 const int kRemoveCommandDelayMin = 5;
     14 
     15 std::string GetCommandHandlerKey(const std::string& component_path,
     16                                  const std::string& command_name) {
     17   return component_path + ":" + command_name;
     18 }
     19 }
     20 
     21 CommandQueue::CommandQueue(provider::TaskRunner* task_runner,
     22                            base::Clock* clock)
     23     : task_runner_{task_runner}, clock_{clock} {}
     24 
     25 void CommandQueue::AddCommandAddedCallback(const CommandCallback& callback) {
     26   on_command_added_.push_back(callback);
     27   // Send all pre-existed commands.
     28   for (const auto& command : map_)
     29     callback.Run(command.second.get());
     30 }
     31 
     32 void CommandQueue::AddCommandRemovedCallback(const CommandCallback& callback) {
     33   on_command_removed_.push_back(callback);
     34 }
     35 
     36 void CommandQueue::AddCommandHandler(
     37     const std::string& component_path,
     38     const std::string& command_name,
     39     const Device::CommandHandlerCallback& callback) {
     40   if (!command_name.empty()) {
     41     CHECK(default_command_callback_.is_null())
     42         << "Commands specific handler are not allowed after default one";
     43 
     44     for (const auto& command : map_) {
     45       if (command.second->GetState() == Command::State::kQueued &&
     46           command.second->GetName() == command_name &&
     47           command.second->GetComponent() == component_path) {
     48         callback.Run(command.second);
     49       }
     50     }
     51 
     52     std::string key = GetCommandHandlerKey(component_path, command_name);
     53     CHECK(command_callbacks_.insert(std::make_pair(key, callback)).second)
     54         << command_name << " already has handler";
     55 
     56   } else {
     57     CHECK(component_path.empty())
     58         << "Default handler must not be component-specific";
     59     for (const auto& command : map_) {
     60       std::string key = GetCommandHandlerKey(command.second->GetComponent(),
     61                                              command.second->GetName());
     62       if (command.second->GetState() == Command::State::kQueued &&
     63           command_callbacks_.find(key) == command_callbacks_.end()) {
     64         callback.Run(command.second);
     65       }
     66     }
     67 
     68     CHECK(default_command_callback_.is_null()) << "Already has default handler";
     69     default_command_callback_ = callback;
     70   }
     71 }
     72 
     73 void CommandQueue::Add(std::unique_ptr<CommandInstance> instance) {
     74   std::string id = instance->GetID();
     75   LOG_IF(FATAL, id.empty()) << "Command has no ID";
     76   instance->AttachToQueue(this);
     77   auto pair = map_.insert(std::make_pair(id, std::move(instance)));
     78   LOG_IF(FATAL, !pair.second) << "Command with ID '" << id
     79                               << "' is already in the queue";
     80   for (const auto& cb : on_command_added_)
     81     cb.Run(pair.first->second.get());
     82 
     83   std::string key = GetCommandHandlerKey(pair.first->second->GetComponent(),
     84                                          pair.first->second->GetName());
     85   auto it_handler = command_callbacks_.find(key);
     86 
     87   if (it_handler != command_callbacks_.end())
     88     it_handler->second.Run(pair.first->second);
     89   else if (!default_command_callback_.is_null())
     90     default_command_callback_.Run(pair.first->second);
     91 }
     92 
     93 void CommandQueue::RemoveLater(const std::string& id) {
     94   auto p = map_.find(id);
     95   if (p == map_.end())
     96     return;
     97   auto remove_delay = base::TimeDelta::FromMinutes(kRemoveCommandDelayMin);
     98   remove_queue_.push(std::make_pair(clock_->Now() + remove_delay, id));
     99   if (remove_queue_.size() == 1) {
    100     // The queue was empty, this is the first command to be removed, schedule
    101     // a clean-up task.
    102     ScheduleCleanup(remove_delay);
    103   }
    104 }
    105 
    106 bool CommandQueue::Remove(const std::string& id) {
    107   auto p = map_.find(id);
    108   if (p == map_.end())
    109     return false;
    110   std::shared_ptr<CommandInstance> instance = p->second;
    111   instance->DetachFromQueue();
    112   map_.erase(p);
    113   for (const auto& cb : on_command_removed_)
    114     cb.Run(instance.get());
    115   return true;
    116 }
    117 
    118 void CommandQueue::Cleanup(const base::Time& cutoff_time) {
    119   while (!remove_queue_.empty() && remove_queue_.top().first <= cutoff_time) {
    120     Remove(remove_queue_.top().second);
    121     remove_queue_.pop();
    122   }
    123 }
    124 
    125 void CommandQueue::ScheduleCleanup(base::TimeDelta delay) {
    126   task_runner_->PostDelayedTask(
    127       FROM_HERE,
    128       base::Bind(&CommandQueue::PerformScheduledCleanup,
    129                  weak_ptr_factory_.GetWeakPtr()),
    130       delay);
    131 }
    132 
    133 void CommandQueue::PerformScheduledCleanup() {
    134   base::Time now = clock_->Now();
    135   Cleanup(now);
    136   if (!remove_queue_.empty())
    137     ScheduleCleanup(remove_queue_.top().first - now);
    138 }
    139 
    140 CommandInstance* CommandQueue::Find(const std::string& id) const {
    141   auto p = map_.find(id);
    142   return (p != map_.end()) ? p->second.get() : nullptr;
    143 }
    144 
    145 }  // namespace weave
    146