1 /* 2 * Copyright (C) 2009, 2010 Google Inc. All rights reserved. 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions are 6 * met: 7 * 8 * * Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * * Redistributions in binary form must reproduce the above 11 * copyright notice, this list of conditions and the following disclaimer 12 * in the documentation and/or other materials provided with the 13 * distribution. 14 * * Neither the name of Google Inc. nor the names of its 15 * contributors may be used to endorse or promote products derived from 16 * this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 19 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 20 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 21 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 22 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 23 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 24 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 25 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 26 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 28 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29 */ 30 31 #include "config.h" 32 33 #if ENABLE(WEB_SOCKETS) && ENABLE(WORKERS) 34 35 #include "WorkerThreadableWebSocketChannel.h" 36 37 #include "CrossThreadTask.h" 38 #include "PlatformString.h" 39 #include "ScriptExecutionContext.h" 40 #include "ThreadableWebSocketChannelClientWrapper.h" 41 #include "WebSocketChannel.h" 42 #include "WebSocketChannelClient.h" 43 #include "WorkerContext.h" 44 #include "WorkerLoaderProxy.h" 45 #include "WorkerRunLoop.h" 46 #include "WorkerThread.h" 47 48 #include <wtf/PassRefPtr.h> 49 50 namespace WebCore { 51 52 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerContext* context, WebSocketChannelClient* client, const String& taskMode, const KURL& url, const String& protocol) 53 : m_workerContext(context) 54 , m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(client)) 55 , m_bridge(Bridge::create(m_workerClientWrapper, m_workerContext, taskMode, url, protocol)) 56 { 57 } 58 59 WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel() 60 { 61 if (m_bridge) 62 m_bridge->disconnect(); 63 } 64 65 void WorkerThreadableWebSocketChannel::connect() 66 { 67 if (m_bridge) 68 m_bridge->connect(); 69 } 70 71 bool WorkerThreadableWebSocketChannel::send(const String& message) 72 { 73 if (!m_bridge) 74 return false; 75 return m_bridge->send(message); 76 } 77 78 unsigned long WorkerThreadableWebSocketChannel::bufferedAmount() const 79 { 80 if (!m_bridge) 81 return 0; 82 return m_bridge->bufferedAmount(); 83 } 84 85 void WorkerThreadableWebSocketChannel::close() 86 { 87 if (m_bridge) 88 m_bridge->close(); 89 } 90 91 void WorkerThreadableWebSocketChannel::disconnect() 92 { 93 m_bridge->disconnect(); 94 m_bridge.clear(); 95 } 96 97 void WorkerThreadableWebSocketChannel::suspend() 98 { 99 m_workerClientWrapper->suspend(); 100 if (m_bridge) 101 m_bridge->suspend(); 102 } 103 104 void WorkerThreadableWebSocketChannel::resume() 105 { 106 m_workerClientWrapper->resume(); 107 if (m_bridge) 108 m_bridge->resume(); 109 } 110 111 WorkerThreadableWebSocketChannel::Peer::Peer(RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, ScriptExecutionContext* context, const String& taskMode, const KURL& url, const String& protocol) 112 : m_workerClientWrapper(clientWrapper) 113 , m_loaderProxy(loaderProxy) 114 , m_mainWebSocketChannel(WebSocketChannel::create(context, this, url, protocol)) 115 , m_taskMode(taskMode) 116 { 117 ASSERT(isMainThread()); 118 } 119 120 WorkerThreadableWebSocketChannel::Peer::~Peer() 121 { 122 ASSERT(isMainThread()); 123 if (m_mainWebSocketChannel) 124 m_mainWebSocketChannel->disconnect(); 125 } 126 127 void WorkerThreadableWebSocketChannel::Peer::connect() 128 { 129 ASSERT(isMainThread()); 130 if (!m_mainWebSocketChannel) 131 return; 132 m_mainWebSocketChannel->connect(); 133 } 134 135 static void workerContextDidSend(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, bool sent) 136 { 137 ASSERT_UNUSED(context, context->isWorkerContext()); 138 workerClientWrapper->setSent(sent); 139 } 140 141 void WorkerThreadableWebSocketChannel::Peer::send(const String& message) 142 { 143 ASSERT(isMainThread()); 144 if (!m_mainWebSocketChannel || !m_workerClientWrapper) 145 return; 146 bool sent = m_mainWebSocketChannel->send(message); 147 m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidSend, m_workerClientWrapper, sent), m_taskMode); 148 } 149 150 static void workerContextDidGetBufferedAmount(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long bufferedAmount) 151 { 152 ASSERT_UNUSED(context, context->isWorkerContext()); 153 workerClientWrapper->setBufferedAmount(bufferedAmount); 154 } 155 156 void WorkerThreadableWebSocketChannel::Peer::bufferedAmount() 157 { 158 ASSERT(isMainThread()); 159 if (!m_mainWebSocketChannel || !m_workerClientWrapper) 160 return; 161 unsigned long bufferedAmount = m_mainWebSocketChannel->bufferedAmount(); 162 m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidGetBufferedAmount, m_workerClientWrapper, bufferedAmount), m_taskMode); 163 } 164 165 void WorkerThreadableWebSocketChannel::Peer::close() 166 { 167 ASSERT(isMainThread()); 168 if (!m_mainWebSocketChannel) 169 return; 170 m_mainWebSocketChannel->close(); 171 m_mainWebSocketChannel = 0; 172 } 173 174 void WorkerThreadableWebSocketChannel::Peer::disconnect() 175 { 176 ASSERT(isMainThread()); 177 if (!m_mainWebSocketChannel) 178 return; 179 m_mainWebSocketChannel->disconnect(); 180 m_mainWebSocketChannel = 0; 181 } 182 183 void WorkerThreadableWebSocketChannel::Peer::suspend() 184 { 185 ASSERT(isMainThread()); 186 if (!m_mainWebSocketChannel) 187 return; 188 m_mainWebSocketChannel->suspend(); 189 } 190 191 void WorkerThreadableWebSocketChannel::Peer::resume() 192 { 193 ASSERT(isMainThread()); 194 if (!m_mainWebSocketChannel) 195 return; 196 m_mainWebSocketChannel->resume(); 197 } 198 199 static void workerContextDidConnect(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper) 200 { 201 ASSERT_UNUSED(context, context->isWorkerContext()); 202 workerClientWrapper->didConnect(); 203 } 204 205 void WorkerThreadableWebSocketChannel::Peer::didConnect() 206 { 207 ASSERT(isMainThread()); 208 m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidConnect, m_workerClientWrapper), m_taskMode); 209 } 210 211 static void workerContextDidReceiveMessage(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& message) 212 { 213 ASSERT_UNUSED(context, context->isWorkerContext()); 214 workerClientWrapper->didReceiveMessage(message); 215 } 216 217 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessage(const String& message) 218 { 219 ASSERT(isMainThread()); 220 m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidReceiveMessage, m_workerClientWrapper, message), m_taskMode); 221 } 222 223 static void workerContextDidClose(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long unhandledBufferedAmount) 224 { 225 ASSERT_UNUSED(context, context->isWorkerContext()); 226 workerClientWrapper->didClose(unhandledBufferedAmount); 227 } 228 229 void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned long unhandledBufferedAmount) 230 { 231 ASSERT(isMainThread()); 232 m_mainWebSocketChannel = 0; 233 m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidClose, m_workerClientWrapper, unhandledBufferedAmount), m_taskMode); 234 } 235 236 void WorkerThreadableWebSocketChannel::Bridge::setWebSocketChannel(ScriptExecutionContext* context, Bridge* thisPtr, Peer* peer, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper) 237 { 238 ASSERT_UNUSED(context, context->isWorkerContext()); 239 thisPtr->m_peer = peer; 240 workerClientWrapper->setSyncMethodDone(); 241 } 242 243 void WorkerThreadableWebSocketChannel::Bridge::mainThreadCreateWebSocketChannel(ScriptExecutionContext* context, Bridge* thisPtr, RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, const String& taskMode, const KURL& url, const String& protocol) 244 { 245 ASSERT(isMainThread()); 246 ASSERT_UNUSED(context, context->isDocument()); 247 248 Peer* peer = Peer::create(clientWrapper, thisPtr->m_loaderProxy, context, taskMode, url, protocol); 249 thisPtr->m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&Bridge::setWebSocketChannel, thisPtr, peer, clientWrapper), taskMode); 250 } 251 252 WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassRefPtr<WorkerContext> workerContext, const String& taskMode, const KURL& url, const String& protocol) 253 : m_workerClientWrapper(workerClientWrapper) 254 , m_workerContext(workerContext) 255 , m_loaderProxy(m_workerContext->thread()->workerLoaderProxy()) 256 , m_taskMode(taskMode) 257 , m_peer(0) 258 { 259 ASSERT(m_workerClientWrapper.get()); 260 setMethodNotCompleted(); 261 m_loaderProxy.postTaskToLoader(createCallbackTask(&Bridge::mainThreadCreateWebSocketChannel, this, m_workerClientWrapper, m_taskMode, url, protocol)); 262 waitForMethodCompletion(); 263 ASSERT(m_peer); 264 } 265 266 WorkerThreadableWebSocketChannel::Bridge::~Bridge() 267 { 268 disconnect(); 269 } 270 271 void WorkerThreadableWebSocketChannel::mainThreadConnect(ScriptExecutionContext* context, Peer* peer) 272 { 273 ASSERT(isMainThread()); 274 ASSERT_UNUSED(context, context->isDocument()); 275 ASSERT(peer); 276 277 peer->connect(); 278 } 279 280 void WorkerThreadableWebSocketChannel::Bridge::connect() 281 { 282 ASSERT(m_workerClientWrapper); 283 ASSERT(m_peer); 284 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadConnect, m_peer)); 285 } 286 287 void WorkerThreadableWebSocketChannel::mainThreadSend(ScriptExecutionContext* context, Peer* peer, const String& message) 288 { 289 ASSERT(isMainThread()); 290 ASSERT_UNUSED(context, context->isDocument()); 291 ASSERT(peer); 292 293 peer->send(message); 294 } 295 296 bool WorkerThreadableWebSocketChannel::Bridge::send(const String& message) 297 { 298 if (!m_workerClientWrapper) 299 return false; 300 ASSERT(m_peer); 301 setMethodNotCompleted(); 302 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSend, m_peer, message)); 303 RefPtr<Bridge> protect(this); 304 waitForMethodCompletion(); 305 ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get(); 306 return clientWrapper && clientWrapper->sent(); 307 } 308 309 void WorkerThreadableWebSocketChannel::mainThreadBufferedAmount(ScriptExecutionContext* context, Peer* peer) 310 { 311 ASSERT(isMainThread()); 312 ASSERT_UNUSED(context, context->isDocument()); 313 ASSERT(peer); 314 315 peer->bufferedAmount(); 316 } 317 318 unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount() 319 { 320 if (!m_workerClientWrapper) 321 return 0; 322 ASSERT(m_peer); 323 setMethodNotCompleted(); 324 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadBufferedAmount, m_peer)); 325 RefPtr<Bridge> protect(this); 326 waitForMethodCompletion(); 327 ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get(); 328 if (clientWrapper) 329 return clientWrapper->bufferedAmount(); 330 return 0; 331 } 332 333 void WorkerThreadableWebSocketChannel::mainThreadClose(ScriptExecutionContext* context, Peer* peer) 334 { 335 ASSERT(isMainThread()); 336 ASSERT_UNUSED(context, context->isDocument()); 337 ASSERT(peer); 338 339 peer->close(); 340 } 341 342 void WorkerThreadableWebSocketChannel::Bridge::close() 343 { 344 ASSERT(m_peer); 345 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadClose, m_peer)); 346 } 347 348 void WorkerThreadableWebSocketChannel::mainThreadDestroy(ScriptExecutionContext* context, Peer* peer) 349 { 350 ASSERT(isMainThread()); 351 ASSERT_UNUSED(context, context->isDocument()); 352 ASSERT(peer); 353 354 delete peer; 355 } 356 357 void WorkerThreadableWebSocketChannel::Bridge::disconnect() 358 { 359 clearClientWrapper(); 360 if (m_peer) { 361 Peer* peer = m_peer; 362 m_peer = 0; 363 m_loaderProxy.postTaskToLoader(createCallbackTask(&mainThreadDestroy, peer)); 364 } 365 m_workerContext = 0; 366 } 367 368 void WorkerThreadableWebSocketChannel::mainThreadSuspend(ScriptExecutionContext* context, Peer* peer) 369 { 370 ASSERT(isMainThread()); 371 ASSERT_UNUSED(context, context->isDocument()); 372 ASSERT(peer); 373 374 peer->suspend(); 375 } 376 377 void WorkerThreadableWebSocketChannel::Bridge::suspend() 378 { 379 ASSERT(m_peer); 380 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSuspend, m_peer)); 381 } 382 383 void WorkerThreadableWebSocketChannel::mainThreadResume(ScriptExecutionContext* context, Peer* peer) 384 { 385 ASSERT(isMainThread()); 386 ASSERT_UNUSED(context, context->isDocument()); 387 ASSERT(peer); 388 389 peer->resume(); 390 } 391 392 void WorkerThreadableWebSocketChannel::Bridge::resume() 393 { 394 ASSERT(m_peer); 395 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadResume, m_peer)); 396 } 397 398 void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper() 399 { 400 m_workerClientWrapper->clearClient(); 401 } 402 403 void WorkerThreadableWebSocketChannel::Bridge::setMethodNotCompleted() 404 { 405 ASSERT(m_workerClientWrapper); 406 m_workerClientWrapper->clearSyncMethodDone(); 407 } 408 409 // Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end, 410 // which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference. 411 void WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion() 412 { 413 if (!m_workerContext) 414 return; 415 WorkerRunLoop& runLoop = m_workerContext->thread()->runLoop(); 416 MessageQueueWaitResult result = MessageQueueMessageReceived; 417 ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get(); 418 while (m_workerContext && clientWrapper && !clientWrapper->syncMethodDone() && result != MessageQueueTerminated) { 419 result = runLoop.runInMode(m_workerContext.get(), m_taskMode); // May cause this bridge to get disconnected, which makes m_workerContext become null. 420 clientWrapper = m_workerClientWrapper.get(); 421 } 422 } 423 424 } // namespace WebCore 425 426 #endif // ENABLE(WEB_SOCKETS) 427