Home | History | Annotate | Download | only in libcutils
      1 /*
      2  * Copyright (C) 2007 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #define LOG_TAG "mq"
     18 
     19 #include <assert.h>
     20 #include <errno.h>
     21 #include <fcntl.h>
     22 #include <pthread.h>
     23 #include <stdlib.h>
     24 #include <string.h>
     25 #include <unistd.h>
     26 
     27 #include <sys/socket.h>
     28 #include <sys/types.h>
     29 #include <sys/un.h>
     30 #include <sys/uio.h>
     31 
     32 #include <cutils/array.h>
     33 #include <cutils/hashmap.h>
     34 #include <cutils/selector.h>
     35 
     36 #include "loghack.h"
     37 #include "buffer.h"
     38 
/**
 * Number of dead peers to remember. This is the length of Peer.deadPeers,
 * which peerProxyKill() fills cyclically.
 */
#define PEER_HISTORY (16)

/** Shorthand for the generic socket address structure. */
typedef struct sockaddr SocketAddress;
/** Shorthand for the Unix-domain socket address structure. */
typedef struct sockaddr_un UnixAddress;
     44 
/**
 * Process/user/group ID. We don't use ucred directly because it's only
 * available on Linux.
 *
 * The pid is also used as the key of the peer proxy maps (see pidHash()
 * and pidEquals()).
 */
typedef struct {
    /** Process ID of the peer. */
    pid_t pid;
    /** User ID of the peer. */
    uid_t uid;
    /** Group ID of the peer. */
    gid_t gid;
} Credentials;
     54 
/**
 * Listens for bytes coming from remote peers. Receives the sender's
 * credentials together with the received bytes and their length.
 */
typedef void BytesListener(Credentials credentials, char* bytes, size_t size);

/**
 * Listens for the deaths of remote peers. Receives the pid of the peer that
 * died; invoked from peerProxyKill().
 */
typedef void DeathListener(pid_t pid);
     60 
/** Types of packets. Stored in Header.type. */
typedef enum {
    /** Request for a connection to another peer. Consists solely of a header. */
    CONNECTION_REQUEST,

    /**
     * A connection to another peer. The header is followed by a one-byte
     * message that carries the socket as SCM_RIGHTS ancillary data.
     */
    CONNECTION,

    /** Reports a failed connection attempt. Consists solely of a header. */
    CONNECTION_ERROR,

    /** A generic packet of bytes. The header is followed by the bytes. */
    BYTES,
} PacketType;
     75 
/** What a peer proxy expects to read next (see PeerProxy.inputState). */
typedef enum {
    /** Reading a packet header. */
    READING_HEADER,

    /** Waiting for a connection from the master. */
    ACCEPTING_CONNECTION,

    /** Reading bytes. */
    READING_BYTES,
} InputState;
     86 
/**
 * A packet header. peerProxyPrepareOutgoingHeader() points the outgoing
 * buffer directly at this struct, so sizeof(Header) raw bytes of it are
 * what gets written to the peer.
 */
// TODO: Use custom headers for master->peer, peer->master, peer->peer.
typedef struct {
    /** Kind of packet; determines which union member below is meaningful. */
    PacketType type;
    union {
        /** Packet size. Used for BYTES. */
        size_t size;

        /**
         * Credentials. Used for CONNECTION, CONNECTION_REQUEST and
         * CONNECTION_ERROR.
         */
        Credentials credentials;
    };
} Header;
     99 
/**
 * A packet which will be sent to a peer. Packets form a singly linked queue
 * hanging off PeerProxy.currentPacket.
 */
typedef struct OutgoingPacket OutgoingPacket;
struct OutgoingPacket {
    /** Packet header. Written to the peer before the body. */
    Header header;

    /** Packet body; which member is valid depends on header.type. */
    union {
        /** Connection to peer. Used with CONNECTION. */
        int socket;

        /** Buffer of bytes. Used with BYTES. */
        Buffer* bytes;
    };

    /**
     * Frees all resources associated with this packet. Called by
     * peerProxyNextPacket() when the packet is removed from the queue.
     */
    void (*free)(OutgoingPacket* packet);

    /** Optional context. */
    void* context;

    /** Next packet in the queue, or NULL if this is the last one. */
    OutgoingPacket* nextPacket;
};
    123 
/** Represents a remote peer. */
typedef struct PeerProxy PeerProxy;

/** Local peer state. You typically have one peer per process. */
typedef struct {
    /** This peer's PID. */
    pid_t pid;

    /**
     * Map from pid to peer proxy. The peer has a peer proxy for each remote
     * peer it's connected to.
     *
     * Acquire mutex before use.
     */
    Hashmap* peerProxies;

    /** Manages I/O. */
    Selector* selector;

    /** Used to synchronize operations with the selector thread. */
    pthread_mutex_t mutex;

    /** Is this peer the master? */
    bool master;

    /** Peer proxy for the master. Connection requests are queued on it. */
    PeerProxy* masterProxy;

    /** Listens for packets from remote peers. */
    BytesListener* onBytes;

    /** Listens for deaths of remote peers. Called by peerProxyKill(). */
    DeathListener* onDeath;

    /**
     * Keeps track of recently dead peers. Requires mutex.
     *
     * deadPeers is written cyclically: deadPeerCursor is the next slot to
     * overwrite and wraps to 0 at PEER_HISTORY. peerIsDead() treats a 0
     * entry as the end of the recorded history.
     */
    pid_t deadPeers[PEER_HISTORY];
    size_t deadPeerCursor;
} Peer;
    162 
/** Local stand-in for one remote peer: its identity, socket and I/O state. */
struct PeerProxy {
    /** Credentials of the remote process. */
    Credentials credentials;

    /**
     * Keeps track of data coming in from the remote peer.
     *
     * inputState says what is expected next and inputBuffer receives it.
     * connecting is the proxy for which a connection is currently being
     * accepted; it is set by masterProxyExpectConnection() and consumed by
     * masterProxyAcceptConnection().
     */
    InputState inputState;
    Buffer* inputBuffer;
    PeerProxy* connecting;

    /** File descriptor for this peer. NULL until a connection is attached. */
    SelectableFd* fd;

    /**
     * Queue of packets to be written out to the remote peer.
     *
     * currentPacket is the head (the packet being written) and lastPacket
     * is the tail. Both are NULL when the queue is empty.
     *
     * Requires mutex.
     */
    // TODO: Limit queue length.
    OutgoingPacket* currentPacket;
    OutgoingPacket* lastPacket;

    /** Used to write outgoing header. Points at currentPacket's header. */
    Buffer outgoingHeader;

    /** True if this is the master's proxy. */
    bool master;

    /** Reference back to the local peer. */
    Peer* peer;

    /**
     * Used in master only. Maps this peer proxy to other peer proxies to
     * which the peer has been connected to. Maps pid to PeerProxy. Helps
     * keep track of which connections we've sent to whom.
     */
    Hashmap* connections;
};
    200 
/** Server socket path. Copied into the master's address by getMasterAddress(). */
static const char* MASTER_PATH = "/master.peer";

/** Credentials of the master peer (pid, uid and gid are all 0). */
static const Credentials MASTER_CREDENTIALS = {0, 0, 0};

/**
 * Creates a peer proxy and adds it to the peer proxy map.
 * Forward declaration; peerProxyGetOrCreate() treats a NULL result as an
 * allocation failure.
 */
static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials);
    209 
    210 /** Sets the non-blocking flag on a descriptor. */
    211 static void setNonBlocking(int fd) {
    212     int flags;
    213     if ((flags = fcntl(fd, F_GETFL, 0)) < 0) {
    214         LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno));
    215     }
    216     if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
    217         LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno));
    218     }
    219 }
    220 
    221 /** Closes a fd and logs a warning if the close fails. */
    222 static void closeWithWarning(int fd) {
    223     int result = close(fd);
    224     if (result == -1) {
    225         ALOGW("close() error: %s", strerror(errno));
    226     }
    227 }
    228 
    229 /** Hashes pid_t keys. */
    230 static int pidHash(void* key) {
    231     pid_t* pid = (pid_t*) key;
    232     return (int) (*pid);
    233 }
    234 
    235 /** Compares pid_t keys. */
    236 static bool pidEquals(void* keyA, void* keyB) {
    237     pid_t* a = (pid_t*) keyA;
    238     pid_t* b = (pid_t*) keyB;
    239     return *a == *b;
    240 }
    241 
    242 /** Gets the master address. Not thread safe. */
    243 static UnixAddress* getMasterAddress() {
    244     static UnixAddress masterAddress;
    245     static bool initialized = false;
    246     if (initialized == false) {
    247         masterAddress.sun_family = AF_LOCAL;
    248         strcpy(masterAddress.sun_path, MASTER_PATH);
    249         initialized = true;
    250     }
    251     return &masterAddress;
    252 }
    253 
    254 /** Gets exclusive access to the peer for this thread. */
    255 static void peerLock(Peer* peer) {
    256     pthread_mutex_lock(&peer->mutex);
    257 }
    258 
    259 /** Releases exclusive access to the peer. */
    260 static void peerUnlock(Peer* peer) {
    261     pthread_mutex_unlock(&peer->mutex);
    262 }
    263 
    264 /** Frees a simple, i.e. header-only, outgoing packet. */
    265 static void outgoingPacketFree(OutgoingPacket* packet) {
    266     ALOGD("Freeing outgoing packet.");
    267 	free(packet);
    268 }
    269 
    270 /**
    271  * Prepare to read a new packet from the peer.
    272  */
    273 static void peerProxyExpectHeader(PeerProxy* peerProxy) {
    274     peerProxy->inputState = READING_HEADER;
    275     bufferPrepareForRead(peerProxy->inputBuffer, sizeof(Header));
    276 }
    277 
    278 /** Sets up the buffer for the outgoing header. */
    279 static void peerProxyPrepareOutgoingHeader(PeerProxy* peerProxy) {
    280     peerProxy->outgoingHeader.data
    281         = (char*) &(peerProxy->currentPacket->header);
    282     peerProxy->outgoingHeader.size = sizeof(Header);
    283     bufferPrepareForWrite(&peerProxy->outgoingHeader);
    284 }
    285 
    286 /** Adds a packet to the end of the queue. Callers must have the mutex. */
    287 static void peerProxyEnqueueOutgoingPacket(PeerProxy* peerProxy,
    288         OutgoingPacket* newPacket) {
    289     newPacket->nextPacket = NULL; // Just in case.
    290     if (peerProxy->currentPacket == NULL) {
    291         // The queue is empty.
    292         peerProxy->currentPacket = newPacket;
    293         peerProxy->lastPacket = newPacket;
    294 
    295         peerProxyPrepareOutgoingHeader(peerProxy);
    296     } else {
    297         peerProxy->lastPacket->nextPacket = newPacket;
    298     }
    299 }
    300 
    301 /** Takes the peer lock and enqueues the given packet. */
    302 static void peerProxyLockAndEnqueueOutgoingPacket(PeerProxy* peerProxy,
    303         OutgoingPacket* newPacket) {
    304     Peer* peer = peerProxy->peer;
    305     peerLock(peer);
    306     peerProxyEnqueueOutgoingPacket(peerProxy, newPacket);
    307     peerUnlock(peer);
    308 }
    309 
    310 /**
    311  * Frees current packet and moves to the next one. Returns true if there is
    312  * a next packet or false if the queue is empty.
    313  */
    314 static bool peerProxyNextPacket(PeerProxy* peerProxy) {
    315     Peer* peer = peerProxy->peer;
    316     peerLock(peer);
    317 
    318     OutgoingPacket* current = peerProxy->currentPacket;
    319 
    320     if (current == NULL) {
    321     	// The queue is already empty.
    322         peerUnlock(peer);
    323         return false;
    324     }
    325 
    326     OutgoingPacket* next = current->nextPacket;
    327     peerProxy->currentPacket = next;
    328     current->nextPacket = NULL;
    329     current->free(current);
    330     if (next == NULL) {
    331         // The queue is empty.
    332         peerProxy->lastPacket = NULL;
    333         peerUnlock(peer);
    334         return false;
    335     } else {
    336         peerUnlock(peer);
    337         peerProxyPrepareOutgoingHeader(peerProxy);
    338 
    339         // TODO: Start writing next packet? It would reduce the number of
    340         // system calls, but we could also starve other peers.
    341         return true;
    342     }
    343 }
    344 
    345 /**
    346  * Checks whether a peer died recently.
    347  */
    348 static bool peerIsDead(Peer* peer, pid_t pid) {
    349     size_t i;
    350     for (i = 0; i < PEER_HISTORY; i++) {
    351         pid_t deadPeer = peer->deadPeers[i];
    352         if (deadPeer == 0) {
    353             return false;
    354         }
    355         if (deadPeer == pid) {
    356             return true;
    357         }
    358     }
    359     return false;
    360 }
    361 
    362 /**
    363  * Cleans up connection information.
    364  */
    365 static bool peerProxyRemoveConnection(void* key, void* value, void* context) {
    366     PeerProxy* deadPeer = (PeerProxy*) context;
    367     PeerProxy* otherPeer = (PeerProxy*) value;
    368     hashmapRemove(otherPeer->connections, &(deadPeer->credentials.pid));
    369     return true;
    370 }
    371 
/**
 * Called when the peer dies.
 *
 * Logs the death, records the pid in the local peer's dead-peer history,
 * removes the proxy from the peer map, flags its fd for removal from the
 * selector, releases queued packets and the input buffer, cleans up the
 * connection maps, invokes the death listener and finally frees the proxy.
 * The proxy must not be used after this function returns.
 *
 * If errnoIsSet is true, the current errno is included in the log message.
 * Losing the master's proxy is reported via LOG_ALWAYS_FATAL.
 */
static void peerProxyKill(PeerProxy* peerProxy, bool errnoIsSet) {
    if (errnoIsSet) {
        ALOGI("Peer %d died. errno: %s", peerProxy->credentials.pid,
                strerror(errno));
    } else {
        ALOGI("Peer %d died.", peerProxy->credentials.pid);
    }

    // If we lost the master, we're up a creek. We can't let this happen.
    if (peerProxy->master) {
        LOG_ALWAYS_FATAL("Lost connection to master.");
    }

    Peer* localPeer = peerProxy->peer;
    // Copy the pid now; it is still needed after the proxy's state has been
    // torn down (map removal below, death listener at the end).
    pid_t pid = peerProxy->credentials.pid;

    peerLock(localPeer);

    // Remember for awhile that the peer died. The history is overwritten
    // cyclically: the cursor wraps to 0 once it reaches PEER_HISTORY.
    localPeer->deadPeers[localPeer->deadPeerCursor]
        = peerProxy->credentials.pid;
    localPeer->deadPeerCursor++;
    if (localPeer->deadPeerCursor == PEER_HISTORY) {
        localPeer->deadPeerCursor = 0;
    }

    // Remove from peer map.
    hashmapRemove(localPeer->peerProxies, &pid);

    // External threads can no longer get to this peer proxy, so we don't
    // need the lock anymore.
    peerUnlock(localPeer);

    // Remove the fd from the selector. The fd is NULL if no connection was
    // ever attached to this proxy.
    // NOTE(review): fd->data still points at this proxy after it is freed
    // below; presumably the selector drops the fd before using it again.
    if (peerProxy->fd != NULL) {
        peerProxy->fd->remove = true;
    }

    // Clear outgoing packet queue. peerProxyNextPacket() takes the peer lock
    // itself, which is why the lock was released above.
    while (peerProxyNextPacket(peerProxy)) {}

    bufferFree(peerProxy->inputBuffer);

    // This only applies to the master.
    if (peerProxy->connections != NULL) {
        // We can't leave these other maps pointing to freed memory.
        hashmapForEach(peerProxy->connections, &peerProxyRemoveConnection,
                peerProxy);
        hashmapFree(peerProxy->connections);
    }

    // Invoke death listener.
    // NOTE(review): called unconditionally; assumes onDeath is never NULL.
    localPeer->onDeath(pid);

    // Free the peer proxy itself.
    free(peerProxy);
}
    432 
    433 static void peerProxyHandleError(PeerProxy* peerProxy, char* functionName) {
    434     if (errno == EINTR) {
    435         // Log interruptions but otherwise ignore them.
    436         ALOGW("%s() interrupted.", functionName);
    437     } else if (errno == EAGAIN) {
    438         ALOGD("EWOULDBLOCK");
    439         // Ignore.
    440     } else {
    441         ALOGW("Error returned by %s().", functionName);
    442         peerProxyKill(peerProxy, true);
    443     }
    444 }
    445 
    446 /**
    447  * Buffers output sent to a peer. May be called multiple times until the entire
    448  * buffer is filled. Returns true when the buffer is empty.
    449  */
    450 static bool peerProxyWriteFromBuffer(PeerProxy* peerProxy, Buffer* outgoing) {
    451     ssize_t size = bufferWrite(outgoing, peerProxy->fd->fd);
    452     if (size < 0) {
    453         peerProxyHandleError(peerProxy, "write");
    454         return false;
    455     } else {
    456         return bufferWriteComplete(outgoing);
    457     }
    458 }
    459 
    460 /** Writes packet bytes to peer. */
    461 static void peerProxyWriteBytes(PeerProxy* peerProxy) {
    462 	Buffer* buffer = peerProxy->currentPacket->bytes;
    463 	if (peerProxyWriteFromBuffer(peerProxy, buffer)) {
    464         ALOGD("Bytes written.");
    465         peerProxyNextPacket(peerProxy);
    466     }
    467 }
    468 
    469 /** Sends a socket to the peer. */
    470 static void peerProxyWriteConnection(PeerProxy* peerProxy) {
    471     int socket = peerProxy->currentPacket->socket;
    472 
    473     // Why does sending and receiving fds have to be such a PITA?
    474     struct msghdr msg;
    475     struct iovec iov[1];
    476 
    477     union {
    478         struct cmsghdr cm;
    479         char control[CMSG_SPACE(sizeof(int))];
    480     } control_un;
    481 
    482     struct cmsghdr *cmptr;
    483 
    484     msg.msg_control = control_un.control;
    485     msg.msg_controllen = sizeof(control_un.control);
    486     cmptr = CMSG_FIRSTHDR(&msg);
    487     cmptr->cmsg_len = CMSG_LEN(sizeof(int));
    488     cmptr->cmsg_level = SOL_SOCKET;
    489     cmptr->cmsg_type = SCM_RIGHTS;
    490 
    491     // Store the socket in the message.
    492     *((int *) CMSG_DATA(cmptr)) = peerProxy->currentPacket->socket;
    493 
    494     msg.msg_name = NULL;
    495     msg.msg_namelen = 0;
    496     iov[0].iov_base = "";
    497     iov[0].iov_len = 1;
    498     msg.msg_iov = iov;
    499     msg.msg_iovlen = 1;
    500 
    501     ssize_t result = sendmsg(peerProxy->fd->fd, &msg, 0);
    502 
    503     if (result < 0) {
    504         peerProxyHandleError(peerProxy, "sendmsg");
    505     } else {
    506         // Success. Queue up the next packet.
    507         peerProxyNextPacket(peerProxy);
    508 
    509     }
    510 }
    511 
    512 /**
    513  * Writes some outgoing data.
    514  */
    515 static void peerProxyWrite(SelectableFd* fd) {
    516     // TODO: Try to write header and body with one system call.
    517 
    518     PeerProxy* peerProxy = (PeerProxy*) fd->data;
    519     OutgoingPacket* current = peerProxy->currentPacket;
    520 
    521     if (current == NULL) {
    522         // We have nothing left to write.
    523         return;
    524     }
    525 
    526     // Write the header.
    527     Buffer* outgoingHeader = &peerProxy->outgoingHeader;
    528     bool headerWritten = bufferWriteComplete(outgoingHeader);
    529     if (!headerWritten) {
    530         ALOGD("Writing header...");
    531         headerWritten = peerProxyWriteFromBuffer(peerProxy, outgoingHeader);
    532         if (headerWritten) {
    533             ALOGD("Header written.");
    534         }
    535     }
    536 
    537     // Write body.
    538     if (headerWritten) {
    539         PacketType type = current->header.type;
    540         switch (type) {
    541             case CONNECTION:
    542                 peerProxyWriteConnection(peerProxy);
    543                 break;
    544             case BYTES:
    545                 peerProxyWriteBytes(peerProxy);
    546                 break;
    547             case CONNECTION_REQUEST:
    548             case CONNECTION_ERROR:
    549                 // These packets consist solely of a header.
    550                 peerProxyNextPacket(peerProxy);
    551                 break;
    552             default:
    553                 LOG_ALWAYS_FATAL("Unknown packet type: %d", type);
    554         }
    555     }
    556 }
    557 
    558 /**
    559  * Sets up a peer proxy's fd before we try to select() it.
    560  */
    561 static void peerProxyBeforeSelect(SelectableFd* fd) {
    562     ALOGD("Before select...");
    563 
    564     PeerProxy* peerProxy = (PeerProxy*) fd->data;
    565 
    566     peerLock(peerProxy->peer);
    567     bool hasPackets = peerProxy->currentPacket != NULL;
    568     peerUnlock(peerProxy->peer);
    569 
    570     if (hasPackets) {
    571         ALOGD("Packets found. Setting onWritable().");
    572 
    573         fd->onWritable = &peerProxyWrite;
    574     } else {
    575         // We have nothing to write.
    576         fd->onWritable = NULL;
    577     }
    578 }
    579 
    580 /** Prepare to read bytes from the peer. */
    581 static void peerProxyExpectBytes(PeerProxy* peerProxy, Header* header) {
    582     ALOGD("Expecting %d bytes.", header->size);
    583 
    584     peerProxy->inputState = READING_BYTES;
    585     if (bufferPrepareForRead(peerProxy->inputBuffer, header->size) == -1) {
    586         ALOGW("Couldn't allocate memory for incoming data. Size: %u",
    587                 (unsigned int) header->size);
    588 
    589         // TODO: Ignore the packet and log a warning?
    590         peerProxyKill(peerProxy, false);
    591     }
    592 }
    593 
    594 /**
    595  * Gets a peer proxy for the given ID. Creates a peer proxy if necessary.
    596  * Sends a connection request to the master if desired.
    597  *
    598  * Returns NULL if an error occurs. Sets errno to EHOSTDOWN if the peer died
    599  * or ENOMEM if memory couldn't be allocated.
    600  */
    601 static PeerProxy* peerProxyGetOrCreate(Peer* peer, pid_t pid,
    602         bool requestConnection) {
    603     if (pid == peer->pid) {
    604         errno = EINVAL;
    605         return NULL;
    606     }
    607 
    608     if (peerIsDead(peer, pid)) {
    609         errno = EHOSTDOWN;
    610         return NULL;
    611     }
    612 
    613     PeerProxy* peerProxy = hashmapGet(peer->peerProxies, &pid);
    614     if (peerProxy != NULL) {
    615         return peerProxy;
    616     }
    617 
    618     // If this is the master peer, we already know about all peers.
    619     if (peer->master) {
    620         errno = EHOSTDOWN;
    621         return NULL;
    622     }
    623 
    624     // Try to create a peer proxy.
    625     Credentials credentials;
    626     credentials.pid = pid;
    627 
    628     // Fake gid and uid until we have the real thing. The real creds are
    629     // filled in by masterProxyExpectConnection(). These fake creds will
    630     // never be exposed to the user.
    631     credentials.uid = 0;
    632     credentials.gid = 0;
    633 
    634     // Make sure we can allocate the connection request packet.
    635     OutgoingPacket* packet = NULL;
    636     if (requestConnection) {
    637         packet = calloc(1, sizeof(OutgoingPacket));
    638         if (packet == NULL) {
    639             errno = ENOMEM;
    640             return NULL;
    641         }
    642 
    643         packet->header.type = CONNECTION_REQUEST;
    644         packet->header.credentials = credentials;
    645         packet->free = &outgoingPacketFree;
    646     }
    647 
    648     peerProxy = peerProxyCreate(peer, credentials);
    649     if (peerProxy == NULL) {
    650         free(packet);
    651         errno = ENOMEM;
    652         return NULL;
    653     } else {
    654         // Send a connection request to the master.
    655         if (requestConnection) {
    656             PeerProxy* masterProxy = peer->masterProxy;
    657             peerProxyEnqueueOutgoingPacket(masterProxy, packet);
    658         }
    659 
    660         return peerProxy;
    661     }
    662 }
    663 
    664 /**
    665  * Switches the master peer proxy into a state where it's waiting for a
    666  * connection from the master.
    667  */
    668 static void masterProxyExpectConnection(PeerProxy* masterProxy,
    669         Header* header) {
    670     // TODO: Restructure things so we don't need this check.
    671     // Verify that this really is the master.
    672     if (!masterProxy->master) {
    673         ALOGW("Non-master process %d tried to send us a connection.",
    674             masterProxy->credentials.pid);
    675         // Kill off the evil peer.
    676         peerProxyKill(masterProxy, false);
    677         return;
    678     }
    679 
    680     masterProxy->inputState = ACCEPTING_CONNECTION;
    681     Peer* localPeer = masterProxy->peer;
    682 
    683     // Create a peer proxy so we have somewhere to stash the creds.
    684     // See if we already have a proxy set up.
    685     pid_t pid = header->credentials.pid;
    686     peerLock(localPeer);
    687     PeerProxy* peerProxy = peerProxyGetOrCreate(localPeer, pid, false);
    688     if (peerProxy == NULL) {
    689         ALOGW("Peer proxy creation failed: %s", strerror(errno));
    690     } else {
    691         // Fill in full credentials.
    692         peerProxy->credentials = header->credentials;
    693     }
    694     peerUnlock(localPeer);
    695 
    696     // Keep track of which peer proxy we're accepting a connection for.
    697     masterProxy->connecting = peerProxy;
    698 }
    699 
/**
 * Reads input from a peer process.
 *
 * Forward declaration: peerProxySetFd() installs this function as the fd's
 * onReadable callback.
 */
static void peerProxyRead(SelectableFd* fd);
    704 
    705 /** Sets up fd callbacks. */
    706 static void peerProxySetFd(PeerProxy* peerProxy, SelectableFd* fd) {
    707     peerProxy->fd = fd;
    708     fd->data = peerProxy;
    709     fd->onReadable = &peerProxyRead;
    710     fd->beforeSelect = &peerProxyBeforeSelect;
    711 
    712     // Make the socket non-blocking.
    713     setNonBlocking(fd->fd);
    714 }
    715 
    716 /**
    717  * Accepts a connection sent by the master proxy.
    718  */
    719 static void masterProxyAcceptConnection(PeerProxy* masterProxy) {
    720     struct msghdr msg;
    721     struct iovec iov[1];
    722     ssize_t size;
    723     char ignored;
    724     int incomingFd;
    725 
    726     // TODO: Reuse code which writes the connection. Who the heck designed
    727     // this API anyway?
    728     union {
    729         struct cmsghdr cm;
    730         char control[CMSG_SPACE(sizeof(int))];
    731     } control_un;
    732     struct cmsghdr *cmptr;
    733     msg.msg_control = control_un.control;
    734     msg.msg_controllen = sizeof(control_un.control);
    735 
    736     msg.msg_name = NULL;
    737     msg.msg_namelen = 0;
    738 
    739     // We sent 1 byte of data so we can detect EOF.
    740     iov[0].iov_base = &ignored;
    741     iov[0].iov_len = 1;
    742     msg.msg_iov = iov;
    743     msg.msg_iovlen = 1;
    744 
    745     size = recvmsg(masterProxy->fd->fd, &msg, 0);
    746     if (size < 0) {
    747         if (errno == EINTR) {
    748             // Log interruptions but otherwise ignore them.
    749             ALOGW("recvmsg() interrupted.");
    750             return;
    751         } else if (errno == EAGAIN) {
    752             // Keep waiting for the connection.
    753             return;
    754         } else {
    755             LOG_ALWAYS_FATAL("Error reading connection from master: %s",
    756                     strerror(errno));
    757         }
    758     } else if (size == 0) {
    759         // EOF.
    760         LOG_ALWAYS_FATAL("Received EOF from master.");
    761     }
    762 
    763     // Extract fd from message.
    764     if ((cmptr = CMSG_FIRSTHDR(&msg)) != NULL
    765             && cmptr->cmsg_len == CMSG_LEN(sizeof(int))) {
    766         if (cmptr->cmsg_level != SOL_SOCKET) {
    767             LOG_ALWAYS_FATAL("Expected SOL_SOCKET.");
    768         }
    769         if (cmptr->cmsg_type != SCM_RIGHTS) {
    770             LOG_ALWAYS_FATAL("Expected SCM_RIGHTS.");
    771         }
    772         incomingFd = *((int*) CMSG_DATA(cmptr));
    773     } else {
    774         LOG_ALWAYS_FATAL("Expected fd.");
    775     }
    776 
    777     // The peer proxy this connection is for.
    778     PeerProxy* peerProxy = masterProxy->connecting;
    779     if (peerProxy == NULL) {
    780         ALOGW("Received connection for unknown peer.");
    781         closeWithWarning(incomingFd);
    782     } else {
    783         Peer* peer = masterProxy->peer;
    784 
    785         SelectableFd* selectableFd = selectorAdd(peer->selector, incomingFd);
    786         if (selectableFd == NULL) {
    787             ALOGW("Error adding fd to selector for %d.",
    788                     peerProxy->credentials.pid);
    789             closeWithWarning(incomingFd);
    790             peerProxyKill(peerProxy, false);
    791         }
    792 
    793         peerProxySetFd(peerProxy, selectableFd);
    794     }
    795 
    796     peerProxyExpectHeader(masterProxy);
    797 }
    798 
    799 /**
    800  * Frees an outgoing packet containing a connection.
    801  */
    802 static void outgoingPacketFreeSocket(OutgoingPacket* packet) {
    803     closeWithWarning(packet->socket);
    804     outgoingPacketFree(packet);
    805 }
    806 
    807 /**
    808  * Connects two known peers.
    809  */
    810 static void masterConnectPeers(PeerProxy* peerA, PeerProxy* peerB) {
    811     int sockets[2];
    812     int result = socketpair(AF_LOCAL, SOCK_STREAM, 0, sockets);
    813     if (result == -1) {
    814         ALOGW("socketpair() error: %s", strerror(errno));
    815         // TODO: Send CONNECTION_FAILED packets to peers.
    816         return;
    817     }
    818 
    819     OutgoingPacket* packetA = calloc(1, sizeof(OutgoingPacket));
    820     OutgoingPacket* packetB = calloc(1, sizeof(OutgoingPacket));
    821     if (packetA == NULL || packetB == NULL) {
    822         free(packetA);
    823         free(packetB);
    824         ALOGW("malloc() error. Failed to tell process %d that process %d is"
    825                 " dead.", peerA->credentials.pid, peerB->credentials.pid);
    826         return;
    827     }
    828 
    829     packetA->header.type = CONNECTION;
    830     packetB->header.type = CONNECTION;
    831 
    832     packetA->header.credentials = peerB->credentials;
    833     packetB->header.credentials = peerA->credentials;
    834 
    835     packetA->socket = sockets[0];
    836     packetB->socket = sockets[1];
    837 
    838     packetA->free = &outgoingPacketFreeSocket;
    839     packetB->free = &outgoingPacketFreeSocket;
    840 
    841     peerLock(peerA->peer);
    842     peerProxyEnqueueOutgoingPacket(peerA, packetA);
    843     peerProxyEnqueueOutgoingPacket(peerB, packetB);
    844     peerUnlock(peerA->peer);
    845 }
    846 
    847 /**
    848  * Informs a peer that the peer they're trying to connect to couldn't be
    849  * found.
    850  */
    851 static void masterReportConnectionError(PeerProxy* peerProxy,
    852         Credentials credentials) {
    853     OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket));
    854     if (packet == NULL) {
    855         ALOGW("malloc() error. Failed to tell process %d that process %d is"
    856                 " dead.", peerProxy->credentials.pid, credentials.pid);
    857         return;
    858     }
    859 
    860     packet->header.type = CONNECTION_ERROR;
    861     packet->header.credentials = credentials;
    862     packet->free = &outgoingPacketFree;
    863 
    864     peerProxyLockAndEnqueueOutgoingPacket(peerProxy, packet);
    865 }
    866 
    867 /**
    868  * Handles a request to be connected to another peer.
    869  */
    870 static void masterHandleConnectionRequest(PeerProxy* peerProxy,
    871         Header* header) {
    872     Peer* master = peerProxy->peer;
    873     pid_t targetPid = header->credentials.pid;
    874     if (!hashmapContainsKey(peerProxy->connections, &targetPid)) {
    875         // We haven't connected these peers yet.
    876         PeerProxy* targetPeer
    877             = (PeerProxy*) hashmapGet(master->peerProxies, &targetPid);
    878         if (targetPeer == NULL) {
    879             // Unknown process.
    880             masterReportConnectionError(peerProxy, header->credentials);
    881         } else {
    882             masterConnectPeers(peerProxy, targetPeer);
    883         }
    884     }
    885 
    886     // This packet is complete. Get ready for the next one.
    887     peerProxyExpectHeader(peerProxy);
    888 }
    889 
    890 /**
    891  * The master told us this peer is dead.
    892  */
    893 static void masterProxyHandleConnectionError(PeerProxy* masterProxy,
    894         Header* header) {
    895     Peer* peer = masterProxy->peer;
    896 
    897     // Look up the peer proxy.
    898     pid_t pid = header->credentials.pid;
    899     PeerProxy* peerProxy = NULL;
    900     peerLock(peer);
    901     peerProxy = hashmapGet(peer->peerProxies, &pid);
    902     peerUnlock(peer);
    903 
    904     if (peerProxy != NULL) {
    905         ALOGI("Couldn't connect to %d.", pid);
    906         peerProxyKill(peerProxy, false);
    907     } else {
    908         ALOGW("Peer proxy for %d not found. This shouldn't happen.", pid);
    909     }
    910 
    911     peerProxyExpectHeader(masterProxy);
    912 }
    913 
    914 /**
    915  * Handles a packet header.
    916  */
    917 static void peerProxyHandleHeader(PeerProxy* peerProxy, Header* header) {
    918     switch (header->type) {
    919         case CONNECTION_REQUEST:
    920             masterHandleConnectionRequest(peerProxy, header);
    921             break;
    922         case CONNECTION:
    923             masterProxyExpectConnection(peerProxy, header);
    924             break;
    925         case CONNECTION_ERROR:
    926             masterProxyHandleConnectionError(peerProxy, header);
    927             break;
    928         case BYTES:
    929             peerProxyExpectBytes(peerProxy, header);
    930             break;
    931         default:
    932             ALOGW("Invalid packet type from %d: %d", peerProxy->credentials.pid,
    933                     header->type);
    934             peerProxyKill(peerProxy, false);
    935     }
    936 }
    937 
    938 /**
    939  * Buffers input sent by peer. May be called multiple times until the entire
    940  * buffer is filled. Returns true when the buffer is full.
    941  */
    942 static bool peerProxyBufferInput(PeerProxy* peerProxy) {
    943     Buffer* in = peerProxy->inputBuffer;
    944     ssize_t size = bufferRead(in, peerProxy->fd->fd);
    945     if (size < 0) {
    946         peerProxyHandleError(peerProxy, "read");
    947         return false;
    948     } else if (size == 0) {
    949         // EOF.
    950     	ALOGI("EOF");
    951         peerProxyKill(peerProxy, false);
    952         return false;
    953     } else if (bufferReadComplete(in)) {
    954         // We're done!
    955         return true;
    956     } else {
    957         // Continue reading.
    958         return false;
    959     }
    960 }
    961 
    962 /**
    963  * Reads input from a peer process.
    964  */
    965 static void peerProxyRead(SelectableFd* fd) {
    966     ALOGD("Reading...");
    967     PeerProxy* peerProxy = (PeerProxy*) fd->data;
    968     int state = peerProxy->inputState;
    969     Buffer* in = peerProxy->inputBuffer;
    970     switch (state) {
    971         case READING_HEADER:
    972             if (peerProxyBufferInput(peerProxy)) {
    973                 ALOGD("Header read.");
    974                 // We've read the complete header.
    975                 Header* header = (Header*) in->data;
    976                 peerProxyHandleHeader(peerProxy, header);
    977             }
    978             break;
    979         case READING_BYTES:
    980             ALOGD("Reading bytes...");
    981             if (peerProxyBufferInput(peerProxy)) {
    982                 ALOGD("Bytes read.");
    983                 // We have the complete packet. Notify bytes listener.
    984                 peerProxy->peer->onBytes(peerProxy->credentials,
    985                     in->data, in->size);
    986 
    987                 // Get ready for the next packet.
    988                 peerProxyExpectHeader(peerProxy);
    989             }
    990             break;
    991         case ACCEPTING_CONNECTION:
    992             masterProxyAcceptConnection(peerProxy);
    993             break;
    994         default:
    995             LOG_ALWAYS_FATAL("Unknown state: %d", state);
    996     }
    997 }
    998 
    999 static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials) {
   1000     PeerProxy* peerProxy = calloc(1, sizeof(PeerProxy));
   1001     if (peerProxy == NULL) {
   1002         return NULL;
   1003     }
   1004 
   1005     peerProxy->inputBuffer = bufferCreate(sizeof(Header));
   1006     if (peerProxy->inputBuffer == NULL) {
   1007         free(peerProxy);
   1008         return NULL;
   1009     }
   1010 
   1011     peerProxy->peer = peer;
   1012     peerProxy->credentials = credentials;
   1013 
   1014     // Initial state == expecting a header.
   1015     peerProxyExpectHeader(peerProxy);
   1016 
   1017     // Add this proxy to the map. Make sure the key points to the stable memory
   1018     // inside of the peer proxy itself.
   1019     pid_t* pid = &(peerProxy->credentials.pid);
   1020     hashmapPut(peer->peerProxies, pid, peerProxy);
   1021     return peerProxy;
   1022 }
   1023 
   1024 /** Accepts a connection to the master peer. */
   1025 static void masterAcceptConnection(SelectableFd* listenerFd) {
   1026     // Accept connection.
   1027     int socket = accept(listenerFd->fd, NULL, NULL);
   1028     if (socket == -1) {
   1029         ALOGW("accept() error: %s", strerror(errno));
   1030         return;
   1031     }
   1032 
   1033     ALOGD("Accepted connection as fd %d.", socket);
   1034 
   1035     // Get credentials.
   1036     Credentials credentials;
   1037     struct ucred ucredentials;
   1038     socklen_t credentialsSize = sizeof(struct ucred);
   1039     int result = getsockopt(socket, SOL_SOCKET, SO_PEERCRED,
   1040                 &ucredentials, &credentialsSize);
   1041     // We might want to verify credentialsSize.
   1042     if (result == -1) {
   1043         ALOGW("getsockopt() error: %s", strerror(errno));
   1044         closeWithWarning(socket);
   1045         return;
   1046     }
   1047 
   1048     // Copy values into our own structure so we know we have the types right.
   1049     credentials.pid = ucredentials.pid;
   1050     credentials.uid = ucredentials.uid;
   1051     credentials.gid = ucredentials.gid;
   1052 
   1053     ALOGI("Accepted connection from process %d.", credentials.pid);
   1054 
   1055     Peer* masterPeer = (Peer*) listenerFd->data;
   1056 
   1057     peerLock(masterPeer);
   1058 
   1059     // Make sure we don't already have a connection from that process.
   1060     PeerProxy* peerProxy
   1061         = hashmapGet(masterPeer->peerProxies, &credentials.pid);
   1062     if (peerProxy != NULL) {
   1063         peerUnlock(masterPeer);
   1064         ALOGW("Alread connected to process %d.", credentials.pid);
   1065         closeWithWarning(socket);
   1066         return;
   1067     }
   1068 
   1069     // Add connection to the selector.
   1070     SelectableFd* socketFd = selectorAdd(masterPeer->selector, socket);
   1071     if (socketFd == NULL) {
   1072         peerUnlock(masterPeer);
   1073         ALOGW("malloc() failed.");
   1074         closeWithWarning(socket);
   1075         return;
   1076     }
   1077 
   1078     // Create a peer proxy.
   1079     peerProxy = peerProxyCreate(masterPeer, credentials);
   1080     peerUnlock(masterPeer);
   1081     if (peerProxy == NULL) {
   1082         ALOGW("malloc() failed.");
   1083         socketFd->remove = true;
   1084         closeWithWarning(socket);
   1085     }
   1086     peerProxy->connections = hashmapCreate(10, &pidHash, &pidEquals);
   1087     peerProxySetFd(peerProxy, socketFd);
   1088 }
   1089 
   1090 /**
   1091  * Creates the local peer.
   1092  */
   1093 static Peer* peerCreate() {
   1094     Peer* peer = calloc(1, sizeof(Peer));
   1095     if (peer == NULL) {
   1096         LOG_ALWAYS_FATAL("malloc() error.");
   1097     }
   1098     peer->peerProxies = hashmapCreate(10, &pidHash, &pidEquals);
   1099     peer->selector = selectorCreate();
   1100 
   1101     pthread_mutexattr_t attributes;
   1102     if (pthread_mutexattr_init(&attributes) != 0) {
   1103         LOG_ALWAYS_FATAL("pthread_mutexattr_init() error.");
   1104     }
   1105     if (pthread_mutexattr_settype(&attributes, PTHREAD_MUTEX_RECURSIVE) != 0) {
   1106         LOG_ALWAYS_FATAL("pthread_mutexattr_settype() error.");
   1107     }
   1108     if (pthread_mutex_init(&peer->mutex, &attributes) != 0) {
   1109         LOG_ALWAYS_FATAL("pthread_mutex_init() error.");
   1110     }
   1111 
   1112     peer->pid = getpid();
   1113     return peer;
   1114 }
   1115 
   1116 /** The local peer. */
   1117 static Peer* localPeer;
   1118 
   1119 /** Frees a packet of bytes. */
   1120 static void outgoingPacketFreeBytes(OutgoingPacket* packet) {
   1121     ALOGD("Freeing outgoing packet.");
   1122     bufferFree(packet->bytes);
   1123     free(packet);
   1124 }
   1125 
   1126 /**
   1127  * Sends a packet of bytes to a remote peer. Returns 0 on success.
   1128  *
   1129  * Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be
   1130  * allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno
   1131  * to EINVAL if pid is the same as the local pid.
   1132  */
   1133 int peerSendBytes(pid_t pid, const char* bytes, size_t size) {
   1134 	Peer* peer = localPeer;
   1135     assert(peer != NULL);
   1136 
   1137     OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket));
   1138     if (packet == NULL) {
   1139         errno = ENOMEM;
   1140         return -1;
   1141     }
   1142 
   1143     Buffer* copy = bufferCreate(size);
   1144     if (copy == NULL) {
   1145         free(packet);
   1146         errno = ENOMEM;
   1147         return -1;
   1148     }
   1149 
   1150     // Copy data.
   1151     memcpy(copy->data, bytes, size);
   1152     copy->size = size;
   1153 
   1154     packet->bytes = copy;
   1155     packet->header.type = BYTES;
   1156     packet->header.size = size;
   1157     packet->free = outgoingPacketFreeBytes;
   1158     bufferPrepareForWrite(packet->bytes);
   1159 
   1160     peerLock(peer);
   1161 
   1162     PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true);
   1163     if (peerProxy == NULL) {
   1164         // The peer is already dead or we couldn't alloc memory. Either way,
   1165         // errno is set.
   1166         peerUnlock(peer);
   1167         packet->free(packet);
   1168         return -1;
   1169     } else {
   1170         peerProxyEnqueueOutgoingPacket(peerProxy, packet);
   1171         peerUnlock(peer);
   1172         selectorWakeUp(peer->selector);
   1173         return 0;
   1174     }
   1175 }
   1176 
   1177 /** Keeps track of how to free shared bytes. */
   1178 typedef struct {
   1179     void (*free)(void* context);
   1180     void* context;
   1181 } SharedBytesFreer;
   1182 
   1183 /** Frees shared bytes. */
   1184 static void outgoingPacketFreeSharedBytes(OutgoingPacket* packet) {
   1185     SharedBytesFreer* sharedBytesFreer
   1186         = (SharedBytesFreer*) packet->context;
   1187     sharedBytesFreer->free(sharedBytesFreer->context);
   1188     free(sharedBytesFreer);
   1189     free(packet);
   1190 }
   1191 
   1192 /**
   1193  * Sends a packet of bytes to a remote peer without copying the bytes. Calls
   1194  * free() with context after the bytes have been sent.
   1195  *
   1196  * Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be
   1197  * allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno
   1198  * to EINVAL if pid is the same as the local pid.
   1199  */
   1200 int peerSendSharedBytes(pid_t pid, char* bytes, size_t size,
   1201         void (*free)(void* context), void* context) {
   1202     Peer* peer = localPeer;
   1203     assert(peer != NULL);
   1204 
   1205     OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket));
   1206     if (packet == NULL) {
   1207         errno = ENOMEM;
   1208         return -1;
   1209     }
   1210 
   1211     Buffer* wrapper = bufferWrap(bytes, size, size);
   1212     if (wrapper == NULL) {
   1213         free(packet);
   1214         errno = ENOMEM;
   1215         return -1;
   1216     }
   1217 
   1218     SharedBytesFreer* sharedBytesFreer = malloc(sizeof(SharedBytesFreer));
   1219     if (sharedBytesFreer == NULL) {
   1220         free(packet);
   1221         free(wrapper);
   1222         errno = ENOMEM;
   1223         return -1;
   1224     }
   1225     sharedBytesFreer->free = free;
   1226     sharedBytesFreer->context = context;
   1227 
   1228     packet->bytes = wrapper;
   1229     packet->context = sharedBytesFreer;
   1230     packet->header.type = BYTES;
   1231     packet->header.size = size;
   1232     packet->free = &outgoingPacketFreeSharedBytes;
   1233     bufferPrepareForWrite(packet->bytes);
   1234 
   1235     peerLock(peer);
   1236 
   1237     PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true);
   1238     if (peerProxy == NULL) {
   1239         // The peer is already dead or we couldn't alloc memory. Either way,
   1240         // errno is set.
   1241         peerUnlock(peer);
   1242         packet->free(packet);
   1243         return -1;
   1244     } else {
   1245         peerProxyEnqueueOutgoingPacket(peerProxy, packet);
   1246         peerUnlock(peer);
   1247         selectorWakeUp(peer->selector);
   1248         return 0;
   1249     }
   1250 }
   1251 
   1252 /**
   1253  * Starts the master peer. The master peer differs from other peers in that
   1254  * it is responsible for connecting the other peers. You can only have one
   1255  * master peer.
   1256  *
   1257  * Goes into an I/O loop and does not return.
   1258  */
   1259 void masterPeerInitialize(BytesListener* bytesListener,
   1260         DeathListener* deathListener) {
   1261     // Create and bind socket.
   1262     int listenerSocket = socket(AF_LOCAL, SOCK_STREAM, 0);
   1263     if (listenerSocket == -1) {
   1264         LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno));
   1265     }
   1266     unlink(MASTER_PATH);
   1267     int result = bind(listenerSocket, (SocketAddress*) getMasterAddress(),
   1268             sizeof(UnixAddress));
   1269     if (result == -1) {
   1270         LOG_ALWAYS_FATAL("bind() error: %s", strerror(errno));
   1271     }
   1272 
   1273     ALOGD("Listener socket: %d",  listenerSocket);
   1274 
   1275     // Queue up to 16 connections.
   1276     result = listen(listenerSocket, 16);
   1277     if (result != 0) {
   1278         LOG_ALWAYS_FATAL("listen() error: %s", strerror(errno));
   1279     }
   1280 
   1281     // Make socket non-blocking.
   1282     setNonBlocking(listenerSocket);
   1283 
   1284     // Create the peer for this process. Fail if we already have one.
   1285     if (localPeer != NULL) {
   1286         LOG_ALWAYS_FATAL("Peer is already initialized.");
   1287     }
   1288     localPeer = peerCreate();
   1289     if (localPeer == NULL) {
   1290         LOG_ALWAYS_FATAL("malloc() failed.");
   1291     }
   1292     localPeer->master = true;
   1293     localPeer->onBytes = bytesListener;
   1294     localPeer->onDeath = deathListener;
   1295 
   1296     // Make listener socket selectable.
   1297     SelectableFd* listenerFd = selectorAdd(localPeer->selector, listenerSocket);
   1298     if (listenerFd == NULL) {
   1299         LOG_ALWAYS_FATAL("malloc() error.");
   1300     }
   1301     listenerFd->data = localPeer;
   1302     listenerFd->onReadable = &masterAcceptConnection;
   1303 }
   1304 
   1305 /**
   1306  * Starts a local peer.
   1307  *
   1308  * Goes into an I/O loop and does not return.
   1309  */
   1310 void peerInitialize(BytesListener* bytesListener,
   1311         DeathListener* deathListener) {
   1312     // Connect to master peer.
   1313     int masterSocket = socket(AF_LOCAL, SOCK_STREAM, 0);
   1314     if (masterSocket == -1) {
   1315         LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno));
   1316     }
   1317     int result = connect(masterSocket, (SocketAddress*) getMasterAddress(),
   1318             sizeof(UnixAddress));
   1319     if (result != 0) {
   1320         LOG_ALWAYS_FATAL("connect() error: %s", strerror(errno));
   1321     }
   1322 
   1323     // Create the peer for this process. Fail if we already have one.
   1324     if (localPeer != NULL) {
   1325         LOG_ALWAYS_FATAL("Peer is already initialized.");
   1326     }
   1327     localPeer = peerCreate();
   1328     if (localPeer == NULL) {
   1329         LOG_ALWAYS_FATAL("malloc() failed.");
   1330     }
   1331     localPeer->onBytes = bytesListener;
   1332     localPeer->onDeath = deathListener;
   1333 
   1334     // Make connection selectable.
   1335     SelectableFd* masterFd = selectorAdd(localPeer->selector, masterSocket);
   1336     if (masterFd == NULL) {
   1337         LOG_ALWAYS_FATAL("malloc() error.");
   1338     }
   1339 
   1340     // Create a peer proxy for the master peer.
   1341     PeerProxy* masterProxy = peerProxyCreate(localPeer, MASTER_CREDENTIALS);
   1342     if (masterProxy == NULL) {
   1343         LOG_ALWAYS_FATAL("malloc() error.");
   1344     }
   1345     peerProxySetFd(masterProxy, masterFd);
   1346     masterProxy->master = true;
   1347     localPeer->masterProxy = masterProxy;
   1348 }
   1349 
   1350 /** Starts the master peer I/O loop. Doesn't return. */
   1351 void peerLoop() {
   1352     assert(localPeer != NULL);
   1353 
   1354     // Start selector.
   1355     selectorLoop(localPeer->selector);
   1356 }
   1357 
   1358