1 /* 2 * Copyright (C) 2008 Apple Inc. All Rights Reserved. 3 * Copyright (C) 2009 Google Inc. All Rights Reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY APPLE COMPUTER, INC. ``AS IS'' AND ANY 15 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 17 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE COMPUTER, INC. OR 18 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 19 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 20 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 21 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY 22 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 23 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 24 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 25 * 26 */ 27 28 #include "config.h" 29 30 #if ENABLE(WORKERS) 31 32 #include "WorkerMessagingProxy.h" 33 34 #include "CrossThreadTask.h" 35 #include "DedicatedWorkerContext.h" 36 #include "DedicatedWorkerThread.h" 37 #include "DOMWindow.h" 38 #include "Document.h" 39 #include "ErrorEvent.h" 40 #include "ExceptionCode.h" 41 #include "MessageEvent.h" 42 #include "ScriptCallStack.h" 43 #include "ScriptExecutionContext.h" 44 #include "Worker.h" 45 46 namespace WebCore { 47 48 class MessageWorkerContextTask : public ScriptExecutionContext::Task { 49 public: 50 static PassOwnPtr<MessageWorkerContextTask> create(PassRefPtr<SerializedScriptValue> message, PassOwnPtr<MessagePortChannelArray> channels) 51 { 52 return new MessageWorkerContextTask(message, channels); 53 } 54 55 private: 56 MessageWorkerContextTask(PassRefPtr<SerializedScriptValue> message, PassOwnPtr<MessagePortChannelArray> channels) 57 : m_message(message) 58 , m_channels(channels) 59 { 60 } 61 62 virtual void performTask(ScriptExecutionContext* scriptContext) 63 { 64 ASSERT(scriptContext->isWorkerContext()); 65 DedicatedWorkerContext* context = static_cast<DedicatedWorkerContext*>(scriptContext); 66 OwnPtr<MessagePortArray> ports = MessagePort::entanglePorts(*scriptContext, m_channels.release()); 67 context->dispatchEvent(MessageEvent::create(ports.release(), m_message)); 68 context->thread()->workerObjectProxy().confirmMessageFromWorkerObject(context->hasPendingActivity()); 69 } 70 71 private: 72 RefPtr<SerializedScriptValue> m_message; 73 OwnPtr<MessagePortChannelArray> m_channels; 74 }; 75 76 class MessageWorkerTask : public ScriptExecutionContext::Task { 77 public: 78 static PassOwnPtr<MessageWorkerTask> create(PassRefPtr<SerializedScriptValue> message, PassOwnPtr<MessagePortChannelArray> channels, WorkerMessagingProxy* messagingProxy) 79 { 80 return new MessageWorkerTask(message, channels, messagingProxy); 81 } 82 83 private: 84 MessageWorkerTask(PassRefPtr<SerializedScriptValue> message, PassOwnPtr<MessagePortChannelArray> channels, WorkerMessagingProxy* messagingProxy) 85 : m_message(message) 86 , m_channels(channels) 87 , m_messagingProxy(messagingProxy) 88 { 89 } 90 91 virtual void performTask(ScriptExecutionContext* scriptContext) 92 { 93 Worker* workerObject = m_messagingProxy->workerObject(); 94 if (!workerObject || m_messagingProxy->askedToTerminate()) 95 return; 96 97 OwnPtr<MessagePortArray> ports = MessagePort::entanglePorts(*scriptContext, m_channels.release()); 98 workerObject->dispatchEvent(MessageEvent::create(ports.release(), m_message)); 99 } 100 101 private: 102 RefPtr<SerializedScriptValue> m_message; 103 OwnPtr<MessagePortChannelArray> m_channels; 104 WorkerMessagingProxy* m_messagingProxy; 105 }; 106 107 class WorkerExceptionTask : public ScriptExecutionContext::Task { 108 public: 109 static PassOwnPtr<WorkerExceptionTask> create(const String& errorMessage, int lineNumber, const String& sourceURL, WorkerMessagingProxy* messagingProxy) 110 { 111 return new WorkerExceptionTask(errorMessage, lineNumber, sourceURL, messagingProxy); 112 } 113 114 private: 115 WorkerExceptionTask(const String& errorMessage, int lineNumber, const String& sourceURL, WorkerMessagingProxy* messagingProxy) 116 : m_errorMessage(errorMessage.crossThreadString()) 117 , m_lineNumber(lineNumber) 118 , m_sourceURL(sourceURL.crossThreadString()) 119 , m_messagingProxy(messagingProxy) 120 { 121 } 122 123 virtual void performTask(ScriptExecutionContext* context) 124 { 125 Worker* workerObject = m_messagingProxy->workerObject(); 126 if (!workerObject) 127 return; 128 129 // We don't bother checking the askedToTerminate() flag here, because exceptions should *always* be reported even if the thread is terminated. 130 // This is intentionally different than the behavior in MessageWorkerTask, because terminated workers no longer deliver messages (section 4.6 of the WebWorker spec), but they do report exceptions. 131 132 bool errorHandled = !workerObject->dispatchEvent(ErrorEvent::create(m_errorMessage, m_sourceURL, m_lineNumber)); 133 if (!errorHandled) 134 context->reportException(m_errorMessage, m_lineNumber, m_sourceURL, 0); 135 } 136 137 String m_errorMessage; 138 int m_lineNumber; 139 String m_sourceURL; 140 WorkerMessagingProxy* m_messagingProxy; 141 }; 142 143 class WorkerContextDestroyedTask : public ScriptExecutionContext::Task { 144 public: 145 static PassOwnPtr<WorkerContextDestroyedTask> create(WorkerMessagingProxy* messagingProxy) 146 { 147 return new WorkerContextDestroyedTask(messagingProxy); 148 } 149 150 private: 151 WorkerContextDestroyedTask(WorkerMessagingProxy* messagingProxy) 152 : m_messagingProxy(messagingProxy) 153 { 154 } 155 156 virtual void performTask(ScriptExecutionContext*) 157 { 158 m_messagingProxy->workerContextDestroyedInternal(); 159 } 160 161 WorkerMessagingProxy* m_messagingProxy; 162 }; 163 164 class WorkerTerminateTask : public ScriptExecutionContext::Task { 165 public: 166 static PassOwnPtr<WorkerTerminateTask> create(WorkerMessagingProxy* messagingProxy) 167 { 168 return new WorkerTerminateTask(messagingProxy); 169 } 170 171 private: 172 WorkerTerminateTask(WorkerMessagingProxy* messagingProxy) 173 : m_messagingProxy(messagingProxy) 174 { 175 } 176 177 virtual void performTask(ScriptExecutionContext*) 178 { 179 m_messagingProxy->terminateWorkerContext(); 180 } 181 182 WorkerMessagingProxy* m_messagingProxy; 183 }; 184 185 class WorkerThreadActivityReportTask : public ScriptExecutionContext::Task { 186 public: 187 static PassOwnPtr<WorkerThreadActivityReportTask> create(WorkerMessagingProxy* messagingProxy, bool confirmingMessage, bool hasPendingActivity) 188 { 189 return new WorkerThreadActivityReportTask(messagingProxy, confirmingMessage, hasPendingActivity); 190 } 191 192 private: 193 WorkerThreadActivityReportTask(WorkerMessagingProxy* messagingProxy, bool confirmingMessage, bool hasPendingActivity) 194 : m_messagingProxy(messagingProxy) 195 , m_confirmingMessage(confirmingMessage) 196 , m_hasPendingActivity(hasPendingActivity) 197 { 198 } 199 200 virtual void performTask(ScriptExecutionContext*) 201 { 202 m_messagingProxy->reportPendingActivityInternal(m_confirmingMessage, m_hasPendingActivity); 203 } 204 205 WorkerMessagingProxy* m_messagingProxy; 206 bool m_confirmingMessage; 207 bool m_hasPendingActivity; 208 }; 209 210 211 #if !PLATFORM(CHROMIUM) 212 WorkerContextProxy* WorkerContextProxy::create(Worker* worker) 213 { 214 return new WorkerMessagingProxy(worker); 215 } 216 #endif 217 218 WorkerMessagingProxy::WorkerMessagingProxy(Worker* workerObject) 219 : m_scriptExecutionContext(workerObject->scriptExecutionContext()) 220 , m_workerObject(workerObject) 221 , m_unconfirmedMessageCount(0) 222 , m_workerThreadHadPendingActivity(false) 223 , m_askedToTerminate(false) 224 { 225 ASSERT(m_workerObject); 226 ASSERT((m_scriptExecutionContext->isDocument() && isMainThread()) 227 || (m_scriptExecutionContext->isWorkerContext() && currentThread() == static_cast<WorkerContext*>(m_scriptExecutionContext.get())->thread()->threadID())); 228 } 229 230 WorkerMessagingProxy::~WorkerMessagingProxy() 231 { 232 ASSERT(!m_workerObject); 233 ASSERT((m_scriptExecutionContext->isDocument() && isMainThread()) 234 || (m_scriptExecutionContext->isWorkerContext() && currentThread() == static_cast<WorkerContext*>(m_scriptExecutionContext.get())->thread()->threadID())); 235 } 236 237 void WorkerMessagingProxy::startWorkerContext(const KURL& scriptURL, const String& userAgent, const String& sourceCode) 238 { 239 RefPtr<DedicatedWorkerThread> thread = DedicatedWorkerThread::create(scriptURL, userAgent, sourceCode, *this, *this); 240 workerThreadCreated(thread); 241 thread->start(); 242 } 243 244 void WorkerMessagingProxy::postMessageToWorkerObject(PassRefPtr<SerializedScriptValue> message, PassOwnPtr<MessagePortChannelArray> channels) 245 { 246 m_scriptExecutionContext->postTask(MessageWorkerTask::create(message, channels, this)); 247 } 248 249 void WorkerMessagingProxy::postMessageToWorkerContext(PassRefPtr<SerializedScriptValue> message, PassOwnPtr<MessagePortChannelArray> channels) 250 { 251 if (m_askedToTerminate) 252 return; 253 254 if (m_workerThread) { 255 ++m_unconfirmedMessageCount; 256 m_workerThread->runLoop().postTask(MessageWorkerContextTask::create(message, channels)); 257 } else 258 m_queuedEarlyTasks.append(MessageWorkerContextTask::create(message, channels)); 259 } 260 261 void WorkerMessagingProxy::postTaskForModeToWorkerContext(PassOwnPtr<ScriptExecutionContext::Task> task, const String& mode) 262 { 263 if (m_askedToTerminate) 264 return; 265 266 ASSERT(m_workerThread); 267 m_workerThread->runLoop().postTaskForMode(task, mode); 268 } 269 270 void WorkerMessagingProxy::postTaskToLoader(PassOwnPtr<ScriptExecutionContext::Task> task) 271 { 272 // FIXME: In case of nested workers, this should go directly to the root Document context. 273 ASSERT(m_scriptExecutionContext->isDocument()); 274 m_scriptExecutionContext->postTask(task); 275 } 276 277 void WorkerMessagingProxy::postExceptionToWorkerObject(const String& errorMessage, int lineNumber, const String& sourceURL) 278 { 279 m_scriptExecutionContext->postTask(WorkerExceptionTask::create(errorMessage, lineNumber, sourceURL, this)); 280 } 281 282 static void postConsoleMessageTask(ScriptExecutionContext* context, WorkerMessagingProxy* messagingProxy, MessageSource source, MessageType type, MessageLevel level, const String& message, unsigned lineNumber, const String& sourceURL) 283 { 284 if (messagingProxy->askedToTerminate()) 285 return; 286 context->addMessage(source, type, level, message, lineNumber, sourceURL, 0); 287 } 288 289 void WorkerMessagingProxy::postConsoleMessageToWorkerObject(MessageSource source, MessageType type, MessageLevel level, const String& message, int lineNumber, const String& sourceURL) 290 { 291 m_scriptExecutionContext->postTask(createCallbackTask(&postConsoleMessageTask, this, source, type, level, message, lineNumber, sourceURL)); 292 } 293 294 void WorkerMessagingProxy::workerThreadCreated(PassRefPtr<DedicatedWorkerThread> workerThread) 295 { 296 m_workerThread = workerThread; 297 298 if (m_askedToTerminate) { 299 // Worker.terminate() could be called from JS before the thread was created. 300 m_workerThread->stop(); 301 } else { 302 unsigned taskCount = m_queuedEarlyTasks.size(); 303 ASSERT(!m_unconfirmedMessageCount); 304 m_unconfirmedMessageCount = taskCount; 305 m_workerThreadHadPendingActivity = true; // Worker initialization means a pending activity. 306 307 for (unsigned i = 0; i < taskCount; ++i) 308 m_workerThread->runLoop().postTask(m_queuedEarlyTasks[i].release()); 309 m_queuedEarlyTasks.clear(); 310 } 311 } 312 313 void WorkerMessagingProxy::workerObjectDestroyed() 314 { 315 m_workerObject = 0; 316 if (m_workerThread) 317 terminateWorkerContext(); 318 else 319 workerContextDestroyedInternal(); 320 } 321 322 void WorkerMessagingProxy::workerContextDestroyed() 323 { 324 m_scriptExecutionContext->postTask(WorkerContextDestroyedTask::create(this)); 325 // Will execute workerContextDestroyedInternal() on context's thread. 326 } 327 328 void WorkerMessagingProxy::workerContextClosed() 329 { 330 // Executes terminateWorkerContext() on parent context's thread. 331 m_scriptExecutionContext->postTask(WorkerTerminateTask::create(this)); 332 } 333 334 void WorkerMessagingProxy::workerContextDestroyedInternal() 335 { 336 // WorkerContextDestroyedTask is always the last to be performed, so the proxy is not needed for communication 337 // in either side any more. However, the Worker object may still exist, and it assumes that the proxy exists, too. 338 m_askedToTerminate = true; 339 m_workerThread = 0; 340 if (!m_workerObject) 341 delete this; 342 } 343 344 void WorkerMessagingProxy::terminateWorkerContext() 345 { 346 if (m_askedToTerminate) 347 return; 348 m_askedToTerminate = true; 349 350 if (m_workerThread) 351 m_workerThread->stop(); 352 } 353 354 void WorkerMessagingProxy::confirmMessageFromWorkerObject(bool hasPendingActivity) 355 { 356 m_scriptExecutionContext->postTask(WorkerThreadActivityReportTask::create(this, true, hasPendingActivity)); 357 // Will execute reportPendingActivityInternal() on context's thread. 358 } 359 360 void WorkerMessagingProxy::reportPendingActivity(bool hasPendingActivity) 361 { 362 m_scriptExecutionContext->postTask(WorkerThreadActivityReportTask::create(this, false, hasPendingActivity)); 363 // Will execute reportPendingActivityInternal() on context's thread. 364 } 365 366 void WorkerMessagingProxy::reportPendingActivityInternal(bool confirmingMessage, bool hasPendingActivity) 367 { 368 if (confirmingMessage && !m_askedToTerminate) { 369 ASSERT(m_unconfirmedMessageCount); 370 --m_unconfirmedMessageCount; 371 } 372 373 m_workerThreadHadPendingActivity = hasPendingActivity; 374 } 375 376 bool WorkerMessagingProxy::hasPendingActivity() const 377 { 378 return (m_unconfirmedMessageCount || m_workerThreadHadPendingActivity) && !m_askedToTerminate; 379 } 380 381 } // namespace WebCore 382 383 #endif // ENABLE(WORKERS) 384