Home | History | Annotate | Download | only in tests
      1 /* Test program that performs producer-consumer style communication through
      2  * a circular buffer. This test program is a slightly modified version of the
      3  * test program made available by Miguel Ojeda
      4  * -- see also http://article.gmane.org/gmane.comp.debugging.valgrind/8782.
      5  */
      6 
      7 
      8 #include <stdio.h>
      9 #include <string.h>
     10 #include <stdlib.h>
     11 #include <unistd.h>
     12 #include <time.h>
     13 #include <pthread.h>
     14 #include <semaphore.h>
     15 #include <fcntl.h>
     16 #include "../../config.h"
     17 
     18 
     19 /** gcc versions 4.1.0 and later have support for atomic builtins. */
     20 
     21 #ifndef HAVE_BUILTIN_ATOMIC
#error Sorry, but this test program can only be compiled by a compiler that \
     23 has built-in functions for atomic memory access.
     24 #endif
     25 
     26 
     27 #define BUFFER_MAX (2)
     28 #define DATA_SEMAPHORE_NAME "cb-data-semaphore"
     29 #define FREE_SEMAPHORE_NAME "cb-free-semaphore"
     30 
     31 
     32 typedef int data_t;
     33 
     34 typedef struct {
     35   /* Counting semaphore representing the number of data items in the buffer. */
     36   sem_t* data;
     37   /* Counting semaphore representing the number of free elements. */
     38   sem_t* free;
     39   /* Position where a new elements should be written. */
     40   int in;
     41   /* Position from where an element can be removed. */
     42   int out;
     43   /* Mutex that protects 'in'. */
     44   pthread_mutex_t mutex_in;
     45   /* Mutex that protects 'out'. */
     46   pthread_mutex_t mutex_out;
     47   /* Data buffer. */
     48   data_t buffer[BUFFER_MAX];
     49 } buffer_t;
     50 
     51 static int quiet = 0;
     52 static int use_locking = 1;
     53 
     54 static __inline__
     55 int fetch_and_add(int* p, int i)
     56 {
     57   return __sync_fetch_and_add(p, i);
     58 }
     59 
     60 static sem_t* create_semaphore(const char* const name, const int value)
     61 {
     62 #ifdef VGO_darwin
     63   char name_and_pid[32];
     64   snprintf(name_and_pid, sizeof(name_and_pid), "%s-%d", name, getpid());
     65   sem_t* p = sem_open(name_and_pid, O_CREAT | O_EXCL, 0600, value);
     66   if (p == SEM_FAILED) {
     67     perror("sem_open");
     68     return NULL;
     69   }
     70   return p;
     71 #else
     72   sem_t* p = malloc(sizeof(*p));
     73   if (p)
     74     sem_init(p, 0, value);
     75   return p;
     76 #endif
     77 }
     78 
     79 static void destroy_semaphore(const char* const name, sem_t* p)
     80 {
     81 #ifdef VGO_darwin
     82   sem_close(p);
     83   sem_unlink(name);
     84 #else
     85   sem_destroy(p);
     86   free(p);
     87 #endif
     88 }
     89 
     90 static void buffer_init(buffer_t * b)
     91 {
     92   b->data = create_semaphore(DATA_SEMAPHORE_NAME, 0);
     93   b->free = create_semaphore(FREE_SEMAPHORE_NAME, BUFFER_MAX);
     94 
     95   pthread_mutex_init(&b->mutex_in, NULL);
     96   pthread_mutex_init(&b->mutex_out, NULL);
     97 
     98   b->in = 0;
     99   b->out = 0;
    100 }
    101 
    102 static void buffer_recv(buffer_t* b, data_t* d)
    103 {
    104   int out;
    105   sem_wait(b->data);
    106   if (use_locking)
    107     pthread_mutex_lock(&b->mutex_out);
    108   out = fetch_and_add(&b->out, 1);
    109   if (out >= BUFFER_MAX)
    110   {
    111     fetch_and_add(&b->out, -BUFFER_MAX);
    112     out -= BUFFER_MAX;
    113   }
    114   *d = b->buffer[out];
    115   if (use_locking)
    116     pthread_mutex_unlock(&b->mutex_out);
    117   if (! quiet)
    118   {
    119     printf("received %d from buffer[%d]\n", *d, out);
    120     fflush(stdout);
    121   }
    122   sem_post(b->free);
    123 }
    124 
    125 static void buffer_send(buffer_t* b, data_t* d)
    126 {
    127   int in;
    128   sem_wait(b->free);
    129   if (use_locking)
    130     pthread_mutex_lock(&b->mutex_in);
    131   in = fetch_and_add(&b->in, 1);
    132   if (in >= BUFFER_MAX)
    133   {
    134     fetch_and_add(&b->in, -BUFFER_MAX);
    135     in -= BUFFER_MAX;
    136   }
    137   b->buffer[in] = *d;
    138   if (use_locking)
    139     pthread_mutex_unlock(&b->mutex_in);
    140   if (! quiet)
    141   {
    142     printf("sent %d to buffer[%d]\n", *d, in);
    143     fflush(stdout);
    144   }
    145   sem_post(b->data);
    146 }
    147 
    148 static void buffer_destroy(buffer_t* b)
    149 {
    150   destroy_semaphore(DATA_SEMAPHORE_NAME, b->data);
    151   destroy_semaphore(FREE_SEMAPHORE_NAME, b->free);
    152 
    153   pthread_mutex_destroy(&b->mutex_in);
    154   pthread_mutex_destroy(&b->mutex_out);
    155 }
    156 
    157 static buffer_t b;
    158 
    159 static void producer(int* id)
    160 {
    161   buffer_send(&b, id);
    162   pthread_exit(NULL);
    163 }
    164 
    165 #define MAXSLEEP (100 * 1000)
    166 
    167 static void consumer(int* id)
    168 {
    169   int d;
    170   usleep(rand() % MAXSLEEP);
    171   buffer_recv(&b, &d);
    172   if (! quiet)
    173   {
    174     printf("%i: %i\n", *id, d);
    175     fflush(stdout);
    176   }
    177   pthread_exit(NULL);
    178 }
    179 
    180 #define THREADS (10)
    181 
    182 int main(int argc, char** argv)
    183 {
    184   pthread_t producers[THREADS];
    185   pthread_t consumers[THREADS];
    186   int thread_arg[THREADS];
    187   int i;
    188   int optchar;
    189 
    190   while ((optchar = getopt(argc, argv, "nq")) != EOF)
    191   {
    192     switch (optchar)
    193     {
    194     case 'n':
    195       use_locking = 0;
    196       break;
    197     case 'q':
    198       quiet = 1;
    199       break;
    200     }
    201   }
    202 
    203   srand(time(NULL));
    204 
    205   buffer_init(&b);
    206 
    207   for (i = 0; i < THREADS; ++i)
    208   {
    209     thread_arg[i] = i;
    210     pthread_create(producers + i, NULL,
    211                    (void * (*)(void *)) producer, &thread_arg[i]);
    212   }
    213 
    214   for (i = 0; i < THREADS; ++i)
    215     pthread_create(consumers + i, NULL,
    216                    (void * (*)(void *)) consumer, &thread_arg[i]);
    217 
    218   for (i = 0; i < THREADS; ++i)
    219   {
    220     pthread_join(producers[i], NULL);
    221     pthread_join(consumers[i], NULL);
    222   }
    223 
    224   buffer_destroy(&b);
    225 
    226   return 0;
    227 }
    228