1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/common/message_pump_mojo.h" 6 7 #include <algorithm> 8 #include <vector> 9 10 #include "base/logging.h" 11 #include "base/time/time.h" 12 #include "mojo/common/message_pump_mojo_handler.h" 13 14 namespace mojo { 15 namespace common { 16 17 // State needed for one iteration of WaitMany. The first handle and flags 18 // corresponds to that of the control pipe. 19 struct MessagePumpMojo::WaitState { 20 std::vector<Handle> handles; 21 std::vector<MojoWaitFlags> wait_flags; 22 }; 23 24 struct MessagePumpMojo::RunState { 25 RunState() : should_quit(false) { 26 CreateMessagePipe(&read_handle, &write_handle); 27 } 28 29 base::TimeTicks delayed_work_time; 30 31 // Used to wake up WaitForWork(). 32 ScopedMessagePipeHandle read_handle; 33 ScopedMessagePipeHandle write_handle; 34 35 bool should_quit; 36 }; 37 38 MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0) { 39 } 40 41 MessagePumpMojo::~MessagePumpMojo() { 42 } 43 44 void MessagePumpMojo::AddHandler(MessagePumpMojoHandler* handler, 45 const Handle& handle, 46 MojoWaitFlags wait_flags, 47 base::TimeTicks deadline) { 48 DCHECK(handler); 49 DCHECK(handle.is_valid()); 50 // Assume it's an error if someone tries to reregister an existing handle. 51 DCHECK_EQ(0u, handlers_.count(handle)); 52 Handler handler_data; 53 handler_data.handler = handler; 54 handler_data.wait_flags = wait_flags; 55 handler_data.deadline = deadline; 56 handler_data.id = next_handler_id_++; 57 handlers_[handle] = handler_data; 58 } 59 60 void MessagePumpMojo::RemoveHandler(const Handle& handle) { 61 handlers_.erase(handle); 62 } 63 64 void MessagePumpMojo::Run(Delegate* delegate) { 65 RunState* old_state = run_state_; 66 RunState run_state; 67 // TODO: better deal with error handling. 68 CHECK(run_state.read_handle.is_valid()); 69 CHECK(run_state.write_handle.is_valid()); 70 run_state_ = &run_state; 71 bool more_work_is_plausible = true; 72 for (;;) { 73 const bool block = !more_work_is_plausible; 74 DoInternalWork(block); 75 76 // There isn't a good way to know if there are more handles ready, we assume 77 // not. 78 more_work_is_plausible = false; 79 80 if (run_state.should_quit) 81 break; 82 83 more_work_is_plausible |= delegate->DoWork(); 84 if (run_state.should_quit) 85 break; 86 87 more_work_is_plausible |= delegate->DoDelayedWork( 88 &run_state.delayed_work_time); 89 if (run_state.should_quit) 90 break; 91 92 if (more_work_is_plausible) 93 continue; 94 95 more_work_is_plausible = delegate->DoIdleWork(); 96 if (run_state.should_quit) 97 break; 98 } 99 run_state_ = old_state; 100 } 101 102 void MessagePumpMojo::Quit() { 103 if (run_state_) 104 run_state_->should_quit = true; 105 } 106 107 void MessagePumpMojo::ScheduleWork() { 108 SignalControlPipe(); 109 } 110 111 void MessagePumpMojo::ScheduleDelayedWork( 112 const base::TimeTicks& delayed_work_time) { 113 if (!run_state_) 114 return; 115 run_state_->delayed_work_time = delayed_work_time; 116 SignalControlPipe(); 117 } 118 119 void MessagePumpMojo::DoInternalWork(bool block) { 120 const MojoDeadline deadline = block ? GetDeadlineForWait() : 0; 121 const WaitState wait_state = GetWaitState(); 122 const MojoResult result = 123 WaitMany(wait_state.handles, wait_state.wait_flags, deadline); 124 if (result == 0) { 125 // Control pipe was written to. 126 uint32_t num_bytes = 0; 127 ReadMessageRaw(run_state_->read_handle.get(), NULL, &num_bytes, NULL, NULL, 128 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); 129 } else if (result > 0) { 130 const size_t index = static_cast<size_t>(result); 131 DCHECK(handlers_.find(wait_state.handles[index]) != handlers_.end()); 132 handlers_[wait_state.handles[index]].handler->OnHandleReady( 133 wait_state.handles[index]); 134 } else { 135 switch (result) { 136 case MOJO_RESULT_INVALID_ARGUMENT: 137 case MOJO_RESULT_FAILED_PRECONDITION: 138 RemoveFirstInvalidHandle(wait_state); 139 break; 140 case MOJO_RESULT_DEADLINE_EXCEEDED: 141 break; 142 default: 143 NOTREACHED(); 144 } 145 } 146 147 // Notify and remove any handlers whose time has expired. Make a copy in case 148 // someone tries to add/remove new handlers from notification. 149 const HandleToHandler cloned_handlers(handlers_); 150 const base::TimeTicks now(base::TimeTicks::Now()); 151 for (HandleToHandler::const_iterator i = cloned_handlers.begin(); 152 i != cloned_handlers.end(); ++i) { 153 // Since we're iterating over a clone of the handlers, verify the handler is 154 // still valid before notifying. 155 if (!i->second.deadline.is_null() && i->second.deadline < now && 156 handlers_.find(i->first) != handlers_.end() && 157 handlers_[i->first].id == i->second.id) { 158 i->second.handler->OnHandleError(i->first, MOJO_RESULT_DEADLINE_EXCEEDED); 159 } 160 } 161 } 162 163 void MessagePumpMojo::RemoveFirstInvalidHandle(const WaitState& wait_state) { 164 // TODO(sky): deal with control pipe going bad. 165 for (size_t i = 1; i < wait_state.handles.size(); ++i) { 166 const MojoResult result = 167 Wait(wait_state.handles[i], wait_state.wait_flags[i], 0); 168 if (result == MOJO_RESULT_INVALID_ARGUMENT || 169 result == MOJO_RESULT_FAILED_PRECONDITION) { 170 // Remove the handle first, this way if OnHandleError() tries to remove 171 // the handle our iterator isn't invalidated. 172 DCHECK(handlers_.find(wait_state.handles[i]) != handlers_.end()); 173 MessagePumpMojoHandler* handler = 174 handlers_[wait_state.handles[i]].handler; 175 handlers_.erase(wait_state.handles[i]); 176 handler->OnHandleError(wait_state.handles[i], result); 177 return; 178 } else { 179 DCHECK_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, result); 180 } 181 } 182 } 183 184 void MessagePumpMojo::SignalControlPipe() { 185 if (!run_state_) 186 return; 187 188 // TODO(sky): deal with error? 189 WriteMessageRaw(run_state_->write_handle.get(), NULL, 0, NULL, 0, 190 MOJO_WRITE_MESSAGE_FLAG_NONE); 191 } 192 193 MessagePumpMojo::WaitState MessagePumpMojo::GetWaitState() const { 194 WaitState wait_state; 195 wait_state.handles.push_back(run_state_->read_handle.get()); 196 wait_state.wait_flags.push_back(MOJO_WAIT_FLAG_READABLE); 197 198 for (HandleToHandler::const_iterator i = handlers_.begin(); 199 i != handlers_.end(); ++i) { 200 wait_state.handles.push_back(i->first); 201 wait_state.wait_flags.push_back(i->second.wait_flags); 202 } 203 return wait_state; 204 } 205 206 MojoDeadline MessagePumpMojo::GetDeadlineForWait() const { 207 base::TimeTicks min_time = run_state_->delayed_work_time; 208 for (HandleToHandler::const_iterator i = handlers_.begin(); 209 i != handlers_.end(); ++i) { 210 if (min_time.is_null() && i->second.deadline < min_time) 211 min_time = i->second.deadline; 212 } 213 return min_time.is_null() ? MOJO_DEADLINE_INDEFINITE : 214 std::max(static_cast<MojoDeadline>(0), 215 static_cast<MojoDeadline>( 216 (min_time - base::TimeTicks::Now()).InMicroseconds())); 217 } 218 219 } // namespace common 220 } // namespace mojo 221