1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/public/cpp/bindings/lib/connector.h" 6 7 #include <stddef.h> 8 9 #include "mojo/public/cpp/bindings/error_handler.h" 10 #include "mojo/public/cpp/environment/logging.h" 11 12 namespace mojo { 13 namespace internal { 14 15 // ---------------------------------------------------------------------------- 16 17 Connector::Connector(ScopedMessagePipeHandle message_pipe, 18 const MojoAsyncWaiter* waiter) 19 : error_handler_(NULL), 20 waiter_(waiter), 21 message_pipe_(message_pipe.Pass()), 22 incoming_receiver_(NULL), 23 async_wait_id_(0), 24 error_(false), 25 drop_writes_(false), 26 enforce_errors_from_incoming_receiver_(true), 27 destroyed_flag_(NULL) { 28 // Even though we don't have an incoming receiver, we still want to monitor 29 // the message pipe to know if is closed or encounters an error. 30 WaitToReadMore(); 31 } 32 33 Connector::~Connector() { 34 if (destroyed_flag_) 35 *destroyed_flag_ = true; 36 37 CancelWait(); 38 } 39 40 void Connector::CloseMessagePipe() { 41 CancelWait(); 42 Close(message_pipe_.Pass()); 43 } 44 45 ScopedMessagePipeHandle Connector::PassMessagePipe() { 46 CancelWait(); 47 return message_pipe_.Pass(); 48 } 49 50 bool Connector::WaitForIncomingMessage() { 51 if (error_) 52 return false; 53 54 MojoResult rv = Wait(message_pipe_.get(), 55 MOJO_HANDLE_SIGNAL_READABLE, 56 MOJO_DEADLINE_INDEFINITE); 57 if (rv != MOJO_RESULT_OK) { 58 NotifyError(); 59 return false; 60 } 61 mojo_ignore_result(ReadSingleMessage(&rv)); 62 return (rv == MOJO_RESULT_OK); 63 } 64 65 bool Connector::Accept(Message* message) { 66 MOJO_CHECK(message_pipe_.is_valid()); 67 68 if (error_) 69 return false; 70 71 if (drop_writes_) 72 return true; 73 74 MojoResult rv = WriteMessageRaw( 75 message_pipe_.get(), 76 message->data(), 77 message->data_num_bytes(), 78 message->mutable_handles()->empty() ? NULL : 79 reinterpret_cast<const MojoHandle*>( 80 &message->mutable_handles()->front()), 81 static_cast<uint32_t>(message->mutable_handles()->size()), 82 MOJO_WRITE_MESSAGE_FLAG_NONE); 83 84 switch (rv) { 85 case MOJO_RESULT_OK: 86 // The handles were successfully transferred, so we don't need the message 87 // to track their lifetime any longer. 88 message->mutable_handles()->clear(); 89 break; 90 case MOJO_RESULT_FAILED_PRECONDITION: 91 // There's no point in continuing to write to this pipe since the other 92 // end is gone. Avoid writing any future messages. Hide write failures 93 // from the caller since we'd like them to continue consuming any backlog 94 // of incoming messages before regarding the message pipe as closed. 95 drop_writes_ = true; 96 break; 97 case MOJO_RESULT_BUSY: 98 // We'd get a "busy" result if one of the message's handles is: 99 // - |message_pipe_|'s own handle; 100 // - simultaneously being used on another thread; or 101 // - in a "busy" state that prohibits it from being transferred (e.g., 102 // a data pipe handle in the middle of a two-phase read/write, 103 // regardless of which thread that two-phase read/write is happening 104 // on). 105 // TODO(vtl): I wonder if this should be a |MOJO_DCHECK()|. (But, until 106 // crbug.com/389666, etc. are resolved, this will make tests fail quickly 107 // rather than hanging.) 108 MOJO_CHECK(false) << "Race condition or other bug detected"; 109 return false; 110 default: 111 // This particular write was rejected, presumably because of bad input. 112 // The pipe is not necessarily in a bad state. 113 return false; 114 } 115 return true; 116 } 117 118 // static 119 void Connector::CallOnHandleReady(void* closure, MojoResult result) { 120 Connector* self = static_cast<Connector*>(closure); 121 self->OnHandleReady(result); 122 } 123 124 void Connector::OnHandleReady(MojoResult result) { 125 MOJO_CHECK(async_wait_id_ != 0); 126 async_wait_id_ = 0; 127 if (result != MOJO_RESULT_OK) { 128 NotifyError(); 129 return; 130 } 131 ReadAllAvailableMessages(); 132 // At this point, this object might have been deleted. Return. 133 } 134 135 void Connector::WaitToReadMore() { 136 MOJO_CHECK(!async_wait_id_); 137 async_wait_id_ = waiter_->AsyncWait(message_pipe_.get().value(), 138 MOJO_HANDLE_SIGNAL_READABLE, 139 MOJO_DEADLINE_INDEFINITE, 140 &Connector::CallOnHandleReady, 141 this); 142 } 143 144 bool Connector::ReadSingleMessage(MojoResult* read_result) { 145 bool receiver_result = false; 146 147 // Detect if |this| was destroyed during message dispatch. Allow for the 148 // possibility of re-entering ReadMore() through message dispatch. 149 bool was_destroyed_during_dispatch = false; 150 bool* previous_destroyed_flag = destroyed_flag_; 151 destroyed_flag_ = &was_destroyed_during_dispatch; 152 153 MojoResult rv = ReadAndDispatchMessage( 154 message_pipe_.get(), incoming_receiver_, &receiver_result); 155 if (read_result) 156 *read_result = rv; 157 158 if (was_destroyed_during_dispatch) { 159 if (previous_destroyed_flag) 160 *previous_destroyed_flag = true; // Propagate flag. 161 return false; 162 } 163 destroyed_flag_ = previous_destroyed_flag; 164 165 if (rv == MOJO_RESULT_SHOULD_WAIT) 166 return true; 167 168 if (rv != MOJO_RESULT_OK || 169 (enforce_errors_from_incoming_receiver_ && !receiver_result)) { 170 NotifyError(); 171 return false; 172 } 173 return true; 174 } 175 176 void Connector::ReadAllAvailableMessages() { 177 while (!error_) { 178 MojoResult rv; 179 180 // Return immediately if |this| was destroyed. Do not touch any members! 181 if (!ReadSingleMessage(&rv)) 182 return; 183 184 if (rv == MOJO_RESULT_SHOULD_WAIT) { 185 WaitToReadMore(); 186 break; 187 } 188 } 189 } 190 191 void Connector::CancelWait() { 192 if (!async_wait_id_) 193 return; 194 195 waiter_->CancelWait(async_wait_id_); 196 async_wait_id_ = 0; 197 } 198 199 void Connector::NotifyError() { 200 error_ = true; 201 CancelWait(); 202 if (error_handler_) 203 error_handler_->OnConnectionError(); 204 } 205 206 } // namespace internal 207 } // namespace mojo 208