Home | History | Annotate | Download | only in tests
      1 /* Test program that performs producer-consumer style communication through
      2  * a circular buffer. This test program is a slightly modified version of the
      3  * test program made available by Miguel Ojeda
      4  * -- see also http://article.gmane.org/gmane.comp.debugging.valgrind/8782.
      5  */
      6 
      7 
      8 #include <stdio.h>
      9 #include <string.h>
     10 #include <stdlib.h>
     11 #include <unistd.h>
     12 #include <time.h>
     13 #include <pthread.h>
     14 #include <semaphore.h>
     15 #include <fcntl.h>
     16 #include "../../config.h"
     17 
     18 
     19 /** gcc versions 4.1.0 and later have support for atomic builtins. */
     20 
     21 #ifndef HAVE_BUILTIN_ATOMIC
     22 #error Sorry, but this test program can only be compiled by a compiler that\
     23 has built-in functions for atomic memory access.
     24 #endif
     25 
     26 
     27 #define BUFFER_MAX (2)
     28 #define DATA_SEMAPHORE_NAME "cb-data-semaphore"
     29 #define FREE_SEMAPHORE_NAME "cb-free-semaphore"
     30 
     31 
     32 typedef int data_t;
     33 
     34 typedef struct {
     35   /* Counting semaphore representing the number of data items in the buffer. */
     36   sem_t* data;
     37   /* Counting semaphore representing the number of free elements. */
     38   sem_t* free;
     39   /* Position where a new elements should be written. */
     40   int in;
     41   /* Position from where an element can be removed. */
     42   int out;
     43   /* Mutex that protects 'in'. */
     44   pthread_mutex_t mutex_in;
     45   /* Mutex that protects 'out'. */
     46   pthread_mutex_t mutex_out;
     47   /* Data buffer. */
     48   data_t buffer[BUFFER_MAX];
     49 } buffer_t;
     50 
     51 static int quiet = 0;
     52 static int use_locking = 1;
     53 
     54 static __inline__
     55 int fetch_and_add(int* p, int i)
     56 {
     57   return __sync_fetch_and_add(p, i);
     58 }
     59 
     60 static sem_t* create_semaphore(const char* const name, const int value)
     61 {
     62 #ifdef __APPLE__
     63   sem_t* p = sem_open(name, O_CREAT, 0600, value);
     64   return p;
     65 #else
     66   sem_t* p = malloc(sizeof(*p));
     67   if (p)
     68     sem_init(p, 0, value);
     69   return p;
     70 #endif
     71 }
     72 
     73 static void destroy_semaphore(const char* const name, sem_t* p)
     74 {
     75 #ifdef __APPLE__
     76   sem_close(p);
     77   sem_unlink(name);
     78 #else
     79   sem_destroy(p);
     80   free(p);
     81 #endif
     82 }
     83 
     84 static void buffer_init(buffer_t * b)
     85 {
     86   b->data = create_semaphore(DATA_SEMAPHORE_NAME, 0);
     87   b->free = create_semaphore(FREE_SEMAPHORE_NAME, BUFFER_MAX);
     88 
     89   pthread_mutex_init(&b->mutex_in, NULL);
     90   pthread_mutex_init(&b->mutex_out, NULL);
     91 
     92   b->in = 0;
     93   b->out = 0;
     94 }
     95 
     96 static void buffer_recv(buffer_t* b, data_t* d)
     97 {
     98   int out;
     99   sem_wait(b->data);
    100   if (use_locking)
    101     pthread_mutex_lock(&b->mutex_out);
    102   out = fetch_and_add(&b->out, 1);
    103   if (out >= BUFFER_MAX)
    104   {
    105     fetch_and_add(&b->out, -BUFFER_MAX);
    106     out -= BUFFER_MAX;
    107   }
    108   *d = b->buffer[out];
    109   if (use_locking)
    110     pthread_mutex_unlock(&b->mutex_out);
    111   if (! quiet)
    112   {
    113     printf("received %d from buffer[%d]\n", *d, out);
    114     fflush(stdout);
    115   }
    116   sem_post(b->free);
    117 }
    118 
    119 static void buffer_send(buffer_t* b, data_t* d)
    120 {
    121   int in;
    122   sem_wait(b->free);
    123   if (use_locking)
    124     pthread_mutex_lock(&b->mutex_in);
    125   in = fetch_and_add(&b->in, 1);
    126   if (in >= BUFFER_MAX)
    127   {
    128     fetch_and_add(&b->in, -BUFFER_MAX);
    129     in -= BUFFER_MAX;
    130   }
    131   b->buffer[in] = *d;
    132   if (use_locking)
    133     pthread_mutex_unlock(&b->mutex_in);
    134   if (! quiet)
    135   {
    136     printf("sent %d to buffer[%d]\n", *d, in);
    137     fflush(stdout);
    138   }
    139   sem_post(b->data);
    140 }
    141 
    142 static void buffer_destroy(buffer_t* b)
    143 {
    144   destroy_semaphore(DATA_SEMAPHORE_NAME, b->data);
    145   destroy_semaphore(FREE_SEMAPHORE_NAME, b->free);
    146 
    147   pthread_mutex_destroy(&b->mutex_in);
    148   pthread_mutex_destroy(&b->mutex_out);
    149 }
    150 
    151 static buffer_t b;
    152 
    153 static void producer(int* id)
    154 {
    155   buffer_send(&b, id);
    156   pthread_exit(NULL);
    157 }
    158 
    159 #define MAXSLEEP (100 * 1000)
    160 
    161 static void consumer(int* id)
    162 {
    163   int d;
    164   usleep(rand() % MAXSLEEP);
    165   buffer_recv(&b, &d);
    166   if (! quiet)
    167   {
    168     printf("%i: %i\n", *id, d);
    169     fflush(stdout);
    170   }
    171   pthread_exit(NULL);
    172 }
    173 
    174 #define THREADS (10)
    175 
    176 int main(int argc, char** argv)
    177 {
    178   pthread_t producers[THREADS];
    179   pthread_t consumers[THREADS];
    180   int thread_arg[THREADS];
    181   int i;
    182   int optchar;
    183 
    184   while ((optchar = getopt(argc, argv, "nq")) != EOF)
    185   {
    186     switch (optchar)
    187     {
    188     case 'n':
    189       use_locking = 0;
    190       break;
    191     case 'q':
    192       quiet = 1;
    193       break;
    194     }
    195   }
    196 
    197   srand(time(NULL));
    198 
    199   buffer_init(&b);
    200 
    201   for (i = 0; i < THREADS; ++i)
    202   {
    203     thread_arg[i] = i;
    204     pthread_create(producers + i, NULL,
    205                    (void * (*)(void *)) producer, &thread_arg[i]);
    206   }
    207 
    208   for (i = 0; i < THREADS; ++i)
    209     pthread_create(consumers + i, NULL,
    210                    (void * (*)(void *)) consumer, &thread_arg[i]);
    211 
    212   for (i = 0; i < THREADS; ++i)
    213   {
    214     pthread_join(producers[i], NULL);
    215     pthread_join(consumers[i], NULL);
    216   }
    217 
    218   buffer_destroy(&b);
    219 
    220   return 0;
    221 }
    222