1 /* 2 * libjingle 3 * Copyright 2004--2010, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include "talk/base/asynctcpsocket.h" 29 30 #include <cstring> 31 32 #include "talk/base/byteorder.h" 33 #include "talk/base/common.h" 34 #include "talk/base/logging.h" 35 36 #ifdef POSIX 37 #include <errno.h> 38 #endif // POSIX 39 40 namespace talk_base { 41 42 static const size_t MAX_PACKET_SIZE = 64 * 1024; 43 44 typedef uint16 PacketLength; 45 static const size_t PKT_LEN_SIZE = sizeof(PacketLength); 46 47 static const size_t BUF_SIZE = MAX_PACKET_SIZE + PKT_LEN_SIZE; 48 49 static const int LISTEN_BACKLOG = 5; 50 51 AsyncTCPSocket* AsyncTCPSocket::Create(SocketFactory* factory, bool listen) { 52 AsyncSocket* sock = factory->CreateAsyncSocket(SOCK_STREAM); 53 // This will still return a socket even if we failed to listen on 54 // it. It is neccessary because even if we can't accept new 55 // connections on this socket, the corresponding port is still 56 // useful for outgoing connections. 57 // 58 // TODO: It might be better to pass listen() error to the 59 // upper layer and let it handle the problem. 60 return (sock) ? new AsyncTCPSocket(sock, listen) : NULL; 61 } 62 63 AsyncTCPSocket::AsyncTCPSocket(AsyncSocket* socket, bool listen) 64 : socket_(socket), 65 listen_(listen), 66 insize_(BUF_SIZE), 67 inpos_(0), 68 outsize_(BUF_SIZE), 69 outpos_(0) { 70 inbuf_ = new char[insize_]; 71 outbuf_ = new char[outsize_]; 72 73 ASSERT(socket_.get() != NULL); 74 socket_->SignalConnectEvent.connect(this, &AsyncTCPSocket::OnConnectEvent); 75 socket_->SignalReadEvent.connect(this, &AsyncTCPSocket::OnReadEvent); 76 socket_->SignalWriteEvent.connect(this, &AsyncTCPSocket::OnWriteEvent); 77 socket_->SignalCloseEvent.connect(this, &AsyncTCPSocket::OnCloseEvent); 78 79 if (listen_) { 80 if (socket_->Listen(LISTEN_BACKLOG) < 0) { 81 LOG(LS_ERROR) << "Listen() failed with error " << socket_->GetError(); 82 } 83 } 84 } 85 86 AsyncTCPSocket::~AsyncTCPSocket() { 87 delete [] inbuf_; 88 delete [] outbuf_; 89 } 90 91 SocketAddress AsyncTCPSocket::GetLocalAddress(bool* allocated) const { 92 if (allocated) 93 *allocated = true; 94 return socket_->GetLocalAddress(); 95 } 96 97 SocketAddress AsyncTCPSocket::GetRemoteAddress() const { 98 return socket_->GetRemoteAddress(); 99 } 100 101 int AsyncTCPSocket::Send(const void *pv, size_t cb) { 102 if (cb > MAX_PACKET_SIZE) { 103 socket_->SetError(EMSGSIZE); 104 return -1; 105 } 106 107 // If we are blocking on send, then silently drop this packet 108 if (outpos_) 109 return static_cast<int>(cb); 110 111 PacketLength pkt_len = HostToNetwork16(static_cast<PacketLength>(cb)); 112 memcpy(outbuf_, &pkt_len, PKT_LEN_SIZE); 113 memcpy(outbuf_ + PKT_LEN_SIZE, pv, cb); 114 outpos_ = PKT_LEN_SIZE + cb; 115 116 int res = Flush(); 117 if (res <= 0) { 118 // drop packet if we made no progress 119 outpos_ = 0; 120 return res; 121 } 122 123 // We claim to have sent the whole thing, even if we only sent partial 124 return static_cast<int>(cb); 125 } 126 127 int AsyncTCPSocket::SendTo(const void *pv, size_t cb, 128 const SocketAddress& addr) { 129 if (addr == GetRemoteAddress()) 130 return Send(pv, cb); 131 132 ASSERT(false); 133 socket_->SetError(ENOTCONN); 134 return -1; 135 } 136 137 int AsyncTCPSocket::Close() { 138 return socket_->Close(); 139 } 140 141 Socket::ConnState AsyncTCPSocket::GetState() const { 142 return socket_->GetState(); 143 } 144 145 int AsyncTCPSocket::GetOption(Socket::Option opt, int* value) { 146 return socket_->GetOption(opt, value); 147 } 148 149 int AsyncTCPSocket::SetOption(Socket::Option opt, int value) { 150 return socket_->SetOption(opt, value); 151 } 152 153 int AsyncTCPSocket::GetError() const { 154 return socket_->GetError(); 155 } 156 157 void AsyncTCPSocket::SetError(int error) { 158 return socket_->SetError(error); 159 } 160 161 int AsyncTCPSocket::SendRaw(const void * pv, size_t cb) { 162 if (outpos_ + cb > outsize_) { 163 socket_->SetError(EMSGSIZE); 164 return -1; 165 } 166 167 memcpy(outbuf_ + outpos_, pv, cb); 168 outpos_ += cb; 169 170 return Flush(); 171 } 172 173 void AsyncTCPSocket::ProcessInput(char * data, size_t& len) { 174 SocketAddress remote_addr(GetRemoteAddress()); 175 176 while (true) { 177 if (len < PKT_LEN_SIZE) 178 return; 179 180 PacketLength pkt_len; 181 memcpy(&pkt_len, data, PKT_LEN_SIZE); 182 pkt_len = NetworkToHost16(pkt_len); 183 184 if (len < PKT_LEN_SIZE + pkt_len) 185 return; 186 187 SignalReadPacket(this, data + PKT_LEN_SIZE, pkt_len, remote_addr); 188 189 len -= PKT_LEN_SIZE + pkt_len; 190 if (len > 0) { 191 memmove(data, data + PKT_LEN_SIZE + pkt_len, len); 192 } 193 } 194 } 195 196 int AsyncTCPSocket::Flush() { 197 int res = socket_->Send(outbuf_, outpos_); 198 if (res <= 0) { 199 return res; 200 } 201 if (static_cast<size_t>(res) <= outpos_) { 202 outpos_ -= res; 203 } else { 204 ASSERT(false); 205 return -1; 206 } 207 if (outpos_ > 0) { 208 memmove(outbuf_, outbuf_ + res, outpos_); 209 } 210 return res; 211 } 212 213 void AsyncTCPSocket::OnConnectEvent(AsyncSocket* socket) { 214 SignalConnect(this); 215 } 216 217 void AsyncTCPSocket::OnReadEvent(AsyncSocket* socket) { 218 ASSERT(socket_.get() == socket); 219 220 if (listen_) { 221 talk_base::SocketAddress address; 222 talk_base::AsyncSocket* new_socket = socket->Accept(&address); 223 if (!new_socket) { 224 // TODO: Do something better like forwarding the error 225 // to the user. 226 LOG(LS_ERROR) << "TCP accept failed with error " << socket_->GetError(); 227 return; 228 } 229 230 SignalNewConnection(this, new AsyncTCPSocket(new_socket, false)); 231 232 // Prime a read event in case data is waiting. 233 new_socket->SignalReadEvent(new_socket); 234 } else { 235 int len = socket_->Recv(inbuf_ + inpos_, insize_ - inpos_); 236 if (len < 0) { 237 // TODO: Do something better like forwarding the error to the user. 238 if (!socket_->IsBlocking()) { 239 LOG(LS_ERROR) << "Recv() returned error: " << socket_->GetError(); 240 } 241 return; 242 } 243 244 inpos_ += len; 245 246 ProcessInput(inbuf_, inpos_); 247 248 if (inpos_ >= insize_) { 249 LOG(LS_ERROR) << "input buffer overflow"; 250 ASSERT(false); 251 inpos_ = 0; 252 } 253 } 254 } 255 256 void AsyncTCPSocket::OnWriteEvent(AsyncSocket* socket) { 257 ASSERT(socket_.get() == socket); 258 259 if (outpos_ > 0) { 260 Flush(); 261 } 262 } 263 264 void AsyncTCPSocket::OnCloseEvent(AsyncSocket* socket, int error) { 265 SignalClose(this, error); 266 } 267 268 } // namespace talk_base 269