1 /* GIO - GLib Input, Output and Streaming Library 2 * 3 * Copyright (C) 2006-2007 Red Hat, Inc. 4 * Copyright (C) 2007 Jrg Billeter 5 * 6 * This library is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU Lesser General Public 8 * License as published by the Free Software Foundation; either 9 * version 2 of the License, or (at your option) any later version. 10 * 11 * This library is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 * Lesser General Public License for more details. 15 * 16 * You should have received a copy of the GNU Lesser General 17 * Public License along with this library; if not, write to the 18 * Free Software Foundation, Inc., 59 Temple Place, Suite 330, 19 * Boston, MA 02111-1307, USA. 20 * 21 * Author: Christian Kellner <gicmo (at) gnome.org> 22 */ 23 24 #include "config.h" 25 #include "gbufferedinputstream.h" 26 #include "ginputstream.h" 27 #include "gcancellable.h" 28 #include "gasyncresult.h" 29 #include "gsimpleasyncresult.h" 30 #include "gioerror.h" 31 #include <string.h> 32 #include "glibintl.h" 33 34 #include "gioalias.h" 35 36 /** 37 * SECTION:gbufferedinputstream 38 * @short_description: Buffered Input Stream 39 * @include: gio/gio.h 40 * @see_also: #GFilterInputStream, #GInputStream 41 * 42 * Buffered input stream implements #GFilterInputStream and provides 43 * for buffered reads. 44 * 45 * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes. 46 * 47 * To create a buffered input stream, use g_buffered_input_stream_new(), 48 * or g_buffered_input_stream_new_sized() to specify the buffer's size at 49 * construction. 50 * 51 * To get the size of a buffer within a buffered input stream, use 52 * g_buffered_input_stream_get_buffer_size(). To change the size of a 53 * buffered input stream's buffer, use 54 * g_buffered_input_stream_set_buffer_size(). Note that the buffer's size 55 * cannot be reduced below the size of the data within the buffer. 56 * 57 **/ 58 59 60 61 #define DEFAULT_BUFFER_SIZE 4096 62 63 struct _GBufferedInputStreamPrivate { 64 guint8 *buffer; 65 gsize len; 66 gsize pos; 67 gsize end; 68 GAsyncReadyCallback outstanding_callback; 69 }; 70 71 enum { 72 PROP_0, 73 PROP_BUFSIZE 74 }; 75 76 static void g_buffered_input_stream_set_property (GObject *object, 77 guint prop_id, 78 const GValue *value, 79 GParamSpec *pspec); 80 81 static void g_buffered_input_stream_get_property (GObject *object, 82 guint prop_id, 83 GValue *value, 84 GParamSpec *pspec); 85 static void g_buffered_input_stream_finalize (GObject *object); 86 87 88 static gssize g_buffered_input_stream_skip (GInputStream *stream, 89 gsize count, 90 GCancellable *cancellable, 91 GError **error); 92 static void g_buffered_input_stream_skip_async (GInputStream *stream, 93 gsize count, 94 int io_priority, 95 GCancellable *cancellable, 96 GAsyncReadyCallback callback, 97 gpointer user_data); 98 static gssize g_buffered_input_stream_skip_finish (GInputStream *stream, 99 GAsyncResult *result, 100 GError **error); 101 static gssize g_buffered_input_stream_read (GInputStream *stream, 102 void *buffer, 103 gsize count, 104 GCancellable *cancellable, 105 GError **error); 106 static void g_buffered_input_stream_read_async (GInputStream *stream, 107 void *buffer, 108 gsize count, 109 int io_priority, 110 GCancellable *cancellable, 111 GAsyncReadyCallback callback, 112 gpointer user_data); 113 static gssize g_buffered_input_stream_read_finish (GInputStream *stream, 114 GAsyncResult *result, 115 GError **error); 116 static gssize g_buffered_input_stream_real_fill (GBufferedInputStream *stream, 117 gssize count, 118 GCancellable *cancellable, 119 GError **error); 120 static void g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream, 121 gssize count, 122 int io_priority, 123 GCancellable *cancellable, 124 GAsyncReadyCallback callback, 125 gpointer user_data); 126 static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream, 127 GAsyncResult *result, 128 GError **error); 129 130 static void compact_buffer (GBufferedInputStream *stream); 131 132 G_DEFINE_TYPE (GBufferedInputStream, 133 g_buffered_input_stream, 134 G_TYPE_FILTER_INPUT_STREAM) 135 136 137 static void 138 g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass) 139 { 140 GObjectClass *object_class; 141 GInputStreamClass *istream_class; 142 GBufferedInputStreamClass *bstream_class; 143 144 g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate)); 145 146 object_class = G_OBJECT_CLASS (klass); 147 object_class->get_property = g_buffered_input_stream_get_property; 148 object_class->set_property = g_buffered_input_stream_set_property; 149 object_class->finalize = g_buffered_input_stream_finalize; 150 151 istream_class = G_INPUT_STREAM_CLASS (klass); 152 istream_class->skip = g_buffered_input_stream_skip; 153 istream_class->skip_async = g_buffered_input_stream_skip_async; 154 istream_class->skip_finish = g_buffered_input_stream_skip_finish; 155 istream_class->read_fn = g_buffered_input_stream_read; 156 istream_class->read_async = g_buffered_input_stream_read_async; 157 istream_class->read_finish = g_buffered_input_stream_read_finish; 158 159 bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass); 160 bstream_class->fill = g_buffered_input_stream_real_fill; 161 bstream_class->fill_async = g_buffered_input_stream_real_fill_async; 162 bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish; 163 164 g_object_class_install_property (object_class, 165 PROP_BUFSIZE, 166 g_param_spec_uint ("buffer-size", 167 P_("Buffer Size"), 168 P_("The size of the backend buffer"), 169 1, 170 G_MAXUINT, 171 DEFAULT_BUFFER_SIZE, 172 G_PARAM_READWRITE | G_PARAM_CONSTRUCT | 173 G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB)); 174 175 176 } 177 178 /** 179 * g_buffered_input_stream_get_buffer_size: 180 * @stream: #GBufferedInputStream. 181 * 182 * Gets the size of the input buffer. 183 * 184 * Returns: the current buffer size. 185 **/ 186 gsize 187 g_buffered_input_stream_get_buffer_size (GBufferedInputStream *stream) 188 { 189 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), 0); 190 191 return stream->priv->len; 192 } 193 194 /** 195 * g_buffered_input_stream_set_buffer_size: 196 * @stream: #GBufferedInputStream. 197 * @size: a #gsize. 198 * 199 * Sets the size of the internal buffer of @stream to @size, or to the 200 * size of the contents of the buffer. The buffer can never be resized 201 * smaller than its current contents. 202 **/ 203 void 204 g_buffered_input_stream_set_buffer_size (GBufferedInputStream *stream, 205 gsize size) 206 { 207 GBufferedInputStreamPrivate *priv; 208 gsize in_buffer; 209 guint8 *buffer; 210 211 g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream)); 212 213 priv = stream->priv; 214 215 if (priv->len == size) 216 return; 217 218 if (priv->buffer) 219 { 220 in_buffer = priv->end - priv->pos; 221 222 /* Never resize smaller than current buffer contents */ 223 size = MAX (size, in_buffer); 224 225 buffer = g_malloc (size); 226 memcpy (buffer, priv->buffer + priv->pos, in_buffer); 227 priv->len = size; 228 priv->pos = 0; 229 priv->end = in_buffer; 230 g_free (priv->buffer); 231 priv->buffer = buffer; 232 } 233 else 234 { 235 priv->len = size; 236 priv->pos = 0; 237 priv->end = 0; 238 priv->buffer = g_malloc (size); 239 } 240 241 g_object_notify (G_OBJECT (stream), "buffer-size"); 242 } 243 244 static void 245 g_buffered_input_stream_set_property (GObject *object, 246 guint prop_id, 247 const GValue *value, 248 GParamSpec *pspec) 249 { 250 GBufferedInputStreamPrivate *priv; 251 GBufferedInputStream *bstream; 252 253 bstream = G_BUFFERED_INPUT_STREAM (object); 254 priv = bstream->priv; 255 256 switch (prop_id) 257 { 258 case PROP_BUFSIZE: 259 g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value)); 260 break; 261 262 default: 263 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); 264 break; 265 } 266 267 } 268 269 static void 270 g_buffered_input_stream_get_property (GObject *object, 271 guint prop_id, 272 GValue *value, 273 GParamSpec *pspec) 274 { 275 GBufferedInputStreamPrivate *priv; 276 GBufferedInputStream *bstream; 277 278 bstream = G_BUFFERED_INPUT_STREAM (object); 279 priv = bstream->priv; 280 281 switch (prop_id) 282 { 283 case PROP_BUFSIZE: 284 g_value_set_uint (value, priv->len); 285 break; 286 287 default: 288 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); 289 break; 290 } 291 } 292 293 static void 294 g_buffered_input_stream_finalize (GObject *object) 295 { 296 GBufferedInputStreamPrivate *priv; 297 GBufferedInputStream *stream; 298 299 stream = G_BUFFERED_INPUT_STREAM (object); 300 priv = stream->priv; 301 302 g_free (priv->buffer); 303 304 G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize (object); 305 } 306 307 static void 308 g_buffered_input_stream_init (GBufferedInputStream *stream) 309 { 310 stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, 311 G_TYPE_BUFFERED_INPUT_STREAM, 312 GBufferedInputStreamPrivate); 313 } 314 315 316 /** 317 * g_buffered_input_stream_new: 318 * @base_stream: a #GInputStream. 319 * 320 * Creates a new #GInputStream from the given @base_stream, with 321 * a buffer set to the default size (4 kilobytes). 322 * 323 * Returns: a #GInputStream for the given @base_stream. 324 **/ 325 GInputStream * 326 g_buffered_input_stream_new (GInputStream *base_stream) 327 { 328 GInputStream *stream; 329 330 g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL); 331 332 stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM, 333 "base-stream", base_stream, 334 NULL); 335 336 return stream; 337 } 338 339 /** 340 * g_buffered_input_stream_new_sized: 341 * @base_stream: a #GInputStream. 342 * @size: a #gsize. 343 * 344 * Creates a new #GBufferedInputStream from the given @base_stream, 345 * with a buffer set to @size. 346 * 347 * Returns: a #GInputStream. 348 **/ 349 GInputStream * 350 g_buffered_input_stream_new_sized (GInputStream *base_stream, 351 gsize size) 352 { 353 GInputStream *stream; 354 355 g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL); 356 357 stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM, 358 "base-stream", base_stream, 359 "buffer-size", (guint)size, 360 NULL); 361 362 return stream; 363 } 364 365 /** 366 * g_buffered_input_stream_fill: 367 * @stream: #GBufferedInputStream. 368 * @count: the number of bytes that will be read from the stream. 369 * @cancellable: optional #GCancellable object, %NULL to ignore. 370 * @error: location to store the error occuring, or %NULL to ignore. 371 * 372 * Tries to read @count bytes from the stream into the buffer. 373 * Will block during this read. 374 * 375 * If @count is zero, returns zero and does nothing. A value of @count 376 * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error. 377 * 378 * On success, the number of bytes read into the buffer is returned. 379 * It is not an error if this is not the same as the requested size, as it 380 * can happen e.g. near the end of a file. Zero is returned on end of file 381 * (or if @count is zero), but never otherwise. 382 * 383 * If @count is -1 then the attempted read size is equal to the number of 384 * bytes that are required to fill the buffer. 385 * 386 * If @cancellable is not %NULL, then the operation can be cancelled by 387 * triggering the cancellable object from another thread. If the operation 388 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an 389 * operation was partially finished when the operation was cancelled the 390 * partial result will be returned, without an error. 391 * 392 * On error -1 is returned and @error is set accordingly. 393 * 394 * For the asynchronous, non-blocking, version of this function, see 395 * g_buffered_input_stream_fill_async(). 396 * 397 * Returns: the number of bytes read into @stream's buffer, up to @count, 398 * or -1 on error. 399 **/ 400 gssize 401 g_buffered_input_stream_fill (GBufferedInputStream *stream, 402 gssize count, 403 GCancellable *cancellable, 404 GError **error) 405 { 406 GBufferedInputStreamClass *class; 407 GInputStream *input_stream; 408 gssize res; 409 410 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1); 411 412 input_stream = G_INPUT_STREAM (stream); 413 414 if (count < -1) 415 { 416 g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, 417 _("Too large count value passed to %s"), G_STRFUNC); 418 return -1; 419 } 420 421 if (!g_input_stream_set_pending (input_stream, error)) 422 return -1; 423 424 if (cancellable) 425 g_cancellable_push_current (cancellable); 426 427 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); 428 res = class->fill (stream, count, cancellable, error); 429 430 if (cancellable) 431 g_cancellable_pop_current (cancellable); 432 433 g_input_stream_clear_pending (input_stream); 434 435 return res; 436 } 437 438 static void 439 async_fill_callback_wrapper (GObject *source_object, 440 GAsyncResult *res, 441 gpointer user_data) 442 { 443 GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object); 444 445 g_input_stream_clear_pending (G_INPUT_STREAM (stream)); 446 (*stream->priv->outstanding_callback) (source_object, res, user_data); 447 g_object_unref (stream); 448 } 449 450 /** 451 * g_buffered_input_stream_fill_async: 452 * @stream: #GBufferedInputStream. 453 * @count: the number of bytes that will be read from the stream. 454 * @io_priority: the <link linkend="io-priority">I/O priority</link> 455 * of the request. 456 * @cancellable: optional #GCancellable object 457 * @callback: a #GAsyncReadyCallback. 458 * @user_data: a #gpointer. 459 * 460 * Reads data into @stream's buffer asynchronously, up to @count size. 461 * @io_priority can be used to prioritize reads. For the synchronous 462 * version of this function, see g_buffered_input_stream_fill(). 463 * 464 * If @count is -1 then the attempted read size is equal to the number 465 * of bytes that are required to fill the buffer. 466 **/ 467 void 468 g_buffered_input_stream_fill_async (GBufferedInputStream *stream, 469 gssize count, 470 int io_priority, 471 GCancellable *cancellable, 472 GAsyncReadyCallback callback, 473 gpointer user_data) 474 { 475 GBufferedInputStreamClass *class; 476 GSimpleAsyncResult *simple; 477 GError *error = NULL; 478 479 g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream)); 480 481 if (count == 0) 482 { 483 simple = g_simple_async_result_new (G_OBJECT (stream), 484 callback, 485 user_data, 486 g_buffered_input_stream_fill_async); 487 g_simple_async_result_complete_in_idle (simple); 488 g_object_unref (simple); 489 return; 490 } 491 492 if (count < -1) 493 { 494 g_simple_async_report_error_in_idle (G_OBJECT (stream), 495 callback, 496 user_data, 497 G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, 498 _("Too large count value passed to %s"), 499 G_STRFUNC); 500 return; 501 } 502 503 if (!g_input_stream_set_pending (G_INPUT_STREAM (stream), &error)) 504 { 505 g_simple_async_report_gerror_in_idle (G_OBJECT (stream), 506 callback, 507 user_data, 508 error); 509 g_error_free (error); 510 return; 511 } 512 513 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); 514 515 stream->priv->outstanding_callback = callback; 516 g_object_ref (stream); 517 class->fill_async (stream, count, io_priority, cancellable, 518 async_fill_callback_wrapper, user_data); 519 } 520 521 /** 522 * g_buffered_input_stream_fill_finish: 523 * @stream: a #GBufferedInputStream. 524 * @result: a #GAsyncResult. 525 * @error: a #GError. 526 * 527 * Finishes an asynchronous read. 528 * 529 * Returns: a #gssize of the read stream, or %-1 on an error. 530 **/ 531 gssize 532 g_buffered_input_stream_fill_finish (GBufferedInputStream *stream, 533 GAsyncResult *result, 534 GError **error) 535 { 536 GSimpleAsyncResult *simple; 537 GBufferedInputStreamClass *class; 538 539 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1); 540 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1); 541 542 if (G_IS_SIMPLE_ASYNC_RESULT (result)) 543 { 544 simple = G_SIMPLE_ASYNC_RESULT (result); 545 if (g_simple_async_result_propagate_error (simple, error)) 546 return -1; 547 548 /* Special case read of 0 bytes */ 549 if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async) 550 return 0; 551 } 552 553 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); 554 return class->fill_finish (stream, result, error); 555 } 556 557 /** 558 * g_buffered_input_stream_get_available: 559 * @stream: #GBufferedInputStream. 560 * 561 * Gets the size of the available data within the stream. 562 * 563 * Returns: size of the available stream. 564 **/ 565 gsize 566 g_buffered_input_stream_get_available (GBufferedInputStream *stream) 567 { 568 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1); 569 570 return stream->priv->end - stream->priv->pos; 571 } 572 573 /** 574 * g_buffered_input_stream_peek: 575 * @stream: a #GBufferedInputStream. 576 * @buffer: a pointer to an allocated chunk of memory. 577 * @offset: a #gsize. 578 * @count: a #gsize. 579 * 580 * Peeks in the buffer, copying data of size @count into @buffer, 581 * offset @offset bytes. 582 * 583 * Returns: a #gsize of the number of bytes peeked, or %-1 on error. 584 **/ 585 gsize 586 g_buffered_input_stream_peek (GBufferedInputStream *stream, 587 void *buffer, 588 gsize offset, 589 gsize count) 590 { 591 gsize available; 592 gsize end; 593 594 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1); 595 g_return_val_if_fail (buffer != NULL, -1); 596 597 available = g_buffered_input_stream_get_available (stream); 598 599 if (offset > available) 600 return 0; 601 602 end = MIN (offset + count, available); 603 count = end - offset; 604 605 memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count); 606 return count; 607 } 608 609 /** 610 * g_buffered_input_stream_peek_buffer: 611 * @stream: a #GBufferedInputStream. 612 * @count: a #gsize to get the number of bytes available in the buffer. 613 * 614 * Returns the buffer with the currently available bytes. The returned 615 * buffer must not be modified and will become invalid when reading from 616 * the stream or filling the buffer. 617 * 618 * Returns: read-only buffer 619 **/ 620 const void* 621 g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream, 622 gsize *count) 623 { 624 GBufferedInputStreamPrivate *priv; 625 626 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL); 627 628 priv = stream->priv; 629 630 if (count) 631 *count = priv->end - priv->pos; 632 633 return priv->buffer + priv->pos; 634 } 635 636 static void 637 compact_buffer (GBufferedInputStream *stream) 638 { 639 GBufferedInputStreamPrivate *priv; 640 gsize current_size; 641 642 priv = stream->priv; 643 644 current_size = priv->end - priv->pos; 645 646 g_memmove (priv->buffer, priv->buffer + priv->pos, current_size); 647 648 priv->pos = 0; 649 priv->end = current_size; 650 } 651 652 static gssize 653 g_buffered_input_stream_real_fill (GBufferedInputStream *stream, 654 gssize count, 655 GCancellable *cancellable, 656 GError **error) 657 { 658 GBufferedInputStreamPrivate *priv; 659 GInputStream *base_stream; 660 gssize nread; 661 gsize in_buffer; 662 663 priv = stream->priv; 664 665 if (count == -1) 666 count = priv->len; 667 668 in_buffer = priv->end - priv->pos; 669 670 /* Never fill more than can fit in the buffer */ 671 count = MIN (count, priv->len - in_buffer); 672 673 /* If requested length does not fit at end, compact */ 674 if (priv->len - priv->end < count) 675 compact_buffer (stream); 676 677 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; 678 nread = g_input_stream_read (base_stream, 679 priv->buffer + priv->end, 680 count, 681 cancellable, 682 error); 683 684 if (nread > 0) 685 priv->end += nread; 686 687 return nread; 688 } 689 690 static gssize 691 g_buffered_input_stream_skip (GInputStream *stream, 692 gsize count, 693 GCancellable *cancellable, 694 GError **error) 695 { 696 GBufferedInputStream *bstream; 697 GBufferedInputStreamPrivate *priv; 698 GBufferedInputStreamClass *class; 699 GInputStream *base_stream; 700 gsize available, bytes_skipped; 701 gssize nread; 702 703 bstream = G_BUFFERED_INPUT_STREAM (stream); 704 priv = bstream->priv; 705 706 available = priv->end - priv->pos; 707 708 if (count <= available) 709 { 710 priv->pos += count; 711 return count; 712 } 713 714 /* Full request not available, skip all currently available and 715 * request refill for more 716 */ 717 718 priv->pos = 0; 719 priv->end = 0; 720 bytes_skipped = available; 721 count -= available; 722 723 if (bytes_skipped > 0) 724 error = NULL; /* Ignore further errors if we already read some data */ 725 726 if (count > priv->len) 727 { 728 /* Large request, shortcut buffer */ 729 730 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; 731 732 nread = g_input_stream_skip (base_stream, 733 count, 734 cancellable, 735 error); 736 737 if (nread < 0 && bytes_skipped == 0) 738 return -1; 739 740 if (nread > 0) 741 bytes_skipped += nread; 742 743 return bytes_skipped; 744 } 745 746 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); 747 nread = class->fill (bstream, priv->len, cancellable, error); 748 749 if (nread < 0) 750 { 751 if (bytes_skipped == 0) 752 return -1; 753 else 754 return bytes_skipped; 755 } 756 757 available = priv->end - priv->pos; 758 count = MIN (count, available); 759 760 bytes_skipped += count; 761 priv->pos += count; 762 763 return bytes_skipped; 764 } 765 766 static gssize 767 g_buffered_input_stream_read (GInputStream *stream, 768 void *buffer, 769 gsize count, 770 GCancellable *cancellable, 771 GError **error) 772 { 773 GBufferedInputStream *bstream; 774 GBufferedInputStreamPrivate *priv; 775 GBufferedInputStreamClass *class; 776 GInputStream *base_stream; 777 gsize available, bytes_read; 778 gssize nread; 779 780 bstream = G_BUFFERED_INPUT_STREAM (stream); 781 priv = bstream->priv; 782 783 available = priv->end - priv->pos; 784 785 if (count <= available) 786 { 787 memcpy (buffer, priv->buffer + priv->pos, count); 788 priv->pos += count; 789 return count; 790 } 791 792 /* Full request not available, read all currently availbile and request refill for more */ 793 794 memcpy (buffer, priv->buffer + priv->pos, available); 795 priv->pos = 0; 796 priv->end = 0; 797 bytes_read = available; 798 count -= available; 799 800 if (bytes_read > 0) 801 error = NULL; /* Ignore further errors if we already read some data */ 802 803 if (count > priv->len) 804 { 805 /* Large request, shortcut buffer */ 806 807 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; 808 809 nread = g_input_stream_read (base_stream, 810 (char *)buffer + bytes_read, 811 count, 812 cancellable, 813 error); 814 815 if (nread < 0 && bytes_read == 0) 816 return -1; 817 818 if (nread > 0) 819 bytes_read += nread; 820 821 return bytes_read; 822 } 823 824 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); 825 nread = class->fill (bstream, priv->len, cancellable, error); 826 if (nread < 0) 827 { 828 if (bytes_read == 0) 829 return -1; 830 else 831 return bytes_read; 832 } 833 834 available = priv->end - priv->pos; 835 count = MIN (count, available); 836 837 memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count); 838 bytes_read += count; 839 priv->pos += count; 840 841 return bytes_read; 842 } 843 844 /** 845 * g_buffered_input_stream_read_byte: 846 * @stream: #GBufferedInputStream. 847 * @cancellable: optional #GCancellable object, %NULL to ignore. 848 * @error: location to store the error occuring, or %NULL to ignore. 849 * 850 * Tries to read a single byte from the stream or the buffer. Will block 851 * during this read. 852 * 853 * On success, the byte read from the stream is returned. On end of stream 854 * -1 is returned but it's not an exceptional error and @error is not set. 855 * 856 * If @cancellable is not %NULL, then the operation can be cancelled by 857 * triggering the cancellable object from another thread. If the operation 858 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an 859 * operation was partially finished when the operation was cancelled the 860 * partial result will be returned, without an error. 861 * 862 * On error -1 is returned and @error is set accordingly. 863 * 864 * Returns: the byte read from the @stream, or -1 on end of stream or error. 865 **/ 866 int 867 g_buffered_input_stream_read_byte (GBufferedInputStream *stream, 868 GCancellable *cancellable, 869 GError **error) 870 { 871 GBufferedInputStreamPrivate *priv; 872 GBufferedInputStreamClass *class; 873 GInputStream *input_stream; 874 gsize available; 875 gssize nread; 876 877 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1); 878 879 priv = stream->priv; 880 input_stream = G_INPUT_STREAM (stream); 881 882 if (g_input_stream_is_closed (input_stream)) 883 { 884 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED, 885 _("Stream is already closed")); 886 return -1; 887 } 888 889 if (!g_input_stream_set_pending (input_stream, error)) 890 return -1; 891 892 available = priv->end - priv->pos; 893 894 if (available != 0) 895 { 896 g_input_stream_clear_pending (input_stream); 897 return priv->buffer[priv->pos++]; 898 } 899 900 /* Byte not available, request refill for more */ 901 902 if (cancellable) 903 g_cancellable_push_current (cancellable); 904 905 priv->pos = 0; 906 priv->end = 0; 907 908 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); 909 nread = class->fill (stream, priv->len, cancellable, error); 910 911 if (cancellable) 912 g_cancellable_pop_current (cancellable); 913 914 g_input_stream_clear_pending (input_stream); 915 916 if (nread <= 0) 917 return -1; /* error or end of stream */ 918 919 return priv->buffer[priv->pos++]; 920 } 921 922 /* ************************** */ 923 /* Async stuff implementation */ 924 /* ************************** */ 925 926 static void 927 fill_async_callback (GObject *source_object, 928 GAsyncResult *result, 929 gpointer user_data) 930 { 931 GError *error; 932 gssize res; 933 GSimpleAsyncResult *simple; 934 935 simple = user_data; 936 937 error = NULL; 938 res = g_input_stream_read_finish (G_INPUT_STREAM (source_object), 939 result, &error); 940 941 g_simple_async_result_set_op_res_gssize (simple, res); 942 if (res == -1) 943 { 944 g_simple_async_result_set_from_error (simple, error); 945 g_error_free (error); 946 } 947 else 948 { 949 GBufferedInputStreamPrivate *priv; 950 GObject *object; 951 952 object = g_async_result_get_source_object (G_ASYNC_RESULT (simple)); 953 priv = G_BUFFERED_INPUT_STREAM (object)->priv; 954 955 g_assert_cmpint (priv->end + res, <=, priv->len); 956 priv->end += res; 957 958 g_object_unref (object); 959 } 960 961 /* Complete immediately, not in idle, since we're already in a mainloop callout */ 962 g_simple_async_result_complete (simple); 963 g_object_unref (simple); 964 } 965 966 static void 967 g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream, 968 gssize count, 969 int io_priority, 970 GCancellable *cancellable, 971 GAsyncReadyCallback callback, 972 gpointer user_data) 973 { 974 GBufferedInputStreamPrivate *priv; 975 GInputStream *base_stream; 976 GSimpleAsyncResult *simple; 977 gsize in_buffer; 978 979 priv = stream->priv; 980 981 if (count == -1) 982 count = priv->len; 983 984 in_buffer = priv->end - priv->pos; 985 986 /* Never fill more than can fit in the buffer */ 987 count = MIN (count, priv->len - in_buffer); 988 989 /* If requested length does not fit at end, compact */ 990 if (priv->len - priv->end < count) 991 compact_buffer (stream); 992 993 simple = g_simple_async_result_new (G_OBJECT (stream), 994 callback, user_data, 995 g_buffered_input_stream_real_fill_async); 996 997 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; 998 g_input_stream_read_async (base_stream, 999 priv->buffer + priv->end, 1000 count, 1001 io_priority, 1002 cancellable, 1003 fill_async_callback, 1004 simple); 1005 } 1006 1007 static gssize 1008 g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream, 1009 GAsyncResult *result, 1010 GError **error) 1011 { 1012 GSimpleAsyncResult *simple; 1013 gssize nread; 1014 1015 simple = G_SIMPLE_ASYNC_RESULT (result); 1016 g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async); 1017 1018 nread = g_simple_async_result_get_op_res_gssize (simple); 1019 return nread; 1020 } 1021 1022 typedef struct { 1023 gssize bytes_read; 1024 gssize count; 1025 void *buffer; 1026 } ReadAsyncData; 1027 1028 static void 1029 free_read_async_data (gpointer _data) 1030 { 1031 ReadAsyncData *data = _data; 1032 g_slice_free (ReadAsyncData, data); 1033 } 1034 1035 static void 1036 large_read_callback (GObject *source_object, 1037 GAsyncResult *result, 1038 gpointer user_data) 1039 { 1040 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data); 1041 ReadAsyncData *data; 1042 GError *error; 1043 gssize nread; 1044 1045 data = g_simple_async_result_get_op_res_gpointer (simple); 1046 1047 error = NULL; 1048 nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object), 1049 result, &error); 1050 1051 /* Only report the error if we've not already read some data */ 1052 if (nread < 0 && data->bytes_read == 0) 1053 g_simple_async_result_set_from_error (simple, error); 1054 1055 if (nread > 0) 1056 data->bytes_read += nread; 1057 1058 if (error) 1059 g_error_free (error); 1060 1061 /* Complete immediately, not in idle, since we're already in a mainloop callout */ 1062 g_simple_async_result_complete (simple); 1063 g_object_unref (simple); 1064 } 1065 1066 static void 1067 read_fill_buffer_callback (GObject *source_object, 1068 GAsyncResult *result, 1069 gpointer user_data) 1070 { 1071 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data); 1072 GBufferedInputStream *bstream; 1073 GBufferedInputStreamPrivate *priv; 1074 ReadAsyncData *data; 1075 GError *error; 1076 gssize nread; 1077 gsize available; 1078 1079 bstream = G_BUFFERED_INPUT_STREAM (source_object); 1080 priv = bstream->priv; 1081 1082 data = g_simple_async_result_get_op_res_gpointer (simple); 1083 1084 error = NULL; 1085 nread = g_buffered_input_stream_fill_finish (bstream, 1086 result, &error); 1087 1088 if (nread < 0 && data->bytes_read == 0) 1089 g_simple_async_result_set_from_error (simple, error); 1090 1091 1092 if (nread > 0) 1093 { 1094 available = priv->end - priv->pos; 1095 data->count = MIN (data->count, available); 1096 1097 memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count); 1098 data->bytes_read += data->count; 1099 priv->pos += data->count; 1100 } 1101 1102 if (error) 1103 g_error_free (error); 1104 1105 /* Complete immediately, not in idle, since we're already in a mainloop callout */ 1106 g_simple_async_result_complete (simple); 1107 g_object_unref (simple); 1108 } 1109 1110 static void 1111 g_buffered_input_stream_read_async (GInputStream *stream, 1112 void *buffer, 1113 gsize count, 1114 int io_priority, 1115 GCancellable *cancellable, 1116 GAsyncReadyCallback callback, 1117 gpointer user_data) 1118 { 1119 GBufferedInputStream *bstream; 1120 GBufferedInputStreamPrivate *priv; 1121 GBufferedInputStreamClass *class; 1122 GInputStream *base_stream; 1123 gsize available; 1124 GSimpleAsyncResult *simple; 1125 ReadAsyncData *data; 1126 1127 bstream = G_BUFFERED_INPUT_STREAM (stream); 1128 priv = bstream->priv; 1129 1130 data = g_slice_new (ReadAsyncData); 1131 data->buffer = buffer; 1132 data->bytes_read = 0; 1133 simple = g_simple_async_result_new (G_OBJECT (stream), 1134 callback, user_data, 1135 g_buffered_input_stream_read_async); 1136 g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data); 1137 1138 available = priv->end - priv->pos; 1139 1140 if (count <= available) 1141 { 1142 memcpy (buffer, priv->buffer + priv->pos, count); 1143 priv->pos += count; 1144 data->bytes_read = count; 1145 1146 g_simple_async_result_complete_in_idle (simple); 1147 g_object_unref (simple); 1148 return; 1149 } 1150 1151 1152 /* Full request not available, read all currently availbile and request refill for more */ 1153 1154 memcpy (buffer, priv->buffer + priv->pos, available); 1155 priv->pos = 0; 1156 priv->end = 0; 1157 1158 count -= available; 1159 1160 data->bytes_read = available; 1161 data->count = count; 1162 1163 if (count > priv->len) 1164 { 1165 /* Large request, shortcut buffer */ 1166 1167 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; 1168 1169 g_input_stream_read_async (base_stream, 1170 (char *)buffer + data->bytes_read, 1171 count, 1172 io_priority, cancellable, 1173 large_read_callback, 1174 simple); 1175 } 1176 else 1177 { 1178 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); 1179 class->fill_async (bstream, priv->len, io_priority, cancellable, 1180 read_fill_buffer_callback, simple); 1181 } 1182 } 1183 1184 static gssize 1185 g_buffered_input_stream_read_finish (GInputStream *stream, 1186 GAsyncResult *result, 1187 GError **error) 1188 { 1189 GSimpleAsyncResult *simple; 1190 ReadAsyncData *data; 1191 1192 simple = G_SIMPLE_ASYNC_RESULT (result); 1193 1194 g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async); 1195 1196 data = g_simple_async_result_get_op_res_gpointer (simple); 1197 1198 return data->bytes_read; 1199 } 1200 1201 typedef struct { 1202 gssize bytes_skipped; 1203 gssize count; 1204 } SkipAsyncData; 1205 1206 static void 1207 free_skip_async_data (gpointer _data) 1208 { 1209 SkipAsyncData *data = _data; 1210 g_slice_free (SkipAsyncData, data); 1211 } 1212 1213 static void 1214 large_skip_callback (GObject *source_object, 1215 GAsyncResult *result, 1216 gpointer user_data) 1217 { 1218 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data); 1219 SkipAsyncData *data; 1220 GError *error; 1221 gssize nread; 1222 1223 data = g_simple_async_result_get_op_res_gpointer (simple); 1224 1225 error = NULL; 1226 nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object), 1227 result, &error); 1228 1229 /* Only report the error if we've not already read some data */ 1230 if (nread < 0 && data->bytes_skipped == 0) 1231 g_simple_async_result_set_from_error (simple, error); 1232 1233 if (nread > 0) 1234 data->bytes_skipped += nread; 1235 1236 if (error) 1237 g_error_free (error); 1238 1239 /* Complete immediately, not in idle, since we're already in a mainloop callout */ 1240 g_simple_async_result_complete (simple); 1241 g_object_unref (simple); 1242 } 1243 1244 static void 1245 skip_fill_buffer_callback (GObject *source_object, 1246 GAsyncResult *result, 1247 gpointer user_data) 1248 { 1249 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data); 1250 GBufferedInputStream *bstream; 1251 GBufferedInputStreamPrivate *priv; 1252 SkipAsyncData *data; 1253 GError *error; 1254 gssize nread; 1255 gsize available; 1256 1257 bstream = G_BUFFERED_INPUT_STREAM (source_object); 1258 priv = bstream->priv; 1259 1260 data = g_simple_async_result_get_op_res_gpointer (simple); 1261 1262 error = NULL; 1263 nread = g_buffered_input_stream_fill_finish (bstream, 1264 result, &error); 1265 1266 if (nread < 0 && data->bytes_skipped == 0) 1267 g_simple_async_result_set_from_error (simple, error); 1268 1269 1270 if (nread > 0) 1271 { 1272 available = priv->end - priv->pos; 1273 data->count = MIN (data->count, available); 1274 1275 data->bytes_skipped += data->count; 1276 priv->pos += data->count; 1277 } 1278 1279 if (error) 1280 g_error_free (error); 1281 1282 /* Complete immediately, not in idle, since we're already in a mainloop callout */ 1283 g_simple_async_result_complete (simple); 1284 g_object_unref (simple); 1285 } 1286 1287 static void 1288 g_buffered_input_stream_skip_async (GInputStream *stream, 1289 gsize count, 1290 int io_priority, 1291 GCancellable *cancellable, 1292 GAsyncReadyCallback callback, 1293 gpointer user_data) 1294 { 1295 GBufferedInputStream *bstream; 1296 GBufferedInputStreamPrivate *priv; 1297 GBufferedInputStreamClass *class; 1298 GInputStream *base_stream; 1299 gsize available; 1300 GSimpleAsyncResult *simple; 1301 SkipAsyncData *data; 1302 1303 bstream = G_BUFFERED_INPUT_STREAM (stream); 1304 priv = bstream->priv; 1305 1306 data = g_slice_new (SkipAsyncData); 1307 data->bytes_skipped = 0; 1308 simple = g_simple_async_result_new (G_OBJECT (stream), 1309 callback, user_data, 1310 g_buffered_input_stream_skip_async); 1311 g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data); 1312 1313 available = priv->end - priv->pos; 1314 1315 if (count <= available) 1316 { 1317 priv->pos += count; 1318 data->bytes_skipped = count; 1319 1320 g_simple_async_result_complete_in_idle (simple); 1321 g_object_unref (simple); 1322 return; 1323 } 1324 1325 1326 /* Full request not available, skip all currently availbile and request refill for more */ 1327 1328 priv->pos = 0; 1329 priv->end = 0; 1330 1331 count -= available; 1332 1333 data->bytes_skipped = available; 1334 data->count = count; 1335 1336 if (count > priv->len) 1337 { 1338 /* Large request, shortcut buffer */ 1339 1340 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream; 1341 1342 g_input_stream_skip_async (base_stream, 1343 count, 1344 io_priority, cancellable, 1345 large_skip_callback, 1346 simple); 1347 } 1348 else 1349 { 1350 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream); 1351 class->fill_async (bstream, priv->len, io_priority, cancellable, 1352 skip_fill_buffer_callback, simple); 1353 } 1354 } 1355 1356 static gssize 1357 g_buffered_input_stream_skip_finish (GInputStream *stream, 1358 GAsyncResult *result, 1359 GError **error) 1360 { 1361 GSimpleAsyncResult *simple; 1362 SkipAsyncData *data; 1363 1364 simple = G_SIMPLE_ASYNC_RESULT (result); 1365 1366 g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async); 1367 1368 data = g_simple_async_result_get_op_res_gpointer (simple); 1369 1370 return data->bytes_skipped; 1371 } 1372 1373 1374 #define __G_BUFFERED_INPUT_STREAM_C__ 1375 #include "gioaliasdef.c" 1376