Home | History | Annotate | Download | only in libevent
      1 /*
      2  * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
      3  *
      4  * All rights reserved.
      5  *
      6  * Redistribution and use in source and binary forms, with or without
      7  * modification, are permitted provided that the following conditions
      8  * are met:
      9  * 1. Redistributions of source code must retain the above copyright
     10  *    notice, this list of conditions and the following disclaimer.
     11  * 2. Redistributions in binary form must reproduce the above copyright
     12  *    notice, this list of conditions and the following disclaimer in the
     13  *    documentation and/or other materials provided with the distribution.
     14  * 3. The name of the author may not be used to endorse or promote products
     15  *    derived from this software without specific prior written permission.
     16  *
     17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
     18  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
     19  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
     20  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
     21  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
     22  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     23  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     24  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     25  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
     26  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     27  */
     28 
     29 #include "event2/event-config.h"
     30 #include "evconfig-private.h"
     31 
     32 #ifdef EVENT__HAVE_SYS_TIME_H
     33 #include <sys/time.h>
     34 #endif
     35 
     36 #include <errno.h>
     37 #include <stdio.h>
     38 #include <stdlib.h>
     39 #include <string.h>
     40 #ifdef EVENT__HAVE_STDARG_H
     41 #include <stdarg.h>
     42 #endif
     43 #ifdef EVENT__HAVE_UNISTD_H
     44 #include <unistd.h>
     45 #endif
     46 
     47 #ifdef _WIN32
     48 #include <winsock2.h>
     49 #include <ws2tcpip.h>
     50 #endif
     51 
     52 #include <sys/queue.h>
     53 
     54 #include "event2/util.h"
     55 #include "event2/bufferevent.h"
     56 #include "event2/buffer.h"
     57 #include "event2/bufferevent_struct.h"
     58 #include "event2/event.h"
     59 #include "event2/util.h"
     60 #include "event-internal.h"
     61 #include "log-internal.h"
     62 #include "mm-internal.h"
     63 #include "bufferevent-internal.h"
     64 #include "util-internal.h"
     65 #include "iocp-internal.h"
     66 
     67 #ifndef SO_UPDATE_CONNECT_CONTEXT
     68 /* Mingw is sometimes missing this */
     69 #define SO_UPDATE_CONNECT_CONTEXT 0x7010
     70 #endif
     71 
     72 /* prototypes */
     73 static int be_async_enable(struct bufferevent *, short);
     74 static int be_async_disable(struct bufferevent *, short);
     75 static void be_async_destruct(struct bufferevent *);
     76 static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
     77 static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
     78 
     79 struct bufferevent_async {
     80 	struct bufferevent_private bev;
     81 	struct event_overlapped connect_overlapped;
     82 	struct event_overlapped read_overlapped;
     83 	struct event_overlapped write_overlapped;
     84 	size_t read_in_progress;
     85 	size_t write_in_progress;
     86 	unsigned ok : 1;
     87 	unsigned read_added : 1;
     88 	unsigned write_added : 1;
     89 };
     90 
     91 const struct bufferevent_ops bufferevent_ops_async = {
     92 	"socket_async",
     93 	evutil_offsetof(struct bufferevent_async, bev.bev),
     94 	be_async_enable,
     95 	be_async_disable,
     96 	NULL, /* Unlink */
     97 	be_async_destruct,
     98 	bufferevent_generic_adj_timeouts_,
     99 	be_async_flush,
    100 	be_async_ctrl,
    101 };
    102 
    103 static inline struct bufferevent_async *
    104 upcast(struct bufferevent *bev)
    105 {
    106 	struct bufferevent_async *bev_a;
    107 	if (bev->be_ops != &bufferevent_ops_async)
    108 		return NULL;
    109 	bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
    110 	return bev_a;
    111 }
    112 
    113 static inline struct bufferevent_async *
    114 upcast_connect(struct event_overlapped *eo)
    115 {
    116 	struct bufferevent_async *bev_a;
    117 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
    118 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
    119 	return bev_a;
    120 }
    121 
    122 static inline struct bufferevent_async *
    123 upcast_read(struct event_overlapped *eo)
    124 {
    125 	struct bufferevent_async *bev_a;
    126 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
    127 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
    128 	return bev_a;
    129 }
    130 
    131 static inline struct bufferevent_async *
    132 upcast_write(struct event_overlapped *eo)
    133 {
    134 	struct bufferevent_async *bev_a;
    135 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
    136 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
    137 	return bev_a;
    138 }
    139 
    140 static void
    141 bev_async_del_write(struct bufferevent_async *beva)
    142 {
    143 	struct bufferevent *bev = &beva->bev.bev;
    144 
    145 	if (beva->write_added) {
    146 		beva->write_added = 0;
    147 		event_base_del_virtual_(bev->ev_base);
    148 	}
    149 }
    150 
    151 static void
    152 bev_async_del_read(struct bufferevent_async *beva)
    153 {
    154 	struct bufferevent *bev = &beva->bev.bev;
    155 
    156 	if (beva->read_added) {
    157 		beva->read_added = 0;
    158 		event_base_del_virtual_(bev->ev_base);
    159 	}
    160 }
    161 
    162 static void
    163 bev_async_add_write(struct bufferevent_async *beva)
    164 {
    165 	struct bufferevent *bev = &beva->bev.bev;
    166 
    167 	if (!beva->write_added) {
    168 		beva->write_added = 1;
    169 		event_base_add_virtual_(bev->ev_base);
    170 	}
    171 }
    172 
    173 static void
    174 bev_async_add_read(struct bufferevent_async *beva)
    175 {
    176 	struct bufferevent *bev = &beva->bev.bev;
    177 
    178 	if (!beva->read_added) {
    179 		beva->read_added = 1;
    180 		event_base_add_virtual_(bev->ev_base);
    181 	}
    182 }
    183 
    184 static void
    185 bev_async_consider_writing(struct bufferevent_async *beva)
    186 {
    187 	size_t at_most;
    188 	int limit;
    189 	struct bufferevent *bev = &beva->bev.bev;
    190 
    191 	/* Don't write if there's a write in progress, or we do not
    192 	 * want to write, or when there's nothing left to write. */
    193 	if (beva->write_in_progress || beva->bev.connecting)
    194 		return;
    195 	if (!beva->ok || !(bev->enabled&EV_WRITE) ||
    196 	    !evbuffer_get_length(bev->output)) {
    197 		bev_async_del_write(beva);
    198 		return;
    199 	}
    200 
    201 	at_most = evbuffer_get_length(bev->output);
    202 
    203 	/* This is safe so long as bufferevent_get_write_max never returns
    204 	 * more than INT_MAX.  That's true for now. XXXX */
    205 	limit = (int)bufferevent_get_write_max_(&beva->bev);
    206 	if (at_most >= (size_t)limit && limit >= 0)
    207 		at_most = limit;
    208 
    209 	if (beva->bev.write_suspended) {
    210 		bev_async_del_write(beva);
    211 		return;
    212 	}
    213 
    214 	/*  XXXX doesn't respect low-water mark very well. */
    215 	bufferevent_incref_(bev);
    216 	if (evbuffer_launch_write_(bev->output, at_most,
    217 	    &beva->write_overlapped)) {
    218 		bufferevent_decref_(bev);
    219 		beva->ok = 0;
    220 		bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
    221 	} else {
    222 		beva->write_in_progress = at_most;
    223 		bufferevent_decrement_write_buckets_(&beva->bev, at_most);
    224 		bev_async_add_write(beva);
    225 	}
    226 }
    227 
    228 static void
    229 bev_async_consider_reading(struct bufferevent_async *beva)
    230 {
    231 	size_t cur_size;
    232 	size_t read_high;
    233 	size_t at_most;
    234 	int limit;
    235 	struct bufferevent *bev = &beva->bev.bev;
    236 
    237 	/* Don't read if there is a read in progress, or we do not
    238 	 * want to read. */
    239 	if (beva->read_in_progress || beva->bev.connecting)
    240 		return;
    241 	if (!beva->ok || !(bev->enabled&EV_READ)) {
    242 		bev_async_del_read(beva);
    243 		return;
    244 	}
    245 
    246 	/* Don't read if we're full */
    247 	cur_size = evbuffer_get_length(bev->input);
    248 	read_high = bev->wm_read.high;
    249 	if (read_high) {
    250 		if (cur_size >= read_high) {
    251 			bev_async_del_read(beva);
    252 			return;
    253 		}
    254 		at_most = read_high - cur_size;
    255 	} else {
    256 		at_most = 16384; /* FIXME totally magic. */
    257 	}
    258 
    259 	/* XXXX This over-commits. */
    260 	/* XXXX see also not above on cast on bufferevent_get_write_max_() */
    261 	limit = (int)bufferevent_get_read_max_(&beva->bev);
    262 	if (at_most >= (size_t)limit && limit >= 0)
    263 		at_most = limit;
    264 
    265 	if (beva->bev.read_suspended) {
    266 		bev_async_del_read(beva);
    267 		return;
    268 	}
    269 
    270 	bufferevent_incref_(bev);
    271 	if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
    272 		beva->ok = 0;
    273 		bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
    274 		bufferevent_decref_(bev);
    275 	} else {
    276 		beva->read_in_progress = at_most;
    277 		bufferevent_decrement_read_buckets_(&beva->bev, at_most);
    278 		bev_async_add_read(beva);
    279 	}
    280 
    281 	return;
    282 }
    283 
    284 static void
    285 be_async_outbuf_callback(struct evbuffer *buf,
    286     const struct evbuffer_cb_info *cbinfo,
    287     void *arg)
    288 {
    289 	struct bufferevent *bev = arg;
    290 	struct bufferevent_async *bev_async = upcast(bev);
    291 
    292 	/* If we added data to the outbuf and were not writing before,
    293 	 * we may want to write now. */
    294 
    295 	bufferevent_incref_and_lock_(bev);
    296 
    297 	if (cbinfo->n_added)
    298 		bev_async_consider_writing(bev_async);
    299 
    300 	bufferevent_decref_and_unlock_(bev);
    301 }
    302 
    303 static void
    304 be_async_inbuf_callback(struct evbuffer *buf,
    305     const struct evbuffer_cb_info *cbinfo,
    306     void *arg)
    307 {
    308 	struct bufferevent *bev = arg;
    309 	struct bufferevent_async *bev_async = upcast(bev);
    310 
    311 	/* If we drained data from the inbuf and were not reading before,
    312 	 * we may want to read now */
    313 
    314 	bufferevent_incref_and_lock_(bev);
    315 
    316 	if (cbinfo->n_deleted)
    317 		bev_async_consider_reading(bev_async);
    318 
    319 	bufferevent_decref_and_unlock_(bev);
    320 }
    321 
    322 static int
    323 be_async_enable(struct bufferevent *buf, short what)
    324 {
    325 	struct bufferevent_async *bev_async = upcast(buf);
    326 
    327 	if (!bev_async->ok)
    328 		return -1;
    329 
    330 	if (bev_async->bev.connecting) {
    331 		/* Don't launch anything during connection attempts. */
    332 		return 0;
    333 	}
    334 
    335 	if (what & EV_READ)
    336 		BEV_RESET_GENERIC_READ_TIMEOUT(buf);
    337 	if (what & EV_WRITE)
    338 		BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);
    339 
    340 	/* If we newly enable reading or writing, and we aren't reading or
    341 	   writing already, consider launching a new read or write. */
    342 
    343 	if (what & EV_READ)
    344 		bev_async_consider_reading(bev_async);
    345 	if (what & EV_WRITE)
    346 		bev_async_consider_writing(bev_async);
    347 	return 0;
    348 }
    349 
    350 static int
    351 be_async_disable(struct bufferevent *bev, short what)
    352 {
    353 	struct bufferevent_async *bev_async = upcast(bev);
    354 	/* XXXX If we disable reading or writing, we may want to consider
    355 	 * canceling any in-progress read or write operation, though it might
    356 	 * not work. */
    357 
    358 	if (what & EV_READ) {
    359 		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
    360 		bev_async_del_read(bev_async);
    361 	}
    362 	if (what & EV_WRITE) {
    363 		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
    364 		bev_async_del_write(bev_async);
    365 	}
    366 
    367 	return 0;
    368 }
    369 
    370 static void
    371 be_async_destruct(struct bufferevent *bev)
    372 {
    373 	struct bufferevent_async *bev_async = upcast(bev);
    374 	struct bufferevent_private *bev_p = BEV_UPCAST(bev);
    375 	evutil_socket_t fd;
    376 
    377 	EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
    378 			!upcast(bev)->read_in_progress);
    379 
    380 	bev_async_del_read(bev_async);
    381 	bev_async_del_write(bev_async);
    382 
    383 	fd = evbuffer_overlapped_get_fd_(bev->input);
    384 	if (fd != (evutil_socket_t)INVALID_SOCKET &&
    385 		(bev_p->options & BEV_OPT_CLOSE_ON_FREE)) {
    386 		evutil_closesocket(fd);
    387 		evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET);
    388 	}
    389 }
    390 
    391 /* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
    392  * we use WSAGetOverlappedResult to translate. */
    393 static void
    394 bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
    395 {
    396 	DWORD bytes, flags;
    397 	evutil_socket_t fd;
    398 
    399 	fd = evbuffer_overlapped_get_fd_(bev->input);
    400 	WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
    401 }
    402 
    403 static int
    404 be_async_flush(struct bufferevent *bev, short what,
    405     enum bufferevent_flush_mode mode)
    406 {
    407 	return 0;
    408 }
    409 
    410 static void
    411 connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
    412     ev_ssize_t nbytes, int ok)
    413 {
    414 	struct bufferevent_async *bev_a = upcast_connect(eo);
    415 	struct bufferevent *bev = &bev_a->bev.bev;
    416 	evutil_socket_t sock;
    417 
    418 	BEV_LOCK(bev);
    419 
    420 	EVUTIL_ASSERT(bev_a->bev.connecting);
    421 	bev_a->bev.connecting = 0;
    422 	sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input);
    423 	/* XXXX Handle error? */
    424 	setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
    425 
    426 	if (ok)
    427 		bufferevent_async_set_connected_(bev);
    428 	else
    429 		bev_async_set_wsa_error(bev, eo);
    430 
    431 	bufferevent_run_eventcb_(bev,
    432 			ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);
    433 
    434 	event_base_del_virtual_(bev->ev_base);
    435 
    436 	bufferevent_decref_and_unlock_(bev);
    437 }
    438 
    439 static void
    440 read_complete(struct event_overlapped *eo, ev_uintptr_t key,
    441     ev_ssize_t nbytes, int ok)
    442 {
    443 	struct bufferevent_async *bev_a = upcast_read(eo);
    444 	struct bufferevent *bev = &bev_a->bev.bev;
    445 	short what = BEV_EVENT_READING;
    446 	ev_ssize_t amount_unread;
    447 	BEV_LOCK(bev);
    448 	EVUTIL_ASSERT(bev_a->read_in_progress);
    449 
    450 	amount_unread = bev_a->read_in_progress - nbytes;
    451 	evbuffer_commit_read_(bev->input, nbytes);
    452 	bev_a->read_in_progress = 0;
    453 	if (amount_unread)
    454 		bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread);
    455 
    456 	if (!ok)
    457 		bev_async_set_wsa_error(bev, eo);
    458 
    459 	if (bev_a->ok) {
    460 		if (ok && nbytes) {
    461 			BEV_RESET_GENERIC_READ_TIMEOUT(bev);
    462 			bufferevent_trigger_nolock_(bev, EV_READ, 0);
    463 			bev_async_consider_reading(bev_a);
    464 		} else if (!ok) {
    465 			what |= BEV_EVENT_ERROR;
    466 			bev_a->ok = 0;
    467 			bufferevent_run_eventcb_(bev, what, 0);
    468 		} else if (!nbytes) {
    469 			what |= BEV_EVENT_EOF;
    470 			bev_a->ok = 0;
    471 			bufferevent_run_eventcb_(bev, what, 0);
    472 		}
    473 	}
    474 
    475 	bufferevent_decref_and_unlock_(bev);
    476 }
    477 
    478 static void
    479 write_complete(struct event_overlapped *eo, ev_uintptr_t key,
    480     ev_ssize_t nbytes, int ok)
    481 {
    482 	struct bufferevent_async *bev_a = upcast_write(eo);
    483 	struct bufferevent *bev = &bev_a->bev.bev;
    484 	short what = BEV_EVENT_WRITING;
    485 	ev_ssize_t amount_unwritten;
    486 
    487 	BEV_LOCK(bev);
    488 	EVUTIL_ASSERT(bev_a->write_in_progress);
    489 
    490 	amount_unwritten = bev_a->write_in_progress - nbytes;
    491 	evbuffer_commit_write_(bev->output, nbytes);
    492 	bev_a->write_in_progress = 0;
    493 
    494 	if (amount_unwritten)
    495 		bufferevent_decrement_write_buckets_(&bev_a->bev,
    496 		                                     -amount_unwritten);
    497 
    498 
    499 	if (!ok)
    500 		bev_async_set_wsa_error(bev, eo);
    501 
    502 	if (bev_a->ok) {
    503 		if (ok && nbytes) {
    504 			BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
    505 			bufferevent_trigger_nolock_(bev, EV_WRITE, 0);
    506 			bev_async_consider_writing(bev_a);
    507 		} else if (!ok) {
    508 			what |= BEV_EVENT_ERROR;
    509 			bev_a->ok = 0;
    510 			bufferevent_run_eventcb_(bev, what, 0);
    511 		} else if (!nbytes) {
    512 			what |= BEV_EVENT_EOF;
    513 			bev_a->ok = 0;
    514 			bufferevent_run_eventcb_(bev, what, 0);
    515 		}
    516 	}
    517 
    518 	bufferevent_decref_and_unlock_(bev);
    519 }
    520 
    521 struct bufferevent *
    522 bufferevent_async_new_(struct event_base *base,
    523     evutil_socket_t fd, int options)
    524 {
    525 	struct bufferevent_async *bev_a;
    526 	struct bufferevent *bev;
    527 	struct event_iocp_port *iocp;
    528 
    529 	options |= BEV_OPT_THREADSAFE;
    530 
    531 	if (!(iocp = event_base_get_iocp_(base)))
    532 		return NULL;
    533 
    534 	if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) {
    535 		int err = GetLastError();
    536 		/* We may have alrady associated this fd with a port.
    537 		 * Let's hope it's this port, and that the error code
    538 		 * for doing this neer changes. */
    539 		if (err != ERROR_INVALID_PARAMETER)
    540 			return NULL;
    541 	}
    542 
    543 	if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
    544 		return NULL;
    545 
    546 	bev = &bev_a->bev.bev;
    547 	if (!(bev->input = evbuffer_overlapped_new_(fd))) {
    548 		mm_free(bev_a);
    549 		return NULL;
    550 	}
    551 	if (!(bev->output = evbuffer_overlapped_new_(fd))) {
    552 		evbuffer_free(bev->input);
    553 		mm_free(bev_a);
    554 		return NULL;
    555 	}
    556 
    557 	if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async,
    558 		options)<0)
    559 		goto err;
    560 
    561 	evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
    562 	evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);
    563 
    564 	event_overlapped_init_(&bev_a->connect_overlapped, connect_complete);
    565 	event_overlapped_init_(&bev_a->read_overlapped, read_complete);
    566 	event_overlapped_init_(&bev_a->write_overlapped, write_complete);
    567 
    568 	bufferevent_init_generic_timeout_cbs_(bev);
    569 
    570 	bev_a->ok = fd >= 0;
    571 
    572 	return bev;
    573 err:
    574 	bufferevent_free(&bev_a->bev.bev);
    575 	return NULL;
    576 }
    577 
    578 void
    579 bufferevent_async_set_connected_(struct bufferevent *bev)
    580 {
    581 	struct bufferevent_async *bev_async = upcast(bev);
    582 	bev_async->ok = 1;
    583 	bufferevent_init_generic_timeout_cbs_(bev);
    584 	/* Now's a good time to consider reading/writing */
    585 	be_async_enable(bev, bev->enabled);
    586 }
    587 
    588 int
    589 bufferevent_async_can_connect_(struct bufferevent *bev)
    590 {
    591 	const struct win32_extension_fns *ext =
    592 	    event_get_win32_extension_fns_();
    593 
    594 	if (BEV_IS_ASYNC(bev) &&
    595 	    event_base_get_iocp_(bev->ev_base) &&
    596 	    ext && ext->ConnectEx)
    597 		return 1;
    598 
    599 	return 0;
    600 }
    601 
    602 int
    603 bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
    604 	const struct sockaddr *sa, int socklen)
    605 {
    606 	BOOL rc;
    607 	struct bufferevent_async *bev_async = upcast(bev);
    608 	struct sockaddr_storage ss;
    609 	const struct win32_extension_fns *ext =
    610 	    event_get_win32_extension_fns_();
    611 
    612 	EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);
    613 
    614 	/* ConnectEx() requires that the socket be bound to an address
    615 	 * with bind() before using, otherwise it will fail. We attempt
    616 	 * to issue a bind() here, taking into account that the error
    617 	 * code is set to WSAEINVAL when the socket is already bound. */
    618 	memset(&ss, 0, sizeof(ss));
    619 	if (sa->sa_family == AF_INET) {
    620 		struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
    621 		sin->sin_family = AF_INET;
    622 		sin->sin_addr.s_addr = INADDR_ANY;
    623 	} else if (sa->sa_family == AF_INET6) {
    624 		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
    625 		sin6->sin6_family = AF_INET6;
    626 		sin6->sin6_addr = in6addr_any;
    627 	} else {
    628 		/* Well, the user will have to bind() */
    629 		return -1;
    630 	}
    631 	if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
    632 	    WSAGetLastError() != WSAEINVAL)
    633 		return -1;
    634 
    635 	event_base_add_virtual_(bev->ev_base);
    636 	bufferevent_incref_(bev);
    637 	rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
    638 			    &bev_async->connect_overlapped.overlapped);
    639 	if (rc || WSAGetLastError() == ERROR_IO_PENDING)
    640 		return 0;
    641 
    642 	event_base_del_virtual_(bev->ev_base);
    643 	bufferevent_decref_(bev);
    644 
    645 	return -1;
    646 }
    647 
    648 static int
    649 be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
    650     union bufferevent_ctrl_data *data)
    651 {
    652 	switch (op) {
    653 	case BEV_CTRL_GET_FD:
    654 		data->fd = evbuffer_overlapped_get_fd_(bev->input);
    655 		return 0;
    656 	case BEV_CTRL_SET_FD: {
    657 		struct event_iocp_port *iocp;
    658 
    659 		if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
    660 			return 0;
    661 		if (!(iocp = event_base_get_iocp_(bev->ev_base)))
    662 			return -1;
    663 		if (event_iocp_port_associate_(iocp, data->fd, 1) < 0)
    664 			return -1;
    665 		evbuffer_overlapped_set_fd_(bev->input, data->fd);
    666 		evbuffer_overlapped_set_fd_(bev->output, data->fd);
    667 		return 0;
    668 	}
    669 	case BEV_CTRL_CANCEL_ALL: {
    670 		struct bufferevent_async *bev_a = upcast(bev);
    671 		evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input);
    672 		if (fd != (evutil_socket_t)INVALID_SOCKET &&
    673 		    (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
    674 			closesocket(fd);
    675 			evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET);
    676 		}
    677 		bev_a->ok = 0;
    678 		return 0;
    679 	}
    680 	case BEV_CTRL_GET_UNDERLYING:
    681 	default:
    682 		return -1;
    683 	}
    684 }
    685 
    686 
    687