Home | History | Annotate | Download | only in lib
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/public/cpp/bindings/lib/connector.h"
      6 
      7 #include <stddef.h>
      8 
      9 #include "mojo/public/cpp/bindings/error_handler.h"
     10 #include "mojo/public/cpp/environment/logging.h"
     11 
     12 namespace mojo {
     13 namespace internal {
     14 
     15 // ----------------------------------------------------------------------------
     16 
     17 Connector::Connector(ScopedMessagePipeHandle message_pipe,
     18                      const MojoAsyncWaiter* waiter)
     19     : error_handler_(NULL),
     20       waiter_(waiter),
     21       message_pipe_(message_pipe.Pass()),
     22       incoming_receiver_(NULL),
     23       async_wait_id_(0),
     24       error_(false),
     25       drop_writes_(false),
     26       enforce_errors_from_incoming_receiver_(true),
     27       destroyed_flag_(NULL) {
     28   // Even though we don't have an incoming receiver, we still want to monitor
     29   // the message pipe to know if is closed or encounters an error.
     30   WaitToReadMore();
     31 }
     32 
     33 Connector::~Connector() {
     34   if (destroyed_flag_)
     35     *destroyed_flag_ = true;
     36 
     37   CancelWait();
     38 }
     39 
     40 void Connector::CloseMessagePipe() {
     41   CancelWait();
     42   Close(message_pipe_.Pass());
     43 }
     44 
     45 ScopedMessagePipeHandle Connector::PassMessagePipe() {
     46   CancelWait();
     47   return message_pipe_.Pass();
     48 }
     49 
     50 bool Connector::WaitForIncomingMessage() {
     51   if (error_)
     52     return false;
     53 
     54   MojoResult rv = Wait(message_pipe_.get(),
     55                        MOJO_HANDLE_SIGNAL_READABLE,
     56                        MOJO_DEADLINE_INDEFINITE);
     57   if (rv != MOJO_RESULT_OK) {
     58     NotifyError();
     59     return false;
     60   }
     61   mojo_ignore_result(ReadSingleMessage(&rv));
     62   return (rv == MOJO_RESULT_OK);
     63 }
     64 
     65 bool Connector::Accept(Message* message) {
     66   MOJO_CHECK(message_pipe_.is_valid());
     67 
     68   if (error_)
     69     return false;
     70 
     71   if (drop_writes_)
     72     return true;
     73 
     74   MojoResult rv = WriteMessageRaw(
     75       message_pipe_.get(),
     76       message->data(),
     77       message->data_num_bytes(),
     78       message->mutable_handles()->empty() ? NULL :
     79           reinterpret_cast<const MojoHandle*>(
     80               &message->mutable_handles()->front()),
     81       static_cast<uint32_t>(message->mutable_handles()->size()),
     82       MOJO_WRITE_MESSAGE_FLAG_NONE);
     83 
     84   switch (rv) {
     85     case MOJO_RESULT_OK:
     86       // The handles were successfully transferred, so we don't need the message
     87       // to track their lifetime any longer.
     88       message->mutable_handles()->clear();
     89       break;
     90     case MOJO_RESULT_FAILED_PRECONDITION:
     91       // There's no point in continuing to write to this pipe since the other
     92       // end is gone. Avoid writing any future messages. Hide write failures
     93       // from the caller since we'd like them to continue consuming any backlog
     94       // of incoming messages before regarding the message pipe as closed.
     95       drop_writes_ = true;
     96       break;
     97     case MOJO_RESULT_BUSY:
     98       // We'd get a "busy" result if one of the message's handles is:
     99       //   - |message_pipe_|'s own handle;
    100       //   - simultaneously being used on another thread; or
    101       //   - in a "busy" state that prohibits it from being transferred (e.g.,
    102       //     a data pipe handle in the middle of a two-phase read/write,
    103       //     regardless of which thread that two-phase read/write is happening
    104       //     on).
    105       // TODO(vtl): I wonder if this should be a |MOJO_DCHECK()|. (But, until
    106       // crbug.com/389666, etc. are resolved, this will make tests fail quickly
    107       // rather than hanging.)
    108       MOJO_CHECK(false) << "Race condition or other bug detected";
    109       return false;
    110     default:
    111       // This particular write was rejected, presumably because of bad input.
    112       // The pipe is not necessarily in a bad state.
    113       return false;
    114   }
    115   return true;
    116 }
    117 
    118 // static
    119 void Connector::CallOnHandleReady(void* closure, MojoResult result) {
    120   Connector* self = static_cast<Connector*>(closure);
    121   self->OnHandleReady(result);
    122 }
    123 
    124 void Connector::OnHandleReady(MojoResult result) {
    125   MOJO_CHECK(async_wait_id_ != 0);
    126   async_wait_id_ = 0;
    127   if (result != MOJO_RESULT_OK) {
    128     NotifyError();
    129     return;
    130   }
    131   ReadAllAvailableMessages();
    132   // At this point, this object might have been deleted. Return.
    133 }
    134 
    135 void Connector::WaitToReadMore() {
    136   MOJO_CHECK(!async_wait_id_);
    137   async_wait_id_ = waiter_->AsyncWait(message_pipe_.get().value(),
    138                                       MOJO_HANDLE_SIGNAL_READABLE,
    139                                       MOJO_DEADLINE_INDEFINITE,
    140                                       &Connector::CallOnHandleReady,
    141                                       this);
    142 }
    143 
    144 bool Connector::ReadSingleMessage(MojoResult* read_result) {
    145   bool receiver_result = false;
    146 
    147   // Detect if |this| was destroyed during message dispatch. Allow for the
    148   // possibility of re-entering ReadMore() through message dispatch.
    149   bool was_destroyed_during_dispatch = false;
    150   bool* previous_destroyed_flag = destroyed_flag_;
    151   destroyed_flag_ = &was_destroyed_during_dispatch;
    152 
    153   MojoResult rv = ReadAndDispatchMessage(
    154       message_pipe_.get(), incoming_receiver_, &receiver_result);
    155   if (read_result)
    156     *read_result = rv;
    157 
    158   if (was_destroyed_during_dispatch) {
    159     if (previous_destroyed_flag)
    160       *previous_destroyed_flag = true;  // Propagate flag.
    161     return false;
    162   }
    163   destroyed_flag_ = previous_destroyed_flag;
    164 
    165   if (rv == MOJO_RESULT_SHOULD_WAIT)
    166     return true;
    167 
    168   if (rv != MOJO_RESULT_OK ||
    169       (enforce_errors_from_incoming_receiver_ && !receiver_result)) {
    170     NotifyError();
    171     return false;
    172   }
    173   return true;
    174 }
    175 
    176 void Connector::ReadAllAvailableMessages() {
    177   while (!error_) {
    178     MojoResult rv;
    179 
    180     // Return immediately if |this| was destroyed. Do not touch any members!
    181     if (!ReadSingleMessage(&rv))
    182       return;
    183 
    184     if (rv == MOJO_RESULT_SHOULD_WAIT) {
    185       WaitToReadMore();
    186       break;
    187     }
    188   }
    189 }
    190 
    191 void Connector::CancelWait() {
    192   if (!async_wait_id_)
    193     return;
    194 
    195   waiter_->CancelWait(async_wait_id_);
    196   async_wait_id_ = 0;
    197 }
    198 
    199 void Connector::NotifyError() {
    200   error_ = true;
    201   CancelWait();
    202   if (error_handler_)
    203     error_handler_->OnConnectionError();
    204 }
    205 
    206 }  // namespace internal
    207 }  // namespace mojo
    208