1 /* GIO - GLib Input, Output and Streaming Library 2 * 3 * Copyright (C) 2006-2007 Red Hat, Inc. 4 * 5 * This library is free software; you can redistribute it and/or 6 * modify it under the terms of the GNU Lesser General Public 7 * License as published by the Free Software Foundation; either 8 * version 2 of the License, or (at your option) any later version. 9 * 10 * This library is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * Lesser General Public License for more details. 14 * 15 * You should have received a copy of the GNU Lesser General 16 * Public License along with this library; if not, write to the 17 * Free Software Foundation, Inc., 59 Temple Place, Suite 330, 18 * Boston, MA 02111-1307, USA. 19 * 20 * Author: Alexander Larsson <alexl (at) redhat.com> 21 */ 22 23 #include "config.h" 24 25 #include "gioscheduler.h" 26 #include "gcancellable.h" 27 28 #include "gioalias.h" 29 30 /** 31 * SECTION:gioscheduler 32 * @short_description: I/O Scheduler 33 * @include: gio/gio.h 34 * 35 * Schedules asynchronous I/O operations. #GIOScheduler integrates 36 * into the main event loop (#GMainLoop) and may use threads if they 37 * are available. 38 * 39 * <para id="io-priority"><indexterm><primary>I/O priority</primary></indexterm> 40 * Each I/O operation has a priority, and the scheduler uses the priorities 41 * to determine the order in which operations are executed. They are 42 * <emphasis>not</emphasis> used to determine system-wide I/O scheduling. 43 * Priorities are integers, with lower numbers indicating higher priority. 44 * It is recommended to choose priorities between %G_PRIORITY_LOW and 45 * %G_PRIORITY_HIGH, with %G_PRIORITY_DEFAULT as a default. 46 * </para> 47 **/ 48 49 struct _GIOSchedulerJob { 50 GSList *active_link; 51 GIOSchedulerJobFunc job_func; 52 GSourceFunc cancel_func; /* Runs under job map lock */ 53 gpointer data; 54 GDestroyNotify destroy_notify; 55 56 gint io_priority; 57 GCancellable *cancellable; 58 59 guint idle_tag; 60 }; 61 62 G_LOCK_DEFINE_STATIC(active_jobs); 63 static GSList *active_jobs = NULL; 64 65 static GThreadPool *job_thread_pool = NULL; 66 67 static void io_job_thread (gpointer data, 68 gpointer user_data); 69 70 static void 71 g_io_job_free (GIOSchedulerJob *job) 72 { 73 if (job->cancellable) 74 g_object_unref (job->cancellable); 75 g_free (job); 76 } 77 78 static gint 79 g_io_job_compare (gconstpointer a, 80 gconstpointer b, 81 gpointer user_data) 82 { 83 const GIOSchedulerJob *aa = a; 84 const GIOSchedulerJob *bb = b; 85 86 /* Cancelled jobs are set prio == -1, so that 87 they are executed as quickly as possible */ 88 89 /* Lower value => higher priority */ 90 if (aa->io_priority < bb->io_priority) 91 return -1; 92 if (aa->io_priority == bb->io_priority) 93 return 0; 94 return 1; 95 } 96 97 static gpointer 98 init_scheduler (gpointer arg) 99 { 100 if (job_thread_pool == NULL) 101 { 102 /* TODO: thread_pool_new can fail */ 103 job_thread_pool = g_thread_pool_new (io_job_thread, 104 NULL, 105 10, 106 FALSE, 107 NULL); 108 if (job_thread_pool != NULL) 109 { 110 g_thread_pool_set_sort_function (job_thread_pool, 111 g_io_job_compare, 112 NULL); 113 /* It's kinda weird that this is a global setting 114 * instead of per threadpool. However, we really 115 * want to cache some threads, but not keep around 116 * those threads forever. */ 117 g_thread_pool_set_max_idle_time (15 * 1000); 118 g_thread_pool_set_max_unused_threads (2); 119 } 120 } 121 return NULL; 122 } 123 124 static void 125 remove_active_job (GIOSchedulerJob *job) 126 { 127 GIOSchedulerJob *other_job; 128 GSList *l; 129 gboolean resort_jobs; 130 131 G_LOCK (active_jobs); 132 active_jobs = g_slist_delete_link (active_jobs, job->active_link); 133 134 resort_jobs = FALSE; 135 for (l = active_jobs; l != NULL; l = l->next) 136 { 137 other_job = l->data; 138 if (other_job->io_priority >= 0 && 139 g_cancellable_is_cancelled (other_job->cancellable)) 140 { 141 other_job->io_priority = -1; 142 resort_jobs = TRUE; 143 } 144 } 145 G_UNLOCK (active_jobs); 146 147 if (resort_jobs && 148 job_thread_pool != NULL) 149 g_thread_pool_set_sort_function (job_thread_pool, 150 g_io_job_compare, 151 NULL); 152 153 } 154 155 static void 156 job_destroy (gpointer data) 157 { 158 GIOSchedulerJob *job = data; 159 160 if (job->destroy_notify) 161 job->destroy_notify (job->data); 162 163 remove_active_job (job); 164 g_io_job_free (job); 165 } 166 167 static void 168 io_job_thread (gpointer data, 169 gpointer user_data) 170 { 171 GIOSchedulerJob *job = data; 172 gboolean result; 173 174 if (job->cancellable) 175 g_cancellable_push_current (job->cancellable); 176 177 do 178 { 179 result = job->job_func (job, job->cancellable, job->data); 180 } 181 while (result); 182 183 if (job->cancellable) 184 g_cancellable_pop_current (job->cancellable); 185 186 job_destroy (job); 187 } 188 189 static gboolean 190 run_job_at_idle (gpointer data) 191 { 192 GIOSchedulerJob *job = data; 193 gboolean result; 194 195 if (job->cancellable) 196 g_cancellable_push_current (job->cancellable); 197 198 result = job->job_func (job, job->cancellable, job->data); 199 200 if (job->cancellable) 201 g_cancellable_pop_current (job->cancellable); 202 203 return result; 204 } 205 206 /** 207 * g_io_scheduler_push_job: 208 * @job_func: a #GIOSchedulerJobFunc. 209 * @user_data: data to pass to @job_func 210 * @notify: a #GDestroyNotify for @user_data, or %NULL 211 * @io_priority: the <link linkend="gioscheduler">I/O priority</link> 212 * of the request. 213 * @cancellable: optional #GCancellable object, %NULL to ignore. 214 * 215 * Schedules the I/O job to run. 216 * 217 * @notify will be called on @user_data after @job_func has returned, 218 * regardless whether the job was cancelled or has run to completion. 219 * 220 * If @cancellable is not %NULL, it can be used to cancel the I/O job 221 * by calling g_cancellable_cancel() or by calling 222 * g_io_scheduler_cancel_all_jobs(). 223 **/ 224 void 225 g_io_scheduler_push_job (GIOSchedulerJobFunc job_func, 226 gpointer user_data, 227 GDestroyNotify notify, 228 gint io_priority, 229 GCancellable *cancellable) 230 { 231 static GOnce once_init = G_ONCE_INIT; 232 GIOSchedulerJob *job; 233 234 g_return_if_fail (job_func != NULL); 235 236 job = g_new0 (GIOSchedulerJob, 1); 237 job->job_func = job_func; 238 job->data = user_data; 239 job->destroy_notify = notify; 240 job->io_priority = io_priority; 241 242 if (cancellable) 243 job->cancellable = g_object_ref (cancellable); 244 245 G_LOCK (active_jobs); 246 active_jobs = g_slist_prepend (active_jobs, job); 247 job->active_link = active_jobs; 248 G_UNLOCK (active_jobs); 249 250 if (g_thread_supported()) 251 { 252 g_once (&once_init, init_scheduler, NULL); 253 g_thread_pool_push (job_thread_pool, job, NULL); 254 } 255 else 256 { 257 /* Threads not available, instead do the i/o sync inside a 258 * low prio idle handler 259 */ 260 job->idle_tag = g_idle_add_full (G_PRIORITY_DEFAULT_IDLE + 1 + io_priority / 10, 261 run_job_at_idle, 262 job, job_destroy); 263 } 264 } 265 266 /** 267 * g_io_scheduler_cancel_all_jobs: 268 * 269 * Cancels all cancellable I/O jobs. 270 * 271 * A job is cancellable if a #GCancellable was passed into 272 * g_io_scheduler_push_job(). 273 **/ 274 void 275 g_io_scheduler_cancel_all_jobs (void) 276 { 277 GSList *cancellable_list, *l; 278 279 G_LOCK (active_jobs); 280 cancellable_list = NULL; 281 for (l = active_jobs; l != NULL; l = l->next) 282 { 283 GIOSchedulerJob *job = l->data; 284 if (job->cancellable) 285 cancellable_list = g_slist_prepend (cancellable_list, 286 g_object_ref (job->cancellable)); 287 } 288 G_UNLOCK (active_jobs); 289 290 for (l = cancellable_list; l != NULL; l = l->next) 291 { 292 GCancellable *c = l->data; 293 g_cancellable_cancel (c); 294 g_object_unref (c); 295 } 296 g_slist_free (cancellable_list); 297 } 298 299 typedef struct { 300 GSourceFunc func; 301 gboolean ret_val; 302 gpointer data; 303 GDestroyNotify notify; 304 305 GMutex *ack_lock; 306 GCond *ack_condition; 307 } MainLoopProxy; 308 309 static gboolean 310 mainloop_proxy_func (gpointer data) 311 { 312 MainLoopProxy *proxy = data; 313 314 proxy->ret_val = proxy->func (proxy->data); 315 316 if (proxy->notify) 317 proxy->notify (proxy->data); 318 319 if (proxy->ack_lock) 320 { 321 g_mutex_lock (proxy->ack_lock); 322 g_cond_signal (proxy->ack_condition); 323 g_mutex_unlock (proxy->ack_lock); 324 } 325 326 return FALSE; 327 } 328 329 static void 330 mainloop_proxy_free (MainLoopProxy *proxy) 331 { 332 if (proxy->ack_lock) 333 { 334 g_mutex_free (proxy->ack_lock); 335 g_cond_free (proxy->ack_condition); 336 } 337 338 g_free (proxy); 339 } 340 341 /** 342 * g_io_scheduler_job_send_to_mainloop: 343 * @job: a #GIOSchedulerJob 344 * @func: a #GSourceFunc callback that will be called in the main thread 345 * @user_data: data to pass to @func 346 * @notify: a #GDestroyNotify for @user_data, or %NULL 347 * 348 * Used from an I/O job to send a callback to be run in the 349 * main loop (main thread), waiting for the result (and thus 350 * blocking the I/O job). 351 * 352 * Returns: The return value of @func 353 **/ 354 gboolean 355 g_io_scheduler_job_send_to_mainloop (GIOSchedulerJob *job, 356 GSourceFunc func, 357 gpointer user_data, 358 GDestroyNotify notify) 359 { 360 GSource *source; 361 MainLoopProxy *proxy; 362 guint id; 363 gboolean ret_val; 364 365 g_return_val_if_fail (job != NULL, FALSE); 366 g_return_val_if_fail (func != NULL, FALSE); 367 368 if (job->idle_tag) 369 { 370 /* We just immediately re-enter in the case of idles (non-threads) 371 * Anything else would just deadlock. If you can't handle this, enable threads. 372 */ 373 ret_val = func (user_data); 374 if (notify) 375 notify (user_data); 376 return ret_val; 377 } 378 379 proxy = g_new0 (MainLoopProxy, 1); 380 proxy->func = func; 381 proxy->data = user_data; 382 proxy->notify = notify; 383 proxy->ack_lock = g_mutex_new (); 384 proxy->ack_condition = g_cond_new (); 385 g_mutex_lock (proxy->ack_lock); 386 387 source = g_idle_source_new (); 388 g_source_set_priority (source, G_PRIORITY_DEFAULT); 389 g_source_set_callback (source, mainloop_proxy_func, proxy, 390 NULL); 391 392 id = g_source_attach (source, NULL); 393 g_source_unref (source); 394 395 g_cond_wait (proxy->ack_condition, proxy->ack_lock); 396 g_mutex_unlock (proxy->ack_lock); 397 398 ret_val = proxy->ret_val; 399 mainloop_proxy_free (proxy); 400 401 return ret_val; 402 } 403 404 /** 405 * g_io_scheduler_job_send_to_mainloop_async: 406 * @job: a #GIOSchedulerJob 407 * @func: a #GSourceFunc callback that will be called in the main thread 408 * @user_data: data to pass to @func 409 * @notify: a #GDestroyNotify for @user_data, or %NULL 410 * 411 * Used from an I/O job to send a callback to be run asynchronously 412 * in the main loop (main thread). The callback will be run when the 413 * main loop is available, but at that time the I/O job might have 414 * finished. The return value from the callback is ignored. 415 * 416 * Note that if you are passing the @user_data from g_io_scheduler_push_job() 417 * on to this function you have to ensure that it is not freed before 418 * @func is called, either by passing %NULL as @notify to 419 * g_io_scheduler_push_job() or by using refcounting for @user_data. 420 **/ 421 void 422 g_io_scheduler_job_send_to_mainloop_async (GIOSchedulerJob *job, 423 GSourceFunc func, 424 gpointer user_data, 425 GDestroyNotify notify) 426 { 427 GSource *source; 428 MainLoopProxy *proxy; 429 guint id; 430 431 g_return_if_fail (job != NULL); 432 g_return_if_fail (func != NULL); 433 434 if (job->idle_tag) 435 { 436 /* We just immediately re-enter in the case of idles (non-threads) 437 * Anything else would just deadlock. If you can't handle this, enable threads. 438 */ 439 func (user_data); 440 if (notify) 441 notify (user_data); 442 return; 443 } 444 445 proxy = g_new0 (MainLoopProxy, 1); 446 proxy->func = func; 447 proxy->data = user_data; 448 proxy->notify = notify; 449 450 source = g_idle_source_new (); 451 g_source_set_priority (source, G_PRIORITY_DEFAULT); 452 g_source_set_callback (source, mainloop_proxy_func, proxy, 453 (GDestroyNotify)mainloop_proxy_free); 454 455 id = g_source_attach (source, NULL); 456 g_source_unref (source); 457 } 458 459 460 #define __G_IO_SCHEDULER_C__ 461 #include "gioaliasdef.c" 462