      1 // Copyright 2016 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      5 #include "mojo/core/node_channel.h"
      7 #include <cstring>
      8 #include <limits>
      9 #include <sstream>
     11 #include "base/bind.h"
     12 #include "base/location.h"
     13 #include "base/logging.h"
     14 #include "base/memory/ptr_util.h"
     15 #include "mojo/core/channel.h"
     16 #include "mojo/core/configuration.h"
     17 #include "mojo/core/core.h"
     18 #include "mojo/core/request_context.h"
     20 namespace mojo {
     21 namespace core {
     23 namespace {
// Identifies the kind of NodeChannel message; the value is stored in
// Header::type as a 32-bit unsigned integer.
//
// NOTE: Please ONLY append messages to the end of this enum.
enum class MessageType : uint32_t {
  EVENT_MESSAGE,
  INTRODUCE,
#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
#endif
#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
#endif
  ACCEPT_PEER,
};
// Fixed-size prefix at the start of every NodeChannel message payload.
// CreateMessage() fills it in and OnChannelMessage() dispatches on |type|.
struct Header {
  // Which message follows this header.
  MessageType type;
  // Written as zero by CreateMessage().
  uint32_t padding;
};

// The header size must satisfy IsAlignedForChannelMessage().
static_assert(IsAlignedForChannelMessage(sizeof(Header)),
              "Invalid header size.");
// Payload of an ACCEPT_INVITEE message, written by
// NodeChannel::AcceptInvitee().
struct AcceptInviteeData {
  ports::NodeName inviter_name;
  ports::NodeName token;
};
// Payload of an ACCEPT_INVITATION message, written by
// NodeChannel::AcceptInvitation().
struct AcceptInvitationData {
  ports::NodeName token;
  ports::NodeName invitee_name;
};
// Payload of an ACCEPT_PEER message, written by NodeChannel::AcceptPeer().
struct AcceptPeerData {
  ports::NodeName token;
  // Filled from AcceptPeer()'s |sender_name| argument.
  ports::NodeName peer_name;
  ports::PortName port_name;
};
// Payload of an ADD_BROKER_CLIENT message. This message may include a process
// handle on platforms that require it.
struct AddBrokerClientData {
  ports::NodeName client_name;
#if !defined(OS_WIN)
  // On non-Windows builds the process handle travels inline as a 32-bit value;
  // on Windows AddBrokerClient() attaches it to the message as a handle
  // instead.
  uint32_t process_handle;
  // Written as zero by AddBrokerClient().
  uint32_t padding;
#endif
};

#if !defined(OS_WIN)
// |process_handle| is assigned from a base::ProcessHandle, so both must have
// the same width.
static_assert(sizeof(base::ProcessHandle) == sizeof(uint32_t),
              "Unexpected pid size");
// The struct size must be a multiple of the channel message alignment.
static_assert(sizeof(AddBrokerClientData) % kChannelMessageAlignment == 0,
              "Invalid AddBrokerClientData size.");
#endif
// Payload of a BROKER_CLIENT_ADDED message.
//
// This data is followed by a platform channel handle to the broker.
// OnChannelMessage() drops the message unless exactly one handle is attached.
struct BrokerClientAddedData {
  ports::NodeName client_name;
};
// Payload of an ACCEPT_BROKER_CLIENT message.
//
// This data may be followed by a platform channel handle to the broker. If not,
// then the inviter is the broker and its channel should be used as such.
// OnChannelMessage() accepts zero or one attached handle for this message.
struct AcceptBrokerClientData {
  ports::NodeName broker_name;
};
// Payload of a REQUEST_PORT_MERGE message.
//
// This is followed by arbitrary payload data which is interpreted as a token
// string for port location. OnChannelMessage() rejects an empty token.
struct RequestPortMergeData {
  ports::PortName connector_port_name;
};
// Used for both REQUEST_INTRODUCTION and INTRODUCE.
//
// For INTRODUCE the message also includes a valid platform handle for a channel
// the receiver may use to communicate with the named node directly, or an
// invalid platform handle if the node is unknown to the sender or otherwise
// cannot be introduced.
struct IntroductionData {
  // The node being asked about (REQUEST_INTRODUCTION) or introduced
  // (INTRODUCE).
  ports::NodeName name;
};
#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
// Payload prefix of a RELAY_EVENT_MESSAGE message.
//
// This struct is followed by the full payload of a message to be relayed.
struct RelayEventMessageData {
  ports::NodeName destination;
};

// Payload prefix of an EVENT_MESSAGE_FROM_RELAY message.
//
// This struct is followed by the full payload of a relayed message.
struct EventMessageFromRelayData {
  ports::NodeName source;
};
#endif
    125 template <typename DataType>
    126 Channel::MessagePtr CreateMessage(MessageType type,
    127                                   size_t payload_size,
    128                                   size_t num_handles,
    129                                   DataType** out_data,
    130                                   size_t capacity = 0) {
    131   const size_t total_size = payload_size + sizeof(Header);
    132   if (capacity == 0)
    133     capacity = total_size;
    134   else
    135     capacity = std::max(total_size, capacity);
    136   auto message =
    137       std::make_unique<Channel::Message>(capacity, total_size, num_handles);
    138   Header* header = reinterpret_cast<Header*>(message->mutable_payload());
    139   header->type = type;
    140   header->padding = 0;
    141   *out_data = reinterpret_cast<DataType*>(&header[1]);
    142   return message;
    143 }
    145 template <typename DataType>
    146 bool GetMessagePayload(const void* bytes,
    147                        size_t num_bytes,
    148                        DataType** out_data) {
    149   static_assert(sizeof(DataType) > 0, "DataType must have non-zero size.");
    150   if (num_bytes < sizeof(Header) + sizeof(DataType))
    151     return false;
    152   *out_data = reinterpret_cast<const DataType*>(
    153       static_cast<const char*>(bytes) + sizeof(Header));
    154   return true;
    155 }
    157 }  // namespace
// static
// Factory for NodeChannel instances. Forwards all arguments to the
// constructor, except on NaCl-SFI builds where it logs a fatal error and
// returns null.
scoped_refptr<NodeChannel> NodeChannel::Create(
    Delegate* delegate,
    ConnectionParams connection_params,
    scoped_refptr<base::TaskRunner> io_task_runner,
    const ProcessErrorCallback& process_error_callback) {
#if defined(OS_NACL_SFI)
  LOG(FATAL) << "Multi-process not yet supported on NaCl-SFI";
  return nullptr;
#else
  return new NodeChannel(delegate, std::move(connection_params), io_task_runner,
                         process_error_callback);
#endif
}
    174 // static
    175 Channel::MessagePtr NodeChannel::CreateEventMessage(size_t capacity,
    176                                                     size_t payload_size,
    177                                                     void** payload,
    178                                                     size_t num_handles) {
    179   return CreateMessage(MessageType::EVENT_MESSAGE, payload_size, num_handles,
    180                        payload, capacity);
    181 }
    183 // static
    184 void NodeChannel::GetEventMessageData(Channel::Message* message,
    185                                       void** data,
    186                                       size_t* num_data_bytes) {
    187   // NOTE: OnChannelMessage guarantees that we never accept a Channel::Message
    188   // with a payload of fewer than |sizeof(Header)| bytes.
    189   *data = reinterpret_cast<Header*>(message->mutable_payload()) + 1;
    190   *num_data_bytes = message->payload_size() - sizeof(Header);
    191 }
    193 void NodeChannel::Start() {
    194   base::AutoLock lock(channel_lock_);
    195   // ShutDown() may have already been called, in which case |channel_| is null.
    196   if (channel_)
    197     channel_->Start();
    198 }
    200 void NodeChannel::ShutDown() {
    201   base::AutoLock lock(channel_lock_);
    202   if (channel_) {
    203     channel_->ShutDown();
    204     channel_ = nullptr;
    205   }
    206 }
    208 void NodeChannel::LeakHandleOnShutdown() {
    209   base::AutoLock lock(channel_lock_);
    210   if (channel_) {
    211     channel_->LeakHandle();
    212   }
    213 }
    215 void NodeChannel::NotifyBadMessage(const std::string& error) {
    216   if (!process_error_callback_.is_null())
    217     process_error_callback_.Run("Received bad user message: " + error);
    218 }
    220 void NodeChannel::SetRemoteProcessHandle(ScopedProcessHandle process_handle) {
    221   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
    222   {
    223     base::AutoLock lock(channel_lock_);
    224     if (channel_)
    225       channel_->set_remote_process(process_handle.Clone());
    226   }
    227   base::AutoLock lock(remote_process_handle_lock_);
    228   DCHECK(!remote_process_handle_.is_valid());
    229   CHECK_NE(remote_process_handle_.get(), base::GetCurrentProcessHandle());
    230   remote_process_handle_ = std::move(process_handle);
    231 }
// Returns true if a valid remote process handle is currently stored. Reads the
// handle under |remote_process_handle_lock_|.
bool NodeChannel::HasRemoteProcessHandle() {
  base::AutoLock lock(remote_process_handle_lock_);
  return remote_process_handle_.is_valid();
}
    238 ScopedProcessHandle NodeChannel::CloneRemoteProcessHandle() {
    239   base::AutoLock lock(remote_process_handle_lock_);
    240   if (!remote_process_handle_.is_valid())
    241     return ScopedProcessHandle();
    242   return remote_process_handle_.Clone();
    243 }
// Records the name of the node on the other end of this channel. Must run on
// the IO task runner's sequence; |remote_node_name_| is written without a lock.
void NodeChannel::SetRemoteNodeName(const ports::NodeName& name) {
  DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
  remote_node_name_ = name;
}
    250 void NodeChannel::AcceptInvitee(const ports::NodeName& inviter_name,
    251                                 const ports::NodeName& token) {
    252   AcceptInviteeData* data;
    253   Channel::MessagePtr message = CreateMessage(
    254       MessageType::ACCEPT_INVITEE, sizeof(AcceptInviteeData), 0, &data);
    255   data->inviter_name = inviter_name;
    256   data->token = token;
    257   WriteChannelMessage(std::move(message));
    258 }
    260 void NodeChannel::AcceptInvitation(const ports::NodeName& token,
    261                                    const ports::NodeName& invitee_name) {
    262   AcceptInvitationData* data;
    263   Channel::MessagePtr message = CreateMessage(
    264       MessageType::ACCEPT_INVITATION, sizeof(AcceptInvitationData), 0, &data);
    265   data->token = token;
    266   data->invitee_name = invitee_name;
    267   WriteChannelMessage(std::move(message));
    268 }
    270 void NodeChannel::AcceptPeer(const ports::NodeName& sender_name,
    271                              const ports::NodeName& token,
    272                              const ports::PortName& port_name) {
    273   AcceptPeerData* data;
    274   Channel::MessagePtr message =
    275       CreateMessage(MessageType::ACCEPT_PEER, sizeof(AcceptPeerData), 0, &data);
    276   data->token = token;
    277   data->peer_name = sender_name;
    278   data->port_name = port_name;
    279   WriteChannelMessage(std::move(message));
    280 }
    282 void NodeChannel::AddBrokerClient(const ports::NodeName& client_name,
    283                                   ScopedProcessHandle process_handle) {
    284   AddBrokerClientData* data;
    285   std::vector<PlatformHandle> handles;
    286 #if defined(OS_WIN)
    287   handles.emplace_back(base::win::ScopedHandle(process_handle.release()));
    288 #endif
    289   Channel::MessagePtr message =
    290       CreateMessage(MessageType::ADD_BROKER_CLIENT, sizeof(AddBrokerClientData),
    291                     handles.size(), &data);
    292   message->SetHandles(std::move(handles));
    293   data->client_name = client_name;
    294 #if !defined(OS_WIN)
    295   data->process_handle = process_handle.get();
    296   data->padding = 0;
    297 #endif
    298   WriteChannelMessage(std::move(message));
    299 }
    301 void NodeChannel::BrokerClientAdded(const ports::NodeName& client_name,
    302                                     PlatformHandle broker_channel) {
    303   BrokerClientAddedData* data;
    304   std::vector<PlatformHandle> handles;
    305   if (broker_channel.is_valid())
    306     handles.emplace_back(std::move(broker_channel));
    307   Channel::MessagePtr message =
    308       CreateMessage(MessageType::BROKER_CLIENT_ADDED,
    309                     sizeof(BrokerClientAddedData), handles.size(), &data);
    310   message->SetHandles(std::move(handles));
    311   data->client_name = client_name;
    312   WriteChannelMessage(std::move(message));
    313 }
    315 void NodeChannel::AcceptBrokerClient(const ports::NodeName& broker_name,
    316                                      PlatformHandle broker_channel) {
    317   AcceptBrokerClientData* data;
    318   std::vector<PlatformHandle> handles;
    319   if (broker_channel.is_valid())
    320     handles.emplace_back(std::move(broker_channel));
    321   Channel::MessagePtr message =
    322       CreateMessage(MessageType::ACCEPT_BROKER_CLIENT,
    323                     sizeof(AcceptBrokerClientData), handles.size(), &data);
    324   message->SetHandles(std::move(handles));
    325   data->broker_name = broker_name;
    326   WriteChannelMessage(std::move(message));
    327 }
    329 void NodeChannel::RequestPortMerge(const ports::PortName& connector_port_name,
    330                                    const std::string& token) {
    331   RequestPortMergeData* data;
    332   Channel::MessagePtr message =
    333       CreateMessage(MessageType::REQUEST_PORT_MERGE,
    334                     sizeof(RequestPortMergeData) + token.size(), 0, &data);
    335   data->connector_port_name = connector_port_name;
    336   memcpy(data + 1, token.data(), token.size());
    337   WriteChannelMessage(std::move(message));
    338 }
    340 void NodeChannel::RequestIntroduction(const ports::NodeName& name) {
    341   IntroductionData* data;
    342   Channel::MessagePtr message = CreateMessage(
    343       MessageType::REQUEST_INTRODUCTION, sizeof(IntroductionData), 0, &data);
    344   data->name = name;
    345   WriteChannelMessage(std::move(message));
    346 }
    348 void NodeChannel::Introduce(const ports::NodeName& name,
    349                             PlatformHandle channel_handle) {
    350   IntroductionData* data;
    351   std::vector<PlatformHandle> handles;
    352   if (channel_handle.is_valid())
    353     handles.emplace_back(std::move(channel_handle));
    354   Channel::MessagePtr message = CreateMessage(
    355       MessageType::INTRODUCE, sizeof(IntroductionData), handles.size(), &data);
    356   message->SetHandles(std::move(handles));
    357   data->name = name;
    358   WriteChannelMessage(std::move(message));
    359 }
// Writes an already-built |message| to the channel. This is a thin wrapper
// around WriteChannelMessage(), so the same size check and closed-channel
// handling apply.
void NodeChannel::SendChannelMessage(Channel::MessagePtr message) {
  WriteChannelMessage(std::move(message));
}
    365 void NodeChannel::Broadcast(Channel::MessagePtr message) {
    366   DCHECK(!message->has_handles());
    367   void* data;
    368   Channel::MessagePtr broadcast_message = CreateMessage(
    369       MessageType::BROADCAST_EVENT, message->data_num_bytes(), 0, &data);
    370   memcpy(data, message->data(), message->data_num_bytes());
    371   WriteChannelMessage(std::move(broadcast_message));
    372 }
    374 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
// Wraps |message| in a RELAY_EVENT_MESSAGE addressed to |destination| and
// writes it to this channel. The wrapper's data is a RelayEventMessageData
// followed by the full serialized data of |message|. Handle treatment differs
// per platform, as described in each branch below.
void NodeChannel::RelayEventMessage(const ports::NodeName& destination,
                                    Channel::MessagePtr message) {
#if defined(OS_WIN)
  DCHECK(message->has_handles());

  // Note that this is only used on Windows, and on Windows all platform
  // handles are included in the message data. We blindly copy all the data
  // here and the relay node (the broker) will duplicate handles as needed.
  size_t num_bytes = sizeof(RelayEventMessageData) + message->data_num_bytes();
  RelayEventMessageData* data;
  Channel::MessagePtr relay_message =
      CreateMessage(MessageType::RELAY_EVENT_MESSAGE, num_bytes, 0, &data);
  data->destination = destination;
  memcpy(data + 1, message->data(), message->data_num_bytes());

  // When the handles are duplicated in the broker, the source handles will
  // be closed. If the broker never receives this message then these handles
  // will leak, but that means something else has probably broken and the
  // sending process won't likely be around much longer.
  //
  // TODO(https://crbug.com/813112): We would like to be able to violate the
  // above stated assumption. We should not leak handles in cases where we
  // outlive the broker, as we may continue existing and eventually accept a new
  // broker invitation.
  std::vector<PlatformHandleInTransit> handles = message->TakeHandles();
  // Deliberately release ownership of each handle without closing it; see the
  // explanation above.
  for (auto& handle : handles)
    handle.TakeHandle().release();

#else
  DCHECK(message->has_mach_ports());

  // On OSX, the handles are extracted from the relayed message and attached to
  // the wrapper. The broker then takes the handles attached to the wrapper and
  // moves them back to the relayed message. This is necessary because the
  // message may contain fds which need to be attached to the outer message so
  // that they can be transferred to the broker.
  std::vector<PlatformHandleInTransit> handles = message->TakeHandles();
  size_t num_bytes = sizeof(RelayEventMessageData) + message->data_num_bytes();
  RelayEventMessageData* data;
  Channel::MessagePtr relay_message = CreateMessage(
      MessageType::RELAY_EVENT_MESSAGE, num_bytes, handles.size(), &data);
  data->destination = destination;
  memcpy(data + 1, message->data(), message->data_num_bytes());
  relay_message->SetHandles(std::move(handles));
#endif  // defined(OS_WIN)

  // |relay_message| is declared in whichever branch above was compiled.
  WriteChannelMessage(std::move(relay_message));
}
    424 void NodeChannel::EventMessageFromRelay(const ports::NodeName& source,
    425                                         Channel::MessagePtr message) {
    426   size_t num_bytes =
    427       sizeof(EventMessageFromRelayData) + message->payload_size();
    428   EventMessageFromRelayData* data;
    429   Channel::MessagePtr relayed_message =
    430       CreateMessage(MessageType::EVENT_MESSAGE_FROM_RELAY, num_bytes,
    431                     message->num_handles(), &data);
    432   data->source = source;
    433   if (message->payload_size())
    434     memcpy(data + 1, message->payload(), message->payload_size());
    435   relayed_message->SetHandles(message->TakeHandles());
    436   WriteChannelMessage(std::move(relayed_message));
    437 }
    438 #endif  // defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
// Stores the delegate, IO task runner and process error callback. Except on
// NaCl-SFI builds, also creates the underlying Channel, passing |this| as its
// first argument along with |connection_params| and the stored task runner.
NodeChannel::NodeChannel(Delegate* delegate,
                         ConnectionParams connection_params,
                         scoped_refptr<base::TaskRunner> io_task_runner,
                         const ProcessErrorCallback& process_error_callback)
    : delegate_(delegate),
      io_task_runner_(io_task_runner),
      process_error_callback_(process_error_callback)
#if !defined(OS_NACL_SFI)
      ,
      channel_(
          Channel::Create(this, std::move(connection_params), io_task_runner_))
#endif
{
}
// Shuts down the underlying channel if ShutDown() has not already released it.
NodeChannel::~NodeChannel() {
  ShutDown();
}
    459 void NodeChannel::OnChannelMessage(const void* payload,
    460                                    size_t payload_size,
    461                                    std::vector<PlatformHandle> handles) {
    462   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
    464   RequestContext request_context(RequestContext::Source::SYSTEM);
    466   // Ensure this NodeChannel stays alive through the extent of this method. The
    467   // delegate may have the only other reference to this object and it may choose
    468   // to drop it here in response to, e.g., a malformed message.
    469   scoped_refptr<NodeChannel> keepalive = this;
    471   if (payload_size <= sizeof(Header)) {
    472     delegate_->OnChannelError(remote_node_name_, this);
    473     return;
    474   }
    476   const Header* header = static_cast<const Header*>(payload);
    477   switch (header->type) {
    478     case MessageType::ACCEPT_INVITEE: {
    479       const AcceptInviteeData* data;
    480       if (GetMessagePayload(payload, payload_size, &data)) {
    481         delegate_->OnAcceptInvitee(remote_node_name_, data->inviter_name,
    482                                    data->token);
    483         return;
    484       }
    485       break;
    486     }
    488     case MessageType::ACCEPT_INVITATION: {
    489       const AcceptInvitationData* data;
    490       if (GetMessagePayload(payload, payload_size, &data)) {
    491         delegate_->OnAcceptInvitation(remote_node_name_, data->token,
    492                                       data->invitee_name);
    493         return;
    494       }
    495       break;
    496     }
    498     case MessageType::ADD_BROKER_CLIENT: {
    499       const AddBrokerClientData* data;
    500       if (GetMessagePayload(payload, payload_size, &data)) {
    501 #if defined(OS_WIN)
    502         if (handles.size() != 1) {
    503           DLOG(ERROR) << "Dropping invalid AddBrokerClient message.";
    504           break;
    505         }
    506         delegate_->OnAddBrokerClient(remote_node_name_, data->client_name,
    507                                      handles[0].ReleaseHandle());
    508 #else
    509         if (!handles.empty()) {
    510           DLOG(ERROR) << "Dropping invalid AddBrokerClient message.";
    511           break;
    512         }
    513         delegate_->OnAddBrokerClient(remote_node_name_, data->client_name,
    514                                      data->process_handle);
    515 #endif
    516         return;
    517       }
    518       break;
    519     }
    521     case MessageType::BROKER_CLIENT_ADDED: {
    522       const BrokerClientAddedData* data;
    523       if (GetMessagePayload(payload, payload_size, &data)) {
    524         if (handles.size() != 1) {
    525           DLOG(ERROR) << "Dropping invalid BrokerClientAdded message.";
    526           break;
    527         }
    528         delegate_->OnBrokerClientAdded(remote_node_name_, data->client_name,
    529                                        std::move(handles[0]));
    530         return;
    531       }
    532       break;
    533     }
    535     case MessageType::ACCEPT_BROKER_CLIENT: {
    536       const AcceptBrokerClientData* data;
    537       if (GetMessagePayload(payload, payload_size, &data)) {
    538         PlatformHandle broker_channel;
    539         if (handles.size() > 1) {
    540           DLOG(ERROR) << "Dropping invalid AcceptBrokerClient message.";
    541           break;
    542         }
    543         if (handles.size() == 1)
    544           broker_channel = std::move(handles[0]);
    546         delegate_->OnAcceptBrokerClient(remote_node_name_, data->broker_name,
    547                                         std::move(broker_channel));
    548         return;
    549       }
    550       break;
    551     }
    553     case MessageType::EVENT_MESSAGE: {
    554       Channel::MessagePtr message(
    555           new Channel::Message(payload_size, handles.size()));
    556       message->SetHandles(std::move(handles));
    557       memcpy(message->mutable_payload(), payload, payload_size);
    558       delegate_->OnEventMessage(remote_node_name_, std::move(message));
    559       return;
    560     }
    562     case MessageType::REQUEST_PORT_MERGE: {
    563       const RequestPortMergeData* data;
    564       if (GetMessagePayload(payload, payload_size, &data)) {
    565         // Don't accept an empty token.
    566         size_t token_size = payload_size - sizeof(*data) - sizeof(Header);
    567         if (token_size == 0)
    568           break;
    569         std::string token(reinterpret_cast<const char*>(data + 1), token_size);
    570         delegate_->OnRequestPortMerge(remote_node_name_,
    571                                       data->connector_port_name, token);
    572         return;
    573       }
    574       break;
    575     }
    577     case MessageType::REQUEST_INTRODUCTION: {
    578       const IntroductionData* data;
    579       if (GetMessagePayload(payload, payload_size, &data)) {
    580         delegate_->OnRequestIntroduction(remote_node_name_, data->name);
    581         return;
    582       }
    583       break;
    584     }
    586     case MessageType::INTRODUCE: {
    587       const IntroductionData* data;
    588       if (GetMessagePayload(payload, payload_size, &data)) {
    589         if (handles.size() > 1) {
    590           DLOG(ERROR) << "Dropping invalid introduction message.";
    591           break;
    592         }
    593         PlatformHandle channel_handle;
    594         if (handles.size() == 1)
    595           channel_handle = std::move(handles[0]);
    597         delegate_->OnIntroduce(remote_node_name_, data->name,
    598                                std::move(channel_handle));
    599         return;
    600       }
    601       break;
    602     }
    604 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
    605     case MessageType::RELAY_EVENT_MESSAGE: {
    606       base::ProcessHandle from_process;
    607       {
    608         base::AutoLock lock(remote_process_handle_lock_);
    609         // NOTE: It's safe to retain a weak reference to this process handle
    610         // through the extent of this call because |this| is kept alive and
    611         // |remote_process_handle_| is never reset once set.
    612         from_process = remote_process_handle_.get();
    613       }
    614       const RelayEventMessageData* data;
    615       if (GetMessagePayload(payload, payload_size, &data)) {
    616         // Don't try to relay an empty message.
    617         if (payload_size <= sizeof(Header) + sizeof(RelayEventMessageData))
    618           break;
    620         const void* message_start = data + 1;
    621         Channel::MessagePtr message = Channel::Message::Deserialize(
    622             message_start, payload_size - sizeof(Header) - sizeof(*data),
    623             from_process);
    624         if (!message) {
    625           DLOG(ERROR) << "Dropping invalid relay message.";
    626           break;
    627         }
    628 #if defined(OS_MACOSX) && !defined(OS_IOS)
    629         message->SetHandles(std::move(handles));
    630 #endif
    631         delegate_->OnRelayEventMessage(remote_node_name_, from_process,
    632                                        data->destination, std::move(message));
    633         return;
    634       }
    635       break;
    636     }
    637 #endif
    639     case MessageType::BROADCAST_EVENT: {
    640       if (payload_size <= sizeof(Header))
    641         break;
    642       const void* data = static_cast<const void*>(
    643           reinterpret_cast<const Header*>(payload) + 1);
    644       Channel::MessagePtr message =
    645           Channel::Message::Deserialize(data, payload_size - sizeof(Header));
    646       if (!message || message->has_handles()) {
    647         DLOG(ERROR) << "Dropping invalid broadcast message.";
    648         break;
    649       }
    650       delegate_->OnBroadcast(remote_node_name_, std::move(message));
    651       return;
    652     }
    654 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
    655     case MessageType::EVENT_MESSAGE_FROM_RELAY:
    656       const EventMessageFromRelayData* data;
    657       if (GetMessagePayload(payload, payload_size, &data)) {
    658         size_t num_bytes = payload_size - sizeof(*data);
    659         if (num_bytes < sizeof(Header))
    660           break;
    661         num_bytes -= sizeof(Header);
    663         Channel::MessagePtr message(
    664             new Channel::Message(num_bytes, handles.size()));
    665         message->SetHandles(std::move(handles));
    666         if (num_bytes)
    667           memcpy(message->mutable_payload(), data + 1, num_bytes);
    668         delegate_->OnEventMessageFromRelay(remote_node_name_, data->source,
    669                                            std::move(message));
    670         return;
    671       }
    672       break;
    674 #endif  // defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
    676     case MessageType::ACCEPT_PEER: {
    677       const AcceptPeerData* data;
    678       if (GetMessagePayload(payload, payload_size, &data)) {
    679         delegate_->OnAcceptPeer(remote_node_name_, data->token, data->peer_name,
    680                                 data->port_name);
    681         return;
    682       }
    683       break;
    684     }
    686     default:
    687       // Ignore unrecognized message types, allowing for future extensibility.
    688       return;
    689   }
    691   DLOG(ERROR) << "Received invalid message. Closing channel.";
    692   if (process_error_callback_)
    693     process_error_callback_.Run("NodeChannel received a malformed message");
    694   delegate_->OnChannelError(remote_node_name_, this);
    695 }
    697 void NodeChannel::OnChannelError(Channel::Error error) {
    698   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
    700   RequestContext request_context(RequestContext::Source::SYSTEM);
    702   ShutDown();
    704   if (process_error_callback_ &&
    705       error == Channel::Error::kReceivedMalformedData) {
    706     process_error_callback_.Run("Channel received a malformed message");
    707   }
    709   // |OnChannelError()| may cause |this| to be destroyed, but still need access
    710   // to the name name after that destruction. So may a copy of
    711   // |remote_node_name_| so it can be used if |this| becomes destroyed.
    712   ports::NodeName node_name = remote_node_name_;
    713   delegate_->OnChannelError(node_name, this);
    714 }
    716 void NodeChannel::WriteChannelMessage(Channel::MessagePtr message) {
    717   // Force a crash if this process attempts to send a message larger than the
    718   // maximum allowed size. This is more useful than killing a Channel when we
    719   // *receive* an oversized message, as we should consider oversized message
    720   // transmission to be a bug and this helps easily identify offending code.
    721   CHECK(message->data_num_bytes() < GetConfiguration().max_message_num_bytes);
    723   base::AutoLock lock(channel_lock_);
    724   if (!channel_)
    725     DLOG(ERROR) << "Dropping message on closed channel.";
    726   else
    727     channel_->Write(std::move(message));
    728 }
    730 }  // namespace core
    731 }  // namespace mojo