1 /* 2 * 3 * Copyright 2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when 20 using that endpoint. Because of various transitive includes in uv.h, 21 including windows.h on Windows, uv.h must be included before other system 22 headers. Therefore, sockaddr.h must always be included first */ 23 #include "src/core/lib/iomgr/sockaddr.h" 24 25 #include <memory.h> 26 #include <stdio.h> 27 28 #include <grpc/grpc.h> 29 #include <grpc/support/alloc.h> 30 #include <grpc/support/log.h> 31 #include <grpc/support/string_util.h> 32 33 #include "src/core/lib/gprpp/thd.h" 34 #include "src/core/lib/iomgr/exec_ctx.h" 35 #include "src/core/lib/iomgr/iomgr.h" 36 #include "src/core/lib/iomgr/resolve_address.h" 37 #include "src/core/lib/iomgr/sockaddr_utils.h" 38 #include "src/core/lib/iomgr/tcp_server.h" 39 40 #include "test/core/util/port.h" 41 #include "test/core/util/test_config.h" 42 43 #define NUM_THREADS 100 44 #define NUM_OUTER_LOOPS 10 45 #define NUM_INNER_LOOPS 10 46 #define DELAY_MILLIS 10 47 #define POLL_MILLIS 3000 48 49 #define NUM_OUTER_LOOPS_SHORT_TIMEOUTS 10 50 #define NUM_INNER_LOOPS_SHORT_TIMEOUTS 100 51 #define DELAY_MILLIS_SHORT_TIMEOUTS 1 52 // in a successful test run, POLL_MILLIS should never be reached because all 53 // runs should end after the shorter delay_millis 54 #define POLL_MILLIS_SHORT_TIMEOUTS 30000 55 // it should never take longer that this to shutdown the server 56 #define SERVER_SHUTDOWN_TIMEOUT 30000 57 58 static void* tag(int n) { return (void*)static_cast<uintptr_t>(n); } 59 static int detag(void* p) { return static_cast<int>((uintptr_t)p); } 60 61 void create_loop_destroy(void* addr) { 62 for (int i = 0; i < NUM_OUTER_LOOPS; ++i) { 63 grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); 64 grpc_channel* chan = grpc_insecure_channel_create(static_cast<char*>(addr), 65 nullptr, nullptr); 66 67 for (int j = 0; j < NUM_INNER_LOOPS; ++j) { 68 gpr_timespec later_time = 69 grpc_timeout_milliseconds_to_deadline(DELAY_MILLIS); 70 grpc_connectivity_state state = 71 grpc_channel_check_connectivity_state(chan, 1); 72 grpc_channel_watch_connectivity_state(chan, state, later_time, cq, 73 nullptr); 74 gpr_timespec poll_time = 75 grpc_timeout_milliseconds_to_deadline(POLL_MILLIS); 76 GPR_ASSERT(grpc_completion_queue_next(cq, poll_time, nullptr).type == 77 GRPC_OP_COMPLETE); 78 /* check that the watcher from "watch state" was free'd */ 79 GPR_ASSERT(grpc_channel_num_external_connectivity_watchers(chan) == 0); 80 } 81 grpc_channel_destroy(chan); 82 grpc_completion_queue_destroy(cq); 83 } 84 } 85 86 struct server_thread_args { 87 char* addr; 88 grpc_server* server; 89 grpc_completion_queue* cq; 90 grpc_pollset* pollset; 91 gpr_mu* mu; 92 gpr_event ready; 93 gpr_atm stop; 94 }; 95 96 void server_thread(void* vargs) { 97 struct server_thread_args* args = 98 static_cast<struct server_thread_args*>(vargs); 99 grpc_event ev; 100 gpr_timespec deadline = 101 grpc_timeout_milliseconds_to_deadline(SERVER_SHUTDOWN_TIMEOUT); 102 ev = grpc_completion_queue_next(args->cq, deadline, nullptr); 103 GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); 104 GPR_ASSERT(detag(ev.tag) == 0xd1e); 105 } 106 107 static void on_connect(void* vargs, grpc_endpoint* tcp, 108 grpc_pollset* accepting_pollset, 109 grpc_tcp_server_acceptor* acceptor) { 110 gpr_free(acceptor); 111 struct server_thread_args* args = 112 static_cast<struct server_thread_args*>(vargs); 113 grpc_endpoint_shutdown(tcp, 114 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Connected")); 115 grpc_endpoint_destroy(tcp); 116 gpr_mu_lock(args->mu); 117 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr)); 118 gpr_mu_unlock(args->mu); 119 } 120 121 void bad_server_thread(void* vargs) { 122 struct server_thread_args* args = 123 static_cast<struct server_thread_args*>(vargs); 124 125 grpc_core::ExecCtx exec_ctx; 126 grpc_resolved_address resolved_addr; 127 grpc_sockaddr* addr = reinterpret_cast<grpc_sockaddr*>(resolved_addr.addr); 128 int port; 129 grpc_tcp_server* s; 130 grpc_error* error = grpc_tcp_server_create(nullptr, nullptr, &s); 131 GPR_ASSERT(error == GRPC_ERROR_NONE); 132 memset(&resolved_addr, 0, sizeof(resolved_addr)); 133 addr->sa_family = GRPC_AF_INET; 134 error = grpc_tcp_server_add_port(s, &resolved_addr, &port); 135 GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_tcp_server_add_port", error)); 136 GPR_ASSERT(port > 0); 137 gpr_asprintf(&args->addr, "localhost:%d", port); 138 139 grpc_tcp_server_start(s, &args->pollset, 1, on_connect, args); 140 gpr_event_set(&args->ready, (void*)1); 141 142 gpr_mu_lock(args->mu); 143 while (gpr_atm_acq_load(&args->stop) == 0) { 144 grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + 100; 145 146 grpc_pollset_worker* worker = nullptr; 147 if (!GRPC_LOG_IF_ERROR( 148 "pollset_work", 149 grpc_pollset_work(args->pollset, &worker, deadline))) { 150 gpr_atm_rel_store(&args->stop, 1); 151 } 152 gpr_mu_unlock(args->mu); 153 154 gpr_mu_lock(args->mu); 155 } 156 gpr_mu_unlock(args->mu); 157 158 grpc_tcp_server_unref(s); 159 160 gpr_free(args->addr); 161 } 162 163 static void done_pollset_shutdown(void* pollset, grpc_error* error) { 164 grpc_pollset_destroy(static_cast<grpc_pollset*>(pollset)); 165 gpr_free(pollset); 166 } 167 168 int run_concurrent_connectivity_test() { 169 struct server_thread_args args; 170 memset(&args, 0, sizeof(args)); 171 172 grpc_init(); 173 174 /* First round, no server */ 175 { 176 gpr_log(GPR_DEBUG, "Wave 1"); 177 char* localhost = gpr_strdup("localhost:54321"); 178 grpc_core::Thread threads[NUM_THREADS]; 179 for (auto& th : threads) { 180 th = grpc_core::Thread("grpc_wave_1", create_loop_destroy, localhost); 181 th.Start(); 182 } 183 for (auto& th : threads) { 184 th.Join(); 185 } 186 gpr_free(localhost); 187 } 188 189 { 190 /* Second round, actual grpc server */ 191 gpr_log(GPR_DEBUG, "Wave 2"); 192 int port = grpc_pick_unused_port_or_die(); 193 gpr_asprintf(&args.addr, "localhost:%d", port); 194 args.server = grpc_server_create(nullptr, nullptr); 195 grpc_server_add_insecure_http2_port(args.server, args.addr); 196 args.cq = grpc_completion_queue_create_for_next(nullptr); 197 grpc_server_register_completion_queue(args.server, args.cq, nullptr); 198 grpc_server_start(args.server); 199 grpc_core::Thread server2("grpc_wave_2_server", server_thread, &args); 200 server2.Start(); 201 202 grpc_core::Thread threads[NUM_THREADS]; 203 for (auto& th : threads) { 204 th = grpc_core::Thread("grpc_wave_2", create_loop_destroy, args.addr); 205 th.Start(); 206 } 207 for (auto& th : threads) { 208 th.Join(); 209 } 210 grpc_server_shutdown_and_notify(args.server, args.cq, tag(0xd1e)); 211 212 server2.Join(); 213 grpc_server_destroy(args.server); 214 grpc_completion_queue_destroy(args.cq); 215 gpr_free(args.addr); 216 } 217 218 { 219 /* Third round, bogus tcp server */ 220 gpr_log(GPR_DEBUG, "Wave 3"); 221 args.pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size())); 222 grpc_pollset_init(args.pollset, &args.mu); 223 gpr_event_init(&args.ready); 224 grpc_core::Thread server3("grpc_wave_3_server", bad_server_thread, &args); 225 server3.Start(); 226 gpr_event_wait(&args.ready, gpr_inf_future(GPR_CLOCK_MONOTONIC)); 227 228 grpc_core::Thread threads[NUM_THREADS]; 229 for (auto& th : threads) { 230 th = grpc_core::Thread("grpc_wave_3", create_loop_destroy, args.addr); 231 th.Start(); 232 } 233 for (auto& th : threads) { 234 th.Join(); 235 } 236 237 gpr_atm_rel_store(&args.stop, 1); 238 server3.Join(); 239 { 240 grpc_core::ExecCtx exec_ctx; 241 grpc_pollset_shutdown( 242 args.pollset, GRPC_CLOSURE_CREATE(done_pollset_shutdown, args.pollset, 243 grpc_schedule_on_exec_ctx)); 244 } 245 } 246 247 grpc_shutdown(); 248 return 0; 249 } 250 251 void watches_with_short_timeouts(void* addr) { 252 for (int i = 0; i < NUM_OUTER_LOOPS_SHORT_TIMEOUTS; ++i) { 253 grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); 254 grpc_channel* chan = grpc_insecure_channel_create(static_cast<char*>(addr), 255 nullptr, nullptr); 256 257 for (int j = 0; j < NUM_INNER_LOOPS_SHORT_TIMEOUTS; ++j) { 258 gpr_timespec later_time = 259 grpc_timeout_milliseconds_to_deadline(DELAY_MILLIS_SHORT_TIMEOUTS); 260 grpc_connectivity_state state = 261 grpc_channel_check_connectivity_state(chan, 0); 262 GPR_ASSERT(state == GRPC_CHANNEL_IDLE); 263 grpc_channel_watch_connectivity_state(chan, state, later_time, cq, 264 nullptr); 265 gpr_timespec poll_time = 266 grpc_timeout_milliseconds_to_deadline(POLL_MILLIS_SHORT_TIMEOUTS); 267 grpc_event ev = grpc_completion_queue_next(cq, poll_time, nullptr); 268 GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); 269 GPR_ASSERT(ev.success == false); 270 /* check that the watcher from "watch state" was free'd */ 271 GPR_ASSERT(grpc_channel_num_external_connectivity_watchers(chan) == 0); 272 } 273 grpc_channel_destroy(chan); 274 grpc_completion_queue_destroy(cq); 275 } 276 } 277 278 // This test tries to catch deadlock situations. 279 // With short timeouts on "watches" and long timeouts on cq next calls, 280 // so that a QUEUE_TIMEOUT likely means that something is stuck. 281 int run_concurrent_watches_with_short_timeouts_test() { 282 grpc_init(); 283 284 grpc_core::Thread threads[NUM_THREADS]; 285 286 char* localhost = gpr_strdup("localhost:54321"); 287 288 for (auto& th : threads) { 289 th = grpc_core::Thread("grpc_short_watches", watches_with_short_timeouts, 290 localhost); 291 th.Start(); 292 } 293 for (auto& th : threads) { 294 th.Join(); 295 } 296 gpr_free(localhost); 297 298 grpc_shutdown(); 299 return 0; 300 } 301 302 int main(int argc, char** argv) { 303 grpc_test_init(argc, argv); 304 305 run_concurrent_connectivity_test(); 306 run_concurrent_watches_with_short_timeouts_test(); 307 } 308