Home | History | Annotate | Download | only in tests
      1 /*
      2  *
      3  * Copyright 2017 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include "test/core/end2end/end2end_tests.h"
     20 
     21 #include <stdio.h>
     22 #include <string.h>
     23 
     24 #include <grpc/byte_buffer.h>
     25 #include <grpc/grpc.h>
     26 #include <grpc/support/alloc.h>
     27 #include <grpc/support/log.h>
     28 #include <grpc/support/string_util.h>
     29 #include <grpc/support/time.h>
     30 
     31 #include "src/core/lib/channel/channel_args.h"
     32 #include "src/core/lib/gpr/string.h"
     33 #include "src/core/lib/gpr/useful.h"
     34 #include "src/core/lib/iomgr/exec_ctx.h"
     35 #include "src/core/lib/transport/static_metadata.h"
     36 
     37 #include "test/core/end2end/cq_verifier.h"
     38 #include "test/core/end2end/tests/cancel_test_helpers.h"
     39 
     40 static void* tag(intptr_t t) { return (void*)t; }
     41 
     42 static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
     43                                             const char* test_name,
     44                                             grpc_channel_args* client_args,
     45                                             grpc_channel_args* server_args) {
     46   grpc_end2end_test_fixture f;
     47   gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
     48   f = config.create_fixture(client_args, server_args);
     49   config.init_server(&f, server_args);
     50   config.init_client(&f, client_args);
     51   return f;
     52 }
     53 
     54 static gpr_timespec n_seconds_from_now(int n) {
     55   return grpc_timeout_seconds_to_deadline(n);
     56 }
     57 
     58 static gpr_timespec five_seconds_from_now(void) {
     59   return n_seconds_from_now(5);
     60 }
     61 
     62 static void drain_cq(grpc_completion_queue* cq) {
     63   grpc_event ev;
     64   do {
     65     ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr);
     66   } while (ev.type != GRPC_QUEUE_SHUTDOWN);
     67 }
     68 
     69 static void shutdown_server(grpc_end2end_test_fixture* f) {
     70   if (!f->server) return;
     71   grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
     72   GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
     73                                          grpc_timeout_seconds_to_deadline(5),
     74                                          nullptr)
     75                  .type == GRPC_OP_COMPLETE);
     76   grpc_server_destroy(f->server);
     77   f->server = nullptr;
     78 }
     79 
     80 static void shutdown_client(grpc_end2end_test_fixture* f) {
     81   if (!f->client) return;
     82   grpc_channel_destroy(f->client);
     83   f->client = nullptr;
     84 }
     85 
     86 static void end_test(grpc_end2end_test_fixture* f) {
     87   shutdown_server(f);
     88   shutdown_client(f);
     89 
     90   grpc_completion_queue_shutdown(f->cq);
     91   drain_cq(f->cq);
     92   grpc_completion_queue_destroy(f->cq);
     93   grpc_completion_queue_destroy(f->shutdown_cq);
     94 }
     95 
     96 // Tests retry cancellation.
     97 static void test_retry_cancellation(grpc_end2end_test_config config,
     98                                     cancellation_mode mode) {
     99   grpc_call* c;
    100   grpc_call* s;
    101   grpc_op ops[6];
    102   grpc_op* op;
    103   grpc_metadata_array initial_metadata_recv;
    104   grpc_metadata_array trailing_metadata_recv;
    105   grpc_metadata_array request_metadata_recv;
    106   grpc_call_details call_details;
    107   grpc_slice request_payload_slice = grpc_slice_from_static_string("foo");
    108   grpc_slice response_payload_slice = grpc_slice_from_static_string("bar");
    109   grpc_byte_buffer* request_payload =
    110       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
    111   grpc_byte_buffer* response_payload =
    112       grpc_raw_byte_buffer_create(&response_payload_slice, 1);
    113   grpc_byte_buffer* request_payload_recv = nullptr;
    114   grpc_byte_buffer* response_payload_recv = nullptr;
    115   grpc_status_code status;
    116   grpc_call_error error;
    117   grpc_slice details;
    118   int was_cancelled = 2;
    119   char* peer;
    120 
    121   grpc_arg arg;
    122   arg.type = GRPC_ARG_STRING;
    123   arg.key = const_cast<char*>(GRPC_ARG_SERVICE_CONFIG);
    124   arg.value.string = const_cast<char*>(
    125       "{\n"
    126       "  \"methodConfig\": [ {\n"
    127       "    \"name\": [\n"
    128       "      { \"service\": \"service\", \"method\": \"method\" }\n"
    129       "    ],\n"
    130       "    \"retryPolicy\": {\n"
    131       "      \"maxAttempts\": 3,\n"
    132       "      \"initialBackoff\": \"1s\",\n"
    133       "      \"maxBackoff\": \"120s\",\n"
    134       "      \"backoffMultiplier\": 1.6,\n"
    135       "      \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
    136       "    },\n"
    137       "    \"timeout\": \"5s\"\n"
    138       "  } ]\n"
    139       "}");
    140   grpc_channel_args client_args = {1, &arg};
    141   char* name;
    142   gpr_asprintf(&name, "retry_cancellation/%s", mode.name);
    143   grpc_end2end_test_fixture f = begin_test(config, name, &client_args, nullptr);
    144   gpr_free(name);
    145 
    146   cq_verifier* cqv = cq_verifier_create(f.cq);
    147 
    148   gpr_timespec deadline = five_seconds_from_now();
    149   c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
    150                                grpc_slice_from_static_string("/service/method"),
    151                                nullptr, deadline, nullptr);
    152   GPR_ASSERT(c);
    153 
    154   peer = grpc_call_get_peer(c);
    155   GPR_ASSERT(peer != nullptr);
    156   gpr_log(GPR_DEBUG, "client_peer_before_call=%s", peer);
    157   gpr_free(peer);
    158 
    159   grpc_metadata_array_init(&initial_metadata_recv);
    160   grpc_metadata_array_init(&trailing_metadata_recv);
    161   grpc_metadata_array_init(&request_metadata_recv);
    162   grpc_call_details_init(&call_details);
    163   grpc_slice status_details = grpc_slice_from_static_string("xyz");
    164 
    165   // Client starts a batch with all 6 ops.
    166   memset(ops, 0, sizeof(ops));
    167   op = ops;
    168   op->op = GRPC_OP_SEND_INITIAL_METADATA;
    169   op->data.send_initial_metadata.count = 0;
    170   op++;
    171   op->op = GRPC_OP_SEND_MESSAGE;
    172   op->data.send_message.send_message = request_payload;
    173   op++;
    174   op->op = GRPC_OP_RECV_MESSAGE;
    175   op->data.recv_message.recv_message = &response_payload_recv;
    176   op++;
    177   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
    178   op++;
    179   op->op = GRPC_OP_RECV_INITIAL_METADATA;
    180   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
    181   op++;
    182   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
    183   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
    184   op->data.recv_status_on_client.status = &status;
    185   op->data.recv_status_on_client.status_details = &details;
    186   op++;
    187   error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), nullptr);
    188   GPR_ASSERT(GRPC_CALL_OK == error);
    189 
    190   // Server gets a call and fails with retryable status.
    191   error =
    192       grpc_server_request_call(f.server, &s, &call_details,
    193                                &request_metadata_recv, f.cq, f.cq, tag(101));
    194   GPR_ASSERT(GRPC_CALL_OK == error);
    195   CQ_EXPECT_COMPLETION(cqv, tag(101), true);
    196   cq_verify(cqv);
    197 
    198   peer = grpc_call_get_peer(s);
    199   GPR_ASSERT(peer != nullptr);
    200   gpr_log(GPR_DEBUG, "server_peer=%s", peer);
    201   gpr_free(peer);
    202   peer = grpc_call_get_peer(c);
    203   GPR_ASSERT(peer != nullptr);
    204   gpr_log(GPR_DEBUG, "client_peer=%s", peer);
    205   gpr_free(peer);
    206 
    207   memset(ops, 0, sizeof(ops));
    208   op = ops;
    209   op->op = GRPC_OP_SEND_INITIAL_METADATA;
    210   op->data.send_initial_metadata.count = 0;
    211   op++;
    212   op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
    213   op->data.send_status_from_server.trailing_metadata_count = 0;
    214   op->data.send_status_from_server.status = GRPC_STATUS_ABORTED;
    215   op->data.send_status_from_server.status_details = &status_details;
    216   op++;
    217   op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
    218   op->data.recv_close_on_server.cancelled = &was_cancelled;
    219   op++;
    220   error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), nullptr);
    221   GPR_ASSERT(GRPC_CALL_OK == error);
    222 
    223   CQ_EXPECT_COMPLETION(cqv, tag(102), true);
    224   cq_verify(cqv);
    225 
    226   grpc_call_unref(s);
    227   grpc_metadata_array_destroy(&request_metadata_recv);
    228   grpc_metadata_array_init(&request_metadata_recv);
    229   grpc_call_details_destroy(&call_details);
    230   grpc_call_details_init(&call_details);
    231 
    232   // Server gets a second call (the retry).
    233   error =
    234       grpc_server_request_call(f.server, &s, &call_details,
    235                                &request_metadata_recv, f.cq, f.cq, tag(201));
    236   GPR_ASSERT(GRPC_CALL_OK == error);
    237   CQ_EXPECT_COMPLETION(cqv, tag(201), true);
    238   cq_verify(cqv);
    239 
    240   // Initiate cancellation.
    241   GPR_ASSERT(GRPC_CALL_OK == mode.initiate_cancel(c, nullptr));
    242 
    243   CQ_EXPECT_COMPLETION(cqv, tag(1), true);
    244   cq_verify(cqv);
    245 
    246   GPR_ASSERT(status == mode.expect_status);
    247   GPR_ASSERT(was_cancelled == 1);
    248 
    249   grpc_slice_unref(details);
    250   grpc_metadata_array_destroy(&initial_metadata_recv);
    251   grpc_metadata_array_destroy(&trailing_metadata_recv);
    252   grpc_metadata_array_destroy(&request_metadata_recv);
    253   grpc_call_details_destroy(&call_details);
    254   grpc_byte_buffer_destroy(request_payload);
    255   grpc_byte_buffer_destroy(response_payload);
    256   grpc_byte_buffer_destroy(request_payload_recv);
    257   grpc_byte_buffer_destroy(response_payload_recv);
    258 
    259   grpc_call_unref(c);
    260   grpc_call_unref(s);
    261 
    262   cq_verifier_destroy(cqv);
    263 
    264   end_test(&f);
    265   config.tear_down_data(&f);
    266 }
    267 
    268 void retry_cancellation(grpc_end2end_test_config config) {
    269   GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL);
    270   for (size_t i = 0; i < GPR_ARRAY_SIZE(cancellation_modes); ++i) {
    271     test_retry_cancellation(config, cancellation_modes[i]);
    272   }
    273 }
    274 
    275 void retry_cancellation_pre_init(void) {}
    276