Home | History | Annotate | Download | only in android
      1 /*
      2  * Copyright (C) 2012 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 /*
     18  * Encapsulates exchange protocol between the emulator, and an Android device
     19  * that is connected to the host via USB. The communication is established over
     20  * a TCP port forwarding, enabled by ADB.
     21  */
     22 
     23 #include "qemu-common.h"
     24 #include "android/async-utils.h"
     25 #include "android/utils/debug.h"
     26 #include "android/async-socket-connector.h"
     27 #include "android/async-socket.h"
     28 #include "utils/panic.h"
     29 #include "iolooper.h"
     30 
     31 #define  E(...)    derror(__VA_ARGS__)
     32 #define  W(...)    dwarning(__VA_ARGS__)
     33 #define  D(...)    VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
     34 #define  D_ACTIVE  VERBOSE_CHECK(asyncsocket)
     35 
     36 #define TRACE_ON    0
     37 
     38 #if TRACE_ON
     39 #define  T(...)    VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
     40 #else
     41 #define  T(...)
     42 #endif
     43 
     44 /********************************************************************************
     45  *                  Asynchronous Socket internal API declarations
     46  *******************************************************************************/
     47 
     48 /* Gets socket's address string. */
     49 static const char* _async_socket_string(AsyncSocket* as);
     50 
     51 /* Gets socket's looper. */
     52 static Looper* _async_socket_get_looper(AsyncSocket* as);
     53 
     54 /* Handler for the I/O time out.
     55  * Param:
     56  *  as - Asynchronous socket for the I/O.
     57  *  asio - Desciptor for the timed out I/O.
     58  */
     59 static AsyncIOAction _async_socket_io_timed_out(AsyncSocket* as,
     60                                                 AsyncSocketIO* asio);
     61 
     62 /********************************************************************************
     63  *                  Asynchronous Socket Reader / Writer
     64  *******************************************************************************/
     65 
     66 struct AsyncSocketIO {
     67     /* Next I/O in the reader, or writer list. */
     68     AsyncSocketIO*      next;
     69     /* Asynchronous socket for this I/O. */
     70     AsyncSocket*        as;
     71     /* Timer used for time outs on this I/O. */
     72     LoopTimer           timer[1];
     73     /* An opaque pointer associated with this I/O. */
     74     void*               io_opaque;
     75     /* Buffer where to read / write data. */
     76     uint8_t*            buffer;
     77     /* Bytes to transfer through the socket for this I/O. */
     78     uint32_t            to_transfer;
     79     /* Bytes thransferred through the socket in this I/O. */
     80     uint32_t            transferred;
     81     /* I/O callback for this I/O. */
     82     on_as_io_cb         on_io;
     83     /* I/O type selector: 1 - read, 0 - write. */
     84     int                 is_io_read;
     85     /* State of the I/O. */
     86     AsyncIOState        state;
     87     /* Number of outstanding references to the I/O. */
     88     int                 ref_count;
     89     /* Deadline for this I/O */
     90     Duration            deadline;
     91 };
     92 
     93 /*
     94  * Recycling I/O instances.
     95  * Since AsyncSocketIO instances are not that large, it makes sence to recycle
     96  * them for faster allocation, rather than allocating and freeing them for each
     97  * I/O on the socket.
     98  */
     99 
    100 /* List of recycled I/O descriptors. */
    101 static AsyncSocketIO* _asio_recycled    = NULL;
    102 /* Number of I/O descriptors that are recycled in the _asio_recycled list. */
    103 static int _recycled_asio_count         = 0;
    104 /* Maximum number of I/O descriptors that can be recycled. */
    105 static const int _max_recycled_asio_num = 32;
    106 
    107 /* Handler for an I/O time-out timer event.
    108  * When this routine is invoked, it indicates that a time out has occurred on an
    109  * I/O.
    110  * Param:
    111  *  opaque - AsyncSocketIO instance representing the timed out I/O.
    112  */
    113 static void _on_async_socket_io_timed_out(void* opaque);
    114 
    115 /* Creates new I/O descriptor.
    116  * Param:
    117  *  as - Asynchronous socket for the I/O.
    118  *  is_io_read - I/O type selector: 1 - read, 0 - write.
    119  *  buffer, len - Reader / writer buffer address.
    120  *  io_cb - Callback for this reader / writer.
    121  *  io_opaque - An opaque pointer associated with the I/O.
    122  *  deadline - Deadline to complete the I/O.
    123  * Return:
    124  *  Initialized AsyncSocketIO instance.
    125  */
    126 static AsyncSocketIO*
    127 _async_socket_rw_new(AsyncSocket* as,
    128                      int is_io_read,
    129                      void* buffer,
    130                      uint32_t len,
    131                      on_as_io_cb io_cb,
    132                      void* io_opaque,
    133                      Duration deadline)
    134 {
    135     /* Lookup in the recycler first. */
    136     AsyncSocketIO* asio = _asio_recycled;
    137     if (asio != NULL) {
    138         /* Pull the descriptor from recycler. */
    139         _asio_recycled = asio->next;
    140         _recycled_asio_count--;
    141     } else {
    142         /* No recycled descriptors. Allocate new one. */
    143         ANEW0(asio);
    144     }
    145 
    146     asio->next          = NULL;
    147     asio->as            = as;
    148     asio->is_io_read    = is_io_read;
    149     asio->buffer        = (uint8_t*)buffer;
    150     asio->to_transfer   = len;
    151     asio->transferred   = 0;
    152     asio->on_io         = io_cb;
    153     asio->io_opaque     = io_opaque;
    154     asio->state         = ASIO_STATE_QUEUED;
    155     asio->ref_count     = 1;
    156     asio->deadline      = deadline;
    157     loopTimer_init(asio->timer, _async_socket_get_looper(as),
    158                    _on_async_socket_io_timed_out, asio);
    159     loopTimer_startAbsolute(asio->timer, deadline);
    160 
    161     /* Reference socket that is holding this I/O. */
    162     async_socket_reference(as);
    163 
    164     T("ASocket %s: %s I/O descriptor %p is created for %d bytes of data",
    165       _async_socket_string(as), is_io_read ? "READ" : "WRITE", asio, len);
    166 
    167     return asio;
    168 }
    169 
    170 /* Destroys and frees I/O descriptor. */
    171 static void
    172 _async_socket_io_free(AsyncSocketIO* asio)
    173 {
    174     AsyncSocket* const as = asio->as;
    175 
    176     T("ASocket %s: %s I/O descriptor %p is destroyed.",
    177       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
    178 
    179     loopTimer_done(asio->timer);
    180 
    181     /* Try to recycle it first, and free the memory if recycler is full. */
    182     if (_recycled_asio_count < _max_recycled_asio_num) {
    183         asio->next = _asio_recycled;
    184         _asio_recycled = asio;
    185         _recycled_asio_count++;
    186     } else {
    187         AFREE(asio);
    188     }
    189 
    190     /* Release socket that is holding this I/O. */
    191     async_socket_release(as);
    192 }
    193 
    194 /* An I/O has been finished and its descriptor is about to be discarded. */
    195 static void
    196 _async_socket_io_finished(AsyncSocketIO* asio)
    197 {
    198     /* Notify the client of the I/O that I/O is finished. */
    199     asio->on_io(asio->io_opaque, asio, ASIO_STATE_FINISHED);
    200 }
    201 
    202 int
    203 async_socket_io_reference(AsyncSocketIO* asio)
    204 {
    205     assert(asio->ref_count > 0);
    206     asio->ref_count++;
    207     return asio->ref_count;
    208 }
    209 
    210 int
    211 async_socket_io_release(AsyncSocketIO* asio)
    212 {
    213     assert(asio->ref_count > 0);
    214     asio->ref_count--;
    215     if (asio->ref_count == 0) {
    216         _async_socket_io_finished(asio);
    217         /* Last reference has been dropped. Destroy this object. */
    218         _async_socket_io_free(asio);
    219         return 0;
    220     }
    221     return asio->ref_count;
    222 }
    223 
    224 /* Creates new asynchronous socket reader.
    225  * Param:
    226  *  as - Asynchronous socket for the reader.
    227  *  buffer, len - Reader's buffer.
    228  *  io_cb - Reader's callback.
    229  *  reader_opaque - An opaque pointer associated with the reader.
    230  *  deadline - Deadline to complete the operation.
    231  * Return:
    232  *  An initialized AsyncSocketIO intance.
    233  */
    234 static AsyncSocketIO*
    235 _async_socket_reader_new(AsyncSocket* as,
    236                          void* buffer,
    237                          uint32_t len,
    238                          on_as_io_cb io_cb,
    239                          void* reader_opaque,
    240                          Duration deadline)
    241 {
    242     AsyncSocketIO* const asio = _async_socket_rw_new(as, 1, buffer, len, io_cb,
    243                                                      reader_opaque, deadline);
    244     return asio;
    245 }
    246 
    247 /* Creates new asynchronous socket writer.
    248  * Param:
    249  *  as - Asynchronous socket for the writer.
    250  *  buffer, len - Writer's buffer.
    251  *  io_cb - Writer's callback.
    252  *  writer_opaque - An opaque pointer associated with the writer.
    253  *  deadline - Deadline to complete the operation.
    254  * Return:
    255  *  An initialized AsyncSocketIO intance.
    256  */
    257 static AsyncSocketIO*
    258 _async_socket_writer_new(AsyncSocket* as,
    259                          const void* buffer,
    260                          uint32_t len,
    261                          on_as_io_cb io_cb,
    262                          void* writer_opaque,
    263                          Duration deadline)
    264 {
    265     AsyncSocketIO* const asio = _async_socket_rw_new(as, 0, (void*)buffer, len,
    266                                                      io_cb, writer_opaque,
    267                                                      deadline);
    268     return asio;
    269 }
    270 
    271 /* I/O timed out. */
    272 static void
    273 _on_async_socket_io_timed_out(void* opaque)
    274 {
    275     AsyncSocketIO* const asio = (AsyncSocketIO*)opaque;
    276     AsyncSocket* const as = asio->as;
    277 
    278     D("ASocket %s: %s I/O with deadline %lld has timed out at %lld",
    279       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
    280       asio->deadline, async_socket_deadline(as, 0));
    281 
    282     /* Reference while in callback. */
    283     async_socket_io_reference(asio);
    284     _async_socket_io_timed_out(asio->as, asio);
    285     async_socket_io_release(asio);
    286 }
    287 
    288 /********************************************************************************
    289  *                 Public Asynchronous Socket I/O API
    290  *******************************************************************************/
    291 
    292 AsyncSocket*
    293 async_socket_io_get_socket(const AsyncSocketIO* asio)
    294 {
    295     async_socket_reference(asio->as);
    296     return asio->as;
    297 }
    298 
    299 void
    300 async_socket_io_cancel_time_out(AsyncSocketIO* asio)
    301 {
    302     loopTimer_stop(asio->timer);
    303 }
    304 
    305 void*
    306 async_socket_io_get_io_opaque(const AsyncSocketIO* asio)
    307 {
    308     return asio->io_opaque;
    309 }
    310 
    311 void*
    312 async_socket_io_get_client_opaque(const AsyncSocketIO* asio)
    313 {
    314     return async_socket_get_client_opaque(asio->as);
    315 }
    316 
    317 void*
    318 async_socket_io_get_buffer_info(const AsyncSocketIO* asio,
    319                                 uint32_t* transferred,
    320                                 uint32_t* to_transfer)
    321 {
    322     if (transferred != NULL) {
    323         *transferred = asio->transferred;
    324     }
    325     if (to_transfer != NULL) {
    326         *to_transfer = asio->to_transfer;
    327     }
    328     return asio->buffer;
    329 }
    330 
    331 void*
    332 async_socket_io_get_buffer(const AsyncSocketIO* asio)
    333 {
    334     return asio->buffer;
    335 }
    336 
    337 uint32_t
    338 async_socket_io_get_transferred(const AsyncSocketIO* asio)
    339 {
    340     return asio->transferred;
    341 }
    342 
    343 uint32_t
    344 async_socket_io_get_to_transfer(const AsyncSocketIO* asio)
    345 {
    346     return asio->to_transfer;
    347 }
    348 
    349 int
    350 async_socket_io_is_read(const AsyncSocketIO* asio)
    351 {
    352     return asio->is_io_read;
    353 }
    354 
    355 /********************************************************************************
    356  *                      Asynchronous Socket internals
    357  *******************************************************************************/
    358 
    359 struct AsyncSocket {
    360     /* TCP address for the socket. */
    361     SockAddress         address;
    362     /* Connection callback for this socket. */
    363     on_as_connection_cb on_connection;
    364     /* An opaque pointer associated with this socket by the client. */
    365     void*               client_opaque;
    366     /* I/O looper for asynchronous I/O on the socket. */
    367     Looper*             looper;
    368     /* I/O descriptor for asynchronous I/O on the socket. */
    369     LoopIo              io[1];
    370     /* Timer to use for reconnection attempts. */
    371     LoopTimer           reconnect_timer[1];
    372     /* Head of the list of the active readers. */
    373     AsyncSocketIO*      readers_head;
    374     /* Tail of the list of the active readers. */
    375     AsyncSocketIO*      readers_tail;
    376     /* Head of the list of the active writers. */
    377     AsyncSocketIO*      writers_head;
    378     /* Tail of the list of the active writers. */
    379     AsyncSocketIO*      writers_tail;
    380     /* Socket's file descriptor. */
    381     int                 fd;
    382     /* Timeout to use for reconnection attempts. */
    383     int                 reconnect_to;
    384     /* Number of outstanding references to the socket. */
    385     int                 ref_count;
    386     /* Flags whether (1) or not (0) socket owns the looper. */
    387     int                 owns_looper;
    388 };
    389 
    390 static const char*
    391 _async_socket_string(AsyncSocket* as)
    392 {
    393     return sock_address_to_string(&as->address);
    394 }
    395 
    396 static Looper*
    397 _async_socket_get_looper(AsyncSocket* as)
    398 {
    399     return as->looper;
    400 }
    401 
    402 /* Pulls first reader out of the list.
    403  * Param:
    404  *  as - Initialized AsyncSocket instance.
    405  * Return:
    406  *  First I/O pulled out of the list, or NULL if there are no I/O in the list.
    407  *  Note that the caller is responsible for releasing the I/O object returned
    408  *  from this routine.
    409  */
    410 static AsyncSocketIO*
    411 _async_socket_pull_first_io(AsyncSocket* as,
    412                             AsyncSocketIO** list_head,
    413                             AsyncSocketIO** list_tail)
    414 {
    415     AsyncSocketIO* const ret = *list_head;
    416     if (ret != NULL) {
    417         *list_head = ret->next;
    418         ret->next = NULL;
    419         if (*list_head == NULL) {
    420             *list_tail = NULL;
    421         }
    422     }
    423     return ret;
    424 }
    425 
    426 /* Pulls first reader out of the list.
    427  * Param:
    428  *  as - Initialized AsyncSocket instance.
    429  * Return:
    430  *  First reader pulled out of the list, or NULL if there are no readers in the
    431  *  list.
    432  *  Note that the caller is responsible for releasing the I/O object returned
    433  *  from this routine.
    434  */
    435 static AsyncSocketIO*
    436 _async_socket_pull_first_reader(AsyncSocket* as)
    437 {
    438     return _async_socket_pull_first_io(as, &as->readers_head, &as->readers_tail);
    439 }
    440 
    441 /* Pulls first writer out of the list.
    442  * Param:
    443  *  as - Initialized AsyncSocket instance.
    444  * Return:
    445  *  First writer pulled out of the list, or NULL if there are no writers in the
    446  *  list.
    447  *  Note that the caller is responsible for releasing the I/O object returned
    448  *  from this routine.
    449  */
    450 static AsyncSocketIO*
    451 _async_socket_pull_first_writer(AsyncSocket* as)
    452 {
    453     return _async_socket_pull_first_io(as, &as->writers_head, &as->writers_tail);
    454 }
    455 
    456 /* Removes an I/O descriptor from a list of active I/O.
    457  * Param:
    458  *  as - Initialized AsyncSocket instance.
    459  *  list_head, list_tail - Pointers to the list head and tail.
    460  *  io - I/O to remove.
    461  * Return:
    462  *  Boolean: 1 if I/O has been removed, or 0 if I/O has not been found in the list.
    463  */
    464 static int
    465 _async_socket_remove_io(AsyncSocket* as,
    466                         AsyncSocketIO** list_head,
    467                         AsyncSocketIO** list_tail,
    468                         AsyncSocketIO* io)
    469 {
    470     AsyncSocketIO* prev = NULL;
    471 
    472     while (*list_head != NULL && io != *list_head) {
    473         prev = *list_head;
    474         list_head = &((*list_head)->next);
    475     }
    476     if (*list_head == NULL) {
    477         D("%s: I/O %p is not found in the list for socket '%s'",
    478           __FUNCTION__, io, _async_socket_string(as));
    479         return 0;
    480     }
    481 
    482     *list_head = io->next;
    483     if (prev != NULL) {
    484         prev->next = io->next;
    485     }
    486     if (*list_tail == io) {
    487         *list_tail = prev;
    488     }
    489 
    490     /* Release I/O adjusting reference added when I/O has been saved in the list. */
    491     async_socket_io_release(io);
    492 
    493     return 1;
    494 }
    495 
    496 /* Advances to the next I/O in the list.
    497  * Param:
    498  *  as - Initialized AsyncSocket instance.
    499  *  list_head, list_tail - Pointers to the list head and tail.
    500  */
    501 static void
    502 _async_socket_advance_io(AsyncSocket* as,
    503                          AsyncSocketIO** list_head,
    504                          AsyncSocketIO** list_tail)
    505 {
    506     AsyncSocketIO* first_io = *list_head;
    507     if (first_io != NULL) {
    508         *list_head = first_io->next;
    509         first_io->next = NULL;
    510     }
    511     if (*list_head == NULL) {
    512         *list_tail = NULL;
    513     }
    514     if (first_io != NULL) {
    515         /* Release I/O removed from the head of the list. */
    516         async_socket_io_release(first_io);
    517     }
    518 }
    519 
    520 /* Advances to the next reader in the list.
    521  * Param:
    522  *  as - Initialized AsyncSocket instance.
    523  */
    524 static void
    525 _async_socket_advance_reader(AsyncSocket* as)
    526 {
    527     _async_socket_advance_io(as, &as->readers_head, &as->readers_tail);
    528 }
    529 
    530 /* Advances to the next writer in the list.
    531  * Param:
    532  *  as - Initialized AsyncSocket instance.
    533  */
    534 static void
    535 _async_socket_advance_writer(AsyncSocket* as)
    536 {
    537     _async_socket_advance_io(as, &as->writers_head, &as->writers_tail);
    538 }
    539 
    540 /* Completes an I/O.
    541  * Param:
    542  *  as - Initialized AsyncSocket instance.
    543  *  asio - I/O to complete.
    544  * Return:
    545  *  One of AsyncIOAction values.
    546  */
    547 static AsyncIOAction
    548 _async_socket_complete_io(AsyncSocket* as, AsyncSocketIO* asio)
    549 {
    550     T("ASocket %s: %s I/O %p is completed.",
    551       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
    552 
    553     /* Stop the timer. */
    554     async_socket_io_cancel_time_out(asio);
    555 
    556     return asio->on_io(asio->io_opaque, asio, ASIO_STATE_SUCCEEDED);
    557 }
    558 
    559 /* Timeouts an I/O.
    560  * Param:
    561  *  as - Initialized AsyncSocket instance.
    562  *  asio - An I/O that has timed out.
    563  * Return:
    564  *  One of AsyncIOAction values.
    565  */
    566 static AsyncIOAction
    567 _async_socket_io_timed_out(AsyncSocket* as, AsyncSocketIO* asio)
    568 {
    569     T("ASocket %s: %s I/O %p with deadline %lld has timed out at %lld",
    570       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
    571       asio->deadline, async_socket_deadline(as, 0));
    572 
    573     /* Report to the client. */
    574     const AsyncIOAction action = asio->on_io(asio->io_opaque, asio,
    575                                              ASIO_STATE_TIMED_OUT);
    576 
    577     /* Remove the I/O from a list of active I/O for actions other than retry. */
    578     if (action != ASIO_ACTION_RETRY) {
    579         if (asio->is_io_read) {
    580             _async_socket_remove_io(as, &as->readers_head, &as->readers_tail, asio);
    581         } else {
    582             _async_socket_remove_io(as, &as->writers_head, &as->writers_tail, asio);
    583         }
    584     }
    585 
    586     return action;
    587 }
    588 
    589 /* Cancels an I/O.
    590  * Param:
    591  *  as - Initialized AsyncSocket instance.
    592  *  asio - An I/O to cancel.
    593  * Return:
    594  *  One of AsyncIOAction values.
    595  */
    596 static AsyncIOAction
    597 _async_socket_cancel_io(AsyncSocket* as, AsyncSocketIO* asio)
    598 {
    599     T("ASocket %s: %s I/O %p is cancelled.",
    600       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
    601 
    602     /* Stop the timer. */
    603     async_socket_io_cancel_time_out(asio);
    604 
    605     return asio->on_io(asio->io_opaque, asio, ASIO_STATE_CANCELLED);
    606 }
    607 
    608 /* Reports an I/O failure.
    609  * Param:
    610  *  as - Initialized AsyncSocket instance.
    611  *  asio - An I/O that has failed. Can be NULL for general failures.
    612  *  failure - Failure (errno) that has occurred.
    613  * Return:
    614  *  One of AsyncIOAction values.
    615  */
    616 static AsyncIOAction
    617 _async_socket_io_failure(AsyncSocket* as, AsyncSocketIO* asio, int failure)
    618 {
    619     T("ASocket %s: %s I/O %p has failed: %d -> %s",
    620       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
    621       failure, strerror(failure));
    622 
    623     /* Stop the timer. */
    624     async_socket_io_cancel_time_out(asio);
    625 
    626     errno = failure;
    627     return asio->on_io(asio->io_opaque, asio, ASIO_STATE_FAILED);
    628 }
    629 
    630 /* Cancels all the active socket readers.
    631  * Param:
    632  *  as - Initialized AsyncSocket instance.
    633  */
    634 static void
    635 _async_socket_cancel_readers(AsyncSocket* as)
    636 {
    637     while (as->readers_head != NULL) {
    638         AsyncSocketIO* const to_cancel = _async_socket_pull_first_reader(as);
    639         /* We ignore action returned from the cancellation callback, since we're
    640          * in a disconnected state here. */
    641         _async_socket_cancel_io(as, to_cancel);
    642         async_socket_io_release(to_cancel);
    643     }
    644 }
    645 
    646 /* Cancels all the active socket writers.
    647  * Param:
    648  *  as - Initialized AsyncSocket instance.
    649  */
    650 static void
    651 _async_socket_cancel_writers(AsyncSocket* as)
    652 {
    653     while (as->writers_head != NULL) {
    654         AsyncSocketIO* const to_cancel = _async_socket_pull_first_writer(as);
    655         /* We ignore action returned from the cancellation callback, since we're
    656          * in a disconnected state here. */
    657         _async_socket_cancel_io(as, to_cancel);
    658         async_socket_io_release(to_cancel);
    659     }
    660 }
    661 
    662 /* Cancels all the I/O on the socket. */
    663 static void
    664 _async_socket_cancel_all_io(AsyncSocket* as)
    665 {
    666     /* Stop the reconnection timer. */
    667     loopTimer_stop(as->reconnect_timer);
    668 
    669     /* Stop read / write on the socket. */
    670     loopIo_dontWantWrite(as->io);
    671     loopIo_dontWantRead(as->io);
    672 
    673     /* Cancel active readers and writers. */
    674     _async_socket_cancel_readers(as);
    675     _async_socket_cancel_writers(as);
    676 }
    677 
    678 /* Closes socket handle used by the async socket.
    679  * Param:
    680  *  as - Initialized AsyncSocket instance.
    681  */
    682 static void
    683 _async_socket_close_socket(AsyncSocket* as)
    684 {
    685     if (as->fd >= 0) {
    686         loopIo_done(as->io);
    687         socket_close(as->fd);
    688         T("ASocket %s: Socket handle %d is closed.",
    689           _async_socket_string(as), as->fd);
    690         as->fd = -1;
    691     }
    692 }
    693 
    694 /* Destroys AsyncSocket instance.
    695  * Param:
    696  *  as - Initialized AsyncSocket instance.
    697  */
    698 static void
    699 _async_socket_free(AsyncSocket* as)
    700 {
    701     if (as != NULL) {
    702         T("ASocket %s: Socket descriptor is destroyed.", _async_socket_string(as));
    703 
    704         /* Close socket. */
    705         _async_socket_close_socket(as);
    706 
    707         /* Free allocated resources. */
    708         if (as->looper != NULL) {
    709             loopTimer_done(as->reconnect_timer);
    710             if (as->owns_looper) {
    711                 looper_free(as->looper);
    712             }
    713         }
    714         sock_address_done(&as->address);
    715         AFREE(as);
    716     }
    717 }
    718 
    719 /* Starts reconnection attempts after connection has been lost.
    720  * Param:
    721  *  as - Initialized AsyncSocket instance.
    722  *  to - Milliseconds to wait before reconnection attempt.
    723  */
    724 static void
    725 _async_socket_reconnect(AsyncSocket* as, int to)
    726 {
    727     T("ASocket %s: reconnecting in %dms...", _async_socket_string(as), to);
    728 
    729     /* Make sure that no I/O is active, and socket is closed before we
    730      * reconnect. */
    731     _async_socket_cancel_all_io(as);
    732 
    733     /* Set the timer for reconnection attempt. */
    734     loopTimer_startRelative(as->reconnect_timer, to);
    735 }
    736 
    737 /********************************************************************************
    738  *                      Asynchronous Socket callbacks
    739  *******************************************************************************/
    740 
    741 /* A callback that is invoked when socket gets disconnected.
    742  * Param:
    743  *  as - Initialized AsyncSocket instance.
    744  */
    745 static void
    746 _on_async_socket_disconnected(AsyncSocket* as)
    747 {
    748     /* Save error to restore it for the client's callback. */
    749     const int save_errno = errno;
    750     AsyncIOAction action = ASIO_ACTION_ABORT;
    751 
    752     D("ASocket %s: Disconnected.", _async_socket_string(as));
    753 
    754     /* Cancel all the I/O on this socket. */
    755     _async_socket_cancel_all_io(as);
    756 
    757     /* Close the socket. */
    758     _async_socket_close_socket(as);
    759 
    760     /* Restore errno, and invoke client's callback. */
    761     errno = save_errno;
    762     action = as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
    763 
    764     if (action == ASIO_ACTION_RETRY) {
    765         /* Client requested reconnection. */
    766         _async_socket_reconnect(as, as->reconnect_to);
    767     }
    768 }
    769 
    770 /* A callback that is invoked on socket's I/O failure.
    771  * Param:
    772  *  as - Initialized AsyncSocket instance.
    773  *  asio - Descriptor for the failed I/O. Can be NULL for general failures.
    774  */
    775 static AsyncIOAction
    776 _on_async_socket_failure(AsyncSocket* as, AsyncSocketIO* asio)
    777 {
    778     D("ASocket %s: %s I/O failure: %d -> %s",
    779       _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
    780       errno, strerror(errno));
    781 
    782     /* Report the failure. */
    783     return _async_socket_io_failure(as, asio, errno);
    784 }
    785 
    786 /* A callback that is invoked when there is data available to read.
    787  * Param:
    788  *  as - Initialized AsyncSocket instance.
    789  * Return:
    790  *  0 on success, or -1 on failure. Failure returned from this routine will
    791  *  skip writes (if awailable) behind this read.
    792  */
    793 static int
    794 _on_async_socket_recv(AsyncSocket* as)
    795 {
    796     AsyncIOAction action;
    797 
    798     /* Get current reader. */
    799     AsyncSocketIO* const asr = as->readers_head;
    800     if (asr == NULL) {
    801         D("ASocket %s: No reader is available.", _async_socket_string(as));
    802         loopIo_dontWantRead(as->io);
    803         return 0;
    804     }
    805 
    806     /* Reference the reader while we're working with it in this callback. */
    807     async_socket_io_reference(asr);
    808 
    809     /* Bump I/O state, and inform the client that I/O is in progress. */
    810     if (asr->state == ASIO_STATE_QUEUED) {
    811         asr->state = ASIO_STATE_STARTED;
    812     } else {
    813         asr->state = ASIO_STATE_CONTINUES;
    814     }
    815     action = asr->on_io(asr->io_opaque, asr, asr->state);
    816     if (action == ASIO_ACTION_ABORT) {
    817         D("ASocket %s: Read is aborted by the client.", _async_socket_string(as));
    818         /* Move on to the next reader. */
    819         _async_socket_advance_reader(as);
    820         /* Lets see if there are still active readers, and enable, or disable
    821          * read I/O callback accordingly. */
    822         if (as->readers_head != NULL) {
    823             loopIo_wantRead(as->io);
    824         } else {
    825             loopIo_dontWantRead(as->io);
    826         }
    827         async_socket_io_release(asr);
    828         return 0;
    829     }
    830 
    831     /* Read next chunk of data. */
    832     int res = socket_recv(as->fd, asr->buffer + asr->transferred,
    833                           asr->to_transfer - asr->transferred);
    834     while (res < 0 && errno == EINTR) {
    835         res = socket_recv(as->fd, asr->buffer + asr->transferred,
    836                           asr->to_transfer - asr->transferred);
    837     }
    838 
    839     if (res == 0) {
    840         /* Socket has been disconnected. */
    841         errno = ECONNRESET;
    842         _on_async_socket_disconnected(as);
    843         async_socket_io_release(asr);
    844         return -1;
    845     }
    846 
    847     if (res < 0) {
    848         if (errno == EWOULDBLOCK || errno == EAGAIN) {
    849             /* Yield to writes behind this read. */
    850             loopIo_wantRead(as->io);
    851             async_socket_io_release(asr);
    852             return 0;
    853         }
    854 
    855         /* An I/O error. */
    856         action = _on_async_socket_failure(as, asr);
    857         if (action != ASIO_ACTION_RETRY) {
    858             D("ASocket %s: Read is aborted on failure.", _async_socket_string(as));
    859             /* Move on to the next reader. */
    860             _async_socket_advance_reader(as);
    861             /* Lets see if there are still active readers, and enable, or disable
    862              * read I/O callback accordingly. */
    863             if (as->readers_head != NULL) {
    864                 loopIo_wantRead(as->io);
    865             } else {
    866                 loopIo_dontWantRead(as->io);
    867             }
    868         }
    869         async_socket_io_release(asr);
    870         return -1;
    871     }
    872 
    873     /* Update the reader's descriptor. */
    874     asr->transferred += res;
    875     if (asr->transferred == asr->to_transfer) {
    876         /* This read is completed. Move on to the next reader. */
    877         _async_socket_advance_reader(as);
    878 
    879         /* Notify reader completion. */
    880         _async_socket_complete_io(as, asr);
    881     }
    882 
    883     /* Lets see if there are still active readers, and enable, or disable read
    884      * I/O callback accordingly. */
    885     if (as->readers_head != NULL) {
    886         loopIo_wantRead(as->io);
    887     } else {
    888         loopIo_dontWantRead(as->io);
    889     }
    890 
    891     async_socket_io_release(asr);
    892 
    893     return 0;
    894 }
    895 
    896 /* A callback that is invoked when there is data available to write.
    897  * Param:
    898  *  as - Initialized AsyncSocket instance.
    899  * Return:
    900  *  0 on success, or -1 on failure. Failure returned from this routine will
    901  *  skip reads (if awailable) behind this write.
    902  */
    903 static int
    904 _on_async_socket_send(AsyncSocket* as)
    905 {
    906     AsyncIOAction action;
    907 
    908     /* Get current writer. */
    909     AsyncSocketIO* const asw = as->writers_head;
    910     if (asw == NULL) {
    911         D("ASocket %s: No writer is available.", _async_socket_string(as));
    912         loopIo_dontWantWrite(as->io);
    913         return 0;
    914     }
    915 
    916     /* Reference the writer while we're working with it in this callback. */
    917     async_socket_io_reference(asw);
    918 
    919     /* Bump I/O state, and inform the client that I/O is in progress. */
    920     if (asw->state == ASIO_STATE_QUEUED) {
    921         asw->state = ASIO_STATE_STARTED;
    922     } else {
    923         asw->state = ASIO_STATE_CONTINUES;
    924     }
    925     action = asw->on_io(asw->io_opaque, asw, asw->state);
    926     if (action == ASIO_ACTION_ABORT) {
    927         D("ASocket %s: Write is aborted by the client.", _async_socket_string(as));
    928         /* Move on to the next writer. */
    929         _async_socket_advance_writer(as);
    930         /* Lets see if there are still active writers, and enable, or disable
    931          * write I/O callback accordingly. */
    932         if (as->writers_head != NULL) {
    933             loopIo_wantWrite(as->io);
    934         } else {
    935             loopIo_dontWantWrite(as->io);
    936         }
    937         async_socket_io_release(asw);
    938         return 0;
    939     }
    940 
    941     /* Write next chunk of data. */
    942     int res = socket_send(as->fd, asw->buffer + asw->transferred,
    943                           asw->to_transfer - asw->transferred);
    944     while (res < 0 && errno == EINTR) {
    945         res = socket_send(as->fd, asw->buffer + asw->transferred,
    946                           asw->to_transfer - asw->transferred);
    947     }
    948 
    949     if (res == 0) {
    950         /* Socket has been disconnected. */
    951         errno = ECONNRESET;
    952         _on_async_socket_disconnected(as);
    953         async_socket_io_release(asw);
    954         return -1;
    955     }
    956 
    957     if (res < 0) {
    958         if (errno == EWOULDBLOCK || errno == EAGAIN) {
    959             /* Yield to reads behind this write. */
    960             loopIo_wantWrite(as->io);
    961             async_socket_io_release(asw);
    962             return 0;
    963         }
    964 
    965         /* An I/O error. */
    966         action = _on_async_socket_failure(as, asw);
    967         if (action != ASIO_ACTION_RETRY) {
    968             D("ASocket %s: Write is aborted on failure.", _async_socket_string(as));
    969             /* Move on to the next writer. */
    970             _async_socket_advance_writer(as);
    971             /* Lets see if there are still active writers, and enable, or disable
    972              * write I/O callback accordingly. */
    973             if (as->writers_head != NULL) {
    974                 loopIo_wantWrite(as->io);
    975             } else {
    976                 loopIo_dontWantWrite(as->io);
    977             }
    978         }
    979         async_socket_io_release(asw);
    980         return -1;
    981     }
    982 
    983     /* Update the writer descriptor. */
    984     asw->transferred += res;
    985     if (asw->transferred == asw->to_transfer) {
    986         /* This write is completed. Move on to the next writer. */
    987         _async_socket_advance_writer(as);
    988 
    989         /* Notify writer completion. */
    990         _async_socket_complete_io(as, asw);
    991     }
    992 
    993     /* Lets see if there are still active writers, and enable, or disable write
    994      * I/O callback accordingly. */
    995     if (as->writers_head != NULL) {
    996         loopIo_wantWrite(as->io);
    997     } else {
    998         loopIo_dontWantWrite(as->io);
    999     }
   1000 
   1001     async_socket_io_release(asw);
   1002 
   1003     return 0;
   1004 }
   1005 
   1006 /* A callback that is invoked when an I/O is available on socket.
   1007  * Param:
   1008  *  as - Initialized AsyncSocket instance.
   1009  *  fd - Socket's file descriptor.
   1010  *  events - LOOP_IO_READ | LOOP_IO_WRITE bitmask.
   1011  */
   1012 static void
   1013 _on_async_socket_io(void* opaque, int fd, unsigned events)
   1014 {
   1015     AsyncSocket* const as = (AsyncSocket*)opaque;
   1016 
   1017     /* Reference the socket while we're working with it in this callback. */
   1018     async_socket_reference(as);
   1019 
   1020     if ((events & LOOP_IO_READ) != 0) {
   1021         if (_on_async_socket_recv(as) != 0) {
   1022             async_socket_release(as);
   1023             return;
   1024         }
   1025     }
   1026 
   1027     if ((events & LOOP_IO_WRITE) != 0) {
   1028         if (_on_async_socket_send(as) != 0) {
   1029             async_socket_release(as);
   1030             return;
   1031         }
   1032     }
   1033 
   1034     async_socket_release(as);
   1035 }
   1036 
   1037 /* A callback that is invoked by asynchronous socket connector on connection
   1038  *  events.
   1039  * Param:
   1040  *  opaque - Initialized AsyncSocket instance.
   1041  *  connector - Connector that is used to connect this socket.
   1042  *  event - Connection event.
   1043  * Return:
   1044  *  One of AsyncIOAction values.
   1045  */
   1046 static AsyncIOAction
   1047 _on_connector_events(void* opaque,
   1048                      AsyncSocketConnector* connector,
   1049                      AsyncIOState event)
   1050 {
   1051     AsyncIOAction action;
   1052     AsyncSocket* const as = (AsyncSocket*)opaque;
   1053 
   1054     /* Reference the socket while we're working with it in this callback. */
   1055     async_socket_reference(as);
   1056 
   1057     if (event == ASIO_STATE_SUCCEEDED) {
   1058         /* Accept the connection. */
   1059         as->fd = async_socket_connector_pull_fd(connector);
   1060         loopIo_init(as->io, as->looper, as->fd, _on_async_socket_io, as);
   1061     }
   1062 
   1063     /* Invoke client's callback. */
   1064     action = as->on_connection(as->client_opaque, as, event);
   1065     if (event == ASIO_STATE_SUCCEEDED && action != ASIO_ACTION_DONE) {
   1066         /* For whatever reason the client didn't want to keep this connection.
   1067          * Close it. */
   1068         D("ASocket %s: Connection is discarded by the client.",
   1069           _async_socket_string(as));
   1070         _async_socket_close_socket(as);
   1071     }
   1072 
   1073     if (action != ASIO_ACTION_RETRY) {
   1074         async_socket_connector_release(connector);
   1075     }
   1076 
   1077     async_socket_release(as);
   1078 
   1079     return action;
   1080 }
   1081 
   1082 /* Timer callback invoked to reconnect the lost connection.
   1083  * Param:
   1084  *  as - Initialized AsyncSocket instance.
   1085  */
   1086 void
   1087 _on_async_socket_reconnect(void* opaque)
   1088 {
   1089     AsyncSocket* as = (AsyncSocket*)opaque;
   1090 
   1091     /* Reference the socket while we're working with it in this callback. */
   1092     async_socket_reference(as);
   1093     async_socket_connect(as, as->reconnect_to);
   1094     async_socket_release(as);
   1095 }
   1096 
   1097 
   1098 /********************************************************************************
   1099  *                  Android Device Socket public API
   1100  *******************************************************************************/
   1101 
   1102 AsyncSocket*
   1103 async_socket_new(int port,
   1104                  int reconnect_to,
   1105                  on_as_connection_cb client_cb,
   1106                  void* client_opaque,
   1107                  Looper* looper)
   1108 {
   1109     AsyncSocket* as;
   1110 
   1111     if (client_cb == NULL) {
   1112         E("Invalid client_cb parameter");
   1113         return NULL;
   1114     }
   1115 
   1116     ANEW0(as);
   1117 
   1118     as->fd = -1;
   1119     as->client_opaque = client_opaque;
   1120     as->on_connection = client_cb;
   1121     as->readers_head = as->readers_tail = NULL;
   1122     as->reconnect_to = reconnect_to;
   1123     as->ref_count = 1;
   1124     sock_address_init_inet(&as->address, SOCK_ADDRESS_INET_LOOPBACK, port);
   1125     if (looper == NULL) {
   1126         as->looper = looper_newCore();
   1127         if (as->looper == NULL) {
   1128             E("Unable to create I/O looper for async socket '%s'",
   1129               _async_socket_string(as));
   1130             client_cb(client_opaque, as, ASIO_STATE_FAILED);
   1131             _async_socket_free(as);
   1132             return NULL;
   1133         }
   1134         as->owns_looper = 1;
   1135     } else {
   1136         as->looper = looper;
   1137         as->owns_looper = 0;
   1138     }
   1139 
   1140     loopTimer_init(as->reconnect_timer, as->looper, _on_async_socket_reconnect, as);
   1141 
   1142     T("ASocket %s: Descriptor is created.", _async_socket_string(as));
   1143 
   1144     return as;
   1145 }
   1146 
   1147 int
   1148 async_socket_reference(AsyncSocket* as)
   1149 {
   1150     assert(as->ref_count > 0);
   1151     as->ref_count++;
   1152     return as->ref_count;
   1153 }
   1154 
   1155 int
   1156 async_socket_release(AsyncSocket* as)
   1157 {
   1158     assert(as->ref_count > 0);
   1159     as->ref_count--;
   1160     if (as->ref_count == 0) {
   1161         /* Last reference has been dropped. Destroy this object. */
   1162         _async_socket_cancel_all_io(as);
   1163         _async_socket_free(as);
   1164         return 0;
   1165     }
   1166     return as->ref_count;
   1167 }
   1168 
   1169 void
   1170 async_socket_connect(AsyncSocket* as, int retry_to)
   1171 {
   1172     T("ASocket %s: Handling connection request for %dms...",
   1173       _async_socket_string(as), retry_to);
   1174 
   1175     AsyncSocketConnector* const connector =
   1176         async_socket_connector_new(&as->address, retry_to, _on_connector_events,
   1177                                    as, as->looper);
   1178     if (connector != NULL) {
   1179         async_socket_connector_connect(connector);
   1180     } else {
   1181         as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
   1182     }
   1183 }
   1184 
   1185 void
   1186 async_socket_disconnect(AsyncSocket* as)
   1187 {
   1188     T("ASocket %s: Handling disconnection request...", _async_socket_string(as));
   1189 
   1190     if (as != NULL) {
   1191         _async_socket_cancel_all_io(as);
   1192         _async_socket_close_socket(as);
   1193     }
   1194 }
   1195 
   1196 void
   1197 async_socket_reconnect(AsyncSocket* as, int retry_to)
   1198 {
   1199     T("ASocket %s: Handling reconnection request for %dms...",
   1200       _async_socket_string(as), retry_to);
   1201 
   1202     _async_socket_cancel_all_io(as);
   1203     _async_socket_close_socket(as);
   1204     _async_socket_reconnect(as, retry_to);
   1205 }
   1206 
   1207 void
   1208 async_socket_read_abs(AsyncSocket* as,
   1209                       void* buffer, uint32_t len,
   1210                       on_as_io_cb reader_cb,
   1211                       void* reader_opaque,
   1212                       Duration deadline)
   1213 {
   1214     T("ASocket %s: Handling read for %d bytes with deadline %lld...",
   1215       _async_socket_string(as), len, deadline);
   1216 
   1217     AsyncSocketIO* const asr =
   1218         _async_socket_reader_new(as, buffer, len, reader_cb, reader_opaque,
   1219                                  deadline);
   1220     /* Add new reader to the list. Note that we use initial reference from I/O
   1221      * 'new' routine as "in the list" reference counter. */
   1222     if (as->readers_head == NULL) {
   1223         as->readers_head = as->readers_tail = asr;
   1224     } else {
   1225         as->readers_tail->next = asr;
   1226         as->readers_tail = asr;
   1227     }
   1228     loopIo_wantRead(as->io);
   1229 }
   1230 
   1231 void
   1232 async_socket_read_rel(AsyncSocket* as,
   1233                       void* buffer, uint32_t len,
   1234                       on_as_io_cb reader_cb,
   1235                       void* reader_opaque,
   1236                       int to)
   1237 {
   1238     const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
   1239                                     DURATION_INFINITE;
   1240     async_socket_read_abs(as, buffer, len, reader_cb, reader_opaque, dl);
   1241 }
   1242 
   1243 void
   1244 async_socket_write_abs(AsyncSocket* as,
   1245                        const void* buffer, uint32_t len,
   1246                        on_as_io_cb writer_cb,
   1247                        void* writer_opaque,
   1248                        Duration deadline)
   1249 {
   1250     T("ASocket %s: Handling write for %d bytes with deadline %lld...",
   1251       _async_socket_string(as), len, deadline);
   1252 
   1253     AsyncSocketIO* const asw =
   1254         _async_socket_writer_new(as, buffer, len, writer_cb, writer_opaque,
   1255                                  deadline);
   1256     /* Add new writer to the list. Note that we use initial reference from I/O
   1257      * 'new' routine as "in the list" reference counter. */
   1258     if (as->writers_head == NULL) {
   1259         as->writers_head = as->writers_tail = asw;
   1260     } else {
   1261         as->writers_tail->next = asw;
   1262         as->writers_tail = asw;
   1263     }
   1264     loopIo_wantWrite(as->io);
   1265 }
   1266 
   1267 void
   1268 async_socket_write_rel(AsyncSocket* as,
   1269                        const void* buffer, uint32_t len,
   1270                        on_as_io_cb writer_cb,
   1271                        void* writer_opaque,
   1272                        int to)
   1273 {
   1274     const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
   1275                                     DURATION_INFINITE;
   1276     async_socket_write_abs(as, buffer, len, writer_cb, writer_opaque, dl);
   1277 }
   1278 
   1279 void*
   1280 async_socket_get_client_opaque(const AsyncSocket* as)
   1281 {
   1282     return as->client_opaque;
   1283 }
   1284 
   1285 Duration
   1286 async_socket_deadline(AsyncSocket* as, int rel)
   1287 {
   1288     return (rel >= 0) ? looper_now(_async_socket_get_looper(as)) + rel :
   1289                         DURATION_INFINITE;
   1290 }
   1291 
   1292 int
   1293 async_socket_get_port(const AsyncSocket* as)
   1294 {
   1295     return sock_address_get_port(&as->address);
   1296 }
   1297