Home | History | Annotate | Download | only in base
      1 /*
      2  * Copyright (C) 2017 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #include "perfetto/base/unix_task_runner.h"
     18 
     19 #include "perfetto/base/build_config.h"
     20 
     21 #include <errno.h>
     22 #include <fcntl.h>
     23 #include <stdlib.h>
     24 #include <unistd.h>
     25 
     26 #include <limits>
     27 
     28 namespace perfetto {
     29 namespace base {
     30 
     31 UnixTaskRunner::UnixTaskRunner() {
     32   // Create a self-pipe which is used to wake up the main thread from inside
     33   // poll(2).
     34   int pipe_fds[2];
     35   PERFETTO_CHECK(pipe(pipe_fds) == 0);
     36 
     37   // Make the pipe non-blocking so that we never block the waking thread (either
     38   // the main thread or another one) when scheduling a wake-up.
     39   for (auto fd : pipe_fds) {
     40     int flags = fcntl(fd, F_GETFL, 0);
     41     PERFETTO_CHECK(flags != -1);
     42     PERFETTO_CHECK(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
     43     PERFETTO_CHECK(fcntl(fd, F_SETFD, FD_CLOEXEC) == 0);
     44   }
     45   control_read_.reset(pipe_fds[0]);
     46   control_write_.reset(pipe_fds[1]);
     47 
     48 #if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX)
     49   // We are never expecting to have more than a few bytes in the wake-up pipe.
     50   // Reduce the buffer size on Linux. Note that this gets rounded up to the page
     51   // size.
     52   PERFETTO_CHECK(fcntl(control_read_.get(), F_SETPIPE_SZ, 1) > 0);
     53 #endif
     54 
     55   AddFileDescriptorWatch(control_read_.get(), [] {
     56     // Not reached -- see PostFileDescriptorWatches().
     57     PERFETTO_DCHECK(false);
     58   });
     59 }
     60 
     61 UnixTaskRunner::~UnixTaskRunner() = default;
     62 
     63 void UnixTaskRunner::WakeUp() {
     64   const char dummy = 'P';
     65   if (write(control_write_.get(), &dummy, 1) <= 0 && errno != EAGAIN)
     66     PERFETTO_DPLOG("write()");
     67 }
     68 
     69 void UnixTaskRunner::Run() {
     70   PERFETTO_DCHECK_THREAD(thread_checker_);
     71   quit_ = false;
     72   while (true) {
     73     int poll_timeout_ms;
     74     {
     75       std::lock_guard<std::mutex> lock(lock_);
     76       if (quit_)
     77         return;
     78       poll_timeout_ms = GetDelayMsToNextTaskLocked();
     79       UpdateWatchTasksLocked();
     80     }
     81     int ret = PERFETTO_EINTR(poll(
     82         &poll_fds_[0], static_cast<nfds_t>(poll_fds_.size()), poll_timeout_ms));
     83     PERFETTO_CHECK(ret >= 0);
     84 
     85     // To avoid starvation we always interleave all types of tasks -- immediate,
     86     // delayed and file descriptor watches.
     87     PostFileDescriptorWatches();
     88     RunImmediateAndDelayedTask();
     89   }
     90 }
     91 
     92 void UnixTaskRunner::Quit() {
     93   {
     94     std::lock_guard<std::mutex> lock(lock_);
     95     quit_ = true;
     96   }
     97   WakeUp();
     98 }
     99 
    100 bool UnixTaskRunner::IsIdleForTesting() {
    101   std::lock_guard<std::mutex> lock(lock_);
    102   return immediate_tasks_.empty();
    103 }
    104 
    105 void UnixTaskRunner::UpdateWatchTasksLocked() {
    106   PERFETTO_DCHECK_THREAD(thread_checker_);
    107   if (!watch_tasks_changed_)
    108     return;
    109   watch_tasks_changed_ = false;
    110   poll_fds_.clear();
    111   for (auto& it : watch_tasks_) {
    112     it.second.poll_fd_index = poll_fds_.size();
    113     poll_fds_.push_back({it.first, POLLIN | POLLHUP, 0});
    114   }
    115 }
    116 
    117 void UnixTaskRunner::RunImmediateAndDelayedTask() {
    118   // TODO(skyostil): Add a separate work queue in case in case locking overhead
    119   // becomes an issue.
    120   std::function<void()> immediate_task;
    121   std::function<void()> delayed_task;
    122   TimeMillis now = GetWallTimeMs();
    123   {
    124     std::lock_guard<std::mutex> lock(lock_);
    125     if (!immediate_tasks_.empty()) {
    126       immediate_task = std::move(immediate_tasks_.front());
    127       immediate_tasks_.pop_front();
    128     }
    129     if (!delayed_tasks_.empty()) {
    130       auto it = delayed_tasks_.begin();
    131       if (now >= it->first) {
    132         delayed_task = std::move(it->second);
    133         delayed_tasks_.erase(it);
    134       }
    135     }
    136   }
    137 
    138   errno = 0;
    139   if (immediate_task)
    140     RunTask(immediate_task);
    141   errno = 0;
    142   if (delayed_task)
    143     RunTask(delayed_task);
    144 }
    145 
    146 void UnixTaskRunner::PostFileDescriptorWatches() {
    147   PERFETTO_DCHECK_THREAD(thread_checker_);
    148   for (size_t i = 0; i < poll_fds_.size(); i++) {
    149     if (!(poll_fds_[i].revents & (POLLIN | POLLHUP)))
    150       continue;
    151     poll_fds_[i].revents = 0;
    152 
    153     // The wake-up event is handled inline to avoid an infinite recursion of
    154     // posted tasks.
    155     if (poll_fds_[i].fd == control_read_.get()) {
    156       // Drain the byte(s) written to the wake-up pipe. We can potentially read
    157       // more than one byte if several wake-ups have been scheduled.
    158       char buffer[16];
    159       if (read(control_read_.get(), &buffer[0], sizeof(buffer)) <= 0 &&
    160           errno != EAGAIN) {
    161         PERFETTO_DPLOG("read()");
    162       }
    163       continue;
    164     }
    165 
    166     // Binding to |this| is safe since we are the only object executing the
    167     // task.
    168     PostTask(std::bind(&UnixTaskRunner::RunFileDescriptorWatch, this,
    169                        poll_fds_[i].fd));
    170 
    171     // Make the fd negative while a posted task is pending. This makes poll(2)
    172     // ignore the fd.
    173     PERFETTO_DCHECK(poll_fds_[i].fd >= 0);
    174     poll_fds_[i].fd = -poll_fds_[i].fd;
    175   }
    176 }
    177 
    178 void UnixTaskRunner::RunFileDescriptorWatch(int fd) {
    179   std::function<void()> task;
    180   {
    181     std::lock_guard<std::mutex> lock(lock_);
    182     auto it = watch_tasks_.find(fd);
    183     if (it == watch_tasks_.end())
    184       return;
    185     // Make poll(2) pay attention to the fd again. Since another thread may have
    186     // updated this watch we need to refresh the set first.
    187     UpdateWatchTasksLocked();
    188     size_t fd_index = it->second.poll_fd_index;
    189     PERFETTO_DCHECK(fd_index < poll_fds_.size());
    190     PERFETTO_DCHECK(::abs(poll_fds_[fd_index].fd) == fd);
    191     poll_fds_[fd_index].fd = fd;
    192     task = it->second.callback;
    193   }
    194   errno = 0;
    195   RunTask(task);
    196 }
    197 
    198 int UnixTaskRunner::GetDelayMsToNextTaskLocked() const {
    199   PERFETTO_DCHECK_THREAD(thread_checker_);
    200   if (!immediate_tasks_.empty())
    201     return 0;
    202   if (!delayed_tasks_.empty()) {
    203     TimeMillis diff = delayed_tasks_.begin()->first - GetWallTimeMs();
    204     return std::max(0, static_cast<int>(diff.count()));
    205   }
    206   return -1;
    207 }
    208 
    209 void UnixTaskRunner::PostTask(std::function<void()> task) {
    210   bool was_empty;
    211   {
    212     std::lock_guard<std::mutex> lock(lock_);
    213     was_empty = immediate_tasks_.empty();
    214     immediate_tasks_.push_back(std::move(task));
    215   }
    216   if (was_empty)
    217     WakeUp();
    218 }
    219 
    220 void UnixTaskRunner::PostDelayedTask(std::function<void()> task,
    221                                      uint32_t delay_ms) {
    222   TimeMillis runtime = GetWallTimeMs() + TimeMillis(delay_ms);
    223   {
    224     std::lock_guard<std::mutex> lock(lock_);
    225     delayed_tasks_.insert(std::make_pair(runtime, std::move(task)));
    226   }
    227   WakeUp();
    228 }
    229 
    230 void UnixTaskRunner::AddFileDescriptorWatch(int fd,
    231                                             std::function<void()> task) {
    232   PERFETTO_DCHECK(fd >= 0);
    233   {
    234     std::lock_guard<std::mutex> lock(lock_);
    235     PERFETTO_DCHECK(!watch_tasks_.count(fd));
    236     watch_tasks_[fd] = {std::move(task), SIZE_MAX};
    237     watch_tasks_changed_ = true;
    238   }
    239   WakeUp();
    240 }
    241 
    242 void UnixTaskRunner::RemoveFileDescriptorWatch(int fd) {
    243   PERFETTO_DCHECK(fd >= 0);
    244   {
    245     std::lock_guard<std::mutex> lock(lock_);
    246     PERFETTO_DCHECK(watch_tasks_.count(fd));
    247     watch_tasks_.erase(fd);
    248     watch_tasks_changed_ = true;
    249   }
    250   // No need to schedule a wake-up for this.
    251 }
    252 
    253 }  // namespace base
    254 }  // namespace perfetto
    255