1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include "src/core/lib/iomgr/port.h" 20 21 // This test won't work except with posix sockets enabled 22 #ifdef GRPC_POSIX_SOCKET 23 24 #include "src/core/lib/iomgr/tcp_server.h" 25 26 #include <errno.h> 27 #include <ifaddrs.h> 28 #include <netinet/in.h> 29 #include <stdio.h> 30 #include <string.h> 31 #include <sys/socket.h> 32 #include <sys/types.h> 33 #include <unistd.h> 34 35 #include <grpc/grpc.h> 36 #include <grpc/support/alloc.h> 37 #include <grpc/support/log.h> 38 #include <grpc/support/sync.h> 39 #include <grpc/support/time.h> 40 41 #include "src/core/lib/iomgr/error.h" 42 #include "src/core/lib/iomgr/iomgr.h" 43 #include "src/core/lib/iomgr/resolve_address.h" 44 #include "src/core/lib/iomgr/sockaddr_utils.h" 45 #include "test/core/util/port.h" 46 #include "test/core/util/test_config.h" 47 48 #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", #x) 49 50 static gpr_mu* g_mu; 51 static grpc_pollset* g_pollset; 52 static int g_nconnects = 0; 53 54 typedef struct { 55 /* Owns a ref to server. */ 56 grpc_tcp_server* server; 57 unsigned port_index; 58 unsigned fd_index; 59 int server_fd; 60 } on_connect_result; 61 62 typedef struct { 63 grpc_tcp_server* server; 64 65 /* arg is this server_weak_ref. */ 66 grpc_closure server_shutdown; 67 } server_weak_ref; 68 69 #define MAX_URI 1024 70 typedef struct { 71 grpc_resolved_address addr; 72 char str[MAX_URI]; 73 } test_addr; 74 75 #define MAX_ADDRS 100 76 typedef struct { 77 size_t naddrs; 78 test_addr addrs[MAX_ADDRS]; 79 } test_addrs; 80 81 static on_connect_result g_result = {nullptr, 0, 0, -1}; 82 83 static char family_name_buf[1024]; 84 static const char* sock_family_name(int family) { 85 if (family == AF_INET) { 86 return "AF_INET"; 87 } else if (family == AF_INET6) { 88 return "AF_INET6"; 89 } else if (family == AF_UNSPEC) { 90 return "AF_UNSPEC"; 91 } else { 92 sprintf(family_name_buf, "%d", family); 93 return family_name_buf; 94 } 95 } 96 97 static void on_connect_result_init(on_connect_result* result) { 98 result->server = nullptr; 99 result->port_index = 0; 100 result->fd_index = 0; 101 result->server_fd = -1; 102 } 103 104 static void on_connect_result_set(on_connect_result* result, 105 const grpc_tcp_server_acceptor* acceptor) { 106 result->server = grpc_tcp_server_ref(acceptor->from_server); 107 result->port_index = acceptor->port_index; 108 result->fd_index = acceptor->fd_index; 109 result->server_fd = grpc_tcp_server_port_fd( 110 result->server, acceptor->port_index, acceptor->fd_index); 111 } 112 113 static void server_weak_ref_shutdown(void* arg, grpc_error* error) { 114 server_weak_ref* weak_ref = static_cast<server_weak_ref*>(arg); 115 weak_ref->server = nullptr; 116 } 117 118 static void server_weak_ref_init(server_weak_ref* weak_ref) { 119 weak_ref->server = nullptr; 120 GRPC_CLOSURE_INIT(&weak_ref->server_shutdown, server_weak_ref_shutdown, 121 weak_ref, grpc_schedule_on_exec_ctx); 122 } 123 124 /* Make weak_ref->server_shutdown a shutdown_starting cb on server. 125 grpc_tcp_server promises that the server object will live until 126 weak_ref->server_shutdown has returned. A strong ref on grpc_tcp_server 127 should be held until server_weak_ref_set() returns to avoid a race where the 128 server is deleted before the shutdown_starting cb is added. */ 129 static void server_weak_ref_set(server_weak_ref* weak_ref, 130 grpc_tcp_server* server) { 131 grpc_tcp_server_shutdown_starting_add(server, &weak_ref->server_shutdown); 132 weak_ref->server = server; 133 } 134 135 static void test_addr_init_str(test_addr* addr) { 136 char* str = nullptr; 137 if (grpc_sockaddr_to_string(&str, &addr->addr, 0) != -1) { 138 size_t str_len; 139 memcpy(addr->str, str, (str_len = strnlen(str, sizeof(addr->str) - 1))); 140 addr->str[str_len] = '\0'; 141 gpr_free(str); 142 } else { 143 addr->str[0] = '\0'; 144 } 145 } 146 147 static void on_connect(void* arg, grpc_endpoint* tcp, grpc_pollset* pollset, 148 grpc_tcp_server_acceptor* acceptor) { 149 grpc_endpoint_shutdown(tcp, 150 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Connected")); 151 grpc_endpoint_destroy(tcp); 152 153 on_connect_result temp_result; 154 on_connect_result_set(&temp_result, acceptor); 155 gpr_free(acceptor); 156 157 gpr_mu_lock(g_mu); 158 g_result = temp_result; 159 g_nconnects++; 160 GPR_ASSERT( 161 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); 162 gpr_mu_unlock(g_mu); 163 } 164 165 static void test_no_op(void) { 166 grpc_core::ExecCtx exec_ctx; 167 grpc_tcp_server* s; 168 GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(nullptr, nullptr, &s)); 169 grpc_tcp_server_unref(s); 170 } 171 172 static void test_no_op_with_start(void) { 173 grpc_core::ExecCtx exec_ctx; 174 grpc_tcp_server* s; 175 GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(nullptr, nullptr, &s)); 176 LOG_TEST("test_no_op_with_start"); 177 grpc_tcp_server_start(s, nullptr, 0, on_connect, nullptr); 178 grpc_tcp_server_unref(s); 179 } 180 181 static void test_no_op_with_port(void) { 182 grpc_core::ExecCtx exec_ctx; 183 grpc_resolved_address resolved_addr; 184 struct sockaddr_in* addr = 185 reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr); 186 grpc_tcp_server* s; 187 GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(nullptr, nullptr, &s)); 188 LOG_TEST("test_no_op_with_port"); 189 190 memset(&resolved_addr, 0, sizeof(resolved_addr)); 191 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in)); 192 addr->sin_family = AF_INET; 193 int port = -1; 194 GPR_ASSERT(grpc_tcp_server_add_port(s, &resolved_addr, &port) == 195 GRPC_ERROR_NONE && 196 port > 0); 197 198 grpc_tcp_server_unref(s); 199 } 200 201 static void test_no_op_with_port_and_start(void) { 202 grpc_core::ExecCtx exec_ctx; 203 grpc_resolved_address resolved_addr; 204 struct sockaddr_in* addr = 205 reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr); 206 grpc_tcp_server* s; 207 GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(nullptr, nullptr, &s)); 208 LOG_TEST("test_no_op_with_port_and_start"); 209 int port = -1; 210 211 memset(&resolved_addr, 0, sizeof(resolved_addr)); 212 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in)); 213 addr->sin_family = AF_INET; 214 GPR_ASSERT(grpc_tcp_server_add_port(s, &resolved_addr, &port) == 215 GRPC_ERROR_NONE && 216 port > 0); 217 218 grpc_tcp_server_start(s, nullptr, 0, on_connect, nullptr); 219 220 grpc_tcp_server_unref(s); 221 } 222 223 static grpc_error* tcp_connect(const test_addr* remote, 224 on_connect_result* result) { 225 grpc_millis deadline = 226 grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10)); 227 int clifd; 228 int nconnects_before; 229 const struct sockaddr* remote_addr = 230 reinterpret_cast<const struct sockaddr*>(remote->addr.addr); 231 232 gpr_log(GPR_INFO, "Connecting to %s", remote->str); 233 gpr_mu_lock(g_mu); 234 nconnects_before = g_nconnects; 235 on_connect_result_init(&g_result); 236 clifd = socket(remote_addr->sa_family, SOCK_STREAM, 0); 237 if (clifd < 0) { 238 gpr_mu_unlock(g_mu); 239 return GRPC_OS_ERROR(errno, "Failed to create socket"); 240 } 241 gpr_log(GPR_DEBUG, "start connect to %s", remote->str); 242 if (connect(clifd, remote_addr, static_cast<socklen_t>(remote->addr.len)) != 243 0) { 244 gpr_mu_unlock(g_mu); 245 close(clifd); 246 return GRPC_OS_ERROR(errno, "connect"); 247 } 248 gpr_log(GPR_DEBUG, "wait"); 249 while (g_nconnects == nconnects_before && 250 deadline > grpc_core::ExecCtx::Get()->Now()) { 251 grpc_pollset_worker* worker = nullptr; 252 grpc_error* err; 253 if ((err = grpc_pollset_work(g_pollset, &worker, deadline)) != 254 GRPC_ERROR_NONE) { 255 gpr_mu_unlock(g_mu); 256 close(clifd); 257 return err; 258 } 259 gpr_mu_unlock(g_mu); 260 261 gpr_mu_lock(g_mu); 262 } 263 gpr_log(GPR_DEBUG, "wait done"); 264 if (g_nconnects != nconnects_before + 1) { 265 gpr_mu_unlock(g_mu); 266 close(clifd); 267 return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Didn't connect"); 268 } 269 close(clifd); 270 *result = g_result; 271 272 gpr_mu_unlock(g_mu); 273 gpr_log(GPR_INFO, "Result (%d, %d) fd %d", result->port_index, 274 result->fd_index, result->server_fd); 275 grpc_tcp_server_unref(result->server); 276 return GRPC_ERROR_NONE; 277 } 278 279 /* Tests a tcp server on "::" listeners with multiple ports. If channel_args is 280 non-NULL, pass them to the server. If dst_addrs is non-NULL, use valid addrs 281 as destination addrs (port is not set). If dst_addrs is NULL, use listener 282 addrs as destination addrs. If test_dst_addrs is true, test connectivity with 283 each destination address, set grpc_resolved_address::len=0 for failures, but 284 don't fail the overall unitest. */ 285 static void test_connect(size_t num_connects, 286 const grpc_channel_args* channel_args, 287 test_addrs* dst_addrs, bool test_dst_addrs) { 288 grpc_core::ExecCtx exec_ctx; 289 grpc_resolved_address resolved_addr; 290 grpc_resolved_address resolved_addr1; 291 struct sockaddr_storage* const addr = 292 reinterpret_cast<struct sockaddr_storage*>(resolved_addr.addr); 293 struct sockaddr_storage* const addr1 = 294 reinterpret_cast<struct sockaddr_storage*>(resolved_addr1.addr); 295 unsigned svr_fd_count; 296 int port; 297 int svr_port; 298 unsigned svr1_fd_count; 299 int svr1_port; 300 grpc_tcp_server* s; 301 const unsigned num_ports = 2; 302 GPR_ASSERT(GRPC_ERROR_NONE == 303 grpc_tcp_server_create(nullptr, channel_args, &s)); 304 unsigned port_num; 305 server_weak_ref weak_ref; 306 server_weak_ref_init(&weak_ref); 307 server_weak_ref_set(&weak_ref, s); 308 LOG_TEST("test_connect"); 309 gpr_log(GPR_INFO, 310 "clients=%lu, num chan args=%lu, remote IP=%s, test_dst_addrs=%d", 311 static_cast<unsigned long>(num_connects), 312 static_cast<unsigned long>( 313 channel_args != nullptr ? channel_args->num_args : 0), 314 dst_addrs != nullptr ? "<specific>" : "::", test_dst_addrs); 315 memset(&resolved_addr, 0, sizeof(resolved_addr)); 316 memset(&resolved_addr1, 0, sizeof(resolved_addr1)); 317 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage)); 318 resolved_addr1.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage)); 319 addr->ss_family = addr1->ss_family = AF_INET; 320 GPR_ASSERT(GRPC_LOG_IF_ERROR( 321 "grpc_tcp_server_add_port", 322 grpc_tcp_server_add_port(s, &resolved_addr, &svr_port))); 323 gpr_log(GPR_INFO, "Allocated port %d", svr_port); 324 GPR_ASSERT(svr_port > 0); 325 /* Cannot use wildcard (port==0), because add_port() will try to reuse the 326 same port as a previous add_port(). */ 327 svr1_port = grpc_pick_unused_port_or_die(); 328 GPR_ASSERT(svr1_port > 0); 329 gpr_log(GPR_INFO, "Picked unused port %d", svr1_port); 330 grpc_sockaddr_set_port(&resolved_addr1, svr1_port); 331 GPR_ASSERT(grpc_tcp_server_add_port(s, &resolved_addr1, &port) == 332 GRPC_ERROR_NONE && 333 port == svr1_port); 334 335 /* Bad port_index. */ 336 GPR_ASSERT(grpc_tcp_server_port_fd_count(s, 2) == 0); 337 GPR_ASSERT(grpc_tcp_server_port_fd(s, 2, 0) < 0); 338 339 /* Bad fd_index. */ 340 GPR_ASSERT(grpc_tcp_server_port_fd(s, 0, 100) < 0); 341 GPR_ASSERT(grpc_tcp_server_port_fd(s, 1, 100) < 0); 342 343 /* Got at least one fd per port. */ 344 svr_fd_count = grpc_tcp_server_port_fd_count(s, 0); 345 GPR_ASSERT(svr_fd_count >= 1); 346 svr1_fd_count = grpc_tcp_server_port_fd_count(s, 1); 347 GPR_ASSERT(svr1_fd_count >= 1); 348 349 grpc_tcp_server_start(s, &g_pollset, 1, on_connect, nullptr); 350 351 if (dst_addrs != nullptr) { 352 int ports[] = {svr_port, svr1_port}; 353 for (port_num = 0; port_num < num_ports; ++port_num) { 354 size_t dst_idx; 355 size_t num_tested = 0; 356 for (dst_idx = 0; dst_idx < dst_addrs->naddrs; ++dst_idx) { 357 test_addr dst = dst_addrs->addrs[dst_idx]; 358 on_connect_result result; 359 grpc_error* err; 360 if (dst.addr.len == 0) { 361 gpr_log(GPR_DEBUG, "Skipping test of non-functional local IP %s", 362 dst.str); 363 continue; 364 } 365 GPR_ASSERT(grpc_sockaddr_set_port(&dst.addr, ports[port_num])); 366 test_addr_init_str(&dst); 367 ++num_tested; 368 on_connect_result_init(&result); 369 if ((err = tcp_connect(&dst, &result)) == GRPC_ERROR_NONE && 370 result.server_fd >= 0 && result.server == s) { 371 continue; 372 } 373 gpr_log(GPR_ERROR, "Failed to connect to %s: %s", dst.str, 374 grpc_error_string(err)); 375 GPR_ASSERT(test_dst_addrs); 376 dst_addrs->addrs[dst_idx].addr.len = 0; 377 GRPC_ERROR_UNREF(err); 378 } 379 GPR_ASSERT(num_tested > 0); 380 } 381 } else { 382 for (port_num = 0; port_num < num_ports; ++port_num) { 383 const unsigned num_fds = grpc_tcp_server_port_fd_count(s, port_num); 384 unsigned fd_num; 385 for (fd_num = 0; fd_num < num_fds; ++fd_num) { 386 int fd = grpc_tcp_server_port_fd(s, port_num, fd_num); 387 size_t connect_num; 388 test_addr dst; 389 GPR_ASSERT(fd >= 0); 390 dst.addr.len = static_cast<socklen_t>(sizeof(dst.addr.addr)); 391 GPR_ASSERT(getsockname(fd, (struct sockaddr*)dst.addr.addr, 392 (socklen_t*)&dst.addr.len) == 0); 393 GPR_ASSERT(dst.addr.len <= sizeof(dst.addr.addr)); 394 test_addr_init_str(&dst); 395 gpr_log(GPR_INFO, "(%d, %d) fd %d family %s listening on %s", port_num, 396 fd_num, fd, sock_family_name(addr->ss_family), dst.str); 397 for (connect_num = 0; connect_num < num_connects; ++connect_num) { 398 on_connect_result result; 399 on_connect_result_init(&result); 400 GPR_ASSERT( 401 GRPC_LOG_IF_ERROR("tcp_connect", tcp_connect(&dst, &result))); 402 GPR_ASSERT(result.server_fd == fd); 403 GPR_ASSERT(result.port_index == port_num); 404 GPR_ASSERT(result.fd_index == fd_num); 405 GPR_ASSERT(result.server == s); 406 GPR_ASSERT( 407 grpc_tcp_server_port_fd(s, result.port_index, result.fd_index) == 408 result.server_fd); 409 } 410 } 411 } 412 } 413 /* Weak ref to server valid until final unref. */ 414 GPR_ASSERT(weak_ref.server != nullptr); 415 GPR_ASSERT(grpc_tcp_server_port_fd(s, 0, 0) >= 0); 416 417 grpc_tcp_server_unref(s); 418 grpc_core::ExecCtx::Get()->Flush(); 419 420 /* Weak ref lost. */ 421 GPR_ASSERT(weak_ref.server == nullptr); 422 } 423 424 static void destroy_pollset(void* p, grpc_error* error) { 425 grpc_pollset_destroy(static_cast<grpc_pollset*>(p)); 426 } 427 428 int main(int argc, char** argv) { 429 grpc_closure destroyed; 430 grpc_arg chan_args[1]; 431 chan_args[0].type = GRPC_ARG_INTEGER; 432 chan_args[0].key = const_cast<char*>(GRPC_ARG_EXPAND_WILDCARD_ADDRS); 433 chan_args[0].value.integer = 1; 434 const grpc_channel_args channel_args = {1, chan_args}; 435 struct ifaddrs* ifa = nullptr; 436 struct ifaddrs* ifa_it; 437 // Zalloc dst_addrs to avoid oversized frames. 438 test_addrs* dst_addrs = 439 static_cast<test_addrs*>(gpr_zalloc(sizeof(*dst_addrs))); 440 grpc_test_init(argc, argv); 441 grpc_init(); 442 { 443 grpc_core::ExecCtx exec_ctx; 444 g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size())); 445 grpc_pollset_init(g_pollset, &g_mu); 446 447 test_no_op(); 448 test_no_op_with_start(); 449 test_no_op_with_port(); 450 test_no_op_with_port_and_start(); 451 452 if (getifaddrs(&ifa) != 0 || ifa == nullptr) { 453 gpr_log(GPR_ERROR, "getifaddrs: %s", strerror(errno)); 454 return EXIT_FAILURE; 455 } 456 dst_addrs->naddrs = 0; 457 for (ifa_it = ifa; ifa_it != nullptr && dst_addrs->naddrs < MAX_ADDRS; 458 ifa_it = ifa_it->ifa_next) { 459 if (ifa_it->ifa_addr == nullptr) { 460 continue; 461 } else if (ifa_it->ifa_addr->sa_family == AF_INET) { 462 dst_addrs->addrs[dst_addrs->naddrs].addr.len = 463 static_cast<socklen_t>(sizeof(struct sockaddr_in)); 464 } else if (ifa_it->ifa_addr->sa_family == AF_INET6) { 465 dst_addrs->addrs[dst_addrs->naddrs].addr.len = 466 static_cast<socklen_t>(sizeof(struct sockaddr_in6)); 467 } else { 468 continue; 469 } 470 memcpy(dst_addrs->addrs[dst_addrs->naddrs].addr.addr, ifa_it->ifa_addr, 471 dst_addrs->addrs[dst_addrs->naddrs].addr.len); 472 GPR_ASSERT( 473 grpc_sockaddr_set_port(&dst_addrs->addrs[dst_addrs->naddrs].addr, 0)); 474 test_addr_init_str(&dst_addrs->addrs[dst_addrs->naddrs]); 475 ++dst_addrs->naddrs; 476 } 477 freeifaddrs(ifa); 478 ifa = nullptr; 479 480 /* Connect to same addresses as listeners. */ 481 test_connect(1, nullptr, nullptr, false); 482 test_connect(10, nullptr, nullptr, false); 483 484 /* Set dst_addrs->addrs[i].len=0 for dst_addrs that are unreachable with a 485 "::" listener. */ 486 test_connect(1, nullptr, dst_addrs, true); 487 488 /* Test connect(2) with dst_addrs. */ 489 test_connect(1, &channel_args, dst_addrs, false); 490 /* Test connect(2) with dst_addrs. */ 491 test_connect(10, &channel_args, dst_addrs, false); 492 493 GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset, 494 grpc_schedule_on_exec_ctx); 495 grpc_pollset_shutdown(g_pollset, &destroyed); 496 } 497 grpc_shutdown(); 498 gpr_free(dst_addrs); 499 gpr_free(g_pollset); 500 return EXIT_SUCCESS; 501 } 502 503 #else /* GRPC_POSIX_SOCKET */ 504 505 int main(int argc, char** argv) { return 1; } 506 507 #endif /* GRPC_POSIX_SOCKET */ 508