1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/system/raw_channel.h" 6 7 #include <errno.h> 8 #include <string.h> 9 #include <unistd.h> 10 11 #include <algorithm> 12 #include <deque> 13 #include <vector> 14 15 #include "base/basictypes.h" 16 #include "base/bind.h" 17 #include "base/compiler_specific.h" 18 #include "base/location.h" 19 #include "base/logging.h" 20 #include "base/memory/scoped_ptr.h" 21 #include "base/memory/weak_ptr.h" 22 #include "base/message_loop/message_loop.h" 23 #include "base/posix/eintr_wrapper.h" 24 #include "base/synchronization/lock.h" 25 #include "mojo/system/message_in_transit.h" 26 #include "mojo/system/platform_channel_handle.h" 27 28 namespace mojo { 29 namespace system { 30 31 namespace { 32 33 const size_t kReadSize = 4096; 34 35 class RawChannelPosix : public RawChannel, 36 public base::MessageLoopForIO::Watcher { 37 public: 38 RawChannelPosix(const PlatformChannelHandle& handle, 39 Delegate* delegate, 40 base::MessageLoop* message_loop); 41 virtual ~RawChannelPosix(); 42 43 // |RawChannel| implementation: 44 virtual bool Init() OVERRIDE; 45 virtual void Shutdown() OVERRIDE; 46 virtual bool WriteMessage(MessageInTransit* message) OVERRIDE; 47 48 private: 49 // |base::MessageLoopForIO::Watcher| implementation: 50 virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; 51 virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; 52 53 // Watches for |fd_| to become writable. Must be called on the I/O thread. 54 void WaitToWrite(); 55 56 // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O 57 // thread WITHOUT |write_lock_| held. 58 void CallOnFatalError(Delegate::FatalError fatal_error); 59 60 // Writes the message at the front of |write_message_queue_|, starting at 61 // |write_message_offset_|. It removes and destroys if the write completes and 62 // otherwise updates |write_message_offset_|. Returns true on success. Must be 63 // called under |write_lock_|. 64 bool WriteFrontMessageNoLock(); 65 66 // Cancels all pending writes and destroys the contents of 67 // |write_message_queue_|. Should only be called if |is_dead_| is false; sets 68 // |is_dead_| to true. Must be called under |write_lock_|. 69 void CancelPendingWritesNoLock(); 70 71 base::MessageLoopForIO* message_loop_for_io() { 72 return static_cast<base::MessageLoopForIO*>(message_loop()); 73 } 74 75 int fd_; 76 77 // Only used on the I/O thread: 78 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; 79 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; 80 81 // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_| 82 // is always aligned with a message boundary (we will copy memory to ensure 83 // this), but |read_buffer_| may be larger than the actual number of bytes we 84 // have. 85 std::vector<char> read_buffer_; 86 size_t read_buffer_num_valid_bytes_; 87 88 base::Lock write_lock_; // Protects the following members. 89 bool is_dead_; 90 std::deque<MessageInTransit*> write_message_queue_; 91 size_t write_message_offset_; 92 // This is used for posting tasks from write threads to the I/O thread. It 93 // must only be accessed under |write_lock_|. The weak pointers it produces 94 // are only used/invalidated on the I/O thread. 95 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_; 96 97 DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); 98 }; 99 100 RawChannelPosix::RawChannelPosix(const PlatformChannelHandle& handle, 101 Delegate* delegate, 102 base::MessageLoop* message_loop) 103 : RawChannel(delegate, message_loop), 104 fd_(handle.fd), 105 read_buffer_num_valid_bytes_(0), 106 is_dead_(false), 107 write_message_offset_(0), 108 weak_ptr_factory_(this) { 109 CHECK_EQ(RawChannel::message_loop()->type(), base::MessageLoop::TYPE_IO); 110 DCHECK_NE(fd_, -1); 111 } 112 113 RawChannelPosix::~RawChannelPosix() { 114 DCHECK(is_dead_); 115 DCHECK_EQ(fd_, -1); 116 117 // No need to take the |write_lock_| here -- if there are still weak pointers 118 // outstanding, then we're hosed anyway (since we wouldn't be able to 119 // invalidate them cleanly, since we might not be on the I/O thread). 120 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); 121 122 // These must have been shut down/destroyed on the I/O thread. 123 DCHECK(!read_watcher_.get()); 124 DCHECK(!write_watcher_.get()); 125 } 126 127 bool RawChannelPosix::Init() { 128 DCHECK_EQ(base::MessageLoop::current(), message_loop()); 129 130 DCHECK(!read_watcher_.get()); 131 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); 132 DCHECK(!write_watcher_.get()); 133 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); 134 135 // No need to take the lock. No one should be using us yet. 136 DCHECK(write_message_queue_.empty()); 137 138 if (!message_loop_for_io()->WatchFileDescriptor(fd_, true, 139 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { 140 // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly 141 // (in the sense of returning the message loop's state to what it was before 142 // it was called). 143 read_watcher_.reset(); 144 write_watcher_.reset(); 145 return false; 146 } 147 148 return true; 149 } 150 151 void RawChannelPosix::Shutdown() { 152 DCHECK_EQ(base::MessageLoop::current(), message_loop()); 153 154 base::AutoLock locker(write_lock_); 155 if (!is_dead_) 156 CancelPendingWritesNoLock(); 157 158 DCHECK_NE(fd_, -1); 159 if (close(fd_) != 0) 160 PLOG(ERROR) << "close"; 161 fd_ = -1; 162 163 weak_ptr_factory_.InvalidateWeakPtrs(); 164 165 read_watcher_.reset(); // This will stop watching (if necessary). 166 write_watcher_.reset(); // This will stop watching (if necessary). 167 } 168 169 // Reminder: This must be thread-safe, and takes ownership of |message| on 170 // success. 171 bool RawChannelPosix::WriteMessage(MessageInTransit* message) { 172 base::AutoLock locker(write_lock_); 173 if (is_dead_) { 174 message->Destroy(); 175 return false; 176 } 177 178 if (!write_message_queue_.empty()) { 179 write_message_queue_.push_back(message); 180 return true; 181 } 182 183 write_message_queue_.push_front(message); 184 DCHECK_EQ(write_message_offset_, 0u); 185 bool result = WriteFrontMessageNoLock(); 186 DCHECK(result || write_message_queue_.empty()); 187 188 if (!result) { 189 // Even if we're on the I/O thread, don't call |OnFatalError()| in the 190 // nested context. 191 message_loop()->PostTask(FROM_HERE, 192 base::Bind(&RawChannelPosix::CallOnFatalError, 193 weak_ptr_factory_.GetWeakPtr(), 194 Delegate::FATAL_ERROR_FAILED_WRITE)); 195 } else if (!write_message_queue_.empty()) { 196 // Set up to wait for the FD to become writable. If we're not on the I/O 197 // thread, we have to post a task to do this. 198 if (base::MessageLoop::current() == message_loop()) { 199 WaitToWrite(); 200 } else { 201 message_loop()->PostTask(FROM_HERE, 202 base::Bind(&RawChannelPosix::WaitToWrite, 203 weak_ptr_factory_.GetWeakPtr())); 204 } 205 } 206 207 return result; 208 } 209 210 void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { 211 DCHECK_EQ(fd, fd_); 212 DCHECK_EQ(base::MessageLoop::current(), message_loop()); 213 214 bool did_dispatch_message = false; 215 // Tracks the offset of the first undispatched message in |read_buffer_|. 216 // Currently, we copy data to ensure that this is zero at the beginning. 217 size_t read_buffer_start = 0; 218 for (;;) { 219 if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_) 220 < kReadSize) { 221 // Use power-of-2 buffer sizes. 222 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the 223 // maximum message size to whatever extent necessary). 224 // TODO(vtl): We may often be able to peek at the header and get the real 225 // required extra space (which may be much bigger than |kReadSize|). 226 size_t new_size = std::max(read_buffer_.size(), kReadSize); 227 while (new_size < 228 read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize) 229 new_size *= 2; 230 231 // TODO(vtl): It's suboptimal to zero out the fresh memory. 232 read_buffer_.resize(new_size, 0); 233 } 234 235 ssize_t bytes_read = HANDLE_EINTR( 236 read(fd_, 237 &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_], 238 kReadSize)); 239 if (bytes_read < 0) { 240 if (errno != EAGAIN && errno != EWOULDBLOCK) { 241 PLOG(ERROR) << "read"; 242 { 243 base::AutoLock locker(write_lock_); 244 CancelPendingWritesNoLock(); 245 } 246 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); 247 return; 248 } 249 250 break; 251 } 252 253 read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read); 254 255 // Dispatch all the messages that we can. 256 while (read_buffer_num_valid_bytes_ >= sizeof(MessageInTransit)) { 257 const MessageInTransit* message = 258 reinterpret_cast<const MessageInTransit*>( 259 &read_buffer_[read_buffer_start]); 260 DCHECK_EQ(reinterpret_cast<size_t>(message) % 261 MessageInTransit::kMessageAlignment, 0u); 262 // If we have the header, not the whole message.... 263 if (read_buffer_num_valid_bytes_ < 264 message->size_with_header_and_padding()) 265 break; 266 267 // Dispatch the message. 268 delegate()->OnReadMessage(*message); 269 if (!read_watcher_.get()) { 270 // |Shutdown()| was called in |OnReadMessage()|. 271 // TODO(vtl): Add test for this case. 272 return; 273 } 274 did_dispatch_message = true; 275 276 // Update our state. 277 read_buffer_start += message->size_with_header_and_padding(); 278 read_buffer_num_valid_bytes_ -= message->size_with_header_and_padding(); 279 } 280 281 // If we dispatched any messages, stop reading for now (and let the message 282 // loop do its thing for another round). 283 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only 284 // a single message. Risks: slower, more complex if we want to avoid lots of 285 // copying. ii. Keep reading until there's no more data and dispatch all the 286 // messages we can. Risks: starvation of other users of the message loop.) 287 if (did_dispatch_message) 288 break; 289 290 // If we didn't max out |kReadSize|, stop reading for now. 291 if (static_cast<size_t>(bytes_read) < kReadSize) 292 break; 293 294 // Else try to read some more.... 295 } 296 297 // Move data back to start. 298 if (read_buffer_start > 0) { 299 memmove(&read_buffer_[0], &read_buffer_[read_buffer_start], 300 read_buffer_num_valid_bytes_); 301 read_buffer_start = 0; 302 } 303 } 304 305 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { 306 DCHECK_EQ(fd, fd_); 307 DCHECK_EQ(base::MessageLoop::current(), message_loop()); 308 309 bool did_fail = false; 310 { 311 base::AutoLock locker(write_lock_); 312 DCHECK(!is_dead_); 313 DCHECK(!write_message_queue_.empty()); 314 315 bool result = WriteFrontMessageNoLock(); 316 DCHECK(result || write_message_queue_.empty()); 317 318 if (!result) 319 did_fail = true; 320 else if (!write_message_queue_.empty()) 321 WaitToWrite(); 322 } 323 if (did_fail) 324 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); 325 } 326 327 void RawChannelPosix::WaitToWrite() { 328 DCHECK_EQ(base::MessageLoop::current(), message_loop()); 329 330 DCHECK(write_watcher_.get()); 331 bool result = message_loop_for_io()->WatchFileDescriptor( 332 fd_, false, base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(), 333 this); 334 DCHECK(result); 335 } 336 337 void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) { 338 DCHECK_EQ(base::MessageLoop::current(), message_loop()); 339 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? 340 delegate()->OnFatalError(fatal_error); 341 } 342 343 bool RawChannelPosix::WriteFrontMessageNoLock() { 344 write_lock_.AssertAcquired(); 345 346 DCHECK(!is_dead_); 347 DCHECK(!write_message_queue_.empty()); 348 349 MessageInTransit* message = write_message_queue_.front(); 350 DCHECK_LT(write_message_offset_, message->size_with_header_and_padding()); 351 size_t bytes_to_write = 352 message->size_with_header_and_padding() - write_message_offset_; 353 ssize_t bytes_written = HANDLE_EINTR( 354 write(fd_, 355 reinterpret_cast<char*>(message) + write_message_offset_, 356 bytes_to_write)); 357 if (bytes_written < 0) { 358 if (errno != EAGAIN && errno != EWOULDBLOCK) { 359 PLOG(ERROR) << "write of size " << bytes_to_write; 360 CancelPendingWritesNoLock(); 361 return false; 362 } 363 364 // We simply failed to write since we'd block. The logic is the same as if 365 // we got a partial write. 366 bytes_written = 0; 367 } 368 369 DCHECK_GE(bytes_written, 0); 370 if (static_cast<size_t>(bytes_written) < bytes_to_write) { 371 // Partial (or no) write. 372 write_message_offset_ += static_cast<size_t>(bytes_written); 373 } else { 374 // Complete write. 375 DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write); 376 write_message_queue_.pop_front(); 377 write_message_offset_ = 0; 378 message->Destroy(); 379 } 380 381 return true; 382 } 383 384 void RawChannelPosix::CancelPendingWritesNoLock() { 385 write_lock_.AssertAcquired(); 386 DCHECK(!is_dead_); 387 388 is_dead_ = true; 389 for (std::deque<MessageInTransit*>::iterator it = 390 write_message_queue_.begin(); it != write_message_queue_.end(); 391 ++it) { 392 (*it)->Destroy(); 393 } 394 write_message_queue_.clear(); 395 } 396 397 } // namespace 398 399 // ----------------------------------------------------------------------------- 400 401 // Static factory method declared in raw_channel.h. 402 // static 403 RawChannel* RawChannel::Create(const PlatformChannelHandle& handle, 404 Delegate* delegate, 405 base::MessageLoop* message_loop) { 406 return new RawChannelPosix(handle, delegate, message_loop); 407 } 408 409 } // namespace system 410 } // namespace mojo 411