Home | History | Annotate | Download | only in android
      1 /*
      2  * Copyright (C) 2012 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 /*
     18  * Encapsulates exchange protocol between the emulator, and an Android device
     19  * that is connected to the host via USB. The communication is established over
     20  * a TCP port forwarding, enabled by ADB.
     21  */
     22 
     23 #include "android/utils/debug.h"
     24 #include "android/async-socket-connector.h"
     25 #include "android/async-socket.h"
     26 #include "utils/panic.h"
     27 #include "iolooper.h"
     28 
     29 #define  E(...)    derror(__VA_ARGS__)
     30 #define  W(...)    dwarning(__VA_ARGS__)
     31 #define  D(...)    VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
     32 #define  D_ACTIVE  VERBOSE_CHECK(asyncsocket)
     33 
     34 #define TRACE_ON    0
     35 
     36 #if TRACE_ON
     37 #define  T(...)    VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
     38 #else
     39 #define  T(...)
     40 #endif
     41 
     42 /********************************************************************************
     43  *                  Asynchronous Socket internal API declarations
     44  *******************************************************************************/
     45 
     46 /* Gets socket's address string. */
     47 static const char* _async_socket_string(AsyncSocket* as);
     48 
     49 /* Gets socket's looper. */
     50 static Looper* _async_socket_get_looper(AsyncSocket* as);
     51 
     52 /* Handler for the I/O time out.
     53  * Param:
     54  *  as - Asynchronous socket for the I/O.
     55  *  asio - Desciptor for the timed out I/O.
     56  */
     57 static AsyncIOAction _async_socket_io_timed_out(AsyncSocket* as,
     58                                                 AsyncSocketIO* asio);
     59 
     60 /********************************************************************************
     61  *                  Asynchronous Socket Reader / Writer
     62  *******************************************************************************/
     63 
     64 struct AsyncSocketIO {
     65     /* Next I/O in the reader, or writer list. */
     66     AsyncSocketIO*      next;
     67     /* Asynchronous socket for this I/O. */
     68     AsyncSocket*        as;
     69     /* Timer used for time outs on this I/O. */
     70     LoopTimer           timer[1];
     71     /* An opaque pointer associated with this I/O. */
     72     void*               io_opaque;
     73     /* Buffer where to read / write data. */
     74     uint8_t*            buffer;
     75     /* Bytes to transfer through the socket for this I/O. */
     76     uint32_t            to_transfer;
     77     /* Bytes thransferred through the socket in this I/O. */
     78     uint32_t            transferred;
     79     /* I/O callback for this I/O. */
     80     on_as_io_cb         on_io;
     81     /* I/O type selector: 1 - read, 0 - write. */
     82     int                 is_io_read;
     83     /* State of the I/O. */
     84     AsyncIOState        state;
     85     /* Number of outstanding references to the I/O. */
     86     int                 ref_count;
     87     /* Deadline for this I/O */
     88     Duration            deadline;
     89 };
     90 
     91 /*
     92  * Recycling I/O instances.
     93  * Since AsyncSocketIO instances are not that large, it makes sence to recycle
     94  * them for faster allocation, rather than allocating and freeing them for each
     95  * I/O on the socket.
     96  */
     97 
     98 /* List of recycled I/O descriptors. */
     99 static AsyncSocketIO* _asio_recycled    = NULL;
    100 /* Number of I/O descriptors that are recycled in the _asio_recycled list. */
    101 static int _recycled_asio_count         = 0;
    102 /* Maximum number of I/O descriptors that can be recycled. */
    103 static const int _max_recycled_asio_num = 32;
    104 
    105 /* Handler for an I/O time-out timer event.
    106  * When this routine is invoked, it indicates that a time out has occurred on an
    107  * I/O.
    108  * Param:
    109  *  opaque - AsyncSocketIO instance representing the timed out I/O.
    110  */
    111 static void _on_async_socket_io_timed_out(void* opaque);
    112 
    113 /* Creates new I/O descriptor.
    114  * Param:
    115  *  as - Asynchronous socket for the I/O.
    116  *  is_io_read - I/O type selector: 1 - read, 0 - write.
    117  *  buffer, len - Reader / writer buffer address.
    118  *  io_cb - Callback for this reader / writer.
    119  *  io_opaque - An opaque pointer associated with the I/O.
    120  *  deadline - Deadline to complete the I/O.
    121  * Return:
    122  *  Initialized AsyncSocketIO instance.
    123  */
    124 static AsyncSocketIO*
    125 _async_socket_rw_new(AsyncSocket* as,
    126                      int is_io_read,
    127                      void* buffer,
    128                      uint32_t len,
    129                      on_as_io_cb io_cb,
    130                      void* io_opaque,
    131                      Duration deadline)
    132 {
    133     /* Lookup in the recycler first. */
    134     AsyncSocketIO* asio = _asio_recycled;
    135     if (asio != NULL) {
    136         /* Pull the descriptor from recycler. */
    137         _asio_recycled = asio->next;
    138         _recycled_asio_count--;
    139     } else {
    140         /* No recycled descriptors. Allocate new one. */
    141         ANEW0(asio);
    142     }
    143 
    144     asio->next          = NULL;
    145     asio->as            = as;
    146     asio->is_io_read    = is_io_read;
    147     asio->buffer        = (uint8_t*)buffer;
    148     asio->to_transfer   = len;
    149     asio->transferred   = 0;
    150     asio->on_io         = io_cb;
    151     asio->io_opaque     = io_opaque;
    152     asio->state         = ASIO_STATE_QUEUED;
    153     asio->ref_count     = 1;
    154     asio->deadline      = deadline;
    155     loopTimer_init(asio->timer, _async_socket_get_looper(as),
    156                    _on_async_socket_io_timed_out, asio);
    157     loopTimer_startAbsolute(asio->timer, deadline);
    158 
    159     /* Reference socket that is holding this I/O. */
    160     async_socket_reference(as);
    161 
    162     T("ASocket %s: %s I/O descriptor %p is created for %d bytes of data",
    163       _async_socket_string(as), is_io_read ? "READ" : "WRITE", asio, len);
    164 
    165     return asio;
    166 }
    167 
    168 /* Destroys and frees I/O descriptor. */
    169 static void
    170 _async_socket_io_free(AsyncSocketIO* asio)
    171 {
    172     AsyncSocket* const as = asio->as;
    173 
    174     T("ASocket %s: %s I/O descriptor %p is destroyed.",
    175       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
    176 
    177     loopTimer_done(asio->timer);
    178 
    179     /* Try to recycle it first, and free the memory if recycler is full. */
    180     if (_recycled_asio_count < _max_recycled_asio_num) {
    181         asio->next = _asio_recycled;
    182         _asio_recycled = asio;
    183         _recycled_asio_count++;
    184     } else {
    185         AFREE(asio);
    186     }
    187 
    188     /* Release socket that is holding this I/O. */
    189     async_socket_release(as);
    190 }
    191 
    192 /* An I/O has been finished and its descriptor is about to be discarded. */
    193 static void
    194 _async_socket_io_finished(AsyncSocketIO* asio)
    195 {
    196     /* Notify the client of the I/O that I/O is finished. */
    197     asio->on_io(asio->io_opaque, asio, ASIO_STATE_FINISHED);
    198 }
    199 
    200 int
    201 async_socket_io_reference(AsyncSocketIO* asio)
    202 {
    203     assert(asio->ref_count > 0);
    204     asio->ref_count++;
    205     return asio->ref_count;
    206 }
    207 
    208 int
    209 async_socket_io_release(AsyncSocketIO* asio)
    210 {
    211     assert(asio->ref_count > 0);
    212     asio->ref_count--;
    213     if (asio->ref_count == 0) {
    214         _async_socket_io_finished(asio);
    215         /* Last reference has been dropped. Destroy this object. */
    216         _async_socket_io_free(asio);
    217         return 0;
    218     }
    219     return asio->ref_count;
    220 }
    221 
    222 /* Creates new asynchronous socket reader.
    223  * Param:
    224  *  as - Asynchronous socket for the reader.
    225  *  buffer, len - Reader's buffer.
    226  *  io_cb - Reader's callback.
    227  *  reader_opaque - An opaque pointer associated with the reader.
    228  *  deadline - Deadline to complete the operation.
    229  * Return:
    230  *  An initialized AsyncSocketIO intance.
    231  */
    232 static AsyncSocketIO*
    233 _async_socket_reader_new(AsyncSocket* as,
    234                          void* buffer,
    235                          uint32_t len,
    236                          on_as_io_cb io_cb,
    237                          void* reader_opaque,
    238                          Duration deadline)
    239 {
    240     AsyncSocketIO* const asio = _async_socket_rw_new(as, 1, buffer, len, io_cb,
    241                                                      reader_opaque, deadline);
    242     return asio;
    243 }
    244 
    245 /* Creates new asynchronous socket writer.
    246  * Param:
    247  *  as - Asynchronous socket for the writer.
    248  *  buffer, len - Writer's buffer.
    249  *  io_cb - Writer's callback.
    250  *  writer_opaque - An opaque pointer associated with the writer.
    251  *  deadline - Deadline to complete the operation.
    252  * Return:
    253  *  An initialized AsyncSocketIO intance.
    254  */
    255 static AsyncSocketIO*
    256 _async_socket_writer_new(AsyncSocket* as,
    257                          const void* buffer,
    258                          uint32_t len,
    259                          on_as_io_cb io_cb,
    260                          void* writer_opaque,
    261                          Duration deadline)
    262 {
    263     AsyncSocketIO* const asio = _async_socket_rw_new(as, 0, (void*)buffer, len,
    264                                                      io_cb, writer_opaque,
    265                                                      deadline);
    266     return asio;
    267 }
    268 
    269 /* I/O timed out. */
    270 static void
    271 _on_async_socket_io_timed_out(void* opaque)
    272 {
    273     AsyncSocketIO* const asio = (AsyncSocketIO*)opaque;
    274     AsyncSocket* const as = asio->as;
    275 
    276     D("ASocket %s: %s I/O with deadline %lld has timed out at %lld",
    277       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
    278       asio->deadline, async_socket_deadline(as, 0));
    279 
    280     /* Reference while in callback. */
    281     async_socket_io_reference(asio);
    282     _async_socket_io_timed_out(asio->as, asio);
    283     async_socket_io_release(asio);
    284 }
    285 
    286 /********************************************************************************
    287  *                 Public Asynchronous Socket I/O API
    288  *******************************************************************************/
    289 
    290 AsyncSocket*
    291 async_socket_io_get_socket(const AsyncSocketIO* asio)
    292 {
    293     async_socket_reference(asio->as);
    294     return asio->as;
    295 }
    296 
    297 void
    298 async_socket_io_cancel_time_out(AsyncSocketIO* asio)
    299 {
    300     loopTimer_stop(asio->timer);
    301 }
    302 
    303 void*
    304 async_socket_io_get_io_opaque(const AsyncSocketIO* asio)
    305 {
    306     return asio->io_opaque;
    307 }
    308 
    309 void*
    310 async_socket_io_get_client_opaque(const AsyncSocketIO* asio)
    311 {
    312     return async_socket_get_client_opaque(asio->as);
    313 }
    314 
    315 void*
    316 async_socket_io_get_buffer_info(const AsyncSocketIO* asio,
    317                                 uint32_t* transferred,
    318                                 uint32_t* to_transfer)
    319 {
    320     if (transferred != NULL) {
    321         *transferred = asio->transferred;
    322     }
    323     if (to_transfer != NULL) {
    324         *to_transfer = asio->to_transfer;
    325     }
    326     return asio->buffer;
    327 }
    328 
    329 void*
    330 async_socket_io_get_buffer(const AsyncSocketIO* asio)
    331 {
    332     return asio->buffer;
    333 }
    334 
    335 uint32_t
    336 async_socket_io_get_transferred(const AsyncSocketIO* asio)
    337 {
    338     return asio->transferred;
    339 }
    340 
    341 uint32_t
    342 async_socket_io_get_to_transfer(const AsyncSocketIO* asio)
    343 {
    344     return asio->to_transfer;
    345 }
    346 
    347 int
    348 async_socket_io_is_read(const AsyncSocketIO* asio)
    349 {
    350     return asio->is_io_read;
    351 }
    352 
    353 /********************************************************************************
    354  *                      Asynchronous Socket internals
    355  *******************************************************************************/
    356 
    357 struct AsyncSocket {
    358     /* TCP address for the socket. */
    359     SockAddress         address;
    360     /* Connection callback for this socket. */
    361     on_as_connection_cb on_connection;
    362     /* An opaque pointer associated with this socket by the client. */
    363     void*               client_opaque;
    364     /* I/O looper for asynchronous I/O on the socket. */
    365     Looper*             looper;
    366     /* I/O descriptor for asynchronous I/O on the socket. */
    367     LoopIo              io[1];
    368     /* Timer to use for reconnection attempts. */
    369     LoopTimer           reconnect_timer[1];
    370     /* Head of the list of the active readers. */
    371     AsyncSocketIO*      readers_head;
    372     /* Tail of the list of the active readers. */
    373     AsyncSocketIO*      readers_tail;
    374     /* Head of the list of the active writers. */
    375     AsyncSocketIO*      writers_head;
    376     /* Tail of the list of the active writers. */
    377     AsyncSocketIO*      writers_tail;
    378     /* Socket's file descriptor. */
    379     int                 fd;
    380     /* Timeout to use for reconnection attempts. */
    381     int                 reconnect_to;
    382     /* Number of outstanding references to the socket. */
    383     int                 ref_count;
    384     /* Flags whether (1) or not (0) socket owns the looper. */
    385     int                 owns_looper;
    386 };
    387 
    388 static const char*
    389 _async_socket_string(AsyncSocket* as)
    390 {
    391     return sock_address_to_string(&as->address);
    392 }
    393 
    394 static Looper*
    395 _async_socket_get_looper(AsyncSocket* as)
    396 {
    397     return as->looper;
    398 }
    399 
    400 /* Pulls first reader out of the list.
    401  * Param:
    402  *  as - Initialized AsyncSocket instance.
    403  * Return:
    404  *  First I/O pulled out of the list, or NULL if there are no I/O in the list.
    405  *  Note that the caller is responsible for releasing the I/O object returned
    406  *  from this routine.
    407  */
    408 static AsyncSocketIO*
    409 _async_socket_pull_first_io(AsyncSocket* as,
    410                             AsyncSocketIO** list_head,
    411                             AsyncSocketIO** list_tail)
    412 {
    413     AsyncSocketIO* const ret = *list_head;
    414     if (ret != NULL) {
    415         *list_head = ret->next;
    416         ret->next = NULL;
    417         if (*list_head == NULL) {
    418             *list_tail = NULL;
    419         }
    420     }
    421     return ret;
    422 }
    423 
    424 /* Pulls first reader out of the list.
    425  * Param:
    426  *  as - Initialized AsyncSocket instance.
    427  * Return:
    428  *  First reader pulled out of the list, or NULL if there are no readers in the
    429  *  list.
    430  *  Note that the caller is responsible for releasing the I/O object returned
    431  *  from this routine.
    432  */
    433 static AsyncSocketIO*
    434 _async_socket_pull_first_reader(AsyncSocket* as)
    435 {
    436     return _async_socket_pull_first_io(as, &as->readers_head, &as->readers_tail);
    437 }
    438 
    439 /* Pulls first writer out of the list.
    440  * Param:
    441  *  as - Initialized AsyncSocket instance.
    442  * Return:
    443  *  First writer pulled out of the list, or NULL if there are no writers in the
    444  *  list.
    445  *  Note that the caller is responsible for releasing the I/O object returned
    446  *  from this routine.
    447  */
    448 static AsyncSocketIO*
    449 _async_socket_pull_first_writer(AsyncSocket* as)
    450 {
    451     return _async_socket_pull_first_io(as, &as->writers_head, &as->writers_tail);
    452 }
    453 
    454 /* Removes an I/O descriptor from a list of active I/O.
    455  * Param:
    456  *  as - Initialized AsyncSocket instance.
    457  *  list_head, list_tail - Pointers to the list head and tail.
    458  *  io - I/O to remove.
    459  * Return:
    460  *  Boolean: 1 if I/O has been removed, or 0 if I/O has not been found in the list.
    461  */
    462 static int
    463 _async_socket_remove_io(AsyncSocket* as,
    464                         AsyncSocketIO** list_head,
    465                         AsyncSocketIO** list_tail,
    466                         AsyncSocketIO* io)
    467 {
    468     AsyncSocketIO* prev = NULL;
    469 
    470     while (*list_head != NULL && io != *list_head) {
    471         prev = *list_head;
    472         list_head = &((*list_head)->next);
    473     }
    474     if (*list_head == NULL) {
    475         D("%s: I/O %p is not found in the list for socket '%s'",
    476           __FUNCTION__, io, _async_socket_string(as));
    477         return 0;
    478     }
    479 
    480     *list_head = io->next;
    481     if (prev != NULL) {
    482         prev->next = io->next;
    483     }
    484     if (*list_tail == io) {
    485         *list_tail = prev;
    486     }
    487 
    488     /* Release I/O adjusting reference added when I/O has been saved in the list. */
    489     async_socket_io_release(io);
    490 
    491     return 1;
    492 }
    493 
    494 /* Advances to the next I/O in the list.
    495  * Param:
    496  *  as - Initialized AsyncSocket instance.
    497  *  list_head, list_tail - Pointers to the list head and tail.
    498  */
    499 static void
    500 _async_socket_advance_io(AsyncSocket* as,
    501                          AsyncSocketIO** list_head,
    502                          AsyncSocketIO** list_tail)
    503 {
    504     AsyncSocketIO* first_io = *list_head;
    505     if (first_io != NULL) {
    506         *list_head = first_io->next;
    507         first_io->next = NULL;
    508     }
    509     if (*list_head == NULL) {
    510         *list_tail = NULL;
    511     }
    512     if (first_io != NULL) {
    513         /* Release I/O removed from the head of the list. */
    514         async_socket_io_release(first_io);
    515     }
    516 }
    517 
    518 /* Advances to the next reader in the list.
    519  * Param:
    520  *  as - Initialized AsyncSocket instance.
    521  */
    522 static void
    523 _async_socket_advance_reader(AsyncSocket* as)
    524 {
    525     _async_socket_advance_io(as, &as->readers_head, &as->readers_tail);
    526 }
    527 
    528 /* Advances to the next writer in the list.
    529  * Param:
    530  *  as - Initialized AsyncSocket instance.
    531  */
    532 static void
    533 _async_socket_advance_writer(AsyncSocket* as)
    534 {
    535     _async_socket_advance_io(as, &as->writers_head, &as->writers_tail);
    536 }
    537 
    538 /* Completes an I/O.
    539  * Param:
    540  *  as - Initialized AsyncSocket instance.
    541  *  asio - I/O to complete.
    542  * Return:
    543  *  One of AsyncIOAction values.
    544  */
    545 static AsyncIOAction
    546 _async_socket_complete_io(AsyncSocket* as, AsyncSocketIO* asio)
    547 {
    548     T("ASocket %s: %s I/O %p is completed.",
    549       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
    550 
    551     /* Stop the timer. */
    552     async_socket_io_cancel_time_out(asio);
    553 
    554     return asio->on_io(asio->io_opaque, asio, ASIO_STATE_SUCCEEDED);
    555 }
    556 
    557 /* Timeouts an I/O.
    558  * Param:
    559  *  as - Initialized AsyncSocket instance.
    560  *  asio - An I/O that has timed out.
    561  * Return:
    562  *  One of AsyncIOAction values.
    563  */
    564 static AsyncIOAction
    565 _async_socket_io_timed_out(AsyncSocket* as, AsyncSocketIO* asio)
    566 {
    567     T("ASocket %s: %s I/O %p with deadline %lld has timed out at %lld",
    568       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
    569       asio->deadline, async_socket_deadline(as, 0));
    570 
    571     /* Report to the client. */
    572     const AsyncIOAction action = asio->on_io(asio->io_opaque, asio,
    573                                              ASIO_STATE_TIMED_OUT);
    574 
    575     /* Remove the I/O from a list of active I/O for actions other than retry. */
    576     if (action != ASIO_ACTION_RETRY) {
    577         if (asio->is_io_read) {
    578             _async_socket_remove_io(as, &as->readers_head, &as->readers_tail, asio);
    579         } else {
    580             _async_socket_remove_io(as, &as->writers_head, &as->writers_tail, asio);
    581         }
    582     }
    583 
    584     return action;
    585 }
    586 
    587 /* Cancels an I/O.
    588  * Param:
    589  *  as - Initialized AsyncSocket instance.
    590  *  asio - An I/O to cancel.
    591  * Return:
    592  *  One of AsyncIOAction values.
    593  */
    594 static AsyncIOAction
    595 _async_socket_cancel_io(AsyncSocket* as, AsyncSocketIO* asio)
    596 {
    597     T("ASocket %s: %s I/O %p is cancelled.",
    598       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
    599 
    600     /* Stop the timer. */
    601     async_socket_io_cancel_time_out(asio);
    602 
    603     return asio->on_io(asio->io_opaque, asio, ASIO_STATE_CANCELLED);
    604 }
    605 
    606 /* Reports an I/O failure.
    607  * Param:
    608  *  as - Initialized AsyncSocket instance.
    609  *  asio - An I/O that has failed. Can be NULL for general failures.
    610  *  failure - Failure (errno) that has occurred.
    611  * Return:
    612  *  One of AsyncIOAction values.
    613  */
    614 static AsyncIOAction
    615 _async_socket_io_failure(AsyncSocket* as, AsyncSocketIO* asio, int failure)
    616 {
    617     T("ASocket %s: %s I/O %p has failed: %d -> %s",
    618       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
    619       failure, strerror(failure));
    620 
    621     /* Stop the timer. */
    622     async_socket_io_cancel_time_out(asio);
    623 
    624     errno = failure;
    625     return asio->on_io(asio->io_opaque, asio, ASIO_STATE_FAILED);
    626 }
    627 
    628 /* Cancels all the active socket readers.
    629  * Param:
    630  *  as - Initialized AsyncSocket instance.
    631  */
    632 static void
    633 _async_socket_cancel_readers(AsyncSocket* as)
    634 {
    635     while (as->readers_head != NULL) {
    636         AsyncSocketIO* const to_cancel = _async_socket_pull_first_reader(as);
    637         /* We ignore action returned from the cancellation callback, since we're
    638          * in a disconnected state here. */
    639         _async_socket_cancel_io(as, to_cancel);
    640         async_socket_io_release(to_cancel);
    641     }
    642 }
    643 
    644 /* Cancels all the active socket writers.
    645  * Param:
    646  *  as - Initialized AsyncSocket instance.
    647  */
    648 static void
    649 _async_socket_cancel_writers(AsyncSocket* as)
    650 {
    651     while (as->writers_head != NULL) {
    652         AsyncSocketIO* const to_cancel = _async_socket_pull_first_writer(as);
    653         /* We ignore action returned from the cancellation callback, since we're
    654          * in a disconnected state here. */
    655         _async_socket_cancel_io(as, to_cancel);
    656         async_socket_io_release(to_cancel);
    657     }
    658 }
    659 
    660 /* Cancels all the I/O on the socket. */
    661 static void
    662 _async_socket_cancel_all_io(AsyncSocket* as)
    663 {
    664     /* Stop the reconnection timer. */
    665     loopTimer_stop(as->reconnect_timer);
    666 
    667     /* Stop read / write on the socket. */
    668     loopIo_dontWantWrite(as->io);
    669     loopIo_dontWantRead(as->io);
    670 
    671     /* Cancel active readers and writers. */
    672     _async_socket_cancel_readers(as);
    673     _async_socket_cancel_writers(as);
    674 }
    675 
    676 /* Closes socket handle used by the async socket.
    677  * Param:
    678  *  as - Initialized AsyncSocket instance.
    679  */
    680 static void
    681 _async_socket_close_socket(AsyncSocket* as)
    682 {
    683     if (as->fd >= 0) {
    684         T("ASocket %s: Socket handle %d is closed.",
    685           _async_socket_string(as), as->fd);
    686         loopIo_done(as->io);
    687         socket_close(as->fd);
    688         as->fd = -1;
    689     }
    690 }
    691 
    692 /* Destroys AsyncSocket instance.
    693  * Param:
    694  *  as - Initialized AsyncSocket instance.
    695  */
    696 static void
    697 _async_socket_free(AsyncSocket* as)
    698 {
    699     if (as != NULL) {
    700         T("ASocket %s: Socket descriptor is destroyed.", _async_socket_string(as));
    701 
    702         /* Close socket. */
    703         _async_socket_close_socket(as);
    704 
    705         /* Free allocated resources. */
    706         if (as->looper != NULL) {
    707             loopTimer_done(as->reconnect_timer);
    708             if (as->owns_looper) {
    709                 looper_free(as->looper);
    710             }
    711         }
    712         sock_address_done(&as->address);
    713         AFREE(as);
    714     }
    715 }
    716 
    717 /* Starts reconnection attempts after connection has been lost.
    718  * Param:
    719  *  as - Initialized AsyncSocket instance.
    720  *  to - Milliseconds to wait before reconnection attempt.
    721  */
    722 static void
    723 _async_socket_reconnect(AsyncSocket* as, int to)
    724 {
    725     T("ASocket %s: reconnecting in %dms...", _async_socket_string(as), to);
    726 
    727     /* Make sure that no I/O is active, and socket is closed before we
    728      * reconnect. */
    729     _async_socket_cancel_all_io(as);
    730 
    731     /* Set the timer for reconnection attempt. */
    732     loopTimer_startRelative(as->reconnect_timer, to);
    733 }
    734 
    735 /********************************************************************************
    736  *                      Asynchronous Socket callbacks
    737  *******************************************************************************/
    738 
    739 /* A callback that is invoked when socket gets disconnected.
    740  * Param:
    741  *  as - Initialized AsyncSocket instance.
    742  */
    743 static void
    744 _on_async_socket_disconnected(AsyncSocket* as)
    745 {
    746     /* Save error to restore it for the client's callback. */
    747     const int save_errno = errno;
    748     AsyncIOAction action = ASIO_ACTION_ABORT;
    749 
    750     D("ASocket %s: Disconnected.", _async_socket_string(as));
    751 
    752     /* Cancel all the I/O on this socket. */
    753     _async_socket_cancel_all_io(as);
    754 
    755     /* Close the socket. */
    756     _async_socket_close_socket(as);
    757 
    758     /* Restore errno, and invoke client's callback. */
    759     errno = save_errno;
    760     action = as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
    761 
    762     if (action == ASIO_ACTION_RETRY) {
    763         /* Client requested reconnection. */
    764         _async_socket_reconnect(as, as->reconnect_to);
    765     }
    766 }
    767 
    768 /* A callback that is invoked on socket's I/O failure.
    769  * Param:
    770  *  as - Initialized AsyncSocket instance.
    771  *  asio - Descriptor for the failed I/O. Can be NULL for general failures.
    772  */
    773 static AsyncIOAction
    774 _on_async_socket_failure(AsyncSocket* as, AsyncSocketIO* asio)
    775 {
    776     D("ASocket %s: %s I/O failure: %d -> %s",
    777       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
    778       errno, strerror(errno));
    779 
    780     /* Report the failure. */
    781     return _async_socket_io_failure(as, asio, errno);
    782 }
    783 
    784 /* A callback that is invoked when there is data available to read.
    785  * Param:
    786  *  as - Initialized AsyncSocket instance.
    787  * Return:
    788  *  0 on success, or -1 on failure. Failure returned from this routine will
    789  *  skip writes (if awailable) behind this read.
    790  */
    791 static int
    792 _on_async_socket_recv(AsyncSocket* as)
    793 {
    794     AsyncIOAction action;
    795 
    796     /* Get current reader. */
    797     AsyncSocketIO* const asr = as->readers_head;
    798     if (asr == NULL) {
    799         D("ASocket %s: No reader is available.", _async_socket_string(as));
    800         loopIo_dontWantRead(as->io);
    801         return 0;
    802     }
    803 
    804     /* Reference the reader while we're working with it in this callback. */
    805     async_socket_io_reference(asr);
    806 
    807     /* Bump I/O state, and inform the client that I/O is in progress. */
    808     if (asr->state == ASIO_STATE_QUEUED) {
    809         asr->state = ASIO_STATE_STARTED;
    810     } else {
    811         asr->state = ASIO_STATE_CONTINUES;
    812     }
    813     action = asr->on_io(asr->io_opaque, asr, asr->state);
    814     if (action == ASIO_ACTION_ABORT) {
    815         D("ASocket %s: Read is aborted by the client.", _async_socket_string(as));
    816         /* Move on to the next reader. */
    817         _async_socket_advance_reader(as);
    818         /* Lets see if there are still active readers, and enable, or disable
    819          * read I/O callback accordingly. */
    820         if (as->readers_head != NULL) {
    821             loopIo_wantRead(as->io);
    822         } else {
    823             loopIo_dontWantRead(as->io);
    824         }
    825         async_socket_io_release(asr);
    826         return 0;
    827     }
    828 
    829     /* Read next chunk of data. */
    830     int res = socket_recv(as->fd, asr->buffer + asr->transferred,
    831                           asr->to_transfer - asr->transferred);
    832     while (res < 0 && errno == EINTR) {
    833         res = socket_recv(as->fd, asr->buffer + asr->transferred,
    834                           asr->to_transfer - asr->transferred);
    835     }
    836 
    837     if (res == 0) {
    838         /* Socket has been disconnected. */
    839         errno = ECONNRESET;
    840         _on_async_socket_disconnected(as);
    841         async_socket_io_release(asr);
    842         return -1;
    843     }
    844 
    845     if (res < 0) {
    846         if (errno == EWOULDBLOCK || errno == EAGAIN) {
    847             /* Yield to writes behind this read. */
    848             loopIo_wantRead(as->io);
    849             async_socket_io_release(asr);
    850             return 0;
    851         }
    852 
    853         /* An I/O error. */
    854         action = _on_async_socket_failure(as, asr);
    855         if (action != ASIO_ACTION_RETRY) {
    856             D("ASocket %s: Read is aborted on failure.", _async_socket_string(as));
    857             /* Move on to the next reader. */
    858             _async_socket_advance_reader(as);
    859             /* Lets see if there are still active readers, and enable, or disable
    860              * read I/O callback accordingly. */
    861             if (as->readers_head != NULL) {
    862                 loopIo_wantRead(as->io);
    863             } else {
    864                 loopIo_dontWantRead(as->io);
    865             }
    866         }
    867         async_socket_io_release(asr);
    868         return -1;
    869     }
    870 
    871     /* Update the reader's descriptor. */
    872     asr->transferred += res;
    873     if (asr->transferred == asr->to_transfer) {
    874         /* This read is completed. Move on to the next reader. */
    875         _async_socket_advance_reader(as);
    876 
    877         /* Notify reader completion. */
    878         _async_socket_complete_io(as, asr);
    879     }
    880 
    881     /* Lets see if there are still active readers, and enable, or disable read
    882      * I/O callback accordingly. */
    883     if (as->readers_head != NULL) {
    884         loopIo_wantRead(as->io);
    885     } else {
    886         loopIo_dontWantRead(as->io);
    887     }
    888 
    889     async_socket_io_release(asr);
    890 
    891     return 0;
    892 }
    893 
    894 /* A callback that is invoked when there is data available to write.
    895  * Param:
    896  *  as - Initialized AsyncSocket instance.
    897  * Return:
    898  *  0 on success, or -1 on failure. Failure returned from this routine will
    899  *  skip reads (if awailable) behind this write.
    900  */
    901 static int
    902 _on_async_socket_send(AsyncSocket* as)
    903 {
    904     AsyncIOAction action;
    905 
    906     /* Get current writer. */
    907     AsyncSocketIO* const asw = as->writers_head;
    908     if (asw == NULL) {
    909         D("ASocket %s: No writer is available.", _async_socket_string(as));
    910         loopIo_dontWantWrite(as->io);
    911         return 0;
    912     }
    913 
    914     /* Reference the writer while we're working with it in this callback. */
    915     async_socket_io_reference(asw);
    916 
    917     /* Bump I/O state, and inform the client that I/O is in progress. */
    918     if (asw->state == ASIO_STATE_QUEUED) {
    919         asw->state = ASIO_STATE_STARTED;
    920     } else {
    921         asw->state = ASIO_STATE_CONTINUES;
    922     }
    923     action = asw->on_io(asw->io_opaque, asw, asw->state);
    924     if (action == ASIO_ACTION_ABORT) {
    925         D("ASocket %s: Write is aborted by the client.", _async_socket_string(as));
    926         /* Move on to the next writer. */
    927         _async_socket_advance_writer(as);
    928         /* Lets see if there are still active writers, and enable, or disable
    929          * write I/O callback accordingly. */
    930         if (as->writers_head != NULL) {
    931             loopIo_wantWrite(as->io);
    932         } else {
    933             loopIo_dontWantWrite(as->io);
    934         }
    935         async_socket_io_release(asw);
    936         return 0;
    937     }
    938 
    939     /* Write next chunk of data. */
    940     int res = socket_send(as->fd, asw->buffer + asw->transferred,
    941                           asw->to_transfer - asw->transferred);
    942     while (res < 0 && errno == EINTR) {
    943         res = socket_send(as->fd, asw->buffer + asw->transferred,
    944                           asw->to_transfer - asw->transferred);
    945     }
    946 
    947     if (res == 0) {
    948         /* Socket has been disconnected. */
    949         errno = ECONNRESET;
    950         _on_async_socket_disconnected(as);
    951         async_socket_io_release(asw);
    952         return -1;
    953     }
    954 
    955     if (res < 0) {
    956         if (errno == EWOULDBLOCK || errno == EAGAIN) {
    957             /* Yield to reads behind this write. */
    958             loopIo_wantWrite(as->io);
    959             async_socket_io_release(asw);
    960             return 0;
    961         }
    962 
    963         /* An I/O error. */
    964         action = _on_async_socket_failure(as, asw);
    965         if (action != ASIO_ACTION_RETRY) {
    966             D("ASocket %s: Write is aborted on failure.", _async_socket_string(as));
    967             /* Move on to the next writer. */
    968             _async_socket_advance_writer(as);
    969             /* Lets see if there are still active writers, and enable, or disable
    970              * write I/O callback accordingly. */
    971             if (as->writers_head != NULL) {
    972                 loopIo_wantWrite(as->io);
    973             } else {
    974                 loopIo_dontWantWrite(as->io);
    975             }
    976         }
    977         async_socket_io_release(asw);
    978         return -1;
    979     }
    980 
    981     /* Update the writer descriptor. */
    982     asw->transferred += res;
    983     if (asw->transferred == asw->to_transfer) {
    984         /* This write is completed. Move on to the next writer. */
    985         _async_socket_advance_writer(as);
    986 
    987         /* Notify writer completion. */
    988         _async_socket_complete_io(as, asw);
    989     }
    990 
    991     /* Lets see if there are still active writers, and enable, or disable write
    992      * I/O callback accordingly. */
    993     if (as->writers_head != NULL) {
    994         loopIo_wantWrite(as->io);
    995     } else {
    996         loopIo_dontWantWrite(as->io);
    997     }
    998 
    999     async_socket_io_release(asw);
   1000 
   1001     return 0;
   1002 }
   1003 
   1004 /* A callback that is invoked when an I/O is available on socket.
   1005  * Param:
   1006  *  as - Initialized AsyncSocket instance.
   1007  *  fd - Socket's file descriptor.
   1008  *  events - LOOP_IO_READ | LOOP_IO_WRITE bitmask.
   1009  */
   1010 static void
   1011 _on_async_socket_io(void* opaque, int fd, unsigned events)
   1012 {
   1013     AsyncSocket* const as = (AsyncSocket*)opaque;
   1014 
   1015     /* Reference the socket while we're working with it in this callback. */
   1016     async_socket_reference(as);
   1017 
   1018     if ((events & LOOP_IO_READ) != 0) {
   1019         if (_on_async_socket_recv(as) != 0) {
   1020             async_socket_release(as);
   1021             return;
   1022         }
   1023     }
   1024 
   1025     if ((events & LOOP_IO_WRITE) != 0) {
   1026         if (_on_async_socket_send(as) != 0) {
   1027             async_socket_release(as);
   1028             return;
   1029         }
   1030     }
   1031 
   1032     async_socket_release(as);
   1033 }
   1034 
   1035 /* A callback that is invoked by asynchronous socket connector on connection
   1036  *  events.
   1037  * Param:
   1038  *  opaque - Initialized AsyncSocket instance.
   1039  *  connector - Connector that is used to connect this socket.
   1040  *  event - Connection event.
   1041  * Return:
   1042  *  One of AsyncIOAction values.
   1043  */
   1044 static AsyncIOAction
   1045 _on_connector_events(void* opaque,
   1046                      AsyncSocketConnector* connector,
   1047                      AsyncIOState event)
   1048 {
   1049     AsyncIOAction action;
   1050     AsyncSocket* const as = (AsyncSocket*)opaque;
   1051 
   1052     /* Reference the socket while we're working with it in this callback. */
   1053     async_socket_reference(as);
   1054 
   1055     if (event == ASIO_STATE_SUCCEEDED) {
   1056         /* Accept the connection. */
   1057         as->fd = async_socket_connector_pull_fd(connector);
   1058         loopIo_init(as->io, as->looper, as->fd, _on_async_socket_io, as);
   1059     }
   1060 
   1061     /* Invoke client's callback. */
   1062     action = as->on_connection(as->client_opaque, as, event);
   1063     if (event == ASIO_STATE_SUCCEEDED && action != ASIO_ACTION_DONE) {
   1064         /* For whatever reason the client didn't want to keep this connection.
   1065          * Close it. */
   1066         D("ASocket %s: Connection is discarded by the client.",
   1067           _async_socket_string(as));
   1068         _async_socket_close_socket(as);
   1069     }
   1070 
   1071     if (action != ASIO_ACTION_RETRY) {
   1072         async_socket_connector_release(connector);
   1073     }
   1074 
   1075     async_socket_release(as);
   1076 
   1077     return action;
   1078 }
   1079 
   1080 /* Timer callback invoked to reconnect the lost connection.
   1081  * Param:
   1082  *  as - Initialized AsyncSocket instance.
   1083  */
   1084 void
   1085 _on_async_socket_reconnect(void* opaque)
   1086 {
   1087     AsyncSocket* as = (AsyncSocket*)opaque;
   1088 
   1089     /* Reference the socket while we're working with it in this callback. */
   1090     async_socket_reference(as);
   1091     async_socket_connect(as, as->reconnect_to);
   1092     async_socket_release(as);
   1093 }
   1094 
   1095 
   1096 /********************************************************************************
   1097  *                  Android Device Socket public API
   1098  *******************************************************************************/
   1099 
   1100 AsyncSocket*
   1101 async_socket_new(int port,
   1102                  int reconnect_to,
   1103                  on_as_connection_cb client_cb,
   1104                  void* client_opaque,
   1105                  Looper* looper)
   1106 {
   1107     AsyncSocket* as;
   1108 
   1109     if (client_cb == NULL) {
   1110         E("Invalid client_cb parameter");
   1111         return NULL;
   1112     }
   1113 
   1114     ANEW0(as);
   1115 
   1116     as->fd = -1;
   1117     as->client_opaque = client_opaque;
   1118     as->on_connection = client_cb;
   1119     as->readers_head = as->readers_tail = NULL;
   1120     as->reconnect_to = reconnect_to;
   1121     as->ref_count = 1;
   1122     sock_address_init_inet(&as->address, SOCK_ADDRESS_INET_LOOPBACK, port);
   1123     if (looper == NULL) {
   1124         as->looper = looper_newCore();
   1125         if (as->looper == NULL) {
   1126             E("Unable to create I/O looper for async socket '%s'",
   1127               _async_socket_string(as));
   1128             client_cb(client_opaque, as, ASIO_STATE_FAILED);
   1129             _async_socket_free(as);
   1130             return NULL;
   1131         }
   1132         as->owns_looper = 1;
   1133     } else {
   1134         as->looper = looper;
   1135         as->owns_looper = 0;
   1136     }
   1137 
   1138     loopTimer_init(as->reconnect_timer, as->looper, _on_async_socket_reconnect, as);
   1139 
   1140     T("ASocket %s: Descriptor is created.", _async_socket_string(as));
   1141 
   1142     return as;
   1143 }
   1144 
   1145 int
   1146 async_socket_reference(AsyncSocket* as)
   1147 {
   1148     assert(as->ref_count > 0);
   1149     as->ref_count++;
   1150     return as->ref_count;
   1151 }
   1152 
   1153 int
   1154 async_socket_release(AsyncSocket* as)
   1155 {
   1156     assert(as->ref_count > 0);
   1157     as->ref_count--;
   1158     if (as->ref_count == 0) {
   1159         /* Last reference has been dropped. Destroy this object. */
   1160         _async_socket_cancel_all_io(as);
   1161         _async_socket_free(as);
   1162         return 0;
   1163     }
   1164     return as->ref_count;
   1165 }
   1166 
   1167 void
   1168 async_socket_connect(AsyncSocket* as, int retry_to)
   1169 {
   1170     T("ASocket %s: Handling connection request for %dms...",
   1171       _async_socket_string(as), retry_to);
   1172 
   1173     AsyncSocketConnector* const connector =
   1174         async_socket_connector_new(&as->address, retry_to, _on_connector_events,
   1175                                    as, as->looper);
   1176     if (connector != NULL) {
   1177         async_socket_connector_connect(connector);
   1178     } else {
   1179         as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
   1180     }
   1181 }
   1182 
   1183 void
   1184 async_socket_disconnect(AsyncSocket* as)
   1185 {
   1186     T("ASocket %s: Handling disconnection request...", _async_socket_string(as));
   1187 
   1188     if (as != NULL) {
   1189         _async_socket_cancel_all_io(as);
   1190         _async_socket_close_socket(as);
   1191     }
   1192 }
   1193 
   1194 void
   1195 async_socket_reconnect(AsyncSocket* as, int retry_to)
   1196 {
   1197     T("ASocket %s: Handling reconnection request for %dms...",
   1198       _async_socket_string(as), retry_to);
   1199 
   1200     _async_socket_cancel_all_io(as);
   1201     _async_socket_close_socket(as);
   1202     _async_socket_reconnect(as, retry_to);
   1203 }
   1204 
   1205 void
   1206 async_socket_read_abs(AsyncSocket* as,
   1207                       void* buffer, uint32_t len,
   1208                       on_as_io_cb reader_cb,
   1209                       void* reader_opaque,
   1210                       Duration deadline)
   1211 {
   1212     T("ASocket %s: Handling read for %d bytes with deadline %lld...",
   1213       _async_socket_string(as), len, deadline);
   1214 
   1215     AsyncSocketIO* const asr =
   1216         _async_socket_reader_new(as, buffer, len, reader_cb, reader_opaque,
   1217                                  deadline);
   1218     if (async_socket_is_connected(as)) {
   1219         /* Add new reader to the list. Note that we use initial reference from I/O
   1220          * 'new' routine as "in the list" reference counter. */
   1221         if (as->readers_head == NULL) {
   1222             as->readers_head = as->readers_tail = asr;
   1223         } else {
   1224             as->readers_tail->next = asr;
   1225             as->readers_tail = asr;
   1226         }
   1227         loopIo_wantRead(as->io);
   1228     } else {
   1229         D("ASocket %s: Read on a disconnected socket.", _async_socket_string(as));
   1230         errno = ECONNRESET;
   1231         reader_cb(reader_opaque, asr, ASIO_STATE_FAILED);
   1232         async_socket_io_release(asr);
   1233     }
   1234 }
   1235 
   1236 void
   1237 async_socket_read_rel(AsyncSocket* as,
   1238                       void* buffer, uint32_t len,
   1239                       on_as_io_cb reader_cb,
   1240                       void* reader_opaque,
   1241                       int to)
   1242 {
   1243     const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
   1244                                     DURATION_INFINITE;
   1245     async_socket_read_abs(as, buffer, len, reader_cb, reader_opaque, dl);
   1246 }
   1247 
   1248 void
   1249 async_socket_write_abs(AsyncSocket* as,
   1250                        const void* buffer, uint32_t len,
   1251                        on_as_io_cb writer_cb,
   1252                        void* writer_opaque,
   1253                        Duration deadline)
   1254 {
   1255     T("ASocket %s: Handling write for %d bytes with deadline %lld...",
   1256       _async_socket_string(as), len, deadline);
   1257 
   1258     AsyncSocketIO* const asw =
   1259         _async_socket_writer_new(as, buffer, len, writer_cb, writer_opaque,
   1260                                  deadline);
   1261     if (async_socket_is_connected(as)) {
   1262         /* Add new writer to the list. Note that we use initial reference from I/O
   1263          * 'new' routine as "in the list" reference counter. */
   1264         if (as->writers_head == NULL) {
   1265             as->writers_head = as->writers_tail = asw;
   1266         } else {
   1267             as->writers_tail->next = asw;
   1268             as->writers_tail = asw;
   1269         }
   1270         loopIo_wantWrite(as->io);
   1271     } else {
   1272         D("ASocket %s: Write on a disconnected socket.", _async_socket_string(as));
   1273         errno = ECONNRESET;
   1274         writer_cb(writer_opaque, asw, ASIO_STATE_FAILED);
   1275         async_socket_io_release(asw);
   1276     }
   1277 }
   1278 
   1279 void
   1280 async_socket_write_rel(AsyncSocket* as,
   1281                        const void* buffer, uint32_t len,
   1282                        on_as_io_cb writer_cb,
   1283                        void* writer_opaque,
   1284                        int to)
   1285 {
   1286     const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
   1287                                     DURATION_INFINITE;
   1288     async_socket_write_abs(as, buffer, len, writer_cb, writer_opaque, dl);
   1289 }
   1290 
   1291 void*
   1292 async_socket_get_client_opaque(const AsyncSocket* as)
   1293 {
   1294     return as->client_opaque;
   1295 }
   1296 
   1297 Duration
   1298 async_socket_deadline(AsyncSocket* as, int rel)
   1299 {
   1300     return (rel >= 0) ? looper_now(_async_socket_get_looper(as)) + rel :
   1301                         DURATION_INFINITE;
   1302 }
   1303 
   1304 int
   1305 async_socket_get_port(const AsyncSocket* as)
   1306 {
   1307     return sock_address_get_port(&as->address);
   1308 }
   1309 
   1310 int
   1311 async_socket_is_connected(const AsyncSocket* as)
   1312 {
   1313     return as->fd >= 0;
   1314 }
   1315