/* librpc: RPC client implementation. */
#include <rpc/rpc.h>
#include <arpa/inet.h>
#include <rpc/rpc_router_ioctl.h>
#include <debug.h>
#include <pthread.h>
#include <sys/select.h>

#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

#include <hardware_legacy/power.h>
     14 
     15 #define ANDROID_WAKE_LOCK_NAME "rpc-interface"
     16 
     17 void
     18 grabPartialWakeLock() {
     19     acquire_wake_lock(PARTIAL_WAKE_LOCK, ANDROID_WAKE_LOCK_NAME);
     20 }
     21 
     22 void
     23 releaseWakeLock() {
     24     release_wake_lock(ANDROID_WAKE_LOCK_NAME);
     25 }
     26 
     27 struct CLIENT {
     28     xdr_s_type *xdr;
     29     struct CLIENT *next;
     30     /* common attribute struct for setting up recursive mutexes */
     31     pthread_mutexattr_t lock_attr;
     32 
     33     /* We insist that there is only one outstanding call for a client at any
     34        time, and we use this mutex to enforce the rule.  When we start
     35        supporting multiple outstanding RPCs on a client, we will have to
     36        maintain a queue of them, and match incoming replies (by the XID of the
     37        incoming packet).  For now, we just block until we get that reply.
     38     */
     39     pthread_mutex_t lock;
     40 
     41     pthread_mutex_t wait_reply_lock;
     42     pthread_cond_t wait_reply;
     43 
     44     pthread_mutex_t input_xdr_lock;
     45     pthread_cond_t input_xdr_wait;
     46     volatile int input_xdr_busy;
     47 
     48     pthread_mutex_t wait_cb_lock;
     49     pthread_cond_t wait_cb;
     50     pthread_t cb_thread;
     51     volatile int got_cb;
     52     volatile int cb_stop;
     53 };
     54 
     55 extern void* svc_find(void *xprt, rpcprog_t prog, rpcvers_t vers);
     56 extern void svc_dispatch(void *svc, void *xprt);
     57 extern int  r_open();
     58 extern void r_close();
     59 extern xdr_s_type *xdr_init_common(const char *name, int is_client);
     60 extern xdr_s_type *xdr_clone(xdr_s_type *);
     61 extern void xdr_destroy_common(xdr_s_type *xdr);
     62 extern bool_t xdr_recv_reply_header (xdr_s_type *xdr, rpc_reply_header *reply);
     63 extern void *the_xprt;
     64 
     65 
     66 static pthread_mutex_t rx_mutex = PTHREAD_MUTEX_INITIALIZER;
     67 static pthread_t rx_thread;
     68 static volatile unsigned int num_clients;
     69 static volatile fd_set rx_fdset;
     70 static volatile int max_rxfd;
     71 static volatile struct CLIENT *clients;
     72 
     73 /* There's one of these for each RPC client which has received an RPC call. */
     74 static void *cb_context(void *__u)
     75 {
     76     CLIENT *client = (CLIENT *)__u;
     77     D("RPC-callback thread for %08x:%08x starting.\n",
     78       (client->xdr->x_prog | 0x01000000),
     79       client->xdr->x_vers);
     80     pthread_mutex_lock(&client->wait_cb_lock);
     81     while (client->cb_stop == 0) {
     82         if (!client->got_cb)
     83             pthread_cond_wait(&client->wait_cb,
     84                               &client->wait_cb_lock);
     85         /* We tell the thread it's time to exit by setting cb_stop to nonzero
     86            and signalling the conditional variable.  When there's no data, we
     87            skip to the top of the loop and exit.
     88         */
     89         if (!client->got_cb) {
     90             D("RPC-callback thread for %08x:%08x: signalled but no data.\n",
     91               (client->xdr->x_prog | 0x01000000),
     92               client->xdr->x_vers);
     93             continue;
     94         }
     95         client->got_cb = 0;
     96 
     97         /* We dispatch the message to the server representing the callback
     98          * client.
     99          */
    100         if (the_xprt) {
    101             void *svc;
    102             rpcprog_t prog =
    103                 ntohl(((uint32 *)(client->xdr->in_msg))[RPC_OFFSET+3]);
    104             rpcvers_t vers =
    105                 ntohl(((uint32 *)(client->xdr->in_msg))[RPC_OFFSET+4]);
    106 
    107             svc = svc_find(the_xprt, prog, vers);
    108             if (svc) {
    109                 XDR **svc_xdr = (XDR **)svc;
    110                 D("%08x:%08x dispatching RPC call (XID %d, xdr %p) for "
    111                   "callback client %08x:%08x.\n",
    112                   client->xdr->x_prog,
    113                   client->xdr->x_vers,
    114                   ntohl(((uint32 *)(client->xdr->in_msg))[RPC_OFFSET]),
    115                   client->xdr,
    116                   (uint32_t)prog, (int)vers);
    117                 /* We transplant the xdr of the client into the entry
    118                    representing the callback client in the list of servers.
    119                    Note that since we hold the wait_cb_lock for this client,
    120                    if another call for this callback client arrives before
    121                    we've finished processing this call, that will block until
    122                    we're done with this one.  If this happens, it would be
    123                    most likely a bug in the arm9 rpc router.
    124                 */
    125                 if (*svc_xdr) {
    126                     D("%08x:%08x expecting XDR == NULL"
    127                       "callback client %08x:%08x!\n",
    128                       client->xdr->x_prog,
    129                       client->xdr->x_vers,
    130                       (uint32_t)prog, (int)vers);
    131                     xdr_destroy_common(*svc_xdr);
    132                 }
    133 
    134                 D("%08x:%08x cloning XDR for "
    135                   "callback client %08x:%08x.\n",
    136                   client->xdr->x_prog,
    137                   client->xdr->x_vers,
    138                   (uint32_t)prog, (int)vers);
    139                 *svc_xdr = xdr_clone(client->xdr);
    140 
    141                 (*svc_xdr)->x_prog = prog;
    142                 (*svc_xdr)->x_vers = vers;
    143                 memcpy((*svc_xdr)->in_msg,
    144                        client->xdr->in_msg, client->xdr->in_len);
    145                 memcpy((*svc_xdr)->out_msg,
    146                        client->xdr->out_msg, client->xdr->out_next);
    147                 (*svc_xdr)->in_len = client->xdr->in_len;
    148                 (*svc_xdr)->out_next = client->xdr->out_next;
    149 
    150                 pthread_mutex_lock(&client->input_xdr_lock);
    151                 D("%08x:%08x marking input buffer as free.\n",
    152                   client->xdr->x_prog, client->xdr->x_vers);
    153                 client->input_xdr_busy = 0;
    154                 pthread_cond_signal(&client->input_xdr_wait);
    155                 pthread_mutex_unlock(&client->input_xdr_lock);
    156 
    157                 svc_dispatch(svc, the_xprt);
    158                 xdr_destroy_common(*svc_xdr);
    159                 *svc_xdr = NULL;
    160             }
    161             else E("%08x:%08x call packet arrived, but there's no "
    162                    "RPC server registered for %08x:%08x.\n",
    163                    client->xdr->x_prog,
    164                    client->xdr->x_vers,
    165                    (uint32_t)prog, (int)vers);
    166         }
    167         else E("%08x:%08x call packet arrived, but there's "
    168                "no RPC transport!\n",
    169                client->xdr->x_prog,
    170                client->xdr->x_vers);
    171 
    172         releaseWakeLock();
    173     }
    174     pthread_mutex_unlock(&client->wait_cb_lock);
    175 
    176 
    177     D("RPC-callback thread for %08x:%08x terminating.\n",
    178       (client->xdr->x_prog | 0x01000000),
    179       client->xdr->x_vers);
    180     return NULL;
    181 }
    182 
    183 static void *rx_context(void *__u __attribute__((unused)))
    184 {
    185     int n;
    186     struct timeval tv;
    187     fd_set rfds;
    188     while(num_clients) {
    189         pthread_mutex_lock(&rx_mutex);
    190         rfds = rx_fdset;
    191         pthread_mutex_unlock(&rx_mutex);
    192         tv.tv_sec = 0; tv.tv_usec = 500 * 1000;
    193         n = select(max_rxfd + 1, (fd_set *)&rfds, NULL, NULL, &tv);
    194         if (n < 0) {
    195             E("select() error %s (%d)\n", strerror(errno), errno);
    196             continue;
    197         }
    198 
    199         if (n) {
    200             pthread_mutex_lock(&rx_mutex); /* sync access to the client list */
    201             CLIENT *client = (CLIENT *)clients;
    202             for (; client; client = client->next) {
    203                 if (FD_ISSET(client->xdr->fd, &rfds)) {
    204 
    205                     /* We need to make sure that the XDR's in_buf is not in
    206                        use before we read into it.  The in_buf may be in use
    207                        in a race between processing an incoming call and
    208                        receiving a reply to an outstanding call, or processing
    209                        an incoming reply and receiving a call.
    210                     */
    211 
    212                     pthread_mutex_lock(&client->input_xdr_lock);
    213                     while (client->input_xdr_busy) {
    214                         D("%08x:%08x waiting for XDR input buffer "
    215                           "to be consumed.\n",
    216                           client->xdr->x_prog, client->xdr->x_vers);
    217                         pthread_cond_wait(
    218                             &client->input_xdr_wait,
    219                             &client->input_xdr_lock);
    220                     }
    221                     D("%08x:%08x reading data.\n",
    222                       client->xdr->x_prog, client->xdr->x_vers);
    223                     grabPartialWakeLock();
    224                     client->xdr->xops->read(client->xdr);
    225                     client->input_xdr_busy = 1;
    226                     pthread_mutex_unlock(&client->input_xdr_lock);
    227 
    228                     if (((uint32 *)(client->xdr->in_msg))[RPC_OFFSET+1] ==
    229                         htonl(RPC_MSG_REPLY)) {
    230                         /* Wake up the RPC client to receive its data. */
    231                         D("%08x:%08x received REPLY (XID %d), "
    232                           "grabbing mutex to wake up client.\n",
    233                           client->xdr->x_prog,
    234                           client->xdr->x_vers,
    235                           ntohl(((uint32 *)client->xdr->in_msg)[RPC_OFFSET]));
    236                         pthread_mutex_lock(&client->wait_reply_lock);
    237                         D("%08x:%08x got mutex, waking up client.\n",
    238                           client->xdr->x_prog,
    239                           client->xdr->x_vers);
    240                         pthread_cond_signal(&client->wait_reply);
    241                         pthread_mutex_unlock(&client->wait_reply_lock);
    242 
    243                         releaseWakeLock();
    244                     }
    245                     else {
    246                         pthread_mutex_lock(&client->wait_cb_lock);
    247                         D("%08x:%08x received CALL.\n",
    248                           client->xdr->x_prog,
    249                           client->xdr->x_vers);
    250                         client->got_cb = 1;
    251                         if (client->cb_stop < 0) {
    252                             D("%08x:%08x starting callback thread.\n",
    253                               client->xdr->x_prog,
    254                               client->xdr->x_vers);
    255                             client->cb_stop = 0;
    256                             pthread_create(&client->cb_thread,
    257                                            NULL,
    258                                            cb_context, client);
    259                         }
    260                         D("%08x:%08x waking up callback thread.\n",
    261                           client->xdr->x_prog,
    262                           client->xdr->x_vers);
    263                         pthread_cond_signal(&client->wait_cb);
    264                         pthread_mutex_unlock(&client->wait_cb_lock);
    265                     }
    266                 }
    267             }
    268             pthread_mutex_unlock(&rx_mutex);
    269         }
    270         else {
    271             V("rx thread timeout (%d clients):\n", num_clients);
    272 #if 0
    273             {
    274                 CLIENT *trav = (CLIENT *)clients;
    275                 for(; trav; trav = trav->next) {
    276                     if (trav->xdr)
    277                         V("\t%08x:%08x fd %02d\n",
    278                           trav->xdr->x_prog,
    279                           trav->xdr->x_vers,
    280                           trav->xdr->fd);
    281                     else V("\t(unknown)\n");
    282                 }
    283             }
    284 #endif
    285         }
    286     }
    287     D("RPC-client RX thread exiting!\n");
    288     return NULL;
    289 }
    290 
    291 enum clnt_stat
    292 clnt_call(
    293     CLIENT       * client,
    294     u_long         proc,
    295     xdrproc_t      xdr_args,
    296     caddr_t        args_ptr,
    297     xdrproc_t      xdr_results,
    298     caddr_t        rets_ptr,
    299     struct timeval timeout)
    300 {
    301     opaque_auth cred;
    302     opaque_auth verf;
    303     rpc_reply_header reply_header;
    304     enum clnt_stat ret = RPC_SUCCESS;
    305 
    306     xdr_s_type *xdr = client->xdr;
    307 
    308     pthread_mutex_lock(&client->lock);
    309 
    310 
    311     cred.oa_flavor = AUTH_NONE;
    312     cred.oa_length = 0;
    313     verf.oa_flavor = AUTH_NONE;
    314     verf.oa_length = 0;
    315 
    316     xdr->x_op = XDR_ENCODE;
    317 
    318     /* Send message header */
    319 
    320     if (!xdr_call_msg_start (xdr, xdr->x_prog, xdr->x_vers,
    321                              proc, &cred, &verf)) {
    322         XDR_MSG_ABORT (xdr);
    323         ret = RPC_CANTENCODEARGS;
    324         E("%08x:%08x error in xdr_call_msg_start()\n",
    325           client->xdr->x_prog,
    326           client->xdr->x_vers);
    327         goto out;
    328     }
    329 
    330     /* Send arguments */
    331 
    332     if (!xdr_args (xdr, args_ptr)) {
    333         XDR_MSG_ABORT(xdr);
    334         ret = RPC_CANTENCODEARGS;
    335         E("%08x:%08x error in xdr_args()\n",
    336           client->xdr->x_prog,
    337           client->xdr->x_vers);
    338         goto out;
    339     }
    340 
    341     /* Finish message - blocking */
    342     pthread_mutex_lock(&client->wait_reply_lock);
    343     D("%08x:%08x sending call (XID %d).\n",
    344       client->xdr->x_prog, client->xdr->x_vers, client->xdr->xid);
    345     if (!XDR_MSG_SEND(xdr)) {
    346         ret = RPC_CANTSEND;
    347         E("error in XDR_MSG_SEND\n");
    348         goto out_unlock;
    349     }
    350 
    351     D("%08x:%08x waiting for reply.\n",
    352       client->xdr->x_prog, client->xdr->x_vers);
    353     pthread_cond_wait(&client->wait_reply, &client->wait_reply_lock);
    354     D("%08x:%08x received reply.\n", client->xdr->x_prog, client->xdr->x_vers);
    355 
    356     if (((uint32 *)xdr->out_msg)[RPC_OFFSET] !=
    357         ((uint32 *)xdr->in_msg)[RPC_OFFSET]) {
    358         E("%08x:%08x XID mismatch: got %d, expecting %d.\n",
    359           client->xdr->x_prog, client->xdr->x_vers,
    360           ntohl(((uint32 *)xdr->in_msg)[RPC_OFFSET]),
    361           ntohl(((uint32 *)xdr->out_msg)[RPC_OFFSET]));
    362         ret = RPC_CANTRECV;
    363         goto out_unlock;
    364     }
    365 
    366     D("%08x:%08x decoding reply header.\n",
    367       client->xdr->x_prog, client->xdr->x_vers);
    368     if (!xdr_recv_reply_header (client->xdr, &reply_header)) {
    369         E("%08x:%08x error reading reply header.\n",
    370           client->xdr->x_prog, client->xdr->x_vers);
    371         ret = RPC_CANTRECV;
    372         goto out_unlock;
    373     }
    374 
    375     /* Check that other side accepted and responded */
    376     if (reply_header.stat != RPC_MSG_ACCEPTED) {
    377         /* Offset to map returned error into clnt_stat */
    378         ret = reply_header.u.dr.stat + RPC_VERSMISMATCH;
    379         E("%08x:%08x call was not accepted.\n",
    380           (uint32_t)client->xdr->x_prog, client->xdr->x_vers);
    381         goto out_unlock;
    382     } else if (reply_header.u.ar.stat != RPC_ACCEPT_SUCCESS) {
    383         /* Offset to map returned error into clnt_stat */
    384         ret = reply_header.u.ar.stat + RPC_AUTHERROR;
    385         E("%08x:%08x call failed with an authentication error.\n",
    386           (uint32_t)client->xdr->x_prog, client->xdr->x_vers);
    387         goto out_unlock;
    388     }
    389 
    390     xdr->x_op = XDR_DECODE;
    391     /* Decode results */
    392     if (!xdr_results(xdr, rets_ptr) || ! XDR_MSG_DONE(xdr)) {
    393         ret = RPC_CANTDECODERES;
    394         E("%08x:%08x error decoding results.\n",
    395           client->xdr->x_prog, client->xdr->x_vers);
    396         goto out_unlock;
    397     }
    398 
    399     D("%08x:%08x call success.\n",
    400       client->xdr->x_prog, client->xdr->x_vers);
    401 
    402   out_unlock:
    403     pthread_mutex_lock(&client->input_xdr_lock);
    404     D("%08x:%08x marking input buffer as free.\n",
    405       client->xdr->x_prog, client->xdr->x_vers);
    406     client->input_xdr_busy = 0;
    407     pthread_cond_signal(&client->input_xdr_wait);
    408     pthread_mutex_unlock(&client->input_xdr_lock);
    409 
    410     pthread_mutex_unlock(&client->wait_reply_lock);
    411   out:
    412     pthread_mutex_unlock(&client->lock);
    413     return ret;
    414 } /* clnt_call */
    415 
    416 bool_t xdr_recv_auth (xdr_s_type *xdr, opaque_auth *auth)
    417 {
    418     switch(sizeof(auth->oa_flavor)) {
    419     case 1:
    420         if(!XDR_RECV_INT8(xdr, (int8_t *)&(auth->oa_flavor))) return FALSE;
    421         break;
    422     case 2:
    423         if(!XDR_RECV_INT16(xdr, (int16_t *)&(auth->oa_flavor))) return FALSE;
    424         break;
    425     case 4:
    426         if(!XDR_RECV_INT32(xdr, (int32_t *)&(auth->oa_flavor))) return FALSE;
    427         break;
    428     }
    429     if (!XDR_RECV_UINT (xdr, (unsigned *)&(auth->oa_length))) {
    430         return FALSE;
    431     }
    432 
    433     if (auth->oa_length != 0) {
    434         /* We throw away the auth stuff--it's always the default. */
    435         auth->oa_base = NULL;
    436         if (!XDR_RECV_BYTES (xdr, NULL, auth->oa_length))
    437             return FALSE;
    438         else
    439             return FALSE;
    440     }
    441 
    442     return TRUE;
    443 } /* xdr_recv_auth */
    444 
    445 static bool_t
    446 xdr_recv_accepted_reply_header(xdr_s_type *xdr,
    447                                struct rpc_accepted_reply_header *accreply)
    448 {
    449     if (!xdr_recv_auth(xdr, &accreply->verf)) {
    450         return FALSE;
    451     }
    452 
    453     if (!XDR_RECV_ENUM(xdr, &accreply->stat)) {
    454         return FALSE;
    455     }
    456 
    457     switch ((*accreply).stat) {
    458     case RPC_PROG_MISMATCH:
    459         if (!XDR_RECV_UINT32(xdr, &accreply->u.versions.low)) {
    460             return FALSE;
    461         }
    462 
    463         if (!XDR_RECV_UINT32(xdr, &accreply->u.versions.high)) {
    464             return FALSE;
    465         }
    466         break;
    467 
    468     case RPC_ACCEPT_SUCCESS:
    469     case RPC_PROG_UNAVAIL:
    470     case RPC_PROC_UNAVAIL:
    471     case RPC_GARBAGE_ARGS:
    472     case RPC_SYSTEM_ERR:
    473     case RPC_PROG_LOCKED:
    474         // case ignored
    475         break;
    476 
    477     default:
    478         return FALSE;
    479     }
    480 
    481     return TRUE;
    482 } /* xdr_recv_accepted_reply_header */
    483 
    484 static bool_t xdr_recv_denied_reply(xdr_s_type *xdr,
    485                                     struct rpc_denied_reply *rejreply)
    486 {
    487     if (!XDR_RECV_ENUM (xdr, &rejreply->stat))
    488         return FALSE;
    489 
    490     switch ((*rejreply).stat) {
    491     case RPC_MISMATCH:
    492         if (!XDR_RECV_UINT32(xdr, &rejreply->u.versions.low))
    493             return FALSE;
    494         if (!XDR_RECV_UINT32(xdr, &rejreply->u.versions.high))
    495             return FALSE;
    496         break;
    497     case RPC_AUTH_ERROR:
    498         if (!XDR_RECV_ENUM (xdr, &rejreply->u.why))
    499             return FALSE;
    500         break;
    501     default:
    502         return FALSE;
    503     }
    504 
    505     return TRUE;
    506 } /* xdr_recv_denied_reply */
    507 
    508 bool_t xdr_recv_reply_header (xdr_s_type *xdr, rpc_reply_header *reply)
    509 {
    510     if (!XDR_RECV_ENUM(xdr, &reply->stat)) {
    511         return FALSE;
    512     }
    513 
    514     switch ((*reply).stat) {
    515     case RPC_MSG_ACCEPTED:
    516         if (!xdr_recv_accepted_reply_header(xdr, &reply->u.ar)) {
    517             return FALSE;
    518     }
    519         break;
    520     case RPC_MSG_DENIED:
    521         if (!xdr_recv_denied_reply(xdr, &reply->u.dr)) {
    522             return FALSE;
    523     }
    524         break;
    525     default:
    526         return FALSE;
    527     }
    528 
    529     return TRUE;
    530 } /* xdr_recv_reply_header */
    531 
    532 CLIENT *clnt_create(
    533     char * host,
    534     uint32 prog,
    535     uint32 vers,
    536     char * proto)
    537 {
    538     CLIENT *client = calloc(1, sizeof(CLIENT));
    539     if (client) {
    540         char name[256];
    541 
    542         /* for versions like 0x00010001, only compare against major version */
    543         if ((vers & 0xFFF00000) == 0)
    544             vers &= 0xFFFF0000;
    545 
    546         pthread_mutex_lock(&rx_mutex);
    547 
    548         snprintf(name, sizeof(name), "/dev/oncrpc/%08x:%08x",
    549                  (uint32_t)prog, (int)vers);
    550         client->xdr = xdr_init_common(name, 1 /* client XDR */);
    551         if (!client->xdr) {
    552             E("failed to initialize client (permissions?)!\n");
    553             free(client);
    554             pthread_mutex_unlock(&rx_mutex);
    555             return NULL;
    556         }
    557         client->xdr->x_prog = prog;
    558         client->xdr->x_vers = vers;
    559         client->cb_stop = -1; /* callback thread has not been started */
    560 
    561         if (!num_clients) {
    562             FD_ZERO(&rx_fdset);
    563             max_rxfd = 0;
    564         }
    565 
    566         FD_SET(client->xdr->fd, &rx_fdset);
    567         if (max_rxfd < client->xdr->fd)
    568             max_rxfd = client->xdr->fd;
    569         client->next = (CLIENT *)clients;
    570         clients = client;
    571         if (!num_clients++) {
    572             D("launching RX thread.\n");
    573             pthread_create(&rx_thread, NULL, rx_context, NULL);
    574         }
    575 
    576         pthread_mutexattr_init(&client->lock_attr);
    577 //      pthread_mutexattr_settype(&client->lock_attr, PTHREAD_MUTEX_RECURSIVE);
    578         pthread_mutex_init(&client->lock, &client->lock_attr);
    579         pthread_mutex_init(&client->wait_reply_lock, &client->lock_attr);
    580         pthread_cond_init(&client->wait_reply, NULL);
    581         pthread_mutex_init(&client->wait_cb_lock, &client->lock_attr);
    582         pthread_cond_init(&client->wait_cb, NULL);
    583         pthread_mutex_init(&client->input_xdr_lock, &client->lock_attr);
    584         pthread_cond_init(&client->input_xdr_wait, NULL);
    585 
    586         pthread_mutex_unlock(&rx_mutex);
    587     }
    588 
    589     return client;
    590 }
    591 
    592 void clnt_destroy(CLIENT *client) {
    593     if (client) {
    594         pthread_mutex_lock(&client->lock);
    595         D("%08x:%08x destroying client\n",
    596           client->xdr->x_prog,
    597           client->xdr->x_vers);
    598 
    599 
    600         if (!client->cb_stop) {
    601             /* The callback thread is running, we need to stop it */
    602             client->cb_stop = 1;
    603             D("%08x:%08x stopping callback thread\n",
    604               client->xdr->x_prog,
    605               client->xdr->x_vers);
    606             pthread_mutex_lock(&client->wait_cb_lock);
    607             pthread_cond_signal(&client->wait_cb);
    608             pthread_mutex_unlock(&client->wait_cb_lock);
    609             D("%08x:%08x joining callback thread\n",
    610               client->xdr->x_prog,
    611               client->xdr->x_vers);
    612             pthread_join(client->cb_thread, NULL);
    613         }
    614 
    615         pthread_mutex_lock(&rx_mutex); /* sync access to the client list */
    616         {
    617             CLIENT *trav = (CLIENT *)clients, *prev = NULL;
    618             for(; trav; trav = trav->next) {
    619                 if (trav == client) {
    620                      D("%08x:%08x removing from client list\n",
    621                       client->xdr->x_prog,
    622                       client->xdr->x_vers);
    623                     if (prev)
    624                         prev->next = trav->next;
    625                     else
    626                         clients = trav->next;
    627                     num_clients--;
    628                     FD_CLR(client->xdr->fd, &rx_fdset);
    629                     break;
    630                 }
    631                 prev = trav;
    632             }
    633         }
    634         if (!num_clients) {
    635             D("stopping rx thread!\n");
    636             pthread_join(rx_thread, NULL);
    637             D("stopped rx thread\n");
    638         }
    639         pthread_mutex_unlock(&rx_mutex); /* sync access to the client list */
    640 
    641         pthread_mutex_destroy(&client->input_xdr_lock);
    642         pthread_cond_destroy(&client->input_xdr_wait);
    643 
    644         pthread_mutex_destroy(&client->wait_reply_lock);
    645         pthread_cond_destroy(&client->wait_reply);
    646         xdr_destroy_common(client->xdr);
    647 
    648         // FIXME: what happens when we lock the client while destroying it,
    649         // and another thread locks the mutex in clnt_call, and then we
    650         // call pthread_mutex_destroy?  Does destroy automatically unlock and
    651         // then cause the lock in clnt_call() to return an error?  When we
    652         // unlock the mutex here there can be a context switch to the other
    653         // thread, which will cause it to obtain the mutex on the destroyed
    654         // client (and probably crash), and then we get to the destroy call
    655         // here... will that call fail?
    656         pthread_mutex_unlock(&client->lock);
    657         pthread_mutex_destroy(&client->lock);
    658         pthread_mutexattr_destroy(&client->lock_attr);
    659         D("client destroy done\n");
    660         free(client);
    661     }
    662 }
    663