1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/public/bindings/lib/connector.h" 6 7 #include <assert.h> 8 #include <stdlib.h> 9 10 #include <algorithm> 11 12 namespace mojo { 13 namespace internal { 14 15 // ---------------------------------------------------------------------------- 16 17 Connector::Connector(ScopedMessagePipeHandle message_pipe) 18 : message_pipe_(message_pipe.Pass()), 19 incoming_receiver_(NULL), 20 error_(false) { 21 } 22 23 Connector::~Connector() { 24 } 25 26 void Connector::SetIncomingReceiver(MessageReceiver* receiver) { 27 assert(!incoming_receiver_); 28 incoming_receiver_ = receiver; 29 if (incoming_receiver_) 30 WaitToReadMore(); 31 } 32 33 bool Connector::Accept(Message* message) { 34 if (error_) 35 return false; 36 37 bool wait_to_write; 38 WriteOne(message, &wait_to_write); 39 40 if (wait_to_write) { 41 WaitToWriteMore(); 42 if (!error_) 43 write_queue_.Push(message); 44 } 45 46 return !error_; 47 } 48 49 void Connector::OnHandleReady(Callback* callback, MojoResult result) { 50 if (callback == &read_callback_) 51 ReadMore(); 52 if (callback == &write_callback_) 53 WriteMore(); 54 } 55 56 void Connector::WaitToReadMore() { 57 read_callback_.SetOwnerToNotify(this); 58 read_callback_.SetAsyncWaitID( 59 BindingsSupport::Get()->AsyncWait(message_pipe_.get(), 60 MOJO_WAIT_FLAG_READABLE, 61 &read_callback_)); 62 } 63 64 void Connector::WaitToWriteMore() { 65 write_callback_.SetOwnerToNotify(this); 66 write_callback_.SetAsyncWaitID( 67 BindingsSupport::Get()->AsyncWait(message_pipe_.get(), 68 MOJO_WAIT_FLAG_WRITABLE, 69 &write_callback_)); 70 } 71 72 void Connector::ReadMore() { 73 for (;;) { 74 MojoResult rv; 75 76 uint32_t num_bytes = 0, num_handles = 0; 77 rv = ReadMessageRaw(message_pipe_.get(), 78 NULL, 79 &num_bytes, 80 NULL, 81 &num_handles, 82 MOJO_READ_MESSAGE_FLAG_NONE); 83 if (rv == MOJO_RESULT_NOT_FOUND) { 84 WaitToReadMore(); 85 break; 86 } 87 if (rv != MOJO_RESULT_RESOURCE_EXHAUSTED) { 88 error_ = true; 89 break; 90 } 91 92 Message message; 93 message.data = static_cast<MessageData*>(malloc(num_bytes)); 94 message.handles.resize(num_handles); 95 96 rv = ReadMessageRaw(message_pipe_.get(), 97 message.data, 98 &num_bytes, 99 message.handles.empty() ? NULL : 100 reinterpret_cast<MojoHandle*>(&message.handles[0]), 101 &num_handles, 102 MOJO_READ_MESSAGE_FLAG_NONE); 103 if (rv != MOJO_RESULT_OK) { 104 error_ = true; 105 break; 106 } 107 108 incoming_receiver_->Accept(&message); 109 } 110 } 111 112 void Connector::WriteMore() { 113 while (!error_ && !write_queue_.IsEmpty()) { 114 Message* message = write_queue_.Peek(); 115 116 bool wait_to_write; 117 WriteOne(message, &wait_to_write); 118 if (wait_to_write) 119 break; 120 121 write_queue_.Pop(); 122 } 123 } 124 125 void Connector::WriteOne(Message* message, bool* wait_to_write) { 126 // TODO(darin): WriteMessageRaw will eventually start generating an error that 127 // it cannot accept more data. In that case, we'll need to wait on the pipe 128 // to determine when we can try writing again. This flag will be set to true 129 // in that case. 130 *wait_to_write = false; 131 132 MojoResult rv = WriteMessageRaw( 133 message_pipe_.get(), 134 message->data, 135 message->data->header.num_bytes, 136 message->handles.empty() ? NULL : 137 reinterpret_cast<const MojoHandle*>(&message->handles[0]), 138 static_cast<uint32_t>(message->handles.size()), 139 MOJO_WRITE_MESSAGE_FLAG_NONE); 140 if (rv == MOJO_RESULT_OK) { 141 // The handles were successfully transferred, so we don't need the message 142 // to track their lifetime any longer. 143 message->handles.clear(); 144 } else { 145 error_ = true; 146 } 147 } 148 149 // ---------------------------------------------------------------------------- 150 151 Connector::Callback::Callback() 152 : owner_(NULL), 153 async_wait_id_(0) { 154 } 155 156 Connector::Callback::~Callback() { 157 if (owner_) 158 BindingsSupport::Get()->CancelWait(async_wait_id_); 159 } 160 161 void Connector::Callback::SetOwnerToNotify(Connector* owner) { 162 assert(!owner_); 163 owner_ = owner; 164 } 165 166 void Connector::Callback::SetAsyncWaitID(BindingsSupport::AsyncWaitID id) { 167 async_wait_id_ = id; 168 } 169 170 void Connector::Callback::OnHandleReady(MojoResult result) { 171 assert(owner_); 172 Connector* owner = NULL; 173 std::swap(owner, owner_); 174 owner->OnHandleReady(this, result); 175 } 176 177 } // namespace internal 178 } // namespace mojo 179