/*
 * Small templated message queue whose only synchronization primitive is a
 * mutex (which reduces portability issues).  Blocking receive is implemented
 * by polling with a growing sleep interval rather than by a condition
 * variable.
 *
 * Embed the queue linkage in the message type, then instantiate the queue
 * type and its functions with mq_gen():
 *
 *   typedef struct mq_msg_s mq_msg_t;
 *   struct mq_msg_s {
 *           mq_msg(mq_msg_t) link;
 *           [message data]
 *   };
 *   mq_gen(, mq_, mq_t, mq_msg_t, link)
 *
 * With those arguments the generated API is:
 *
 *   bool mq_init(mq_t *mq);
 *       Initializes the lock, then the (empty) list and the counter.
 *       Returns true if mtx_init() returns non-zero, in which case the list
 *       and counter are left untouched; returns false otherwise.
 *   void mq_fini(mq_t *mq);
 *       Calls mtx_fini() on the lock.  Queued messages are not cleaned up,
 *       because the queue knows nothing about their payloads.
 *   unsigned mq_count(mq_t *mq);
 *       Returns the message counter, read while holding the lock.
 *   mq_msg_t *mq_tryget(mq_t *mq);
 *       Removes and returns the first message in the list, or returns NULL
 *       without waiting if the list is empty.
 *   mq_msg_t *mq_get(mq_t *mq);
 *       Retries mq_tryget() until it yields a message, calling nanosleep()
 *       between attempts.  The sleep starts at 1 ns and doubles after every
 *       miss until it is capped at 1 second.  Never returns NULL.
 *   void mq_put(mq_t *mq, mq_msg_t *msg);
 *       Initializes msg's linkage and inserts msg at the tail of the list.
 *
 * The linkage embedded in each message is externally opaque: mq_put()
 * initializes it, so callers need not initialize or clean it up themselves.
 */
#define mq_msg(a_mq_msg_type) ql_elm(a_mq_msg_type)

#define mq_gen(a_attr, a_prefix, a_mq_type, a_mq_msg_type, a_field) \
typedef struct { \
    /* Held around every access to msgs and count after init. */ \
    mtx_t lock; \
    /* List of queued messages. */ \
    ql_head(a_mq_msg_type) msgs; \
    /* Incremented by put, decremented by each successful tryget. */ \
    unsigned count; \
} a_mq_type; \
\
a_attr bool \
a_prefix##init(a_mq_type *mq) \
{ \
    /* Bail out before touching the list if the lock cannot be set up. */ \
    if (mtx_init(&mq->lock)) { \
        return true; \
    } \
    ql_new(&mq->msgs); \
    mq->count = 0; \
    return false; \
} \
\
a_attr void \
a_prefix##fini(a_mq_type *mq) \
{ \
    /* Only the lock is torn down; messages are left alone. */ \
    mtx_fini(&mq->lock); \
} \
\
a_attr unsigned \
a_prefix##count(a_mq_type *mq) \
{ \
    unsigned snapshot; \
\
    mtx_lock(&mq->lock); \
    snapshot = mq->count; \
    mtx_unlock(&mq->lock); \
    return snapshot; \
} \
\
a_attr a_mq_msg_type * \
a_prefix##tryget(a_mq_type *mq) \
{ \
    a_mq_msg_type *head; \
\
    mtx_lock(&mq->lock); \
    head = ql_first(&mq->msgs); \
    if (head != NULL) { \
        /* Detach the first message and account for its removal. */ \
        ql_head_remove(&mq->msgs, a_mq_msg_type, a_field); \
        mq->count -= 1; \
    } \
    mtx_unlock(&mq->lock); \
    return head; \
} \
\
a_attr a_mq_msg_type * \
a_prefix##get(a_mq_type *mq) \
{ \
    a_mq_msg_type *received; \
    struct timespec delay; \
\
    delay.tv_sec = 0; \
    delay.tv_nsec = 1; \
    /* Poll first, and sleep only after an unsuccessful attempt. */ \
    while ((received = a_prefix##tryget(mq)) == NULL) { \
        nanosleep(&delay, NULL); \
        if (delay.tv_sec != 0) { \
            /* Already sleeping for the 1 second maximum. */ \
            continue; \
        } \
        /* Double the sleep time, up to a maximum of 1 second. */ \
        delay.tv_nsec *= 2; \
        if (delay.tv_nsec >= 1000000000L) { \
            delay.tv_sec = 1; \
            delay.tv_nsec = 0; \
        } \
    } \
    return received; \
} \
\
a_attr void \
a_prefix##put(a_mq_type *mq, a_mq_msg_type *msg) \
{ \
    mtx_lock(&mq->lock); \
    /* The linkage is (re)initialized here, so callers need not do it. */ \
    ql_elm_new(msg, a_field); \
    ql_tail_insert(&mq->msgs, msg, a_field); \
    mq->count += 1; \
    mtx_unlock(&mq->lock); \
}