Home | History | Annotate | Download | only in workers
      1 /*
      2  * Copyright (C) 2008 Apple Inc. All Rights Reserved.
      3  * Copyright (C) 2009 Google Inc. All Rights Reserved.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions
      7  * are met:
      8  * 1. Redistributions of source code must retain the above copyright
      9  *    notice, this list of conditions and the following disclaimer.
     10  * 2. Redistributions in binary form must reproduce the above copyright
     11  *    notice, this list of conditions and the following disclaimer in the
     12  *    documentation and/or other materials provided with the distribution.
     13  *
     14  * THIS SOFTWARE IS PROVIDED BY APPLE COMPUTER, INC. ``AS IS'' AND ANY
     15  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
     17  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL APPLE COMPUTER, INC. OR
     18  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
     19  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     20  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
     21  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
     22  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     23  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     24  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     25  *
     26  */
     27 
     28 #include "config.h"
     29 
     30 #include "core/workers/WorkerMessagingProxy.h"
     31 
     32 #include "core/dom/CrossThreadTask.h"
     33 #include "core/dom/Document.h"
     34 #include "core/dom/ErrorEvent.h"
     35 #include "core/dom/MessageEvent.h"
     36 #include "core/dom/ScriptExecutionContext.h"
     37 #include "core/inspector/InspectorInstrumentation.h"
     38 #include "core/inspector/ScriptCallStack.h"
     39 #include "core/inspector/WorkerDebuggerAgent.h"
     40 #include "core/inspector/WorkerInspectorController.h"
     41 #include "core/loader/DocumentLoadTiming.h"
     42 #include "core/loader/DocumentLoader.h"
     43 #include "core/page/ContentSecurityPolicy.h"
     44 #include "core/page/DOMWindow.h"
     45 #include "core/page/PageGroup.h"
     46 #include "core/platform/NotImplemented.h"
     47 #include "core/workers/DedicatedWorkerGlobalScope.h"
     48 #include "core/workers/DedicatedWorkerThread.h"
     49 #include "core/workers/Worker.h"
     50 #include "core/workers/WorkerClients.h"
     51 #include "core/workers/WorkerThreadStartupData.h"
     52 #include <wtf/MainThread.h>
     53 
     54 namespace WebCore {
     55 
     56 class MessageWorkerGlobalScopeTask : public ScriptExecutionContext::Task {
     57 public:
     58     static PassOwnPtr<MessageWorkerGlobalScopeTask> create(PassRefPtr<SerializedScriptValue> message, PassOwnPtr<MessagePortChannelArray> channels)
     59     {
     60         return adoptPtr(new MessageWorkerGlobalScopeTask(message, channels));
     61     }
     62 
     63 private:
     64     MessageWorkerGlobalScopeTask(PassRefPtr<SerializedScriptValue> message, PassOwnPtr<MessagePortChannelArray> channels)
     65         : m_message(message)
     66         , m_channels(channels)
     67     {
     68     }
     69 
     70     virtual void performTask(ScriptExecutionContext* scriptContext)
     71     {
     72         ASSERT_WITH_SECURITY_IMPLICATION(scriptContext->isWorkerGlobalScope());
     73         DedicatedWorkerGlobalScope* context = static_cast<DedicatedWorkerGlobalScope*>(scriptContext);
     74         OwnPtr<MessagePortArray> ports = MessagePort::entanglePorts(*scriptContext, m_channels.release());
     75         context->dispatchEvent(MessageEvent::create(ports.release(), m_message));
     76         context->thread()->workerObjectProxy().confirmMessageFromWorkerObject(context->hasPendingActivity());
     77     }
     78 
     79 private:
     80     RefPtr<SerializedScriptValue> m_message;
     81     OwnPtr<MessagePortChannelArray> m_channels;
     82 };
     83 
     84 class MessageWorkerTask : public ScriptExecutionContext::Task {
     85 public:
     86     static PassOwnPtr<MessageWorkerTask> create(PassRefPtr<SerializedScriptValue> message, PassOwnPtr<MessagePortChannelArray> channels, WorkerMessagingProxy* messagingProxy)
     87     {
     88         return adoptPtr(new MessageWorkerTask(message, channels, messagingProxy));
     89     }
     90 
     91 private:
     92     MessageWorkerTask(PassRefPtr<SerializedScriptValue> message, PassOwnPtr<MessagePortChannelArray> channels, WorkerMessagingProxy* messagingProxy)
     93         : m_message(message)
     94         , m_channels(channels)
     95         , m_messagingProxy(messagingProxy)
     96     {
     97     }
     98 
     99     virtual void performTask(ScriptExecutionContext* scriptContext)
    100     {
    101         Worker* workerObject = m_messagingProxy->workerObject();
    102         if (!workerObject || m_messagingProxy->askedToTerminate())
    103             return;
    104 
    105         OwnPtr<MessagePortArray> ports = MessagePort::entanglePorts(*scriptContext, m_channels.release());
    106         workerObject->dispatchEvent(MessageEvent::create(ports.release(), m_message));
    107     }
    108 
    109 private:
    110     RefPtr<SerializedScriptValue> m_message;
    111     OwnPtr<MessagePortChannelArray> m_channels;
    112     WorkerMessagingProxy* m_messagingProxy;
    113 };
    114 
    115 class WorkerExceptionTask : public ScriptExecutionContext::Task {
    116 public:
    117     static PassOwnPtr<WorkerExceptionTask> create(const String& errorMessage, int lineNumber, int columnNumber, const String& sourceURL, WorkerMessagingProxy* messagingProxy)
    118     {
    119         return adoptPtr(new WorkerExceptionTask(errorMessage, lineNumber, columnNumber, sourceURL, messagingProxy));
    120     }
    121 
    122 private:
    123     WorkerExceptionTask(const String& errorMessage, int lineNumber, int columnNumber, const String& sourceURL, WorkerMessagingProxy* messagingProxy)
    124         : m_errorMessage(errorMessage.isolatedCopy())
    125         , m_lineNumber(lineNumber)
    126         , m_columnNumber(columnNumber)
    127         , m_sourceURL(sourceURL.isolatedCopy())
    128         , m_messagingProxy(messagingProxy)
    129     {
    130     }
    131 
    132     virtual void performTask(ScriptExecutionContext* context)
    133     {
    134         Worker* workerObject = m_messagingProxy->workerObject();
    135         if (!workerObject)
    136             return;
    137 
    138         // We don't bother checking the askedToTerminate() flag here, because exceptions should *always* be reported even if the thread is terminated.
    139         // This is intentionally different than the behavior in MessageWorkerTask, because terminated workers no longer deliver messages (section 4.6 of the WebWorker spec), but they do report exceptions.
    140 
    141         RefPtr<ErrorEvent> event = ErrorEvent::create(m_errorMessage, m_sourceURL, m_lineNumber, m_columnNumber);
    142         bool errorHandled = !workerObject->dispatchEvent(event);
    143         if (!errorHandled)
    144             context->reportException(event, 0, NotSharableCrossOrigin);
    145     }
    146 
    147     String m_errorMessage;
    148     int m_lineNumber;
    149     int m_columnNumber;
    150     String m_sourceURL;
    151     WorkerMessagingProxy* m_messagingProxy;
    152 };
    153 
    154 class WorkerGlobalScopeDestroyedTask : public ScriptExecutionContext::Task {
    155 public:
    156     static PassOwnPtr<WorkerGlobalScopeDestroyedTask> create(WorkerMessagingProxy* messagingProxy)
    157     {
    158         return adoptPtr(new WorkerGlobalScopeDestroyedTask(messagingProxy));
    159     }
    160 
    161 private:
    162     WorkerGlobalScopeDestroyedTask(WorkerMessagingProxy* messagingProxy)
    163         : m_messagingProxy(messagingProxy)
    164     {
    165     }
    166 
    167     virtual void performTask(ScriptExecutionContext*)
    168     {
    169         m_messagingProxy->workerGlobalScopeDestroyedInternal();
    170     }
    171 
    172     WorkerMessagingProxy* m_messagingProxy;
    173 };
    174 
    175 class WorkerTerminateTask : public ScriptExecutionContext::Task {
    176 public:
    177     static PassOwnPtr<WorkerTerminateTask> create(WorkerMessagingProxy* messagingProxy)
    178     {
    179         return adoptPtr(new WorkerTerminateTask(messagingProxy));
    180     }
    181 
    182 private:
    183     WorkerTerminateTask(WorkerMessagingProxy* messagingProxy)
    184         : m_messagingProxy(messagingProxy)
    185     {
    186     }
    187 
    188     virtual void performTask(ScriptExecutionContext*)
    189     {
    190         m_messagingProxy->terminateWorkerGlobalScope();
    191     }
    192 
    193     WorkerMessagingProxy* m_messagingProxy;
    194 };
    195 
    196 class WorkerThreadActivityReportTask : public ScriptExecutionContext::Task {
    197 public:
    198     static PassOwnPtr<WorkerThreadActivityReportTask> create(WorkerMessagingProxy* messagingProxy, bool confirmingMessage, bool hasPendingActivity)
    199     {
    200         return adoptPtr(new WorkerThreadActivityReportTask(messagingProxy, confirmingMessage, hasPendingActivity));
    201     }
    202 
    203 private:
    204     WorkerThreadActivityReportTask(WorkerMessagingProxy* messagingProxy, bool confirmingMessage, bool hasPendingActivity)
    205         : m_messagingProxy(messagingProxy)
    206         , m_confirmingMessage(confirmingMessage)
    207         , m_hasPendingActivity(hasPendingActivity)
    208     {
    209     }
    210 
    211     virtual void performTask(ScriptExecutionContext*)
    212     {
    213         m_messagingProxy->reportPendingActivityInternal(m_confirmingMessage, m_hasPendingActivity);
    214     }
    215 
    216     WorkerMessagingProxy* m_messagingProxy;
    217     bool m_confirmingMessage;
    218     bool m_hasPendingActivity;
    219 };
    220 
    221 class PostMessageToPageInspectorTask : public ScriptExecutionContext::Task {
    222 public:
    223     static PassOwnPtr<PostMessageToPageInspectorTask> create(WorkerMessagingProxy* messagingProxy, const String& message)
    224     {
    225         return adoptPtr(new PostMessageToPageInspectorTask(messagingProxy, message));
    226     }
    227 
    228 private:
    229     PostMessageToPageInspectorTask(WorkerMessagingProxy* messagingProxy, const String& message)
    230         : m_messagingProxy(messagingProxy)
    231         , m_message(message.isolatedCopy())
    232     {
    233     }
    234 
    235     virtual void performTask(ScriptExecutionContext*)
    236     {
    237         if (WorkerGlobalScopeProxy::PageInspector* pageInspector = m_messagingProxy->m_pageInspector)
    238             pageInspector->dispatchMessageFromWorker(m_message);
    239     }
    240 
    241     WorkerMessagingProxy* m_messagingProxy;
    242     String m_message;
    243 };
    244 
    245 WorkerMessagingProxy::WorkerMessagingProxy(Worker* workerObject, PassOwnPtr<WorkerClients> workerClients)
    246     : m_scriptExecutionContext(workerObject->scriptExecutionContext())
    247     , m_workerObject(workerObject)
    248     , m_mayBeDestroyed(false)
    249     , m_unconfirmedMessageCount(0)
    250     , m_workerThreadHadPendingActivity(false)
    251     , m_askedToTerminate(false)
    252     , m_pageInspector(0)
    253     , m_workerClients(workerClients)
    254 {
    255     ASSERT(m_workerObject);
    256     ASSERT((m_scriptExecutionContext->isDocument() && isMainThread())
    257         || (m_scriptExecutionContext->isWorkerGlobalScope() && toWorkerGlobalScope(m_scriptExecutionContext.get())->thread()->isCurrentThread()));
    258 }
    259 
    260 WorkerMessagingProxy::~WorkerMessagingProxy()
    261 {
    262     ASSERT(!m_workerObject);
    263     ASSERT((m_scriptExecutionContext->isDocument() && isMainThread())
    264         || (m_scriptExecutionContext->isWorkerGlobalScope() && toWorkerGlobalScope(m_scriptExecutionContext.get())->thread()->isCurrentThread()));
    265 }
    266 
    267 void WorkerMessagingProxy::startWorkerGlobalScope(const KURL& scriptURL, const String& userAgent, const String& sourceCode, WorkerThreadStartMode startMode)
    268 {
    269     // FIXME: This need to be revisited when we support nested worker one day
    270     ASSERT(m_scriptExecutionContext->isDocument());
    271     Document* document = toDocument(m_scriptExecutionContext.get());
    272 
    273     OwnPtr<WorkerThreadStartupData> startupData = WorkerThreadStartupData::create(scriptURL, userAgent, sourceCode, startMode, document->contentSecurityPolicy()->deprecatedHeader(), document->contentSecurityPolicy()->deprecatedHeaderType(), m_workerClients.release());
    274     double originTime = document->loader() ? document->loader()->timing()->referenceMonotonicTime() : monotonicallyIncreasingTime();
    275 
    276     RefPtr<DedicatedWorkerThread> thread = DedicatedWorkerThread::create(*this, *this, originTime, startupData.release());
    277     workerThreadCreated(thread);
    278     thread->start();
    279     InspectorInstrumentation::didStartWorkerGlobalScope(m_scriptExecutionContext.get(), this, scriptURL);
    280 }
    281 
    282 void WorkerMessagingProxy::postMessageToWorkerObject(PassRefPtr<SerializedScriptValue> message, PassOwnPtr<MessagePortChannelArray> channels)
    283 {
    284     m_scriptExecutionContext->postTask(MessageWorkerTask::create(message, channels, this));
    285 }
    286 
    287 void WorkerMessagingProxy::postMessageToWorkerGlobalScope(PassRefPtr<SerializedScriptValue> message, PassOwnPtr<MessagePortChannelArray> channels)
    288 {
    289     if (m_askedToTerminate)
    290         return;
    291 
    292     if (m_workerThread) {
    293         ++m_unconfirmedMessageCount;
    294         m_workerThread->runLoop().postTask(MessageWorkerGlobalScopeTask::create(message, channels));
    295     } else
    296         m_queuedEarlyTasks.append(MessageWorkerGlobalScopeTask::create(message, channels));
    297 }
    298 
    299 bool WorkerMessagingProxy::postTaskForModeToWorkerGlobalScope(PassOwnPtr<ScriptExecutionContext::Task> task, const String& mode)
    300 {
    301     if (m_askedToTerminate)
    302         return false;
    303 
    304     ASSERT(m_workerThread);
    305     m_workerThread->runLoop().postTaskForMode(task, mode);
    306     return true;
    307 }
    308 
    309 void WorkerMessagingProxy::postTaskToLoader(PassOwnPtr<ScriptExecutionContext::Task> task)
    310 {
    311     // FIXME: In case of nested workers, this should go directly to the root Document context.
    312     ASSERT(m_scriptExecutionContext->isDocument());
    313     m_scriptExecutionContext->postTask(task);
    314 }
    315 
    316 void WorkerMessagingProxy::postExceptionToWorkerObject(const String& errorMessage, int lineNumber, int columnNumber, const String& sourceURL)
    317 {
    318     m_scriptExecutionContext->postTask(WorkerExceptionTask::create(errorMessage, lineNumber, columnNumber, sourceURL, this));
    319 }
    320 
    321 static void postConsoleMessageTask(ScriptExecutionContext* context, WorkerMessagingProxy* messagingProxy, MessageSource source, MessageLevel level, const String& message, unsigned lineNumber, const String& sourceURL)
    322 {
    323     if (messagingProxy->askedToTerminate())
    324         return;
    325     context->addConsoleMessage(source, level, message, sourceURL, lineNumber);
    326 }
    327 
    328 void WorkerMessagingProxy::postConsoleMessageToWorkerObject(MessageSource source, MessageLevel level, const String& message, int lineNumber, const String& sourceURL)
    329 {
    330     m_scriptExecutionContext->postTask(createCallbackTask(&postConsoleMessageTask, AllowCrossThreadAccess(this), source, level, message, lineNumber, sourceURL));
    331 }
    332 
    333 void WorkerMessagingProxy::workerThreadCreated(PassRefPtr<DedicatedWorkerThread> workerThread)
    334 {
    335     m_workerThread = workerThread;
    336 
    337     if (m_askedToTerminate) {
    338         // Worker.terminate() could be called from JS before the thread was created.
    339         m_workerThread->stop();
    340     } else {
    341         unsigned taskCount = m_queuedEarlyTasks.size();
    342         ASSERT(!m_unconfirmedMessageCount);
    343         m_unconfirmedMessageCount = taskCount;
    344         m_workerThreadHadPendingActivity = true; // Worker initialization means a pending activity.
    345 
    346         for (unsigned i = 0; i < taskCount; ++i)
    347             m_workerThread->runLoop().postTask(m_queuedEarlyTasks[i].release());
    348         m_queuedEarlyTasks.clear();
    349     }
    350 }
    351 
    352 void WorkerMessagingProxy::workerObjectDestroyed()
    353 {
    354     m_workerObject = 0;
    355     m_scriptExecutionContext->postTask(createCallbackTask(&workerObjectDestroyedInternal, AllowCrossThreadAccess(this)));
    356 }
    357 
    358 void WorkerMessagingProxy::workerObjectDestroyedInternal(ScriptExecutionContext*, WorkerMessagingProxy* proxy)
    359 {
    360     proxy->m_mayBeDestroyed = true;
    361     if (proxy->m_workerThread)
    362         proxy->terminateWorkerGlobalScope();
    363     else
    364         proxy->workerGlobalScopeDestroyedInternal();
    365 }
    366 
    367 static void connectToWorkerGlobalScopeInspectorTask(ScriptExecutionContext* context, bool)
    368 {
    369     toWorkerGlobalScope(context)->workerInspectorController()->connectFrontend();
    370 }
    371 
    372 void WorkerMessagingProxy::connectToInspector(WorkerGlobalScopeProxy::PageInspector* pageInspector)
    373 {
    374     if (m_askedToTerminate)
    375         return;
    376     ASSERT(!m_pageInspector);
    377     m_pageInspector = pageInspector;
    378     m_workerThread->runLoop().postTaskForMode(createCallbackTask(connectToWorkerGlobalScopeInspectorTask, true), WorkerDebuggerAgent::debuggerTaskMode);
    379 }
    380 
    381 static void disconnectFromWorkerGlobalScopeInspectorTask(ScriptExecutionContext* context, bool)
    382 {
    383     toWorkerGlobalScope(context)->workerInspectorController()->disconnectFrontend();
    384 }
    385 
    386 void WorkerMessagingProxy::disconnectFromInspector()
    387 {
    388     m_pageInspector = 0;
    389     if (m_askedToTerminate)
    390         return;
    391     m_workerThread->runLoop().postTaskForMode(createCallbackTask(disconnectFromWorkerGlobalScopeInspectorTask, true), WorkerDebuggerAgent::debuggerTaskMode);
    392 }
    393 
    394 static void dispatchOnInspectorBackendTask(ScriptExecutionContext* context, const String& message)
    395 {
    396     toWorkerGlobalScope(context)->workerInspectorController()->dispatchMessageFromFrontend(message);
    397 }
    398 
    399 void WorkerMessagingProxy::sendMessageToInspector(const String& message)
    400 {
    401     if (m_askedToTerminate)
    402         return;
    403     m_workerThread->runLoop().postTaskForMode(createCallbackTask(dispatchOnInspectorBackendTask, String(message)), WorkerDebuggerAgent::debuggerTaskMode);
    404     WorkerDebuggerAgent::interruptAndDispatchInspectorCommands(m_workerThread.get());
    405 }
    406 
    407 void WorkerMessagingProxy::workerGlobalScopeDestroyed()
    408 {
    409     m_scriptExecutionContext->postTask(WorkerGlobalScopeDestroyedTask::create(this));
    410     // Will execute workerGlobalScopeDestroyedInternal() on context's thread.
    411 }
    412 
    413 void WorkerMessagingProxy::workerGlobalScopeClosed()
    414 {
    415     // Executes terminateWorkerGlobalScope() on parent context's thread.
    416     m_scriptExecutionContext->postTask(WorkerTerminateTask::create(this));
    417 }
    418 
    419 void WorkerMessagingProxy::workerGlobalScopeDestroyedInternal()
    420 {
    421     // WorkerGlobalScopeDestroyedTask is always the last to be performed, so the proxy is not needed for communication
    422     // in either side any more. However, the Worker object may still exist, and it assumes that the proxy exists, too.
    423     m_askedToTerminate = true;
    424     m_workerThread = 0;
    425 
    426     InspectorInstrumentation::workerGlobalScopeTerminated(m_scriptExecutionContext.get(), this);
    427 
    428     if (m_mayBeDestroyed)
    429         delete this;
    430 }
    431 
    432 void WorkerMessagingProxy::terminateWorkerGlobalScope()
    433 {
    434     if (m_askedToTerminate)
    435         return;
    436     m_askedToTerminate = true;
    437 
    438     if (m_workerThread)
    439         m_workerThread->stop();
    440 
    441     InspectorInstrumentation::workerGlobalScopeTerminated(m_scriptExecutionContext.get(), this);
    442 }
    443 
    444 void WorkerMessagingProxy::postMessageToPageInspector(const String& message)
    445 {
    446     m_scriptExecutionContext->postTask(PostMessageToPageInspectorTask::create(this, message));
    447 }
    448 
    449 void WorkerMessagingProxy::updateInspectorStateCookie(const String&)
    450 {
    451     notImplemented();
    452 }
    453 
    454 void WorkerMessagingProxy::confirmMessageFromWorkerObject(bool hasPendingActivity)
    455 {
    456     m_scriptExecutionContext->postTask(WorkerThreadActivityReportTask::create(this, true, hasPendingActivity));
    457     // Will execute reportPendingActivityInternal() on context's thread.
    458 }
    459 
    460 void WorkerMessagingProxy::reportPendingActivity(bool hasPendingActivity)
    461 {
    462     m_scriptExecutionContext->postTask(WorkerThreadActivityReportTask::create(this, false, hasPendingActivity));
    463     // Will execute reportPendingActivityInternal() on context's thread.
    464 }
    465 
    466 void WorkerMessagingProxy::reportPendingActivityInternal(bool confirmingMessage, bool hasPendingActivity)
    467 {
    468     if (confirmingMessage && !m_askedToTerminate) {
    469         ASSERT(m_unconfirmedMessageCount);
    470         --m_unconfirmedMessageCount;
    471     }
    472 
    473     m_workerThreadHadPendingActivity = hasPendingActivity;
    474 }
    475 
    476 bool WorkerMessagingProxy::hasPendingActivity() const
    477 {
    478     return (m_unconfirmedMessageCount || m_workerThreadHadPendingActivity) && !m_askedToTerminate;
    479 }
    480 
    481 } // namespace WebCore
    482