1 // Copyright 2016 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/edk/system/channel.h" 6 7 #include <errno.h> 8 #include <sys/socket.h> 9 10 #include <algorithm> 11 #include <deque> 12 #include <limits> 13 #include <memory> 14 15 #include "base/bind.h" 16 #include "base/location.h" 17 #include "base/macros.h" 18 #include "base/memory/ref_counted.h" 19 #include "base/message_loop/message_loop.h" 20 #include "base/synchronization/lock.h" 21 #include "base/task_runner.h" 22 #include "mojo/edk/embedder/platform_channel_utils_posix.h" 23 #include "mojo/edk/embedder/platform_handle_vector.h" 24 25 #if !defined(OS_NACL) 26 #include <sys/uio.h> 27 #endif 28 29 namespace mojo { 30 namespace edk { 31 32 namespace { 33 34 const size_t kMaxBatchReadCapacity = 256 * 1024; 35 36 // A view over a Channel::Message object. The write queue uses these since 37 // large messages may need to be sent in chunks. 38 class MessageView { 39 public: 40 // Owns |message|. |offset| indexes the first unsent byte in the message. 41 MessageView(Channel::MessagePtr message, size_t offset) 42 : message_(std::move(message)), 43 offset_(offset), 44 handles_(message_->TakeHandlesForTransport()) { 45 DCHECK_GT(message_->data_num_bytes(), offset_); 46 } 47 48 MessageView(MessageView&& other) { *this = std::move(other); } 49 50 MessageView& operator=(MessageView&& other) { 51 message_ = std::move(other.message_); 52 offset_ = other.offset_; 53 handles_ = std::move(other.handles_); 54 return *this; 55 } 56 57 ~MessageView() {} 58 59 const void* data() const { 60 return static_cast<const char*>(message_->data()) + offset_; 61 } 62 63 size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; } 64 65 size_t data_offset() const { return offset_; } 66 void advance_data_offset(size_t num_bytes) { 67 DCHECK_GT(message_->data_num_bytes(), offset_ + num_bytes); 68 offset_ += num_bytes; 69 } 70 71 ScopedPlatformHandleVectorPtr TakeHandles() { return std::move(handles_); } 72 Channel::MessagePtr TakeMessage() { return std::move(message_); } 73 74 void SetHandles(ScopedPlatformHandleVectorPtr handles) { 75 handles_ = std::move(handles); 76 } 77 78 private: 79 Channel::MessagePtr message_; 80 size_t offset_; 81 ScopedPlatformHandleVectorPtr handles_; 82 83 DISALLOW_COPY_AND_ASSIGN(MessageView); 84 }; 85 86 class ChannelPosix : public Channel, 87 public base::MessageLoop::DestructionObserver, 88 public base::MessageLoopForIO::Watcher { 89 public: 90 ChannelPosix(Delegate* delegate, 91 ConnectionParams connection_params, 92 scoped_refptr<base::TaskRunner> io_task_runner) 93 : Channel(delegate), 94 self_(this), 95 handle_(connection_params.TakeChannelHandle()), 96 io_task_runner_(io_task_runner) 97 #if defined(OS_MACOSX) 98 , 99 handles_to_close_(new PlatformHandleVector) 100 #endif 101 { 102 CHECK(handle_.is_valid()); 103 } 104 105 void Start() override { 106 if (io_task_runner_->RunsTasksOnCurrentThread()) { 107 StartOnIOThread(); 108 } else { 109 io_task_runner_->PostTask( 110 FROM_HERE, base::Bind(&ChannelPosix::StartOnIOThread, this)); 111 } 112 } 113 114 void ShutDownImpl() override { 115 // Always shut down asynchronously when called through the public interface. 116 io_task_runner_->PostTask( 117 FROM_HERE, base::Bind(&ChannelPosix::ShutDownOnIOThread, this)); 118 } 119 120 void Write(MessagePtr message) override { 121 bool write_error = false; 122 { 123 base::AutoLock lock(write_lock_); 124 if (reject_writes_) 125 return; 126 if (outgoing_messages_.empty()) { 127 if (!WriteNoLock(MessageView(std::move(message), 0))) 128 reject_writes_ = write_error = true; 129 } else { 130 outgoing_messages_.emplace_back(std::move(message), 0); 131 } 132 } 133 if (write_error) { 134 // Do not synchronously invoke OnError(). Write() may have been called by 135 // the delegate and we don't want to re-enter it. 136 io_task_runner_->PostTask(FROM_HERE, 137 base::Bind(&ChannelPosix::OnError, this)); 138 } 139 } 140 141 void LeakHandle() override { 142 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 143 leak_handle_ = true; 144 } 145 146 bool GetReadPlatformHandles( 147 size_t num_handles, 148 const void* extra_header, 149 size_t extra_header_size, 150 ScopedPlatformHandleVectorPtr* handles) override { 151 if (num_handles > std::numeric_limits<uint16_t>::max()) 152 return false; 153 #if defined(OS_MACOSX) && !defined(OS_IOS) 154 // On OSX, we can have mach ports which are located in the extra header 155 // section. 156 using MachPortsEntry = Channel::Message::MachPortsEntry; 157 using MachPortsExtraHeader = Channel::Message::MachPortsExtraHeader; 158 CHECK(extra_header_size >= 159 sizeof(MachPortsExtraHeader) + num_handles * sizeof(MachPortsEntry)); 160 const MachPortsExtraHeader* mach_ports_header = 161 reinterpret_cast<const MachPortsExtraHeader*>(extra_header); 162 size_t num_mach_ports = mach_ports_header->num_ports; 163 CHECK(num_mach_ports <= num_handles); 164 if (incoming_platform_handles_.size() + num_mach_ports < num_handles) { 165 handles->reset(); 166 return true; 167 } 168 169 handles->reset(new PlatformHandleVector(num_handles)); 170 const MachPortsEntry* mach_ports = mach_ports_header->entries; 171 for (size_t i = 0, mach_port_index = 0; i < num_handles; ++i) { 172 if (mach_port_index < num_mach_ports && 173 mach_ports[mach_port_index].index == i) { 174 (*handles)->at(i) = PlatformHandle( 175 static_cast<mach_port_t>(mach_ports[mach_port_index].mach_port)); 176 CHECK((*handles)->at(i).type == PlatformHandle::Type::MACH); 177 // These are actually just Mach port names until they're resolved from 178 // the remote process. 179 (*handles)->at(i).type = PlatformHandle::Type::MACH_NAME; 180 mach_port_index++; 181 } else { 182 CHECK(!incoming_platform_handles_.empty()); 183 (*handles)->at(i) = incoming_platform_handles_.front(); 184 incoming_platform_handles_.pop_front(); 185 } 186 } 187 #else 188 if (incoming_platform_handles_.size() < num_handles) { 189 handles->reset(); 190 return true; 191 } 192 193 handles->reset(new PlatformHandleVector(num_handles)); 194 for (size_t i = 0; i < num_handles; ++i) { 195 (*handles)->at(i) = incoming_platform_handles_.front(); 196 incoming_platform_handles_.pop_front(); 197 } 198 #endif 199 200 return true; 201 } 202 203 private: 204 ~ChannelPosix() override { 205 DCHECK(!read_watcher_); 206 DCHECK(!write_watcher_); 207 for (auto handle : incoming_platform_handles_) 208 handle.CloseIfNecessary(); 209 } 210 211 void StartOnIOThread() { 212 DCHECK(!read_watcher_); 213 DCHECK(!write_watcher_); 214 read_watcher_.reset( 215 new base::MessageLoopForIO::FileDescriptorWatcher(FROM_HERE)); 216 base::MessageLoop::current()->AddDestructionObserver(this); 217 if (handle_.get().needs_connection) { 218 base::MessageLoopForIO::current()->WatchFileDescriptor( 219 handle_.get().handle, false /* persistent */, 220 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this); 221 } else { 222 write_watcher_.reset( 223 new base::MessageLoopForIO::FileDescriptorWatcher(FROM_HERE)); 224 base::MessageLoopForIO::current()->WatchFileDescriptor( 225 handle_.get().handle, true /* persistent */, 226 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this); 227 base::AutoLock lock(write_lock_); 228 FlushOutgoingMessagesNoLock(); 229 } 230 } 231 232 void WaitForWriteOnIOThread() { 233 base::AutoLock lock(write_lock_); 234 WaitForWriteOnIOThreadNoLock(); 235 } 236 237 void WaitForWriteOnIOThreadNoLock() { 238 if (pending_write_) 239 return; 240 if (!write_watcher_) 241 return; 242 if (io_task_runner_->RunsTasksOnCurrentThread()) { 243 pending_write_ = true; 244 base::MessageLoopForIO::current()->WatchFileDescriptor( 245 handle_.get().handle, false /* persistent */, 246 base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(), this); 247 } else { 248 io_task_runner_->PostTask( 249 FROM_HERE, base::Bind(&ChannelPosix::WaitForWriteOnIOThread, this)); 250 } 251 } 252 253 void ShutDownOnIOThread() { 254 base::MessageLoop::current()->RemoveDestructionObserver(this); 255 256 read_watcher_.reset(); 257 write_watcher_.reset(); 258 if (leak_handle_) 259 ignore_result(handle_.release()); 260 handle_.reset(); 261 #if defined(OS_MACOSX) 262 handles_to_close_.reset(); 263 #endif 264 265 // May destroy the |this| if it was the last reference. 266 self_ = nullptr; 267 } 268 269 // base::MessageLoop::DestructionObserver: 270 void WillDestroyCurrentMessageLoop() override { 271 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 272 if (self_) 273 ShutDownOnIOThread(); 274 } 275 276 // base::MessageLoopForIO::Watcher: 277 void OnFileCanReadWithoutBlocking(int fd) override { 278 CHECK_EQ(fd, handle_.get().handle); 279 if (handle_.get().needs_connection) { 280 #if !defined(OS_NACL) 281 read_watcher_.reset(); 282 base::MessageLoop::current()->RemoveDestructionObserver(this); 283 284 ScopedPlatformHandle accept_fd; 285 ServerAcceptConnection(handle_.get(), &accept_fd); 286 if (!accept_fd.is_valid()) { 287 OnError(); 288 return; 289 } 290 handle_ = std::move(accept_fd); 291 StartOnIOThread(); 292 #else 293 NOTREACHED(); 294 #endif 295 return; 296 } 297 298 bool read_error = false; 299 size_t next_read_size = 0; 300 size_t buffer_capacity = 0; 301 size_t total_bytes_read = 0; 302 size_t bytes_read = 0; 303 do { 304 buffer_capacity = next_read_size; 305 char* buffer = GetReadBuffer(&buffer_capacity); 306 DCHECK_GT(buffer_capacity, 0u); 307 308 ssize_t read_result = PlatformChannelRecvmsg( 309 handle_.get(), 310 buffer, 311 buffer_capacity, 312 &incoming_platform_handles_); 313 314 if (read_result > 0) { 315 bytes_read = static_cast<size_t>(read_result); 316 total_bytes_read += bytes_read; 317 if (!OnReadComplete(bytes_read, &next_read_size)) { 318 read_error = true; 319 break; 320 } 321 } else if (read_result == 0 || 322 (errno != EAGAIN && errno != EWOULDBLOCK)) { 323 read_error = true; 324 break; 325 } 326 } while (bytes_read == buffer_capacity && 327 total_bytes_read < kMaxBatchReadCapacity && 328 next_read_size > 0); 329 if (read_error) { 330 // Stop receiving read notifications. 331 read_watcher_.reset(); 332 333 OnError(); 334 } 335 } 336 337 void OnFileCanWriteWithoutBlocking(int fd) override { 338 bool write_error = false; 339 { 340 base::AutoLock lock(write_lock_); 341 pending_write_ = false; 342 if (!FlushOutgoingMessagesNoLock()) 343 reject_writes_ = write_error = true; 344 } 345 if (write_error) 346 OnError(); 347 } 348 349 // Attempts to write a message directly to the channel. If the full message 350 // cannot be written, it's queued and a wait is initiated to write the message 351 // ASAP on the I/O thread. 352 bool WriteNoLock(MessageView message_view) { 353 if (handle_.get().needs_connection) { 354 outgoing_messages_.emplace_front(std::move(message_view)); 355 return true; 356 } 357 size_t bytes_written = 0; 358 do { 359 message_view.advance_data_offset(bytes_written); 360 361 ssize_t result; 362 ScopedPlatformHandleVectorPtr handles = message_view.TakeHandles(); 363 if (handles && handles->size()) { 364 iovec iov = { 365 const_cast<void*>(message_view.data()), 366 message_view.data_num_bytes() 367 }; 368 // TODO: Handle lots of handles. 369 result = PlatformChannelSendmsgWithHandles( 370 handle_.get(), &iov, 1, handles->data(), handles->size()); 371 if (result >= 0) { 372 #if defined(OS_MACOSX) 373 // There is a bug on OSX which makes it dangerous to close 374 // a file descriptor while it is in transit. So instead we 375 // store the file descriptor in a set and send a message to 376 // the recipient, which is queued AFTER the message that 377 // sent the FD. The recipient will reply to the message, 378 // letting us know that it is now safe to close the file 379 // descriptor. For more information, see: 380 // http://crbug.com/298276 381 std::vector<int> fds; 382 for (auto& handle : *handles) 383 fds.push_back(handle.handle); 384 { 385 base::AutoLock l(handles_to_close_lock_); 386 for (auto& handle : *handles) 387 handles_to_close_->push_back(handle); 388 } 389 MessagePtr fds_message( 390 new Channel::Message(sizeof(fds[0]) * fds.size(), 0, 391 Message::MessageType::HANDLES_SENT)); 392 memcpy(fds_message->mutable_payload(), fds.data(), 393 sizeof(fds[0]) * fds.size()); 394 outgoing_messages_.emplace_back(std::move(fds_message), 0); 395 handles->clear(); 396 #else 397 handles.reset(); 398 #endif // defined(OS_MACOSX) 399 } 400 } else { 401 result = PlatformChannelWrite(handle_.get(), message_view.data(), 402 message_view.data_num_bytes()); 403 } 404 405 if (result < 0) { 406 if (errno != EAGAIN && errno != EWOULDBLOCK 407 #if defined(OS_MACOSX) 408 // On OS X if sendmsg() is trying to send fds between processes and 409 // there isn't enough room in the output buffer to send the fd 410 // structure over atomically then EMSGSIZE is returned. 411 // 412 // EMSGSIZE presents a problem since the system APIs can only call 413 // us when there's room in the socket buffer and not when there is 414 // "enough" room. 415 // 416 // The current behavior is to return to the event loop when EMSGSIZE 417 // is received and hopefull service another FD. This is however 418 // still technically a busy wait since the event loop will call us 419 // right back until the receiver has read enough data to allow 420 // passing the FD over atomically. 421 && errno != EMSGSIZE 422 #endif 423 ) { 424 return false; 425 } 426 message_view.SetHandles(std::move(handles)); 427 outgoing_messages_.emplace_front(std::move(message_view)); 428 WaitForWriteOnIOThreadNoLock(); 429 return true; 430 } 431 432 bytes_written = static_cast<size_t>(result); 433 } while (bytes_written < message_view.data_num_bytes()); 434 435 return FlushOutgoingMessagesNoLock(); 436 } 437 438 bool FlushOutgoingMessagesNoLock() { 439 std::deque<MessageView> messages; 440 std::swap(outgoing_messages_, messages); 441 442 while (!messages.empty()) { 443 if (!WriteNoLock(std::move(messages.front()))) 444 return false; 445 446 messages.pop_front(); 447 if (!outgoing_messages_.empty()) { 448 // The message was requeued by WriteNoLock(), so we have to wait for 449 // pipe to become writable again. Repopulate the message queue and exit. 450 // If sending the message triggered any control messages, they may be 451 // in |outgoing_messages_| in addition to or instead of the message 452 // being sent. 453 std::swap(messages, outgoing_messages_); 454 while (!messages.empty()) { 455 outgoing_messages_.push_front(std::move(messages.back())); 456 messages.pop_back(); 457 } 458 return true; 459 } 460 } 461 462 return true; 463 } 464 465 #if defined(OS_MACOSX) 466 bool OnControlMessage(Message::MessageType message_type, 467 const void* payload, 468 size_t payload_size, 469 ScopedPlatformHandleVectorPtr handles) override { 470 switch (message_type) { 471 case Message::MessageType::HANDLES_SENT: { 472 if (payload_size == 0) 473 break; 474 MessagePtr message(new Channel::Message( 475 payload_size, 0, Message::MessageType::HANDLES_SENT_ACK)); 476 memcpy(message->mutable_payload(), payload, payload_size); 477 Write(std::move(message)); 478 return true; 479 } 480 481 case Message::MessageType::HANDLES_SENT_ACK: { 482 size_t num_fds = payload_size / sizeof(int); 483 if (num_fds == 0 || payload_size % sizeof(int) != 0) 484 break; 485 486 const int* fds = reinterpret_cast<const int*>(payload); 487 if (!CloseHandles(fds, num_fds)) 488 break; 489 return true; 490 } 491 492 default: 493 break; 494 } 495 496 return false; 497 } 498 499 // Closes handles referenced by |fds|. Returns false if |num_fds| is 0, or if 500 // |fds| does not match a sequence of handles in |handles_to_close_|. 501 bool CloseHandles(const int* fds, size_t num_fds) { 502 base::AutoLock l(handles_to_close_lock_); 503 if (!num_fds) 504 return false; 505 506 auto start = 507 std::find_if(handles_to_close_->begin(), handles_to_close_->end(), 508 [&fds](const PlatformHandle& handle) { 509 return handle.handle == fds[0]; 510 }); 511 if (start == handles_to_close_->end()) 512 return false; 513 514 auto it = start; 515 size_t i = 0; 516 // The FDs in the message should match a sequence of handles in 517 // |handles_to_close_|. 518 for (; i < num_fds && it != handles_to_close_->end(); i++, ++it) { 519 if (it->handle != fds[i]) 520 return false; 521 522 it->CloseIfNecessary(); 523 } 524 if (i != num_fds) 525 return false; 526 527 handles_to_close_->erase(start, it); 528 return true; 529 } 530 #endif // defined(OS_MACOSX) 531 532 // Keeps the Channel alive at least until explicit shutdown on the IO thread. 533 scoped_refptr<Channel> self_; 534 535 ScopedPlatformHandle handle_; 536 scoped_refptr<base::TaskRunner> io_task_runner_; 537 538 // These watchers must only be accessed on the IO thread. 539 std::unique_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; 540 std::unique_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; 541 542 std::deque<PlatformHandle> incoming_platform_handles_; 543 544 // Protects |pending_write_| and |outgoing_messages_|. 545 base::Lock write_lock_; 546 bool pending_write_ = false; 547 bool reject_writes_ = false; 548 std::deque<MessageView> outgoing_messages_; 549 550 bool leak_handle_ = false; 551 552 #if defined(OS_MACOSX) 553 base::Lock handles_to_close_lock_; 554 ScopedPlatformHandleVectorPtr handles_to_close_; 555 #endif 556 557 DISALLOW_COPY_AND_ASSIGN(ChannelPosix); 558 }; 559 560 } // namespace 561 562 // static 563 scoped_refptr<Channel> Channel::Create( 564 Delegate* delegate, 565 ConnectionParams connection_params, 566 scoped_refptr<base::TaskRunner> io_task_runner) { 567 return new ChannelPosix(delegate, std::move(connection_params), 568 io_task_runner); 569 } 570 571 } // namespace edk 572 } // namespace mojo 573