Home | History | Annotate | Download | only in workers
      1 /*
      2  * Copyright (C) 2008 Apple Inc. All Rights Reserved.
      3  * Copyright (C) 2009 Google Inc. All Rights Reserved.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions
      7  * are met:
      8  * 1. Redistributions of source code must retain the above copyright
      9  *    notice, this list of conditions and the following disclaimer.
     10  * 2. Redistributions in binary form must reproduce the above copyright
     11  *    notice, this list of conditions and the following disclaimer in the
     12  *    documentation and/or other materials provided with the distribution.
     13  *
     14  * THIS SOFTWARE IS PROVIDED BY APPLE COMPUTER, INC. ``AS IS'' AND ANY
     15  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
     17  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL APPLE COMPUTER, INC. OR
     18  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
     19  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     20  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
     21  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
     22  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     23  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     24  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     25  *
     26  */
     27 
     28 #include "config.h"
     29 
     30 #if ENABLE(WORKERS)
     31 
     32 #include "WorkerMessagingProxy.h"
     33 
     34 #include "DedicatedWorkerContext.h"
     35 #include "DedicatedWorkerThread.h"
     36 #include "DOMWindow.h"
     37 #include "Document.h"
     38 #include "ErrorEvent.h"
     39 #include "ExceptionCode.h"
     40 #include "GenericWorkerTask.h"
     41 #include "MessageEvent.h"
     42 #include "ScriptExecutionContext.h"
     43 #include "Worker.h"
     44 
     45 namespace WebCore {
     46 
     47 class MessageWorkerContextTask : public ScriptExecutionContext::Task {
     48 public:
     49     static PassOwnPtr<MessageWorkerContextTask> create(PassRefPtr<SerializedScriptValue> message, PassOwnPtr<MessagePortChannelArray> channels)
     50     {
     51         return new MessageWorkerContextTask(message, channels);
     52     }
     53 
     54 private:
     55     MessageWorkerContextTask(PassRefPtr<SerializedScriptValue> message, PassOwnPtr<MessagePortChannelArray> channels)
     56         : m_message(message->release())
     57         , m_channels(channels)
     58     {
     59     }
     60 
     61     virtual void performTask(ScriptExecutionContext* scriptContext)
     62     {
     63         ASSERT(scriptContext->isWorkerContext());
     64         DedicatedWorkerContext* context = static_cast<DedicatedWorkerContext*>(scriptContext);
     65         OwnPtr<MessagePortArray> ports = MessagePort::entanglePorts(*scriptContext, m_channels.release());
     66         context->dispatchEvent(MessageEvent::create(ports.release(), m_message));
     67         context->thread()->workerObjectProxy().confirmMessageFromWorkerObject(context->hasPendingActivity());
     68     }
     69 
     70 private:
     71     RefPtr<SerializedScriptValue> m_message;
     72     OwnPtr<MessagePortChannelArray> m_channels;
     73 };
     74 
     75 class MessageWorkerTask : public ScriptExecutionContext::Task {
     76 public:
     77     static PassOwnPtr<MessageWorkerTask> create(PassRefPtr<SerializedScriptValue> message, PassOwnPtr<MessagePortChannelArray> channels, WorkerMessagingProxy* messagingProxy)
     78     {
     79         return new MessageWorkerTask(message, channels, messagingProxy);
     80     }
     81 
     82 private:
     83     MessageWorkerTask(PassRefPtr<SerializedScriptValue> message, PassOwnPtr<MessagePortChannelArray> channels, WorkerMessagingProxy* messagingProxy)
     84         : m_message(message->release())
     85         , m_channels(channels)
     86         , m_messagingProxy(messagingProxy)
     87     {
     88     }
     89 
     90     virtual void performTask(ScriptExecutionContext* scriptContext)
     91     {
     92         Worker* workerObject = m_messagingProxy->workerObject();
     93         if (!workerObject || m_messagingProxy->askedToTerminate())
     94             return;
     95 
     96         OwnPtr<MessagePortArray> ports = MessagePort::entanglePorts(*scriptContext, m_channels.release());
     97         workerObject->dispatchEvent(MessageEvent::create(ports.release(), m_message));
     98     }
     99 
    100 private:
    101     RefPtr<SerializedScriptValue> m_message;
    102     OwnPtr<MessagePortChannelArray> m_channels;
    103     WorkerMessagingProxy* m_messagingProxy;
    104 };
    105 
    106 class WorkerExceptionTask : public ScriptExecutionContext::Task {
    107 public:
    108     static PassOwnPtr<WorkerExceptionTask> create(const String& errorMessage, int lineNumber, const String& sourceURL, WorkerMessagingProxy* messagingProxy)
    109     {
    110         return new WorkerExceptionTask(errorMessage, lineNumber, sourceURL, messagingProxy);
    111     }
    112 
    113 private:
    114     WorkerExceptionTask(const String& errorMessage, int lineNumber, const String& sourceURL, WorkerMessagingProxy* messagingProxy)
    115         : m_errorMessage(errorMessage.crossThreadString())
    116         , m_lineNumber(lineNumber)
    117         , m_sourceURL(sourceURL.crossThreadString())
    118         , m_messagingProxy(messagingProxy)
    119     {
    120     }
    121 
    122     virtual void performTask(ScriptExecutionContext* context)
    123     {
    124         Worker* workerObject = m_messagingProxy->workerObject();
    125         if (!workerObject)
    126             return;
    127 
    128         // We don't bother checking the askedToTerminate() flag here, because exceptions should *always* be reported even if the thread is terminated.
    129         // This is intentionally different than the behavior in MessageWorkerTask, because terminated workers no longer deliver messages (section 4.6 of the WebWorker spec), but they do report exceptions.
    130 
    131         bool errorHandled = !workerObject->dispatchEvent(ErrorEvent::create(m_errorMessage, m_sourceURL, m_lineNumber));
    132         if (!errorHandled)
    133             context->reportException(m_errorMessage, m_lineNumber, m_sourceURL);
    134     }
    135 
    136     String m_errorMessage;
    137     int m_lineNumber;
    138     String m_sourceURL;
    139     WorkerMessagingProxy* m_messagingProxy;
    140 };
    141 
    142 class WorkerContextDestroyedTask : public ScriptExecutionContext::Task {
    143 public:
    144     static PassOwnPtr<WorkerContextDestroyedTask> create(WorkerMessagingProxy* messagingProxy)
    145     {
    146         return new WorkerContextDestroyedTask(messagingProxy);
    147     }
    148 
    149 private:
    150     WorkerContextDestroyedTask(WorkerMessagingProxy* messagingProxy)
    151         : m_messagingProxy(messagingProxy)
    152     {
    153     }
    154 
    155     virtual void performTask(ScriptExecutionContext*)
    156     {
    157         m_messagingProxy->workerContextDestroyedInternal();
    158     }
    159 
    160     WorkerMessagingProxy* m_messagingProxy;
    161 };
    162 
    163 class WorkerTerminateTask : public ScriptExecutionContext::Task {
    164 public:
    165     static PassOwnPtr<WorkerTerminateTask> create(WorkerMessagingProxy* messagingProxy)
    166     {
    167         return new WorkerTerminateTask(messagingProxy);
    168     }
    169 
    170 private:
    171     WorkerTerminateTask(WorkerMessagingProxy* messagingProxy)
    172         : m_messagingProxy(messagingProxy)
    173     {
    174     }
    175 
    176     virtual void performTask(ScriptExecutionContext*)
    177     {
    178         m_messagingProxy->terminateWorkerContext();
    179     }
    180 
    181     WorkerMessagingProxy* m_messagingProxy;
    182 };
    183 
    184 class WorkerThreadActivityReportTask : public ScriptExecutionContext::Task {
    185 public:
    186     static PassOwnPtr<WorkerThreadActivityReportTask> create(WorkerMessagingProxy* messagingProxy, bool confirmingMessage, bool hasPendingActivity)
    187     {
    188         return new WorkerThreadActivityReportTask(messagingProxy, confirmingMessage, hasPendingActivity);
    189     }
    190 
    191 private:
    192     WorkerThreadActivityReportTask(WorkerMessagingProxy* messagingProxy, bool confirmingMessage, bool hasPendingActivity)
    193         : m_messagingProxy(messagingProxy)
    194         , m_confirmingMessage(confirmingMessage)
    195         , m_hasPendingActivity(hasPendingActivity)
    196     {
    197     }
    198 
    199     virtual void performTask(ScriptExecutionContext*)
    200     {
    201         m_messagingProxy->reportPendingActivityInternal(m_confirmingMessage, m_hasPendingActivity);
    202     }
    203 
    204     WorkerMessagingProxy* m_messagingProxy;
    205     bool m_confirmingMessage;
    206     bool m_hasPendingActivity;
    207 };
    208 
    209 
    210 #if !PLATFORM(CHROMIUM)
    211 WorkerContextProxy* WorkerContextProxy::create(Worker* worker)
    212 {
    213     return new WorkerMessagingProxy(worker);
    214 }
    215 #endif
    216 
    217 WorkerMessagingProxy::WorkerMessagingProxy(Worker* workerObject)
    218     : m_scriptExecutionContext(workerObject->scriptExecutionContext())
    219     , m_workerObject(workerObject)
    220     , m_unconfirmedMessageCount(0)
    221     , m_workerThreadHadPendingActivity(false)
    222     , m_askedToTerminate(false)
    223 {
    224     ASSERT(m_workerObject);
    225     ASSERT((m_scriptExecutionContext->isDocument() && isMainThread())
    226            || (m_scriptExecutionContext->isWorkerContext() && currentThread() == static_cast<WorkerContext*>(m_scriptExecutionContext.get())->thread()->threadID()));
    227 }
    228 
    229 WorkerMessagingProxy::~WorkerMessagingProxy()
    230 {
    231     ASSERT(!m_workerObject);
    232     ASSERT((m_scriptExecutionContext->isDocument() && isMainThread())
    233            || (m_scriptExecutionContext->isWorkerContext() && currentThread() == static_cast<WorkerContext*>(m_scriptExecutionContext.get())->thread()->threadID()));
    234 }
    235 
    236 void WorkerMessagingProxy::startWorkerContext(const KURL& scriptURL, const String& userAgent, const String& sourceCode)
    237 {
    238     RefPtr<DedicatedWorkerThread> thread = DedicatedWorkerThread::create(scriptURL, userAgent, sourceCode, *this, *this);
    239     workerThreadCreated(thread);
    240     thread->start();
    241 }
    242 
    243 void WorkerMessagingProxy::postMessageToWorkerObject(PassRefPtr<SerializedScriptValue> message, PassOwnPtr<MessagePortChannelArray> channels)
    244 {
    245     m_scriptExecutionContext->postTask(MessageWorkerTask::create(message, channels.release(), this));
    246 }
    247 
    248 void WorkerMessagingProxy::postMessageToWorkerContext(PassRefPtr<SerializedScriptValue> message, PassOwnPtr<MessagePortChannelArray> channels)
    249 {
    250     if (m_askedToTerminate)
    251         return;
    252 
    253     if (m_workerThread) {
    254         ++m_unconfirmedMessageCount;
    255         m_workerThread->runLoop().postTask(MessageWorkerContextTask::create(message, channels.release()));
    256     } else
    257         m_queuedEarlyTasks.append(MessageWorkerContextTask::create(message, channels.release()));
    258 }
    259 
    260 void WorkerMessagingProxy::postTaskForModeToWorkerContext(PassOwnPtr<ScriptExecutionContext::Task> task, const String& mode)
    261 {
    262     if (m_askedToTerminate)
    263         return;
    264 
    265     ASSERT(m_workerThread);
    266     m_workerThread->runLoop().postTaskForMode(task, mode);
    267 }
    268 
    269 void WorkerMessagingProxy::postTaskToLoader(PassOwnPtr<ScriptExecutionContext::Task> task)
    270 {
    271     // FIXME: In case of nested workers, this should go directly to the root Document context.
    272     ASSERT(m_scriptExecutionContext->isDocument());
    273     m_scriptExecutionContext->postTask(task);
    274 }
    275 
    276 void WorkerMessagingProxy::postExceptionToWorkerObject(const String& errorMessage, int lineNumber, const String& sourceURL)
    277 {
    278     m_scriptExecutionContext->postTask(WorkerExceptionTask::create(errorMessage, lineNumber, sourceURL, this));
    279 }
    280 
    281 static void postConsoleMessageTask(ScriptExecutionContext* context, WorkerMessagingProxy* messagingProxy, MessageDestination destination, MessageSource source, MessageType type, MessageLevel level, const String& message, unsigned lineNumber, const String& sourceURL)
    282 {
    283     if (messagingProxy->askedToTerminate())
    284         return;
    285     context->addMessage(destination, source, type, level, message, lineNumber, sourceURL);
    286 }
    287 
    288 void WorkerMessagingProxy::postConsoleMessageToWorkerObject(MessageDestination destination, MessageSource source, MessageType type, MessageLevel level, const String& message, int lineNumber, const String& sourceURL)
    289 {
    290     m_scriptExecutionContext->postTask(createCallbackTask(&postConsoleMessageTask, this, destination, source, type, level, message, lineNumber, sourceURL));
    291 }
    292 
    293 void WorkerMessagingProxy::workerThreadCreated(PassRefPtr<DedicatedWorkerThread> workerThread)
    294 {
    295     m_workerThread = workerThread;
    296 
    297     if (m_askedToTerminate) {
    298         // Worker.terminate() could be called from JS before the thread was created.
    299         m_workerThread->stop();
    300     } else {
    301         unsigned taskCount = m_queuedEarlyTasks.size();
    302         ASSERT(!m_unconfirmedMessageCount);
    303         m_unconfirmedMessageCount = taskCount;
    304         m_workerThreadHadPendingActivity = true; // Worker initialization means a pending activity.
    305 
    306         for (unsigned i = 0; i < taskCount; ++i)
    307             m_workerThread->runLoop().postTask(m_queuedEarlyTasks[i].release());
    308         m_queuedEarlyTasks.clear();
    309     }
    310 }
    311 
    312 void WorkerMessagingProxy::workerObjectDestroyed()
    313 {
    314     m_workerObject = 0;
    315     if (m_workerThread)
    316         terminateWorkerContext();
    317     else
    318         workerContextDestroyedInternal();
    319 }
    320 
    321 void WorkerMessagingProxy::workerContextDestroyed()
    322 {
    323     m_scriptExecutionContext->postTask(WorkerContextDestroyedTask::create(this));
    324     // Will execute workerContextDestroyedInternal() on context's thread.
    325 }
    326 
    327 void WorkerMessagingProxy::workerContextClosed()
    328 {
    329     // Executes terminateWorkerContext() on parent context's thread.
    330     m_scriptExecutionContext->postTask(WorkerTerminateTask::create(this));
    331 }
    332 
    333 void WorkerMessagingProxy::workerContextDestroyedInternal()
    334 {
    335     // WorkerContextDestroyedTask is always the last to be performed, so the proxy is not needed for communication
    336     // in either side any more. However, the Worker object may still exist, and it assumes that the proxy exists, too.
    337     m_askedToTerminate = true;
    338     m_workerThread = 0;
    339     if (!m_workerObject)
    340         delete this;
    341 }
    342 
    343 void WorkerMessagingProxy::terminateWorkerContext()
    344 {
    345     if (m_askedToTerminate)
    346         return;
    347     m_askedToTerminate = true;
    348 
    349     if (m_workerThread)
    350         m_workerThread->stop();
    351 }
    352 
    353 void WorkerMessagingProxy::confirmMessageFromWorkerObject(bool hasPendingActivity)
    354 {
    355     m_scriptExecutionContext->postTask(WorkerThreadActivityReportTask::create(this, true, hasPendingActivity));
    356     // Will execute reportPendingActivityInternal() on context's thread.
    357 }
    358 
    359 void WorkerMessagingProxy::reportPendingActivity(bool hasPendingActivity)
    360 {
    361     m_scriptExecutionContext->postTask(WorkerThreadActivityReportTask::create(this, false, hasPendingActivity));
    362     // Will execute reportPendingActivityInternal() on context's thread.
    363 }
    364 
    365 void WorkerMessagingProxy::reportPendingActivityInternal(bool confirmingMessage, bool hasPendingActivity)
    366 {
    367     if (confirmingMessage && !m_askedToTerminate) {
    368         ASSERT(m_unconfirmedMessageCount);
    369         --m_unconfirmedMessageCount;
    370     }
    371 
    372     m_workerThreadHadPendingActivity = hasPendingActivity;
    373 }
    374 
    375 bool WorkerMessagingProxy::hasPendingActivity() const
    376 {
    377     return (m_unconfirmedMessageCount || m_workerThreadHadPendingActivity) && !m_askedToTerminate;
    378 }
    379 
    380 } // namespace WebCore
    381 
    382 #endif // ENABLE(WORKERS)
    383