Home | History | Annotate | Download | only in system
      1 // Copyright 2016 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/edk/system/node_channel.h"
      6 
      7 #include <cstring>
      8 #include <limits>
      9 #include <sstream>
     10 
     11 #include "base/bind.h"
     12 #include "base/location.h"
     13 #include "base/logging.h"
     14 #include "mojo/edk/system/channel.h"
     15 #include "mojo/edk/system/request_context.h"
     16 
     17 #if defined(OS_MACOSX) && !defined(OS_IOS)
     18 #include "mojo/edk/system/mach_port_relay.h"
     19 #endif
     20 
     21 namespace mojo {
     22 namespace edk {
     23 
     24 namespace {
     25 
     26 template <typename T>
     27 T Align(T t) {
     28   const auto k = kChannelMessageAlignment;
     29   return t + (k - (t % k)) % k;
     30 }
     31 
     32 // NOTE: Please ONLY append messages to the end of this enum.
     33 enum class MessageType : uint32_t {
     34   ACCEPT_CHILD,
     35   ACCEPT_PARENT,
     36   ADD_BROKER_CLIENT,
     37   BROKER_CLIENT_ADDED,
     38   ACCEPT_BROKER_CLIENT,
     39   PORTS_MESSAGE,
     40   REQUEST_PORT_MERGE,
     41   REQUEST_INTRODUCTION,
     42   INTRODUCE,
     43 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
     44   RELAY_PORTS_MESSAGE,
     45 #endif
     46   BROADCAST,
     47 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
     48   PORTS_MESSAGE_FROM_RELAY,
     49 #endif
     50   ACCEPT_PEER,
     51 };
     52 
     53 struct Header {
     54   MessageType type;
     55   uint32_t padding;
     56 };
     57 
     58 static_assert(IsAlignedForChannelMessage(sizeof(Header)),
     59               "Invalid header size.");
     60 
     61 struct AcceptChildData {
     62   ports::NodeName parent_name;
     63   ports::NodeName token;
     64 };
     65 
     66 struct AcceptParentData {
     67   ports::NodeName token;
     68   ports::NodeName child_name;
     69 };
     70 
     71 struct AcceptPeerData {
     72   ports::NodeName token;
     73   ports::NodeName peer_name;
     74   ports::PortName port_name;
     75 };
     76 
     77 // This message may include a process handle on plaforms that require it.
     78 struct AddBrokerClientData {
     79   ports::NodeName client_name;
     80 #if !defined(OS_WIN)
     81   uint32_t process_handle;
     82   uint32_t padding;
     83 #endif
     84 };
     85 
     86 #if !defined(OS_WIN)
     87 static_assert(sizeof(base::ProcessHandle) == sizeof(uint32_t),
     88               "Unexpected pid size");
     89 static_assert(sizeof(AddBrokerClientData) % kChannelMessageAlignment == 0,
     90               "Invalid AddBrokerClientData size.");
     91 #endif
     92 
     93 // This data is followed by a platform channel handle to the broker.
     94 struct BrokerClientAddedData {
     95   ports::NodeName client_name;
     96 };
     97 
     98 // This data may be followed by a platform channel handle to the broker. If not,
     99 // then the parent is the broker and its channel should be used as such.
    100 struct AcceptBrokerClientData {
    101   ports::NodeName broker_name;
    102 };
    103 
    104 // This is followed by arbitrary payload data which is interpreted as a token
    105 // string for port location.
    106 struct RequestPortMergeData {
    107   ports::PortName connector_port_name;
    108 };
    109 
    110 // Used for both REQUEST_INTRODUCTION and INTRODUCE.
    111 //
    112 // For INTRODUCE the message also includes a valid platform handle for a channel
    113 // the receiver may use to communicate with the named node directly, or an
    114 // invalid platform handle if the node is unknown to the sender or otherwise
    115 // cannot be introduced.
    116 struct IntroductionData {
    117   ports::NodeName name;
    118 };
    119 
    120 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
    121 // This struct is followed by the full payload of a message to be relayed.
    122 struct RelayPortsMessageData {
    123   ports::NodeName destination;
    124 };
    125 
    126 // This struct is followed by the full payload of a relayed message.
    127 struct PortsMessageFromRelayData {
    128   ports::NodeName source;
    129 };
    130 #endif
    131 
    132 template <typename DataType>
    133 Channel::MessagePtr CreateMessage(MessageType type,
    134                                   size_t payload_size,
    135                                   size_t num_handles,
    136                                   DataType** out_data) {
    137   Channel::MessagePtr message(
    138       new Channel::Message(sizeof(Header) + payload_size, num_handles));
    139   Header* header = reinterpret_cast<Header*>(message->mutable_payload());
    140   header->type = type;
    141   header->padding = 0;
    142   *out_data = reinterpret_cast<DataType*>(&header[1]);
    143   return message;
    144 }
    145 
    146 template <typename DataType>
    147 bool GetMessagePayload(const void* bytes,
    148                        size_t num_bytes,
    149                        DataType** out_data) {
    150   static_assert(sizeof(DataType) > 0, "DataType must have non-zero size.");
    151   if (num_bytes < sizeof(Header) + sizeof(DataType))
    152     return false;
    153   *out_data = reinterpret_cast<const DataType*>(
    154       static_cast<const char*>(bytes) + sizeof(Header));
    155   return true;
    156 }
    157 
    158 }  // namespace
    159 
    160 // static
    161 scoped_refptr<NodeChannel> NodeChannel::Create(
    162     Delegate* delegate,
    163     ConnectionParams connection_params,
    164     scoped_refptr<base::TaskRunner> io_task_runner,
    165     const ProcessErrorCallback& process_error_callback) {
    166 #if defined(OS_NACL_SFI)
    167   LOG(FATAL) << "Multi-process not yet supported on NaCl-SFI";
    168   return nullptr;
    169 #else
    170   return new NodeChannel(delegate, std::move(connection_params), io_task_runner,
    171                          process_error_callback);
    172 #endif
    173 }
    174 
    175 // static
    176 Channel::MessagePtr NodeChannel::CreatePortsMessage(size_t payload_size,
    177                                                     void** payload,
    178                                                     size_t num_handles) {
    179   return CreateMessage(MessageType::PORTS_MESSAGE, payload_size, num_handles,
    180                        payload);
    181 }
    182 
    183 // static
    184 void NodeChannel::GetPortsMessageData(Channel::Message* message,
    185                                       void** data,
    186                                       size_t* num_data_bytes) {
    187   *data = reinterpret_cast<Header*>(message->mutable_payload()) + 1;
    188   *num_data_bytes = message->payload_size() - sizeof(Header);
    189 }
    190 
    191 void NodeChannel::Start() {
    192 #if defined(OS_MACOSX) && !defined(OS_IOS)
    193   MachPortRelay* relay = delegate_->GetMachPortRelay();
    194   if (relay)
    195     relay->AddObserver(this);
    196 #endif
    197 
    198   base::AutoLock lock(channel_lock_);
    199   // ShutDown() may have already been called, in which case |channel_| is null.
    200   if (channel_)
    201     channel_->Start();
    202 }
    203 
    204 void NodeChannel::ShutDown() {
    205 #if defined(OS_MACOSX) && !defined(OS_IOS)
    206   MachPortRelay* relay = delegate_->GetMachPortRelay();
    207   if (relay)
    208     relay->RemoveObserver(this);
    209 #endif
    210 
    211   base::AutoLock lock(channel_lock_);
    212   if (channel_) {
    213     channel_->ShutDown();
    214     channel_ = nullptr;
    215   }
    216 }
    217 
    218 void NodeChannel::LeakHandleOnShutdown() {
    219   base::AutoLock lock(channel_lock_);
    220   if (channel_) {
    221     channel_->LeakHandle();
    222   }
    223 }
    224 
    225 void NodeChannel::NotifyBadMessage(const std::string& error) {
    226   if (!process_error_callback_.is_null())
    227     process_error_callback_.Run("Received bad user message: " + error);
    228 }
    229 
    230 void NodeChannel::SetRemoteProcessHandle(base::ProcessHandle process_handle) {
    231   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
    232   base::AutoLock lock(remote_process_handle_lock_);
    233   DCHECK_EQ(base::kNullProcessHandle, remote_process_handle_);
    234   CHECK_NE(remote_process_handle_, base::GetCurrentProcessHandle());
    235   remote_process_handle_ = process_handle;
    236 #if defined(OS_WIN)
    237   DCHECK(!scoped_remote_process_handle_.is_valid());
    238   scoped_remote_process_handle_.reset(PlatformHandle(process_handle));
    239 #endif
    240 }
    241 
    242 bool NodeChannel::HasRemoteProcessHandle() {
    243   base::AutoLock lock(remote_process_handle_lock_);
    244   return remote_process_handle_ != base::kNullProcessHandle;
    245 }
    246 
    247 base::ProcessHandle NodeChannel::CopyRemoteProcessHandle() {
    248   base::AutoLock lock(remote_process_handle_lock_);
    249 #if defined(OS_WIN)
    250   if (remote_process_handle_ != base::kNullProcessHandle) {
    251     // Privileged nodes use this to pass their childrens' process handles to the
    252     // broker on launch.
    253     HANDLE handle = remote_process_handle_;
    254     BOOL result = DuplicateHandle(
    255         base::GetCurrentProcessHandle(), remote_process_handle_,
    256         base::GetCurrentProcessHandle(), &handle, 0, FALSE,
    257         DUPLICATE_SAME_ACCESS);
    258     DPCHECK(result);
    259     return handle;
    260   }
    261   return base::kNullProcessHandle;
    262 #else
    263   return remote_process_handle_;
    264 #endif
    265 }
    266 
    267 void NodeChannel::SetRemoteNodeName(const ports::NodeName& name) {
    268   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
    269   remote_node_name_ = name;
    270 }
    271 
    272 void NodeChannel::AcceptChild(const ports::NodeName& parent_name,
    273                               const ports::NodeName& token) {
    274   AcceptChildData* data;
    275   Channel::MessagePtr message = CreateMessage(
    276       MessageType::ACCEPT_CHILD, sizeof(AcceptChildData), 0, &data);
    277   data->parent_name = parent_name;
    278   data->token = token;
    279   WriteChannelMessage(std::move(message));
    280 }
    281 
    282 void NodeChannel::AcceptParent(const ports::NodeName& token,
    283                                const ports::NodeName& child_name) {
    284   AcceptParentData* data;
    285   Channel::MessagePtr message = CreateMessage(
    286       MessageType::ACCEPT_PARENT, sizeof(AcceptParentData), 0, &data);
    287   data->token = token;
    288   data->child_name = child_name;
    289   WriteChannelMessage(std::move(message));
    290 }
    291 
    292 void NodeChannel::AcceptPeer(const ports::NodeName& sender_name,
    293                              const ports::NodeName& token,
    294                              const ports::PortName& port_name) {
    295   AcceptPeerData* data;
    296   Channel::MessagePtr message =
    297       CreateMessage(MessageType::ACCEPT_PEER, sizeof(AcceptPeerData), 0, &data);
    298   data->token = token;
    299   data->peer_name = sender_name;
    300   data->port_name = port_name;
    301   WriteChannelMessage(std::move(message));
    302 }
    303 
    304 void NodeChannel::AddBrokerClient(const ports::NodeName& client_name,
    305                                   base::ProcessHandle process_handle) {
    306   AddBrokerClientData* data;
    307   ScopedPlatformHandleVectorPtr handles(new PlatformHandleVector());
    308 #if defined(OS_WIN)
    309   handles->push_back(PlatformHandle(process_handle));
    310 #endif
    311   Channel::MessagePtr message = CreateMessage(
    312       MessageType::ADD_BROKER_CLIENT, sizeof(AddBrokerClientData),
    313       handles->size(), &data);
    314   message->SetHandles(std::move(handles));
    315   data->client_name = client_name;
    316 #if !defined(OS_WIN)
    317   data->process_handle = process_handle;
    318   data->padding = 0;
    319 #endif
    320   WriteChannelMessage(std::move(message));
    321 }
    322 
    323 void NodeChannel::BrokerClientAdded(const ports::NodeName& client_name,
    324                                     ScopedPlatformHandle broker_channel) {
    325   BrokerClientAddedData* data;
    326   ScopedPlatformHandleVectorPtr handles(new PlatformHandleVector());
    327   if (broker_channel.is_valid())
    328     handles->push_back(broker_channel.release());
    329   Channel::MessagePtr message = CreateMessage(
    330       MessageType::BROKER_CLIENT_ADDED, sizeof(BrokerClientAddedData),
    331       handles->size(), &data);
    332   message->SetHandles(std::move(handles));
    333   data->client_name = client_name;
    334   WriteChannelMessage(std::move(message));
    335 }
    336 
    337 void NodeChannel::AcceptBrokerClient(const ports::NodeName& broker_name,
    338                                      ScopedPlatformHandle broker_channel) {
    339   AcceptBrokerClientData* data;
    340   ScopedPlatformHandleVectorPtr handles(new PlatformHandleVector());
    341   if (broker_channel.is_valid())
    342     handles->push_back(broker_channel.release());
    343   Channel::MessagePtr message = CreateMessage(
    344       MessageType::ACCEPT_BROKER_CLIENT, sizeof(AcceptBrokerClientData),
    345       handles->size(), &data);
    346   message->SetHandles(std::move(handles));
    347   data->broker_name = broker_name;
    348   WriteChannelMessage(std::move(message));
    349 }
    350 
    351 void NodeChannel::PortsMessage(Channel::MessagePtr message) {
    352   WriteChannelMessage(std::move(message));
    353 }
    354 
    355 void NodeChannel::RequestPortMerge(const ports::PortName& connector_port_name,
    356                                    const std::string& token) {
    357   RequestPortMergeData* data;
    358   Channel::MessagePtr message = CreateMessage(
    359       MessageType::REQUEST_PORT_MERGE,
    360       sizeof(RequestPortMergeData) + token.size(), 0, &data);
    361   data->connector_port_name = connector_port_name;
    362   memcpy(data + 1, token.data(), token.size());
    363   WriteChannelMessage(std::move(message));
    364 }
    365 
    366 void NodeChannel::RequestIntroduction(const ports::NodeName& name) {
    367   IntroductionData* data;
    368   Channel::MessagePtr message = CreateMessage(
    369       MessageType::REQUEST_INTRODUCTION, sizeof(IntroductionData), 0, &data);
    370   data->name = name;
    371   WriteChannelMessage(std::move(message));
    372 }
    373 
    374 void NodeChannel::Introduce(const ports::NodeName& name,
    375                             ScopedPlatformHandle channel_handle) {
    376   IntroductionData* data;
    377   ScopedPlatformHandleVectorPtr handles(new PlatformHandleVector());
    378   if (channel_handle.is_valid())
    379     handles->push_back(channel_handle.release());
    380   Channel::MessagePtr message = CreateMessage(
    381       MessageType::INTRODUCE, sizeof(IntroductionData), handles->size(), &data);
    382   message->SetHandles(std::move(handles));
    383   data->name = name;
    384   WriteChannelMessage(std::move(message));
    385 }
    386 
    387 void NodeChannel::Broadcast(Channel::MessagePtr message) {
    388   DCHECK(!message->has_handles());
    389   void* data;
    390   Channel::MessagePtr broadcast_message = CreateMessage(
    391       MessageType::BROADCAST, message->data_num_bytes(), 0, &data);
    392   memcpy(data, message->data(), message->data_num_bytes());
    393   WriteChannelMessage(std::move(broadcast_message));
    394 }
    395 
    396 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
    397 void NodeChannel::RelayPortsMessage(const ports::NodeName& destination,
    398                                     Channel::MessagePtr message) {
    399 #if defined(OS_WIN)
    400   DCHECK(message->has_handles());
    401 
    402   // Note that this is only used on Windows, and on Windows all platform
    403   // handles are included in the message data. We blindly copy all the data
    404   // here and the relay node (the parent) will duplicate handles as needed.
    405   size_t num_bytes = sizeof(RelayPortsMessageData) + message->data_num_bytes();
    406   RelayPortsMessageData* data;
    407   Channel::MessagePtr relay_message = CreateMessage(
    408       MessageType::RELAY_PORTS_MESSAGE, num_bytes, 0, &data);
    409   data->destination = destination;
    410   memcpy(data + 1, message->data(), message->data_num_bytes());
    411 
    412   // When the handles are duplicated in the parent, the source handles will
    413   // be closed. If the parent never receives this message then these handles
    414   // will leak, but that means something else has probably broken and the
    415   // sending process won't likely be around much longer.
    416   ScopedPlatformHandleVectorPtr handles = message->TakeHandles();
    417   handles->clear();
    418 
    419 #else
    420   DCHECK(message->has_mach_ports());
    421 
    422   // On OSX, the handles are extracted from the relayed message and attached to
    423   // the wrapper. The broker then takes the handles attached to the wrapper and
    424   // moves them back to the relayed message. This is necessary because the
    425   // message may contain fds which need to be attached to the outer message so
    426   // that they can be transferred to the broker.
    427   ScopedPlatformHandleVectorPtr handles = message->TakeHandles();
    428   size_t num_bytes = sizeof(RelayPortsMessageData) + message->data_num_bytes();
    429   RelayPortsMessageData* data;
    430   Channel::MessagePtr relay_message = CreateMessage(
    431       MessageType::RELAY_PORTS_MESSAGE, num_bytes, handles->size(), &data);
    432   data->destination = destination;
    433   memcpy(data + 1, message->data(), message->data_num_bytes());
    434   relay_message->SetHandles(std::move(handles));
    435 #endif  // defined(OS_WIN)
    436 
    437   WriteChannelMessage(std::move(relay_message));
    438 }
    439 
    440 void NodeChannel::PortsMessageFromRelay(const ports::NodeName& source,
    441                                         Channel::MessagePtr message) {
    442   size_t num_bytes = sizeof(PortsMessageFromRelayData) +
    443       message->payload_size();
    444   PortsMessageFromRelayData* data;
    445   Channel::MessagePtr relayed_message = CreateMessage(
    446       MessageType::PORTS_MESSAGE_FROM_RELAY, num_bytes, message->num_handles(),
    447       &data);
    448   data->source = source;
    449   if (message->payload_size())
    450     memcpy(data + 1, message->payload(), message->payload_size());
    451   relayed_message->SetHandles(message->TakeHandles());
    452   WriteChannelMessage(std::move(relayed_message));
    453 }
    454 #endif  // defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
    455 
    456 NodeChannel::NodeChannel(Delegate* delegate,
    457                          ConnectionParams connection_params,
    458                          scoped_refptr<base::TaskRunner> io_task_runner,
    459                          const ProcessErrorCallback& process_error_callback)
    460     : delegate_(delegate),
    461       io_task_runner_(io_task_runner),
    462       process_error_callback_(process_error_callback)
    463 #if !defined(OS_NACL_SFI)
    464       ,
    465       channel_(
    466           Channel::Create(this, std::move(connection_params), io_task_runner_))
    467 #endif
    468 {
    469 }
    470 
    471 NodeChannel::~NodeChannel() {
    472   ShutDown();
    473 }
    474 
    475 void NodeChannel::OnChannelMessage(const void* payload,
    476                                    size_t payload_size,
    477                                    ScopedPlatformHandleVectorPtr handles) {
    478   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
    479 
    480   RequestContext request_context(RequestContext::Source::SYSTEM);
    481 
    482   // Ensure this NodeChannel stays alive through the extent of this method. The
    483   // delegate may have the only other reference to this object and it may choose
    484   // to drop it here in response to, e.g., a malformed message.
    485   scoped_refptr<NodeChannel> keepalive = this;
    486 
    487 #if defined(OS_WIN)
    488   // If we receive handles from a known process, rewrite them to our own
    489   // process. This can occur when a privileged node receives handles directly
    490   // from a privileged descendant.
    491   {
    492     base::AutoLock lock(remote_process_handle_lock_);
    493     if (handles && remote_process_handle_ != base::kNullProcessHandle) {
    494       // Note that we explicitly mark the handles as being owned by the sending
    495       // process before rewriting them, in order to accommodate RewriteHandles'
    496       // internal sanity checks.
    497       for (auto& handle : *handles)
    498         handle.owning_process = remote_process_handle_;
    499       if (!Channel::Message::RewriteHandles(remote_process_handle_,
    500                                             base::GetCurrentProcessHandle(),
    501                                             handles.get())) {
    502         DLOG(ERROR) << "Received one or more invalid handles.";
    503       }
    504     } else if (handles) {
    505       // Handles received by an unknown process must already be owned by us.
    506       for (auto& handle : *handles)
    507         handle.owning_process = base::GetCurrentProcessHandle();
    508     }
    509   }
    510 #elif defined(OS_MACOSX) && !defined(OS_IOS)
    511   // If we're not the root, receive any mach ports from the message. If we're
    512   // the root, the only message containing mach ports should be a
    513   // RELAY_PORTS_MESSAGE.
    514   {
    515     MachPortRelay* relay = delegate_->GetMachPortRelay();
    516     if (handles && !relay) {
    517       if (!MachPortRelay::ReceivePorts(handles.get())) {
    518         LOG(ERROR) << "Error receiving mach ports.";
    519       }
    520     }
    521   }
    522 #endif  // defined(OS_WIN)
    523 
    524 
    525   if (payload_size <= sizeof(Header)) {
    526     delegate_->OnChannelError(remote_node_name_, this);
    527     return;
    528   }
    529 
    530   const Header* header = static_cast<const Header*>(payload);
    531   switch (header->type) {
    532     case MessageType::ACCEPT_CHILD: {
    533       const AcceptChildData* data;
    534       if (GetMessagePayload(payload, payload_size, &data)) {
    535         delegate_->OnAcceptChild(remote_node_name_, data->parent_name,
    536                                  data->token);
    537         return;
    538       }
    539       break;
    540     }
    541 
    542     case MessageType::ACCEPT_PARENT: {
    543       const AcceptParentData* data;
    544       if (GetMessagePayload(payload, payload_size, &data)) {
    545         delegate_->OnAcceptParent(remote_node_name_, data->token,
    546                                   data->child_name);
    547         return;
    548       }
    549       break;
    550     }
    551 
    552     case MessageType::ADD_BROKER_CLIENT: {
    553       const AddBrokerClientData* data;
    554       if (GetMessagePayload(payload, payload_size, &data)) {
    555         ScopedPlatformHandle process_handle;
    556 #if defined(OS_WIN)
    557         if (!handles || handles->size() != 1) {
    558           DLOG(ERROR) << "Dropping invalid AddBrokerClient message.";
    559           break;
    560         }
    561         process_handle = ScopedPlatformHandle(handles->at(0));
    562         handles->clear();
    563         delegate_->OnAddBrokerClient(remote_node_name_, data->client_name,
    564                                      process_handle.release().handle);
    565 #else
    566         if (handles && handles->size() != 0) {
    567           DLOG(ERROR) << "Dropping invalid AddBrokerClient message.";
    568           break;
    569         }
    570         delegate_->OnAddBrokerClient(remote_node_name_, data->client_name,
    571                                      data->process_handle);
    572 #endif
    573         return;
    574       }
    575       break;
    576     }
    577 
    578     case MessageType::BROKER_CLIENT_ADDED: {
    579       const BrokerClientAddedData* data;
    580       if (GetMessagePayload(payload, payload_size, &data)) {
    581         ScopedPlatformHandle broker_channel;
    582         if (!handles || handles->size() != 1) {
    583           DLOG(ERROR) << "Dropping invalid BrokerClientAdded message.";
    584           break;
    585         }
    586         broker_channel = ScopedPlatformHandle(handles->at(0));
    587         handles->clear();
    588         delegate_->OnBrokerClientAdded(remote_node_name_, data->client_name,
    589                                        std::move(broker_channel));
    590         return;
    591       }
    592       break;
    593     }
    594 
    595     case MessageType::ACCEPT_BROKER_CLIENT: {
    596       const AcceptBrokerClientData* data;
    597       if (GetMessagePayload(payload, payload_size, &data)) {
    598         ScopedPlatformHandle broker_channel;
    599         if (handles && handles->size() > 1) {
    600           DLOG(ERROR) << "Dropping invalid AcceptBrokerClient message.";
    601           break;
    602         }
    603         if (handles && handles->size() == 1) {
    604           broker_channel = ScopedPlatformHandle(handles->at(0));
    605           handles->clear();
    606         }
    607         delegate_->OnAcceptBrokerClient(remote_node_name_, data->broker_name,
    608                                         std::move(broker_channel));
    609         return;
    610       }
    611       break;
    612     }
    613 
    614     case MessageType::PORTS_MESSAGE: {
    615       size_t num_handles = handles ? handles->size() : 0;
    616       Channel::MessagePtr message(
    617           new Channel::Message(payload_size, num_handles));
    618       message->SetHandles(std::move(handles));
    619       memcpy(message->mutable_payload(), payload, payload_size);
    620       delegate_->OnPortsMessage(remote_node_name_, std::move(message));
    621       return;
    622     }
    623 
    624     case MessageType::REQUEST_PORT_MERGE: {
    625       const RequestPortMergeData* data;
    626       if (GetMessagePayload(payload, payload_size, &data)) {
    627         // Don't accept an empty token.
    628         size_t token_size = payload_size - sizeof(*data) - sizeof(Header);
    629         if (token_size == 0)
    630           break;
    631         std::string token(reinterpret_cast<const char*>(data + 1), token_size);
    632         delegate_->OnRequestPortMerge(remote_node_name_,
    633                                       data->connector_port_name, token);
    634         return;
    635       }
    636       break;
    637     }
    638 
    639     case MessageType::REQUEST_INTRODUCTION: {
    640       const IntroductionData* data;
    641       if (GetMessagePayload(payload, payload_size, &data)) {
    642         delegate_->OnRequestIntroduction(remote_node_name_, data->name);
    643         return;
    644       }
    645       break;
    646     }
    647 
    648     case MessageType::INTRODUCE: {
    649       const IntroductionData* data;
    650       if (GetMessagePayload(payload, payload_size, &data)) {
    651         if (handles && handles->size() > 1) {
    652           DLOG(ERROR) << "Dropping invalid introduction message.";
    653           break;
    654         }
    655         ScopedPlatformHandle channel_handle;
    656         if (handles && handles->size() == 1) {
    657           channel_handle = ScopedPlatformHandle(handles->at(0));
    658           handles->clear();
    659         }
    660         delegate_->OnIntroduce(remote_node_name_, data->name,
    661                                std::move(channel_handle));
    662         return;
    663       }
    664       break;
    665     }
    666 
    667 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
    668     case MessageType::RELAY_PORTS_MESSAGE: {
    669       base::ProcessHandle from_process;
    670       {
    671         base::AutoLock lock(remote_process_handle_lock_);
    672         from_process = remote_process_handle_;
    673       }
    674       const RelayPortsMessageData* data;
    675       if (GetMessagePayload(payload, payload_size, &data)) {
    676         // Don't try to relay an empty message.
    677         if (payload_size <= sizeof(Header) + sizeof(RelayPortsMessageData))
    678           break;
    679 
    680         const void* message_start = data + 1;
    681         Channel::MessagePtr message = Channel::Message::Deserialize(
    682             message_start, payload_size - sizeof(Header) - sizeof(*data));
    683         if (!message) {
    684           DLOG(ERROR) << "Dropping invalid relay message.";
    685           break;
    686         }
    687   #if defined(OS_MACOSX) && !defined(OS_IOS)
    688         message->SetHandles(std::move(handles));
    689         MachPortRelay* relay = delegate_->GetMachPortRelay();
    690         if (!relay) {
    691           LOG(ERROR) << "Receiving mach ports without a port relay from "
    692                      << remote_node_name_ << ". Dropping message.";
    693           break;
    694         }
    695         {
    696           base::AutoLock lock(pending_mach_messages_lock_);
    697           if (relay->port_provider()->TaskForPid(from_process) ==
    698               MACH_PORT_NULL) {
    699             pending_relay_messages_.push(
    700                 std::make_pair(data->destination, std::move(message)));
    701             break;
    702           }
    703         }
    704   #endif
    705         delegate_->OnRelayPortsMessage(remote_node_name_, from_process,
    706                                        data->destination, std::move(message));
    707         return;
    708       }
    709       break;
    710     }
    711 #endif
    712 
    713     case MessageType::BROADCAST: {
    714       if (payload_size <= sizeof(Header))
    715         break;
    716       const void* data = static_cast<const void*>(
    717           reinterpret_cast<const Header*>(payload) + 1);
    718       Channel::MessagePtr message =
    719           Channel::Message::Deserialize(data, payload_size - sizeof(Header));
    720       if (!message || message->has_handles()) {
    721         DLOG(ERROR) << "Dropping invalid broadcast message.";
    722         break;
    723       }
    724       delegate_->OnBroadcast(remote_node_name_, std::move(message));
    725       return;
    726     }
    727 
    728 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
    729     case MessageType::PORTS_MESSAGE_FROM_RELAY:
    730       const PortsMessageFromRelayData* data;
    731       if (GetMessagePayload(payload, payload_size, &data)) {
    732         size_t num_bytes = payload_size - sizeof(*data);
    733         if (num_bytes < sizeof(Header))
    734           break;
    735         num_bytes -= sizeof(Header);
    736 
    737         size_t num_handles = handles ? handles->size() : 0;
    738         Channel::MessagePtr message(
    739             new Channel::Message(num_bytes, num_handles));
    740         message->SetHandles(std::move(handles));
    741         if (num_bytes)
    742           memcpy(message->mutable_payload(), data + 1, num_bytes);
    743         delegate_->OnPortsMessageFromRelay(
    744             remote_node_name_, data->source, std::move(message));
    745         return;
    746       }
    747       break;
    748 
    749 #endif  // defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
    750 
    751     case MessageType::ACCEPT_PEER: {
    752       const AcceptPeerData* data;
    753       if (GetMessagePayload(payload, payload_size, &data)) {
    754         delegate_->OnAcceptPeer(remote_node_name_, data->token, data->peer_name,
    755                                 data->port_name);
    756         return;
    757       }
    758       break;
    759     }
    760 
    761     default:
    762       break;
    763   }
    764 
    765   DLOG(ERROR) << "Received invalid message. Closing channel.";
    766   delegate_->OnChannelError(remote_node_name_, this);
    767 }
    768 
    769 void NodeChannel::OnChannelError() {
    770   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
    771 
    772   RequestContext request_context(RequestContext::Source::SYSTEM);
    773 
    774   ShutDown();
    775   // |OnChannelError()| may cause |this| to be destroyed, but still need access
    776   // to the name name after that destruction. So may a copy of
    777   // |remote_node_name_| so it can be used if |this| becomes destroyed.
    778   ports::NodeName node_name = remote_node_name_;
    779   delegate_->OnChannelError(node_name, this);
    780 }
    781 
    782 #if defined(OS_MACOSX) && !defined(OS_IOS)
    783 void NodeChannel::OnProcessReady(base::ProcessHandle process) {
    784   io_task_runner_->PostTask(FROM_HERE, base::Bind(
    785       &NodeChannel::ProcessPendingMessagesWithMachPorts, this));
    786 }
    787 
    788 void NodeChannel::ProcessPendingMessagesWithMachPorts() {
    789   MachPortRelay* relay = delegate_->GetMachPortRelay();
    790   DCHECK(relay);
    791 
    792   base::ProcessHandle remote_process_handle;
    793   {
    794     base::AutoLock lock(remote_process_handle_lock_);
    795     remote_process_handle = remote_process_handle_;
    796   }
    797   PendingMessageQueue pending_writes;
    798   PendingRelayMessageQueue pending_relays;
    799   {
    800     base::AutoLock lock(pending_mach_messages_lock_);
    801     pending_writes.swap(pending_write_messages_);
    802     pending_relays.swap(pending_relay_messages_);
    803   }
    804 
    805   while (!pending_writes.empty()) {
    806     Channel::MessagePtr message = std::move(pending_writes.front());
    807     pending_writes.pop();
    808     if (!relay->SendPortsToProcess(message.get(), remote_process_handle)) {
    809       LOG(ERROR) << "Error on sending mach ports. Remote process is likely "
    810                  << "gone. Dropping message.";
    811       return;
    812     }
    813 
    814     base::AutoLock lock(channel_lock_);
    815     if (!channel_) {
    816       DLOG(ERROR) << "Dropping message on closed channel.";
    817       break;
    818     } else {
    819       channel_->Write(std::move(message));
    820     }
    821   }
    822 
    823   // Ensure this NodeChannel stays alive while flushing relay messages.
    824   scoped_refptr<NodeChannel> keepalive = this;
    825 
    826   while (!pending_relays.empty()) {
    827     ports::NodeName destination = pending_relays.front().first;
    828     Channel::MessagePtr message = std::move(pending_relays.front().second);
    829     pending_relays.pop();
    830     delegate_->OnRelayPortsMessage(remote_node_name_, remote_process_handle,
    831                                    destination, std::move(message));
    832   }
    833 }
    834 #endif
    835 
    836 void NodeChannel::WriteChannelMessage(Channel::MessagePtr message) {
    837 #if defined(OS_WIN)
    838   // Map handles to the destination process. Note: only messages from a
    839   // privileged node should contain handles on Windows. If an unprivileged
    840   // node needs to send handles, it should do so via RelayPortsMessage which
    841   // stashes the handles in the message in such a way that they go undetected
    842   // here (they'll be unpacked and duplicated by a privileged parent.)
    843 
    844   if (message->has_handles()) {
    845     base::ProcessHandle remote_process_handle;
    846     {
    847       base::AutoLock lock(remote_process_handle_lock_);
    848       remote_process_handle = remote_process_handle_;
    849     }
    850 
    851     // Rewrite outgoing handles if we have a handle to the destination process.
    852     if (remote_process_handle != base::kNullProcessHandle) {
    853       ScopedPlatformHandleVectorPtr handles = message->TakeHandles();
    854       if (!Channel::Message::RewriteHandles(base::GetCurrentProcessHandle(),
    855                                             remote_process_handle,
    856                                             handles.get())) {
    857         DLOG(ERROR) << "Failed to duplicate one or more outgoing handles.";
    858       }
    859       message->SetHandles(std::move(handles));
    860     }
    861   }
    862 #elif defined(OS_MACOSX) && !defined(OS_IOS)
    863   // On OSX, we need to transfer mach ports to the destination process before
    864   // transferring the message itself.
    865   if (message->has_mach_ports()) {
    866     MachPortRelay* relay = delegate_->GetMachPortRelay();
    867     if (relay) {
    868       base::ProcessHandle remote_process_handle;
    869       {
    870         base::AutoLock lock(remote_process_handle_lock_);
    871         // Expect that the receiving node is a child.
    872         DCHECK(remote_process_handle_ != base::kNullProcessHandle);
    873         remote_process_handle = remote_process_handle_;
    874       }
    875       {
    876         base::AutoLock lock(pending_mach_messages_lock_);
    877         if (relay->port_provider()->TaskForPid(remote_process_handle) ==
    878             MACH_PORT_NULL) {
    879           // It is also possible for TaskForPid() to return MACH_PORT_NULL when
    880           // the process has started, then died. In that case, the queued
    881           // message will never be processed. But that's fine since we're about
    882           // to die anyway.
    883           pending_write_messages_.push(std::move(message));
    884           return;
    885         }
    886       }
    887 
    888       if (!relay->SendPortsToProcess(message.get(), remote_process_handle)) {
    889         LOG(ERROR) << "Error on sending mach ports. Remote process is likely "
    890                    << "gone. Dropping message.";
    891         return;
    892       }
    893     }
    894   }
    895 #endif
    896 
    897   base::AutoLock lock(channel_lock_);
    898   if (!channel_)
    899     DLOG(ERROR) << "Dropping message on closed channel.";
    900   else
    901     channel_->Write(std::move(message));
    902 }
    903 
    904 }  // namespace edk
    905 }  // namespace mojo
    906