Home | History | Annotate | Download | only in workers
      1 /*
      2  * Copyright (C) 2008 Apple Inc. All Rights Reserved.
      3  * Copyright (C) 2009 Google Inc. All Rights Reserved.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions
      7  * are met:
      8  * 1. Redistributions of source code must retain the above copyright
      9  *    notice, this list of conditions and the following disclaimer.
     10  * 2. Redistributions in binary form must reproduce the above copyright
     11  *    notice, this list of conditions and the following disclaimer in the
     12  *    documentation and/or other materials provided with the distribution.
     13  *
     14  * THIS SOFTWARE IS PROVIDED BY APPLE COMPUTER, INC. ``AS IS'' AND ANY
     15  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
     17  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL APPLE COMPUTER, INC. OR
     18  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
     19  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     20  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
     21  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
     22  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     23  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     24  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     25  *
     26  */
     27 
     28 #include "config.h"
     29 
     30 #if ENABLE(WORKERS)
     31 
     32 #include "WorkerMessagingProxy.h"
     33 
     34 #include "CrossThreadTask.h"
     35 #include "DedicatedWorkerContext.h"
     36 #include "DedicatedWorkerThread.h"
     37 #include "DOMWindow.h"
     38 #include "Document.h"
     39 #include "ErrorEvent.h"
     40 #include "ExceptionCode.h"
     41 #include "MessageEvent.h"
     42 #include "ScriptCallStack.h"
     43 #include "ScriptExecutionContext.h"
     44 #include "Worker.h"
     45 
     46 namespace WebCore {
     47 
     48 class MessageWorkerContextTask : public ScriptExecutionContext::Task {
     49 public:
     50     static PassOwnPtr<MessageWorkerContextTask> create(PassRefPtr<SerializedScriptValue> message, PassOwnPtr<MessagePortChannelArray> channels)
     51     {
     52         return new MessageWorkerContextTask(message, channels);
     53     }
     54 
     55 private:
     56     MessageWorkerContextTask(PassRefPtr<SerializedScriptValue> message, PassOwnPtr<MessagePortChannelArray> channels)
     57         : m_message(message)
     58         , m_channels(channels)
     59     {
     60     }
     61 
     62     virtual void performTask(ScriptExecutionContext* scriptContext)
     63     {
     64         ASSERT(scriptContext->isWorkerContext());
     65         DedicatedWorkerContext* context = static_cast<DedicatedWorkerContext*>(scriptContext);
     66         OwnPtr<MessagePortArray> ports = MessagePort::entanglePorts(*scriptContext, m_channels.release());
     67         context->dispatchEvent(MessageEvent::create(ports.release(), m_message));
     68         context->thread()->workerObjectProxy().confirmMessageFromWorkerObject(context->hasPendingActivity());
     69     }
     70 
     71 private:
     72     RefPtr<SerializedScriptValue> m_message;
     73     OwnPtr<MessagePortChannelArray> m_channels;
     74 };
     75 
     76 class MessageWorkerTask : public ScriptExecutionContext::Task {
     77 public:
     78     static PassOwnPtr<MessageWorkerTask> create(PassRefPtr<SerializedScriptValue> message, PassOwnPtr<MessagePortChannelArray> channels, WorkerMessagingProxy* messagingProxy)
     79     {
     80         return new MessageWorkerTask(message, channels, messagingProxy);
     81     }
     82 
     83 private:
     84     MessageWorkerTask(PassRefPtr<SerializedScriptValue> message, PassOwnPtr<MessagePortChannelArray> channels, WorkerMessagingProxy* messagingProxy)
     85         : m_message(message)
     86         , m_channels(channels)
     87         , m_messagingProxy(messagingProxy)
     88     {
     89     }
     90 
     91     virtual void performTask(ScriptExecutionContext* scriptContext)
     92     {
     93         Worker* workerObject = m_messagingProxy->workerObject();
     94         if (!workerObject || m_messagingProxy->askedToTerminate())
     95             return;
     96 
     97         OwnPtr<MessagePortArray> ports = MessagePort::entanglePorts(*scriptContext, m_channels.release());
     98         workerObject->dispatchEvent(MessageEvent::create(ports.release(), m_message));
     99     }
    100 
    101 private:
    102     RefPtr<SerializedScriptValue> m_message;
    103     OwnPtr<MessagePortChannelArray> m_channels;
    104     WorkerMessagingProxy* m_messagingProxy;
    105 };
    106 
    107 class WorkerExceptionTask : public ScriptExecutionContext::Task {
    108 public:
    109     static PassOwnPtr<WorkerExceptionTask> create(const String& errorMessage, int lineNumber, const String& sourceURL, WorkerMessagingProxy* messagingProxy)
    110     {
    111         return new WorkerExceptionTask(errorMessage, lineNumber, sourceURL, messagingProxy);
    112     }
    113 
    114 private:
    115     WorkerExceptionTask(const String& errorMessage, int lineNumber, const String& sourceURL, WorkerMessagingProxy* messagingProxy)
    116         : m_errorMessage(errorMessage.crossThreadString())
    117         , m_lineNumber(lineNumber)
    118         , m_sourceURL(sourceURL.crossThreadString())
    119         , m_messagingProxy(messagingProxy)
    120     {
    121     }
    122 
    123     virtual void performTask(ScriptExecutionContext* context)
    124     {
    125         Worker* workerObject = m_messagingProxy->workerObject();
    126         if (!workerObject)
    127             return;
    128 
    129         // We don't bother checking the askedToTerminate() flag here, because exceptions should *always* be reported even if the thread is terminated.
    130         // This is intentionally different than the behavior in MessageWorkerTask, because terminated workers no longer deliver messages (section 4.6 of the WebWorker spec), but they do report exceptions.
    131 
    132         bool errorHandled = !workerObject->dispatchEvent(ErrorEvent::create(m_errorMessage, m_sourceURL, m_lineNumber));
    133         if (!errorHandled)
    134             context->reportException(m_errorMessage, m_lineNumber, m_sourceURL, 0);
    135     }
    136 
    137     String m_errorMessage;
    138     int m_lineNumber;
    139     String m_sourceURL;
    140     WorkerMessagingProxy* m_messagingProxy;
    141 };
    142 
    143 class WorkerContextDestroyedTask : public ScriptExecutionContext::Task {
    144 public:
    145     static PassOwnPtr<WorkerContextDestroyedTask> create(WorkerMessagingProxy* messagingProxy)
    146     {
    147         return new WorkerContextDestroyedTask(messagingProxy);
    148     }
    149 
    150 private:
    151     WorkerContextDestroyedTask(WorkerMessagingProxy* messagingProxy)
    152         : m_messagingProxy(messagingProxy)
    153     {
    154     }
    155 
    156     virtual void performTask(ScriptExecutionContext*)
    157     {
    158         m_messagingProxy->workerContextDestroyedInternal();
    159     }
    160 
    161     WorkerMessagingProxy* m_messagingProxy;
    162 };
    163 
    164 class WorkerTerminateTask : public ScriptExecutionContext::Task {
    165 public:
    166     static PassOwnPtr<WorkerTerminateTask> create(WorkerMessagingProxy* messagingProxy)
    167     {
    168         return new WorkerTerminateTask(messagingProxy);
    169     }
    170 
    171 private:
    172     WorkerTerminateTask(WorkerMessagingProxy* messagingProxy)
    173         : m_messagingProxy(messagingProxy)
    174     {
    175     }
    176 
    177     virtual void performTask(ScriptExecutionContext*)
    178     {
    179         m_messagingProxy->terminateWorkerContext();
    180     }
    181 
    182     WorkerMessagingProxy* m_messagingProxy;
    183 };
    184 
    185 class WorkerThreadActivityReportTask : public ScriptExecutionContext::Task {
    186 public:
    187     static PassOwnPtr<WorkerThreadActivityReportTask> create(WorkerMessagingProxy* messagingProxy, bool confirmingMessage, bool hasPendingActivity)
    188     {
    189         return new WorkerThreadActivityReportTask(messagingProxy, confirmingMessage, hasPendingActivity);
    190     }
    191 
    192 private:
    193     WorkerThreadActivityReportTask(WorkerMessagingProxy* messagingProxy, bool confirmingMessage, bool hasPendingActivity)
    194         : m_messagingProxy(messagingProxy)
    195         , m_confirmingMessage(confirmingMessage)
    196         , m_hasPendingActivity(hasPendingActivity)
    197     {
    198     }
    199 
    200     virtual void performTask(ScriptExecutionContext*)
    201     {
    202         m_messagingProxy->reportPendingActivityInternal(m_confirmingMessage, m_hasPendingActivity);
    203     }
    204 
    205     WorkerMessagingProxy* m_messagingProxy;
    206     bool m_confirmingMessage;
    207     bool m_hasPendingActivity;
    208 };
    209 
    210 
    211 #if !PLATFORM(CHROMIUM)
    212 WorkerContextProxy* WorkerContextProxy::create(Worker* worker)
    213 {
    214     return new WorkerMessagingProxy(worker);
    215 }
    216 #endif
    217 
    218 WorkerMessagingProxy::WorkerMessagingProxy(Worker* workerObject)
    219     : m_scriptExecutionContext(workerObject->scriptExecutionContext())
    220     , m_workerObject(workerObject)
    221     , m_unconfirmedMessageCount(0)
    222     , m_workerThreadHadPendingActivity(false)
    223     , m_askedToTerminate(false)
    224 {
    225     ASSERT(m_workerObject);
    226     ASSERT((m_scriptExecutionContext->isDocument() && isMainThread())
    227            || (m_scriptExecutionContext->isWorkerContext() && currentThread() == static_cast<WorkerContext*>(m_scriptExecutionContext.get())->thread()->threadID()));
    228 }
    229 
    230 WorkerMessagingProxy::~WorkerMessagingProxy()
    231 {
    232     ASSERT(!m_workerObject);
    233     ASSERT((m_scriptExecutionContext->isDocument() && isMainThread())
    234            || (m_scriptExecutionContext->isWorkerContext() && currentThread() == static_cast<WorkerContext*>(m_scriptExecutionContext.get())->thread()->threadID()));
    235 }
    236 
    237 void WorkerMessagingProxy::startWorkerContext(const KURL& scriptURL, const String& userAgent, const String& sourceCode)
    238 {
    239     RefPtr<DedicatedWorkerThread> thread = DedicatedWorkerThread::create(scriptURL, userAgent, sourceCode, *this, *this);
    240     workerThreadCreated(thread);
    241     thread->start();
    242 }
    243 
    244 void WorkerMessagingProxy::postMessageToWorkerObject(PassRefPtr<SerializedScriptValue> message, PassOwnPtr<MessagePortChannelArray> channels)
    245 {
    246     m_scriptExecutionContext->postTask(MessageWorkerTask::create(message, channels, this));
    247 }
    248 
    249 void WorkerMessagingProxy::postMessageToWorkerContext(PassRefPtr<SerializedScriptValue> message, PassOwnPtr<MessagePortChannelArray> channels)
    250 {
    251     if (m_askedToTerminate)
    252         return;
    253 
    254     if (m_workerThread) {
    255         ++m_unconfirmedMessageCount;
    256         m_workerThread->runLoop().postTask(MessageWorkerContextTask::create(message, channels));
    257     } else
    258         m_queuedEarlyTasks.append(MessageWorkerContextTask::create(message, channels));
    259 }
    260 
    261 void WorkerMessagingProxy::postTaskForModeToWorkerContext(PassOwnPtr<ScriptExecutionContext::Task> task, const String& mode)
    262 {
    263     if (m_askedToTerminate)
    264         return;
    265 
    266     ASSERT(m_workerThread);
    267     m_workerThread->runLoop().postTaskForMode(task, mode);
    268 }
    269 
    270 void WorkerMessagingProxy::postTaskToLoader(PassOwnPtr<ScriptExecutionContext::Task> task)
    271 {
    272     // FIXME: In case of nested workers, this should go directly to the root Document context.
    273     ASSERT(m_scriptExecutionContext->isDocument());
    274     m_scriptExecutionContext->postTask(task);
    275 }
    276 
    277 void WorkerMessagingProxy::postExceptionToWorkerObject(const String& errorMessage, int lineNumber, const String& sourceURL)
    278 {
    279     m_scriptExecutionContext->postTask(WorkerExceptionTask::create(errorMessage, lineNumber, sourceURL, this));
    280 }
    281 
    282 static void postConsoleMessageTask(ScriptExecutionContext* context, WorkerMessagingProxy* messagingProxy, MessageSource source, MessageType type, MessageLevel level, const String& message, unsigned lineNumber, const String& sourceURL)
    283 {
    284     if (messagingProxy->askedToTerminate())
    285         return;
    286     context->addMessage(source, type, level, message, lineNumber, sourceURL, 0);
    287 }
    288 
    289 void WorkerMessagingProxy::postConsoleMessageToWorkerObject(MessageSource source, MessageType type, MessageLevel level, const String& message, int lineNumber, const String& sourceURL)
    290 {
    291     m_scriptExecutionContext->postTask(createCallbackTask(&postConsoleMessageTask, this, source, type, level, message, lineNumber, sourceURL));
    292 }
    293 
    294 void WorkerMessagingProxy::workerThreadCreated(PassRefPtr<DedicatedWorkerThread> workerThread)
    295 {
    296     m_workerThread = workerThread;
    297 
    298     if (m_askedToTerminate) {
    299         // Worker.terminate() could be called from JS before the thread was created.
    300         m_workerThread->stop();
    301     } else {
    302         unsigned taskCount = m_queuedEarlyTasks.size();
    303         ASSERT(!m_unconfirmedMessageCount);
    304         m_unconfirmedMessageCount = taskCount;
    305         m_workerThreadHadPendingActivity = true; // Worker initialization means a pending activity.
    306 
    307         for (unsigned i = 0; i < taskCount; ++i)
    308             m_workerThread->runLoop().postTask(m_queuedEarlyTasks[i].release());
    309         m_queuedEarlyTasks.clear();
    310     }
    311 }
    312 
    313 void WorkerMessagingProxy::workerObjectDestroyed()
    314 {
    315     m_workerObject = 0;
    316     if (m_workerThread)
    317         terminateWorkerContext();
    318     else
    319         workerContextDestroyedInternal();
    320 }
    321 
    322 void WorkerMessagingProxy::workerContextDestroyed()
    323 {
    324     m_scriptExecutionContext->postTask(WorkerContextDestroyedTask::create(this));
    325     // Will execute workerContextDestroyedInternal() on context's thread.
    326 }
    327 
    328 void WorkerMessagingProxy::workerContextClosed()
    329 {
    330     // Executes terminateWorkerContext() on parent context's thread.
    331     m_scriptExecutionContext->postTask(WorkerTerminateTask::create(this));
    332 }
    333 
    334 void WorkerMessagingProxy::workerContextDestroyedInternal()
    335 {
    336     // WorkerContextDestroyedTask is always the last to be performed, so the proxy is not needed for communication
    337     // in either side any more. However, the Worker object may still exist, and it assumes that the proxy exists, too.
    338     m_askedToTerminate = true;
    339     m_workerThread = 0;
    340     if (!m_workerObject)
    341         delete this;
    342 }
    343 
    344 void WorkerMessagingProxy::terminateWorkerContext()
    345 {
    346     if (m_askedToTerminate)
    347         return;
    348     m_askedToTerminate = true;
    349 
    350     if (m_workerThread)
    351         m_workerThread->stop();
    352 }
    353 
    354 void WorkerMessagingProxy::confirmMessageFromWorkerObject(bool hasPendingActivity)
    355 {
    356     m_scriptExecutionContext->postTask(WorkerThreadActivityReportTask::create(this, true, hasPendingActivity));
    357     // Will execute reportPendingActivityInternal() on context's thread.
    358 }
    359 
    360 void WorkerMessagingProxy::reportPendingActivity(bool hasPendingActivity)
    361 {
    362     m_scriptExecutionContext->postTask(WorkerThreadActivityReportTask::create(this, false, hasPendingActivity));
    363     // Will execute reportPendingActivityInternal() on context's thread.
    364 }
    365 
    366 void WorkerMessagingProxy::reportPendingActivityInternal(bool confirmingMessage, bool hasPendingActivity)
    367 {
    368     if (confirmingMessage && !m_askedToTerminate) {
    369         ASSERT(m_unconfirmedMessageCount);
    370         --m_unconfirmedMessageCount;
    371     }
    372 
    373     m_workerThreadHadPendingActivity = hasPendingActivity;
    374 }
    375 
    376 bool WorkerMessagingProxy::hasPendingActivity() const
    377 {
    378     return (m_unconfirmedMessageCount || m_workerThreadHadPendingActivity) && !m_askedToTerminate;
    379 }
    380 
    381 } // namespace WebCore
    382 
    383 #endif // ENABLE(WORKERS)
    384