Home | History | Annotate | Download | only in gio
      1 /* GIO - GLib Input, Output and Streaming Library
      2  *
      3  * Copyright (C) 2006-2007 Red Hat, Inc.
      4  * Copyright (C) 2007 Jürg Billeter
      5  *
      6  * This library is free software; you can redistribute it and/or
      7  * modify it under the terms of the GNU Lesser General Public
      8  * License as published by the Free Software Foundation; either
      9  * version 2 of the License, or (at your option) any later version.
     10  *
     11  * This library is distributed in the hope that it will be useful,
     12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
     13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     14  * Lesser General Public License for more details.
     15  *
     16  * You should have received a copy of the GNU Lesser General
     17  * Public License along with this library; if not, write to the
     18  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
     19  * Boston, MA 02111-1307, USA.
     20  *
     21  * Author: Christian Kellner <gicmo (at) gnome.org>
     22  */
     23 
     24 #include "config.h"
     25 #include "gbufferedinputstream.h"
     26 #include "ginputstream.h"
     27 #include "gcancellable.h"
     28 #include "gasyncresult.h"
     29 #include "gsimpleasyncresult.h"
     30 #include "gioerror.h"
     31 #include <string.h>
     32 #include "glibintl.h"
     33 
     34 #include "gioalias.h"
     35 
     36 /**
     37  * SECTION:gbufferedinputstream
     38  * @short_description: Buffered Input Stream
     39  * @include: gio/gio.h
     40  * @see_also: #GFilterInputStream, #GInputStream
     41  *
     42  * Buffered input stream implements #GFilterInputStream and provides
     43  * for buffered reads.
     44  *
     45  * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes.
     46  *
     47  * To create a buffered input stream, use g_buffered_input_stream_new(),
     48  * or g_buffered_input_stream_new_sized() to specify the buffer's size at
     49  * construction.
     50  *
     51  * To get the size of a buffer within a buffered input stream, use
     52  * g_buffered_input_stream_get_buffer_size(). To change the size of a
     53  * buffered input stream's buffer, use
     54  * g_buffered_input_stream_set_buffer_size(). Note that the buffer's size
     55  * cannot be reduced below the size of the data within the buffer.
     56  *
     57  **/
     58 
     59 
     60 
/* Buffer size, in bytes, used as the default value of the "buffer-size" property. */
#define DEFAULT_BUFFER_SIZE 4096
     62 
/* Per-instance private state. Unread data occupies buffer[pos .. end). */
struct _GBufferedInputStreamPrivate {
  guint8 *buffer;  /* backing storage, allocated by g_buffered_input_stream_set_buffer_size() */
  gsize   len;     /* allocated size of @buffer in bytes (the "buffer-size" property) */
  gsize   pos;     /* offset of the next unread byte in @buffer */
  gsize   end;     /* offset one past the last valid byte in @buffer */
  GAsyncReadyCallback outstanding_callback; /* caller's callback for the fill_async() in flight */
};
     70 
/* Property identifiers; PROP_BUFSIZE is the id of the "buffer-size" property. */
enum {
  PROP_0,
  PROP_BUFSIZE
};
     75 
/* GObject property accessors and finalizer, installed in class_init. */
static void g_buffered_input_stream_set_property  (GObject      *object,
                                                   guint         prop_id,
                                                   const GValue *value,
                                                   GParamSpec   *pspec);

static void g_buffered_input_stream_get_property  (GObject      *object,
                                                   guint         prop_id,
                                                   GValue       *value,
                                                   GParamSpec   *pspec);
static void g_buffered_input_stream_finalize      (GObject *object);


/* GInputStream virtual method overrides (skip/read, sync and async). */
static gssize g_buffered_input_stream_skip             (GInputStream          *stream,
							gsize                  count,
							GCancellable          *cancellable,
							GError               **error);
static void   g_buffered_input_stream_skip_async       (GInputStream          *stream,
							gsize                  count,
							int                    io_priority,
							GCancellable          *cancellable,
							GAsyncReadyCallback    callback,
							gpointer               user_data);
static gssize g_buffered_input_stream_skip_finish      (GInputStream          *stream,
							GAsyncResult          *result,
							GError               **error);
static gssize g_buffered_input_stream_read             (GInputStream          *stream,
							void                  *buffer,
							gsize                  count,
							GCancellable          *cancellable,
							GError               **error);
static void   g_buffered_input_stream_read_async       (GInputStream          *stream,
							void                  *buffer,
							gsize                  count,
							int                    io_priority,
							GCancellable          *cancellable,
							GAsyncReadyCallback    callback,
							gpointer               user_data);
static gssize g_buffered_input_stream_read_finish      (GInputStream          *stream,
							GAsyncResult          *result,
							GError               **error);
/* Default implementations of the GBufferedInputStreamClass fill vfuncs. */
static gssize g_buffered_input_stream_real_fill        (GBufferedInputStream  *stream,
							gssize                 count,
							GCancellable          *cancellable,
							GError               **error);
static void   g_buffered_input_stream_real_fill_async  (GBufferedInputStream  *stream,
							gssize                 count,
							int                    io_priority,
							GCancellable          *cancellable,
							GAsyncReadyCallback    callback,
							gpointer               user_data);
static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream  *stream,
							GAsyncResult          *result,
							GError               **error);

/* Moves the unread bytes to the start of the buffer. */
static void compact_buffer (GBufferedInputStream *stream);
    131 
/* Registers GBufferedInputStream with G_TYPE_FILTER_INPUT_STREAM as its
 * parent type, using the class_init and init functions defined below.
 */
G_DEFINE_TYPE (GBufferedInputStream,
               g_buffered_input_stream,
               G_TYPE_FILTER_INPUT_STREAM)
    135 
    136 
    137 static void
    138 g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
    139 {
    140   GObjectClass *object_class;
    141   GInputStreamClass *istream_class;
    142   GBufferedInputStreamClass *bstream_class;
    143 
    144   g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
    145 
    146   object_class = G_OBJECT_CLASS (klass);
    147   object_class->get_property = g_buffered_input_stream_get_property;
    148   object_class->set_property = g_buffered_input_stream_set_property;
    149   object_class->finalize     = g_buffered_input_stream_finalize;
    150 
    151   istream_class = G_INPUT_STREAM_CLASS (klass);
    152   istream_class->skip = g_buffered_input_stream_skip;
    153   istream_class->skip_async  = g_buffered_input_stream_skip_async;
    154   istream_class->skip_finish = g_buffered_input_stream_skip_finish;
    155   istream_class->read_fn = g_buffered_input_stream_read;
    156   istream_class->read_async  = g_buffered_input_stream_read_async;
    157   istream_class->read_finish = g_buffered_input_stream_read_finish;
    158 
    159   bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
    160   bstream_class->fill = g_buffered_input_stream_real_fill;
    161   bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
    162   bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
    163 
    164   g_object_class_install_property (object_class,
    165                                    PROP_BUFSIZE,
    166                                    g_param_spec_uint ("buffer-size",
    167                                                       P_("Buffer Size"),
    168                                                       P_("The size of the backend buffer"),
    169                                                       1,
    170                                                       G_MAXUINT,
    171                                                       DEFAULT_BUFFER_SIZE,
    172                                                       G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
    173                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
    174 
    175 
    176 }
    177 
    178 /**
    179  * g_buffered_input_stream_get_buffer_size:
    180  * @stream: #GBufferedInputStream.
    181  *
    182  * Gets the size of the input buffer.
    183  *
    184  * Returns: the current buffer size.
    185  **/
    186 gsize
    187 g_buffered_input_stream_get_buffer_size (GBufferedInputStream  *stream)
    188 {
    189   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), 0);
    190 
    191   return stream->priv->len;
    192 }
    193 
    194 /**
    195  * g_buffered_input_stream_set_buffer_size:
    196  * @stream: #GBufferedInputStream.
    197  * @size: a #gsize.
    198  *
    199  * Sets the size of the internal buffer of @stream to @size, or to the
    200  * size of the contents of the buffer. The buffer can never be resized
    201  * smaller than its current contents.
    202  **/
    203 void
    204 g_buffered_input_stream_set_buffer_size (GBufferedInputStream  *stream,
    205                                          gsize                  size)
    206 {
    207   GBufferedInputStreamPrivate *priv;
    208   gsize in_buffer;
    209   guint8 *buffer;
    210 
    211   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
    212 
    213   priv = stream->priv;
    214 
    215   if (priv->len == size)
    216     return;
    217 
    218   if (priv->buffer)
    219     {
    220       in_buffer = priv->end - priv->pos;
    221 
    222       /* Never resize smaller than current buffer contents */
    223       size = MAX (size, in_buffer);
    224 
    225       buffer = g_malloc (size);
    226       memcpy (buffer, priv->buffer + priv->pos, in_buffer);
    227       priv->len = size;
    228       priv->pos = 0;
    229       priv->end = in_buffer;
    230       g_free (priv->buffer);
    231       priv->buffer = buffer;
    232     }
    233   else
    234     {
    235       priv->len = size;
    236       priv->pos = 0;
    237       priv->end = 0;
    238       priv->buffer = g_malloc (size);
    239     }
    240 
    241   g_object_notify (G_OBJECT (stream), "buffer-size");
    242 }
    243 
    244 static void
    245 g_buffered_input_stream_set_property (GObject      *object,
    246                                       guint         prop_id,
    247                                       const GValue *value,
    248                                       GParamSpec   *pspec)
    249 {
    250   GBufferedInputStreamPrivate *priv;
    251   GBufferedInputStream        *bstream;
    252 
    253   bstream = G_BUFFERED_INPUT_STREAM (object);
    254   priv = bstream->priv;
    255 
    256   switch (prop_id)
    257     {
    258     case PROP_BUFSIZE:
    259       g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
    260       break;
    261 
    262     default:
    263       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
    264       break;
    265     }
    266 
    267 }
    268 
    269 static void
    270 g_buffered_input_stream_get_property (GObject    *object,
    271                                       guint       prop_id,
    272                                       GValue     *value,
    273                                       GParamSpec *pspec)
    274 {
    275   GBufferedInputStreamPrivate *priv;
    276   GBufferedInputStream        *bstream;
    277 
    278   bstream = G_BUFFERED_INPUT_STREAM (object);
    279   priv = bstream->priv;
    280 
    281   switch (prop_id)
    282     {
    283     case PROP_BUFSIZE:
    284       g_value_set_uint (value, priv->len);
    285       break;
    286 
    287     default:
    288       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
    289       break;
    290     }
    291 }
    292 
    293 static void
    294 g_buffered_input_stream_finalize (GObject *object)
    295 {
    296   GBufferedInputStreamPrivate *priv;
    297   GBufferedInputStream        *stream;
    298 
    299   stream = G_BUFFERED_INPUT_STREAM (object);
    300   priv = stream->priv;
    301 
    302   g_free (priv->buffer);
    303 
    304   G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize (object);
    305 }
    306 
    307 static void
    308 g_buffered_input_stream_init (GBufferedInputStream *stream)
    309 {
    310   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
    311                                               G_TYPE_BUFFERED_INPUT_STREAM,
    312                                               GBufferedInputStreamPrivate);
    313 }
    314 
    315 
    316 /**
    317  * g_buffered_input_stream_new:
    318  * @base_stream: a #GInputStream.
    319  *
    320  * Creates a new #GInputStream from the given @base_stream, with
    321  * a buffer set to the default size (4 kilobytes).
    322  *
    323  * Returns: a #GInputStream for the given @base_stream.
    324  **/
    325 GInputStream *
    326 g_buffered_input_stream_new (GInputStream *base_stream)
    327 {
    328   GInputStream *stream;
    329 
    330   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
    331 
    332   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
    333                          "base-stream", base_stream,
    334                          NULL);
    335 
    336   return stream;
    337 }
    338 
    339 /**
    340  * g_buffered_input_stream_new_sized:
    341  * @base_stream: a #GInputStream.
    342  * @size: a #gsize.
    343  *
    344  * Creates a new #GBufferedInputStream from the given @base_stream,
    345  * with a buffer set to @size.
    346  *
    347  * Returns: a #GInputStream.
    348  **/
    349 GInputStream *
    350 g_buffered_input_stream_new_sized (GInputStream *base_stream,
    351                                    gsize         size)
    352 {
    353   GInputStream *stream;
    354 
    355   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
    356 
    357   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
    358                          "base-stream", base_stream,
    359                          "buffer-size", (guint)size,
    360                          NULL);
    361 
    362   return stream;
    363 }
    364 
    365 /**
    366  * g_buffered_input_stream_fill:
    367  * @stream: #GBufferedInputStream.
    368  * @count: the number of bytes that will be read from the stream.
    369  * @cancellable: optional #GCancellable object, %NULL to ignore.
    370  * @error: location to store the error occurring, or %NULL to ignore.
    371  *
    372  * Tries to read @count bytes from the stream into the buffer.
    373  * Will block during this read.
    374  *
    375  * If @count is zero, returns zero and does nothing. A value of @count
    376  * smaller than -1 will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
    377  *
    378  * On success, the number of bytes read into the buffer is returned.
    379  * It is not an error if this is not the same as the requested size, as it
    380  * can happen e.g. near the end of a file. Zero is returned on end of file
    381  * (or if @count is zero),  but never otherwise.
    382  *
    383  * If @count is -1 then the attempted read size is equal to the number of
    384  * bytes that are required to fill the buffer.
    385  *
    386  * If @cancellable is not %NULL, then the operation can be cancelled by
    387  * triggering the cancellable object from another thread. If the operation
    388  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
    389  * operation was partially finished when the operation was cancelled the
    390  * partial result will be returned, without an error.
    391  *
    392  * On error -1 is returned and @error is set accordingly.
    393  *
    394  * For the asynchronous, non-blocking, version of this function, see
    395  * g_buffered_input_stream_fill_async().
    396  *
    397  * Returns: the number of bytes read into @stream's buffer, up to @count,
    398  *     or -1 on error.
    399  **/
    400 gssize
    401 g_buffered_input_stream_fill (GBufferedInputStream  *stream,
    402                               gssize                 count,
    403                               GCancellable          *cancellable,
    404                               GError               **error)
    405 {
    406   GBufferedInputStreamClass *class;
    407   GInputStream *input_stream;
    408   gssize res;
    409 
    410   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
    411 
    412   input_stream = G_INPUT_STREAM (stream);
    413 
    414   if (count < -1)
    415     {
    416       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
    417                    _("Too large count value passed to %s"), G_STRFUNC);
    418       return -1;
    419     }
    420 
    421   if (!g_input_stream_set_pending (input_stream, error))
    422     return -1;
    423 
    424   if (cancellable)
    425     g_cancellable_push_current (cancellable);
    426 
    427   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
    428   res = class->fill (stream, count, cancellable, error);
    429 
    430   if (cancellable)
    431     g_cancellable_pop_current (cancellable);
    432 
    433   g_input_stream_clear_pending (input_stream);
    434 
    435   return res;
    436 }
    437 
    438 static void
    439 async_fill_callback_wrapper (GObject      *source_object,
    440                              GAsyncResult *res,
    441                              gpointer      user_data)
    442 {
    443   GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
    444 
    445   g_input_stream_clear_pending (G_INPUT_STREAM (stream));
    446   (*stream->priv->outstanding_callback) (source_object, res, user_data);
    447   g_object_unref (stream);
    448 }
    449 
    450 /**
    451  * g_buffered_input_stream_fill_async:
    452  * @stream: #GBufferedInputStream.
    453  * @count: the number of bytes that will be read from the stream.
    454  * @io_priority: the <link linkend="io-priority">I/O priority</link>
    455  *     of the request.
    456  * @cancellable: optional #GCancellable object
    457  * @callback: a #GAsyncReadyCallback.
    458  * @user_data: a #gpointer.
    459  *
    460  * Reads data into @stream's buffer asynchronously, up to @count size.
    461  * @io_priority can be used to prioritize reads. For the synchronous
    462  * version of this function, see g_buffered_input_stream_fill().
    463  *
    464  * If @count is -1 then the attempted read size is equal to the number
    465  * of bytes that are required to fill the buffer.
    466  **/
    467 void
    468 g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
    469                                     gssize                count,
    470                                     int                   io_priority,
    471                                     GCancellable         *cancellable,
    472                                     GAsyncReadyCallback   callback,
    473                                     gpointer              user_data)
    474 {
    475   GBufferedInputStreamClass *class;
    476   GSimpleAsyncResult *simple;
    477   GError *error = NULL;
    478 
    479   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
    480 
    481   if (count == 0)
    482     {
    483       simple = g_simple_async_result_new (G_OBJECT (stream),
    484 					  callback,
    485 					  user_data,
    486 					  g_buffered_input_stream_fill_async);
    487       g_simple_async_result_complete_in_idle (simple);
    488       g_object_unref (simple);
    489       return;
    490     }
    491 
    492   if (count < -1)
    493     {
    494       g_simple_async_report_error_in_idle (G_OBJECT (stream),
    495 					   callback,
    496 					   user_data,
    497 					   G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
    498 					   _("Too large count value passed to %s"),
    499 					   G_STRFUNC);
    500       return;
    501     }
    502 
    503   if (!g_input_stream_set_pending (G_INPUT_STREAM (stream), &error))
    504     {
    505       g_simple_async_report_gerror_in_idle (G_OBJECT (stream),
    506 					    callback,
    507 					    user_data,
    508 					    error);
    509       g_error_free (error);
    510       return;
    511     }
    512 
    513   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
    514 
    515   stream->priv->outstanding_callback = callback;
    516   g_object_ref (stream);
    517   class->fill_async (stream, count, io_priority, cancellable,
    518                      async_fill_callback_wrapper, user_data);
    519 }
    520 
    521 /**
    522  * g_buffered_input_stream_fill_finish:
    523  * @stream: a #GBufferedInputStream.
    524  * @result: a #GAsyncResult.
    525  * @error: a #GError.
    526  *
    527  * Finishes an asynchronous read.
    528  *
    529  * Returns: the number of bytes read into the buffer, or -1 on error.
    530  **/
    531 gssize
    532 g_buffered_input_stream_fill_finish (GBufferedInputStream  *stream,
    533                                      GAsyncResult          *result,
    534                                      GError               **error)
    535 {
    536   GSimpleAsyncResult *simple;
    537   GBufferedInputStreamClass *class;
    538 
    539   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
    540   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
    541 
    542   if (G_IS_SIMPLE_ASYNC_RESULT (result))
    543     {
    544       simple = G_SIMPLE_ASYNC_RESULT (result);
    545       if (g_simple_async_result_propagate_error (simple, error))
    546         return -1;
    547 
    548       /* Special case read of 0 bytes */
    549       if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
    550         return 0;
    551     }
    552 
    553   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
    554   return class->fill_finish (stream, result, error);
    555 }
    556 
    557 /**
    558  * g_buffered_input_stream_get_available:
    559  * @stream: #GBufferedInputStream.
    560  *
    561  * Gets the size of the available data within the stream.
    562  *
    563  * Returns: the number of buffered bytes that have not yet been read.
    564  **/
    565 gsize
    566 g_buffered_input_stream_get_available (GBufferedInputStream *stream)
    567 {
    568   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
    569 
    570   return stream->priv->end - stream->priv->pos;
    571 }
    572 
    573 /**
    574  * g_buffered_input_stream_peek:
    575  * @stream: a #GBufferedInputStream.
    576  * @buffer: a pointer to an allocated chunk of memory.
    577  * @offset: a #gsize.
    578  * @count: a #gsize.
    579  *
    580  * Peeks in the buffer, copying data of size @count into @buffer,
    581  * offset @offset bytes.
    582  *
    583  * Returns: a #gsize of the number of bytes peeked, or %-1 on error.
    584  **/
    585 gsize
    586 g_buffered_input_stream_peek (GBufferedInputStream *stream,
    587                               void                 *buffer,
    588                               gsize                 offset,
    589                               gsize                 count)
    590 {
    591   gsize available;
    592   gsize end;
    593 
    594   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
    595   g_return_val_if_fail (buffer != NULL, -1);
    596 
    597   available = g_buffered_input_stream_get_available (stream);
    598 
    599   if (offset > available)
    600     return 0;
    601 
    602   end = MIN (offset + count, available);
    603   count = end - offset;
    604 
    605   memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
    606   return count;
    607 }
    608 
    609 /**
    610  * g_buffered_input_stream_peek_buffer:
    611  * @stream: a #GBufferedInputStream.
    612  * @count: a #gsize to get the number of bytes available in the buffer.
    613  *
    614  * Returns the buffer with the currently available bytes. The returned
    615  * buffer must not be modified and will become invalid when reading from
    616  * the stream or filling the buffer.
    617  *
    618  * Returns: read-only buffer
    619  **/
    620 const void*
    621 g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream,
    622                                      gsize                *count)
    623 {
    624   GBufferedInputStreamPrivate *priv;
    625 
    626   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
    627 
    628   priv = stream->priv;
    629 
    630   if (count)
    631     *count = priv->end - priv->pos;
    632 
    633   return priv->buffer + priv->pos;
    634 }
    635 
    636 static void
    637 compact_buffer (GBufferedInputStream *stream)
    638 {
    639   GBufferedInputStreamPrivate *priv;
    640   gsize current_size;
    641 
    642   priv = stream->priv;
    643 
    644   current_size = priv->end - priv->pos;
    645 
    646   g_memmove (priv->buffer, priv->buffer + priv->pos, current_size);
    647 
    648   priv->pos = 0;
    649   priv->end = current_size;
    650 }
    651 
    652 static gssize
    653 g_buffered_input_stream_real_fill (GBufferedInputStream  *stream,
    654                                    gssize                 count,
    655                                    GCancellable          *cancellable,
    656                                    GError               **error)
    657 {
    658   GBufferedInputStreamPrivate *priv;
    659   GInputStream *base_stream;
    660   gssize nread;
    661   gsize in_buffer;
    662 
    663   priv = stream->priv;
    664 
    665   if (count == -1)
    666     count = priv->len;
    667 
    668   in_buffer = priv->end - priv->pos;
    669 
    670   /* Never fill more than can fit in the buffer */
    671   count = MIN (count, priv->len - in_buffer);
    672 
    673   /* If requested length does not fit at end, compact */
    674   if (priv->len - priv->end < count)
    675     compact_buffer (stream);
    676 
    677   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
    678   nread = g_input_stream_read (base_stream,
    679                                priv->buffer + priv->end,
    680                                count,
    681                                cancellable,
    682                                error);
    683 
    684   if (nread > 0)
    685     priv->end += nread;
    686 
    687   return nread;
    688 }
    689 
    690 static gssize
    691 g_buffered_input_stream_skip (GInputStream  *stream,
    692                               gsize          count,
    693                               GCancellable  *cancellable,
    694                               GError       **error)
    695 {
    696   GBufferedInputStream        *bstream;
    697   GBufferedInputStreamPrivate *priv;
    698   GBufferedInputStreamClass *class;
    699   GInputStream *base_stream;
    700   gsize available, bytes_skipped;
    701   gssize nread;
    702 
    703   bstream = G_BUFFERED_INPUT_STREAM (stream);
    704   priv = bstream->priv;
    705 
    706   available = priv->end - priv->pos;
    707 
    708   if (count <= available)
    709     {
    710       priv->pos += count;
    711       return count;
    712     }
    713 
    714   /* Full request not available, skip all currently available and
    715    * request refill for more
    716    */
    717 
    718   priv->pos = 0;
    719   priv->end = 0;
    720   bytes_skipped = available;
    721   count -= available;
    722 
    723   if (bytes_skipped > 0)
    724     error = NULL; /* Ignore further errors if we already read some data */
    725 
    726   if (count > priv->len)
    727     {
    728       /* Large request, shortcut buffer */
    729 
    730       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
    731 
    732       nread = g_input_stream_skip (base_stream,
    733                                    count,
    734                                    cancellable,
    735                                    error);
    736 
    737       if (nread < 0 && bytes_skipped == 0)
    738         return -1;
    739 
    740       if (nread > 0)
    741         bytes_skipped += nread;
    742 
    743       return bytes_skipped;
    744     }
    745 
    746   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
    747   nread = class->fill (bstream, priv->len, cancellable, error);
    748 
    749   if (nread < 0)
    750     {
    751       if (bytes_skipped == 0)
    752         return -1;
    753       else
    754         return bytes_skipped;
    755     }
    756 
    757   available = priv->end - priv->pos;
    758   count = MIN (count, available);
    759 
    760   bytes_skipped += count;
    761   priv->pos += count;
    762 
    763   return bytes_skipped;
    764 }
    765 
    766 static gssize
    767 g_buffered_input_stream_read (GInputStream *stream,
    768                               void         *buffer,
    769                               gsize         count,
    770                               GCancellable *cancellable,
    771                               GError      **error)
    772 {
    773   GBufferedInputStream        *bstream;
    774   GBufferedInputStreamPrivate *priv;
    775   GBufferedInputStreamClass *class;
    776   GInputStream *base_stream;
    777   gsize available, bytes_read;
    778   gssize nread;
    779 
    780   bstream = G_BUFFERED_INPUT_STREAM (stream);
    781   priv = bstream->priv;
    782 
    783   available = priv->end - priv->pos;
    784 
    785   if (count <= available)
    786     {
    787       memcpy (buffer, priv->buffer + priv->pos, count);
    788       priv->pos += count;
    789       return count;
    790     }
    791 
    792   /* Full request not available, read all currently availbile and request refill for more */
    793 
    794   memcpy (buffer, priv->buffer + priv->pos, available);
    795   priv->pos = 0;
    796   priv->end = 0;
    797   bytes_read = available;
    798   count -= available;
    799 
    800   if (bytes_read > 0)
    801     error = NULL; /* Ignore further errors if we already read some data */
    802 
    803   if (count > priv->len)
    804     {
    805       /* Large request, shortcut buffer */
    806 
    807       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
    808 
    809       nread = g_input_stream_read (base_stream,
    810 				   (char *)buffer + bytes_read,
    811 				   count,
    812 				   cancellable,
    813 				   error);
    814 
    815       if (nread < 0 && bytes_read == 0)
    816         return -1;
    817 
    818       if (nread > 0)
    819         bytes_read += nread;
    820 
    821       return bytes_read;
    822     }
    823 
    824   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
    825   nread = class->fill (bstream, priv->len, cancellable, error);
    826   if (nread < 0)
    827     {
    828       if (bytes_read == 0)
    829         return -1;
    830       else
    831         return bytes_read;
    832     }
    833 
    834   available = priv->end - priv->pos;
    835   count = MIN (count, available);
    836 
    837   memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
    838   bytes_read += count;
    839   priv->pos += count;
    840 
    841   return bytes_read;
    842 }
    843 
    844 /**
    845  * g_buffered_input_stream_read_byte:
    846  * @stream: #GBufferedInputStream.
    847  * @cancellable: optional #GCancellable object, %NULL to ignore.
    848  * @error: location to store the error occuring, or %NULL to ignore.
    849  *
    850  * Tries to read a single byte from the stream or the buffer. Will block
    851  * during this read.
    852  *
    853  * On success, the byte read from the stream is returned. On end of stream
    854  * -1 is returned but it's not an exceptional error and @error is not set.
    855  *
    856  * If @cancellable is not %NULL, then the operation can be cancelled by
    857  * triggering the cancellable object from another thread. If the operation
    858  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
    859  * operation was partially finished when the operation was cancelled the
    860  * partial result will be returned, without an error.
    861  *
    862  * On error -1 is returned and @error is set accordingly.
    863  *
    864  * Returns: the byte read from the @stream, or -1 on end of stream or error.
    865  **/
    866 int
    867 g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
    868                                    GCancellable          *cancellable,
    869                                    GError               **error)
    870 {
    871   GBufferedInputStreamPrivate *priv;
    872   GBufferedInputStreamClass *class;
    873   GInputStream *input_stream;
    874   gsize available;
    875   gssize nread;
    876 
    877   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
    878 
    879   priv = stream->priv;
    880   input_stream = G_INPUT_STREAM (stream);
    881 
    882   if (g_input_stream_is_closed (input_stream))
    883     {
    884       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
    885                            _("Stream is already closed"));
    886       return -1;
    887     }
    888 
    889   if (!g_input_stream_set_pending (input_stream, error))
    890     return -1;
    891 
    892   available = priv->end - priv->pos;
    893 
    894   if (available != 0)
    895     {
    896       g_input_stream_clear_pending (input_stream);
    897       return priv->buffer[priv->pos++];
    898     }
    899 
    900   /* Byte not available, request refill for more */
    901 
    902   if (cancellable)
    903     g_cancellable_push_current (cancellable);
    904 
    905   priv->pos = 0;
    906   priv->end = 0;
    907 
    908   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
    909   nread = class->fill (stream, priv->len, cancellable, error);
    910 
    911   if (cancellable)
    912     g_cancellable_pop_current (cancellable);
    913 
    914   g_input_stream_clear_pending (input_stream);
    915 
    916   if (nread <= 0)
    917     return -1; /* error or end of stream */
    918 
    919   return priv->buffer[priv->pos++];
    920 }
    921 
    922 /* ************************** */
    923 /* Async stuff implementation */
    924 /* ************************** */
    925 
    926 static void
    927 fill_async_callback (GObject      *source_object,
    928                      GAsyncResult *result,
    929                      gpointer      user_data)
    930 {
    931   GError *error;
    932   gssize res;
    933   GSimpleAsyncResult *simple;
    934 
    935   simple = user_data;
    936 
    937   error = NULL;
    938   res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
    939 				    result, &error);
    940 
    941   g_simple_async_result_set_op_res_gssize (simple, res);
    942   if (res == -1)
    943     {
    944       g_simple_async_result_set_from_error (simple, error);
    945       g_error_free (error);
    946     }
    947   else
    948     {
    949       GBufferedInputStreamPrivate *priv;
    950       GObject *object;
    951 
    952       object = g_async_result_get_source_object (G_ASYNC_RESULT (simple));
    953       priv = G_BUFFERED_INPUT_STREAM (object)->priv;
    954 
    955       g_assert_cmpint (priv->end + res, <=, priv->len);
    956       priv->end += res;
    957 
    958       g_object_unref (object);
    959     }
    960 
    961   /* Complete immediately, not in idle, since we're already in a mainloop callout */
    962   g_simple_async_result_complete (simple);
    963   g_object_unref (simple);
    964 }
    965 
    966 static void
    967 g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
    968                                          gssize                count,
    969                                          int                   io_priority,
    970                                          GCancellable         *cancellable,
    971                                          GAsyncReadyCallback   callback,
    972                                          gpointer              user_data)
    973 {
    974   GBufferedInputStreamPrivate *priv;
    975   GInputStream *base_stream;
    976   GSimpleAsyncResult *simple;
    977   gsize in_buffer;
    978 
    979   priv = stream->priv;
    980 
    981   if (count == -1)
    982     count = priv->len;
    983 
    984   in_buffer = priv->end - priv->pos;
    985 
    986   /* Never fill more than can fit in the buffer */
    987   count = MIN (count, priv->len - in_buffer);
    988 
    989   /* If requested length does not fit at end, compact */
    990   if (priv->len - priv->end < count)
    991     compact_buffer (stream);
    992 
    993   simple = g_simple_async_result_new (G_OBJECT (stream),
    994 				      callback, user_data,
    995 				      g_buffered_input_stream_real_fill_async);
    996 
    997   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
    998   g_input_stream_read_async (base_stream,
    999 			     priv->buffer + priv->end,
   1000 			     count,
   1001 			     io_priority,
   1002 			     cancellable,
   1003 			     fill_async_callback,
   1004 			     simple);
   1005 }
   1006 
   1007 static gssize
   1008 g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
   1009 					  GAsyncResult         *result,
   1010 					  GError              **error)
   1011 {
   1012   GSimpleAsyncResult *simple;
   1013   gssize nread;
   1014 
   1015   simple = G_SIMPLE_ASYNC_RESULT (result);
   1016   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
   1017 
   1018   nread = g_simple_async_result_get_op_res_gssize (simple);
   1019   return nread;
   1020 }
   1021 
/* Per-operation state attached to the async result of a buffered read. */
typedef struct {
  gssize bytes_read; /* bytes placed in @buffer so far; returned by read_finish */
  gssize count;      /* bytes still wanted once the internal buffer was drained */
  void *buffer;      /* destination buffer passed in by the caller of read_async */
} ReadAsyncData;
   1027 
   1028 static void
   1029 free_read_async_data (gpointer _data)
   1030 {
   1031   ReadAsyncData *data = _data;
   1032   g_slice_free (ReadAsyncData, data);
   1033 }
   1034 
   1035 static void
   1036 large_read_callback (GObject *source_object,
   1037 		     GAsyncResult *result,
   1038 		     gpointer user_data)
   1039 {
   1040   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
   1041   ReadAsyncData *data;
   1042   GError *error;
   1043   gssize nread;
   1044 
   1045   data = g_simple_async_result_get_op_res_gpointer (simple);
   1046 
   1047   error = NULL;
   1048   nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
   1049 				      result, &error);
   1050 
   1051   /* Only report the error if we've not already read some data */
   1052   if (nread < 0 && data->bytes_read == 0)
   1053     g_simple_async_result_set_from_error (simple, error);
   1054 
   1055   if (nread > 0)
   1056     data->bytes_read += nread;
   1057 
   1058   if (error)
   1059     g_error_free (error);
   1060 
   1061   /* Complete immediately, not in idle, since we're already in a mainloop callout */
   1062   g_simple_async_result_complete (simple);
   1063   g_object_unref (simple);
   1064 }
   1065 
   1066 static void
   1067 read_fill_buffer_callback (GObject *source_object,
   1068 			   GAsyncResult *result,
   1069 			   gpointer user_data)
   1070 {
   1071   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
   1072   GBufferedInputStream *bstream;
   1073   GBufferedInputStreamPrivate *priv;
   1074   ReadAsyncData *data;
   1075   GError *error;
   1076   gssize nread;
   1077   gsize available;
   1078 
   1079   bstream = G_BUFFERED_INPUT_STREAM (source_object);
   1080   priv = bstream->priv;
   1081 
   1082   data = g_simple_async_result_get_op_res_gpointer (simple);
   1083 
   1084   error = NULL;
   1085   nread = g_buffered_input_stream_fill_finish (bstream,
   1086 					       result, &error);
   1087 
   1088   if (nread < 0 && data->bytes_read == 0)
   1089     g_simple_async_result_set_from_error (simple, error);
   1090 
   1091 
   1092   if (nread > 0)
   1093     {
   1094       available = priv->end - priv->pos;
   1095       data->count = MIN (data->count, available);
   1096 
   1097       memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
   1098       data->bytes_read += data->count;
   1099       priv->pos += data->count;
   1100     }
   1101 
   1102   if (error)
   1103     g_error_free (error);
   1104 
   1105   /* Complete immediately, not in idle, since we're already in a mainloop callout */
   1106   g_simple_async_result_complete (simple);
   1107   g_object_unref (simple);
   1108 }
   1109 
   1110 static void
   1111 g_buffered_input_stream_read_async (GInputStream              *stream,
   1112                                     void                      *buffer,
   1113                                     gsize                      count,
   1114                                     int                        io_priority,
   1115                                     GCancellable              *cancellable,
   1116                                     GAsyncReadyCallback        callback,
   1117                                     gpointer                   user_data)
   1118 {
   1119   GBufferedInputStream *bstream;
   1120   GBufferedInputStreamPrivate *priv;
   1121   GBufferedInputStreamClass *class;
   1122   GInputStream *base_stream;
   1123   gsize available;
   1124   GSimpleAsyncResult *simple;
   1125   ReadAsyncData *data;
   1126 
   1127   bstream = G_BUFFERED_INPUT_STREAM (stream);
   1128   priv = bstream->priv;
   1129 
   1130   data = g_slice_new (ReadAsyncData);
   1131   data->buffer = buffer;
   1132   data->bytes_read = 0;
   1133   simple = g_simple_async_result_new (G_OBJECT (stream),
   1134 				      callback, user_data,
   1135 				      g_buffered_input_stream_read_async);
   1136   g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
   1137 
   1138   available = priv->end - priv->pos;
   1139 
   1140   if (count <= available)
   1141     {
   1142       memcpy (buffer, priv->buffer + priv->pos, count);
   1143       priv->pos += count;
   1144       data->bytes_read = count;
   1145 
   1146       g_simple_async_result_complete_in_idle (simple);
   1147       g_object_unref (simple);
   1148       return;
   1149     }
   1150 
   1151 
   1152   /* Full request not available, read all currently availbile and request refill for more */
   1153 
   1154   memcpy (buffer, priv->buffer + priv->pos, available);
   1155   priv->pos = 0;
   1156   priv->end = 0;
   1157 
   1158   count -= available;
   1159 
   1160   data->bytes_read = available;
   1161   data->count = count;
   1162 
   1163   if (count > priv->len)
   1164     {
   1165       /* Large request, shortcut buffer */
   1166 
   1167       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
   1168 
   1169       g_input_stream_read_async (base_stream,
   1170 				 (char *)buffer + data->bytes_read,
   1171 				 count,
   1172 				 io_priority, cancellable,
   1173 				 large_read_callback,
   1174 				 simple);
   1175     }
   1176   else
   1177     {
   1178       class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
   1179       class->fill_async (bstream, priv->len, io_priority, cancellable,
   1180 			 read_fill_buffer_callback, simple);
   1181     }
   1182 }
   1183 
   1184 static gssize
   1185 g_buffered_input_stream_read_finish (GInputStream   *stream,
   1186                                      GAsyncResult   *result,
   1187                                      GError        **error)
   1188 {
   1189   GSimpleAsyncResult *simple;
   1190   ReadAsyncData *data;
   1191 
   1192   simple = G_SIMPLE_ASYNC_RESULT (result);
   1193 
   1194   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
   1195 
   1196   data = g_simple_async_result_get_op_res_gpointer (simple);
   1197 
   1198   return data->bytes_read;
   1199 }
   1200 
/* Per-operation state attached to the async result of a buffered skip. */
typedef struct {
  gssize bytes_skipped; /* bytes skipped so far; returned by skip_finish */
  gssize count;         /* bytes still to skip once the internal buffer was drained */
} SkipAsyncData;
   1205 
   1206 static void
   1207 free_skip_async_data (gpointer _data)
   1208 {
   1209   SkipAsyncData *data = _data;
   1210   g_slice_free (SkipAsyncData, data);
   1211 }
   1212 
   1213 static void
   1214 large_skip_callback (GObject *source_object,
   1215 		     GAsyncResult *result,
   1216 		     gpointer user_data)
   1217 {
   1218   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
   1219   SkipAsyncData *data;
   1220   GError *error;
   1221   gssize nread;
   1222 
   1223   data = g_simple_async_result_get_op_res_gpointer (simple);
   1224 
   1225   error = NULL;
   1226   nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
   1227 				      result, &error);
   1228 
   1229   /* Only report the error if we've not already read some data */
   1230   if (nread < 0 && data->bytes_skipped == 0)
   1231     g_simple_async_result_set_from_error (simple, error);
   1232 
   1233   if (nread > 0)
   1234     data->bytes_skipped += nread;
   1235 
   1236   if (error)
   1237     g_error_free (error);
   1238 
   1239   /* Complete immediately, not in idle, since we're already in a mainloop callout */
   1240   g_simple_async_result_complete (simple);
   1241   g_object_unref (simple);
   1242 }
   1243 
   1244 static void
   1245 skip_fill_buffer_callback (GObject *source_object,
   1246 			   GAsyncResult *result,
   1247 			   gpointer user_data)
   1248 {
   1249   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
   1250   GBufferedInputStream *bstream;
   1251   GBufferedInputStreamPrivate *priv;
   1252   SkipAsyncData *data;
   1253   GError *error;
   1254   gssize nread;
   1255   gsize available;
   1256 
   1257   bstream = G_BUFFERED_INPUT_STREAM (source_object);
   1258   priv = bstream->priv;
   1259 
   1260   data = g_simple_async_result_get_op_res_gpointer (simple);
   1261 
   1262   error = NULL;
   1263   nread = g_buffered_input_stream_fill_finish (bstream,
   1264 					       result, &error);
   1265 
   1266   if (nread < 0 && data->bytes_skipped == 0)
   1267     g_simple_async_result_set_from_error (simple, error);
   1268 
   1269 
   1270   if (nread > 0)
   1271     {
   1272       available = priv->end - priv->pos;
   1273       data->count = MIN (data->count, available);
   1274 
   1275       data->bytes_skipped += data->count;
   1276       priv->pos += data->count;
   1277     }
   1278 
   1279   if (error)
   1280     g_error_free (error);
   1281 
   1282   /* Complete immediately, not in idle, since we're already in a mainloop callout */
   1283   g_simple_async_result_complete (simple);
   1284   g_object_unref (simple);
   1285 }
   1286 
   1287 static void
   1288 g_buffered_input_stream_skip_async (GInputStream              *stream,
   1289                                     gsize                      count,
   1290                                     int                        io_priority,
   1291                                     GCancellable              *cancellable,
   1292                                     GAsyncReadyCallback        callback,
   1293                                     gpointer                   user_data)
   1294 {
   1295   GBufferedInputStream *bstream;
   1296   GBufferedInputStreamPrivate *priv;
   1297   GBufferedInputStreamClass *class;
   1298   GInputStream *base_stream;
   1299   gsize available;
   1300   GSimpleAsyncResult *simple;
   1301   SkipAsyncData *data;
   1302 
   1303   bstream = G_BUFFERED_INPUT_STREAM (stream);
   1304   priv = bstream->priv;
   1305 
   1306   data = g_slice_new (SkipAsyncData);
   1307   data->bytes_skipped = 0;
   1308   simple = g_simple_async_result_new (G_OBJECT (stream),
   1309 				      callback, user_data,
   1310 				      g_buffered_input_stream_skip_async);
   1311   g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
   1312 
   1313   available = priv->end - priv->pos;
   1314 
   1315   if (count <= available)
   1316     {
   1317       priv->pos += count;
   1318       data->bytes_skipped = count;
   1319 
   1320       g_simple_async_result_complete_in_idle (simple);
   1321       g_object_unref (simple);
   1322       return;
   1323     }
   1324 
   1325 
   1326   /* Full request not available, skip all currently availbile and request refill for more */
   1327 
   1328   priv->pos = 0;
   1329   priv->end = 0;
   1330 
   1331   count -= available;
   1332 
   1333   data->bytes_skipped = available;
   1334   data->count = count;
   1335 
   1336   if (count > priv->len)
   1337     {
   1338       /* Large request, shortcut buffer */
   1339 
   1340       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
   1341 
   1342       g_input_stream_skip_async (base_stream,
   1343 				 count,
   1344 				 io_priority, cancellable,
   1345 				 large_skip_callback,
   1346 				 simple);
   1347     }
   1348   else
   1349     {
   1350       class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
   1351       class->fill_async (bstream, priv->len, io_priority, cancellable,
   1352 			 skip_fill_buffer_callback, simple);
   1353     }
   1354 }
   1355 
   1356 static gssize
   1357 g_buffered_input_stream_skip_finish (GInputStream   *stream,
   1358                                      GAsyncResult   *result,
   1359                                      GError        **error)
   1360 {
   1361   GSimpleAsyncResult *simple;
   1362   SkipAsyncData *data;
   1363 
   1364   simple = G_SIMPLE_ASYNC_RESULT (result);
   1365 
   1366   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
   1367 
   1368   data = g_simple_async_result_get_op_res_gpointer (simple);
   1369 
   1370   return data->bytes_skipped;
   1371 }
   1372 
   1373 
   1374 #define __G_BUFFERED_INPUT_STREAM_C__
   1375 #include "gioaliasdef.c"
   1376