1 // Copyright 2010 Google Inc. All Rights Reserved. 2 3 // thaloun (at) google.com (Tim Haloun) 4 // 5 // MacAsyncSocket is a kind of AsyncSocket. It does not support the SOCK_DGRAM 6 // type (yet). It works asynchronously, which means that users of this socket 7 // should connect to the various events declared in asyncsocket.h to receive 8 // notifications about this socket. It uses CFSockets for signals, but prefers 9 // the basic bsd socket operations rather than their CFSocket wrappers when 10 // possible. 11 12 #include <CoreFoundation/CoreFoundation.h> 13 #include <fcntl.h> 14 15 #include "talk/base/macasyncsocket.h" 16 17 #include "talk/base/logging.h" 18 #include "talk/base/macsocketserver.h" 19 20 namespace talk_base { 21 22 static const int kCallbackFlags = kCFSocketReadCallBack | 23 kCFSocketConnectCallBack | 24 kCFSocketWriteCallBack; 25 26 MacAsyncSocket::MacAsyncSocket(MacBaseSocketServer* ss, int family) 27 : ss_(ss), 28 socket_(NULL), 29 native_socket_(INVALID_SOCKET), 30 source_(NULL), 31 current_callbacks_(0), 32 disabled_(false), 33 error_(0), 34 state_(CS_CLOSED), 35 resolver_(NULL) { 36 Initialize(family); 37 } 38 39 MacAsyncSocket::~MacAsyncSocket() { 40 Close(); 41 } 42 43 // Returns the address to which the socket is bound. If the socket is not 44 // bound, then the any-address is returned. 45 SocketAddress MacAsyncSocket::GetLocalAddress() const { 46 SocketAddress address; 47 48 // The CFSocket doesn't pick up on implicit binds from the connect call. 49 // Calling bind in before connect explicitly causes errors, so just query 50 // the underlying bsd socket. 51 sockaddr_storage addr; 52 socklen_t addrlen = sizeof(addr); 53 int result = ::getsockname(native_socket_, 54 reinterpret_cast<sockaddr*>(&addr), &addrlen); 55 if (result >= 0) { 56 SocketAddressFromSockAddrStorage(addr, &address); 57 } 58 return address; 59 } 60 61 // Returns the address to which the socket is connected. If the socket is not 62 // connected, then the any-address is returned. 63 SocketAddress MacAsyncSocket::GetRemoteAddress() const { 64 SocketAddress address; 65 66 // Use native_socket for consistency with GetLocalAddress. 67 sockaddr_storage addr; 68 socklen_t addrlen = sizeof(addr); 69 int result = ::getpeername(native_socket_, 70 reinterpret_cast<sockaddr*>(&addr), &addrlen); 71 if (result >= 0) { 72 SocketAddressFromSockAddrStorage(addr, &address); 73 } 74 return address; 75 } 76 77 // Bind the socket to a local address. 78 int MacAsyncSocket::Bind(const SocketAddress& address) { 79 sockaddr_storage saddr = {0}; 80 size_t len = address.ToSockAddrStorage(&saddr); 81 int err = ::bind(native_socket_, reinterpret_cast<sockaddr*>(&saddr), len); 82 if (err == SOCKET_ERROR) error_ = errno; 83 return err; 84 } 85 86 void MacAsyncSocket::OnResolveResult(SignalThread* thread) { 87 if (thread != resolver_) { 88 return; 89 } 90 int error = resolver_->error(); 91 if (error == 0) { 92 error = DoConnect(resolver_->address()); 93 } else { 94 Close(); 95 } 96 if (error) { 97 error_ = error; 98 SignalCloseEvent(this, error_); 99 } 100 } 101 102 // Connect to a remote address. 103 int MacAsyncSocket::Connect(const SocketAddress& addr) { 104 // TODO(djw): Consolidate all the connect->resolve->doconnect implementations. 105 if (state_ != CS_CLOSED) { 106 SetError(EALREADY); 107 return SOCKET_ERROR; 108 } 109 if (addr.IsUnresolved()) { 110 LOG(LS_VERBOSE) << "Resolving addr in MacAsyncSocket::Connect"; 111 resolver_ = new AsyncResolver(); 112 resolver_->set_address(addr); 113 resolver_->SignalWorkDone.connect(this, 114 &MacAsyncSocket::OnResolveResult); 115 resolver_->Start(); 116 state_ = CS_CONNECTING; 117 return 0; 118 } 119 return DoConnect(addr); 120 } 121 122 int MacAsyncSocket::DoConnect(const SocketAddress& addr) { 123 if (!valid()) { 124 Initialize(addr.family()); 125 if (!valid()) 126 return SOCKET_ERROR; 127 } 128 129 sockaddr_storage saddr; 130 size_t len = addr.ToSockAddrStorage(&saddr); 131 int result = ::connect(native_socket_, reinterpret_cast<sockaddr*>(&saddr), 132 len); 133 134 if (result != SOCKET_ERROR) { 135 state_ = CS_CONNECTED; 136 } else { 137 error_ = errno; 138 if (error_ == EINPROGRESS) { 139 state_ = CS_CONNECTING; 140 result = 0; 141 } 142 } 143 return result; 144 } 145 146 // Send to the remote end we're connected to. 147 int MacAsyncSocket::Send(const void* buffer, size_t length) { 148 if (!valid()) { 149 return SOCKET_ERROR; 150 } 151 152 int sent = ::send(native_socket_, buffer, length, 0); 153 154 if (sent == SOCKET_ERROR) { 155 error_ = errno; 156 157 if (IsBlocking()) { 158 // Reenable the writable callback (once), since we are flow controlled. 159 CFSocketEnableCallBacks(socket_, kCallbackFlags); 160 current_callbacks_ = kCallbackFlags; 161 } 162 } 163 return sent; 164 } 165 166 // Send to the given address. We may or may not be connected to anyone. 167 int MacAsyncSocket::SendTo(const void* buffer, size_t length, 168 const SocketAddress& address) { 169 if (!valid()) { 170 return SOCKET_ERROR; 171 } 172 173 sockaddr_storage saddr; 174 size_t len = address.ToSockAddrStorage(&saddr); 175 int sent = ::sendto(native_socket_, buffer, length, 0, 176 reinterpret_cast<sockaddr*>(&saddr), len); 177 178 if (sent == SOCKET_ERROR) { 179 error_ = errno; 180 } 181 182 return sent; 183 } 184 185 // Read data received from the remote end we're connected to. 186 int MacAsyncSocket::Recv(void* buffer, size_t length) { 187 int received = ::recv(native_socket_, reinterpret_cast<char*>(buffer), 188 length, 0); 189 if (received == SOCKET_ERROR) error_ = errno; 190 191 // Recv should only be called when there is data to read 192 ASSERT((received != 0) || (length == 0)); 193 return received; 194 } 195 196 // Read data received from any remote party 197 int MacAsyncSocket::RecvFrom(void* buffer, size_t length, 198 SocketAddress* out_addr) { 199 sockaddr_storage saddr; 200 socklen_t addr_len = sizeof(saddr); 201 int received = ::recvfrom(native_socket_, reinterpret_cast<char*>(buffer), 202 length, 0, reinterpret_cast<sockaddr*>(&saddr), 203 &addr_len); 204 if (received >= 0 && out_addr != NULL) { 205 SocketAddressFromSockAddrStorage(saddr, out_addr); 206 } else if (received == SOCKET_ERROR) { 207 error_ = errno; 208 } 209 return received; 210 } 211 212 int MacAsyncSocket::Listen(int backlog) { 213 if (!valid()) { 214 return SOCKET_ERROR; 215 } 216 217 int res = ::listen(native_socket_, backlog); 218 if (res != SOCKET_ERROR) 219 state_ = CS_CONNECTING; 220 else 221 error_ = errno; 222 223 return res; 224 } 225 226 MacAsyncSocket* MacAsyncSocket::Accept(SocketAddress* out_addr) { 227 sockaddr_storage saddr; 228 socklen_t addr_len = sizeof(saddr); 229 230 int socket_fd = ::accept(native_socket_, reinterpret_cast<sockaddr*>(&saddr), 231 &addr_len); 232 if (socket_fd == INVALID_SOCKET) { 233 error_ = errno; 234 return NULL; 235 } 236 237 MacAsyncSocket* s = new MacAsyncSocket(ss_, saddr.ss_family, socket_fd); 238 if (s && s->valid()) { 239 s->state_ = CS_CONNECTED; 240 if (out_addr) 241 SocketAddressFromSockAddrStorage(saddr, out_addr); 242 } else { 243 delete s; 244 s = NULL; 245 } 246 return s; 247 } 248 249 int MacAsyncSocket::Close() { 250 if (source_ != NULL) { 251 CFRunLoopSourceInvalidate(source_); 252 CFRelease(source_); 253 if (ss_) ss_->UnregisterSocket(this); 254 source_ = NULL; 255 } 256 257 if (socket_ != NULL) { 258 CFSocketInvalidate(socket_); 259 CFRelease(socket_); 260 socket_ = NULL; 261 } 262 263 if (resolver_) { 264 resolver_->Destroy(false); 265 resolver_ = NULL; 266 } 267 268 native_socket_ = INVALID_SOCKET; // invalidates the socket 269 error_ = 0; 270 state_ = CS_CLOSED; 271 return 0; 272 } 273 274 int MacAsyncSocket::EstimateMTU(uint16* mtu) { 275 ASSERT(false && "NYI"); 276 return -1; 277 } 278 279 int MacAsyncSocket::GetError() const { 280 return error_; 281 } 282 283 void MacAsyncSocket::SetError(int error) { 284 error_ = error; 285 } 286 287 Socket::ConnState MacAsyncSocket::GetState() const { 288 return state_; 289 } 290 291 int MacAsyncSocket::GetOption(Option opt, int* value) { 292 ASSERT(false && "NYI"); 293 return -1; 294 } 295 296 int MacAsyncSocket::SetOption(Option opt, int value) { 297 ASSERT(false && "NYI"); 298 return -1; 299 } 300 301 void MacAsyncSocket::EnableCallbacks() { 302 if (valid()) { 303 disabled_ = false; 304 CFSocketEnableCallBacks(socket_, current_callbacks_); 305 } 306 } 307 308 void MacAsyncSocket::DisableCallbacks() { 309 if (valid()) { 310 disabled_ = true; 311 CFSocketDisableCallBacks(socket_, kCallbackFlags); 312 } 313 } 314 315 MacAsyncSocket::MacAsyncSocket(MacBaseSocketServer* ss, int family, 316 int native_socket) 317 : ss_(ss), 318 socket_(NULL), 319 native_socket_(native_socket), 320 source_(NULL), 321 current_callbacks_(0), 322 disabled_(false), 323 error_(0), 324 state_(CS_CLOSED), 325 resolver_(NULL) { 326 Initialize(family); 327 } 328 329 // Create a new socket, wrapping the native socket if provided or creating one 330 // otherwise. In case of any failure, consume the native socket. We assume the 331 // wrapped socket is in the closed state. If this is not the case you must 332 // update the state_ field for this socket yourself. 333 void MacAsyncSocket::Initialize(int family) { 334 CFSocketContext ctx = { 0 }; 335 ctx.info = this; 336 337 // First create the CFSocket 338 CFSocketRef cf_socket = NULL; 339 bool res = false; 340 if (native_socket_ == INVALID_SOCKET) { 341 cf_socket = CFSocketCreate(kCFAllocatorDefault, 342 family, SOCK_STREAM, IPPROTO_TCP, 343 kCallbackFlags, MacAsyncSocketCallBack, &ctx); 344 } else { 345 cf_socket = CFSocketCreateWithNative(kCFAllocatorDefault, 346 native_socket_, kCallbackFlags, 347 MacAsyncSocketCallBack, &ctx); 348 } 349 350 if (cf_socket) { 351 res = true; 352 socket_ = cf_socket; 353 native_socket_ = CFSocketGetNative(cf_socket); 354 current_callbacks_ = kCallbackFlags; 355 } 356 357 if (res) { 358 // Make the underlying socket asynchronous 359 res = (-1 != ::fcntl(native_socket_, F_SETFL, 360 ::fcntl(native_socket_, F_GETFL, 0) | O_NONBLOCK)); 361 } 362 363 if (res) { 364 // Add this socket to the run loop, at priority 1 so that it will be 365 // queued behind any pending signals. 366 source_ = CFSocketCreateRunLoopSource(kCFAllocatorDefault, socket_, 1); 367 res = (source_ != NULL); 368 if (!res) errno = EINVAL; 369 } 370 371 if (res) { 372 if (ss_) ss_->RegisterSocket(this); 373 CFRunLoopAddSource(CFRunLoopGetCurrent(), source_, kCFRunLoopCommonModes); 374 } 375 376 if (!res) { 377 int error = errno; 378 Close(); // Clears error_. 379 error_ = error; 380 } 381 } 382 383 // Call CFRelease on the result when done using it 384 CFDataRef MacAsyncSocket::CopyCFAddress(const SocketAddress& address) { 385 sockaddr_storage saddr; 386 size_t len = address.ToSockAddrStorage(&saddr); 387 388 const UInt8* bytes = reinterpret_cast<UInt8*>(&saddr); 389 390 CFDataRef cf_address = CFDataCreate(kCFAllocatorDefault, 391 bytes, len); 392 393 ASSERT(cf_address != NULL); 394 return cf_address; 395 } 396 397 void MacAsyncSocket::MacAsyncSocketCallBack(CFSocketRef s, 398 CFSocketCallBackType callbackType, 399 CFDataRef address, 400 const void* data, 401 void* info) { 402 MacAsyncSocket* this_socket = 403 reinterpret_cast<MacAsyncSocket*>(info); 404 ASSERT(this_socket != NULL && this_socket->socket_ == s); 405 406 // Don't signal any socket messages if the socketserver is not listening on 407 // them. When we are reenabled they will be requeued and will fire again. 408 if (this_socket->disabled_) 409 return; 410 411 switch (callbackType) { 412 case kCFSocketReadCallBack: 413 // This callback is invoked in one of 3 situations: 414 // 1. A new connection is waiting to be accepted. 415 // 2. The remote end closed the connection (a recv will return 0). 416 // 3. Data is available to read. 417 // 4. The connection closed unhappily (recv will return -1). 418 if (this_socket->state_ == CS_CONNECTING) { 419 // Case 1. 420 this_socket->SignalReadEvent(this_socket); 421 } else { 422 char ch, amt; 423 amt = ::recv(this_socket->native_socket_, &ch, 1, MSG_PEEK); 424 if (amt == 0) { 425 // Case 2. 426 this_socket->state_ = CS_CLOSED; 427 428 // Disable additional callbacks or we will signal close twice. 429 CFSocketDisableCallBacks(this_socket->socket_, kCFSocketReadCallBack); 430 this_socket->current_callbacks_ &= ~kCFSocketReadCallBack; 431 this_socket->SignalCloseEvent(this_socket, 0); 432 } else if (amt > 0) { 433 // Case 3. 434 this_socket->SignalReadEvent(this_socket); 435 } else { 436 // Case 4. 437 int error = errno; 438 if (error == EAGAIN) { 439 // Observed in practice. Let's hope it's a spurious or out of date 440 // signal, since we just eat it. 441 } else { 442 this_socket->error_ = error; 443 this_socket->SignalCloseEvent(this_socket, error); 444 } 445 } 446 } 447 break; 448 449 case kCFSocketConnectCallBack: 450 if (data != NULL) { 451 // An error occured in the background while connecting 452 this_socket->error_ = errno; 453 this_socket->state_ = CS_CLOSED; 454 this_socket->SignalCloseEvent(this_socket, this_socket->error_); 455 } else { 456 this_socket->state_ = CS_CONNECTED; 457 this_socket->SignalConnectEvent(this_socket); 458 } 459 break; 460 461 case kCFSocketWriteCallBack: 462 // Update our callback tracking. Write doesn't reenable, so it's off now. 463 this_socket->current_callbacks_ &= ~kCFSocketWriteCallBack; 464 this_socket->SignalWriteEvent(this_socket); 465 break; 466 467 default: 468 ASSERT(false && "Invalid callback type for socket"); 469 } 470 } 471 472 } // namespace talk_base 473