Home | History | Annotate | Download | only in socket
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "nacl_io/ossocket.h"
      6 #ifdef PROVIDES_SOCKET_API
      7 
      8 #include <assert.h>
      9 #include <errno.h>
     10 #include <string.h>
     11 #include <algorithm>
     12 
     13 #include "nacl_io/kernel_handle.h"
     14 #include "nacl_io/pepper_interface.h"
     15 #include "nacl_io/socket/tcp_node.h"
     16 #include "nacl_io/stream/stream_fs.h"
     17 
     18 namespace {
     19 const size_t kMaxPacketSize = 65536;
     20 const size_t kDefaultFifoSize = kMaxPacketSize * 8;
     21 }
     22 
     23 namespace nacl_io {
     24 
// Common base class for the TCP send and receive work items below.  It keeps
// a reference to the emitter and owns |data_|, the heap buffer a subclass
// allocates for the transfer it starts.
class TcpWork : public StreamFs::Work {
 public:
  // The StreamFs this work belongs to is taken from the node currently
  // attached to |emitter|, so the emitter must have a stream attached when
  // the work item is constructed.
  explicit TcpWork(const ScopedTcpEventEmitter& emitter)
      : StreamFs::Work(emitter->stream()->stream()),
        emitter_(emitter),
        data_(NULL) {}

  // Frees the transfer buffer; |data_| is NULL if none was ever allocated.
  ~TcpWork() { delete[] data_; }

  // Returns the TCP socket interface obtained through the filesystem's
  // ppapi() accessor.
  TCPSocketInterface* TCPInterface() {
    return filesystem()->ppapi()->GetTCPSocketInterface();
  }

 protected:
  // Emitter whose lock and FIFOs the subclasses use.
  ScopedTcpEventEmitter emitter_;
  // Buffer allocated with new[] by a subclass's Start(); released in the
  // destructor.
  char* data_;
};
     42 
     43 class TcpSendWork : public TcpWork {
     44  public:
     45   explicit TcpSendWork(const ScopedTcpEventEmitter& emitter,
     46                        const ScopedSocketNode& stream)
     47       : TcpWork(emitter), node_(stream) {}
     48 
     49   virtual bool Start(int32_t val) {
     50     AUTO_LOCK(emitter_->GetLock());
     51 
     52     // Does the stream exist, and can it send?
     53     if (!node_->TestStreamFlags(SSF_CAN_SEND))
     54       return false;
     55 
     56     // Check if we are already sending.
     57     if (node_->TestStreamFlags(SSF_SENDING))
     58       return false;
     59 
     60     size_t tx_data_avail = emitter_->BytesInOutputFIFO();
     61     int capped_len = std::min(tx_data_avail, kMaxPacketSize);
     62     if (capped_len == 0)
     63       return false;
     64 
     65     data_ = new char[capped_len];
     66     emitter_->ReadOut_Locked(data_, capped_len);
     67 
     68     int err = TCPInterface()->Write(node_->socket_resource(),
     69                                     data_,
     70                                     capped_len,
     71                                     filesystem()->GetRunCompletion(this));
     72 
     73     if (err != PP_OK_COMPLETIONPENDING) {
     74       // Anything else, we should assume the socket has gone bad.
     75       node_->SetError_Locked(err);
     76       return false;
     77     }
     78 
     79     node_->SetStreamFlags(SSF_SENDING);
     80     return true;
     81   }
     82 
     83   virtual void Run(int32_t length_error) {
     84     AUTO_LOCK(emitter_->GetLock());
     85 
     86     if (length_error < 0) {
     87       // Send failed, mark the socket as bad
     88       node_->SetError_Locked(length_error);
     89       return;
     90     }
     91 
     92     // If we did send, then Q more work.
     93     node_->ClearStreamFlags(SSF_SENDING);
     94     node_->QueueOutput();
     95   }
     96 
     97  private:
     98   // We assume that transmits will always complete.  If the upstream
     99   // actually back pressures, enough to prevent the Send callback
    100   // from triggering, this resource may never go away.
    101   ScopedSocketNode node_;
    102 };
    103 
    104 class TcpRecvWork : public TcpWork {
    105  public:
    106   explicit TcpRecvWork(const ScopedTcpEventEmitter& emitter)
    107       : TcpWork(emitter) {}
    108 
    109   virtual bool Start(int32_t val) {
    110     AUTO_LOCK(emitter_->GetLock());
    111     TcpNode* stream = static_cast<TcpNode*>(emitter_->stream());
    112 
    113     // Does the stream exist, and can it recv?
    114     if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
    115       return false;
    116 
    117     // If we are not currently receiving
    118     if (stream->TestStreamFlags(SSF_RECVING))
    119       return false;
    120 
    121     size_t rx_space_avail = emitter_->SpaceInInputFIFO();
    122     int capped_len =
    123         static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize));
    124 
    125     if (capped_len == 0)
    126       return false;
    127 
    128     data_ = new char[capped_len];
    129     int err = TCPInterface()->Read(stream->socket_resource(),
    130                                    data_,
    131                                    capped_len,
    132                                    filesystem()->GetRunCompletion(this));
    133     if (err != PP_OK_COMPLETIONPENDING) {
    134       // Anything else, we should assume the socket has gone bad.
    135       stream->SetError_Locked(err);
    136       return false;
    137     }
    138 
    139     stream->SetStreamFlags(SSF_RECVING);
    140     return true;
    141   }
    142 
    143   virtual void Run(int32_t length_error) {
    144     AUTO_LOCK(emitter_->GetLock());
    145     TcpNode* stream = static_cast<TcpNode*>(emitter_->stream());
    146 
    147     if (!stream)
    148       return;
    149 
    150     if (length_error <= 0) {
    151       stream->SetError_Locked(length_error);
    152       return;
    153     }
    154 
    155     // If we successfully received, queue more input
    156     emitter_->WriteIn_Locked(data_, length_error);
    157     stream->ClearStreamFlags(SSF_RECVING);
    158     stream->QueueInput();
    159   }
    160 };
    161 
    162 class TCPAcceptWork : public StreamFs::Work {
    163  public:
    164   explicit TCPAcceptWork(StreamFs* stream, const ScopedTcpEventEmitter& emitter)
    165       : StreamFs::Work(stream), emitter_(emitter) {}
    166 
    167   TCPSocketInterface* TCPInterface() {
    168     return filesystem()->ppapi()->GetTCPSocketInterface();
    169   }
    170 
    171   virtual bool Start(int32_t val) {
    172     AUTO_LOCK(emitter_->GetLock());
    173     TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
    174 
    175     // Does the stream exist, and can it accept?
    176     if (NULL == node)
    177       return false;
    178 
    179     // If we are not currently accepting
    180     if (!node->TestStreamFlags(SSF_LISTENING))
    181       return false;
    182 
    183     int err = TCPInterface()->Accept(node->socket_resource(),
    184                                      &new_socket_,
    185                                      filesystem()->GetRunCompletion(this));
    186 
    187     if (err != PP_OK_COMPLETIONPENDING) {
    188       // Anything else, we should assume the socket has gone bad.
    189       node->SetError_Locked(err);
    190       return false;
    191     }
    192 
    193     return true;
    194   }
    195 
    196   virtual void Run(int32_t error) {
    197     AUTO_LOCK(emitter_->GetLock());
    198     TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
    199 
    200     if (node == NULL)
    201       return;
    202 
    203     if (error != PP_OK) {
    204       node->SetError_Locked(error);
    205       return;
    206     }
    207 
    208     emitter_->SetAcceptedSocket_Locked(new_socket_);
    209   }
    210 
    211  protected:
    212   PP_Resource new_socket_;
    213   ScopedTcpEventEmitter emitter_;
    214 };
    215 
    216 class TCPConnectWork : public StreamFs::Work {
    217  public:
    218   explicit TCPConnectWork(StreamFs* stream,
    219                           const ScopedTcpEventEmitter& emitter)
    220       : StreamFs::Work(stream), emitter_(emitter) {}
    221 
    222   TCPSocketInterface* TCPInterface() {
    223     return filesystem()->ppapi()->GetTCPSocketInterface();
    224   }
    225 
    226   virtual bool Start(int32_t val) {
    227     AUTO_LOCK(emitter_->GetLock());
    228     TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
    229 
    230     // Does the stream exist, and can it connect?
    231     if (NULL == node)
    232       return false;
    233 
    234     int err = TCPInterface()->Connect(node->socket_resource(),
    235                                       node->remote_addr(),
    236                                       filesystem()->GetRunCompletion(this));
    237     if (err != PP_OK_COMPLETIONPENDING) {
    238       // Anything else, we should assume the socket has gone bad.
    239       node->SetError_Locked(err);
    240       return false;
    241     }
    242 
    243     return true;
    244   }
    245 
    246   virtual void Run(int32_t error) {
    247     AUTO_LOCK(emitter_->GetLock());
    248     TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
    249 
    250     if (node == NULL)
    251       return;
    252 
    253     if (error != PP_OK) {
    254       node->ConnectFailed_Locked();
    255       node->SetError_Locked(error);
    256       return;
    257     }
    258 
    259     node->ConnectDone_Locked();
    260   }
    261 
    262  protected:
    263   ScopedTcpEventEmitter emitter_;
    264 };
    265 
// Constructs a TCP node without an existing socket resource.  A new
// TcpEventEmitter is created with kDefaultFifoSize for both of its size
// arguments, TCP_NODELAY starts out disabled, and this node is attached to
// the emitter as its stream.
TcpNode::TcpNode(Filesystem* filesystem)
    : SocketNode(filesystem),
      emitter_(new TcpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)),
      tcp_nodelay_(false) {
  emitter_->AttachStream(this);
}
    272 
// Constructs a TCP node around an existing PPAPI socket resource, which is
// forwarded to the SocketNode constructor.  A new TcpEventEmitter is created
// with kDefaultFifoSize for both of its size arguments, TCP_NODELAY starts
// out disabled, and this node is attached to the emitter as its stream.
TcpNode::TcpNode(Filesystem* filesystem, PP_Resource socket)
    : SocketNode(filesystem, socket),
      emitter_(new TcpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)),
      tcp_nodelay_(false) {
  emitter_->AttachStream(this);
}
    279 
// Tears the node down: first detaches it from the emitter, then runs the
// base-class teardown.
void TcpNode::Destroy() {
  // Detach before the base teardown.  The work items in this file check
  // emitter_->stream() for NULL before touching the node (presumably what
  // DetachStream() causes -- confirm against TcpEventEmitter).
  emitter_->DetachStream();
  SocketNode::Destroy();
}
    284 
    285 Error TcpNode::Init(int open_flags) {
    286   Error err = SocketNode::Init(open_flags);
    287   if (err != 0)
    288     return err;
    289 
    290   if (TCPInterface() == NULL)
    291     return EACCES;
    292 
    293   if (socket_resource_ != 0) {
    294     // TCP sockets that are contructed with an existing socket_resource_
    295     // are those that generated from calls to Accept() and therefore are
    296     // already connected.
    297     remote_addr_ = TCPInterface()->GetRemoteAddress(socket_resource_);
    298     ConnectDone_Locked();
    299   } else {
    300     socket_resource_ =
    301         TCPInterface()->Create(filesystem_->ppapi()->GetInstance());
    302     if (0 == socket_resource_)
    303       return EACCES;
    304     SetStreamFlags(SSF_CAN_CONNECT);
    305   }
    306 
    307   return 0;
    308 }
    309 
// Returns the raw pointer held by |emitter_|; the node keeps its own
// reference to the emitter.
EventEmitter* TcpNode::GetEventEmitter() {
  return emitter_.get();
}
    313 
// Records |pp_error_num| through the base class and then flags the error on
// the emitter as well.  The _Locked suffix suggests the caller is expected
// to already hold the relevant lock; the callers in this file hold the
// emitter lock.
void TcpNode::SetError_Locked(int pp_error_num) {
  SocketNode::SetError_Locked(pp_error_num);
  emitter_->SetError_Locked();
}
    318 
    319 Error TcpNode::GetSockOpt(int lvl, int optname, void* optval, socklen_t* len) {
    320   if (lvl == IPPROTO_TCP && optname == TCP_NODELAY) {
    321     AUTO_LOCK(node_lock_);
    322     int value = tcp_nodelay_;
    323     socklen_t value_len = static_cast<socklen_t>(sizeof(value));
    324     int copy_bytes = std::min(value_len, *len);
    325     memcpy(optval, &value, copy_bytes);
    326     *len = value_len;
    327     return 0;
    328   }
    329 
    330   return SocketNode::GetSockOpt(lvl, optname, optval, len);
    331 }
    332 
    333 Error TcpNode::SetNoDelay_Locked() {
    334   if (!IsConnected())
    335     return 0;
    336 
    337   int32_t error =
    338       TCPInterface()->SetOption(socket_resource_,
    339                                 PP_TCPSOCKET_OPTION_NO_DELAY,
    340                                 PP_MakeBool(tcp_nodelay_ ? PP_TRUE : PP_FALSE),
    341                                 PP_BlockUntilComplete());
    342   return PPErrorToErrno(error);
    343 }
    344 
    345 Error TcpNode::SetSockOpt(int lvl,
    346                           int optname,
    347                           const void* optval,
    348                           socklen_t len) {
    349   if (lvl == IPPROTO_TCP && optname == TCP_NODELAY) {
    350     if (static_cast<size_t>(len) < sizeof(int))
    351       return EINVAL;
    352     AUTO_LOCK(node_lock_);
    353     tcp_nodelay_ = *static_cast<const int*>(optval) != 0;
    354     return SetNoDelay_Locked();
    355   }
    356 
    357   return SocketNode::SetSockOpt(lvl, optname, optval, len);
    358 }
    359 
    360 void TcpNode::QueueAccept() {
    361   StreamFs::Work* work = new TCPAcceptWork(stream(), emitter_);
    362   stream()->EnqueueWork(work);
    363 }
    364 
    365 void TcpNode::QueueConnect() {
    366   StreamFs::Work* work = new TCPConnectWork(stream(), emitter_);
    367   stream()->EnqueueWork(work);
    368 }
    369 
    370 void TcpNode::QueueInput() {
    371   StreamFs::Work* work = new TcpRecvWork(emitter_);
    372   stream()->EnqueueWork(work);
    373 }
    374 
    375 void TcpNode::QueueOutput() {
    376   if (TestStreamFlags(SSF_SENDING))
    377     return;
    378 
    379   if (!TestStreamFlags(SSF_CAN_SEND))
    380     return;
    381 
    382   if (0 == emitter_->BytesInOutputFIFO())
    383     return;
    384 
    385   StreamFs::Work* work = new TcpSendWork(emitter_, ScopedSocketNode(this));
    386   stream()->EnqueueWork(work);
    387 }
    388 
    389 Error TcpNode::Accept(const HandleAttr& attr,
    390                       PP_Resource* out_sock,
    391                       struct sockaddr* addr,
    392                       socklen_t* len) {
    393   EventListenerLock wait(GetEventEmitter());
    394 
    395   if (!TestStreamFlags(SSF_LISTENING))
    396     return EINVAL;
    397 
    398   // Either block forever or not at all
    399   int ms = attr.IsBlocking() ? -1 : 0;
    400 
    401   Error err = wait.WaitOnEvent(POLLIN, ms);
    402   if (ETIMEDOUT == err)
    403     return EWOULDBLOCK;
    404 
    405   int s = emitter_->GetAcceptedSocket_Locked();
    406   // Non-blocking case.
    407   if (s == 0)
    408     return EAGAIN;
    409 
    410   // Consume the new socket and start listening for the next one
    411   *out_sock = s;
    412   emitter_->ClearEvents_Locked(POLLIN);
    413 
    414   // Set the out paramaters
    415   PP_Resource remote_addr = TCPInterface()->GetRemoteAddress(*out_sock);
    416   *len = ResourceToSockAddr(remote_addr, *len, addr);
    417   filesystem_->ppapi()->ReleaseResource(remote_addr);
    418 
    419   QueueAccept();
    420   return 0;
    421 }
    422 
    423 // We can not bind a client socket with PPAPI.  For now we ignore the
    424 // bind but report the correct address later, just in case someone is
    425 // binding without really caring what the address is (for example to
    426 // select a more optimized interface/route.)
    427 Error TcpNode::Bind(const struct sockaddr* addr, socklen_t len) {
    428   AUTO_LOCK(node_lock_);
    429 
    430   /* Only bind once. */
    431   if (IsBound())
    432     return EINVAL;
    433 
    434   local_addr_ = SockAddrToResource(addr, len);
    435   int err = TCPInterface()->Bind(
    436       socket_resource_, local_addr_, PP_BlockUntilComplete());
    437 
    438   // If we fail, release the local addr resource
    439   if (err != PP_OK) {
    440     filesystem_->ppapi()->ReleaseResource(local_addr_);
    441     local_addr_ = 0;
    442     return PPErrorToErrno(err);
    443   }
    444 
    445   return 0;
    446 }
    447 
// Connects the socket to |addr|.
//
// Returns EALREADY if a connect is already in progress, EISCONN if the
// socket is already connected, EINVAL if |addr| cannot be converted to an
// address resource, EINPROGRESS if the wait for completion timed out (which
// is what a non-blocking handle gets, since it waits 0 ms), any other error
// reported by the wait, and 0 once the wait succeeds.
Error TcpNode::Connect(const HandleAttr& attr,
                       const struct sockaddr* addr,
                       socklen_t len) {
  EventListenerLock wait(GetEventEmitter());

  if (TestStreamFlags(SSF_CONNECTING))
    return EALREADY;

  if (IsConnected())
    return EISCONN;

  remote_addr_ = SockAddrToResource(addr, len);
  if (0 == remote_addr_)
    return EINVAL;

  // A blocking handle waits forever (-1); a non-blocking one does not wait.
  int ms = attr.IsBlocking() ? -1 : 0;

  // Mark the connect as in flight before queueing the work that performs it.
  SetStreamFlags(SSF_CONNECTING);
  QueueConnect();

  Error err = wait.WaitOnEvent(POLLOUT, ms);
  if (ETIMEDOUT == err)
    return EINPROGRESS;

  // If we fail, release the dest addr resource
  if (err != 0) {
    ConnectFailed_Locked();
    return err;
  }

  // NOTE(review): TCPConnectWork::Run() already calls ConnectDone_Locked()
  // on success, so on this path it appears to run a second time -- confirm
  // this is intended.
  // NOTE(review): when the connect fails, TCPConnectWork::Run() calls
  // SetError_Locked(); if that also wakes this wait with err == 0, the
  // failure would be reported as success here -- verify against the
  // emitter's event handling.
  ConnectDone_Locked();
  return 0;
}
    481 
    482 Error TcpNode::Shutdown(int how) {
    483   AUTO_LOCK(node_lock_);
    484   if (!IsConnected())
    485     return ENOTCONN;
    486   {
    487     AUTO_LOCK(emitter_->GetLock());
    488     emitter_->SetError_Locked();
    489   }
    490   return 0;
    491 }
    492 
    493 void TcpNode::ConnectDone_Locked() {
    494   local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_);
    495 
    496   // Now that we are connected, we can start sending and receiving.
    497   ClearStreamFlags(SSF_CONNECTING | SSF_CAN_CONNECT);
    498   SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
    499 
    500   emitter_->ConnectDone_Locked();
    501 
    502   // The NODELAY option cannot be set in PPAPI before the socket
    503   // is connected, but setsockopt() might have already set it.
    504   SetNoDelay_Locked();
    505 
    506   // Begin the input pump
    507   QueueInput();
    508 }
    509 
    510 void TcpNode::ConnectFailed_Locked() {
    511   filesystem_->ppapi()->ReleaseResource(remote_addr_);
    512   remote_addr_ = 0;
    513 }
    514 
    515 Error TcpNode::Listen(int backlog) {
    516   AUTO_LOCK(node_lock_);
    517   if (!IsBound())
    518     return EINVAL;
    519 
    520   int err = TCPInterface()->Listen(
    521       socket_resource_, backlog, PP_BlockUntilComplete());
    522   if (err != PP_OK)
    523     return PPErrorToErrno(err);
    524 
    525   ClearStreamFlags(SSF_CAN_CONNECT);
    526   SetStreamFlags(SSF_LISTENING);
    527   emitter_->SetListening_Locked();
    528   QueueAccept();
    529   return 0;
    530 }
    531 
    532 Error TcpNode::Recv_Locked(void* buf,
    533                            size_t len,
    534                            PP_Resource* out_addr,
    535                            int* out_len) {
    536   assert(emitter_.get());
    537   *out_len = emitter_->ReadIn_Locked((char*)buf, len);
    538   *out_addr = remote_addr_;
    539 
    540   // Ref the address copy we pass back.
    541   filesystem_->ppapi()->AddRefResource(remote_addr_);
    542   return 0;
    543 }
    544 
    545 // TCP ignores dst addr passed to send_to, and always uses bound address
    546 Error TcpNode::Send_Locked(const void* buf,
    547                            size_t len,
    548                            PP_Resource,
    549                            int* out_len) {
    550   assert(emitter_.get());
    551   if (emitter_->GetError_Locked())
    552     return EPIPE;
    553   *out_len = emitter_->WriteOut_Locked((char*)buf, len);
    554   return 0;
    555 }
    556 
    557 }  // namespace nacl_io
    558 
    559 #endif  // PROVIDES_SOCKET_API
    560