1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include "src/core/lib/surface/completion_queue.h" 20 21 #include <grpc/support/alloc.h> 22 #include <grpc/support/log.h> 23 #include <grpc/support/time.h> 24 #include "src/core/lib/gpr/useful.h" 25 #include "src/core/lib/gprpp/memory.h" 26 #include "src/core/lib/iomgr/iomgr.h" 27 #include "test/core/util/test_config.h" 28 29 #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", x) 30 31 static void* create_test_tag(void) { 32 static intptr_t i = 0; 33 return (void*)(++i); 34 } 35 36 /* helper for tests to shutdown correctly and tersely */ 37 static void shutdown_and_destroy(grpc_completion_queue* cc) { 38 grpc_event ev; 39 grpc_completion_queue_shutdown(cc); 40 41 switch (grpc_get_cq_completion_type(cc)) { 42 case GRPC_CQ_NEXT: { 43 ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), 44 nullptr); 45 GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN); 46 break; 47 } 48 case GRPC_CQ_PLUCK: { 49 ev = grpc_completion_queue_pluck( 50 cc, create_test_tag(), gpr_inf_past(GPR_CLOCK_REALTIME), nullptr); 51 GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN); 52 break; 53 } 54 case GRPC_CQ_CALLBACK: { 55 // Nothing to do here. The shutdown callback will be invoked when 56 // possible. 57 break; 58 } 59 default: { 60 gpr_log(GPR_ERROR, "Unknown completion type"); 61 break; 62 } 63 } 64 65 grpc_completion_queue_destroy(cc); 66 } 67 68 /* ensure we can create and destroy a completion channel */ 69 static void test_no_op(void) { 70 grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK}; 71 grpc_cq_polling_type polling_types[] = { 72 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING}; 73 grpc_completion_queue_attributes attr; 74 LOG_TEST("test_no_op"); 75 76 attr.version = 1; 77 for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) { 78 for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) { 79 attr.cq_completion_type = completion_types[i]; 80 attr.cq_polling_type = polling_types[j]; 81 shutdown_and_destroy(grpc_completion_queue_create( 82 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr)); 83 } 84 } 85 } 86 87 static void test_pollset_conversion(void) { 88 grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK}; 89 grpc_cq_polling_type polling_types[] = {GRPC_CQ_DEFAULT_POLLING, 90 GRPC_CQ_NON_LISTENING}; 91 grpc_completion_queue* cq; 92 grpc_completion_queue_attributes attr; 93 94 LOG_TEST("test_pollset_conversion"); 95 96 attr.version = 1; 97 for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) { 98 for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) { 99 attr.cq_completion_type = completion_types[i]; 100 attr.cq_polling_type = polling_types[j]; 101 cq = grpc_completion_queue_create( 102 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr); 103 GPR_ASSERT(grpc_cq_pollset(cq) != nullptr); 104 shutdown_and_destroy(cq); 105 } 106 } 107 } 108 109 static void test_wait_empty(void) { 110 grpc_cq_polling_type polling_types[] = { 111 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING}; 112 grpc_completion_queue* cc; 113 grpc_completion_queue_attributes attr; 114 grpc_event event; 115 116 LOG_TEST("test_wait_empty"); 117 118 attr.version = 1; 119 attr.cq_completion_type = GRPC_CQ_NEXT; 120 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) { 121 attr.cq_polling_type = polling_types[i]; 122 cc = grpc_completion_queue_create( 123 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr); 124 event = 125 grpc_completion_queue_next(cc, gpr_now(GPR_CLOCK_REALTIME), nullptr); 126 GPR_ASSERT(event.type == GRPC_QUEUE_TIMEOUT); 127 shutdown_and_destroy(cc); 128 } 129 } 130 131 static void do_nothing_end_completion(void* arg, grpc_cq_completion* c) {} 132 133 static void test_cq_end_op(void) { 134 grpc_event ev; 135 grpc_completion_queue* cc; 136 grpc_cq_completion completion; 137 grpc_cq_polling_type polling_types[] = { 138 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING}; 139 grpc_completion_queue_attributes attr; 140 void* tag = create_test_tag(); 141 142 LOG_TEST("test_cq_end_op"); 143 144 attr.version = 1; 145 attr.cq_completion_type = GRPC_CQ_NEXT; 146 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) { 147 grpc_core::ExecCtx exec_ctx; 148 attr.cq_polling_type = polling_types[i]; 149 cc = grpc_completion_queue_create( 150 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr); 151 152 GPR_ASSERT(grpc_cq_begin_op(cc, tag)); 153 grpc_cq_end_op(cc, tag, GRPC_ERROR_NONE, do_nothing_end_completion, nullptr, 154 &completion); 155 156 ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), 157 nullptr); 158 GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); 159 GPR_ASSERT(ev.tag == tag); 160 GPR_ASSERT(ev.success); 161 162 shutdown_and_destroy(cc); 163 } 164 } 165 166 static void test_cq_tls_cache_full(void) { 167 grpc_event ev; 168 grpc_completion_queue* cc; 169 grpc_cq_completion completion; 170 grpc_cq_polling_type polling_types[] = { 171 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING}; 172 grpc_completion_queue_attributes attr; 173 void* tag = create_test_tag(); 174 void* res_tag; 175 int ok; 176 177 LOG_TEST("test_cq_tls_cache_full"); 178 179 attr.version = 1; 180 attr.cq_completion_type = GRPC_CQ_NEXT; 181 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) { 182 grpc_core::ExecCtx exec_ctx; // Reset exec_ctx 183 attr.cq_polling_type = polling_types[i]; 184 cc = grpc_completion_queue_create( 185 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr); 186 187 grpc_completion_queue_thread_local_cache_init(cc); 188 GPR_ASSERT(grpc_cq_begin_op(cc, tag)); 189 grpc_cq_end_op(cc, tag, GRPC_ERROR_NONE, do_nothing_end_completion, nullptr, 190 &completion); 191 192 ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), 193 nullptr); 194 GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT); 195 196 GPR_ASSERT( 197 grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 1); 198 GPR_ASSERT(res_tag == tag); 199 GPR_ASSERT(ok); 200 201 ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), 202 nullptr); 203 GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT); 204 205 shutdown_and_destroy(cc); 206 } 207 } 208 209 static void test_cq_tls_cache_empty(void) { 210 grpc_completion_queue* cc; 211 grpc_cq_polling_type polling_types[] = { 212 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING}; 213 grpc_completion_queue_attributes attr; 214 void* res_tag; 215 int ok; 216 217 LOG_TEST("test_cq_tls_cache_empty"); 218 219 attr.version = 1; 220 attr.cq_completion_type = GRPC_CQ_NEXT; 221 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) { 222 grpc_core::ExecCtx exec_ctx; // Reset exec_ctx 223 attr.cq_polling_type = polling_types[i]; 224 cc = grpc_completion_queue_create( 225 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr); 226 227 GPR_ASSERT( 228 grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0); 229 grpc_completion_queue_thread_local_cache_init(cc); 230 GPR_ASSERT( 231 grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0); 232 shutdown_and_destroy(cc); 233 } 234 } 235 236 static void test_shutdown_then_next_polling(void) { 237 grpc_cq_polling_type polling_types[] = { 238 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING}; 239 grpc_completion_queue* cc; 240 grpc_completion_queue_attributes attr; 241 grpc_event event; 242 LOG_TEST("test_shutdown_then_next_polling"); 243 244 attr.version = 1; 245 attr.cq_completion_type = GRPC_CQ_NEXT; 246 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) { 247 attr.cq_polling_type = polling_types[i]; 248 cc = grpc_completion_queue_create( 249 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr); 250 grpc_completion_queue_shutdown(cc); 251 event = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), 252 nullptr); 253 GPR_ASSERT(event.type == GRPC_QUEUE_SHUTDOWN); 254 grpc_completion_queue_destroy(cc); 255 } 256 } 257 258 static void test_shutdown_then_next_with_timeout(void) { 259 grpc_cq_polling_type polling_types[] = { 260 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING}; 261 grpc_completion_queue* cc; 262 grpc_completion_queue_attributes attr; 263 grpc_event event; 264 LOG_TEST("test_shutdown_then_next_with_timeout"); 265 266 attr.version = 1; 267 attr.cq_completion_type = GRPC_CQ_NEXT; 268 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) { 269 attr.cq_polling_type = polling_types[i]; 270 cc = grpc_completion_queue_create( 271 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr); 272 273 grpc_completion_queue_shutdown(cc); 274 event = grpc_completion_queue_next(cc, gpr_inf_future(GPR_CLOCK_REALTIME), 275 nullptr); 276 GPR_ASSERT(event.type == GRPC_QUEUE_SHUTDOWN); 277 grpc_completion_queue_destroy(cc); 278 } 279 } 280 281 static void test_pluck(void) { 282 grpc_event ev; 283 grpc_completion_queue* cc; 284 void* tags[128]; 285 grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)]; 286 grpc_cq_polling_type polling_types[] = { 287 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING}; 288 grpc_completion_queue_attributes attr; 289 unsigned i, j; 290 291 LOG_TEST("test_pluck"); 292 293 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { 294 tags[i] = create_test_tag(); 295 for (j = 0; j < i; j++) { 296 GPR_ASSERT(tags[i] != tags[j]); 297 } 298 } 299 300 attr.version = 1; 301 attr.cq_completion_type = GRPC_CQ_PLUCK; 302 for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) { 303 grpc_core::ExecCtx exec_ctx; // reset exec_ctx 304 attr.cq_polling_type = polling_types[pidx]; 305 cc = grpc_completion_queue_create( 306 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr); 307 308 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { 309 GPR_ASSERT(grpc_cq_begin_op(cc, tags[i])); 310 grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion, 311 nullptr, &completions[i]); 312 } 313 314 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { 315 ev = grpc_completion_queue_pluck( 316 cc, tags[i], gpr_inf_past(GPR_CLOCK_REALTIME), nullptr); 317 GPR_ASSERT(ev.tag == tags[i]); 318 } 319 320 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { 321 GPR_ASSERT(grpc_cq_begin_op(cc, tags[i])); 322 grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion, 323 nullptr, &completions[i]); 324 } 325 326 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { 327 ev = grpc_completion_queue_pluck(cc, tags[GPR_ARRAY_SIZE(tags) - i - 1], 328 gpr_inf_past(GPR_CLOCK_REALTIME), 329 nullptr); 330 GPR_ASSERT(ev.tag == tags[GPR_ARRAY_SIZE(tags) - i - 1]); 331 } 332 333 shutdown_and_destroy(cc); 334 } 335 } 336 337 static void test_pluck_after_shutdown(void) { 338 grpc_cq_polling_type polling_types[] = { 339 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING}; 340 grpc_event ev; 341 grpc_completion_queue* cc; 342 grpc_completion_queue_attributes attr; 343 344 LOG_TEST("test_pluck_after_shutdown"); 345 346 attr.version = 1; 347 attr.cq_completion_type = GRPC_CQ_PLUCK; 348 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) { 349 attr.cq_polling_type = polling_types[i]; 350 cc = grpc_completion_queue_create( 351 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr); 352 grpc_completion_queue_shutdown(cc); 353 ev = grpc_completion_queue_pluck( 354 cc, nullptr, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); 355 GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN); 356 grpc_completion_queue_destroy(cc); 357 } 358 } 359 360 static void test_callback(void) { 361 grpc_completion_queue* cc; 362 void* tags[128]; 363 grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)]; 364 grpc_cq_polling_type polling_types[] = { 365 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING}; 366 grpc_completion_queue_attributes attr; 367 unsigned i; 368 369 LOG_TEST("test_callback"); 370 371 bool got_shutdown = false; 372 class ShutdownCallback : public grpc_experimental_completion_queue_functor { 373 public: 374 ShutdownCallback(bool* done) : done_(done) { 375 functor_run = &ShutdownCallback::Run; 376 } 377 ~ShutdownCallback() {} 378 static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { 379 *static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok); 380 } 381 382 private: 383 bool* done_; 384 }; 385 ShutdownCallback shutdown_cb(&got_shutdown); 386 387 attr.version = 2; 388 attr.cq_completion_type = GRPC_CQ_CALLBACK; 389 attr.cq_shutdown_cb = &shutdown_cb; 390 391 for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) { 392 grpc_core::ExecCtx exec_ctx; // reset exec_ctx 393 attr.cq_polling_type = polling_types[pidx]; 394 cc = grpc_completion_queue_create( 395 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr); 396 397 int counter = 0; 398 class TagCallback : public grpc_experimental_completion_queue_functor { 399 public: 400 TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) { 401 functor_run = &TagCallback::Run; 402 } 403 ~TagCallback() {} 404 static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { 405 GPR_ASSERT(static_cast<bool>(ok)); 406 auto* callback = static_cast<TagCallback*>(cb); 407 *callback->counter_ += callback->tag_; 408 grpc_core::Delete(callback); 409 }; 410 411 private: 412 int* counter_; 413 int tag_; 414 }; 415 416 int sumtags = 0; 417 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { 418 tags[i] = static_cast<void*>(grpc_core::New<TagCallback>(&counter, i)); 419 sumtags += i; 420 } 421 422 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { 423 GPR_ASSERT(grpc_cq_begin_op(cc, tags[i])); 424 grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion, 425 nullptr, &completions[i]); 426 } 427 428 GPR_ASSERT(sumtags == counter); 429 430 shutdown_and_destroy(cc); 431 432 GPR_ASSERT(got_shutdown); 433 got_shutdown = false; 434 } 435 } 436 437 struct thread_state { 438 grpc_completion_queue* cc; 439 void* tag; 440 }; 441 442 int main(int argc, char** argv) { 443 grpc_test_init(argc, argv); 444 grpc_init(); 445 test_no_op(); 446 test_pollset_conversion(); 447 test_wait_empty(); 448 test_shutdown_then_next_polling(); 449 test_shutdown_then_next_with_timeout(); 450 test_cq_end_op(); 451 test_pluck(); 452 test_pluck_after_shutdown(); 453 test_cq_tls_cache_full(); 454 test_cq_tls_cache_empty(); 455 test_callback(); 456 grpc_shutdown(); 457 return 0; 458 } 459