Home | History | Annotate | Download | only in ipc
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "ipc/ipc_channel_nacl.h"
      6 
      7 #include <errno.h>
      8 #include <stddef.h>
      9 #include <sys/types.h>
     10 
     11 #include <algorithm>
     12 
     13 #include "base/bind.h"
     14 #include "base/logging.h"
     15 #include "base/message_loop/message_loop_proxy.h"
     16 #include "base/synchronization/lock.h"
     17 #include "base/task_runner_util.h"
     18 #include "base/threading/simple_thread.h"
     19 #include "ipc/file_descriptor_set_posix.h"
     20 #include "ipc/ipc_logging.h"
     21 #include "native_client/src/public/imc_syscalls.h"
     22 #include "native_client/src/public/imc_types.h"
     23 
     24 namespace IPC {
     25 
     26 struct MessageContents {
     27   std::vector<char> data;
     28   std::vector<int> fds;
     29 };
     30 
     31 namespace {
     32 
     33 bool ReadDataOnReaderThread(int pipe, MessageContents* contents) {
     34   DCHECK(pipe >= 0);
     35   if (pipe < 0)
     36     return false;
     37 
     38   contents->data.resize(Channel::kReadBufferSize);
     39   contents->fds.resize(FileDescriptorSet::kMaxDescriptorsPerMessage);
     40 
     41   NaClAbiNaClImcMsgIoVec iov = { &contents->data[0], contents->data.size() };
     42   NaClAbiNaClImcMsgHdr msg = {
     43     &iov, 1, &contents->fds[0], contents->fds.size()
     44   };
     45 
     46   int bytes_read = imc_recvmsg(pipe, &msg, 0);
     47 
     48   if (bytes_read <= 0) {
     49     // NaClIPCAdapter::BlockingReceive returns -1 when the pipe closes (either
     50     // due to error or for regular shutdown).
     51     contents->data.clear();
     52     contents->fds.clear();
     53     return false;
     54   }
     55   DCHECK(bytes_read);
     56   // Resize the buffers down to the number of bytes and fds we actually read.
     57   contents->data.resize(bytes_read);
     58   contents->fds.resize(msg.desc_length);
     59   return true;
     60 }
     61 
     62 }  // namespace
     63 
     64 class Channel::ChannelImpl::ReaderThreadRunner
     65     : public base::DelegateSimpleThread::Delegate {
     66  public:
     67   // |pipe|: A file descriptor from which we will read using imc_recvmsg.
     68   // |data_read_callback|: A callback we invoke (on the main thread) when we
     69   //                       have read data.
     70   // |failure_callback|: A callback we invoke when we have a failure reading
     71   //                     from |pipe|.
     72   // |main_message_loop|: A proxy for the main thread, where we will invoke the
     73   //                      above callbacks.
     74   ReaderThreadRunner(
     75       int pipe,
     76       base::Callback<void (scoped_ptr<MessageContents>)> data_read_callback,
     77       base::Callback<void ()> failure_callback,
     78       scoped_refptr<base::MessageLoopProxy> main_message_loop);
     79 
     80   // DelegateSimpleThread implementation. Reads data from the pipe in a loop
     81   // until either we are told to quit or a read fails.
     82   virtual void Run() OVERRIDE;
     83 
     84  private:
     85   int pipe_;
     86   base::Callback<void (scoped_ptr<MessageContents>)> data_read_callback_;
     87   base::Callback<void ()> failure_callback_;
     88   scoped_refptr<base::MessageLoopProxy> main_message_loop_;
     89 
     90   DISALLOW_COPY_AND_ASSIGN(ReaderThreadRunner);
     91 };
     92 
     93 Channel::ChannelImpl::ReaderThreadRunner::ReaderThreadRunner(
     94     int pipe,
     95     base::Callback<void (scoped_ptr<MessageContents>)> data_read_callback,
     96     base::Callback<void ()> failure_callback,
     97     scoped_refptr<base::MessageLoopProxy> main_message_loop)
     98     : pipe_(pipe),
     99       data_read_callback_(data_read_callback),
    100       failure_callback_(failure_callback),
    101       main_message_loop_(main_message_loop) {
    102 }
    103 
    104 void Channel::ChannelImpl::ReaderThreadRunner::Run() {
    105   while (true) {
    106     scoped_ptr<MessageContents> msg_contents(new MessageContents);
    107     bool success = ReadDataOnReaderThread(pipe_, msg_contents.get());
    108     if (success) {
    109       main_message_loop_->PostTask(FROM_HERE,
    110           base::Bind(data_read_callback_, base::Passed(&msg_contents)));
    111     } else {
    112       main_message_loop_->PostTask(FROM_HERE, failure_callback_);
    113       // Because the read failed, we know we're going to quit. Don't bother
    114       // trying to read again.
    115       return;
    116     }
    117   }
    118 }
    119 
    120 Channel::ChannelImpl::ChannelImpl(const IPC::ChannelHandle& channel_handle,
    121                                   Mode mode,
    122                                   Listener* listener)
    123     : ChannelReader(listener),
    124       mode_(mode),
    125       waiting_connect_(true),
    126       pipe_(-1),
    127       pipe_name_(channel_handle.name),
    128       weak_ptr_factory_(this) {
    129   if (!CreatePipe(channel_handle)) {
    130     // The pipe may have been closed already.
    131     const char *modestr = (mode_ & MODE_SERVER_FLAG) ? "server" : "client";
    132     LOG(WARNING) << "Unable to create pipe named \"" << channel_handle.name
    133                  << "\" in " << modestr << " mode";
    134   }
    135 }
    136 
    137 Channel::ChannelImpl::~ChannelImpl() {
    138   Close();
    139 }
    140 
    141 bool Channel::ChannelImpl::Connect() {
    142   if (pipe_ == -1) {
    143     DLOG(INFO) << "Channel creation failed: " << pipe_name_;
    144     return false;
    145   }
    146 
    147   // Note that Connect is called on the "Channel" thread (i.e., the same thread
    148   // where Channel::Send will be called, and the same thread that should receive
    149   // messages). The constructor might be invoked on another thread (see
    150   // ChannelProxy for an example of that). Therefore, we must wait until Connect
    151   // is called to decide which MessageLoopProxy to pass to ReaderThreadRunner.
    152   reader_thread_runner_.reset(
    153       new ReaderThreadRunner(
    154           pipe_,
    155           base::Bind(&Channel::ChannelImpl::DidRecvMsg,
    156                      weak_ptr_factory_.GetWeakPtr()),
    157           base::Bind(&Channel::ChannelImpl::ReadDidFail,
    158                      weak_ptr_factory_.GetWeakPtr()),
    159           base::MessageLoopProxy::current()));
    160   reader_thread_.reset(
    161       new base::DelegateSimpleThread(reader_thread_runner_.get(),
    162                                      "ipc_channel_nacl reader thread"));
    163   reader_thread_->Start();
    164   waiting_connect_ = false;
    165   // If there were any messages queued before connection, send them.
    166   ProcessOutgoingMessages();
    167   return true;
    168 }
    169 
    170 void Channel::ChannelImpl::Close() {
    171   // For now, we assume that at shutdown, the reader thread will be woken with
    172   // a failure (see NaClIPCAdapter::BlockingRead and CloseChannel). Or... we
    173   // might simply be killed with no chance to clean up anyway :-).
    174   // If untrusted code tries to close the channel prior to shutdown, it's likely
    175   // to hang.
    176   // TODO(dmichael): Can we do anything smarter here to make sure the reader
    177   //                 thread wakes up and quits?
    178   reader_thread_->Join();
    179   close(pipe_);
    180   pipe_ = -1;
    181   reader_thread_runner_.reset();
    182   reader_thread_.reset();
    183   read_queue_.clear();
    184   output_queue_.clear();
    185 }
    186 
    187 bool Channel::ChannelImpl::Send(Message* message) {
    188   DVLOG(2) << "sending message @" << message << " on channel @" << this
    189            << " with type " << message->type();
    190   scoped_ptr<Message> message_ptr(message);
    191 
    192 #ifdef IPC_MESSAGE_LOG_ENABLED
    193   Logging::GetInstance()->OnSendMessage(message_ptr.get(), "");
    194 #endif  // IPC_MESSAGE_LOG_ENABLED
    195 
    196   message->TraceMessageBegin();
    197   output_queue_.push_back(linked_ptr<Message>(message_ptr.release()));
    198   if (!waiting_connect_)
    199     return ProcessOutgoingMessages();
    200 
    201   return true;
    202 }
    203 
    204 void Channel::ChannelImpl::DidRecvMsg(scoped_ptr<MessageContents> contents) {
    205   // Close sets the pipe to -1. It's possible we'll get a buffer sent to us from
    206   // the reader thread after Close is called. If so, we ignore it.
    207   if (pipe_ == -1)
    208     return;
    209 
    210   linked_ptr<std::vector<char> > data(new std::vector<char>);
    211   data->swap(contents->data);
    212   read_queue_.push_back(data);
    213 
    214   input_fds_.insert(input_fds_.end(),
    215                     contents->fds.begin(), contents->fds.end());
    216   contents->fds.clear();
    217 
    218   // In POSIX, we would be told when there are bytes to read by implementing
    219   // OnFileCanReadWithoutBlocking in MessageLoopForIO::Watcher. In NaCl, we
    220   // instead know at this point because the reader thread posted some data to
    221   // us.
    222   ProcessIncomingMessages();
    223 }
    224 
    225 void Channel::ChannelImpl::ReadDidFail() {
    226   Close();
    227 }
    228 
    229 bool Channel::ChannelImpl::CreatePipe(
    230     const IPC::ChannelHandle& channel_handle) {
    231   DCHECK(pipe_ == -1);
    232 
    233   // There's one possible case in NaCl:
    234   // 1) It's a channel wrapping a pipe that is given to us.
    235   // We don't support these:
    236   // 2) It's for a named channel.
    237   // 3) It's for a client that we implement ourself.
    238   // 4) It's the initial IPC channel.
    239 
    240   if (channel_handle.socket.fd == -1) {
    241     NOTIMPLEMENTED();
    242     return false;
    243   }
    244   pipe_ = channel_handle.socket.fd;
    245   return true;
    246 }
    247 
    248 bool Channel::ChannelImpl::ProcessOutgoingMessages() {
    249   DCHECK(!waiting_connect_);  // Why are we trying to send messages if there's
    250                               // no connection?
    251   if (output_queue_.empty())
    252     return true;
    253 
    254   if (pipe_ == -1)
    255     return false;
    256 
    257   // Write out all the messages. The trusted implementation is guaranteed to not
    258   // block. See NaClIPCAdapter::Send for the implementation of imc_sendmsg.
    259   while (!output_queue_.empty()) {
    260     linked_ptr<Message> msg = output_queue_.front();
    261     output_queue_.pop_front();
    262 
    263     int fds[FileDescriptorSet::kMaxDescriptorsPerMessage];
    264     const size_t num_fds = msg->file_descriptor_set()->size();
    265     DCHECK(num_fds <= FileDescriptorSet::kMaxDescriptorsPerMessage);
    266     msg->file_descriptor_set()->GetDescriptors(fds);
    267 
    268     NaClAbiNaClImcMsgIoVec iov = {
    269       const_cast<void*>(msg->data()), msg->size()
    270     };
    271     NaClAbiNaClImcMsgHdr msgh = { &iov, 1, fds, num_fds };
    272     ssize_t bytes_written = imc_sendmsg(pipe_, &msgh, 0);
    273 
    274     DCHECK(bytes_written);  // The trusted side shouldn't return 0.
    275     if (bytes_written < 0) {
    276       // The trusted side should only ever give us an error of EPIPE. We
    277       // should never be interrupted, nor should we get EAGAIN.
    278       DCHECK(errno == EPIPE);
    279       Close();
    280       PLOG(ERROR) << "pipe_ error on "
    281                   << pipe_
    282                   << " Currently writing message of size: "
    283                   << msg->size();
    284       return false;
    285     } else {
    286       msg->file_descriptor_set()->CommitAll();
    287     }
    288 
    289     // Message sent OK!
    290     DVLOG(2) << "sent message @" << msg.get() << " with type " << msg->type()
    291              << " on fd " << pipe_;
    292   }
    293   return true;
    294 }
    295 
    296 Channel::ChannelImpl::ReadState Channel::ChannelImpl::ReadData(
    297     char* buffer,
    298     int buffer_len,
    299     int* bytes_read) {
    300   *bytes_read = 0;
    301   if (pipe_ == -1)
    302     return READ_FAILED;
    303   if (read_queue_.empty())
    304     return READ_PENDING;
    305   while (!read_queue_.empty() && *bytes_read < buffer_len) {
    306     linked_ptr<std::vector<char> > vec(read_queue_.front());
    307     size_t bytes_to_read = buffer_len - *bytes_read;
    308     if (vec->size() <= bytes_to_read) {
    309       // We can read and discard the entire vector.
    310       std::copy(vec->begin(), vec->end(), buffer + *bytes_read);
    311       *bytes_read += vec->size();
    312       read_queue_.pop_front();
    313     } else {
    314       // Read all the bytes we can and discard them from the front of the
    315       // vector. (This can be slowish, since erase has to move the back of the
    316       // vector to the front, but it's hopefully a temporary hack and it keeps
    317       // the code simple).
    318       std::copy(vec->begin(), vec->begin() + bytes_to_read,
    319                 buffer + *bytes_read);
    320       vec->erase(vec->begin(), vec->begin() + bytes_to_read);
    321       *bytes_read += bytes_to_read;
    322     }
    323   }
    324   return READ_SUCCEEDED;
    325 }
    326 
    327 bool Channel::ChannelImpl::WillDispatchInputMessage(Message* msg) {
    328   uint16 header_fds = msg->header()->num_fds;
    329   CHECK(header_fds == input_fds_.size());
    330   if (header_fds == 0)
    331     return true;  // Nothing to do.
    332 
    333   // The shenaniganery below with &foo.front() requires input_fds_ to have
    334   // contiguous underlying storage (such as a simple array or a std::vector).
    335   // This is why the header warns not to make input_fds_ a deque<>.
    336   msg->file_descriptor_set()->SetDescriptors(&input_fds_.front(),
    337                                              header_fds);
    338   input_fds_.clear();
    339   return true;
    340 }
    341 
    342 bool Channel::ChannelImpl::DidEmptyInputBuffers() {
    343   // When the input data buffer is empty, the fds should be too.
    344   return input_fds_.empty();
    345 }
    346 
    347 void Channel::ChannelImpl::HandleInternalMessage(const Message& msg) {
    348   // The trusted side IPC::Channel should handle the "hello" handshake; we
    349   // should not receive the "Hello" message.
    350   NOTREACHED();
    351 }
    352 
    353 //------------------------------------------------------------------------------
    354 // Channel's methods simply call through to ChannelImpl.
    355 
    356 Channel::Channel(const IPC::ChannelHandle& channel_handle,
    357                  Mode mode,
    358                  Listener* listener)
    359     : channel_impl_(new ChannelImpl(channel_handle, mode, listener)) {
    360 }
    361 
    362 Channel::~Channel() {
    363   delete channel_impl_;
    364 }
    365 
    366 bool Channel::Connect() {
    367   return channel_impl_->Connect();
    368 }
    369 
    370 void Channel::Close() {
    371   channel_impl_->Close();
    372 }
    373 
    374 base::ProcessId Channel::peer_pid() const {
    375   // This shouldn't actually get used in the untrusted side of the proxy, and we
    376   // don't have the real pid anyway.
    377   return -1;
    378 }
    379 
    380 bool Channel::Send(Message* message) {
    381   return channel_impl_->Send(message);
    382 }
    383 
    384 }  // namespace IPC
    385