1 /* GIO - GLib Input, Output and Streaming Library 2 * 3 * Copyright (C) 2006-2007 Red Hat, Inc. 4 * 5 * This library is free software; you can redistribute it and/or 6 * modify it under the terms of the GNU Lesser General Public 7 * License as published by the Free Software Foundation; either 8 * version 2 of the License, or (at your option) any later version. 9 * 10 * This library is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * Lesser General Public License for more details. 14 * 15 * You should have received a copy of the GNU Lesser General 16 * Public License along with this library; if not, write to the 17 * Free Software Foundation, Inc., 59 Temple Place, Suite 330, 18 * Boston, MA 02111-1307, USA. 19 * 20 * Author: Alexander Larsson <alexl (at) redhat.com> 21 */ 22 23 #include "config.h" 24 #include <glib.h> 25 #include "glibintl.h" 26 27 #include "ginputstream.h" 28 #include "gseekable.h" 29 #include "gcancellable.h" 30 #include "gasyncresult.h" 31 #include "gsimpleasyncresult.h" 32 #include "gioerror.h" 33 34 #include "gioalias.h" 35 36 /** 37 * SECTION:ginputstream 38 * @short_description: Base class for implementing streaming input 39 * @include: gio/gio.h 40 * 41 * GInputStream has functions to read from a stream (g_input_stream_read()), 42 * to close a stream (g_input_stream_close()) and to skip some content 43 * (g_input_stream_skip()). 44 * 45 * To copy the content of an input stream to an output stream without 46 * manually handling the reads and writes, use g_output_stream_splice(). 47 * 48 * All of these functions have async variants too. 
49 **/ 50 51 G_DEFINE_TYPE (GInputStream, g_input_stream, G_TYPE_OBJECT); 52 53 struct _GInputStreamPrivate { 54 guint closed : 1; 55 guint pending : 1; 56 GAsyncReadyCallback outstanding_callback; 57 }; 58 59 static gssize g_input_stream_real_skip (GInputStream *stream, 60 gsize count, 61 GCancellable *cancellable, 62 GError **error); 63 static void g_input_stream_real_read_async (GInputStream *stream, 64 void *buffer, 65 gsize count, 66 int io_priority, 67 GCancellable *cancellable, 68 GAsyncReadyCallback callback, 69 gpointer user_data); 70 static gssize g_input_stream_real_read_finish (GInputStream *stream, 71 GAsyncResult *result, 72 GError **error); 73 static void g_input_stream_real_skip_async (GInputStream *stream, 74 gsize count, 75 int io_priority, 76 GCancellable *cancellable, 77 GAsyncReadyCallback callback, 78 gpointer data); 79 static gssize g_input_stream_real_skip_finish (GInputStream *stream, 80 GAsyncResult *result, 81 GError **error); 82 static void g_input_stream_real_close_async (GInputStream *stream, 83 int io_priority, 84 GCancellable *cancellable, 85 GAsyncReadyCallback callback, 86 gpointer data); 87 static gboolean g_input_stream_real_close_finish (GInputStream *stream, 88 GAsyncResult *result, 89 GError **error); 90 91 static void 92 g_input_stream_finalize (GObject *object) 93 { 94 GInputStream *stream; 95 96 stream = G_INPUT_STREAM (object); 97 98 if (!stream->priv->closed) 99 g_input_stream_close (stream, NULL, NULL); 100 101 G_OBJECT_CLASS (g_input_stream_parent_class)->finalize (object); 102 } 103 104 static void 105 g_input_stream_dispose (GObject *object) 106 { 107 GInputStream *stream; 108 109 stream = G_INPUT_STREAM (object); 110 111 if (!stream->priv->closed) 112 g_input_stream_close (stream, NULL, NULL); 113 114 G_OBJECT_CLASS (g_input_stream_parent_class)->dispose (object); 115 } 116 117 118 static void 119 g_input_stream_class_init (GInputStreamClass *klass) 120 { 121 GObjectClass *gobject_class = G_OBJECT_CLASS (klass); 122 
123 g_type_class_add_private (klass, sizeof (GInputStreamPrivate)); 124 125 gobject_class->finalize = g_input_stream_finalize; 126 gobject_class->dispose = g_input_stream_dispose; 127 128 klass->skip = g_input_stream_real_skip; 129 klass->read_async = g_input_stream_real_read_async; 130 klass->read_finish = g_input_stream_real_read_finish; 131 klass->skip_async = g_input_stream_real_skip_async; 132 klass->skip_finish = g_input_stream_real_skip_finish; 133 klass->close_async = g_input_stream_real_close_async; 134 klass->close_finish = g_input_stream_real_close_finish; 135 } 136 137 static void 138 g_input_stream_init (GInputStream *stream) 139 { 140 stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, 141 G_TYPE_INPUT_STREAM, 142 GInputStreamPrivate); 143 } 144 145 /** 146 * g_input_stream_read: 147 * @stream: a #GInputStream. 148 * @buffer: a buffer to read data into (which should be at least count bytes long). 149 * @count: the number of bytes that will be read from the stream 150 * @cancellable: optional #GCancellable object, %NULL to ignore. 151 * @error: location to store the error occuring, or %NULL to ignore 152 * 153 * Tries to read @count bytes from the stream into the buffer starting at 154 * @buffer. Will block during this read. 155 * 156 * If count is zero returns zero and does nothing. A value of @count 157 * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error. 158 * 159 * On success, the number of bytes read into the buffer is returned. 160 * It is not an error if this is not the same as the requested size, as it 161 * can happen e.g. near the end of a file. Zero is returned on end of file 162 * (or if @count is zero), but never otherwise. 163 * 164 * If @cancellable is not NULL, then the operation can be cancelled by 165 * triggering the cancellable object from another thread. If the operation 166 * was cancelled, the error G_IO_ERROR_CANCELLED will be returned. 
If an 167 * operation was partially finished when the operation was cancelled the 168 * partial result will be returned, without an error. 169 * 170 * On error -1 is returned and @error is set accordingly. 171 * 172 * Return value: Number of bytes read, or -1 on error 173 **/ 174 gssize 175 g_input_stream_read (GInputStream *stream, 176 void *buffer, 177 gsize count, 178 GCancellable *cancellable, 179 GError **error) 180 { 181 GInputStreamClass *class; 182 gssize res; 183 184 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1); 185 g_return_val_if_fail (buffer != NULL, 0); 186 187 if (count == 0) 188 return 0; 189 190 if (((gssize) count) < 0) 191 { 192 g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, 193 _("Too large count value passed to %s"), G_STRFUNC); 194 return -1; 195 } 196 197 class = G_INPUT_STREAM_GET_CLASS (stream); 198 199 if (class->read_fn == NULL) 200 { 201 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, 202 _("Input stream doesn't implement read")); 203 return -1; 204 } 205 206 if (!g_input_stream_set_pending (stream, error)) 207 return -1; 208 209 if (cancellable) 210 g_cancellable_push_current (cancellable); 211 212 res = class->read_fn (stream, buffer, count, cancellable, error); 213 214 if (cancellable) 215 g_cancellable_pop_current (cancellable); 216 217 g_input_stream_clear_pending (stream); 218 219 return res; 220 } 221 222 /** 223 * g_input_stream_read_all: 224 * @stream: a #GInputStream. 225 * @buffer: a buffer to read data into (which should be at least count bytes long). 226 * @count: the number of bytes that will be read from the stream 227 * @bytes_read: location to store the number of bytes that was read from the stream 228 * @cancellable: optional #GCancellable object, %NULL to ignore. 229 * @error: location to store the error occuring, or %NULL to ignore 230 * 231 * Tries to read @count bytes from the stream into the buffer starting at 232 * @buffer. Will block during this read. 
233 * 234 * This function is similar to g_input_stream_read(), except it tries to 235 * read as many bytes as requested, only stopping on an error or end of stream. 236 * 237 * On a successful read of @count bytes, or if we reached the end of the 238 * stream, %TRUE is returned, and @bytes_read is set to the number of bytes 239 * read into @buffer. 240 * 241 * If there is an error during the operation %FALSE is returned and @error 242 * is set to indicate the error status, @bytes_read is updated to contain 243 * the number of bytes read into @buffer before the error occurred. 244 * 245 * Return value: %TRUE on success, %FALSE if there was an error 246 **/ 247 gboolean 248 g_input_stream_read_all (GInputStream *stream, 249 void *buffer, 250 gsize count, 251 gsize *bytes_read, 252 GCancellable *cancellable, 253 GError **error) 254 { 255 gsize _bytes_read; 256 gssize res; 257 258 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE); 259 g_return_val_if_fail (buffer != NULL, FALSE); 260 261 _bytes_read = 0; 262 while (_bytes_read < count) 263 { 264 res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read, 265 cancellable, error); 266 if (res == -1) 267 { 268 if (bytes_read) 269 *bytes_read = _bytes_read; 270 return FALSE; 271 } 272 273 if (res == 0) 274 break; 275 276 _bytes_read += res; 277 } 278 279 if (bytes_read) 280 *bytes_read = _bytes_read; 281 return TRUE; 282 } 283 284 /** 285 * g_input_stream_skip: 286 * @stream: a #GInputStream. 287 * @count: the number of bytes that will be skipped from the stream 288 * @cancellable: optional #GCancellable object, %NULL to ignore. 289 * @error: location to store the error occuring, or %NULL to ignore 290 * 291 * Tries to skip @count bytes from the stream. Will block during the operation. 292 * 293 * This is identical to g_input_stream_read(), from a behaviour standpoint, 294 * but the bytes that are skipped are not returned to the user. 
Some 295 * streams have an implementation that is more efficient than reading the data. 296 * 297 * This function is optional for inherited classes, as the default implementation 298 * emulates it using read. 299 * 300 * If @cancellable is not %NULL, then the operation can be cancelled by 301 * triggering the cancellable object from another thread. If the operation 302 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an 303 * operation was partially finished when the operation was cancelled the 304 * partial result will be returned, without an error. 305 * 306 * Return value: Number of bytes skipped, or -1 on error 307 **/ 308 gssize 309 g_input_stream_skip (GInputStream *stream, 310 gsize count, 311 GCancellable *cancellable, 312 GError **error) 313 { 314 GInputStreamClass *class; 315 gssize res; 316 317 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1); 318 319 if (count == 0) 320 return 0; 321 322 if (((gssize) count) < 0) 323 { 324 g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, 325 _("Too large count value passed to %s"), G_STRFUNC); 326 return -1; 327 } 328 329 class = G_INPUT_STREAM_GET_CLASS (stream); 330 331 if (!g_input_stream_set_pending (stream, error)) 332 return -1; 333 334 if (cancellable) 335 g_cancellable_push_current (cancellable); 336 337 res = class->skip (stream, count, cancellable, error); 338 339 if (cancellable) 340 g_cancellable_pop_current (cancellable); 341 342 g_input_stream_clear_pending (stream); 343 344 return res; 345 } 346 347 static gssize 348 g_input_stream_real_skip (GInputStream *stream, 349 gsize count, 350 GCancellable *cancellable, 351 GError **error) 352 { 353 GInputStreamClass *class; 354 gssize ret, read_bytes; 355 char buffer[8192]; 356 GError *my_error; 357 358 class = G_INPUT_STREAM_GET_CLASS (stream); 359 360 if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream))) 361 { 362 if (g_seekable_seek (G_SEEKABLE (stream), 363 count, 364 G_SEEK_CUR, 365 cancellable, 366 
NULL)) 367 return count; 368 } 369 370 /* If not seekable, or seek failed, fall back to reading data: */ 371 372 class = G_INPUT_STREAM_GET_CLASS (stream); 373 374 read_bytes = 0; 375 while (1) 376 { 377 my_error = NULL; 378 379 ret = class->read_fn (stream, buffer, MIN (sizeof (buffer), count), 380 cancellable, &my_error); 381 if (ret == -1) 382 { 383 if (read_bytes > 0 && 384 my_error->domain == G_IO_ERROR && 385 my_error->code == G_IO_ERROR_CANCELLED) 386 { 387 g_error_free (my_error); 388 return read_bytes; 389 } 390 391 g_propagate_error (error, my_error); 392 return -1; 393 } 394 395 count -= ret; 396 read_bytes += ret; 397 398 if (ret == 0 || count == 0) 399 return read_bytes; 400 } 401 } 402 403 /** 404 * g_input_stream_close: 405 * @stream: A #GInputStream. 406 * @cancellable: optional #GCancellable object, %NULL to ignore. 407 * @error: location to store the error occuring, or %NULL to ignore 408 * 409 * Closes the stream, releasing resources related to it. 410 * 411 * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED. 412 * Closing a stream multiple times will not return an error. 413 * 414 * Streams will be automatically closed when the last reference 415 * is dropped, but you might want to call this function to make sure 416 * resources are released as early as possible. 417 * 418 * Some streams might keep the backing store of the stream (e.g. a file descriptor) 419 * open after the stream is closed. See the documentation for the individual 420 * stream for details. 421 * 422 * On failure the first error that happened will be reported, but the close 423 * operation will finish as much as possible. A stream that failed to 424 * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it 425 * is important to check and report the error to the user. 426 * 427 * If @cancellable is not NULL, then the operation can be cancelled by 428 * triggering the cancellable object from another thread. 
If the operation 429 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. 430 * Cancelling a close will still leave the stream closed, but some streams 431 * can use a faster close that doesn't block to e.g. check errors. 432 * 433 * Return value: %TRUE on success, %FALSE on failure 434 **/ 435 gboolean 436 g_input_stream_close (GInputStream *stream, 437 GCancellable *cancellable, 438 GError **error) 439 { 440 GInputStreamClass *class; 441 gboolean res; 442 443 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE); 444 445 class = G_INPUT_STREAM_GET_CLASS (stream); 446 447 if (stream->priv->closed) 448 return TRUE; 449 450 res = TRUE; 451 452 if (!g_input_stream_set_pending (stream, error)) 453 return FALSE; 454 455 if (cancellable) 456 g_cancellable_push_current (cancellable); 457 458 if (class->close_fn) 459 res = class->close_fn (stream, cancellable, error); 460 461 if (cancellable) 462 g_cancellable_pop_current (cancellable); 463 464 g_input_stream_clear_pending (stream); 465 466 stream->priv->closed = TRUE; 467 468 return res; 469 } 470 471 static void 472 async_ready_callback_wrapper (GObject *source_object, 473 GAsyncResult *res, 474 gpointer user_data) 475 { 476 GInputStream *stream = G_INPUT_STREAM (source_object); 477 478 g_input_stream_clear_pending (stream); 479 if (stream->priv->outstanding_callback) 480 (*stream->priv->outstanding_callback) (source_object, res, user_data); 481 g_object_unref (stream); 482 } 483 484 static void 485 async_ready_close_callback_wrapper (GObject *source_object, 486 GAsyncResult *res, 487 gpointer user_data) 488 { 489 GInputStream *stream = G_INPUT_STREAM (source_object); 490 491 g_input_stream_clear_pending (stream); 492 stream->priv->closed = TRUE; 493 if (stream->priv->outstanding_callback) 494 (*stream->priv->outstanding_callback) (source_object, res, user_data); 495 g_object_unref (stream); 496 } 497 498 /** 499 * g_input_stream_read_async: 500 * @stream: A #GInputStream. 
 * @buffer: a buffer to read data into (which should be at least count bytes long).
 * @count: the number of bytes that will be read from the stream
 * @io_priority: the <link linkend="io-priority">I/O priority</link>
 * of the request.
 * @cancellable: optional #GCancellable object, %NULL to ignore.
 * @callback: callback to call when the request is satisfied
 * @user_data: the data to pass to callback function
 *
 * Request an asynchronous read of @count bytes from the stream into the buffer
 * starting at @buffer. When the operation is finished @callback will be called.
 * You can then call g_input_stream_read_finish() to get the result of the
 * operation.
 *
 * During an async request no other sync and async calls are allowed, and will
 * result in %G_IO_ERROR_PENDING errors.
 *
 * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
 *
 * On success, the number of bytes read into the buffer will be passed to the
 * callback. It is not an error if this is not the same as the requested size, as it
 * can happen e.g. near the end of a file, but generally we try to read
 * as many bytes as requested. Zero is returned on end of file
 * (or if @count is zero), but never otherwise.
 *
 * Any outstanding i/o request with higher priority (lower numerical value) will
 * be executed before an outstanding request with lower priority. Default
 * priority is %G_PRIORITY_DEFAULT.
 *
 * The asynchronous methods have a default fallback that uses threads to implement
 * asynchronicity, so they are optional for inheriting classes. However, if you
 * override one you must override all.
532 **/ 533 void 534 g_input_stream_read_async (GInputStream *stream, 535 void *buffer, 536 gsize count, 537 int io_priority, 538 GCancellable *cancellable, 539 GAsyncReadyCallback callback, 540 gpointer user_data) 541 { 542 GInputStreamClass *class; 543 GSimpleAsyncResult *simple; 544 GError *error = NULL; 545 546 g_return_if_fail (G_IS_INPUT_STREAM (stream)); 547 g_return_if_fail (buffer != NULL); 548 549 if (count == 0) 550 { 551 simple = g_simple_async_result_new (G_OBJECT (stream), 552 callback, 553 user_data, 554 g_input_stream_read_async); 555 g_simple_async_result_complete_in_idle (simple); 556 g_object_unref (simple); 557 return; 558 } 559 560 if (((gssize) count) < 0) 561 { 562 g_simple_async_report_error_in_idle (G_OBJECT (stream), 563 callback, 564 user_data, 565 G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, 566 _("Too large count value passed to %s"), 567 G_STRFUNC); 568 return; 569 } 570 571 if (!g_input_stream_set_pending (stream, &error)) 572 { 573 g_simple_async_report_gerror_in_idle (G_OBJECT (stream), 574 callback, 575 user_data, 576 error); 577 g_error_free (error); 578 return; 579 } 580 581 class = G_INPUT_STREAM_GET_CLASS (stream); 582 stream->priv->outstanding_callback = callback; 583 g_object_ref (stream); 584 class->read_async (stream, buffer, count, io_priority, cancellable, 585 async_ready_callback_wrapper, user_data); 586 } 587 588 /** 589 * g_input_stream_read_finish: 590 * @stream: a #GInputStream. 591 * @result: a #GAsyncResult. 592 * @error: a #GError location to store the error occuring, or %NULL to 593 * ignore. 594 * 595 * Finishes an asynchronous stream read operation. 596 * 597 * Returns: number of bytes read in, or -1 on error. 
598 **/ 599 gssize 600 g_input_stream_read_finish (GInputStream *stream, 601 GAsyncResult *result, 602 GError **error) 603 { 604 GSimpleAsyncResult *simple; 605 GInputStreamClass *class; 606 607 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1); 608 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1); 609 610 if (G_IS_SIMPLE_ASYNC_RESULT (result)) 611 { 612 simple = G_SIMPLE_ASYNC_RESULT (result); 613 if (g_simple_async_result_propagate_error (simple, error)) 614 return -1; 615 616 /* Special case read of 0 bytes */ 617 if (g_simple_async_result_get_source_tag (simple) == g_input_stream_read_async) 618 return 0; 619 } 620 621 class = G_INPUT_STREAM_GET_CLASS (stream); 622 return class->read_finish (stream, result, error); 623 } 624 625 /** 626 * g_input_stream_skip_async: 627 * @stream: A #GInputStream. 628 * @count: the number of bytes that will be skipped from the stream 629 * @io_priority: the <link linkend="io-priority">I/O priority</link> 630 * of the request. 631 * @cancellable: optional #GCancellable object, %NULL to ignore. 632 * @callback: callback to call when the request is satisfied 633 * @user_data: the data to pass to callback function 634 * 635 * Request an asynchronous skip of @count bytes from the stream into the buffer 636 * starting at @buffer. When the operation is finished @callback will be called. 637 * You can then call g_input_stream_skip_finish() to get the result of the 638 * operation. 639 * 640 * During an async request no other sync and async calls are allowed, and will 641 * result in %G_IO_ERROR_PENDING errors. 642 * 643 * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error. 644 * 645 * On success, the number of bytes skipped will be passed to the 646 * callback. It is not an error if this is not the same as the requested size, as it 647 * can happen e.g. near the end of a file, but generally we try to skip 648 * as many bytes as requested. 
Zero is returned on end of file 649 * (or if @count is zero), but never otherwise. 650 * 651 * Any outstanding i/o request with higher priority (lower numerical value) will 652 * be executed before an outstanding request with lower priority. Default 653 * priority is %G_PRIORITY_DEFAULT. 654 * 655 * The asyncronous methods have a default fallback that uses threads to implement 656 * asynchronicity, so they are optional for inheriting classes. However, if you 657 * override one you must override all. 658 **/ 659 void 660 g_input_stream_skip_async (GInputStream *stream, 661 gsize count, 662 int io_priority, 663 GCancellable *cancellable, 664 GAsyncReadyCallback callback, 665 gpointer user_data) 666 { 667 GInputStreamClass *class; 668 GSimpleAsyncResult *simple; 669 GError *error = NULL; 670 671 g_return_if_fail (G_IS_INPUT_STREAM (stream)); 672 673 if (count == 0) 674 { 675 simple = g_simple_async_result_new (G_OBJECT (stream), 676 callback, 677 user_data, 678 g_input_stream_skip_async); 679 680 g_simple_async_result_complete_in_idle (simple); 681 g_object_unref (simple); 682 return; 683 } 684 685 if (((gssize) count) < 0) 686 { 687 g_simple_async_report_error_in_idle (G_OBJECT (stream), 688 callback, 689 user_data, 690 G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, 691 _("Too large count value passed to %s"), 692 G_STRFUNC); 693 return; 694 } 695 696 if (!g_input_stream_set_pending (stream, &error)) 697 { 698 g_simple_async_report_gerror_in_idle (G_OBJECT (stream), 699 callback, 700 user_data, 701 error); 702 g_error_free (error); 703 return; 704 } 705 706 class = G_INPUT_STREAM_GET_CLASS (stream); 707 stream->priv->outstanding_callback = callback; 708 g_object_ref (stream); 709 class->skip_async (stream, count, io_priority, cancellable, 710 async_ready_callback_wrapper, user_data); 711 } 712 713 /** 714 * g_input_stream_skip_finish: 715 * @stream: a #GInputStream. 716 * @result: a #GAsyncResult. 
717 * @error: a #GError location to store the error occuring, or %NULL to 718 * ignore. 719 * 720 * Finishes a stream skip operation. 721 * 722 * Returns: the size of the bytes skipped, or %-1 on error. 723 **/ 724 gssize 725 g_input_stream_skip_finish (GInputStream *stream, 726 GAsyncResult *result, 727 GError **error) 728 { 729 GSimpleAsyncResult *simple; 730 GInputStreamClass *class; 731 732 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1); 733 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1); 734 735 if (G_IS_SIMPLE_ASYNC_RESULT (result)) 736 { 737 simple = G_SIMPLE_ASYNC_RESULT (result); 738 if (g_simple_async_result_propagate_error (simple, error)) 739 return -1; 740 741 /* Special case skip of 0 bytes */ 742 if (g_simple_async_result_get_source_tag (simple) == g_input_stream_skip_async) 743 return 0; 744 } 745 746 class = G_INPUT_STREAM_GET_CLASS (stream); 747 return class->skip_finish (stream, result, error); 748 } 749 750 /** 751 * g_input_stream_close_async: 752 * @stream: A #GInputStream. 753 * @io_priority: the <link linkend="io-priority">I/O priority</link> 754 * of the request. 755 * @cancellable: optional cancellable object 756 * @callback: callback to call when the request is satisfied 757 * @user_data: the data to pass to callback function 758 * 759 * Requests an asynchronous closes of the stream, releasing resources related to it. 760 * When the operation is finished @callback will be called. 761 * You can then call g_input_stream_close_finish() to get the result of the 762 * operation. 763 * 764 * For behaviour details see g_input_stream_close(). 765 * 766 * The asyncronous methods have a default fallback that uses threads to implement 767 * asynchronicity, so they are optional for inheriting classes. However, if you 768 * override one you must override all. 
769 **/ 770 void 771 g_input_stream_close_async (GInputStream *stream, 772 int io_priority, 773 GCancellable *cancellable, 774 GAsyncReadyCallback callback, 775 gpointer user_data) 776 { 777 GInputStreamClass *class; 778 GSimpleAsyncResult *simple; 779 GError *error = NULL; 780 781 g_return_if_fail (G_IS_INPUT_STREAM (stream)); 782 783 if (stream->priv->closed) 784 { 785 simple = g_simple_async_result_new (G_OBJECT (stream), 786 callback, 787 user_data, 788 g_input_stream_close_async); 789 790 g_simple_async_result_complete_in_idle (simple); 791 g_object_unref (simple); 792 return; 793 } 794 795 if (!g_input_stream_set_pending (stream, &error)) 796 { 797 g_simple_async_report_gerror_in_idle (G_OBJECT (stream), 798 callback, 799 user_data, 800 error); 801 g_error_free (error); 802 return; 803 } 804 805 class = G_INPUT_STREAM_GET_CLASS (stream); 806 stream->priv->outstanding_callback = callback; 807 g_object_ref (stream); 808 class->close_async (stream, io_priority, cancellable, 809 async_ready_close_callback_wrapper, user_data); 810 } 811 812 /** 813 * g_input_stream_close_finish: 814 * @stream: a #GInputStream. 815 * @result: a #GAsyncResult. 816 * @error: a #GError location to store the error occuring, or %NULL to 817 * ignore. 818 * 819 * Finishes closing a stream asynchronously, started from g_input_stream_close_async(). 820 * 821 * Returns: %TRUE if the stream was closed successfully. 
822 **/ 823 gboolean 824 g_input_stream_close_finish (GInputStream *stream, 825 GAsyncResult *result, 826 GError **error) 827 { 828 GSimpleAsyncResult *simple; 829 GInputStreamClass *class; 830 831 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE); 832 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE); 833 834 if (G_IS_SIMPLE_ASYNC_RESULT (result)) 835 { 836 simple = G_SIMPLE_ASYNC_RESULT (result); 837 if (g_simple_async_result_propagate_error (simple, error)) 838 return FALSE; 839 840 /* Special case already closed */ 841 if (g_simple_async_result_get_source_tag (simple) == g_input_stream_close_async) 842 return TRUE; 843 } 844 845 class = G_INPUT_STREAM_GET_CLASS (stream); 846 return class->close_finish (stream, result, error); 847 } 848 849 /** 850 * g_input_stream_is_closed: 851 * @stream: input stream. 852 * 853 * Checks if an input stream is closed. 854 * 855 * Returns: %TRUE if the stream is closed. 856 **/ 857 gboolean 858 g_input_stream_is_closed (GInputStream *stream) 859 { 860 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE); 861 862 return stream->priv->closed; 863 } 864 865 /** 866 * g_input_stream_has_pending: 867 * @stream: input stream. 868 * 869 * Checks if an input stream has pending actions. 870 * 871 * Returns: %TRUE if @stream has pending actions. 872 **/ 873 gboolean 874 g_input_stream_has_pending (GInputStream *stream) 875 { 876 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE); 877 878 return stream->priv->pending; 879 } 880 881 /** 882 * g_input_stream_set_pending: 883 * @stream: input stream 884 * @error: a #GError location to store the error occuring, or %NULL to 885 * ignore. 886 * 887 * Sets @stream to have actions pending. If the pending flag is 888 * already set or @stream is closed, it will return %FALSE and set 889 * @error. 890 * 891 * Return value: %TRUE if pending was previously unset and is now set. 
892 **/ 893 gboolean 894 g_input_stream_set_pending (GInputStream *stream, GError **error) 895 { 896 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE); 897 898 if (stream->priv->closed) 899 { 900 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED, 901 _("Stream is already closed")); 902 return FALSE; 903 } 904 905 if (stream->priv->pending) 906 { 907 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING, 908 /* Translators: This is an error you get if there is already an 909 * operation running against this stream when you try to start 910 * one */ 911 _("Stream has outstanding operation")); 912 return FALSE; 913 } 914 915 stream->priv->pending = TRUE; 916 return TRUE; 917 } 918 919 /** 920 * g_input_stream_clear_pending: 921 * @stream: input stream 922 * 923 * Clears the pending flag on @stream. 924 **/ 925 void 926 g_input_stream_clear_pending (GInputStream *stream) 927 { 928 g_return_if_fail (G_IS_INPUT_STREAM (stream)); 929 930 stream->priv->pending = FALSE; 931 } 932 933 /******************************************** 934 * Default implementation of async ops * 935 ********************************************/ 936 937 typedef struct { 938 void *buffer; 939 gsize count_requested; 940 gssize count_read; 941 } ReadData; 942 943 static void 944 read_async_thread (GSimpleAsyncResult *res, 945 GObject *object, 946 GCancellable *cancellable) 947 { 948 ReadData *op; 949 GInputStreamClass *class; 950 GError *error = NULL; 951 952 op = g_simple_async_result_get_op_res_gpointer (res); 953 954 class = G_INPUT_STREAM_GET_CLASS (object); 955 956 op->count_read = class->read_fn (G_INPUT_STREAM (object), 957 op->buffer, op->count_requested, 958 cancellable, &error); 959 if (op->count_read == -1) 960 { 961 g_simple_async_result_set_from_error (res, error); 962 g_error_free (error); 963 } 964 } 965 966 static void 967 g_input_stream_real_read_async (GInputStream *stream, 968 void *buffer, 969 gsize count, 970 int io_priority, 971 GCancellable *cancellable, 972 
GAsyncReadyCallback callback, 973 gpointer user_data) 974 { 975 GSimpleAsyncResult *res; 976 ReadData *op; 977 978 op = g_new (ReadData, 1); 979 res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_input_stream_real_read_async); 980 g_simple_async_result_set_op_res_gpointer (res, op, g_free); 981 op->buffer = buffer; 982 op->count_requested = count; 983 984 g_simple_async_result_run_in_thread (res, read_async_thread, io_priority, cancellable); 985 g_object_unref (res); 986 } 987 988 static gssize 989 g_input_stream_real_read_finish (GInputStream *stream, 990 GAsyncResult *result, 991 GError **error) 992 { 993 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); 994 ReadData *op; 995 996 g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == 997 g_input_stream_real_read_async); 998 999 op = g_simple_async_result_get_op_res_gpointer (simple); 1000 1001 return op->count_read; 1002 } 1003 1004 typedef struct { 1005 gsize count_requested; 1006 gssize count_skipped; 1007 } SkipData; 1008 1009 1010 static void 1011 skip_async_thread (GSimpleAsyncResult *res, 1012 GObject *object, 1013 GCancellable *cancellable) 1014 { 1015 SkipData *op; 1016 GInputStreamClass *class; 1017 GError *error = NULL; 1018 1019 class = G_INPUT_STREAM_GET_CLASS (object); 1020 op = g_simple_async_result_get_op_res_gpointer (res); 1021 op->count_skipped = class->skip (G_INPUT_STREAM (object), 1022 op->count_requested, 1023 cancellable, &error); 1024 if (op->count_skipped == -1) 1025 { 1026 g_simple_async_result_set_from_error (res, error); 1027 g_error_free (error); 1028 } 1029 } 1030 1031 typedef struct { 1032 char buffer[8192]; 1033 gsize count; 1034 gsize count_skipped; 1035 int io_prio; 1036 GCancellable *cancellable; 1037 gpointer user_data; 1038 GAsyncReadyCallback callback; 1039 } SkipFallbackAsyncData; 1040 1041 static void 1042 skip_callback_wrapper (GObject *source_object, 1043 GAsyncResult *res, 1044 gpointer user_data) 1045 { 1046 GInputStreamClass 
*class; 1047 SkipFallbackAsyncData *data = user_data; 1048 SkipData *op; 1049 GSimpleAsyncResult *simple; 1050 GError *error = NULL; 1051 gssize ret; 1052 1053 ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error); 1054 1055 if (ret > 0) 1056 { 1057 data->count -= ret; 1058 data->count_skipped += ret; 1059 1060 if (data->count > 0) 1061 { 1062 class = G_INPUT_STREAM_GET_CLASS (source_object); 1063 class->read_async (G_INPUT_STREAM (source_object), data->buffer, MIN (8192, data->count), data->io_prio, data->cancellable, 1064 skip_callback_wrapper, data); 1065 return; 1066 } 1067 } 1068 1069 op = g_new0 (SkipData, 1); 1070 op->count_skipped = data->count_skipped; 1071 simple = g_simple_async_result_new (source_object, 1072 data->callback, data->user_data, 1073 g_input_stream_real_skip_async); 1074 1075 g_simple_async_result_set_op_res_gpointer (simple, op, g_free); 1076 1077 if (ret == -1) 1078 { 1079 if (data->count_skipped && 1080 error->domain == G_IO_ERROR && 1081 error->code == G_IO_ERROR_CANCELLED) 1082 { /* No error, return partial read */ } 1083 else 1084 g_simple_async_result_set_from_error (simple, error); 1085 g_error_free (error); 1086 } 1087 1088 /* Complete immediately, not in idle, since we're already in a mainloop callout */ 1089 g_simple_async_result_complete (simple); 1090 g_object_unref (simple); 1091 1092 g_free (data); 1093 } 1094 1095 static void 1096 g_input_stream_real_skip_async (GInputStream *stream, 1097 gsize count, 1098 int io_priority, 1099 GCancellable *cancellable, 1100 GAsyncReadyCallback callback, 1101 gpointer user_data) 1102 { 1103 GInputStreamClass *class; 1104 SkipData *op; 1105 SkipFallbackAsyncData *data; 1106 GSimpleAsyncResult *res; 1107 1108 class = G_INPUT_STREAM_GET_CLASS (stream); 1109 1110 if (class->read_async == g_input_stream_real_read_async) 1111 { 1112 /* Read is thread-using async fallback. 1113 * Make skip use threads too, so that we can use a possible sync skip 1114 * implementation. 
*/ 1115 op = g_new0 (SkipData, 1); 1116 1117 res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, 1118 g_input_stream_real_skip_async); 1119 1120 g_simple_async_result_set_op_res_gpointer (res, op, g_free); 1121 1122 op->count_requested = count; 1123 1124 g_simple_async_result_run_in_thread (res, skip_async_thread, io_priority, cancellable); 1125 g_object_unref (res); 1126 } 1127 else 1128 { 1129 /* TODO: Skip fallback uses too much memory, should do multiple read calls */ 1130 1131 /* There is a custom async read function, lets use that. */ 1132 data = g_new (SkipFallbackAsyncData, 1); 1133 data->count = count; 1134 data->count_skipped = 0; 1135 data->io_prio = io_priority; 1136 data->cancellable = cancellable; 1137 data->callback = callback; 1138 data->user_data = user_data; 1139 class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable, 1140 skip_callback_wrapper, data); 1141 } 1142 1143 } 1144 1145 static gssize 1146 g_input_stream_real_skip_finish (GInputStream *stream, 1147 GAsyncResult *result, 1148 GError **error) 1149 { 1150 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); 1151 SkipData *op; 1152 1153 g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_input_stream_real_skip_async); 1154 op = g_simple_async_result_get_op_res_gpointer (simple); 1155 return op->count_skipped; 1156 } 1157 1158 static void 1159 close_async_thread (GSimpleAsyncResult *res, 1160 GObject *object, 1161 GCancellable *cancellable) 1162 { 1163 GInputStreamClass *class; 1164 GError *error = NULL; 1165 gboolean result; 1166 1167 /* Auto handling of cancelation disabled, and ignore 1168 cancellation, since we want to close things anyway, although 1169 possibly in a quick-n-dirty way. 
At least we never want to leak 1170 open handles */ 1171 1172 class = G_INPUT_STREAM_GET_CLASS (object); 1173 result = class->close_fn (G_INPUT_STREAM (object), cancellable, &error); 1174 if (!result) 1175 { 1176 g_simple_async_result_set_from_error (res, error); 1177 g_error_free (error); 1178 } 1179 } 1180 1181 static void 1182 g_input_stream_real_close_async (GInputStream *stream, 1183 int io_priority, 1184 GCancellable *cancellable, 1185 GAsyncReadyCallback callback, 1186 gpointer user_data) 1187 { 1188 GSimpleAsyncResult *res; 1189 1190 res = g_simple_async_result_new (G_OBJECT (stream), 1191 callback, 1192 user_data, 1193 g_input_stream_real_close_async); 1194 1195 g_simple_async_result_set_handle_cancellation (res, FALSE); 1196 1197 g_simple_async_result_run_in_thread (res, 1198 close_async_thread, 1199 io_priority, 1200 cancellable); 1201 g_object_unref (res); 1202 } 1203 1204 static gboolean 1205 g_input_stream_real_close_finish (GInputStream *stream, 1206 GAsyncResult *result, 1207 GError **error) 1208 { 1209 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); 1210 g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_input_stream_real_close_async); 1211 return TRUE; 1212 } 1213 1214 #define __G_INPUT_STREAM_C__ 1215 #include "gioaliasdef.c" 1216