1 /* Test program that performs producer-consumer style communication through 2 * a circular buffer. This test program is a slightly modified version of the 3 * test program made available by Miguel Ojeda 4 * -- see also http://article.gmane.org/gmane.comp.debugging.valgrind/8782. 5 */ 6 7 8 #include <stdio.h> 9 #include <string.h> 10 #include <stdlib.h> 11 #include <unistd.h> 12 #include <time.h> 13 #include <pthread.h> 14 #include <semaphore.h> 15 #include <fcntl.h> 16 #include "../../config.h" 17 18 19 /** gcc versions 4.1.0 and later have support for atomic builtins. */ 20 21 #ifndef HAVE_BUILTIN_ATOMIC 22 #error Sorry, but this test program can only be compiled by a compiler that\ 23 has built-in functions for atomic memory access. 24 #endif 25 26 27 #define BUFFER_MAX (2) 28 #define DATA_SEMAPHORE_NAME "cb-data-semaphore" 29 #define FREE_SEMAPHORE_NAME "cb-free-semaphore" 30 31 32 typedef int data_t; 33 34 typedef struct { 35 /* Counting semaphore representing the number of data items in the buffer. */ 36 sem_t* data; 37 /* Counting semaphore representing the number of free elements. */ 38 sem_t* free; 39 /* Position where a new elements should be written. */ 40 int in; 41 /* Position from where an element can be removed. */ 42 int out; 43 /* Mutex that protects 'in'. */ 44 pthread_mutex_t mutex_in; 45 /* Mutex that protects 'out'. */ 46 pthread_mutex_t mutex_out; 47 /* Data buffer. */ 48 data_t buffer[BUFFER_MAX]; 49 } buffer_t; 50 51 static int quiet = 0; 52 static int use_locking = 1; 53 54 static __inline__ 55 int fetch_and_add(int* p, int i) 56 { 57 return __sync_fetch_and_add(p, i); 58 } 59 60 static sem_t* create_semaphore(const char* const name, const int value) 61 { 62 #ifdef VGO_darwin 63 char name_and_pid[32]; 64 snprintf(name_and_pid, sizeof(name_and_pid), "%s-%d", name, getpid()); 65 sem_t* p = sem_open(name_and_pid, O_CREAT | O_EXCL, 0600, value); 66 if (p == SEM_FAILED) { 67 perror("sem_open"); 68 return NULL; 69 } 70 return p; 71 #else 72 sem_t* p = malloc(sizeof(*p)); 73 if (p) 74 sem_init(p, 0, value); 75 return p; 76 #endif 77 } 78 79 static void destroy_semaphore(const char* const name, sem_t* p) 80 { 81 #ifdef VGO_darwin 82 sem_close(p); 83 sem_unlink(name); 84 #else 85 sem_destroy(p); 86 free(p); 87 #endif 88 } 89 90 static void buffer_init(buffer_t * b) 91 { 92 b->data = create_semaphore(DATA_SEMAPHORE_NAME, 0); 93 b->free = create_semaphore(FREE_SEMAPHORE_NAME, BUFFER_MAX); 94 95 pthread_mutex_init(&b->mutex_in, NULL); 96 pthread_mutex_init(&b->mutex_out, NULL); 97 98 b->in = 0; 99 b->out = 0; 100 } 101 102 static void buffer_recv(buffer_t* b, data_t* d) 103 { 104 int out; 105 sem_wait(b->data); 106 if (use_locking) 107 pthread_mutex_lock(&b->mutex_out); 108 out = fetch_and_add(&b->out, 1); 109 if (out >= BUFFER_MAX) 110 { 111 fetch_and_add(&b->out, -BUFFER_MAX); 112 out -= BUFFER_MAX; 113 } 114 *d = b->buffer[out]; 115 if (use_locking) 116 pthread_mutex_unlock(&b->mutex_out); 117 if (! quiet) 118 { 119 printf("received %d from buffer[%d]\n", *d, out); 120 fflush(stdout); 121 } 122 sem_post(b->free); 123 } 124 125 static void buffer_send(buffer_t* b, data_t* d) 126 { 127 int in; 128 sem_wait(b->free); 129 if (use_locking) 130 pthread_mutex_lock(&b->mutex_in); 131 in = fetch_and_add(&b->in, 1); 132 if (in >= BUFFER_MAX) 133 { 134 fetch_and_add(&b->in, -BUFFER_MAX); 135 in -= BUFFER_MAX; 136 } 137 b->buffer[in] = *d; 138 if (use_locking) 139 pthread_mutex_unlock(&b->mutex_in); 140 if (! quiet) 141 { 142 printf("sent %d to buffer[%d]\n", *d, in); 143 fflush(stdout); 144 } 145 sem_post(b->data); 146 } 147 148 static void buffer_destroy(buffer_t* b) 149 { 150 destroy_semaphore(DATA_SEMAPHORE_NAME, b->data); 151 destroy_semaphore(FREE_SEMAPHORE_NAME, b->free); 152 153 pthread_mutex_destroy(&b->mutex_in); 154 pthread_mutex_destroy(&b->mutex_out); 155 } 156 157 static buffer_t b; 158 159 static void producer(int* id) 160 { 161 buffer_send(&b, id); 162 pthread_exit(NULL); 163 } 164 165 #define MAXSLEEP (100 * 1000) 166 167 static void consumer(int* id) 168 { 169 int d; 170 usleep(rand() % MAXSLEEP); 171 buffer_recv(&b, &d); 172 if (! quiet) 173 { 174 printf("%i: %i\n", *id, d); 175 fflush(stdout); 176 } 177 pthread_exit(NULL); 178 } 179 180 #define THREADS (10) 181 182 int main(int argc, char** argv) 183 { 184 pthread_t producers[THREADS]; 185 pthread_t consumers[THREADS]; 186 int thread_arg[THREADS]; 187 int i; 188 int optchar; 189 190 while ((optchar = getopt(argc, argv, "nq")) != EOF) 191 { 192 switch (optchar) 193 { 194 case 'n': 195 use_locking = 0; 196 break; 197 case 'q': 198 quiet = 1; 199 break; 200 } 201 } 202 203 srand(time(NULL)); 204 205 buffer_init(&b); 206 207 for (i = 0; i < THREADS; ++i) 208 { 209 thread_arg[i] = i; 210 pthread_create(producers + i, NULL, 211 (void * (*)(void *)) producer, &thread_arg[i]); 212 } 213 214 for (i = 0; i < THREADS; ++i) 215 pthread_create(consumers + i, NULL, 216 (void * (*)(void *)) consumer, &thread_arg[i]); 217 218 for (i = 0; i < THREADS; ++i) 219 { 220 pthread_join(producers[i], NULL); 221 pthread_join(consumers[i], NULL); 222 } 223 224 buffer_destroy(&b); 225 226 return 0; 227 } 228