Home | History | Annotate | Download | only in libevent
      1 /*
      2  * Copyright (c) 2009-2012 Niels Provos, Nick Mathewson
      3  *
      4  * Redistribution and use in source and binary forms, with or without
      5  * modification, are permitted provided that the following conditions
      6  * are met:
      7  * 1. Redistributions of source code must retain the above copyright
      8  *    notice, this list of conditions and the following disclaimer.
      9  * 2. Redistributions in binary form must reproduce the above copyright
     10  *    notice, this list of conditions and the following disclaimer in the
     11  *    documentation and/or other materials provided with the distribution.
     12  * 3. The name of the author may not be used to endorse or promote products
     13  *    derived from this software without specific prior written permission.
     14  *
     15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
     16  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
     17  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
     18  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
     19  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
     20  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     21  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     22  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     23  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
     24  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     25  */
     26 
     27 #ifndef _WIN32_WINNT
     28 /* Minimum required for InitializeCriticalSectionAndSpinCount */
     29 #define _WIN32_WINNT 0x0403
     30 #endif
     31 #include <winsock2.h>
     32 #include <windows.h>
     33 #include <process.h>
     34 #include <stdio.h>
     35 #include <mswsock.h>
     36 
     37 #include "event2/util.h"
     38 #include "util-internal.h"
     39 #include "iocp-internal.h"
     40 #include "log-internal.h"
     41 #include "mm-internal.h"
     42 #include "event-internal.h"
     43 #include "evthread-internal.h"
     44 
     45 #define NOTIFICATION_KEY ((ULONG_PTR)-1)
     46 
     47 void
     48 event_overlapped_init(struct event_overlapped *o, iocp_callback cb)
     49 {
     50 	memset(o, 0, sizeof(struct event_overlapped));
     51 	o->cb = cb;
     52 }
     53 
     54 static void
     55 handle_entry(OVERLAPPED *o, ULONG_PTR completion_key, DWORD nBytes, int ok)
     56 {
     57 	struct event_overlapped *eo =
     58 	    EVUTIL_UPCAST(o, struct event_overlapped, overlapped);
     59 	eo->cb(eo, completion_key, nBytes, ok);
     60 }
     61 
     62 static void
     63 loop(void *_port)
     64 {
     65 	struct event_iocp_port *port = _port;
     66 	long ms = port->ms;
     67 	HANDLE p = port->port;
     68 
     69 	if (ms <= 0)
     70 		ms = INFINITE;
     71 
     72 	while (1) {
     73 		OVERLAPPED *overlapped=NULL;
     74 		ULONG_PTR key=0;
     75 		DWORD bytes=0;
     76 		int ok = GetQueuedCompletionStatus(p, &bytes, &key,
     77 			&overlapped, ms);
     78 		EnterCriticalSection(&port->lock);
     79 		if (port->shutdown) {
     80 			if (--port->n_live_threads == 0)
     81 				ReleaseSemaphore(port->shutdownSemaphore, 1,
     82 						NULL);
     83 			LeaveCriticalSection(&port->lock);
     84 			return;
     85 		}
     86 		LeaveCriticalSection(&port->lock);
     87 
     88 		if (key != NOTIFICATION_KEY && overlapped)
     89 			handle_entry(overlapped, key, bytes, ok);
     90 		else if (!overlapped)
     91 			break;
     92 	}
     93 	event_warnx("GetQueuedCompletionStatus exited with no event.");
     94 	EnterCriticalSection(&port->lock);
     95 	if (--port->n_live_threads == 0)
     96 		ReleaseSemaphore(port->shutdownSemaphore, 1, NULL);
     97 	LeaveCriticalSection(&port->lock);
     98 }
     99 
    100 int
    101 event_iocp_port_associate(struct event_iocp_port *port, evutil_socket_t fd,
    102     ev_uintptr_t key)
    103 {
    104 	HANDLE h;
    105 	h = CreateIoCompletionPort((HANDLE)fd, port->port, key, port->n_threads);
    106 	if (!h)
    107 		return -1;
    108 	return 0;
    109 }
    110 
    111 static void *
    112 get_extension_function(SOCKET s, const GUID *which_fn)
    113 {
    114 	void *ptr = NULL;
    115 	DWORD bytes=0;
    116 	WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER,
    117 	    (GUID*)which_fn, sizeof(*which_fn),
    118 	    &ptr, sizeof(ptr),
    119 	    &bytes, NULL, NULL);
    120 
    121 	/* No need to detect errors here: if ptr is set, then we have a good
    122 	   function pointer.  Otherwise, we should behave as if we had no
    123 	   function pointer.
    124 	*/
    125 	return ptr;
    126 }
    127 
    128 /* Mingw doesn't have these in its mswsock.h.  The values are copied from
    129    wine.h.   Perhaps if we copy them exactly, the cargo will come again.
    130 */
    131 #ifndef WSAID_ACCEPTEX
    132 #define WSAID_ACCEPTEX \
    133 	{0xb5367df1,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}}
    134 #endif
    135 #ifndef WSAID_CONNECTEX
    136 #define WSAID_CONNECTEX \
    137 	{0x25a207b9,0xddf3,0x4660,{0x8e,0xe9,0x76,0xe5,0x8c,0x74,0x06,0x3e}}
    138 #endif
    139 #ifndef WSAID_GETACCEPTEXSOCKADDRS
    140 #define WSAID_GETACCEPTEXSOCKADDRS \
    141 	{0xb5367df2,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}}
    142 #endif
    143 
    144 static int extension_fns_initialized = 0;
    145 
    146 static void
    147 init_extension_functions(struct win32_extension_fns *ext)
    148 {
    149 	const GUID acceptex = WSAID_ACCEPTEX;
    150 	const GUID connectex = WSAID_CONNECTEX;
    151 	const GUID getacceptexsockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
    152 	SOCKET s = socket(AF_INET, SOCK_STREAM, 0);
    153 	if (s == INVALID_SOCKET)
    154 		return;
    155 	ext->AcceptEx = get_extension_function(s, &acceptex);
    156 	ext->ConnectEx = get_extension_function(s, &connectex);
    157 	ext->GetAcceptExSockaddrs = get_extension_function(s,
    158 	    &getacceptexsockaddrs);
    159 	closesocket(s);
    160 
    161 	extension_fns_initialized = 1;
    162 }
    163 
    164 static struct win32_extension_fns the_extension_fns;
    165 
    166 const struct win32_extension_fns *
    167 event_get_win32_extension_fns(void)
    168 {
    169 	return &the_extension_fns;
    170 }
    171 
    172 #define N_CPUS_DEFAULT 2
    173 
    174 struct event_iocp_port *
    175 event_iocp_port_launch(int n_cpus)
    176 {
    177 	struct event_iocp_port *port;
    178 	int i;
    179 
    180 	if (!extension_fns_initialized)
    181 		init_extension_functions(&the_extension_fns);
    182 
    183 	if (!(port = mm_calloc(1, sizeof(struct event_iocp_port))))
    184 		return NULL;
    185 
    186 	if (n_cpus <= 0)
    187 		n_cpus = N_CPUS_DEFAULT;
    188 	port->n_threads = n_cpus * 2;
    189 	port->threads = mm_calloc(port->n_threads, sizeof(HANDLE));
    190 	if (!port->threads)
    191 		goto err;
    192 
    193 	port->port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0,
    194 			n_cpus);
    195 	port->ms = -1;
    196 	if (!port->port)
    197 		goto err;
    198 
    199 	port->shutdownSemaphore = CreateSemaphore(NULL, 0, 1, NULL);
    200 	if (!port->shutdownSemaphore)
    201 		goto err;
    202 
    203 	for (i=0; i<port->n_threads; ++i) {
    204 		ev_uintptr_t th = _beginthread(loop, 0, port);
    205 		if (th == (ev_uintptr_t)-1)
    206 			goto err;
    207 		port->threads[i] = (HANDLE)th;
    208 		++port->n_live_threads;
    209 	}
    210 
    211 	InitializeCriticalSectionAndSpinCount(&port->lock, 1000);
    212 
    213 	return port;
    214 err:
    215 	if (port->port)
    216 		CloseHandle(port->port);
    217 	if (port->threads)
    218 		mm_free(port->threads);
    219 	if (port->shutdownSemaphore)
    220 		CloseHandle(port->shutdownSemaphore);
    221 	mm_free(port);
    222 	return NULL;
    223 }
    224 
    225 static void
    226 _event_iocp_port_unlock_and_free(struct event_iocp_port *port)
    227 {
    228 	DeleteCriticalSection(&port->lock);
    229 	CloseHandle(port->port);
    230 	CloseHandle(port->shutdownSemaphore);
    231 	mm_free(port->threads);
    232 	mm_free(port);
    233 }
    234 
    235 static int
    236 event_iocp_notify_all(struct event_iocp_port *port)
    237 {
    238 	int i, r, ok=1;
    239 	for (i=0; i<port->n_threads; ++i) {
    240 		r = PostQueuedCompletionStatus(port->port, 0, NOTIFICATION_KEY,
    241 		    NULL);
    242 		if (!r)
    243 			ok = 0;
    244 	}
    245 	return ok ? 0 : -1;
    246 }
    247 
    248 int
    249 event_iocp_shutdown(struct event_iocp_port *port, long waitMsec)
    250 {
    251 	DWORD ms = INFINITE;
    252 	int n;
    253 
    254 	EnterCriticalSection(&port->lock);
    255 	port->shutdown = 1;
    256 	LeaveCriticalSection(&port->lock);
    257 	event_iocp_notify_all(port);
    258 
    259 	if (waitMsec >= 0)
    260 		ms = waitMsec;
    261 
    262 	WaitForSingleObject(port->shutdownSemaphore, ms);
    263 	EnterCriticalSection(&port->lock);
    264 	n = port->n_live_threads;
    265 	LeaveCriticalSection(&port->lock);
    266 	if (n == 0) {
    267 		_event_iocp_port_unlock_and_free(port);
    268 		return 0;
    269 	} else {
    270 		return -1;
    271 	}
    272 }
    273 
    274 int
    275 event_iocp_activate_overlapped(
    276     struct event_iocp_port *port, struct event_overlapped *o,
    277     ev_uintptr_t key, ev_uint32_t n)
    278 {
    279 	BOOL r;
    280 
    281 	r = PostQueuedCompletionStatus(port->port, n, key, &o->overlapped);
    282 	return (r==0) ? -1 : 0;
    283 }
    284 
    285 struct event_iocp_port *
    286 event_base_get_iocp(struct event_base *base)
    287 {
    288 #ifdef WIN32
    289 	return base->iocp;
    290 #else
    291 	return NULL;
    292 #endif
    293 }
    294