1 /* 2 * Basic workqueue like code, that sets up a thread and allows async 3 * processing of some sort. Could be extended to allow for multiple 4 * worker threads. But right now fio associates one of this per IO 5 * thread, so should be enough to have just a single thread doing the 6 * work. 7 */ 8 #include <stdio.h> 9 #include <stdlib.h> 10 #include <stdarg.h> 11 #include <unistd.h> 12 #include <errno.h> 13 #include <pthread.h> 14 #include <string.h> 15 16 #include "../smalloc.h" 17 #include "../log.h" 18 #include "tp.h" 19 20 static void tp_flush_work(struct flist_head *list) 21 { 22 struct tp_work *work; 23 24 while (!flist_empty(list)) { 25 int prio; 26 27 work = flist_entry(list->next, struct tp_work, list); 28 flist_del(&work->list); 29 30 prio = work->prio; 31 if (nice(prio) < 0) 32 log_err("fio: nice %s\n", strerror(errno)); 33 34 work->fn(work); 35 36 if (nice(prio) < 0) 37 log_err("fio: nice %s\n", strerror(errno)); 38 } 39 } 40 41 static void *tp_thread(void *data) 42 { 43 struct tp_data *tdat = data; 44 struct flist_head work_list; 45 46 INIT_FLIST_HEAD(&work_list); 47 48 while (1) { 49 pthread_mutex_lock(&tdat->lock); 50 51 if (!tdat->thread_exit && flist_empty(&tdat->work)) 52 pthread_cond_wait(&tdat->cv, &tdat->lock); 53 54 if (!flist_empty(&tdat->work)) 55 flist_splice_tail_init(&tdat->work, &work_list); 56 57 pthread_mutex_unlock(&tdat->lock); 58 59 if (flist_empty(&work_list)) { 60 if (tdat->thread_exit) 61 break; 62 continue; 63 } 64 65 tp_flush_work(&work_list); 66 } 67 68 return NULL; 69 } 70 71 void tp_queue_work(struct tp_data *tdat, struct tp_work *work) 72 { 73 work->done = 0; 74 75 pthread_mutex_lock(&tdat->lock); 76 flist_add_tail(&work->list, &tdat->work); 77 pthread_mutex_unlock(&tdat->lock); 78 79 pthread_cond_signal(&tdat->cv); 80 } 81 82 void tp_init(struct tp_data **tdatp) 83 { 84 struct tp_data *tdat; 85 int ret; 86 87 if (*tdatp) 88 return; 89 90 *tdatp = tdat = smalloc(sizeof(*tdat)); 91 pthread_mutex_init(&tdat->lock, NULL); 92 INIT_FLIST_HEAD(&tdat->work); 93 pthread_cond_init(&tdat->cv, NULL); 94 pthread_cond_init(&tdat->sleep_cv, NULL); 95 96 ret = pthread_create(&tdat->thread, NULL, tp_thread, tdat); 97 if (ret) 98 log_err("fio: failed to create tp thread\n"); 99 } 100 101 void tp_exit(struct tp_data **tdatp) 102 { 103 struct tp_data *tdat = *tdatp; 104 void *ret; 105 106 if (!tdat) 107 return; 108 109 pthread_mutex_lock(&tdat->lock); 110 tdat->thread_exit = 1; 111 pthread_mutex_unlock(&tdat->lock); 112 113 pthread_cond_signal(&tdat->cv); 114 115 pthread_join(tdat->thread, &ret); 116 117 sfree(tdat); 118 *tdatp = NULL; 119 } 120