1 /* 2 * QEMU buffered QEMUFile 3 * 4 * Copyright IBM, Corp. 2008 5 * 6 * Authors: 7 * Anthony Liguori <aliguori (at) us.ibm.com> 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2. See 10 * the COPYING file in the top-level directory. 11 * 12 */ 13 14 #include "qemu-common.h" 15 #include "hw/hw.h" 16 #include "qemu-timer.h" 17 #include "sysemu.h" 18 #include "qemu-char.h" 19 #include "buffered_file.h" 20 21 //#define DEBUG_BUFFERED_FILE 22 23 typedef struct QEMUFileBuffered 24 { 25 BufferedPutFunc *put_buffer; 26 BufferedPutReadyFunc *put_ready; 27 BufferedWaitForUnfreezeFunc *wait_for_unfreeze; 28 BufferedCloseFunc *close; 29 void *opaque; 30 QEMUFile *file; 31 int has_error; 32 int freeze_output; 33 size_t bytes_xfer; 34 size_t xfer_limit; 35 uint8_t *buffer; 36 size_t buffer_size; 37 size_t buffer_capacity; 38 QEMUTimer *timer; 39 } QEMUFileBuffered; 40 41 #ifdef DEBUG_BUFFERED_FILE 42 #define DPRINTF(fmt, ...) \ 43 do { printf("buffered-file: " fmt, ## __VA_ARGS__); } while (0) 44 #else 45 #define DPRINTF(fmt, ...) \ 46 do { } while (0) 47 #endif 48 49 static void buffered_append(QEMUFileBuffered *s, 50 const uint8_t *buf, size_t size) 51 { 52 if (size > (s->buffer_capacity - s->buffer_size)) { 53 void *tmp; 54 55 DPRINTF("increasing buffer capacity from %zu by %zu\n", 56 s->buffer_capacity, size + 1024); 57 58 s->buffer_capacity += size + 1024; 59 60 tmp = qemu_realloc(s->buffer, s->buffer_capacity); 61 if (tmp == NULL) { 62 fprintf(stderr, "qemu file buffer expansion failed\n"); 63 exit(1); 64 } 65 66 s->buffer = tmp; 67 } 68 69 memcpy(s->buffer + s->buffer_size, buf, size); 70 s->buffer_size += size; 71 } 72 73 static void buffered_flush(QEMUFileBuffered *s) 74 { 75 size_t offset = 0; 76 77 if (s->has_error) { 78 DPRINTF("flush when error, bailing\n"); 79 return; 80 } 81 82 DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size); 83 84 while (offset < s->buffer_size) { 85 ssize_t ret; 86 87 ret = s->put_buffer(s->opaque, s->buffer + offset, 88 s->buffer_size - offset); 89 if (ret == -EAGAIN) { 90 DPRINTF("backend not ready, freezing\n"); 91 s->freeze_output = 1; 92 break; 93 } 94 95 if (ret <= 0) { 96 DPRINTF("error flushing data, %zd\n", ret); 97 s->has_error = 1; 98 break; 99 } else { 100 DPRINTF("flushed %zd byte(s)\n", ret); 101 offset += ret; 102 } 103 } 104 105 DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size); 106 memmove(s->buffer, s->buffer + offset, s->buffer_size - offset); 107 s->buffer_size -= offset; 108 } 109 110 static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size) 111 { 112 QEMUFileBuffered *s = opaque; 113 int offset = 0; 114 ssize_t ret; 115 116 DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos); 117 118 if (s->has_error) { 119 DPRINTF("flush when error, bailing\n"); 120 return -EINVAL; 121 } 122 123 DPRINTF("unfreezing output\n"); 124 s->freeze_output = 0; 125 126 buffered_flush(s); 127 128 while (!s->freeze_output && offset < size) { 129 if (s->bytes_xfer > s->xfer_limit) { 130 DPRINTF("transfer limit exceeded when putting\n"); 131 break; 132 } 133 134 ret = s->put_buffer(s->opaque, buf + offset, size - offset); 135 if (ret == -EAGAIN) { 136 DPRINTF("backend not ready, freezing\n"); 137 s->freeze_output = 1; 138 break; 139 } 140 141 if (ret <= 0) { 142 DPRINTF("error putting\n"); 143 s->has_error = 1; 144 offset = -EINVAL; 145 break; 146 } 147 148 DPRINTF("put %zd byte(s)\n", ret); 149 offset += ret; 150 s->bytes_xfer += ret; 151 } 152 153 if (offset >= 0) { 154 DPRINTF("buffering %d bytes\n", size - offset); 155 buffered_append(s, buf + offset, size - offset); 156 offset = size; 157 } 158 159 return offset; 160 } 161 162 static int buffered_close(void *opaque) 163 { 164 QEMUFileBuffered *s = opaque; 165 int ret; 166 167 DPRINTF("closing\n"); 168 169 while (!s->has_error && s->buffer_size) { 170 buffered_flush(s); 171 if (s->freeze_output) 172 s->wait_for_unfreeze(s); 173 } 174 175 ret = s->close(s->opaque); 176 177 qemu_del_timer(s->timer); 178 qemu_free_timer(s->timer); 179 qemu_free(s->buffer); 180 qemu_free(s); 181 182 return ret; 183 } 184 185 static int buffered_rate_limit(void *opaque) 186 { 187 QEMUFileBuffered *s = opaque; 188 189 if (s->has_error) 190 return 0; 191 192 if (s->freeze_output) 193 return 1; 194 195 if (s->bytes_xfer > s->xfer_limit) 196 return 1; 197 198 return 0; 199 } 200 201 static size_t buffered_set_rate_limit(void *opaque, size_t new_rate) 202 { 203 QEMUFileBuffered *s = opaque; 204 205 if (s->has_error) 206 goto out; 207 208 s->xfer_limit = new_rate / 10; 209 210 out: 211 return s->xfer_limit; 212 } 213 214 static size_t buffered_get_rate_limit(void *opaque) 215 { 216 QEMUFileBuffered *s = opaque; 217 218 return s->xfer_limit; 219 } 220 221 static void buffered_rate_tick(void *opaque) 222 { 223 QEMUFileBuffered *s = opaque; 224 225 if (s->has_error) 226 return; 227 228 qemu_mod_timer(s->timer, qemu_get_clock(rt_clock) + 100); 229 230 if (s->freeze_output) 231 return; 232 233 s->bytes_xfer = 0; 234 235 buffered_flush(s); 236 237 /* Add some checks around this */ 238 s->put_ready(s->opaque); 239 } 240 241 QEMUFile *qemu_fopen_ops_buffered(void *opaque, 242 size_t bytes_per_sec, 243 BufferedPutFunc *put_buffer, 244 BufferedPutReadyFunc *put_ready, 245 BufferedWaitForUnfreezeFunc *wait_for_unfreeze, 246 BufferedCloseFunc *close) 247 { 248 QEMUFileBuffered *s; 249 250 s = qemu_mallocz(sizeof(*s)); 251 252 s->opaque = opaque; 253 s->xfer_limit = bytes_per_sec / 10; 254 s->put_buffer = put_buffer; 255 s->put_ready = put_ready; 256 s->wait_for_unfreeze = wait_for_unfreeze; 257 s->close = close; 258 259 s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL, 260 buffered_close, buffered_rate_limit, 261 buffered_set_rate_limit); 262 263 s->timer = qemu_new_timer(rt_clock, buffered_rate_tick, s); 264 265 qemu_mod_timer(s->timer, qemu_get_clock(rt_clock) + 100); 266 267 return s->file; 268 } 269