1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include "test/core/end2end/end2end_tests.h" 20 21 #include <grpc/support/log.h> 22 #include <grpc/support/sync.h> 23 #include <grpc/support/time.h> 24 25 #include "src/core/lib/gprpp/thd.h" 26 #include "test/core/end2end/cq_verifier.h" 27 28 static void* tag(intptr_t t) { return (void*)t; } 29 30 typedef struct { 31 gpr_event started; 32 grpc_channel* channel; 33 grpc_completion_queue* cq; 34 } child_events; 35 36 static void child_thread(void* arg) { 37 child_events* ce = static_cast<child_events*>(arg); 38 grpc_event ev; 39 gpr_event_set(&ce->started, (void*)1); 40 gpr_log(GPR_DEBUG, "verifying"); 41 ev = grpc_completion_queue_next(ce->cq, gpr_inf_future(GPR_CLOCK_MONOTONIC), 42 nullptr); 43 GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); 44 GPR_ASSERT(ev.tag == tag(1)); 45 GPR_ASSERT(ev.success == 0); 46 } 47 48 static void test_connectivity(grpc_end2end_test_config config) { 49 grpc_end2end_test_fixture f = config.create_fixture(nullptr, nullptr); 50 grpc_connectivity_state state; 51 cq_verifier* cqv = cq_verifier_create(f.cq); 52 child_events ce; 53 54 grpc_channel_args client_args; 55 grpc_arg arg_array[1]; 56 arg_array[0].type = GRPC_ARG_INTEGER; 57 arg_array[0].key = 58 const_cast<char*>("grpc.testing.fixed_reconnect_backoff_ms"); 59 arg_array[0].value.integer = 1000; 60 client_args.args = arg_array; 61 client_args.num_args = 1; 62 63 config.init_client(&f, &client_args); 64 65 ce.channel = f.client; 66 ce.cq = f.cq; 67 gpr_event_init(&ce.started); 68 grpc_core::Thread thd("grpc_connectivity", child_thread, &ce); 69 thd.Start(); 70 71 gpr_event_wait(&ce.started, gpr_inf_future(GPR_CLOCK_MONOTONIC)); 72 73 /* channels should start life in IDLE, and stay there */ 74 GPR_ASSERT(grpc_channel_check_connectivity_state(f.client, 0) == 75 GRPC_CHANNEL_IDLE); 76 gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100)); 77 GPR_ASSERT(grpc_channel_check_connectivity_state(f.client, 0) == 78 GRPC_CHANNEL_IDLE); 79 80 /* start watching for a change */ 81 gpr_log(GPR_DEBUG, "watching"); 82 grpc_channel_watch_connectivity_state( 83 f.client, GRPC_CHANNEL_IDLE, gpr_now(GPR_CLOCK_MONOTONIC), f.cq, tag(1)); 84 85 /* eventually the child thread completion should trigger */ 86 thd.Join(); 87 88 /* check that we're still in idle, and start connecting */ 89 GPR_ASSERT(grpc_channel_check_connectivity_state(f.client, 1) == 90 GRPC_CHANNEL_IDLE); 91 /* start watching for a change */ 92 grpc_channel_watch_connectivity_state(f.client, GRPC_CHANNEL_IDLE, 93 grpc_timeout_seconds_to_deadline(3), 94 f.cq, tag(2)); 95 96 /* and now the watch should trigger */ 97 CQ_EXPECT_COMPLETION(cqv, tag(2), 1); 98 cq_verify(cqv); 99 state = grpc_channel_check_connectivity_state(f.client, 0); 100 GPR_ASSERT(state == GRPC_CHANNEL_TRANSIENT_FAILURE || 101 state == GRPC_CHANNEL_CONNECTING); 102 103 /* quickly followed by a transition to TRANSIENT_FAILURE */ 104 grpc_channel_watch_connectivity_state(f.client, GRPC_CHANNEL_CONNECTING, 105 grpc_timeout_seconds_to_deadline(3), 106 f.cq, tag(3)); 107 CQ_EXPECT_COMPLETION(cqv, tag(3), 1); 108 cq_verify(cqv); 109 state = grpc_channel_check_connectivity_state(f.client, 0); 110 GPR_ASSERT(state == GRPC_CHANNEL_TRANSIENT_FAILURE || 111 state == GRPC_CHANNEL_CONNECTING); 112 113 gpr_log(GPR_DEBUG, "*** STARTING SERVER ***"); 114 115 /* now let's bring up a server to connect to */ 116 config.init_server(&f, nullptr); 117 118 gpr_log(GPR_DEBUG, "*** STARTED SERVER ***"); 119 120 /* we'll go through some set of transitions (some might be missed), until 121 READY is reached */ 122 while (state != GRPC_CHANNEL_READY) { 123 grpc_channel_watch_connectivity_state( 124 f.client, state, grpc_timeout_seconds_to_deadline(3), f.cq, tag(4)); 125 CQ_EXPECT_COMPLETION(cqv, tag(4), 1); 126 cq_verify(cqv); 127 state = grpc_channel_check_connectivity_state(f.client, 0); 128 GPR_ASSERT(state == GRPC_CHANNEL_READY || 129 state == GRPC_CHANNEL_CONNECTING || 130 state == GRPC_CHANNEL_TRANSIENT_FAILURE); 131 } 132 133 /* bring down the server again */ 134 /* we should go immediately to TRANSIENT_FAILURE */ 135 gpr_log(GPR_DEBUG, "*** SHUTTING DOWN SERVER ***"); 136 137 grpc_channel_watch_connectivity_state(f.client, GRPC_CHANNEL_READY, 138 grpc_timeout_seconds_to_deadline(3), 139 f.cq, tag(5)); 140 141 grpc_server_shutdown_and_notify(f.server, f.cq, tag(0xdead)); 142 143 CQ_EXPECT_COMPLETION(cqv, tag(5), 1); 144 CQ_EXPECT_COMPLETION(cqv, tag(0xdead), 1); 145 cq_verify(cqv); 146 state = grpc_channel_check_connectivity_state(f.client, 0); 147 GPR_ASSERT(state == GRPC_CHANNEL_TRANSIENT_FAILURE || 148 state == GRPC_CHANNEL_CONNECTING || state == GRPC_CHANNEL_IDLE); 149 150 /* cleanup server */ 151 grpc_server_destroy(f.server); 152 153 gpr_log(GPR_DEBUG, "*** SHUTDOWN SERVER ***"); 154 155 grpc_channel_destroy(f.client); 156 grpc_completion_queue_shutdown(f.cq); 157 grpc_completion_queue_destroy(f.cq); 158 159 /* shutdown_cq is not used in this test */ 160 grpc_completion_queue_destroy(f.shutdown_cq); 161 config.tear_down_data(&f); 162 163 cq_verifier_destroy(cqv); 164 } 165 166 void connectivity(grpc_end2end_test_config config) { 167 GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION); 168 test_connectivity(config); 169 } 170 171 void connectivity_pre_init(void) {} 172