1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/system/raw_channel.h" 6 7 #include <windows.h> 8 9 #include "base/auto_reset.h" 10 #include "base/bind.h" 11 #include "base/compiler_specific.h" 12 #include "base/lazy_instance.h" 13 #include "base/location.h" 14 #include "base/logging.h" 15 #include "base/macros.h" 16 #include "base/memory/scoped_ptr.h" 17 #include "base/message_loop/message_loop.h" 18 #include "base/synchronization/lock.h" 19 #include "base/win/windows_version.h" 20 #include "mojo/embedder/platform_handle.h" 21 22 namespace mojo { 23 namespace system { 24 25 namespace { 26 27 class VistaOrHigherFunctions { 28 public: 29 VistaOrHigherFunctions(); 30 31 bool is_vista_or_higher() const { return is_vista_or_higher_; } 32 33 BOOL SetFileCompletionNotificationModes(HANDLE handle, UCHAR flags) { 34 return set_file_completion_notification_modes_(handle, flags); 35 } 36 37 BOOL CancelIoEx(HANDLE handle, LPOVERLAPPED overlapped) { 38 return cancel_io_ex_(handle, overlapped); 39 } 40 41 private: 42 typedef BOOL(WINAPI* SetFileCompletionNotificationModesFunc)(HANDLE, UCHAR); 43 typedef BOOL(WINAPI* CancelIoExFunc)(HANDLE, LPOVERLAPPED); 44 45 bool is_vista_or_higher_; 46 SetFileCompletionNotificationModesFunc 47 set_file_completion_notification_modes_; 48 CancelIoExFunc cancel_io_ex_; 49 }; 50 51 VistaOrHigherFunctions::VistaOrHigherFunctions() 52 : is_vista_or_higher_(base::win::GetVersion() >= base::win::VERSION_VISTA), 53 set_file_completion_notification_modes_(nullptr), 54 cancel_io_ex_(nullptr) { 55 if (!is_vista_or_higher_) 56 return; 57 58 HMODULE module = GetModuleHandleW(L"kernel32.dll"); 59 set_file_completion_notification_modes_ = 60 reinterpret_cast<SetFileCompletionNotificationModesFunc>( 61 GetProcAddress(module, "SetFileCompletionNotificationModes")); 62 DCHECK(set_file_completion_notification_modes_); 63 64 cancel_io_ex_ = 65 reinterpret_cast<CancelIoExFunc>(GetProcAddress(module, "CancelIoEx")); 66 DCHECK(cancel_io_ex_); 67 } 68 69 base::LazyInstance<VistaOrHigherFunctions> g_vista_or_higher_functions = 70 LAZY_INSTANCE_INITIALIZER; 71 72 class RawChannelWin : public RawChannel { 73 public: 74 RawChannelWin(embedder::ScopedPlatformHandle handle); 75 virtual ~RawChannelWin(); 76 77 // |RawChannel| public methods: 78 virtual size_t GetSerializedPlatformHandleSize() const OVERRIDE; 79 80 private: 81 // RawChannelIOHandler receives OS notifications for I/O completion. It must 82 // be created on the I/O thread. 83 // 84 // It manages its own destruction. Destruction happens on the I/O thread when 85 // all the following conditions are satisfied: 86 // - |DetachFromOwnerNoLock()| has been called; 87 // - there is no pending read; 88 // - there is no pending write. 89 class RawChannelIOHandler : public base::MessageLoopForIO::IOHandler { 90 public: 91 RawChannelIOHandler(RawChannelWin* owner, 92 embedder::ScopedPlatformHandle handle); 93 94 HANDLE handle() const { return handle_.get().handle; } 95 96 // The following methods are only called by the owner on the I/O thread. 97 bool pending_read() const; 98 base::MessageLoopForIO::IOContext* read_context(); 99 // Instructs the object to wait for an |OnIOCompleted()| notification. 100 void OnPendingReadStarted(); 101 102 // The following methods are only called by the owner under 103 // |owner_->write_lock()|. 104 bool pending_write_no_lock() const; 105 base::MessageLoopForIO::IOContext* write_context_no_lock(); 106 // Instructs the object to wait for an |OnIOCompleted()| notification. 107 void OnPendingWriteStartedNoLock(); 108 109 // |base::MessageLoopForIO::IOHandler| implementation: 110 // Must be called on the I/O thread. It could be called before or after 111 // detached from the owner. 112 virtual void OnIOCompleted(base::MessageLoopForIO::IOContext* context, 113 DWORD bytes_transferred, 114 DWORD error) OVERRIDE; 115 116 // Must be called on the I/O thread under |owner_->write_lock()|. 117 // After this call, the owner must not make any further calls on this 118 // object, and therefore the object is used on the I/O thread exclusively 119 // (if it stays alive). 120 void DetachFromOwnerNoLock(scoped_ptr<ReadBuffer> read_buffer, 121 scoped_ptr<WriteBuffer> write_buffer); 122 123 private: 124 virtual ~RawChannelIOHandler(); 125 126 // Returns true if |owner_| has been reset and there is not pending read or 127 // write. 128 // Must be called on the I/O thread. 129 bool ShouldSelfDestruct() const; 130 131 // Must be called on the I/O thread. It may be called before or after 132 // detaching from the owner. 133 void OnReadCompleted(DWORD bytes_read, DWORD error); 134 // Must be called on the I/O thread. It may be called before or after 135 // detaching from the owner. 136 void OnWriteCompleted(DWORD bytes_written, DWORD error); 137 138 embedder::ScopedPlatformHandle handle_; 139 140 // |owner_| is reset on the I/O thread under |owner_->write_lock()|. 141 // Therefore, it may be used on any thread under lock; or on the I/O thread 142 // without locking. 143 RawChannelWin* owner_; 144 145 // The following members must be used on the I/O thread. 146 scoped_ptr<ReadBuffer> preserved_read_buffer_after_detach_; 147 scoped_ptr<WriteBuffer> preserved_write_buffer_after_detach_; 148 bool suppress_self_destruct_; 149 150 bool pending_read_; 151 base::MessageLoopForIO::IOContext read_context_; 152 153 // The following members must be used under |owner_->write_lock()| while the 154 // object is still attached to the owner, and only on the I/O thread 155 // afterwards. 156 bool pending_write_; 157 base::MessageLoopForIO::IOContext write_context_; 158 159 DISALLOW_COPY_AND_ASSIGN(RawChannelIOHandler); 160 }; 161 162 // |RawChannel| private methods: 163 virtual IOResult Read(size_t* bytes_read) OVERRIDE; 164 virtual IOResult ScheduleRead() OVERRIDE; 165 virtual embedder::ScopedPlatformHandleVectorPtr GetReadPlatformHandles( 166 size_t num_platform_handles, 167 const void* platform_handle_table) OVERRIDE; 168 virtual IOResult WriteNoLock(size_t* platform_handles_written, 169 size_t* bytes_written) OVERRIDE; 170 virtual IOResult ScheduleWriteNoLock() OVERRIDE; 171 virtual bool OnInit() OVERRIDE; 172 virtual void OnShutdownNoLock(scoped_ptr<ReadBuffer> read_buffer, 173 scoped_ptr<WriteBuffer> write_buffer) OVERRIDE; 174 175 // Passed to |io_handler_| during initialization. 176 embedder::ScopedPlatformHandle handle_; 177 178 RawChannelIOHandler* io_handler_; 179 180 const bool skip_completion_port_on_success_; 181 182 DISALLOW_COPY_AND_ASSIGN(RawChannelWin); 183 }; 184 185 RawChannelWin::RawChannelIOHandler::RawChannelIOHandler( 186 RawChannelWin* owner, 187 embedder::ScopedPlatformHandle handle) 188 : handle_(handle.Pass()), 189 owner_(owner), 190 suppress_self_destruct_(false), 191 pending_read_(false), 192 pending_write_(false) { 193 memset(&read_context_.overlapped, 0, sizeof(read_context_.overlapped)); 194 read_context_.handler = this; 195 memset(&write_context_.overlapped, 0, sizeof(write_context_.overlapped)); 196 write_context_.handler = this; 197 198 owner_->message_loop_for_io()->RegisterIOHandler(handle_.get().handle, this); 199 } 200 201 RawChannelWin::RawChannelIOHandler::~RawChannelIOHandler() { 202 DCHECK(ShouldSelfDestruct()); 203 } 204 205 bool RawChannelWin::RawChannelIOHandler::pending_read() const { 206 DCHECK(owner_); 207 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io()); 208 return pending_read_; 209 } 210 211 base::MessageLoopForIO::IOContext* 212 RawChannelWin::RawChannelIOHandler::read_context() { 213 DCHECK(owner_); 214 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io()); 215 return &read_context_; 216 } 217 218 void RawChannelWin::RawChannelIOHandler::OnPendingReadStarted() { 219 DCHECK(owner_); 220 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io()); 221 DCHECK(!pending_read_); 222 pending_read_ = true; 223 } 224 225 bool RawChannelWin::RawChannelIOHandler::pending_write_no_lock() const { 226 DCHECK(owner_); 227 owner_->write_lock().AssertAcquired(); 228 return pending_write_; 229 } 230 231 base::MessageLoopForIO::IOContext* 232 RawChannelWin::RawChannelIOHandler::write_context_no_lock() { 233 DCHECK(owner_); 234 owner_->write_lock().AssertAcquired(); 235 return &write_context_; 236 } 237 238 void RawChannelWin::RawChannelIOHandler::OnPendingWriteStartedNoLock() { 239 DCHECK(owner_); 240 owner_->write_lock().AssertAcquired(); 241 DCHECK(!pending_write_); 242 pending_write_ = true; 243 } 244 245 void RawChannelWin::RawChannelIOHandler::OnIOCompleted( 246 base::MessageLoopForIO::IOContext* context, 247 DWORD bytes_transferred, 248 DWORD error) { 249 DCHECK(!owner_ || 250 base::MessageLoop::current() == owner_->message_loop_for_io()); 251 252 { 253 // Suppress self-destruction inside |OnReadCompleted()|, etc. (in case they 254 // result in a call to |Shutdown()|). 255 base::AutoReset<bool> resetter(&suppress_self_destruct_, true); 256 257 if (context == &read_context_) 258 OnReadCompleted(bytes_transferred, error); 259 else if (context == &write_context_) 260 OnWriteCompleted(bytes_transferred, error); 261 else 262 NOTREACHED(); 263 } 264 265 if (ShouldSelfDestruct()) 266 delete this; 267 } 268 269 void RawChannelWin::RawChannelIOHandler::DetachFromOwnerNoLock( 270 scoped_ptr<ReadBuffer> read_buffer, 271 scoped_ptr<WriteBuffer> write_buffer) { 272 DCHECK(owner_); 273 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io()); 274 owner_->write_lock().AssertAcquired(); 275 276 // If read/write is pending, we have to retain the corresponding buffer. 277 if (pending_read_) 278 preserved_read_buffer_after_detach_ = read_buffer.Pass(); 279 if (pending_write_) 280 preserved_write_buffer_after_detach_ = write_buffer.Pass(); 281 282 owner_ = nullptr; 283 if (ShouldSelfDestruct()) 284 delete this; 285 } 286 287 bool RawChannelWin::RawChannelIOHandler::ShouldSelfDestruct() const { 288 if (owner_ || suppress_self_destruct_) 289 return false; 290 291 // Note: Detached, hence no lock needed for |pending_write_|. 292 return !pending_read_ && !pending_write_; 293 } 294 295 void RawChannelWin::RawChannelIOHandler::OnReadCompleted(DWORD bytes_read, 296 DWORD error) { 297 DCHECK(!owner_ || 298 base::MessageLoop::current() == owner_->message_loop_for_io()); 299 DCHECK(suppress_self_destruct_); 300 301 CHECK(pending_read_); 302 pending_read_ = false; 303 if (!owner_) 304 return; 305 306 if (error == ERROR_SUCCESS) { 307 DCHECK_GT(bytes_read, 0u); 308 owner_->OnReadCompleted(IO_SUCCEEDED, bytes_read); 309 } else if (error == ERROR_BROKEN_PIPE) { 310 DCHECK_EQ(bytes_read, 0u); 311 owner_->OnReadCompleted(IO_FAILED_SHUTDOWN, 0); 312 } else { 313 DCHECK_EQ(bytes_read, 0u); 314 LOG(WARNING) << "ReadFile: " << logging::SystemErrorCodeToString(error); 315 owner_->OnReadCompleted(IO_FAILED_UNKNOWN, 0); 316 } 317 } 318 319 void RawChannelWin::RawChannelIOHandler::OnWriteCompleted(DWORD bytes_written, 320 DWORD error) { 321 DCHECK(!owner_ || 322 base::MessageLoop::current() == owner_->message_loop_for_io()); 323 DCHECK(suppress_self_destruct_); 324 325 if (!owner_) { 326 // No lock needed. 327 CHECK(pending_write_); 328 pending_write_ = false; 329 return; 330 } 331 332 { 333 base::AutoLock locker(owner_->write_lock()); 334 CHECK(pending_write_); 335 pending_write_ = false; 336 } 337 338 if (error == ERROR_SUCCESS) { 339 owner_->OnWriteCompleted(IO_SUCCEEDED, 0, bytes_written); 340 } else if (error == ERROR_BROKEN_PIPE) { 341 owner_->OnWriteCompleted(IO_FAILED_SHUTDOWN, 0, 0); 342 } else { 343 LOG(WARNING) << "WriteFile: " << logging::SystemErrorCodeToString(error); 344 owner_->OnWriteCompleted(IO_FAILED_UNKNOWN, 0, 0); 345 } 346 } 347 348 RawChannelWin::RawChannelWin(embedder::ScopedPlatformHandle handle) 349 : handle_(handle.Pass()), 350 io_handler_(nullptr), 351 skip_completion_port_on_success_( 352 g_vista_or_higher_functions.Get().is_vista_or_higher()) { 353 DCHECK(handle_.is_valid()); 354 } 355 356 RawChannelWin::~RawChannelWin() { 357 DCHECK(!io_handler_); 358 } 359 360 size_t RawChannelWin::GetSerializedPlatformHandleSize() const { 361 // TODO(vtl): Implement. 362 return 0; 363 } 364 365 RawChannel::IOResult RawChannelWin::Read(size_t* bytes_read) { 366 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 367 DCHECK(io_handler_); 368 DCHECK(!io_handler_->pending_read()); 369 370 char* buffer = nullptr; 371 size_t bytes_to_read = 0; 372 read_buffer()->GetBuffer(&buffer, &bytes_to_read); 373 374 DWORD bytes_read_dword = 0; 375 BOOL result = ReadFile(io_handler_->handle(), 376 buffer, 377 static_cast<DWORD>(bytes_to_read), 378 &bytes_read_dword, 379 &io_handler_->read_context()->overlapped); 380 if (!result) { 381 DCHECK_EQ(bytes_read_dword, 0u); 382 DWORD error = GetLastError(); 383 if (error == ERROR_BROKEN_PIPE) 384 return IO_FAILED_SHUTDOWN; 385 if (error != ERROR_IO_PENDING) { 386 LOG(WARNING) << "ReadFile: " << logging::SystemErrorCodeToString(error); 387 return IO_FAILED_UNKNOWN; 388 } 389 } 390 391 if (result && skip_completion_port_on_success_) { 392 *bytes_read = bytes_read_dword; 393 return IO_SUCCEEDED; 394 } 395 396 // If the read is pending or the read has succeeded but we don't skip 397 // completion port on success, instruct |io_handler_| to wait for the 398 // completion packet. 399 // 400 // TODO(yzshen): It seems there isn't document saying that all error cases 401 // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion 402 // packet. If we do get one for errors, |RawChannelIOHandler::OnIOCompleted()| 403 // will crash so we will learn about it. 404 405 io_handler_->OnPendingReadStarted(); 406 return IO_PENDING; 407 } 408 409 RawChannel::IOResult RawChannelWin::ScheduleRead() { 410 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 411 DCHECK(io_handler_); 412 DCHECK(!io_handler_->pending_read()); 413 414 size_t bytes_read = 0; 415 IOResult io_result = Read(&bytes_read); 416 if (io_result == IO_SUCCEEDED) { 417 DCHECK(skip_completion_port_on_success_); 418 419 // We have finished reading successfully. Queue a notification manually. 420 io_handler_->OnPendingReadStarted(); 421 // |io_handler_| won't go away before the task is run, so it is safe to use 422 // |base::Unretained()|. 423 message_loop_for_io()->PostTask( 424 FROM_HERE, 425 base::Bind(&RawChannelIOHandler::OnIOCompleted, 426 base::Unretained(io_handler_), 427 base::Unretained(io_handler_->read_context()), 428 static_cast<DWORD>(bytes_read), 429 ERROR_SUCCESS)); 430 return IO_PENDING; 431 } 432 433 return io_result; 434 } 435 436 embedder::ScopedPlatformHandleVectorPtr RawChannelWin::GetReadPlatformHandles( 437 size_t num_platform_handles, 438 const void* platform_handle_table) { 439 // TODO(vtl): Implement. 440 NOTIMPLEMENTED(); 441 return embedder::ScopedPlatformHandleVectorPtr(); 442 } 443 444 RawChannel::IOResult RawChannelWin::WriteNoLock( 445 size_t* platform_handles_written, 446 size_t* bytes_written) { 447 write_lock().AssertAcquired(); 448 449 DCHECK(io_handler_); 450 DCHECK(!io_handler_->pending_write_no_lock()); 451 452 if (write_buffer_no_lock()->HavePlatformHandlesToSend()) { 453 // TODO(vtl): Implement. 454 NOTIMPLEMENTED(); 455 } 456 457 std::vector<WriteBuffer::Buffer> buffers; 458 write_buffer_no_lock()->GetBuffers(&buffers); 459 DCHECK(!buffers.empty()); 460 461 // TODO(yzshen): Handle multi-segment writes more efficiently. 462 DWORD bytes_written_dword = 0; 463 BOOL result = WriteFile(io_handler_->handle(), 464 buffers[0].addr, 465 static_cast<DWORD>(buffers[0].size), 466 &bytes_written_dword, 467 &io_handler_->write_context_no_lock()->overlapped); 468 if (!result) { 469 DWORD error = GetLastError(); 470 if (error == ERROR_BROKEN_PIPE) 471 return IO_FAILED_SHUTDOWN; 472 if (error != ERROR_IO_PENDING) { 473 LOG(WARNING) << "WriteFile: " << logging::SystemErrorCodeToString(error); 474 return IO_FAILED_UNKNOWN; 475 } 476 } 477 478 if (result && skip_completion_port_on_success_) { 479 *platform_handles_written = 0; 480 *bytes_written = bytes_written_dword; 481 return IO_SUCCEEDED; 482 } 483 484 // If the write is pending or the write has succeeded but we don't skip 485 // completion port on success, instruct |io_handler_| to wait for the 486 // completion packet. 487 // 488 // TODO(yzshen): it seems there isn't document saying that all error cases 489 // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion 490 // packet. If we do get one for errors, |RawChannelIOHandler::OnIOCompleted()| 491 // will crash so we will learn about it. 492 493 io_handler_->OnPendingWriteStartedNoLock(); 494 return IO_PENDING; 495 } 496 497 RawChannel::IOResult RawChannelWin::ScheduleWriteNoLock() { 498 write_lock().AssertAcquired(); 499 500 DCHECK(io_handler_); 501 DCHECK(!io_handler_->pending_write_no_lock()); 502 503 // TODO(vtl): Do something with |platform_handles_written|. 504 size_t platform_handles_written = 0; 505 size_t bytes_written = 0; 506 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); 507 if (io_result == IO_SUCCEEDED) { 508 DCHECK(skip_completion_port_on_success_); 509 510 // We have finished writing successfully. Queue a notification manually. 511 io_handler_->OnPendingWriteStartedNoLock(); 512 // |io_handler_| won't go away before that task is run, so it is safe to use 513 // |base::Unretained()|. 514 message_loop_for_io()->PostTask( 515 FROM_HERE, 516 base::Bind(&RawChannelIOHandler::OnIOCompleted, 517 base::Unretained(io_handler_), 518 base::Unretained(io_handler_->write_context_no_lock()), 519 static_cast<DWORD>(bytes_written), 520 ERROR_SUCCESS)); 521 return IO_PENDING; 522 } 523 524 return io_result; 525 } 526 527 bool RawChannelWin::OnInit() { 528 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 529 530 DCHECK(handle_.is_valid()); 531 if (skip_completion_port_on_success_ && 532 !g_vista_or_higher_functions.Get().SetFileCompletionNotificationModes( 533 handle_.get().handle, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS)) { 534 return false; 535 } 536 537 DCHECK(!io_handler_); 538 io_handler_ = new RawChannelIOHandler(this, handle_.Pass()); 539 540 return true; 541 } 542 543 void RawChannelWin::OnShutdownNoLock(scoped_ptr<ReadBuffer> read_buffer, 544 scoped_ptr<WriteBuffer> write_buffer) { 545 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 546 DCHECK(io_handler_); 547 548 write_lock().AssertAcquired(); 549 550 if (io_handler_->pending_read() || io_handler_->pending_write_no_lock()) { 551 // |io_handler_| will be alive until pending read/write (if any) completes. 552 // Call |CancelIoEx()| or |CancelIo()| so that resources can be freed up as 553 // soon as possible. 554 // Note: |CancelIo()| only cancels read/write requests started from this 555 // thread. 556 if (g_vista_or_higher_functions.Get().is_vista_or_higher()) { 557 g_vista_or_higher_functions.Get().CancelIoEx(io_handler_->handle(), 558 nullptr); 559 } else { 560 CancelIo(io_handler_->handle()); 561 } 562 } 563 564 io_handler_->DetachFromOwnerNoLock(read_buffer.Pass(), write_buffer.Pass()); 565 io_handler_ = nullptr; 566 } 567 568 } // namespace 569 570 // ----------------------------------------------------------------------------- 571 572 // Static factory method declared in raw_channel.h. 573 // static 574 scoped_ptr<RawChannel> RawChannel::Create( 575 embedder::ScopedPlatformHandle handle) { 576 return scoped_ptr<RawChannel>(new RawChannelWin(handle.Pass())); 577 } 578 579 } // namespace system 580 } // namespace mojo 581