Home | History | Annotate | Download | only in base
      1 /*
      2  * Copyright (C) 2017 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #include "perfetto/base/unix_task_runner.h"
     18 
     19 #include "perfetto/base/build_config.h"
     20 
     21 #include <errno.h>
     22 #include <stdlib.h>
     23 #include <unistd.h>
     24 
     25 #include <limits>
     26 
     27 namespace perfetto {
     28 namespace base {
     29 
     30 UnixTaskRunner::UnixTaskRunner() {
     31   AddFileDescriptorWatch(event_.fd(), [] {
     32     // Not reached -- see PostFileDescriptorWatches().
     33     PERFETTO_DFATAL("Should be unreachable.");
     34   });
     35 }
     36 
     37 UnixTaskRunner::~UnixTaskRunner() = default;
     38 
     39 void UnixTaskRunner::WakeUp() {
     40   event_.Notify();
     41 }
     42 
     43 void UnixTaskRunner::Run() {
     44   PERFETTO_DCHECK_THREAD(thread_checker_);
     45   created_thread_id_ = GetThreadId();
     46   quit_ = false;
     47   for (;;) {
     48     int poll_timeout_ms;
     49     {
     50       std::lock_guard<std::mutex> lock(lock_);
     51       if (quit_)
     52         return;
     53       poll_timeout_ms = GetDelayMsToNextTaskLocked();
     54       UpdateWatchTasksLocked();
     55     }
     56     int ret = PERFETTO_EINTR(poll(
     57         &poll_fds_[0], static_cast<nfds_t>(poll_fds_.size()), poll_timeout_ms));
     58     PERFETTO_CHECK(ret >= 0);
     59 
     60     // To avoid starvation we always interleave all types of tasks -- immediate,
     61     // delayed and file descriptor watches.
     62     PostFileDescriptorWatches();
     63     RunImmediateAndDelayedTask();
     64   }
     65 }
     66 
     67 void UnixTaskRunner::Quit() {
     68   std::lock_guard<std::mutex> lock(lock_);
     69   quit_ = true;
     70   WakeUp();
     71 }
     72 
     73 bool UnixTaskRunner::QuitCalled() {
     74   std::lock_guard<std::mutex> lock(lock_);
     75   return quit_;
     76 }
     77 
     78 bool UnixTaskRunner::IsIdleForTesting() {
     79   std::lock_guard<std::mutex> lock(lock_);
     80   return immediate_tasks_.empty();
     81 }
     82 
     83 void UnixTaskRunner::UpdateWatchTasksLocked() {
     84   PERFETTO_DCHECK_THREAD(thread_checker_);
     85   if (!watch_tasks_changed_)
     86     return;
     87   watch_tasks_changed_ = false;
     88   poll_fds_.clear();
     89   for (auto& it : watch_tasks_) {
     90     it.second.poll_fd_index = poll_fds_.size();
     91     poll_fds_.push_back({it.first, POLLIN | POLLHUP, 0});
     92   }
     93 }
     94 
     95 void UnixTaskRunner::RunImmediateAndDelayedTask() {
     96   // If locking overhead becomes an issue, add a separate work queue.
     97   std::function<void()> immediate_task;
     98   std::function<void()> delayed_task;
     99   TimeMillis now = GetWallTimeMs();
    100   {
    101     std::lock_guard<std::mutex> lock(lock_);
    102     if (!immediate_tasks_.empty()) {
    103       immediate_task = std::move(immediate_tasks_.front());
    104       immediate_tasks_.pop_front();
    105     }
    106     if (!delayed_tasks_.empty()) {
    107       auto it = delayed_tasks_.begin();
    108       if (now >= it->first) {
    109         delayed_task = std::move(it->second);
    110         delayed_tasks_.erase(it);
    111       }
    112     }
    113   }
    114 
    115   errno = 0;
    116   if (immediate_task)
    117     RunTask(immediate_task);
    118   errno = 0;
    119   if (delayed_task)
    120     RunTask(delayed_task);
    121 }
    122 
    123 void UnixTaskRunner::PostFileDescriptorWatches() {
    124   PERFETTO_DCHECK_THREAD(thread_checker_);
    125   for (size_t i = 0; i < poll_fds_.size(); i++) {
    126     if (!(poll_fds_[i].revents & (POLLIN | POLLHUP)))
    127       continue;
    128     poll_fds_[i].revents = 0;
    129 
    130     // The wake-up event is handled inline to avoid an infinite recursion of
    131     // posted tasks.
    132     if (poll_fds_[i].fd == event_.fd()) {
    133       event_.Clear();
    134       continue;
    135     }
    136 
    137     // Binding to |this| is safe since we are the only object executing the
    138     // task.
    139     PostTask(std::bind(&UnixTaskRunner::RunFileDescriptorWatch, this,
    140                        poll_fds_[i].fd));
    141 
    142     // Make the fd negative while a posted task is pending. This makes poll(2)
    143     // ignore the fd.
    144     PERFETTO_DCHECK(poll_fds_[i].fd >= 0);
    145     poll_fds_[i].fd = -poll_fds_[i].fd;
    146   }
    147 }
    148 
    149 void UnixTaskRunner::RunFileDescriptorWatch(int fd) {
    150   std::function<void()> task;
    151   {
    152     std::lock_guard<std::mutex> lock(lock_);
    153     auto it = watch_tasks_.find(fd);
    154     if (it == watch_tasks_.end())
    155       return;
    156     // Make poll(2) pay attention to the fd again. Since another thread may have
    157     // updated this watch we need to refresh the set first.
    158     UpdateWatchTasksLocked();
    159     size_t fd_index = it->second.poll_fd_index;
    160     PERFETTO_DCHECK(fd_index < poll_fds_.size());
    161     PERFETTO_DCHECK(::abs(poll_fds_[fd_index].fd) == fd);
    162     poll_fds_[fd_index].fd = fd;
    163     task = it->second.callback;
    164   }
    165   errno = 0;
    166   RunTask(task);
    167 }
    168 
    169 int UnixTaskRunner::GetDelayMsToNextTaskLocked() const {
    170   PERFETTO_DCHECK_THREAD(thread_checker_);
    171   if (!immediate_tasks_.empty())
    172     return 0;
    173   if (!delayed_tasks_.empty()) {
    174     TimeMillis diff = delayed_tasks_.begin()->first - GetWallTimeMs();
    175     return std::max(0, static_cast<int>(diff.count()));
    176   }
    177   return -1;
    178 }
    179 
    180 void UnixTaskRunner::PostTask(std::function<void()> task) {
    181   bool was_empty;
    182   {
    183     std::lock_guard<std::mutex> lock(lock_);
    184     was_empty = immediate_tasks_.empty();
    185     immediate_tasks_.push_back(std::move(task));
    186   }
    187   if (was_empty)
    188     WakeUp();
    189 }
    190 
    191 void UnixTaskRunner::PostDelayedTask(std::function<void()> task,
    192                                      uint32_t delay_ms) {
    193   TimeMillis runtime = GetWallTimeMs() + TimeMillis(delay_ms);
    194   {
    195     std::lock_guard<std::mutex> lock(lock_);
    196     delayed_tasks_.insert(std::make_pair(runtime, std::move(task)));
    197   }
    198   WakeUp();
    199 }
    200 
    201 void UnixTaskRunner::AddFileDescriptorWatch(int fd,
    202                                             std::function<void()> task) {
    203   PERFETTO_DCHECK(fd >= 0);
    204   {
    205     std::lock_guard<std::mutex> lock(lock_);
    206     PERFETTO_DCHECK(!watch_tasks_.count(fd));
    207     watch_tasks_[fd] = {std::move(task), SIZE_MAX};
    208     watch_tasks_changed_ = true;
    209   }
    210   WakeUp();
    211 }
    212 
    213 void UnixTaskRunner::RemoveFileDescriptorWatch(int fd) {
    214   PERFETTO_DCHECK(fd >= 0);
    215   {
    216     std::lock_guard<std::mutex> lock(lock_);
    217     PERFETTO_DCHECK(watch_tasks_.count(fd));
    218     watch_tasks_.erase(fd);
    219     watch_tasks_changed_ = true;
    220   }
    221   // No need to schedule a wake-up for this.
    222 }
    223 
    224 bool UnixTaskRunner::RunsTasksOnCurrentThread() const {
    225   return GetThreadId() == created_thread_id_;
    226 }
    227 
    228 }  // namespace base
    229 }  // namespace perfetto
    230