1 /* 2 * Copyright (C) 2018 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #include <assert.h> 18 #include <errno.h> 19 #include <stdio.h> 20 21 #include <map> 22 23 #include <glog/logging.h> 24 25 #include "common/libs/fs/shared_select.h" 26 #include "host/commands/launch/process_monitor.h" 27 28 namespace cvd { 29 30 namespace { 31 32 void NotifyThread(SharedFD fd) { 33 // The restarter thread is (likely) blocked on a call to select, to make it 34 // wake up and do some work we write something (anything, the content is not 35 // important) into the main side of the socket pair so that the call to select 36 // returns and the notification fd (restarter side of the socket pair) is 37 // marked as ready to read. 38 char buffer = 'a'; 39 fd->Write(&buffer, sizeof(buffer)); 40 } 41 42 void ConsumeNotifications(SharedFD fd) { 43 // Once the starter thread is waken up due to a notification, the calls to 44 // select will continue to return immediately unless we read what was written 45 // on the main side of the socket pair. More than one notification can 46 // accumulate before the restarter thread consumes them, so we attempt to read 47 // more than it's written to consume them all at once. In the unlikely case of 48 // more than 8 notifications acummulating we simply read the first 8 and have 49 // another iteration on the restarter thread loop. 50 char buffer[8]; 51 fd->Read(buffer, sizeof(buffer)); 52 } 53 54 } // namespace 55 56 ProcessMonitor::ProcessMonitor() { 57 if (!SharedFD::SocketPair(AF_LOCAL, SOCK_STREAM, 0, &thread_comm_main_, 58 &thread_comm_monitor_)) { 59 LOG(ERROR) << "Unable to create restarter communication socket pair: " 60 << strerror(errno); 61 return; 62 } 63 monitor_thread_ = std::thread([this]() { MonitorRoutine(); }); 64 } 65 66 void ProcessMonitor::StartSubprocess(Command cmd, OnSocketReadyCb callback) { 67 auto proc = cmd.Start(true); 68 if (!proc.Started()) { 69 LOG(ERROR) << "Failed to start process"; 70 return; 71 } 72 MonitorExistingSubprocess(std::move(cmd), std::move(proc), callback); 73 } 74 75 void ProcessMonitor::MonitorExistingSubprocess(Command cmd, Subprocess proc, 76 OnSocketReadyCb callback) { 77 { 78 std::lock_guard<std::mutex> lock(processes_mutex_); 79 monitored_processes_.push_back(MonitorEntry()); 80 auto& entry = monitored_processes_.back(); 81 entry.cmd.reset(new Command(std::move(cmd))); 82 entry.proc.reset(new Subprocess(std::move(proc))); 83 entry.on_control_socket_ready_cb = callback; 84 } 85 // Wake the restarter thread up so that it starts monitoring this subprocess 86 // Do this after releasing the lock so that the restarter thread is free to 87 // begin work as soon as select returns. 88 NotifyThread(thread_comm_main_); 89 } 90 91 bool ProcessMonitor::RestartOnExitCb(MonitorEntry* entry) { 92 // Make sure the process actually exited 93 char buffer[16]; 94 auto bytes_read = entry->proc->control_socket()->Read(buffer, sizeof(buffer)); 95 if (bytes_read > 0) { 96 LOG(WARNING) << "Subprocess " << entry->cmd->GetShortName() << " wrote " 97 << bytes_read 98 << " bytes on the control socket, this is unexpected"; 99 // The process may not have exited, continue monitoring without restarting 100 return true; 101 } 102 103 LOG(INFO) << "Detected exit of monitored subprocess"; 104 // Make sure the subprocess isn't left in a zombie state, and that the 105 // pid is logged 106 int wstatus; 107 auto wait_ret = TEMP_FAILURE_RETRY(entry->proc->Wait(&wstatus, 0)); 108 // None of the error conditions specified on waitpid(2) apply 109 assert(wait_ret > 0); 110 if (WIFEXITED(wstatus)) { 111 LOG(INFO) << "Subprocess " << entry->cmd->GetShortName() << " (" 112 << wait_ret << ") has exited with exit code " 113 << WEXITSTATUS(wstatus); 114 } else if (WIFSIGNALED(wstatus)) { 115 LOG(ERROR) << "Subprocess " << entry->cmd->GetShortName() << " (" 116 << wait_ret << ") was interrupted by a signal: " 117 << WTERMSIG(wstatus); 118 } else { 119 LOG(INFO) << "subprocess " << entry->cmd->GetShortName() << " (" 120 << wait_ret << ") has exited for unknown reasons"; 121 } 122 entry->proc.reset(new Subprocess(entry->cmd->Start(true))); 123 return true; 124 } 125 126 bool ProcessMonitor::DoNotMonitorCb(MonitorEntry*) { 127 return false; 128 } 129 130 void ProcessMonitor::MonitorRoutine() { 131 LOG(INFO) << "Started monitoring subprocesses"; 132 do { 133 SharedFDSet read_set; 134 read_set.Set(thread_comm_monitor_); 135 { 136 std::lock_guard<std::mutex> lock(processes_mutex_); 137 for (auto& monitored_process: monitored_processes_) { 138 auto control_socket = monitored_process.proc->control_socket(); 139 if (!control_socket->IsOpen()) { 140 LOG(ERROR) << "The control socket for " 141 << monitored_process.cmd->GetShortName() 142 << " is closed, it's effectively NOT being monitored"; 143 } 144 read_set.Set(control_socket); 145 } 146 } 147 // We can't call select while holding the lock as it would lead to a 148 // deadlock (restarter thread waiting for notifications from main thread, 149 // main thread waiting for the lock) 150 int num_fds = cvd::Select(&read_set, nullptr, nullptr, nullptr); 151 if (num_fds < 0) { 152 LOG(ERROR) << "Select call returned error on restarter thread: " 153 << strerror(errno); 154 } 155 if (num_fds > 0) { 156 // Try the communication fd, it's the most likely to be set 157 if (read_set.IsSet(thread_comm_monitor_)) { 158 --num_fds; 159 ConsumeNotifications(thread_comm_monitor_); 160 } 161 } 162 { 163 std::lock_guard<std::mutex> lock(processes_mutex_); 164 // Keep track of the number of file descriptors ready for read, chances 165 // are we don't need to go over the entire list of subprocesses 166 auto it = monitored_processes_.begin(); 167 while (it != monitored_processes_.end()) { 168 auto control_socket = it->proc->control_socket(); 169 bool keep_monitoring = true; 170 if (read_set.IsSet(control_socket)) { 171 --num_fds; 172 keep_monitoring = it->on_control_socket_ready_cb(&(*it)); 173 } 174 if (keep_monitoring) { 175 ++it; 176 } else { 177 it = monitored_processes_.erase(it); 178 } 179 } 180 } 181 assert(num_fds == 0); 182 } while (true); 183 } 184 185 } // namespace cvd 186