Home | History | Annotate | Download | only in websockets
      1 /*
      2  * Copyright (C) 2011, 2012 Google Inc.  All rights reserved.
      3  *
      4  * Redistribution and use in source and binary forms, with or without
      5  * modification, are permitted provided that the following conditions are
      6  * met:
      7  *
      8  *     * Redistributions of source code must retain the above copyright
      9  * notice, this list of conditions and the following disclaimer.
     10  *     * Redistributions in binary form must reproduce the above
     11  * copyright notice, this list of conditions and the following disclaimer
     12  * in the documentation and/or other materials provided with the
     13  * distribution.
     14  *     * Neither the name of Google Inc. nor the names of its
     15  * contributors may be used to endorse or promote products derived from
     16  * this software without specific prior written permission.
     17  *
     18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     19  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     20  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     21  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     22  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     23  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     24  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     25  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     26  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     28  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     29  */
     30 
     31 #include "config.h"
     32 
     33 #include "modules/websockets/WorkerThreadableWebSocketChannel.h"
     34 
     35 #include "bindings/v8/ScriptCallStackFactory.h"
     36 #include "core/dom/CrossThreadTask.h"
     37 #include "core/dom/Document.h"
     38 #include "core/dom/ScriptExecutionContext.h"
     39 #include "core/fileapi/Blob.h"
     40 #include "core/inspector/ScriptCallFrame.h"
     41 #include "core/inspector/ScriptCallStack.h"
     42 #include "core/page/Settings.h"
     43 #include "core/workers/WorkerGlobalScope.h"
     44 #include "core/workers/WorkerLoaderProxy.h"
     45 #include "core/workers/WorkerRunLoop.h"
     46 #include "core/workers/WorkerThread.h"
     47 #include "modules/websockets/MainThreadWebSocketChannel.h"
     48 #include "modules/websockets/ThreadableWebSocketChannelClientWrapper.h"
     49 #include "modules/websockets/WebSocketChannel.h"
     50 #include "modules/websockets/WebSocketChannelClient.h"
     51 #include "wtf/ArrayBuffer.h"
     52 #include "wtf/MainThread.h"
     53 #include "wtf/PassRefPtr.h"
     54 #include "wtf/text/WTFString.h"
     55 
     56 namespace WebCore {
     57 
     58 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalScope* context, WebSocketChannelClient* client, const String& taskMode)
     59     : m_workerGlobalScope(context)
     60     , m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(context, client))
     61     , m_bridge(Bridge::create(m_workerClientWrapper, m_workerGlobalScope, taskMode))
     62     , m_lineNumberAtConnection(0)
     63 {
     64     // We assume that we can take the JS callstack at WebSocket connection here.
     65     RefPtr<ScriptCallStack> callStack = createScriptCallStack(1, true);
     66     String sourceURL;
     67     unsigned lineNumber = 0;
     68     if (callStack && callStack->size()) {
     69         sourceURL = callStack->at(0).sourceURL();
     70         lineNumber = callStack->at(0).lineNumber();
     71     }
     72     m_bridge->initialize(sourceURL, lineNumber);
     73 }
     74 
     75 WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel()
     76 {
     77     if (m_bridge)
     78         m_bridge->disconnect();
     79 }
     80 
     81 void WorkerThreadableWebSocketChannel::connect(const KURL& url, const String& protocol)
     82 {
     83     RefPtr<ScriptCallStack> callStack = createScriptCallStack(1, true);
     84     if (callStack && callStack->size()) {
     85         m_sourceURLAtConnection = callStack->at(0).sourceURL();
     86         m_lineNumberAtConnection = callStack->at(0).lineNumber();
     87     }
     88     if (m_bridge)
     89         m_bridge->connect(url, protocol);
     90 }
     91 
     92 String WorkerThreadableWebSocketChannel::subprotocol()
     93 {
     94     ASSERT(m_workerClientWrapper);
     95     return m_workerClientWrapper->subprotocol();
     96 }
     97 
     98 String WorkerThreadableWebSocketChannel::extensions()
     99 {
    100     ASSERT(m_workerClientWrapper);
    101     return m_workerClientWrapper->extensions();
    102 }
    103 
    104 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(const String& message)
    105 {
    106     if (!m_bridge)
    107         return WebSocketChannel::SendFail;
    108     return m_bridge->send(message);
    109 }
    110 
    111 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
    112 {
    113     if (!m_bridge)
    114         return WebSocketChannel::SendFail;
    115     return m_bridge->send(binaryData, byteOffset, byteLength);
    116 }
    117 
    118 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(const Blob& binaryData)
    119 {
    120     if (!m_bridge)
    121         return WebSocketChannel::SendFail;
    122     return m_bridge->send(binaryData);
    123 }
    124 
    125 unsigned long WorkerThreadableWebSocketChannel::bufferedAmount() const
    126 {
    127     if (!m_bridge)
    128         return 0;
    129     return m_bridge->bufferedAmount();
    130 }
    131 
    132 void WorkerThreadableWebSocketChannel::close(int code, const String& reason)
    133 {
    134     if (m_bridge)
    135         m_bridge->close(code, reason);
    136 }
    137 
    138 void WorkerThreadableWebSocketChannel::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
    139 {
    140     if (!m_bridge)
    141         return;
    142 
    143     RefPtr<ScriptCallStack> callStack = createScriptCallStack(1, true);
    144     if (callStack && callStack->size())  {
    145         // In order to emulate the ConsoleMessage behavior,
    146         // we should ignore the specified url and line number if
    147         // we can get the JavaScript context.
    148         m_bridge->fail(reason, level, callStack->at(0).sourceURL(), callStack->at(0).lineNumber());
    149     } else if (sourceURL.isEmpty() && !lineNumber) {
    150         // No information is specified by the caller - use the url
    151         // and the line number at the connection.
    152         m_bridge->fail(reason, level, m_sourceURLAtConnection, m_lineNumberAtConnection);
    153     } else {
    154         // Use the specified information.
    155         m_bridge->fail(reason, level, sourceURL, lineNumber);
    156     }
    157 }
    158 
    159 void WorkerThreadableWebSocketChannel::disconnect()
    160 {
    161     m_bridge->disconnect();
    162     m_bridge.clear();
    163 }
    164 
    165 void WorkerThreadableWebSocketChannel::suspend()
    166 {
    167     m_workerClientWrapper->suspend();
    168     if (m_bridge)
    169         m_bridge->suspend();
    170 }
    171 
    172 void WorkerThreadableWebSocketChannel::resume()
    173 {
    174     m_workerClientWrapper->resume();
    175     if (m_bridge)
    176         m_bridge->resume();
    177 }
    178 
    179 WorkerThreadableWebSocketChannel::Peer::Peer(PassRefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, ScriptExecutionContext* context, const String& taskMode, const String& sourceURL, unsigned lineNumber)
    180     : m_workerClientWrapper(clientWrapper)
    181     , m_loaderProxy(loaderProxy)
    182     , m_mainWebSocketChannel(0)
    183     , m_taskMode(taskMode)
    184 {
    185     Document* document = toDocument(context);
    186     Settings* settings = document->settings();
    187     if (settings && settings->experimentalWebSocketEnabled()) {
    188         // FIXME: Create an "experimental" WebSocketChannel instead of a MainThreadWebSocketChannel.
    189         m_mainWebSocketChannel = MainThreadWebSocketChannel::create(document, this, sourceURL, lineNumber);
    190     } else
    191         m_mainWebSocketChannel = MainThreadWebSocketChannel::create(document, this, sourceURL, lineNumber);
    192     ASSERT(isMainThread());
    193 }
    194 
    195 WorkerThreadableWebSocketChannel::Peer::~Peer()
    196 {
    197     ASSERT(isMainThread());
    198     if (m_mainWebSocketChannel)
    199         m_mainWebSocketChannel->disconnect();
    200 }
    201 
    202 void WorkerThreadableWebSocketChannel::Peer::connect(const KURL& url, const String& protocol)
    203 {
    204     ASSERT(isMainThread());
    205     if (!m_mainWebSocketChannel)
    206         return;
    207     m_mainWebSocketChannel->connect(url, protocol);
    208 }
    209 
    210 static void workerGlobalScopeDidSend(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, WebSocketChannel::SendResult sendRequestResult)
    211 {
    212     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
    213     workerClientWrapper->setSendRequestResult(sendRequestResult);
    214 }
    215 
    216 void WorkerThreadableWebSocketChannel::Peer::send(const String& message)
    217 {
    218     ASSERT(isMainThread());
    219     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
    220         return;
    221     WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(message);
    222     m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidSend, m_workerClientWrapper, sendRequestResult), m_taskMode);
    223 }
    224 
    225 void WorkerThreadableWebSocketChannel::Peer::send(const ArrayBuffer& binaryData)
    226 {
    227     ASSERT(isMainThread());
    228     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
    229         return;
    230     WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(binaryData, 0, binaryData.byteLength());
    231     m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidSend, m_workerClientWrapper, sendRequestResult), m_taskMode);
    232 }
    233 
    234 void WorkerThreadableWebSocketChannel::Peer::send(const Blob& binaryData)
    235 {
    236     ASSERT(isMainThread());
    237     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
    238         return;
    239     WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(binaryData);
    240     m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidSend, m_workerClientWrapper, sendRequestResult), m_taskMode);
    241 }
    242 
    243 static void workerGlobalScopeDidGetBufferedAmount(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long bufferedAmount)
    244 {
    245     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
    246     workerClientWrapper->setBufferedAmount(bufferedAmount);
    247 }
    248 
    249 void WorkerThreadableWebSocketChannel::Peer::bufferedAmount()
    250 {
    251     ASSERT(isMainThread());
    252     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
    253         return;
    254     unsigned long bufferedAmount = m_mainWebSocketChannel->bufferedAmount();
    255     m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidGetBufferedAmount, m_workerClientWrapper, bufferedAmount), m_taskMode);
    256 }
    257 
    258 void WorkerThreadableWebSocketChannel::Peer::close(int code, const String& reason)
    259 {
    260     ASSERT(isMainThread());
    261     if (!m_mainWebSocketChannel)
    262         return;
    263     m_mainWebSocketChannel->close(code, reason);
    264 }
    265 
    266 void WorkerThreadableWebSocketChannel::Peer::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
    267 {
    268     ASSERT(isMainThread());
    269     if (!m_mainWebSocketChannel)
    270         return;
    271     m_mainWebSocketChannel->fail(reason, level, sourceURL, lineNumber);
    272 }
    273 
    274 void WorkerThreadableWebSocketChannel::Peer::disconnect()
    275 {
    276     ASSERT(isMainThread());
    277     if (!m_mainWebSocketChannel)
    278         return;
    279     m_mainWebSocketChannel->disconnect();
    280     m_mainWebSocketChannel = 0;
    281 }
    282 
    283 void WorkerThreadableWebSocketChannel::Peer::suspend()
    284 {
    285     ASSERT(isMainThread());
    286     if (!m_mainWebSocketChannel)
    287         return;
    288     m_mainWebSocketChannel->suspend();
    289 }
    290 
    291 void WorkerThreadableWebSocketChannel::Peer::resume()
    292 {
    293     ASSERT(isMainThread());
    294     if (!m_mainWebSocketChannel)
    295         return;
    296     m_mainWebSocketChannel->resume();
    297 }
    298 
    299 static void workerGlobalScopeDidConnect(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& subprotocol, const String& extensions)
    300 {
    301     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
    302     workerClientWrapper->setSubprotocol(subprotocol);
    303     workerClientWrapper->setExtensions(extensions);
    304     workerClientWrapper->didConnect();
    305 }
    306 
    307 void WorkerThreadableWebSocketChannel::Peer::didConnect()
    308 {
    309     ASSERT(isMainThread());
    310     m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidConnect, m_workerClientWrapper, m_mainWebSocketChannel->subprotocol(), m_mainWebSocketChannel->extensions()), m_taskMode);
    311 }
    312 
    313 static void workerGlobalScopeDidReceiveMessage(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& message)
    314 {
    315     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
    316     workerClientWrapper->didReceiveMessage(message);
    317 }
    318 
    319 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessage(const String& message)
    320 {
    321     ASSERT(isMainThread());
    322     m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidReceiveMessage, m_workerClientWrapper, message), m_taskMode);
    323 }
    324 
    325 static void workerGlobalScopeDidReceiveBinaryData(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassOwnPtr<Vector<char> > binaryData)
    326 {
    327     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
    328     workerClientWrapper->didReceiveBinaryData(binaryData);
    329 }
    330 
    331 void WorkerThreadableWebSocketChannel::Peer::didReceiveBinaryData(PassOwnPtr<Vector<char> > binaryData)
    332 {
    333     ASSERT(isMainThread());
    334     m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidReceiveBinaryData, m_workerClientWrapper, binaryData), m_taskMode);
    335 }
    336 
    337 static void workerGlobalScopeDidUpdateBufferedAmount(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long bufferedAmount)
    338 {
    339     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
    340     workerClientWrapper->didUpdateBufferedAmount(bufferedAmount);
    341 }
    342 
    343 void WorkerThreadableWebSocketChannel::Peer::didUpdateBufferedAmount(unsigned long bufferedAmount)
    344 {
    345     ASSERT(isMainThread());
    346     m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidUpdateBufferedAmount, m_workerClientWrapper, bufferedAmount), m_taskMode);
    347 }
    348 
    349 static void workerGlobalScopeDidStartClosingHandshake(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
    350 {
    351     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
    352     workerClientWrapper->didStartClosingHandshake();
    353 }
    354 
    355 void WorkerThreadableWebSocketChannel::Peer::didStartClosingHandshake()
    356 {
    357     ASSERT(isMainThread());
    358     m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidStartClosingHandshake, m_workerClientWrapper), m_taskMode);
    359 }
    360 
    361 static void workerGlobalScopeDidClose(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long unhandledBufferedAmount, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
    362 {
    363     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
    364     workerClientWrapper->didClose(unhandledBufferedAmount, closingHandshakeCompletion, code, reason);
    365 }
    366 
    367 void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned long unhandledBufferedAmount, ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
    368 {
    369     ASSERT(isMainThread());
    370     m_mainWebSocketChannel = 0;
    371     m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidClose, m_workerClientWrapper, unhandledBufferedAmount, closingHandshakeCompletion, code, reason), m_taskMode);
    372 }
    373 
    374 static void workerGlobalScopeDidReceiveMessageError(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
    375 {
    376     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
    377     workerClientWrapper->didReceiveMessageError();
    378 }
    379 
    380 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessageError()
    381 {
    382     ASSERT(isMainThread());
    383     m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidReceiveMessageError, m_workerClientWrapper), m_taskMode);
    384 }
    385 
    386 WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassRefPtr<WorkerGlobalScope> workerGlobalScope, const String& taskMode)
    387     : m_workerClientWrapper(workerClientWrapper)
    388     , m_workerGlobalScope(workerGlobalScope)
    389     , m_loaderProxy(m_workerGlobalScope->thread()->workerLoaderProxy())
    390     , m_taskMode(taskMode)
    391     , m_peer(0)
    392 {
    393     ASSERT(m_workerClientWrapper.get());
    394 }
    395 
    396 WorkerThreadableWebSocketChannel::Bridge::~Bridge()
    397 {
    398     disconnect();
    399 }
    400 
    401 class WorkerThreadableWebSocketChannel::WorkerGlobalScopeDidInitializeTask : public ScriptExecutionContext::Task {
    402 public:
    403     static PassOwnPtr<ScriptExecutionContext::Task> create(WorkerThreadableWebSocketChannel::Peer* peer, WorkerLoaderProxy* loaderProxy, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
    404     {
    405         return adoptPtr(new WorkerGlobalScopeDidInitializeTask(peer, loaderProxy, workerClientWrapper));
    406     }
    407 
    408     virtual ~WorkerGlobalScopeDidInitializeTask() { }
    409     virtual void performTask(ScriptExecutionContext* context) OVERRIDE
    410     {
    411         ASSERT_UNUSED(context, context->isWorkerGlobalScope());
    412         if (m_workerClientWrapper->failedWebSocketChannelCreation()) {
    413             // If Bridge::initialize() quitted earlier, we need to kick mainThreadDestroy() to delete the peer.
    414             OwnPtr<WorkerThreadableWebSocketChannel::Peer> peer = adoptPtr(m_peer);
    415             m_peer = 0;
    416             m_loaderProxy->postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadDestroy, peer.release()));
    417         } else
    418             m_workerClientWrapper->didCreateWebSocketChannel(m_peer);
    419     }
    420     virtual bool isCleanupTask() const OVERRIDE { return true; }
    421 
    422 private:
    423     WorkerGlobalScopeDidInitializeTask(WorkerThreadableWebSocketChannel::Peer* peer, WorkerLoaderProxy* loaderProxy, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
    424         : m_peer(peer)
    425         , m_loaderProxy(loaderProxy)
    426         , m_workerClientWrapper(workerClientWrapper)
    427     {
    428     }
    429 
    430     WorkerThreadableWebSocketChannel::Peer* m_peer;
    431     WorkerLoaderProxy* m_loaderProxy;
    432     RefPtr<ThreadableWebSocketChannelClientWrapper> m_workerClientWrapper;
    433 };
    434 
    435 void WorkerThreadableWebSocketChannel::Bridge::mainThreadInitialize(ScriptExecutionContext* context, WorkerLoaderProxy* loaderProxy, PassRefPtr<ThreadableWebSocketChannelClientWrapper> prpClientWrapper, const String& taskMode, const String& sourceURL, unsigned lineNumber)
    436 {
    437     ASSERT(isMainThread());
    438     ASSERT_UNUSED(context, context->isDocument());
    439 
    440     RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper = prpClientWrapper;
    441 
    442     Peer* peer = Peer::create(clientWrapper, *loaderProxy, context, taskMode, sourceURL, lineNumber);
    443     bool sent = loaderProxy->postTaskForModeToWorkerGlobalScope(
    444         WorkerThreadableWebSocketChannel::WorkerGlobalScopeDidInitializeTask::create(peer, loaderProxy, clientWrapper), taskMode);
    445     if (!sent) {
    446         clientWrapper->clearPeer();
    447         delete peer;
    448     }
    449 }
    450 
    451 void WorkerThreadableWebSocketChannel::Bridge::initialize(const String& sourceURL, unsigned lineNumber)
    452 {
    453     ASSERT(!m_peer);
    454     setMethodNotCompleted();
    455     RefPtr<Bridge> protect(this);
    456     m_loaderProxy.postTaskToLoader(
    457         createCallbackTask(&Bridge::mainThreadInitialize, AllowCrossThreadAccess(&m_loaderProxy), m_workerClientWrapper, m_taskMode, sourceURL, lineNumber));
    458     waitForMethodCompletion();
    459     // m_peer may be null when the nested runloop exited before a peer has created.
    460     m_peer = m_workerClientWrapper->peer();
    461     if (!m_peer)
    462         m_workerClientWrapper->setFailedWebSocketChannelCreation();
    463 }
    464 
    465 void WorkerThreadableWebSocketChannel::mainThreadConnect(ScriptExecutionContext* context, Peer* peer, const KURL& url, const String& protocol)
    466 {
    467     ASSERT(isMainThread());
    468     ASSERT_UNUSED(context, context->isDocument());
    469     ASSERT(peer);
    470 
    471     peer->connect(url, protocol);
    472 }
    473 
    474 void WorkerThreadableWebSocketChannel::Bridge::connect(const KURL& url, const String& protocol)
    475 {
    476     ASSERT(m_workerClientWrapper);
    477     if (!m_peer)
    478         return;
    479     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadConnect, AllowCrossThreadAccess(m_peer), url, protocol));
    480 }
    481 
    482 void WorkerThreadableWebSocketChannel::mainThreadSend(ScriptExecutionContext* context, Peer* peer, const String& message)
    483 {
    484     ASSERT(isMainThread());
    485     ASSERT_UNUSED(context, context->isDocument());
    486     ASSERT(peer);
    487 
    488     peer->send(message);
    489 }
    490 
    491 void WorkerThreadableWebSocketChannel::mainThreadSendArrayBuffer(ScriptExecutionContext* context, Peer* peer, PassOwnPtr<Vector<char> > data)
    492 {
    493     ASSERT(isMainThread());
    494     ASSERT_UNUSED(context, context->isDocument());
    495     ASSERT(peer);
    496 
    497     RefPtr<ArrayBuffer> arrayBuffer = ArrayBuffer::create(data->data(), data->size());
    498     peer->send(*arrayBuffer);
    499 }
    500 
    501 void WorkerThreadableWebSocketChannel::mainThreadSendBlob(ScriptExecutionContext* context, Peer* peer, const KURL& url, const String& type, long long size)
    502 {
    503     ASSERT(isMainThread());
    504     ASSERT_UNUSED(context, context->isDocument());
    505     ASSERT(peer);
    506 
    507     RefPtr<Blob> blob = Blob::create(url, type, size);
    508     peer->send(*blob);
    509 }
    510 
    511 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const String& message)
    512 {
    513     if (!m_workerClientWrapper || !m_peer)
    514         return WebSocketChannel::SendFail;
    515     setMethodNotCompleted();
    516     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSend, AllowCrossThreadAccess(m_peer), message));
    517     RefPtr<Bridge> protect(this);
    518     waitForMethodCompletion();
    519     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
    520     if (!clientWrapper)
    521         return WebSocketChannel::SendFail;
    522     return clientWrapper->sendRequestResult();
    523 }
    524 
    525 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
    526 {
    527     if (!m_workerClientWrapper || !m_peer)
    528         return WebSocketChannel::SendFail;
    529     // ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>.
    530     OwnPtr<Vector<char> > data = adoptPtr(new Vector<char>(byteLength));
    531     if (binaryData.byteLength())
    532         memcpy(data->data(), static_cast<const char*>(binaryData.data()) + byteOffset, byteLength);
    533     setMethodNotCompleted();
    534     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSendArrayBuffer, AllowCrossThreadAccess(m_peer), data.release()));
    535     RefPtr<Bridge> protect(this);
    536     waitForMethodCompletion();
    537     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
    538     if (!clientWrapper)
    539         return WebSocketChannel::SendFail;
    540     return clientWrapper->sendRequestResult();
    541 }
    542 
    543 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const Blob& binaryData)
    544 {
    545     if (!m_workerClientWrapper || !m_peer)
    546         return WebSocketChannel::SendFail;
    547     setMethodNotCompleted();
    548     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSendBlob, AllowCrossThreadAccess(m_peer), binaryData.url(), binaryData.type(), binaryData.size()));
    549     RefPtr<Bridge> protect(this);
    550     waitForMethodCompletion();
    551     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
    552     if (!clientWrapper)
    553         return WebSocketChannel::SendFail;
    554     return clientWrapper->sendRequestResult();
    555 }
    556 
    557 void WorkerThreadableWebSocketChannel::mainThreadBufferedAmount(ScriptExecutionContext* context, Peer* peer)
    558 {
    559     ASSERT(isMainThread());
    560     ASSERT_UNUSED(context, context->isDocument());
    561     ASSERT(peer);
    562 
    563     peer->bufferedAmount();
    564 }
    565 
    566 unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount()
    567 {
    568     if (!m_workerClientWrapper || !m_peer)
    569         return 0;
    570     setMethodNotCompleted();
    571     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadBufferedAmount, AllowCrossThreadAccess(m_peer)));
    572     RefPtr<Bridge> protect(this);
    573     waitForMethodCompletion();
    574     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
    575     if (clientWrapper)
    576         return clientWrapper->bufferedAmount();
    577     return 0;
    578 }
    579 
    580 void WorkerThreadableWebSocketChannel::mainThreadClose(ScriptExecutionContext* context, Peer* peer, int code, const String& reason)
    581 {
    582     ASSERT(isMainThread());
    583     ASSERT_UNUSED(context, context->isDocument());
    584     ASSERT(peer);
    585 
    586     peer->close(code, reason);
    587 }
    588 
    589 void WorkerThreadableWebSocketChannel::Bridge::close(int code, const String& reason)
    590 {
    591     if (!m_peer)
    592         return;
    593     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadClose, AllowCrossThreadAccess(m_peer), code, reason));
    594 }
    595 
    596 void WorkerThreadableWebSocketChannel::mainThreadFail(ScriptExecutionContext* context, Peer* peer, const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
    597 {
    598     ASSERT(isMainThread());
    599     ASSERT_UNUSED(context, context->isDocument());
    600     ASSERT(peer);
    601 
    602     peer->fail(reason, level, sourceURL, lineNumber);
    603 }
    604 
    605 void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
    606 {
    607     if (!m_peer)
    608         return;
    609     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadFail, AllowCrossThreadAccess(m_peer), reason, level, sourceURL, lineNumber));
    610 }
    611 
    612 void WorkerThreadableWebSocketChannel::mainThreadDestroy(ScriptExecutionContext* context, PassOwnPtr<Peer> peer)
    613 {
    614     ASSERT(isMainThread());
    615     ASSERT_UNUSED(context, context->isDocument());
    616     ASSERT_UNUSED(peer, peer);
    617 
    618     // Peer object will be deleted even if the task does not run in the main thread's cleanup period, because
    619     // the destructor for the task object (created by createCallbackTask()) will automatically delete the peer.
    620 }
    621 
    622 void WorkerThreadableWebSocketChannel::Bridge::disconnect()
    623 {
    624     clearClientWrapper();
    625     if (m_peer) {
    626         OwnPtr<Peer> peer = adoptPtr(m_peer);
    627         m_peer = 0;
    628         m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadDestroy, peer.release()));
    629     }
    630     m_workerGlobalScope = 0;
    631 }
    632 
    633 void WorkerThreadableWebSocketChannel::mainThreadSuspend(ScriptExecutionContext* context, Peer* peer)
    634 {
    635     ASSERT(isMainThread());
    636     ASSERT_UNUSED(context, context->isDocument());
    637     ASSERT(peer);
    638 
    639     peer->suspend();
    640 }
    641 
    642 void WorkerThreadableWebSocketChannel::Bridge::suspend()
    643 {
    644     if (!m_peer)
    645         return;
    646     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSuspend, AllowCrossThreadAccess(m_peer)));
    647 }
    648 
    649 void WorkerThreadableWebSocketChannel::mainThreadResume(ScriptExecutionContext* context, Peer* peer)
    650 {
    651     ASSERT(isMainThread());
    652     ASSERT_UNUSED(context, context->isDocument());
    653     ASSERT(peer);
    654 
    655     peer->resume();
    656 }
    657 
    658 void WorkerThreadableWebSocketChannel::Bridge::resume()
    659 {
    660     if (!m_peer)
    661         return;
    662     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadResume, AllowCrossThreadAccess(m_peer)));
    663 }
    664 
    665 void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper()
    666 {
    667     m_workerClientWrapper->clearClient();
    668 }
    669 
    670 void WorkerThreadableWebSocketChannel::Bridge::setMethodNotCompleted()
    671 {
    672     ASSERT(m_workerClientWrapper);
    673     m_workerClientWrapper->clearSyncMethodDone();
    674 }
    675 
    676 // Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end,
    677 // which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference.
    678 void WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion()
    679 {
    680     if (!m_workerGlobalScope)
    681         return;
    682     WorkerRunLoop& runLoop = m_workerGlobalScope->thread()->runLoop();
    683     MessageQueueWaitResult result = MessageQueueMessageReceived;
    684     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
    685     while (m_workerGlobalScope && clientWrapper && !clientWrapper->syncMethodDone() && result != MessageQueueTerminated) {
    686         result = runLoop.runInMode(m_workerGlobalScope.get(), m_taskMode); // May cause this bridge to get disconnected, which makes m_workerGlobalScope become null.
    687         clientWrapper = m_workerClientWrapper.get();
    688     }
    689 }
    690 
    691 } // namespace WebCore
    692