Home | History | Annotate | Download | only in websockets
      1 /*
      2  * Copyright (C) 2009, 2010 Google Inc.  All rights reserved.
      3  *
      4  * Redistribution and use in source and binary forms, with or without
      5  * modification, are permitted provided that the following conditions are
      6  * met:
      7  *
      8  *     * Redistributions of source code must retain the above copyright
      9  * notice, this list of conditions and the following disclaimer.
     10  *     * Redistributions in binary form must reproduce the above
     11  * copyright notice, this list of conditions and the following disclaimer
     12  * in the documentation and/or other materials provided with the
     13  * distribution.
     14  *     * Neither the name of Google Inc. nor the names of its
     15  * contributors may be used to endorse or promote products derived from
     16  * this software without specific prior written permission.
     17  *
     18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     19  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     20  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     21  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     22  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     23  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     24  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     25  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     26  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     28  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     29  */
     30 
     31 #include "config.h"
     32 
     33 #if ENABLE(WEB_SOCKETS) && ENABLE(WORKERS)
     34 
     35 #include "WorkerThreadableWebSocketChannel.h"
     36 
     37 #include "GenericWorkerTask.h"
     38 #include "PlatformString.h"
     39 #include "ScriptExecutionContext.h"
     40 #include "ThreadableWebSocketChannelClientWrapper.h"
     41 #include "WebSocketChannel.h"
     42 #include "WebSocketChannelClient.h"
     43 #include "WorkerContext.h"
     44 #include "WorkerLoaderProxy.h"
     45 #include "WorkerRunLoop.h"
     46 #include "WorkerThread.h"
     47 
     48 #include <wtf/PassRefPtr.h>
     49 
     50 namespace WebCore {
     51 
     52 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerContext* context, WebSocketChannelClient* client, const String& taskMode, const KURL& url, const String& protocol)
     53     : m_workerContext(context)
     54     , m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(client))
     55     , m_bridge(new Bridge(m_workerClientWrapper, m_workerContext, taskMode, url, protocol))
     56 {
     57 }
     58 
     59 WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel()
     60 {
     61     if (m_bridge)
     62         m_bridge->disconnect();
     63 }
     64 
     65 void WorkerThreadableWebSocketChannel::connect()
     66 {
     67     if (m_bridge)
     68         m_bridge->connect();
     69 }
     70 
     71 bool WorkerThreadableWebSocketChannel::send(const String& message)
     72 {
     73     if (!m_bridge)
     74         return false;
     75     return m_bridge->send(message);
     76 }
     77 
     78 unsigned long WorkerThreadableWebSocketChannel::bufferedAmount() const
     79 {
     80     if (!m_bridge)
     81         return 0;
     82     return m_bridge->bufferedAmount();
     83 }
     84 
     85 void WorkerThreadableWebSocketChannel::close()
     86 {
     87     if (m_bridge)
     88         m_bridge->close();
     89 }
     90 
     91 void WorkerThreadableWebSocketChannel::disconnect()
     92 {
     93     m_bridge->disconnect();
     94     m_bridge.clear();
     95 }
     96 
     97 WorkerThreadableWebSocketChannel::Peer::Peer(RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, ScriptExecutionContext* context, const String& taskMode, const KURL& url, const String& protocol)
     98     : m_workerClientWrapper(clientWrapper)
     99     , m_loaderProxy(loaderProxy)
    100     , m_mainWebSocketChannel(WebSocketChannel::create(context, this, url, protocol))
    101     , m_taskMode(taskMode)
    102 {
    103     ASSERT(isMainThread());
    104 }
    105 
    106 WorkerThreadableWebSocketChannel::Peer::~Peer()
    107 {
    108     ASSERT(isMainThread());
    109     if (m_mainWebSocketChannel)
    110         m_mainWebSocketChannel->disconnect();
    111 }
    112 
    113 void WorkerThreadableWebSocketChannel::Peer::connect()
    114 {
    115     ASSERT(isMainThread());
    116     if (!m_mainWebSocketChannel)
    117         return;
    118     m_mainWebSocketChannel->connect();
    119 }
    120 
    121 static void workerContextDidSend(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, bool sent)
    122 {
    123     ASSERT_UNUSED(context, context->isWorkerContext());
    124     workerClientWrapper->setSent(sent);
    125 }
    126 
    127 void WorkerThreadableWebSocketChannel::Peer::send(const String& message)
    128 {
    129     ASSERT(isMainThread());
    130     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
    131         return;
    132     bool sent = m_mainWebSocketChannel->send(message);
    133     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidSend, m_workerClientWrapper, sent), m_taskMode);
    134 }
    135 
    136 static void workerContextDidGetBufferedAmount(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long bufferedAmount)
    137 {
    138     ASSERT_UNUSED(context, context->isWorkerContext());
    139     workerClientWrapper->setBufferedAmount(bufferedAmount);
    140 }
    141 
    142 void WorkerThreadableWebSocketChannel::Peer::bufferedAmount()
    143 {
    144     ASSERT(isMainThread());
    145     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
    146         return;
    147     unsigned long bufferedAmount = m_mainWebSocketChannel->bufferedAmount();
    148     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidGetBufferedAmount, m_workerClientWrapper, bufferedAmount), m_taskMode);
    149 }
    150 
    151 void WorkerThreadableWebSocketChannel::Peer::close()
    152 {
    153     ASSERT(isMainThread());
    154     if (!m_mainWebSocketChannel)
    155         return;
    156     m_mainWebSocketChannel->close();
    157     m_mainWebSocketChannel = 0;
    158 }
    159 
    160 void WorkerThreadableWebSocketChannel::Peer::disconnect()
    161 {
    162     ASSERT(isMainThread());
    163     if (!m_mainWebSocketChannel)
    164         return;
    165     m_mainWebSocketChannel->disconnect();
    166     m_mainWebSocketChannel = 0;
    167 }
    168 
    169 static void workerContextDidConnect(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
    170 {
    171     ASSERT_UNUSED(context, context->isWorkerContext());
    172     workerClientWrapper->didConnect();
    173 }
    174 
    175 void WorkerThreadableWebSocketChannel::Peer::didConnect()
    176 {
    177     ASSERT(isMainThread());
    178     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidConnect, m_workerClientWrapper), m_taskMode);
    179 }
    180 
    181 static void workerContextDidReceiveMessage(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& message)
    182 {
    183     ASSERT_UNUSED(context, context->isWorkerContext());
    184     workerClientWrapper->didReceiveMessage(message);
    185 }
    186 
    187 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessage(const String& message)
    188 {
    189     ASSERT(isMainThread());
    190     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidReceiveMessage, m_workerClientWrapper, message), m_taskMode);
    191 }
    192 
    193 static void workerContextDidClose(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long unhandledBufferedAmount)
    194 {
    195     ASSERT_UNUSED(context, context->isWorkerContext());
    196     workerClientWrapper->didClose(unhandledBufferedAmount);
    197 }
    198 
    199 void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned long unhandledBufferedAmount)
    200 {
    201     ASSERT(isMainThread());
    202     m_mainWebSocketChannel = 0;
    203     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidClose, m_workerClientWrapper, unhandledBufferedAmount), m_taskMode);
    204 }
    205 
    206 void WorkerThreadableWebSocketChannel::Bridge::setWebSocketChannel(ScriptExecutionContext* context, Bridge* thisPtr, Peer* peer, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
    207 {
    208     ASSERT_UNUSED(context, context->isWorkerContext());
    209     thisPtr->m_peer = peer;
    210     workerClientWrapper->setSyncMethodDone();
    211 }
    212 
    213 void WorkerThreadableWebSocketChannel::Bridge::mainThreadCreateWebSocketChannel(ScriptExecutionContext* context, Bridge* thisPtr, RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, const String& taskMode, const KURL& url, const String& protocol)
    214 {
    215     ASSERT(isMainThread());
    216     ASSERT_UNUSED(context, context->isDocument());
    217 
    218     Peer* peer = Peer::create(clientWrapper, thisPtr->m_loaderProxy, context, taskMode, url, protocol);
    219     thisPtr->m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&Bridge::setWebSocketChannel, thisPtr, peer, clientWrapper), taskMode);
    220 }
    221 
    222 WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassRefPtr<WorkerContext> workerContext, const String& taskMode, const KURL& url, const String& protocol)
    223     : m_workerClientWrapper(workerClientWrapper)
    224     , m_workerContext(workerContext)
    225     , m_loaderProxy(m_workerContext->thread()->workerLoaderProxy())
    226     , m_taskMode(taskMode)
    227     , m_peer(0)
    228 {
    229     ASSERT(m_workerClientWrapper.get());
    230     setMethodNotCompleted();
    231     m_loaderProxy.postTaskToLoader(createCallbackTask(&Bridge::mainThreadCreateWebSocketChannel, this, m_workerClientWrapper, m_taskMode, url, protocol));
    232     waitForMethodCompletion();
    233     ASSERT(m_peer);
    234 }
    235 
    236 WorkerThreadableWebSocketChannel::Bridge::~Bridge()
    237 {
    238     disconnect();
    239 }
    240 
    241 void WorkerThreadableWebSocketChannel::mainThreadConnect(ScriptExecutionContext* context, Peer* peer)
    242 {
    243     ASSERT(isMainThread());
    244     ASSERT_UNUSED(context, context->isDocument());
    245     ASSERT(peer);
    246 
    247     peer->connect();
    248 }
    249 
    250 void WorkerThreadableWebSocketChannel::Bridge::connect()
    251 {
    252     ASSERT(m_workerClientWrapper);
    253     ASSERT(m_peer);
    254     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadConnect, m_peer));
    255 }
    256 
    257 void WorkerThreadableWebSocketChannel::mainThreadSend(ScriptExecutionContext* context, Peer* peer, const String& message)
    258 {
    259     ASSERT(isMainThread());
    260     ASSERT_UNUSED(context, context->isDocument());
    261     ASSERT(peer);
    262 
    263     peer->send(message);
    264 }
    265 
    266 bool WorkerThreadableWebSocketChannel::Bridge::send(const String& message)
    267 {
    268     if (!m_workerClientWrapper)
    269         return false;
    270     ASSERT(m_peer);
    271     setMethodNotCompleted();
    272     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSend, m_peer, message));
    273     waitForMethodCompletion();
    274     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
    275     return clientWrapper && clientWrapper->sent();
    276 }
    277 
    278 void WorkerThreadableWebSocketChannel::mainThreadBufferedAmount(ScriptExecutionContext* context, Peer* peer)
    279 {
    280     ASSERT(isMainThread());
    281     ASSERT_UNUSED(context, context->isDocument());
    282     ASSERT(peer);
    283 
    284     peer->bufferedAmount();
    285 }
    286 
    287 unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount()
    288 {
    289     if (!m_workerClientWrapper)
    290         return 0;
    291     ASSERT(m_peer);
    292     setMethodNotCompleted();
    293     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadBufferedAmount, m_peer));
    294     waitForMethodCompletion();
    295     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
    296     if (clientWrapper)
    297         return clientWrapper->bufferedAmount();
    298     return 0;
    299 }
    300 
    301 void WorkerThreadableWebSocketChannel::mainThreadClose(ScriptExecutionContext* context, Peer* peer)
    302 {
    303     ASSERT(isMainThread());
    304     ASSERT_UNUSED(context, context->isDocument());
    305     ASSERT(peer);
    306 
    307     peer->close();
    308 }
    309 
    310 void WorkerThreadableWebSocketChannel::Bridge::close()
    311 {
    312     ASSERT(m_peer);
    313     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadClose, m_peer));
    314 }
    315 
    316 void WorkerThreadableWebSocketChannel::mainThreadDestroy(ScriptExecutionContext* context, Peer* peer)
    317 {
    318     ASSERT(isMainThread());
    319     ASSERT_UNUSED(context, context->isDocument());
    320     ASSERT(peer);
    321 
    322     delete peer;
    323 }
    324 
    325 void WorkerThreadableWebSocketChannel::Bridge::disconnect()
    326 {
    327     clearClientWrapper();
    328     if (m_peer) {
    329         Peer* peer = m_peer;
    330         m_peer = 0;
    331         m_loaderProxy.postTaskToLoader(createCallbackTask(&mainThreadDestroy, peer));
    332     }
    333     m_workerContext = 0;
    334 }
    335 
    336 void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper()
    337 {
    338     m_workerClientWrapper->clearClient();
    339 }
    340 
    341 void WorkerThreadableWebSocketChannel::Bridge::setMethodNotCompleted()
    342 {
    343     ASSERT(m_workerClientWrapper);
    344     m_workerClientWrapper->clearSyncMethodDone();
    345 }
    346 
    347 void WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion()
    348 {
    349     if (!m_workerContext)
    350         return;
    351     WorkerRunLoop& runLoop = m_workerContext->thread()->runLoop();
    352     MessageQueueWaitResult result = MessageQueueMessageReceived;
    353     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
    354     while (clientWrapper && !clientWrapper->syncMethodDone() && result != MessageQueueTerminated) {
    355         result = runLoop.runInMode(m_workerContext.get(), m_taskMode);
    356         clientWrapper = m_workerClientWrapper.get();
    357     }
    358 }
    359 
    360 } // namespace WebCore
    361 
    362 #endif // ENABLE(WEB_SOCKETS)
    363