1 /* 2 * Copyright 2016 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #define LOG_TAG "async_manager" 18 19 #include "async_manager.h" 20 21 #include "osi/include/log.h" 22 23 #include <algorithm> 24 #include <atomic> 25 #include <condition_variable> 26 #include <mutex> 27 #include <thread> 28 #include <vector> 29 #include "fcntl.h" 30 #include "sys/select.h" 31 #include "unistd.h" 32 33 namespace test_vendor_lib { 34 // Implementation of AsyncManager is divided between two classes, three if 35 // AsyncManager itself is taken into account, but its only responsability 36 // besides being a proxy for the other two classes is to provide a global 37 // synchronization mechanism for callbacks and client code to use. 38 39 // The watching of file descriptors is done through AsyncFdWatcher. Several 40 // objects of this class may coexist simultaneosly as they share no state. 41 // After construction of this objects nothing happens beyond some very simple 42 // member initialization. When the first FD is set up for watching the object 43 // starts a new thread which watches the given (and later provided) FDs using 44 // select() inside a loop. A special FD (a pipe) is also watched which is 45 // used to notify the thread of internal changes on the object state (like 46 // the addition of new FDs to watch on). Every access to internal state is 47 // synchronized using a single internal mutex. The thread is only stopped on 48 // destruction of the object, by modifying a flag, which is the only member 49 // variable accessed without acquiring the lock (because the notification to 50 // the thread is done later by writing to a pipe which means the thread will 51 // be notified regardless of what phase of the loop it is in that moment) 52 53 // The scheduling of asynchronous tasks, periodic or not, is handled by the 54 // AsyncTaskManager class. Like the one for FDs, this class shares no internal 55 // state between different instances so it is safe to use several objects of 56 // this class, also nothing interesting happens upon construction, but only 57 // after a Task has been scheduled and access to internal state is synchronized 58 // using a single internal mutex. When the first task is scheduled a thread 59 // is started which monitors a queue of tasks. The queue is peeked to see 60 // when the next task should be carried out and then the thread performs a 61 // (absolute) timed wait on a condition variable. The wait ends because of a 62 // time out or a notify on the cond var, the former means a task is due 63 // for execution while the later means there has been a change in internal 64 // state, like a task has been scheduled/canceled or the flag to stop has 65 // been set. Setting and querying the stop flag or modifying the task queue 66 // and subsequent notification on the cond var is done atomically (e.g while 67 // holding the lock on the internal mutex) to ensure that the thread never 68 // misses the notification, since notifying a cond var is not persistent as 69 // writing on a pipe (if not done this way, the thread could query the 70 // stopping flag and be put aside by the OS scheduler right after, then the 71 // 'stop thread' procedure could run, setting the flag, notifying a cond 72 // var that no one is waiting on and joining the thread, the thread then 73 // resumes execution believing that it needs to continue and waits on the 74 // cond var possibly forever if there are no tasks scheduled, efectively 75 // causing a deadlock). 76 77 // This number also states the maximum number of scheduled tasks we can handle 78 // at a given time 79 static const uint16_t kMaxTaskId = 80 -1; /* 2^16 - 1, permisible ids are {1..2^16-1}*/ 81 static inline AsyncTaskId NextAsyncTaskId(const AsyncTaskId id) { 82 return (id == kMaxTaskId) ? 1 : id + 1; 83 } 84 // The buffer is only 10 bytes because the expected number of bytes 85 // written on this socket is 1. It is possible that the thread is notified 86 // more than once but highly unlikely, so a buffer of size 10 seems enough 87 // and the reads are performed inside a while just in case it isn't. From 88 // the thread routine's point of view it is the same to have been notified 89 // just once or 100 times so it just tries to consume the entire buffer. 90 // In the cases where an interrupt would cause read to return without 91 // having read everything that was available a new iteration of the thread 92 // loop will bring execution to this point almost immediately, so there is 93 // no need to treat that case. 94 static const int kNotificationBufferSize = 10; 95 96 // Async File Descriptor Watcher Implementation: 97 class AsyncManager::AsyncFdWatcher { 98 public: 99 int WatchFdForNonBlockingReads( 100 int file_descriptor, const ReadCallback& on_read_fd_ready_callback) { 101 // add file descriptor and callback 102 { 103 std::unique_lock<std::mutex> guard(internal_mutex_); 104 watched_shared_fds_[file_descriptor] = on_read_fd_ready_callback; 105 } 106 107 // start the thread if not started yet 108 int started = tryStartThread(); 109 if (started != 0) { 110 LOG_ERROR(LOG_TAG, "%s: Unable to start thread", __func__); 111 return started; 112 } 113 114 // notify the thread so that it knows of the new FD 115 notifyThread(); 116 117 return 0; 118 } 119 120 void StopWatchingFileDescriptor(int file_descriptor) { 121 std::unique_lock<std::mutex> guard(internal_mutex_); 122 watched_shared_fds_.erase(file_descriptor); 123 } 124 125 AsyncFdWatcher() = default; 126 127 ~AsyncFdWatcher() = default; 128 129 int stopThread() { 130 if (!std::atomic_exchange(&running_, false)) { 131 return 0; // if not running already 132 } 133 134 notifyThread(); 135 136 if (std::this_thread::get_id() != thread_.get_id()) { 137 thread_.join(); 138 } else { 139 LOG_WARN(LOG_TAG, 140 "%s: Starting thread stop from inside the reading thread itself", 141 __func__); 142 } 143 144 { 145 std::unique_lock<std::mutex> guard(internal_mutex_); 146 watched_shared_fds_.clear(); 147 } 148 149 return 0; 150 } 151 152 private: 153 AsyncFdWatcher(const AsyncFdWatcher&) = delete; 154 AsyncFdWatcher& operator=(const AsyncFdWatcher&) = delete; 155 156 // Make sure to call this with at least one file descriptor ready to be 157 // watched upon or the thread routine will return immediately 158 int tryStartThread() { 159 if (std::atomic_exchange(&running_, true)) { 160 return 0; // if already running 161 } 162 // set up the communication channel 163 int pipe_fds[2]; 164 if (pipe2(pipe_fds, O_NONBLOCK)) { 165 LOG_ERROR(LOG_TAG, 166 "%s:Unable to establish a communication channel to the reading " 167 "thread", 168 __func__); 169 return -1; 170 } 171 notification_listen_fd_ = pipe_fds[0]; 172 notification_write_fd_ = pipe_fds[1]; 173 174 thread_ = std::thread([this]() { ThreadRoutine(); }); 175 if (!thread_.joinable()) { 176 LOG_ERROR(LOG_TAG, "%s: Unable to start reading thread", __func__); 177 return -1; 178 } 179 return 0; 180 } 181 182 int notifyThread() { 183 char buffer = '0'; 184 if (TEMP_FAILURE_RETRY(write(notification_write_fd_, &buffer, 1)) < 0) { 185 LOG_ERROR(LOG_TAG, "%s: Unable to send message to reading thread", 186 __func__); 187 return -1; 188 } 189 return 0; 190 } 191 192 int setUpFileDescriptorSet(fd_set& read_fds) { 193 // add comm channel to the set 194 FD_SET(notification_listen_fd_, &read_fds); 195 int nfds = notification_listen_fd_; 196 197 // add watched FDs to the set 198 { 199 std::unique_lock<std::mutex> guard(internal_mutex_); 200 for (auto& fdp : watched_shared_fds_) { 201 FD_SET(fdp.first, &read_fds); 202 nfds = std::max(fdp.first, nfds); 203 } 204 } 205 return nfds; 206 } 207 208 // check the comm channel and read everything there 209 bool consumeThreadNotifications(fd_set& read_fds) { 210 if (FD_ISSET(notification_listen_fd_, &read_fds)) { 211 char buffer[kNotificationBufferSize]; 212 while (TEMP_FAILURE_RETRY(read(notification_listen_fd_, buffer, 213 kNotificationBufferSize)) == 214 kNotificationBufferSize) { 215 } 216 return true; 217 } 218 return false; 219 } 220 221 // check all file descriptors and call callbacks if necesary 222 void runAppropriateCallbacks(fd_set& read_fds) { 223 // not a good idea to call a callback while holding the FD lock, 224 // nor to release the lock while traversing the map 225 std::vector<decltype(watched_shared_fds_)::value_type> fds; 226 { 227 std::unique_lock<std::mutex> guard(internal_mutex_); 228 for (auto& fdc : watched_shared_fds_) { 229 if (FD_ISSET(fdc.first, &read_fds)) { 230 fds.push_back(fdc); 231 } 232 } 233 } 234 for (auto& p : fds) { 235 p.second(p.first); 236 } 237 } 238 239 void ThreadRoutine() { 240 while (running_) { 241 fd_set read_fds; 242 FD_ZERO(&read_fds); 243 int nfds = setUpFileDescriptorSet(read_fds); 244 245 // wait until there is data available to read on some FD 246 int retval = select(nfds + 1, &read_fds, NULL, NULL, NULL); 247 if (retval <= 0) { // there was some error or a timeout 248 LOG_ERROR(LOG_TAG, 249 "%s: There was an error while waiting for data on the file " 250 "descriptors", 251 __func__); 252 continue; 253 } 254 255 consumeThreadNotifications(read_fds); 256 257 // Do not read if there was a call to stop running 258 if (!running_) { 259 break; 260 } 261 262 runAppropriateCallbacks(read_fds); 263 } 264 } 265 266 std::atomic_bool running_{false}; 267 std::thread thread_; 268 std::mutex internal_mutex_; 269 270 std::map<int, ReadCallback> watched_shared_fds_; 271 272 // A pair of FD to send information to the reading thread 273 int notification_listen_fd_; 274 int notification_write_fd_; 275 }; 276 277 // Async task manager implementation 278 class AsyncManager::AsyncTaskManager { 279 public: 280 AsyncTaskId ExecAsync(std::chrono::milliseconds delay, 281 const TaskCallback& callback) { 282 return scheduleTask(std::make_shared<Task>( 283 std::chrono::steady_clock::now() + delay, callback)); 284 } 285 286 AsyncTaskId ExecAsyncPeriodically(std::chrono::milliseconds delay, 287 std::chrono::milliseconds period, 288 const TaskCallback& callback) { 289 return scheduleTask(std::make_shared<Task>( 290 std::chrono::steady_clock::now() + delay, period, callback)); 291 } 292 293 bool CancelAsyncTask(AsyncTaskId async_task_id) { 294 // remove task from queue (and task id asociation) while holding lock 295 std::unique_lock<std::mutex> guard(internal_mutex_); 296 if (tasks_by_id.count(async_task_id) == 0) { 297 return false; 298 } 299 task_queue_.erase(tasks_by_id[async_task_id]); 300 tasks_by_id.erase(async_task_id); 301 return true; 302 } 303 304 AsyncTaskManager() = default; 305 306 ~AsyncTaskManager() = default; 307 308 int stopThread() { 309 { 310 std::unique_lock<std::mutex> guard(internal_mutex_); 311 tasks_by_id.clear(); 312 task_queue_.clear(); 313 if (!running_) { 314 return 0; 315 } 316 running_ = false; 317 // notify the thread 318 internal_cond_var_.notify_one(); 319 } // release the lock before joining a thread that is likely waiting for it 320 if (std::this_thread::get_id() != thread_.get_id()) { 321 thread_.join(); 322 } else { 323 LOG_WARN(LOG_TAG, 324 "%s: Starting thread stop from inside the task thread itself", 325 __func__); 326 } 327 return 0; 328 } 329 330 private: 331 // Holds the data for each task 332 class Task { 333 public: 334 Task(std::chrono::steady_clock::time_point time, 335 std::chrono::milliseconds period, const TaskCallback& callback) 336 : time(time), 337 periodic(true), 338 period(period), 339 callback(callback), 340 task_id(kInvalidTaskId) {} 341 Task(std::chrono::steady_clock::time_point time, 342 const TaskCallback& callback) 343 : time(time), 344 periodic(false), 345 callback(callback), 346 task_id(kInvalidTaskId) {} 347 348 // Operators needed to be in a collection 349 bool operator<(const Task& another) const { 350 return std::make_pair(time, task_id) < 351 std::make_pair(another.time, another.task_id); 352 } 353 354 bool isPeriodic() const { return periodic; } 355 356 // These fields should no longer be public if the class ever becomes 357 // public or gets more complex 358 std::chrono::steady_clock::time_point time; 359 bool periodic; 360 std::chrono::milliseconds period; 361 TaskCallback callback; 362 AsyncTaskId task_id; 363 }; 364 365 // A comparator class to put shared pointers to tasks in an ordered set 366 struct task_p_comparator { 367 bool operator()(const std::shared_ptr<Task>& t1, 368 const std::shared_ptr<Task>& t2) const { 369 return *t1 < *t2; 370 } 371 }; 372 373 AsyncTaskManager(const AsyncTaskManager&) = delete; 374 AsyncTaskManager& operator=(const AsyncTaskManager&) = delete; 375 376 AsyncTaskId scheduleTask(const std::shared_ptr<Task>& task) { 377 AsyncTaskId task_id = kInvalidTaskId; 378 { 379 std::unique_lock<std::mutex> guard(internal_mutex_); 380 // no more room for new tasks, we need a larger type for IDs 381 if (tasks_by_id.size() == kMaxTaskId) // TODO potentially type unsafe 382 return kInvalidTaskId; 383 do { 384 lastTaskId_ = NextAsyncTaskId(lastTaskId_); 385 } while (isTaskIdInUse(lastTaskId_)); 386 task->task_id = lastTaskId_; 387 // add task to the queue and map 388 tasks_by_id[lastTaskId_] = task; 389 task_queue_.insert(task); 390 task_id = lastTaskId_; 391 } 392 // start thread if necessary 393 int started = tryStartThread(); 394 if (started != 0) { 395 LOG_ERROR(LOG_TAG, "%s: Unable to start thread", __func__); 396 return kInvalidTaskId; 397 } 398 // notify the thread so that it knows of the new task 399 internal_cond_var_.notify_one(); 400 // return task id 401 return task_id; 402 } 403 404 bool isTaskIdInUse(const AsyncTaskId& task_id) const { 405 return tasks_by_id.count(task_id) != 0; 406 } 407 408 int tryStartThread() { 409 // need the lock because of the running flag and the cond var 410 std::unique_lock<std::mutex> guard(internal_mutex_); 411 // check that the thread is not yet running 412 if (running_) { 413 return 0; 414 } 415 // start the thread 416 running_ = true; 417 thread_ = std::thread([this]() { ThreadRoutine(); }); 418 if (!thread_.joinable()) { 419 LOG_ERROR(LOG_TAG, "%s: Unable to start task thread", __func__); 420 return -1; 421 } 422 return 0; 423 } 424 425 void ThreadRoutine() { 426 while (1) { 427 TaskCallback callback; 428 bool run_it = false; 429 { 430 std::unique_lock<std::mutex> guard(internal_mutex_); 431 if (!task_queue_.empty()) { 432 std::shared_ptr<Task> task_p = *(task_queue_.begin()); 433 if (task_p->time < std::chrono::steady_clock::now()) { 434 run_it = true; 435 callback = task_p->callback; 436 task_queue_.erase(task_p); // need to remove and add again if 437 // periodic to update order 438 if (task_p->isPeriodic()) { 439 task_p->time += task_p->period; 440 task_queue_.insert(task_p); 441 } else { 442 tasks_by_id.erase(task_p->task_id); 443 } 444 } 445 } 446 } 447 if (run_it) { 448 callback(); 449 } 450 { 451 std::unique_lock<std::mutex> guard(internal_mutex_); 452 // wait on condition variable with timeout just in time for next task if 453 // any 454 if (task_queue_.size() > 0) { 455 internal_cond_var_.wait_until(guard, (*task_queue_.begin())->time); 456 } else { 457 internal_cond_var_.wait(guard); 458 } 459 // check for termination right after being notified (and maybe before?) 460 if (!running_) break; 461 } 462 } 463 } 464 465 bool running_ = false; 466 std::thread thread_; 467 std::mutex internal_mutex_; 468 std::condition_variable internal_cond_var_; 469 470 AsyncTaskId lastTaskId_ = kInvalidTaskId; 471 std::map<AsyncTaskId, std::shared_ptr<Task> > tasks_by_id; 472 std::set<std::shared_ptr<Task>, task_p_comparator> task_queue_; 473 }; 474 475 // Async Manager Implementation: 476 AsyncManager::AsyncManager() 477 : fdWatcher_p_(new AsyncFdWatcher()), 478 taskManager_p_(new AsyncTaskManager()) {} 479 480 AsyncManager::~AsyncManager() { 481 // Make sure the threads are stopped before destroying the object. 482 // The threads need to be stopped here and not in each internal class' 483 // destructor because unique_ptr's reset() first assigns nullptr to the 484 // pointer and only then calls the destructor, so any callback running 485 // on these threads would dereference a null pointer if they called a member 486 // function of this class. 487 fdWatcher_p_->stopThread(); 488 taskManager_p_->stopThread(); 489 } 490 491 int AsyncManager::WatchFdForNonBlockingReads( 492 int file_descriptor, const ReadCallback& on_read_fd_ready_callback) { 493 return fdWatcher_p_->WatchFdForNonBlockingReads(file_descriptor, 494 on_read_fd_ready_callback); 495 } 496 497 void AsyncManager::StopWatchingFileDescriptor(int file_descriptor) { 498 fdWatcher_p_->StopWatchingFileDescriptor(file_descriptor); 499 } 500 501 AsyncTaskId AsyncManager::ExecAsync(std::chrono::milliseconds delay, 502 const TaskCallback& callback) { 503 return taskManager_p_->ExecAsync(delay, callback); 504 } 505 506 AsyncTaskId AsyncManager::ExecAsyncPeriodically( 507 std::chrono::milliseconds delay, std::chrono::milliseconds period, 508 const TaskCallback& callback) { 509 return taskManager_p_->ExecAsyncPeriodically(delay, period, callback); 510 } 511 512 bool AsyncManager::CancelAsyncTask(AsyncTaskId async_task_id) { 513 return taskManager_p_->CancelAsyncTask(async_task_id); 514 } 515 516 void AsyncManager::Synchronize(const CriticalCallback& critical) { 517 std::unique_lock<std::mutex> guard(synchronization_mutex_); 518 critical(); 519 } 520 } // namespace test_vendor_lib 521