/*
 * QEMU aio implementation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori (at) us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 */

#include "qemu-common.h"
#include "block.h"
#include "qemu-queue.h"
#include "qemu_socket.h"
#include "iolooper.h"

typedef struct AioHandler AioHandler;

/* The list of registered AIO handlers */
static QLIST_HEAD(, AioHandler) aio_handlers;

/* This is a simple lock used to protect the aio_handlers list.  Specifically,
 * it's used to ensure that no callbacks are removed while we're walking and
 * dispatching callbacks.
 *
 * NOTE(review): this is a plain int flag, not a real lock — it only works
 * because all of this code runs on a single thread.  While it is set,
 * qemu_aio_set_fd_handler() must not unlink nodes; it marks them 'deleted'
 * instead, and the walker reaps them after the walk finishes.
 */
static int walking_handlers;

/* One registered fd.  A node with deleted == 1 is logically gone but is
 * still linked until the current walk releases walking_handlers. */
struct AioHandler
{
    int fd;                              /* the file descriptor being watched */
    IOHandler *io_read;                  /* called when fd is readable */
    IOHandler *io_write;                 /* called when fd is writable */
    AioFlushHandler *io_flush;           /* returns non-zero while requests are pending */
    AioProcessQueue *io_process_queue;   /* drains queued completions; non-zero if any ran */
    int deleted;                         /* deferred-removal flag, see walking_handlers */
    void *opaque;                        /* user data passed to every callback */
    QLIST_ENTRY(AioHandler) node;
};

/* Return the live (non-deleted) handler registered for fd, or NULL.
 * Deleted-but-not-yet-reaped nodes are deliberately skipped so a
 * re-registration during a walk creates a fresh node instead of
 * resurrecting one that is about to be freed. */
static AioHandler *find_aio_handler(int fd)
{
    AioHandler *node;

    QLIST_FOREACH(node, &aio_handlers, node) {
        if (node->fd == fd)
            if (!node->deleted)
                return node;
    }

    return NULL;
}

/* Register, update, or remove the AIO handler for fd.
 *
 * Passing io_read == NULL and io_write == NULL removes the handler:
 * immediately if no walk is in progress, otherwise by marking the node
 * deleted so the walker reaps it later.  Otherwise the node is created
 * (if absent) or updated in place with the new callbacks and opaque.
 *
 * The fd is also (un)registered with the main-loop poller via
 * qemu_set_fd_handler2() so it is serviced outside qemu_aio_wait() too.
 *
 * Always returns 0.
 */
int qemu_aio_set_fd_handler(int fd,
                            IOHandler *io_read,
                            IOHandler *io_write,
                            AioFlushHandler *io_flush,
                            AioProcessQueue *io_process_queue,
                            void *opaque)
{
    AioHandler *node;

    node = find_aio_handler(fd);

    /* Are we deleting the fd handler? */
    if (!io_read && !io_write) {
        if (node) {
            /* If the lock is held, just mark the node as deleted */
            if (walking_handlers)
                node->deleted = 1;
            else {
                /* Otherwise, delete it for real.  We can't just mark it as
                 * deleted because deleted nodes are only cleaned up after
                 * releasing the walking_handlers lock.
                 */
                QLIST_REMOVE(node, node);
                qemu_free(node);
            }
        }
    } else {
        if (node == NULL) {
            /* Alloc and insert if it's not already there */
            node = qemu_mallocz(sizeof(AioHandler));
            node->fd = fd;
            QLIST_INSERT_HEAD(&aio_handlers, node, node);
        }
        /* Update handler with latest information */
        node->io_read = io_read;
        node->io_write = io_write;
        node->io_flush = io_flush;
        node->io_process_queue = io_process_queue;
        node->opaque = opaque;
    }

    qemu_set_fd_handler2(fd, NULL, io_read, io_write, opaque);

    return 0;
}

/* Block until every registered handler reports no pending AIO requests
 * (all io_flush callbacks return 0) and no bottom halves ran.  Each pass
 * calls qemu_aio_wait() once so in-flight requests can make progress. */
void qemu_aio_flush(void)
{
    AioHandler *node;
    int ret;

    do {
        ret = 0;

        /*
         * If there are pending emulated aio start them now so flush
         * will be able to return 1.
         */
        qemu_aio_wait();

        QLIST_FOREACH(node, &aio_handlers, node) {
            if (node->io_flush) {
                ret |= node->io_flush(node->opaque);
            }
        }
    } while (qemu_bh_poll() || ret > 0);
}

/* Run every handler's io_process_queue callback to dispatch completions
 * that were queued rather than delivered through the fd.  Returns 1 if
 * any callback reported work done, else 0.
 *
 * NOTE(review): walking_handlers is set/cleared with plain assignment,
 * so a callback that re-enters this code would clear the flag early —
 * presumably callbacks never do that; confirm against callers. */
int qemu_aio_process_queue(void)
{
    AioHandler *node;
    int ret = 0;

    walking_handlers = 1;

    QLIST_FOREACH(node, &aio_handlers, node) {
        if (node->io_process_queue) {
            if (node->io_process_queue(node->opaque)) {
                ret = 1;
            }
        }
    }

    walking_handlers = 0;

    return ret;
}

/* Dispatch one round of AIO completion work, blocking if necessary.
 *
 * Fast paths: if a bottom half ran, or queued completions were processed,
 * return immediately without touching select().  Otherwise build the fd
 * sets from handlers that still have pending requests (io_flush != 0),
 * wait for activity, and dispatch the ready read/write callbacks.
 * Returns without waiting when no handler has pending AIO, so callers
 * cannot hang on an idle list. */
void qemu_aio_wait(void)
{
    int ret;
    IoLooper* looper;

    if (qemu_bh_poll())
        return;

    /*
     * If there are callbacks left that have been queued, we need to call then.
     * Return afterwards to avoid waiting needlessly in select().
     */
    if (qemu_aio_process_queue())
        return;

    looper = iolooper_new();

    do {
        AioHandler *node;

        iolooper_reset(looper);
        walking_handlers = 1;

        /* fill fd sets */
        QLIST_FOREACH(node, &aio_handlers, node) {
            /* If there aren't pending AIO operations, don't invoke callbacks.
             * Otherwise, if there are no AIO requests, qemu_aio_wait() would
             * wait indefinitely.
             */
            /* NOTE(review): io_flush is invoked here even when
             * node->deleted is set; the deleted check below only guards
             * the fd-set insertion.  Looks intentional-but-fragile —
             * verify io_flush is safe to call on a removed handler. */
            if (node->io_flush && node->io_flush(node->opaque) == 0)
                continue;

            if (!node->deleted && node->io_read) {
                iolooper_add_read(looper, node->fd);
            }
            if (!node->deleted && node->io_write) {
                iolooper_add_write(looper, node->fd);
            }
        }

        walking_handlers = 0;

        /* No AIO operations? Get us out of here */
        if (!iolooper_has_operations(looper)) {
            break;
        }

        /* wait until next event */
        ret = iolooper_wait(looper, -1);

        /* if we have any readable fds, dispatch event */
        if (ret > 0) {
            walking_handlers = 1;

            /* we have to walk very carefully in case
             * qemu_aio_set_fd_handler is called while we're walking */
            node = QLIST_FIRST(&aio_handlers);
            while (node) {
                AioHandler *tmp;

                /* re-check deleted before each callback: an earlier
                 * callback this iteration may have removed this node */
                if (!node->deleted &&
                    iolooper_is_read(looper, node->fd) &&
                    node->io_read) {
                    node->io_read(node->opaque);
                }
                if (!node->deleted &&
                    iolooper_is_write(looper, node->fd) &&
                    node->io_write) {
                    node->io_write(node->opaque);
                }

                /* advance before reaping so the iterator never reads a
                 * freed node */
                tmp = node;
                node = QLIST_NEXT(node, node);

                if (tmp->deleted) {
                    QLIST_REMOVE(tmp, node);
                    qemu_free(tmp);
                }
            }

            walking_handlers = 0;
        }
    } while (ret == 0);

    iolooper_free(looper);
}