1 /* GIO - GLib Input, Output and Streaming Library 2 * 3 * Copyright (C) 2006-2007 Red Hat, Inc. 4 * 5 * This library is free software; you can redistribute it and/or 6 * modify it under the terms of the GNU Lesser General Public 7 * License as published by the Free Software Foundation; either 8 * version 2 of the License, or (at your option) any later version. 9 * 10 * This library is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * Lesser General Public License for more details. 14 * 15 * You should have received a copy of the GNU Lesser General 16 * Public License along with this library; if not, write to the 17 * Free Software Foundation, Inc., 59 Temple Place, Suite 330, 18 * Boston, MA 02111-1307, USA. 19 * 20 * Author: Christian Kellner <gicmo (at) gnome.org> 21 */ 22 23 #include "config.h" 24 #include "gbufferedoutputstream.h" 25 #include "goutputstream.h" 26 #include "gsimpleasyncresult.h" 27 #include "string.h" 28 #include "glibintl.h" 29 30 #include <gioalias.h> 31 32 /** 33 * SECTION:gbufferedoutputstream 34 * @short_description: Buffered Output Stream 35 * @include: gio/gio.h 36 * @see_also: #GFilterOutputStream, #GOutputStream 37 * 38 * Buffered output stream implements #GFilterOutputStream and provides 39 * for buffered writes. 40 * 41 * By default, #GBufferedOutputStream's buffer size is set at 4 kilobytes. 42 * 43 * To create a buffered output stream, use g_buffered_output_stream_new(), 44 * or g_buffered_output_stream_new_sized() to specify the buffer's size 45 * at construction. 46 * 47 * To get the size of a buffer within a buffered input stream, use 48 * g_buffered_output_stream_get_buffer_size(). To change the size of a 49 * buffered output stream's buffer, use 50 * g_buffered_output_stream_set_buffer_size(). Note that the buffer's 51 * size cannot be reduced below the size of the data within the buffer. 52 **/ 53 54 #define DEFAULT_BUFFER_SIZE 4096 55 56 struct _GBufferedOutputStreamPrivate { 57 guint8 *buffer; 58 gsize len; 59 goffset pos; 60 gboolean auto_grow; 61 }; 62 63 enum { 64 PROP_0, 65 PROP_BUFSIZE, 66 PROP_AUTO_GROW 67 }; 68 69 static void g_buffered_output_stream_set_property (GObject *object, 70 guint prop_id, 71 const GValue *value, 72 GParamSpec *pspec); 73 74 static void g_buffered_output_stream_get_property (GObject *object, 75 guint prop_id, 76 GValue *value, 77 GParamSpec *pspec); 78 static void g_buffered_output_stream_finalize (GObject *object); 79 80 81 static gssize g_buffered_output_stream_write (GOutputStream *stream, 82 const void *buffer, 83 gsize count, 84 GCancellable *cancellable, 85 GError **error); 86 static gboolean g_buffered_output_stream_flush (GOutputStream *stream, 87 GCancellable *cancellable, 88 GError **error); 89 static gboolean g_buffered_output_stream_close (GOutputStream *stream, 90 GCancellable *cancellable, 91 GError **error); 92 93 static void g_buffered_output_stream_write_async (GOutputStream *stream, 94 const void *buffer, 95 gsize count, 96 int io_priority, 97 GCancellable *cancellable, 98 GAsyncReadyCallback callback, 99 gpointer data); 100 static gssize g_buffered_output_stream_write_finish (GOutputStream *stream, 101 GAsyncResult *result, 102 GError **error); 103 static void g_buffered_output_stream_flush_async (GOutputStream *stream, 104 int io_priority, 105 GCancellable *cancellable, 106 GAsyncReadyCallback callback, 107 gpointer data); 108 static gboolean g_buffered_output_stream_flush_finish (GOutputStream *stream, 109 GAsyncResult *result, 110 GError **error); 111 static void g_buffered_output_stream_close_async (GOutputStream *stream, 112 int io_priority, 113 GCancellable *cancellable, 114 GAsyncReadyCallback callback, 115 gpointer data); 116 static gboolean g_buffered_output_stream_close_finish (GOutputStream *stream, 117 GAsyncResult *result, 118 GError **error); 119 120 G_DEFINE_TYPE (GBufferedOutputStream, 121 g_buffered_output_stream, 122 G_TYPE_FILTER_OUTPUT_STREAM) 123 124 125 static void 126 g_buffered_output_stream_class_init (GBufferedOutputStreamClass *klass) 127 { 128 GObjectClass *object_class; 129 GOutputStreamClass *ostream_class; 130 131 g_type_class_add_private (klass, sizeof (GBufferedOutputStreamPrivate)); 132 133 object_class = G_OBJECT_CLASS (klass); 134 object_class->get_property = g_buffered_output_stream_get_property; 135 object_class->set_property = g_buffered_output_stream_set_property; 136 object_class->finalize = g_buffered_output_stream_finalize; 137 138 ostream_class = G_OUTPUT_STREAM_CLASS (klass); 139 ostream_class->write_fn = g_buffered_output_stream_write; 140 ostream_class->flush = g_buffered_output_stream_flush; 141 ostream_class->close_fn = g_buffered_output_stream_close; 142 ostream_class->write_async = g_buffered_output_stream_write_async; 143 ostream_class->write_finish = g_buffered_output_stream_write_finish; 144 ostream_class->flush_async = g_buffered_output_stream_flush_async; 145 ostream_class->flush_finish = g_buffered_output_stream_flush_finish; 146 ostream_class->close_async = g_buffered_output_stream_close_async; 147 ostream_class->close_finish = g_buffered_output_stream_close_finish; 148 149 g_object_class_install_property (object_class, 150 PROP_BUFSIZE, 151 g_param_spec_uint ("buffer-size", 152 P_("Buffer Size"), 153 P_("The size of the backend buffer"), 154 1, 155 G_MAXUINT, 156 DEFAULT_BUFFER_SIZE, 157 G_PARAM_READWRITE|G_PARAM_CONSTRUCT| 158 G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB)); 159 160 g_object_class_install_property (object_class, 161 PROP_AUTO_GROW, 162 g_param_spec_boolean ("auto-grow", 163 P_("Auto-grow"), 164 P_("Whether the buffer should automatically grow"), 165 FALSE, 166 G_PARAM_READWRITE| 167 G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB)); 168 169 } 170 171 /** 172 * g_buffered_output_stream_get_buffer_size: 173 * @stream: a #GBufferedOutputStream. 174 * 175 * Gets the size of the buffer in the @stream. 176 * 177 * Returns: the current size of the buffer. 178 **/ 179 gsize 180 g_buffered_output_stream_get_buffer_size (GBufferedOutputStream *stream) 181 { 182 g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), -1); 183 184 return stream->priv->len; 185 } 186 187 /** 188 * g_buffered_output_stream_set_buffer_size: 189 * @stream: a #GBufferedOutputStream. 190 * @size: a #gsize. 191 * 192 * Sets the size of the internal buffer to @size. 193 **/ 194 void 195 g_buffered_output_stream_set_buffer_size (GBufferedOutputStream *stream, 196 gsize size) 197 { 198 GBufferedOutputStreamPrivate *priv; 199 guint8 *buffer; 200 201 g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream)); 202 203 priv = stream->priv; 204 205 if (size == priv->len) 206 return; 207 208 if (priv->buffer) 209 { 210 size = MAX (size, priv->pos); 211 212 buffer = g_malloc (size); 213 memcpy (buffer, priv->buffer, priv->pos); 214 g_free (priv->buffer); 215 priv->buffer = buffer; 216 priv->len = size; 217 /* Keep old pos */ 218 } 219 else 220 { 221 priv->buffer = g_malloc (size); 222 priv->len = size; 223 priv->pos = 0; 224 } 225 226 g_object_notify (G_OBJECT (stream), "buffer-size"); 227 } 228 229 /** 230 * g_buffered_output_stream_get_auto_grow: 231 * @stream: a #GBufferedOutputStream. 232 * 233 * Checks if the buffer automatically grows as data is added. 234 * 235 * Returns: %TRUE if the @stream's buffer automatically grows, 236 * %FALSE otherwise. 237 **/ 238 gboolean 239 g_buffered_output_stream_get_auto_grow (GBufferedOutputStream *stream) 240 { 241 g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), FALSE); 242 243 return stream->priv->auto_grow; 244 } 245 246 /** 247 * g_buffered_output_stream_set_auto_grow: 248 * @stream: a #GBufferedOutputStream. 249 * @auto_grow: a #gboolean. 250 * 251 * Sets whether or not the @stream's buffer should automatically grow. 252 * If @auto_grow is true, then each write will just make the buffer 253 * larger, and you must manually flush the buffer to actually write out 254 * the data to the underlying stream. 255 **/ 256 void 257 g_buffered_output_stream_set_auto_grow (GBufferedOutputStream *stream, 258 gboolean auto_grow) 259 { 260 GBufferedOutputStreamPrivate *priv; 261 g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream)); 262 priv = stream->priv; 263 auto_grow = auto_grow != FALSE; 264 if (priv->auto_grow != auto_grow) 265 { 266 priv->auto_grow = auto_grow; 267 g_object_notify (G_OBJECT (stream), "auto-grow"); 268 } 269 } 270 271 static void 272 g_buffered_output_stream_set_property (GObject *object, 273 guint prop_id, 274 const GValue *value, 275 GParamSpec *pspec) 276 { 277 GBufferedOutputStream *stream; 278 279 stream = G_BUFFERED_OUTPUT_STREAM (object); 280 281 switch (prop_id) 282 { 283 case PROP_BUFSIZE: 284 g_buffered_output_stream_set_buffer_size (stream, g_value_get_uint (value)); 285 break; 286 287 case PROP_AUTO_GROW: 288 g_buffered_output_stream_set_auto_grow (stream, g_value_get_boolean (value)); 289 break; 290 291 default: 292 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); 293 break; 294 } 295 296 } 297 298 static void 299 g_buffered_output_stream_get_property (GObject *object, 300 guint prop_id, 301 GValue *value, 302 GParamSpec *pspec) 303 { 304 GBufferedOutputStream *buffered_stream; 305 GBufferedOutputStreamPrivate *priv; 306 307 buffered_stream = G_BUFFERED_OUTPUT_STREAM (object); 308 priv = buffered_stream->priv; 309 310 switch (prop_id) 311 { 312 case PROP_BUFSIZE: 313 g_value_set_uint (value, priv->len); 314 break; 315 316 case PROP_AUTO_GROW: 317 g_value_set_boolean (value, priv->auto_grow); 318 break; 319 320 default: 321 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); 322 break; 323 } 324 325 } 326 327 static void 328 g_buffered_output_stream_finalize (GObject *object) 329 { 330 GBufferedOutputStream *stream; 331 GBufferedOutputStreamPrivate *priv; 332 333 stream = G_BUFFERED_OUTPUT_STREAM (object); 334 priv = stream->priv; 335 336 g_free (priv->buffer); 337 338 G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize (object); 339 } 340 341 static void 342 g_buffered_output_stream_init (GBufferedOutputStream *stream) 343 { 344 stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, 345 G_TYPE_BUFFERED_OUTPUT_STREAM, 346 GBufferedOutputStreamPrivate); 347 348 } 349 350 /** 351 * g_buffered_output_stream_new: 352 * @base_stream: a #GOutputStream. 353 * 354 * Creates a new buffered output stream for a base stream. 355 * 356 * Returns: a #GOutputStream for the given @base_stream. 357 **/ 358 GOutputStream * 359 g_buffered_output_stream_new (GOutputStream *base_stream) 360 { 361 GOutputStream *stream; 362 363 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL); 364 365 stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM, 366 "base-stream", base_stream, 367 NULL); 368 369 return stream; 370 } 371 372 /** 373 * g_buffered_output_stream_new_sized: 374 * @base_stream: a #GOutputStream. 375 * @size: a #gsize. 376 * 377 * Creates a new buffered output stream with a given buffer size. 378 * 379 * Returns: a #GOutputStream with an internal buffer set to @size. 380 **/ 381 GOutputStream * 382 g_buffered_output_stream_new_sized (GOutputStream *base_stream, 383 gsize size) 384 { 385 GOutputStream *stream; 386 387 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL); 388 389 stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM, 390 "base-stream", base_stream, 391 "buffer-size", size, 392 NULL); 393 394 return stream; 395 } 396 397 static gboolean 398 flush_buffer (GBufferedOutputStream *stream, 399 GCancellable *cancellable, 400 GError **error) 401 { 402 GBufferedOutputStreamPrivate *priv; 403 GOutputStream *base_stream; 404 gboolean res; 405 gsize bytes_written; 406 gsize count; 407 408 priv = stream->priv; 409 bytes_written = 0; 410 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream; 411 412 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), FALSE); 413 414 res = g_output_stream_write_all (base_stream, 415 priv->buffer, 416 priv->pos, 417 &bytes_written, 418 cancellable, 419 error); 420 421 count = priv->pos - bytes_written; 422 423 if (count > 0) 424 g_memmove (priv->buffer, priv->buffer + bytes_written, count); 425 426 priv->pos -= bytes_written; 427 428 return res; 429 } 430 431 static gssize 432 g_buffered_output_stream_write (GOutputStream *stream, 433 const void *buffer, 434 gsize count, 435 GCancellable *cancellable, 436 GError **error) 437 { 438 GBufferedOutputStream *bstream; 439 GBufferedOutputStreamPrivate *priv; 440 gboolean res; 441 gsize n; 442 gsize new_size; 443 444 bstream = G_BUFFERED_OUTPUT_STREAM (stream); 445 priv = bstream->priv; 446 447 n = priv->len - priv->pos; 448 449 if (priv->auto_grow && n < count) 450 { 451 new_size = MAX (priv->len * 2, priv->len + count); 452 g_buffered_output_stream_set_buffer_size (bstream, new_size); 453 } 454 else if (n == 0) 455 { 456 res = flush_buffer (bstream, cancellable, error); 457 458 if (res == FALSE) 459 return -1; 460 } 461 462 n = priv->len - priv->pos; 463 464 count = MIN (count, n); 465 memcpy (priv->buffer + priv->pos, buffer, count); 466 priv->pos += count; 467 468 return count; 469 } 470 471 static gboolean 472 g_buffered_output_stream_flush (GOutputStream *stream, 473 GCancellable *cancellable, 474 GError **error) 475 { 476 GBufferedOutputStream *bstream; 477 GBufferedOutputStreamPrivate *priv; 478 GOutputStream *base_stream; 479 gboolean res; 480 481 bstream = G_BUFFERED_OUTPUT_STREAM (stream); 482 priv = bstream->priv; 483 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream; 484 485 res = flush_buffer (bstream, cancellable, error); 486 487 if (res == FALSE) 488 return FALSE; 489 490 res = g_output_stream_flush (base_stream, cancellable, error); 491 492 return res; 493 } 494 495 static gboolean 496 g_buffered_output_stream_close (GOutputStream *stream, 497 GCancellable *cancellable, 498 GError **error) 499 { 500 GBufferedOutputStream *bstream; 501 GBufferedOutputStreamPrivate *priv; 502 GOutputStream *base_stream; 503 gboolean res; 504 505 bstream = G_BUFFERED_OUTPUT_STREAM (stream); 506 priv = bstream->priv; 507 base_stream = G_FILTER_OUTPUT_STREAM (bstream)->base_stream; 508 509 res = flush_buffer (bstream, cancellable, error); 510 511 if (g_filter_output_stream_get_close_base_stream (G_FILTER_OUTPUT_STREAM (stream))) 512 { 513 /* report the first error but still close the stream */ 514 if (res) 515 res = g_output_stream_close (base_stream, cancellable, error); 516 else 517 g_output_stream_close (base_stream, cancellable, NULL); 518 } 519 520 return res; 521 } 522 523 /* ************************** */ 524 /* Async stuff implementation */ 525 /* ************************** */ 526 527 /* TODO: This should be using the base class async ops, not threads */ 528 529 typedef struct { 530 531 guint flush_stream : 1; 532 guint close_stream : 1; 533 534 } FlushData; 535 536 static void 537 free_flush_data (gpointer data) 538 { 539 g_slice_free (FlushData, data); 540 } 541 542 /* This function is used by all three (i.e. 543 * _write, _flush, _close) functions since 544 * all of them will need to flush the buffer 545 * and so closing and writing is just a special 546 * case of flushing + some addition stuff */ 547 static void 548 flush_buffer_thread (GSimpleAsyncResult *result, 549 GObject *object, 550 GCancellable *cancellable) 551 { 552 GBufferedOutputStream *stream; 553 GOutputStream *base_stream; 554 FlushData *fdata; 555 gboolean res; 556 GError *error = NULL; 557 558 stream = G_BUFFERED_OUTPUT_STREAM (object); 559 fdata = g_simple_async_result_get_op_res_gpointer (result); 560 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream; 561 562 res = flush_buffer (stream, cancellable, &error); 563 564 /* if flushing the buffer didn't work don't even bother 565 * to flush the stream but just report that error */ 566 if (res && fdata->flush_stream) 567 res = g_output_stream_flush (base_stream, cancellable, &error); 568 569 if (fdata->close_stream) 570 { 571 572 /* if flushing the buffer or the stream returned 573 * an error report that first error but still try 574 * close the stream */ 575 if (g_filter_output_stream_get_close_base_stream (G_FILTER_OUTPUT_STREAM (stream))) 576 { 577 if (res == FALSE) 578 g_output_stream_close (base_stream, cancellable, NULL); 579 else 580 res = g_output_stream_close (base_stream, cancellable, &error); 581 } 582 } 583 584 if (res == FALSE) 585 { 586 g_simple_async_result_set_from_error (result, error); 587 g_error_free (error); 588 } 589 } 590 591 typedef struct { 592 593 FlushData fdata; 594 595 gsize count; 596 const void *buffer; 597 598 } WriteData; 599 600 static void 601 free_write_data (gpointer data) 602 { 603 g_slice_free (WriteData, data); 604 } 605 606 static void 607 g_buffered_output_stream_write_async (GOutputStream *stream, 608 const void *buffer, 609 gsize count, 610 int io_priority, 611 GCancellable *cancellable, 612 GAsyncReadyCallback callback, 613 gpointer data) 614 { 615 GBufferedOutputStream *buffered_stream; 616 GBufferedOutputStreamPrivate *priv; 617 GSimpleAsyncResult *res; 618 WriteData *wdata; 619 620 buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream); 621 priv = buffered_stream->priv; 622 623 wdata = g_slice_new (WriteData); 624 wdata->count = count; 625 wdata->buffer = buffer; 626 627 res = g_simple_async_result_new (G_OBJECT (stream), 628 callback, 629 data, 630 g_buffered_output_stream_write_async); 631 632 g_simple_async_result_set_op_res_gpointer (res, wdata, free_write_data); 633 634 /* if we have space left directly call the 635 * callback (from idle) otherwise schedule a buffer 636 * flush in the thread. In both cases the actual 637 * copying of the data to the buffer will be done in 638 * the write_finish () func since that should 639 * be fast enough */ 640 if (priv->len - priv->pos > 0) 641 { 642 g_simple_async_result_complete_in_idle (res); 643 } 644 else 645 { 646 wdata->fdata.flush_stream = FALSE; 647 wdata->fdata.close_stream = FALSE; 648 g_simple_async_result_run_in_thread (res, 649 flush_buffer_thread, 650 io_priority, 651 cancellable); 652 g_object_unref (res); 653 } 654 } 655 656 static gssize 657 g_buffered_output_stream_write_finish (GOutputStream *stream, 658 GAsyncResult *result, 659 GError **error) 660 { 661 GBufferedOutputStreamPrivate *priv; 662 GBufferedOutputStream *buffered_stream; 663 GSimpleAsyncResult *simple; 664 WriteData *wdata; 665 gssize count; 666 667 simple = G_SIMPLE_ASYNC_RESULT (result); 668 buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream); 669 priv = buffered_stream->priv; 670 671 g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == 672 g_buffered_output_stream_write_async); 673 674 wdata = g_simple_async_result_get_op_res_gpointer (simple); 675 676 /* Now do the real copying of data to the buffer */ 677 count = priv->len - priv->pos; 678 count = MIN (wdata->count, count); 679 680 memcpy (priv->buffer + priv->pos, wdata->buffer, count); 681 682 priv->pos += count; 683 684 return count; 685 } 686 687 static void 688 g_buffered_output_stream_flush_async (GOutputStream *stream, 689 int io_priority, 690 GCancellable *cancellable, 691 GAsyncReadyCallback callback, 692 gpointer data) 693 { 694 GSimpleAsyncResult *res; 695 FlushData *fdata; 696 697 fdata = g_slice_new (FlushData); 698 fdata->flush_stream = TRUE; 699 fdata->close_stream = FALSE; 700 701 res = g_simple_async_result_new (G_OBJECT (stream), 702 callback, 703 data, 704 g_buffered_output_stream_flush_async); 705 706 g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data); 707 708 g_simple_async_result_run_in_thread (res, 709 flush_buffer_thread, 710 io_priority, 711 cancellable); 712 g_object_unref (res); 713 } 714 715 static gboolean 716 g_buffered_output_stream_flush_finish (GOutputStream *stream, 717 GAsyncResult *result, 718 GError **error) 719 { 720 GSimpleAsyncResult *simple; 721 722 simple = G_SIMPLE_ASYNC_RESULT (result); 723 724 g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == 725 g_buffered_output_stream_flush_async); 726 727 return TRUE; 728 } 729 730 static void 731 g_buffered_output_stream_close_async (GOutputStream *stream, 732 int io_priority, 733 GCancellable *cancellable, 734 GAsyncReadyCallback callback, 735 gpointer data) 736 { 737 GSimpleAsyncResult *res; 738 FlushData *fdata; 739 740 fdata = g_slice_new (FlushData); 741 fdata->close_stream = TRUE; 742 743 res = g_simple_async_result_new (G_OBJECT (stream), 744 callback, 745 data, 746 g_buffered_output_stream_close_async); 747 748 g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data); 749 750 g_simple_async_result_run_in_thread (res, 751 flush_buffer_thread, 752 io_priority, 753 cancellable); 754 g_object_unref (res); 755 } 756 757 static gboolean 758 g_buffered_output_stream_close_finish (GOutputStream *stream, 759 GAsyncResult *result, 760 GError **error) 761 { 762 GSimpleAsyncResult *simple; 763 764 simple = G_SIMPLE_ASYNC_RESULT (result); 765 766 g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == 767 g_buffered_output_stream_close_async); 768 769 return TRUE; 770 } 771 772 #define __G_BUFFERED_OUTPUT_STREAM_C__ 773 #include "gioaliasdef.c" 774