Home | History | Annotate | Download | only in android
      1 /*
      2  * Copyright (C) 2012 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 /*
     18  * Encapsulates exchange protocol between the emulator, and an Android device
     19  * that is connected to the host via USB. The communication is established over
     20  * a TCP port forwarding, enabled by ADB.
     21  */
     22 
     23 #include "android/async-socket-connector.h"
     24 #include "android/async-socket.h"
     25 #include "android/utils/debug.h"
     26 #include "android/utils/eintr_wrapper.h"
     27 #include "android/utils/panic.h"
     28 #include "android/iolooper.h"
     29 
     30 #define  E(...)    derror(__VA_ARGS__)      /* Forwards to derror(). */
     31 #define  W(...)    dwarning(__VA_ARGS__)    /* Forwards to dwarning(). */
        /* Verbose output and verbosity check, both under the 'asyncsocket' tag. */
     32 #define  D(...)    VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
     33 #define  D_ACTIVE  VERBOSE_CHECK(asyncsocket)
     34 
        /* Trace switch: when non-zero, T() prints through VERBOSE_PRINT under the
         * 'asyncsocket' tag; when zero, every T() invocation expands to nothing, so
         * its arguments are never evaluated. */
     35 #define TRACE_ON    0
     36 
     37 #if TRACE_ON
     38 #define  T(...)    VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
     39 #else
     40 #define  T(...)
     41 #endif
     42 
     43 /********************************************************************************
     44  *                  Asynchronous Socket internal API declarations
     45  *******************************************************************************/
     46 
     47 /* Gets socket's address string (forward declaration; defined after the
         * AsyncSocket structure). */
     48 static const char* _async_socket_string(AsyncSocket* as);
     49 
     50 /* Gets socket's looper (forward declaration; defined after the AsyncSocket
         * structure). */
     51 static Looper* _async_socket_get_looper(AsyncSocket* as);
     52 
     53 /* Handler for the I/O time out.
     54  * Param:
     55  *  as - Asynchronous socket for the I/O.
     56  *  asio - Descriptor for the timed out I/O.
         * Return:
         *  The AsyncIOAction value returned by the I/O's callback when it was
         *  notified about the time out.
     57  */
     58 static AsyncIOAction _async_socket_io_timed_out(AsyncSocket* as,
     59                                                 AsyncSocketIO* asio);
     60 
     61 /********************************************************************************
     62  *                  Asynchronous Socket Reader / Writer
     63  *******************************************************************************/
     64 
     65 struct AsyncSocketIO {
            /* Descriptor of a single read or write request queued on an
             * asynchronous socket. */
     66     /* Next I/O in the reader, or writer list. The same link is used to chain
             * descriptors in the recycler list. */
     67     AsyncSocketIO*      next;
     68     /* Asynchronous socket for this I/O. */
     69     AsyncSocket*        as;
     70     /* Timer used for time outs on this I/O. It is started with the I/O's
             * deadline when the descriptor is created. */
     71     LoopTimer           timer[1];
     72     /* An opaque pointer associated with this I/O. It is passed back as the
             * first argument of the I/O callback. */
     73     void*               io_opaque;
     74     /* Buffer where to read / write data. */
     75     uint8_t*            buffer;
     76     /* Bytes to transfer through the socket for this I/O. */
     77     uint32_t            to_transfer;
     78     /* Bytes transferred through the socket in this I/O. */
     79     uint32_t            transferred;
     80     /* I/O callback for this I/O. */
     81     on_as_io_cb         on_io;
     82     /* I/O type selector: 1 - read, 0 - write. */
     83     int                 is_io_read;
     84     /* State of the I/O. */
     85     AsyncIOState        state;
     86     /* Number of outstanding references to the I/O. Starts at 1 when the
             * descriptor is created. */
     87     int                 ref_count;
     88     /* Deadline for this I/O */
     89     Duration            deadline;
     90 };
     91 
     92 /*
     93  * Recycling I/O instances.
     94  * Since AsyncSocketIO instances are not that large, it makes sense to recycle
     95  * them for faster allocation, rather than allocating and freeing them for each
     96  * I/O on the socket.
     97  */
     98 
     99 /* List of recycled I/O descriptors (singly linked through the 'next' field). */
    100 static AsyncSocketIO* _asio_recycled    = NULL;
    101 /* Number of I/O descriptors that are recycled in the _asio_recycled list. */
    102 static int _recycled_asio_count         = 0;
    103 /* Maximum number of I/O descriptors that can be recycled. Descriptors released
         * while the recycler is full are freed instead. */
    104 static const int _max_recycled_asio_num = 32;
    105 
    106 /* Handler for an I/O time-out timer event.
    107  * When this routine is invoked, it indicates that a time out has occurred on an
    108  * I/O. It is installed as the callback of the per-I/O timer when an I/O
         * descriptor is created.
    109  * Param:
    110  *  opaque - AsyncSocketIO instance representing the timed out I/O.
    111  */
    112 static void _on_async_socket_io_timed_out(void* opaque);
    113 
    114 /* Creates new I/O descriptor.
    115  * Param:
    116  *  as - Asynchronous socket for the I/O.
    117  *  is_io_read - I/O type selector: 1 - read, 0 - write.
    118  *  buffer, len - Reader / writer buffer address.
    119  *  io_cb - Callback for this reader / writer.
    120  *  io_opaque - An opaque pointer associated with the I/O.
    121  *  deadline - Deadline to complete the I/O.
    122  * Return:
    123  *  Initialized AsyncSocketIO instance.
    124  */
    125 static AsyncSocketIO*
    126 _async_socket_rw_new(AsyncSocket* as,
    127                      int is_io_read,
    128                      void* buffer,
    129                      uint32_t len,
    130                      on_as_io_cb io_cb,
    131                      void* io_opaque,
    132                      Duration deadline)
    133 {
    134     /* Lookup in the recycler first. */
    135     AsyncSocketIO* asio = _asio_recycled;
    136     if (asio != NULL) {
    137         /* Pull the descriptor from recycler. */
    138         _asio_recycled = asio->next;
    139         _recycled_asio_count--;
    140     } else {
    141         /* No recycled descriptors. Allocate new one. */
    142         ANEW0(asio);
    143     }
    144 
    145     asio->next          = NULL;
    146     asio->as            = as;
    147     asio->is_io_read    = is_io_read;
    148     asio->buffer        = (uint8_t*)buffer;
    149     asio->to_transfer   = len;
    150     asio->transferred   = 0;
    151     asio->on_io         = io_cb;
    152     asio->io_opaque     = io_opaque;
    153     asio->state         = ASIO_STATE_QUEUED;
    154     asio->ref_count     = 1;
    155     asio->deadline      = deadline;
    156     loopTimer_init(asio->timer, _async_socket_get_looper(as),
    157                    _on_async_socket_io_timed_out, asio);
    158     loopTimer_startAbsolute(asio->timer, deadline);
    159 
    160     /* Reference socket that is holding this I/O. */
    161     async_socket_reference(as);
    162 
    163     T("ASocket %s: %s I/O descriptor %p is created for %d bytes of data",
    164       _async_socket_string(as), is_io_read ? "READ" : "WRITE", asio, len);
    165 
    166     return asio;
    167 }
    168 
    169 /* Destroys and frees I/O descriptor. */
    170 static void
    171 _async_socket_io_free(AsyncSocketIO* asio)
    172 {
    173     AsyncSocket* const as = asio->as;
    174 
    175     T("ASocket %s: %s I/O descriptor %p is destroyed.",
    176       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
    177 
    178     loopTimer_done(asio->timer);
    179 
    180     /* Try to recycle it first, and free the memory if recycler is full. */
    181     if (_recycled_asio_count < _max_recycled_asio_num) {
    182         asio->next = _asio_recycled;
    183         _asio_recycled = asio;
    184         _recycled_asio_count++;
    185     } else {
    186         AFREE(asio);
    187     }
    188 
    189     /* Release socket that is holding this I/O. */
    190     async_socket_release(as);
    191 }
    192 
    193 /* An I/O has been finished and its descriptor is about to be discarded. */
    194 static void
    195 _async_socket_io_finished(AsyncSocketIO* asio)
    196 {
    197     /* Notify the client of the I/O that I/O is finished. */
    198     asio->on_io(asio->io_opaque, asio, ASIO_STATE_FINISHED);
    199 }
    200 
    201 int
    202 async_socket_io_reference(AsyncSocketIO* asio)
    203 {
    204     assert(asio->ref_count > 0);
    205     asio->ref_count++;
    206     return asio->ref_count;
    207 }
    208 
    209 int
    210 async_socket_io_release(AsyncSocketIO* asio)
    211 {
    212     assert(asio->ref_count > 0);
    213     asio->ref_count--;
    214     if (asio->ref_count == 0) {
    215         _async_socket_io_finished(asio);
    216         /* Last reference has been dropped. Destroy this object. */
    217         _async_socket_io_free(asio);
    218         return 0;
    219     }
    220     return asio->ref_count;
    221 }
    222 
    223 /* Creates new asynchronous socket reader.
    224  * Param:
    225  *  as - Asynchronous socket for the reader.
    226  *  buffer, len - Reader's buffer.
    227  *  io_cb - Reader's callback.
    228  *  reader_opaque - An opaque pointer associated with the reader.
    229  *  deadline - Deadline to complete the operation.
    230  * Return:
    231  *  An initialized AsyncSocketIO intance.
    232  */
    233 static AsyncSocketIO*
    234 _async_socket_reader_new(AsyncSocket* as,
    235                          void* buffer,
    236                          uint32_t len,
    237                          on_as_io_cb io_cb,
    238                          void* reader_opaque,
    239                          Duration deadline)
    240 {
    241     AsyncSocketIO* const asio = _async_socket_rw_new(as, 1, buffer, len, io_cb,
    242                                                      reader_opaque, deadline);
    243     return asio;
    244 }
    245 
    246 /* Creates new asynchronous socket writer.
    247  * Param:
    248  *  as - Asynchronous socket for the writer.
    249  *  buffer, len - Writer's buffer.
    250  *  io_cb - Writer's callback.
    251  *  writer_opaque - An opaque pointer associated with the writer.
    252  *  deadline - Deadline to complete the operation.
    253  * Return:
    254  *  An initialized AsyncSocketIO intance.
    255  */
    256 static AsyncSocketIO*
    257 _async_socket_writer_new(AsyncSocket* as,
    258                          const void* buffer,
    259                          uint32_t len,
    260                          on_as_io_cb io_cb,
    261                          void* writer_opaque,
    262                          Duration deadline)
    263 {
    264     AsyncSocketIO* const asio = _async_socket_rw_new(as, 0, (void*)buffer, len,
    265                                                      io_cb, writer_opaque,
    266                                                      deadline);
    267     return asio;
    268 }
    269 
    270 /* I/O timed out. */
    271 static void
    272 _on_async_socket_io_timed_out(void* opaque)
    273 {
    274     AsyncSocketIO* const asio = (AsyncSocketIO*)opaque;
    275     AsyncSocket* const as = asio->as;
    276 
    277     D("ASocket %s: %s I/O with deadline %lld has timed out at %lld",
    278       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
    279       asio->deadline, async_socket_deadline(as, 0));
    280 
    281     /* Reference while in callback. */
    282     async_socket_io_reference(asio);
    283     _async_socket_io_timed_out(asio->as, asio);
    284     async_socket_io_release(asio);
    285 }
    286 
    287 /********************************************************************************
    288  *                 Public Asynchronous Socket I/O API
    289  *******************************************************************************/
    290 
    291 AsyncSocket*
    292 async_socket_io_get_socket(const AsyncSocketIO* asio)
    293 {
    294     async_socket_reference(asio->as);
    295     return asio->as;
    296 }
    297 
    298 void
    299 async_socket_io_cancel_time_out(AsyncSocketIO* asio)
    300 {
    301     loopTimer_stop(asio->timer);
    302 }
    303 
    304 void*
    305 async_socket_io_get_io_opaque(const AsyncSocketIO* asio)
    306 {
    307     return asio->io_opaque;
    308 }
    309 
    310 void*
    311 async_socket_io_get_client_opaque(const AsyncSocketIO* asio)
    312 {
    313     return async_socket_get_client_opaque(asio->as);
    314 }
    315 
    316 void*
    317 async_socket_io_get_buffer_info(const AsyncSocketIO* asio,
    318                                 uint32_t* transferred,
    319                                 uint32_t* to_transfer)
    320 {
    321     if (transferred != NULL) {
    322         *transferred = asio->transferred;
    323     }
    324     if (to_transfer != NULL) {
    325         *to_transfer = asio->to_transfer;
    326     }
    327     return asio->buffer;
    328 }
    329 
    330 void*
    331 async_socket_io_get_buffer(const AsyncSocketIO* asio)
    332 {
    333     return asio->buffer;
    334 }
    335 
    336 uint32_t
    337 async_socket_io_get_transferred(const AsyncSocketIO* asio)
    338 {
    339     return asio->transferred;
    340 }
    341 
    342 uint32_t
    343 async_socket_io_get_to_transfer(const AsyncSocketIO* asio)
    344 {
    345     return asio->to_transfer;
    346 }
    347 
    348 int
    349 async_socket_io_is_read(const AsyncSocketIO* asio)
    350 {
    351     return asio->is_io_read;
    352 }
    353 
    354 /********************************************************************************
    355  *                      Asynchronous Socket internals
    356  *******************************************************************************/
    357 
    358 struct AsyncSocket {
            /* Descriptor of an asynchronous socket: its address, its looper I/O,
             * and the queues of pending reads and writes. */
    359     /* TCP address for the socket. */
    360     SockAddress         address;
    361     /* Connection callback for this socket. */
    362     on_as_connection_cb on_connection;
    363     /* An opaque pointer associated with this socket by the client. It is passed
             * back as the first argument of the connection callback. */
    364     void*               client_opaque;
    365     /* I/O looper for asynchronous I/O on the socket. */
    366     Looper*             looper;
    367     /* I/O descriptor for asynchronous I/O on the socket. */
    368     LoopIo              io[1];
    369     /* Timer to use for reconnection attempts. */
    370     LoopTimer           reconnect_timer[1];
    371     /* Head of the list of the active readers. */
    372     AsyncSocketIO*      readers_head;
    373     /* Tail of the list of the active readers. */
    374     AsyncSocketIO*      readers_tail;
    375     /* Head of the list of the active writers. */
    376     AsyncSocketIO*      writers_head;
    377     /* Tail of the list of the active writers. */
    378     AsyncSocketIO*      writers_tail;
    379     /* Socket's file descriptor. Set to -1 once the socket handle is closed. */
    380     int                 fd;
    381     /* Timeout to use for reconnection attempts. It is handed to the reconnect
             * timer as a relative delay. */
    382     int                 reconnect_to;
    383     /* Number of outstanding references to the socket. */
    384     int                 ref_count;
    385     /* Flags whether (1) or not (0) socket owns the looper. An owned looper is
             * freed together with the socket. */
    386     int                 owns_looper;
    387 };
    388 
    389 static const char*
    390 _async_socket_string(AsyncSocket* as)
    391 {
    392     return sock_address_to_string(&as->address);
    393 }
    394 
    395 static Looper*
    396 _async_socket_get_looper(AsyncSocket* as)
    397 {
    398     return as->looper;
    399 }
    400 
    401 /* Pulls first reader out of the list.
    402  * Param:
    403  *  as - Initialized AsyncSocket instance.
    404  * Return:
    405  *  First I/O pulled out of the list, or NULL if there are no I/O in the list.
    406  *  Note that the caller is responsible for releasing the I/O object returned
    407  *  from this routine.
    408  */
    409 static AsyncSocketIO*
    410 _async_socket_pull_first_io(AsyncSocket* as,
    411                             AsyncSocketIO** list_head,
    412                             AsyncSocketIO** list_tail)
    413 {
    414     AsyncSocketIO* const ret = *list_head;
    415     if (ret != NULL) {
    416         *list_head = ret->next;
    417         ret->next = NULL;
    418         if (*list_head == NULL) {
    419             *list_tail = NULL;
    420         }
    421     }
    422     return ret;
    423 }
    424 
    425 /* Pulls first reader out of the list.
    426  * Param:
    427  *  as - Initialized AsyncSocket instance.
    428  * Return:
    429  *  First reader pulled out of the list, or NULL if there are no readers in the
    430  *  list.
    431  *  Note that the caller is responsible for releasing the I/O object returned
    432  *  from this routine.
    433  */
    434 static AsyncSocketIO*
    435 _async_socket_pull_first_reader(AsyncSocket* as)
    436 {
    437     return _async_socket_pull_first_io(as, &as->readers_head, &as->readers_tail);
    438 }
    439 
    440 /* Pulls first writer out of the list.
    441  * Param:
    442  *  as - Initialized AsyncSocket instance.
    443  * Return:
    444  *  First writer pulled out of the list, or NULL if there are no writers in the
    445  *  list.
    446  *  Note that the caller is responsible for releasing the I/O object returned
    447  *  from this routine.
    448  */
    449 static AsyncSocketIO*
    450 _async_socket_pull_first_writer(AsyncSocket* as)
    451 {
    452     return _async_socket_pull_first_io(as, &as->writers_head, &as->writers_tail);
    453 }
    454 
    455 /* Removes an I/O descriptor from a list of active I/O.
    456  * Param:
    457  *  as - Initialized AsyncSocket instance.
    458  *  list_head, list_tail - Pointers to the list head and tail.
    459  *  io - I/O to remove.
    460  * Return:
    461  *  Boolean: 1 if I/O has been removed, or 0 if I/O has not been found in the list.
    462  */
    463 static int
    464 _async_socket_remove_io(AsyncSocket* as,
    465                         AsyncSocketIO** list_head,
    466                         AsyncSocketIO** list_tail,
    467                         AsyncSocketIO* io)
    468 {
    469     AsyncSocketIO* prev = NULL;
    470 
    471     while (*list_head != NULL && io != *list_head) {
    472         prev = *list_head;
    473         list_head = &((*list_head)->next);
    474     }
    475     if (*list_head == NULL) {
    476         D("%s: I/O %p is not found in the list for socket '%s'",
    477           __FUNCTION__, io, _async_socket_string(as));
    478         return 0;
    479     }
    480 
    481     *list_head = io->next;
    482     if (prev != NULL) {
    483         prev->next = io->next;
    484     }
    485     if (*list_tail == io) {
    486         *list_tail = prev;
    487     }
    488 
    489     /* Release I/O adjusting reference added when I/O has been saved in the list. */
    490     async_socket_io_release(io);
    491 
    492     return 1;
    493 }
    494 
    495 /* Advances to the next I/O in the list.
    496  * Param:
    497  *  as - Initialized AsyncSocket instance.
    498  *  list_head, list_tail - Pointers to the list head and tail.
    499  */
    500 static void
    501 _async_socket_advance_io(AsyncSocket* as,
    502                          AsyncSocketIO** list_head,
    503                          AsyncSocketIO** list_tail)
    504 {
    505     AsyncSocketIO* first_io = *list_head;
    506     if (first_io != NULL) {
    507         *list_head = first_io->next;
    508         first_io->next = NULL;
    509     }
    510     if (*list_head == NULL) {
    511         *list_tail = NULL;
    512     }
    513     if (first_io != NULL) {
    514         /* Release I/O removed from the head of the list. */
    515         async_socket_io_release(first_io);
    516     }
    517 }
    518 
    519 /* Advances to the next reader in the list.
    520  * Param:
    521  *  as - Initialized AsyncSocket instance.
    522  */
    523 static void
    524 _async_socket_advance_reader(AsyncSocket* as)
    525 {
    526     _async_socket_advance_io(as, &as->readers_head, &as->readers_tail);
    527 }
    528 
    529 /* Advances to the next writer in the list.
    530  * Param:
    531  *  as - Initialized AsyncSocket instance.
    532  */
    533 static void
    534 _async_socket_advance_writer(AsyncSocket* as)
    535 {
    536     _async_socket_advance_io(as, &as->writers_head, &as->writers_tail);
    537 }
    538 
    539 /* Completes an I/O.
    540  * Param:
    541  *  as - Initialized AsyncSocket instance.
    542  *  asio - I/O to complete.
    543  * Return:
    544  *  One of AsyncIOAction values.
    545  */
    546 static AsyncIOAction
    547 _async_socket_complete_io(AsyncSocket* as, AsyncSocketIO* asio)
    548 {
    549     T("ASocket %s: %s I/O %p is completed.",
    550       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
    551 
    552     /* Stop the timer. */
    553     async_socket_io_cancel_time_out(asio);
    554 
    555     return asio->on_io(asio->io_opaque, asio, ASIO_STATE_SUCCEEDED);
    556 }
    557 
    558 /* Timeouts an I/O.
    559  * Param:
    560  *  as - Initialized AsyncSocket instance.
    561  *  asio - An I/O that has timed out.
    562  * Return:
    563  *  One of AsyncIOAction values.
    564  */
    565 static AsyncIOAction
    566 _async_socket_io_timed_out(AsyncSocket* as, AsyncSocketIO* asio)
    567 {
    568     T("ASocket %s: %s I/O %p with deadline %lld has timed out at %lld",
    569       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
    570       asio->deadline, async_socket_deadline(as, 0));
    571 
    572     /* Report to the client. */
    573     const AsyncIOAction action = asio->on_io(asio->io_opaque, asio,
    574                                              ASIO_STATE_TIMED_OUT);
    575 
    576     /* Remove the I/O from a list of active I/O for actions other than retry. */
    577     if (action != ASIO_ACTION_RETRY) {
    578         if (asio->is_io_read) {
    579             _async_socket_remove_io(as, &as->readers_head, &as->readers_tail, asio);
    580         } else {
    581             _async_socket_remove_io(as, &as->writers_head, &as->writers_tail, asio);
    582         }
    583     }
    584 
    585     return action;
    586 }
    587 
    588 /* Cancels an I/O.
    589  * Param:
    590  *  as - Initialized AsyncSocket instance.
    591  *  asio - An I/O to cancel.
    592  * Return:
    593  *  One of AsyncIOAction values.
    594  */
    595 static AsyncIOAction
    596 _async_socket_cancel_io(AsyncSocket* as, AsyncSocketIO* asio)
    597 {
    598     T("ASocket %s: %s I/O %p is cancelled.",
    599       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
    600 
    601     /* Stop the timer. */
    602     async_socket_io_cancel_time_out(asio);
    603 
    604     return asio->on_io(asio->io_opaque, asio, ASIO_STATE_CANCELLED);
    605 }
    606 
    607 /* Reports an I/O failure.
    608  * Param:
    609  *  as - Initialized AsyncSocket instance.
    610  *  asio - An I/O that has failed. Can be NULL for general failures.
    611  *  failure - Failure (errno) that has occurred.
    612  * Return:
    613  *  One of AsyncIOAction values.
    614  */
    615 static AsyncIOAction
    616 _async_socket_io_failure(AsyncSocket* as, AsyncSocketIO* asio, int failure)
    617 {
    618     T("ASocket %s: %s I/O %p has failed: %d -> %s",
    619       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
    620       failure, strerror(failure));
    621 
    622     /* Stop the timer. */
    623     async_socket_io_cancel_time_out(asio);
    624 
    625     errno = failure;
    626     return asio->on_io(asio->io_opaque, asio, ASIO_STATE_FAILED);
    627 }
    628 
    629 /* Cancels all the active socket readers.
    630  * Param:
    631  *  as - Initialized AsyncSocket instance.
    632  */
    633 static void
    634 _async_socket_cancel_readers(AsyncSocket* as)
    635 {
    636     while (as->readers_head != NULL) {
    637         AsyncSocketIO* const to_cancel = _async_socket_pull_first_reader(as);
    638         /* We ignore action returned from the cancellation callback, since we're
    639          * in a disconnected state here. */
    640         _async_socket_cancel_io(as, to_cancel);
    641         async_socket_io_release(to_cancel);
    642     }
    643 }
    644 
    645 /* Cancels all the active socket writers.
    646  * Param:
    647  *  as - Initialized AsyncSocket instance.
    648  */
    649 static void
    650 _async_socket_cancel_writers(AsyncSocket* as)
    651 {
    652     while (as->writers_head != NULL) {
    653         AsyncSocketIO* const to_cancel = _async_socket_pull_first_writer(as);
    654         /* We ignore action returned from the cancellation callback, since we're
    655          * in a disconnected state here. */
    656         _async_socket_cancel_io(as, to_cancel);
    657         async_socket_io_release(to_cancel);
    658     }
    659 }
    660 
    661 /* Cancels all the I/O on the socket. */
    662 static void
    663 _async_socket_cancel_all_io(AsyncSocket* as)
    664 {
    665     /* Stop the reconnection timer. */
    666     loopTimer_stop(as->reconnect_timer);
    667 
    668     /* Stop read / write on the socket. */
    669     loopIo_dontWantWrite(as->io);
    670     loopIo_dontWantRead(as->io);
    671 
    672     /* Cancel active readers and writers. */
    673     _async_socket_cancel_readers(as);
    674     _async_socket_cancel_writers(as);
    675 }
    676 
    677 /* Closes socket handle used by the async socket.
    678  * Param:
    679  *  as - Initialized AsyncSocket instance.
    680  */
    681 static void
    682 _async_socket_close_socket(AsyncSocket* as)
    683 {
    684     if (as->fd >= 0) {
    685         T("ASocket %s: Socket handle %d is closed.",
    686           _async_socket_string(as), as->fd);
    687         loopIo_done(as->io);
    688         socket_close(as->fd);
    689         as->fd = -1;
    690     }
    691 }
    692 
    693 /* Destroys AsyncSocket instance.
    694  * Param:
    695  *  as - Initialized AsyncSocket instance.
    696  */
    697 static void
    698 _async_socket_free(AsyncSocket* as)
    699 {
    700     if (as != NULL) {
    701         T("ASocket %s: Socket descriptor is destroyed.", _async_socket_string(as));
    702 
    703         /* Close socket. */
    704         _async_socket_close_socket(as);
    705 
    706         /* Free allocated resources. */
    707         if (as->looper != NULL) {
    708             loopTimer_done(as->reconnect_timer);
    709             if (as->owns_looper) {
    710                 looper_free(as->looper);
    711             }
    712         }
    713         sock_address_done(&as->address);
    714         AFREE(as);
    715     }
    716 }
    717 
    718 /* Starts reconnection attempts after connection has been lost.
    719  * Param:
    720  *  as - Initialized AsyncSocket instance.
    721  *  to - Milliseconds to wait before reconnection attempt.
    722  */
    723 static void
    724 _async_socket_reconnect(AsyncSocket* as, int to)
    725 {
    726     T("ASocket %s: reconnecting in %dms...", _async_socket_string(as), to);
    727 
    728     /* Make sure that no I/O is active, and socket is closed before we
    729      * reconnect. */
    730     _async_socket_cancel_all_io(as);
    731 
    732     /* Set the timer for reconnection attempt. */
    733     loopTimer_startRelative(as->reconnect_timer, to);
    734 }
    735 
    736 /********************************************************************************
    737  *                      Asynchronous Socket callbacks
    738  *******************************************************************************/
    739 
    740 /* A callback that is invoked when socket gets disconnected.
    741  * Param:
    742  *  as - Initialized AsyncSocket instance.
    743  */
    744 static void
    745 _on_async_socket_disconnected(AsyncSocket* as)
    746 {
    747     /* Save error to restore it for the client's callback. */
    748     const int save_errno = errno;
    749     AsyncIOAction action = ASIO_ACTION_ABORT;
    750 
    751     D("ASocket %s: Disconnected.", _async_socket_string(as));
    752 
    753     /* Cancel all the I/O on this socket. */
    754     _async_socket_cancel_all_io(as);
    755 
    756     /* Close the socket. */
    757     _async_socket_close_socket(as);
    758 
    759     /* Restore errno, and invoke client's callback. */
    760     errno = save_errno;
    761     action = as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
    762 
    763     if (action == ASIO_ACTION_RETRY) {
    764         /* Client requested reconnection. */
    765         _async_socket_reconnect(as, as->reconnect_to);
    766     }
    767 }
    768 
    769 /* A callback that is invoked on socket's I/O failure.
    770  * Param:
    771  *  as - Initialized AsyncSocket instance.
    772  *  asio - Descriptor for the failed I/O. Can be NULL for general failures.
    773  */
    774 static AsyncIOAction
    775 _on_async_socket_failure(AsyncSocket* as, AsyncSocketIO* asio)
    776 {
    777     D("ASocket %s: %s I/O failure: %d -> %s",
    778       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
    779       errno, strerror(errno));
    780 
    781     /* Report the failure. */
    782     return _async_socket_io_failure(as, asio, errno);
    783 }
    784 
    785 /* A callback that is invoked when there is data available to read.
    786  * Param:
    787  *  as - Initialized AsyncSocket instance.
    788  * Return:
    789  *  0 on success, or -1 on failure. Failure returned from this routine will
    790  *  skip writes (if available) behind this read.
    791  */
    792 static int
    793 _on_async_socket_recv(AsyncSocket* as)
    794 {
    795     AsyncIOAction action;
    796 
    797     /* Get current reader. */
    798     AsyncSocketIO* const asr = as->readers_head;
    799     if (asr == NULL) {
    800         D("ASocket %s: No reader is available.", _async_socket_string(as));
    801         loopIo_dontWantRead(as->io);
    802         return 0;
    803     }
    804 
    805     /* Reference the reader while we're working with it in this callback. */
    806     async_socket_io_reference(asr);
    807 
    808     /* Bump I/O state, and inform the client that I/O is in progress. */
    809     if (asr->state == ASIO_STATE_QUEUED) {
    810         asr->state = ASIO_STATE_STARTED;
    811     } else {
    812         asr->state = ASIO_STATE_CONTINUES;
    813     }
    814     action = asr->on_io(asr->io_opaque, asr, asr->state);
    815     if (action == ASIO_ACTION_ABORT) {
    816         D("ASocket %s: Read is aborted by the client.", _async_socket_string(as));
    817         /* Move on to the next reader. */
    818         _async_socket_advance_reader(as);
    819         /* Lets see if there are still active readers, and enable, or disable
    820          * read I/O callback accordingly. */
    821         if (as->readers_head != NULL) {
    822             loopIo_wantRead(as->io);
    823         } else {
    824             loopIo_dontWantRead(as->io);
    825         }
    826         async_socket_io_release(asr);
    827         return 0;
    828     }
    829 
    830     /* Read next chunk of data. */
    831     int res = HANDLE_EINTR(
    832             socket_recv(as->fd,
    833                         asr->buffer + asr->transferred,
    834                         asr->to_transfer - asr->transferred));
    835     if (res == 0) {
    836         /* Socket has been disconnected. */
    837         errno = ECONNRESET;
    838         _on_async_socket_disconnected(as);
    839         async_socket_io_release(asr);
    840         return -1;
    841     }
    842 
    843     if (res < 0) {
    844         if (errno == EWOULDBLOCK || errno == EAGAIN) {
    845             /* Yield to writes behind this read. */
    846             loopIo_wantRead(as->io);
    847             async_socket_io_release(asr);
    848             return 0;
    849         }
    850 
    851         /* An I/O error. */
    852         action = _on_async_socket_failure(as, asr);
    853         if (action != ASIO_ACTION_RETRY) {
    854             D("ASocket %s: Read is aborted on failure.", _async_socket_string(as));
    855             /* Move on to the next reader. */
    856             _async_socket_advance_reader(as);
    857             /* Lets see if there are still active readers, and enable, or disable
    858              * read I/O callback accordingly. */
    859             if (as->readers_head != NULL) {
    860                 loopIo_wantRead(as->io);
    861             } else {
    862                 loopIo_dontWantRead(as->io);
    863             }
    864         }
    865         async_socket_io_release(asr);
    866         return -1;
    867     }
    868 
    869     /* Update the reader's descriptor. */
    870     asr->transferred += res;
    871     if (asr->transferred == asr->to_transfer) {
    872         /* This read is completed. Move on to the next reader. */
    873         _async_socket_advance_reader(as);
    874 
    875         /* Notify reader completion. */
    876         _async_socket_complete_io(as, asr);
    877     }
    878 
    879     /* Lets see if there are still active readers, and enable, or disable read
    880      * I/O callback accordingly. */
    881     if (as->readers_head != NULL) {
    882         loopIo_wantRead(as->io);
    883     } else {
    884         loopIo_dontWantRead(as->io);
    885     }
    886 
    887     async_socket_io_release(asr);
    888 
    889     return 0;
    890 }
    891 
    892 /* A callback that is invoked when there is data available to write.
    893  * Param:
    894  *  as - Initialized AsyncSocket instance.
    895  * Return:
    896  *  0 on success, or -1 on failure. Failure returned from this routine will
    897  *  skip reads (if awailable) behind this write.
    898  */
    899 static int
    900 _on_async_socket_send(AsyncSocket* as)
    901 {
    902     AsyncIOAction action;
    903 
    904     /* Get current writer. */
    905     AsyncSocketIO* const asw = as->writers_head;
    906     if (asw == NULL) {
    907         D("ASocket %s: No writer is available.", _async_socket_string(as));
    908         loopIo_dontWantWrite(as->io);
    909         return 0;
    910     }
    911 
    912     /* Reference the writer while we're working with it in this callback. */
    913     async_socket_io_reference(asw);
    914 
    915     /* Bump I/O state, and inform the client that I/O is in progress. */
    916     if (asw->state == ASIO_STATE_QUEUED) {
    917         asw->state = ASIO_STATE_STARTED;
    918     } else {
    919         asw->state = ASIO_STATE_CONTINUES;
    920     }
    921     action = asw->on_io(asw->io_opaque, asw, asw->state);
    922     if (action == ASIO_ACTION_ABORT) {
    923         D("ASocket %s: Write is aborted by the client.", _async_socket_string(as));
    924         /* Move on to the next writer. */
    925         _async_socket_advance_writer(as);
    926         /* Lets see if there are still active writers, and enable, or disable
    927          * write I/O callback accordingly. */
    928         if (as->writers_head != NULL) {
    929             loopIo_wantWrite(as->io);
    930         } else {
    931             loopIo_dontWantWrite(as->io);
    932         }
    933         async_socket_io_release(asw);
    934         return 0;
    935     }
    936 
    937     /* Write next chunk of data. */
    938     int res = HANDLE_EINTR(
    939             socket_send(as->fd,
    940                         asw->buffer + asw->transferred,
    941                         asw->to_transfer - asw->transferred));
    942     if (res == 0) {
    943         /* Socket has been disconnected. */
    944         errno = ECONNRESET;
    945         _on_async_socket_disconnected(as);
    946         async_socket_io_release(asw);
    947         return -1;
    948     }
    949 
    950     if (res < 0) {
    951         if (errno == EWOULDBLOCK || errno == EAGAIN) {
    952             /* Yield to reads behind this write. */
    953             loopIo_wantWrite(as->io);
    954             async_socket_io_release(asw);
    955             return 0;
    956         }
    957 
    958         /* An I/O error. */
    959         action = _on_async_socket_failure(as, asw);
    960         if (action != ASIO_ACTION_RETRY) {
    961             D("ASocket %s: Write is aborted on failure.", _async_socket_string(as));
    962             /* Move on to the next writer. */
    963             _async_socket_advance_writer(as);
    964             /* Lets see if there are still active writers, and enable, or disable
    965              * write I/O callback accordingly. */
    966             if (as->writers_head != NULL) {
    967                 loopIo_wantWrite(as->io);
    968             } else {
    969                 loopIo_dontWantWrite(as->io);
    970             }
    971         }
    972         async_socket_io_release(asw);
    973         return -1;
    974     }
    975 
    976     /* Update the writer descriptor. */
    977     asw->transferred += res;
    978     if (asw->transferred == asw->to_transfer) {
    979         /* This write is completed. Move on to the next writer. */
    980         _async_socket_advance_writer(as);
    981 
    982         /* Notify writer completion. */
    983         _async_socket_complete_io(as, asw);
    984     }
    985 
    986     /* Lets see if there are still active writers, and enable, or disable write
    987      * I/O callback accordingly. */
    988     if (as->writers_head != NULL) {
    989         loopIo_wantWrite(as->io);
    990     } else {
    991         loopIo_dontWantWrite(as->io);
    992     }
    993 
    994     async_socket_io_release(asw);
    995 
    996     return 0;
    997 }
    998 
    999 /* A callback that is invoked when an I/O is available on socket.
   1000  * Param:
   1001  *  as - Initialized AsyncSocket instance.
   1002  *  fd - Socket's file descriptor.
   1003  *  events - LOOP_IO_READ | LOOP_IO_WRITE bitmask.
   1004  */
   1005 static void
   1006 _on_async_socket_io(void* opaque, int fd, unsigned events)
   1007 {
   1008     AsyncSocket* const as = (AsyncSocket*)opaque;
   1009 
   1010     /* Reference the socket while we're working with it in this callback. */
   1011     async_socket_reference(as);
   1012 
   1013     if ((events & LOOP_IO_READ) != 0) {
   1014         if (_on_async_socket_recv(as) != 0) {
   1015             async_socket_release(as);
   1016             return;
   1017         }
   1018     }
   1019 
   1020     if ((events & LOOP_IO_WRITE) != 0) {
   1021         if (_on_async_socket_send(as) != 0) {
   1022             async_socket_release(as);
   1023             return;
   1024         }
   1025     }
   1026 
   1027     async_socket_release(as);
   1028 }
   1029 
   1030 /* A callback that is invoked by asynchronous socket connector on connection
   1031  *  events.
   1032  * Param:
   1033  *  opaque - Initialized AsyncSocket instance.
   1034  *  connector - Connector that is used to connect this socket.
   1035  *  event - Connection event.
   1036  * Return:
   1037  *  One of AsyncIOAction values.
   1038  */
   1039 static AsyncIOAction
   1040 _on_connector_events(void* opaque,
   1041                      AsyncSocketConnector* connector,
   1042                      AsyncIOState event)
   1043 {
   1044     AsyncIOAction action;
   1045     AsyncSocket* const as = (AsyncSocket*)opaque;
   1046 
   1047     /* Reference the socket while we're working with it in this callback. */
   1048     async_socket_reference(as);
   1049 
   1050     if (event == ASIO_STATE_SUCCEEDED) {
   1051         /* Accept the connection. */
   1052         as->fd = async_socket_connector_pull_fd(connector);
   1053         loopIo_init(as->io, as->looper, as->fd, _on_async_socket_io, as);
   1054     }
   1055 
   1056     /* Invoke client's callback. */
   1057     action = as->on_connection(as->client_opaque, as, event);
   1058     if (event == ASIO_STATE_SUCCEEDED && action != ASIO_ACTION_DONE) {
   1059         /* For whatever reason the client didn't want to keep this connection.
   1060          * Close it. */
   1061         D("ASocket %s: Connection is discarded by the client.",
   1062           _async_socket_string(as));
   1063         _async_socket_close_socket(as);
   1064     }
   1065 
   1066     if (action != ASIO_ACTION_RETRY) {
   1067         async_socket_connector_release(connector);
   1068     }
   1069 
   1070     async_socket_release(as);
   1071 
   1072     return action;
   1073 }
   1074 
   1075 /* Timer callback invoked to reconnect the lost connection.
   1076  * Param:
   1077  *  as - Initialized AsyncSocket instance.
   1078  */
   1079 void
   1080 _on_async_socket_reconnect(void* opaque)
   1081 {
   1082     AsyncSocket* as = (AsyncSocket*)opaque;
   1083 
   1084     /* Reference the socket while we're working with it in this callback. */
   1085     async_socket_reference(as);
   1086     async_socket_connect(as, as->reconnect_to);
   1087     async_socket_release(as);
   1088 }
   1089 
   1090 
   1091 /********************************************************************************
   1092  *                  Asynchronous Socket public API
   1093  *******************************************************************************/
   1094 
   1095 AsyncSocket*
   1096 async_socket_new(int port,
   1097                  int reconnect_to,
   1098                  on_as_connection_cb client_cb,
   1099                  void* client_opaque,
   1100                  Looper* looper)
   1101 {
   1102     AsyncSocket* as;
   1103 
   1104     if (client_cb == NULL) {
   1105         E("Invalid client_cb parameter");
   1106         return NULL;
   1107     }
   1108 
   1109     ANEW0(as);
   1110 
   1111     as->fd = -1;
   1112     as->client_opaque = client_opaque;
   1113     as->on_connection = client_cb;
   1114     as->readers_head = as->readers_tail = NULL;
   1115     as->reconnect_to = reconnect_to;
   1116     as->ref_count = 1;
   1117     sock_address_init_inet(&as->address, SOCK_ADDRESS_INET_LOOPBACK, port);
   1118     if (looper == NULL) {
   1119         as->looper = looper_newCore();
   1120         if (as->looper == NULL) {
   1121             E("Unable to create I/O looper for async socket '%s'",
   1122               _async_socket_string(as));
   1123             client_cb(client_opaque, as, ASIO_STATE_FAILED);
   1124             _async_socket_free(as);
   1125             return NULL;
   1126         }
   1127         as->owns_looper = 1;
   1128     } else {
   1129         as->looper = looper;
   1130         as->owns_looper = 0;
   1131     }
   1132 
   1133     loopTimer_init(as->reconnect_timer, as->looper, _on_async_socket_reconnect, as);
   1134 
   1135     T("ASocket %s: Descriptor is created.", _async_socket_string(as));
   1136 
   1137     return as;
   1138 }
   1139 
   1140 int
   1141 async_socket_reference(AsyncSocket* as)
   1142 {
   1143     assert(as->ref_count > 0);
   1144     as->ref_count++;
   1145     return as->ref_count;
   1146 }
   1147 
   1148 int
   1149 async_socket_release(AsyncSocket* as)
   1150 {
   1151     assert(as->ref_count > 0);
   1152     as->ref_count--;
   1153     if (as->ref_count == 0) {
   1154         /* Last reference has been dropped. Destroy this object. */
   1155         _async_socket_cancel_all_io(as);
   1156         _async_socket_free(as);
   1157         return 0;
   1158     }
   1159     return as->ref_count;
   1160 }
   1161 
   1162 void
   1163 async_socket_connect(AsyncSocket* as, int retry_to)
   1164 {
   1165     T("ASocket %s: Handling connection request for %dms...",
   1166       _async_socket_string(as), retry_to);
   1167 
   1168     AsyncSocketConnector* const connector =
   1169         async_socket_connector_new(&as->address, retry_to, _on_connector_events,
   1170                                    as, as->looper);
   1171     if (connector != NULL) {
   1172         async_socket_connector_connect(connector);
   1173     } else {
   1174         as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
   1175     }
   1176 }
   1177 
   1178 void
   1179 async_socket_disconnect(AsyncSocket* as)
   1180 {
   1181     T("ASocket %s: Handling disconnection request...", _async_socket_string(as));
   1182 
   1183     if (as != NULL) {
   1184         _async_socket_cancel_all_io(as);
   1185         _async_socket_close_socket(as);
   1186     }
   1187 }
   1188 
   1189 void
   1190 async_socket_reconnect(AsyncSocket* as, int retry_to)
   1191 {
   1192     T("ASocket %s: Handling reconnection request for %dms...",
   1193       _async_socket_string(as), retry_to);
   1194 
   1195     _async_socket_cancel_all_io(as);
   1196     _async_socket_close_socket(as);
   1197     _async_socket_reconnect(as, retry_to);
   1198 }
   1199 
   1200 void
   1201 async_socket_read_abs(AsyncSocket* as,
   1202                       void* buffer, uint32_t len,
   1203                       on_as_io_cb reader_cb,
   1204                       void* reader_opaque,
   1205                       Duration deadline)
   1206 {
   1207     T("ASocket %s: Handling read for %d bytes with deadline %lld...",
   1208       _async_socket_string(as), len, deadline);
   1209 
   1210     AsyncSocketIO* const asr =
   1211         _async_socket_reader_new(as, buffer, len, reader_cb, reader_opaque,
   1212                                  deadline);
   1213     if (async_socket_is_connected(as)) {
   1214         /* Add new reader to the list. Note that we use initial reference from I/O
   1215          * 'new' routine as "in the list" reference counter. */
   1216         if (as->readers_head == NULL) {
   1217             as->readers_head = as->readers_tail = asr;
   1218         } else {
   1219             as->readers_tail->next = asr;
   1220             as->readers_tail = asr;
   1221         }
   1222         loopIo_wantRead(as->io);
   1223     } else {
   1224         D("ASocket %s: Read on a disconnected socket.", _async_socket_string(as));
   1225         errno = ECONNRESET;
   1226         reader_cb(reader_opaque, asr, ASIO_STATE_FAILED);
   1227         async_socket_io_release(asr);
   1228     }
   1229 }
   1230 
   1231 void
   1232 async_socket_read_rel(AsyncSocket* as,
   1233                       void* buffer, uint32_t len,
   1234                       on_as_io_cb reader_cb,
   1235                       void* reader_opaque,
   1236                       int to)
   1237 {
   1238     const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
   1239                                     DURATION_INFINITE;
   1240     async_socket_read_abs(as, buffer, len, reader_cb, reader_opaque, dl);
   1241 }
   1242 
   1243 void
   1244 async_socket_write_abs(AsyncSocket* as,
   1245                        const void* buffer, uint32_t len,
   1246                        on_as_io_cb writer_cb,
   1247                        void* writer_opaque,
   1248                        Duration deadline)
   1249 {
   1250     T("ASocket %s: Handling write for %d bytes with deadline %lld...",
   1251       _async_socket_string(as), len, deadline);
   1252 
   1253     AsyncSocketIO* const asw =
   1254         _async_socket_writer_new(as, buffer, len, writer_cb, writer_opaque,
   1255                                  deadline);
   1256     if (async_socket_is_connected(as)) {
   1257         /* Add new writer to the list. Note that we use initial reference from I/O
   1258          * 'new' routine as "in the list" reference counter. */
   1259         if (as->writers_head == NULL) {
   1260             as->writers_head = as->writers_tail = asw;
   1261         } else {
   1262             as->writers_tail->next = asw;
   1263             as->writers_tail = asw;
   1264         }
   1265         loopIo_wantWrite(as->io);
   1266     } else {
   1267         D("ASocket %s: Write on a disconnected socket.", _async_socket_string(as));
   1268         errno = ECONNRESET;
   1269         writer_cb(writer_opaque, asw, ASIO_STATE_FAILED);
   1270         async_socket_io_release(asw);
   1271     }
   1272 }
   1273 
   1274 void
   1275 async_socket_write_rel(AsyncSocket* as,
   1276                        const void* buffer, uint32_t len,
   1277                        on_as_io_cb writer_cb,
   1278                        void* writer_opaque,
   1279                        int to)
   1280 {
   1281     const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
   1282                                     DURATION_INFINITE;
   1283     async_socket_write_abs(as, buffer, len, writer_cb, writer_opaque, dl);
   1284 }
   1285 
   1286 void*
   1287 async_socket_get_client_opaque(const AsyncSocket* as)
   1288 {
   1289     return as->client_opaque;
   1290 }
   1291 
   1292 Duration
   1293 async_socket_deadline(AsyncSocket* as, int rel)
   1294 {
   1295     return (rel >= 0) ? looper_now(_async_socket_get_looper(as)) + rel :
   1296                         DURATION_INFINITE;
   1297 }
   1298 
   1299 int
   1300 async_socket_get_port(const AsyncSocket* as)
   1301 {
   1302     return sock_address_get_port(&as->address);
   1303 }
   1304 
   1305 int
   1306 async_socket_is_connected(const AsyncSocket* as)
   1307 {
   1308     return as->fd >= 0;
   1309 }
   1310