Home | History | Annotate | Download | only in mojo
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "ipc/mojo/ipc_message_pipe_reader.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/bind_helpers.h"
      9 #include "base/location.h"
     10 #include "base/logging.h"
     11 #include "base/message_loop/message_loop_proxy.h"
     12 #include "mojo/public/cpp/environment/environment.h"
     13 
     14 namespace IPC {
     15 namespace internal {
     16 
     17 MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle)
     18     : pipe_wait_id_(0),
     19       pipe_(handle.Pass()) {
     20   StartWaiting();
     21 }
     22 
     23 MessagePipeReader::~MessagePipeReader() {
     24   CHECK(!IsValid());
     25 }
     26 
     27 void MessagePipeReader::Close() {
     28   StopWaiting();
     29   pipe_.reset();
     30   OnPipeClosed();
     31 }
     32 
     33 void MessagePipeReader::CloseWithError(MojoResult error) {
     34   OnPipeError(error);
     35   Close();
     36 }
     37 
     38 // static
     39 void MessagePipeReader::InvokePipeIsReady(void* closure, MojoResult result) {
     40   reinterpret_cast<MessagePipeReader*>(closure)->PipeIsReady(result);
     41 }
     42 
     43 void MessagePipeReader::StartWaiting() {
     44   DCHECK(pipe_.is_valid());
     45   DCHECK(!pipe_wait_id_);
     46   // Not using MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in
     47   // MessagePipe.
     48   //
     49   // TODO(morrita): Should we re-set the signal when we get new
     50   // message to send?
     51   pipe_wait_id_ = mojo::Environment::GetDefaultAsyncWaiter()->AsyncWait(
     52       pipe_.get().value(),
     53       MOJO_HANDLE_SIGNAL_READABLE,
     54       MOJO_DEADLINE_INDEFINITE,
     55       &InvokePipeIsReady,
     56       this);
     57 }
     58 
     59 void MessagePipeReader::StopWaiting() {
     60   if (!pipe_wait_id_)
     61     return;
     62   mojo::Environment::GetDefaultAsyncWaiter()->CancelWait(pipe_wait_id_);
     63   pipe_wait_id_ = 0;
     64 }
     65 
     66 void MessagePipeReader::PipeIsReady(MojoResult wait_result) {
     67   pipe_wait_id_ = 0;
     68 
     69   if (wait_result != MOJO_RESULT_OK) {
     70     if (wait_result != MOJO_RESULT_ABORTED) {
     71       // FAILED_PRECONDITION happens every time the peer is dead so
     72       // it isn't worth polluting the log message.
     73       DLOG_IF(WARNING, wait_result != MOJO_RESULT_FAILED_PRECONDITION)
     74           << "Pipe got error from the waiter. Closing: "
     75           << wait_result;
     76       OnPipeError(wait_result);
     77     }
     78 
     79     Close();
     80     return;
     81   }
     82 
     83   while (pipe_.is_valid()) {
     84     MojoResult read_result = ReadMessageBytes();
     85     if (read_result == MOJO_RESULT_SHOULD_WAIT)
     86       break;
     87     if (read_result != MOJO_RESULT_OK) {
     88       // FAILED_PRECONDITION means that all the received messages
     89       // got consumed and the peer is already closed.
     90       if (read_result != MOJO_RESULT_FAILED_PRECONDITION) {
     91         DLOG(WARNING)
     92             << "Pipe got error from ReadMessage(). Closing: " << read_result;
     93         OnPipeError(read_result);
     94       }
     95 
     96       Close();
     97       break;
     98     }
     99 
    100     OnMessageReceived();
    101   }
    102 
    103   if (pipe_.is_valid())
    104     StartWaiting();
    105 }
    106 
    107 MojoResult MessagePipeReader::ReadMessageBytes() {
    108   DCHECK(handle_buffer_.empty());
    109 
    110   uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size());
    111   uint32_t num_handles = 0;
    112   MojoResult result = MojoReadMessage(pipe_.get().value(),
    113                                       num_bytes ? &data_buffer_[0] : NULL,
    114                                       &num_bytes,
    115                                       NULL,
    116                                       &num_handles,
    117                                       MOJO_READ_MESSAGE_FLAG_NONE);
    118   data_buffer_.resize(num_bytes);
    119   handle_buffer_.resize(num_handles);
    120   if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) {
    121     // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that
    122     // it needs more bufer. So we re-read it with resized buffers.
    123     result = MojoReadMessage(pipe_.get().value(),
    124                              num_bytes ? &data_buffer_[0] : NULL,
    125                              &num_bytes,
    126                              num_handles ? &handle_buffer_[0] : NULL,
    127                              &num_handles,
    128                              MOJO_READ_MESSAGE_FLAG_NONE);
    129   }
    130 
    131   DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes);
    132   DCHECK(0 == num_handles || handle_buffer_.size() == num_handles);
    133   return result;
    134 }
    135 
    136 void MessagePipeReader::DelayedDeleter::operator()(
    137     MessagePipeReader* ptr) const {
    138   ptr->Close();
    139   base::MessageLoopProxy::current()->PostTask(
    140       FROM_HERE, base::Bind(&DeleteNow, ptr));
    141 }
    142 
    143 }  // namespace internal
    144 }  // namespace IPC
    145