Home | History | Annotate | Download | only in message_loops
      1 // Copyright 2015 The Chromium OS Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include <brillo/message_loops/base_message_loop.h>
      6 
      7 #include <fcntl.h>
      8 #include <sys/stat.h>
      9 #include <sys/types.h>
     10 #include <unistd.h>
     11 
     12 #ifndef __APPLE__
     13 #include <sys/sysmacros.h>
     14 #endif
     15 
     16 #ifndef __ANDROID_HOST__
     17 // Used for MISC_MAJOR. Only required for the target and not always available
     18 // for the host.
     19 #include <linux/major.h>
     20 #endif
     21 
     22 #include <vector>
     23 
     24 #include <base/bind.h>
     25 #include <base/files/file_path.h>
     26 #include <base/files/file_util.h>
     27 #include <base/run_loop.h>
     28 #include <base/strings/string_number_conversions.h>
     29 #include <base/strings/string_split.h>
     30 
     31 #include <brillo/location_logging.h>
     32 #include <brillo/strings/string_utils.h>
     33 
     34 using base::Closure;
     35 
     36 namespace {
     37 
     38 const char kMiscMinorPath[] = "/proc/misc";
     39 const char kBinderDriverName[] = "binder";
     40 
     41 }  // namespace
     42 
     43 namespace brillo {
     44 
     45 const int BaseMessageLoop::kInvalidMinor = -1;
     46 const int BaseMessageLoop::kUninitializedMinor = -2;
     47 
     48 BaseMessageLoop::BaseMessageLoop() {
     49   CHECK(!base::MessageLoop::current())
     50       << "You can't create a base::MessageLoopForIO when another "
     51          "base::MessageLoop is already created for this thread.";
     52   owned_base_loop_.reset(new base::MessageLoopForIO);
     53   base_loop_ = owned_base_loop_.get();
     54 }
     55 
     56 BaseMessageLoop::BaseMessageLoop(base::MessageLoopForIO* base_loop)
     57     : base_loop_(base_loop) {}
     58 
     59 BaseMessageLoop::~BaseMessageLoop() {
     60   for (auto& io_task : io_tasks_) {
     61     DVLOG_LOC(io_task.second.location(), 1)
     62         << "Removing file descriptor watcher task_id " << io_task.first
     63         << " leaked on BaseMessageLoop, scheduled from this location.";
     64     io_task.second.StopWatching();
     65   }
     66 
     67   // Note all pending canceled delayed tasks when destroying the message loop.
     68   size_t lazily_deleted_tasks = 0;
     69   for (const auto& delayed_task : delayed_tasks_) {
     70     if (delayed_task.second.closure.is_null()) {
     71       lazily_deleted_tasks++;
     72     } else {
     73       DVLOG_LOC(delayed_task.second.location, 1)
     74           << "Removing delayed task_id " << delayed_task.first
     75           << " leaked on BaseMessageLoop, scheduled from this location.";
     76     }
     77   }
     78   if (lazily_deleted_tasks) {
     79     LOG(INFO) << "Leaking " << lazily_deleted_tasks << " canceled tasks.";
     80   }
     81 }
     82 
     83 MessageLoop::TaskId BaseMessageLoop::PostDelayedTask(
     84     const tracked_objects::Location& from_here,
     85     const Closure &task,
     86     base::TimeDelta delay) {
     87   TaskId task_id =  NextTaskId();
     88   bool base_scheduled = base_loop_->task_runner()->PostDelayedTask(
     89       from_here,
     90       base::Bind(&BaseMessageLoop::OnRanPostedTask,
     91                  weak_ptr_factory_.GetWeakPtr(),
     92                  task_id),
     93       delay);
     94   DVLOG_LOC(from_here, 1) << "Scheduling delayed task_id " << task_id
     95                           << " to run in " << delay << ".";
     96   if (!base_scheduled)
     97     return MessageLoop::kTaskIdNull;
     98 
     99   delayed_tasks_.emplace(task_id,
    100                          DelayedTask{from_here, task_id, std::move(task)});
    101   return task_id;
    102 }
    103 
    104 MessageLoop::TaskId BaseMessageLoop::WatchFileDescriptor(
    105     const tracked_objects::Location& from_here,
    106     int fd,
    107     WatchMode mode,
    108     bool persistent,
    109     const Closure &task) {
    110   // base::MessageLoopForIO CHECKS that "fd >= 0", so we handle that case here.
    111   if (fd < 0)
    112     return MessageLoop::kTaskIdNull;
    113 
    114   base::MessageLoopForIO::Mode base_mode = base::MessageLoopForIO::WATCH_READ;
    115   switch (mode) {
    116     case MessageLoop::kWatchRead:
    117       base_mode = base::MessageLoopForIO::WATCH_READ;
    118       break;
    119     case MessageLoop::kWatchWrite:
    120       base_mode = base::MessageLoopForIO::WATCH_WRITE;
    121       break;
    122     default:
    123       return MessageLoop::kTaskIdNull;
    124   }
    125 
    126   TaskId task_id =  NextTaskId();
    127   auto it_bool = io_tasks_.emplace(
    128       std::piecewise_construct,
    129       std::forward_as_tuple(task_id),
    130       std::forward_as_tuple(
    131           from_here, this, task_id, fd, base_mode, persistent, task));
    132   // This should always insert a new element.
    133   DCHECK(it_bool.second);
    134   bool scheduled = it_bool.first->second.StartWatching();
    135   DVLOG_LOC(from_here, 1)
    136       << "Watching fd " << fd << " for "
    137       << (mode == MessageLoop::kWatchRead ? "reading" : "writing")
    138       << (persistent ? " persistently" : " just once")
    139       << " as task_id " << task_id
    140       << (scheduled ? " successfully" : " failed.");
    141 
    142   if (!scheduled) {
    143     io_tasks_.erase(task_id);
    144     return MessageLoop::kTaskIdNull;
    145   }
    146 
    147 #ifndef __ANDROID_HOST__
    148   // Determine if the passed fd is the binder file descriptor. For that, we need
    149   // to check that is a special char device and that the major and minor device
    150   // numbers match. The binder file descriptor can't be removed and added back
    151   // to an epoll group when there's work available to be done by the file
    152   // descriptor due to bugs in the binder driver (b/26524111) when used with
    153   // epoll. Therefore, we flag the binder fd and never attempt to remove it.
    154   // This may cause the binder file descriptor to be attended with higher
    155   // priority and cause starvation of other events.
    156   struct stat buf;
    157   if (fstat(fd, &buf) == 0 &&
    158       S_ISCHR(buf.st_mode) &&
    159       major(buf.st_rdev) == MISC_MAJOR &&
    160       minor(buf.st_rdev) == GetBinderMinor()) {
    161     it_bool.first->second.RunImmediately();
    162   }
    163 #endif
    164 
    165   return task_id;
    166 }
    167 
    168 bool BaseMessageLoop::CancelTask(TaskId task_id) {
    169   if (task_id == kTaskIdNull)
    170     return false;
    171   auto delayed_task_it = delayed_tasks_.find(task_id);
    172   if (delayed_task_it == delayed_tasks_.end()) {
    173     // This might be an IOTask then.
    174     auto io_task_it = io_tasks_.find(task_id);
    175     if (io_task_it == io_tasks_.end())
    176       return false;
    177     return io_task_it->second.CancelTask();
    178   }
    179   // A DelayedTask was found for this task_id at this point.
    180 
    181   // Check if the callback was already canceled but we have the entry in
    182   // delayed_tasks_ since it didn't fire yet in the message loop.
    183   if (delayed_task_it->second.closure.is_null())
    184     return false;
    185 
    186   DVLOG_LOC(delayed_task_it->second.location, 1)
    187       << "Removing task_id " << task_id << " scheduled from this location.";
    188   // We reset to closure to a null Closure to release all the resources
    189   // used by this closure at this point, but we don't remove the task_id from
    190   // delayed_tasks_ since we can't tell base::MessageLoopForIO to not run it.
    191   delayed_task_it->second.closure = Closure();
    192 
    193   return true;
    194 }
    195 
    196 bool BaseMessageLoop::RunOnce(bool may_block) {
    197   run_once_ = true;
    198   base::RunLoop run_loop;  // Uses the base::MessageLoopForIO implicitly.
    199   base_run_loop_ = &run_loop;
    200   if (!may_block)
    201     run_loop.RunUntilIdle();
    202   else
    203     run_loop.Run();
    204   base_run_loop_ = nullptr;
    205   // If the flag was reset to false, it means a closure was run.
    206   if (!run_once_)
    207     return true;
    208 
    209   run_once_ = false;
    210   return false;
    211 }
    212 
    213 void BaseMessageLoop::Run() {
    214   base::RunLoop run_loop;  // Uses the base::MessageLoopForIO implicitly.
    215   base_run_loop_ = &run_loop;
    216   run_loop.Run();
    217   base_run_loop_ = nullptr;
    218 }
    219 
    220 void BaseMessageLoop::BreakLoop() {
    221   if (base_run_loop_ == nullptr) {
    222     DVLOG(1) << "Message loop not running, ignoring BreakLoop().";
    223     return;  // Message loop not running, nothing to do.
    224   }
    225   base_run_loop_->Quit();
    226 }
    227 
    228 Closure BaseMessageLoop::QuitClosure() const {
    229   if (base_run_loop_ == nullptr)
    230     return base::Bind(&base::DoNothing);
    231   return base_run_loop_->QuitClosure();
    232 }
    233 
    234 MessageLoop::TaskId BaseMessageLoop::NextTaskId() {
    235   TaskId res;
    236   do {
    237     res = ++last_id_;
    238     // We would run out of memory before we run out of task ids.
    239   } while (!res ||
    240            delayed_tasks_.find(res) != delayed_tasks_.end() ||
    241            io_tasks_.find(res) != io_tasks_.end());
    242   return res;
    243 }
    244 
    245 void BaseMessageLoop::OnRanPostedTask(MessageLoop::TaskId task_id) {
    246   auto task_it = delayed_tasks_.find(task_id);
    247   DCHECK(task_it != delayed_tasks_.end());
    248   if (!task_it->second.closure.is_null()) {
    249     DVLOG_LOC(task_it->second.location, 1)
    250         << "Running delayed task_id " << task_id
    251         << " scheduled from this location.";
    252     // Mark the task as canceled while we are running it so CancelTask returns
    253     // false.
    254     Closure closure = std::move(task_it->second.closure);
    255     task_it->second.closure = Closure();
    256     closure.Run();
    257 
    258     // If the |run_once_| flag is set, it is because we are instructed to run
    259     // only once callback.
    260     if (run_once_) {
    261       run_once_ = false;
    262       BreakLoop();
    263     }
    264   }
    265   delayed_tasks_.erase(task_it);
    266 }
    267 
    268 void BaseMessageLoop::OnFileReadyPostedTask(MessageLoop::TaskId task_id) {
    269   auto task_it = io_tasks_.find(task_id);
    270   // Even if this task was canceled while we were waiting in the message loop
    271   // for this method to run, the entry in io_tasks_ should still be present, but
    272   // won't do anything.
    273   DCHECK(task_it != io_tasks_.end());
    274   task_it->second.OnFileReadyPostedTask();
    275 }
    276 
    277 int BaseMessageLoop::ParseBinderMinor(
    278     const std::string& file_contents) {
    279   int result = kInvalidMinor;
    280   // Split along '\n', then along the ' '. Note that base::SplitString trims all
    281   // white spaces at the beginning and end after splitting.
    282   std::vector<std::string> lines =
    283       base::SplitString(file_contents, "\n", base::TRIM_WHITESPACE,
    284                         base::SPLIT_WANT_ALL);
    285   for (const std::string& line : lines) {
    286     if (line.empty())
    287       continue;
    288     std::string number;
    289     std::string name;
    290     if (!string_utils::SplitAtFirst(line, " ", &number, &name, false))
    291       continue;
    292 
    293     if (name == kBinderDriverName && base::StringToInt(number, &result))
    294       break;
    295   }
    296   return result;
    297 }
    298 
    299 unsigned int BaseMessageLoop::GetBinderMinor() {
    300   if (binder_minor_ != kUninitializedMinor)
    301     return binder_minor_;
    302 
    303   std::string proc_misc;
    304   if (!base::ReadFileToString(base::FilePath(kMiscMinorPath), &proc_misc))
    305     return binder_minor_;
    306   binder_minor_ = ParseBinderMinor(proc_misc);
    307   return binder_minor_;
    308 }
    309 
    310 BaseMessageLoop::IOTask::IOTask(const tracked_objects::Location& location,
    311                                 BaseMessageLoop* loop,
    312                                 MessageLoop::TaskId task_id,
    313                                 int fd,
    314                                 base::MessageLoopForIO::Mode base_mode,
    315                                 bool persistent,
    316                                 const Closure& task)
    317     : location_(location), loop_(loop), task_id_(task_id),
    318       fd_(fd), base_mode_(base_mode), persistent_(persistent), closure_(task) {}
    319 
    320 bool BaseMessageLoop::IOTask::StartWatching() {
    321   return loop_->base_loop_->WatchFileDescriptor(
    322       fd_, persistent_, base_mode_, &fd_watcher_, this);
    323 }
    324 
    325 void BaseMessageLoop::IOTask::StopWatching() {
    326   // This is safe to call even if we are not watching for it.
    327   fd_watcher_.StopWatchingFileDescriptor();
    328 }
    329 
    330 void BaseMessageLoop::IOTask::OnFileCanReadWithoutBlocking(int /* fd */) {
    331   OnFileReady();
    332 }
    333 
    334 void BaseMessageLoop::IOTask::OnFileCanWriteWithoutBlocking(int /* fd */) {
    335   OnFileReady();
    336 }
    337 
    338 void BaseMessageLoop::IOTask::OnFileReady() {
    339   // For file descriptors marked with the immediate_run flag, we don't call
    340   // StopWatching() and wait, instead we dispatch the callback immediately.
    341   if (immediate_run_) {
    342     posted_task_pending_ = true;
    343     OnFileReadyPostedTask();
    344     return;
    345   }
    346 
    347   // When the file descriptor becomes available we stop watching for it and
    348   // schedule a task to run the callback from the main loop. The callback will
    349   // run using the same scheduler used to run other delayed tasks, avoiding
    350   // starvation of the available posted tasks if there are file descriptors
    351   // always available. The new posted task will use the same TaskId as the
    352   // current file descriptor watching task an could be canceled in either state,
    353   // when waiting for the file descriptor or waiting in the main loop.
    354   StopWatching();
    355   bool base_scheduled = loop_->base_loop_->task_runner()->PostTask(
    356       location_,
    357       base::Bind(&BaseMessageLoop::OnFileReadyPostedTask,
    358                  loop_->weak_ptr_factory_.GetWeakPtr(),
    359                  task_id_));
    360   posted_task_pending_ = true;
    361   if (base_scheduled) {
    362     DVLOG_LOC(location_, 1)
    363         << "Dispatching task_id " << task_id_ << " for "
    364         << (base_mode_ == base::MessageLoopForIO::WATCH_READ ?
    365             "reading" : "writing")
    366         << " file descriptor " << fd_ << ", scheduled from this location.";
    367   } else {
    368     // In the rare case that PostTask() fails, we fall back to run it directly.
    369     // This would indicate a bigger problem with the message loop setup.
    370     LOG(ERROR) << "Error on base::MessageLoopForIO::PostTask().";
    371     OnFileReadyPostedTask();
    372   }
    373 }
    374 
    375 void BaseMessageLoop::IOTask::OnFileReadyPostedTask() {
    376   // We can't access |this| after running the |closure_| since it could call
    377   // CancelTask on its own task_id, so we copy the members we need now.
    378   BaseMessageLoop* loop_ptr = loop_;
    379   DCHECK(posted_task_pending_ = true);
    380   posted_task_pending_ = false;
    381 
    382   // If this task was already canceled, the closure will be null and there is
    383   // nothing else to do here. This execution doesn't count a step for RunOnce()
    384   // unless we have a callback to run.
    385   if (closure_.is_null()) {
    386     loop_->io_tasks_.erase(task_id_);
    387     return;
    388   }
    389 
    390   DVLOG_LOC(location_, 1)
    391       << "Running task_id " << task_id_ << " for "
    392       << (base_mode_ == base::MessageLoopForIO::WATCH_READ ?
    393           "reading" : "writing")
    394       << " file descriptor " << fd_ << ", scheduled from this location.";
    395 
    396   if (persistent_) {
    397     // In the persistent case we just run the callback. If this callback cancels
    398     // the task id, we can't access |this| anymore, so we re-start watching the
    399     // file descriptor before running the callback, unless this is a fd where
    400     // we didn't stop watching the file descriptor when it became available.
    401     if (!immediate_run_)
    402       StartWatching();
    403     closure_.Run();
    404   } else {
    405     // This will destroy |this|, the fd_watcher and therefore stop watching this
    406     // file descriptor.
    407     Closure closure_copy = std::move(closure_);
    408     loop_->io_tasks_.erase(task_id_);
    409     // Run the closure from the local copy we just made.
    410     closure_copy.Run();
    411   }
    412 
    413   if (loop_ptr->run_once_) {
    414     loop_ptr->run_once_ = false;
    415     loop_ptr->BreakLoop();
    416   }
    417 }
    418 
    419 bool BaseMessageLoop::IOTask::CancelTask() {
    420   if (closure_.is_null())
    421     return false;
    422 
    423   DVLOG_LOC(location_, 1)
    424       << "Removing task_id " << task_id_ << " scheduled from this location.";
    425 
    426   if (!posted_task_pending_) {
    427     // Destroying the FileDescriptorWatcher implicitly stops watching the file
    428     // descriptor. This will delete our instance.
    429     loop_->io_tasks_.erase(task_id_);
    430     return true;
    431   }
    432   // The IOTask is waiting for the message loop to run its delayed task, so
    433   // it is not watching for the file descriptor. We release the closure
    434   // resources now but keep the IOTask instance alive while we wait for the
    435   // callback to run and delete the IOTask.
    436   closure_ = Closure();
    437   return true;
    438 }
    439 
    440 }  // namespace brillo
    441