Home | History | Annotate | Download | only in fixtures
      1 /*
      2  *
      3  * Copyright 2015 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include "test/core/end2end/fixtures/proxy.h"
     20 
     21 #include <string.h>
     22 
     23 #include <grpc/support/alloc.h>
     24 #include <grpc/support/log.h>
     25 #include <grpc/support/sync.h>
     26 
     27 #include "src/core/lib/gpr/host_port.h"
     28 #include "src/core/lib/gpr/useful.h"
     29 #include "src/core/lib/gprpp/thd.h"
     30 #include "test/core/util/port.h"
     31 
     32 struct grpc_end2end_proxy {
     33   grpc_end2end_proxy()
     34       : proxy_port(nullptr),
     35         server_port(nullptr),
     36         cq(nullptr),
     37         server(nullptr),
     38         client(nullptr),
     39         shutdown(false),
     40         new_call(nullptr) {
     41     memset(&new_call_details, 0, sizeof(new_call_details));
     42     memset(&new_call_metadata, 0, sizeof(new_call_metadata));
     43   }
     44   grpc_core::Thread thd;
     45   char* proxy_port;
     46   char* server_port;
     47   grpc_completion_queue* cq;
     48   grpc_server* server;
     49   grpc_channel* client;
     50 
     51   int shutdown;
     52 
     53   /* requested call */
     54   grpc_call* new_call;
     55   grpc_call_details new_call_details;
     56   grpc_metadata_array new_call_metadata;
     57 };
     58 
     59 typedef struct {
     60   void (*func)(void* arg, int success);
     61   void* arg;
     62 } closure;
     63 
     64 typedef struct {
     65   gpr_refcount refs;
     66   grpc_end2end_proxy* proxy;
     67 
     68   grpc_call* c2p;
     69   grpc_call* p2s;
     70 
     71   grpc_metadata_array c2p_initial_metadata;
     72   grpc_metadata_array p2s_initial_metadata;
     73 
     74   grpc_byte_buffer* c2p_msg;
     75   grpc_byte_buffer* p2s_msg;
     76 
     77   grpc_metadata_array p2s_trailing_metadata;
     78   grpc_status_code p2s_status;
     79   grpc_slice p2s_status_details;
     80 
     81   int c2p_server_cancelled;
     82 } proxy_call;
     83 
     84 static void thread_main(void* arg);
     85 static void request_call(grpc_end2end_proxy* proxy);
     86 
     87 grpc_end2end_proxy* grpc_end2end_proxy_create(const grpc_end2end_proxy_def* def,
     88                                               grpc_channel_args* client_args,
     89                                               grpc_channel_args* server_args) {
     90   int proxy_port = grpc_pick_unused_port_or_die();
     91   int server_port = grpc_pick_unused_port_or_die();
     92 
     93   grpc_end2end_proxy* proxy = grpc_core::New<grpc_end2end_proxy>();
     94 
     95   gpr_join_host_port(&proxy->proxy_port, "localhost", proxy_port);
     96   gpr_join_host_port(&proxy->server_port, "localhost", server_port);
     97 
     98   gpr_log(GPR_DEBUG, "PROXY ADDR:%s BACKEND:%s", proxy->proxy_port,
     99           proxy->server_port);
    100 
    101   proxy->cq = grpc_completion_queue_create_for_next(nullptr);
    102   proxy->server = def->create_server(proxy->proxy_port, server_args);
    103   proxy->client = def->create_client(proxy->server_port, client_args);
    104 
    105   grpc_server_register_completion_queue(proxy->server, proxy->cq, nullptr);
    106   grpc_server_start(proxy->server);
    107 
    108   grpc_call_details_init(&proxy->new_call_details);
    109   proxy->thd = grpc_core::Thread("grpc_end2end_proxy", thread_main, proxy);
    110   proxy->thd.Start();
    111 
    112   request_call(proxy);
    113 
    114   return proxy;
    115 }
    116 
    117 static closure* new_closure(void (*func)(void* arg, int success), void* arg) {
    118   closure* cl = static_cast<closure*>(gpr_malloc(sizeof(*cl)));
    119   cl->func = func;
    120   cl->arg = arg;
    121   return cl;
    122 }
    123 
    124 static void shutdown_complete(void* arg, int success) {
    125   grpc_end2end_proxy* proxy = static_cast<grpc_end2end_proxy*>(arg);
    126   proxy->shutdown = 1;
    127   grpc_completion_queue_shutdown(proxy->cq);
    128 }
    129 
    130 void grpc_end2end_proxy_destroy(grpc_end2end_proxy* proxy) {
    131   grpc_server_shutdown_and_notify(proxy->server, proxy->cq,
    132                                   new_closure(shutdown_complete, proxy));
    133   proxy->thd.Join();
    134   gpr_free(proxy->proxy_port);
    135   gpr_free(proxy->server_port);
    136   grpc_server_destroy(proxy->server);
    137   grpc_channel_destroy(proxy->client);
    138   grpc_completion_queue_destroy(proxy->cq);
    139   grpc_call_details_destroy(&proxy->new_call_details);
    140   grpc_core::Delete(proxy);
    141 }
    142 
    143 static void unrefpc(proxy_call* pc, const char* reason) {
    144   if (gpr_unref(&pc->refs)) {
    145     grpc_call_unref(pc->c2p);
    146     grpc_call_unref(pc->p2s);
    147     grpc_metadata_array_destroy(&pc->c2p_initial_metadata);
    148     grpc_metadata_array_destroy(&pc->p2s_initial_metadata);
    149     grpc_metadata_array_destroy(&pc->p2s_trailing_metadata);
    150     grpc_slice_unref(pc->p2s_status_details);
    151     gpr_free(pc);
    152   }
    153 }
    154 
    155 static void refpc(proxy_call* pc, const char* reason) { gpr_ref(&pc->refs); }
    156 
    157 static void on_c2p_sent_initial_metadata(void* arg, int success) {
    158   proxy_call* pc = static_cast<proxy_call*>(arg);
    159   unrefpc(pc, "on_c2p_sent_initial_metadata");
    160 }
    161 
    162 static void on_p2s_recv_initial_metadata(void* arg, int success) {
    163   proxy_call* pc = static_cast<proxy_call*>(arg);
    164   grpc_op op;
    165   grpc_call_error err;
    166 
    167   memset(&op, 0, sizeof(op));
    168   if (!pc->proxy->shutdown) {
    169     op.op = GRPC_OP_SEND_INITIAL_METADATA;
    170     op.flags = 0;
    171     op.reserved = nullptr;
    172     op.data.send_initial_metadata.count = pc->p2s_initial_metadata.count;
    173     op.data.send_initial_metadata.metadata = pc->p2s_initial_metadata.metadata;
    174     refpc(pc, "on_c2p_sent_initial_metadata");
    175     err = grpc_call_start_batch(pc->c2p, &op, 1,
    176                                 new_closure(on_c2p_sent_initial_metadata, pc),
    177                                 nullptr);
    178     GPR_ASSERT(err == GRPC_CALL_OK);
    179   }
    180 
    181   unrefpc(pc, "on_p2s_recv_initial_metadata");
    182 }
    183 
    184 static void on_p2s_sent_initial_metadata(void* arg, int success) {
    185   proxy_call* pc = static_cast<proxy_call*>(arg);
    186   unrefpc(pc, "on_p2s_sent_initial_metadata");
    187 }
    188 
    189 static void on_c2p_recv_msg(void* arg, int success);
    190 
    191 static void on_p2s_sent_message(void* arg, int success) {
    192   proxy_call* pc = static_cast<proxy_call*>(arg);
    193   grpc_op op;
    194   grpc_call_error err;
    195 
    196   grpc_byte_buffer_destroy(pc->c2p_msg);
    197   if (!pc->proxy->shutdown && success) {
    198     op.op = GRPC_OP_RECV_MESSAGE;
    199     op.flags = 0;
    200     op.reserved = nullptr;
    201     op.data.recv_message.recv_message = &pc->c2p_msg;
    202     refpc(pc, "on_c2p_recv_msg");
    203     err = grpc_call_start_batch(pc->c2p, &op, 1,
    204                                 new_closure(on_c2p_recv_msg, pc), nullptr);
    205     GPR_ASSERT(err == GRPC_CALL_OK);
    206   }
    207 
    208   unrefpc(pc, "on_p2s_sent_message");
    209 }
    210 
    211 static void on_p2s_sent_close(void* arg, int success) {
    212   proxy_call* pc = static_cast<proxy_call*>(arg);
    213   unrefpc(pc, "on_p2s_sent_close");
    214 }
    215 
    216 static void on_c2p_recv_msg(void* arg, int success) {
    217   proxy_call* pc = static_cast<proxy_call*>(arg);
    218   grpc_op op;
    219   grpc_call_error err;
    220 
    221   if (!pc->proxy->shutdown && success) {
    222     if (pc->c2p_msg != nullptr) {
    223       op.op = GRPC_OP_SEND_MESSAGE;
    224       op.flags = 0;
    225       op.reserved = nullptr;
    226       op.data.send_message.send_message = pc->c2p_msg;
    227       refpc(pc, "on_p2s_sent_message");
    228       err = grpc_call_start_batch(
    229           pc->p2s, &op, 1, new_closure(on_p2s_sent_message, pc), nullptr);
    230       GPR_ASSERT(err == GRPC_CALL_OK);
    231     } else {
    232       op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
    233       op.flags = 0;
    234       op.reserved = nullptr;
    235       refpc(pc, "on_p2s_sent_close");
    236       err = grpc_call_start_batch(pc->p2s, &op, 1,
    237                                   new_closure(on_p2s_sent_close, pc), nullptr);
    238       GPR_ASSERT(err == GRPC_CALL_OK);
    239     }
    240   } else {
    241     if (pc->c2p_msg != nullptr) {
    242       grpc_byte_buffer_destroy(pc->c2p_msg);
    243     }
    244   }
    245 
    246   unrefpc(pc, "on_c2p_recv_msg");
    247 }
    248 
    249 static void on_p2s_recv_msg(void* arg, int success);
    250 
    251 static void on_c2p_sent_message(void* arg, int success) {
    252   proxy_call* pc = static_cast<proxy_call*>(arg);
    253   grpc_op op;
    254   grpc_call_error err;
    255 
    256   grpc_byte_buffer_destroy(pc->p2s_msg);
    257   if (!pc->proxy->shutdown && success) {
    258     op.op = GRPC_OP_RECV_MESSAGE;
    259     op.flags = 0;
    260     op.reserved = nullptr;
    261     op.data.recv_message.recv_message = &pc->p2s_msg;
    262     refpc(pc, "on_p2s_recv_msg");
    263     err = grpc_call_start_batch(pc->p2s, &op, 1,
    264                                 new_closure(on_p2s_recv_msg, pc), nullptr);
    265     GPR_ASSERT(err == GRPC_CALL_OK);
    266   }
    267 
    268   unrefpc(pc, "on_c2p_sent_message");
    269 }
    270 
    271 static void on_p2s_recv_msg(void* arg, int success) {
    272   proxy_call* pc = static_cast<proxy_call*>(arg);
    273   grpc_op op;
    274   grpc_call_error err;
    275 
    276   if (!pc->proxy->shutdown && success && pc->p2s_msg) {
    277     op.op = GRPC_OP_SEND_MESSAGE;
    278     op.flags = 0;
    279     op.reserved = nullptr;
    280     op.data.send_message.send_message = pc->p2s_msg;
    281     refpc(pc, "on_c2p_sent_message");
    282     err = grpc_call_start_batch(pc->c2p, &op, 1,
    283                                 new_closure(on_c2p_sent_message, pc), nullptr);
    284     GPR_ASSERT(err == GRPC_CALL_OK);
    285   } else {
    286     grpc_byte_buffer_destroy(pc->p2s_msg);
    287   }
    288   unrefpc(pc, "on_p2s_recv_msg");
    289 }
    290 
    291 static void on_c2p_sent_status(void* arg, int success) {
    292   proxy_call* pc = static_cast<proxy_call*>(arg);
    293   unrefpc(pc, "on_c2p_sent_status");
    294 }
    295 
    296 static void on_p2s_status(void* arg, int success) {
    297   proxy_call* pc = static_cast<proxy_call*>(arg);
    298   grpc_op op;
    299   grpc_call_error err;
    300 
    301   if (!pc->proxy->shutdown) {
    302     GPR_ASSERT(success);
    303     op.op = GRPC_OP_SEND_STATUS_FROM_SERVER;
    304     op.flags = 0;
    305     op.reserved = nullptr;
    306     op.data.send_status_from_server.trailing_metadata_count =
    307         pc->p2s_trailing_metadata.count;
    308     op.data.send_status_from_server.trailing_metadata =
    309         pc->p2s_trailing_metadata.metadata;
    310     op.data.send_status_from_server.status = pc->p2s_status;
    311     op.data.send_status_from_server.status_details = &pc->p2s_status_details;
    312     refpc(pc, "on_c2p_sent_status");
    313     err = grpc_call_start_batch(pc->c2p, &op, 1,
    314                                 new_closure(on_c2p_sent_status, pc), nullptr);
    315     GPR_ASSERT(err == GRPC_CALL_OK);
    316   }
    317 
    318   unrefpc(pc, "on_p2s_status");
    319 }
    320 
    321 static void on_c2p_closed(void* arg, int success) {
    322   proxy_call* pc = static_cast<proxy_call*>(arg);
    323   unrefpc(pc, "on_c2p_closed");
    324 }
    325 
    326 static void on_new_call(void* arg, int success) {
    327   grpc_end2end_proxy* proxy = static_cast<grpc_end2end_proxy*>(arg);
    328   grpc_call_error err;
    329 
    330   if (success) {
    331     grpc_op op;
    332     memset(&op, 0, sizeof(op));
    333     proxy_call* pc = static_cast<proxy_call*>(gpr_malloc(sizeof(*pc)));
    334     memset(pc, 0, sizeof(*pc));
    335     pc->proxy = proxy;
    336     GPR_SWAP(grpc_metadata_array, pc->c2p_initial_metadata,
    337              proxy->new_call_metadata);
    338     pc->c2p = proxy->new_call;
    339     pc->p2s = grpc_channel_create_call(
    340         proxy->client, pc->c2p, GRPC_PROPAGATE_DEFAULTS, proxy->cq,
    341         proxy->new_call_details.method, &proxy->new_call_details.host,
    342         proxy->new_call_details.deadline, nullptr);
    343     gpr_ref_init(&pc->refs, 1);
    344 
    345     op.reserved = nullptr;
    346 
    347     op.op = GRPC_OP_RECV_INITIAL_METADATA;
    348     op.flags = 0;
    349     op.data.recv_initial_metadata.recv_initial_metadata =
    350         &pc->p2s_initial_metadata;
    351     refpc(pc, "on_p2s_recv_initial_metadata");
    352     err = grpc_call_start_batch(pc->p2s, &op, 1,
    353                                 new_closure(on_p2s_recv_initial_metadata, pc),
    354                                 nullptr);
    355     GPR_ASSERT(err == GRPC_CALL_OK);
    356 
    357     op.op = GRPC_OP_SEND_INITIAL_METADATA;
    358     op.flags = proxy->new_call_details.flags;
    359     op.data.send_initial_metadata.count = pc->c2p_initial_metadata.count;
    360     op.data.send_initial_metadata.metadata = pc->c2p_initial_metadata.metadata;
    361     refpc(pc, "on_p2s_sent_initial_metadata");
    362     err = grpc_call_start_batch(pc->p2s, &op, 1,
    363                                 new_closure(on_p2s_sent_initial_metadata, pc),
    364                                 nullptr);
    365     GPR_ASSERT(err == GRPC_CALL_OK);
    366 
    367     op.op = GRPC_OP_RECV_MESSAGE;
    368     op.flags = 0;
    369     op.data.recv_message.recv_message = &pc->c2p_msg;
    370     refpc(pc, "on_c2p_recv_msg");
    371     err = grpc_call_start_batch(pc->c2p, &op, 1,
    372                                 new_closure(on_c2p_recv_msg, pc), nullptr);
    373     GPR_ASSERT(err == GRPC_CALL_OK);
    374 
    375     op.op = GRPC_OP_RECV_MESSAGE;
    376     op.flags = 0;
    377     op.data.recv_message.recv_message = &pc->p2s_msg;
    378     refpc(pc, "on_p2s_recv_msg");
    379     err = grpc_call_start_batch(pc->p2s, &op, 1,
    380                                 new_closure(on_p2s_recv_msg, pc), nullptr);
    381     GPR_ASSERT(err == GRPC_CALL_OK);
    382 
    383     op.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
    384     op.flags = 0;
    385     op.data.recv_status_on_client.trailing_metadata =
    386         &pc->p2s_trailing_metadata;
    387     op.data.recv_status_on_client.status = &pc->p2s_status;
    388     op.data.recv_status_on_client.status_details = &pc->p2s_status_details;
    389     refpc(pc, "on_p2s_status");
    390     err = grpc_call_start_batch(pc->p2s, &op, 1, new_closure(on_p2s_status, pc),
    391                                 nullptr);
    392     GPR_ASSERT(err == GRPC_CALL_OK);
    393 
    394     op.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
    395     op.flags = 0;
    396     op.data.recv_close_on_server.cancelled = &pc->c2p_server_cancelled;
    397     refpc(pc, "on_c2p_closed");
    398     err = grpc_call_start_batch(pc->c2p, &op, 1, new_closure(on_c2p_closed, pc),
    399                                 nullptr);
    400     GPR_ASSERT(err == GRPC_CALL_OK);
    401 
    402     request_call(proxy);
    403 
    404     grpc_call_details_destroy(&proxy->new_call_details);
    405     grpc_call_details_init(&proxy->new_call_details);
    406 
    407     unrefpc(pc, "init");
    408   } else {
    409     GPR_ASSERT(proxy->new_call == nullptr);
    410   }
    411 }
    412 
    413 static void request_call(grpc_end2end_proxy* proxy) {
    414   proxy->new_call = nullptr;
    415   GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
    416                                  proxy->server, &proxy->new_call,
    417                                  &proxy->new_call_details,
    418                                  &proxy->new_call_metadata, proxy->cq,
    419                                  proxy->cq, new_closure(on_new_call, proxy)));
    420 }
    421 
    422 static void thread_main(void* arg) {
    423   grpc_end2end_proxy* proxy = static_cast<grpc_end2end_proxy*>(arg);
    424   closure* cl;
    425   for (;;) {
    426     grpc_event ev = grpc_completion_queue_next(
    427         proxy->cq, gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr);
    428     switch (ev.type) {
    429       case GRPC_QUEUE_TIMEOUT:
    430         gpr_log(GPR_ERROR, "Should never reach here");
    431         abort();
    432       case GRPC_QUEUE_SHUTDOWN:
    433         return;
    434       case GRPC_OP_COMPLETE:
    435         cl = static_cast<closure*>(ev.tag);
    436         cl->func(cl->arg, ev.success);
    437         gpr_free(cl);
    438         break;
    439     }
    440   }
    441 }
    442 
    443 const char* grpc_end2end_proxy_get_client_target(grpc_end2end_proxy* proxy) {
    444   return proxy->proxy_port;
    445 }
    446 
    447 const char* grpc_end2end_proxy_get_server_port(grpc_end2end_proxy* proxy) {
    448   return proxy->server_port;
    449 }
    450