Home | History | Annotate | Download | only in librpc
      1 #include <rpc/rpc.h>
      2 #include <arpa/inet.h>
      3 #include <rpc/rpc_router_ioctl.h>
      4 #include <debug.h>
      5 #include <pthread.h>
      6 #include <sys/select.h>
      7 
      8 #include <stdarg.h>
      9 #include <stdio.h>
     10 #include <string.h>
     11 #include <errno.h>
     12 #include <stdlib.h>
     13 
     14 #include <hardware_legacy/power.h>
     15 
     16 #define ANDROID_WAKE_LOCK_NAME "rpc-interface"
     17 
     18 void
     19 grabPartialWakeLock() {
     20     acquire_wake_lock(PARTIAL_WAKE_LOCK, ANDROID_WAKE_LOCK_NAME);
     21 }
     22 
     23 void
     24 releaseWakeLock() {
     25     release_wake_lock(ANDROID_WAKE_LOCK_NAME);
     26 }
     27 
     28 struct CLIENT {
     29     xdr_s_type *xdr;
     30     struct CLIENT *next;
     31     /* common attribute struct for setting up recursive mutexes */
     32     pthread_mutexattr_t lock_attr;
     33 
     34     /* We insist that there is only one outstanding call for a client at any
     35        time, and we use this mutex to enforce the rule.  When we start
     36        supporting multiple outstanding RPCs on a client, we will have to
     37        maintain a queue of them, and match incoming replies (by the XID of the
     38        incoming packet).  For now, we just block until we get that reply.
     39     */
     40     pthread_mutex_t lock;
     41 
     42     pthread_mutex_t wait_reply_lock;
     43     pthread_cond_t wait_reply;
     44 
     45     pthread_mutex_t input_xdr_lock;
     46     pthread_cond_t input_xdr_wait;
     47     volatile int input_xdr_busy;
     48 
     49     pthread_mutex_t wait_cb_lock;
     50     pthread_cond_t wait_cb;
     51     pthread_t cb_thread;
     52     volatile int got_cb;
     53     volatile int cb_stop;
     54 };
     55 
     56 extern void* svc_find(void *xprt, rpcprog_t prog, rpcvers_t vers);
     57 extern void svc_dispatch(void *svc, void *xprt);
     58 extern int  r_open();
     59 extern void r_close();
     60 extern xdr_s_type *xdr_init_common(const char *name, int is_client);
     61 extern xdr_s_type *xdr_clone(xdr_s_type *);
     62 extern void xdr_destroy_common(xdr_s_type *xdr);
     63 extern bool_t xdr_recv_reply_header (xdr_s_type *xdr, rpc_reply_header *reply);
     64 extern void *the_xprt;
     65 
     66 
     67 static pthread_mutex_t rx_mutex = PTHREAD_MUTEX_INITIALIZER;
     68 static pthread_t rx_thread;
     69 static volatile unsigned int num_clients;
     70 static volatile fd_set rx_fdset;
     71 static volatile int max_rxfd;
     72 static volatile struct CLIENT *clients;
     73 
     74 /* There's one of these for each RPC client which has received an RPC call. */
     75 static void *cb_context(void *__u)
     76 {
     77     CLIENT *client = (CLIENT *)__u;
     78     D("RPC-callback thread for %08x:%08x starting.\n",
     79       (client->xdr->x_prog | 0x01000000),
     80       client->xdr->x_vers);
     81     pthread_mutex_lock(&client->wait_cb_lock);
     82     while (client->cb_stop == 0) {
     83         if (!client->got_cb)
     84             pthread_cond_wait(&client->wait_cb,
     85                               &client->wait_cb_lock);
     86         /* We tell the thread it's time to exit by setting cb_stop to nonzero
     87            and signalling the conditional variable.  When there's no data, we
     88            skip to the top of the loop and exit.
     89         */
     90         if (!client->got_cb) {
     91             D("RPC-callback thread for %08x:%08x: signalled but no data.\n",
     92               (client->xdr->x_prog | 0x01000000),
     93               client->xdr->x_vers);
     94             continue;
     95         }
     96         client->got_cb = 0;
     97 
     98         /* We dispatch the message to the server representing the callback
     99          * client.
    100          */
    101         if (the_xprt) {
    102             void *svc;
    103             rpcprog_t prog =
    104                 ntohl(((uint32 *)(client->xdr->in_msg))[RPC_OFFSET+3]);
    105             rpcvers_t vers =
    106                 ntohl(((uint32 *)(client->xdr->in_msg))[RPC_OFFSET+4]);
    107 
    108             svc = svc_find(the_xprt, prog, vers);
    109             if (svc) {
    110                 XDR **svc_xdr = (XDR **)svc;
    111                 D("%08x:%08x dispatching RPC call (XID %d, xdr %p) for "
    112                   "callback client %08x:%08x.\n",
    113                   client->xdr->x_prog,
    114                   client->xdr->x_vers,
    115                   ntohl(((uint32 *)(client->xdr->in_msg))[RPC_OFFSET]),
    116                   client->xdr,
    117                   (uint32_t)prog, (int)vers);
    118                 /* We transplant the xdr of the client into the entry
    119                    representing the callback client in the list of servers.
    120                    Note that since we hold the wait_cb_lock for this client,
    121                    if another call for this callback client arrives before
    122                    we've finished processing this call, that will block until
    123                    we're done with this one.  If this happens, it would be
    124                    most likely a bug in the arm9 rpc router.
    125                 */
    126                 if (*svc_xdr) {
    127                     D("%08x:%08x expecting XDR == NULL"
    128                       "callback client %08x:%08x!\n",
    129                       client->xdr->x_prog,
    130                       client->xdr->x_vers,
    131                       (uint32_t)prog, (int)vers);
    132                     xdr_destroy_common(*svc_xdr);
    133                 }
    134 
    135                 D("%08x:%08x cloning XDR for "
    136                   "callback client %08x:%08x.\n",
    137                   client->xdr->x_prog,
    138                   client->xdr->x_vers,
    139                   (uint32_t)prog, (int)vers);
    140                 *svc_xdr = xdr_clone(client->xdr);
    141 
    142                 (*svc_xdr)->x_prog = prog;
    143                 (*svc_xdr)->x_vers = vers;
    144                 memcpy((*svc_xdr)->in_msg,
    145                        client->xdr->in_msg, client->xdr->in_len);
    146                 memcpy((*svc_xdr)->out_msg,
    147                        client->xdr->out_msg, client->xdr->out_next);
    148                 (*svc_xdr)->in_len = client->xdr->in_len;
    149                 (*svc_xdr)->out_next = client->xdr->out_next;
    150 
    151                 pthread_mutex_lock(&client->input_xdr_lock);
    152                 D("%08x:%08x marking input buffer as free.\n",
    153                   client->xdr->x_prog, client->xdr->x_vers);
    154                 client->input_xdr_busy = 0;
    155                 pthread_cond_signal(&client->input_xdr_wait);
    156                 pthread_mutex_unlock(&client->input_xdr_lock);
    157 
    158                 svc_dispatch(svc, the_xprt);
    159                 xdr_destroy_common(*svc_xdr);
    160                 *svc_xdr = NULL;
    161             }
    162             else E("%08x:%08x call packet arrived, but there's no "
    163                    "RPC server registered for %08x:%08x.\n",
    164                    client->xdr->x_prog,
    165                    client->xdr->x_vers,
    166                    (uint32_t)prog, (int)vers);
    167         }
    168         else E("%08x:%08x call packet arrived, but there's "
    169                "no RPC transport!\n",
    170                client->xdr->x_prog,
    171                client->xdr->x_vers);
    172 
    173         releaseWakeLock();
    174     }
    175     pthread_mutex_unlock(&client->wait_cb_lock);
    176 
    177 
    178     D("RPC-callback thread for %08x:%08x terminating.\n",
    179       (client->xdr->x_prog | 0x01000000),
    180       client->xdr->x_vers);
    181     return NULL;
    182 }
    183 
    184 static void *rx_context(void *__u __attribute__((unused)))
    185 {
    186     int n;
    187     struct timeval tv;
    188     fd_set rfds;
    189     while(num_clients) {
    190         pthread_mutex_lock(&rx_mutex);
    191         rfds = rx_fdset;
    192         pthread_mutex_unlock(&rx_mutex);
    193         tv.tv_sec = 0; tv.tv_usec = 500 * 1000;
    194         n = select(max_rxfd + 1, (fd_set *)&rfds, NULL, NULL, &tv);
    195         if (n < 0) {
    196             E("select() error %s (%d)\n", strerror(errno), errno);
    197             continue;
    198         }
    199 
    200         if (n) {
    201             pthread_mutex_lock(&rx_mutex); /* sync access to the client list */
    202             CLIENT *client = (CLIENT *)clients;
    203             for (; client; client = client->next) {
    204                 if (FD_ISSET(client->xdr->fd, &rfds)) {
    205 
    206                     /* We need to make sure that the XDR's in_buf is not in
    207                        use before we read into it.  The in_buf may be in use
    208                        in a race between processing an incoming call and
    209                        receiving a reply to an outstanding call, or processing
    210                        an incoming reply and receiving a call.
    211                     */
    212 
    213                     pthread_mutex_lock(&client->input_xdr_lock);
    214                     while (client->input_xdr_busy) {
    215                         D("%08x:%08x waiting for XDR input buffer "
    216                           "to be consumed.\n",
    217                           client->xdr->x_prog, client->xdr->x_vers);
    218                         pthread_cond_wait(
    219                             &client->input_xdr_wait,
    220                             &client->input_xdr_lock);
    221                     }
    222                     D("%08x:%08x reading data.\n",
    223                       client->xdr->x_prog, client->xdr->x_vers);
    224                     grabPartialWakeLock();
    225                     if (client->xdr->xops->read(client->xdr) == 0) {
    226                         E("%08x:%08x ONCRPC read error: aborting!\n",
    227                           client->xdr->x_prog, client->xdr->x_vers);
    228                         abort();
    229                     }
    230                     client->input_xdr_busy = 1;
    231                     pthread_mutex_unlock(&client->input_xdr_lock);
    232 
    233                     if (((uint32 *)(client->xdr->in_msg))[RPC_OFFSET+1] ==
    234                         htonl(RPC_MSG_REPLY)) {
    235                         /* Wake up the RPC client to receive its data. */
    236                         D("%08x:%08x received REPLY (XID %d), "
    237                           "grabbing mutex to wake up client.\n",
    238                           client->xdr->x_prog,
    239                           client->xdr->x_vers,
    240                           ntohl(((uint32 *)client->xdr->in_msg)[RPC_OFFSET]));
    241                         pthread_mutex_lock(&client->wait_reply_lock);
    242                         D("%08x:%08x got mutex, waking up client.\n",
    243                           client->xdr->x_prog,
    244                           client->xdr->x_vers);
    245                         pthread_cond_signal(&client->wait_reply);
    246                         pthread_mutex_unlock(&client->wait_reply_lock);
    247 
    248                         releaseWakeLock();
    249                     }
    250                     else {
    251                         pthread_mutex_lock(&client->wait_cb_lock);
    252                         D("%08x:%08x received CALL.\n",
    253                           client->xdr->x_prog,
    254                           client->xdr->x_vers);
    255                         client->got_cb = 1;
    256                         if (client->cb_stop < 0) {
    257                             D("%08x:%08x starting callback thread.\n",
    258                               client->xdr->x_prog,
    259                               client->xdr->x_vers);
    260                             client->cb_stop = 0;
    261                             pthread_create(&client->cb_thread,
    262                                            NULL,
    263                                            cb_context, client);
    264                         }
    265                         D("%08x:%08x waking up callback thread.\n",
    266                           client->xdr->x_prog,
    267                           client->xdr->x_vers);
    268                         pthread_cond_signal(&client->wait_cb);
    269                         pthread_mutex_unlock(&client->wait_cb_lock);
    270                     }
    271                 }
    272             }
    273             pthread_mutex_unlock(&rx_mutex);
    274         }
    275         else {
    276             V("rx thread timeout (%d clients):\n", num_clients);
    277 #if 0
    278             {
    279                 CLIENT *trav = (CLIENT *)clients;
    280                 for(; trav; trav = trav->next) {
    281                     if (trav->xdr)
    282                         V("\t%08x:%08x fd %02d\n",
    283                           trav->xdr->x_prog,
    284                           trav->xdr->x_vers,
    285                           trav->xdr->fd);
    286                     else V("\t(unknown)\n");
    287                 }
    288             }
    289 #endif
    290         }
    291     }
    292     D("RPC-client RX thread exiting!\n");
    293     return NULL;
    294 }
    295 
    296 enum clnt_stat
    297 clnt_call(
    298     CLIENT       * client,
    299     u_long         proc,
    300     xdrproc_t      xdr_args,
    301     caddr_t        args_ptr,
    302     xdrproc_t      xdr_results,
    303     caddr_t        rets_ptr,
    304     struct timeval timeout)
    305 {
    306     opaque_auth cred;
    307     opaque_auth verf;
    308     rpc_reply_header reply_header;
    309     enum clnt_stat ret = RPC_SUCCESS;
    310 
    311     xdr_s_type *xdr = client->xdr;
    312 
    313     pthread_mutex_lock(&client->lock);
    314 
    315 
    316     cred.oa_flavor = AUTH_NONE;
    317     cred.oa_length = 0;
    318     verf.oa_flavor = AUTH_NONE;
    319     verf.oa_length = 0;
    320 
    321     xdr->x_op = XDR_ENCODE;
    322 
    323     /* Send message header */
    324 
    325     if (!xdr_call_msg_start (xdr, xdr->x_prog, xdr->x_vers,
    326                              proc, &cred, &verf)) {
    327         XDR_MSG_ABORT (xdr);
    328         ret = RPC_CANTENCODEARGS;
    329         E("%08x:%08x error in xdr_call_msg_start()\n",
    330           client->xdr->x_prog,
    331           client->xdr->x_vers);
    332         goto out;
    333     }
    334 
    335     /* Send arguments */
    336 
    337     if (!xdr_args (xdr, args_ptr)) {
    338         XDR_MSG_ABORT(xdr);
    339         ret = RPC_CANTENCODEARGS;
    340         E("%08x:%08x error in xdr_args()\n",
    341           client->xdr->x_prog,
    342           client->xdr->x_vers);
    343         goto out;
    344     }
    345 
    346     /* Finish message - blocking */
    347     pthread_mutex_lock(&client->wait_reply_lock);
    348     D("%08x:%08x sending call (XID %d).\n",
    349       client->xdr->x_prog, client->xdr->x_vers, client->xdr->xid);
    350     if (!XDR_MSG_SEND(xdr)) {
    351         ret = RPC_CANTSEND;
    352         E("error in XDR_MSG_SEND\n");
    353         goto out_unlock;
    354     }
    355 
    356     D("%08x:%08x waiting for reply.\n",
    357       client->xdr->x_prog, client->xdr->x_vers);
    358     pthread_cond_wait(&client->wait_reply, &client->wait_reply_lock);
    359     D("%08x:%08x received reply.\n", client->xdr->x_prog, client->xdr->x_vers);
    360 
    361     if (((uint32 *)xdr->out_msg)[RPC_OFFSET] !=
    362         ((uint32 *)xdr->in_msg)[RPC_OFFSET]) {
    363         E("%08x:%08x XID mismatch: got %d, expecting %d.\n",
    364           client->xdr->x_prog, client->xdr->x_vers,
    365           ntohl(((uint32 *)xdr->in_msg)[RPC_OFFSET]),
    366           ntohl(((uint32 *)xdr->out_msg)[RPC_OFFSET]));
    367         ret = RPC_CANTRECV;
    368         goto out_unlock;
    369     }
    370 
    371     D("%08x:%08x decoding reply header.\n",
    372       client->xdr->x_prog, client->xdr->x_vers);
    373     if (!xdr_recv_reply_header (client->xdr, &reply_header)) {
    374         E("%08x:%08x error reading reply header.\n",
    375           client->xdr->x_prog, client->xdr->x_vers);
    376         ret = RPC_CANTRECV;
    377         goto out_unlock;
    378     }
    379 
    380     /* Check that other side accepted and responded */
    381     if (reply_header.stat != RPC_MSG_ACCEPTED) {
    382         /* Offset to map returned error into clnt_stat */
    383         ret = reply_header.u.dr.stat + RPC_VERSMISMATCH;
    384         E("%08x:%08x call was not accepted.\n",
    385           (uint32_t)client->xdr->x_prog, client->xdr->x_vers);
    386         goto out_unlock;
    387     } else if (reply_header.u.ar.stat != RPC_ACCEPT_SUCCESS) {
    388         /* Offset to map returned error into clnt_stat */
    389         ret = reply_header.u.ar.stat + RPC_AUTHERROR;
    390         E("%08x:%08x call failed with an authentication error.\n",
    391           (uint32_t)client->xdr->x_prog, client->xdr->x_vers);
    392         goto out_unlock;
    393     }
    394 
    395     xdr->x_op = XDR_DECODE;
    396     /* Decode results */
    397     if (!xdr_results(xdr, rets_ptr) || ! XDR_MSG_DONE(xdr)) {
    398         ret = RPC_CANTDECODERES;
    399         E("%08x:%08x error decoding results.\n",
    400           client->xdr->x_prog, client->xdr->x_vers);
    401         goto out_unlock;
    402     }
    403 
    404     D("%08x:%08x call success.\n",
    405       client->xdr->x_prog, client->xdr->x_vers);
    406 
    407   out_unlock:
    408     pthread_mutex_lock(&client->input_xdr_lock);
    409     D("%08x:%08x marking input buffer as free.\n",
    410       client->xdr->x_prog, client->xdr->x_vers);
    411     client->input_xdr_busy = 0;
    412     pthread_cond_signal(&client->input_xdr_wait);
    413     pthread_mutex_unlock(&client->input_xdr_lock);
    414 
    415     pthread_mutex_unlock(&client->wait_reply_lock);
    416   out:
    417     pthread_mutex_unlock(&client->lock);
    418     return ret;
    419 } /* clnt_call */
    420 
    421 bool_t xdr_recv_auth (xdr_s_type *xdr, opaque_auth *auth)
    422 {
    423     switch(sizeof(auth->oa_flavor)) {
    424     case 1:
    425         if(!XDR_RECV_INT8(xdr, (int8_t *)&(auth->oa_flavor))) return FALSE;
    426         break;
    427     case 2:
    428         if(!XDR_RECV_INT16(xdr, (int16_t *)&(auth->oa_flavor))) return FALSE;
    429         break;
    430     case 4:
    431         if(!XDR_RECV_INT32(xdr, (int32_t *)&(auth->oa_flavor))) return FALSE;
    432         break;
    433     }
    434     if (!XDR_RECV_UINT (xdr, (unsigned *)&(auth->oa_length))) {
    435         return FALSE;
    436     }
    437 
    438     if (auth->oa_length != 0) {
    439         /* We throw away the auth stuff--it's always the default. */
    440         auth->oa_base = NULL;
    441         if (!XDR_RECV_BYTES (xdr, NULL, auth->oa_length))
    442             return FALSE;
    443         else
    444             return FALSE;
    445     }
    446 
    447     return TRUE;
    448 } /* xdr_recv_auth */
    449 
    450 static bool_t
    451 xdr_recv_accepted_reply_header(xdr_s_type *xdr,
    452                                struct rpc_accepted_reply_header *accreply)
    453 {
    454     if (!xdr_recv_auth(xdr, &accreply->verf)) {
    455         return FALSE;
    456     }
    457 
    458     if (!XDR_RECV_ENUM(xdr, &accreply->stat)) {
    459         return FALSE;
    460     }
    461 
    462     switch ((*accreply).stat) {
    463     case RPC_PROG_MISMATCH:
    464         if (!XDR_RECV_UINT32(xdr, &accreply->u.versions.low)) {
    465             return FALSE;
    466         }
    467 
    468         if (!XDR_RECV_UINT32(xdr, &accreply->u.versions.high)) {
    469             return FALSE;
    470         }
    471         break;
    472 
    473     case RPC_ACCEPT_SUCCESS:
    474     case RPC_PROG_UNAVAIL:
    475     case RPC_PROC_UNAVAIL:
    476     case RPC_GARBAGE_ARGS:
    477     case RPC_SYSTEM_ERR:
    478     case RPC_PROG_LOCKED:
    479         // case ignored
    480         break;
    481 
    482     default:
    483         return FALSE;
    484     }
    485 
    486     return TRUE;
    487 } /* xdr_recv_accepted_reply_header */
    488 
    489 static bool_t xdr_recv_denied_reply(xdr_s_type *xdr,
    490                                     struct rpc_denied_reply *rejreply)
    491 {
    492     if (!XDR_RECV_ENUM (xdr, &rejreply->stat))
    493         return FALSE;
    494 
    495     switch ((*rejreply).stat) {
    496     case RPC_MISMATCH:
    497         if (!XDR_RECV_UINT32(xdr, &rejreply->u.versions.low))
    498             return FALSE;
    499         if (!XDR_RECV_UINT32(xdr, &rejreply->u.versions.high))
    500             return FALSE;
    501         break;
    502     case RPC_AUTH_ERROR:
    503         if (!XDR_RECV_ENUM (xdr, &rejreply->u.why))
    504             return FALSE;
    505         break;
    506     default:
    507         return FALSE;
    508     }
    509 
    510     return TRUE;
    511 } /* xdr_recv_denied_reply */
    512 
    513 bool_t xdr_recv_reply_header (xdr_s_type *xdr, rpc_reply_header *reply)
    514 {
    515     if (!XDR_RECV_ENUM(xdr, &reply->stat)) {
    516         return FALSE;
    517     }
    518 
    519     switch ((*reply).stat) {
    520     case RPC_MSG_ACCEPTED:
    521         if (!xdr_recv_accepted_reply_header(xdr, &reply->u.ar))
    522             return FALSE;
    523         break;
    524     case RPC_MSG_DENIED:
    525         if (!xdr_recv_denied_reply(xdr, &reply->u.dr))
    526             return FALSE;
    527         break;
    528     default:
    529         return FALSE;
    530     }
    531 
    532     return TRUE;
    533 } /* xdr_recv_reply_header */
    534 
    535 CLIENT *clnt_create(
    536     char * host,
    537     uint32 prog,
    538     uint32 vers,
    539     char * proto)
    540 {
    541     CLIENT *client = calloc(1, sizeof(CLIENT));
    542     if (client) {
    543         char name[256];
    544 
    545         /* for versions like 0x00010001, only compare against major version */
    546         if ((vers & 0xFFF00000) == 0)
    547             vers &= 0xFFFF0000;
    548 
    549         pthread_mutex_lock(&rx_mutex);
    550 
    551         snprintf(name, sizeof(name), "/dev/oncrpc/%08x:%08x",
    552                  (uint32_t)prog, (int)vers);
    553         client->xdr = xdr_init_common(name, 1 /* client XDR */);
    554         if (!client->xdr) {
    555             E("failed to initialize client (permissions?)!\n");
    556             free(client);
    557             pthread_mutex_unlock(&rx_mutex);
    558             return NULL;
    559         }
    560         client->xdr->x_prog = prog;
    561         client->xdr->x_vers = vers;
    562         client->cb_stop = -1; /* callback thread has not been started */
    563 
    564         if (!num_clients) {
    565             FD_ZERO(&rx_fdset);
    566             max_rxfd = 0;
    567         }
    568 
    569         FD_SET(client->xdr->fd, &rx_fdset);
    570         if (max_rxfd < client->xdr->fd)
    571             max_rxfd = client->xdr->fd;
    572         client->next = (CLIENT *)clients;
    573         clients = client;
    574         if (!num_clients++) {
    575             D("launching RX thread.\n");
    576             pthread_create(&rx_thread, NULL, rx_context, NULL);
    577         }
    578 
    579         pthread_mutexattr_init(&client->lock_attr);
    580 //      pthread_mutexattr_settype(&client->lock_attr, PTHREAD_MUTEX_RECURSIVE);
    581         pthread_mutex_init(&client->lock, &client->lock_attr);
    582         pthread_mutex_init(&client->wait_reply_lock, &client->lock_attr);
    583         pthread_cond_init(&client->wait_reply, NULL);
    584         pthread_mutex_init(&client->wait_cb_lock, &client->lock_attr);
    585         pthread_cond_init(&client->wait_cb, NULL);
    586         pthread_mutex_init(&client->input_xdr_lock, &client->lock_attr);
    587         pthread_cond_init(&client->input_xdr_wait, NULL);
    588 
    589         pthread_mutex_unlock(&rx_mutex);
    590     }
    591 
    592     return client;
    593 }
    594 
    595 void clnt_destroy(CLIENT *client) {
    596     if (client) {
    597         pthread_mutex_lock(&client->lock);
    598         D("%08x:%08x destroying client\n",
    599           client->xdr->x_prog,
    600           client->xdr->x_vers);
    601 
    602 
    603         if (!client->cb_stop) {
    604             /* The callback thread is running, we need to stop it */
    605             client->cb_stop = 1;
    606             D("%08x:%08x stopping callback thread\n",
    607               client->xdr->x_prog,
    608               client->xdr->x_vers);
    609             pthread_mutex_lock(&client->wait_cb_lock);
    610             pthread_cond_signal(&client->wait_cb);
    611             pthread_mutex_unlock(&client->wait_cb_lock);
    612             D("%08x:%08x joining callback thread\n",
    613               client->xdr->x_prog,
    614               client->xdr->x_vers);
    615             pthread_join(client->cb_thread, NULL);
    616         }
    617 
    618         pthread_mutex_lock(&rx_mutex); /* sync access to the client list */
    619         {
    620             CLIENT *trav = (CLIENT *)clients, *prev = NULL;
    621             for(; trav; trav = trav->next) {
    622                 if (trav == client) {
    623                      D("%08x:%08x removing from client list\n",
    624                       client->xdr->x_prog,
    625                       client->xdr->x_vers);
    626                     if (prev)
    627                         prev->next = trav->next;
    628                     else
    629                         clients = trav->next;
    630                     num_clients--;
    631                     FD_CLR(client->xdr->fd, &rx_fdset);
    632                     break;
    633                 }
    634                 prev = trav;
    635             }
    636         }
    637         if (!num_clients) {
    638             D("stopping rx thread!\n");
    639             pthread_join(rx_thread, NULL);
    640             D("stopped rx thread\n");
    641         }
    642         pthread_mutex_unlock(&rx_mutex); /* sync access to the client list */
    643 
    644         pthread_mutex_destroy(&client->input_xdr_lock);
    645         pthread_cond_destroy(&client->input_xdr_wait);
    646 
    647         pthread_mutex_destroy(&client->wait_reply_lock);
    648         pthread_cond_destroy(&client->wait_reply);
    649         xdr_destroy_common(client->xdr);
    650 
    651         // FIXME: what happens when we lock the client while destroying it,
    652         // and another thread locks the mutex in clnt_call, and then we
    653         // call pthread_mutex_destroy?  Does destroy automatically unlock and
    654         // then cause the lock in clnt_call() to return an error?  When we
    655         // unlock the mutex here there can be a context switch to the other
    656         // thread, which will cause it to obtain the mutex on the destroyed
    657         // client (and probably crash), and then we get to the destroy call
    658         // here... will that call fail?
    659         pthread_mutex_unlock(&client->lock);
    660         pthread_mutex_destroy(&client->lock);
    661         pthread_mutexattr_destroy(&client->lock_attr);
    662         D("client destroy done\n");
    663         free(client);
    664     }
    665 }
    666