/*
 * QEMU posix-aio emulation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori (at) us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 */

#include <sys/ioctl.h>
#include <sys/types.h>
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <signal.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>

#include "qemu-queue.h"
#include "osdep.h"
#include "sysemu.h"
#include "qemu-common.h"
#include "block_int.h"

#include "block/raw-posix-aio.h"

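/*
 * A single asynchronous request.  Submitted requests are linked both on
 * the global request_list (consumed by the worker threads, via 'node')
 * and on the PosixAioState->first_aio chain (walked on completion, via
 * 'next').
 */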
struct qemu_paiocb {
    BlockDriverAIOCB common;
    int aio_fildes;
    union {
        struct iovec *aio_iov;
        void *aio_ioctl_buf;
    };
    int aio_niov;
    size_t aio_nbytes;
#define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
    int ev_signo;
    off_t aio_offset;

    QTAILQ_ENTRY(qemu_paiocb) node;
    int aio_type;
    ssize_t ret;
    int active;
    struct qemu_paiocb *next;

    int async_context_id;
};

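/*
 * Completion state shared with the I/O thread: the two ends of the
 * signal pipe and the chain of in-flight requests.
 */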
typedef struct PosixAioState {
    int rfd, wfd;
    struct qemu_paiocb *first_aio;
} PosixAioState;


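/*
 * Worker thread pool state.  The request list and the thread counters
 * are protected by 'lock'; idle workers wait on 'cond' for new requests.
 */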
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static pthread_t thread_id;
static pthread_attr_t attr;
static int max_threads = 64;
static int cur_threads = 0;
static int idle_threads = 0;
static QTAILQ_HEAD(, qemu_paiocb) request_list;

#ifdef CONFIG_PREADV
static int preadv_present = 1;
#else
static int preadv_present = 0;
#endif

static void die2(int err, const char *what)
{
    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
    abort();
}

static void die(const char *what)
{
    die2(errno, what);
}

static void mutex_lock(pthread_mutex_t *mutex)
{
    int ret = pthread_mutex_lock(mutex);
    if (ret) die2(ret, "pthread_mutex_lock");
}

static void mutex_unlock(pthread_mutex_t *mutex)
{
    int ret = pthread_mutex_unlock(mutex);
    if (ret) die2(ret, "pthread_mutex_unlock");
}

static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
                           struct timespec *ts)
{
    int ret = pthread_cond_timedwait(cond, mutex, ts);
    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
    return ret;
}

static void cond_signal(pthread_cond_t *cond)
{
    int ret = pthread_cond_signal(cond);
    if (ret) die2(ret, "pthread_cond_signal");
}

static void thread_create(pthread_t *thread, pthread_attr_t *attr,
                          void *(*start_routine)(void*), void *arg)
{
    int ret = pthread_create(thread, attr, start_routine, arg);
    if (ret) die2(ret, "pthread_create");
}

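/* Handle a QEMU_AIO_IOCTL request by issuing the ioctl synchronously. */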
static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
{
    int ret;

    ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
    if (ret == -1)
        return -errno;

    /*
     * This looks weird, but the aio code only considers a request
     * successful if it has transferred the full number of bytes.
     *
     * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
     * so in fact we return the ioctl command here to make posix_aio_read()
     * happy.
     */
    return aiocb->aio_nbytes;
}

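/* Handle a QEMU_AIO_FLUSH request by syncing the file descriptor's data. */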
static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
{
    int ret;

    ret = qemu_fdatasync(aiocb->aio_fildes);
    if (ret == -1)
        return -errno;
    return 0;
}

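/*
 * Thin wrappers around preadv()/pwritev(); on hosts without these
 * syscalls the stubs simply return -ENOSYS.
 */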
#ifdef CONFIG_PREADV

static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return preadv(fd, iov, nr_iov, offset);
}

static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return pwritev(fd, iov, nr_iov, offset);
}

#else

static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}

static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}

#endif

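/*
 * Issue the whole scatter/gather list as a single preadv()/pwritev()
 * call, retrying on EINTR.  Returns bytes transferred or -errno.
 */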
static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
{
    size_t offset = 0;
    ssize_t len;

    do {
        if (aiocb->aio_type & QEMU_AIO_WRITE)
            len = qemu_pwritev(aiocb->aio_fildes,
                               aiocb->aio_iov,
                               aiocb->aio_niov,
                               aiocb->aio_offset + offset);
        else
            len = qemu_preadv(aiocb->aio_fildes,
                              aiocb->aio_iov,
                              aiocb->aio_niov,
                              aiocb->aio_offset + offset);
    } while (len == -1 && errno == EINTR);

    if (len == -1)
        return -errno;
    return len;
}

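/*
 * Read or write a contiguous buffer with pread()/pwrite(), looping until
 * the full request has been transferred, EOF is hit, or an error occurs.
 * Returns the number of bytes transferred or -errno.
 */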
static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
{
    ssize_t offset = 0;
    ssize_t len;

    while (offset < aiocb->aio_nbytes) {
        if (aiocb->aio_type & QEMU_AIO_WRITE)
            len = pwrite(aiocb->aio_fildes,
                         (const char *)buf + offset,
                         aiocb->aio_nbytes - offset,
                         aiocb->aio_offset + offset);
        else
            len = pread(aiocb->aio_fildes,
                        buf + offset,
                        aiocb->aio_nbytes - offset,
                        aiocb->aio_offset + offset);

        if (len == -1 && errno == EINTR)
            continue;
        else if (len == -1) {
            offset = -errno;
            break;
        } else if (len == 0)
            break;

        offset += len;
    }

    return offset;
}

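/*
 * Dispatch a read/write request: use pread/pwrite directly for a single
 * aligned buffer, try preadv/pwritev for aligned vectors, and otherwise
 * fall back to copying through a single aligned bounce buffer.
 */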
static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
{
    ssize_t nbytes;
    char *buf;

    if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
        /*
         * If there is just a single buffer, and it is properly aligned,
         * we can just use plain pread/pwrite without any problems.
         */
        if (aiocb->aio_niov == 1)
            return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);

        /*
         * We have more than one iovec, and all are properly aligned.
         *
         * Try preadv/pwritev first and fall back to linearizing the
         * buffer if it's not supported.
         */
        if (preadv_present) {
            nbytes = handle_aiocb_rw_vector(aiocb);
            if (nbytes == aiocb->aio_nbytes)
                return nbytes;
            if (nbytes < 0 && nbytes != -ENOSYS)
                return nbytes;
            preadv_present = 0;
        }

        /*
         * XXX(hch): short read/write.  no easy way to handle the remainder
         * using these interfaces.  For now retry using plain
         * pread/pwrite?
         */
    }

    /*
     * Ok, we have to do it the hard way, copy all segments into
     * a single aligned buffer.
     */
    buf = qemu_blockalign(aiocb->common.bs, aiocb->aio_nbytes);
    if (aiocb->aio_type & QEMU_AIO_WRITE) {
        char *p = buf;
        int i;

        for (i = 0; i < aiocb->aio_niov; ++i) {
            memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
            p += aiocb->aio_iov[i].iov_len;
        }
    }

    nbytes = handle_aiocb_rw_linear(aiocb, buf);
    if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
        char *p = buf;
        size_t count = aiocb->aio_nbytes, copy;
        int i;

        for (i = 0; i < aiocb->aio_niov && count; ++i) {
            copy = count;
            if (copy > aiocb->aio_iov[i].iov_len)
                copy = aiocb->aio_iov[i].iov_len;
            memcpy(aiocb->aio_iov[i].iov_base, p, copy);
            p     += copy;
            count -= copy;
        }
    }
    qemu_vfree(buf);

    return nbytes;
}

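/*
 * Worker thread main loop: dequeue requests from request_list, execute
 * them synchronously, publish the result and raise the request's
 * completion signal (ev_signo, i.e. SIGUSR2).  A thread that stays idle
 * for ten seconds exits.
 */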
static void *aio_thread(void *unused)
{
    pid_t pid;

    pid = getpid();

    while (1) {
        struct qemu_paiocb *aiocb;
        ssize_t ret = 0;
        qemu_timeval tv;
        struct timespec ts;

        qemu_gettimeofday(&tv);
        ts.tv_sec = tv.tv_sec + 10;
        ts.tv_nsec = 0;

        mutex_lock(&lock);

        while (QTAILQ_EMPTY(&request_list) &&
               !(ret == ETIMEDOUT)) {
            ret = cond_timedwait(&cond, &lock, &ts);
        }

        if (QTAILQ_EMPTY(&request_list))
            break;

        aiocb = QTAILQ_FIRST(&request_list);
        QTAILQ_REMOVE(&request_list, aiocb, node);
        aiocb->active = 1;
        idle_threads--;
        mutex_unlock(&lock);

        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
        case QEMU_AIO_READ:
        case QEMU_AIO_WRITE:
            ret = handle_aiocb_rw(aiocb);
            break;
        case QEMU_AIO_FLUSH:
            ret = handle_aiocb_flush(aiocb);
            break;
        case QEMU_AIO_IOCTL:
            ret = handle_aiocb_ioctl(aiocb);
            break;
        default:
            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
            ret = -EINVAL;
            break;
        }

        mutex_lock(&lock);
        aiocb->ret = ret;
        idle_threads++;
        mutex_unlock(&lock);

        if (kill(pid, aiocb->ev_signo)) die("kill failed");
    }

    idle_threads--;
    cur_threads--;
    mutex_unlock(&lock);

    return NULL;
}

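/*
 * Start a detached worker thread.  Called with 'lock' held.  All signals
 * are blocked around pthread_create() so the new thread inherits a full
 * signal mask and never handles the completion signal itself.
 */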
static void spawn_thread(void)
{
    sigset_t set, oldset;

    cur_threads++;
    idle_threads++;

    /* block all signals */
    if (sigfillset(&set)) die("sigfillset");
    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");

    thread_create(&thread_id, &attr, aio_thread, NULL);

    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
}

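/* Queue a prepared request for the worker pool, growing the pool if no
 * thread is currently idle. */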
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
    aiocb->ret = -EINPROGRESS;
    aiocb->active = 0;
    mutex_lock(&lock);
    if (idle_threads == 0 && cur_threads < max_threads)
        spawn_thread();
    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
    mutex_unlock(&lock);
    cond_signal(&cond);
}

static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
    ssize_t ret;

    mutex_lock(&lock);
    ret = aiocb->ret;
    mutex_unlock(&lock);

    return ret;
}

static int qemu_paio_error(struct qemu_paiocb *aiocb)
{
    ssize_t ret = qemu_paio_return(aiocb);

    if (ret < 0)
        ret = -ret;
    else
        ret = 0;

    return ret;
}

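/*
 * Walk the list of outstanding requests in the current async context and
 * retire those that have been cancelled or have completed, invoking the
 * completion callback for the latter.  Returns nonzero if any request
 * was retired.
 */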
static int posix_aio_process_queue(void *opaque)
{
    PosixAioState *s = opaque;
    struct qemu_paiocb *acb, **pacb;
    int ret;
    int result = 0;
    int async_context_id = get_async_context_id();

    for(;;) {
        pacb = &s->first_aio;
        for(;;) {
            acb = *pacb;
            if (!acb)
                return result;

            /* we're only interested in requests in the right context */
            if (acb->async_context_id != async_context_id) {
                pacb = &acb->next;
                continue;
            }

            ret = qemu_paio_error(acb);
            if (ret == ECANCELED) {
                /* remove the request */
                *pacb = acb->next;
                qemu_aio_release(acb);
                result = 1;
            } else if (ret != EINPROGRESS) {
                /* end of aio */
                if (ret == 0) {
                    ret = qemu_paio_return(acb);
                    if (ret == acb->aio_nbytes)
                        ret = 0;
                    else
                        ret = -EINVAL;
                } else {
                    ret = -ret;
                }

                //trace_paio_complete(acb, acb->common.opaque, ret);

                /* remove the request */
                *pacb = acb->next;
                /* call the callback */
                acb->common.cb(acb->common.opaque, ret);
                qemu_aio_release(acb);
                result = 1;
                break;
            } else {
                pacb = &acb->next;
            }
        }
    }

    return result;
}

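/* fd handler for the read end of the signal pipe: drain the pipe, then
 * complete any finished requests. */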
static void posix_aio_read(void *opaque)
{
    PosixAioState *s = opaque;
    ssize_t len;

    /* read all bytes from signal pipe */
    for (;;) {
        char bytes[16];

        len = read(s->rfd, bytes, sizeof(bytes));
        if (len == -1 && errno == EINTR)
            continue; /* try again */
        if (len == sizeof(bytes))
            continue; /* more to read */
        break;
    }

    posix_aio_process_queue(s);
}

static int posix_aio_flush(void *opaque)
{
    PosixAioState *s = opaque;
    return !!s->first_aio;
}

static PosixAioState *posix_aio_state;

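/* SIGUSR2 handler: wake the I/O thread by writing a byte into the signal
 * pipe.  EAGAIN is ignored because a full pipe already guarantees a
 * wakeup. */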
static void aio_signal_handler(int signum)
{
    if (posix_aio_state) {
        char byte = 0;
        ssize_t ret;

        ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
        if (ret < 0 && errno != EAGAIN)
            die("write()");
    }

    qemu_service_io();
}

static void paio_remove(struct qemu_paiocb *acb)
{
    struct qemu_paiocb **pacb;

    /* remove the callback from the queue */
    pacb = &posix_aio_state->first_aio;
    for(;;) {
        if (*pacb == NULL) {
            fprintf(stderr, "paio_remove: aio request not found!\n");
            break;
        } else if (*pacb == acb) {
            *pacb = acb->next;
            qemu_aio_release(acb);
            break;
        }
        pacb = &(*pacb)->next;
    }
}

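/*
 * Cancel a request: if it has not been picked up by a worker yet it is
 * simply dequeued; otherwise we spin until the worker has finished it.
 */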
static void paio_cancel(BlockDriverAIOCB *blockacb)
{
    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
    int active = 0;

    //trace_paio_cancel(acb, acb->common.opaque);

    mutex_lock(&lock);
    if (!acb->active) {
        QTAILQ_REMOVE(&request_list, acb, node);
        acb->ret = -ECANCELED;
    } else if (acb->ret == -EINPROGRESS) {
        active = 1;
    }
    mutex_unlock(&lock);

    if (active) {
        /* fail safe: if the aio could not be canceled, we wait for
           it */
        while (qemu_paio_error(acb) == EINPROGRESS)
            ;
    }

    paio_remove(acb);
}

static AIOPool raw_aio_pool = {
    .aiocb_size         = sizeof(struct qemu_paiocb),
    .cancel             = paio_cancel,
};

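/*
 * External interface declared in block/raw-posix-aio.h: package a
 * read/write/flush request into a qemu_paiocb, link it on the completion
 * chain and queue it for the worker pool.
 */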
BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockDriverCompletionFunc *cb, void *opaque, int type)
{
    struct qemu_paiocb *acb;

    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
    if (!acb)
        return NULL;
    acb->aio_type = type;
    acb->aio_fildes = fd;
    acb->ev_signo = SIGUSR2;
    acb->async_context_id = get_async_context_id();

    if (qiov) {
        acb->aio_iov = qiov->iov;
        acb->aio_niov = qiov->niov;
    }
    acb->aio_nbytes = nb_sectors * 512;
    acb->aio_offset = sector_num * 512;

    acb->next = posix_aio_state->first_aio;
    posix_aio_state->first_aio = acb;

    //trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
    qemu_paio_submit(acb);
    return &acb->common;
}

BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
        unsigned long int req, void *buf,
        BlockDriverCompletionFunc *cb, void *opaque)
{
    struct qemu_paiocb *acb;

    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
    if (!acb)
        return NULL;
    acb->aio_type = QEMU_AIO_IOCTL;
    acb->aio_fildes = fd;
    acb->ev_signo = SIGUSR2;
    acb->async_context_id = get_async_context_id();
    acb->aio_offset = 0;
    acb->aio_ioctl_buf = buf;
    acb->aio_ioctl_cmd = req;

    acb->next = posix_aio_state->first_aio;
    posix_aio_state->first_aio = acb;

    qemu_paio_submit(acb);
    return &acb->common;
}

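/*
 * One-time initialisation: install the SIGUSR2 handler, create the
 * non-blocking signal pipe, register it with the AIO layer and set up
 * the worker thread attributes.
 */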
int paio_init(void)
{
    struct sigaction act;
    PosixAioState *s;
    int fds[2];
    int ret;

    if (posix_aio_state)
        return 0;

    s = qemu_malloc(sizeof(PosixAioState));

    sigfillset(&act.sa_mask);
    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
    act.sa_handler = aio_signal_handler;
    sigaction(SIGUSR2, &act, NULL);

    s->first_aio = NULL;
    if (qemu_pipe(fds) == -1) {
        fprintf(stderr, "failed to create pipe\n");
        return -1;
    }

    s->rfd = fds[0];
    s->wfd = fds[1];

    fcntl(s->rfd, F_SETFL, O_NONBLOCK);
    fcntl(s->wfd, F_SETFL, O_NONBLOCK);

    qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
        posix_aio_process_queue, s);

    ret = pthread_attr_init(&attr);
    if (ret)
        die2(ret, "pthread_attr_init");

    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if (ret)
        die2(ret, "pthread_attr_setdetachstate");

    QTAILQ_INIT(&request_list);

    posix_aio_state = s;
    return 0;
}