Home | History | Annotate | Download | only in fixtures
      1 /*
      2  *
      3  * Copyright 2016 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include "test/core/end2end/fixtures/http_proxy_fixture.h"
     20 
     21 #include "src/core/lib/iomgr/sockaddr.h"
     22 
     23 #include <string.h>
     24 
     25 #include <grpc/grpc.h>
     26 #include <grpc/slice_buffer.h>
     27 #include <grpc/support/alloc.h>
     28 #include <grpc/support/atm.h>
     29 #include <grpc/support/log.h>
     30 #include <grpc/support/string_util.h>
     31 #include <grpc/support/sync.h>
     32 
     33 #include "src/core/lib/channel/channel_args.h"
     34 #include "src/core/lib/gpr/host_port.h"
     35 #include "src/core/lib/gpr/string.h"
     36 #include "src/core/lib/gprpp/thd.h"
     37 #include "src/core/lib/http/parser.h"
     38 #include "src/core/lib/iomgr/closure.h"
     39 #include "src/core/lib/iomgr/combiner.h"
     40 #include "src/core/lib/iomgr/endpoint.h"
     41 #include "src/core/lib/iomgr/error.h"
     42 #include "src/core/lib/iomgr/exec_ctx.h"
     43 #include "src/core/lib/iomgr/pollset.h"
     44 #include "src/core/lib/iomgr/pollset_set.h"
     45 #include "src/core/lib/iomgr/resolve_address.h"
     46 #include "src/core/lib/iomgr/sockaddr_utils.h"
     47 #include "src/core/lib/iomgr/tcp_client.h"
     48 #include "src/core/lib/iomgr/tcp_server.h"
     49 #include "src/core/lib/iomgr/timer.h"
     50 #include "src/core/lib/slice/b64.h"
     51 #include "src/core/lib/slice/slice_internal.h"
     52 #include "test/core/util/port.h"
     53 
     54 struct grpc_end2end_http_proxy {
     55   grpc_end2end_http_proxy()
     56       : proxy_name(nullptr),
     57         server(nullptr),
     58         channel_args(nullptr),
     59         mu(nullptr),
     60         pollset(nullptr),
     61         combiner(nullptr) {
     62     gpr_ref_init(&users, 1);
     63     combiner = grpc_combiner_create();
     64   }
     65   char* proxy_name;
     66   grpc_core::Thread thd;
     67   grpc_tcp_server* server;
     68   grpc_channel_args* channel_args;
     69   gpr_mu* mu;
     70   grpc_pollset* pollset;
     71   gpr_refcount users;
     72 
     73   grpc_combiner* combiner;
     74 };
     75 
     76 //
     77 // Connection handling
     78 //
     79 
     80 // proxy_connection structure is only accessed in the closures which are all
     81 // scheduled under the same combiner lock. So there is is no need for a mutex to
     82 // protect this structure.
     83 typedef struct proxy_connection {
     84   grpc_end2end_http_proxy* proxy;
     85 
     86   grpc_endpoint* client_endpoint;
     87   grpc_endpoint* server_endpoint;
     88 
     89   gpr_refcount refcount;
     90 
     91   grpc_pollset_set* pollset_set;
     92 
     93   // NOTE: All the closures execute under proxy->combiner lock. Which means
     94   // there will not be any data-races between the closures
     95   grpc_closure on_read_request_done;
     96   grpc_closure on_server_connect_done;
     97   grpc_closure on_write_response_done;
     98   grpc_closure on_client_read_done;
     99   grpc_closure on_client_write_done;
    100   grpc_closure on_server_read_done;
    101   grpc_closure on_server_write_done;
    102 
    103   bool client_read_failed : 1;
    104   bool client_write_failed : 1;
    105   bool client_shutdown : 1;
    106   bool server_read_failed : 1;
    107   bool server_write_failed : 1;
    108   bool server_shutdown : 1;
    109 
    110   grpc_slice_buffer client_read_buffer;
    111   grpc_slice_buffer client_deferred_write_buffer;
    112   bool client_is_writing;
    113   grpc_slice_buffer client_write_buffer;
    114   grpc_slice_buffer server_read_buffer;
    115   grpc_slice_buffer server_deferred_write_buffer;
    116   bool server_is_writing;
    117   grpc_slice_buffer server_write_buffer;
    118 
    119   grpc_http_parser http_parser;
    120   grpc_http_request http_request;
    121 } proxy_connection;
    122 
    123 static void proxy_connection_ref(proxy_connection* conn, const char* reason) {
    124   gpr_ref(&conn->refcount);
    125 }
    126 
    127 // Helper function to destroy the proxy connection.
    128 static void proxy_connection_unref(proxy_connection* conn, const char* reason) {
    129   if (gpr_unref(&conn->refcount)) {
    130     gpr_log(GPR_DEBUG, "endpoints: %p %p", conn->client_endpoint,
    131             conn->server_endpoint);
    132     grpc_endpoint_destroy(conn->client_endpoint);
    133     if (conn->server_endpoint != nullptr) {
    134       grpc_endpoint_destroy(conn->server_endpoint);
    135     }
    136     grpc_pollset_set_destroy(conn->pollset_set);
    137     grpc_slice_buffer_destroy_internal(&conn->client_read_buffer);
    138     grpc_slice_buffer_destroy_internal(&conn->client_deferred_write_buffer);
    139     grpc_slice_buffer_destroy_internal(&conn->client_write_buffer);
    140     grpc_slice_buffer_destroy_internal(&conn->server_read_buffer);
    141     grpc_slice_buffer_destroy_internal(&conn->server_deferred_write_buffer);
    142     grpc_slice_buffer_destroy_internal(&conn->server_write_buffer);
    143     grpc_http_parser_destroy(&conn->http_parser);
    144     grpc_http_request_destroy(&conn->http_request);
    145     gpr_unref(&conn->proxy->users);
    146     gpr_free(conn);
    147   }
    148 }
    149 
    150 enum failure_type {
    151   SETUP_FAILED,  // To be used before we start proxying.
    152   CLIENT_READ_FAILED,
    153   CLIENT_WRITE_FAILED,
    154   SERVER_READ_FAILED,
    155   SERVER_WRITE_FAILED,
    156 };
    157 
    158 // Helper function to shut down the proxy connection.
    159 static void proxy_connection_failed(proxy_connection* conn,
    160                                     failure_type failure, const char* prefix,
    161                                     grpc_error* error) {
    162   gpr_log(GPR_INFO, "%s: %s", prefix, grpc_error_string(error));
    163   // Decide whether we should shut down the client and server.
    164   bool shutdown_client = false;
    165   bool shutdown_server = false;
    166   if (failure == SETUP_FAILED) {
    167     shutdown_client = true;
    168     shutdown_server = true;
    169   } else {
    170     if ((failure == CLIENT_READ_FAILED && conn->client_write_failed) ||
    171         (failure == CLIENT_WRITE_FAILED && conn->client_read_failed) ||
    172         (failure == SERVER_READ_FAILED && !conn->client_is_writing)) {
    173       shutdown_client = true;
    174     }
    175     if ((failure == SERVER_READ_FAILED && conn->server_write_failed) ||
    176         (failure == SERVER_WRITE_FAILED && conn->server_read_failed) ||
    177         (failure == CLIENT_READ_FAILED && !conn->server_is_writing)) {
    178       shutdown_server = true;
    179     }
    180   }
    181   // If we decided to shut down either one and have not yet done so, do so.
    182   if (shutdown_client && !conn->client_shutdown) {
    183     grpc_endpoint_shutdown(conn->client_endpoint, GRPC_ERROR_REF(error));
    184     conn->client_shutdown = true;
    185   }
    186   if (shutdown_server && !conn->server_shutdown &&
    187       (conn->server_endpoint != nullptr)) {
    188     grpc_endpoint_shutdown(conn->server_endpoint, GRPC_ERROR_REF(error));
    189     conn->server_shutdown = true;
    190   }
    191   // Unref the connection.
    192   proxy_connection_unref(conn, "conn_failed");
    193   GRPC_ERROR_UNREF(error);
    194 }
    195 
    196 // Callback for writing proxy data to the client.
    197 static void on_client_write_done(void* arg, grpc_error* error) {
    198   proxy_connection* conn = static_cast<proxy_connection*>(arg);
    199   conn->client_is_writing = false;
    200   if (error != GRPC_ERROR_NONE) {
    201     proxy_connection_failed(conn, CLIENT_WRITE_FAILED,
    202                             "HTTP proxy client write", GRPC_ERROR_REF(error));
    203     return;
    204   }
    205   // Clear write buffer (the data we just wrote).
    206   grpc_slice_buffer_reset_and_unref(&conn->client_write_buffer);
    207   // If more data was read from the server since we started this write,
    208   // write that data now.
    209   if (conn->client_deferred_write_buffer.length > 0) {
    210     grpc_slice_buffer_move_into(&conn->client_deferred_write_buffer,
    211                                 &conn->client_write_buffer);
    212     conn->client_is_writing = true;
    213     grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer,
    214                         &conn->on_client_write_done, nullptr);
    215   } else {
    216     // No more writes.  Unref the connection.
    217     proxy_connection_unref(conn, "write_done");
    218   }
    219 }
    220 
    221 // Callback for writing proxy data to the backend server.
    222 static void on_server_write_done(void* arg, grpc_error* error) {
    223   proxy_connection* conn = static_cast<proxy_connection*>(arg);
    224   conn->server_is_writing = false;
    225   if (error != GRPC_ERROR_NONE) {
    226     proxy_connection_failed(conn, SERVER_WRITE_FAILED,
    227                             "HTTP proxy server write", GRPC_ERROR_REF(error));
    228     return;
    229   }
    230   // Clear write buffer (the data we just wrote).
    231   grpc_slice_buffer_reset_and_unref(&conn->server_write_buffer);
    232   // If more data was read from the client since we started this write,
    233   // write that data now.
    234   if (conn->server_deferred_write_buffer.length > 0) {
    235     grpc_slice_buffer_move_into(&conn->server_deferred_write_buffer,
    236                                 &conn->server_write_buffer);
    237     conn->server_is_writing = true;
    238     grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer,
    239                         &conn->on_server_write_done, nullptr);
    240   } else {
    241     // No more writes.  Unref the connection.
    242     proxy_connection_unref(conn, "server_write");
    243   }
    244 }
    245 
    246 // Callback for reading data from the client, which will be proxied to
    247 // the backend server.
    248 static void on_client_read_done(void* arg, grpc_error* error) {
    249   proxy_connection* conn = static_cast<proxy_connection*>(arg);
    250   if (error != GRPC_ERROR_NONE) {
    251     proxy_connection_failed(conn, CLIENT_READ_FAILED, "HTTP proxy client read",
    252                             GRPC_ERROR_REF(error));
    253     return;
    254   }
    255   // If there is already a pending write (i.e., server_write_buffer is
    256   // not empty), then move the read data into server_deferred_write_buffer,
    257   // and the next write will be requested in on_server_write_done(), when
    258   // the current write is finished.
    259   //
    260   // Otherwise, move the read data into the write buffer and write it.
    261   if (conn->server_is_writing) {
    262     grpc_slice_buffer_move_into(&conn->client_read_buffer,
    263                                 &conn->server_deferred_write_buffer);
    264   } else {
    265     grpc_slice_buffer_move_into(&conn->client_read_buffer,
    266                                 &conn->server_write_buffer);
    267     proxy_connection_ref(conn, "client_read");
    268     conn->server_is_writing = true;
    269     grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer,
    270                         &conn->on_server_write_done, nullptr);
    271   }
    272   // Read more data.
    273   grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
    274                      &conn->on_client_read_done);
    275 }
    276 
    277 // Callback for reading data from the backend server, which will be
    278 // proxied to the client.
    279 static void on_server_read_done(void* arg, grpc_error* error) {
    280   proxy_connection* conn = static_cast<proxy_connection*>(arg);
    281   if (error != GRPC_ERROR_NONE) {
    282     proxy_connection_failed(conn, SERVER_READ_FAILED, "HTTP proxy server read",
    283                             GRPC_ERROR_REF(error));
    284     return;
    285   }
    286   // If there is already a pending write (i.e., client_write_buffer is
    287   // not empty), then move the read data into client_deferred_write_buffer,
    288   // and the next write will be requested in on_client_write_done(), when
    289   // the current write is finished.
    290   //
    291   // Otherwise, move the read data into the write buffer and write it.
    292   if (conn->client_is_writing) {
    293     grpc_slice_buffer_move_into(&conn->server_read_buffer,
    294                                 &conn->client_deferred_write_buffer);
    295   } else {
    296     grpc_slice_buffer_move_into(&conn->server_read_buffer,
    297                                 &conn->client_write_buffer);
    298     proxy_connection_ref(conn, "server_read");
    299     conn->client_is_writing = true;
    300     grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer,
    301                         &conn->on_client_write_done, nullptr);
    302   }
    303   // Read more data.
    304   grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer,
    305                      &conn->on_server_read_done);
    306 }
    307 
    308 // Callback to write the HTTP response for the CONNECT request.
    309 static void on_write_response_done(void* arg, grpc_error* error) {
    310   proxy_connection* conn = static_cast<proxy_connection*>(arg);
    311   conn->client_is_writing = false;
    312   if (error != GRPC_ERROR_NONE) {
    313     proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy write response",
    314                             GRPC_ERROR_REF(error));
    315     return;
    316   }
    317   // Clear write buffer.
    318   grpc_slice_buffer_reset_and_unref(&conn->client_write_buffer);
    319   // Start reading from both client and server.  One of the read
    320   // requests inherits our ref to conn, but we need to take a new ref
    321   // for the other one.
    322   proxy_connection_ref(conn, "client_read");
    323   proxy_connection_ref(conn, "server_read");
    324   proxy_connection_unref(conn, "write_response");
    325   grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
    326                      &conn->on_client_read_done);
    327   grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer,
    328                      &conn->on_server_read_done);
    329 }
    330 
    331 // Callback to connect to the backend server specified by the HTTP
    332 // CONNECT request.
    333 static void on_server_connect_done(void* arg, grpc_error* error) {
    334   proxy_connection* conn = static_cast<proxy_connection*>(arg);
    335   if (error != GRPC_ERROR_NONE) {
    336     // TODO(roth): Technically, in this case, we should handle the error
    337     // by returning an HTTP response to the client indicating that the
    338     // connection failed.  However, for the purposes of this test code,
    339     // it's fine to pretend this is a client-side error, which will
    340     // cause the client connection to be dropped.
    341     proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy server connect",
    342                             GRPC_ERROR_REF(error));
    343     return;
    344   }
    345   // We've established a connection, so send back a 200 response code to
    346   // the client.
    347   // The write callback inherits our reference to conn.
    348   grpc_slice slice =
    349       grpc_slice_from_copied_string("HTTP/1.0 200 connected\r\n\r\n");
    350   grpc_slice_buffer_add(&conn->client_write_buffer, slice);
    351   conn->client_is_writing = true;
    352   grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer,
    353                       &conn->on_write_response_done, nullptr);
    354 }
    355 
    356 /**
    357  * Parses the proxy auth header value to check if it matches :-
    358  * Basic <base64_encoded_expected_cred>
    359  * Returns true if it matches, false otherwise
    360  */
    361 static bool proxy_auth_header_matches(char* proxy_auth_header_val,
    362                                       char* expected_cred) {
    363   GPR_ASSERT(proxy_auth_header_val != nullptr);
    364   GPR_ASSERT(expected_cred != nullptr);
    365   if (strncmp(proxy_auth_header_val, "Basic ", 6) != 0) {
    366     return false;
    367   }
    368   proxy_auth_header_val += 6;
    369   grpc_slice decoded_slice = grpc_base64_decode(proxy_auth_header_val, 0);
    370   const bool header_matches =
    371       grpc_slice_str_cmp(decoded_slice, expected_cred) == 0;
    372   grpc_slice_unref_internal(decoded_slice);
    373   return header_matches;
    374 }
    375 
    376 // Callback to read the HTTP CONNECT request.
    377 // TODO(roth): Technically, for any of the failure modes handled by this
    378 // function, we should handle the error by returning an HTTP response to
    379 // the client indicating that the request failed.  However, for the purposes
    380 // of this test code, it's fine to pretend this is a client-side error,
    381 // which will cause the client connection to be dropped.
    382 static void on_read_request_done(void* arg, grpc_error* error) {
    383   proxy_connection* conn = static_cast<proxy_connection*>(arg);
    384   gpr_log(GPR_DEBUG, "on_read_request_done: %p %s", conn,
    385           grpc_error_string(error));
    386   if (error != GRPC_ERROR_NONE) {
    387     proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy read request",
    388                             GRPC_ERROR_REF(error));
    389     return;
    390   }
    391   // Read request and feed it to the parser.
    392   for (size_t i = 0; i < conn->client_read_buffer.count; ++i) {
    393     if (GRPC_SLICE_LENGTH(conn->client_read_buffer.slices[i]) > 0) {
    394       error = grpc_http_parser_parse(
    395           &conn->http_parser, conn->client_read_buffer.slices[i], nullptr);
    396       if (error != GRPC_ERROR_NONE) {
    397         proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy request parse",
    398                                 GRPC_ERROR_REF(error));
    399         GRPC_ERROR_UNREF(error);
    400         return;
    401       }
    402     }
    403   }
    404   grpc_slice_buffer_reset_and_unref(&conn->client_read_buffer);
    405   // If we're not done reading the request, read more data.
    406   if (conn->http_parser.state != GRPC_HTTP_BODY) {
    407     grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
    408                        &conn->on_read_request_done);
    409     return;
    410   }
    411   // Make sure we got a CONNECT request.
    412   if (strcmp(conn->http_request.method, "CONNECT") != 0) {
    413     char* msg;
    414     gpr_asprintf(&msg, "HTTP proxy got request method %s",
    415                  conn->http_request.method);
    416     error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
    417     gpr_free(msg);
    418     proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy read request",
    419                             GRPC_ERROR_REF(error));
    420     GRPC_ERROR_UNREF(error);
    421     return;
    422   }
    423   // If proxy auth is being used, check if the header is present and as expected
    424   const grpc_arg* proxy_auth_arg = grpc_channel_args_find(
    425       conn->proxy->channel_args, GRPC_ARG_HTTP_PROXY_AUTH_CREDS);
    426   char* proxy_auth_str = grpc_channel_arg_get_string(proxy_auth_arg);
    427   if (proxy_auth_str != nullptr) {
    428     bool client_authenticated = false;
    429     for (size_t i = 0; i < conn->http_request.hdr_count; i++) {
    430       if (strcmp(conn->http_request.hdrs[i].key, "Proxy-Authorization") == 0) {
    431         client_authenticated = proxy_auth_header_matches(
    432             conn->http_request.hdrs[i].value, proxy_auth_str);
    433         break;
    434       }
    435     }
    436     if (!client_authenticated) {
    437       const char* msg = "HTTP Connect could not verify authentication";
    438       error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(msg);
    439       proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy read request",
    440                               GRPC_ERROR_REF(error));
    441       GRPC_ERROR_UNREF(error);
    442       return;
    443     }
    444   }
    445   // Resolve address.
    446   grpc_resolved_addresses* resolved_addresses = nullptr;
    447   error = grpc_blocking_resolve_address(conn->http_request.path, "80",
    448                                         &resolved_addresses);
    449   if (error != GRPC_ERROR_NONE) {
    450     proxy_connection_failed(conn, SETUP_FAILED, "HTTP proxy DNS lookup",
    451                             GRPC_ERROR_REF(error));
    452     GRPC_ERROR_UNREF(error);
    453     return;
    454   }
    455   GPR_ASSERT(resolved_addresses->naddrs >= 1);
    456   // Connect to requested address.
    457   // The connection callback inherits our reference to conn.
    458   const grpc_millis deadline =
    459       grpc_core::ExecCtx::Get()->Now() + 10 * GPR_MS_PER_SEC;
    460   grpc_tcp_client_connect(&conn->on_server_connect_done, &conn->server_endpoint,
    461                           conn->pollset_set, nullptr,
    462                           &resolved_addresses->addrs[0], deadline);
    463   grpc_resolved_addresses_destroy(resolved_addresses);
    464 }
    465 
    466 static void on_accept(void* arg, grpc_endpoint* endpoint,
    467                       grpc_pollset* accepting_pollset,
    468                       grpc_tcp_server_acceptor* acceptor) {
    469   gpr_free(acceptor);
    470   grpc_end2end_http_proxy* proxy = static_cast<grpc_end2end_http_proxy*>(arg);
    471   // Instantiate proxy_connection.
    472   proxy_connection* conn =
    473       static_cast<proxy_connection*>(gpr_zalloc(sizeof(*conn)));
    474   gpr_ref(&proxy->users);
    475   conn->client_endpoint = endpoint;
    476   conn->proxy = proxy;
    477   gpr_ref_init(&conn->refcount, 1);
    478   conn->pollset_set = grpc_pollset_set_create();
    479   grpc_pollset_set_add_pollset(conn->pollset_set, proxy->pollset);
    480   grpc_endpoint_add_to_pollset_set(endpoint, conn->pollset_set);
    481   GRPC_CLOSURE_INIT(&conn->on_read_request_done, on_read_request_done, conn,
    482                     grpc_combiner_scheduler(conn->proxy->combiner));
    483   GRPC_CLOSURE_INIT(&conn->on_server_connect_done, on_server_connect_done, conn,
    484                     grpc_combiner_scheduler(conn->proxy->combiner));
    485   GRPC_CLOSURE_INIT(&conn->on_write_response_done, on_write_response_done, conn,
    486                     grpc_combiner_scheduler(conn->proxy->combiner));
    487   GRPC_CLOSURE_INIT(&conn->on_client_read_done, on_client_read_done, conn,
    488                     grpc_combiner_scheduler(conn->proxy->combiner));
    489   GRPC_CLOSURE_INIT(&conn->on_client_write_done, on_client_write_done, conn,
    490                     grpc_combiner_scheduler(conn->proxy->combiner));
    491   GRPC_CLOSURE_INIT(&conn->on_server_read_done, on_server_read_done, conn,
    492                     grpc_combiner_scheduler(conn->proxy->combiner));
    493   GRPC_CLOSURE_INIT(&conn->on_server_write_done, on_server_write_done, conn,
    494                     grpc_combiner_scheduler(conn->proxy->combiner));
    495   grpc_slice_buffer_init(&conn->client_read_buffer);
    496   grpc_slice_buffer_init(&conn->client_deferred_write_buffer);
    497   conn->client_is_writing = false;
    498   grpc_slice_buffer_init(&conn->client_write_buffer);
    499   grpc_slice_buffer_init(&conn->server_read_buffer);
    500   grpc_slice_buffer_init(&conn->server_deferred_write_buffer);
    501   conn->server_is_writing = false;
    502   grpc_slice_buffer_init(&conn->server_write_buffer);
    503   grpc_http_parser_init(&conn->http_parser, GRPC_HTTP_REQUEST,
    504                         &conn->http_request);
    505   grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
    506                      &conn->on_read_request_done);
    507 }
    508 
    509 //
    510 // Proxy class
    511 //
    512 
    513 static void thread_main(void* arg) {
    514   grpc_end2end_http_proxy* proxy = static_cast<grpc_end2end_http_proxy*>(arg);
    515   grpc_core::ExecCtx exec_ctx;
    516   do {
    517     gpr_ref(&proxy->users);
    518     grpc_pollset_worker* worker = nullptr;
    519     gpr_mu_lock(proxy->mu);
    520     GRPC_LOG_IF_ERROR(
    521         "grpc_pollset_work",
    522         grpc_pollset_work(proxy->pollset, &worker,
    523                           grpc_core::ExecCtx::Get()->Now() + GPR_MS_PER_SEC));
    524     gpr_mu_unlock(proxy->mu);
    525     grpc_core::ExecCtx::Get()->Flush();
    526   } while (!gpr_unref(&proxy->users));
    527 }
    528 
    529 grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(
    530     grpc_channel_args* args) {
    531   grpc_core::ExecCtx exec_ctx;
    532   grpc_end2end_http_proxy* proxy = grpc_core::New<grpc_end2end_http_proxy>();
    533   // Construct proxy address.
    534   const int proxy_port = grpc_pick_unused_port_or_die();
    535   gpr_join_host_port(&proxy->proxy_name, "localhost", proxy_port);
    536   gpr_log(GPR_INFO, "Proxy address: %s", proxy->proxy_name);
    537   // Create TCP server.
    538   proxy->channel_args = grpc_channel_args_copy(args);
    539   grpc_error* error =
    540       grpc_tcp_server_create(nullptr, proxy->channel_args, &proxy->server);
    541   GPR_ASSERT(error == GRPC_ERROR_NONE);
    542   // Bind to port.
    543   grpc_resolved_address resolved_addr;
    544   grpc_sockaddr_in* addr =
    545       reinterpret_cast<grpc_sockaddr_in*>(resolved_addr.addr);
    546   memset(&resolved_addr, 0, sizeof(resolved_addr));
    547   addr->sin_family = GRPC_AF_INET;
    548   grpc_sockaddr_set_port(&resolved_addr, proxy_port);
    549   int port;
    550   error = grpc_tcp_server_add_port(proxy->server, &resolved_addr, &port);
    551   GPR_ASSERT(error == GRPC_ERROR_NONE);
    552   GPR_ASSERT(port == proxy_port);
    553   // Start server.
    554   proxy->pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
    555   grpc_pollset_init(proxy->pollset, &proxy->mu);
    556   grpc_tcp_server_start(proxy->server, &proxy->pollset, 1, on_accept, proxy);
    557 
    558   // Start proxy thread.
    559   proxy->thd = grpc_core::Thread("grpc_http_proxy", thread_main, proxy);
    560   proxy->thd.Start();
    561   return proxy;
    562 }
    563 
    564 static void destroy_pollset(void* arg, grpc_error* error) {
    565   grpc_pollset* pollset = static_cast<grpc_pollset*>(arg);
    566   grpc_pollset_destroy(pollset);
    567   gpr_free(pollset);
    568 }
    569 
    570 void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) {
    571   gpr_unref(&proxy->users);  // Signal proxy thread to shutdown.
    572   grpc_core::ExecCtx exec_ctx;
    573   proxy->thd.Join();
    574   grpc_tcp_server_shutdown_listeners(proxy->server);
    575   grpc_tcp_server_unref(proxy->server);
    576   gpr_free(proxy->proxy_name);
    577   grpc_channel_args_destroy(proxy->channel_args);
    578   grpc_pollset_shutdown(proxy->pollset,
    579                         GRPC_CLOSURE_CREATE(destroy_pollset, proxy->pollset,
    580                                             grpc_schedule_on_exec_ctx));
    581   GRPC_COMBINER_UNREF(proxy->combiner, "test");
    582   grpc_core::Delete(proxy);
    583 }
    584 
    585 const char* grpc_end2end_http_proxy_get_proxy_name(
    586     grpc_end2end_http_proxy* proxy) {
    587   return proxy->proxy_name;
    588 }
    589