Home | History | Annotate | Download | only in websockets
      1 /*
      2  * Copyright (C) 2009, 2010 Google Inc.  All rights reserved.
      3  *
      4  * Redistribution and use in source and binary forms, with or without
      5  * modification, are permitted provided that the following conditions are
      6  * met:
      7  *
      8  *     * Redistributions of source code must retain the above copyright
      9  * notice, this list of conditions and the following disclaimer.
     10  *     * Redistributions in binary form must reproduce the above
     11  * copyright notice, this list of conditions and the following disclaimer
     12  * in the documentation and/or other materials provided with the
     13  * distribution.
     14  *     * Neither the name of Google Inc. nor the names of its
     15  * contributors may be used to endorse or promote products derived from
     16  * this software without specific prior written permission.
     17  *
     18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     19  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     20  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     21  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     22  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     23  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     24  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     25  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     26  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     28  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     29  */
     30 
     31 #include "config.h"
     32 
     33 #if ENABLE(WEB_SOCKETS) && ENABLE(WORKERS)
     34 
     35 #include "WorkerThreadableWebSocketChannel.h"
     36 
     37 #include "CrossThreadTask.h"
     38 #include "PlatformString.h"
     39 #include "ScriptExecutionContext.h"
     40 #include "ThreadableWebSocketChannelClientWrapper.h"
     41 #include "WebSocketChannel.h"
     42 #include "WebSocketChannelClient.h"
     43 #include "WorkerContext.h"
     44 #include "WorkerLoaderProxy.h"
     45 #include "WorkerRunLoop.h"
     46 #include "WorkerThread.h"
     47 
     48 #include <wtf/PassRefPtr.h>
     49 
     50 namespace WebCore {
     51 
     52 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerContext* context, WebSocketChannelClient* client, const String& taskMode, const KURL& url, const String& protocol)
     53     : m_workerContext(context)
     54     , m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(client))
     55     , m_bridge(Bridge::create(m_workerClientWrapper, m_workerContext, taskMode, url, protocol))
     56 {
     57 }
     58 
     59 WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel()
     60 {
     61     if (m_bridge)
     62         m_bridge->disconnect();
     63 }
     64 
     65 void WorkerThreadableWebSocketChannel::connect()
     66 {
     67     if (m_bridge)
     68         m_bridge->connect();
     69 }
     70 
     71 bool WorkerThreadableWebSocketChannel::send(const String& message)
     72 {
     73     if (!m_bridge)
     74         return false;
     75     return m_bridge->send(message);
     76 }
     77 
     78 unsigned long WorkerThreadableWebSocketChannel::bufferedAmount() const
     79 {
     80     if (!m_bridge)
     81         return 0;
     82     return m_bridge->bufferedAmount();
     83 }
     84 
     85 void WorkerThreadableWebSocketChannel::close()
     86 {
     87     if (m_bridge)
     88         m_bridge->close();
     89 }
     90 
     91 void WorkerThreadableWebSocketChannel::disconnect()
     92 {
     93     m_bridge->disconnect();
     94     m_bridge.clear();
     95 }
     96 
     97 void WorkerThreadableWebSocketChannel::suspend()
     98 {
     99     m_workerClientWrapper->suspend();
    100     if (m_bridge)
    101         m_bridge->suspend();
    102 }
    103 
    104 void WorkerThreadableWebSocketChannel::resume()
    105 {
    106     m_workerClientWrapper->resume();
    107     if (m_bridge)
    108         m_bridge->resume();
    109 }
    110 
    111 WorkerThreadableWebSocketChannel::Peer::Peer(RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, ScriptExecutionContext* context, const String& taskMode, const KURL& url, const String& protocol)
    112     : m_workerClientWrapper(clientWrapper)
    113     , m_loaderProxy(loaderProxy)
    114     , m_mainWebSocketChannel(WebSocketChannel::create(context, this, url, protocol))
    115     , m_taskMode(taskMode)
    116 {
    117     ASSERT(isMainThread());
    118 }
    119 
    120 WorkerThreadableWebSocketChannel::Peer::~Peer()
    121 {
    122     ASSERT(isMainThread());
    123     if (m_mainWebSocketChannel)
    124         m_mainWebSocketChannel->disconnect();
    125 }
    126 
    127 void WorkerThreadableWebSocketChannel::Peer::connect()
    128 {
    129     ASSERT(isMainThread());
    130     if (!m_mainWebSocketChannel)
    131         return;
    132     m_mainWebSocketChannel->connect();
    133 }
    134 
    135 static void workerContextDidSend(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, bool sent)
    136 {
    137     ASSERT_UNUSED(context, context->isWorkerContext());
    138     workerClientWrapper->setSent(sent);
    139 }
    140 
    141 void WorkerThreadableWebSocketChannel::Peer::send(const String& message)
    142 {
    143     ASSERT(isMainThread());
    144     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
    145         return;
    146     bool sent = m_mainWebSocketChannel->send(message);
    147     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidSend, m_workerClientWrapper, sent), m_taskMode);
    148 }
    149 
    150 static void workerContextDidGetBufferedAmount(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long bufferedAmount)
    151 {
    152     ASSERT_UNUSED(context, context->isWorkerContext());
    153     workerClientWrapper->setBufferedAmount(bufferedAmount);
    154 }
    155 
    156 void WorkerThreadableWebSocketChannel::Peer::bufferedAmount()
    157 {
    158     ASSERT(isMainThread());
    159     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
    160         return;
    161     unsigned long bufferedAmount = m_mainWebSocketChannel->bufferedAmount();
    162     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidGetBufferedAmount, m_workerClientWrapper, bufferedAmount), m_taskMode);
    163 }
    164 
    165 void WorkerThreadableWebSocketChannel::Peer::close()
    166 {
    167     ASSERT(isMainThread());
    168     if (!m_mainWebSocketChannel)
    169         return;
    170     m_mainWebSocketChannel->close();
    171     m_mainWebSocketChannel = 0;
    172 }
    173 
    174 void WorkerThreadableWebSocketChannel::Peer::disconnect()
    175 {
    176     ASSERT(isMainThread());
    177     if (!m_mainWebSocketChannel)
    178         return;
    179     m_mainWebSocketChannel->disconnect();
    180     m_mainWebSocketChannel = 0;
    181 }
    182 
    183 void WorkerThreadableWebSocketChannel::Peer::suspend()
    184 {
    185     ASSERT(isMainThread());
    186     if (!m_mainWebSocketChannel)
    187         return;
    188     m_mainWebSocketChannel->suspend();
    189 }
    190 
    191 void WorkerThreadableWebSocketChannel::Peer::resume()
    192 {
    193     ASSERT(isMainThread());
    194     if (!m_mainWebSocketChannel)
    195         return;
    196     m_mainWebSocketChannel->resume();
    197 }
    198 
    199 static void workerContextDidConnect(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
    200 {
    201     ASSERT_UNUSED(context, context->isWorkerContext());
    202     workerClientWrapper->didConnect();
    203 }
    204 
    205 void WorkerThreadableWebSocketChannel::Peer::didConnect()
    206 {
    207     ASSERT(isMainThread());
    208     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidConnect, m_workerClientWrapper), m_taskMode);
    209 }
    210 
    211 static void workerContextDidReceiveMessage(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& message)
    212 {
    213     ASSERT_UNUSED(context, context->isWorkerContext());
    214     workerClientWrapper->didReceiveMessage(message);
    215 }
    216 
    217 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessage(const String& message)
    218 {
    219     ASSERT(isMainThread());
    220     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidReceiveMessage, m_workerClientWrapper, message), m_taskMode);
    221 }
    222 
    223 static void workerContextDidClose(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long unhandledBufferedAmount)
    224 {
    225     ASSERT_UNUSED(context, context->isWorkerContext());
    226     workerClientWrapper->didClose(unhandledBufferedAmount);
    227 }
    228 
    229 void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned long unhandledBufferedAmount)
    230 {
    231     ASSERT(isMainThread());
    232     m_mainWebSocketChannel = 0;
    233     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidClose, m_workerClientWrapper, unhandledBufferedAmount), m_taskMode);
    234 }
    235 
    236 void WorkerThreadableWebSocketChannel::Bridge::setWebSocketChannel(ScriptExecutionContext* context, Bridge* thisPtr, Peer* peer, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
    237 {
    238     ASSERT_UNUSED(context, context->isWorkerContext());
    239     thisPtr->m_peer = peer;
    240     workerClientWrapper->setSyncMethodDone();
    241 }
    242 
    243 void WorkerThreadableWebSocketChannel::Bridge::mainThreadCreateWebSocketChannel(ScriptExecutionContext* context, Bridge* thisPtr, RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, const String& taskMode, const KURL& url, const String& protocol)
    244 {
    245     ASSERT(isMainThread());
    246     ASSERT_UNUSED(context, context->isDocument());
    247 
    248     Peer* peer = Peer::create(clientWrapper, thisPtr->m_loaderProxy, context, taskMode, url, protocol);
    249     thisPtr->m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&Bridge::setWebSocketChannel, thisPtr, peer, clientWrapper), taskMode);
    250 }
    251 
    252 WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassRefPtr<WorkerContext> workerContext, const String& taskMode, const KURL& url, const String& protocol)
    253     : m_workerClientWrapper(workerClientWrapper)
    254     , m_workerContext(workerContext)
    255     , m_loaderProxy(m_workerContext->thread()->workerLoaderProxy())
    256     , m_taskMode(taskMode)
    257     , m_peer(0)
    258 {
    259     ASSERT(m_workerClientWrapper.get());
    260     setMethodNotCompleted();
    261     m_loaderProxy.postTaskToLoader(createCallbackTask(&Bridge::mainThreadCreateWebSocketChannel, this, m_workerClientWrapper, m_taskMode, url, protocol));
    262     waitForMethodCompletion();
    263     ASSERT(m_peer);
    264 }
    265 
    266 WorkerThreadableWebSocketChannel::Bridge::~Bridge()
    267 {
    268     disconnect();
    269 }
    270 
    271 void WorkerThreadableWebSocketChannel::mainThreadConnect(ScriptExecutionContext* context, Peer* peer)
    272 {
    273     ASSERT(isMainThread());
    274     ASSERT_UNUSED(context, context->isDocument());
    275     ASSERT(peer);
    276 
    277     peer->connect();
    278 }
    279 
    280 void WorkerThreadableWebSocketChannel::Bridge::connect()
    281 {
    282     ASSERT(m_workerClientWrapper);
    283     ASSERT(m_peer);
    284     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadConnect, m_peer));
    285 }
    286 
    287 void WorkerThreadableWebSocketChannel::mainThreadSend(ScriptExecutionContext* context, Peer* peer, const String& message)
    288 {
    289     ASSERT(isMainThread());
    290     ASSERT_UNUSED(context, context->isDocument());
    291     ASSERT(peer);
    292 
    293     peer->send(message);
    294 }
    295 
    296 bool WorkerThreadableWebSocketChannel::Bridge::send(const String& message)
    297 {
    298     if (!m_workerClientWrapper)
    299         return false;
    300     ASSERT(m_peer);
    301     setMethodNotCompleted();
    302     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSend, m_peer, message));
    303     RefPtr<Bridge> protect(this);
    304     waitForMethodCompletion();
    305     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
    306     return clientWrapper && clientWrapper->sent();
    307 }
    308 
    309 void WorkerThreadableWebSocketChannel::mainThreadBufferedAmount(ScriptExecutionContext* context, Peer* peer)
    310 {
    311     ASSERT(isMainThread());
    312     ASSERT_UNUSED(context, context->isDocument());
    313     ASSERT(peer);
    314 
    315     peer->bufferedAmount();
    316 }
    317 
    318 unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount()
    319 {
    320     if (!m_workerClientWrapper)
    321         return 0;
    322     ASSERT(m_peer);
    323     setMethodNotCompleted();
    324     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadBufferedAmount, m_peer));
    325     RefPtr<Bridge> protect(this);
    326     waitForMethodCompletion();
    327     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
    328     if (clientWrapper)
    329         return clientWrapper->bufferedAmount();
    330     return 0;
    331 }
    332 
    333 void WorkerThreadableWebSocketChannel::mainThreadClose(ScriptExecutionContext* context, Peer* peer)
    334 {
    335     ASSERT(isMainThread());
    336     ASSERT_UNUSED(context, context->isDocument());
    337     ASSERT(peer);
    338 
    339     peer->close();
    340 }
    341 
    342 void WorkerThreadableWebSocketChannel::Bridge::close()
    343 {
    344     ASSERT(m_peer);
    345     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadClose, m_peer));
    346 }
    347 
    348 void WorkerThreadableWebSocketChannel::mainThreadDestroy(ScriptExecutionContext* context, Peer* peer)
    349 {
    350     ASSERT(isMainThread());
    351     ASSERT_UNUSED(context, context->isDocument());
    352     ASSERT(peer);
    353 
    354     delete peer;
    355 }
    356 
    357 void WorkerThreadableWebSocketChannel::Bridge::disconnect()
    358 {
    359     clearClientWrapper();
    360     if (m_peer) {
    361         Peer* peer = m_peer;
    362         m_peer = 0;
    363         m_loaderProxy.postTaskToLoader(createCallbackTask(&mainThreadDestroy, peer));
    364     }
    365     m_workerContext = 0;
    366 }
    367 
    368 void WorkerThreadableWebSocketChannel::mainThreadSuspend(ScriptExecutionContext* context, Peer* peer)
    369 {
    370     ASSERT(isMainThread());
    371     ASSERT_UNUSED(context, context->isDocument());
    372     ASSERT(peer);
    373 
    374     peer->suspend();
    375 }
    376 
    377 void WorkerThreadableWebSocketChannel::Bridge::suspend()
    378 {
    379     ASSERT(m_peer);
    380     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSuspend, m_peer));
    381 }
    382 
    383 void WorkerThreadableWebSocketChannel::mainThreadResume(ScriptExecutionContext* context, Peer* peer)
    384 {
    385     ASSERT(isMainThread());
    386     ASSERT_UNUSED(context, context->isDocument());
    387     ASSERT(peer);
    388 
    389     peer->resume();
    390 }
    391 
    392 void WorkerThreadableWebSocketChannel::Bridge::resume()
    393 {
    394     ASSERT(m_peer);
    395     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadResume, m_peer));
    396 }
    397 
    398 void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper()
    399 {
    400     m_workerClientWrapper->clearClient();
    401 }
    402 
    403 void WorkerThreadableWebSocketChannel::Bridge::setMethodNotCompleted()
    404 {
    405     ASSERT(m_workerClientWrapper);
    406     m_workerClientWrapper->clearSyncMethodDone();
    407 }
    408 
    409 // Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end,
    410 // which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference.
    411 void WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion()
    412 {
    413     if (!m_workerContext)
    414         return;
    415     WorkerRunLoop& runLoop = m_workerContext->thread()->runLoop();
    416     MessageQueueWaitResult result = MessageQueueMessageReceived;
    417     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
    418     while (m_workerContext && clientWrapper && !clientWrapper->syncMethodDone() && result != MessageQueueTerminated) {
    419         result = runLoop.runInMode(m_workerContext.get(), m_taskMode); // May cause this bridge to get disconnected, which makes m_workerContext become null.
    420         clientWrapper = m_workerClientWrapper.get();
    421     }
    422 }
    423 
    424 } // namespace WebCore
    425 
#endif // ENABLE(WEB_SOCKETS) && ENABLE(WORKERS)
    427