1 /* 2 * Copyright (C) 2017 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #include "perfetto/base/unix_task_runner.h" 18 19 #include "perfetto/base/build_config.h" 20 21 #include <errno.h> 22 #include <fcntl.h> 23 #include <stdlib.h> 24 #include <unistd.h> 25 26 #include <limits> 27 28 namespace perfetto { 29 namespace base { 30 31 UnixTaskRunner::UnixTaskRunner() { 32 // Create a self-pipe which is used to wake up the main thread from inside 33 // poll(2). 34 int pipe_fds[2]; 35 PERFETTO_CHECK(pipe(pipe_fds) == 0); 36 37 // Make the pipe non-blocking so that we never block the waking thread (either 38 // the main thread or another one) when scheduling a wake-up. 39 for (auto fd : pipe_fds) { 40 int flags = fcntl(fd, F_GETFL, 0); 41 PERFETTO_CHECK(flags != -1); 42 PERFETTO_CHECK(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0); 43 PERFETTO_CHECK(fcntl(fd, F_SETFD, FD_CLOEXEC) == 0); 44 } 45 control_read_.reset(pipe_fds[0]); 46 control_write_.reset(pipe_fds[1]); 47 48 #if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) 49 // We are never expecting to have more than a few bytes in the wake-up pipe. 50 // Reduce the buffer size on Linux. Note that this gets rounded up to the page 51 // size. 52 PERFETTO_CHECK(fcntl(control_read_.get(), F_SETPIPE_SZ, 1) > 0); 53 #endif 54 55 AddFileDescriptorWatch(control_read_.get(), [] { 56 // Not reached -- see PostFileDescriptorWatches(). 57 PERFETTO_DCHECK(false); 58 }); 59 } 60 61 UnixTaskRunner::~UnixTaskRunner() = default; 62 63 void UnixTaskRunner::WakeUp() { 64 const char dummy = 'P'; 65 if (write(control_write_.get(), &dummy, 1) <= 0 && errno != EAGAIN) 66 PERFETTO_DPLOG("write()"); 67 } 68 69 void UnixTaskRunner::Run() { 70 PERFETTO_DCHECK_THREAD(thread_checker_); 71 quit_ = false; 72 while (true) { 73 int poll_timeout_ms; 74 { 75 std::lock_guard<std::mutex> lock(lock_); 76 if (quit_) 77 return; 78 poll_timeout_ms = GetDelayMsToNextTaskLocked(); 79 UpdateWatchTasksLocked(); 80 } 81 int ret = PERFETTO_EINTR(poll( 82 &poll_fds_[0], static_cast<nfds_t>(poll_fds_.size()), poll_timeout_ms)); 83 PERFETTO_CHECK(ret >= 0); 84 85 // To avoid starvation we always interleave all types of tasks -- immediate, 86 // delayed and file descriptor watches. 87 PostFileDescriptorWatches(); 88 RunImmediateAndDelayedTask(); 89 } 90 } 91 92 void UnixTaskRunner::Quit() { 93 { 94 std::lock_guard<std::mutex> lock(lock_); 95 quit_ = true; 96 } 97 WakeUp(); 98 } 99 100 bool UnixTaskRunner::IsIdleForTesting() { 101 std::lock_guard<std::mutex> lock(lock_); 102 return immediate_tasks_.empty(); 103 } 104 105 void UnixTaskRunner::UpdateWatchTasksLocked() { 106 PERFETTO_DCHECK_THREAD(thread_checker_); 107 if (!watch_tasks_changed_) 108 return; 109 watch_tasks_changed_ = false; 110 poll_fds_.clear(); 111 for (auto& it : watch_tasks_) { 112 it.second.poll_fd_index = poll_fds_.size(); 113 poll_fds_.push_back({it.first, POLLIN | POLLHUP, 0}); 114 } 115 } 116 117 void UnixTaskRunner::RunImmediateAndDelayedTask() { 118 // TODO(skyostil): Add a separate work queue in case in case locking overhead 119 // becomes an issue. 120 std::function<void()> immediate_task; 121 std::function<void()> delayed_task; 122 TimeMillis now = GetWallTimeMs(); 123 { 124 std::lock_guard<std::mutex> lock(lock_); 125 if (!immediate_tasks_.empty()) { 126 immediate_task = std::move(immediate_tasks_.front()); 127 immediate_tasks_.pop_front(); 128 } 129 if (!delayed_tasks_.empty()) { 130 auto it = delayed_tasks_.begin(); 131 if (now >= it->first) { 132 delayed_task = std::move(it->second); 133 delayed_tasks_.erase(it); 134 } 135 } 136 } 137 138 errno = 0; 139 if (immediate_task) 140 RunTask(immediate_task); 141 errno = 0; 142 if (delayed_task) 143 RunTask(delayed_task); 144 } 145 146 void UnixTaskRunner::PostFileDescriptorWatches() { 147 PERFETTO_DCHECK_THREAD(thread_checker_); 148 for (size_t i = 0; i < poll_fds_.size(); i++) { 149 if (!(poll_fds_[i].revents & (POLLIN | POLLHUP))) 150 continue; 151 poll_fds_[i].revents = 0; 152 153 // The wake-up event is handled inline to avoid an infinite recursion of 154 // posted tasks. 155 if (poll_fds_[i].fd == control_read_.get()) { 156 // Drain the byte(s) written to the wake-up pipe. We can potentially read 157 // more than one byte if several wake-ups have been scheduled. 158 char buffer[16]; 159 if (read(control_read_.get(), &buffer[0], sizeof(buffer)) <= 0 && 160 errno != EAGAIN) { 161 PERFETTO_DPLOG("read()"); 162 } 163 continue; 164 } 165 166 // Binding to |this| is safe since we are the only object executing the 167 // task. 168 PostTask(std::bind(&UnixTaskRunner::RunFileDescriptorWatch, this, 169 poll_fds_[i].fd)); 170 171 // Make the fd negative while a posted task is pending. This makes poll(2) 172 // ignore the fd. 173 PERFETTO_DCHECK(poll_fds_[i].fd >= 0); 174 poll_fds_[i].fd = -poll_fds_[i].fd; 175 } 176 } 177 178 void UnixTaskRunner::RunFileDescriptorWatch(int fd) { 179 std::function<void()> task; 180 { 181 std::lock_guard<std::mutex> lock(lock_); 182 auto it = watch_tasks_.find(fd); 183 if (it == watch_tasks_.end()) 184 return; 185 // Make poll(2) pay attention to the fd again. Since another thread may have 186 // updated this watch we need to refresh the set first. 187 UpdateWatchTasksLocked(); 188 size_t fd_index = it->second.poll_fd_index; 189 PERFETTO_DCHECK(fd_index < poll_fds_.size()); 190 PERFETTO_DCHECK(::abs(poll_fds_[fd_index].fd) == fd); 191 poll_fds_[fd_index].fd = fd; 192 task = it->second.callback; 193 } 194 errno = 0; 195 RunTask(task); 196 } 197 198 int UnixTaskRunner::GetDelayMsToNextTaskLocked() const { 199 PERFETTO_DCHECK_THREAD(thread_checker_); 200 if (!immediate_tasks_.empty()) 201 return 0; 202 if (!delayed_tasks_.empty()) { 203 TimeMillis diff = delayed_tasks_.begin()->first - GetWallTimeMs(); 204 return std::max(0, static_cast<int>(diff.count())); 205 } 206 return -1; 207 } 208 209 void UnixTaskRunner::PostTask(std::function<void()> task) { 210 bool was_empty; 211 { 212 std::lock_guard<std::mutex> lock(lock_); 213 was_empty = immediate_tasks_.empty(); 214 immediate_tasks_.push_back(std::move(task)); 215 } 216 if (was_empty) 217 WakeUp(); 218 } 219 220 void UnixTaskRunner::PostDelayedTask(std::function<void()> task, 221 uint32_t delay_ms) { 222 TimeMillis runtime = GetWallTimeMs() + TimeMillis(delay_ms); 223 { 224 std::lock_guard<std::mutex> lock(lock_); 225 delayed_tasks_.insert(std::make_pair(runtime, std::move(task))); 226 } 227 WakeUp(); 228 } 229 230 void UnixTaskRunner::AddFileDescriptorWatch(int fd, 231 std::function<void()> task) { 232 PERFETTO_DCHECK(fd >= 0); 233 { 234 std::lock_guard<std::mutex> lock(lock_); 235 PERFETTO_DCHECK(!watch_tasks_.count(fd)); 236 watch_tasks_[fd] = {std::move(task), SIZE_MAX}; 237 watch_tasks_changed_ = true; 238 } 239 WakeUp(); 240 } 241 242 void UnixTaskRunner::RemoveFileDescriptorWatch(int fd) { 243 PERFETTO_DCHECK(fd >= 0); 244 { 245 std::lock_guard<std::mutex> lock(lock_); 246 PERFETTO_DCHECK(watch_tasks_.count(fd)); 247 watch_tasks_.erase(fd); 248 watch_tasks_changed_ = true; 249 } 250 // No need to schedule a wake-up for this. 251 } 252 253 } // namespace base 254 } // namespace perfetto 255