1 /* 2 * Copyright (C) 2012 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 /* 18 * Encapsulates exchange protocol between the emulator, and an Android device 19 * that is connected to the host via USB. The communication is established over 20 * a TCP port forwarding, enabled by ADB. 21 */ 22 23 #include "android/utils/debug.h" 24 #include "android/async-socket-connector.h" 25 #include "android/async-socket.h" 26 #include "utils/panic.h" 27 #include "iolooper.h" 28 29 #define E(...) derror(__VA_ARGS__) 30 #define W(...) dwarning(__VA_ARGS__) 31 #define D(...) VERBOSE_PRINT(asyncsocket,__VA_ARGS__) 32 #define D_ACTIVE VERBOSE_CHECK(asyncsocket) 33 34 #define TRACE_ON 0 35 36 #if TRACE_ON 37 #define T(...) VERBOSE_PRINT(asyncsocket,__VA_ARGS__) 38 #else 39 #define T(...) 40 #endif 41 42 /******************************************************************************** 43 * Asynchronous Socket internal API declarations 44 *******************************************************************************/ 45 46 /* Gets socket's address string. */ 47 static const char* _async_socket_string(AsyncSocket* as); 48 49 /* Gets socket's looper. */ 50 static Looper* _async_socket_get_looper(AsyncSocket* as); 51 52 /* Handler for the I/O time out. 53 * Param: 54 * as - Asynchronous socket for the I/O. 55 * asio - Desciptor for the timed out I/O. 
*/
static AsyncIOAction _async_socket_io_timed_out(AsyncSocket* as,
                                                AsyncSocketIO* asio);

/********************************************************************************
 *                  Asynchronous Socket Reader / Writer
 *******************************************************************************/

/* Descriptor for a single read or write request on an asynchronous socket.
 * Descriptors are chained through 'next' into the reader or the writer list of
 * the socket that owns them, and are reference counted. */
struct AsyncSocketIO {
    /* Next I/O in the reader, or writer list. */
    AsyncSocketIO*      next;
    /* Asynchronous socket for this I/O. */
    AsyncSocket*        as;
    /* Timer used for time outs on this I/O. */
    LoopTimer           timer[1];
    /* An opaque pointer associated with this I/O. It is handed back to the
     * 'on_io' callback as its first argument. */
    void*               io_opaque;
    /* Buffer where to read / write data. */
    uint8_t*            buffer;
    /* Bytes to transfer through the socket for this I/O. */
    uint32_t            to_transfer;
    /* Bytes transferred through the socket in this I/O so far. */
    uint32_t            transferred;
    /* I/O callback for this I/O. */
    on_as_io_cb         on_io;
    /* I/O type selector: 1 - read, 0 - write. */
    int                 is_io_read;
    /* State of the I/O. */
    AsyncIOState        state;
    /* Number of outstanding references to the I/O. */
    int                 ref_count;
    /* Deadline for this I/O. The time-out timer is armed with this value when
     * the descriptor is created. */
    Duration            deadline;
};

/*
 * Recycling I/O instances.
 * Since AsyncSocketIO instances are not that large, it makes sense to recycle
 * them for faster allocation, rather than allocating and freeing them for each
 * I/O on the socket.
 */

/* List of recycled I/O descriptors (singly linked through 'next'). */
static AsyncSocketIO* _asio_recycled = NULL;
/* Number of I/O descriptors that are recycled in the _asio_recycled list. */
static int _recycled_asio_count = 0;
/* Maximum number of I/O descriptors that can be recycled. Descriptors released
 * while the recycler is full are freed instead. */
static const int _max_recycled_asio_num = 32;

/* Handler for an I/O time-out timer event.
 * When this routine is invoked, it indicates that a time out has occurred on an
 * I/O.
 * Param:
 *  opaque - AsyncSocketIO instance representing the timed out I/O.
110 */ 111 static void _on_async_socket_io_timed_out(void* opaque); 112 113 /* Creates new I/O descriptor. 114 * Param: 115 * as - Asynchronous socket for the I/O. 116 * is_io_read - I/O type selector: 1 - read, 0 - write. 117 * buffer, len - Reader / writer buffer address. 118 * io_cb - Callback for this reader / writer. 119 * io_opaque - An opaque pointer associated with the I/O. 120 * deadline - Deadline to complete the I/O. 121 * Return: 122 * Initialized AsyncSocketIO instance. 123 */ 124 static AsyncSocketIO* 125 _async_socket_rw_new(AsyncSocket* as, 126 int is_io_read, 127 void* buffer, 128 uint32_t len, 129 on_as_io_cb io_cb, 130 void* io_opaque, 131 Duration deadline) 132 { 133 /* Lookup in the recycler first. */ 134 AsyncSocketIO* asio = _asio_recycled; 135 if (asio != NULL) { 136 /* Pull the descriptor from recycler. */ 137 _asio_recycled = asio->next; 138 _recycled_asio_count--; 139 } else { 140 /* No recycled descriptors. Allocate new one. */ 141 ANEW0(asio); 142 } 143 144 asio->next = NULL; 145 asio->as = as; 146 asio->is_io_read = is_io_read; 147 asio->buffer = (uint8_t*)buffer; 148 asio->to_transfer = len; 149 asio->transferred = 0; 150 asio->on_io = io_cb; 151 asio->io_opaque = io_opaque; 152 asio->state = ASIO_STATE_QUEUED; 153 asio->ref_count = 1; 154 asio->deadline = deadline; 155 loopTimer_init(asio->timer, _async_socket_get_looper(as), 156 _on_async_socket_io_timed_out, asio); 157 loopTimer_startAbsolute(asio->timer, deadline); 158 159 /* Reference socket that is holding this I/O. */ 160 async_socket_reference(as); 161 162 T("ASocket %s: %s I/O descriptor %p is created for %d bytes of data", 163 _async_socket_string(as), is_io_read ? "READ" : "WRITE", asio, len); 164 165 return asio; 166 } 167 168 /* Destroys and frees I/O descriptor. 
*/ 169 static void 170 _async_socket_io_free(AsyncSocketIO* asio) 171 { 172 AsyncSocket* const as = asio->as; 173 174 T("ASocket %s: %s I/O descriptor %p is destroyed.", 175 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio); 176 177 loopTimer_done(asio->timer); 178 179 /* Try to recycle it first, and free the memory if recycler is full. */ 180 if (_recycled_asio_count < _max_recycled_asio_num) { 181 asio->next = _asio_recycled; 182 _asio_recycled = asio; 183 _recycled_asio_count++; 184 } else { 185 AFREE(asio); 186 } 187 188 /* Release socket that is holding this I/O. */ 189 async_socket_release(as); 190 } 191 192 /* An I/O has been finished and its descriptor is about to be discarded. */ 193 static void 194 _async_socket_io_finished(AsyncSocketIO* asio) 195 { 196 /* Notify the client of the I/O that I/O is finished. */ 197 asio->on_io(asio->io_opaque, asio, ASIO_STATE_FINISHED); 198 } 199 200 int 201 async_socket_io_reference(AsyncSocketIO* asio) 202 { 203 assert(asio->ref_count > 0); 204 asio->ref_count++; 205 return asio->ref_count; 206 } 207 208 int 209 async_socket_io_release(AsyncSocketIO* asio) 210 { 211 assert(asio->ref_count > 0); 212 asio->ref_count--; 213 if (asio->ref_count == 0) { 214 _async_socket_io_finished(asio); 215 /* Last reference has been dropped. Destroy this object. */ 216 _async_socket_io_free(asio); 217 return 0; 218 } 219 return asio->ref_count; 220 } 221 222 /* Creates new asynchronous socket reader. 223 * Param: 224 * as - Asynchronous socket for the reader. 225 * buffer, len - Reader's buffer. 226 * io_cb - Reader's callback. 227 * reader_opaque - An opaque pointer associated with the reader. 228 * deadline - Deadline to complete the operation. 229 * Return: 230 * An initialized AsyncSocketIO intance. 
231 */ 232 static AsyncSocketIO* 233 _async_socket_reader_new(AsyncSocket* as, 234 void* buffer, 235 uint32_t len, 236 on_as_io_cb io_cb, 237 void* reader_opaque, 238 Duration deadline) 239 { 240 AsyncSocketIO* const asio = _async_socket_rw_new(as, 1, buffer, len, io_cb, 241 reader_opaque, deadline); 242 return asio; 243 } 244 245 /* Creates new asynchronous socket writer. 246 * Param: 247 * as - Asynchronous socket for the writer. 248 * buffer, len - Writer's buffer. 249 * io_cb - Writer's callback. 250 * writer_opaque - An opaque pointer associated with the writer. 251 * deadline - Deadline to complete the operation. 252 * Return: 253 * An initialized AsyncSocketIO intance. 254 */ 255 static AsyncSocketIO* 256 _async_socket_writer_new(AsyncSocket* as, 257 const void* buffer, 258 uint32_t len, 259 on_as_io_cb io_cb, 260 void* writer_opaque, 261 Duration deadline) 262 { 263 AsyncSocketIO* const asio = _async_socket_rw_new(as, 0, (void*)buffer, len, 264 io_cb, writer_opaque, 265 deadline); 266 return asio; 267 } 268 269 /* I/O timed out. */ 270 static void 271 _on_async_socket_io_timed_out(void* opaque) 272 { 273 AsyncSocketIO* const asio = (AsyncSocketIO*)opaque; 274 AsyncSocket* const as = asio->as; 275 276 D("ASocket %s: %s I/O with deadline %lld has timed out at %lld", 277 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", 278 asio->deadline, async_socket_deadline(as, 0)); 279 280 /* Reference while in callback. 
*/ 281 async_socket_io_reference(asio); 282 _async_socket_io_timed_out(asio->as, asio); 283 async_socket_io_release(asio); 284 } 285 286 /******************************************************************************** 287 * Public Asynchronous Socket I/O API 288 *******************************************************************************/ 289 290 AsyncSocket* 291 async_socket_io_get_socket(const AsyncSocketIO* asio) 292 { 293 async_socket_reference(asio->as); 294 return asio->as; 295 } 296 297 void 298 async_socket_io_cancel_time_out(AsyncSocketIO* asio) 299 { 300 loopTimer_stop(asio->timer); 301 } 302 303 void* 304 async_socket_io_get_io_opaque(const AsyncSocketIO* asio) 305 { 306 return asio->io_opaque; 307 } 308 309 void* 310 async_socket_io_get_client_opaque(const AsyncSocketIO* asio) 311 { 312 return async_socket_get_client_opaque(asio->as); 313 } 314 315 void* 316 async_socket_io_get_buffer_info(const AsyncSocketIO* asio, 317 uint32_t* transferred, 318 uint32_t* to_transfer) 319 { 320 if (transferred != NULL) { 321 *transferred = asio->transferred; 322 } 323 if (to_transfer != NULL) { 324 *to_transfer = asio->to_transfer; 325 } 326 return asio->buffer; 327 } 328 329 void* 330 async_socket_io_get_buffer(const AsyncSocketIO* asio) 331 { 332 return asio->buffer; 333 } 334 335 uint32_t 336 async_socket_io_get_transferred(const AsyncSocketIO* asio) 337 { 338 return asio->transferred; 339 } 340 341 uint32_t 342 async_socket_io_get_to_transfer(const AsyncSocketIO* asio) 343 { 344 return asio->to_transfer; 345 } 346 347 int 348 async_socket_io_is_read(const AsyncSocketIO* asio) 349 { 350 return asio->is_io_read; 351 } 352 353 /******************************************************************************** 354 * Asynchronous Socket internals 355 *******************************************************************************/ 356 357 struct AsyncSocket { 358 /* TCP address for the socket. */ 359 SockAddress address; 360 /* Connection callback for this socket. 
*/ 361 on_as_connection_cb on_connection; 362 /* An opaque pointer associated with this socket by the client. */ 363 void* client_opaque; 364 /* I/O looper for asynchronous I/O on the socket. */ 365 Looper* looper; 366 /* I/O descriptor for asynchronous I/O on the socket. */ 367 LoopIo io[1]; 368 /* Timer to use for reconnection attempts. */ 369 LoopTimer reconnect_timer[1]; 370 /* Head of the list of the active readers. */ 371 AsyncSocketIO* readers_head; 372 /* Tail of the list of the active readers. */ 373 AsyncSocketIO* readers_tail; 374 /* Head of the list of the active writers. */ 375 AsyncSocketIO* writers_head; 376 /* Tail of the list of the active writers. */ 377 AsyncSocketIO* writers_tail; 378 /* Socket's file descriptor. */ 379 int fd; 380 /* Timeout to use for reconnection attempts. */ 381 int reconnect_to; 382 /* Number of outstanding references to the socket. */ 383 int ref_count; 384 /* Flags whether (1) or not (0) socket owns the looper. */ 385 int owns_looper; 386 }; 387 388 static const char* 389 _async_socket_string(AsyncSocket* as) 390 { 391 return sock_address_to_string(&as->address); 392 } 393 394 static Looper* 395 _async_socket_get_looper(AsyncSocket* as) 396 { 397 return as->looper; 398 } 399 400 /* Pulls first reader out of the list. 401 * Param: 402 * as - Initialized AsyncSocket instance. 403 * Return: 404 * First I/O pulled out of the list, or NULL if there are no I/O in the list. 405 * Note that the caller is responsible for releasing the I/O object returned 406 * from this routine. 407 */ 408 static AsyncSocketIO* 409 _async_socket_pull_first_io(AsyncSocket* as, 410 AsyncSocketIO** list_head, 411 AsyncSocketIO** list_tail) 412 { 413 AsyncSocketIO* const ret = *list_head; 414 if (ret != NULL) { 415 *list_head = ret->next; 416 ret->next = NULL; 417 if (*list_head == NULL) { 418 *list_tail = NULL; 419 } 420 } 421 return ret; 422 } 423 424 /* Pulls first reader out of the list. 425 * Param: 426 * as - Initialized AsyncSocket instance. 
427 * Return: 428 * First reader pulled out of the list, or NULL if there are no readers in the 429 * list. 430 * Note that the caller is responsible for releasing the I/O object returned 431 * from this routine. 432 */ 433 static AsyncSocketIO* 434 _async_socket_pull_first_reader(AsyncSocket* as) 435 { 436 return _async_socket_pull_first_io(as, &as->readers_head, &as->readers_tail); 437 } 438 439 /* Pulls first writer out of the list. 440 * Param: 441 * as - Initialized AsyncSocket instance. 442 * Return: 443 * First writer pulled out of the list, or NULL if there are no writers in the 444 * list. 445 * Note that the caller is responsible for releasing the I/O object returned 446 * from this routine. 447 */ 448 static AsyncSocketIO* 449 _async_socket_pull_first_writer(AsyncSocket* as) 450 { 451 return _async_socket_pull_first_io(as, &as->writers_head, &as->writers_tail); 452 } 453 454 /* Removes an I/O descriptor from a list of active I/O. 455 * Param: 456 * as - Initialized AsyncSocket instance. 457 * list_head, list_tail - Pointers to the list head and tail. 458 * io - I/O to remove. 459 * Return: 460 * Boolean: 1 if I/O has been removed, or 0 if I/O has not been found in the list. 461 */ 462 static int 463 _async_socket_remove_io(AsyncSocket* as, 464 AsyncSocketIO** list_head, 465 AsyncSocketIO** list_tail, 466 AsyncSocketIO* io) 467 { 468 AsyncSocketIO* prev = NULL; 469 470 while (*list_head != NULL && io != *list_head) { 471 prev = *list_head; 472 list_head = &((*list_head)->next); 473 } 474 if (*list_head == NULL) { 475 D("%s: I/O %p is not found in the list for socket '%s'", 476 __FUNCTION__, io, _async_socket_string(as)); 477 return 0; 478 } 479 480 *list_head = io->next; 481 if (prev != NULL) { 482 prev->next = io->next; 483 } 484 if (*list_tail == io) { 485 *list_tail = prev; 486 } 487 488 /* Release I/O adjusting reference added when I/O has been saved in the list. 
*/ 489 async_socket_io_release(io); 490 491 return 1; 492 } 493 494 /* Advances to the next I/O in the list. 495 * Param: 496 * as - Initialized AsyncSocket instance. 497 * list_head, list_tail - Pointers to the list head and tail. 498 */ 499 static void 500 _async_socket_advance_io(AsyncSocket* as, 501 AsyncSocketIO** list_head, 502 AsyncSocketIO** list_tail) 503 { 504 AsyncSocketIO* first_io = *list_head; 505 if (first_io != NULL) { 506 *list_head = first_io->next; 507 first_io->next = NULL; 508 } 509 if (*list_head == NULL) { 510 *list_tail = NULL; 511 } 512 if (first_io != NULL) { 513 /* Release I/O removed from the head of the list. */ 514 async_socket_io_release(first_io); 515 } 516 } 517 518 /* Advances to the next reader in the list. 519 * Param: 520 * as - Initialized AsyncSocket instance. 521 */ 522 static void 523 _async_socket_advance_reader(AsyncSocket* as) 524 { 525 _async_socket_advance_io(as, &as->readers_head, &as->readers_tail); 526 } 527 528 /* Advances to the next writer in the list. 529 * Param: 530 * as - Initialized AsyncSocket instance. 531 */ 532 static void 533 _async_socket_advance_writer(AsyncSocket* as) 534 { 535 _async_socket_advance_io(as, &as->writers_head, &as->writers_tail); 536 } 537 538 /* Completes an I/O. 539 * Param: 540 * as - Initialized AsyncSocket instance. 541 * asio - I/O to complete. 542 * Return: 543 * One of AsyncIOAction values. 544 */ 545 static AsyncIOAction 546 _async_socket_complete_io(AsyncSocket* as, AsyncSocketIO* asio) 547 { 548 T("ASocket %s: %s I/O %p is completed.", 549 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio); 550 551 /* Stop the timer. */ 552 async_socket_io_cancel_time_out(asio); 553 554 return asio->on_io(asio->io_opaque, asio, ASIO_STATE_SUCCEEDED); 555 } 556 557 /* Timeouts an I/O. 558 * Param: 559 * as - Initialized AsyncSocket instance. 560 * asio - An I/O that has timed out. 561 * Return: 562 * One of AsyncIOAction values. 
563 */ 564 static AsyncIOAction 565 _async_socket_io_timed_out(AsyncSocket* as, AsyncSocketIO* asio) 566 { 567 T("ASocket %s: %s I/O %p with deadline %lld has timed out at %lld", 568 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio, 569 asio->deadline, async_socket_deadline(as, 0)); 570 571 /* Report to the client. */ 572 const AsyncIOAction action = asio->on_io(asio->io_opaque, asio, 573 ASIO_STATE_TIMED_OUT); 574 575 /* Remove the I/O from a list of active I/O for actions other than retry. */ 576 if (action != ASIO_ACTION_RETRY) { 577 if (asio->is_io_read) { 578 _async_socket_remove_io(as, &as->readers_head, &as->readers_tail, asio); 579 } else { 580 _async_socket_remove_io(as, &as->writers_head, &as->writers_tail, asio); 581 } 582 } 583 584 return action; 585 } 586 587 /* Cancels an I/O. 588 * Param: 589 * as - Initialized AsyncSocket instance. 590 * asio - An I/O to cancel. 591 * Return: 592 * One of AsyncIOAction values. 593 */ 594 static AsyncIOAction 595 _async_socket_cancel_io(AsyncSocket* as, AsyncSocketIO* asio) 596 { 597 T("ASocket %s: %s I/O %p is cancelled.", 598 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio); 599 600 /* Stop the timer. */ 601 async_socket_io_cancel_time_out(asio); 602 603 return asio->on_io(asio->io_opaque, asio, ASIO_STATE_CANCELLED); 604 } 605 606 /* Reports an I/O failure. 607 * Param: 608 * as - Initialized AsyncSocket instance. 609 * asio - An I/O that has failed. Can be NULL for general failures. 610 * failure - Failure (errno) that has occurred. 611 * Return: 612 * One of AsyncIOAction values. 613 */ 614 static AsyncIOAction 615 _async_socket_io_failure(AsyncSocket* as, AsyncSocketIO* asio, int failure) 616 { 617 T("ASocket %s: %s I/O %p has failed: %d -> %s", 618 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio, 619 failure, strerror(failure)); 620 621 /* Stop the timer. 
*/ 622 async_socket_io_cancel_time_out(asio); 623 624 errno = failure; 625 return asio->on_io(asio->io_opaque, asio, ASIO_STATE_FAILED); 626 } 627 628 /* Cancels all the active socket readers. 629 * Param: 630 * as - Initialized AsyncSocket instance. 631 */ 632 static void 633 _async_socket_cancel_readers(AsyncSocket* as) 634 { 635 while (as->readers_head != NULL) { 636 AsyncSocketIO* const to_cancel = _async_socket_pull_first_reader(as); 637 /* We ignore action returned from the cancellation callback, since we're 638 * in a disconnected state here. */ 639 _async_socket_cancel_io(as, to_cancel); 640 async_socket_io_release(to_cancel); 641 } 642 } 643 644 /* Cancels all the active socket writers. 645 * Param: 646 * as - Initialized AsyncSocket instance. 647 */ 648 static void 649 _async_socket_cancel_writers(AsyncSocket* as) 650 { 651 while (as->writers_head != NULL) { 652 AsyncSocketIO* const to_cancel = _async_socket_pull_first_writer(as); 653 /* We ignore action returned from the cancellation callback, since we're 654 * in a disconnected state here. */ 655 _async_socket_cancel_io(as, to_cancel); 656 async_socket_io_release(to_cancel); 657 } 658 } 659 660 /* Cancels all the I/O on the socket. */ 661 static void 662 _async_socket_cancel_all_io(AsyncSocket* as) 663 { 664 /* Stop the reconnection timer. */ 665 loopTimer_stop(as->reconnect_timer); 666 667 /* Stop read / write on the socket. */ 668 loopIo_dontWantWrite(as->io); 669 loopIo_dontWantRead(as->io); 670 671 /* Cancel active readers and writers. */ 672 _async_socket_cancel_readers(as); 673 _async_socket_cancel_writers(as); 674 } 675 676 /* Closes socket handle used by the async socket. 677 * Param: 678 * as - Initialized AsyncSocket instance. 
679 */ 680 static void 681 _async_socket_close_socket(AsyncSocket* as) 682 { 683 if (as->fd >= 0) { 684 T("ASocket %s: Socket handle %d is closed.", 685 _async_socket_string(as), as->fd); 686 loopIo_done(as->io); 687 socket_close(as->fd); 688 as->fd = -1; 689 } 690 } 691 692 /* Destroys AsyncSocket instance. 693 * Param: 694 * as - Initialized AsyncSocket instance. 695 */ 696 static void 697 _async_socket_free(AsyncSocket* as) 698 { 699 if (as != NULL) { 700 T("ASocket %s: Socket descriptor is destroyed.", _async_socket_string(as)); 701 702 /* Close socket. */ 703 _async_socket_close_socket(as); 704 705 /* Free allocated resources. */ 706 if (as->looper != NULL) { 707 loopTimer_done(as->reconnect_timer); 708 if (as->owns_looper) { 709 looper_free(as->looper); 710 } 711 } 712 sock_address_done(&as->address); 713 AFREE(as); 714 } 715 } 716 717 /* Starts reconnection attempts after connection has been lost. 718 * Param: 719 * as - Initialized AsyncSocket instance. 720 * to - Milliseconds to wait before reconnection attempt. 721 */ 722 static void 723 _async_socket_reconnect(AsyncSocket* as, int to) 724 { 725 T("ASocket %s: reconnecting in %dms...", _async_socket_string(as), to); 726 727 /* Make sure that no I/O is active, and socket is closed before we 728 * reconnect. */ 729 _async_socket_cancel_all_io(as); 730 731 /* Set the timer for reconnection attempt. */ 732 loopTimer_startRelative(as->reconnect_timer, to); 733 } 734 735 /******************************************************************************** 736 * Asynchronous Socket callbacks 737 *******************************************************************************/ 738 739 /* A callback that is invoked when socket gets disconnected. 740 * Param: 741 * as - Initialized AsyncSocket instance. 742 */ 743 static void 744 _on_async_socket_disconnected(AsyncSocket* as) 745 { 746 /* Save error to restore it for the client's callback. 
*/ 747 const int save_errno = errno; 748 AsyncIOAction action = ASIO_ACTION_ABORT; 749 750 D("ASocket %s: Disconnected.", _async_socket_string(as)); 751 752 /* Cancel all the I/O on this socket. */ 753 _async_socket_cancel_all_io(as); 754 755 /* Close the socket. */ 756 _async_socket_close_socket(as); 757 758 /* Restore errno, and invoke client's callback. */ 759 errno = save_errno; 760 action = as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED); 761 762 if (action == ASIO_ACTION_RETRY) { 763 /* Client requested reconnection. */ 764 _async_socket_reconnect(as, as->reconnect_to); 765 } 766 } 767 768 /* A callback that is invoked on socket's I/O failure. 769 * Param: 770 * as - Initialized AsyncSocket instance. 771 * asio - Descriptor for the failed I/O. Can be NULL for general failures. 772 */ 773 static AsyncIOAction 774 _on_async_socket_failure(AsyncSocket* as, AsyncSocketIO* asio) 775 { 776 D("ASocket %s: %s I/O failure: %d -> %s", 777 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", 778 errno, strerror(errno)); 779 780 /* Report the failure. */ 781 return _async_socket_io_failure(as, asio, errno); 782 } 783 784 /* A callback that is invoked when there is data available to read. 785 * Param: 786 * as - Initialized AsyncSocket instance. 787 * Return: 788 * 0 on success, or -1 on failure. Failure returned from this routine will 789 * skip writes (if awailable) behind this read. 790 */ 791 static int 792 _on_async_socket_recv(AsyncSocket* as) 793 { 794 AsyncIOAction action; 795 796 /* Get current reader. */ 797 AsyncSocketIO* const asr = as->readers_head; 798 if (asr == NULL) { 799 D("ASocket %s: No reader is available.", _async_socket_string(as)); 800 loopIo_dontWantRead(as->io); 801 return 0; 802 } 803 804 /* Reference the reader while we're working with it in this callback. */ 805 async_socket_io_reference(asr); 806 807 /* Bump I/O state, and inform the client that I/O is in progress. 
*/ 808 if (asr->state == ASIO_STATE_QUEUED) { 809 asr->state = ASIO_STATE_STARTED; 810 } else { 811 asr->state = ASIO_STATE_CONTINUES; 812 } 813 action = asr->on_io(asr->io_opaque, asr, asr->state); 814 if (action == ASIO_ACTION_ABORT) { 815 D("ASocket %s: Read is aborted by the client.", _async_socket_string(as)); 816 /* Move on to the next reader. */ 817 _async_socket_advance_reader(as); 818 /* Lets see if there are still active readers, and enable, or disable 819 * read I/O callback accordingly. */ 820 if (as->readers_head != NULL) { 821 loopIo_wantRead(as->io); 822 } else { 823 loopIo_dontWantRead(as->io); 824 } 825 async_socket_io_release(asr); 826 return 0; 827 } 828 829 /* Read next chunk of data. */ 830 int res = socket_recv(as->fd, asr->buffer + asr->transferred, 831 asr->to_transfer - asr->transferred); 832 while (res < 0 && errno == EINTR) { 833 res = socket_recv(as->fd, asr->buffer + asr->transferred, 834 asr->to_transfer - asr->transferred); 835 } 836 837 if (res == 0) { 838 /* Socket has been disconnected. */ 839 errno = ECONNRESET; 840 _on_async_socket_disconnected(as); 841 async_socket_io_release(asr); 842 return -1; 843 } 844 845 if (res < 0) { 846 if (errno == EWOULDBLOCK || errno == EAGAIN) { 847 /* Yield to writes behind this read. */ 848 loopIo_wantRead(as->io); 849 async_socket_io_release(asr); 850 return 0; 851 } 852 853 /* An I/O error. */ 854 action = _on_async_socket_failure(as, asr); 855 if (action != ASIO_ACTION_RETRY) { 856 D("ASocket %s: Read is aborted on failure.", _async_socket_string(as)); 857 /* Move on to the next reader. */ 858 _async_socket_advance_reader(as); 859 /* Lets see if there are still active readers, and enable, or disable 860 * read I/O callback accordingly. */ 861 if (as->readers_head != NULL) { 862 loopIo_wantRead(as->io); 863 } else { 864 loopIo_dontWantRead(as->io); 865 } 866 } 867 async_socket_io_release(asr); 868 return -1; 869 } 870 871 /* Update the reader's descriptor. 
*/ 872 asr->transferred += res; 873 if (asr->transferred == asr->to_transfer) { 874 /* This read is completed. Move on to the next reader. */ 875 _async_socket_advance_reader(as); 876 877 /* Notify reader completion. */ 878 _async_socket_complete_io(as, asr); 879 } 880 881 /* Lets see if there are still active readers, and enable, or disable read 882 * I/O callback accordingly. */ 883 if (as->readers_head != NULL) { 884 loopIo_wantRead(as->io); 885 } else { 886 loopIo_dontWantRead(as->io); 887 } 888 889 async_socket_io_release(asr); 890 891 return 0; 892 } 893 894 /* A callback that is invoked when there is data available to write. 895 * Param: 896 * as - Initialized AsyncSocket instance. 897 * Return: 898 * 0 on success, or -1 on failure. Failure returned from this routine will 899 * skip reads (if awailable) behind this write. 900 */ 901 static int 902 _on_async_socket_send(AsyncSocket* as) 903 { 904 AsyncIOAction action; 905 906 /* Get current writer. */ 907 AsyncSocketIO* const asw = as->writers_head; 908 if (asw == NULL) { 909 D("ASocket %s: No writer is available.", _async_socket_string(as)); 910 loopIo_dontWantWrite(as->io); 911 return 0; 912 } 913 914 /* Reference the writer while we're working with it in this callback. */ 915 async_socket_io_reference(asw); 916 917 /* Bump I/O state, and inform the client that I/O is in progress. */ 918 if (asw->state == ASIO_STATE_QUEUED) { 919 asw->state = ASIO_STATE_STARTED; 920 } else { 921 asw->state = ASIO_STATE_CONTINUES; 922 } 923 action = asw->on_io(asw->io_opaque, asw, asw->state); 924 if (action == ASIO_ACTION_ABORT) { 925 D("ASocket %s: Write is aborted by the client.", _async_socket_string(as)); 926 /* Move on to the next writer. */ 927 _async_socket_advance_writer(as); 928 /* Lets see if there are still active writers, and enable, or disable 929 * write I/O callback accordingly. 
*/ 930 if (as->writers_head != NULL) { 931 loopIo_wantWrite(as->io); 932 } else { 933 loopIo_dontWantWrite(as->io); 934 } 935 async_socket_io_release(asw); 936 return 0; 937 } 938 939 /* Write next chunk of data. */ 940 int res = socket_send(as->fd, asw->buffer + asw->transferred, 941 asw->to_transfer - asw->transferred); 942 while (res < 0 && errno == EINTR) { 943 res = socket_send(as->fd, asw->buffer + asw->transferred, 944 asw->to_transfer - asw->transferred); 945 } 946 947 if (res == 0) { 948 /* Socket has been disconnected. */ 949 errno = ECONNRESET; 950 _on_async_socket_disconnected(as); 951 async_socket_io_release(asw); 952 return -1; 953 } 954 955 if (res < 0) { 956 if (errno == EWOULDBLOCK || errno == EAGAIN) { 957 /* Yield to reads behind this write. */ 958 loopIo_wantWrite(as->io); 959 async_socket_io_release(asw); 960 return 0; 961 } 962 963 /* An I/O error. */ 964 action = _on_async_socket_failure(as, asw); 965 if (action != ASIO_ACTION_RETRY) { 966 D("ASocket %s: Write is aborted on failure.", _async_socket_string(as)); 967 /* Move on to the next writer. */ 968 _async_socket_advance_writer(as); 969 /* Lets see if there are still active writers, and enable, or disable 970 * write I/O callback accordingly. */ 971 if (as->writers_head != NULL) { 972 loopIo_wantWrite(as->io); 973 } else { 974 loopIo_dontWantWrite(as->io); 975 } 976 } 977 async_socket_io_release(asw); 978 return -1; 979 } 980 981 /* Update the writer descriptor. */ 982 asw->transferred += res; 983 if (asw->transferred == asw->to_transfer) { 984 /* This write is completed. Move on to the next writer. */ 985 _async_socket_advance_writer(as); 986 987 /* Notify writer completion. */ 988 _async_socket_complete_io(as, asw); 989 } 990 991 /* Lets see if there are still active writers, and enable, or disable write 992 * I/O callback accordingly. 
*/ 993 if (as->writers_head != NULL) { 994 loopIo_wantWrite(as->io); 995 } else { 996 loopIo_dontWantWrite(as->io); 997 } 998 999 async_socket_io_release(asw); 1000 1001 return 0; 1002 } 1003 1004 /* A callback that is invoked when an I/O is available on socket. 1005 * Param: 1006 * as - Initialized AsyncSocket instance. 1007 * fd - Socket's file descriptor. 1008 * events - LOOP_IO_READ | LOOP_IO_WRITE bitmask. 1009 */ 1010 static void 1011 _on_async_socket_io(void* opaque, int fd, unsigned events) 1012 { 1013 AsyncSocket* const as = (AsyncSocket*)opaque; 1014 1015 /* Reference the socket while we're working with it in this callback. */ 1016 async_socket_reference(as); 1017 1018 if ((events & LOOP_IO_READ) != 0) { 1019 if (_on_async_socket_recv(as) != 0) { 1020 async_socket_release(as); 1021 return; 1022 } 1023 } 1024 1025 if ((events & LOOP_IO_WRITE) != 0) { 1026 if (_on_async_socket_send(as) != 0) { 1027 async_socket_release(as); 1028 return; 1029 } 1030 } 1031 1032 async_socket_release(as); 1033 } 1034 1035 /* A callback that is invoked by asynchronous socket connector on connection 1036 * events. 1037 * Param: 1038 * opaque - Initialized AsyncSocket instance. 1039 * connector - Connector that is used to connect this socket. 1040 * event - Connection event. 1041 * Return: 1042 * One of AsyncIOAction values. 1043 */ 1044 static AsyncIOAction 1045 _on_connector_events(void* opaque, 1046 AsyncSocketConnector* connector, 1047 AsyncIOState event) 1048 { 1049 AsyncIOAction action; 1050 AsyncSocket* const as = (AsyncSocket*)opaque; 1051 1052 /* Reference the socket while we're working with it in this callback. */ 1053 async_socket_reference(as); 1054 1055 if (event == ASIO_STATE_SUCCEEDED) { 1056 /* Accept the connection. */ 1057 as->fd = async_socket_connector_pull_fd(connector); 1058 loopIo_init(as->io, as->looper, as->fd, _on_async_socket_io, as); 1059 } 1060 1061 /* Invoke client's callback. 
*/ 1062 action = as->on_connection(as->client_opaque, as, event); 1063 if (event == ASIO_STATE_SUCCEEDED && action != ASIO_ACTION_DONE) { 1064 /* For whatever reason the client didn't want to keep this connection. 1065 * Close it. */ 1066 D("ASocket %s: Connection is discarded by the client.", 1067 _async_socket_string(as)); 1068 _async_socket_close_socket(as); 1069 } 1070 1071 if (action != ASIO_ACTION_RETRY) { 1072 async_socket_connector_release(connector); 1073 } 1074 1075 async_socket_release(as); 1076 1077 return action; 1078 } 1079 1080 /* Timer callback invoked to reconnect the lost connection. 1081 * Param: 1082 * as - Initialized AsyncSocket instance. 1083 */ 1084 void 1085 _on_async_socket_reconnect(void* opaque) 1086 { 1087 AsyncSocket* as = (AsyncSocket*)opaque; 1088 1089 /* Reference the socket while we're working with it in this callback. */ 1090 async_socket_reference(as); 1091 async_socket_connect(as, as->reconnect_to); 1092 async_socket_release(as); 1093 } 1094 1095 1096 /******************************************************************************** 1097 * Android Device Socket public API 1098 *******************************************************************************/ 1099 1100 AsyncSocket* 1101 async_socket_new(int port, 1102 int reconnect_to, 1103 on_as_connection_cb client_cb, 1104 void* client_opaque, 1105 Looper* looper) 1106 { 1107 AsyncSocket* as; 1108 1109 if (client_cb == NULL) { 1110 E("Invalid client_cb parameter"); 1111 return NULL; 1112 } 1113 1114 ANEW0(as); 1115 1116 as->fd = -1; 1117 as->client_opaque = client_opaque; 1118 as->on_connection = client_cb; 1119 as->readers_head = as->readers_tail = NULL; 1120 as->reconnect_to = reconnect_to; 1121 as->ref_count = 1; 1122 sock_address_init_inet(&as->address, SOCK_ADDRESS_INET_LOOPBACK, port); 1123 if (looper == NULL) { 1124 as->looper = looper_newCore(); 1125 if (as->looper == NULL) { 1126 E("Unable to create I/O looper for async socket '%s'", 1127 _async_socket_string(as)); 1128 
client_cb(client_opaque, as, ASIO_STATE_FAILED); 1129 _async_socket_free(as); 1130 return NULL; 1131 } 1132 as->owns_looper = 1; 1133 } else { 1134 as->looper = looper; 1135 as->owns_looper = 0; 1136 } 1137 1138 loopTimer_init(as->reconnect_timer, as->looper, _on_async_socket_reconnect, as); 1139 1140 T("ASocket %s: Descriptor is created.", _async_socket_string(as)); 1141 1142 return as; 1143 } 1144 1145 int 1146 async_socket_reference(AsyncSocket* as) 1147 { 1148 assert(as->ref_count > 0); 1149 as->ref_count++; 1150 return as->ref_count; 1151 } 1152 1153 int 1154 async_socket_release(AsyncSocket* as) 1155 { 1156 assert(as->ref_count > 0); 1157 as->ref_count--; 1158 if (as->ref_count == 0) { 1159 /* Last reference has been dropped. Destroy this object. */ 1160 _async_socket_cancel_all_io(as); 1161 _async_socket_free(as); 1162 return 0; 1163 } 1164 return as->ref_count; 1165 } 1166 1167 void 1168 async_socket_connect(AsyncSocket* as, int retry_to) 1169 { 1170 T("ASocket %s: Handling connection request for %dms...", 1171 _async_socket_string(as), retry_to); 1172 1173 AsyncSocketConnector* const connector = 1174 async_socket_connector_new(&as->address, retry_to, _on_connector_events, 1175 as, as->looper); 1176 if (connector != NULL) { 1177 async_socket_connector_connect(connector); 1178 } else { 1179 as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED); 1180 } 1181 } 1182 1183 void 1184 async_socket_disconnect(AsyncSocket* as) 1185 { 1186 T("ASocket %s: Handling disconnection request...", _async_socket_string(as)); 1187 1188 if (as != NULL) { 1189 _async_socket_cancel_all_io(as); 1190 _async_socket_close_socket(as); 1191 } 1192 } 1193 1194 void 1195 async_socket_reconnect(AsyncSocket* as, int retry_to) 1196 { 1197 T("ASocket %s: Handling reconnection request for %dms...", 1198 _async_socket_string(as), retry_to); 1199 1200 _async_socket_cancel_all_io(as); 1201 _async_socket_close_socket(as); 1202 _async_socket_reconnect(as, retry_to); 1203 } 1204 1205 void 1206 
async_socket_read_abs(AsyncSocket* as, 1207 void* buffer, uint32_t len, 1208 on_as_io_cb reader_cb, 1209 void* reader_opaque, 1210 Duration deadline) 1211 { 1212 T("ASocket %s: Handling read for %d bytes with deadline %lld...", 1213 _async_socket_string(as), len, deadline); 1214 1215 AsyncSocketIO* const asr = 1216 _async_socket_reader_new(as, buffer, len, reader_cb, reader_opaque, 1217 deadline); 1218 if (async_socket_is_connected(as)) { 1219 /* Add new reader to the list. Note that we use initial reference from I/O 1220 * 'new' routine as "in the list" reference counter. */ 1221 if (as->readers_head == NULL) { 1222 as->readers_head = as->readers_tail = asr; 1223 } else { 1224 as->readers_tail->next = asr; 1225 as->readers_tail = asr; 1226 } 1227 loopIo_wantRead(as->io); 1228 } else { 1229 D("ASocket %s: Read on a disconnected socket.", _async_socket_string(as)); 1230 errno = ECONNRESET; 1231 reader_cb(reader_opaque, asr, ASIO_STATE_FAILED); 1232 async_socket_io_release(asr); 1233 } 1234 } 1235 1236 void 1237 async_socket_read_rel(AsyncSocket* as, 1238 void* buffer, uint32_t len, 1239 on_as_io_cb reader_cb, 1240 void* reader_opaque, 1241 int to) 1242 { 1243 const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to : 1244 DURATION_INFINITE; 1245 async_socket_read_abs(as, buffer, len, reader_cb, reader_opaque, dl); 1246 } 1247 1248 void 1249 async_socket_write_abs(AsyncSocket* as, 1250 const void* buffer, uint32_t len, 1251 on_as_io_cb writer_cb, 1252 void* writer_opaque, 1253 Duration deadline) 1254 { 1255 T("ASocket %s: Handling write for %d bytes with deadline %lld...", 1256 _async_socket_string(as), len, deadline); 1257 1258 AsyncSocketIO* const asw = 1259 _async_socket_writer_new(as, buffer, len, writer_cb, writer_opaque, 1260 deadline); 1261 if (async_socket_is_connected(as)) { 1262 /* Add new writer to the list. Note that we use initial reference from I/O 1263 * 'new' routine as "in the list" reference counter. 
*/ 1264 if (as->writers_head == NULL) { 1265 as->writers_head = as->writers_tail = asw; 1266 } else { 1267 as->writers_tail->next = asw; 1268 as->writers_tail = asw; 1269 } 1270 loopIo_wantWrite(as->io); 1271 } else { 1272 D("ASocket %s: Write on a disconnected socket.", _async_socket_string(as)); 1273 errno = ECONNRESET; 1274 writer_cb(writer_opaque, asw, ASIO_STATE_FAILED); 1275 async_socket_io_release(asw); 1276 } 1277 } 1278 1279 void 1280 async_socket_write_rel(AsyncSocket* as, 1281 const void* buffer, uint32_t len, 1282 on_as_io_cb writer_cb, 1283 void* writer_opaque, 1284 int to) 1285 { 1286 const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to : 1287 DURATION_INFINITE; 1288 async_socket_write_abs(as, buffer, len, writer_cb, writer_opaque, dl); 1289 } 1290 1291 void* 1292 async_socket_get_client_opaque(const AsyncSocket* as) 1293 { 1294 return as->client_opaque; 1295 } 1296 1297 Duration 1298 async_socket_deadline(AsyncSocket* as, int rel) 1299 { 1300 return (rel >= 0) ? looper_now(_async_socket_get_looper(as)) + rel : 1301 DURATION_INFINITE; 1302 } 1303 1304 int 1305 async_socket_get_port(const AsyncSocket* as) 1306 { 1307 return sock_address_get_port(&as->address); 1308 } 1309 1310 int 1311 async_socket_is_connected(const AsyncSocket* as) 1312 { 1313 return as->fd >= 0; 1314 } 1315