Home | History | Annotate | Download | only in lib
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/public/bindings/lib/connector.h"
      6 
      7 #include <assert.h>
      8 #include <stdlib.h>
      9 
     10 #include <algorithm>
     11 
     12 namespace mojo {
     13 namespace internal {
     14 
     15 // ----------------------------------------------------------------------------
     16 
     17 Connector::Connector(ScopedMessagePipeHandle message_pipe)
     18     : message_pipe_(message_pipe.Pass()),
     19       incoming_receiver_(NULL),
     20       error_(false) {
     21 }
     22 
     23 Connector::~Connector() {
     24 }
     25 
     26 void Connector::SetIncomingReceiver(MessageReceiver* receiver) {
     27   assert(!incoming_receiver_);
     28   incoming_receiver_ = receiver;
     29   if (incoming_receiver_)
     30     WaitToReadMore();
     31 }
     32 
     33 bool Connector::Accept(Message* message) {
     34   if (error_)
     35     return false;
     36 
     37   bool wait_to_write;
     38   WriteOne(message, &wait_to_write);
     39 
     40   if (wait_to_write) {
     41     WaitToWriteMore();
     42     if (!error_)
     43       write_queue_.Push(message);
     44   }
     45 
     46   return !error_;
     47 }
     48 
     49 void Connector::OnHandleReady(Callback* callback, MojoResult result) {
     50   if (callback == &read_callback_)
     51     ReadMore();
     52   if (callback == &write_callback_)
     53     WriteMore();
     54 }
     55 
     56 void Connector::WaitToReadMore() {
     57   read_callback_.SetOwnerToNotify(this);
     58   read_callback_.SetAsyncWaitID(
     59       BindingsSupport::Get()->AsyncWait(message_pipe_.get(),
     60                                         MOJO_WAIT_FLAG_READABLE,
     61                                         &read_callback_));
     62 }
     63 
     64 void Connector::WaitToWriteMore() {
     65   write_callback_.SetOwnerToNotify(this);
     66   write_callback_.SetAsyncWaitID(
     67       BindingsSupport::Get()->AsyncWait(message_pipe_.get(),
     68                                         MOJO_WAIT_FLAG_WRITABLE,
     69                                         &write_callback_));
     70 }
     71 
     72 void Connector::ReadMore() {
     73   for (;;) {
     74     MojoResult rv;
     75 
     76     uint32_t num_bytes = 0, num_handles = 0;
     77     rv = ReadMessageRaw(message_pipe_.get(),
     78                         NULL,
     79                         &num_bytes,
     80                         NULL,
     81                         &num_handles,
     82                         MOJO_READ_MESSAGE_FLAG_NONE);
     83     if (rv == MOJO_RESULT_NOT_FOUND) {
     84       WaitToReadMore();
     85       break;
     86     }
     87     if (rv != MOJO_RESULT_RESOURCE_EXHAUSTED) {
     88       error_ = true;
     89       break;
     90     }
     91 
     92     Message message;
     93     message.data = static_cast<MessageData*>(malloc(num_bytes));
     94     message.handles.resize(num_handles);
     95 
     96     rv = ReadMessageRaw(message_pipe_.get(),
     97                         message.data,
     98                         &num_bytes,
     99                         message.handles.empty() ? NULL :
    100                             reinterpret_cast<MojoHandle*>(&message.handles[0]),
    101                         &num_handles,
    102                         MOJO_READ_MESSAGE_FLAG_NONE);
    103     if (rv != MOJO_RESULT_OK) {
    104       error_ = true;
    105       break;
    106     }
    107 
    108     incoming_receiver_->Accept(&message);
    109   }
    110 }
    111 
    112 void Connector::WriteMore() {
    113   while (!error_ && !write_queue_.IsEmpty()) {
    114     Message* message = write_queue_.Peek();
    115 
    116     bool wait_to_write;
    117     WriteOne(message, &wait_to_write);
    118     if (wait_to_write)
    119       break;
    120 
    121     write_queue_.Pop();
    122   }
    123 }
    124 
    125 void Connector::WriteOne(Message* message, bool* wait_to_write) {
    126   // TODO(darin): WriteMessageRaw will eventually start generating an error that
    127   // it cannot accept more data. In that case, we'll need to wait on the pipe
    128   // to determine when we can try writing again. This flag will be set to true
    129   // in that case.
    130   *wait_to_write = false;
    131 
    132   MojoResult rv = WriteMessageRaw(
    133       message_pipe_.get(),
    134       message->data,
    135       message->data->header.num_bytes,
    136       message->handles.empty() ? NULL :
    137           reinterpret_cast<const MojoHandle*>(&message->handles[0]),
    138       static_cast<uint32_t>(message->handles.size()),
    139       MOJO_WRITE_MESSAGE_FLAG_NONE);
    140   if (rv == MOJO_RESULT_OK) {
    141     // The handles were successfully transferred, so we don't need the message
    142     // to track their lifetime any longer.
    143     message->handles.clear();
    144   } else {
    145     error_ = true;
    146   }
    147 }
    148 
    149 // ----------------------------------------------------------------------------
    150 
    151 Connector::Callback::Callback()
    152     : owner_(NULL),
    153       async_wait_id_(0) {
    154 }
    155 
    156 Connector::Callback::~Callback() {
    157   if (owner_)
    158     BindingsSupport::Get()->CancelWait(async_wait_id_);
    159 }
    160 
    161 void Connector::Callback::SetOwnerToNotify(Connector* owner) {
    162   assert(!owner_);
    163   owner_ = owner;
    164 }
    165 
    166 void Connector::Callback::SetAsyncWaitID(BindingsSupport::AsyncWaitID id) {
    167   async_wait_id_ = id;
    168 }
    169 
    170 void Connector::Callback::OnHandleReady(MojoResult result) {
    171   assert(owner_);
    172   Connector* owner = NULL;
    173   std::swap(owner, owner_);
    174   owner->OnHandleReady(this, result);
    175 }
    176 
    177 }  // namespace internal
    178 }  // namespace mojo
    179