1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "nacl_io/ossocket.h" 6 #ifdef PROVIDES_SOCKET_API 7 8 #include <assert.h> 9 #include <errno.h> 10 #include <string.h> 11 #include <algorithm> 12 13 #include "nacl_io/kernel_handle.h" 14 #include "nacl_io/pepper_interface.h" 15 #include "nacl_io/socket/tcp_node.h" 16 #include "nacl_io/stream/stream_fs.h" 17 18 namespace { 19 const size_t kMaxPacketSize = 65536; 20 const size_t kDefaultFifoSize = kMaxPacketSize * 8; 21 } 22 23 namespace nacl_io { 24 25 class TcpWork : public StreamFs::Work { 26 public: 27 explicit TcpWork(const ScopedTcpEventEmitter& emitter) 28 : StreamFs::Work(emitter->stream()->stream()), 29 emitter_(emitter), 30 data_(NULL) {} 31 32 ~TcpWork() { delete[] data_; } 33 34 TCPSocketInterface* TCPInterface() { 35 return filesystem()->ppapi()->GetTCPSocketInterface(); 36 } 37 38 protected: 39 ScopedTcpEventEmitter emitter_; 40 char* data_; 41 }; 42 43 class TcpSendWork : public TcpWork { 44 public: 45 explicit TcpSendWork(const ScopedTcpEventEmitter& emitter, 46 const ScopedSocketNode& stream) 47 : TcpWork(emitter), node_(stream) {} 48 49 virtual bool Start(int32_t val) { 50 AUTO_LOCK(emitter_->GetLock()); 51 52 // Does the stream exist, and can it send? 53 if (!node_->TestStreamFlags(SSF_CAN_SEND)) 54 return false; 55 56 // Check if we are already sending. 57 if (node_->TestStreamFlags(SSF_SENDING)) 58 return false; 59 60 size_t tx_data_avail = emitter_->BytesInOutputFIFO(); 61 int capped_len = std::min(tx_data_avail, kMaxPacketSize); 62 if (capped_len == 0) 63 return false; 64 65 data_ = new char[capped_len]; 66 emitter_->ReadOut_Locked(data_, capped_len); 67 68 int err = TCPInterface()->Write(node_->socket_resource(), 69 data_, 70 capped_len, 71 filesystem()->GetRunCompletion(this)); 72 73 if (err != PP_OK_COMPLETIONPENDING) { 74 // Anything else, we should assume the socket has gone bad. 75 node_->SetError_Locked(err); 76 return false; 77 } 78 79 node_->SetStreamFlags(SSF_SENDING); 80 return true; 81 } 82 83 virtual void Run(int32_t length_error) { 84 AUTO_LOCK(emitter_->GetLock()); 85 86 if (length_error < 0) { 87 // Send failed, mark the socket as bad 88 node_->SetError_Locked(length_error); 89 return; 90 } 91 92 // If we did send, then Q more work. 93 node_->ClearStreamFlags(SSF_SENDING); 94 node_->QueueOutput(); 95 } 96 97 private: 98 // We assume that transmits will always complete. If the upstream 99 // actually back pressures, enough to prevent the Send callback 100 // from triggering, this resource may never go away. 101 ScopedSocketNode node_; 102 }; 103 104 class TcpRecvWork : public TcpWork { 105 public: 106 explicit TcpRecvWork(const ScopedTcpEventEmitter& emitter) 107 : TcpWork(emitter) {} 108 109 virtual bool Start(int32_t val) { 110 AUTO_LOCK(emitter_->GetLock()); 111 TcpNode* stream = static_cast<TcpNode*>(emitter_->stream()); 112 113 // Does the stream exist, and can it recv? 114 if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV)) 115 return false; 116 117 // If we are not currently receiving 118 if (stream->TestStreamFlags(SSF_RECVING)) 119 return false; 120 121 size_t rx_space_avail = emitter_->SpaceInInputFIFO(); 122 int capped_len = 123 static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize)); 124 125 if (capped_len == 0) 126 return false; 127 128 data_ = new char[capped_len]; 129 int err = TCPInterface()->Read(stream->socket_resource(), 130 data_, 131 capped_len, 132 filesystem()->GetRunCompletion(this)); 133 if (err != PP_OK_COMPLETIONPENDING) { 134 // Anything else, we should assume the socket has gone bad. 135 stream->SetError_Locked(err); 136 return false; 137 } 138 139 stream->SetStreamFlags(SSF_RECVING); 140 return true; 141 } 142 143 virtual void Run(int32_t length_error) { 144 AUTO_LOCK(emitter_->GetLock()); 145 TcpNode* stream = static_cast<TcpNode*>(emitter_->stream()); 146 147 if (!stream) 148 return; 149 150 if (length_error <= 0) { 151 stream->SetError_Locked(length_error); 152 return; 153 } 154 155 // If we successfully received, queue more input 156 emitter_->WriteIn_Locked(data_, length_error); 157 stream->ClearStreamFlags(SSF_RECVING); 158 stream->QueueInput(); 159 } 160 }; 161 162 class TCPAcceptWork : public StreamFs::Work { 163 public: 164 explicit TCPAcceptWork(StreamFs* stream, const ScopedTcpEventEmitter& emitter) 165 : StreamFs::Work(stream), emitter_(emitter) {} 166 167 TCPSocketInterface* TCPInterface() { 168 return filesystem()->ppapi()->GetTCPSocketInterface(); 169 } 170 171 virtual bool Start(int32_t val) { 172 AUTO_LOCK(emitter_->GetLock()); 173 TcpNode* node = static_cast<TcpNode*>(emitter_->stream()); 174 175 // Does the stream exist, and can it accept? 176 if (NULL == node) 177 return false; 178 179 // If we are not currently accepting 180 if (!node->TestStreamFlags(SSF_LISTENING)) 181 return false; 182 183 int err = TCPInterface()->Accept(node->socket_resource(), 184 &new_socket_, 185 filesystem()->GetRunCompletion(this)); 186 187 if (err != PP_OK_COMPLETIONPENDING) { 188 // Anything else, we should assume the socket has gone bad. 189 node->SetError_Locked(err); 190 return false; 191 } 192 193 return true; 194 } 195 196 virtual void Run(int32_t error) { 197 AUTO_LOCK(emitter_->GetLock()); 198 TcpNode* node = static_cast<TcpNode*>(emitter_->stream()); 199 200 if (node == NULL) 201 return; 202 203 if (error != PP_OK) { 204 node->SetError_Locked(error); 205 return; 206 } 207 208 emitter_->SetAcceptedSocket_Locked(new_socket_); 209 } 210 211 protected: 212 PP_Resource new_socket_; 213 ScopedTcpEventEmitter emitter_; 214 }; 215 216 class TCPConnectWork : public StreamFs::Work { 217 public: 218 explicit TCPConnectWork(StreamFs* stream, 219 const ScopedTcpEventEmitter& emitter) 220 : StreamFs::Work(stream), emitter_(emitter) {} 221 222 TCPSocketInterface* TCPInterface() { 223 return filesystem()->ppapi()->GetTCPSocketInterface(); 224 } 225 226 virtual bool Start(int32_t val) { 227 AUTO_LOCK(emitter_->GetLock()); 228 TcpNode* node = static_cast<TcpNode*>(emitter_->stream()); 229 230 // Does the stream exist, and can it connect? 231 if (NULL == node) 232 return false; 233 234 int err = TCPInterface()->Connect(node->socket_resource(), 235 node->remote_addr(), 236 filesystem()->GetRunCompletion(this)); 237 if (err != PP_OK_COMPLETIONPENDING) { 238 // Anything else, we should assume the socket has gone bad. 239 node->SetError_Locked(err); 240 return false; 241 } 242 243 return true; 244 } 245 246 virtual void Run(int32_t error) { 247 AUTO_LOCK(emitter_->GetLock()); 248 TcpNode* node = static_cast<TcpNode*>(emitter_->stream()); 249 250 if (node == NULL) 251 return; 252 253 if (error != PP_OK) { 254 node->ConnectFailed_Locked(); 255 node->SetError_Locked(error); 256 return; 257 } 258 259 node->ConnectDone_Locked(); 260 } 261 262 protected: 263 ScopedTcpEventEmitter emitter_; 264 }; 265 266 TcpNode::TcpNode(Filesystem* filesystem) 267 : SocketNode(filesystem), 268 emitter_(new TcpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)), 269 tcp_nodelay_(false) { 270 emitter_->AttachStream(this); 271 } 272 273 TcpNode::TcpNode(Filesystem* filesystem, PP_Resource socket) 274 : SocketNode(filesystem, socket), 275 emitter_(new TcpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)), 276 tcp_nodelay_(false) { 277 emitter_->AttachStream(this); 278 } 279 280 void TcpNode::Destroy() { 281 emitter_->DetachStream(); 282 SocketNode::Destroy(); 283 } 284 285 Error TcpNode::Init(int open_flags) { 286 Error err = SocketNode::Init(open_flags); 287 if (err != 0) 288 return err; 289 290 if (TCPInterface() == NULL) 291 return EACCES; 292 293 if (socket_resource_ != 0) { 294 // TCP sockets that are contructed with an existing socket_resource_ 295 // are those that generated from calls to Accept() and therefore are 296 // already connected. 297 remote_addr_ = TCPInterface()->GetRemoteAddress(socket_resource_); 298 ConnectDone_Locked(); 299 } else { 300 socket_resource_ = 301 TCPInterface()->Create(filesystem_->ppapi()->GetInstance()); 302 if (0 == socket_resource_) 303 return EACCES; 304 SetStreamFlags(SSF_CAN_CONNECT); 305 } 306 307 return 0; 308 } 309 310 EventEmitter* TcpNode::GetEventEmitter() { 311 return emitter_.get(); 312 } 313 314 void TcpNode::SetError_Locked(int pp_error_num) { 315 SocketNode::SetError_Locked(pp_error_num); 316 emitter_->SetError_Locked(); 317 } 318 319 Error TcpNode::GetSockOpt(int lvl, int optname, void* optval, socklen_t* len) { 320 if (lvl == IPPROTO_TCP && optname == TCP_NODELAY) { 321 AUTO_LOCK(node_lock_); 322 int value = tcp_nodelay_; 323 socklen_t value_len = static_cast<socklen_t>(sizeof(value)); 324 int copy_bytes = std::min(value_len, *len); 325 memcpy(optval, &value, copy_bytes); 326 *len = value_len; 327 return 0; 328 } 329 330 return SocketNode::GetSockOpt(lvl, optname, optval, len); 331 } 332 333 Error TcpNode::SetNoDelay_Locked() { 334 if (!IsConnected()) 335 return 0; 336 337 int32_t error = 338 TCPInterface()->SetOption(socket_resource_, 339 PP_TCPSOCKET_OPTION_NO_DELAY, 340 PP_MakeBool(tcp_nodelay_ ? PP_TRUE : PP_FALSE), 341 PP_BlockUntilComplete()); 342 return PPErrorToErrno(error); 343 } 344 345 Error TcpNode::SetSockOpt(int lvl, 346 int optname, 347 const void* optval, 348 socklen_t len) { 349 if (lvl == IPPROTO_TCP && optname == TCP_NODELAY) { 350 if (static_cast<size_t>(len) < sizeof(int)) 351 return EINVAL; 352 AUTO_LOCK(node_lock_); 353 tcp_nodelay_ = *static_cast<const int*>(optval) != 0; 354 return SetNoDelay_Locked(); 355 } 356 357 return SocketNode::SetSockOpt(lvl, optname, optval, len); 358 } 359 360 void TcpNode::QueueAccept() { 361 StreamFs::Work* work = new TCPAcceptWork(stream(), emitter_); 362 stream()->EnqueueWork(work); 363 } 364 365 void TcpNode::QueueConnect() { 366 StreamFs::Work* work = new TCPConnectWork(stream(), emitter_); 367 stream()->EnqueueWork(work); 368 } 369 370 void TcpNode::QueueInput() { 371 StreamFs::Work* work = new TcpRecvWork(emitter_); 372 stream()->EnqueueWork(work); 373 } 374 375 void TcpNode::QueueOutput() { 376 if (TestStreamFlags(SSF_SENDING)) 377 return; 378 379 if (!TestStreamFlags(SSF_CAN_SEND)) 380 return; 381 382 if (0 == emitter_->BytesInOutputFIFO()) 383 return; 384 385 StreamFs::Work* work = new TcpSendWork(emitter_, ScopedSocketNode(this)); 386 stream()->EnqueueWork(work); 387 } 388 389 Error TcpNode::Accept(const HandleAttr& attr, 390 PP_Resource* out_sock, 391 struct sockaddr* addr, 392 socklen_t* len) { 393 EventListenerLock wait(GetEventEmitter()); 394 395 if (!TestStreamFlags(SSF_LISTENING)) 396 return EINVAL; 397 398 // Either block forever or not at all 399 int ms = attr.IsBlocking() ? -1 : 0; 400 401 Error err = wait.WaitOnEvent(POLLIN, ms); 402 if (ETIMEDOUT == err) 403 return EWOULDBLOCK; 404 405 int s = emitter_->GetAcceptedSocket_Locked(); 406 // Non-blocking case. 407 if (s == 0) 408 return EAGAIN; 409 410 // Consume the new socket and start listening for the next one 411 *out_sock = s; 412 emitter_->ClearEvents_Locked(POLLIN); 413 414 // Set the out paramaters 415 PP_Resource remote_addr = TCPInterface()->GetRemoteAddress(*out_sock); 416 *len = ResourceToSockAddr(remote_addr, *len, addr); 417 filesystem_->ppapi()->ReleaseResource(remote_addr); 418 419 QueueAccept(); 420 return 0; 421 } 422 423 // We can not bind a client socket with PPAPI. For now we ignore the 424 // bind but report the correct address later, just in case someone is 425 // binding without really caring what the address is (for example to 426 // select a more optimized interface/route.) 427 Error TcpNode::Bind(const struct sockaddr* addr, socklen_t len) { 428 AUTO_LOCK(node_lock_); 429 430 /* Only bind once. */ 431 if (IsBound()) 432 return EINVAL; 433 434 local_addr_ = SockAddrToResource(addr, len); 435 int err = TCPInterface()->Bind( 436 socket_resource_, local_addr_, PP_BlockUntilComplete()); 437 438 // If we fail, release the local addr resource 439 if (err != PP_OK) { 440 filesystem_->ppapi()->ReleaseResource(local_addr_); 441 local_addr_ = 0; 442 return PPErrorToErrno(err); 443 } 444 445 return 0; 446 } 447 448 Error TcpNode::Connect(const HandleAttr& attr, 449 const struct sockaddr* addr, 450 socklen_t len) { 451 EventListenerLock wait(GetEventEmitter()); 452 453 if (TestStreamFlags(SSF_CONNECTING)) 454 return EALREADY; 455 456 if (IsConnected()) 457 return EISCONN; 458 459 remote_addr_ = SockAddrToResource(addr, len); 460 if (0 == remote_addr_) 461 return EINVAL; 462 463 int ms = attr.IsBlocking() ? -1 : 0; 464 465 SetStreamFlags(SSF_CONNECTING); 466 QueueConnect(); 467 468 Error err = wait.WaitOnEvent(POLLOUT, ms); 469 if (ETIMEDOUT == err) 470 return EINPROGRESS; 471 472 // If we fail, release the dest addr resource 473 if (err != 0) { 474 ConnectFailed_Locked(); 475 return err; 476 } 477 478 ConnectDone_Locked(); 479 return 0; 480 } 481 482 Error TcpNode::Shutdown(int how) { 483 AUTO_LOCK(node_lock_); 484 if (!IsConnected()) 485 return ENOTCONN; 486 { 487 AUTO_LOCK(emitter_->GetLock()); 488 emitter_->SetError_Locked(); 489 } 490 return 0; 491 } 492 493 void TcpNode::ConnectDone_Locked() { 494 local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_); 495 496 // Now that we are connected, we can start sending and receiving. 497 ClearStreamFlags(SSF_CONNECTING | SSF_CAN_CONNECT); 498 SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV); 499 500 emitter_->ConnectDone_Locked(); 501 502 // The NODELAY option cannot be set in PPAPI before the socket 503 // is connected, but setsockopt() might have already set it. 504 SetNoDelay_Locked(); 505 506 // Begin the input pump 507 QueueInput(); 508 } 509 510 void TcpNode::ConnectFailed_Locked() { 511 filesystem_->ppapi()->ReleaseResource(remote_addr_); 512 remote_addr_ = 0; 513 } 514 515 Error TcpNode::Listen(int backlog) { 516 AUTO_LOCK(node_lock_); 517 if (!IsBound()) 518 return EINVAL; 519 520 int err = TCPInterface()->Listen( 521 socket_resource_, backlog, PP_BlockUntilComplete()); 522 if (err != PP_OK) 523 return PPErrorToErrno(err); 524 525 ClearStreamFlags(SSF_CAN_CONNECT); 526 SetStreamFlags(SSF_LISTENING); 527 emitter_->SetListening_Locked(); 528 QueueAccept(); 529 return 0; 530 } 531 532 Error TcpNode::Recv_Locked(void* buf, 533 size_t len, 534 PP_Resource* out_addr, 535 int* out_len) { 536 assert(emitter_.get()); 537 *out_len = emitter_->ReadIn_Locked((char*)buf, len); 538 *out_addr = remote_addr_; 539 540 // Ref the address copy we pass back. 541 filesystem_->ppapi()->AddRefResource(remote_addr_); 542 return 0; 543 } 544 545 // TCP ignores dst addr passed to send_to, and always uses bound address 546 Error TcpNode::Send_Locked(const void* buf, 547 size_t len, 548 PP_Resource, 549 int* out_len) { 550 assert(emitter_.get()); 551 if (emitter_->GetError_Locked()) 552 return EPIPE; 553 *out_len = emitter_->WriteOut_Locked((char*)buf, len); 554 return 0; 555 } 556 557 } // namespace nacl_io 558 559 #endif // PROVIDES_SOCKET_API 560