1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "ipc/ipc_channel_nacl.h" 6 7 #include <errno.h> 8 #include <stddef.h> 9 #include <sys/types.h> 10 11 #include <algorithm> 12 13 #include "base/bind.h" 14 #include "base/logging.h" 15 #include "base/message_loop/message_loop_proxy.h" 16 #include "base/synchronization/lock.h" 17 #include "base/task_runner_util.h" 18 #include "base/threading/simple_thread.h" 19 #include "ipc/file_descriptor_set_posix.h" 20 #include "ipc/ipc_logging.h" 21 #include "native_client/src/public/imc_syscalls.h" 22 #include "native_client/src/public/imc_types.h" 23 24 namespace IPC { 25 26 struct MessageContents { 27 std::vector<char> data; 28 std::vector<int> fds; 29 }; 30 31 namespace { 32 33 bool ReadDataOnReaderThread(int pipe, MessageContents* contents) { 34 DCHECK(pipe >= 0); 35 if (pipe < 0) 36 return false; 37 38 contents->data.resize(Channel::kReadBufferSize); 39 contents->fds.resize(FileDescriptorSet::kMaxDescriptorsPerMessage); 40 41 NaClAbiNaClImcMsgIoVec iov = { &contents->data[0], contents->data.size() }; 42 NaClAbiNaClImcMsgHdr msg = { 43 &iov, 1, &contents->fds[0], contents->fds.size() 44 }; 45 46 int bytes_read = imc_recvmsg(pipe, &msg, 0); 47 48 if (bytes_read <= 0) { 49 // NaClIPCAdapter::BlockingReceive returns -1 when the pipe closes (either 50 // due to error or for regular shutdown). 51 contents->data.clear(); 52 contents->fds.clear(); 53 return false; 54 } 55 DCHECK(bytes_read); 56 // Resize the buffers down to the number of bytes and fds we actually read. 57 contents->data.resize(bytes_read); 58 contents->fds.resize(msg.desc_length); 59 return true; 60 } 61 62 } // namespace 63 64 class Channel::ChannelImpl::ReaderThreadRunner 65 : public base::DelegateSimpleThread::Delegate { 66 public: 67 // |pipe|: A file descriptor from which we will read using imc_recvmsg. 68 // |data_read_callback|: A callback we invoke (on the main thread) when we 69 // have read data. 70 // |failure_callback|: A callback we invoke when we have a failure reading 71 // from |pipe|. 72 // |main_message_loop|: A proxy for the main thread, where we will invoke the 73 // above callbacks. 74 ReaderThreadRunner( 75 int pipe, 76 base::Callback<void (scoped_ptr<MessageContents>)> data_read_callback, 77 base::Callback<void ()> failure_callback, 78 scoped_refptr<base::MessageLoopProxy> main_message_loop); 79 80 // DelegateSimpleThread implementation. Reads data from the pipe in a loop 81 // until either we are told to quit or a read fails. 82 virtual void Run() OVERRIDE; 83 84 private: 85 int pipe_; 86 base::Callback<void (scoped_ptr<MessageContents>)> data_read_callback_; 87 base::Callback<void ()> failure_callback_; 88 scoped_refptr<base::MessageLoopProxy> main_message_loop_; 89 90 DISALLOW_COPY_AND_ASSIGN(ReaderThreadRunner); 91 }; 92 93 Channel::ChannelImpl::ReaderThreadRunner::ReaderThreadRunner( 94 int pipe, 95 base::Callback<void (scoped_ptr<MessageContents>)> data_read_callback, 96 base::Callback<void ()> failure_callback, 97 scoped_refptr<base::MessageLoopProxy> main_message_loop) 98 : pipe_(pipe), 99 data_read_callback_(data_read_callback), 100 failure_callback_(failure_callback), 101 main_message_loop_(main_message_loop) { 102 } 103 104 void Channel::ChannelImpl::ReaderThreadRunner::Run() { 105 while (true) { 106 scoped_ptr<MessageContents> msg_contents(new MessageContents); 107 bool success = ReadDataOnReaderThread(pipe_, msg_contents.get()); 108 if (success) { 109 main_message_loop_->PostTask(FROM_HERE, 110 base::Bind(data_read_callback_, base::Passed(&msg_contents))); 111 } else { 112 main_message_loop_->PostTask(FROM_HERE, failure_callback_); 113 // Because the read failed, we know we're going to quit. Don't bother 114 // trying to read again. 115 return; 116 } 117 } 118 } 119 120 Channel::ChannelImpl::ChannelImpl(const IPC::ChannelHandle& channel_handle, 121 Mode mode, 122 Listener* listener) 123 : ChannelReader(listener), 124 mode_(mode), 125 waiting_connect_(true), 126 pipe_(-1), 127 pipe_name_(channel_handle.name), 128 weak_ptr_factory_(this) { 129 if (!CreatePipe(channel_handle)) { 130 // The pipe may have been closed already. 131 const char *modestr = (mode_ & MODE_SERVER_FLAG) ? "server" : "client"; 132 LOG(WARNING) << "Unable to create pipe named \"" << channel_handle.name 133 << "\" in " << modestr << " mode"; 134 } 135 } 136 137 Channel::ChannelImpl::~ChannelImpl() { 138 Close(); 139 } 140 141 bool Channel::ChannelImpl::Connect() { 142 if (pipe_ == -1) { 143 DLOG(INFO) << "Channel creation failed: " << pipe_name_; 144 return false; 145 } 146 147 // Note that Connect is called on the "Channel" thread (i.e., the same thread 148 // where Channel::Send will be called, and the same thread that should receive 149 // messages). The constructor might be invoked on another thread (see 150 // ChannelProxy for an example of that). Therefore, we must wait until Connect 151 // is called to decide which MessageLoopProxy to pass to ReaderThreadRunner. 152 reader_thread_runner_.reset( 153 new ReaderThreadRunner( 154 pipe_, 155 base::Bind(&Channel::ChannelImpl::DidRecvMsg, 156 weak_ptr_factory_.GetWeakPtr()), 157 base::Bind(&Channel::ChannelImpl::ReadDidFail, 158 weak_ptr_factory_.GetWeakPtr()), 159 base::MessageLoopProxy::current())); 160 reader_thread_.reset( 161 new base::DelegateSimpleThread(reader_thread_runner_.get(), 162 "ipc_channel_nacl reader thread")); 163 reader_thread_->Start(); 164 waiting_connect_ = false; 165 // If there were any messages queued before connection, send them. 166 ProcessOutgoingMessages(); 167 return true; 168 } 169 170 void Channel::ChannelImpl::Close() { 171 // For now, we assume that at shutdown, the reader thread will be woken with 172 // a failure (see NaClIPCAdapter::BlockingRead and CloseChannel). Or... we 173 // might simply be killed with no chance to clean up anyway :-). 174 // If untrusted code tries to close the channel prior to shutdown, it's likely 175 // to hang. 176 // TODO(dmichael): Can we do anything smarter here to make sure the reader 177 // thread wakes up and quits? 178 reader_thread_->Join(); 179 close(pipe_); 180 pipe_ = -1; 181 reader_thread_runner_.reset(); 182 reader_thread_.reset(); 183 read_queue_.clear(); 184 output_queue_.clear(); 185 } 186 187 bool Channel::ChannelImpl::Send(Message* message) { 188 DVLOG(2) << "sending message @" << message << " on channel @" << this 189 << " with type " << message->type(); 190 scoped_ptr<Message> message_ptr(message); 191 192 #ifdef IPC_MESSAGE_LOG_ENABLED 193 Logging::GetInstance()->OnSendMessage(message_ptr.get(), ""); 194 #endif // IPC_MESSAGE_LOG_ENABLED 195 196 message->TraceMessageBegin(); 197 output_queue_.push_back(linked_ptr<Message>(message_ptr.release())); 198 if (!waiting_connect_) 199 return ProcessOutgoingMessages(); 200 201 return true; 202 } 203 204 void Channel::ChannelImpl::DidRecvMsg(scoped_ptr<MessageContents> contents) { 205 // Close sets the pipe to -1. It's possible we'll get a buffer sent to us from 206 // the reader thread after Close is called. If so, we ignore it. 207 if (pipe_ == -1) 208 return; 209 210 linked_ptr<std::vector<char> > data(new std::vector<char>); 211 data->swap(contents->data); 212 read_queue_.push_back(data); 213 214 input_fds_.insert(input_fds_.end(), 215 contents->fds.begin(), contents->fds.end()); 216 contents->fds.clear(); 217 218 // In POSIX, we would be told when there are bytes to read by implementing 219 // OnFileCanReadWithoutBlocking in MessageLoopForIO::Watcher. In NaCl, we 220 // instead know at this point because the reader thread posted some data to 221 // us. 222 ProcessIncomingMessages(); 223 } 224 225 void Channel::ChannelImpl::ReadDidFail() { 226 Close(); 227 } 228 229 bool Channel::ChannelImpl::CreatePipe( 230 const IPC::ChannelHandle& channel_handle) { 231 DCHECK(pipe_ == -1); 232 233 // There's one possible case in NaCl: 234 // 1) It's a channel wrapping a pipe that is given to us. 235 // We don't support these: 236 // 2) It's for a named channel. 237 // 3) It's for a client that we implement ourself. 238 // 4) It's the initial IPC channel. 239 240 if (channel_handle.socket.fd == -1) { 241 NOTIMPLEMENTED(); 242 return false; 243 } 244 pipe_ = channel_handle.socket.fd; 245 return true; 246 } 247 248 bool Channel::ChannelImpl::ProcessOutgoingMessages() { 249 DCHECK(!waiting_connect_); // Why are we trying to send messages if there's 250 // no connection? 251 if (output_queue_.empty()) 252 return true; 253 254 if (pipe_ == -1) 255 return false; 256 257 // Write out all the messages. The trusted implementation is guaranteed to not 258 // block. See NaClIPCAdapter::Send for the implementation of imc_sendmsg. 259 while (!output_queue_.empty()) { 260 linked_ptr<Message> msg = output_queue_.front(); 261 output_queue_.pop_front(); 262 263 int fds[FileDescriptorSet::kMaxDescriptorsPerMessage]; 264 const size_t num_fds = msg->file_descriptor_set()->size(); 265 DCHECK(num_fds <= FileDescriptorSet::kMaxDescriptorsPerMessage); 266 msg->file_descriptor_set()->GetDescriptors(fds); 267 268 NaClAbiNaClImcMsgIoVec iov = { 269 const_cast<void*>(msg->data()), msg->size() 270 }; 271 NaClAbiNaClImcMsgHdr msgh = { &iov, 1, fds, num_fds }; 272 ssize_t bytes_written = imc_sendmsg(pipe_, &msgh, 0); 273 274 DCHECK(bytes_written); // The trusted side shouldn't return 0. 275 if (bytes_written < 0) { 276 // The trusted side should only ever give us an error of EPIPE. We 277 // should never be interrupted, nor should we get EAGAIN. 278 DCHECK(errno == EPIPE); 279 Close(); 280 PLOG(ERROR) << "pipe_ error on " 281 << pipe_ 282 << " Currently writing message of size: " 283 << msg->size(); 284 return false; 285 } else { 286 msg->file_descriptor_set()->CommitAll(); 287 } 288 289 // Message sent OK! 290 DVLOG(2) << "sent message @" << msg.get() << " with type " << msg->type() 291 << " on fd " << pipe_; 292 } 293 return true; 294 } 295 296 Channel::ChannelImpl::ReadState Channel::ChannelImpl::ReadData( 297 char* buffer, 298 int buffer_len, 299 int* bytes_read) { 300 *bytes_read = 0; 301 if (pipe_ == -1) 302 return READ_FAILED; 303 if (read_queue_.empty()) 304 return READ_PENDING; 305 while (!read_queue_.empty() && *bytes_read < buffer_len) { 306 linked_ptr<std::vector<char> > vec(read_queue_.front()); 307 size_t bytes_to_read = buffer_len - *bytes_read; 308 if (vec->size() <= bytes_to_read) { 309 // We can read and discard the entire vector. 310 std::copy(vec->begin(), vec->end(), buffer + *bytes_read); 311 *bytes_read += vec->size(); 312 read_queue_.pop_front(); 313 } else { 314 // Read all the bytes we can and discard them from the front of the 315 // vector. (This can be slowish, since erase has to move the back of the 316 // vector to the front, but it's hopefully a temporary hack and it keeps 317 // the code simple). 318 std::copy(vec->begin(), vec->begin() + bytes_to_read, 319 buffer + *bytes_read); 320 vec->erase(vec->begin(), vec->begin() + bytes_to_read); 321 *bytes_read += bytes_to_read; 322 } 323 } 324 return READ_SUCCEEDED; 325 } 326 327 bool Channel::ChannelImpl::WillDispatchInputMessage(Message* msg) { 328 uint16 header_fds = msg->header()->num_fds; 329 CHECK(header_fds == input_fds_.size()); 330 if (header_fds == 0) 331 return true; // Nothing to do. 332 333 // The shenaniganery below with &foo.front() requires input_fds_ to have 334 // contiguous underlying storage (such as a simple array or a std::vector). 335 // This is why the header warns not to make input_fds_ a deque<>. 336 msg->file_descriptor_set()->SetDescriptors(&input_fds_.front(), 337 header_fds); 338 input_fds_.clear(); 339 return true; 340 } 341 342 bool Channel::ChannelImpl::DidEmptyInputBuffers() { 343 // When the input data buffer is empty, the fds should be too. 344 return input_fds_.empty(); 345 } 346 347 void Channel::ChannelImpl::HandleInternalMessage(const Message& msg) { 348 // The trusted side IPC::Channel should handle the "hello" handshake; we 349 // should not receive the "Hello" message. 350 NOTREACHED(); 351 } 352 353 //------------------------------------------------------------------------------ 354 // Channel's methods simply call through to ChannelImpl. 355 356 Channel::Channel(const IPC::ChannelHandle& channel_handle, 357 Mode mode, 358 Listener* listener) 359 : channel_impl_(new ChannelImpl(channel_handle, mode, listener)) { 360 } 361 362 Channel::~Channel() { 363 delete channel_impl_; 364 } 365 366 bool Channel::Connect() { 367 return channel_impl_->Connect(); 368 } 369 370 void Channel::Close() { 371 channel_impl_->Close(); 372 } 373 374 base::ProcessId Channel::peer_pid() const { 375 // This shouldn't actually get used in the untrusted side of the proxy, and we 376 // don't have the real pid anyway. 377 return -1; 378 } 379 380 bool Channel::Send(Message* message) { 381 return channel_impl_->Send(message); 382 } 383 384 } // namespace IPC 385