1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "ipc/mojo/ipc_message_pipe_reader.h" 6 7 #include "base/bind.h" 8 #include "base/bind_helpers.h" 9 #include "base/location.h" 10 #include "base/logging.h" 11 #include "base/message_loop/message_loop_proxy.h" 12 #include "mojo/public/cpp/environment/environment.h" 13 14 namespace IPC { 15 namespace internal { 16 17 MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle) 18 : pipe_wait_id_(0), 19 pipe_(handle.Pass()) { 20 StartWaiting(); 21 } 22 23 MessagePipeReader::~MessagePipeReader() { 24 CHECK(!IsValid()); 25 } 26 27 void MessagePipeReader::Close() { 28 StopWaiting(); 29 pipe_.reset(); 30 OnPipeClosed(); 31 } 32 33 void MessagePipeReader::CloseWithError(MojoResult error) { 34 OnPipeError(error); 35 Close(); 36 } 37 38 // static 39 void MessagePipeReader::InvokePipeIsReady(void* closure, MojoResult result) { 40 reinterpret_cast<MessagePipeReader*>(closure)->PipeIsReady(result); 41 } 42 43 void MessagePipeReader::StartWaiting() { 44 DCHECK(pipe_.is_valid()); 45 DCHECK(!pipe_wait_id_); 46 // Not using MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in 47 // MessagePipe. 48 // 49 // TODO(morrita): Should we re-set the signal when we get new 50 // message to send? 51 pipe_wait_id_ = mojo::Environment::GetDefaultAsyncWaiter()->AsyncWait( 52 pipe_.get().value(), 53 MOJO_HANDLE_SIGNAL_READABLE, 54 MOJO_DEADLINE_INDEFINITE, 55 &InvokePipeIsReady, 56 this); 57 } 58 59 void MessagePipeReader::StopWaiting() { 60 if (!pipe_wait_id_) 61 return; 62 mojo::Environment::GetDefaultAsyncWaiter()->CancelWait(pipe_wait_id_); 63 pipe_wait_id_ = 0; 64 } 65 66 void MessagePipeReader::PipeIsReady(MojoResult wait_result) { 67 pipe_wait_id_ = 0; 68 69 if (wait_result != MOJO_RESULT_OK) { 70 if (wait_result != MOJO_RESULT_ABORTED) { 71 // FAILED_PRECONDITION happens every time the peer is dead so 72 // it isn't worth polluting the log message. 73 DLOG_IF(WARNING, wait_result != MOJO_RESULT_FAILED_PRECONDITION) 74 << "Pipe got error from the waiter. Closing: " 75 << wait_result; 76 OnPipeError(wait_result); 77 } 78 79 Close(); 80 return; 81 } 82 83 while (pipe_.is_valid()) { 84 MojoResult read_result = ReadMessageBytes(); 85 if (read_result == MOJO_RESULT_SHOULD_WAIT) 86 break; 87 if (read_result != MOJO_RESULT_OK) { 88 // FAILED_PRECONDITION means that all the received messages 89 // got consumed and the peer is already closed. 90 if (read_result != MOJO_RESULT_FAILED_PRECONDITION) { 91 DLOG(WARNING) 92 << "Pipe got error from ReadMessage(). Closing: " << read_result; 93 OnPipeError(read_result); 94 } 95 96 Close(); 97 break; 98 } 99 100 OnMessageReceived(); 101 } 102 103 if (pipe_.is_valid()) 104 StartWaiting(); 105 } 106 107 MojoResult MessagePipeReader::ReadMessageBytes() { 108 DCHECK(handle_buffer_.empty()); 109 110 uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size()); 111 uint32_t num_handles = 0; 112 MojoResult result = MojoReadMessage(pipe_.get().value(), 113 num_bytes ? &data_buffer_[0] : NULL, 114 &num_bytes, 115 NULL, 116 &num_handles, 117 MOJO_READ_MESSAGE_FLAG_NONE); 118 data_buffer_.resize(num_bytes); 119 handle_buffer_.resize(num_handles); 120 if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) { 121 // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that 122 // it needs more bufer. So we re-read it with resized buffers. 123 result = MojoReadMessage(pipe_.get().value(), 124 num_bytes ? &data_buffer_[0] : NULL, 125 &num_bytes, 126 num_handles ? &handle_buffer_[0] : NULL, 127 &num_handles, 128 MOJO_READ_MESSAGE_FLAG_NONE); 129 } 130 131 DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes); 132 DCHECK(0 == num_handles || handle_buffer_.size() == num_handles); 133 return result; 134 } 135 136 void MessagePipeReader::DelayedDeleter::operator()( 137 MessagePipeReader* ptr) const { 138 ptr->Close(); 139 base::MessageLoopProxy::current()->PostTask( 140 FROM_HERE, base::Bind(&DeleteNow, ptr)); 141 } 142 143 } // namespace internal 144 } // namespace IPC 145