Home | History | Annotate | Download | only in src
      1 /*
      2  * Copyright 2016 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #define LOG_TAG "async_manager"
     18 
     19 #include "async_manager.h"
     20 
     21 #include "osi/include/log.h"
     22 
     23 #include <algorithm>
     24 #include <atomic>
     25 #include <condition_variable>
     26 #include <mutex>
     27 #include <thread>
     28 #include <vector>
     29 #include "fcntl.h"
     30 #include "sys/select.h"
     31 #include "unistd.h"
     32 
     33 namespace test_vendor_lib {
// Implementation of AsyncManager is divided between two classes, three if
// AsyncManager itself is taken into account, but its only responsibility
// besides being a proxy for the other two classes is to provide a global
// synchronization mechanism for callbacks and client code to use.
     38 
// The watching of file descriptors is done through AsyncFdWatcher. Several
// objects of this class may coexist simultaneously as they share no state.
// After construction of these objects nothing happens beyond some very simple
// member initialization. When the first FD is set up for watching, the object
// starts a new thread which watches the given (and later provided) FDs using
// select() inside a loop. A special FD (a pipe) is also watched, which is
// used to notify the thread of internal changes to the object state (like
// the addition of new FDs to watch). Every access to internal state is
// synchronized using a single internal mutex. The thread is only stopped on
// destruction of the object, by modifying a flag, which is the only member
// variable accessed without acquiring the lock (because the notification to
// the thread is done later by writing to a pipe, which means the thread will
// be notified regardless of what phase of the loop it is in at that moment).
     52 
// The scheduling of asynchronous tasks, periodic or not, is handled by the
// AsyncTaskManager class. Like the one for FDs, this class shares no internal
// state between different instances, so it is safe to use several objects of
// this class. Also, nothing interesting happens upon construction, but only
// after a Task has been scheduled, and access to internal state is
// synchronized using a single internal mutex. When the first task is
// scheduled a thread is started which monitors a queue of tasks. The queue is
// peeked to see when the next task should be carried out and then the thread
// performs an (absolute) timed wait on a condition variable. The wait ends
// because of a timeout or a notify on the cond var; the former means a task
// is due for execution while the latter means there has been a change in
// internal state, like a task has been scheduled/canceled or the flag to stop
// has been set. Setting and querying the stop flag or modifying the task
// queue and the subsequent notification on the cond var are done atomically
// (i.e. while holding the lock on the internal mutex) to ensure that the
// thread never misses the notification, since notifying a cond var is not
// persistent the way writing on a pipe is (if not done this way, the thread
// could query the stopping flag and be put aside by the OS scheduler right
// after, then the 'stop thread' procedure could run, setting the flag,
// notifying a cond var that no one is waiting on and joining the thread; the
// thread then resumes execution believing that it needs to continue and waits
// on the cond var, possibly forever if there are no tasks scheduled,
// effectively causing a deadlock).
     76 
     77 // This number also states the maximum number of scheduled tasks we can handle
     78 // at a given time
     79 static const uint16_t kMaxTaskId =
     80     -1; /* 2^16 - 1, permisible ids are {1..2^16-1}*/
     81 static inline AsyncTaskId NextAsyncTaskId(const AsyncTaskId id) {
     82   return (id == kMaxTaskId) ? 1 : id + 1;
     83 }
// The buffer is only 10 bytes because the expected number of bytes
// written on the notification pipe is 1. It is possible that the thread is
// notified more than once but highly unlikely, so a buffer of size 10 seems
// enough, and the reads are performed inside a while loop just in case it
// isn't. From the thread routine's point of view it is the same to have been
// notified just once or 100 times, so it just tries to consume the entire
// buffer. In the cases where an interrupt would cause read to return without
// having read everything that was available, a new iteration of the thread
// loop will bring execution to this point almost immediately, so there is
// no need to treat that case.
     94 static const int kNotificationBufferSize = 10;
     95 
     96 // Async File Descriptor Watcher Implementation:
     97 class AsyncManager::AsyncFdWatcher {
     98  public:
     99   int WatchFdForNonBlockingReads(
    100       int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
    101     // add file descriptor and callback
    102     {
    103       std::unique_lock<std::mutex> guard(internal_mutex_);
    104       watched_shared_fds_[file_descriptor] = on_read_fd_ready_callback;
    105     }
    106 
    107     // start the thread if not started yet
    108     int started = tryStartThread();
    109     if (started != 0) {
    110       LOG_ERROR(LOG_TAG, "%s: Unable to start thread", __func__);
    111       return started;
    112     }
    113 
    114     // notify the thread so that it knows of the new FD
    115     notifyThread();
    116 
    117     return 0;
    118   }
    119 
    120   void StopWatchingFileDescriptor(int file_descriptor) {
    121     std::unique_lock<std::mutex> guard(internal_mutex_);
    122     watched_shared_fds_.erase(file_descriptor);
    123   }
    124 
    125   AsyncFdWatcher() = default;
    126 
    127   ~AsyncFdWatcher() = default;
    128 
    129   int stopThread() {
    130     if (!std::atomic_exchange(&running_, false)) {
    131       return 0;  // if not running already
    132     }
    133 
    134     notifyThread();
    135 
    136     if (std::this_thread::get_id() != thread_.get_id()) {
    137       thread_.join();
    138     } else {
    139       LOG_WARN(LOG_TAG,
    140                "%s: Starting thread stop from inside the reading thread itself",
    141                __func__);
    142     }
    143 
    144     {
    145       std::unique_lock<std::mutex> guard(internal_mutex_);
    146       watched_shared_fds_.clear();
    147     }
    148 
    149     return 0;
    150   }
    151 
    152  private:
    153   AsyncFdWatcher(const AsyncFdWatcher&) = delete;
    154   AsyncFdWatcher& operator=(const AsyncFdWatcher&) = delete;
    155 
    156   // Make sure to call this with at least one file descriptor ready to be
    157   // watched upon or the thread routine will return immediately
    158   int tryStartThread() {
    159     if (std::atomic_exchange(&running_, true)) {
    160       return 0;  // if already running
    161     }
    162     // set up the communication channel
    163     int pipe_fds[2];
    164     if (pipe2(pipe_fds, O_NONBLOCK)) {
    165       LOG_ERROR(LOG_TAG,
    166                 "%s:Unable to establish a communication channel to the reading "
    167                 "thread",
    168                 __func__);
    169       return -1;
    170     }
    171     notification_listen_fd_ = pipe_fds[0];
    172     notification_write_fd_ = pipe_fds[1];
    173 
    174     thread_ = std::thread([this]() { ThreadRoutine(); });
    175     if (!thread_.joinable()) {
    176       LOG_ERROR(LOG_TAG, "%s: Unable to start reading thread", __func__);
    177       return -1;
    178     }
    179     return 0;
    180   }
    181 
    182   int notifyThread() {
    183     char buffer = '0';
    184     if (TEMP_FAILURE_RETRY(write(notification_write_fd_, &buffer, 1)) < 0) {
    185       LOG_ERROR(LOG_TAG, "%s: Unable to send message to reading thread",
    186                 __func__);
    187       return -1;
    188     }
    189     return 0;
    190   }
    191 
    192   int setUpFileDescriptorSet(fd_set& read_fds) {
    193     // add comm channel to the set
    194     FD_SET(notification_listen_fd_, &read_fds);
    195     int nfds = notification_listen_fd_;
    196 
    197     // add watched FDs to the set
    198     {
    199       std::unique_lock<std::mutex> guard(internal_mutex_);
    200       for (auto& fdp : watched_shared_fds_) {
    201         FD_SET(fdp.first, &read_fds);
    202         nfds = std::max(fdp.first, nfds);
    203       }
    204     }
    205     return nfds;
    206   }
    207 
    208   // check the comm channel and read everything there
    209   bool consumeThreadNotifications(fd_set& read_fds) {
    210     if (FD_ISSET(notification_listen_fd_, &read_fds)) {
    211       char buffer[kNotificationBufferSize];
    212       while (TEMP_FAILURE_RETRY(read(notification_listen_fd_, buffer,
    213                                      kNotificationBufferSize)) ==
    214              kNotificationBufferSize) {
    215       }
    216       return true;
    217     }
    218     return false;
    219   }
    220 
    221   // check all file descriptors and call callbacks if necesary
    222   void runAppropriateCallbacks(fd_set& read_fds) {
    223     // not a good idea to call a callback while holding the FD lock,
    224     // nor to release the lock while traversing the map
    225     std::vector<decltype(watched_shared_fds_)::value_type> fds;
    226     {
    227       std::unique_lock<std::mutex> guard(internal_mutex_);
    228       for (auto& fdc : watched_shared_fds_) {
    229         if (FD_ISSET(fdc.first, &read_fds)) {
    230           fds.push_back(fdc);
    231         }
    232       }
    233     }
    234     for (auto& p : fds) {
    235       p.second(p.first);
    236     }
    237   }
    238 
    239   void ThreadRoutine() {
    240     while (running_) {
    241       fd_set read_fds;
    242       FD_ZERO(&read_fds);
    243       int nfds = setUpFileDescriptorSet(read_fds);
    244 
    245       // wait until there is data available to read on some FD
    246       int retval = select(nfds + 1, &read_fds, NULL, NULL, NULL);
    247       if (retval <= 0) {  // there was some error or a timeout
    248         LOG_ERROR(LOG_TAG,
    249                   "%s: There was an error while waiting for data on the file "
    250                   "descriptors",
    251                   __func__);
    252         continue;
    253       }
    254 
    255       consumeThreadNotifications(read_fds);
    256 
    257       // Do not read if there was a call to stop running
    258       if (!running_) {
    259         break;
    260       }
    261 
    262       runAppropriateCallbacks(read_fds);
    263     }
    264   }
    265 
    266   std::atomic_bool running_{false};
    267   std::thread thread_;
    268   std::mutex internal_mutex_;
    269 
    270   std::map<int, ReadCallback> watched_shared_fds_;
    271 
    272   // A pair of FD to send information to the reading thread
    273   int notification_listen_fd_;
    274   int notification_write_fd_;
    275 };
    276 
    277 // Async task manager implementation
    278 class AsyncManager::AsyncTaskManager {
    279  public:
    280   AsyncTaskId ExecAsync(std::chrono::milliseconds delay,
    281                         const TaskCallback& callback) {
    282     return scheduleTask(std::make_shared<Task>(
    283         std::chrono::steady_clock::now() + delay, callback));
    284   }
    285 
    286   AsyncTaskId ExecAsyncPeriodically(std::chrono::milliseconds delay,
    287                                     std::chrono::milliseconds period,
    288                                     const TaskCallback& callback) {
    289     return scheduleTask(std::make_shared<Task>(
    290         std::chrono::steady_clock::now() + delay, period, callback));
    291   }
    292 
    293   bool CancelAsyncTask(AsyncTaskId async_task_id) {
    294     // remove task from queue (and task id asociation) while holding lock
    295     std::unique_lock<std::mutex> guard(internal_mutex_);
    296     if (tasks_by_id.count(async_task_id) == 0) {
    297       return false;
    298     }
    299     task_queue_.erase(tasks_by_id[async_task_id]);
    300     tasks_by_id.erase(async_task_id);
    301     return true;
    302   }
    303 
    304   AsyncTaskManager() = default;
    305 
    306   ~AsyncTaskManager() = default;
    307 
    308   int stopThread() {
    309     {
    310       std::unique_lock<std::mutex> guard(internal_mutex_);
    311       tasks_by_id.clear();
    312       task_queue_.clear();
    313       if (!running_) {
    314         return 0;
    315       }
    316       running_ = false;
    317       // notify the thread
    318       internal_cond_var_.notify_one();
    319     }  // release the lock before joining a thread that is likely waiting for it
    320     if (std::this_thread::get_id() != thread_.get_id()) {
    321       thread_.join();
    322     } else {
    323       LOG_WARN(LOG_TAG,
    324                "%s: Starting thread stop from inside the task thread itself",
    325                __func__);
    326     }
    327     return 0;
    328   }
    329 
    330  private:
    331   // Holds the data for each task
    332   class Task {
    333    public:
    334     Task(std::chrono::steady_clock::time_point time,
    335          std::chrono::milliseconds period, const TaskCallback& callback)
    336         : time(time),
    337           periodic(true),
    338           period(period),
    339           callback(callback),
    340           task_id(kInvalidTaskId) {}
    341     Task(std::chrono::steady_clock::time_point time,
    342          const TaskCallback& callback)
    343         : time(time),
    344           periodic(false),
    345           callback(callback),
    346           task_id(kInvalidTaskId) {}
    347 
    348     // Operators needed to be in a collection
    349     bool operator<(const Task& another) const {
    350       return std::make_pair(time, task_id) <
    351              std::make_pair(another.time, another.task_id);
    352     }
    353 
    354     bool isPeriodic() const { return periodic; }
    355 
    356     // These fields should no longer be public if the class ever becomes
    357     // public or gets more complex
    358     std::chrono::steady_clock::time_point time;
    359     bool periodic;
    360     std::chrono::milliseconds period;
    361     TaskCallback callback;
    362     AsyncTaskId task_id;
    363   };
    364 
    365   // A comparator class to put shared pointers to tasks in an ordered set
    366   struct task_p_comparator {
    367     bool operator()(const std::shared_ptr<Task>& t1,
    368                     const std::shared_ptr<Task>& t2) const {
    369       return *t1 < *t2;
    370     }
    371   };
    372 
    373   AsyncTaskManager(const AsyncTaskManager&) = delete;
    374   AsyncTaskManager& operator=(const AsyncTaskManager&) = delete;
    375 
    376   AsyncTaskId scheduleTask(const std::shared_ptr<Task>& task) {
    377     AsyncTaskId task_id = kInvalidTaskId;
    378     {
    379       std::unique_lock<std::mutex> guard(internal_mutex_);
    380       // no more room for new tasks, we need a larger type for IDs
    381       if (tasks_by_id.size() == kMaxTaskId)  // TODO potentially type unsafe
    382         return kInvalidTaskId;
    383       do {
    384         lastTaskId_ = NextAsyncTaskId(lastTaskId_);
    385       } while (isTaskIdInUse(lastTaskId_));
    386       task->task_id = lastTaskId_;
    387       // add task to the queue and map
    388       tasks_by_id[lastTaskId_] = task;
    389       task_queue_.insert(task);
    390       task_id = lastTaskId_;
    391     }
    392     // start thread if necessary
    393     int started = tryStartThread();
    394     if (started != 0) {
    395       LOG_ERROR(LOG_TAG, "%s: Unable to start thread", __func__);
    396       return kInvalidTaskId;
    397     }
    398     // notify the thread so that it knows of the new task
    399     internal_cond_var_.notify_one();
    400     // return task id
    401     return task_id;
    402   }
    403 
    404   bool isTaskIdInUse(const AsyncTaskId& task_id) const {
    405     return tasks_by_id.count(task_id) != 0;
    406   }
    407 
    408   int tryStartThread() {
    409     // need the lock because of the running flag and the cond var
    410     std::unique_lock<std::mutex> guard(internal_mutex_);
    411     // check that the thread is not yet running
    412     if (running_) {
    413       return 0;
    414     }
    415     // start the thread
    416     running_ = true;
    417     thread_ = std::thread([this]() { ThreadRoutine(); });
    418     if (!thread_.joinable()) {
    419       LOG_ERROR(LOG_TAG, "%s: Unable to start task thread", __func__);
    420       return -1;
    421     }
    422     return 0;
    423   }
    424 
    425   void ThreadRoutine() {
    426     while (1) {
    427       TaskCallback callback;
    428       bool run_it = false;
    429       {
    430         std::unique_lock<std::mutex> guard(internal_mutex_);
    431         if (!task_queue_.empty()) {
    432           std::shared_ptr<Task> task_p = *(task_queue_.begin());
    433           if (task_p->time < std::chrono::steady_clock::now()) {
    434             run_it = true;
    435             callback = task_p->callback;
    436             task_queue_.erase(task_p);  // need to remove and add again if
    437                                         // periodic to update order
    438             if (task_p->isPeriodic()) {
    439               task_p->time += task_p->period;
    440               task_queue_.insert(task_p);
    441             } else {
    442               tasks_by_id.erase(task_p->task_id);
    443             }
    444           }
    445         }
    446       }
    447       if (run_it) {
    448         callback();
    449       }
    450       {
    451         std::unique_lock<std::mutex> guard(internal_mutex_);
    452         // wait on condition variable with timeout just in time for next task if
    453         // any
    454         if (task_queue_.size() > 0) {
    455           internal_cond_var_.wait_until(guard, (*task_queue_.begin())->time);
    456         } else {
    457           internal_cond_var_.wait(guard);
    458         }
    459         // check for termination right after being notified (and maybe before?)
    460         if (!running_) break;
    461       }
    462     }
    463   }
    464 
    465   bool running_ = false;
    466   std::thread thread_;
    467   std::mutex internal_mutex_;
    468   std::condition_variable internal_cond_var_;
    469 
    470   AsyncTaskId lastTaskId_ = kInvalidTaskId;
    471   std::map<AsyncTaskId, std::shared_ptr<Task> > tasks_by_id;
    472   std::set<std::shared_ptr<Task>, task_p_comparator> task_queue_;
    473 };
    474 
    475 // Async Manager Implementation:
    476 AsyncManager::AsyncManager()
    477     : fdWatcher_p_(new AsyncFdWatcher()),
    478       taskManager_p_(new AsyncTaskManager()) {}
    479 
    480 AsyncManager::~AsyncManager() {
    481   // Make sure the threads are stopped before destroying the object.
    482   // The threads need to be stopped here and not in each internal class'
    483   // destructor because unique_ptr's reset() first assigns nullptr to the
    484   // pointer and only then calls the destructor, so any callback running
    485   // on these threads would dereference a null pointer if they called a member
    486   // function of this class.
    487   fdWatcher_p_->stopThread();
    488   taskManager_p_->stopThread();
    489 }
    490 
    491 int AsyncManager::WatchFdForNonBlockingReads(
    492     int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
    493   return fdWatcher_p_->WatchFdForNonBlockingReads(file_descriptor,
    494                                                   on_read_fd_ready_callback);
    495 }
    496 
    497 void AsyncManager::StopWatchingFileDescriptor(int file_descriptor) {
    498   fdWatcher_p_->StopWatchingFileDescriptor(file_descriptor);
    499 }
    500 
    501 AsyncTaskId AsyncManager::ExecAsync(std::chrono::milliseconds delay,
    502                                     const TaskCallback& callback) {
    503   return taskManager_p_->ExecAsync(delay, callback);
    504 }
    505 
    506 AsyncTaskId AsyncManager::ExecAsyncPeriodically(
    507     std::chrono::milliseconds delay, std::chrono::milliseconds period,
    508     const TaskCallback& callback) {
    509   return taskManager_p_->ExecAsyncPeriodically(delay, period, callback);
    510 }
    511 
    512 bool AsyncManager::CancelAsyncTask(AsyncTaskId async_task_id) {
    513   return taskManager_p_->CancelAsyncTask(async_task_id);
    514 }
    515 
    516 void AsyncManager::Synchronize(const CriticalCallback& critical) {
    517   std::unique_lock<std::mutex> guard(synchronization_mutex_);
    518   critical();
    519 }
    520 }  // namespace test_vendor_lib
    521