Home | History | Annotate | Download | only in iomgr
      1 /*
      2  *
      3  * Copyright 2016 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include <grpc/support/port_platform.h>
     20 
     21 #include "src/core/lib/iomgr/port.h"
     22 
     23 #ifdef GRPC_UV
     24 #include <limits.h>
     25 #include <string.h>
     26 
     27 #include <grpc/slice_buffer.h>
     28 
     29 #include <grpc/support/alloc.h>
     30 #include <grpc/support/log.h>
     31 #include <grpc/support/string_util.h>
     32 
     33 #include "src/core/lib/gpr/string.h"
     34 #include "src/core/lib/iomgr/error.h"
     35 #include "src/core/lib/iomgr/iomgr_custom.h"
     36 #include "src/core/lib/iomgr/network_status_tracker.h"
     37 #include "src/core/lib/iomgr/resolve_address_custom.h"
     38 #include "src/core/lib/iomgr/resource_quota.h"
     39 #include "src/core/lib/iomgr/tcp_custom.h"
     40 #include "src/core/lib/slice/slice_internal.h"
     41 #include "src/core/lib/slice/slice_string_helpers.h"
     42 
     43 #include <uv.h>
     44 
     45 #define IGNORE_CONST(addr) ((grpc_sockaddr*)(uintptr_t)(addr))
     46 
     47 typedef struct uv_socket_t {
     48   uv_connect_t connect_req;
     49   uv_write_t write_req;
     50   uv_shutdown_t shutdown_req;
     51   uv_tcp_t* handle;
     52   uv_buf_t* write_buffers;
     53 
     54   char* read_buf;
     55   size_t read_len;
     56 
     57   bool pending_connection;
     58   grpc_custom_socket* accept_socket;
     59   grpc_error* accept_error;
     60 
     61   grpc_custom_connect_callback connect_cb;
     62   grpc_custom_write_callback write_cb;
     63   grpc_custom_read_callback read_cb;
     64   grpc_custom_accept_callback accept_cb;
     65   grpc_custom_close_callback close_cb;
     66 
     67 } uv_socket_t;
     68 
     69 static grpc_error* tcp_error_create(const char* desc, int status) {
     70   if (status == 0) {
     71     return GRPC_ERROR_NONE;
     72   }
     73   grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc);
     74   /* All tcp errors are marked with UNAVAILABLE so that application may
     75    * choose to retry. */
     76   error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
     77                              GRPC_STATUS_UNAVAILABLE);
     78   return grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
     79                             grpc_slice_from_static_string(uv_strerror(status)));
     80 }
     81 
     82 static void uv_socket_destroy(grpc_custom_socket* socket) {
     83   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
     84   gpr_free(uv_socket->handle);
     85   gpr_free(uv_socket);
     86 }
     87 
     88 static void alloc_uv_buf(uv_handle_t* handle, size_t suggested_size,
     89                          uv_buf_t* buf) {
     90   uv_socket_t* uv_socket =
     91       (uv_socket_t*)((grpc_custom_socket*)handle->data)->impl;
     92   (void)suggested_size;
     93   buf->base = uv_socket->read_buf;
     94   buf->len = uv_socket->read_len;
     95 }
     96 
     97 static void uv_read_callback(uv_stream_t* stream, ssize_t nread,
     98                              const uv_buf_t* buf) {
     99   grpc_error* error = GRPC_ERROR_NONE;
    100   if (nread == 0) {
    101     // Nothing happened. Wait for the next callback
    102     return;
    103   }
    104   // TODO(murgatroid99): figure out what the return value here means
    105   uv_read_stop(stream);
    106   if (nread == UV_EOF) {
    107     error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF");
    108   } else if (nread < 0) {
    109     error = tcp_error_create("TCP Read failed", nread);
    110   }
    111   grpc_custom_socket* socket = (grpc_custom_socket*)stream->data;
    112   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
    113   uv_socket->read_cb(socket, (size_t)nread, error);
    114 }
    115 
    116 static void uv_close_callback(uv_handle_t* handle) {
    117   grpc_custom_socket* socket = (grpc_custom_socket*)handle->data;
    118   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
    119   if (uv_socket->accept_socket) {
    120     uv_socket->accept_cb(socket, uv_socket->accept_socket,
    121                          GRPC_ERROR_CREATE_FROM_STATIC_STRING("socket closed"));
    122   }
    123   uv_socket->close_cb(socket);
    124 }
    125 
    126 static void uv_socket_read(grpc_custom_socket* socket, char* buffer,
    127                            size_t length, grpc_custom_read_callback read_cb) {
    128   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
    129   int status;
    130   grpc_error* error;
    131   uv_socket->read_cb = read_cb;
    132   uv_socket->read_buf = buffer;
    133   uv_socket->read_len = length;
    134   // TODO(murgatroid99): figure out what the return value here means
    135   status =
    136       uv_read_start((uv_stream_t*)uv_socket->handle, (uv_alloc_cb)alloc_uv_buf,
    137                     (uv_read_cb)uv_read_callback);
    138   if (status != 0) {
    139     error = tcp_error_create("TCP Read failed at start", status);
    140     uv_socket->read_cb(socket, 0, error);
    141   }
    142 }
    143 
    144 static void uv_write_callback(uv_write_t* req, int status) {
    145   grpc_custom_socket* socket = (grpc_custom_socket*)req->data;
    146   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
    147   gpr_free(uv_socket->write_buffers);
    148   uv_socket->write_cb(socket, tcp_error_create("TCP Write failed", status));
    149 }
    150 
    151 void uv_socket_write(grpc_custom_socket* socket,
    152                      grpc_slice_buffer* write_slices,
    153                      grpc_custom_write_callback write_cb) {
    154   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
    155   uv_socket->write_cb = write_cb;
    156   uv_buf_t* uv_buffers;
    157   uv_write_t* write_req;
    158 
    159   uv_buffers = (uv_buf_t*)gpr_malloc(sizeof(uv_buf_t) * write_slices->count);
    160   for (size_t i = 0; i < write_slices->count; i++) {
    161     uv_buffers[i].base = (char*)GRPC_SLICE_START_PTR(write_slices->slices[i]);
    162     uv_buffers[i].len = GRPC_SLICE_LENGTH(write_slices->slices[i]);
    163   }
    164 
    165   uv_socket->write_buffers = uv_buffers;
    166   write_req = &uv_socket->write_req;
    167   write_req->data = socket;
    168   // TODO(murgatroid99): figure out what the return value here means
    169   uv_write(write_req, (uv_stream_t*)uv_socket->handle, uv_buffers,
    170            write_slices->count, uv_write_callback);
    171 }
    172 
    173 static void shutdown_callback(uv_shutdown_t* req, int status) {}
    174 
    175 static void uv_socket_shutdown(grpc_custom_socket* socket) {
    176   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
    177   uv_shutdown_t* req = &uv_socket->shutdown_req;
    178   uv_shutdown(req, (uv_stream_t*)uv_socket->handle, shutdown_callback);
    179 }
    180 
    181 static void uv_socket_close(grpc_custom_socket* socket,
    182                             grpc_custom_close_callback close_cb) {
    183   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
    184   uv_socket->close_cb = close_cb;
    185   uv_close((uv_handle_t*)uv_socket->handle, uv_close_callback);
    186 }
    187 
    188 static grpc_error* uv_socket_init_helper(uv_socket_t* uv_socket, int domain) {
    189   uv_tcp_t* tcp = (uv_tcp_t*)gpr_malloc(sizeof(uv_tcp_t));
    190   uv_socket->handle = tcp;
    191   int status = uv_tcp_init_ex(uv_default_loop(), tcp, (unsigned int)domain);
    192   if (status != 0) {
    193     return tcp_error_create("Failed to initialize UV tcp handle", status);
    194   }
    195 #if defined(GPR_LINUX) && defined(SO_REUSEPORT)
    196   if (domain == AF_INET || domain == AF_INET6) {
    197     int enable = 1;
    198     int fd;
    199     uv_fileno((uv_handle_t*)tcp, &fd);
    200     // TODO Handle error here.
    201     setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable));
    202   }
    203 #endif
    204   uv_socket->write_buffers = nullptr;
    205   uv_socket->read_len = 0;
    206   uv_tcp_nodelay(uv_socket->handle, 1);
    207   // Node uses a garbage collector to call destructors, so we don't
    208   // want to hold the uv loop open with active gRPC objects.
    209   uv_unref((uv_handle_t*)uv_socket->handle);
    210   uv_socket->pending_connection = false;
    211   uv_socket->accept_socket = nullptr;
    212   uv_socket->accept_error = GRPC_ERROR_NONE;
    213   return GRPC_ERROR_NONE;
    214 }
    215 
    216 static grpc_error* uv_socket_init(grpc_custom_socket* socket, int domain) {
    217   uv_socket_t* uv_socket = (uv_socket_t*)gpr_malloc(sizeof(uv_socket_t));
    218   grpc_error* error = uv_socket_init_helper(uv_socket, domain);
    219   if (error != GRPC_ERROR_NONE) {
    220     return error;
    221   }
    222   uv_socket->handle->data = socket;
    223   socket->impl = uv_socket;
    224   return GRPC_ERROR_NONE;
    225 }
    226 
    227 static grpc_error* uv_socket_getpeername(grpc_custom_socket* socket,
    228                                          const grpc_sockaddr* addr,
    229                                          int* addr_len) {
    230   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
    231   int err = uv_tcp_getpeername(uv_socket->handle,
    232                                (struct sockaddr*)IGNORE_CONST(addr), addr_len);
    233   return tcp_error_create("getpeername failed", err);
    234 }
    235 
    236 static grpc_error* uv_socket_getsockname(grpc_custom_socket* socket,
    237                                          const grpc_sockaddr* addr,
    238                                          int* addr_len) {
    239   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
    240   int err = uv_tcp_getsockname(uv_socket->handle,
    241                                (struct sockaddr*)IGNORE_CONST(addr), addr_len);
    242   return tcp_error_create("getsockname failed", err);
    243 }
    244 
    245 static void accept_new_connection(grpc_custom_socket* socket) {
    246   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
    247   if (!uv_socket->pending_connection || !uv_socket->accept_socket) {
    248     return;
    249   }
    250   grpc_custom_socket* new_socket = uv_socket->accept_socket;
    251   grpc_error* error = uv_socket->accept_error;
    252   uv_socket->accept_socket = nullptr;
    253   uv_socket->accept_error = GRPC_ERROR_NONE;
    254   uv_socket->pending_connection = false;
    255   if (uv_socket->accept_error != GRPC_ERROR_NONE) {
    256     uv_stream_t dummy_handle;
    257     uv_accept((uv_stream_t*)uv_socket->handle, &dummy_handle);
    258     uv_socket->accept_cb(socket, new_socket, error);
    259   } else {
    260     uv_socket_t* uv_new_socket = (uv_socket_t*)gpr_malloc(sizeof(uv_socket_t));
    261     uv_socket_init_helper(uv_new_socket, AF_UNSPEC);
    262     // UV documentation says this is guaranteed to succeed
    263     GPR_ASSERT(uv_accept((uv_stream_t*)uv_socket->handle,
    264                          (uv_stream_t*)uv_new_socket->handle) == 0);
    265     new_socket->impl = uv_new_socket;
    266     uv_new_socket->handle->data = new_socket;
    267     uv_socket->accept_cb(socket, new_socket, error);
    268   }
    269 }
    270 
    271 static void uv_on_connect(uv_stream_t* server, int status) {
    272   grpc_custom_socket* socket = (grpc_custom_socket*)server->data;
    273   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
    274   GPR_ASSERT(!uv_socket->pending_connection);
    275   uv_socket->pending_connection = true;
    276   if (status < 0) {
    277     switch (status) {
    278       case UV_EINTR:
    279       case UV_EAGAIN:
    280         return;
    281       default:
    282         uv_socket->accept_error = tcp_error_create("accept failed", status);
    283     }
    284   }
    285   accept_new_connection(socket);
    286 }
    287 
    288 void uv_socket_accept(grpc_custom_socket* socket,
    289                       grpc_custom_socket* new_socket,
    290                       grpc_custom_accept_callback accept_cb) {
    291   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
    292   uv_socket->accept_cb = accept_cb;
    293   GPR_ASSERT(uv_socket->accept_socket == nullptr);
    294   uv_socket->accept_socket = new_socket;
    295   accept_new_connection(socket);
    296 }
    297 
    298 static grpc_error* uv_socket_bind(grpc_custom_socket* socket,
    299                                   const grpc_sockaddr* addr, size_t len,
    300                                   int flags) {
    301   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
    302   int status =
    303       uv_tcp_bind((uv_tcp_t*)uv_socket->handle, (struct sockaddr*)addr, 0);
    304   return tcp_error_create("Failed to bind to port", status);
    305 }
    306 
    307 static grpc_error* uv_socket_listen(grpc_custom_socket* socket) {
    308   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
    309   int status =
    310       uv_listen((uv_stream_t*)uv_socket->handle, SOMAXCONN, uv_on_connect);
    311   return tcp_error_create("Failed to listen to port", status);
    312 }
    313 
    314 static void uv_tc_on_connect(uv_connect_t* req, int status) {
    315   grpc_custom_socket* socket = (grpc_custom_socket*)req->data;
    316   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
    317   grpc_error* error;
    318   if (status == UV_ECANCELED) {
    319     // This should only happen if the handle is already closed
    320     error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timeout occurred");
    321   } else {
    322     error = tcp_error_create("Failed to connect to remote host", status);
    323   }
    324   uv_socket->connect_cb(socket, error);
    325 }
    326 
    327 static void uv_socket_connect(grpc_custom_socket* socket,
    328                               const grpc_sockaddr* addr, size_t len,
    329                               grpc_custom_connect_callback connect_cb) {
    330   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
    331   uv_socket->connect_cb = connect_cb;
    332   uv_socket->connect_req.data = socket;
    333   int status = uv_tcp_connect(&uv_socket->connect_req, uv_socket->handle,
    334                               (struct sockaddr*)addr, uv_tc_on_connect);
    335   if (status != 0) {
    336     // The callback will not be called
    337     uv_socket->connect_cb(socket, tcp_error_create("connect failed", status));
    338   }
    339 }
    340 
    341 static grpc_resolved_addresses* handle_addrinfo_result(
    342     struct addrinfo* result) {
    343   struct addrinfo* resp;
    344   size_t i;
    345   grpc_resolved_addresses* addresses =
    346       (grpc_resolved_addresses*)gpr_malloc(sizeof(grpc_resolved_addresses));
    347   addresses->naddrs = 0;
    348   for (resp = result; resp != nullptr; resp = resp->ai_next) {
    349     addresses->naddrs++;
    350   }
    351   addresses->addrs = (grpc_resolved_address*)gpr_malloc(
    352       sizeof(grpc_resolved_address) * addresses->naddrs);
    353   for (resp = result, i = 0; resp != nullptr; resp = resp->ai_next, i++) {
    354     memcpy(&addresses->addrs[i].addr, resp->ai_addr, resp->ai_addrlen);
    355     addresses->addrs[i].len = resp->ai_addrlen;
    356   }
    357   // addrinfo objects are allocated by libuv (e.g. in uv_getaddrinfo)
    358   // and not by gpr_malloc
    359   uv_freeaddrinfo(result);
    360   return addresses;
    361 }
    362 
    363 static void uv_resolve_callback(uv_getaddrinfo_t* req, int status,
    364                                 struct addrinfo* res) {
    365   grpc_custom_resolver* r = (grpc_custom_resolver*)req->data;
    366   gpr_free(req);
    367   grpc_resolved_addresses* result = nullptr;
    368   if (status == 0) {
    369     result = handle_addrinfo_result(res);
    370   }
    371   grpc_custom_resolve_callback(r, result,
    372                                tcp_error_create("getaddrinfo failed", status));
    373 }
    374 
    375 static grpc_error* uv_resolve(char* host, char* port,
    376                               grpc_resolved_addresses** result) {
    377   int status;
    378   uv_getaddrinfo_t req;
    379   struct addrinfo hints;
    380   memset(&hints, 0, sizeof(struct addrinfo));
    381   hints.ai_family = AF_UNSPEC;     /* ipv4 or ipv6 */
    382   hints.ai_socktype = SOCK_STREAM; /* stream socket */
    383   hints.ai_flags = AI_PASSIVE;     /* for wildcard IP address */
    384   status = uv_getaddrinfo(uv_default_loop(), &req, NULL, host, port, &hints);
    385   if (status != 0) {
    386     *result = nullptr;
    387   } else {
    388     *result = handle_addrinfo_result(req.addrinfo);
    389   }
    390   return tcp_error_create("getaddrinfo failed", status);
    391 }
    392 
    393 static void uv_resolve_async(grpc_custom_resolver* r, char* host, char* port) {
    394   int status;
    395   uv_getaddrinfo_t* req =
    396       (uv_getaddrinfo_t*)gpr_malloc(sizeof(uv_getaddrinfo_t));
    397   req->data = r;
    398   struct addrinfo hints;
    399   memset(&hints, 0, sizeof(struct addrinfo));
    400   hints.ai_family = GRPC_AF_UNSPEC;     /* ipv4 or ipv6 */
    401   hints.ai_socktype = GRPC_SOCK_STREAM; /* stream socket */
    402   hints.ai_flags = GRPC_AI_PASSIVE;     /* for wildcard IP address */
    403   status = uv_getaddrinfo(uv_default_loop(), req, uv_resolve_callback, host,
    404                           port, &hints);
    405   if (status != 0) {
    406     gpr_free(req);
    407     grpc_error* error = tcp_error_create("getaddrinfo failed", status);
    408     grpc_custom_resolve_callback(r, NULL, error);
    409   }
    410 }
    411 
    412 grpc_custom_resolver_vtable uv_resolver_vtable = {uv_resolve, uv_resolve_async};
    413 
    414 grpc_socket_vtable grpc_uv_socket_vtable = {
    415     uv_socket_init,     uv_socket_connect,     uv_socket_destroy,
    416     uv_socket_shutdown, uv_socket_close,       uv_socket_write,
    417     uv_socket_read,     uv_socket_getpeername, uv_socket_getsockname,
    418     uv_socket_bind,     uv_socket_listen,      uv_socket_accept};
    419 
    420 #endif
    421