Home | History | Annotate | Download | only in libevent
      1 /*
      2  * Copyright (c) 2009-2012 Niels Provos, Nick Mathewson
      3  *
      4  * Redistribution and use in source and binary forms, with or without
      5  * modification, are permitted provided that the following conditions
      6  * are met:
      7  * 1. Redistributions of source code must retain the above copyright
      8  *    notice, this list of conditions and the following disclaimer.
      9  * 2. Redistributions in binary form must reproduce the above copyright
     10  *    notice, this list of conditions and the following disclaimer in the
     11  *    documentation and/or other materials provided with the distribution.
     12  * 3. The name of the author may not be used to endorse or promote products
     13  *    derived from this software without specific prior written permission.
     14  *
     15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
     16  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
     17  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
     18  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
     19  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
     20  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     21  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     22  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     23  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
     24  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     25  */
     26 #include "evconfig-private.h"
     27 
     28 #ifndef _WIN32_WINNT
     29 /* Minimum required for InitializeCriticalSectionAndSpinCount */
     30 #define _WIN32_WINNT 0x0403
     31 #endif
     32 #include <winsock2.h>
     33 #include <windows.h>
     34 #include <process.h>
     35 #include <stdio.h>
     36 #include <mswsock.h>
     37 
     38 #include "event2/util.h"
     39 #include "util-internal.h"
     40 #include "iocp-internal.h"
     41 #include "log-internal.h"
     42 #include "mm-internal.h"
     43 #include "event-internal.h"
     44 #include "evthread-internal.h"
     45 
     46 #define NOTIFICATION_KEY ((ULONG_PTR)-1)
     47 
     48 void
     49 event_overlapped_init_(struct event_overlapped *o, iocp_callback cb)
     50 {
     51 	memset(o, 0, sizeof(struct event_overlapped));
     52 	o->cb = cb;
     53 }
     54 
     55 static void
     56 handle_entry(OVERLAPPED *o, ULONG_PTR completion_key, DWORD nBytes, int ok)
     57 {
     58 	struct event_overlapped *eo =
     59 	    EVUTIL_UPCAST(o, struct event_overlapped, overlapped);
     60 	eo->cb(eo, completion_key, nBytes, ok);
     61 }
     62 
     63 static void
     64 loop(void *port_)
     65 {
     66 	struct event_iocp_port *port = port_;
     67 	long ms = port->ms;
     68 	HANDLE p = port->port;
     69 
     70 	if (ms <= 0)
     71 		ms = INFINITE;
     72 
     73 	while (1) {
     74 		OVERLAPPED *overlapped=NULL;
     75 		ULONG_PTR key=0;
     76 		DWORD bytes=0;
     77 		int ok = GetQueuedCompletionStatus(p, &bytes, &key,
     78 			&overlapped, ms);
     79 		EnterCriticalSection(&port->lock);
     80 		if (port->shutdown) {
     81 			if (--port->n_live_threads == 0)
     82 				ReleaseSemaphore(port->shutdownSemaphore, 1,
     83 						NULL);
     84 			LeaveCriticalSection(&port->lock);
     85 			return;
     86 		}
     87 		LeaveCriticalSection(&port->lock);
     88 
     89 		if (key != NOTIFICATION_KEY && overlapped)
     90 			handle_entry(overlapped, key, bytes, ok);
     91 		else if (!overlapped)
     92 			break;
     93 	}
     94 	event_warnx("GetQueuedCompletionStatus exited with no event.");
     95 	EnterCriticalSection(&port->lock);
     96 	if (--port->n_live_threads == 0)
     97 		ReleaseSemaphore(port->shutdownSemaphore, 1, NULL);
     98 	LeaveCriticalSection(&port->lock);
     99 }
    100 
    101 int
    102 event_iocp_port_associate_(struct event_iocp_port *port, evutil_socket_t fd,
    103     ev_uintptr_t key)
    104 {
    105 	HANDLE h;
    106 	h = CreateIoCompletionPort((HANDLE)fd, port->port, key, port->n_threads);
    107 	if (!h)
    108 		return -1;
    109 	return 0;
    110 }
    111 
    112 static void *
    113 get_extension_function(SOCKET s, const GUID *which_fn)
    114 {
    115 	void *ptr = NULL;
    116 	DWORD bytes=0;
    117 	WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER,
    118 	    (GUID*)which_fn, sizeof(*which_fn),
    119 	    &ptr, sizeof(ptr),
    120 	    &bytes, NULL, NULL);
    121 
    122 	/* No need to detect errors here: if ptr is set, then we have a good
    123 	   function pointer.  Otherwise, we should behave as if we had no
    124 	   function pointer.
    125 	*/
    126 	return ptr;
    127 }
    128 
    129 /* Mingw doesn't have these in its mswsock.h.  The values are copied from
    130    wine.h.   Perhaps if we copy them exactly, the cargo will come again.
    131 */
    132 #ifndef WSAID_ACCEPTEX
    133 #define WSAID_ACCEPTEX \
    134 	{0xb5367df1,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}}
    135 #endif
    136 #ifndef WSAID_CONNECTEX
    137 #define WSAID_CONNECTEX \
    138 	{0x25a207b9,0xddf3,0x4660,{0x8e,0xe9,0x76,0xe5,0x8c,0x74,0x06,0x3e}}
    139 #endif
    140 #ifndef WSAID_GETACCEPTEXSOCKADDRS
    141 #define WSAID_GETACCEPTEXSOCKADDRS \
    142 	{0xb5367df2,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}}
    143 #endif
    144 
    145 static int extension_fns_initialized = 0;
    146 
    147 static void
    148 init_extension_functions(struct win32_extension_fns *ext)
    149 {
    150 	const GUID acceptex = WSAID_ACCEPTEX;
    151 	const GUID connectex = WSAID_CONNECTEX;
    152 	const GUID getacceptexsockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
    153 	SOCKET s = socket(AF_INET, SOCK_STREAM, 0);
    154 	if (s == INVALID_SOCKET)
    155 		return;
    156 	ext->AcceptEx = get_extension_function(s, &acceptex);
    157 	ext->ConnectEx = get_extension_function(s, &connectex);
    158 	ext->GetAcceptExSockaddrs = get_extension_function(s,
    159 	    &getacceptexsockaddrs);
    160 	closesocket(s);
    161 
    162 	extension_fns_initialized = 1;
    163 }
    164 
    165 static struct win32_extension_fns the_extension_fns;
    166 
    167 const struct win32_extension_fns *
    168 event_get_win32_extension_fns_(void)
    169 {
    170 	return &the_extension_fns;
    171 }
    172 
    173 #define N_CPUS_DEFAULT 2
    174 
    175 struct event_iocp_port *
    176 event_iocp_port_launch_(int n_cpus)
    177 {
    178 	struct event_iocp_port *port;
    179 	int i;
    180 
    181 	if (!extension_fns_initialized)
    182 		init_extension_functions(&the_extension_fns);
    183 
    184 	if (!(port = mm_calloc(1, sizeof(struct event_iocp_port))))
    185 		return NULL;
    186 
    187 	if (n_cpus <= 0)
    188 		n_cpus = N_CPUS_DEFAULT;
    189 	port->n_threads = n_cpus * 2;
    190 	port->threads = mm_calloc(port->n_threads, sizeof(HANDLE));
    191 	if (!port->threads)
    192 		goto err;
    193 
    194 	port->port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0,
    195 			n_cpus);
    196 	port->ms = -1;
    197 	if (!port->port)
    198 		goto err;
    199 
    200 	port->shutdownSemaphore = CreateSemaphore(NULL, 0, 1, NULL);
    201 	if (!port->shutdownSemaphore)
    202 		goto err;
    203 
    204 	for (i=0; i<port->n_threads; ++i) {
    205 		ev_uintptr_t th = _beginthread(loop, 0, port);
    206 		if (th == (ev_uintptr_t)-1)
    207 			goto err;
    208 		port->threads[i] = (HANDLE)th;
    209 		++port->n_live_threads;
    210 	}
    211 
    212 	InitializeCriticalSectionAndSpinCount(&port->lock, 1000);
    213 
    214 	return port;
    215 err:
    216 	if (port->port)
    217 		CloseHandle(port->port);
    218 	if (port->threads)
    219 		mm_free(port->threads);
    220 	if (port->shutdownSemaphore)
    221 		CloseHandle(port->shutdownSemaphore);
    222 	mm_free(port);
    223 	return NULL;
    224 }
    225 
    226 static void
    227 event_iocp_port_unlock_and_free_(struct event_iocp_port *port)
    228 {
    229 	DeleteCriticalSection(&port->lock);
    230 	CloseHandle(port->port);
    231 	CloseHandle(port->shutdownSemaphore);
    232 	mm_free(port->threads);
    233 	mm_free(port);
    234 }
    235 
    236 static int
    237 event_iocp_notify_all(struct event_iocp_port *port)
    238 {
    239 	int i, r, ok=1;
    240 	for (i=0; i<port->n_threads; ++i) {
    241 		r = PostQueuedCompletionStatus(port->port, 0, NOTIFICATION_KEY,
    242 		    NULL);
    243 		if (!r)
    244 			ok = 0;
    245 	}
    246 	return ok ? 0 : -1;
    247 }
    248 
    249 int
    250 event_iocp_shutdown_(struct event_iocp_port *port, long waitMsec)
    251 {
    252 	DWORD ms = INFINITE;
    253 	int n;
    254 
    255 	EnterCriticalSection(&port->lock);
    256 	port->shutdown = 1;
    257 	LeaveCriticalSection(&port->lock);
    258 	event_iocp_notify_all(port);
    259 
    260 	if (waitMsec >= 0)
    261 		ms = waitMsec;
    262 
    263 	WaitForSingleObject(port->shutdownSemaphore, ms);
    264 	EnterCriticalSection(&port->lock);
    265 	n = port->n_live_threads;
    266 	LeaveCriticalSection(&port->lock);
    267 	if (n == 0) {
    268 		event_iocp_port_unlock_and_free_(port);
    269 		return 0;
    270 	} else {
    271 		return -1;
    272 	}
    273 }
    274 
    275 int
    276 event_iocp_activate_overlapped_(
    277     struct event_iocp_port *port, struct event_overlapped *o,
    278     ev_uintptr_t key, ev_uint32_t n)
    279 {
    280 	BOOL r;
    281 
    282 	r = PostQueuedCompletionStatus(port->port, n, key, &o->overlapped);
    283 	return (r==0) ? -1 : 0;
    284 }
    285 
    286 struct event_iocp_port *
    287 event_base_get_iocp_(struct event_base *base)
    288 {
    289 #ifdef _WIN32
    290 	return base->iocp;
    291 #else
    292 	return NULL;
    293 #endif
    294 }
    295