1 /* 2 * 3 * Copyright 2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include <grpc/support/port_platform.h> 20 21 #include "src/core/lib/iomgr/port.h" 22 23 #ifdef GRPC_UV 24 #include <limits.h> 25 #include <string.h> 26 27 #include <grpc/slice_buffer.h> 28 29 #include <grpc/support/alloc.h> 30 #include <grpc/support/log.h> 31 #include <grpc/support/string_util.h> 32 33 #include "src/core/lib/gpr/string.h" 34 #include "src/core/lib/iomgr/error.h" 35 #include "src/core/lib/iomgr/iomgr_custom.h" 36 #include "src/core/lib/iomgr/network_status_tracker.h" 37 #include "src/core/lib/iomgr/resolve_address_custom.h" 38 #include "src/core/lib/iomgr/resource_quota.h" 39 #include "src/core/lib/iomgr/tcp_custom.h" 40 #include "src/core/lib/slice/slice_internal.h" 41 #include "src/core/lib/slice/slice_string_helpers.h" 42 43 #include <uv.h> 44 45 #define IGNORE_CONST(addr) ((grpc_sockaddr*)(uintptr_t)(addr)) 46 47 typedef struct uv_socket_t { 48 uv_connect_t connect_req; 49 uv_write_t write_req; 50 uv_shutdown_t shutdown_req; 51 uv_tcp_t* handle; 52 uv_buf_t* write_buffers; 53 54 char* read_buf; 55 size_t read_len; 56 57 bool pending_connection; 58 grpc_custom_socket* accept_socket; 59 grpc_error* accept_error; 60 61 grpc_custom_connect_callback connect_cb; 62 grpc_custom_write_callback write_cb; 63 grpc_custom_read_callback read_cb; 64 grpc_custom_accept_callback accept_cb; 65 grpc_custom_close_callback close_cb; 66 67 } uv_socket_t; 68 69 static grpc_error* tcp_error_create(const char* desc, int status) { 70 if (status == 0) { 71 return GRPC_ERROR_NONE; 72 } 73 grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc); 74 /* All tcp errors are marked with UNAVAILABLE so that application may 75 * choose to retry. */ 76 error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, 77 GRPC_STATUS_UNAVAILABLE); 78 return grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, 79 grpc_slice_from_static_string(uv_strerror(status))); 80 } 81 82 static void uv_socket_destroy(grpc_custom_socket* socket) { 83 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl; 84 gpr_free(uv_socket->handle); 85 gpr_free(uv_socket); 86 } 87 88 static void alloc_uv_buf(uv_handle_t* handle, size_t suggested_size, 89 uv_buf_t* buf) { 90 uv_socket_t* uv_socket = 91 (uv_socket_t*)((grpc_custom_socket*)handle->data)->impl; 92 (void)suggested_size; 93 buf->base = uv_socket->read_buf; 94 buf->len = uv_socket->read_len; 95 } 96 97 static void uv_read_callback(uv_stream_t* stream, ssize_t nread, 98 const uv_buf_t* buf) { 99 grpc_error* error = GRPC_ERROR_NONE; 100 if (nread == 0) { 101 // Nothing happened. Wait for the next callback 102 return; 103 } 104 // TODO(murgatroid99): figure out what the return value here means 105 uv_read_stop(stream); 106 if (nread == UV_EOF) { 107 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"); 108 } else if (nread < 0) { 109 error = tcp_error_create("TCP Read failed", nread); 110 } 111 grpc_custom_socket* socket = (grpc_custom_socket*)stream->data; 112 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl; 113 uv_socket->read_cb(socket, (size_t)nread, error); 114 } 115 116 static void uv_close_callback(uv_handle_t* handle) { 117 grpc_custom_socket* socket = (grpc_custom_socket*)handle->data; 118 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl; 119 if (uv_socket->accept_socket) { 120 uv_socket->accept_cb(socket, uv_socket->accept_socket, 121 GRPC_ERROR_CREATE_FROM_STATIC_STRING("socket closed")); 122 } 123 uv_socket->close_cb(socket); 124 } 125 126 static void uv_socket_read(grpc_custom_socket* socket, char* buffer, 127 size_t length, grpc_custom_read_callback read_cb) { 128 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl; 129 int status; 130 grpc_error* error; 131 uv_socket->read_cb = read_cb; 132 uv_socket->read_buf = buffer; 133 uv_socket->read_len = length; 134 // TODO(murgatroid99): figure out what the return value here means 135 status = 136 uv_read_start((uv_stream_t*)uv_socket->handle, (uv_alloc_cb)alloc_uv_buf, 137 (uv_read_cb)uv_read_callback); 138 if (status != 0) { 139 error = tcp_error_create("TCP Read failed at start", status); 140 uv_socket->read_cb(socket, 0, error); 141 } 142 } 143 144 static void uv_write_callback(uv_write_t* req, int status) { 145 grpc_custom_socket* socket = (grpc_custom_socket*)req->data; 146 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl; 147 gpr_free(uv_socket->write_buffers); 148 uv_socket->write_cb(socket, tcp_error_create("TCP Write failed", status)); 149 } 150 151 void uv_socket_write(grpc_custom_socket* socket, 152 grpc_slice_buffer* write_slices, 153 grpc_custom_write_callback write_cb) { 154 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl; 155 uv_socket->write_cb = write_cb; 156 uv_buf_t* uv_buffers; 157 uv_write_t* write_req; 158 159 uv_buffers = (uv_buf_t*)gpr_malloc(sizeof(uv_buf_t) * write_slices->count); 160 for (size_t i = 0; i < write_slices->count; i++) { 161 uv_buffers[i].base = (char*)GRPC_SLICE_START_PTR(write_slices->slices[i]); 162 uv_buffers[i].len = GRPC_SLICE_LENGTH(write_slices->slices[i]); 163 } 164 165 uv_socket->write_buffers = uv_buffers; 166 write_req = &uv_socket->write_req; 167 write_req->data = socket; 168 // TODO(murgatroid99): figure out what the return value here means 169 uv_write(write_req, (uv_stream_t*)uv_socket->handle, uv_buffers, 170 write_slices->count, uv_write_callback); 171 } 172 173 static void shutdown_callback(uv_shutdown_t* req, int status) {} 174 175 static void uv_socket_shutdown(grpc_custom_socket* socket) { 176 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl; 177 uv_shutdown_t* req = &uv_socket->shutdown_req; 178 uv_shutdown(req, (uv_stream_t*)uv_socket->handle, shutdown_callback); 179 } 180 181 static void uv_socket_close(grpc_custom_socket* socket, 182 grpc_custom_close_callback close_cb) { 183 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl; 184 uv_socket->close_cb = close_cb; 185 uv_close((uv_handle_t*)uv_socket->handle, uv_close_callback); 186 } 187 188 static grpc_error* uv_socket_init_helper(uv_socket_t* uv_socket, int domain) { 189 uv_tcp_t* tcp = (uv_tcp_t*)gpr_malloc(sizeof(uv_tcp_t)); 190 uv_socket->handle = tcp; 191 int status = uv_tcp_init_ex(uv_default_loop(), tcp, (unsigned int)domain); 192 if (status != 0) { 193 return tcp_error_create("Failed to initialize UV tcp handle", status); 194 } 195 #if defined(GPR_LINUX) && defined(SO_REUSEPORT) 196 if (domain == AF_INET || domain == AF_INET6) { 197 int enable = 1; 198 int fd; 199 uv_fileno((uv_handle_t*)tcp, &fd); 200 // TODO Handle error here. 201 setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)); 202 } 203 #endif 204 uv_socket->write_buffers = nullptr; 205 uv_socket->read_len = 0; 206 uv_tcp_nodelay(uv_socket->handle, 1); 207 // Node uses a garbage collector to call destructors, so we don't 208 // want to hold the uv loop open with active gRPC objects. 209 uv_unref((uv_handle_t*)uv_socket->handle); 210 uv_socket->pending_connection = false; 211 uv_socket->accept_socket = nullptr; 212 uv_socket->accept_error = GRPC_ERROR_NONE; 213 return GRPC_ERROR_NONE; 214 } 215 216 static grpc_error* uv_socket_init(grpc_custom_socket* socket, int domain) { 217 uv_socket_t* uv_socket = (uv_socket_t*)gpr_malloc(sizeof(uv_socket_t)); 218 grpc_error* error = uv_socket_init_helper(uv_socket, domain); 219 if (error != GRPC_ERROR_NONE) { 220 return error; 221 } 222 uv_socket->handle->data = socket; 223 socket->impl = uv_socket; 224 return GRPC_ERROR_NONE; 225 } 226 227 static grpc_error* uv_socket_getpeername(grpc_custom_socket* socket, 228 const grpc_sockaddr* addr, 229 int* addr_len) { 230 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl; 231 int err = uv_tcp_getpeername(uv_socket->handle, 232 (struct sockaddr*)IGNORE_CONST(addr), addr_len); 233 return tcp_error_create("getpeername failed", err); 234 } 235 236 static grpc_error* uv_socket_getsockname(grpc_custom_socket* socket, 237 const grpc_sockaddr* addr, 238 int* addr_len) { 239 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl; 240 int err = uv_tcp_getsockname(uv_socket->handle, 241 (struct sockaddr*)IGNORE_CONST(addr), addr_len); 242 return tcp_error_create("getsockname failed", err); 243 } 244 245 static void accept_new_connection(grpc_custom_socket* socket) { 246 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl; 247 if (!uv_socket->pending_connection || !uv_socket->accept_socket) { 248 return; 249 } 250 grpc_custom_socket* new_socket = uv_socket->accept_socket; 251 grpc_error* error = uv_socket->accept_error; 252 uv_socket->accept_socket = nullptr; 253 uv_socket->accept_error = GRPC_ERROR_NONE; 254 uv_socket->pending_connection = false; 255 if (uv_socket->accept_error != GRPC_ERROR_NONE) { 256 uv_stream_t dummy_handle; 257 uv_accept((uv_stream_t*)uv_socket->handle, &dummy_handle); 258 uv_socket->accept_cb(socket, new_socket, error); 259 } else { 260 uv_socket_t* uv_new_socket = (uv_socket_t*)gpr_malloc(sizeof(uv_socket_t)); 261 uv_socket_init_helper(uv_new_socket, AF_UNSPEC); 262 // UV documentation says this is guaranteed to succeed 263 GPR_ASSERT(uv_accept((uv_stream_t*)uv_socket->handle, 264 (uv_stream_t*)uv_new_socket->handle) == 0); 265 new_socket->impl = uv_new_socket; 266 uv_new_socket->handle->data = new_socket; 267 uv_socket->accept_cb(socket, new_socket, error); 268 } 269 } 270 271 static void uv_on_connect(uv_stream_t* server, int status) { 272 grpc_custom_socket* socket = (grpc_custom_socket*)server->data; 273 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl; 274 GPR_ASSERT(!uv_socket->pending_connection); 275 uv_socket->pending_connection = true; 276 if (status < 0) { 277 switch (status) { 278 case UV_EINTR: 279 case UV_EAGAIN: 280 return; 281 default: 282 uv_socket->accept_error = tcp_error_create("accept failed", status); 283 } 284 } 285 accept_new_connection(socket); 286 } 287 288 void uv_socket_accept(grpc_custom_socket* socket, 289 grpc_custom_socket* new_socket, 290 grpc_custom_accept_callback accept_cb) { 291 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl; 292 uv_socket->accept_cb = accept_cb; 293 GPR_ASSERT(uv_socket->accept_socket == nullptr); 294 uv_socket->accept_socket = new_socket; 295 accept_new_connection(socket); 296 } 297 298 static grpc_error* uv_socket_bind(grpc_custom_socket* socket, 299 const grpc_sockaddr* addr, size_t len, 300 int flags) { 301 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl; 302 int status = 303 uv_tcp_bind((uv_tcp_t*)uv_socket->handle, (struct sockaddr*)addr, 0); 304 return tcp_error_create("Failed to bind to port", status); 305 } 306 307 static grpc_error* uv_socket_listen(grpc_custom_socket* socket) { 308 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl; 309 int status = 310 uv_listen((uv_stream_t*)uv_socket->handle, SOMAXCONN, uv_on_connect); 311 return tcp_error_create("Failed to listen to port", status); 312 } 313 314 static void uv_tc_on_connect(uv_connect_t* req, int status) { 315 grpc_custom_socket* socket = (grpc_custom_socket*)req->data; 316 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl; 317 grpc_error* error; 318 if (status == UV_ECANCELED) { 319 // This should only happen if the handle is already closed 320 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timeout occurred"); 321 } else { 322 error = tcp_error_create("Failed to connect to remote host", status); 323 } 324 uv_socket->connect_cb(socket, error); 325 } 326 327 static void uv_socket_connect(grpc_custom_socket* socket, 328 const grpc_sockaddr* addr, size_t len, 329 grpc_custom_connect_callback connect_cb) { 330 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl; 331 uv_socket->connect_cb = connect_cb; 332 uv_socket->connect_req.data = socket; 333 int status = uv_tcp_connect(&uv_socket->connect_req, uv_socket->handle, 334 (struct sockaddr*)addr, uv_tc_on_connect); 335 if (status != 0) { 336 // The callback will not be called 337 uv_socket->connect_cb(socket, tcp_error_create("connect failed", status)); 338 } 339 } 340 341 static grpc_resolved_addresses* handle_addrinfo_result( 342 struct addrinfo* result) { 343 struct addrinfo* resp; 344 size_t i; 345 grpc_resolved_addresses* addresses = 346 (grpc_resolved_addresses*)gpr_malloc(sizeof(grpc_resolved_addresses)); 347 addresses->naddrs = 0; 348 for (resp = result; resp != nullptr; resp = resp->ai_next) { 349 addresses->naddrs++; 350 } 351 addresses->addrs = (grpc_resolved_address*)gpr_malloc( 352 sizeof(grpc_resolved_address) * addresses->naddrs); 353 for (resp = result, i = 0; resp != nullptr; resp = resp->ai_next, i++) { 354 memcpy(&addresses->addrs[i].addr, resp->ai_addr, resp->ai_addrlen); 355 addresses->addrs[i].len = resp->ai_addrlen; 356 } 357 // addrinfo objects are allocated by libuv (e.g. in uv_getaddrinfo) 358 // and not by gpr_malloc 359 uv_freeaddrinfo(result); 360 return addresses; 361 } 362 363 static void uv_resolve_callback(uv_getaddrinfo_t* req, int status, 364 struct addrinfo* res) { 365 grpc_custom_resolver* r = (grpc_custom_resolver*)req->data; 366 gpr_free(req); 367 grpc_resolved_addresses* result = nullptr; 368 if (status == 0) { 369 result = handle_addrinfo_result(res); 370 } 371 grpc_custom_resolve_callback(r, result, 372 tcp_error_create("getaddrinfo failed", status)); 373 } 374 375 static grpc_error* uv_resolve(char* host, char* port, 376 grpc_resolved_addresses** result) { 377 int status; 378 uv_getaddrinfo_t req; 379 struct addrinfo hints; 380 memset(&hints, 0, sizeof(struct addrinfo)); 381 hints.ai_family = AF_UNSPEC; /* ipv4 or ipv6 */ 382 hints.ai_socktype = SOCK_STREAM; /* stream socket */ 383 hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */ 384 status = uv_getaddrinfo(uv_default_loop(), &req, NULL, host, port, &hints); 385 if (status != 0) { 386 *result = nullptr; 387 } else { 388 *result = handle_addrinfo_result(req.addrinfo); 389 } 390 return tcp_error_create("getaddrinfo failed", status); 391 } 392 393 static void uv_resolve_async(grpc_custom_resolver* r, char* host, char* port) { 394 int status; 395 uv_getaddrinfo_t* req = 396 (uv_getaddrinfo_t*)gpr_malloc(sizeof(uv_getaddrinfo_t)); 397 req->data = r; 398 struct addrinfo hints; 399 memset(&hints, 0, sizeof(struct addrinfo)); 400 hints.ai_family = GRPC_AF_UNSPEC; /* ipv4 or ipv6 */ 401 hints.ai_socktype = GRPC_SOCK_STREAM; /* stream socket */ 402 hints.ai_flags = GRPC_AI_PASSIVE; /* for wildcard IP address */ 403 status = uv_getaddrinfo(uv_default_loop(), req, uv_resolve_callback, host, 404 port, &hints); 405 if (status != 0) { 406 gpr_free(req); 407 grpc_error* error = tcp_error_create("getaddrinfo failed", status); 408 grpc_custom_resolve_callback(r, NULL, error); 409 } 410 } 411 412 grpc_custom_resolver_vtable uv_resolver_vtable = {uv_resolve, uv_resolve_async}; 413 414 grpc_socket_vtable grpc_uv_socket_vtable = { 415 uv_socket_init, uv_socket_connect, uv_socket_destroy, 416 uv_socket_shutdown, uv_socket_close, uv_socket_write, 417 uv_socket_read, uv_socket_getpeername, uv_socket_getsockname, 418 uv_socket_bind, uv_socket_listen, uv_socket_accept}; 419 420 #endif 421