Home | History | Annotate | Download | only in CoreIPC
      1 /*
      2  * Copyright (C) 2010 Apple Inc. All rights reserved.
      3  *
      4  * Redistribution and use in source and binary forms, with or without
      5  * modification, are permitted provided that the following conditions
      6  * are met:
      7  * 1. Redistributions of source code must retain the above copyright
      8  *    notice, this list of conditions and the following disclaimer.
      9  * 2. Redistributions in binary form must reproduce the above copyright
     10  *    notice, this list of conditions and the following disclaimer in the
     11  *    documentation and/or other materials provided with the distribution.
     12  *
     13  * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
     14  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
     15  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
     16  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
     17  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
     18  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
     19  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
     20  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
     21  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
     22  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
     23  * THE POSSIBILITY OF SUCH DAMAGE.
     24  */
     25 
     26 #include "config.h"
     27 #include "Connection.h"
     28 
     29 #include "BinarySemaphore.h"
     30 #include "CoreIPCMessageKinds.h"
     31 #include "RunLoop.h"
     32 #include "WebProcess.h"
     33 #include "WorkItem.h"
     34 #include <wtf/CurrentTime.h>
     35 
     36 using namespace std;
     37 
     38 namespace CoreIPC {
     39 
     40 class Connection::SyncMessageState : public ThreadSafeRefCounted<Connection::SyncMessageState> {
     41 public:
     42     static PassRefPtr<SyncMessageState> getOrCreate(RunLoop*);
     43     ~SyncMessageState();
     44 
     45     void wakeUpClientRunLoop()
     46     {
     47         m_waitForSyncReplySemaphore.signal();
     48     }
     49 
     50     bool wait(double absoluteTime)
     51     {
     52         return m_waitForSyncReplySemaphore.wait(absoluteTime);
     53     }
     54 
     55 #if PLATFORM(WIN)
     56     bool waitWhileDispatchingSentWin32Messages(double absoluteTime, const Vector<HWND>& windowsToReceiveMessages)
     57     {
     58         return RunLoop::dispatchSentMessagesUntil(windowsToReceiveMessages, m_waitForSyncReplySemaphore, absoluteTime);
     59     }
     60 #endif
     61 
     62     // Returns true if this message will be handled on a client thread that is currently
     63     // waiting for a reply to a synchronous message.
     64     bool processIncomingMessage(Connection*, IncomingMessage&);
     65 
     66     void dispatchMessages();
     67 
     68 private:
     69     explicit SyncMessageState(RunLoop*);
     70 
     71     typedef HashMap<RunLoop*, SyncMessageState*> SyncMessageStateMap;
     72     static SyncMessageStateMap& syncMessageStateMap()
     73     {
     74         DEFINE_STATIC_LOCAL(SyncMessageStateMap, syncMessageStateMap, ());
     75         return syncMessageStateMap;
     76     }
     77 
     78     static Mutex& syncMessageStateMapMutex()
     79     {
     80         DEFINE_STATIC_LOCAL(Mutex, syncMessageStateMapMutex, ());
     81         return syncMessageStateMapMutex;
     82     }
     83 
     84     void dispatchMessageAndResetDidScheduleDispatchMessagesWork();
     85 
     86     RunLoop* m_runLoop;
     87     BinarySemaphore m_waitForSyncReplySemaphore;
     88 
     89     // Protects m_didScheduleDispatchMessagesWork and m_messagesToDispatchWhileWaitingForSyncReply.
     90     Mutex m_mutex;
     91 
     92     bool m_didScheduleDispatchMessagesWork;
     93 
     94     struct ConnectionAndIncomingMessage {
     95         RefPtr<Connection> connection;
     96         IncomingMessage incomingMessage;
     97     };
     98     Vector<ConnectionAndIncomingMessage> m_messagesToDispatchWhileWaitingForSyncReply;
     99 };
    100 
    101 PassRefPtr<Connection::SyncMessageState> Connection::SyncMessageState::getOrCreate(RunLoop* runLoop)
    102 {
    103     MutexLocker locker(syncMessageStateMapMutex());
    104     pair<SyncMessageStateMap::iterator, bool> result = syncMessageStateMap().add(runLoop, 0);
    105 
    106     if (!result.second) {
    107         ASSERT(result.first->second);
    108         return result.first->second;
    109     }
    110 
    111     RefPtr<SyncMessageState> syncMessageState = adoptRef(new SyncMessageState(runLoop));
    112     result.first->second = syncMessageState.get();
    113 
    114     return syncMessageState.release();
    115 }
    116 
    117 Connection::SyncMessageState::SyncMessageState(RunLoop* runLoop)
    118     : m_runLoop(runLoop)
    119     , m_didScheduleDispatchMessagesWork(false)
    120 {
    121 }
    122 
    123 Connection::SyncMessageState::~SyncMessageState()
    124 {
    125     MutexLocker locker(syncMessageStateMapMutex());
    126 
    127     ASSERT(syncMessageStateMap().contains(m_runLoop));
    128     syncMessageStateMap().remove(m_runLoop);
    129 }
    130 
    131 bool Connection::SyncMessageState::processIncomingMessage(Connection* connection, IncomingMessage& incomingMessage)
    132 {
    133     MessageID messageID = incomingMessage.messageID();
    134     if (!messageID.shouldDispatchMessageWhenWaitingForSyncReply())
    135         return false;
    136 
    137     ConnectionAndIncomingMessage connectionAndIncomingMessage;
    138     connectionAndIncomingMessage.connection = connection;
    139     connectionAndIncomingMessage.incomingMessage = incomingMessage;
    140 
    141     {
    142         MutexLocker locker(m_mutex);
    143 
    144         if (!m_didScheduleDispatchMessagesWork) {
    145             m_runLoop->scheduleWork(WorkItem::create(this, &SyncMessageState::dispatchMessageAndResetDidScheduleDispatchMessagesWork));
    146             m_didScheduleDispatchMessagesWork = true;
    147         }
    148 
    149         m_messagesToDispatchWhileWaitingForSyncReply.append(connectionAndIncomingMessage);
    150     }
    151 
    152     wakeUpClientRunLoop();
    153 
    154     return true;
    155 }
    156 
    157 void Connection::SyncMessageState::dispatchMessages()
    158 {
    159     ASSERT(m_runLoop == RunLoop::current());
    160 
    161     Vector<ConnectionAndIncomingMessage> messagesToDispatchWhileWaitingForSyncReply;
    162 
    163     {
    164         MutexLocker locker(m_mutex);
    165         m_messagesToDispatchWhileWaitingForSyncReply.swap(messagesToDispatchWhileWaitingForSyncReply);
    166     }
    167 
    168     for (size_t i = 0; i < messagesToDispatchWhileWaitingForSyncReply.size(); ++i) {
    169         ConnectionAndIncomingMessage& connectionAndIncomingMessage = messagesToDispatchWhileWaitingForSyncReply[i];
    170         connectionAndIncomingMessage.connection->dispatchMessage(connectionAndIncomingMessage.incomingMessage);
    171     }
    172 }
    173 
    174 void Connection::SyncMessageState::dispatchMessageAndResetDidScheduleDispatchMessagesWork()
    175 {
    176     {
    177         MutexLocker locker(m_mutex);
    178         ASSERT(m_didScheduleDispatchMessagesWork);
    179         m_didScheduleDispatchMessagesWork = false;
    180     }
    181 
    182     dispatchMessages();
    183 }
    184 
    185 PassRefPtr<Connection> Connection::createServerConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop)
    186 {
    187     return adoptRef(new Connection(identifier, true, client, clientRunLoop));
    188 }
    189 
    190 PassRefPtr<Connection> Connection::createClientConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop)
    191 {
    192     return adoptRef(new Connection(identifier, false, client, clientRunLoop));
    193 }
    194 
    195 Connection::Connection(Identifier identifier, bool isServer, Client* client, RunLoop* clientRunLoop)
    196     : m_client(client)
    197     , m_isServer(isServer)
    198     , m_syncRequestID(0)
    199     , m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(false)
    200     , m_shouldExitOnSyncMessageSendFailure(false)
    201     , m_didCloseOnConnectionWorkQueueCallback(0)
    202     , m_isConnected(false)
    203     , m_connectionQueue("com.apple.CoreIPC.ReceiveQueue")
    204     , m_clientRunLoop(clientRunLoop)
    205     , m_inDispatchMessageCount(0)
    206     , m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount(0)
    207     , m_didReceiveInvalidMessage(false)
    208     , m_defaultSyncMessageTimeout(NoTimeout)
    209     , m_syncMessageState(SyncMessageState::getOrCreate(clientRunLoop))
    210     , m_shouldWaitForSyncReplies(true)
    211 {
    212     ASSERT(m_client);
    213 
    214     platformInitialize(identifier);
    215 }
    216 
    217 Connection::~Connection()
    218 {
    219     ASSERT(!isValid());
    220 
    221     m_connectionQueue.invalidate();
    222 }
    223 
    224 void Connection::setOnlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(bool flag)
    225 {
    226     ASSERT(!m_isConnected);
    227 
    228     m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage = flag;
    229 }
    230 
    231 void Connection::setShouldExitOnSyncMessageSendFailure(bool shouldExitOnSyncMessageSendFailure)
    232 {
    233     ASSERT(!m_isConnected);
    234 
    235     m_shouldExitOnSyncMessageSendFailure = shouldExitOnSyncMessageSendFailure;
    236 }
    237 
    238 void Connection::setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback)
    239 {
    240     ASSERT(!m_isConnected);
    241 
    242     m_didCloseOnConnectionWorkQueueCallback = callback;
    243 }
    244 
    245 void Connection::invalidate()
    246 {
    247     if (!isValid()) {
    248         // Someone already called invalidate().
    249         return;
    250     }
    251 
    252     // Reset the client.
    253     m_client = 0;
    254 
    255     m_connectionQueue.scheduleWork(WorkItem::create(this, &Connection::platformInvalidate));
    256 }
    257 
    258 void Connection::markCurrentlyDispatchedMessageAsInvalid()
    259 {
    260     // This should only be called while processing a message.
    261     ASSERT(m_inDispatchMessageCount > 0);
    262 
    263     m_didReceiveInvalidMessage = true;
    264 }
    265 
    266 void Connection::setDefaultSyncMessageTimeout(double defaultSyncMessageTimeout)
    267 {
    268     ASSERT(defaultSyncMessageTimeout != DefaultTimeout);
    269 
    270     m_defaultSyncMessageTimeout = defaultSyncMessageTimeout;
    271 }
    272 
    273 PassOwnPtr<ArgumentEncoder> Connection::createSyncMessageArgumentEncoder(uint64_t destinationID, uint64_t& syncRequestID)
    274 {
    275     OwnPtr<ArgumentEncoder> argumentEncoder = ArgumentEncoder::create(destinationID);
    276 
    277     // Encode the sync request ID.
    278     syncRequestID = ++m_syncRequestID;
    279     argumentEncoder->encode(syncRequestID);
    280 
    281     return argumentEncoder.release();
    282 }
    283 
    284 bool Connection::sendMessage(MessageID messageID, PassOwnPtr<ArgumentEncoder> arguments, unsigned messageSendFlags)
    285 {
    286     if (!isValid())
    287         return false;
    288 
    289     if (messageSendFlags & DispatchMessageEvenWhenWaitingForSyncReply
    290         && (!m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage
    291             || m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount))
    292         messageID = messageID.messageIDWithAddedFlags(MessageID::DispatchMessageWhenWaitingForSyncReply);
    293 
    294     MutexLocker locker(m_outgoingMessagesLock);
    295     m_outgoingMessages.append(OutgoingMessage(messageID, arguments));
    296 
    297     // FIXME: We should add a boolean flag so we don't call this when work has already been scheduled.
    298     m_connectionQueue.scheduleWork(WorkItem::create(this, &Connection::sendOutgoingMessages));
    299     return true;
    300 }
    301 
    302 bool Connection::sendSyncReply(PassOwnPtr<ArgumentEncoder> arguments)
    303 {
    304     return sendMessage(MessageID(CoreIPCMessage::SyncMessageReply), arguments);
    305 }
    306 
    307 PassOwnPtr<ArgumentDecoder> Connection::waitForMessage(MessageID messageID, uint64_t destinationID, double timeout)
    308 {
    309     // First, check if this message is already in the incoming messages queue.
    310     {
    311         MutexLocker locker(m_incomingMessagesLock);
    312 
    313         for (size_t i = 0; i < m_incomingMessages.size(); ++i) {
    314             const IncomingMessage& message = m_incomingMessages[i];
    315 
    316             if (message.messageID() == messageID && message.arguments()->destinationID() == destinationID) {
    317                 OwnPtr<ArgumentDecoder> arguments(message.arguments());
    318 
    319                 // Erase the incoming message.
    320                 m_incomingMessages.remove(i);
    321                 return arguments.release();
    322             }
    323         }
    324     }
    325 
    326     double absoluteTime = currentTime() + timeout;
    327 
    328     std::pair<unsigned, uint64_t> messageAndDestination(std::make_pair(messageID.toInt(), destinationID));
    329 
    330     {
    331         MutexLocker locker(m_waitForMessageMutex);
    332 
    333         // We don't support having multiple clients wait for the same message.
    334         ASSERT(!m_waitForMessageMap.contains(messageAndDestination));
    335 
    336         // Insert our pending wait.
    337         m_waitForMessageMap.set(messageAndDestination, 0);
    338     }
    339 
    340     // Now wait for it to be set.
    341     while (true) {
    342         MutexLocker locker(m_waitForMessageMutex);
    343 
    344         HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(messageAndDestination);
    345         if (it->second) {
    346             OwnPtr<ArgumentDecoder> arguments(it->second);
    347             m_waitForMessageMap.remove(it);
    348 
    349             return arguments.release();
    350         }
    351 
    352         // Now we wait.
    353         if (!m_waitForMessageCondition.timedWait(m_waitForMessageMutex, absoluteTime)) {
    354             // We timed out, now remove the pending wait.
    355             m_waitForMessageMap.remove(messageAndDestination);
    356 
    357             break;
    358         }
    359     }
    360 
    361     return PassOwnPtr<ArgumentDecoder>();
    362 }
    363 
    364 PassOwnPtr<ArgumentDecoder> Connection::sendSyncMessage(MessageID messageID, uint64_t syncRequestID, PassOwnPtr<ArgumentEncoder> encoder, double timeout)
    365 {
    366     // We only allow sending sync messages from the client run loop.
    367     ASSERT(RunLoop::current() == m_clientRunLoop);
    368 
    369     if (!isValid()) {
    370         didFailToSendSyncMessage();
    371         return 0;
    372     }
    373 
    374     // Push the pending sync reply information on our stack.
    375     {
    376         MutexLocker locker(m_syncReplyStateMutex);
    377         if (!m_shouldWaitForSyncReplies) {
    378             didFailToSendSyncMessage();
    379             return 0;
    380         }
    381 
    382         m_pendingSyncReplies.append(PendingSyncReply(syncRequestID));
    383     }
    384 
    385     // First send the message.
    386     sendMessage(messageID.messageIDWithAddedFlags(MessageID::SyncMessage), encoder, DispatchMessageEvenWhenWaitingForSyncReply);
    387 
    388     // Then wait for a reply. Waiting for a reply could involve dispatching incoming sync messages, so
    389     // keep an extra reference to the connection here in case it's invalidated.
    390     RefPtr<Connection> protect(this);
    391     OwnPtr<ArgumentDecoder> reply = waitForSyncReply(syncRequestID, timeout);
    392 
    393     // Finally, pop the pending sync reply information.
    394     {
    395         MutexLocker locker(m_syncReplyStateMutex);
    396         ASSERT(m_pendingSyncReplies.last().syncRequestID == syncRequestID);
    397         m_pendingSyncReplies.removeLast();
    398     }
    399 
    400     if (!reply)
    401         didFailToSendSyncMessage();
    402 
    403     return reply.release();
    404 }
    405 
    406 PassOwnPtr<ArgumentDecoder> Connection::waitForSyncReply(uint64_t syncRequestID, double timeout)
    407 {
    408     if (timeout == DefaultTimeout)
    409         timeout = m_defaultSyncMessageTimeout;
    410 
    411     // Use a really long timeout.
    412     if (timeout == NoTimeout)
    413         timeout = 1e10;
    414 
    415     double absoluteTime = currentTime() + timeout;
    416 
    417     bool timedOut = false;
    418     while (!timedOut) {
    419         // First, check if we have any messages that we need to process.
    420         m_syncMessageState->dispatchMessages();
    421 
    422         {
    423             MutexLocker locker(m_syncReplyStateMutex);
    424 
    425             // Second, check if there is a sync reply at the top of the stack.
    426             ASSERT(!m_pendingSyncReplies.isEmpty());
    427 
    428             PendingSyncReply& pendingSyncReply = m_pendingSyncReplies.last();
    429             ASSERT(pendingSyncReply.syncRequestID == syncRequestID);
    430 
    431             // We found the sync reply, or the connection was closed.
    432             if (pendingSyncReply.didReceiveReply || !m_shouldWaitForSyncReplies)
    433                 return pendingSyncReply.releaseReplyDecoder();
    434         }
    435 
    436         // We didn't find a sync reply yet, keep waiting.
    437 #if PLATFORM(WIN)
    438         timedOut = !m_syncMessageState->waitWhileDispatchingSentWin32Messages(absoluteTime, m_client->windowsToReceiveSentMessagesWhileWaitingForSyncReply());
    439 #else
    440         timedOut = !m_syncMessageState->wait(absoluteTime);
    441 #endif
    442     }
    443 
    444     // We timed out.
    445     if (m_client)
    446         m_client->syncMessageSendTimedOut(this);
    447 
    448     return 0;
    449 }
    450 
    451 void Connection::processIncomingSyncReply(PassOwnPtr<ArgumentDecoder> arguments)
    452 {
    453     MutexLocker locker(m_syncReplyStateMutex);
    454     ASSERT(!m_pendingSyncReplies.isEmpty());
    455 
    456     // Go through the stack of sync requests that have pending replies and see which one
    457     // this reply is for.
    458     for (size_t i = m_pendingSyncReplies.size(); i > 0; --i) {
    459         PendingSyncReply& pendingSyncReply = m_pendingSyncReplies[i - 1];
    460 
    461         if (pendingSyncReply.syncRequestID != arguments->destinationID())
    462             continue;
    463 
    464         ASSERT(!pendingSyncReply.replyDecoder);
    465 
    466         pendingSyncReply.replyDecoder = arguments.leakPtr();
    467         pendingSyncReply.didReceiveReply = true;
    468 
    469         // We got a reply to the last send message, wake up the client run loop so it can be processed.
    470         if (i == m_pendingSyncReplies.size())
    471             m_syncMessageState->wakeUpClientRunLoop();
    472 
    473         return;
    474     }
    475 
    476     // We got a reply for a message we never sent.
    477     // FIXME: Dispatch a didReceiveInvalidMessage callback on the client.
    478     ASSERT_NOT_REACHED();
    479 }
    480 
    481 void Connection::processIncomingMessage(MessageID messageID, PassOwnPtr<ArgumentDecoder> arguments)
    482 {
    483     // Check if this is a sync reply.
    484     if (messageID == MessageID(CoreIPCMessage::SyncMessageReply)) {
    485         processIncomingSyncReply(arguments);
    486         return;
    487     }
    488 
    489     IncomingMessage incomingMessage(messageID, arguments);
    490 
    491     // Check if this is a sync message or if it's a message that should be dispatched even when waiting for
    492     // a sync reply. If it is, and we're waiting for a sync reply this message needs to be dispatched.
    493     // If we don't we'll end up with a deadlock where both sync message senders are stuck waiting for a reply.
    494     if (m_syncMessageState->processIncomingMessage(this, incomingMessage))
    495         return;
    496 
    497     // Check if we're waiting for this message.
    498     {
    499         MutexLocker locker(m_waitForMessageMutex);
    500 
    501         HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(std::make_pair(messageID.toInt(), incomingMessage.destinationID()));
    502         if (it != m_waitForMessageMap.end()) {
    503             it->second = incomingMessage.releaseArguments().leakPtr();
    504             ASSERT(it->second);
    505 
    506             m_waitForMessageCondition.signal();
    507             return;
    508         }
    509     }
    510 
    511     enqueueIncomingMessage(incomingMessage);
    512 }
    513 
    514 void Connection::connectionDidClose()
    515 {
    516     // The connection is now invalid.
    517     platformInvalidate();
    518 
    519     {
    520         MutexLocker locker(m_syncReplyStateMutex);
    521 
    522         ASSERT(m_shouldWaitForSyncReplies);
    523         m_shouldWaitForSyncReplies = false;
    524 
    525         if (!m_pendingSyncReplies.isEmpty())
    526             m_syncMessageState->wakeUpClientRunLoop();
    527     }
    528 
    529     if (m_didCloseOnConnectionWorkQueueCallback)
    530         m_didCloseOnConnectionWorkQueueCallback(m_connectionQueue, this);
    531 
    532     m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchConnectionDidClose));
    533 }
    534 
    535 void Connection::dispatchConnectionDidClose()
    536 {
    537     // If the connection has been explicitly invalidated before dispatchConnectionDidClose was called,
    538     // then the client will be null here.
    539     if (!m_client)
    540         return;
    541 
    542 
    543     // Because we define a connection as being "valid" based on wheter it has a null client, we null out
    544     // the client before calling didClose here. Otherwise, sendSync will try to send a message to the connection and
    545     // will then wait indefinitely for a reply.
    546     Client* client = m_client;
    547     m_client = 0;
    548 
    549     client->didClose(this);
    550 }
    551 
    552 bool Connection::canSendOutgoingMessages() const
    553 {
    554     return m_isConnected && platformCanSendOutgoingMessages();
    555 }
    556 
    557 void Connection::sendOutgoingMessages()
    558 {
    559     if (!canSendOutgoingMessages())
    560         return;
    561 
    562     while (true) {
    563         OutgoingMessage message;
    564         {
    565             MutexLocker locker(m_outgoingMessagesLock);
    566             if (m_outgoingMessages.isEmpty())
    567                 break;
    568             message = m_outgoingMessages.takeFirst();
    569         }
    570 
    571         if (!sendOutgoingMessage(message.messageID(), adoptPtr(message.arguments())))
    572             break;
    573     }
    574 }
    575 
    576 void Connection::dispatchSyncMessage(MessageID messageID, ArgumentDecoder* arguments)
    577 {
    578     ASSERT(messageID.isSync());
    579 
    580     // Decode the sync request ID.
    581     uint64_t syncRequestID = 0;
    582 
    583     if (!arguments->decodeUInt64(syncRequestID) || !syncRequestID) {
    584         // We received an invalid sync message.
    585         arguments->markInvalid();
    586         return;
    587     }
    588 
    589     // Create our reply encoder.
    590     ArgumentEncoder* replyEncoder = ArgumentEncoder::create(syncRequestID).leakPtr();
    591 
    592     // Hand off both the decoder and encoder to the client..
    593     SyncReplyMode syncReplyMode = m_client->didReceiveSyncMessage(this, messageID, arguments, replyEncoder);
    594 
    595     // FIXME: If the message was invalid, we should send back a SyncMessageError.
    596     ASSERT(!arguments->isInvalid());
    597 
    598     if (syncReplyMode == ManualReply) {
    599         // The client will take ownership of the reply encoder and send it at some point in the future.
    600         // We won't do anything here.
    601         return;
    602     }
    603 
    604     // Send the reply.
    605     sendSyncReply(replyEncoder);
    606 }
    607 
    608 void Connection::didFailToSendSyncMessage()
    609 {
    610     if (!m_shouldExitOnSyncMessageSendFailure)
    611         return;
    612 
    613     exit(0);
    614 }
    615 
    616 void Connection::enqueueIncomingMessage(IncomingMessage& incomingMessage)
    617 {
    618     MutexLocker locker(m_incomingMessagesLock);
    619     m_incomingMessages.append(incomingMessage);
    620 
    621     m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchMessages));
    622 }
    623 
    624 void Connection::dispatchMessage(IncomingMessage& message)
    625 {
    626     OwnPtr<ArgumentDecoder> arguments = message.releaseArguments();
    627 
    628     // If there's no client, return. We do this after calling releaseArguments so that
    629     // the ArgumentDecoder message will be freed.
    630     if (!m_client)
    631         return;
    632 
    633     m_inDispatchMessageCount++;
    634 
    635     if (message.messageID().shouldDispatchMessageWhenWaitingForSyncReply())
    636         m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount++;
    637 
    638     bool oldDidReceiveInvalidMessage = m_didReceiveInvalidMessage;
    639     m_didReceiveInvalidMessage = false;
    640 
    641     if (message.messageID().isSync())
    642         dispatchSyncMessage(message.messageID(), arguments.get());
    643     else
    644         m_client->didReceiveMessage(this, message.messageID(), arguments.get());
    645 
    646     m_didReceiveInvalidMessage |= arguments->isInvalid();
    647     m_inDispatchMessageCount--;
    648 
    649     if (message.messageID().shouldDispatchMessageWhenWaitingForSyncReply())
    650         m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount--;
    651 
    652     if (m_didReceiveInvalidMessage && m_client)
    653         m_client->didReceiveInvalidMessage(this, message.messageID());
    654 
    655     m_didReceiveInvalidMessage = oldDidReceiveInvalidMessage;
    656 }
    657 
    658 void Connection::dispatchMessages()
    659 {
    660     Vector<IncomingMessage> incomingMessages;
    661 
    662     {
    663         MutexLocker locker(m_incomingMessagesLock);
    664         m_incomingMessages.swap(incomingMessages);
    665     }
    666 
    667     for (size_t i = 0; i < incomingMessages.size(); ++i)
    668         dispatchMessage(incomingMessages[i]);
    669 }
    670 
    671 } // namespace CoreIPC
    672