1 /* 2 * libjingle 3 * Copyright 2004--2010, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include "talk/base/asynctcpsocket.h" 29 30 #include <cstring> 31 32 #include "talk/base/byteorder.h" 33 #include "talk/base/common.h" 34 #include "talk/base/logging.h" 35 36 #ifdef POSIX 37 #include <errno.h> 38 #endif // POSIX 39 40 namespace talk_base { 41 42 static const size_t kMaxPacketSize = 64 * 1024; 43 44 typedef uint16 PacketLength; 45 static const size_t kPacketLenSize = sizeof(PacketLength); 46 47 static const size_t kBufSize = kMaxPacketSize + kPacketLenSize; 48 49 static const int kListenBacklog = 5; 50 51 // Binds and connects |socket| 52 AsyncSocket* AsyncTCPSocketBase::ConnectSocket( 53 talk_base::AsyncSocket* socket, 54 const talk_base::SocketAddress& bind_address, 55 const talk_base::SocketAddress& remote_address) { 56 talk_base::scoped_ptr<talk_base::AsyncSocket> owned_socket(socket); 57 if (socket->Bind(bind_address) < 0) { 58 LOG(LS_ERROR) << "Bind() failed with error " << socket->GetError(); 59 return NULL; 60 } 61 if (socket->Connect(remote_address) < 0) { 62 LOG(LS_ERROR) << "Connect() failed with error " << socket->GetError(); 63 return NULL; 64 } 65 return owned_socket.release(); 66 } 67 68 AsyncTCPSocketBase::AsyncTCPSocketBase(AsyncSocket* socket, bool listen, 69 size_t max_packet_size) 70 : socket_(socket), 71 listen_(listen), 72 insize_(max_packet_size), 73 inpos_(0), 74 outsize_(max_packet_size), 75 outpos_(0) { 76 inbuf_ = new char[insize_]; 77 outbuf_ = new char[outsize_]; 78 79 ASSERT(socket_.get() != NULL); 80 socket_->SignalConnectEvent.connect( 81 this, &AsyncTCPSocketBase::OnConnectEvent); 82 socket_->SignalReadEvent.connect(this, &AsyncTCPSocketBase::OnReadEvent); 83 socket_->SignalWriteEvent.connect(this, &AsyncTCPSocketBase::OnWriteEvent); 84 socket_->SignalCloseEvent.connect(this, &AsyncTCPSocketBase::OnCloseEvent); 85 86 if (listen_) { 87 if (socket_->Listen(kListenBacklog) < 0) { 88 LOG(LS_ERROR) << "Listen() failed with error " << socket_->GetError(); 89 } 90 } 91 } 92 93 AsyncTCPSocketBase::~AsyncTCPSocketBase() { 94 delete [] inbuf_; 95 delete [] outbuf_; 96 } 97 98 SocketAddress AsyncTCPSocketBase::GetLocalAddress() const { 99 return socket_->GetLocalAddress(); 100 } 101 102 SocketAddress AsyncTCPSocketBase::GetRemoteAddress() const { 103 return socket_->GetRemoteAddress(); 104 } 105 106 int AsyncTCPSocketBase::Close() { 107 return socket_->Close(); 108 } 109 110 AsyncTCPSocket::State AsyncTCPSocketBase::GetState() const { 111 switch (socket_->GetState()) { 112 case Socket::CS_CLOSED: 113 return STATE_CLOSED; 114 case Socket::CS_CONNECTING: 115 if (listen_) { 116 return STATE_BOUND; 117 } else { 118 return STATE_CONNECTING; 119 } 120 case Socket::CS_CONNECTED: 121 return STATE_CONNECTED; 122 default: 123 ASSERT(false); 124 return STATE_CLOSED; 125 } 126 } 127 128 int AsyncTCPSocketBase::GetOption(Socket::Option opt, int* value) { 129 return socket_->GetOption(opt, value); 130 } 131 132 int AsyncTCPSocketBase::SetOption(Socket::Option opt, int value) { 133 return socket_->SetOption(opt, value); 134 } 135 136 int AsyncTCPSocketBase::GetError() const { 137 return socket_->GetError(); 138 } 139 140 void AsyncTCPSocketBase::SetError(int error) { 141 return socket_->SetError(error); 142 } 143 144 // TODO(mallinath) - Add support of setting DSCP code on AsyncSocket. 145 int AsyncTCPSocketBase::SendTo(const void *pv, size_t cb, 146 const SocketAddress& addr, 147 DiffServCodePoint dscp) { 148 if (addr == GetRemoteAddress()) 149 return Send(pv, cb, dscp); 150 151 ASSERT(false); 152 socket_->SetError(ENOTCONN); 153 return -1; 154 } 155 156 int AsyncTCPSocketBase::SendRaw(const void * pv, size_t cb) { 157 if (outpos_ + cb > outsize_) { 158 socket_->SetError(EMSGSIZE); 159 return -1; 160 } 161 162 memcpy(outbuf_ + outpos_, pv, cb); 163 outpos_ += cb; 164 165 return FlushOutBuffer(); 166 } 167 168 int AsyncTCPSocketBase::FlushOutBuffer() { 169 int res = socket_->Send(outbuf_, outpos_); 170 if (res <= 0) { 171 return res; 172 } 173 if (static_cast<size_t>(res) <= outpos_) { 174 outpos_ -= res; 175 } else { 176 ASSERT(false); 177 return -1; 178 } 179 if (outpos_ > 0) { 180 memmove(outbuf_, outbuf_ + res, outpos_); 181 } 182 return res; 183 } 184 185 void AsyncTCPSocketBase::AppendToOutBuffer(const void* pv, size_t cb) { 186 ASSERT(outpos_ + cb < outsize_); 187 memcpy(outbuf_ + outpos_, pv, cb); 188 outpos_ += cb; 189 } 190 191 void AsyncTCPSocketBase::OnConnectEvent(AsyncSocket* socket) { 192 SignalConnect(this); 193 } 194 195 void AsyncTCPSocketBase::OnReadEvent(AsyncSocket* socket) { 196 ASSERT(socket_.get() == socket); 197 198 if (listen_) { 199 talk_base::SocketAddress address; 200 talk_base::AsyncSocket* new_socket = socket->Accept(&address); 201 if (!new_socket) { 202 // TODO: Do something better like forwarding the error 203 // to the user. 204 LOG(LS_ERROR) << "TCP accept failed with error " << socket_->GetError(); 205 return; 206 } 207 208 HandleIncomingConnection(new_socket); 209 210 // Prime a read event in case data is waiting. 211 new_socket->SignalReadEvent(new_socket); 212 } else { 213 int len = socket_->Recv(inbuf_ + inpos_, insize_ - inpos_); 214 if (len < 0) { 215 // TODO: Do something better like forwarding the error to the user. 216 if (!socket_->IsBlocking()) { 217 LOG(LS_ERROR) << "Recv() returned error: " << socket_->GetError(); 218 } 219 return; 220 } 221 222 inpos_ += len; 223 224 ProcessInput(inbuf_, &inpos_); 225 226 if (inpos_ >= insize_) { 227 LOG(LS_ERROR) << "input buffer overflow"; 228 ASSERT(false); 229 inpos_ = 0; 230 } 231 } 232 } 233 234 void AsyncTCPSocketBase::OnWriteEvent(AsyncSocket* socket) { 235 ASSERT(socket_.get() == socket); 236 237 if (outpos_ > 0) { 238 FlushOutBuffer(); 239 } 240 241 if (outpos_ == 0) { 242 SignalReadyToSend(this); 243 } 244 } 245 246 void AsyncTCPSocketBase::OnCloseEvent(AsyncSocket* socket, int error) { 247 SignalClose(this, error); 248 } 249 250 // AsyncTCPSocket 251 // Binds and connects |socket| and creates AsyncTCPSocket for 252 // it. Takes ownership of |socket|. Returns NULL if bind() or 253 // connect() fail (|socket| is destroyed in that case). 254 AsyncTCPSocket* AsyncTCPSocket::Create( 255 AsyncSocket* socket, 256 const SocketAddress& bind_address, 257 const SocketAddress& remote_address) { 258 return new AsyncTCPSocket(AsyncTCPSocketBase::ConnectSocket( 259 socket, bind_address, remote_address), false); 260 } 261 262 AsyncTCPSocket::AsyncTCPSocket(AsyncSocket* socket, bool listen) 263 : AsyncTCPSocketBase(socket, listen, kBufSize) { 264 } 265 266 // TODO(mallinath) - Add support of setting DSCP code on AsyncSocket. 267 int AsyncTCPSocket::Send(const void *pv, size_t cb, DiffServCodePoint dscp) { 268 if (cb > kBufSize) { 269 SetError(EMSGSIZE); 270 return -1; 271 } 272 273 // If we are blocking on send, then silently drop this packet 274 if (!IsOutBufferEmpty()) 275 return static_cast<int>(cb); 276 277 PacketLength pkt_len = HostToNetwork16(static_cast<PacketLength>(cb)); 278 AppendToOutBuffer(&pkt_len, kPacketLenSize); 279 AppendToOutBuffer(pv, cb); 280 281 int res = FlushOutBuffer(); 282 if (res <= 0) { 283 // drop packet if we made no progress 284 ClearOutBuffer(); 285 return res; 286 } 287 288 // We claim to have sent the whole thing, even if we only sent partial 289 return static_cast<int>(cb); 290 } 291 292 void AsyncTCPSocket::ProcessInput(char * data, size_t* len) { 293 SocketAddress remote_addr(GetRemoteAddress()); 294 295 while (true) { 296 if (*len < kPacketLenSize) 297 return; 298 299 PacketLength pkt_len = talk_base::GetBE16(data); 300 if (*len < kPacketLenSize + pkt_len) 301 return; 302 303 SignalReadPacket(this, data + kPacketLenSize, pkt_len, remote_addr, 304 CreatePacketTime(0)); 305 306 *len -= kPacketLenSize + pkt_len; 307 if (*len > 0) { 308 memmove(data, data + kPacketLenSize + pkt_len, *len); 309 } 310 } 311 } 312 313 void AsyncTCPSocket::HandleIncomingConnection(AsyncSocket* socket) { 314 SignalNewConnection(this, new AsyncTCPSocket(socket, false)); 315 } 316 317 } // namespace talk_base 318