1 /* 2 * Copyright (C) 2012 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 /* 18 * Encapsulates exchange protocol between the emulator, and an Android device 19 * that is connected to the host via USB. The communication is established over 20 * a TCP port forwarding, enabled by ADB. 21 */ 22 23 #include "qemu-common.h" 24 #include "android/async-utils.h" 25 #include "android/utils/debug.h" 26 #include "android/async-socket-connector.h" 27 #include "android/async-socket.h" 28 #include "utils/panic.h" 29 #include "iolooper.h" 30 31 #define E(...) derror(__VA_ARGS__) 32 #define W(...) dwarning(__VA_ARGS__) 33 #define D(...) VERBOSE_PRINT(asyncsocket,__VA_ARGS__) 34 #define D_ACTIVE VERBOSE_CHECK(asyncsocket) 35 36 #define TRACE_ON 0 37 38 #if TRACE_ON 39 #define T(...) VERBOSE_PRINT(asyncsocket,__VA_ARGS__) 40 #else 41 #define T(...) 42 #endif 43 44 /******************************************************************************** 45 * Asynchronous Socket internal API declarations 46 *******************************************************************************/ 47 48 /* Gets socket's address string. */ 49 static const char* _async_socket_string(AsyncSocket* as); 50 51 /* Gets socket's looper. */ 52 static Looper* _async_socket_get_looper(AsyncSocket* as); 53 54 /* Handler for the I/O time out. 55 * Param: 56 * as - Asynchronous socket for the I/O. 57 * asio - Desciptor for the timed out I/O. 58 */ 59 static AsyncIOAction _async_socket_io_timed_out(AsyncSocket* as, 60 AsyncSocketIO* asio); 61 62 /******************************************************************************** 63 * Asynchronous Socket Reader / Writer 64 *******************************************************************************/ 65 66 struct AsyncSocketIO { 67 /* Next I/O in the reader, or writer list. */ 68 AsyncSocketIO* next; 69 /* Asynchronous socket for this I/O. */ 70 AsyncSocket* as; 71 /* Timer used for time outs on this I/O. */ 72 LoopTimer timer[1]; 73 /* An opaque pointer associated with this I/O. */ 74 void* io_opaque; 75 /* Buffer where to read / write data. */ 76 uint8_t* buffer; 77 /* Bytes to transfer through the socket for this I/O. */ 78 uint32_t to_transfer; 79 /* Bytes thransferred through the socket in this I/O. */ 80 uint32_t transferred; 81 /* I/O callback for this I/O. */ 82 on_as_io_cb on_io; 83 /* I/O type selector: 1 - read, 0 - write. */ 84 int is_io_read; 85 /* State of the I/O. */ 86 AsyncIOState state; 87 /* Number of outstanding references to the I/O. */ 88 int ref_count; 89 /* Deadline for this I/O */ 90 Duration deadline; 91 }; 92 93 /* 94 * Recycling I/O instances. 95 * Since AsyncSocketIO instances are not that large, it makes sence to recycle 96 * them for faster allocation, rather than allocating and freeing them for each 97 * I/O on the socket. 98 */ 99 100 /* List of recycled I/O descriptors. */ 101 static AsyncSocketIO* _asio_recycled = NULL; 102 /* Number of I/O descriptors that are recycled in the _asio_recycled list. */ 103 static int _recycled_asio_count = 0; 104 /* Maximum number of I/O descriptors that can be recycled. */ 105 static const int _max_recycled_asio_num = 32; 106 107 /* Handler for an I/O time-out timer event. 108 * When this routine is invoked, it indicates that a time out has occurred on an 109 * I/O. 110 * Param: 111 * opaque - AsyncSocketIO instance representing the timed out I/O. 112 */ 113 static void _on_async_socket_io_timed_out(void* opaque); 114 115 /* Creates new I/O descriptor. 116 * Param: 117 * as - Asynchronous socket for the I/O. 118 * is_io_read - I/O type selector: 1 - read, 0 - write. 119 * buffer, len - Reader / writer buffer address. 120 * io_cb - Callback for this reader / writer. 121 * io_opaque - An opaque pointer associated with the I/O. 122 * deadline - Deadline to complete the I/O. 123 * Return: 124 * Initialized AsyncSocketIO instance. 125 */ 126 static AsyncSocketIO* 127 _async_socket_rw_new(AsyncSocket* as, 128 int is_io_read, 129 void* buffer, 130 uint32_t len, 131 on_as_io_cb io_cb, 132 void* io_opaque, 133 Duration deadline) 134 { 135 /* Lookup in the recycler first. */ 136 AsyncSocketIO* asio = _asio_recycled; 137 if (asio != NULL) { 138 /* Pull the descriptor from recycler. */ 139 _asio_recycled = asio->next; 140 _recycled_asio_count--; 141 } else { 142 /* No recycled descriptors. Allocate new one. */ 143 ANEW0(asio); 144 } 145 146 asio->next = NULL; 147 asio->as = as; 148 asio->is_io_read = is_io_read; 149 asio->buffer = (uint8_t*)buffer; 150 asio->to_transfer = len; 151 asio->transferred = 0; 152 asio->on_io = io_cb; 153 asio->io_opaque = io_opaque; 154 asio->state = ASIO_STATE_QUEUED; 155 asio->ref_count = 1; 156 asio->deadline = deadline; 157 loopTimer_init(asio->timer, _async_socket_get_looper(as), 158 _on_async_socket_io_timed_out, asio); 159 loopTimer_startAbsolute(asio->timer, deadline); 160 161 /* Reference socket that is holding this I/O. */ 162 async_socket_reference(as); 163 164 T("ASocket %s: %s I/O descriptor %p is created for %d bytes of data", 165 _async_socket_string(as), is_io_read ? "READ" : "WRITE", asio, len); 166 167 return asio; 168 } 169 170 /* Destroys and frees I/O descriptor. */ 171 static void 172 _async_socket_io_free(AsyncSocketIO* asio) 173 { 174 AsyncSocket* const as = asio->as; 175 176 T("ASocket %s: %s I/O descriptor %p is destroyed.", 177 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio); 178 179 loopTimer_done(asio->timer); 180 181 /* Try to recycle it first, and free the memory if recycler is full. */ 182 if (_recycled_asio_count < _max_recycled_asio_num) { 183 asio->next = _asio_recycled; 184 _asio_recycled = asio; 185 _recycled_asio_count++; 186 } else { 187 AFREE(asio); 188 } 189 190 /* Release socket that is holding this I/O. */ 191 async_socket_release(as); 192 } 193 194 /* An I/O has been finished and its descriptor is about to be discarded. */ 195 static void 196 _async_socket_io_finished(AsyncSocketIO* asio) 197 { 198 /* Notify the client of the I/O that I/O is finished. */ 199 asio->on_io(asio->io_opaque, asio, ASIO_STATE_FINISHED); 200 } 201 202 int 203 async_socket_io_reference(AsyncSocketIO* asio) 204 { 205 assert(asio->ref_count > 0); 206 asio->ref_count++; 207 return asio->ref_count; 208 } 209 210 int 211 async_socket_io_release(AsyncSocketIO* asio) 212 { 213 assert(asio->ref_count > 0); 214 asio->ref_count--; 215 if (asio->ref_count == 0) { 216 _async_socket_io_finished(asio); 217 /* Last reference has been dropped. Destroy this object. */ 218 _async_socket_io_free(asio); 219 return 0; 220 } 221 return asio->ref_count; 222 } 223 224 /* Creates new asynchronous socket reader. 225 * Param: 226 * as - Asynchronous socket for the reader. 227 * buffer, len - Reader's buffer. 228 * io_cb - Reader's callback. 229 * reader_opaque - An opaque pointer associated with the reader. 230 * deadline - Deadline to complete the operation. 231 * Return: 232 * An initialized AsyncSocketIO intance. 233 */ 234 static AsyncSocketIO* 235 _async_socket_reader_new(AsyncSocket* as, 236 void* buffer, 237 uint32_t len, 238 on_as_io_cb io_cb, 239 void* reader_opaque, 240 Duration deadline) 241 { 242 AsyncSocketIO* const asio = _async_socket_rw_new(as, 1, buffer, len, io_cb, 243 reader_opaque, deadline); 244 return asio; 245 } 246 247 /* Creates new asynchronous socket writer. 248 * Param: 249 * as - Asynchronous socket for the writer. 250 * buffer, len - Writer's buffer. 251 * io_cb - Writer's callback. 252 * writer_opaque - An opaque pointer associated with the writer. 253 * deadline - Deadline to complete the operation. 254 * Return: 255 * An initialized AsyncSocketIO intance. 256 */ 257 static AsyncSocketIO* 258 _async_socket_writer_new(AsyncSocket* as, 259 const void* buffer, 260 uint32_t len, 261 on_as_io_cb io_cb, 262 void* writer_opaque, 263 Duration deadline) 264 { 265 AsyncSocketIO* const asio = _async_socket_rw_new(as, 0, (void*)buffer, len, 266 io_cb, writer_opaque, 267 deadline); 268 return asio; 269 } 270 271 /* I/O timed out. */ 272 static void 273 _on_async_socket_io_timed_out(void* opaque) 274 { 275 AsyncSocketIO* const asio = (AsyncSocketIO*)opaque; 276 AsyncSocket* const as = asio->as; 277 278 D("ASocket %s: %s I/O with deadline %lld has timed out at %lld", 279 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", 280 asio->deadline, async_socket_deadline(as, 0)); 281 282 /* Reference while in callback. */ 283 async_socket_io_reference(asio); 284 _async_socket_io_timed_out(asio->as, asio); 285 async_socket_io_release(asio); 286 } 287 288 /******************************************************************************** 289 * Public Asynchronous Socket I/O API 290 *******************************************************************************/ 291 292 AsyncSocket* 293 async_socket_io_get_socket(const AsyncSocketIO* asio) 294 { 295 async_socket_reference(asio->as); 296 return asio->as; 297 } 298 299 void 300 async_socket_io_cancel_time_out(AsyncSocketIO* asio) 301 { 302 loopTimer_stop(asio->timer); 303 } 304 305 void* 306 async_socket_io_get_io_opaque(const AsyncSocketIO* asio) 307 { 308 return asio->io_opaque; 309 } 310 311 void* 312 async_socket_io_get_client_opaque(const AsyncSocketIO* asio) 313 { 314 return async_socket_get_client_opaque(asio->as); 315 } 316 317 void* 318 async_socket_io_get_buffer_info(const AsyncSocketIO* asio, 319 uint32_t* transferred, 320 uint32_t* to_transfer) 321 { 322 if (transferred != NULL) { 323 *transferred = asio->transferred; 324 } 325 if (to_transfer != NULL) { 326 *to_transfer = asio->to_transfer; 327 } 328 return asio->buffer; 329 } 330 331 void* 332 async_socket_io_get_buffer(const AsyncSocketIO* asio) 333 { 334 return asio->buffer; 335 } 336 337 uint32_t 338 async_socket_io_get_transferred(const AsyncSocketIO* asio) 339 { 340 return asio->transferred; 341 } 342 343 uint32_t 344 async_socket_io_get_to_transfer(const AsyncSocketIO* asio) 345 { 346 return asio->to_transfer; 347 } 348 349 int 350 async_socket_io_is_read(const AsyncSocketIO* asio) 351 { 352 return asio->is_io_read; 353 } 354 355 /******************************************************************************** 356 * Asynchronous Socket internals 357 *******************************************************************************/ 358 359 struct AsyncSocket { 360 /* TCP address for the socket. */ 361 SockAddress address; 362 /* Connection callback for this socket. */ 363 on_as_connection_cb on_connection; 364 /* An opaque pointer associated with this socket by the client. */ 365 void* client_opaque; 366 /* I/O looper for asynchronous I/O on the socket. */ 367 Looper* looper; 368 /* I/O descriptor for asynchronous I/O on the socket. */ 369 LoopIo io[1]; 370 /* Timer to use for reconnection attempts. */ 371 LoopTimer reconnect_timer[1]; 372 /* Head of the list of the active readers. */ 373 AsyncSocketIO* readers_head; 374 /* Tail of the list of the active readers. */ 375 AsyncSocketIO* readers_tail; 376 /* Head of the list of the active writers. */ 377 AsyncSocketIO* writers_head; 378 /* Tail of the list of the active writers. */ 379 AsyncSocketIO* writers_tail; 380 /* Socket's file descriptor. */ 381 int fd; 382 /* Timeout to use for reconnection attempts. */ 383 int reconnect_to; 384 /* Number of outstanding references to the socket. */ 385 int ref_count; 386 /* Flags whether (1) or not (0) socket owns the looper. */ 387 int owns_looper; 388 }; 389 390 static const char* 391 _async_socket_string(AsyncSocket* as) 392 { 393 return sock_address_to_string(&as->address); 394 } 395 396 static Looper* 397 _async_socket_get_looper(AsyncSocket* as) 398 { 399 return as->looper; 400 } 401 402 /* Pulls first reader out of the list. 403 * Param: 404 * as - Initialized AsyncSocket instance. 405 * Return: 406 * First I/O pulled out of the list, or NULL if there are no I/O in the list. 407 * Note that the caller is responsible for releasing the I/O object returned 408 * from this routine. 409 */ 410 static AsyncSocketIO* 411 _async_socket_pull_first_io(AsyncSocket* as, 412 AsyncSocketIO** list_head, 413 AsyncSocketIO** list_tail) 414 { 415 AsyncSocketIO* const ret = *list_head; 416 if (ret != NULL) { 417 *list_head = ret->next; 418 ret->next = NULL; 419 if (*list_head == NULL) { 420 *list_tail = NULL; 421 } 422 } 423 return ret; 424 } 425 426 /* Pulls first reader out of the list. 427 * Param: 428 * as - Initialized AsyncSocket instance. 429 * Return: 430 * First reader pulled out of the list, or NULL if there are no readers in the 431 * list. 432 * Note that the caller is responsible for releasing the I/O object returned 433 * from this routine. 434 */ 435 static AsyncSocketIO* 436 _async_socket_pull_first_reader(AsyncSocket* as) 437 { 438 return _async_socket_pull_first_io(as, &as->readers_head, &as->readers_tail); 439 } 440 441 /* Pulls first writer out of the list. 442 * Param: 443 * as - Initialized AsyncSocket instance. 444 * Return: 445 * First writer pulled out of the list, or NULL if there are no writers in the 446 * list. 447 * Note that the caller is responsible for releasing the I/O object returned 448 * from this routine. 449 */ 450 static AsyncSocketIO* 451 _async_socket_pull_first_writer(AsyncSocket* as) 452 { 453 return _async_socket_pull_first_io(as, &as->writers_head, &as->writers_tail); 454 } 455 456 /* Removes an I/O descriptor from a list of active I/O. 457 * Param: 458 * as - Initialized AsyncSocket instance. 459 * list_head, list_tail - Pointers to the list head and tail. 460 * io - I/O to remove. 461 * Return: 462 * Boolean: 1 if I/O has been removed, or 0 if I/O has not been found in the list. 463 */ 464 static int 465 _async_socket_remove_io(AsyncSocket* as, 466 AsyncSocketIO** list_head, 467 AsyncSocketIO** list_tail, 468 AsyncSocketIO* io) 469 { 470 AsyncSocketIO* prev = NULL; 471 472 while (*list_head != NULL && io != *list_head) { 473 prev = *list_head; 474 list_head = &((*list_head)->next); 475 } 476 if (*list_head == NULL) { 477 D("%s: I/O %p is not found in the list for socket '%s'", 478 __FUNCTION__, io, _async_socket_string(as)); 479 return 0; 480 } 481 482 *list_head = io->next; 483 if (prev != NULL) { 484 prev->next = io->next; 485 } 486 if (*list_tail == io) { 487 *list_tail = prev; 488 } 489 490 /* Release I/O adjusting reference added when I/O has been saved in the list. */ 491 async_socket_io_release(io); 492 493 return 1; 494 } 495 496 /* Advances to the next I/O in the list. 497 * Param: 498 * as - Initialized AsyncSocket instance. 499 * list_head, list_tail - Pointers to the list head and tail. 500 */ 501 static void 502 _async_socket_advance_io(AsyncSocket* as, 503 AsyncSocketIO** list_head, 504 AsyncSocketIO** list_tail) 505 { 506 AsyncSocketIO* first_io = *list_head; 507 if (first_io != NULL) { 508 *list_head = first_io->next; 509 first_io->next = NULL; 510 } 511 if (*list_head == NULL) { 512 *list_tail = NULL; 513 } 514 if (first_io != NULL) { 515 /* Release I/O removed from the head of the list. */ 516 async_socket_io_release(first_io); 517 } 518 } 519 520 /* Advances to the next reader in the list. 521 * Param: 522 * as - Initialized AsyncSocket instance. 523 */ 524 static void 525 _async_socket_advance_reader(AsyncSocket* as) 526 { 527 _async_socket_advance_io(as, &as->readers_head, &as->readers_tail); 528 } 529 530 /* Advances to the next writer in the list. 531 * Param: 532 * as - Initialized AsyncSocket instance. 533 */ 534 static void 535 _async_socket_advance_writer(AsyncSocket* as) 536 { 537 _async_socket_advance_io(as, &as->writers_head, &as->writers_tail); 538 } 539 540 /* Completes an I/O. 541 * Param: 542 * as - Initialized AsyncSocket instance. 543 * asio - I/O to complete. 544 * Return: 545 * One of AsyncIOAction values. 546 */ 547 static AsyncIOAction 548 _async_socket_complete_io(AsyncSocket* as, AsyncSocketIO* asio) 549 { 550 T("ASocket %s: %s I/O %p is completed.", 551 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio); 552 553 /* Stop the timer. */ 554 async_socket_io_cancel_time_out(asio); 555 556 return asio->on_io(asio->io_opaque, asio, ASIO_STATE_SUCCEEDED); 557 } 558 559 /* Timeouts an I/O. 560 * Param: 561 * as - Initialized AsyncSocket instance. 562 * asio - An I/O that has timed out. 563 * Return: 564 * One of AsyncIOAction values. 565 */ 566 static AsyncIOAction 567 _async_socket_io_timed_out(AsyncSocket* as, AsyncSocketIO* asio) 568 { 569 T("ASocket %s: %s I/O %p with deadline %lld has timed out at %lld", 570 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio, 571 asio->deadline, async_socket_deadline(as, 0)); 572 573 /* Report to the client. */ 574 const AsyncIOAction action = asio->on_io(asio->io_opaque, asio, 575 ASIO_STATE_TIMED_OUT); 576 577 /* Remove the I/O from a list of active I/O for actions other than retry. */ 578 if (action != ASIO_ACTION_RETRY) { 579 if (asio->is_io_read) { 580 _async_socket_remove_io(as, &as->readers_head, &as->readers_tail, asio); 581 } else { 582 _async_socket_remove_io(as, &as->writers_head, &as->writers_tail, asio); 583 } 584 } 585 586 return action; 587 } 588 589 /* Cancels an I/O. 590 * Param: 591 * as - Initialized AsyncSocket instance. 592 * asio - An I/O to cancel. 593 * Return: 594 * One of AsyncIOAction values. 595 */ 596 static AsyncIOAction 597 _async_socket_cancel_io(AsyncSocket* as, AsyncSocketIO* asio) 598 { 599 T("ASocket %s: %s I/O %p is cancelled.", 600 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio); 601 602 /* Stop the timer. */ 603 async_socket_io_cancel_time_out(asio); 604 605 return asio->on_io(asio->io_opaque, asio, ASIO_STATE_CANCELLED); 606 } 607 608 /* Reports an I/O failure. 609 * Param: 610 * as - Initialized AsyncSocket instance. 611 * asio - An I/O that has failed. Can be NULL for general failures. 612 * failure - Failure (errno) that has occurred. 613 * Return: 614 * One of AsyncIOAction values. 615 */ 616 static AsyncIOAction 617 _async_socket_io_failure(AsyncSocket* as, AsyncSocketIO* asio, int failure) 618 { 619 T("ASocket %s: %s I/O %p has failed: %d -> %s", 620 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio, 621 failure, strerror(failure)); 622 623 /* Stop the timer. */ 624 async_socket_io_cancel_time_out(asio); 625 626 errno = failure; 627 return asio->on_io(asio->io_opaque, asio, ASIO_STATE_FAILED); 628 } 629 630 /* Cancels all the active socket readers. 631 * Param: 632 * as - Initialized AsyncSocket instance. 633 */ 634 static void 635 _async_socket_cancel_readers(AsyncSocket* as) 636 { 637 while (as->readers_head != NULL) { 638 AsyncSocketIO* const to_cancel = _async_socket_pull_first_reader(as); 639 /* We ignore action returned from the cancellation callback, since we're 640 * in a disconnected state here. */ 641 _async_socket_cancel_io(as, to_cancel); 642 async_socket_io_release(to_cancel); 643 } 644 } 645 646 /* Cancels all the active socket writers. 647 * Param: 648 * as - Initialized AsyncSocket instance. 649 */ 650 static void 651 _async_socket_cancel_writers(AsyncSocket* as) 652 { 653 while (as->writers_head != NULL) { 654 AsyncSocketIO* const to_cancel = _async_socket_pull_first_writer(as); 655 /* We ignore action returned from the cancellation callback, since we're 656 * in a disconnected state here. */ 657 _async_socket_cancel_io(as, to_cancel); 658 async_socket_io_release(to_cancel); 659 } 660 } 661 662 /* Cancels all the I/O on the socket. */ 663 static void 664 _async_socket_cancel_all_io(AsyncSocket* as) 665 { 666 /* Stop the reconnection timer. */ 667 loopTimer_stop(as->reconnect_timer); 668 669 /* Stop read / write on the socket. */ 670 loopIo_dontWantWrite(as->io); 671 loopIo_dontWantRead(as->io); 672 673 /* Cancel active readers and writers. */ 674 _async_socket_cancel_readers(as); 675 _async_socket_cancel_writers(as); 676 } 677 678 /* Closes socket handle used by the async socket. 679 * Param: 680 * as - Initialized AsyncSocket instance. 681 */ 682 static void 683 _async_socket_close_socket(AsyncSocket* as) 684 { 685 if (as->fd >= 0) { 686 loopIo_done(as->io); 687 socket_close(as->fd); 688 T("ASocket %s: Socket handle %d is closed.", 689 _async_socket_string(as), as->fd); 690 as->fd = -1; 691 } 692 } 693 694 /* Destroys AsyncSocket instance. 695 * Param: 696 * as - Initialized AsyncSocket instance. 697 */ 698 static void 699 _async_socket_free(AsyncSocket* as) 700 { 701 if (as != NULL) { 702 T("ASocket %s: Socket descriptor is destroyed.", _async_socket_string(as)); 703 704 /* Close socket. */ 705 _async_socket_close_socket(as); 706 707 /* Free allocated resources. */ 708 if (as->looper != NULL) { 709 loopTimer_done(as->reconnect_timer); 710 if (as->owns_looper) { 711 looper_free(as->looper); 712 } 713 } 714 sock_address_done(&as->address); 715 AFREE(as); 716 } 717 } 718 719 /* Starts reconnection attempts after connection has been lost. 720 * Param: 721 * as - Initialized AsyncSocket instance. 722 * to - Milliseconds to wait before reconnection attempt. 723 */ 724 static void 725 _async_socket_reconnect(AsyncSocket* as, int to) 726 { 727 T("ASocket %s: reconnecting in %dms...", _async_socket_string(as), to); 728 729 /* Make sure that no I/O is active, and socket is closed before we 730 * reconnect. */ 731 _async_socket_cancel_all_io(as); 732 733 /* Set the timer for reconnection attempt. */ 734 loopTimer_startRelative(as->reconnect_timer, to); 735 } 736 737 /******************************************************************************** 738 * Asynchronous Socket callbacks 739 *******************************************************************************/ 740 741 /* A callback that is invoked when socket gets disconnected. 742 * Param: 743 * as - Initialized AsyncSocket instance. 744 */ 745 static void 746 _on_async_socket_disconnected(AsyncSocket* as) 747 { 748 /* Save error to restore it for the client's callback. */ 749 const int save_errno = errno; 750 AsyncIOAction action = ASIO_ACTION_ABORT; 751 752 D("ASocket %s: Disconnected.", _async_socket_string(as)); 753 754 /* Cancel all the I/O on this socket. */ 755 _async_socket_cancel_all_io(as); 756 757 /* Close the socket. */ 758 _async_socket_close_socket(as); 759 760 /* Restore errno, and invoke client's callback. */ 761 errno = save_errno; 762 action = as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED); 763 764 if (action == ASIO_ACTION_RETRY) { 765 /* Client requested reconnection. */ 766 _async_socket_reconnect(as, as->reconnect_to); 767 } 768 } 769 770 /* A callback that is invoked on socket's I/O failure. 771 * Param: 772 * as - Initialized AsyncSocket instance. 773 * asio - Descriptor for the failed I/O. Can be NULL for general failures. 774 */ 775 static AsyncIOAction 776 _on_async_socket_failure(AsyncSocket* as, AsyncSocketIO* asio) 777 { 778 D("ASocket %s: %s I/O failure: %d -> %s", 779 _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", 780 errno, strerror(errno)); 781 782 /* Report the failure. */ 783 return _async_socket_io_failure(as, asio, errno); 784 } 785 786 /* A callback that is invoked when there is data available to read. 787 * Param: 788 * as - Initialized AsyncSocket instance. 789 * Return: 790 * 0 on success, or -1 on failure. Failure returned from this routine will 791 * skip writes (if awailable) behind this read. 792 */ 793 static int 794 _on_async_socket_recv(AsyncSocket* as) 795 { 796 AsyncIOAction action; 797 798 /* Get current reader. */ 799 AsyncSocketIO* const asr = as->readers_head; 800 if (asr == NULL) { 801 D("ASocket %s: No reader is available.", _async_socket_string(as)); 802 loopIo_dontWantRead(as->io); 803 return 0; 804 } 805 806 /* Reference the reader while we're working with it in this callback. */ 807 async_socket_io_reference(asr); 808 809 /* Bump I/O state, and inform the client that I/O is in progress. */ 810 if (asr->state == ASIO_STATE_QUEUED) { 811 asr->state = ASIO_STATE_STARTED; 812 } else { 813 asr->state = ASIO_STATE_CONTINUES; 814 } 815 action = asr->on_io(asr->io_opaque, asr, asr->state); 816 if (action == ASIO_ACTION_ABORT) { 817 D("ASocket %s: Read is aborted by the client.", _async_socket_string(as)); 818 /* Move on to the next reader. */ 819 _async_socket_advance_reader(as); 820 /* Lets see if there are still active readers, and enable, or disable 821 * read I/O callback accordingly. */ 822 if (as->readers_head != NULL) { 823 loopIo_wantRead(as->io); 824 } else { 825 loopIo_dontWantRead(as->io); 826 } 827 async_socket_io_release(asr); 828 return 0; 829 } 830 831 /* Read next chunk of data. */ 832 int res = socket_recv(as->fd, asr->buffer + asr->transferred, 833 asr->to_transfer - asr->transferred); 834 while (res < 0 && errno == EINTR) { 835 res = socket_recv(as->fd, asr->buffer + asr->transferred, 836 asr->to_transfer - asr->transferred); 837 } 838 839 if (res == 0) { 840 /* Socket has been disconnected. */ 841 errno = ECONNRESET; 842 _on_async_socket_disconnected(as); 843 async_socket_io_release(asr); 844 return -1; 845 } 846 847 if (res < 0) { 848 if (errno == EWOULDBLOCK || errno == EAGAIN) { 849 /* Yield to writes behind this read. */ 850 loopIo_wantRead(as->io); 851 async_socket_io_release(asr); 852 return 0; 853 } 854 855 /* An I/O error. */ 856 action = _on_async_socket_failure(as, asr); 857 if (action != ASIO_ACTION_RETRY) { 858 D("ASocket %s: Read is aborted on failure.", _async_socket_string(as)); 859 /* Move on to the next reader. */ 860 _async_socket_advance_reader(as); 861 /* Lets see if there are still active readers, and enable, or disable 862 * read I/O callback accordingly. */ 863 if (as->readers_head != NULL) { 864 loopIo_wantRead(as->io); 865 } else { 866 loopIo_dontWantRead(as->io); 867 } 868 } 869 async_socket_io_release(asr); 870 return -1; 871 } 872 873 /* Update the reader's descriptor. */ 874 asr->transferred += res; 875 if (asr->transferred == asr->to_transfer) { 876 /* This read is completed. Move on to the next reader. */ 877 _async_socket_advance_reader(as); 878 879 /* Notify reader completion. */ 880 _async_socket_complete_io(as, asr); 881 } 882 883 /* Lets see if there are still active readers, and enable, or disable read 884 * I/O callback accordingly. */ 885 if (as->readers_head != NULL) { 886 loopIo_wantRead(as->io); 887 } else { 888 loopIo_dontWantRead(as->io); 889 } 890 891 async_socket_io_release(asr); 892 893 return 0; 894 } 895 896 /* A callback that is invoked when there is data available to write. 897 * Param: 898 * as - Initialized AsyncSocket instance. 899 * Return: 900 * 0 on success, or -1 on failure. Failure returned from this routine will 901 * skip reads (if awailable) behind this write. 902 */ 903 static int 904 _on_async_socket_send(AsyncSocket* as) 905 { 906 AsyncIOAction action; 907 908 /* Get current writer. */ 909 AsyncSocketIO* const asw = as->writers_head; 910 if (asw == NULL) { 911 D("ASocket %s: No writer is available.", _async_socket_string(as)); 912 loopIo_dontWantWrite(as->io); 913 return 0; 914 } 915 916 /* Reference the writer while we're working with it in this callback. */ 917 async_socket_io_reference(asw); 918 919 /* Bump I/O state, and inform the client that I/O is in progress. */ 920 if (asw->state == ASIO_STATE_QUEUED) { 921 asw->state = ASIO_STATE_STARTED; 922 } else { 923 asw->state = ASIO_STATE_CONTINUES; 924 } 925 action = asw->on_io(asw->io_opaque, asw, asw->state); 926 if (action == ASIO_ACTION_ABORT) { 927 D("ASocket %s: Write is aborted by the client.", _async_socket_string(as)); 928 /* Move on to the next writer. */ 929 _async_socket_advance_writer(as); 930 /* Lets see if there are still active writers, and enable, or disable 931 * write I/O callback accordingly. */ 932 if (as->writers_head != NULL) { 933 loopIo_wantWrite(as->io); 934 } else { 935 loopIo_dontWantWrite(as->io); 936 } 937 async_socket_io_release(asw); 938 return 0; 939 } 940 941 /* Write next chunk of data. */ 942 int res = socket_send(as->fd, asw->buffer + asw->transferred, 943 asw->to_transfer - asw->transferred); 944 while (res < 0 && errno == EINTR) { 945 res = socket_send(as->fd, asw->buffer + asw->transferred, 946 asw->to_transfer - asw->transferred); 947 } 948 949 if (res == 0) { 950 /* Socket has been disconnected. */ 951 errno = ECONNRESET; 952 _on_async_socket_disconnected(as); 953 async_socket_io_release(asw); 954 return -1; 955 } 956 957 if (res < 0) { 958 if (errno == EWOULDBLOCK || errno == EAGAIN) { 959 /* Yield to reads behind this write. */ 960 loopIo_wantWrite(as->io); 961 async_socket_io_release(asw); 962 return 0; 963 } 964 965 /* An I/O error. */ 966 action = _on_async_socket_failure(as, asw); 967 if (action != ASIO_ACTION_RETRY) { 968 D("ASocket %s: Write is aborted on failure.", _async_socket_string(as)); 969 /* Move on to the next writer. */ 970 _async_socket_advance_writer(as); 971 /* Lets see if there are still active writers, and enable, or disable 972 * write I/O callback accordingly. */ 973 if (as->writers_head != NULL) { 974 loopIo_wantWrite(as->io); 975 } else { 976 loopIo_dontWantWrite(as->io); 977 } 978 } 979 async_socket_io_release(asw); 980 return -1; 981 } 982 983 /* Update the writer descriptor. */ 984 asw->transferred += res; 985 if (asw->transferred == asw->to_transfer) { 986 /* This write is completed. Move on to the next writer. */ 987 _async_socket_advance_writer(as); 988 989 /* Notify writer completion. */ 990 _async_socket_complete_io(as, asw); 991 } 992 993 /* Lets see if there are still active writers, and enable, or disable write 994 * I/O callback accordingly. */ 995 if (as->writers_head != NULL) { 996 loopIo_wantWrite(as->io); 997 } else { 998 loopIo_dontWantWrite(as->io); 999 } 1000 1001 async_socket_io_release(asw); 1002 1003 return 0; 1004 } 1005 1006 /* A callback that is invoked when an I/O is available on socket. 1007 * Param: 1008 * as - Initialized AsyncSocket instance. 1009 * fd - Socket's file descriptor. 1010 * events - LOOP_IO_READ | LOOP_IO_WRITE bitmask. 1011 */ 1012 static void 1013 _on_async_socket_io(void* opaque, int fd, unsigned events) 1014 { 1015 AsyncSocket* const as = (AsyncSocket*)opaque; 1016 1017 /* Reference the socket while we're working with it in this callback. */ 1018 async_socket_reference(as); 1019 1020 if ((events & LOOP_IO_READ) != 0) { 1021 if (_on_async_socket_recv(as) != 0) { 1022 async_socket_release(as); 1023 return; 1024 } 1025 } 1026 1027 if ((events & LOOP_IO_WRITE) != 0) { 1028 if (_on_async_socket_send(as) != 0) { 1029 async_socket_release(as); 1030 return; 1031 } 1032 } 1033 1034 async_socket_release(as); 1035 } 1036 1037 /* A callback that is invoked by asynchronous socket connector on connection 1038 * events. 1039 * Param: 1040 * opaque - Initialized AsyncSocket instance. 1041 * connector - Connector that is used to connect this socket. 1042 * event - Connection event. 1043 * Return: 1044 * One of AsyncIOAction values. 1045 */ 1046 static AsyncIOAction 1047 _on_connector_events(void* opaque, 1048 AsyncSocketConnector* connector, 1049 AsyncIOState event) 1050 { 1051 AsyncIOAction action; 1052 AsyncSocket* const as = (AsyncSocket*)opaque; 1053 1054 /* Reference the socket while we're working with it in this callback. */ 1055 async_socket_reference(as); 1056 1057 if (event == ASIO_STATE_SUCCEEDED) { 1058 /* Accept the connection. */ 1059 as->fd = async_socket_connector_pull_fd(connector); 1060 loopIo_init(as->io, as->looper, as->fd, _on_async_socket_io, as); 1061 } 1062 1063 /* Invoke client's callback. */ 1064 action = as->on_connection(as->client_opaque, as, event); 1065 if (event == ASIO_STATE_SUCCEEDED && action != ASIO_ACTION_DONE) { 1066 /* For whatever reason the client didn't want to keep this connection. 1067 * Close it. */ 1068 D("ASocket %s: Connection is discarded by the client.", 1069 _async_socket_string(as)); 1070 _async_socket_close_socket(as); 1071 } 1072 1073 if (action != ASIO_ACTION_RETRY) { 1074 async_socket_connector_release(connector); 1075 } 1076 1077 async_socket_release(as); 1078 1079 return action; 1080 } 1081 1082 /* Timer callback invoked to reconnect the lost connection. 1083 * Param: 1084 * as - Initialized AsyncSocket instance. 1085 */ 1086 void 1087 _on_async_socket_reconnect(void* opaque) 1088 { 1089 AsyncSocket* as = (AsyncSocket*)opaque; 1090 1091 /* Reference the socket while we're working with it in this callback. */ 1092 async_socket_reference(as); 1093 async_socket_connect(as, as->reconnect_to); 1094 async_socket_release(as); 1095 } 1096 1097 1098 /******************************************************************************** 1099 * Android Device Socket public API 1100 *******************************************************************************/ 1101 1102 AsyncSocket* 1103 async_socket_new(int port, 1104 int reconnect_to, 1105 on_as_connection_cb client_cb, 1106 void* client_opaque, 1107 Looper* looper) 1108 { 1109 AsyncSocket* as; 1110 1111 if (client_cb == NULL) { 1112 E("Invalid client_cb parameter"); 1113 return NULL; 1114 } 1115 1116 ANEW0(as); 1117 1118 as->fd = -1; 1119 as->client_opaque = client_opaque; 1120 as->on_connection = client_cb; 1121 as->readers_head = as->readers_tail = NULL; 1122 as->reconnect_to = reconnect_to; 1123 as->ref_count = 1; 1124 sock_address_init_inet(&as->address, SOCK_ADDRESS_INET_LOOPBACK, port); 1125 if (looper == NULL) { 1126 as->looper = looper_newCore(); 1127 if (as->looper == NULL) { 1128 E("Unable to create I/O looper for async socket '%s'", 1129 _async_socket_string(as)); 1130 client_cb(client_opaque, as, ASIO_STATE_FAILED); 1131 _async_socket_free(as); 1132 return NULL; 1133 } 1134 as->owns_looper = 1; 1135 } else { 1136 as->looper = looper; 1137 as->owns_looper = 0; 1138 } 1139 1140 loopTimer_init(as->reconnect_timer, as->looper, _on_async_socket_reconnect, as); 1141 1142 T("ASocket %s: Descriptor is created.", _async_socket_string(as)); 1143 1144 return as; 1145 } 1146 1147 int 1148 async_socket_reference(AsyncSocket* as) 1149 { 1150 assert(as->ref_count > 0); 1151 as->ref_count++; 1152 return as->ref_count; 1153 } 1154 1155 int 1156 async_socket_release(AsyncSocket* as) 1157 { 1158 assert(as->ref_count > 0); 1159 as->ref_count--; 1160 if (as->ref_count == 0) { 1161 /* Last reference has been dropped. Destroy this object. */ 1162 _async_socket_cancel_all_io(as); 1163 _async_socket_free(as); 1164 return 0; 1165 } 1166 return as->ref_count; 1167 } 1168 1169 void 1170 async_socket_connect(AsyncSocket* as, int retry_to) 1171 { 1172 T("ASocket %s: Handling connection request for %dms...", 1173 _async_socket_string(as), retry_to); 1174 1175 AsyncSocketConnector* const connector = 1176 async_socket_connector_new(&as->address, retry_to, _on_connector_events, 1177 as, as->looper); 1178 if (connector != NULL) { 1179 async_socket_connector_connect(connector); 1180 } else { 1181 as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED); 1182 } 1183 } 1184 1185 void 1186 async_socket_disconnect(AsyncSocket* as) 1187 { 1188 T("ASocket %s: Handling disconnection request...", _async_socket_string(as)); 1189 1190 if (as != NULL) { 1191 _async_socket_cancel_all_io(as); 1192 _async_socket_close_socket(as); 1193 } 1194 } 1195 1196 void 1197 async_socket_reconnect(AsyncSocket* as, int retry_to) 1198 { 1199 T("ASocket %s: Handling reconnection request for %dms...", 1200 _async_socket_string(as), retry_to); 1201 1202 _async_socket_cancel_all_io(as); 1203 _async_socket_close_socket(as); 1204 _async_socket_reconnect(as, retry_to); 1205 } 1206 1207 void 1208 async_socket_read_abs(AsyncSocket* as, 1209 void* buffer, uint32_t len, 1210 on_as_io_cb reader_cb, 1211 void* reader_opaque, 1212 Duration deadline) 1213 { 1214 T("ASocket %s: Handling read for %d bytes with deadline %lld...", 1215 _async_socket_string(as), len, deadline); 1216 1217 AsyncSocketIO* const asr = 1218 _async_socket_reader_new(as, buffer, len, reader_cb, reader_opaque, 1219 deadline); 1220 /* Add new reader to the list. Note that we use initial reference from I/O 1221 * 'new' routine as "in the list" reference counter. */ 1222 if (as->readers_head == NULL) { 1223 as->readers_head = as->readers_tail = asr; 1224 } else { 1225 as->readers_tail->next = asr; 1226 as->readers_tail = asr; 1227 } 1228 loopIo_wantRead(as->io); 1229 } 1230 1231 void 1232 async_socket_read_rel(AsyncSocket* as, 1233 void* buffer, uint32_t len, 1234 on_as_io_cb reader_cb, 1235 void* reader_opaque, 1236 int to) 1237 { 1238 const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to : 1239 DURATION_INFINITE; 1240 async_socket_read_abs(as, buffer, len, reader_cb, reader_opaque, dl); 1241 } 1242 1243 void 1244 async_socket_write_abs(AsyncSocket* as, 1245 const void* buffer, uint32_t len, 1246 on_as_io_cb writer_cb, 1247 void* writer_opaque, 1248 Duration deadline) 1249 { 1250 T("ASocket %s: Handling write for %d bytes with deadline %lld...", 1251 _async_socket_string(as), len, deadline); 1252 1253 AsyncSocketIO* const asw = 1254 _async_socket_writer_new(as, buffer, len, writer_cb, writer_opaque, 1255 deadline); 1256 /* Add new writer to the list. Note that we use initial reference from I/O 1257 * 'new' routine as "in the list" reference counter. */ 1258 if (as->writers_head == NULL) { 1259 as->writers_head = as->writers_tail = asw; 1260 } else { 1261 as->writers_tail->next = asw; 1262 as->writers_tail = asw; 1263 } 1264 loopIo_wantWrite(as->io); 1265 } 1266 1267 void 1268 async_socket_write_rel(AsyncSocket* as, 1269 const void* buffer, uint32_t len, 1270 on_as_io_cb writer_cb, 1271 void* writer_opaque, 1272 int to) 1273 { 1274 const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to : 1275 DURATION_INFINITE; 1276 async_socket_write_abs(as, buffer, len, writer_cb, writer_opaque, dl); 1277 } 1278 1279 void* 1280 async_socket_get_client_opaque(const AsyncSocket* as) 1281 { 1282 return as->client_opaque; 1283 } 1284 1285 Duration 1286 async_socket_deadline(AsyncSocket* as, int rel) 1287 { 1288 return (rel >= 0) ? looper_now(_async_socket_get_looper(as)) + rel : 1289 DURATION_INFINITE; 1290 } 1291 1292 int 1293 async_socket_get_port(const AsyncSocket* as) 1294 { 1295 return sock_address_get_port(&as->address); 1296 } 1297