Home | History | Annotate | Download | only in surface
      1 /*
      2  *
      3  * Copyright 2015 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include "src/core/lib/surface/completion_queue.h"
     20 
     21 #include <grpc/support/alloc.h>
     22 #include <grpc/support/log.h>
     23 #include <grpc/support/time.h>
     24 #include "src/core/lib/gpr/useful.h"
     25 #include "src/core/lib/gprpp/memory.h"
     26 #include "src/core/lib/iomgr/iomgr.h"
     27 #include "test/core/util/test_config.h"
     28 
     29 #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", x)
     30 
     31 static void* create_test_tag(void) {
     32   static intptr_t i = 0;
     33   return (void*)(++i);
     34 }
     35 
     36 /* helper for tests to shutdown correctly and tersely */
     37 static void shutdown_and_destroy(grpc_completion_queue* cc) {
     38   grpc_event ev;
     39   grpc_completion_queue_shutdown(cc);
     40 
     41   switch (grpc_get_cq_completion_type(cc)) {
     42     case GRPC_CQ_NEXT: {
     43       ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
     44                                       nullptr);
     45       GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
     46       break;
     47     }
     48     case GRPC_CQ_PLUCK: {
     49       ev = grpc_completion_queue_pluck(
     50           cc, create_test_tag(), gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
     51       GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
     52       break;
     53     }
     54     case GRPC_CQ_CALLBACK: {
     55       // Nothing to do here. The shutdown callback will be invoked when
     56       // possible.
     57       break;
     58     }
     59     default: {
     60       gpr_log(GPR_ERROR, "Unknown completion type");
     61       break;
     62     }
     63   }
     64 
     65   grpc_completion_queue_destroy(cc);
     66 }
     67 
     68 /* ensure we can create and destroy a completion channel */
     69 static void test_no_op(void) {
     70   grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK};
     71   grpc_cq_polling_type polling_types[] = {
     72       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
     73   grpc_completion_queue_attributes attr;
     74   LOG_TEST("test_no_op");
     75 
     76   attr.version = 1;
     77   for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) {
     78     for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) {
     79       attr.cq_completion_type = completion_types[i];
     80       attr.cq_polling_type = polling_types[j];
     81       shutdown_and_destroy(grpc_completion_queue_create(
     82           grpc_completion_queue_factory_lookup(&attr), &attr, nullptr));
     83     }
     84   }
     85 }
     86 
     87 static void test_pollset_conversion(void) {
     88   grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK};
     89   grpc_cq_polling_type polling_types[] = {GRPC_CQ_DEFAULT_POLLING,
     90                                           GRPC_CQ_NON_LISTENING};
     91   grpc_completion_queue* cq;
     92   grpc_completion_queue_attributes attr;
     93 
     94   LOG_TEST("test_pollset_conversion");
     95 
     96   attr.version = 1;
     97   for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) {
     98     for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) {
     99       attr.cq_completion_type = completion_types[i];
    100       attr.cq_polling_type = polling_types[j];
    101       cq = grpc_completion_queue_create(
    102           grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
    103       GPR_ASSERT(grpc_cq_pollset(cq) != nullptr);
    104       shutdown_and_destroy(cq);
    105     }
    106   }
    107 }
    108 
    109 static void test_wait_empty(void) {
    110   grpc_cq_polling_type polling_types[] = {
    111       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
    112   grpc_completion_queue* cc;
    113   grpc_completion_queue_attributes attr;
    114   grpc_event event;
    115 
    116   LOG_TEST("test_wait_empty");
    117 
    118   attr.version = 1;
    119   attr.cq_completion_type = GRPC_CQ_NEXT;
    120   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
    121     attr.cq_polling_type = polling_types[i];
    122     cc = grpc_completion_queue_create(
    123         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
    124     event =
    125         grpc_completion_queue_next(cc, gpr_now(GPR_CLOCK_REALTIME), nullptr);
    126     GPR_ASSERT(event.type == GRPC_QUEUE_TIMEOUT);
    127     shutdown_and_destroy(cc);
    128   }
    129 }
    130 
    131 static void do_nothing_end_completion(void* arg, grpc_cq_completion* c) {}
    132 
    133 static void test_cq_end_op(void) {
    134   grpc_event ev;
    135   grpc_completion_queue* cc;
    136   grpc_cq_completion completion;
    137   grpc_cq_polling_type polling_types[] = {
    138       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
    139   grpc_completion_queue_attributes attr;
    140   void* tag = create_test_tag();
    141 
    142   LOG_TEST("test_cq_end_op");
    143 
    144   attr.version = 1;
    145   attr.cq_completion_type = GRPC_CQ_NEXT;
    146   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
    147     grpc_core::ExecCtx exec_ctx;
    148     attr.cq_polling_type = polling_types[i];
    149     cc = grpc_completion_queue_create(
    150         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
    151 
    152     GPR_ASSERT(grpc_cq_begin_op(cc, tag));
    153     grpc_cq_end_op(cc, tag, GRPC_ERROR_NONE, do_nothing_end_completion, nullptr,
    154                    &completion);
    155 
    156     ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
    157                                     nullptr);
    158     GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
    159     GPR_ASSERT(ev.tag == tag);
    160     GPR_ASSERT(ev.success);
    161 
    162     shutdown_and_destroy(cc);
    163   }
    164 }
    165 
    166 static void test_cq_tls_cache_full(void) {
    167   grpc_event ev;
    168   grpc_completion_queue* cc;
    169   grpc_cq_completion completion;
    170   grpc_cq_polling_type polling_types[] = {
    171       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
    172   grpc_completion_queue_attributes attr;
    173   void* tag = create_test_tag();
    174   void* res_tag;
    175   int ok;
    176 
    177   LOG_TEST("test_cq_tls_cache_full");
    178 
    179   attr.version = 1;
    180   attr.cq_completion_type = GRPC_CQ_NEXT;
    181   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
    182     grpc_core::ExecCtx exec_ctx;  // Reset exec_ctx
    183     attr.cq_polling_type = polling_types[i];
    184     cc = grpc_completion_queue_create(
    185         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
    186 
    187     grpc_completion_queue_thread_local_cache_init(cc);
    188     GPR_ASSERT(grpc_cq_begin_op(cc, tag));
    189     grpc_cq_end_op(cc, tag, GRPC_ERROR_NONE, do_nothing_end_completion, nullptr,
    190                    &completion);
    191 
    192     ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
    193                                     nullptr);
    194     GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
    195 
    196     GPR_ASSERT(
    197         grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 1);
    198     GPR_ASSERT(res_tag == tag);
    199     GPR_ASSERT(ok);
    200 
    201     ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
    202                                     nullptr);
    203     GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
    204 
    205     shutdown_and_destroy(cc);
    206   }
    207 }
    208 
    209 static void test_cq_tls_cache_empty(void) {
    210   grpc_completion_queue* cc;
    211   grpc_cq_polling_type polling_types[] = {
    212       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
    213   grpc_completion_queue_attributes attr;
    214   void* res_tag;
    215   int ok;
    216 
    217   LOG_TEST("test_cq_tls_cache_empty");
    218 
    219   attr.version = 1;
    220   attr.cq_completion_type = GRPC_CQ_NEXT;
    221   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
    222     grpc_core::ExecCtx exec_ctx;  // Reset exec_ctx
    223     attr.cq_polling_type = polling_types[i];
    224     cc = grpc_completion_queue_create(
    225         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
    226 
    227     GPR_ASSERT(
    228         grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0);
    229     grpc_completion_queue_thread_local_cache_init(cc);
    230     GPR_ASSERT(
    231         grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0);
    232     shutdown_and_destroy(cc);
    233   }
    234 }
    235 
    236 static void test_shutdown_then_next_polling(void) {
    237   grpc_cq_polling_type polling_types[] = {
    238       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
    239   grpc_completion_queue* cc;
    240   grpc_completion_queue_attributes attr;
    241   grpc_event event;
    242   LOG_TEST("test_shutdown_then_next_polling");
    243 
    244   attr.version = 1;
    245   attr.cq_completion_type = GRPC_CQ_NEXT;
    246   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
    247     attr.cq_polling_type = polling_types[i];
    248     cc = grpc_completion_queue_create(
    249         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
    250     grpc_completion_queue_shutdown(cc);
    251     event = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
    252                                        nullptr);
    253     GPR_ASSERT(event.type == GRPC_QUEUE_SHUTDOWN);
    254     grpc_completion_queue_destroy(cc);
    255   }
    256 }
    257 
    258 static void test_shutdown_then_next_with_timeout(void) {
    259   grpc_cq_polling_type polling_types[] = {
    260       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
    261   grpc_completion_queue* cc;
    262   grpc_completion_queue_attributes attr;
    263   grpc_event event;
    264   LOG_TEST("test_shutdown_then_next_with_timeout");
    265 
    266   attr.version = 1;
    267   attr.cq_completion_type = GRPC_CQ_NEXT;
    268   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
    269     attr.cq_polling_type = polling_types[i];
    270     cc = grpc_completion_queue_create(
    271         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
    272 
    273     grpc_completion_queue_shutdown(cc);
    274     event = grpc_completion_queue_next(cc, gpr_inf_future(GPR_CLOCK_REALTIME),
    275                                        nullptr);
    276     GPR_ASSERT(event.type == GRPC_QUEUE_SHUTDOWN);
    277     grpc_completion_queue_destroy(cc);
    278   }
    279 }
    280 
    281 static void test_pluck(void) {
    282   grpc_event ev;
    283   grpc_completion_queue* cc;
    284   void* tags[128];
    285   grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
    286   grpc_cq_polling_type polling_types[] = {
    287       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
    288   grpc_completion_queue_attributes attr;
    289   unsigned i, j;
    290 
    291   LOG_TEST("test_pluck");
    292 
    293   for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
    294     tags[i] = create_test_tag();
    295     for (j = 0; j < i; j++) {
    296       GPR_ASSERT(tags[i] != tags[j]);
    297     }
    298   }
    299 
    300   attr.version = 1;
    301   attr.cq_completion_type = GRPC_CQ_PLUCK;
    302   for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
    303     grpc_core::ExecCtx exec_ctx;  // reset exec_ctx
    304     attr.cq_polling_type = polling_types[pidx];
    305     cc = grpc_completion_queue_create(
    306         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
    307 
    308     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
    309       GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
    310       grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
    311                      nullptr, &completions[i]);
    312     }
    313 
    314     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
    315       ev = grpc_completion_queue_pluck(
    316           cc, tags[i], gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
    317       GPR_ASSERT(ev.tag == tags[i]);
    318     }
    319 
    320     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
    321       GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
    322       grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
    323                      nullptr, &completions[i]);
    324     }
    325 
    326     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
    327       ev = grpc_completion_queue_pluck(cc, tags[GPR_ARRAY_SIZE(tags) - i - 1],
    328                                        gpr_inf_past(GPR_CLOCK_REALTIME),
    329                                        nullptr);
    330       GPR_ASSERT(ev.tag == tags[GPR_ARRAY_SIZE(tags) - i - 1]);
    331     }
    332 
    333     shutdown_and_destroy(cc);
    334   }
    335 }
    336 
    337 static void test_pluck_after_shutdown(void) {
    338   grpc_cq_polling_type polling_types[] = {
    339       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
    340   grpc_event ev;
    341   grpc_completion_queue* cc;
    342   grpc_completion_queue_attributes attr;
    343 
    344   LOG_TEST("test_pluck_after_shutdown");
    345 
    346   attr.version = 1;
    347   attr.cq_completion_type = GRPC_CQ_PLUCK;
    348   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
    349     attr.cq_polling_type = polling_types[i];
    350     cc = grpc_completion_queue_create(
    351         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
    352     grpc_completion_queue_shutdown(cc);
    353     ev = grpc_completion_queue_pluck(
    354         cc, nullptr, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
    355     GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
    356     grpc_completion_queue_destroy(cc);
    357   }
    358 }
    359 
    360 static void test_callback(void) {
    361   grpc_completion_queue* cc;
    362   void* tags[128];
    363   grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
    364   grpc_cq_polling_type polling_types[] = {
    365       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
    366   grpc_completion_queue_attributes attr;
    367   unsigned i;
    368 
    369   LOG_TEST("test_callback");
    370 
    371   bool got_shutdown = false;
    372   class ShutdownCallback : public grpc_experimental_completion_queue_functor {
    373    public:
    374     ShutdownCallback(bool* done) : done_(done) {
    375       functor_run = &ShutdownCallback::Run;
    376     }
    377     ~ShutdownCallback() {}
    378     static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
    379       *static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok);
    380     }
    381 
    382    private:
    383     bool* done_;
    384   };
    385   ShutdownCallback shutdown_cb(&got_shutdown);
    386 
    387   attr.version = 2;
    388   attr.cq_completion_type = GRPC_CQ_CALLBACK;
    389   attr.cq_shutdown_cb = &shutdown_cb;
    390 
    391   for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
    392     grpc_core::ExecCtx exec_ctx;  // reset exec_ctx
    393     attr.cq_polling_type = polling_types[pidx];
    394     cc = grpc_completion_queue_create(
    395         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
    396 
    397     int counter = 0;
    398     class TagCallback : public grpc_experimental_completion_queue_functor {
    399      public:
    400       TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {
    401         functor_run = &TagCallback::Run;
    402       }
    403       ~TagCallback() {}
    404       static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
    405         GPR_ASSERT(static_cast<bool>(ok));
    406         auto* callback = static_cast<TagCallback*>(cb);
    407         *callback->counter_ += callback->tag_;
    408         grpc_core::Delete(callback);
    409       };
    410 
    411      private:
    412       int* counter_;
    413       int tag_;
    414     };
    415 
    416     int sumtags = 0;
    417     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
    418       tags[i] = static_cast<void*>(grpc_core::New<TagCallback>(&counter, i));
    419       sumtags += i;
    420     }
    421 
    422     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
    423       GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
    424       grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
    425                      nullptr, &completions[i]);
    426     }
    427 
    428     GPR_ASSERT(sumtags == counter);
    429 
    430     shutdown_and_destroy(cc);
    431 
    432     GPR_ASSERT(got_shutdown);
    433     got_shutdown = false;
    434   }
    435 }
    436 
    437 struct thread_state {
    438   grpc_completion_queue* cc;
    439   void* tag;
    440 };
    441 
    442 int main(int argc, char** argv) {
    443   grpc_test_init(argc, argv);
    444   grpc_init();
    445   test_no_op();
    446   test_pollset_conversion();
    447   test_wait_empty();
    448   test_shutdown_then_next_polling();
    449   test_shutdown_then_next_with_timeout();
    450   test_cq_end_op();
    451   test_pluck();
    452   test_pluck_after_shutdown();
    453   test_cq_tls_cache_full();
    454   test_cq_tls_cache_empty();
    455   test_callback();
    456   grpc_shutdown();
    457   return 0;
    458 }
    459