1 // Copyright 2016 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/core/node_channel.h" 6 7 #include <cstring> 8 #include <limits> 9 #include <sstream> 10 11 #include "base/bind.h" 12 #include "base/location.h" 13 #include "base/logging.h" 14 #include "base/memory/ptr_util.h" 15 #include "mojo/core/channel.h" 16 #include "mojo/core/configuration.h" 17 #include "mojo/core/core.h" 18 #include "mojo/core/request_context.h" 19 20 namespace mojo { 21 namespace core { 22 23 namespace { 24 25 // NOTE: Please ONLY append messages to the end of this enum. 26 enum class MessageType : uint32_t { 27 ACCEPT_INVITEE, 28 ACCEPT_INVITATION, 29 ADD_BROKER_CLIENT, 30 BROKER_CLIENT_ADDED, 31 ACCEPT_BROKER_CLIENT, 32 EVENT_MESSAGE, 33 REQUEST_PORT_MERGE, 34 REQUEST_INTRODUCTION, 35 INTRODUCE, 36 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) 37 RELAY_EVENT_MESSAGE, 38 #endif 39 BROADCAST_EVENT, 40 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) 41 EVENT_MESSAGE_FROM_RELAY, 42 #endif 43 ACCEPT_PEER, 44 }; 45 46 struct Header { 47 MessageType type; 48 uint32_t padding; 49 }; 50 51 static_assert(IsAlignedForChannelMessage(sizeof(Header)), 52 "Invalid header size."); 53 54 struct AcceptInviteeData { 55 ports::NodeName inviter_name; 56 ports::NodeName token; 57 }; 58 59 struct AcceptInvitationData { 60 ports::NodeName token; 61 ports::NodeName invitee_name; 62 }; 63 64 struct AcceptPeerData { 65 ports::NodeName token; 66 ports::NodeName peer_name; 67 ports::PortName port_name; 68 }; 69 70 // This message may include a process handle on plaforms that require it. 71 struct AddBrokerClientData { 72 ports::NodeName client_name; 73 #if !defined(OS_WIN) 74 uint32_t process_handle; 75 uint32_t padding; 76 #endif 77 }; 78 79 #if !defined(OS_WIN) 80 static_assert(sizeof(base::ProcessHandle) == sizeof(uint32_t), 81 "Unexpected pid size"); 82 static_assert(sizeof(AddBrokerClientData) % kChannelMessageAlignment == 0, 83 "Invalid AddBrokerClientData size."); 84 #endif 85 86 // This data is followed by a platform channel handle to the broker. 87 struct BrokerClientAddedData { 88 ports::NodeName client_name; 89 }; 90 91 // This data may be followed by a platform channel handle to the broker. If not, 92 // then the inviter is the broker and its channel should be used as such. 93 struct AcceptBrokerClientData { 94 ports::NodeName broker_name; 95 }; 96 97 // This is followed by arbitrary payload data which is interpreted as a token 98 // string for port location. 99 struct RequestPortMergeData { 100 ports::PortName connector_port_name; 101 }; 102 103 // Used for both REQUEST_INTRODUCTION and INTRODUCE. 104 // 105 // For INTRODUCE the message also includes a valid platform handle for a channel 106 // the receiver may use to communicate with the named node directly, or an 107 // invalid platform handle if the node is unknown to the sender or otherwise 108 // cannot be introduced. 109 struct IntroductionData { 110 ports::NodeName name; 111 }; 112 113 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) 114 // This struct is followed by the full payload of a message to be relayed. 115 struct RelayEventMessageData { 116 ports::NodeName destination; 117 }; 118 119 // This struct is followed by the full payload of a relayed message. 120 struct EventMessageFromRelayData { 121 ports::NodeName source; 122 }; 123 #endif 124 125 template <typename DataType> 126 Channel::MessagePtr CreateMessage(MessageType type, 127 size_t payload_size, 128 size_t num_handles, 129 DataType** out_data, 130 size_t capacity = 0) { 131 const size_t total_size = payload_size + sizeof(Header); 132 if (capacity == 0) 133 capacity = total_size; 134 else 135 capacity = std::max(total_size, capacity); 136 auto message = 137 std::make_unique<Channel::Message>(capacity, total_size, num_handles); 138 Header* header = reinterpret_cast<Header*>(message->mutable_payload()); 139 header->type = type; 140 header->padding = 0; 141 *out_data = reinterpret_cast<DataType*>(&header[1]); 142 return message; 143 } 144 145 template <typename DataType> 146 bool GetMessagePayload(const void* bytes, 147 size_t num_bytes, 148 DataType** out_data) { 149 static_assert(sizeof(DataType) > 0, "DataType must have non-zero size."); 150 if (num_bytes < sizeof(Header) + sizeof(DataType)) 151 return false; 152 *out_data = reinterpret_cast<const DataType*>( 153 static_cast<const char*>(bytes) + sizeof(Header)); 154 return true; 155 } 156 157 } // namespace 158 159 // static 160 scoped_refptr<NodeChannel> NodeChannel::Create( 161 Delegate* delegate, 162 ConnectionParams connection_params, 163 scoped_refptr<base::TaskRunner> io_task_runner, 164 const ProcessErrorCallback& process_error_callback) { 165 #if defined(OS_NACL_SFI) 166 LOG(FATAL) << "Multi-process not yet supported on NaCl-SFI"; 167 return nullptr; 168 #else 169 return new NodeChannel(delegate, std::move(connection_params), io_task_runner, 170 process_error_callback); 171 #endif 172 } 173 174 // static 175 Channel::MessagePtr NodeChannel::CreateEventMessage(size_t capacity, 176 size_t payload_size, 177 void** payload, 178 size_t num_handles) { 179 return CreateMessage(MessageType::EVENT_MESSAGE, payload_size, num_handles, 180 payload, capacity); 181 } 182 183 // static 184 void NodeChannel::GetEventMessageData(Channel::Message* message, 185 void** data, 186 size_t* num_data_bytes) { 187 // NOTE: OnChannelMessage guarantees that we never accept a Channel::Message 188 // with a payload of fewer than |sizeof(Header)| bytes. 189 *data = reinterpret_cast<Header*>(message->mutable_payload()) + 1; 190 *num_data_bytes = message->payload_size() - sizeof(Header); 191 } 192 193 void NodeChannel::Start() { 194 base::AutoLock lock(channel_lock_); 195 // ShutDown() may have already been called, in which case |channel_| is null. 196 if (channel_) 197 channel_->Start(); 198 } 199 200 void NodeChannel::ShutDown() { 201 base::AutoLock lock(channel_lock_); 202 if (channel_) { 203 channel_->ShutDown(); 204 channel_ = nullptr; 205 } 206 } 207 208 void NodeChannel::LeakHandleOnShutdown() { 209 base::AutoLock lock(channel_lock_); 210 if (channel_) { 211 channel_->LeakHandle(); 212 } 213 } 214 215 void NodeChannel::NotifyBadMessage(const std::string& error) { 216 if (!process_error_callback_.is_null()) 217 process_error_callback_.Run("Received bad user message: " + error); 218 } 219 220 void NodeChannel::SetRemoteProcessHandle(ScopedProcessHandle process_handle) { 221 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); 222 { 223 base::AutoLock lock(channel_lock_); 224 if (channel_) 225 channel_->set_remote_process(process_handle.Clone()); 226 } 227 base::AutoLock lock(remote_process_handle_lock_); 228 DCHECK(!remote_process_handle_.is_valid()); 229 CHECK_NE(remote_process_handle_.get(), base::GetCurrentProcessHandle()); 230 remote_process_handle_ = std::move(process_handle); 231 } 232 233 bool NodeChannel::HasRemoteProcessHandle() { 234 base::AutoLock lock(remote_process_handle_lock_); 235 return remote_process_handle_.is_valid(); 236 } 237 238 ScopedProcessHandle NodeChannel::CloneRemoteProcessHandle() { 239 base::AutoLock lock(remote_process_handle_lock_); 240 if (!remote_process_handle_.is_valid()) 241 return ScopedProcessHandle(); 242 return remote_process_handle_.Clone(); 243 } 244 245 void NodeChannel::SetRemoteNodeName(const ports::NodeName& name) { 246 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); 247 remote_node_name_ = name; 248 } 249 250 void NodeChannel::AcceptInvitee(const ports::NodeName& inviter_name, 251 const ports::NodeName& token) { 252 AcceptInviteeData* data; 253 Channel::MessagePtr message = CreateMessage( 254 MessageType::ACCEPT_INVITEE, sizeof(AcceptInviteeData), 0, &data); 255 data->inviter_name = inviter_name; 256 data->token = token; 257 WriteChannelMessage(std::move(message)); 258 } 259 260 void NodeChannel::AcceptInvitation(const ports::NodeName& token, 261 const ports::NodeName& invitee_name) { 262 AcceptInvitationData* data; 263 Channel::MessagePtr message = CreateMessage( 264 MessageType::ACCEPT_INVITATION, sizeof(AcceptInvitationData), 0, &data); 265 data->token = token; 266 data->invitee_name = invitee_name; 267 WriteChannelMessage(std::move(message)); 268 } 269 270 void NodeChannel::AcceptPeer(const ports::NodeName& sender_name, 271 const ports::NodeName& token, 272 const ports::PortName& port_name) { 273 AcceptPeerData* data; 274 Channel::MessagePtr message = 275 CreateMessage(MessageType::ACCEPT_PEER, sizeof(AcceptPeerData), 0, &data); 276 data->token = token; 277 data->peer_name = sender_name; 278 data->port_name = port_name; 279 WriteChannelMessage(std::move(message)); 280 } 281 282 void NodeChannel::AddBrokerClient(const ports::NodeName& client_name, 283 ScopedProcessHandle process_handle) { 284 AddBrokerClientData* data; 285 std::vector<PlatformHandle> handles; 286 #if defined(OS_WIN) 287 handles.emplace_back(base::win::ScopedHandle(process_handle.release())); 288 #endif 289 Channel::MessagePtr message = 290 CreateMessage(MessageType::ADD_BROKER_CLIENT, sizeof(AddBrokerClientData), 291 handles.size(), &data); 292 message->SetHandles(std::move(handles)); 293 data->client_name = client_name; 294 #if !defined(OS_WIN) 295 data->process_handle = process_handle.get(); 296 data->padding = 0; 297 #endif 298 WriteChannelMessage(std::move(message)); 299 } 300 301 void NodeChannel::BrokerClientAdded(const ports::NodeName& client_name, 302 PlatformHandle broker_channel) { 303 BrokerClientAddedData* data; 304 std::vector<PlatformHandle> handles; 305 if (broker_channel.is_valid()) 306 handles.emplace_back(std::move(broker_channel)); 307 Channel::MessagePtr message = 308 CreateMessage(MessageType::BROKER_CLIENT_ADDED, 309 sizeof(BrokerClientAddedData), handles.size(), &data); 310 message->SetHandles(std::move(handles)); 311 data->client_name = client_name; 312 WriteChannelMessage(std::move(message)); 313 } 314 315 void NodeChannel::AcceptBrokerClient(const ports::NodeName& broker_name, 316 PlatformHandle broker_channel) { 317 AcceptBrokerClientData* data; 318 std::vector<PlatformHandle> handles; 319 if (broker_channel.is_valid()) 320 handles.emplace_back(std::move(broker_channel)); 321 Channel::MessagePtr message = 322 CreateMessage(MessageType::ACCEPT_BROKER_CLIENT, 323 sizeof(AcceptBrokerClientData), handles.size(), &data); 324 message->SetHandles(std::move(handles)); 325 data->broker_name = broker_name; 326 WriteChannelMessage(std::move(message)); 327 } 328 329 void NodeChannel::RequestPortMerge(const ports::PortName& connector_port_name, 330 const std::string& token) { 331 RequestPortMergeData* data; 332 Channel::MessagePtr message = 333 CreateMessage(MessageType::REQUEST_PORT_MERGE, 334 sizeof(RequestPortMergeData) + token.size(), 0, &data); 335 data->connector_port_name = connector_port_name; 336 memcpy(data + 1, token.data(), token.size()); 337 WriteChannelMessage(std::move(message)); 338 } 339 340 void NodeChannel::RequestIntroduction(const ports::NodeName& name) { 341 IntroductionData* data; 342 Channel::MessagePtr message = CreateMessage( 343 MessageType::REQUEST_INTRODUCTION, sizeof(IntroductionData), 0, &data); 344 data->name = name; 345 WriteChannelMessage(std::move(message)); 346 } 347 348 void NodeChannel::Introduce(const ports::NodeName& name, 349 PlatformHandle channel_handle) { 350 IntroductionData* data; 351 std::vector<PlatformHandle> handles; 352 if (channel_handle.is_valid()) 353 handles.emplace_back(std::move(channel_handle)); 354 Channel::MessagePtr message = CreateMessage( 355 MessageType::INTRODUCE, sizeof(IntroductionData), handles.size(), &data); 356 message->SetHandles(std::move(handles)); 357 data->name = name; 358 WriteChannelMessage(std::move(message)); 359 } 360 361 void NodeChannel::SendChannelMessage(Channel::MessagePtr message) { 362 WriteChannelMessage(std::move(message)); 363 } 364 365 void NodeChannel::Broadcast(Channel::MessagePtr message) { 366 DCHECK(!message->has_handles()); 367 void* data; 368 Channel::MessagePtr broadcast_message = CreateMessage( 369 MessageType::BROADCAST_EVENT, message->data_num_bytes(), 0, &data); 370 memcpy(data, message->data(), message->data_num_bytes()); 371 WriteChannelMessage(std::move(broadcast_message)); 372 } 373 374 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) 375 void NodeChannel::RelayEventMessage(const ports::NodeName& destination, 376 Channel::MessagePtr message) { 377 #if defined(OS_WIN) 378 DCHECK(message->has_handles()); 379 380 // Note that this is only used on Windows, and on Windows all platform 381 // handles are included in the message data. We blindly copy all the data 382 // here and the relay node (the broker) will duplicate handles as needed. 383 size_t num_bytes = sizeof(RelayEventMessageData) + message->data_num_bytes(); 384 RelayEventMessageData* data; 385 Channel::MessagePtr relay_message = 386 CreateMessage(MessageType::RELAY_EVENT_MESSAGE, num_bytes, 0, &data); 387 data->destination = destination; 388 memcpy(data + 1, message->data(), message->data_num_bytes()); 389 390 // When the handles are duplicated in the broker, the source handles will 391 // be closed. If the broker never receives this message then these handles 392 // will leak, but that means something else has probably broken and the 393 // sending process won't likely be around much longer. 394 // 395 // TODO(https://crbug.com/813112): We would like to be able to violate the 396 // above stated assumption. We should not leak handles in cases where we 397 // outlive the broker, as we may continue existing and eventually accept a new 398 // broker invitation. 399 std::vector<PlatformHandleInTransit> handles = message->TakeHandles(); 400 for (auto& handle : handles) 401 handle.TakeHandle().release(); 402 403 #else 404 DCHECK(message->has_mach_ports()); 405 406 // On OSX, the handles are extracted from the relayed message and attached to 407 // the wrapper. The broker then takes the handles attached to the wrapper and 408 // moves them back to the relayed message. This is necessary because the 409 // message may contain fds which need to be attached to the outer message so 410 // that they can be transferred to the broker. 411 std::vector<PlatformHandleInTransit> handles = message->TakeHandles(); 412 size_t num_bytes = sizeof(RelayEventMessageData) + message->data_num_bytes(); 413 RelayEventMessageData* data; 414 Channel::MessagePtr relay_message = CreateMessage( 415 MessageType::RELAY_EVENT_MESSAGE, num_bytes, handles.size(), &data); 416 data->destination = destination; 417 memcpy(data + 1, message->data(), message->data_num_bytes()); 418 relay_message->SetHandles(std::move(handles)); 419 #endif // defined(OS_WIN) 420 421 WriteChannelMessage(std::move(relay_message)); 422 } 423 424 void NodeChannel::EventMessageFromRelay(const ports::NodeName& source, 425 Channel::MessagePtr message) { 426 size_t num_bytes = 427 sizeof(EventMessageFromRelayData) + message->payload_size(); 428 EventMessageFromRelayData* data; 429 Channel::MessagePtr relayed_message = 430 CreateMessage(MessageType::EVENT_MESSAGE_FROM_RELAY, num_bytes, 431 message->num_handles(), &data); 432 data->source = source; 433 if (message->payload_size()) 434 memcpy(data + 1, message->payload(), message->payload_size()); 435 relayed_message->SetHandles(message->TakeHandles()); 436 WriteChannelMessage(std::move(relayed_message)); 437 } 438 #endif // defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) 439 440 NodeChannel::NodeChannel(Delegate* delegate, 441 ConnectionParams connection_params, 442 scoped_refptr<base::TaskRunner> io_task_runner, 443 const ProcessErrorCallback& process_error_callback) 444 : delegate_(delegate), 445 io_task_runner_(io_task_runner), 446 process_error_callback_(process_error_callback) 447 #if !defined(OS_NACL_SFI) 448 , 449 channel_( 450 Channel::Create(this, std::move(connection_params), io_task_runner_)) 451 #endif 452 { 453 } 454 455 NodeChannel::~NodeChannel() { 456 ShutDown(); 457 } 458 459 void NodeChannel::OnChannelMessage(const void* payload, 460 size_t payload_size, 461 std::vector<PlatformHandle> handles) { 462 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); 463 464 RequestContext request_context(RequestContext::Source::SYSTEM); 465 466 // Ensure this NodeChannel stays alive through the extent of this method. The 467 // delegate may have the only other reference to this object and it may choose 468 // to drop it here in response to, e.g., a malformed message. 469 scoped_refptr<NodeChannel> keepalive = this; 470 471 if (payload_size <= sizeof(Header)) { 472 delegate_->OnChannelError(remote_node_name_, this); 473 return; 474 } 475 476 const Header* header = static_cast<const Header*>(payload); 477 switch (header->type) { 478 case MessageType::ACCEPT_INVITEE: { 479 const AcceptInviteeData* data; 480 if (GetMessagePayload(payload, payload_size, &data)) { 481 delegate_->OnAcceptInvitee(remote_node_name_, data->inviter_name, 482 data->token); 483 return; 484 } 485 break; 486 } 487 488 case MessageType::ACCEPT_INVITATION: { 489 const AcceptInvitationData* data; 490 if (GetMessagePayload(payload, payload_size, &data)) { 491 delegate_->OnAcceptInvitation(remote_node_name_, data->token, 492 data->invitee_name); 493 return; 494 } 495 break; 496 } 497 498 case MessageType::ADD_BROKER_CLIENT: { 499 const AddBrokerClientData* data; 500 if (GetMessagePayload(payload, payload_size, &data)) { 501 #if defined(OS_WIN) 502 if (handles.size() != 1) { 503 DLOG(ERROR) << "Dropping invalid AddBrokerClient message."; 504 break; 505 } 506 delegate_->OnAddBrokerClient(remote_node_name_, data->client_name, 507 handles[0].ReleaseHandle()); 508 #else 509 if (!handles.empty()) { 510 DLOG(ERROR) << "Dropping invalid AddBrokerClient message."; 511 break; 512 } 513 delegate_->OnAddBrokerClient(remote_node_name_, data->client_name, 514 data->process_handle); 515 #endif 516 return; 517 } 518 break; 519 } 520 521 case MessageType::BROKER_CLIENT_ADDED: { 522 const BrokerClientAddedData* data; 523 if (GetMessagePayload(payload, payload_size, &data)) { 524 if (handles.size() != 1) { 525 DLOG(ERROR) << "Dropping invalid BrokerClientAdded message."; 526 break; 527 } 528 delegate_->OnBrokerClientAdded(remote_node_name_, data->client_name, 529 std::move(handles[0])); 530 return; 531 } 532 break; 533 } 534 535 case MessageType::ACCEPT_BROKER_CLIENT: { 536 const AcceptBrokerClientData* data; 537 if (GetMessagePayload(payload, payload_size, &data)) { 538 PlatformHandle broker_channel; 539 if (handles.size() > 1) { 540 DLOG(ERROR) << "Dropping invalid AcceptBrokerClient message."; 541 break; 542 } 543 if (handles.size() == 1) 544 broker_channel = std::move(handles[0]); 545 546 delegate_->OnAcceptBrokerClient(remote_node_name_, data->broker_name, 547 std::move(broker_channel)); 548 return; 549 } 550 break; 551 } 552 553 case MessageType::EVENT_MESSAGE: { 554 Channel::MessagePtr message( 555 new Channel::Message(payload_size, handles.size())); 556 message->SetHandles(std::move(handles)); 557 memcpy(message->mutable_payload(), payload, payload_size); 558 delegate_->OnEventMessage(remote_node_name_, std::move(message)); 559 return; 560 } 561 562 case MessageType::REQUEST_PORT_MERGE: { 563 const RequestPortMergeData* data; 564 if (GetMessagePayload(payload, payload_size, &data)) { 565 // Don't accept an empty token. 566 size_t token_size = payload_size - sizeof(*data) - sizeof(Header); 567 if (token_size == 0) 568 break; 569 std::string token(reinterpret_cast<const char*>(data + 1), token_size); 570 delegate_->OnRequestPortMerge(remote_node_name_, 571 data->connector_port_name, token); 572 return; 573 } 574 break; 575 } 576 577 case MessageType::REQUEST_INTRODUCTION: { 578 const IntroductionData* data; 579 if (GetMessagePayload(payload, payload_size, &data)) { 580 delegate_->OnRequestIntroduction(remote_node_name_, data->name); 581 return; 582 } 583 break; 584 } 585 586 case MessageType::INTRODUCE: { 587 const IntroductionData* data; 588 if (GetMessagePayload(payload, payload_size, &data)) { 589 if (handles.size() > 1) { 590 DLOG(ERROR) << "Dropping invalid introduction message."; 591 break; 592 } 593 PlatformHandle channel_handle; 594 if (handles.size() == 1) 595 channel_handle = std::move(handles[0]); 596 597 delegate_->OnIntroduce(remote_node_name_, data->name, 598 std::move(channel_handle)); 599 return; 600 } 601 break; 602 } 603 604 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) 605 case MessageType::RELAY_EVENT_MESSAGE: { 606 base::ProcessHandle from_process; 607 { 608 base::AutoLock lock(remote_process_handle_lock_); 609 // NOTE: It's safe to retain a weak reference to this process handle 610 // through the extent of this call because |this| is kept alive and 611 // |remote_process_handle_| is never reset once set. 612 from_process = remote_process_handle_.get(); 613 } 614 const RelayEventMessageData* data; 615 if (GetMessagePayload(payload, payload_size, &data)) { 616 // Don't try to relay an empty message. 617 if (payload_size <= sizeof(Header) + sizeof(RelayEventMessageData)) 618 break; 619 620 const void* message_start = data + 1; 621 Channel::MessagePtr message = Channel::Message::Deserialize( 622 message_start, payload_size - sizeof(Header) - sizeof(*data), 623 from_process); 624 if (!message) { 625 DLOG(ERROR) << "Dropping invalid relay message."; 626 break; 627 } 628 #if defined(OS_MACOSX) && !defined(OS_IOS) 629 message->SetHandles(std::move(handles)); 630 #endif 631 delegate_->OnRelayEventMessage(remote_node_name_, from_process, 632 data->destination, std::move(message)); 633 return; 634 } 635 break; 636 } 637 #endif 638 639 case MessageType::BROADCAST_EVENT: { 640 if (payload_size <= sizeof(Header)) 641 break; 642 const void* data = static_cast<const void*>( 643 reinterpret_cast<const Header*>(payload) + 1); 644 Channel::MessagePtr message = 645 Channel::Message::Deserialize(data, payload_size - sizeof(Header)); 646 if (!message || message->has_handles()) { 647 DLOG(ERROR) << "Dropping invalid broadcast message."; 648 break; 649 } 650 delegate_->OnBroadcast(remote_node_name_, std::move(message)); 651 return; 652 } 653 654 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) 655 case MessageType::EVENT_MESSAGE_FROM_RELAY: 656 const EventMessageFromRelayData* data; 657 if (GetMessagePayload(payload, payload_size, &data)) { 658 size_t num_bytes = payload_size - sizeof(*data); 659 if (num_bytes < sizeof(Header)) 660 break; 661 num_bytes -= sizeof(Header); 662 663 Channel::MessagePtr message( 664 new Channel::Message(num_bytes, handles.size())); 665 message->SetHandles(std::move(handles)); 666 if (num_bytes) 667 memcpy(message->mutable_payload(), data + 1, num_bytes); 668 delegate_->OnEventMessageFromRelay(remote_node_name_, data->source, 669 std::move(message)); 670 return; 671 } 672 break; 673 674 #endif // defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) 675 676 case MessageType::ACCEPT_PEER: { 677 const AcceptPeerData* data; 678 if (GetMessagePayload(payload, payload_size, &data)) { 679 delegate_->OnAcceptPeer(remote_node_name_, data->token, data->peer_name, 680 data->port_name); 681 return; 682 } 683 break; 684 } 685 686 default: 687 // Ignore unrecognized message types, allowing for future extensibility. 688 return; 689 } 690 691 DLOG(ERROR) << "Received invalid message. Closing channel."; 692 if (process_error_callback_) 693 process_error_callback_.Run("NodeChannel received a malformed message"); 694 delegate_->OnChannelError(remote_node_name_, this); 695 } 696 697 void NodeChannel::OnChannelError(Channel::Error error) { 698 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); 699 700 RequestContext request_context(RequestContext::Source::SYSTEM); 701 702 ShutDown(); 703 704 if (process_error_callback_ && 705 error == Channel::Error::kReceivedMalformedData) { 706 process_error_callback_.Run("Channel received a malformed message"); 707 } 708 709 // |OnChannelError()| may cause |this| to be destroyed, but still need access 710 // to the name name after that destruction. So may a copy of 711 // |remote_node_name_| so it can be used if |this| becomes destroyed. 712 ports::NodeName node_name = remote_node_name_; 713 delegate_->OnChannelError(node_name, this); 714 } 715 716 void NodeChannel::WriteChannelMessage(Channel::MessagePtr message) { 717 // Force a crash if this process attempts to send a message larger than the 718 // maximum allowed size. This is more useful than killing a Channel when we 719 // *receive* an oversized message, as we should consider oversized message 720 // transmission to be a bug and this helps easily identify offending code. 721 CHECK(message->data_num_bytes() < GetConfiguration().max_message_num_bytes); 722 723 base::AutoLock lock(channel_lock_); 724 if (!channel_) 725 DLOG(ERROR) << "Dropping message on closed channel."; 726 else 727 channel_->Write(std::move(message)); 728 } 729 730 } // namespace core 731 } // namespace mojo 732