Home | History | Annotate | Download | only in iomgr
      1 /*
      2  *
      3  * Copyright 2015 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include "src/core/lib/iomgr/port.h"
     20 
     21 // This test won't work except with posix sockets enabled
     22 #ifdef GRPC_POSIX_SOCKET
     23 
     24 #include "src/core/lib/iomgr/tcp_server.h"
     25 
     26 #include <errno.h>
     27 #include <ifaddrs.h>
     28 #include <netinet/in.h>
     29 #include <stdio.h>
     30 #include <string.h>
     31 #include <sys/socket.h>
     32 #include <sys/types.h>
     33 #include <unistd.h>
     34 
     35 #include <grpc/grpc.h>
     36 #include <grpc/support/alloc.h>
     37 #include <grpc/support/log.h>
     38 #include <grpc/support/sync.h>
     39 #include <grpc/support/time.h>
     40 
     41 #include "src/core/lib/iomgr/error.h"
     42 #include "src/core/lib/iomgr/iomgr.h"
     43 #include "src/core/lib/iomgr/resolve_address.h"
     44 #include "src/core/lib/iomgr/sockaddr_utils.h"
     45 #include "test/core/util/port.h"
     46 #include "test/core/util/test_config.h"
     47 
     48 #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", #x)
     49 
     50 static gpr_mu* g_mu;
     51 static grpc_pollset* g_pollset;
     52 static int g_nconnects = 0;
     53 
     54 typedef struct {
     55   /* Owns a ref to server. */
     56   grpc_tcp_server* server;
     57   unsigned port_index;
     58   unsigned fd_index;
     59   int server_fd;
     60 } on_connect_result;
     61 
     62 typedef struct {
     63   grpc_tcp_server* server;
     64 
     65   /* arg is this server_weak_ref. */
     66   grpc_closure server_shutdown;
     67 } server_weak_ref;
     68 
     69 #define MAX_URI 1024
     70 typedef struct {
     71   grpc_resolved_address addr;
     72   char str[MAX_URI];
     73 } test_addr;
     74 
     75 #define MAX_ADDRS 100
     76 typedef struct {
     77   size_t naddrs;
     78   test_addr addrs[MAX_ADDRS];
     79 } test_addrs;
     80 
     81 static on_connect_result g_result = {nullptr, 0, 0, -1};
     82 
     83 static char family_name_buf[1024];
     84 static const char* sock_family_name(int family) {
     85   if (family == AF_INET) {
     86     return "AF_INET";
     87   } else if (family == AF_INET6) {
     88     return "AF_INET6";
     89   } else if (family == AF_UNSPEC) {
     90     return "AF_UNSPEC";
     91   } else {
     92     sprintf(family_name_buf, "%d", family);
     93     return family_name_buf;
     94   }
     95 }
     96 
     97 static void on_connect_result_init(on_connect_result* result) {
     98   result->server = nullptr;
     99   result->port_index = 0;
    100   result->fd_index = 0;
    101   result->server_fd = -1;
    102 }
    103 
    104 static void on_connect_result_set(on_connect_result* result,
    105                                   const grpc_tcp_server_acceptor* acceptor) {
    106   result->server = grpc_tcp_server_ref(acceptor->from_server);
    107   result->port_index = acceptor->port_index;
    108   result->fd_index = acceptor->fd_index;
    109   result->server_fd = grpc_tcp_server_port_fd(
    110       result->server, acceptor->port_index, acceptor->fd_index);
    111 }
    112 
    113 static void server_weak_ref_shutdown(void* arg, grpc_error* error) {
    114   server_weak_ref* weak_ref = static_cast<server_weak_ref*>(arg);
    115   weak_ref->server = nullptr;
    116 }
    117 
    118 static void server_weak_ref_init(server_weak_ref* weak_ref) {
    119   weak_ref->server = nullptr;
    120   GRPC_CLOSURE_INIT(&weak_ref->server_shutdown, server_weak_ref_shutdown,
    121                     weak_ref, grpc_schedule_on_exec_ctx);
    122 }
    123 
    124 /* Make weak_ref->server_shutdown a shutdown_starting cb on server.
    125    grpc_tcp_server promises that the server object will live until
    126    weak_ref->server_shutdown has returned. A strong ref on grpc_tcp_server
    127    should be held until server_weak_ref_set() returns to avoid a race where the
    128    server is deleted before the shutdown_starting cb is added. */
    129 static void server_weak_ref_set(server_weak_ref* weak_ref,
    130                                 grpc_tcp_server* server) {
    131   grpc_tcp_server_shutdown_starting_add(server, &weak_ref->server_shutdown);
    132   weak_ref->server = server;
    133 }
    134 
    135 static void test_addr_init_str(test_addr* addr) {
    136   char* str = nullptr;
    137   if (grpc_sockaddr_to_string(&str, &addr->addr, 0) != -1) {
    138     size_t str_len;
    139     memcpy(addr->str, str, (str_len = strnlen(str, sizeof(addr->str) - 1)));
    140     addr->str[str_len] = '\0';
    141     gpr_free(str);
    142   } else {
    143     addr->str[0] = '\0';
    144   }
    145 }
    146 
    147 static void on_connect(void* arg, grpc_endpoint* tcp, grpc_pollset* pollset,
    148                        grpc_tcp_server_acceptor* acceptor) {
    149   grpc_endpoint_shutdown(tcp,
    150                          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Connected"));
    151   grpc_endpoint_destroy(tcp);
    152 
    153   on_connect_result temp_result;
    154   on_connect_result_set(&temp_result, acceptor);
    155   gpr_free(acceptor);
    156 
    157   gpr_mu_lock(g_mu);
    158   g_result = temp_result;
    159   g_nconnects++;
    160   GPR_ASSERT(
    161       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
    162   gpr_mu_unlock(g_mu);
    163 }
    164 
    165 static void test_no_op(void) {
    166   grpc_core::ExecCtx exec_ctx;
    167   grpc_tcp_server* s;
    168   GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(nullptr, nullptr, &s));
    169   grpc_tcp_server_unref(s);
    170 }
    171 
    172 static void test_no_op_with_start(void) {
    173   grpc_core::ExecCtx exec_ctx;
    174   grpc_tcp_server* s;
    175   GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(nullptr, nullptr, &s));
    176   LOG_TEST("test_no_op_with_start");
    177   grpc_tcp_server_start(s, nullptr, 0, on_connect, nullptr);
    178   grpc_tcp_server_unref(s);
    179 }
    180 
    181 static void test_no_op_with_port(void) {
    182   grpc_core::ExecCtx exec_ctx;
    183   grpc_resolved_address resolved_addr;
    184   struct sockaddr_in* addr =
    185       reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
    186   grpc_tcp_server* s;
    187   GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(nullptr, nullptr, &s));
    188   LOG_TEST("test_no_op_with_port");
    189 
    190   memset(&resolved_addr, 0, sizeof(resolved_addr));
    191   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
    192   addr->sin_family = AF_INET;
    193   int port = -1;
    194   GPR_ASSERT(grpc_tcp_server_add_port(s, &resolved_addr, &port) ==
    195                  GRPC_ERROR_NONE &&
    196              port > 0);
    197 
    198   grpc_tcp_server_unref(s);
    199 }
    200 
    201 static void test_no_op_with_port_and_start(void) {
    202   grpc_core::ExecCtx exec_ctx;
    203   grpc_resolved_address resolved_addr;
    204   struct sockaddr_in* addr =
    205       reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
    206   grpc_tcp_server* s;
    207   GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(nullptr, nullptr, &s));
    208   LOG_TEST("test_no_op_with_port_and_start");
    209   int port = -1;
    210 
    211   memset(&resolved_addr, 0, sizeof(resolved_addr));
    212   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
    213   addr->sin_family = AF_INET;
    214   GPR_ASSERT(grpc_tcp_server_add_port(s, &resolved_addr, &port) ==
    215                  GRPC_ERROR_NONE &&
    216              port > 0);
    217 
    218   grpc_tcp_server_start(s, nullptr, 0, on_connect, nullptr);
    219 
    220   grpc_tcp_server_unref(s);
    221 }
    222 
    223 static grpc_error* tcp_connect(const test_addr* remote,
    224                                on_connect_result* result) {
    225   grpc_millis deadline =
    226       grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10));
    227   int clifd;
    228   int nconnects_before;
    229   const struct sockaddr* remote_addr =
    230       reinterpret_cast<const struct sockaddr*>(remote->addr.addr);
    231 
    232   gpr_log(GPR_INFO, "Connecting to %s", remote->str);
    233   gpr_mu_lock(g_mu);
    234   nconnects_before = g_nconnects;
    235   on_connect_result_init(&g_result);
    236   clifd = socket(remote_addr->sa_family, SOCK_STREAM, 0);
    237   if (clifd < 0) {
    238     gpr_mu_unlock(g_mu);
    239     return GRPC_OS_ERROR(errno, "Failed to create socket");
    240   }
    241   gpr_log(GPR_DEBUG, "start connect to %s", remote->str);
    242   if (connect(clifd, remote_addr, static_cast<socklen_t>(remote->addr.len)) !=
    243       0) {
    244     gpr_mu_unlock(g_mu);
    245     close(clifd);
    246     return GRPC_OS_ERROR(errno, "connect");
    247   }
    248   gpr_log(GPR_DEBUG, "wait");
    249   while (g_nconnects == nconnects_before &&
    250          deadline > grpc_core::ExecCtx::Get()->Now()) {
    251     grpc_pollset_worker* worker = nullptr;
    252     grpc_error* err;
    253     if ((err = grpc_pollset_work(g_pollset, &worker, deadline)) !=
    254         GRPC_ERROR_NONE) {
    255       gpr_mu_unlock(g_mu);
    256       close(clifd);
    257       return err;
    258     }
    259     gpr_mu_unlock(g_mu);
    260 
    261     gpr_mu_lock(g_mu);
    262   }
    263   gpr_log(GPR_DEBUG, "wait done");
    264   if (g_nconnects != nconnects_before + 1) {
    265     gpr_mu_unlock(g_mu);
    266     close(clifd);
    267     return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Didn't connect");
    268   }
    269   close(clifd);
    270   *result = g_result;
    271 
    272   gpr_mu_unlock(g_mu);
    273   gpr_log(GPR_INFO, "Result (%d, %d) fd %d", result->port_index,
    274           result->fd_index, result->server_fd);
    275   grpc_tcp_server_unref(result->server);
    276   return GRPC_ERROR_NONE;
    277 }
    278 
    279 /* Tests a tcp server on "::" listeners with multiple ports. If channel_args is
    280    non-NULL, pass them to the server. If dst_addrs is non-NULL, use valid addrs
    281    as destination addrs (port is not set). If dst_addrs is NULL, use listener
    282    addrs as destination addrs. If test_dst_addrs is true, test connectivity with
    283    each destination address, set grpc_resolved_address::len=0 for failures, but
    284    don't fail the overall unitest. */
    285 static void test_connect(size_t num_connects,
    286                          const grpc_channel_args* channel_args,
    287                          test_addrs* dst_addrs, bool test_dst_addrs) {
    288   grpc_core::ExecCtx exec_ctx;
    289   grpc_resolved_address resolved_addr;
    290   grpc_resolved_address resolved_addr1;
    291   struct sockaddr_storage* const addr =
    292       reinterpret_cast<struct sockaddr_storage*>(resolved_addr.addr);
    293   struct sockaddr_storage* const addr1 =
    294       reinterpret_cast<struct sockaddr_storage*>(resolved_addr1.addr);
    295   unsigned svr_fd_count;
    296   int port;
    297   int svr_port;
    298   unsigned svr1_fd_count;
    299   int svr1_port;
    300   grpc_tcp_server* s;
    301   const unsigned num_ports = 2;
    302   GPR_ASSERT(GRPC_ERROR_NONE ==
    303              grpc_tcp_server_create(nullptr, channel_args, &s));
    304   unsigned port_num;
    305   server_weak_ref weak_ref;
    306   server_weak_ref_init(&weak_ref);
    307   server_weak_ref_set(&weak_ref, s);
    308   LOG_TEST("test_connect");
    309   gpr_log(GPR_INFO,
    310           "clients=%lu, num chan args=%lu, remote IP=%s, test_dst_addrs=%d",
    311           static_cast<unsigned long>(num_connects),
    312           static_cast<unsigned long>(
    313               channel_args != nullptr ? channel_args->num_args : 0),
    314           dst_addrs != nullptr ? "<specific>" : "::", test_dst_addrs);
    315   memset(&resolved_addr, 0, sizeof(resolved_addr));
    316   memset(&resolved_addr1, 0, sizeof(resolved_addr1));
    317   resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
    318   resolved_addr1.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
    319   addr->ss_family = addr1->ss_family = AF_INET;
    320   GPR_ASSERT(GRPC_LOG_IF_ERROR(
    321       "grpc_tcp_server_add_port",
    322       grpc_tcp_server_add_port(s, &resolved_addr, &svr_port)));
    323   gpr_log(GPR_INFO, "Allocated port %d", svr_port);
    324   GPR_ASSERT(svr_port > 0);
    325   /* Cannot use wildcard (port==0), because add_port() will try to reuse the
    326      same port as a previous add_port(). */
    327   svr1_port = grpc_pick_unused_port_or_die();
    328   GPR_ASSERT(svr1_port > 0);
    329   gpr_log(GPR_INFO, "Picked unused port %d", svr1_port);
    330   grpc_sockaddr_set_port(&resolved_addr1, svr1_port);
    331   GPR_ASSERT(grpc_tcp_server_add_port(s, &resolved_addr1, &port) ==
    332                  GRPC_ERROR_NONE &&
    333              port == svr1_port);
    334 
    335   /* Bad port_index. */
    336   GPR_ASSERT(grpc_tcp_server_port_fd_count(s, 2) == 0);
    337   GPR_ASSERT(grpc_tcp_server_port_fd(s, 2, 0) < 0);
    338 
    339   /* Bad fd_index. */
    340   GPR_ASSERT(grpc_tcp_server_port_fd(s, 0, 100) < 0);
    341   GPR_ASSERT(grpc_tcp_server_port_fd(s, 1, 100) < 0);
    342 
    343   /* Got at least one fd per port. */
    344   svr_fd_count = grpc_tcp_server_port_fd_count(s, 0);
    345   GPR_ASSERT(svr_fd_count >= 1);
    346   svr1_fd_count = grpc_tcp_server_port_fd_count(s, 1);
    347   GPR_ASSERT(svr1_fd_count >= 1);
    348 
    349   grpc_tcp_server_start(s, &g_pollset, 1, on_connect, nullptr);
    350 
    351   if (dst_addrs != nullptr) {
    352     int ports[] = {svr_port, svr1_port};
    353     for (port_num = 0; port_num < num_ports; ++port_num) {
    354       size_t dst_idx;
    355       size_t num_tested = 0;
    356       for (dst_idx = 0; dst_idx < dst_addrs->naddrs; ++dst_idx) {
    357         test_addr dst = dst_addrs->addrs[dst_idx];
    358         on_connect_result result;
    359         grpc_error* err;
    360         if (dst.addr.len == 0) {
    361           gpr_log(GPR_DEBUG, "Skipping test of non-functional local IP %s",
    362                   dst.str);
    363           continue;
    364         }
    365         GPR_ASSERT(grpc_sockaddr_set_port(&dst.addr, ports[port_num]));
    366         test_addr_init_str(&dst);
    367         ++num_tested;
    368         on_connect_result_init(&result);
    369         if ((err = tcp_connect(&dst, &result)) == GRPC_ERROR_NONE &&
    370             result.server_fd >= 0 && result.server == s) {
    371           continue;
    372         }
    373         gpr_log(GPR_ERROR, "Failed to connect to %s: %s", dst.str,
    374                 grpc_error_string(err));
    375         GPR_ASSERT(test_dst_addrs);
    376         dst_addrs->addrs[dst_idx].addr.len = 0;
    377         GRPC_ERROR_UNREF(err);
    378       }
    379       GPR_ASSERT(num_tested > 0);
    380     }
    381   } else {
    382     for (port_num = 0; port_num < num_ports; ++port_num) {
    383       const unsigned num_fds = grpc_tcp_server_port_fd_count(s, port_num);
    384       unsigned fd_num;
    385       for (fd_num = 0; fd_num < num_fds; ++fd_num) {
    386         int fd = grpc_tcp_server_port_fd(s, port_num, fd_num);
    387         size_t connect_num;
    388         test_addr dst;
    389         GPR_ASSERT(fd >= 0);
    390         dst.addr.len = static_cast<socklen_t>(sizeof(dst.addr.addr));
    391         GPR_ASSERT(getsockname(fd, (struct sockaddr*)dst.addr.addr,
    392                                (socklen_t*)&dst.addr.len) == 0);
    393         GPR_ASSERT(dst.addr.len <= sizeof(dst.addr.addr));
    394         test_addr_init_str(&dst);
    395         gpr_log(GPR_INFO, "(%d, %d) fd %d family %s listening on %s", port_num,
    396                 fd_num, fd, sock_family_name(addr->ss_family), dst.str);
    397         for (connect_num = 0; connect_num < num_connects; ++connect_num) {
    398           on_connect_result result;
    399           on_connect_result_init(&result);
    400           GPR_ASSERT(
    401               GRPC_LOG_IF_ERROR("tcp_connect", tcp_connect(&dst, &result)));
    402           GPR_ASSERT(result.server_fd == fd);
    403           GPR_ASSERT(result.port_index == port_num);
    404           GPR_ASSERT(result.fd_index == fd_num);
    405           GPR_ASSERT(result.server == s);
    406           GPR_ASSERT(
    407               grpc_tcp_server_port_fd(s, result.port_index, result.fd_index) ==
    408               result.server_fd);
    409         }
    410       }
    411     }
    412   }
    413   /* Weak ref to server valid until final unref. */
    414   GPR_ASSERT(weak_ref.server != nullptr);
    415   GPR_ASSERT(grpc_tcp_server_port_fd(s, 0, 0) >= 0);
    416 
    417   grpc_tcp_server_unref(s);
    418   grpc_core::ExecCtx::Get()->Flush();
    419 
    420   /* Weak ref lost. */
    421   GPR_ASSERT(weak_ref.server == nullptr);
    422 }
    423 
    424 static void destroy_pollset(void* p, grpc_error* error) {
    425   grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
    426 }
    427 
    428 int main(int argc, char** argv) {
    429   grpc_closure destroyed;
    430   grpc_arg chan_args[1];
    431   chan_args[0].type = GRPC_ARG_INTEGER;
    432   chan_args[0].key = const_cast<char*>(GRPC_ARG_EXPAND_WILDCARD_ADDRS);
    433   chan_args[0].value.integer = 1;
    434   const grpc_channel_args channel_args = {1, chan_args};
    435   struct ifaddrs* ifa = nullptr;
    436   struct ifaddrs* ifa_it;
    437   // Zalloc dst_addrs to avoid oversized frames.
    438   test_addrs* dst_addrs =
    439       static_cast<test_addrs*>(gpr_zalloc(sizeof(*dst_addrs)));
    440   grpc_test_init(argc, argv);
    441   grpc_init();
    442   {
    443     grpc_core::ExecCtx exec_ctx;
    444     g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
    445     grpc_pollset_init(g_pollset, &g_mu);
    446 
    447     test_no_op();
    448     test_no_op_with_start();
    449     test_no_op_with_port();
    450     test_no_op_with_port_and_start();
    451 
    452     if (getifaddrs(&ifa) != 0 || ifa == nullptr) {
    453       gpr_log(GPR_ERROR, "getifaddrs: %s", strerror(errno));
    454       return EXIT_FAILURE;
    455     }
    456     dst_addrs->naddrs = 0;
    457     for (ifa_it = ifa; ifa_it != nullptr && dst_addrs->naddrs < MAX_ADDRS;
    458          ifa_it = ifa_it->ifa_next) {
    459       if (ifa_it->ifa_addr == nullptr) {
    460         continue;
    461       } else if (ifa_it->ifa_addr->sa_family == AF_INET) {
    462         dst_addrs->addrs[dst_addrs->naddrs].addr.len =
    463             static_cast<socklen_t>(sizeof(struct sockaddr_in));
    464       } else if (ifa_it->ifa_addr->sa_family == AF_INET6) {
    465         dst_addrs->addrs[dst_addrs->naddrs].addr.len =
    466             static_cast<socklen_t>(sizeof(struct sockaddr_in6));
    467       } else {
    468         continue;
    469       }
    470       memcpy(dst_addrs->addrs[dst_addrs->naddrs].addr.addr, ifa_it->ifa_addr,
    471              dst_addrs->addrs[dst_addrs->naddrs].addr.len);
    472       GPR_ASSERT(
    473           grpc_sockaddr_set_port(&dst_addrs->addrs[dst_addrs->naddrs].addr, 0));
    474       test_addr_init_str(&dst_addrs->addrs[dst_addrs->naddrs]);
    475       ++dst_addrs->naddrs;
    476     }
    477     freeifaddrs(ifa);
    478     ifa = nullptr;
    479 
    480     /* Connect to same addresses as listeners. */
    481     test_connect(1, nullptr, nullptr, false);
    482     test_connect(10, nullptr, nullptr, false);
    483 
    484     /* Set dst_addrs->addrs[i].len=0 for dst_addrs that are unreachable with a
    485        "::" listener. */
    486     test_connect(1, nullptr, dst_addrs, true);
    487 
    488     /* Test connect(2) with dst_addrs. */
    489     test_connect(1, &channel_args, dst_addrs, false);
    490     /* Test connect(2) with dst_addrs. */
    491     test_connect(10, &channel_args, dst_addrs, false);
    492 
    493     GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
    494                       grpc_schedule_on_exec_ctx);
    495     grpc_pollset_shutdown(g_pollset, &destroyed);
    496   }
    497   grpc_shutdown();
    498   gpr_free(dst_addrs);
    499   gpr_free(g_pollset);
    500   return EXIT_SUCCESS;
    501 }
    502 
    503 #else /* GRPC_POSIX_SOCKET */
    504 
    505 int main(int argc, char** argv) { return 1; }
    506 
    507 #endif /* GRPC_POSIX_SOCKET */
    508