1 /* 2 * Copyright (C) 2011, 2012 Google Inc. All rights reserved. 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions are 6 * met: 7 * 8 * * Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * * Redistributions in binary form must reproduce the above 11 * copyright notice, this list of conditions and the following disclaimer 12 * in the documentation and/or other materials provided with the 13 * distribution. 14 * * Neither the name of Google Inc. nor the names of its 15 * contributors may be used to endorse or promote products derived from 16 * this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 19 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 20 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 21 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 22 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 23 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 24 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 25 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 26 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 28 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29 */ 30 31 #include "config.h" 32 33 #include "modules/websockets/WorkerThreadableWebSocketChannel.h" 34 35 #include "bindings/v8/ScriptCallStackFactory.h" 36 #include "core/dom/CrossThreadTask.h" 37 #include "core/dom/Document.h" 38 #include "core/dom/ScriptExecutionContext.h" 39 #include "core/fileapi/Blob.h" 40 #include "core/inspector/ScriptCallFrame.h" 41 #include "core/inspector/ScriptCallStack.h" 42 #include "core/page/Settings.h" 43 #include "core/workers/WorkerGlobalScope.h" 44 #include "core/workers/WorkerLoaderProxy.h" 45 #include "core/workers/WorkerRunLoop.h" 46 #include "core/workers/WorkerThread.h" 47 #include "modules/websockets/MainThreadWebSocketChannel.h" 48 #include "modules/websockets/ThreadableWebSocketChannelClientWrapper.h" 49 #include "modules/websockets/WebSocketChannel.h" 50 #include "modules/websockets/WebSocketChannelClient.h" 51 #include "wtf/ArrayBuffer.h" 52 #include "wtf/MainThread.h" 53 #include "wtf/PassRefPtr.h" 54 #include "wtf/text/WTFString.h" 55 56 namespace WebCore { 57 58 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalScope* context, WebSocketChannelClient* client, const String& taskMode) 59 : m_workerGlobalScope(context) 60 , m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(context, client)) 61 , m_bridge(Bridge::create(m_workerClientWrapper, m_workerGlobalScope, taskMode)) 62 , m_lineNumberAtConnection(0) 63 { 64 // We assume that we can take the JS callstack at WebSocket connection here. 65 RefPtr<ScriptCallStack> callStack = createScriptCallStack(1, true); 66 String sourceURL; 67 unsigned lineNumber = 0; 68 if (callStack && callStack->size()) { 69 sourceURL = callStack->at(0).sourceURL(); 70 lineNumber = callStack->at(0).lineNumber(); 71 } 72 m_bridge->initialize(sourceURL, lineNumber); 73 } 74 75 WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel() 76 { 77 if (m_bridge) 78 m_bridge->disconnect(); 79 } 80 81 void WorkerThreadableWebSocketChannel::connect(const KURL& url, const String& protocol) 82 { 83 RefPtr<ScriptCallStack> callStack = createScriptCallStack(1, true); 84 if (callStack && callStack->size()) { 85 m_sourceURLAtConnection = callStack->at(0).sourceURL(); 86 m_lineNumberAtConnection = callStack->at(0).lineNumber(); 87 } 88 if (m_bridge) 89 m_bridge->connect(url, protocol); 90 } 91 92 String WorkerThreadableWebSocketChannel::subprotocol() 93 { 94 ASSERT(m_workerClientWrapper); 95 return m_workerClientWrapper->subprotocol(); 96 } 97 98 String WorkerThreadableWebSocketChannel::extensions() 99 { 100 ASSERT(m_workerClientWrapper); 101 return m_workerClientWrapper->extensions(); 102 } 103 104 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(const String& message) 105 { 106 if (!m_bridge) 107 return WebSocketChannel::SendFail; 108 return m_bridge->send(message); 109 } 110 111 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength) 112 { 113 if (!m_bridge) 114 return WebSocketChannel::SendFail; 115 return m_bridge->send(binaryData, byteOffset, byteLength); 116 } 117 118 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(const Blob& binaryData) 119 { 120 if (!m_bridge) 121 return WebSocketChannel::SendFail; 122 return m_bridge->send(binaryData); 123 } 124 125 unsigned long WorkerThreadableWebSocketChannel::bufferedAmount() const 126 { 127 if (!m_bridge) 128 return 0; 129 return m_bridge->bufferedAmount(); 130 } 131 132 void WorkerThreadableWebSocketChannel::close(int code, const String& reason) 133 { 134 if (m_bridge) 135 m_bridge->close(code, reason); 136 } 137 138 void WorkerThreadableWebSocketChannel::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber) 139 { 140 if (!m_bridge) 141 return; 142 143 RefPtr<ScriptCallStack> callStack = createScriptCallStack(1, true); 144 if (callStack && callStack->size()) { 145 // In order to emulate the ConsoleMessage behavior, 146 // we should ignore the specified url and line number if 147 // we can get the JavaScript context. 148 m_bridge->fail(reason, level, callStack->at(0).sourceURL(), callStack->at(0).lineNumber()); 149 } else if (sourceURL.isEmpty() && !lineNumber) { 150 // No information is specified by the caller - use the url 151 // and the line number at the connection. 152 m_bridge->fail(reason, level, m_sourceURLAtConnection, m_lineNumberAtConnection); 153 } else { 154 // Use the specified information. 155 m_bridge->fail(reason, level, sourceURL, lineNumber); 156 } 157 } 158 159 void WorkerThreadableWebSocketChannel::disconnect() 160 { 161 m_bridge->disconnect(); 162 m_bridge.clear(); 163 } 164 165 void WorkerThreadableWebSocketChannel::suspend() 166 { 167 m_workerClientWrapper->suspend(); 168 if (m_bridge) 169 m_bridge->suspend(); 170 } 171 172 void WorkerThreadableWebSocketChannel::resume() 173 { 174 m_workerClientWrapper->resume(); 175 if (m_bridge) 176 m_bridge->resume(); 177 } 178 179 WorkerThreadableWebSocketChannel::Peer::Peer(PassRefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, ScriptExecutionContext* context, const String& taskMode, const String& sourceURL, unsigned lineNumber) 180 : m_workerClientWrapper(clientWrapper) 181 , m_loaderProxy(loaderProxy) 182 , m_mainWebSocketChannel(0) 183 , m_taskMode(taskMode) 184 { 185 Document* document = toDocument(context); 186 Settings* settings = document->settings(); 187 if (settings && settings->experimentalWebSocketEnabled()) { 188 // FIXME: Create an "experimental" WebSocketChannel instead of a MainThreadWebSocketChannel. 189 m_mainWebSocketChannel = MainThreadWebSocketChannel::create(document, this, sourceURL, lineNumber); 190 } else 191 m_mainWebSocketChannel = MainThreadWebSocketChannel::create(document, this, sourceURL, lineNumber); 192 ASSERT(isMainThread()); 193 } 194 195 WorkerThreadableWebSocketChannel::Peer::~Peer() 196 { 197 ASSERT(isMainThread()); 198 if (m_mainWebSocketChannel) 199 m_mainWebSocketChannel->disconnect(); 200 } 201 202 void WorkerThreadableWebSocketChannel::Peer::connect(const KURL& url, const String& protocol) 203 { 204 ASSERT(isMainThread()); 205 if (!m_mainWebSocketChannel) 206 return; 207 m_mainWebSocketChannel->connect(url, protocol); 208 } 209 210 static void workerGlobalScopeDidSend(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, WebSocketChannel::SendResult sendRequestResult) 211 { 212 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 213 workerClientWrapper->setSendRequestResult(sendRequestResult); 214 } 215 216 void WorkerThreadableWebSocketChannel::Peer::send(const String& message) 217 { 218 ASSERT(isMainThread()); 219 if (!m_mainWebSocketChannel || !m_workerClientWrapper) 220 return; 221 WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(message); 222 m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidSend, m_workerClientWrapper, sendRequestResult), m_taskMode); 223 } 224 225 void WorkerThreadableWebSocketChannel::Peer::send(const ArrayBuffer& binaryData) 226 { 227 ASSERT(isMainThread()); 228 if (!m_mainWebSocketChannel || !m_workerClientWrapper) 229 return; 230 WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(binaryData, 0, binaryData.byteLength()); 231 m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidSend, m_workerClientWrapper, sendRequestResult), m_taskMode); 232 } 233 234 void WorkerThreadableWebSocketChannel::Peer::send(const Blob& binaryData) 235 { 236 ASSERT(isMainThread()); 237 if (!m_mainWebSocketChannel || !m_workerClientWrapper) 238 return; 239 WebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(binaryData); 240 m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidSend, m_workerClientWrapper, sendRequestResult), m_taskMode); 241 } 242 243 static void workerGlobalScopeDidGetBufferedAmount(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long bufferedAmount) 244 { 245 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 246 workerClientWrapper->setBufferedAmount(bufferedAmount); 247 } 248 249 void WorkerThreadableWebSocketChannel::Peer::bufferedAmount() 250 { 251 ASSERT(isMainThread()); 252 if (!m_mainWebSocketChannel || !m_workerClientWrapper) 253 return; 254 unsigned long bufferedAmount = m_mainWebSocketChannel->bufferedAmount(); 255 m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidGetBufferedAmount, m_workerClientWrapper, bufferedAmount), m_taskMode); 256 } 257 258 void WorkerThreadableWebSocketChannel::Peer::close(int code, const String& reason) 259 { 260 ASSERT(isMainThread()); 261 if (!m_mainWebSocketChannel) 262 return; 263 m_mainWebSocketChannel->close(code, reason); 264 } 265 266 void WorkerThreadableWebSocketChannel::Peer::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber) 267 { 268 ASSERT(isMainThread()); 269 if (!m_mainWebSocketChannel) 270 return; 271 m_mainWebSocketChannel->fail(reason, level, sourceURL, lineNumber); 272 } 273 274 void WorkerThreadableWebSocketChannel::Peer::disconnect() 275 { 276 ASSERT(isMainThread()); 277 if (!m_mainWebSocketChannel) 278 return; 279 m_mainWebSocketChannel->disconnect(); 280 m_mainWebSocketChannel = 0; 281 } 282 283 void WorkerThreadableWebSocketChannel::Peer::suspend() 284 { 285 ASSERT(isMainThread()); 286 if (!m_mainWebSocketChannel) 287 return; 288 m_mainWebSocketChannel->suspend(); 289 } 290 291 void WorkerThreadableWebSocketChannel::Peer::resume() 292 { 293 ASSERT(isMainThread()); 294 if (!m_mainWebSocketChannel) 295 return; 296 m_mainWebSocketChannel->resume(); 297 } 298 299 static void workerGlobalScopeDidConnect(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& subprotocol, const String& extensions) 300 { 301 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 302 workerClientWrapper->setSubprotocol(subprotocol); 303 workerClientWrapper->setExtensions(extensions); 304 workerClientWrapper->didConnect(); 305 } 306 307 void WorkerThreadableWebSocketChannel::Peer::didConnect() 308 { 309 ASSERT(isMainThread()); 310 m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidConnect, m_workerClientWrapper, m_mainWebSocketChannel->subprotocol(), m_mainWebSocketChannel->extensions()), m_taskMode); 311 } 312 313 static void workerGlobalScopeDidReceiveMessage(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& message) 314 { 315 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 316 workerClientWrapper->didReceiveMessage(message); 317 } 318 319 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessage(const String& message) 320 { 321 ASSERT(isMainThread()); 322 m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidReceiveMessage, m_workerClientWrapper, message), m_taskMode); 323 } 324 325 static void workerGlobalScopeDidReceiveBinaryData(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassOwnPtr<Vector<char> > binaryData) 326 { 327 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 328 workerClientWrapper->didReceiveBinaryData(binaryData); 329 } 330 331 void WorkerThreadableWebSocketChannel::Peer::didReceiveBinaryData(PassOwnPtr<Vector<char> > binaryData) 332 { 333 ASSERT(isMainThread()); 334 m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidReceiveBinaryData, m_workerClientWrapper, binaryData), m_taskMode); 335 } 336 337 static void workerGlobalScopeDidUpdateBufferedAmount(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long bufferedAmount) 338 { 339 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 340 workerClientWrapper->didUpdateBufferedAmount(bufferedAmount); 341 } 342 343 void WorkerThreadableWebSocketChannel::Peer::didUpdateBufferedAmount(unsigned long bufferedAmount) 344 { 345 ASSERT(isMainThread()); 346 m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidUpdateBufferedAmount, m_workerClientWrapper, bufferedAmount), m_taskMode); 347 } 348 349 static void workerGlobalScopeDidStartClosingHandshake(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper) 350 { 351 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 352 workerClientWrapper->didStartClosingHandshake(); 353 } 354 355 void WorkerThreadableWebSocketChannel::Peer::didStartClosingHandshake() 356 { 357 ASSERT(isMainThread()); 358 m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidStartClosingHandshake, m_workerClientWrapper), m_taskMode); 359 } 360 361 static void workerGlobalScopeDidClose(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long unhandledBufferedAmount, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason) 362 { 363 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 364 workerClientWrapper->didClose(unhandledBufferedAmount, closingHandshakeCompletion, code, reason); 365 } 366 367 void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned long unhandledBufferedAmount, ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason) 368 { 369 ASSERT(isMainThread()); 370 m_mainWebSocketChannel = 0; 371 m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidClose, m_workerClientWrapper, unhandledBufferedAmount, closingHandshakeCompletion, code, reason), m_taskMode); 372 } 373 374 static void workerGlobalScopeDidReceiveMessageError(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper) 375 { 376 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 377 workerClientWrapper->didReceiveMessageError(); 378 } 379 380 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessageError() 381 { 382 ASSERT(isMainThread()); 383 m_loaderProxy.postTaskForModeToWorkerGlobalScope(createCallbackTask(&workerGlobalScopeDidReceiveMessageError, m_workerClientWrapper), m_taskMode); 384 } 385 386 WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassRefPtr<WorkerGlobalScope> workerGlobalScope, const String& taskMode) 387 : m_workerClientWrapper(workerClientWrapper) 388 , m_workerGlobalScope(workerGlobalScope) 389 , m_loaderProxy(m_workerGlobalScope->thread()->workerLoaderProxy()) 390 , m_taskMode(taskMode) 391 , m_peer(0) 392 { 393 ASSERT(m_workerClientWrapper.get()); 394 } 395 396 WorkerThreadableWebSocketChannel::Bridge::~Bridge() 397 { 398 disconnect(); 399 } 400 401 class WorkerThreadableWebSocketChannel::WorkerGlobalScopeDidInitializeTask : public ScriptExecutionContext::Task { 402 public: 403 static PassOwnPtr<ScriptExecutionContext::Task> create(WorkerThreadableWebSocketChannel::Peer* peer, WorkerLoaderProxy* loaderProxy, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper) 404 { 405 return adoptPtr(new WorkerGlobalScopeDidInitializeTask(peer, loaderProxy, workerClientWrapper)); 406 } 407 408 virtual ~WorkerGlobalScopeDidInitializeTask() { } 409 virtual void performTask(ScriptExecutionContext* context) OVERRIDE 410 { 411 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 412 if (m_workerClientWrapper->failedWebSocketChannelCreation()) { 413 // If Bridge::initialize() quitted earlier, we need to kick mainThreadDestroy() to delete the peer. 414 OwnPtr<WorkerThreadableWebSocketChannel::Peer> peer = adoptPtr(m_peer); 415 m_peer = 0; 416 m_loaderProxy->postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadDestroy, peer.release())); 417 } else 418 m_workerClientWrapper->didCreateWebSocketChannel(m_peer); 419 } 420 virtual bool isCleanupTask() const OVERRIDE { return true; } 421 422 private: 423 WorkerGlobalScopeDidInitializeTask(WorkerThreadableWebSocketChannel::Peer* peer, WorkerLoaderProxy* loaderProxy, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper) 424 : m_peer(peer) 425 , m_loaderProxy(loaderProxy) 426 , m_workerClientWrapper(workerClientWrapper) 427 { 428 } 429 430 WorkerThreadableWebSocketChannel::Peer* m_peer; 431 WorkerLoaderProxy* m_loaderProxy; 432 RefPtr<ThreadableWebSocketChannelClientWrapper> m_workerClientWrapper; 433 }; 434 435 void WorkerThreadableWebSocketChannel::Bridge::mainThreadInitialize(ScriptExecutionContext* context, WorkerLoaderProxy* loaderProxy, PassRefPtr<ThreadableWebSocketChannelClientWrapper> prpClientWrapper, const String& taskMode, const String& sourceURL, unsigned lineNumber) 436 { 437 ASSERT(isMainThread()); 438 ASSERT_UNUSED(context, context->isDocument()); 439 440 RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper = prpClientWrapper; 441 442 Peer* peer = Peer::create(clientWrapper, *loaderProxy, context, taskMode, sourceURL, lineNumber); 443 bool sent = loaderProxy->postTaskForModeToWorkerGlobalScope( 444 WorkerThreadableWebSocketChannel::WorkerGlobalScopeDidInitializeTask::create(peer, loaderProxy, clientWrapper), taskMode); 445 if (!sent) { 446 clientWrapper->clearPeer(); 447 delete peer; 448 } 449 } 450 451 void WorkerThreadableWebSocketChannel::Bridge::initialize(const String& sourceURL, unsigned lineNumber) 452 { 453 ASSERT(!m_peer); 454 setMethodNotCompleted(); 455 RefPtr<Bridge> protect(this); 456 m_loaderProxy.postTaskToLoader( 457 createCallbackTask(&Bridge::mainThreadInitialize, AllowCrossThreadAccess(&m_loaderProxy), m_workerClientWrapper, m_taskMode, sourceURL, lineNumber)); 458 waitForMethodCompletion(); 459 // m_peer may be null when the nested runloop exited before a peer has created. 460 m_peer = m_workerClientWrapper->peer(); 461 if (!m_peer) 462 m_workerClientWrapper->setFailedWebSocketChannelCreation(); 463 } 464 465 void WorkerThreadableWebSocketChannel::mainThreadConnect(ScriptExecutionContext* context, Peer* peer, const KURL& url, const String& protocol) 466 { 467 ASSERT(isMainThread()); 468 ASSERT_UNUSED(context, context->isDocument()); 469 ASSERT(peer); 470 471 peer->connect(url, protocol); 472 } 473 474 void WorkerThreadableWebSocketChannel::Bridge::connect(const KURL& url, const String& protocol) 475 { 476 ASSERT(m_workerClientWrapper); 477 if (!m_peer) 478 return; 479 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadConnect, AllowCrossThreadAccess(m_peer), url, protocol)); 480 } 481 482 void WorkerThreadableWebSocketChannel::mainThreadSend(ScriptExecutionContext* context, Peer* peer, const String& message) 483 { 484 ASSERT(isMainThread()); 485 ASSERT_UNUSED(context, context->isDocument()); 486 ASSERT(peer); 487 488 peer->send(message); 489 } 490 491 void WorkerThreadableWebSocketChannel::mainThreadSendArrayBuffer(ScriptExecutionContext* context, Peer* peer, PassOwnPtr<Vector<char> > data) 492 { 493 ASSERT(isMainThread()); 494 ASSERT_UNUSED(context, context->isDocument()); 495 ASSERT(peer); 496 497 RefPtr<ArrayBuffer> arrayBuffer = ArrayBuffer::create(data->data(), data->size()); 498 peer->send(*arrayBuffer); 499 } 500 501 void WorkerThreadableWebSocketChannel::mainThreadSendBlob(ScriptExecutionContext* context, Peer* peer, const KURL& url, const String& type, long long size) 502 { 503 ASSERT(isMainThread()); 504 ASSERT_UNUSED(context, context->isDocument()); 505 ASSERT(peer); 506 507 RefPtr<Blob> blob = Blob::create(url, type, size); 508 peer->send(*blob); 509 } 510 511 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const String& message) 512 { 513 if (!m_workerClientWrapper || !m_peer) 514 return WebSocketChannel::SendFail; 515 setMethodNotCompleted(); 516 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSend, AllowCrossThreadAccess(m_peer), message)); 517 RefPtr<Bridge> protect(this); 518 waitForMethodCompletion(); 519 ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get(); 520 if (!clientWrapper) 521 return WebSocketChannel::SendFail; 522 return clientWrapper->sendRequestResult(); 523 } 524 525 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength) 526 { 527 if (!m_workerClientWrapper || !m_peer) 528 return WebSocketChannel::SendFail; 529 // ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>. 530 OwnPtr<Vector<char> > data = adoptPtr(new Vector<char>(byteLength)); 531 if (binaryData.byteLength()) 532 memcpy(data->data(), static_cast<const char*>(binaryData.data()) + byteOffset, byteLength); 533 setMethodNotCompleted(); 534 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSendArrayBuffer, AllowCrossThreadAccess(m_peer), data.release())); 535 RefPtr<Bridge> protect(this); 536 waitForMethodCompletion(); 537 ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get(); 538 if (!clientWrapper) 539 return WebSocketChannel::SendFail; 540 return clientWrapper->sendRequestResult(); 541 } 542 543 WebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const Blob& binaryData) 544 { 545 if (!m_workerClientWrapper || !m_peer) 546 return WebSocketChannel::SendFail; 547 setMethodNotCompleted(); 548 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSendBlob, AllowCrossThreadAccess(m_peer), binaryData.url(), binaryData.type(), binaryData.size())); 549 RefPtr<Bridge> protect(this); 550 waitForMethodCompletion(); 551 ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get(); 552 if (!clientWrapper) 553 return WebSocketChannel::SendFail; 554 return clientWrapper->sendRequestResult(); 555 } 556 557 void WorkerThreadableWebSocketChannel::mainThreadBufferedAmount(ScriptExecutionContext* context, Peer* peer) 558 { 559 ASSERT(isMainThread()); 560 ASSERT_UNUSED(context, context->isDocument()); 561 ASSERT(peer); 562 563 peer->bufferedAmount(); 564 } 565 566 unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount() 567 { 568 if (!m_workerClientWrapper || !m_peer) 569 return 0; 570 setMethodNotCompleted(); 571 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadBufferedAmount, AllowCrossThreadAccess(m_peer))); 572 RefPtr<Bridge> protect(this); 573 waitForMethodCompletion(); 574 ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get(); 575 if (clientWrapper) 576 return clientWrapper->bufferedAmount(); 577 return 0; 578 } 579 580 void WorkerThreadableWebSocketChannel::mainThreadClose(ScriptExecutionContext* context, Peer* peer, int code, const String& reason) 581 { 582 ASSERT(isMainThread()); 583 ASSERT_UNUSED(context, context->isDocument()); 584 ASSERT(peer); 585 586 peer->close(code, reason); 587 } 588 589 void WorkerThreadableWebSocketChannel::Bridge::close(int code, const String& reason) 590 { 591 if (!m_peer) 592 return; 593 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadClose, AllowCrossThreadAccess(m_peer), code, reason)); 594 } 595 596 void WorkerThreadableWebSocketChannel::mainThreadFail(ScriptExecutionContext* context, Peer* peer, const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber) 597 { 598 ASSERT(isMainThread()); 599 ASSERT_UNUSED(context, context->isDocument()); 600 ASSERT(peer); 601 602 peer->fail(reason, level, sourceURL, lineNumber); 603 } 604 605 void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber) 606 { 607 if (!m_peer) 608 return; 609 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadFail, AllowCrossThreadAccess(m_peer), reason, level, sourceURL, lineNumber)); 610 } 611 612 void WorkerThreadableWebSocketChannel::mainThreadDestroy(ScriptExecutionContext* context, PassOwnPtr<Peer> peer) 613 { 614 ASSERT(isMainThread()); 615 ASSERT_UNUSED(context, context->isDocument()); 616 ASSERT_UNUSED(peer, peer); 617 618 // Peer object will be deleted even if the task does not run in the main thread's cleanup period, because 619 // the destructor for the task object (created by createCallbackTask()) will automatically delete the peer. 620 } 621 622 void WorkerThreadableWebSocketChannel::Bridge::disconnect() 623 { 624 clearClientWrapper(); 625 if (m_peer) { 626 OwnPtr<Peer> peer = adoptPtr(m_peer); 627 m_peer = 0; 628 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadDestroy, peer.release())); 629 } 630 m_workerGlobalScope = 0; 631 } 632 633 void WorkerThreadableWebSocketChannel::mainThreadSuspend(ScriptExecutionContext* context, Peer* peer) 634 { 635 ASSERT(isMainThread()); 636 ASSERT_UNUSED(context, context->isDocument()); 637 ASSERT(peer); 638 639 peer->suspend(); 640 } 641 642 void WorkerThreadableWebSocketChannel::Bridge::suspend() 643 { 644 if (!m_peer) 645 return; 646 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSuspend, AllowCrossThreadAccess(m_peer))); 647 } 648 649 void WorkerThreadableWebSocketChannel::mainThreadResume(ScriptExecutionContext* context, Peer* peer) 650 { 651 ASSERT(isMainThread()); 652 ASSERT_UNUSED(context, context->isDocument()); 653 ASSERT(peer); 654 655 peer->resume(); 656 } 657 658 void WorkerThreadableWebSocketChannel::Bridge::resume() 659 { 660 if (!m_peer) 661 return; 662 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadResume, AllowCrossThreadAccess(m_peer))); 663 } 664 665 void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper() 666 { 667 m_workerClientWrapper->clearClient(); 668 } 669 670 void WorkerThreadableWebSocketChannel::Bridge::setMethodNotCompleted() 671 { 672 ASSERT(m_workerClientWrapper); 673 m_workerClientWrapper->clearSyncMethodDone(); 674 } 675 676 // Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end, 677 // which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference. 678 void WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion() 679 { 680 if (!m_workerGlobalScope) 681 return; 682 WorkerRunLoop& runLoop = m_workerGlobalScope->thread()->runLoop(); 683 MessageQueueWaitResult result = MessageQueueMessageReceived; 684 ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get(); 685 while (m_workerGlobalScope && clientWrapper && !clientWrapper->syncMethodDone() && result != MessageQueueTerminated) { 686 result = runLoop.runInMode(m_workerGlobalScope.get(), m_taskMode); // May cause this bridge to get disconnected, which makes m_workerGlobalScope become null. 687 clientWrapper = m_workerClientWrapper.get(); 688 } 689 } 690 691 } // namespace WebCore 692