Home | History | Annotate | Download | only in websockets
      1 /*
      2  * Copyright (C) 2011, 2012 Google Inc.  All rights reserved.
      3  *
      4  * Redistribution and use in source and binary forms, with or without
      5  * modification, are permitted provided that the following conditions are
      6  * met:
      7  *
      8  *     * Redistributions of source code must retain the above copyright
      9  * notice, this list of conditions and the following disclaimer.
     10  *     * Redistributions in binary form must reproduce the above
     11  * copyright notice, this list of conditions and the following disclaimer
     12  * in the documentation and/or other materials provided with the
     13  * distribution.
     14  *     * Neither the name of Google Inc. nor the names of its
     15  * contributors may be used to endorse or promote products derived from
     16  * this software without specific prior written permission.
     17  *
     18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     19  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     20  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     21  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     22  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     23  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     24  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     25  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     26  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     28  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     29  */
     30 
     31 #include "config.h"
     32 
     33 #include "modules/websockets/WorkerThreadableWebSocketChannel.h"
     34 
     35 #include "bindings/v8/ScriptCallStackFactory.h"
     36 #include "core/dom/CrossThreadTask.h"
     37 #include "core/dom/Document.h"
     38 #include "core/dom/ExecutionContext.h"
     39 #include "core/fileapi/Blob.h"
     40 #include "core/inspector/ScriptCallFrame.h"
     41 #include "core/inspector/ScriptCallStack.h"
     42 #include "core/workers/WorkerLoaderProxy.h"
     43 #include "core/workers/WorkerRunLoop.h"
     44 #include "core/workers/WorkerThread.h"
     45 #include "modules/websockets/MainThreadWebSocketChannel.h"
     46 #include "modules/websockets/NewWebSocketChannelImpl.h"
     47 #include "modules/websockets/ThreadableWebSocketChannelClientWrapper.h"
     48 #include "platform/RuntimeEnabledFeatures.h"
     49 #include "public/platform/Platform.h"
     50 #include "public/platform/WebWaitableEvent.h"
     51 #include "wtf/ArrayBuffer.h"
     52 #include "wtf/Assertions.h"
     53 #include "wtf/Functional.h"
     54 #include "wtf/MainThread.h"
     55 
     56 namespace WebCore {
     57 
     58 // Created and destroyed on the worker thread. All setters of this class are
     59 // called on the main thread, while all getters are called on the worker
     60 // thread. signalWorkerThread() must be called before any getters are called.
     61 class ThreadableWebSocketChannelSyncHelper {
     62 public:
     63     static PassOwnPtr<ThreadableWebSocketChannelSyncHelper> create(PassOwnPtr<blink::WebWaitableEvent> event)
     64     {
     65         return adoptPtr(new ThreadableWebSocketChannelSyncHelper(event));
     66     }
     67 
     68     // All setters are called on the main thread.
     69     void setConnectRequestResult(bool connectRequestResult)
     70     {
     71         m_connectRequestResult = connectRequestResult;
     72     }
     73     void setSendRequestResult(WebSocketChannel::SendResult sendRequestResult)
     74     {
     75         m_sendRequestResult = sendRequestResult;
     76     }
     77 
     78     // All getter are called on the worker thread.
     79     bool connectRequestResult() const
     80     {
     81         return m_connectRequestResult;
     82     }
     83     WebSocketChannel::SendResult sendRequestResult() const
     84     {
     85         return m_sendRequestResult;
     86     }
     87 
     88     // This should be called after all setters are called and before any
     89     // getters are called.
     90     void signalWorkerThread()
     91     {
     92         m_event->signal();
     93     }
     94 
     95     blink::WebWaitableEvent* event() const
     96     {
     97         return m_event.get();
     98     }
     99 
    100 private:
    101     ThreadableWebSocketChannelSyncHelper(PassOwnPtr<blink::WebWaitableEvent> event)
    102         : m_event(event)
    103         , m_connectRequestResult(false)
    104         , m_sendRequestResult(WebSocketChannel::SendFail)
    105     {
    106     }
    107 
    108     OwnPtr<blink::WebWaitableEvent> m_event;
    109     bool m_connectRequestResult;
    110     WebSocketChannel::SendResult m_sendRequestResult;
    111 };
    112 
    113 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalScope& workerGlobalScope, WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber)
    114     : m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(client))
    115     , m_bridge(Bridge::create(m_workerClientWrapper, workerGlobalScope))
    116     , m_sourceURLAtConnection(sourceURL)
    117     , m_lineNumberAtConnection(lineNumber)
    118 {
    119     ASSERT(m_workerClientWrapper.get());
    120     m_bridge->initialize(sourceURL, lineNumber);
    121 }
    122 
    123 WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel()
    124 {
    125     if (m_bridge)
    126         m_bridge->disconnect();
    127 }
    128 
    129 bool WorkerThreadableWebSocketChannel::connect(const KURL& url, const String& protocol)
    130 {
    131     if (m_bridge)
    132         return m_bridge->connect(url, protocol);
    133     return false;
    134 }
    135 
    136 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(const String& message)
    137 {
    138     if (!m_bridge)
    139         return WebSocketChannel::SendFail;
    140     return m_bridge->send(message);
    141 }
    142 
    143 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
    144 {
    145     if (!m_bridge)
    146         return WebSocketChannel::SendFail;
    147     return m_bridge->send(binaryData, byteOffset, byteLength);
    148 }
    149 
    150 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(PassRefPtr<BlobDataHandle> blobData)
    151 {
    152     if (!m_bridge)
    153         return WebSocketChannel::SendFail;
    154     return m_bridge->send(blobData);
    155 }
    156 
    157 void WorkerThreadableWebSocketChannel::close(int code, const String& reason)
    158 {
    159     if (m_bridge)
    160         m_bridge->close(code, reason);
    161 }
    162 
    163 void WorkerThreadableWebSocketChannel::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
    164 {
    165     if (!m_bridge)
    166         return;
    167 
    168     RefPtrWillBeRawPtr<ScriptCallStack> callStack = createScriptCallStack(1, true);
    169     if (callStack && callStack->size())  {
    170         // In order to emulate the ConsoleMessage behavior,
    171         // we should ignore the specified url and line number if
    172         // we can get the JavaScript context.
    173         m_bridge->fail(reason, level, callStack->at(0).sourceURL(), callStack->at(0).lineNumber());
    174     } else if (sourceURL.isEmpty() && !lineNumber) {
    175         // No information is specified by the caller - use the url
    176         // and the line number at the connection.
    177         m_bridge->fail(reason, level, m_sourceURLAtConnection, m_lineNumberAtConnection);
    178     } else {
    179         // Use the specified information.
    180         m_bridge->fail(reason, level, sourceURL, lineNumber);
    181     }
    182 }
    183 
    184 void WorkerThreadableWebSocketChannel::disconnect()
    185 {
    186     m_bridge->disconnect();
    187     m_bridge.clear();
    188 }
    189 
    190 void WorkerThreadableWebSocketChannel::trace(Visitor* visitor)
    191 {
    192     visitor->trace(m_workerClientWrapper);
    193     WebSocketChannel::trace(visitor);
    194 }
    195 
    196 WorkerThreadableWebSocketChannel::Peer::Peer(PassRefPtr<WeakReference<Peer> > reference, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, ExecutionContext* context, const String& sourceURL, unsigned lineNumber, PassOwnPtr<ThreadableWebSocketChannelSyncHelper> syncHelper)
    197     : m_workerClientWrapper(clientWrapper)
    198     , m_loaderProxy(loaderProxy)
    199     , m_mainWebSocketChannel(nullptr)
    200     , m_syncHelper(syncHelper)
    201     , m_weakFactory(reference, this)
    202 {
    203     ASSERT(isMainThread());
    204     ASSERT(m_workerClientWrapper.get());
    205 
    206     Document* document = toDocument(context);
    207     if (RuntimeEnabledFeatures::experimentalWebSocketEnabled()) {
    208         m_mainWebSocketChannel = NewWebSocketChannelImpl::create(document, this, sourceURL, lineNumber);
    209     } else {
    210         m_mainWebSocketChannel = MainThreadWebSocketChannel::create(document, this, sourceURL, lineNumber);
    211     }
    212 
    213     m_syncHelper->signalWorkerThread();
    214 }
    215 
    216 WorkerThreadableWebSocketChannel::Peer::~Peer()
    217 {
    218     ASSERT(isMainThread());
    219     if (m_mainWebSocketChannel)
    220         m_mainWebSocketChannel->disconnect();
    221 }
    222 
    223 void WorkerThreadableWebSocketChannel::Peer::initialize(ExecutionContext* context, PassRefPtr<WeakReference<Peer> > reference, WorkerLoaderProxy* loaderProxy, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, const String& sourceURLAtConnection, unsigned lineNumberAtConnection, PassOwnPtr<ThreadableWebSocketChannelSyncHelper> syncHelper)
    224 {
    225     // The caller must call destroy() to free the peer.
    226     new Peer(reference, clientWrapper, *loaderProxy, context, sourceURLAtConnection, lineNumberAtConnection, syncHelper);
    227 }
    228 
    229 void WorkerThreadableWebSocketChannel::Peer::destroy()
    230 {
    231     ASSERT(isMainThread());
    232     delete this;
    233 }
    234 
    235 void WorkerThreadableWebSocketChannel::Peer::connect(const KURL& url, const String& protocol)
    236 {
    237     ASSERT(isMainThread());
    238     if (!m_mainWebSocketChannel) {
    239         m_syncHelper->setConnectRequestResult(false);
    240     } else {
    241         bool connectRequestResult = m_mainWebSocketChannel->connect(url, protocol);
    242         m_syncHelper->setConnectRequestResult(connectRequestResult);
    243     }
    244     m_syncHelper->signalWorkerThread();
    245 }
    246 
    247 void WorkerThreadableWebSocketChannel::Peer::send(const String& message)
    248 {
    249     ASSERT(isMainThread());
    250     if (!m_mainWebSocketChannel) {
    251         m_syncHelper->setSendRequestResult(WebSocketChannel::SendFail);
    252     } else {
    253         WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(message);
    254         m_syncHelper->setSendRequestResult(sendRequestResult);
    255     }
    256     m_syncHelper->signalWorkerThread();
    257 }
    258 
    259 void WorkerThreadableWebSocketChannel::Peer::sendArrayBuffer(PassOwnPtr<Vector<char> > data)
    260 {
    261     ASSERT(isMainThread());
    262     if (!m_mainWebSocketChannel) {
    263         m_syncHelper->setSendRequestResult(WebSocketChannel::SendFail);
    264     } else {
    265         WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(data);
    266         m_syncHelper->setSendRequestResult(sendRequestResult);
    267     }
    268     m_syncHelper->signalWorkerThread();
    269 }
    270 
    271 void WorkerThreadableWebSocketChannel::Peer::sendBlob(PassRefPtr<BlobDataHandle> blobData)
    272 {
    273     ASSERT(isMainThread());
    274     if (!m_mainWebSocketChannel) {
    275         m_syncHelper->setSendRequestResult(WebSocketChannel::SendFail);
    276     } else {
    277         WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(blobData);
    278         m_syncHelper->setSendRequestResult(sendRequestResult);
    279     }
    280     m_syncHelper->signalWorkerThread();
    281 }
    282 
    283 void WorkerThreadableWebSocketChannel::Peer::close(int code, const String& reason)
    284 {
    285     ASSERT(isMainThread());
    286     if (!m_mainWebSocketChannel)
    287         return;
    288     m_mainWebSocketChannel->close(code, reason);
    289 }
    290 
    291 void WorkerThreadableWebSocketChannel::Peer::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
    292 {
    293     ASSERT(isMainThread());
    294     if (!m_mainWebSocketChannel)
    295         return;
    296     m_mainWebSocketChannel->fail(reason, level, sourceURL, lineNumber);
    297 }
    298 
    299 void WorkerThreadableWebSocketChannel::Peer::disconnect()
    300 {
    301     ASSERT(isMainThread());
    302     if (!m_mainWebSocketChannel)
    303         return;
    304     m_mainWebSocketChannel->disconnect();
    305     m_mainWebSocketChannel = nullptr;
    306 }
    307 
    308 static void workerGlobalScopeDidConnect(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& subprotocol, const String& extensions)
    309 {
    310     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
    311     workerClientWrapper->didConnect(subprotocol, extensions);
    312 }
    313 
    314 void WorkerThreadableWebSocketChannel::Peer::didConnect(const String& subprotocol, const String& extensions)
    315 {
    316     ASSERT(isMainThread());
    317     // It is important to seprate task creation from posting
    318     // the task. See the above comment.
    319     OwnPtr<ExecutionContextTask> task = createCallbackTask(&workerGlobalScopeDidConnect, m_workerClientWrapper.get(), subprotocol, extensions);
    320     m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
    321 }
    322 
    323 static void workerGlobalScopeDidReceiveMessage(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& message)
    324 {
    325     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
    326     workerClientWrapper->didReceiveMessage(message);
    327 }
    328 
    329 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessage(const String& message)
    330 {
    331     ASSERT(isMainThread());
    332     // It is important to seprate task creation from posting
    333     // the task. See the above comment.
    334     OwnPtr<ExecutionContextTask> task = createCallbackTask(&workerGlobalScopeDidReceiveMessage, m_workerClientWrapper.get(), message);
    335     m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
    336 }
    337 
    338 static void workerGlobalScopeDidReceiveBinaryData(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassOwnPtr<Vector<char> > binaryData)
    339 {
    340     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
    341     workerClientWrapper->didReceiveBinaryData(binaryData);
    342 }
    343 
    344 void WorkerThreadableWebSocketChannel::Peer::didReceiveBinaryData(PassOwnPtr<Vector<char> > binaryData)
    345 {
    346     ASSERT(isMainThread());
    347     // It is important to seprate task creation from posting
    348     // the task. See the above comment.
    349     OwnPtr<ExecutionContextTask> task = createCallbackTask(&workerGlobalScopeDidReceiveBinaryData, m_workerClientWrapper.get(), binaryData);
    350     m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
    351 }
    352 
    353 static void workerGlobalScopeDidConsumeBufferedAmount(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long consumed)
    354 {
    355     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
    356     workerClientWrapper->didConsumeBufferedAmount(consumed);
    357 }
    358 
    359 void WorkerThreadableWebSocketChannel::Peer::didConsumeBufferedAmount(unsigned long consumed)
    360 {
    361     ASSERT(isMainThread());
    362     // It is important to seprate task creation from posting
    363     // the task. See the above comment.
    364     OwnPtr<ExecutionContextTask> task = createCallbackTask(&workerGlobalScopeDidConsumeBufferedAmount, m_workerClientWrapper.get(), consumed);
    365     m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
    366 }
    367 
    368 static void workerGlobalScopeDidStartClosingHandshake(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
    369 {
    370     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
    371     workerClientWrapper->didStartClosingHandshake();
    372 }
    373 
    374 void WorkerThreadableWebSocketChannel::Peer::didStartClosingHandshake()
    375 {
    376     ASSERT(isMainThread());
    377     // It is important to seprate task creation from posting
    378     // the task. See the above comment.
    379     OwnPtr<ExecutionContextTask> task = createCallbackTask(&workerGlobalScopeDidStartClosingHandshake, m_workerClientWrapper.get());
    380     m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
    381 }
    382 
    383 static void workerGlobalScopeDidClose(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
    384 {
    385     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
    386     workerClientWrapper->didClose(closingHandshakeCompletion, code, reason);
    387 }
    388 
    389 void WorkerThreadableWebSocketChannel::Peer::didClose(ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
    390 {
    391     ASSERT(isMainThread());
    392     m_mainWebSocketChannel = nullptr;
    393     // It is important to seprate task creation from posting
    394     // the task. See the above comment.
    395     OwnPtr<ExecutionContextTask> task = createCallbackTask(&workerGlobalScopeDidClose, m_workerClientWrapper.get(), closingHandshakeCompletion, code, reason);
    396     m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
    397 }
    398 
    399 static void workerGlobalScopeDidReceiveMessageError(ExecutionContext* context, PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
    400 {
    401     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
    402     workerClientWrapper->didReceiveMessageError();
    403 }
    404 
    405 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessageError()
    406 {
    407     ASSERT(isMainThread());
    408     // It is important to seprate task creation from posting
    409     // the task. See the above comment.
    410     OwnPtr<ExecutionContextTask> task = createCallbackTask(&workerGlobalScopeDidReceiveMessageError, m_workerClientWrapper.get());
    411     m_loaderProxy.postTaskToWorkerGlobalScope(task.release());
    412 }
    413 
    414 WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtrWillBeRawPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, WorkerGlobalScope& workerGlobalScope)
    415     : m_workerClientWrapper(workerClientWrapper)
    416     , m_workerGlobalScope(workerGlobalScope)
    417     , m_loaderProxy(m_workerGlobalScope->thread()->workerLoaderProxy())
    418     , m_syncHelper(0)
    419 {
    420     ASSERT(m_workerClientWrapper.get());
    421 }
    422 
    423 WorkerThreadableWebSocketChannel::Bridge::~Bridge()
    424 {
    425     disconnect();
    426 }
    427 
    428 void WorkerThreadableWebSocketChannel::Bridge::initialize(const String& sourceURL, unsigned lineNumber)
    429 {
    430     RefPtr<WeakReference<Peer> > reference = WeakReference<Peer>::createUnbound();
    431     m_peer = WeakPtr<Peer>(reference);
    432 
    433     OwnPtr<ThreadableWebSocketChannelSyncHelper> syncHelper = ThreadableWebSocketChannelSyncHelper::create(adoptPtr(blink::Platform::current()->createWaitableEvent()));
    434     // This pointer is guaranteed to be valid until we call terminatePeer.
    435     m_syncHelper = syncHelper.get();
    436 
    437     RefPtr<Bridge> protect(this);
    438     OwnPtr<ExecutionContextTask> task = createCallbackTask(&Peer::initialize, reference.release(), AllowCrossThreadAccess(&m_loaderProxy), m_workerClientWrapper.get(), sourceURL, lineNumber, syncHelper.release());
    439     if (!waitForMethodCompletion(task.release())) {
    440         // The worker thread has been signalled to shutdown before method completion.
    441         terminatePeer();
    442     }
    443 }
    444 
    445 bool WorkerThreadableWebSocketChannel::Bridge::connect(const KURL& url, const String& protocol)
    446 {
    447     if (hasTerminatedPeer())
    448         return false;
    449 
    450     RefPtr<Bridge> protect(this);
    451     // It is important to seprate task creation from calling
    452     // waitForMethodCompletion. See the above comment.
    453     OwnPtr<ExecutionContextTask> task = CallClosureTask::create(bind(&Peer::connect, m_peer, url.copy(), protocol.isolatedCopy()));
    454     if (!waitForMethodCompletion(task.release()))
    455         return false;
    456 
    457     return m_syncHelper->connectRequestResult();
    458 }
    459 
    460 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const String& message)
    461 {
    462     if (hasTerminatedPeer())
    463         return WebSocketChannel::SendFail;
    464 
    465     RefPtr<Bridge> protect(this);
    466     // It is important to seprate task creation from calling
    467     // waitForMethodCompletion. See the above comment.
    468     OwnPtr<ExecutionContextTask> task = CallClosureTask::create(bind(&Peer::send, m_peer, message.isolatedCopy()));
    469     if (!waitForMethodCompletion(task.release()))
    470         return WebSocketChannel::SendFail;
    471 
    472     return m_syncHelper->sendRequestResult();
    473 }
    474 
    475 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
    476 {
    477     if (hasTerminatedPeer())
    478         return WebSocketChannel::SendFail;
    479 
    480     // ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>.
    481     OwnPtr<Vector<char> > data = adoptPtr(new Vector<char>(byteLength));
    482     if (binaryData.byteLength())
    483         memcpy(data->data(), static_cast<const char*>(binaryData.data()) + byteOffset, byteLength);
    484 
    485     RefPtr<Bridge> protect(this);
    486     // It is important to seprate task creation from calling
    487     // waitForMethodCompletion. See the above comment.
    488     OwnPtr<ExecutionContextTask> task = CallClosureTask::create(bind(&Peer::sendArrayBuffer, m_peer, data.release()));
    489     if (!waitForMethodCompletion(task.release()))
    490         return WebSocketChannel::SendFail;
    491 
    492     return m_syncHelper->sendRequestResult();
    493 }
    494 
    495 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(PassRefPtr<BlobDataHandle> data)
    496 {
    497     if (hasTerminatedPeer())
    498         return WebSocketChannel::SendFail;
    499 
    500     RefPtr<Bridge> protect(this);
    501     // It is important to seprate task creation from calling
    502     // waitForMethodCompletion. See the above comment.
    503     OwnPtr<ExecutionContextTask> task = CallClosureTask::create(bind(&Peer::sendBlob, m_peer, data));
    504     if (!waitForMethodCompletion(task.release()))
    505         return WebSocketChannel::SendFail;
    506 
    507     return m_syncHelper->sendRequestResult();
    508 }
    509 
    510 void WorkerThreadableWebSocketChannel::Bridge::close(int code, const String& reason)
    511 {
    512     if (hasTerminatedPeer())
    513         return;
    514 
    515     // It is important to seprate task creation from calling
    516     // waitForMethodCompletion. See the above comment.
    517     OwnPtr<ExecutionContextTask> task = CallClosureTask::create(bind(&Peer::close, m_peer, code, reason.isolatedCopy()));
    518     m_loaderProxy.postTaskToLoader(task.release());
    519 }
    520 
    521 void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
    522 {
    523     if (hasTerminatedPeer())
    524         return;
    525 
    526     // It is important to seprate task creation from calling
    527     // waitForMethodCompletion. See the above comment.
    528     OwnPtr<ExecutionContextTask> task = CallClosureTask::create(bind(&Peer::fail, m_peer, reason.isolatedCopy(), level, sourceURL.isolatedCopy(), lineNumber));
    529     m_loaderProxy.postTaskToLoader(task.release());
    530 }
    531 
    532 void WorkerThreadableWebSocketChannel::Bridge::disconnect()
    533 {
    534     clearClientWrapper();
    535     terminatePeer();
    536 }
    537 
    538 void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper()
    539 {
    540     m_workerClientWrapper->clearClient();
    541 }
    542 
    543 // Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end,
    544 // which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference.
    545 bool WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion(PassOwnPtr<ExecutionContextTask> task)
    546 {
    547     ASSERT(m_workerGlobalScope);
    548     ASSERT(m_syncHelper);
    549 
    550     m_loaderProxy.postTaskToLoader(task);
    551 
    552     // We wait for the syncHelper event even if a shutdown event is fired.
    553     // See https://codereview.chromium.org/267323004/#msg43 for why we need to wait this.
    554     Vector<blink::WebWaitableEvent*> events;
    555     events.append(m_syncHelper->event());
    556     ThreadState::SafePointScope scope(ThreadState::HeapPointersOnStack);
    557     blink::Platform::current()->waitMultipleEvents(events);
    558     // This is checking whether a shutdown event is fired or not.
    559     return !m_workerGlobalScope->thread()->runLoop().terminated();
    560 }
    561 
    562 void WorkerThreadableWebSocketChannel::Bridge::terminatePeer()
    563 {
    564     OwnPtr<ExecutionContextTask> task = CallClosureTask::create(bind(&Peer::destroy, m_peer));
    565     m_loaderProxy.postTaskToLoader(task.release());
    566     // Peer::destroy() deletes m_peer and then m_syncHelper will be released.
    567     // We must not touch m_syncHelper any more.
    568     m_syncHelper = 0;
    569 
    570     // We won't use this any more.
    571     m_workerGlobalScope = nullptr;
    572 }
    573 
    574 } // namespace WebCore
    575