Home | History | Annotate | Download | only in message_loops
      1 // Copyright 2015 The Chromium OS Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include <brillo/message_loops/base_message_loop.h>
      6 
      7 #include <fcntl.h>
      8 #include <sys/stat.h>
      9 #include <sys/types.h>
     10 #include <unistd.h>
     11 
     12 #ifndef __ANDROID_HOST__
     13 // Used for MISC_MAJOR. Only required for the target and not always available
     14 // for the host.
     15 #include <linux/major.h>
     16 #endif
     17 
     18 #include <vector>
     19 
     20 #include <base/bind.h>
     21 #include <base/files/file_path.h>
     22 #include <base/files/file_util.h>
     23 #include <base/run_loop.h>
     24 #include <base/strings/string_number_conversions.h>
     25 #include <base/strings/string_split.h>
     26 
     27 #include <brillo/location_logging.h>
     28 #include <brillo/strings/string_utils.h>
     29 
     30 using base::Closure;
     31 
     32 namespace {
     33 
     34 const char kMiscMinorPath[] = "/proc/misc";
     35 const char kBinderDriverName[] = "binder";
     36 
     37 }  // namespace
     38 
     39 namespace brillo {
     40 
     41 const int BaseMessageLoop::kInvalidMinor = -1;
     42 const int BaseMessageLoop::kUninitializedMinor = -2;
     43 
     44 BaseMessageLoop::BaseMessageLoop(base::MessageLoopForIO* base_loop)
     45     : base_loop_(base_loop),
     46       weak_ptr_factory_(this) {}
     47 
     48 BaseMessageLoop::~BaseMessageLoop() {
     49   for (auto& io_task : io_tasks_) {
     50     DVLOG_LOC(io_task.second.location(), 1)
     51         << "Removing file descriptor watcher task_id " << io_task.first
     52         << " leaked on BaseMessageLoop, scheduled from this location.";
     53     io_task.second.StopWatching();
     54   }
     55 
     56   // Note all pending canceled delayed tasks when destroying the message loop.
     57   size_t lazily_deleted_tasks = 0;
     58   for (const auto& delayed_task : delayed_tasks_) {
     59     if (delayed_task.second.closure.is_null()) {
     60       lazily_deleted_tasks++;
     61     } else {
     62       DVLOG_LOC(delayed_task.second.location, 1)
     63           << "Removing delayed task_id " << delayed_task.first
     64           << " leaked on BaseMessageLoop, scheduled from this location.";
     65     }
     66   }
     67   if (lazily_deleted_tasks) {
     68     LOG(INFO) << "Leaking " << lazily_deleted_tasks << " canceled tasks.";
     69   }
     70 }
     71 
     72 MessageLoop::TaskId BaseMessageLoop::PostDelayedTask(
     73     const tracked_objects::Location& from_here,
     74     const Closure &task,
     75     base::TimeDelta delay) {
     76   TaskId task_id =  NextTaskId();
     77   bool base_scheduled = base_loop_->task_runner()->PostDelayedTask(
     78       from_here,
     79       base::Bind(&BaseMessageLoop::OnRanPostedTask,
     80                  weak_ptr_factory_.GetWeakPtr(),
     81                  task_id),
     82       delay);
     83   DVLOG_LOC(from_here, 1) << "Scheduling delayed task_id " << task_id
     84                           << " to run in " << delay << ".";
     85   if (!base_scheduled)
     86     return MessageLoop::kTaskIdNull;
     87 
     88   delayed_tasks_.emplace(task_id,
     89                          DelayedTask{from_here, task_id, std::move(task)});
     90   return task_id;
     91 }
     92 
     93 MessageLoop::TaskId BaseMessageLoop::WatchFileDescriptor(
     94     const tracked_objects::Location& from_here,
     95     int fd,
     96     WatchMode mode,
     97     bool persistent,
     98     const Closure &task) {
     99   // base::MessageLoopForIO CHECKS that "fd >= 0", so we handle that case here.
    100   if (fd < 0)
    101     return MessageLoop::kTaskIdNull;
    102 
    103   base::MessageLoopForIO::Mode base_mode = base::MessageLoopForIO::WATCH_READ;
    104   switch (mode) {
    105     case MessageLoop::kWatchRead:
    106       base_mode = base::MessageLoopForIO::WATCH_READ;
    107       break;
    108     case MessageLoop::kWatchWrite:
    109       base_mode = base::MessageLoopForIO::WATCH_WRITE;
    110       break;
    111     default:
    112       return MessageLoop::kTaskIdNull;
    113   }
    114 
    115   TaskId task_id =  NextTaskId();
    116   auto it_bool = io_tasks_.emplace(
    117       std::piecewise_construct,
    118       std::forward_as_tuple(task_id),
    119       std::forward_as_tuple(
    120           from_here, this, task_id, fd, base_mode, persistent, task));
    121   // This should always insert a new element.
    122   DCHECK(it_bool.second);
    123   bool scheduled = it_bool.first->second.StartWatching();
    124   DVLOG_LOC(from_here, 1)
    125       << "Watching fd " << fd << " for "
    126       << (mode == MessageLoop::kWatchRead ? "reading" : "writing")
    127       << (persistent ? " persistently" : " just once")
    128       << " as task_id " << task_id
    129       << (scheduled ? " successfully" : " failed.");
    130 
    131   if (!scheduled) {
    132     io_tasks_.erase(task_id);
    133     return MessageLoop::kTaskIdNull;
    134   }
    135 
    136 #ifndef __ANDROID_HOST__
    137   // Determine if the passed fd is the binder file descriptor. For that, we need
    138   // to check that is a special char device and that the major and minor device
    139   // numbers match. The binder file descriptor can't be removed and added back
    140   // to an epoll group when there's work available to be done by the file
    141   // descriptor due to bugs in the binder driver (b/26524111) when used with
    142   // epoll. Therefore, we flag the binder fd and never attempt to remove it.
    143   // This may cause the binder file descriptor to be attended with higher
    144   // priority and cause starvation of other events.
    145   struct stat buf;
    146   if (fstat(fd, &buf) == 0 &&
    147       S_ISCHR(buf.st_mode) &&
    148       major(buf.st_rdev) == MISC_MAJOR &&
    149       minor(buf.st_rdev) == GetBinderMinor()) {
    150     it_bool.first->second.RunImmediately();
    151   }
    152 #endif
    153 
    154   return task_id;
    155 }
    156 
    157 bool BaseMessageLoop::CancelTask(TaskId task_id) {
    158   if (task_id == kTaskIdNull)
    159     return false;
    160   auto delayed_task_it = delayed_tasks_.find(task_id);
    161   if (delayed_task_it == delayed_tasks_.end()) {
    162     // This might be an IOTask then.
    163     auto io_task_it = io_tasks_.find(task_id);
    164     if (io_task_it == io_tasks_.end())
    165       return false;
    166     return io_task_it->second.CancelTask();
    167   }
    168   // A DelayedTask was found for this task_id at this point.
    169 
    170   // Check if the callback was already canceled but we have the entry in
    171   // delayed_tasks_ since it didn't fire yet in the message loop.
    172   if (delayed_task_it->second.closure.is_null())
    173     return false;
    174 
    175   DVLOG_LOC(delayed_task_it->second.location, 1)
    176       << "Removing task_id " << task_id << " scheduled from this location.";
    177   // We reset to closure to a null Closure to release all the resources
    178   // used by this closure at this point, but we don't remove the task_id from
    179   // delayed_tasks_ since we can't tell base::MessageLoopForIO to not run it.
    180   delayed_task_it->second.closure = Closure();
    181 
    182   return true;
    183 }
    184 
    185 bool BaseMessageLoop::RunOnce(bool may_block) {
    186   run_once_ = true;
    187   base::RunLoop run_loop;  // Uses the base::MessageLoopForIO implicitly.
    188   base_run_loop_ = &run_loop;
    189   if (!may_block)
    190     run_loop.RunUntilIdle();
    191   else
    192     run_loop.Run();
    193   base_run_loop_ = nullptr;
    194   // If the flag was reset to false, it means a closure was run.
    195   if (!run_once_)
    196     return true;
    197 
    198   run_once_ = false;
    199   return false;
    200 }
    201 
    202 void BaseMessageLoop::Run() {
    203   base::RunLoop run_loop;  // Uses the base::MessageLoopForIO implicitly.
    204   base_run_loop_ = &run_loop;
    205   run_loop.Run();
    206   base_run_loop_ = nullptr;
    207 }
    208 
    209 void BaseMessageLoop::BreakLoop() {
    210   if (base_run_loop_ == nullptr) {
    211     DVLOG(1) << "Message loop not running, ignoring BreakLoop().";
    212     return;  // Message loop not running, nothing to do.
    213   }
    214   base_run_loop_->Quit();
    215 }
    216 
    217 Closure BaseMessageLoop::QuitClosure() const {
    218   if (base_run_loop_ == nullptr)
    219     return base::Bind(&base::DoNothing);
    220   return base_run_loop_->QuitClosure();
    221 }
    222 
    223 MessageLoop::TaskId BaseMessageLoop::NextTaskId() {
    224   TaskId res;
    225   do {
    226     res = ++last_id_;
    227     // We would run out of memory before we run out of task ids.
    228   } while (!res ||
    229            delayed_tasks_.find(res) != delayed_tasks_.end() ||
    230            io_tasks_.find(res) != io_tasks_.end());
    231   return res;
    232 }
    233 
    234 void BaseMessageLoop::OnRanPostedTask(MessageLoop::TaskId task_id) {
    235   auto task_it = delayed_tasks_.find(task_id);
    236   DCHECK(task_it != delayed_tasks_.end());
    237   if (!task_it->second.closure.is_null()) {
    238     DVLOG_LOC(task_it->second.location, 1)
    239         << "Running delayed task_id " << task_id
    240         << " scheduled from this location.";
    241     // Mark the task as canceled while we are running it so CancelTask returns
    242     // false.
    243     Closure closure = std::move(task_it->second.closure);
    244     task_it->second.closure = Closure();
    245     closure.Run();
    246 
    247     // If the |run_once_| flag is set, it is because we are instructed to run
    248     // only once callback.
    249     if (run_once_) {
    250       run_once_ = false;
    251       BreakLoop();
    252     }
    253   }
    254   delayed_tasks_.erase(task_it);
    255 }
    256 
    257 void BaseMessageLoop::OnFileReadyPostedTask(MessageLoop::TaskId task_id) {
    258   auto task_it = io_tasks_.find(task_id);
    259   // Even if this task was canceled while we were waiting in the message loop
    260   // for this method to run, the entry in io_tasks_ should still be present, but
    261   // won't do anything.
    262   DCHECK(task_it != io_tasks_.end());
    263   task_it->second.OnFileReadyPostedTask();
    264 }
    265 
    266 int BaseMessageLoop::ParseBinderMinor(
    267     const std::string& file_contents) {
    268   int result = kInvalidMinor;
    269   // Split along '\n', then along the ' '. Note that base::SplitString trims all
    270   // white spaces at the beginning and end after splitting.
    271   std::vector<std::string> lines =
    272       base::SplitString(file_contents, "\n", base::TRIM_WHITESPACE,
    273                         base::SPLIT_WANT_ALL);
    274   for (const std::string& line : lines) {
    275     if (line.empty())
    276       continue;
    277     std::string number;
    278     std::string name;
    279     if (!string_utils::SplitAtFirst(line, " ", &number, &name, false))
    280       continue;
    281 
    282     if (name == kBinderDriverName && base::StringToInt(number, &result))
    283       break;
    284   }
    285   return result;
    286 }
    287 
    288 unsigned int BaseMessageLoop::GetBinderMinor() {
    289   if (binder_minor_ != kUninitializedMinor)
    290     return binder_minor_;
    291 
    292   std::string proc_misc;
    293   if (!base::ReadFileToString(base::FilePath(kMiscMinorPath), &proc_misc))
    294     return binder_minor_;
    295   binder_minor_ = ParseBinderMinor(proc_misc);
    296   return binder_minor_;
    297 }
    298 
    299 BaseMessageLoop::IOTask::IOTask(const tracked_objects::Location& location,
    300                                 BaseMessageLoop* loop,
    301                                 MessageLoop::TaskId task_id,
    302                                 int fd,
    303                                 base::MessageLoopForIO::Mode base_mode,
    304                                 bool persistent,
    305                                 const Closure& task)
    306     : location_(location), loop_(loop), task_id_(task_id),
    307       fd_(fd), base_mode_(base_mode), persistent_(persistent), closure_(task) {}
    308 
    309 bool BaseMessageLoop::IOTask::StartWatching() {
    310   return loop_->base_loop_->WatchFileDescriptor(
    311       fd_, persistent_, base_mode_, &fd_watcher_, this);
    312 }
    313 
    314 void BaseMessageLoop::IOTask::StopWatching() {
    315   // This is safe to call even if we are not watching for it.
    316   fd_watcher_.StopWatchingFileDescriptor();
    317 }
    318 
    319 void BaseMessageLoop::IOTask::OnFileCanReadWithoutBlocking(int /* fd */) {
    320   OnFileReady();
    321 }
    322 
    323 void BaseMessageLoop::IOTask::OnFileCanWriteWithoutBlocking(int /* fd */) {
    324   OnFileReady();
    325 }
    326 
    327 void BaseMessageLoop::IOTask::OnFileReady() {
    328   // For file descriptors marked with the immediate_run flag, we don't call
    329   // StopWatching() and wait, instead we dispatch the callback immediately.
    330   if (immediate_run_) {
    331     posted_task_pending_ = true;
    332     OnFileReadyPostedTask();
    333     return;
    334   }
    335 
    336   // When the file descriptor becomes available we stop watching for it and
    337   // schedule a task to run the callback from the main loop. The callback will
    338   // run using the same scheduler used to run other delayed tasks, avoiding
    339   // starvation of the available posted tasks if there are file descriptors
    340   // always available. The new posted task will use the same TaskId as the
    341   // current file descriptor watching task an could be canceled in either state,
    342   // when waiting for the file descriptor or waiting in the main loop.
    343   StopWatching();
    344   bool base_scheduled = loop_->base_loop_->task_runner()->PostTask(
    345       location_,
    346       base::Bind(&BaseMessageLoop::OnFileReadyPostedTask,
    347                  loop_->weak_ptr_factory_.GetWeakPtr(),
    348                  task_id_));
    349   posted_task_pending_ = true;
    350   if (base_scheduled) {
    351     DVLOG_LOC(location_, 1)
    352         << "Dispatching task_id " << task_id_ << " for "
    353         << (base_mode_ == base::MessageLoopForIO::WATCH_READ ?
    354             "reading" : "writing")
    355         << " file descriptor " << fd_ << ", scheduled from this location.";
    356   } else {
    357     // In the rare case that PostTask() fails, we fall back to run it directly.
    358     // This would indicate a bigger problem with the message loop setup.
    359     LOG(ERROR) << "Error on base::MessageLoopForIO::PostTask().";
    360     OnFileReadyPostedTask();
    361   }
    362 }
    363 
    364 void BaseMessageLoop::IOTask::OnFileReadyPostedTask() {
    365   // We can't access |this| after running the |closure_| since it could call
    366   // CancelTask on its own task_id, so we copy the members we need now.
    367   BaseMessageLoop* loop_ptr = loop_;
    368   DCHECK(posted_task_pending_ = true);
    369   posted_task_pending_ = false;
    370 
    371   // If this task was already canceled, the closure will be null and there is
    372   // nothing else to do here. This execution doesn't count a step for RunOnce()
    373   // unless we have a callback to run.
    374   if (closure_.is_null()) {
    375     loop_->io_tasks_.erase(task_id_);
    376     return;
    377   }
    378 
    379   DVLOG_LOC(location_, 1)
    380       << "Running task_id " << task_id_ << " for "
    381       << (base_mode_ == base::MessageLoopForIO::WATCH_READ ?
    382           "reading" : "writing")
    383       << " file descriptor " << fd_ << ", scheduled from this location.";
    384 
    385   if (persistent_) {
    386     // In the persistent case we just run the callback. If this callback cancels
    387     // the task id, we can't access |this| anymore, so we re-start watching the
    388     // file descriptor before running the callback, unless this is a fd where
    389     // we didn't stop watching the file descriptor when it became available.
    390     if (!immediate_run_)
    391       StartWatching();
    392     closure_.Run();
    393   } else {
    394     // This will destroy |this|, the fd_watcher and therefore stop watching this
    395     // file descriptor.
    396     Closure closure_copy = std::move(closure_);
    397     loop_->io_tasks_.erase(task_id_);
    398     // Run the closure from the local copy we just made.
    399     closure_copy.Run();
    400   }
    401 
    402   if (loop_ptr->run_once_) {
    403     loop_ptr->run_once_ = false;
    404     loop_ptr->BreakLoop();
    405   }
    406 }
    407 
    408 bool BaseMessageLoop::IOTask::CancelTask() {
    409   if (closure_.is_null())
    410     return false;
    411 
    412   DVLOG_LOC(location_, 1)
    413       << "Removing task_id " << task_id_ << " scheduled from this location.";
    414 
    415   if (!posted_task_pending_) {
    416     // Destroying the FileDescriptorWatcher implicitly stops watching the file
    417     // descriptor. This will delete our instance.
    418     loop_->io_tasks_.erase(task_id_);
    419     return true;
    420   }
    421   // The IOTask is waiting for the message loop to run its delayed task, so
    422   // it is not watching for the file descriptor. We release the closure
    423   // resources now but keep the IOTask instance alive while we wait for the
    424   // callback to run and delete the IOTask.
    425   closure_ = Closure();
    426   return true;
    427 }
    428 
    429 }  // namespace brillo
    430