Home | History | Annotate | Download | only in system
      1 // Copyright 2016 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/edk/system/broker.h"
      6 
      7 #include <fcntl.h>
      8 #include <unistd.h>
      9 
     10 #include <utility>
     11 
     12 #include "base/logging.h"
     13 #include "mojo/edk/embedder/embedder_internal.h"
     14 #include "mojo/edk/embedder/platform_channel_utils_posix.h"
     15 #include "mojo/edk/embedder/platform_handle_utils.h"
     16 #include "mojo/edk/embedder/platform_handle_vector.h"
     17 #include "mojo/edk/embedder/platform_shared_buffer.h"
     18 #include "mojo/edk/system/broker_messages.h"
     19 #include "mojo/edk/system/channel.h"
     20 
     21 namespace mojo {
     22 namespace edk {
     23 
     24 namespace {
     25 
     26 bool WaitForBrokerMessage(PlatformHandle platform_handle,
     27                           BrokerMessageType expected_type,
     28                           size_t expected_num_handles,
     29                           std::deque<PlatformHandle>* incoming_handles) {
     30   Channel::MessagePtr message(
     31       new Channel::Message(sizeof(BrokerMessageHeader), expected_num_handles));
     32   std::deque<PlatformHandle> incoming_platform_handles;
     33   ssize_t read_result = PlatformChannelRecvmsg(
     34       platform_handle, const_cast<void*>(message->data()),
     35       message->data_num_bytes(), &incoming_platform_handles, true /* block */);
     36   bool error = false;
     37   if (read_result < 0) {
     38     PLOG(ERROR) << "Recvmsg error";
     39     error = true;
     40   } else if (static_cast<size_t>(read_result) != message->data_num_bytes()) {
     41     LOG(ERROR) << "Invalid node channel message";
     42     error = true;
     43   } else if (incoming_platform_handles.size() != expected_num_handles) {
     44     LOG(ERROR) << "Received unexpected number of handles";
     45     error = true;
     46   }
     47 
     48   if (!error) {
     49     const BrokerMessageHeader* header =
     50         reinterpret_cast<const BrokerMessageHeader*>(message->payload());
     51     if (header->type != expected_type) {
     52       LOG(ERROR) << "Unexpected message";
     53       error = true;
     54     }
     55   }
     56 
     57   if (error) {
     58     CloseAllPlatformHandles(&incoming_platform_handles);
     59   } else {
     60     if (incoming_handles)
     61       incoming_handles->swap(incoming_platform_handles);
     62   }
     63   return !error;
     64 }
     65 
     66 }  // namespace
     67 
     68 Broker::Broker(ScopedPlatformHandle platform_handle)
     69     : sync_channel_(std::move(platform_handle)) {
     70   CHECK(sync_channel_.is_valid());
     71 
     72   // Mark the channel as blocking.
     73   int flags = fcntl(sync_channel_.get().handle, F_GETFL);
     74   PCHECK(flags != -1);
     75   flags = fcntl(sync_channel_.get().handle, F_SETFL, flags & ~O_NONBLOCK);
     76   PCHECK(flags != -1);
     77 
     78   // Wait for the first message, which should contain a handle.
     79   std::deque<PlatformHandle> incoming_platform_handles;
     80   if (WaitForBrokerMessage(sync_channel_.get(), BrokerMessageType::INIT, 1,
     81                            &incoming_platform_handles)) {
     82     parent_channel_ = ScopedPlatformHandle(incoming_platform_handles.front());
     83   }
     84 }
     85 
     86 Broker::~Broker() = default;
     87 
     88 ScopedPlatformHandle Broker::GetParentPlatformHandle() {
     89   return std::move(parent_channel_);
     90 }
     91 
     92 scoped_refptr<PlatformSharedBuffer> Broker::GetSharedBuffer(size_t num_bytes) {
     93   base::AutoLock lock(lock_);
     94 
     95   BufferRequestData* buffer_request;
     96   Channel::MessagePtr out_message = CreateBrokerMessage(
     97       BrokerMessageType::BUFFER_REQUEST, 0, 0, &buffer_request);
     98   buffer_request->size = num_bytes;
     99   ssize_t write_result = PlatformChannelWrite(
    100       sync_channel_.get(), out_message->data(), out_message->data_num_bytes());
    101   if (write_result < 0) {
    102     PLOG(ERROR) << "Error sending sync broker message";
    103     return nullptr;
    104   } else if (static_cast<size_t>(write_result) !=
    105              out_message->data_num_bytes()) {
    106     LOG(ERROR) << "Error sending complete broker message";
    107     return nullptr;
    108   }
    109 
    110   std::deque<PlatformHandle> incoming_platform_handles;
    111   if (WaitForBrokerMessage(sync_channel_.get(),
    112                            BrokerMessageType::BUFFER_RESPONSE, 2,
    113                            &incoming_platform_handles)) {
    114     ScopedPlatformHandle rw_handle(incoming_platform_handles.front());
    115     incoming_platform_handles.pop_front();
    116     ScopedPlatformHandle ro_handle(incoming_platform_handles.front());
    117     return PlatformSharedBuffer::CreateFromPlatformHandlePair(
    118         num_bytes, std::move(rw_handle), std::move(ro_handle));
    119   }
    120 
    121   return nullptr;
    122 }
    123 
    124 }  // namespace edk
    125 }  // namespace mojo
    126