Home | History | Annotate | Download | only in lib
/*
 * Basic workqueue-like code that sets up a thread and allows async
 * processing of some sort. Could be extended to allow for multiple
 * worker threads. But right now fio associates one of these per IO
 * thread, so it should be enough to have just a single thread doing the
 * work.
 */
      8 #include <stdio.h>
      9 #include <stdlib.h>
     10 #include <stdarg.h>
     11 #include <unistd.h>
     12 #include <errno.h>
     13 #include <pthread.h>
     14 #include <string.h>
     15 
     16 #include "../smalloc.h"
     17 #include "../log.h"
     18 #include "tp.h"
     19 
     20 static void tp_flush_work(struct flist_head *list)
     21 {
     22 	struct tp_work *work;
     23 
     24 	while (!flist_empty(list)) {
     25 		int prio;
     26 
     27 		work = flist_entry(list->next, struct tp_work, list);
     28 		flist_del(&work->list);
     29 
     30 		prio = work->prio;
     31 		if (nice(prio) < 0)
     32 			log_err("fio: nice %s\n", strerror(errno));
     33 
     34 		work->fn(work);
     35 
     36 		if (nice(prio) < 0)
     37 			log_err("fio: nice %s\n", strerror(errno));
     38 	}
     39 }
     40 
     41 static void *tp_thread(void *data)
     42 {
     43 	struct tp_data *tdat = data;
     44 	struct flist_head work_list;
     45 
     46 	INIT_FLIST_HEAD(&work_list);
     47 
     48 	while (1) {
     49 		pthread_mutex_lock(&tdat->lock);
     50 
     51 		if (!tdat->thread_exit && flist_empty(&tdat->work))
     52 			pthread_cond_wait(&tdat->cv, &tdat->lock);
     53 
     54 		if (!flist_empty(&tdat->work))
     55 			flist_splice_tail_init(&tdat->work, &work_list);
     56 
     57 		pthread_mutex_unlock(&tdat->lock);
     58 
     59 		if (flist_empty(&work_list)) {
     60 			if (tdat->thread_exit)
     61 				break;
     62 			continue;
     63 		}
     64 
     65 		tp_flush_work(&work_list);
     66 	}
     67 
     68 	return NULL;
     69 }
     70 
     71 void tp_queue_work(struct tp_data *tdat, struct tp_work *work)
     72 {
     73 	work->done = 0;
     74 
     75 	pthread_mutex_lock(&tdat->lock);
     76 	flist_add_tail(&work->list, &tdat->work);
     77 	pthread_mutex_unlock(&tdat->lock);
     78 
     79 	pthread_cond_signal(&tdat->cv);
     80 }
     81 
     82 void tp_init(struct tp_data **tdatp)
     83 {
     84 	struct tp_data *tdat;
     85 	int ret;
     86 
     87 	if (*tdatp)
     88 		return;
     89 
     90 	*tdatp = tdat = smalloc(sizeof(*tdat));
     91 	pthread_mutex_init(&tdat->lock, NULL);
     92 	INIT_FLIST_HEAD(&tdat->work);
     93 	pthread_cond_init(&tdat->cv, NULL);
     94 	pthread_cond_init(&tdat->sleep_cv, NULL);
     95 
     96 	ret = pthread_create(&tdat->thread, NULL, tp_thread, tdat);
     97 	if (ret)
     98 		log_err("fio: failed to create tp thread\n");
     99 }
    100 
    101 void tp_exit(struct tp_data **tdatp)
    102 {
    103 	struct tp_data *tdat = *tdatp;
    104 	void *ret;
    105 
    106 	if (!tdat)
    107 		return;
    108 
    109 	pthread_mutex_lock(&tdat->lock);
    110 	tdat->thread_exit = 1;
    111 	pthread_mutex_unlock(&tdat->lock);
    112 
    113 	pthread_cond_signal(&tdat->cv);
    114 
    115 	pthread_join(tdat->thread, &ret);
    116 
    117 	sfree(tdat);
    118 	*tdatp = NULL;
    119 }
    120