1 /* 2 * Copyright (C) 2009, 2010 Google Inc. All rights reserved. 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions are 6 * met: 7 * 8 * * Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * * Redistributions in binary form must reproduce the above 11 * copyright notice, this list of conditions and the following disclaimer 12 * in the documentation and/or other materials provided with the 13 * distribution. 14 * * Neither the name of Google Inc. nor the names of its 15 * contributors may be used to endorse or promote products derived from 16 * this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 19 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 20 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 21 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 22 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 23 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 24 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 25 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 26 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 28 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29 */ 30 31 #include "config.h" 32 33 #if ENABLE(WEB_SOCKETS) && ENABLE(WORKERS) 34 35 #include "WorkerThreadableWebSocketChannel.h" 36 37 #include "GenericWorkerTask.h" 38 #include "PlatformString.h" 39 #include "ScriptExecutionContext.h" 40 #include "ThreadableWebSocketChannelClientWrapper.h" 41 #include "WebSocketChannel.h" 42 #include "WebSocketChannelClient.h" 43 #include "WorkerContext.h" 44 #include "WorkerLoaderProxy.h" 45 #include "WorkerRunLoop.h" 46 #include "WorkerThread.h" 47 48 #include <wtf/PassRefPtr.h> 49 50 namespace WebCore { 51 52 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerContext* context, WebSocketChannelClient* client, const String& taskMode, const KURL& url, const String& protocol) 53 : m_workerContext(context) 54 , m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(client)) 55 , m_bridge(new Bridge(m_workerClientWrapper, m_workerContext, taskMode, url, protocol)) 56 { 57 } 58 59 WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel() 60 { 61 if (m_bridge) 62 m_bridge->disconnect(); 63 } 64 65 void WorkerThreadableWebSocketChannel::connect() 66 { 67 if (m_bridge) 68 m_bridge->connect(); 69 } 70 71 bool WorkerThreadableWebSocketChannel::send(const String& message) 72 { 73 if (!m_bridge) 74 return false; 75 return m_bridge->send(message); 76 } 77 78 unsigned long WorkerThreadableWebSocketChannel::bufferedAmount() const 79 { 80 if (!m_bridge) 81 return 0; 82 return m_bridge->bufferedAmount(); 83 } 84 85 void WorkerThreadableWebSocketChannel::close() 86 { 87 if (m_bridge) 88 m_bridge->close(); 89 } 90 91 void WorkerThreadableWebSocketChannel::disconnect() 92 { 93 m_bridge->disconnect(); 94 m_bridge.clear(); 95 } 96 97 WorkerThreadableWebSocketChannel::Peer::Peer(RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, ScriptExecutionContext* context, const String& taskMode, const KURL& url, const String& protocol) 98 : m_workerClientWrapper(clientWrapper) 99 , m_loaderProxy(loaderProxy) 100 , m_mainWebSocketChannel(WebSocketChannel::create(context, this, url, protocol)) 101 , m_taskMode(taskMode) 102 { 103 ASSERT(isMainThread()); 104 } 105 106 WorkerThreadableWebSocketChannel::Peer::~Peer() 107 { 108 ASSERT(isMainThread()); 109 if (m_mainWebSocketChannel) 110 m_mainWebSocketChannel->disconnect(); 111 } 112 113 void WorkerThreadableWebSocketChannel::Peer::connect() 114 { 115 ASSERT(isMainThread()); 116 if (!m_mainWebSocketChannel) 117 return; 118 m_mainWebSocketChannel->connect(); 119 } 120 121 static void workerContextDidSend(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, bool sent) 122 { 123 ASSERT_UNUSED(context, context->isWorkerContext()); 124 workerClientWrapper->setSent(sent); 125 } 126 127 void WorkerThreadableWebSocketChannel::Peer::send(const String& message) 128 { 129 ASSERT(isMainThread()); 130 if (!m_mainWebSocketChannel || !m_workerClientWrapper) 131 return; 132 bool sent = m_mainWebSocketChannel->send(message); 133 m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidSend, m_workerClientWrapper, sent), m_taskMode); 134 } 135 136 static void workerContextDidGetBufferedAmount(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long bufferedAmount) 137 { 138 ASSERT_UNUSED(context, context->isWorkerContext()); 139 workerClientWrapper->setBufferedAmount(bufferedAmount); 140 } 141 142 void WorkerThreadableWebSocketChannel::Peer::bufferedAmount() 143 { 144 ASSERT(isMainThread()); 145 if (!m_mainWebSocketChannel || !m_workerClientWrapper) 146 return; 147 unsigned long bufferedAmount = m_mainWebSocketChannel->bufferedAmount(); 148 m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidGetBufferedAmount, m_workerClientWrapper, bufferedAmount), m_taskMode); 149 } 150 151 void WorkerThreadableWebSocketChannel::Peer::close() 152 { 153 ASSERT(isMainThread()); 154 if (!m_mainWebSocketChannel) 155 return; 156 m_mainWebSocketChannel->close(); 157 m_mainWebSocketChannel = 0; 158 } 159 160 void WorkerThreadableWebSocketChannel::Peer::disconnect() 161 { 162 ASSERT(isMainThread()); 163 if (!m_mainWebSocketChannel) 164 return; 165 m_mainWebSocketChannel->disconnect(); 166 m_mainWebSocketChannel = 0; 167 } 168 169 static void workerContextDidConnect(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper) 170 { 171 ASSERT_UNUSED(context, context->isWorkerContext()); 172 workerClientWrapper->didConnect(); 173 } 174 175 void WorkerThreadableWebSocketChannel::Peer::didConnect() 176 { 177 ASSERT(isMainThread()); 178 m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidConnect, m_workerClientWrapper), m_taskMode); 179 } 180 181 static void workerContextDidReceiveMessage(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& message) 182 { 183 ASSERT_UNUSED(context, context->isWorkerContext()); 184 workerClientWrapper->didReceiveMessage(message); 185 } 186 187 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessage(const String& message) 188 { 189 ASSERT(isMainThread()); 190 m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidReceiveMessage, m_workerClientWrapper, message), m_taskMode); 191 } 192 193 static void workerContextDidClose(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long unhandledBufferedAmount) 194 { 195 ASSERT_UNUSED(context, context->isWorkerContext()); 196 workerClientWrapper->didClose(unhandledBufferedAmount); 197 } 198 199 void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned long unhandledBufferedAmount) 200 { 201 ASSERT(isMainThread()); 202 m_mainWebSocketChannel = 0; 203 m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidClose, m_workerClientWrapper, unhandledBufferedAmount), m_taskMode); 204 } 205 206 void WorkerThreadableWebSocketChannel::Bridge::setWebSocketChannel(ScriptExecutionContext* context, Bridge* thisPtr, Peer* peer, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper) 207 { 208 ASSERT_UNUSED(context, context->isWorkerContext()); 209 thisPtr->m_peer = peer; 210 workerClientWrapper->setSyncMethodDone(); 211 } 212 213 void WorkerThreadableWebSocketChannel::Bridge::mainThreadCreateWebSocketChannel(ScriptExecutionContext* context, Bridge* thisPtr, RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, const String& taskMode, const KURL& url, const String& protocol) 214 { 215 ASSERT(isMainThread()); 216 ASSERT_UNUSED(context, context->isDocument()); 217 218 Peer* peer = Peer::create(clientWrapper, thisPtr->m_loaderProxy, context, taskMode, url, protocol); 219 thisPtr->m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&Bridge::setWebSocketChannel, thisPtr, peer, clientWrapper), taskMode); 220 } 221 222 WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassRefPtr<WorkerContext> workerContext, const String& taskMode, const KURL& url, const String& protocol) 223 : m_workerClientWrapper(workerClientWrapper) 224 , m_workerContext(workerContext) 225 , m_loaderProxy(m_workerContext->thread()->workerLoaderProxy()) 226 , m_taskMode(taskMode) 227 , m_peer(0) 228 { 229 ASSERT(m_workerClientWrapper.get()); 230 setMethodNotCompleted(); 231 m_loaderProxy.postTaskToLoader(createCallbackTask(&Bridge::mainThreadCreateWebSocketChannel, this, m_workerClientWrapper, m_taskMode, url, protocol)); 232 waitForMethodCompletion(); 233 ASSERT(m_peer); 234 } 235 236 WorkerThreadableWebSocketChannel::Bridge::~Bridge() 237 { 238 disconnect(); 239 } 240 241 void WorkerThreadableWebSocketChannel::mainThreadConnect(ScriptExecutionContext* context, Peer* peer) 242 { 243 ASSERT(isMainThread()); 244 ASSERT_UNUSED(context, context->isDocument()); 245 ASSERT(peer); 246 247 peer->connect(); 248 } 249 250 void WorkerThreadableWebSocketChannel::Bridge::connect() 251 { 252 ASSERT(m_workerClientWrapper); 253 ASSERT(m_peer); 254 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadConnect, m_peer)); 255 } 256 257 void WorkerThreadableWebSocketChannel::mainThreadSend(ScriptExecutionContext* context, Peer* peer, const String& message) 258 { 259 ASSERT(isMainThread()); 260 ASSERT_UNUSED(context, context->isDocument()); 261 ASSERT(peer); 262 263 peer->send(message); 264 } 265 266 bool WorkerThreadableWebSocketChannel::Bridge::send(const String& message) 267 { 268 if (!m_workerClientWrapper) 269 return false; 270 ASSERT(m_peer); 271 setMethodNotCompleted(); 272 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSend, m_peer, message)); 273 waitForMethodCompletion(); 274 ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get(); 275 return clientWrapper && clientWrapper->sent(); 276 } 277 278 void WorkerThreadableWebSocketChannel::mainThreadBufferedAmount(ScriptExecutionContext* context, Peer* peer) 279 { 280 ASSERT(isMainThread()); 281 ASSERT_UNUSED(context, context->isDocument()); 282 ASSERT(peer); 283 284 peer->bufferedAmount(); 285 } 286 287 unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount() 288 { 289 if (!m_workerClientWrapper) 290 return 0; 291 ASSERT(m_peer); 292 setMethodNotCompleted(); 293 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadBufferedAmount, m_peer)); 294 waitForMethodCompletion(); 295 ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get(); 296 if (clientWrapper) 297 return clientWrapper->bufferedAmount(); 298 return 0; 299 } 300 301 void WorkerThreadableWebSocketChannel::mainThreadClose(ScriptExecutionContext* context, Peer* peer) 302 { 303 ASSERT(isMainThread()); 304 ASSERT_UNUSED(context, context->isDocument()); 305 ASSERT(peer); 306 307 peer->close(); 308 } 309 310 void WorkerThreadableWebSocketChannel::Bridge::close() 311 { 312 ASSERT(m_peer); 313 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadClose, m_peer)); 314 } 315 316 void WorkerThreadableWebSocketChannel::mainThreadDestroy(ScriptExecutionContext* context, Peer* peer) 317 { 318 ASSERT(isMainThread()); 319 ASSERT_UNUSED(context, context->isDocument()); 320 ASSERT(peer); 321 322 delete peer; 323 } 324 325 void WorkerThreadableWebSocketChannel::Bridge::disconnect() 326 { 327 clearClientWrapper(); 328 if (m_peer) { 329 Peer* peer = m_peer; 330 m_peer = 0; 331 m_loaderProxy.postTaskToLoader(createCallbackTask(&mainThreadDestroy, peer)); 332 } 333 m_workerContext = 0; 334 } 335 336 void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper() 337 { 338 m_workerClientWrapper->clearClient(); 339 } 340 341 void WorkerThreadableWebSocketChannel::Bridge::setMethodNotCompleted() 342 { 343 ASSERT(m_workerClientWrapper); 344 m_workerClientWrapper->clearSyncMethodDone(); 345 } 346 347 void WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion() 348 { 349 if (!m_workerContext) 350 return; 351 WorkerRunLoop& runLoop = m_workerContext->thread()->runLoop(); 352 MessageQueueWaitResult result = MessageQueueMessageReceived; 353 ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get(); 354 while (clientWrapper && !clientWrapper->syncMethodDone() && result != MessageQueueTerminated) { 355 result = runLoop.runInMode(m_workerContext.get(), m_taskMode); 356 clientWrapper = m_workerClientWrapper.get(); 357 } 358 } 359 360 } // namespace WebCore 361 362 #endif // ENABLE(WEB_SOCKETS) 363