// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/public/cpp/bindings/connector.h"

#include <stdint.h>
#include <utility>

#include "base/bind.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/macros.h"
#include "base/synchronization/lock.h"
#include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
#include "mojo/public/cpp/bindings/sync_handle_watcher.h"

namespace mojo {

// Takes ownership of |message_pipe| and immediately starts watching it for
// readability. |lock_| is only created when |config| is MULTI_THREADED_SEND;
// Accept() relies on its presence to permit calls from other threads.
// |runner| is handed to every Watcher created by WaitToReadMore() and is also
// used to post asynchronous error notifications.
Connector::Connector(ScopedMessagePipeHandle message_pipe,
                     ConnectorConfig config,
                     scoped_refptr<base::SingleThreadTaskRunner> runner)
    : message_pipe_(std::move(message_pipe)),
      task_runner_(std::move(runner)),
      weak_factory_(this) {
  if (config == MULTI_THREADED_SEND)
    lock_.emplace();

  // Cache a weak pointer to |this|; it is used both for posted tasks and for
  // detecting destruction during re-entrant dispatch.
  weak_self_ = weak_factory_.GetWeakPtr();
  // Even though we don't have an incoming receiver, we still want to monitor
  // the message pipe to know if it is closed or encounters an error.
  WaitToReadMore();
}

// If |connected_| is already false the destructor returns before the thread
// check, so such a Connector may be destroyed on any thread. Otherwise it must
// run on the thread checked by |thread_checker_| and tears down the watchers.
Connector::~Connector() {
  {
    // Allow for quick destruction on any thread if the pipe is already closed.
    base::AutoLock lock(connected_lock_);
    if (!connected_)
      return;
  }

  DCHECK(thread_checker_.CalledOnValidThread());
  CancelWait();
}

// Detaches the message pipe via PassMessagePipe() and discards the returned
// handle.
// NOTE(review): the discarded ScopedMessagePipeHandle presumably closes the
// pipe when the temporary is destroyed -- confirm against its definition.
void Connector::CloseMessagePipe() {
  // Throw away the returned message pipe.
  PassMessagePipe();
}

// Stops watching the pipe and returns ownership of it to the caller. After
// this call |message_pipe_| has been moved from, all weak pointers vended by
// |weak_factory_| (including |weak_self_|) are invalidated, and |connected_|
// is false.
ScopedMessagePipeHandle Connector::PassMessagePipe() {
  DCHECK(thread_checker_.CalledOnValidThread());

  CancelWait();
  // |lock_| guards |message_pipe_| against concurrent Accept() calls.
  // NOTE(review): MayAutoLock presumably only locks when |lock_| holds a
  // value -- confirm in may_auto_lock.h.
  internal::MayAutoLock locker(&lock_);
  ScopedMessagePipeHandle message_pipe = std::move(message_pipe_);
  // Invalidating here makes in-progress dispatch (ReadSingleMessage(),
  // OnSyncHandleWatcherHandleReady()) treat this object as gone.
  weak_factory_.InvalidateWeakPtrs();
  sync_handle_watcher_callback_count_ = 0;

  // |connected_| is read by the destructor under |connected_lock_|.
  base::AutoLock lock(connected_lock_);
  connected_ = false;
  return message_pipe;
}

// Reports an error on this Connector: forces the pipe to be reset and forces
// the error handler to be invoked asynchronously (see HandleError()).
void Connector::RaiseError() {
  DCHECK(thread_checker_.CalledOnValidThread());

  HandleError(true, true);
}

// Blocks for up to |deadline| until the pipe is readable and then reads a
// single message.
//
// Returns true only if a message was successfully read from the pipe. Note
// that the result of dispatching the message to the incoming receiver is
// deliberately ignored for the return value. Returns false if the Connector
// is already in the error state, if the wait timed out, or if the wait or the
// read failed.
//
// Resumes incoming message processing if it was paused. May re-enter the
// caller through the error handler or the incoming receiver.
bool Connector::WaitForIncomingMessage(MojoDeadline deadline) {
  DCHECK(thread_checker_.CalledOnValidThread());

  if (error_)
    return false;

  ResumeIncomingMethodCallProcessing();

  MojoResult rv =
      Wait(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, deadline, nullptr);
  if (rv == MOJO_RESULT_SHOULD_WAIT || rv == MOJO_RESULT_DEADLINE_EXCEEDED)
    return false;
  if (rv != MOJO_RESULT_OK) {
    // Users that call WaitForIncomingMessage() should expect their code to be
    // re-entered, so we call the error handler synchronously.
    HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
    return false;
  }
  // |rv| is overwritten with the result of the read; only locals are touched
  // afterwards because dispatch may have destroyed |this|.
  ignore_result(ReadSingleMessage(&rv));
  return (rv == MOJO_RESULT_OK);
}

// Stops watching the pipe for incoming messages until
// ResumeIncomingMethodCallProcessing() is called. No-op if already paused.
void Connector::PauseIncomingMethodCallProcessing() {
  DCHECK(thread_checker_.CalledOnValidThread());

  if (paused_)
    return;

  paused_ = true;
  CancelWait();
}

// Restarts watching the pipe after a pause. No-op if not paused.
void Connector::ResumeIncomingMethodCallProcessing() {
  DCHECK(thread_checker_.CalledOnValidThread());

  if (!paused_)
    return;

  paused_ = false;
  WaitToReadMore();
}

// Writes |message| to the pipe.
//
// Returns false if the Connector is in the error state or if the write was
// rejected. Returns true if the message was written, and also when it was
// silently dropped because the pipe handle is invalid or a previous write
// found the peer gone (|drop_writes_|).
//
// May be called from any thread when |lock_| exists; otherwise must be called
// on the thread checked by |thread_checker_|.
bool Connector::Accept(Message* message) {
  DCHECK(lock_ || thread_checker_.CalledOnValidThread());

  // It shouldn't hurt even if |error_| may be changed by a different thread at
  // the same time. The outcome is that we may write into |message_pipe_| after
  // encountering an error, which should be fine.
  if (error_)
    return false;

  internal::MayAutoLock locker(&lock_);

  if (!message_pipe_.is_valid() || drop_writes_)
    return true;

  MojoResult rv =
      WriteMessageNew(message_pipe_.get(), message->TakeMojoMessage(),
                      MOJO_WRITE_MESSAGE_FLAG_NONE);

  switch (rv) {
    case MOJO_RESULT_OK:
      break;
    case MOJO_RESULT_FAILED_PRECONDITION:
      // There's no point in continuing to write to this pipe since the other
      // end is gone. Avoid writing any future messages. Hide write failures
      // from the caller since we'd like them to continue consuming any backlog
      // of incoming messages before regarding the message pipe as closed.
      drop_writes_ = true;
      break;
    case MOJO_RESULT_BUSY:
      // We'd get a "busy" result if one of the message's handles is:
      //   - |message_pipe_|'s own handle;
      //   - simultaneously being used on another thread; or
      //   - in a "busy" state that prohibits it from being transferred (e.g.,
      //     a data pipe handle in the middle of a two-phase read/write,
      //     regardless of which thread that two-phase read/write is happening
      //     on).
      // TODO(vtl): I wonder if this should be a |DCHECK()|. (But, until
      // crbug.com/389666, etc. are resolved, this will make tests fail quickly
      // rather than hanging.)
      CHECK(false) << "Race condition or other bug detected";
      return false;
    default:
      // This particular write was rejected, presumably because of bad input.
      // The pipe is not necessarily in a bad state.
      return false;
  }
  return true;
}

// Records that the sync watcher should be created eagerly (WaitToReadMore()
// re-applies this whenever watching restarts), makes sure a sync watcher
// exists now, and forwards the request to it.
void Connector::AllowWokenUpBySyncWatchOnSameThread() {
  DCHECK(thread_checker_.CalledOnValidThread());

  allow_woken_up_by_others_ = true;

  EnsureSyncWatcherExists();
  sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
}

// Delegates to SyncHandleWatcher::SyncWatch() with |should_stop|, resuming
// incoming message processing first if it was paused. Returns false without
// watching if the Connector is already in the error state; otherwise returns
// whatever the sync watcher returns.
bool Connector::SyncWatch(const bool* should_stop) {
  DCHECK(thread_checker_.CalledOnValidThread());

  if (error_)
    return false;

  ResumeIncomingMethodCallProcessing();

  EnsureSyncWatcherExists();
  return sync_watcher_->SyncWatch(should_stop);
}

// Remembers |tag| so that WaitToReadMore() can apply it to watchers created
// later, and applies it to the current handle watcher if there is one. Only
// the pointer is stored; the string is not copied here.
void Connector::SetWatcherHeapProfilerTag(const char* tag) {
  heap_profiler_tag_ = tag;
  if (handle_watcher_) {
    handle_watcher_->set_heap_profiler_tag(tag);
  }
}

// Callback for |handle_watcher_|; also posted directly by WaitToReadMore()
// when starting the watch fails.
void Connector::OnWatcherHandleReady(MojoResult result) {
  OnHandleReadyInternal(result);
}

// Callback for |sync_watcher_|. Keeps |sync_handle_watcher_callback_count_|
// incremented for the duration of the dispatch.
void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) {
  // Local copy so that liveness can be checked without touching members after
  // dispatch.
  base::WeakPtr<Connector> weak_self(weak_self_);

  sync_handle_watcher_callback_count_++;
  OnHandleReadyInternal(result);
  // At this point, this object might have been deleted.
  if (weak_self) {
    DCHECK_LT(0u, sync_handle_watcher_callback_count_);
    sync_handle_watcher_callback_count_--;
  }
}

// Shared handler for both watchers: a non-OK |result| is routed to
// HandleError() (forcing a pipe reset unless the result is
// MOJO_RESULT_FAILED_PRECONDITION); otherwise all available messages are read
// and dispatched.
void Connector::OnHandleReadyInternal(MojoResult result) {
  DCHECK(thread_checker_.CalledOnValidThread());

  if (result != MOJO_RESULT_OK) {
    HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false);
    return;
  }
  ReadAllAvailableMessages();
  // At this point, this object might have been deleted. Return.
}

// Creates a new |handle_watcher_| and starts watching |message_pipe_| for
// readability. Must not be called while paused or while a handle watcher
// already exists.
void Connector::WaitToReadMore() {
  CHECK(!paused_);
  DCHECK(!handle_watcher_);

  handle_watcher_.reset(new Watcher(FROM_HERE, task_runner_));
  if (heap_profiler_tag_)
    handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_);
  // NOTE(review): base::Unretained(this) is presumably safe because
  // |handle_watcher_| is owned by |this| and reset in CancelWait() -- confirm
  // that Watcher never runs its callback after destruction.
  MojoResult rv = handle_watcher_->Start(
      message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
      base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this)));

  if (rv != MOJO_RESULT_OK) {
    // If the watch failed because the handle is invalid or its conditions can
    // no longer be met, we signal the error asynchronously to avoid reentry.
    // The task is bound to |weak_self_|, not |this|, since it is queued on
    // |task_runner_| rather than owned by this object.
    task_runner_->PostTask(
        FROM_HERE,
        base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv));
  }

  if (allow_woken_up_by_others_) {
    EnsureSyncWatcherExists();
    sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
  }
}

// Attempts to read one message from the pipe and dispatch it to
// |incoming_receiver_|. The raw result of the read is always stored in
// |*read_result|.
//
// Returns false if |this| was destroyed (or its weak pointers invalidated)
// during dispatch, or if an error was handled; in that case the caller must
// not touch any members. Returns true otherwise, including when there was
// nothing to read (MOJO_RESULT_SHOULD_WAIT).
bool Connector::ReadSingleMessage(MojoResult* read_result) {
  CHECK(!paused_);

  bool receiver_result = false;

  // Detect if |this| was destroyed or the message pipe was closed/transferred
  // during message dispatch.
  base::WeakPtr<Connector> weak_self = weak_self_;

  Message message;
  const MojoResult rv = ReadMessage(message_pipe_.get(), &message);
  *read_result = rv;

  if (rv == MOJO_RESULT_OK) {
    // With no receiver installed the message is dropped and counts as not
    // accepted.
    receiver_result =
        incoming_receiver_ && incoming_receiver_->Accept(&message);
  }

  if (!weak_self)
    return false;

  if (rv == MOJO_RESULT_SHOULD_WAIT)
    return true;

  if (rv != MOJO_RESULT_OK) {
    HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
    return false;
  }

  // A rejected message is only treated as a connection error when
  // |enforce_errors_from_incoming_receiver_| is set.
  if (enforce_errors_from_incoming_receiver_ && !receiver_result) {
    HandleError(true, false);
    return false;
  }
  return true;
}

// Reads and dispatches messages until the pipe has none left, the Connector
// enters the error state, processing gets paused during dispatch, or |this|
// is destroyed.
void Connector::ReadAllAvailableMessages() {
  while (!error_) {
    MojoResult rv;

    if (!ReadSingleMessage(&rv)) {
      // Return immediately without touching any members. |this| may have been
      // destroyed.
      return;
    }

    // Dispatching the message may have paused processing.
    if (paused_)
      return;

    if (rv == MOJO_RESULT_SHOULD_WAIT)
      break;
  }
}

// Destroys both the handle watcher and the sync watcher, which stops this
// object from watching the pipe.
void Connector::CancelWait() {
  handle_watcher_.reset();
  sync_watcher_.reset();
}

// Central error path. No-op if the Connector is already in the error state or
// no longer holds a valid pipe.
//
// |force_pipe_reset|: replace |message_pipe_| with one end of a freshly
//     created pipe instead of keeping the current handle.
// |force_async_handler|: do not run the error handler now. Always treated as
//     true while paused, and implies |force_pipe_reset|.
//
// When the handler is not deferred, |error_| is set and
// |connection_error_handler_| is run synchronously; it may re-enter this
// object.
void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) {
  if (error_ || !message_pipe_.is_valid())
    return;

  if (paused_) {
    // Enforce calling the error handler asynchronously if the user has paused
    // receiving messages. We need to wait until the user starts receiving
    // messages again.
    force_async_handler = true;
  }

  if (!force_pipe_reset && force_async_handler)
    force_pipe_reset = true;

  if (force_pipe_reset) {
    CancelWait();
    internal::MayAutoLock locker(&lock_);
    message_pipe_.reset();
    // NOTE(review): |dummy_pipe.handle1| is presumably closed when
    // |dummy_pipe| goes out of scope, so a later watch on the replacement
    // handle would report the peer as closed and re-enter this function --
    // confirm against MessagePipe.
    MessagePipe dummy_pipe;
    message_pipe_ = std::move(dummy_pipe.handle0);
  } else {
    CancelWait();
  }

  if (force_async_handler) {
    // While paused, watching restarts from
    // ResumeIncomingMethodCallProcessing() instead.
    if (!paused_)
      WaitToReadMore();
  } else {
    error_ = true;
    if (!connection_error_handler_.is_null())
      connection_error_handler_.Run();
  }
}

// Lazily creates |sync_watcher_| for the current |message_pipe_| handle.
// No-op if one already exists.
void Connector::EnsureSyncWatcherExists() {
  if (sync_watcher_)
    return;
  // NOTE(review): base::Unretained(this) is presumably safe because
  // |sync_watcher_| is owned by |this| and reset in CancelWait() -- confirm.
  sync_watcher_.reset(new SyncHandleWatcher(
      message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
      base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
                 base::Unretained(this))));
}

}  // namespace mojo