Home | History | Annotate | Download | only in iomgr
      1 /*
      2  *
      3  * Copyright 2015 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include <grpc/support/port_platform.h>
     20 
     21 #include "src/core/lib/iomgr/port.h"
     22 
     23 #ifdef GRPC_WINSOCK_SOCKET
     24 
     25 #include <winsock2.h>
     26 #include <limits>
     27 
     28 #include <grpc/support/alloc.h>
     29 #include <grpc/support/log.h>
     30 #include <grpc/support/log_windows.h>
     31 
     32 #include "src/core/lib/debug/stats.h"
     33 #include "src/core/lib/gprpp/thd.h"
     34 #include "src/core/lib/iomgr/iocp_windows.h"
     35 #include "src/core/lib/iomgr/iomgr_internal.h"
     36 #include "src/core/lib/iomgr/socket_windows.h"
     37 #include "src/core/lib/iomgr/timer.h"
     38 
     39 static ULONG g_iocp_kick_token;
     40 static OVERLAPPED g_iocp_custom_overlap;
     41 
     42 static gpr_atm g_custom_events = 0;
     43 
     44 static HANDLE g_iocp;
     45 
     46 static DWORD deadline_to_millis_timeout(grpc_millis deadline) {
     47   if (deadline == GRPC_MILLIS_INF_FUTURE) {
     48     return INFINITE;
     49   }
     50   grpc_millis now = grpc_core::ExecCtx::Get()->Now();
     51   if (deadline < now) return 0;
     52   grpc_millis timeout = deadline - now;
     53   if (timeout > std::numeric_limits<DWORD>::max()) return INFINITE;
     54   return static_cast<DWORD>(deadline - now);
     55 }
     56 
     57 grpc_iocp_work_status grpc_iocp_work(grpc_millis deadline) {
     58   BOOL success;
     59   DWORD bytes = 0;
     60   DWORD flags = 0;
     61   ULONG_PTR completion_key;
     62   LPOVERLAPPED overlapped;
     63   grpc_winsocket* socket;
     64   grpc_winsocket_callback_info* info;
     65   GRPC_STATS_INC_SYSCALL_POLL();
     66   success =
     67       GetQueuedCompletionStatus(g_iocp, &bytes, &completion_key, &overlapped,
     68                                 deadline_to_millis_timeout(deadline));
     69   grpc_core::ExecCtx::Get()->InvalidateNow();
     70   if (success == 0 && overlapped == NULL) {
     71     return GRPC_IOCP_WORK_TIMEOUT;
     72   }
     73   GPR_ASSERT(completion_key && overlapped);
     74   if (overlapped == &g_iocp_custom_overlap) {
     75     gpr_atm_full_fetch_add(&g_custom_events, -1);
     76     if (completion_key == (ULONG_PTR)&g_iocp_kick_token) {
     77       /* We were awoken from a kick. */
     78       return GRPC_IOCP_WORK_KICK;
     79     }
     80     gpr_log(GPR_ERROR, "Unknown custom completion key.");
     81     abort();
     82   }
     83 
     84   socket = (grpc_winsocket*)completion_key;
     85   if (overlapped == &socket->write_info.overlapped) {
     86     info = &socket->write_info;
     87   } else if (overlapped == &socket->read_info.overlapped) {
     88     info = &socket->read_info;
     89   } else {
     90     abort();
     91   }
     92   if (socket->shutdown_called) {
     93     info->bytes_transfered = 0;
     94     info->wsa_error = WSA_OPERATION_ABORTED;
     95   } else {
     96     success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
     97                                      FALSE, &flags);
     98     info->bytes_transfered = bytes;
     99     info->wsa_error = success ? 0 : WSAGetLastError();
    100   }
    101   GPR_ASSERT(overlapped == &info->overlapped);
    102   grpc_socket_become_ready(socket, info);
    103   return GRPC_IOCP_WORK_WORK;
    104 }
    105 
    106 void grpc_iocp_init(void) {
    107   g_iocp =
    108       CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0);
    109   GPR_ASSERT(g_iocp);
    110 }
    111 
    112 void grpc_iocp_kick(void) {
    113   BOOL success;
    114 
    115   gpr_atm_full_fetch_add(&g_custom_events, 1);
    116   success = PostQueuedCompletionStatus(g_iocp, 0, (ULONG_PTR)&g_iocp_kick_token,
    117                                        &g_iocp_custom_overlap);
    118   GPR_ASSERT(success);
    119 }
    120 
    121 void grpc_iocp_flush(void) {
    122   grpc_core::ExecCtx exec_ctx;
    123   grpc_iocp_work_status work_status;
    124 
    125   do {
    126     work_status = grpc_iocp_work(GRPC_MILLIS_INF_PAST);
    127   } while (work_status == GRPC_IOCP_WORK_KICK ||
    128            grpc_core::ExecCtx::Get()->Flush());
    129 }
    130 
    131 void grpc_iocp_shutdown(void) {
    132   grpc_core::ExecCtx exec_ctx;
    133   while (gpr_atm_acq_load(&g_custom_events)) {
    134     grpc_iocp_work(GRPC_MILLIS_INF_FUTURE);
    135     grpc_core::ExecCtx::Get()->Flush();
    136   }
    137 
    138   GPR_ASSERT(CloseHandle(g_iocp));
    139 }
    140 
    141 void grpc_iocp_add_socket(grpc_winsocket* socket) {
    142   HANDLE ret;
    143   if (socket->added_to_iocp) return;
    144   ret = CreateIoCompletionPort((HANDLE)socket->socket, g_iocp,
    145                                (uintptr_t)socket, 0);
    146   if (!ret) {
    147     char* utf8_message = gpr_format_message(WSAGetLastError());
    148     gpr_log(GPR_ERROR, "Unable to add socket to iocp: %s", utf8_message);
    149     gpr_free(utf8_message);
    150     __debugbreak();
    151     abort();
    152   }
    153   socket->added_to_iocp = 1;
    154   GPR_ASSERT(ret == g_iocp);
    155 }
    156 
    157 #endif /* GRPC_WINSOCK_SOCKET */
    158