1 /* 2 * Copyright (C) 2012 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 /* 18 * Encapsulates exchange protocol between the emulator, and an Android device 19 * that is connected to the host via USB. The communication is established over 20 * a TCP port forwarding, enabled by ADB. 21 */ 22 23 #include "android/async-socket-connector.h" 24 #include "android/async-socket.h" 25 #include "android/utils/debug.h" 26 #include "android/utils/eintr_wrapper.h" 27 #include "android/utils/panic.h" 28 #include "android/iolooper.h" 29 30 #define E(...) derror(__VA_ARGS__) 31 #define W(...) dwarning(__VA_ARGS__) 32 #define D(...) VERBOSE_PRINT(asyncsocket,__VA_ARGS__) 33 #define D_ACTIVE VERBOSE_CHECK(asyncsocket) 34 35 #define TRACE_ON 0 36 37 #if TRACE_ON 38 #define T(...) VERBOSE_PRINT(asyncsocket,__VA_ARGS__) 39 #else 40 #define T(...) 41 #endif 42 43 /******************************************************************************** 44 * Asynchronous Socket internal API declarations 45 *******************************************************************************/ 46 47 /* Gets socket's address string. */ 48 static const char* _async_socket_string(AsyncSocket* as); 49 50 /* Gets socket's looper. */ 51 static Looper* _async_socket_get_looper(AsyncSocket* as); 52 53 /* Handler for the I/O time out. 54 * Param: 55 * as - Asynchronous socket for the I/O. 56 * asio - Desciptor for the timed out I/O. 
 */
static AsyncIOAction _async_socket_io_timed_out(AsyncSocket* as,
                                                AsyncSocketIO* asio);

/********************************************************************************
 *                  Asynchronous Socket Reader / Writer
 *******************************************************************************/

/* Descriptor of a single read or write request issued on an AsyncSocket.
 * Descriptors are reference counted, and are chained through the 'next' field
 * both while they are queued on a socket and while they sit in the recycler. */
struct AsyncSocketIO {
    /* Next I/O in the reader, or writer list. */
    AsyncSocketIO*      next;
    /* Asynchronous socket for this I/O. The descriptor holds a reference to the
     * socket from its creation until it is freed. */
    AsyncSocket*        as;
    /* Timer used for time outs on this I/O. It is armed with 'deadline' when
     * the descriptor is created. */
    LoopTimer           timer[1];
    /* An opaque pointer associated with this I/O. It is passed back to the
     * 'on_io' callback. */
    void*               io_opaque;
    /* Buffer where to read / write data. */
    uint8_t*            buffer;
    /* Bytes to transfer through the socket for this I/O. */
    uint32_t            to_transfer;
    /* Bytes transferred through the socket in this I/O. */
    uint32_t            transferred;
    /* I/O callback for this I/O. */
    on_as_io_cb         on_io;
    /* I/O type selector: 1 - read, 0 - write. */
    int                 is_io_read;
    /* State of the I/O: ASIO_STATE_QUEUED on creation, then
     * ASIO_STATE_STARTED / ASIO_STATE_CONTINUES while data is being moved. */
    AsyncIOState        state;
    /* Number of outstanding references to the I/O. Starts at 1; the descriptor
     * is discarded when it drops to 0. */
    int                 ref_count;
    /* Deadline for this I/O */
    Duration            deadline;
};

/*
 * Recycling I/O instances.
 * Since AsyncSocketIO instances are not that large, it makes sense to recycle
 * them for faster allocation, rather than allocating and freeing them for each
 * I/O on the socket.
 */

/* List of recycled I/O descriptors (singly linked through 'next'). */
static AsyncSocketIO* _asio_recycled = NULL;
/* Number of I/O descriptors that are recycled in the _asio_recycled list. */
static int _recycled_asio_count = 0;
/* Maximum number of I/O descriptors that can be recycled. Descriptors released
 * while the recycler is full are freed instead. */
static const int _max_recycled_asio_num = 32;

/* Handler for an I/O time-out timer event.
 * When this routine is invoked, it indicates that a time out has occurred on an
 * I/O.
 * Param:
 *  opaque - AsyncSocketIO instance representing the timed out I/O.
111 */ 112 static void _on_async_socket_io_timed_out(void* opaque); 113 114 /* Creates new I/O descriptor. 115 * Param: 116 * as - Asynchronous socket for the I/O. 117 * is_io_read - I/O type selector: 1 - read, 0 - write. 118 * buffer, len - Reader / writer buffer address. 119 * io_cb - Callback for this reader / writer. 120 * io_opaque - An opaque pointer associated with the I/O. 121 * deadline - Deadline to complete the I/O. 122 * Return: 123 * Initialized AsyncSocketIO instance. 124 */ 125 static AsyncSocketIO* 126 _async_socket_rw_new(AsyncSocket* as, 127 int is_io_read, 128 void* buffer, 129 uint32_t len, 130 on_as_io_cb io_cb, 131 void* io_opaque, 132 Duration deadline) 133 { 134 /* Lookup in the recycler first. */ 135 AsyncSocketIO* asio = _asio_recycled; 136 if (asio != NULL) { 137 /* Pull the descriptor from recycler. */ 138 _asio_recycled = asio->next; 139 _recycled_asio_count--; 140 } else { 141 /* No recycled descriptors. Allocate new one. */ 142 ANEW0(asio); 143 } 144 145 asio->next = NULL; 146 asio->as = as; 147 asio->is_io_read = is_io_read; 148 asio->buffer = (uint8_t*)buffer; 149 asio->to_transfer = len; 150 asio->transferred = 0; 151 asio->on_io = io_cb; 152 asio->io_opaque = io_opaque; 153 asio->state = ASIO_STATE_QUEUED; 154 asio->ref_count = 1; 155 asio->deadline = deadline; 156 loopTimer_init(asio->timer, _async_socket_get_looper(as), 157 _on_async_socket_io_timed_out, asio); 158 loopTimer_startAbsolute(asio->timer, deadline); 159 160 /* Reference socket that is holding this I/O. */ 161 async_socket_reference(as); 162 163 T("ASocket %s: %s I/O descriptor %p is created for %d bytes of data", 164 _async_socket_string(as), is_io_read ? "READ" : "WRITE", asio, len); 165 166 return asio; 167 } 168 169 /* Destroys and frees I/O descriptor. 
*/ 170 static void 171 _async_socket_io_free(AsyncSocketIO* asio) 172 { 173 AsyncSocket* const as = asio->as; 174 175 T("ASocket %s: %s I/O descriptor %p is destroyed.", 176 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio); 177 178 loopTimer_done(asio->timer); 179 180 /* Try to recycle it first, and free the memory if recycler is full. */ 181 if (_recycled_asio_count < _max_recycled_asio_num) { 182 asio->next = _asio_recycled; 183 _asio_recycled = asio; 184 _recycled_asio_count++; 185 } else { 186 AFREE(asio); 187 } 188 189 /* Release socket that is holding this I/O. */ 190 async_socket_release(as); 191 } 192 193 /* An I/O has been finished and its descriptor is about to be discarded. */ 194 static void 195 _async_socket_io_finished(AsyncSocketIO* asio) 196 { 197 /* Notify the client of the I/O that I/O is finished. */ 198 asio->on_io(asio->io_opaque, asio, ASIO_STATE_FINISHED); 199 } 200 201 int 202 async_socket_io_reference(AsyncSocketIO* asio) 203 { 204 assert(asio->ref_count > 0); 205 asio->ref_count++; 206 return asio->ref_count; 207 } 208 209 int 210 async_socket_io_release(AsyncSocketIO* asio) 211 { 212 assert(asio->ref_count > 0); 213 asio->ref_count--; 214 if (asio->ref_count == 0) { 215 _async_socket_io_finished(asio); 216 /* Last reference has been dropped. Destroy this object. */ 217 _async_socket_io_free(asio); 218 return 0; 219 } 220 return asio->ref_count; 221 } 222 223 /* Creates new asynchronous socket reader. 224 * Param: 225 * as - Asynchronous socket for the reader. 226 * buffer, len - Reader's buffer. 227 * io_cb - Reader's callback. 228 * reader_opaque - An opaque pointer associated with the reader. 229 * deadline - Deadline to complete the operation. 230 * Return: 231 * An initialized AsyncSocketIO intance. 
232 */ 233 static AsyncSocketIO* 234 _async_socket_reader_new(AsyncSocket* as, 235 void* buffer, 236 uint32_t len, 237 on_as_io_cb io_cb, 238 void* reader_opaque, 239 Duration deadline) 240 { 241 AsyncSocketIO* const asio = _async_socket_rw_new(as, 1, buffer, len, io_cb, 242 reader_opaque, deadline); 243 return asio; 244 } 245 246 /* Creates new asynchronous socket writer. 247 * Param: 248 * as - Asynchronous socket for the writer. 249 * buffer, len - Writer's buffer. 250 * io_cb - Writer's callback. 251 * writer_opaque - An opaque pointer associated with the writer. 252 * deadline - Deadline to complete the operation. 253 * Return: 254 * An initialized AsyncSocketIO intance. 255 */ 256 static AsyncSocketIO* 257 _async_socket_writer_new(AsyncSocket* as, 258 const void* buffer, 259 uint32_t len, 260 on_as_io_cb io_cb, 261 void* writer_opaque, 262 Duration deadline) 263 { 264 AsyncSocketIO* const asio = _async_socket_rw_new(as, 0, (void*)buffer, len, 265 io_cb, writer_opaque, 266 deadline); 267 return asio; 268 } 269 270 /* I/O timed out. */ 271 static void 272 _on_async_socket_io_timed_out(void* opaque) 273 { 274 AsyncSocketIO* const asio = (AsyncSocketIO*)opaque; 275 AsyncSocket* const as = asio->as; 276 277 D("ASocket %s: %s I/O with deadline %lld has timed out at %lld", 278 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", 279 asio->deadline, async_socket_deadline(as, 0)); 280 281 /* Reference while in callback. 
*/ 282 async_socket_io_reference(asio); 283 _async_socket_io_timed_out(asio->as, asio); 284 async_socket_io_release(asio); 285 } 286 287 /******************************************************************************** 288 * Public Asynchronous Socket I/O API 289 *******************************************************************************/ 290 291 AsyncSocket* 292 async_socket_io_get_socket(const AsyncSocketIO* asio) 293 { 294 async_socket_reference(asio->as); 295 return asio->as; 296 } 297 298 void 299 async_socket_io_cancel_time_out(AsyncSocketIO* asio) 300 { 301 loopTimer_stop(asio->timer); 302 } 303 304 void* 305 async_socket_io_get_io_opaque(const AsyncSocketIO* asio) 306 { 307 return asio->io_opaque; 308 } 309 310 void* 311 async_socket_io_get_client_opaque(const AsyncSocketIO* asio) 312 { 313 return async_socket_get_client_opaque(asio->as); 314 } 315 316 void* 317 async_socket_io_get_buffer_info(const AsyncSocketIO* asio, 318 uint32_t* transferred, 319 uint32_t* to_transfer) 320 { 321 if (transferred != NULL) { 322 *transferred = asio->transferred; 323 } 324 if (to_transfer != NULL) { 325 *to_transfer = asio->to_transfer; 326 } 327 return asio->buffer; 328 } 329 330 void* 331 async_socket_io_get_buffer(const AsyncSocketIO* asio) 332 { 333 return asio->buffer; 334 } 335 336 uint32_t 337 async_socket_io_get_transferred(const AsyncSocketIO* asio) 338 { 339 return asio->transferred; 340 } 341 342 uint32_t 343 async_socket_io_get_to_transfer(const AsyncSocketIO* asio) 344 { 345 return asio->to_transfer; 346 } 347 348 int 349 async_socket_io_is_read(const AsyncSocketIO* asio) 350 { 351 return asio->is_io_read; 352 } 353 354 /******************************************************************************** 355 * Asynchronous Socket internals 356 *******************************************************************************/ 357 358 struct AsyncSocket { 359 /* TCP address for the socket. */ 360 SockAddress address; 361 /* Connection callback for this socket. 
*/ 362 on_as_connection_cb on_connection; 363 /* An opaque pointer associated with this socket by the client. */ 364 void* client_opaque; 365 /* I/O looper for asynchronous I/O on the socket. */ 366 Looper* looper; 367 /* I/O descriptor for asynchronous I/O on the socket. */ 368 LoopIo io[1]; 369 /* Timer to use for reconnection attempts. */ 370 LoopTimer reconnect_timer[1]; 371 /* Head of the list of the active readers. */ 372 AsyncSocketIO* readers_head; 373 /* Tail of the list of the active readers. */ 374 AsyncSocketIO* readers_tail; 375 /* Head of the list of the active writers. */ 376 AsyncSocketIO* writers_head; 377 /* Tail of the list of the active writers. */ 378 AsyncSocketIO* writers_tail; 379 /* Socket's file descriptor. */ 380 int fd; 381 /* Timeout to use for reconnection attempts. */ 382 int reconnect_to; 383 /* Number of outstanding references to the socket. */ 384 int ref_count; 385 /* Flags whether (1) or not (0) socket owns the looper. */ 386 int owns_looper; 387 }; 388 389 static const char* 390 _async_socket_string(AsyncSocket* as) 391 { 392 return sock_address_to_string(&as->address); 393 } 394 395 static Looper* 396 _async_socket_get_looper(AsyncSocket* as) 397 { 398 return as->looper; 399 } 400 401 /* Pulls first reader out of the list. 402 * Param: 403 * as - Initialized AsyncSocket instance. 404 * Return: 405 * First I/O pulled out of the list, or NULL if there are no I/O in the list. 406 * Note that the caller is responsible for releasing the I/O object returned 407 * from this routine. 408 */ 409 static AsyncSocketIO* 410 _async_socket_pull_first_io(AsyncSocket* as, 411 AsyncSocketIO** list_head, 412 AsyncSocketIO** list_tail) 413 { 414 AsyncSocketIO* const ret = *list_head; 415 if (ret != NULL) { 416 *list_head = ret->next; 417 ret->next = NULL; 418 if (*list_head == NULL) { 419 *list_tail = NULL; 420 } 421 } 422 return ret; 423 } 424 425 /* Pulls first reader out of the list. 426 * Param: 427 * as - Initialized AsyncSocket instance. 
428 * Return: 429 * First reader pulled out of the list, or NULL if there are no readers in the 430 * list. 431 * Note that the caller is responsible for releasing the I/O object returned 432 * from this routine. 433 */ 434 static AsyncSocketIO* 435 _async_socket_pull_first_reader(AsyncSocket* as) 436 { 437 return _async_socket_pull_first_io(as, &as->readers_head, &as->readers_tail); 438 } 439 440 /* Pulls first writer out of the list. 441 * Param: 442 * as - Initialized AsyncSocket instance. 443 * Return: 444 * First writer pulled out of the list, or NULL if there are no writers in the 445 * list. 446 * Note that the caller is responsible for releasing the I/O object returned 447 * from this routine. 448 */ 449 static AsyncSocketIO* 450 _async_socket_pull_first_writer(AsyncSocket* as) 451 { 452 return _async_socket_pull_first_io(as, &as->writers_head, &as->writers_tail); 453 } 454 455 /* Removes an I/O descriptor from a list of active I/O. 456 * Param: 457 * as - Initialized AsyncSocket instance. 458 * list_head, list_tail - Pointers to the list head and tail. 459 * io - I/O to remove. 460 * Return: 461 * Boolean: 1 if I/O has been removed, or 0 if I/O has not been found in the list. 462 */ 463 static int 464 _async_socket_remove_io(AsyncSocket* as, 465 AsyncSocketIO** list_head, 466 AsyncSocketIO** list_tail, 467 AsyncSocketIO* io) 468 { 469 AsyncSocketIO* prev = NULL; 470 471 while (*list_head != NULL && io != *list_head) { 472 prev = *list_head; 473 list_head = &((*list_head)->next); 474 } 475 if (*list_head == NULL) { 476 D("%s: I/O %p is not found in the list for socket '%s'", 477 __FUNCTION__, io, _async_socket_string(as)); 478 return 0; 479 } 480 481 *list_head = io->next; 482 if (prev != NULL) { 483 prev->next = io->next; 484 } 485 if (*list_tail == io) { 486 *list_tail = prev; 487 } 488 489 /* Release I/O adjusting reference added when I/O has been saved in the list. 
*/ 490 async_socket_io_release(io); 491 492 return 1; 493 } 494 495 /* Advances to the next I/O in the list. 496 * Param: 497 * as - Initialized AsyncSocket instance. 498 * list_head, list_tail - Pointers to the list head and tail. 499 */ 500 static void 501 _async_socket_advance_io(AsyncSocket* as, 502 AsyncSocketIO** list_head, 503 AsyncSocketIO** list_tail) 504 { 505 AsyncSocketIO* first_io = *list_head; 506 if (first_io != NULL) { 507 *list_head = first_io->next; 508 first_io->next = NULL; 509 } 510 if (*list_head == NULL) { 511 *list_tail = NULL; 512 } 513 if (first_io != NULL) { 514 /* Release I/O removed from the head of the list. */ 515 async_socket_io_release(first_io); 516 } 517 } 518 519 /* Advances to the next reader in the list. 520 * Param: 521 * as - Initialized AsyncSocket instance. 522 */ 523 static void 524 _async_socket_advance_reader(AsyncSocket* as) 525 { 526 _async_socket_advance_io(as, &as->readers_head, &as->readers_tail); 527 } 528 529 /* Advances to the next writer in the list. 530 * Param: 531 * as - Initialized AsyncSocket instance. 532 */ 533 static void 534 _async_socket_advance_writer(AsyncSocket* as) 535 { 536 _async_socket_advance_io(as, &as->writers_head, &as->writers_tail); 537 } 538 539 /* Completes an I/O. 540 * Param: 541 * as - Initialized AsyncSocket instance. 542 * asio - I/O to complete. 543 * Return: 544 * One of AsyncIOAction values. 545 */ 546 static AsyncIOAction 547 _async_socket_complete_io(AsyncSocket* as, AsyncSocketIO* asio) 548 { 549 T("ASocket %s: %s I/O %p is completed.", 550 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio); 551 552 /* Stop the timer. */ 553 async_socket_io_cancel_time_out(asio); 554 555 return asio->on_io(asio->io_opaque, asio, ASIO_STATE_SUCCEEDED); 556 } 557 558 /* Timeouts an I/O. 559 * Param: 560 * as - Initialized AsyncSocket instance. 561 * asio - An I/O that has timed out. 562 * Return: 563 * One of AsyncIOAction values. 
564 */ 565 static AsyncIOAction 566 _async_socket_io_timed_out(AsyncSocket* as, AsyncSocketIO* asio) 567 { 568 T("ASocket %s: %s I/O %p with deadline %lld has timed out at %lld", 569 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio, 570 asio->deadline, async_socket_deadline(as, 0)); 571 572 /* Report to the client. */ 573 const AsyncIOAction action = asio->on_io(asio->io_opaque, asio, 574 ASIO_STATE_TIMED_OUT); 575 576 /* Remove the I/O from a list of active I/O for actions other than retry. */ 577 if (action != ASIO_ACTION_RETRY) { 578 if (asio->is_io_read) { 579 _async_socket_remove_io(as, &as->readers_head, &as->readers_tail, asio); 580 } else { 581 _async_socket_remove_io(as, &as->writers_head, &as->writers_tail, asio); 582 } 583 } 584 585 return action; 586 } 587 588 /* Cancels an I/O. 589 * Param: 590 * as - Initialized AsyncSocket instance. 591 * asio - An I/O to cancel. 592 * Return: 593 * One of AsyncIOAction values. 594 */ 595 static AsyncIOAction 596 _async_socket_cancel_io(AsyncSocket* as, AsyncSocketIO* asio) 597 { 598 T("ASocket %s: %s I/O %p is cancelled.", 599 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio); 600 601 /* Stop the timer. */ 602 async_socket_io_cancel_time_out(asio); 603 604 return asio->on_io(asio->io_opaque, asio, ASIO_STATE_CANCELLED); 605 } 606 607 /* Reports an I/O failure. 608 * Param: 609 * as - Initialized AsyncSocket instance. 610 * asio - An I/O that has failed. Can be NULL for general failures. 611 * failure - Failure (errno) that has occurred. 612 * Return: 613 * One of AsyncIOAction values. 614 */ 615 static AsyncIOAction 616 _async_socket_io_failure(AsyncSocket* as, AsyncSocketIO* asio, int failure) 617 { 618 T("ASocket %s: %s I/O %p has failed: %d -> %s", 619 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio, 620 failure, strerror(failure)); 621 622 /* Stop the timer. 
*/ 623 async_socket_io_cancel_time_out(asio); 624 625 errno = failure; 626 return asio->on_io(asio->io_opaque, asio, ASIO_STATE_FAILED); 627 } 628 629 /* Cancels all the active socket readers. 630 * Param: 631 * as - Initialized AsyncSocket instance. 632 */ 633 static void 634 _async_socket_cancel_readers(AsyncSocket* as) 635 { 636 while (as->readers_head != NULL) { 637 AsyncSocketIO* const to_cancel = _async_socket_pull_first_reader(as); 638 /* We ignore action returned from the cancellation callback, since we're 639 * in a disconnected state here. */ 640 _async_socket_cancel_io(as, to_cancel); 641 async_socket_io_release(to_cancel); 642 } 643 } 644 645 /* Cancels all the active socket writers. 646 * Param: 647 * as - Initialized AsyncSocket instance. 648 */ 649 static void 650 _async_socket_cancel_writers(AsyncSocket* as) 651 { 652 while (as->writers_head != NULL) { 653 AsyncSocketIO* const to_cancel = _async_socket_pull_first_writer(as); 654 /* We ignore action returned from the cancellation callback, since we're 655 * in a disconnected state here. */ 656 _async_socket_cancel_io(as, to_cancel); 657 async_socket_io_release(to_cancel); 658 } 659 } 660 661 /* Cancels all the I/O on the socket. */ 662 static void 663 _async_socket_cancel_all_io(AsyncSocket* as) 664 { 665 /* Stop the reconnection timer. */ 666 loopTimer_stop(as->reconnect_timer); 667 668 /* Stop read / write on the socket. */ 669 loopIo_dontWantWrite(as->io); 670 loopIo_dontWantRead(as->io); 671 672 /* Cancel active readers and writers. */ 673 _async_socket_cancel_readers(as); 674 _async_socket_cancel_writers(as); 675 } 676 677 /* Closes socket handle used by the async socket. 678 * Param: 679 * as - Initialized AsyncSocket instance. 
680 */ 681 static void 682 _async_socket_close_socket(AsyncSocket* as) 683 { 684 if (as->fd >= 0) { 685 T("ASocket %s: Socket handle %d is closed.", 686 _async_socket_string(as), as->fd); 687 loopIo_done(as->io); 688 socket_close(as->fd); 689 as->fd = -1; 690 } 691 } 692 693 /* Destroys AsyncSocket instance. 694 * Param: 695 * as - Initialized AsyncSocket instance. 696 */ 697 static void 698 _async_socket_free(AsyncSocket* as) 699 { 700 if (as != NULL) { 701 T("ASocket %s: Socket descriptor is destroyed.", _async_socket_string(as)); 702 703 /* Close socket. */ 704 _async_socket_close_socket(as); 705 706 /* Free allocated resources. */ 707 if (as->looper != NULL) { 708 loopTimer_done(as->reconnect_timer); 709 if (as->owns_looper) { 710 looper_free(as->looper); 711 } 712 } 713 sock_address_done(&as->address); 714 AFREE(as); 715 } 716 } 717 718 /* Starts reconnection attempts after connection has been lost. 719 * Param: 720 * as - Initialized AsyncSocket instance. 721 * to - Milliseconds to wait before reconnection attempt. 722 */ 723 static void 724 _async_socket_reconnect(AsyncSocket* as, int to) 725 { 726 T("ASocket %s: reconnecting in %dms...", _async_socket_string(as), to); 727 728 /* Make sure that no I/O is active, and socket is closed before we 729 * reconnect. */ 730 _async_socket_cancel_all_io(as); 731 732 /* Set the timer for reconnection attempt. */ 733 loopTimer_startRelative(as->reconnect_timer, to); 734 } 735 736 /******************************************************************************** 737 * Asynchronous Socket callbacks 738 *******************************************************************************/ 739 740 /* A callback that is invoked when socket gets disconnected. 741 * Param: 742 * as - Initialized AsyncSocket instance. 743 */ 744 static void 745 _on_async_socket_disconnected(AsyncSocket* as) 746 { 747 /* Save error to restore it for the client's callback. 
*/ 748 const int save_errno = errno; 749 AsyncIOAction action = ASIO_ACTION_ABORT; 750 751 D("ASocket %s: Disconnected.", _async_socket_string(as)); 752 753 /* Cancel all the I/O on this socket. */ 754 _async_socket_cancel_all_io(as); 755 756 /* Close the socket. */ 757 _async_socket_close_socket(as); 758 759 /* Restore errno, and invoke client's callback. */ 760 errno = save_errno; 761 action = as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED); 762 763 if (action == ASIO_ACTION_RETRY) { 764 /* Client requested reconnection. */ 765 _async_socket_reconnect(as, as->reconnect_to); 766 } 767 } 768 769 /* A callback that is invoked on socket's I/O failure. 770 * Param: 771 * as - Initialized AsyncSocket instance. 772 * asio - Descriptor for the failed I/O. Can be NULL for general failures. 773 */ 774 static AsyncIOAction 775 _on_async_socket_failure(AsyncSocket* as, AsyncSocketIO* asio) 776 { 777 D("ASocket %s: %s I/O failure: %d -> %s", 778 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", 779 errno, strerror(errno)); 780 781 /* Report the failure. */ 782 return _async_socket_io_failure(as, asio, errno); 783 } 784 785 /* A callback that is invoked when there is data available to read. 786 * Param: 787 * as - Initialized AsyncSocket instance. 788 * Return: 789 * 0 on success, or -1 on failure. Failure returned from this routine will 790 * skip writes (if awailable) behind this read. 791 */ 792 static int 793 _on_async_socket_recv(AsyncSocket* as) 794 { 795 AsyncIOAction action; 796 797 /* Get current reader. */ 798 AsyncSocketIO* const asr = as->readers_head; 799 if (asr == NULL) { 800 D("ASocket %s: No reader is available.", _async_socket_string(as)); 801 loopIo_dontWantRead(as->io); 802 return 0; 803 } 804 805 /* Reference the reader while we're working with it in this callback. */ 806 async_socket_io_reference(asr); 807 808 /* Bump I/O state, and inform the client that I/O is in progress. 
*/ 809 if (asr->state == ASIO_STATE_QUEUED) { 810 asr->state = ASIO_STATE_STARTED; 811 } else { 812 asr->state = ASIO_STATE_CONTINUES; 813 } 814 action = asr->on_io(asr->io_opaque, asr, asr->state); 815 if (action == ASIO_ACTION_ABORT) { 816 D("ASocket %s: Read is aborted by the client.", _async_socket_string(as)); 817 /* Move on to the next reader. */ 818 _async_socket_advance_reader(as); 819 /* Lets see if there are still active readers, and enable, or disable 820 * read I/O callback accordingly. */ 821 if (as->readers_head != NULL) { 822 loopIo_wantRead(as->io); 823 } else { 824 loopIo_dontWantRead(as->io); 825 } 826 async_socket_io_release(asr); 827 return 0; 828 } 829 830 /* Read next chunk of data. */ 831 int res = HANDLE_EINTR( 832 socket_recv(as->fd, 833 asr->buffer + asr->transferred, 834 asr->to_transfer - asr->transferred)); 835 if (res == 0) { 836 /* Socket has been disconnected. */ 837 errno = ECONNRESET; 838 _on_async_socket_disconnected(as); 839 async_socket_io_release(asr); 840 return -1; 841 } 842 843 if (res < 0) { 844 if (errno == EWOULDBLOCK || errno == EAGAIN) { 845 /* Yield to writes behind this read. */ 846 loopIo_wantRead(as->io); 847 async_socket_io_release(asr); 848 return 0; 849 } 850 851 /* An I/O error. */ 852 action = _on_async_socket_failure(as, asr); 853 if (action != ASIO_ACTION_RETRY) { 854 D("ASocket %s: Read is aborted on failure.", _async_socket_string(as)); 855 /* Move on to the next reader. */ 856 _async_socket_advance_reader(as); 857 /* Lets see if there are still active readers, and enable, or disable 858 * read I/O callback accordingly. */ 859 if (as->readers_head != NULL) { 860 loopIo_wantRead(as->io); 861 } else { 862 loopIo_dontWantRead(as->io); 863 } 864 } 865 async_socket_io_release(asr); 866 return -1; 867 } 868 869 /* Update the reader's descriptor. */ 870 asr->transferred += res; 871 if (asr->transferred == asr->to_transfer) { 872 /* This read is completed. Move on to the next reader. 
*/ 873 _async_socket_advance_reader(as); 874 875 /* Notify reader completion. */ 876 _async_socket_complete_io(as, asr); 877 } 878 879 /* Lets see if there are still active readers, and enable, or disable read 880 * I/O callback accordingly. */ 881 if (as->readers_head != NULL) { 882 loopIo_wantRead(as->io); 883 } else { 884 loopIo_dontWantRead(as->io); 885 } 886 887 async_socket_io_release(asr); 888 889 return 0; 890 } 891 892 /* A callback that is invoked when there is data available to write. 893 * Param: 894 * as - Initialized AsyncSocket instance. 895 * Return: 896 * 0 on success, or -1 on failure. Failure returned from this routine will 897 * skip reads (if awailable) behind this write. 898 */ 899 static int 900 _on_async_socket_send(AsyncSocket* as) 901 { 902 AsyncIOAction action; 903 904 /* Get current writer. */ 905 AsyncSocketIO* const asw = as->writers_head; 906 if (asw == NULL) { 907 D("ASocket %s: No writer is available.", _async_socket_string(as)); 908 loopIo_dontWantWrite(as->io); 909 return 0; 910 } 911 912 /* Reference the writer while we're working with it in this callback. */ 913 async_socket_io_reference(asw); 914 915 /* Bump I/O state, and inform the client that I/O is in progress. */ 916 if (asw->state == ASIO_STATE_QUEUED) { 917 asw->state = ASIO_STATE_STARTED; 918 } else { 919 asw->state = ASIO_STATE_CONTINUES; 920 } 921 action = asw->on_io(asw->io_opaque, asw, asw->state); 922 if (action == ASIO_ACTION_ABORT) { 923 D("ASocket %s: Write is aborted by the client.", _async_socket_string(as)); 924 /* Move on to the next writer. */ 925 _async_socket_advance_writer(as); 926 /* Lets see if there are still active writers, and enable, or disable 927 * write I/O callback accordingly. */ 928 if (as->writers_head != NULL) { 929 loopIo_wantWrite(as->io); 930 } else { 931 loopIo_dontWantWrite(as->io); 932 } 933 async_socket_io_release(asw); 934 return 0; 935 } 936 937 /* Write next chunk of data. 
*/ 938 int res = HANDLE_EINTR( 939 socket_send(as->fd, 940 asw->buffer + asw->transferred, 941 asw->to_transfer - asw->transferred)); 942 if (res == 0) { 943 /* Socket has been disconnected. */ 944 errno = ECONNRESET; 945 _on_async_socket_disconnected(as); 946 async_socket_io_release(asw); 947 return -1; 948 } 949 950 if (res < 0) { 951 if (errno == EWOULDBLOCK || errno == EAGAIN) { 952 /* Yield to reads behind this write. */ 953 loopIo_wantWrite(as->io); 954 async_socket_io_release(asw); 955 return 0; 956 } 957 958 /* An I/O error. */ 959 action = _on_async_socket_failure(as, asw); 960 if (action != ASIO_ACTION_RETRY) { 961 D("ASocket %s: Write is aborted on failure.", _async_socket_string(as)); 962 /* Move on to the next writer. */ 963 _async_socket_advance_writer(as); 964 /* Lets see if there are still active writers, and enable, or disable 965 * write I/O callback accordingly. */ 966 if (as->writers_head != NULL) { 967 loopIo_wantWrite(as->io); 968 } else { 969 loopIo_dontWantWrite(as->io); 970 } 971 } 972 async_socket_io_release(asw); 973 return -1; 974 } 975 976 /* Update the writer descriptor. */ 977 asw->transferred += res; 978 if (asw->transferred == asw->to_transfer) { 979 /* This write is completed. Move on to the next writer. */ 980 _async_socket_advance_writer(as); 981 982 /* Notify writer completion. */ 983 _async_socket_complete_io(as, asw); 984 } 985 986 /* Lets see if there are still active writers, and enable, or disable write 987 * I/O callback accordingly. */ 988 if (as->writers_head != NULL) { 989 loopIo_wantWrite(as->io); 990 } else { 991 loopIo_dontWantWrite(as->io); 992 } 993 994 async_socket_io_release(asw); 995 996 return 0; 997 } 998 999 /* A callback that is invoked when an I/O is available on socket. 1000 * Param: 1001 * as - Initialized AsyncSocket instance. 1002 * fd - Socket's file descriptor. 1003 * events - LOOP_IO_READ | LOOP_IO_WRITE bitmask. 
1004 */ 1005 static void 1006 _on_async_socket_io(void* opaque, int fd, unsigned events) 1007 { 1008 AsyncSocket* const as = (AsyncSocket*)opaque; 1009 1010 /* Reference the socket while we're working with it in this callback. */ 1011 async_socket_reference(as); 1012 1013 if ((events & LOOP_IO_READ) != 0) { 1014 if (_on_async_socket_recv(as) != 0) { 1015 async_socket_release(as); 1016 return; 1017 } 1018 } 1019 1020 if ((events & LOOP_IO_WRITE) != 0) { 1021 if (_on_async_socket_send(as) != 0) { 1022 async_socket_release(as); 1023 return; 1024 } 1025 } 1026 1027 async_socket_release(as); 1028 } 1029 1030 /* A callback that is invoked by asynchronous socket connector on connection 1031 * events. 1032 * Param: 1033 * opaque - Initialized AsyncSocket instance. 1034 * connector - Connector that is used to connect this socket. 1035 * event - Connection event. 1036 * Return: 1037 * One of AsyncIOAction values. 1038 */ 1039 static AsyncIOAction 1040 _on_connector_events(void* opaque, 1041 AsyncSocketConnector* connector, 1042 AsyncIOState event) 1043 { 1044 AsyncIOAction action; 1045 AsyncSocket* const as = (AsyncSocket*)opaque; 1046 1047 /* Reference the socket while we're working with it in this callback. */ 1048 async_socket_reference(as); 1049 1050 if (event == ASIO_STATE_SUCCEEDED) { 1051 /* Accept the connection. */ 1052 as->fd = async_socket_connector_pull_fd(connector); 1053 loopIo_init(as->io, as->looper, as->fd, _on_async_socket_io, as); 1054 } 1055 1056 /* Invoke client's callback. */ 1057 action = as->on_connection(as->client_opaque, as, event); 1058 if (event == ASIO_STATE_SUCCEEDED && action != ASIO_ACTION_DONE) { 1059 /* For whatever reason the client didn't want to keep this connection. 1060 * Close it. 
*/ 1061 D("ASocket %s: Connection is discarded by the client.", 1062 _async_socket_string(as)); 1063 _async_socket_close_socket(as); 1064 } 1065 1066 if (action != ASIO_ACTION_RETRY) { 1067 async_socket_connector_release(connector); 1068 } 1069 1070 async_socket_release(as); 1071 1072 return action; 1073 } 1074 1075 /* Timer callback invoked to reconnect the lost connection. 1076 * Param: 1077 * as - Initialized AsyncSocket instance. 1078 */ 1079 void 1080 _on_async_socket_reconnect(void* opaque) 1081 { 1082 AsyncSocket* as = (AsyncSocket*)opaque; 1083 1084 /* Reference the socket while we're working with it in this callback. */ 1085 async_socket_reference(as); 1086 async_socket_connect(as, as->reconnect_to); 1087 async_socket_release(as); 1088 } 1089 1090 1091 /******************************************************************************** 1092 * Android Device Socket public API 1093 *******************************************************************************/ 1094 1095 AsyncSocket* 1096 async_socket_new(int port, 1097 int reconnect_to, 1098 on_as_connection_cb client_cb, 1099 void* client_opaque, 1100 Looper* looper) 1101 { 1102 AsyncSocket* as; 1103 1104 if (client_cb == NULL) { 1105 E("Invalid client_cb parameter"); 1106 return NULL; 1107 } 1108 1109 ANEW0(as); 1110 1111 as->fd = -1; 1112 as->client_opaque = client_opaque; 1113 as->on_connection = client_cb; 1114 as->readers_head = as->readers_tail = NULL; 1115 as->reconnect_to = reconnect_to; 1116 as->ref_count = 1; 1117 sock_address_init_inet(&as->address, SOCK_ADDRESS_INET_LOOPBACK, port); 1118 if (looper == NULL) { 1119 as->looper = looper_newCore(); 1120 if (as->looper == NULL) { 1121 E("Unable to create I/O looper for async socket '%s'", 1122 _async_socket_string(as)); 1123 client_cb(client_opaque, as, ASIO_STATE_FAILED); 1124 _async_socket_free(as); 1125 return NULL; 1126 } 1127 as->owns_looper = 1; 1128 } else { 1129 as->looper = looper; 1130 as->owns_looper = 0; 1131 } 1132 1133 
loopTimer_init(as->reconnect_timer, as->looper, _on_async_socket_reconnect, as); 1134 1135 T("ASocket %s: Descriptor is created.", _async_socket_string(as)); 1136 1137 return as; 1138 } 1139 1140 int 1141 async_socket_reference(AsyncSocket* as) 1142 { 1143 assert(as->ref_count > 0); 1144 as->ref_count++; 1145 return as->ref_count; 1146 } 1147 1148 int 1149 async_socket_release(AsyncSocket* as) 1150 { 1151 assert(as->ref_count > 0); 1152 as->ref_count--; 1153 if (as->ref_count == 0) { 1154 /* Last reference has been dropped. Destroy this object. */ 1155 _async_socket_cancel_all_io(as); 1156 _async_socket_free(as); 1157 return 0; 1158 } 1159 return as->ref_count; 1160 } 1161 1162 void 1163 async_socket_connect(AsyncSocket* as, int retry_to) 1164 { 1165 T("ASocket %s: Handling connection request for %dms...", 1166 _async_socket_string(as), retry_to); 1167 1168 AsyncSocketConnector* const connector = 1169 async_socket_connector_new(&as->address, retry_to, _on_connector_events, 1170 as, as->looper); 1171 if (connector != NULL) { 1172 async_socket_connector_connect(connector); 1173 } else { 1174 as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED); 1175 } 1176 } 1177 1178 void 1179 async_socket_disconnect(AsyncSocket* as) 1180 { 1181 T("ASocket %s: Handling disconnection request...", _async_socket_string(as)); 1182 1183 if (as != NULL) { 1184 _async_socket_cancel_all_io(as); 1185 _async_socket_close_socket(as); 1186 } 1187 } 1188 1189 void 1190 async_socket_reconnect(AsyncSocket* as, int retry_to) 1191 { 1192 T("ASocket %s: Handling reconnection request for %dms...", 1193 _async_socket_string(as), retry_to); 1194 1195 _async_socket_cancel_all_io(as); 1196 _async_socket_close_socket(as); 1197 _async_socket_reconnect(as, retry_to); 1198 } 1199 1200 void 1201 async_socket_read_abs(AsyncSocket* as, 1202 void* buffer, uint32_t len, 1203 on_as_io_cb reader_cb, 1204 void* reader_opaque, 1205 Duration deadline) 1206 { 1207 T("ASocket %s: Handling read for %d bytes with 
deadline %lld...", 1208 _async_socket_string(as), len, deadline); 1209 1210 AsyncSocketIO* const asr = 1211 _async_socket_reader_new(as, buffer, len, reader_cb, reader_opaque, 1212 deadline); 1213 if (async_socket_is_connected(as)) { 1214 /* Add new reader to the list. Note that we use initial reference from I/O 1215 * 'new' routine as "in the list" reference counter. */ 1216 if (as->readers_head == NULL) { 1217 as->readers_head = as->readers_tail = asr; 1218 } else { 1219 as->readers_tail->next = asr; 1220 as->readers_tail = asr; 1221 } 1222 loopIo_wantRead(as->io); 1223 } else { 1224 D("ASocket %s: Read on a disconnected socket.", _async_socket_string(as)); 1225 errno = ECONNRESET; 1226 reader_cb(reader_opaque, asr, ASIO_STATE_FAILED); 1227 async_socket_io_release(asr); 1228 } 1229 } 1230 1231 void 1232 async_socket_read_rel(AsyncSocket* as, 1233 void* buffer, uint32_t len, 1234 on_as_io_cb reader_cb, 1235 void* reader_opaque, 1236 int to) 1237 { 1238 const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to : 1239 DURATION_INFINITE; 1240 async_socket_read_abs(as, buffer, len, reader_cb, reader_opaque, dl); 1241 } 1242 1243 void 1244 async_socket_write_abs(AsyncSocket* as, 1245 const void* buffer, uint32_t len, 1246 on_as_io_cb writer_cb, 1247 void* writer_opaque, 1248 Duration deadline) 1249 { 1250 T("ASocket %s: Handling write for %d bytes with deadline %lld...", 1251 _async_socket_string(as), len, deadline); 1252 1253 AsyncSocketIO* const asw = 1254 _async_socket_writer_new(as, buffer, len, writer_cb, writer_opaque, 1255 deadline); 1256 if (async_socket_is_connected(as)) { 1257 /* Add new writer to the list. Note that we use initial reference from I/O 1258 * 'new' routine as "in the list" reference counter. 
*/ 1259 if (as->writers_head == NULL) { 1260 as->writers_head = as->writers_tail = asw; 1261 } else { 1262 as->writers_tail->next = asw; 1263 as->writers_tail = asw; 1264 } 1265 loopIo_wantWrite(as->io); 1266 } else { 1267 D("ASocket %s: Write on a disconnected socket.", _async_socket_string(as)); 1268 errno = ECONNRESET; 1269 writer_cb(writer_opaque, asw, ASIO_STATE_FAILED); 1270 async_socket_io_release(asw); 1271 } 1272 } 1273 1274 void 1275 async_socket_write_rel(AsyncSocket* as, 1276 const void* buffer, uint32_t len, 1277 on_as_io_cb writer_cb, 1278 void* writer_opaque, 1279 int to) 1280 { 1281 const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to : 1282 DURATION_INFINITE; 1283 async_socket_write_abs(as, buffer, len, writer_cb, writer_opaque, dl); 1284 } 1285 1286 void* 1287 async_socket_get_client_opaque(const AsyncSocket* as) 1288 { 1289 return as->client_opaque; 1290 } 1291 1292 Duration 1293 async_socket_deadline(AsyncSocket* as, int rel) 1294 { 1295 return (rel >= 0) ? looper_now(_async_socket_get_looper(as)) + rel : 1296 DURATION_INFINITE; 1297 } 1298 1299 int 1300 async_socket_get_port(const AsyncSocket* as) 1301 { 1302 return sock_address_get_port(&as->address); 1303 } 1304 1305 int 1306 async_socket_is_connected(const AsyncSocket* as) 1307 { 1308 return as->fd >= 0; 1309 } 1310