Home | History | Annotate | Download | only in gtk
      1 /*
      2  * Copyright (C) 2011 Igalia S.L.
      3  * Copyright (C) 2010 Apple Inc. All rights reserved.
      4  * Portions Copyright (c) 2010 Motorola Mobility, Inc.  All rights reserved.
      5  *
      6  * Redistribution and use in source and binary forms, with or without
      7  * modification, are permitted provided that the following conditions
      8  * are met:
      9  * 1. Redistributions of source code must retain the above copyright
     10  *    notice, this list of conditions and the following disclaimer.
     11  * 2. Redistributions in binary form must reproduce the above copyright
     12  *    notice, this list of conditions and the following disclaimer in the
     13  *    documentation and/or other materials provided with the distribution.
     14  *
     15  * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
     16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
     17  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
     18  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
     19  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
     20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
     21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
     22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
     23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
     24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
     25  * THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 #include "config.h"
     29 #include "WorkQueue.h"
     30 
     31 #include "WKBase.h"
     32 #include <WebCore/NotImplemented.h>
     33 #include <gio/gio.h>
     34 #include <glib.h>
     35 #include <wtf/gobject/GRefPtr.h>
     36 
     37 // WorkQueue::EventSource
     38 class WorkQueue::EventSource {
     39 public:
     40     EventSource(PassOwnPtr<WorkItem> workItem, WorkQueue* workQueue, GCancellable* cancellable)
     41         : m_workItem(workItem)
     42         , m_workQueue(workQueue)
     43         , m_cancellable(cancellable)
     44     {
     45     }
     46 
     47     void cancel()
     48     {
     49         if (!m_cancellable)
     50             return;
     51         g_cancellable_cancel(m_cancellable);
     52     }
     53 
     54     static void executeEventSource(EventSource* eventSource)
     55     {
     56         ASSERT(eventSource);
     57         WorkQueue* queue = eventSource->m_workQueue;
     58         {
     59             MutexLocker locker(queue->m_isValidMutex);
     60             if (!queue->m_isValid)
     61                 return;
     62         }
     63 
     64         eventSource->m_workItem->execute();
     65     }
     66 
     67     static gboolean performWorkOnce(EventSource* eventSource)
     68     {
     69         executeEventSource(eventSource);
     70         return FALSE;
     71     }
     72 
     73     static gboolean performWork(GSocket* socket, GIOCondition condition, EventSource* eventSource)
     74     {
     75         if (!(condition & G_IO_IN) && !(condition & G_IO_HUP) && !(condition & G_IO_ERR)) {
     76             // EventSource has been cancelled, return FALSE to destroy the source.
     77             return FALSE;
     78         }
     79 
     80         executeEventSource(eventSource);
     81         return TRUE;
     82     }
     83 
     84     static gboolean performWorkOnTermination(GPid, gint, EventSource* eventSource)
     85     {
     86         executeEventSource(eventSource);
     87         return FALSE;
     88     }
     89 
     90     static void deleteEventSource(EventSource* eventSource)
     91     {
     92         ASSERT(eventSource);
     93         delete eventSource;
     94     }
     95 
     96 public:
     97     PassOwnPtr<WorkItem> m_workItem;
     98     WorkQueue* m_workQueue;
     99     GCancellable* m_cancellable;
    100 };
    101 
    102 // WorkQueue
    103 void WorkQueue::platformInitialize(const char* name)
    104 {
    105     m_eventContext = g_main_context_new();
    106     ASSERT(m_eventContext);
    107     m_eventLoop = g_main_loop_new(m_eventContext, FALSE);
    108     ASSERT(m_eventLoop);
    109     m_workQueueThread = createThread(reinterpret_cast<WTF::ThreadFunction>(&WorkQueue::startWorkQueueThread), this, name);
    110 }
    111 
    112 void WorkQueue::platformInvalidate()
    113 {
    114     MutexLocker locker(m_eventLoopLock);
    115 
    116     if (m_eventLoop) {
    117         if (g_main_loop_is_running(m_eventLoop))
    118             g_main_loop_quit(m_eventLoop);
    119 
    120         g_main_loop_unref(m_eventLoop);
    121         m_eventLoop = 0;
    122     }
    123 
    124     if (m_eventContext) {
    125         g_main_context_unref(m_eventContext);
    126         m_eventContext = 0;
    127     }
    128 }
    129 
    130 void* WorkQueue::startWorkQueueThread(WorkQueue* workQueue)
    131 {
    132     workQueue->workQueueThreadBody();
    133     return 0;
    134 }
    135 
    136 void WorkQueue::workQueueThreadBody()
    137 {
    138     g_main_loop_run(m_eventLoop);
    139 }
    140 
    141 void WorkQueue::registerEventSourceHandler(int fileDescriptor, int condition, PassOwnPtr<WorkItem> item)
    142 {
    143     GRefPtr<GSocket> socket = adoptGRef(g_socket_new_from_fd(fileDescriptor, 0));
    144     ASSERT(socket);
    145     GRefPtr<GCancellable> cancellable = adoptGRef(g_cancellable_new());
    146     GRefPtr<GSource> dispatchSource = adoptGRef(g_socket_create_source(socket.get(), static_cast<GIOCondition>(condition), cancellable.get()));
    147     ASSERT(dispatchSource);
    148     EventSource* eventSource = new EventSource(item, this, cancellable.get());
    149     ASSERT(eventSource);
    150 
    151     g_source_set_callback(dispatchSource.get(), reinterpret_cast<GSourceFunc>(&WorkQueue::EventSource::performWork),
    152         eventSource, reinterpret_cast<GDestroyNotify>(&WorkQueue::EventSource::deleteEventSource));
    153 
    154     // Set up the event sources under the mutex since this is shared across multiple threads.
    155     {
    156         MutexLocker locker(m_eventSourcesLock);
    157         Vector<EventSource*> sources;
    158         EventSourceIterator it = m_eventSources.find(fileDescriptor);
    159         if (it != m_eventSources.end())
    160             sources = it->second;
    161 
    162         sources.append(eventSource);
    163         m_eventSources.set(fileDescriptor, sources);
    164     }
    165 
    166     g_source_attach(dispatchSource.get(), m_eventContext);
    167 }
    168 
    169 void WorkQueue::unregisterEventSourceHandler(int fileDescriptor)
    170 {
    171     ASSERT(fileDescriptor);
    172 
    173     MutexLocker locker(m_eventSourcesLock);
    174 
    175     EventSourceIterator it = m_eventSources.find(fileDescriptor);
    176     ASSERT(it != m_eventSources.end());
    177     ASSERT(m_eventSources.contains(fileDescriptor));
    178 
    179     if (it != m_eventSources.end()) {
    180         Vector<EventSource*> sources = it->second;
    181         for (unsigned i = 0; i < sources.size(); i++)
    182             sources[i]->cancel();
    183 
    184         m_eventSources.remove(it);
    185     }
    186 }
    187 
    188 void WorkQueue::scheduleWorkOnSource(GSource* dispatchSource, PassOwnPtr<WorkItem> item, GSourceFunc sourceCallback)
    189 {
    190     EventSource* eventSource = new EventSource(item, this, 0);
    191 
    192     g_source_set_callback(dispatchSource, sourceCallback, eventSource,
    193                           reinterpret_cast<GDestroyNotify>(&WorkQueue::EventSource::deleteEventSource));
    194 
    195     g_source_attach(dispatchSource, m_eventContext);
    196 }
    197 
    198 void WorkQueue::scheduleWork(PassOwnPtr<WorkItem> item)
    199 {
    200     GRefPtr<GSource> dispatchSource = adoptGRef(g_idle_source_new());
    201     ASSERT(dispatchSource);
    202     g_source_set_priority(dispatchSource.get(), G_PRIORITY_DEFAULT);
    203 
    204     scheduleWorkOnSource(dispatchSource.get(), item, reinterpret_cast<GSourceFunc>(&WorkQueue::EventSource::performWorkOnce));
    205 }
    206 
    207 void WorkQueue::scheduleWorkAfterDelay(PassOwnPtr<WorkItem> item, double delay)
    208 {
    209     GRefPtr<GSource> dispatchSource = adoptGRef(g_timeout_source_new(static_cast<guint>(delay * 1000)));
    210     ASSERT(dispatchSource);
    211 
    212     scheduleWorkOnSource(dispatchSource.get(), item, reinterpret_cast<GSourceFunc>(&WorkQueue::EventSource::performWorkOnce));
    213 }
    214 
    215 void WorkQueue::scheduleWorkOnTermination(WebKit::PlatformProcessIdentifier process, PassOwnPtr<WorkItem> item)
    216 {
    217     GRefPtr<GSource> dispatchSource = adoptGRef(g_child_watch_source_new(process));
    218     ASSERT(dispatchSource);
    219 
    220     scheduleWorkOnSource(dispatchSource.get(), item, reinterpret_cast<GSourceFunc>(&WorkQueue::EventSource::performWorkOnTermination));
    221 }
    222