Home | History | Annotate | Download | only in test
      1 /*
      2  * Simple templated message queue implementation that relies on only mutexes for
      3  * synchronization (which reduces portability issues).  Given the following
      4  * setup:
      5  *
      6  *   typedef struct mq_msg_s mq_msg_t;
      7  *   struct mq_msg_s {
      8  *           mq_msg(mq_msg_t) link;
      9  *           [message data]
     10  *   };
     11  *   mq_gen(, mq_, mq_t, mq_msg_t, link)
     12  *
     13  * The API is as follows:
     14  *
     15  *   bool mq_init(mq_t *mq);
     16  *   void mq_fini(mq_t *mq);
     17  *   unsigned mq_count(mq_t *mq);
     18  *   mq_msg_t *mq_tryget(mq_t *mq);
     19  *   mq_msg_t *mq_get(mq_t *mq);
     20  *   void mq_put(mq_t *mq, mq_msg_t *msg);
     21  *
     22  * The message queue linkage embedded in each message is to be treated as
     23  * externally opaque (no need to initialize or clean up externally).  mq_fini()
     24  * does not perform any cleanup of messages, since it knows nothing of their
     25  * payloads.
     26  */
     27 #define	mq_msg(a_mq_msg_type)	ql_elm(a_mq_msg_type)
     28 
     29 #define	mq_gen(a_attr, a_prefix, a_mq_type, a_mq_msg_type, a_field)	\
     30 typedef struct {							\
     31 	mtx_t			lock;					\
     32 	ql_head(a_mq_msg_type)	msgs;					\
     33 	unsigned		count;					\
     34 } a_mq_type;								\
     35 a_attr bool								\
     36 a_prefix##init(a_mq_type *mq) {						\
     37 									\
     38 	if (mtx_init(&mq->lock))					\
     39 		return (true);						\
     40 	ql_new(&mq->msgs);						\
     41 	mq->count = 0;							\
     42 	return (false);							\
     43 }									\
     44 a_attr void								\
     45 a_prefix##fini(a_mq_type *mq)						\
     46 {									\
     47 									\
     48 	mtx_fini(&mq->lock);						\
     49 }									\
     50 a_attr unsigned								\
     51 a_prefix##count(a_mq_type *mq)						\
     52 {									\
     53 	unsigned count;							\
     54 									\
     55 	mtx_lock(&mq->lock);						\
     56 	count = mq->count;						\
     57 	mtx_unlock(&mq->lock);						\
     58 	return (count);							\
     59 }									\
     60 a_attr a_mq_msg_type *							\
     61 a_prefix##tryget(a_mq_type *mq)						\
     62 {									\
     63 	a_mq_msg_type *msg;						\
     64 									\
     65 	mtx_lock(&mq->lock);						\
     66 	msg = ql_first(&mq->msgs);					\
     67 	if (msg != NULL) {						\
     68 		ql_head_remove(&mq->msgs, a_mq_msg_type, a_field);	\
     69 		mq->count--;						\
     70 	}								\
     71 	mtx_unlock(&mq->lock);						\
     72 	return (msg);							\
     73 }									\
     74 a_attr a_mq_msg_type *							\
     75 a_prefix##get(a_mq_type *mq)						\
     76 {									\
     77 	a_mq_msg_type *msg;						\
     78 	struct timespec timeout;					\
     79 									\
     80 	msg = a_prefix##tryget(mq);					\
     81 	if (msg != NULL)						\
     82 		return (msg);						\
     83 									\
     84 	timeout.tv_sec = 0;						\
     85 	timeout.tv_nsec = 1;						\
     86 	while (true) {							\
     87 		nanosleep(&timeout, NULL);				\
     88 		msg = a_prefix##tryget(mq);				\
     89 		if (msg != NULL)					\
     90 			return (msg);					\
     91 		if (timeout.tv_sec == 0) {				\
     92 			/* Double sleep time, up to max 1 second. */	\
     93 			timeout.tv_nsec <<= 1;				\
     94 			if (timeout.tv_nsec >= 1000*1000*1000) {	\
     95 				timeout.tv_sec = 1;			\
     96 				timeout.tv_nsec = 0;			\
     97 			}						\
     98 		}							\
     99 	}								\
    100 }									\
    101 a_attr void								\
    102 a_prefix##put(a_mq_type *mq, a_mq_msg_type *msg)			\
    103 {									\
    104 									\
    105 	mtx_lock(&mq->lock);						\
    106 	ql_elm_new(msg, a_field);					\
    107 	ql_tail_insert(&mq->msgs, msg, a_field);			\
    108 	mq->count++;							\
    109 	mtx_unlock(&mq->lock);						\
    110 }
    111