Home | History | Annotate | Download | only in CoreIPC
      1 /*
      2  * Copyright (C) 2010 Apple Inc. All rights reserved.
      3  * Copyright (C) 2010 Nokia Corporation and/or its subsidiary(-ies)
      4  * Portions Copyright (c) 2010 Motorola Mobility, Inc.  All rights reserved.
      5  *
      6  * Redistribution and use in source and binary forms, with or without
      7  * modification, are permitted provided that the following conditions
      8  * are met:
      9  * 1. Redistributions of source code must retain the above copyright
     10  *    notice, this list of conditions and the following disclaimer.
     11  * 2. Redistributions in binary form must reproduce the above copyright
     12  *    notice, this list of conditions and the following disclaimer in the
     13  *    documentation and/or other materials provided with the distribution.
     14  *
     15  * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
     16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
     17  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
     18  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
     19  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
     20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
     21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
     22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
     23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
     24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
     25  * THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 #ifndef Connection_h
     29 #define Connection_h
     30 
     31 #include "ArgumentDecoder.h"
     32 #include "ArgumentEncoder.h"
     33 #include "Arguments.h"
     34 #include "MessageID.h"
     35 #include "WorkQueue.h"
     36 #include <wtf/HashMap.h>
     37 #include <wtf/PassRefPtr.h>
     38 #include <wtf/OwnPtr.h>
     39 #include <wtf/Threading.h>
     40 
     41 #if PLATFORM(MAC)
     42 #include <mach/mach_port.h>
     43 #elif PLATFORM(WIN)
     44 #include <string>
     45 #elif PLATFORM(QT)
     46 class QSocketNotifier;
     47 #endif
     48 
     49 #if PLATFORM(QT) || PLATFORM(GTK)
     50 #include "PlatformProcessIdentifier.h"
     51 #endif
     52 
     53 class RunLoop;
     54 
     55 namespace CoreIPC {
     56 
     57 class MessageID;
     58 
     59 enum SyncReplyMode {
     60     AutomaticReply,
     61     ManualReply
     62 };
     63 
     64 enum MessageSendFlags {
     65     // Whether this message should be dispatched when waiting for a sync reply.
     66     // This is the default for synchronous messages.
     67     DispatchMessageEvenWhenWaitingForSyncReply = 1 << 0,
     68 };
     69 
     70 #define MESSAGE_CHECK_BASE(assertion, connection) do \
     71     if (!(assertion)) { \
     72         ASSERT(assertion); \
     73         (connection)->markCurrentlyDispatchedMessageAsInvalid(); \
     74         return; \
     75     } \
     76 while (0)
     77 
     78 class Connection : public ThreadSafeRefCounted<Connection> {
     79 public:
     80     class MessageReceiver {
     81     protected:
     82         virtual ~MessageReceiver() { }
     83 
     84     public:
     85         virtual void didReceiveMessage(Connection*, MessageID, ArgumentDecoder*) = 0;
     86         virtual SyncReplyMode didReceiveSyncMessage(Connection*, MessageID, ArgumentDecoder*, ArgumentEncoder*) { ASSERT_NOT_REACHED(); return AutomaticReply; }
     87     };
     88 
     89     class Client : public MessageReceiver {
     90     protected:
     91         virtual ~Client() { }
     92 
     93     public:
     94         virtual void didClose(Connection*) = 0;
     95         virtual void didReceiveInvalidMessage(Connection*, MessageID) = 0;
     96         virtual void syncMessageSendTimedOut(Connection*) = 0;
     97 
     98 #if PLATFORM(WIN)
     99         virtual Vector<HWND> windowsToReceiveSentMessagesWhileWaitingForSyncReply() = 0;
    100 #endif
    101     };
    102 
    103 #if PLATFORM(MAC)
    104     typedef mach_port_t Identifier;
    105 #elif PLATFORM(WIN)
    106     typedef HANDLE Identifier;
    107     static bool createServerAndClientIdentifiers(Identifier& serverIdentifier, Identifier& clientIdentifier);
    108 #elif USE(UNIX_DOMAIN_SOCKETS)
    109     typedef int Identifier;
    110 #endif
    111 
    112     static PassRefPtr<Connection> createServerConnection(Identifier, Client*, RunLoop* clientRunLoop);
    113     static PassRefPtr<Connection> createClientConnection(Identifier, Client*, RunLoop* clientRunLoop);
    114     ~Connection();
    115 
    116 #if PLATFORM(MAC)
    117     void setShouldCloseConnectionOnMachExceptions();
    118 #elif PLATFORM(QT) || PLATFORM(GTK)
    119     void setShouldCloseConnectionOnProcessTermination(WebKit::PlatformProcessIdentifier);
    120 #endif
    121 
    122     void setOnlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(bool);
    123     void setShouldExitOnSyncMessageSendFailure(bool shouldExitOnSyncMessageSendFailure);
    124 
    125     // The set callback will be called on the connection work queue when the connection is closed,
    126     // before didCall is called on the client thread. Must be called before the connection is opened.
    127     // In the future we might want a more generic way to handle sync or async messages directly
    128     // on the work queue, for example if we want to handle them on some other thread we could avoid
    129     // handling the message on the client thread first.
    130     typedef void (*DidCloseOnConnectionWorkQueueCallback)(WorkQueue&, Connection*);
    131     void setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback);
    132 
    133     bool open();
    134     void invalidate();
    135     void markCurrentlyDispatchedMessageAsInvalid();
    136 
    137     void setDefaultSyncMessageTimeout(double);
    138 
    139     static const int DefaultTimeout = 0;
    140     static const int NoTimeout = -1;
    141 
    142     template<typename T> bool send(const T& message, uint64_t destinationID, unsigned messageSendFlags = 0);
    143     template<typename T> bool sendSync(const T& message, const typename T::Reply& reply, uint64_t destinationID, double timeout = DefaultTimeout);
    144     template<typename T> bool waitForAndDispatchImmediately(uint64_t destinationID, double timeout);
    145 
    146     PassOwnPtr<ArgumentEncoder> createSyncMessageArgumentEncoder(uint64_t destinationID, uint64_t& syncRequestID);
    147     bool sendMessage(MessageID, PassOwnPtr<ArgumentEncoder>, unsigned messageSendFlags = 0);
    148     bool sendSyncReply(PassOwnPtr<ArgumentEncoder>);
    149 
    150     // FIXME: These variants of send, sendSync and waitFor are all deprecated.
    151     // All clients should move to the overloads that take a message type.
    152     template<typename E, typename T> bool deprecatedSend(E messageID, uint64_t destinationID, const T& arguments);
    153     template<typename E, typename T, typename U> bool deprecatedSendSync(E messageID, uint64_t destinationID, const T& arguments, const U& reply, double timeout = NoTimeout);
    154     template<typename E> PassOwnPtr<ArgumentDecoder> deprecatedWaitFor(E messageID, uint64_t destinationID, double timeout);
    155 
    156 private:
    157     template<typename T> class Message {
    158     public:
    159         Message()
    160             : m_arguments(0)
    161         {
    162         }
    163 
    164         Message(MessageID messageID, PassOwnPtr<T> arguments)
    165             : m_messageID(messageID)
    166             , m_arguments(arguments.leakPtr())
    167         {
    168         }
    169 
    170         MessageID messageID() const { return m_messageID; }
    171         uint64_t destinationID() const { return m_arguments->destinationID(); }
    172 
    173         T* arguments() const { return m_arguments; }
    174 
    175         PassOwnPtr<T> releaseArguments()
    176         {
    177             T* arguments = m_arguments;
    178             m_arguments = 0;
    179 
    180             return arguments;
    181         }
    182 
    183     private:
    184         MessageID m_messageID;
    185         T* m_arguments;
    186     };
    187 
    188 public:
    189     typedef Message<ArgumentEncoder> OutgoingMessage;
    190 
    191 private:
    192     Connection(Identifier, bool isServer, Client*, RunLoop* clientRunLoop);
    193     void platformInitialize(Identifier);
    194     void platformInvalidate();
    195 
    196     bool isValid() const { return m_client; }
    197 
    198     PassOwnPtr<ArgumentDecoder> waitForMessage(MessageID, uint64_t destinationID, double timeout);
    199 
    200     PassOwnPtr<ArgumentDecoder> sendSyncMessage(MessageID, uint64_t syncRequestID, PassOwnPtr<ArgumentEncoder>, double timeout);
    201     PassOwnPtr<ArgumentDecoder> waitForSyncReply(uint64_t syncRequestID, double timeout);
    202 
    203     // Called on the connection work queue.
    204     void processIncomingMessage(MessageID, PassOwnPtr<ArgumentDecoder>);
    205     void processIncomingSyncReply(PassOwnPtr<ArgumentDecoder>);
    206 
    207     bool canSendOutgoingMessages() const;
    208     bool platformCanSendOutgoingMessages() const;
    209     void sendOutgoingMessages();
    210     bool sendOutgoingMessage(MessageID, PassOwnPtr<ArgumentEncoder>);
    211     void connectionDidClose();
    212 
    213     typedef Message<ArgumentDecoder> IncomingMessage;
    214 
    215     // Called on the listener thread.
    216     void dispatchConnectionDidClose();
    217     void dispatchMessage(IncomingMessage&);
    218     void dispatchMessages();
    219     void dispatchSyncMessage(MessageID, ArgumentDecoder*);
    220     void didFailToSendSyncMessage();
    221 
    222     // Can be called on any thread.
    223     void enqueueIncomingMessage(IncomingMessage&);
    224 
    225     Client* m_client;
    226     bool m_isServer;
    227     uint64_t m_syncRequestID;
    228 
    229     bool m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage;
    230     bool m_shouldExitOnSyncMessageSendFailure;
    231     DidCloseOnConnectionWorkQueueCallback m_didCloseOnConnectionWorkQueueCallback;
    232 
    233     bool m_isConnected;
    234     WorkQueue m_connectionQueue;
    235     RunLoop* m_clientRunLoop;
    236 
    237     unsigned m_inDispatchMessageCount;
    238     unsigned m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount;
    239     bool m_didReceiveInvalidMessage;
    240 
    241     double m_defaultSyncMessageTimeout;
    242 
    243     // Incoming messages.
    244     Mutex m_incomingMessagesLock;
    245     Vector<IncomingMessage> m_incomingMessages;
    246 
    247     // Outgoing messages.
    248     Mutex m_outgoingMessagesLock;
    249     Deque<OutgoingMessage> m_outgoingMessages;
    250 
    251     ThreadCondition m_waitForMessageCondition;
    252     Mutex m_waitForMessageMutex;
    253     HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*> m_waitForMessageMap;
    254 
    255     // Represents a sync request for which we're waiting on a reply.
    256     struct PendingSyncReply {
    257         // The request ID.
    258         uint64_t syncRequestID;
    259 
    260         // The reply decoder, will be null if there was an error processing the sync
    261         // message on the other side.
    262         ArgumentDecoder* replyDecoder;
    263 
    264         // Will be set to true once a reply has been received or an error occurred.
    265         bool didReceiveReply;
    266 
    267         PendingSyncReply()
    268             : syncRequestID(0)
    269             , replyDecoder(0)
    270             , didReceiveReply(false)
    271         {
    272         }
    273 
    274         explicit PendingSyncReply(uint64_t syncRequestID)
    275             : syncRequestID(syncRequestID)
    276             , replyDecoder(0)
    277             , didReceiveReply(0)
    278         {
    279         }
    280 
    281         PassOwnPtr<ArgumentDecoder> releaseReplyDecoder()
    282         {
    283             OwnPtr<ArgumentDecoder> reply = adoptPtr(replyDecoder);
    284             replyDecoder = 0;
    285 
    286             return reply.release();
    287         }
    288     };
    289 
    290     class SyncMessageState;
    291     friend class SyncMessageState;
    292     RefPtr<SyncMessageState> m_syncMessageState;
    293 
    294     Mutex m_syncReplyStateMutex;
    295     bool m_shouldWaitForSyncReplies;
    296     Vector<PendingSyncReply> m_pendingSyncReplies;
    297 
    298 #if PLATFORM(MAC)
    299     // Called on the connection queue.
    300     void receiveSourceEventHandler();
    301     void initializeDeadNameSource();
    302     void exceptionSourceEventHandler();
    303 
    304     mach_port_t m_sendPort;
    305     mach_port_t m_receivePort;
    306 
    307     // If setShouldCloseConnectionOnMachExceptions has been called, this has
    308     // the exception port that exceptions from the other end will be sent on.
    309     mach_port_t m_exceptionPort;
    310 
    311 #elif PLATFORM(WIN)
    312     // Called on the connection queue.
    313     void readEventHandler();
    314     void writeEventHandler();
    315 
    316     Vector<uint8_t> m_readBuffer;
    317     OVERLAPPED m_readState;
    318     OwnPtr<ArgumentEncoder> m_pendingWriteArguments;
    319     OVERLAPPED m_writeState;
    320     HANDLE m_connectionPipe;
    321 #elif USE(UNIX_DOMAIN_SOCKETS)
    322     // Called on the connection queue.
    323     void readyReadHandler();
    324 
    325     Vector<uint8_t> m_readBuffer;
    326     size_t m_currentMessageSize;
    327     int m_socketDescriptor;
    328 
    329 #if PLATFORM(QT)
    330     QSocketNotifier* m_socketNotifier;
    331 #endif
    332 #endif
    333 };
    334 
    335 template<typename T> bool Connection::send(const T& message, uint64_t destinationID, unsigned messageSendFlags)
    336 {
    337     OwnPtr<ArgumentEncoder> argumentEncoder = ArgumentEncoder::create(destinationID);
    338     argumentEncoder->encode(message);
    339 
    340     return sendMessage(MessageID(T::messageID), argumentEncoder.release(), messageSendFlags);
    341 }
    342 
    343 template<typename T> bool Connection::sendSync(const T& message, const typename T::Reply& reply, uint64_t destinationID, double timeout)
    344 {
    345     uint64_t syncRequestID = 0;
    346     OwnPtr<ArgumentEncoder> argumentEncoder = createSyncMessageArgumentEncoder(destinationID, syncRequestID);
    347 
    348     // Encode the rest of the input arguments.
    349     argumentEncoder->encode(message);
    350 
    351     // Now send the message and wait for a reply.
    352     OwnPtr<ArgumentDecoder> replyDecoder = sendSyncMessage(MessageID(T::messageID), syncRequestID, argumentEncoder.release(), timeout);
    353     if (!replyDecoder)
    354         return false;
    355 
    356     // Decode the reply.
    357     return replyDecoder->decode(const_cast<typename T::Reply&>(reply));
    358 }
    359 
    360 template<typename T> bool Connection::waitForAndDispatchImmediately(uint64_t destinationID, double timeout)
    361 {
    362     OwnPtr<ArgumentDecoder> decoder = waitForMessage(MessageID(T::messageID), destinationID, timeout);
    363     if (!decoder)
    364         return false;
    365 
    366     ASSERT(decoder->destinationID() == destinationID);
    367     m_client->didReceiveMessage(this, MessageID(T::messageID), decoder.get());
    368     return true;
    369 }
    370 
    371 // These three member functions are all deprecated.
    372 
    373 template<typename E, typename T, typename U>
    374 inline bool Connection::deprecatedSendSync(E messageID, uint64_t destinationID, const T& arguments, const U& reply, double timeout)
    375 {
    376     uint64_t syncRequestID = 0;
    377     OwnPtr<ArgumentEncoder> argumentEncoder = createSyncMessageArgumentEncoder(destinationID, syncRequestID);
    378 
    379     // Encode the input arguments.
    380     argumentEncoder->encode(arguments);
    381 
    382     // Now send the message and wait for a reply.
    383     OwnPtr<ArgumentDecoder> replyDecoder = sendSyncMessage(MessageID(messageID), syncRequestID, argumentEncoder.release(), timeout);
    384     if (!replyDecoder)
    385         return false;
    386 
    387     // Decode the reply.
    388     return replyDecoder->decode(const_cast<U&>(reply));
    389 }
    390 
    391 template<typename E, typename T>
    392 bool Connection::deprecatedSend(E messageID, uint64_t destinationID, const T& arguments)
    393 {
    394     OwnPtr<ArgumentEncoder> argumentEncoder = ArgumentEncoder::create(destinationID);
    395     argumentEncoder->encode(arguments);
    396 
    397     return sendMessage(MessageID(messageID), argumentEncoder.release());
    398 }
    399 
    400 template<typename E> inline PassOwnPtr<ArgumentDecoder> Connection::deprecatedWaitFor(E messageID, uint64_t destinationID, double timeout)
    401 {
    402     return waitForMessage(MessageID(messageID), destinationID, timeout);
    403 }
    404 
    405 } // namespace CoreIPC
    406 
    407 #endif // Connection_h
    408