1 /* 2 * Copyright (C) 2011 Igalia S.L. 3 * Copyright (C) 2010 Apple Inc. All rights reserved. 4 * Portions Copyright (c) 2010 Motorola Mobility, Inc. All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 15 * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' 16 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 17 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 18 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS 19 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 20 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 21 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 22 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 23 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 24 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF 25 * THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #include "config.h" 29 #include "WorkQueue.h" 30 31 #include "WKBase.h" 32 #include <WebCore/NotImplemented.h> 33 #include <gio/gio.h> 34 #include <glib.h> 35 #include <wtf/gobject/GRefPtr.h> 36 37 // WorkQueue::EventSource 38 class WorkQueue::EventSource { 39 public: 40 EventSource(PassOwnPtr<WorkItem> workItem, WorkQueue* workQueue, GCancellable* cancellable) 41 : m_workItem(workItem) 42 , m_workQueue(workQueue) 43 , m_cancellable(cancellable) 44 { 45 } 46 47 void cancel() 48 { 49 if (!m_cancellable) 50 return; 51 g_cancellable_cancel(m_cancellable); 52 } 53 54 static void executeEventSource(EventSource* eventSource) 55 { 56 ASSERT(eventSource); 57 WorkQueue* queue = eventSource->m_workQueue; 58 { 59 MutexLocker locker(queue->m_isValidMutex); 60 if (!queue->m_isValid) 61 return; 62 } 63 64 eventSource->m_workItem->execute(); 65 } 66 67 static gboolean performWorkOnce(EventSource* eventSource) 68 { 69 executeEventSource(eventSource); 70 return FALSE; 71 } 72 73 static gboolean performWork(GSocket* socket, GIOCondition condition, EventSource* eventSource) 74 { 75 if (!(condition & G_IO_IN) && !(condition & G_IO_HUP) && !(condition & G_IO_ERR)) { 76 // EventSource has been cancelled, return FALSE to destroy the source. 77 return FALSE; 78 } 79 80 executeEventSource(eventSource); 81 return TRUE; 82 } 83 84 static gboolean performWorkOnTermination(GPid, gint, EventSource* eventSource) 85 { 86 executeEventSource(eventSource); 87 return FALSE; 88 } 89 90 static void deleteEventSource(EventSource* eventSource) 91 { 92 ASSERT(eventSource); 93 delete eventSource; 94 } 95 96 public: 97 PassOwnPtr<WorkItem> m_workItem; 98 WorkQueue* m_workQueue; 99 GCancellable* m_cancellable; 100 }; 101 102 // WorkQueue 103 void WorkQueue::platformInitialize(const char* name) 104 { 105 m_eventContext = g_main_context_new(); 106 ASSERT(m_eventContext); 107 m_eventLoop = g_main_loop_new(m_eventContext, FALSE); 108 ASSERT(m_eventLoop); 109 m_workQueueThread = createThread(reinterpret_cast<WTF::ThreadFunction>(&WorkQueue::startWorkQueueThread), this, name); 110 } 111 112 void WorkQueue::platformInvalidate() 113 { 114 MutexLocker locker(m_eventLoopLock); 115 116 if (m_eventLoop) { 117 if (g_main_loop_is_running(m_eventLoop)) 118 g_main_loop_quit(m_eventLoop); 119 120 g_main_loop_unref(m_eventLoop); 121 m_eventLoop = 0; 122 } 123 124 if (m_eventContext) { 125 g_main_context_unref(m_eventContext); 126 m_eventContext = 0; 127 } 128 } 129 130 void* WorkQueue::startWorkQueueThread(WorkQueue* workQueue) 131 { 132 workQueue->workQueueThreadBody(); 133 return 0; 134 } 135 136 void WorkQueue::workQueueThreadBody() 137 { 138 g_main_loop_run(m_eventLoop); 139 } 140 141 void WorkQueue::registerEventSourceHandler(int fileDescriptor, int condition, PassOwnPtr<WorkItem> item) 142 { 143 GRefPtr<GSocket> socket = adoptGRef(g_socket_new_from_fd(fileDescriptor, 0)); 144 ASSERT(socket); 145 GRefPtr<GCancellable> cancellable = adoptGRef(g_cancellable_new()); 146 GRefPtr<GSource> dispatchSource = adoptGRef(g_socket_create_source(socket.get(), static_cast<GIOCondition>(condition), cancellable.get())); 147 ASSERT(dispatchSource); 148 EventSource* eventSource = new EventSource(item, this, cancellable.get()); 149 ASSERT(eventSource); 150 151 g_source_set_callback(dispatchSource.get(), reinterpret_cast<GSourceFunc>(&WorkQueue::EventSource::performWork), 152 eventSource, reinterpret_cast<GDestroyNotify>(&WorkQueue::EventSource::deleteEventSource)); 153 154 // Set up the event sources under the mutex since this is shared across multiple threads. 155 { 156 MutexLocker locker(m_eventSourcesLock); 157 Vector<EventSource*> sources; 158 EventSourceIterator it = m_eventSources.find(fileDescriptor); 159 if (it != m_eventSources.end()) 160 sources = it->second; 161 162 sources.append(eventSource); 163 m_eventSources.set(fileDescriptor, sources); 164 } 165 166 g_source_attach(dispatchSource.get(), m_eventContext); 167 } 168 169 void WorkQueue::unregisterEventSourceHandler(int fileDescriptor) 170 { 171 ASSERT(fileDescriptor); 172 173 MutexLocker locker(m_eventSourcesLock); 174 175 EventSourceIterator it = m_eventSources.find(fileDescriptor); 176 ASSERT(it != m_eventSources.end()); 177 ASSERT(m_eventSources.contains(fileDescriptor)); 178 179 if (it != m_eventSources.end()) { 180 Vector<EventSource*> sources = it->second; 181 for (unsigned i = 0; i < sources.size(); i++) 182 sources[i]->cancel(); 183 184 m_eventSources.remove(it); 185 } 186 } 187 188 void WorkQueue::scheduleWorkOnSource(GSource* dispatchSource, PassOwnPtr<WorkItem> item, GSourceFunc sourceCallback) 189 { 190 EventSource* eventSource = new EventSource(item, this, 0); 191 192 g_source_set_callback(dispatchSource, sourceCallback, eventSource, 193 reinterpret_cast<GDestroyNotify>(&WorkQueue::EventSource::deleteEventSource)); 194 195 g_source_attach(dispatchSource, m_eventContext); 196 } 197 198 void WorkQueue::scheduleWork(PassOwnPtr<WorkItem> item) 199 { 200 GRefPtr<GSource> dispatchSource = adoptGRef(g_idle_source_new()); 201 ASSERT(dispatchSource); 202 g_source_set_priority(dispatchSource.get(), G_PRIORITY_DEFAULT); 203 204 scheduleWorkOnSource(dispatchSource.get(), item, reinterpret_cast<GSourceFunc>(&WorkQueue::EventSource::performWorkOnce)); 205 } 206 207 void WorkQueue::scheduleWorkAfterDelay(PassOwnPtr<WorkItem> item, double delay) 208 { 209 GRefPtr<GSource> dispatchSource = adoptGRef(g_timeout_source_new(static_cast<guint>(delay * 1000))); 210 ASSERT(dispatchSource); 211 212 scheduleWorkOnSource(dispatchSource.get(), item, reinterpret_cast<GSourceFunc>(&WorkQueue::EventSource::performWorkOnce)); 213 } 214 215 void WorkQueue::scheduleWorkOnTermination(WebKit::PlatformProcessIdentifier process, PassOwnPtr<WorkItem> item) 216 { 217 GRefPtr<GSource> dispatchSource = adoptGRef(g_child_watch_source_new(process)); 218 ASSERT(dispatchSource); 219 220 scheduleWorkOnSource(dispatchSource.get(), item, reinterpret_cast<GSourceFunc>(&WorkQueue::EventSource::performWorkOnTermination)); 221 } 222