1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include <grpc/support/port_platform.h> 20 21 #include "src/core/lib/iomgr/port.h" 22 23 #ifdef GRPC_WINSOCK_SOCKET 24 25 #include <winsock2.h> 26 #include <limits> 27 28 #include <grpc/support/alloc.h> 29 #include <grpc/support/log.h> 30 #include <grpc/support/log_windows.h> 31 32 #include "src/core/lib/debug/stats.h" 33 #include "src/core/lib/gprpp/thd.h" 34 #include "src/core/lib/iomgr/iocp_windows.h" 35 #include "src/core/lib/iomgr/iomgr_internal.h" 36 #include "src/core/lib/iomgr/socket_windows.h" 37 #include "src/core/lib/iomgr/timer.h" 38 39 static ULONG g_iocp_kick_token; 40 static OVERLAPPED g_iocp_custom_overlap; 41 42 static gpr_atm g_custom_events = 0; 43 44 static HANDLE g_iocp; 45 46 static DWORD deadline_to_millis_timeout(grpc_millis deadline) { 47 if (deadline == GRPC_MILLIS_INF_FUTURE) { 48 return INFINITE; 49 } 50 grpc_millis now = grpc_core::ExecCtx::Get()->Now(); 51 if (deadline < now) return 0; 52 grpc_millis timeout = deadline - now; 53 if (timeout > std::numeric_limits<DWORD>::max()) return INFINITE; 54 return static_cast<DWORD>(deadline - now); 55 } 56 57 grpc_iocp_work_status grpc_iocp_work(grpc_millis deadline) { 58 BOOL success; 59 DWORD bytes = 0; 60 DWORD flags = 0; 61 ULONG_PTR completion_key; 62 LPOVERLAPPED overlapped; 63 grpc_winsocket* socket; 64 grpc_winsocket_callback_info* info; 65 GRPC_STATS_INC_SYSCALL_POLL(); 66 success = 67 GetQueuedCompletionStatus(g_iocp, &bytes, &completion_key, &overlapped, 68 deadline_to_millis_timeout(deadline)); 69 grpc_core::ExecCtx::Get()->InvalidateNow(); 70 if (success == 0 && overlapped == NULL) { 71 return GRPC_IOCP_WORK_TIMEOUT; 72 } 73 GPR_ASSERT(completion_key && overlapped); 74 if (overlapped == &g_iocp_custom_overlap) { 75 gpr_atm_full_fetch_add(&g_custom_events, -1); 76 if (completion_key == (ULONG_PTR)&g_iocp_kick_token) { 77 /* We were awoken from a kick. */ 78 return GRPC_IOCP_WORK_KICK; 79 } 80 gpr_log(GPR_ERROR, "Unknown custom completion key."); 81 abort(); 82 } 83 84 socket = (grpc_winsocket*)completion_key; 85 if (overlapped == &socket->write_info.overlapped) { 86 info = &socket->write_info; 87 } else if (overlapped == &socket->read_info.overlapped) { 88 info = &socket->read_info; 89 } else { 90 abort(); 91 } 92 if (socket->shutdown_called) { 93 info->bytes_transfered = 0; 94 info->wsa_error = WSA_OPERATION_ABORTED; 95 } else { 96 success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes, 97 FALSE, &flags); 98 info->bytes_transfered = bytes; 99 info->wsa_error = success ? 0 : WSAGetLastError(); 100 } 101 GPR_ASSERT(overlapped == &info->overlapped); 102 grpc_socket_become_ready(socket, info); 103 return GRPC_IOCP_WORK_WORK; 104 } 105 106 void grpc_iocp_init(void) { 107 g_iocp = 108 CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0); 109 GPR_ASSERT(g_iocp); 110 } 111 112 void grpc_iocp_kick(void) { 113 BOOL success; 114 115 gpr_atm_full_fetch_add(&g_custom_events, 1); 116 success = PostQueuedCompletionStatus(g_iocp, 0, (ULONG_PTR)&g_iocp_kick_token, 117 &g_iocp_custom_overlap); 118 GPR_ASSERT(success); 119 } 120 121 void grpc_iocp_flush(void) { 122 grpc_core::ExecCtx exec_ctx; 123 grpc_iocp_work_status work_status; 124 125 do { 126 work_status = grpc_iocp_work(GRPC_MILLIS_INF_PAST); 127 } while (work_status == GRPC_IOCP_WORK_KICK || 128 grpc_core::ExecCtx::Get()->Flush()); 129 } 130 131 void grpc_iocp_shutdown(void) { 132 grpc_core::ExecCtx exec_ctx; 133 while (gpr_atm_acq_load(&g_custom_events)) { 134 grpc_iocp_work(GRPC_MILLIS_INF_FUTURE); 135 grpc_core::ExecCtx::Get()->Flush(); 136 } 137 138 GPR_ASSERT(CloseHandle(g_iocp)); 139 } 140 141 void grpc_iocp_add_socket(grpc_winsocket* socket) { 142 HANDLE ret; 143 if (socket->added_to_iocp) return; 144 ret = CreateIoCompletionPort((HANDLE)socket->socket, g_iocp, 145 (uintptr_t)socket, 0); 146 if (!ret) { 147 char* utf8_message = gpr_format_message(WSAGetLastError()); 148 gpr_log(GPR_ERROR, "Unable to add socket to iocp: %s", utf8_message); 149 gpr_free(utf8_message); 150 __debugbreak(); 151 abort(); 152 } 153 socket->added_to_iocp = 1; 154 GPR_ASSERT(ret == g_iocp); 155 } 156 157 #endif /* GRPC_WINSOCK_SOCKET */ 158