1 // Copyright 2017 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/core/channel.h" 6 7 #include <lib/fdio/limits.h> 8 #include <lib/fdio/util.h> 9 #include <lib/zx/channel.h> 10 #include <lib/zx/handle.h> 11 #include <zircon/processargs.h> 12 #include <zircon/status.h> 13 #include <zircon/syscalls.h> 14 #include <algorithm> 15 16 #include "base/bind.h" 17 #include "base/containers/circular_deque.h" 18 #include "base/files/scoped_file.h" 19 #include "base/fuchsia/fuchsia_logging.h" 20 #include "base/location.h" 21 #include "base/macros.h" 22 #include "base/memory/ref_counted.h" 23 #include "base/message_loop/message_loop_current.h" 24 #include "base/message_loop/message_pump_for_io.h" 25 #include "base/stl_util.h" 26 #include "base/synchronization/lock.h" 27 #include "base/task_runner.h" 28 #include "mojo/core/platform_handle_in_transit.h" 29 30 namespace mojo { 31 namespace core { 32 33 namespace { 34 35 const size_t kMaxBatchReadCapacity = 256 * 1024; 36 37 bool UnwrapPlatformHandle(PlatformHandleInTransit handle, 38 Channel::Message::HandleInfoEntry* info_out, 39 std::vector<PlatformHandleInTransit>* handles_out) { 40 DCHECK(handle.handle().is_valid()); 41 42 if (!handle.handle().is_valid_fd()) { 43 *info_out = {0u, 0u}; 44 handles_out->emplace_back(std::move(handle)); 45 return true; 46 } 47 48 // Each FDIO file descriptor is implemented using one or more native resources 49 // and can be un-wrapped into a set of |handle| and |info| pairs, with |info| 50 // consisting of an FDIO-defined type & arguments (see zircon/processargs.h). 51 // 52 // We try to transfer the FD, but if that fails (for example if the file has 53 // already been dup()d into another FD) we may need to clone. 54 zx_handle_t handles[FDIO_MAX_HANDLES] = {}; 55 uint32_t info[FDIO_MAX_HANDLES] = {}; 56 zx_status_t result = 57 fdio_transfer_fd(handle.handle().GetFD().get(), 0, handles, info); 58 if (result > 0) { 59 // On success, the fd in |handle| has been transferred and is no longer 60 // valid. Release from the PlatformHandle to avoid close()ing an invalid 61 // an invalid handle. 62 handle.CompleteTransit(); 63 } else if (result == ZX_ERR_UNAVAILABLE) { 64 // No luck, try cloning instead. 65 result = fdio_clone_fd(handle.handle().GetFD().get(), 0, handles, info); 66 } 67 68 if (result <= 0) { 69 ZX_DLOG(ERROR, result) << "fdio_transfer_fd(" 70 << handle.handle().GetFD().get() << ")"; 71 return false; 72 } 73 DCHECK_LE(result, FDIO_MAX_HANDLES); 74 75 // We assume here that only the |PA_HND_TYPE| of the |info| really matters, 76 // and that that is the same for all the underlying handles. 77 *info_out = {PA_HND_TYPE(info[0]), result}; 78 for (int i = 0; i < result; ++i) { 79 DCHECK_EQ(PA_HND_TYPE(info[0]), PA_HND_TYPE(info[i])); 80 DCHECK_EQ(0u, PA_HND_SUBTYPE(info[i])); 81 handles_out->emplace_back( 82 PlatformHandleInTransit(PlatformHandle(zx::handle(handles[i])))); 83 } 84 85 return true; 86 } 87 88 PlatformHandle WrapPlatformHandles(Channel::Message::HandleInfoEntry info, 89 base::circular_deque<zx::handle>* handles) { 90 PlatformHandle out_handle; 91 if (!info.type) { 92 out_handle = PlatformHandle(std::move(handles->front())); 93 handles->pop_front(); 94 } else { 95 if (info.count > FDIO_MAX_HANDLES) 96 return PlatformHandle(); 97 98 // Fetch the required number of handles from |handles| and set up type info. 99 zx_handle_t fd_handles[FDIO_MAX_HANDLES] = {}; 100 uint32_t fd_infos[FDIO_MAX_HANDLES] = {}; 101 for (int i = 0; i < info.count; ++i) { 102 fd_handles[i] = (*handles)[i].get(); 103 fd_infos[i] = PA_HND(info.type, 0); 104 } 105 106 // Try to wrap the handles into an FDIO file descriptor. 107 base::ScopedFD out_fd; 108 zx_status_t result = 109 fdio_create_fd(fd_handles, fd_infos, info.count, out_fd.receive()); 110 if (result != ZX_OK) { 111 ZX_DLOG(ERROR, result) << "fdio_create_fd"; 112 return PlatformHandle(); 113 } 114 115 // The handles are owned by FDIO now, so |release()| them before removing 116 // the entries from |handles|. 117 for (int i = 0; i < info.count; ++i) { 118 ignore_result(handles->front().release()); 119 handles->pop_front(); 120 } 121 122 out_handle = PlatformHandle(std::move(out_fd)); 123 } 124 return out_handle; 125 } 126 127 // A view over a Channel::Message object. The write queue uses these since 128 // large messages may need to be sent in chunks. 129 class MessageView { 130 public: 131 // Owns |message|. |offset| indexes the first unsent byte in the message. 132 MessageView(Channel::MessagePtr message, size_t offset) 133 : message_(std::move(message)), 134 offset_(offset), 135 handles_(message_->TakeHandlesForTransport()) { 136 DCHECK_GT(message_->data_num_bytes(), offset_); 137 } 138 139 MessageView(MessageView&& other) { *this = std::move(other); } 140 141 MessageView& operator=(MessageView&& other) { 142 message_ = std::move(other.message_); 143 offset_ = other.offset_; 144 handles_ = std::move(other.handles_); 145 return *this; 146 } 147 148 ~MessageView() {} 149 150 const void* data() const { 151 return static_cast<const char*>(message_->data()) + offset_; 152 } 153 154 size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; } 155 156 size_t data_offset() const { return offset_; } 157 void advance_data_offset(size_t num_bytes) { 158 DCHECK_GT(message_->data_num_bytes(), offset_ + num_bytes); 159 offset_ += num_bytes; 160 } 161 162 std::vector<PlatformHandleInTransit> TakeHandles() { 163 if (handles_.empty()) 164 return std::vector<PlatformHandleInTransit>(); 165 166 // We can only pass Fuchsia handles via IPC, so unwrap any FDIO file- 167 // descriptors in |handles_| into the underlying handles, and serialize the 168 // metadata, if any, into the extra header. 169 auto* handles_info = reinterpret_cast<Channel::Message::HandleInfoEntry*>( 170 message_->mutable_extra_header()); 171 memset(handles_info, 0, message_->extra_header_size()); 172 173 std::vector<PlatformHandleInTransit> in_handles = std::move(handles_); 174 handles_.reserve(in_handles.size()); 175 for (size_t i = 0; i < in_handles.size(); i++) { 176 if (!UnwrapPlatformHandle(std::move(in_handles[i]), &handles_info[i], 177 &handles_)) 178 return std::vector<PlatformHandleInTransit>(); 179 } 180 return std::move(handles_); 181 } 182 183 private: 184 Channel::MessagePtr message_; 185 size_t offset_; 186 std::vector<PlatformHandleInTransit> handles_; 187 188 DISALLOW_COPY_AND_ASSIGN(MessageView); 189 }; 190 191 class ChannelFuchsia : public Channel, 192 public base::MessageLoopCurrent::DestructionObserver, 193 public base::MessagePumpForIO::ZxHandleWatcher { 194 public: 195 ChannelFuchsia(Delegate* delegate, 196 ConnectionParams connection_params, 197 scoped_refptr<base::TaskRunner> io_task_runner) 198 : Channel(delegate), 199 self_(this), 200 handle_( 201 connection_params.TakeEndpoint().TakePlatformHandle().TakeHandle()), 202 io_task_runner_(io_task_runner) { 203 CHECK(handle_.is_valid()); 204 } 205 206 void Start() override { 207 if (io_task_runner_->RunsTasksInCurrentSequence()) { 208 StartOnIOThread(); 209 } else { 210 io_task_runner_->PostTask( 211 FROM_HERE, base::BindOnce(&ChannelFuchsia::StartOnIOThread, this)); 212 } 213 } 214 215 void ShutDownImpl() override { 216 // Always shut down asynchronously when called through the public interface. 217 io_task_runner_->PostTask( 218 FROM_HERE, base::BindOnce(&ChannelFuchsia::ShutDownOnIOThread, this)); 219 } 220 221 void Write(MessagePtr message) override { 222 bool write_error = false; 223 { 224 base::AutoLock lock(write_lock_); 225 if (reject_writes_) 226 return; 227 if (!WriteNoLock(MessageView(std::move(message), 0))) 228 reject_writes_ = write_error = true; 229 } 230 if (write_error) { 231 // Do not synchronously invoke OnWriteError(). Write() may have been 232 // called by the delegate and we don't want to re-enter it. 233 io_task_runner_->PostTask( 234 FROM_HERE, base::BindOnce(&ChannelFuchsia::OnWriteError, this, 235 Error::kDisconnected)); 236 } 237 } 238 239 void LeakHandle() override { 240 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); 241 leak_handle_ = true; 242 } 243 244 bool GetReadPlatformHandles(const void* payload, 245 size_t payload_size, 246 size_t num_handles, 247 const void* extra_header, 248 size_t extra_header_size, 249 std::vector<PlatformHandle>* handles, 250 bool* deferred) override { 251 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); 252 if (num_handles > std::numeric_limits<uint16_t>::max()) 253 return false; 254 255 // Locate the handle info and verify there is enough of it. 256 if (!extra_header) 257 return false; 258 const auto* handles_info = 259 reinterpret_cast<const Channel::Message::HandleInfoEntry*>( 260 extra_header); 261 size_t handles_info_size = sizeof(handles_info[0]) * num_handles; 262 if (handles_info_size > extra_header_size) 263 return false; 264 265 // Some caller-supplied handles may be FDIO file-descriptors, which were 266 // un-wrapped to more than one native platform resource handle for transfer. 267 // We may therefore need to expect more than |num_handles| handles to have 268 // been accumulated in |incoming_handles_|, based on the handle info. 269 size_t num_raw_handles = 0u; 270 for (size_t i = 0; i < num_handles; ++i) 271 num_raw_handles += handles_info[i].type ? handles_info[i].count : 1; 272 273 // If there are too few handles then we're not ready yet, so return true 274 // indicating things are OK, but leave |handles| empty. 275 if (incoming_handles_.size() < num_raw_handles) 276 return true; 277 278 handles->reserve(num_handles); 279 for (size_t i = 0; i < num_handles; ++i) { 280 handles->emplace_back( 281 WrapPlatformHandles(handles_info[i], &incoming_handles_)); 282 } 283 return true; 284 } 285 286 private: 287 ~ChannelFuchsia() override { DCHECK(!read_watch_); } 288 289 void StartOnIOThread() { 290 DCHECK(!read_watch_); 291 292 base::MessageLoopCurrent::Get()->AddDestructionObserver(this); 293 294 read_watch_.reset( 295 new base::MessagePumpForIO::ZxHandleWatchController(FROM_HERE)); 296 base::MessageLoopCurrentForIO::Get()->WatchZxHandle( 297 handle_.get(), true /* persistent */, 298 ZX_CHANNEL_READABLE | ZX_CHANNEL_PEER_CLOSED, read_watch_.get(), this); 299 } 300 301 void ShutDownOnIOThread() { 302 base::MessageLoopCurrent::Get()->RemoveDestructionObserver(this); 303 304 read_watch_.reset(); 305 if (leak_handle_) 306 ignore_result(handle_.release()); 307 handle_.reset(); 308 309 // May destroy the |this| if it was the last reference. 310 self_ = nullptr; 311 } 312 313 // base::MessageLoopCurrent::DestructionObserver: 314 void WillDestroyCurrentMessageLoop() override { 315 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); 316 if (self_) 317 ShutDownOnIOThread(); 318 } 319 320 // base::MessagePumpForIO::ZxHandleWatcher: 321 void OnZxHandleSignalled(zx_handle_t handle, zx_signals_t signals) override { 322 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); 323 CHECK_EQ(handle, handle_.get()); 324 DCHECK((ZX_CHANNEL_READABLE | ZX_CHANNEL_PEER_CLOSED) & signals); 325 326 // We always try to read message(s), even if ZX_CHANNEL_PEER_CLOSED, since 327 // the peer may have closed while messages were still unread, in the pipe. 328 329 bool validation_error = false; 330 bool read_error = false; 331 size_t next_read_size = 0; 332 size_t buffer_capacity = 0; 333 size_t total_bytes_read = 0; 334 do { 335 buffer_capacity = next_read_size; 336 char* buffer = GetReadBuffer(&buffer_capacity); 337 DCHECK_GT(buffer_capacity, 0u); 338 339 uint32_t bytes_read = 0; 340 uint32_t handles_read = 0; 341 zx_handle_t handles[ZX_CHANNEL_MAX_MSG_HANDLES] = {}; 342 343 zx_status_t read_result = 344 handle_.read(0, buffer, buffer_capacity, &bytes_read, handles, 345 base::size(handles), &handles_read); 346 if (read_result == ZX_OK) { 347 for (size_t i = 0; i < handles_read; ++i) { 348 incoming_handles_.emplace_back(handles[i]); 349 } 350 total_bytes_read += bytes_read; 351 if (!OnReadComplete(bytes_read, &next_read_size)) { 352 read_error = true; 353 validation_error = true; 354 break; 355 } 356 } else if (read_result == ZX_ERR_BUFFER_TOO_SMALL) { 357 DCHECK_LE(handles_read, base::size(handles)); 358 next_read_size = bytes_read; 359 } else if (read_result == ZX_ERR_SHOULD_WAIT) { 360 break; 361 } else { 362 ZX_DLOG_IF(ERROR, read_result != ZX_ERR_PEER_CLOSED, read_result) 363 << "zx_channel_read"; 364 read_error = true; 365 break; 366 } 367 } while (total_bytes_read < kMaxBatchReadCapacity && next_read_size > 0); 368 if (read_error) { 369 // Stop receiving read notifications. 370 read_watch_.reset(); 371 if (validation_error) 372 OnError(Error::kReceivedMalformedData); 373 else 374 OnError(Error::kDisconnected); 375 } 376 } 377 378 // Attempts to write a message directly to the channel. If the full message 379 // cannot be written, it's queued and a wait is initiated to write the message 380 // ASAP on the I/O thread. 381 bool WriteNoLock(MessageView message_view) { 382 uint32_t write_bytes = 0; 383 do { 384 message_view.advance_data_offset(write_bytes); 385 386 std::vector<PlatformHandleInTransit> outgoing_handles = 387 message_view.TakeHandles(); 388 zx_handle_t handles[ZX_CHANNEL_MAX_MSG_HANDLES] = {}; 389 size_t handles_count = outgoing_handles.size(); 390 391 DCHECK_LE(handles_count, base::size(handles)); 392 for (size_t i = 0; i < handles_count; ++i) { 393 DCHECK(outgoing_handles[i].handle().is_valid()); 394 handles[i] = outgoing_handles[i].handle().GetHandle().get(); 395 } 396 397 write_bytes = std::min(message_view.data_num_bytes(), 398 static_cast<size_t>(ZX_CHANNEL_MAX_MSG_BYTES)); 399 zx_status_t result = handle_.write(0, message_view.data(), write_bytes, 400 handles, handles_count); 401 // zx_channel_write() consumes |handles| whether or not it succeeds, so 402 // release() our copies now, to avoid them being double-closed. 403 for (auto& outgoing_handle : outgoing_handles) 404 outgoing_handle.CompleteTransit(); 405 406 if (result != ZX_OK) { 407 // TODO(fuchsia): Handle ZX_ERR_SHOULD_WAIT flow-control errors, once 408 // the platform starts generating them. See https://crbug.com/754084. 409 ZX_DLOG_IF(ERROR, result != ZX_ERR_PEER_CLOSED, result) 410 << "WriteNoLock(zx_channel_write)"; 411 return false; 412 } 413 414 } while (write_bytes < message_view.data_num_bytes()); 415 416 return true; 417 } 418 419 void OnWriteError(Error error) { 420 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); 421 DCHECK(reject_writes_); 422 423 if (error == Error::kDisconnected) { 424 // If we can't write because the pipe is disconnected then continue 425 // reading to fetch any in-flight messages, relying on end-of-stream to 426 // signal the actual disconnection. 427 if (read_watch_) { 428 // TODO: When we add flow-control for writes, we also need to reset the 429 // write-watcher here. 430 return; 431 } 432 } 433 434 OnError(error); 435 } 436 437 // Keeps the Channel alive at least until explicit shutdown on the IO thread. 438 scoped_refptr<Channel> self_; 439 440 zx::channel handle_; 441 scoped_refptr<base::TaskRunner> io_task_runner_; 442 443 // These members are only used on the IO thread. 444 std::unique_ptr<base::MessagePumpForIO::ZxHandleWatchController> read_watch_; 445 base::circular_deque<zx::handle> incoming_handles_; 446 bool leak_handle_ = false; 447 448 base::Lock write_lock_; 449 bool reject_writes_ = false; 450 451 DISALLOW_COPY_AND_ASSIGN(ChannelFuchsia); 452 }; 453 454 } // namespace 455 456 // static 457 scoped_refptr<Channel> Channel::Create( 458 Delegate* delegate, 459 ConnectionParams connection_params, 460 scoped_refptr<base::TaskRunner> io_task_runner) { 461 return new ChannelFuchsia(delegate, std::move(connection_params), 462 std::move(io_task_runner)); 463 } 464 465 } // namespace core 466 } // namespace mojo 467