Home | History | Annotate | Download | only in tests
      1 /* GLib testing framework examples and tests
      2  * Copyright (C) 2008 Red Hat, Inc
      3  *
      4  * This work is provided "as is"; redistribution and modification
      5  * in whole or in part, in any medium, physical or electronic is
      6  * permitted without restriction.
      7  *
      8  * This work is distributed in the hope that it will be useful,
      9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
     10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
     11  *
     12  * In no event shall the authors or contributors be liable for any
     13  * direct, indirect, incidental, special, exemplary, or consequential
     14  * damages (including, but not limited to, procurement of substitute
     15  * goods or services; loss of use, data, or profits; or business
     16  * interruption) however caused and on any theory of liability, whether
     17  * in contract, strict liability, or tort (including negligence or
     18  * otherwise) arising in any way out of the use of this software, even
     19  * if advised of the possibility of such damage.
     20  */
     21 
     22 #include <glib/glib.h>
     23 #include <gio/gio.h>
     24 #include <gio/gunixinputstream.h>
     25 #include <gio/gunixoutputstream.h>
     26 #include <signal.h>
     27 #include <stdlib.h>
     28 #include <string.h>
     29 #include <unistd.h>
     30 
     31 #define DATA "abcdefghijklmnopqrstuvwxyz"
     32 
     33 int writer_pipe[2], reader_pipe[2];
     34 GCancellable *writer_cancel, *reader_cancel, *main_cancel;
     35 GMainLoop *loop;
     36 
     37 static gpointer
     38 writer_thread (gpointer user_data)
     39 {
     40   GOutputStream *out;
     41   gssize nwrote, offset;
     42   GError *err = NULL;
     43 
     44   out = g_unix_output_stream_new (writer_pipe[1], TRUE);
     45 
     46   do
     47     {
     48       g_usleep (10);
     49 
     50       offset = 0;
     51       while (offset < sizeof (DATA))
     52 	{
     53 	  nwrote = g_output_stream_write (out, DATA + offset,
     54 					  sizeof (DATA) - offset,
     55 					  writer_cancel, &err);
     56 	  if (nwrote <= 0 || err != NULL)
     57 	    break;
     58 	  offset += nwrote;
     59 	}
     60 
     61       g_assert (nwrote > 0 || err != NULL);
     62     }
     63   while (err == NULL);
     64 
     65   if (g_cancellable_is_cancelled (writer_cancel))
     66     {
     67       g_cancellable_cancel (main_cancel);
     68       g_object_unref (out);
     69       return NULL;
     70     }
     71 
     72   g_warning ("writer: %s", err->message);
     73   g_assert_not_reached ();
     74 }
     75 
     76 static gpointer
     77 reader_thread (gpointer user_data)
     78 {
     79   GInputStream *in;
     80   gssize nread, total;
     81   GError *err = NULL;
     82   char buf[sizeof (DATA)];
     83 
     84   in = g_unix_input_stream_new (reader_pipe[0], TRUE);
     85 
     86   do
     87     {
     88       total = 0;
     89       while (total < sizeof (DATA))
     90 	{
     91 	  nread = g_input_stream_read (in, buf + total, sizeof (buf) - total,
     92 				       reader_cancel, &err);
     93 	  if (nread <= 0 || err != NULL)
     94 	    break;
     95 	  total += nread;
     96 	}
     97 
     98       if (err)
     99 	break;
    100 
    101       if (nread == 0)
    102 	{
    103 	  g_assert (err == NULL);
    104 	  /* pipe closed */
    105 	  g_object_unref (in);
    106 	  return NULL;
    107 	}
    108 
    109       g_assert_cmpstr (buf, ==, DATA);
    110       g_assert (!g_cancellable_is_cancelled (reader_cancel));
    111     }
    112   while (err == NULL);
    113 
    114   g_warning ("reader: %s", err->message);
    115   g_assert_not_reached ();
    116 }
    117 
    118 char main_buf[sizeof (DATA)];
    119 gssize main_len, main_offset;
    120 
    121 static void readable (GObject *source, GAsyncResult *res, gpointer user_data);
    122 static void writable (GObject *source, GAsyncResult *res, gpointer user_data);
    123 
    124 static void
    125 do_main_cancel (GOutputStream *out)
    126 {
    127   g_output_stream_close (out, NULL, NULL);
    128   g_main_loop_quit (loop);
    129 }
    130 
    131 static void
    132 readable (GObject *source, GAsyncResult *res, gpointer user_data)
    133 {
    134   GInputStream *in = G_INPUT_STREAM (source);
    135   GOutputStream *out = user_data;
    136   GError *err = NULL;
    137 
    138   main_len = g_input_stream_read_finish (in, res, &err);
    139 
    140   if (g_cancellable_is_cancelled (main_cancel))
    141     {
    142       do_main_cancel (out);
    143       return;
    144     }
    145 
    146   g_assert (err == NULL);
    147 
    148   main_offset = 0;
    149   g_output_stream_write_async (out, main_buf, main_len,
    150 			       G_PRIORITY_DEFAULT, main_cancel,
    151 			       writable, in);
    152 }
    153 
    154 static void
    155 writable (GObject *source, GAsyncResult *res, gpointer user_data)
    156 {
    157   GOutputStream *out = G_OUTPUT_STREAM (source);
    158   GInputStream *in = user_data;
    159   GError *err = NULL;
    160   gssize nwrote;
    161 
    162   nwrote = g_output_stream_write_finish (out, res, &err);
    163 
    164   if (g_cancellable_is_cancelled (main_cancel))
    165     {
    166       do_main_cancel (out);
    167       return;
    168     }
    169 
    170   g_assert (err == NULL);
    171   g_assert_cmpint (nwrote, <=, main_len - main_offset);
    172 
    173   main_offset += nwrote;
    174   if (main_offset == main_len)
    175     {
    176       g_input_stream_read_async (in, main_buf, sizeof (main_buf),
    177 				 G_PRIORITY_DEFAULT, main_cancel,
    178 				 readable, out);
    179     }
    180   else
    181     {
    182       g_output_stream_write_async (out, main_buf + main_offset,
    183 				   main_len - main_offset,
    184 				   G_PRIORITY_DEFAULT, main_cancel,
    185 				   writable, in);
    186     }
    187 }
    188 
    189 static gboolean
    190 timeout (gpointer cancellable)
    191 {
    192   g_cancellable_cancel (cancellable);
    193   return FALSE;
    194 }
    195 
    196 static void
    197 test_pipe_io (void)
    198 {
    199   GThread *writer, *reader;
    200   GInputStream *in;
    201   GOutputStream *out;
    202 
    203   /* Split off two (additional) threads, a reader and a writer. From
    204    * the writer thread, write data synchronously in small chunks,
    205    * which gets read asynchronously by the main thread and then
    206    * written asynchronously to the reader thread, which reads it
    207    * synchronously. Eventually a timeout in the main thread will cause
    208    * it to cancel the writer thread, which will in turn cancel the
    209    * read op in the main thread, which will then close the pipe to
    210    * the reader thread, causing the read op to fail.
    211    */
    212 
    213   g_assert (pipe (writer_pipe) == 0 && pipe (reader_pipe) == 0);
    214 
    215   writer_cancel = g_cancellable_new ();
    216   reader_cancel = g_cancellable_new ();
    217   main_cancel = g_cancellable_new ();
    218 
    219   writer = g_thread_create (writer_thread, NULL, TRUE, NULL);
    220   reader = g_thread_create (reader_thread, NULL, TRUE, NULL);
    221 
    222   in = g_unix_input_stream_new (writer_pipe[0], TRUE);
    223   out = g_unix_output_stream_new (reader_pipe[1], TRUE);
    224 
    225   g_input_stream_read_async (in, main_buf, sizeof (main_buf),
    226 			     G_PRIORITY_DEFAULT, main_cancel,
    227 			     readable, out);
    228 
    229   g_timeout_add (500, timeout, writer_cancel);
    230 
    231   loop = g_main_loop_new (NULL, TRUE);
    232   g_main_loop_run (loop);
    233   g_main_loop_unref (loop);
    234 
    235   g_thread_join (reader);
    236   g_thread_join (writer);
    237 
    238   g_object_unref (main_cancel);
    239   g_object_unref (reader_cancel);
    240   g_object_unref (writer_cancel);
    241   g_object_unref (in);
    242   g_object_unref (out);
    243 }
    244 
    245 int
    246 main (int   argc,
    247       char *argv[])
    248 {
    249   g_thread_init (NULL);
    250   g_type_init ();
    251   g_test_init (&argc, &argv, NULL);
    252 
    253   g_test_add_func ("/unix-streams/pipe-io-test", test_pipe_io);
    254 
    255   return g_test_run();
    256 }
    257