Home | History | Annotate | Download | only in iomgr
      1 /*
      2  *
      3  * Copyright 2015 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include "src/core/lib/iomgr/port.h"
     20 
     21 // This test won't work except with posix sockets enabled
     22 #ifdef GRPC_POSIX_SOCKET
     23 
     24 #include "src/core/lib/iomgr/tcp_posix.h"
     25 
     26 #include <errno.h>
     27 #include <fcntl.h>
     28 #include <string.h>
     29 #include <sys/socket.h>
     30 #include <sys/types.h>
     31 #include <unistd.h>
     32 
     33 #include <grpc/grpc.h>
     34 #include <grpc/support/alloc.h>
     35 #include <grpc/support/log.h>
     36 #include <grpc/support/time.h>
     37 
     38 #include "src/core/lib/gpr/useful.h"
     39 #include "src/core/lib/iomgr/buffer_list.h"
     40 #include "src/core/lib/iomgr/ev_posix.h"
     41 #include "src/core/lib/iomgr/sockaddr_posix.h"
     42 #include "src/core/lib/slice/slice_internal.h"
     43 #include "test/core/iomgr/endpoint_tests.h"
     44 #include "test/core/util/test_config.h"
     45 
     46 static gpr_mu* g_mu;
     47 static grpc_pollset* g_pollset;
     48 
     49 /*
     50    General test notes:
     51 
     52    All tests which write data into a socket write i%256 into byte i, which is
     53    verified by readers.
     54 
     55    In general there are a few interesting things to vary which may lead to
     56    exercising different codepaths in an implementation:
     57    1. Total amount of data written to the socket
     58    2. Size of slice allocations
     59    3. Amount of data we read from or write to the socket at once
     60 
     61    The tests here tend to parameterize these where applicable.
     62 
     63  */
     64 
     65 static void create_sockets(int sv[2]) {
     66   int flags;
     67   GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
     68   flags = fcntl(sv[0], F_GETFL, 0);
     69   GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
     70   flags = fcntl(sv[1], F_GETFL, 0);
     71   GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
     72 }
     73 
     74 static void create_inet_sockets(int sv[2]) {
     75   /* Prepare listening socket */
     76   struct sockaddr_in addr;
     77   memset(&addr, 0, sizeof(struct sockaddr_in));
     78   addr.sin_family = AF_INET;
     79   int sock = socket(AF_INET, SOCK_STREAM, 0);
     80   GPR_ASSERT(sock);
     81   GPR_ASSERT(bind(sock, (sockaddr*)&addr, sizeof(sockaddr_in)) == 0);
     82   listen(sock, 1);
     83 
     84   /* Prepare client socket and connect to server */
     85   socklen_t len = sizeof(sockaddr_in);
     86   GPR_ASSERT(getsockname(sock, (sockaddr*)&addr, &len) == 0);
     87 
     88   int client = socket(AF_INET, SOCK_STREAM, 0);
     89   GPR_ASSERT(client);
     90   int ret;
     91   do {
     92     ret = connect(client, (sockaddr*)&addr, sizeof(sockaddr_in));
     93   } while (ret == -1 && errno == EINTR);
     94 
     95   /* Accept client connection */
     96   len = sizeof(socklen_t);
     97   int server;
     98   do {
     99     server = accept(sock, (sockaddr*)&addr, (socklen_t*)&len);
    100   } while (server == -1 && errno == EINTR);
    101   GPR_ASSERT(server != -1);
    102 
    103   sv[0] = server;
    104   sv[1] = client;
    105   int flags = fcntl(sv[0], F_GETFL, 0);
    106   GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
    107   flags = fcntl(sv[1], F_GETFL, 0);
    108   GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
    109 }
    110 
    111 static ssize_t fill_socket(int fd) {
    112   ssize_t write_bytes;
    113   ssize_t total_bytes = 0;
    114   int i;
    115   unsigned char buf[256];
    116   for (i = 0; i < 256; ++i) {
    117     buf[i] = static_cast<uint8_t>(i);
    118   }
    119   do {
    120     write_bytes = write(fd, buf, 256);
    121     if (write_bytes > 0) {
    122       total_bytes += write_bytes;
    123     }
    124   } while (write_bytes >= 0 || errno == EINTR);
    125   GPR_ASSERT(errno == EAGAIN);
    126   return total_bytes;
    127 }
    128 
    129 static size_t fill_socket_partial(int fd, size_t bytes) {
    130   ssize_t write_bytes;
    131   size_t total_bytes = 0;
    132   unsigned char* buf = static_cast<unsigned char*>(gpr_malloc(bytes));
    133   unsigned i;
    134   for (i = 0; i < bytes; ++i) {
    135     buf[i] = static_cast<uint8_t>(i % 256);
    136   }
    137 
    138   do {
    139     write_bytes = write(fd, buf, bytes - total_bytes);
    140     if (write_bytes > 0) {
    141       total_bytes += static_cast<size_t>(write_bytes);
    142     }
    143   } while ((write_bytes >= 0 || errno == EINTR) && bytes > total_bytes);
    144 
    145   gpr_free(buf);
    146 
    147   return total_bytes;
    148 }
    149 
    150 struct read_socket_state {
    151   grpc_endpoint* ep;
    152   size_t read_bytes;
    153   size_t target_read_bytes;
    154   grpc_slice_buffer incoming;
    155   grpc_closure read_cb;
    156 };
    157 
    158 static size_t count_slices(grpc_slice* slices, size_t nslices,
    159                            int* current_data) {
    160   size_t num_bytes = 0;
    161   unsigned i, j;
    162   unsigned char* buf;
    163   for (i = 0; i < nslices; ++i) {
    164     buf = GRPC_SLICE_START_PTR(slices[i]);
    165     for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
    166       GPR_ASSERT(buf[j] == *current_data);
    167       *current_data = (*current_data + 1) % 256;
    168     }
    169     num_bytes += GRPC_SLICE_LENGTH(slices[i]);
    170   }
    171   return num_bytes;
    172 }
    173 
    174 static void read_cb(void* user_data, grpc_error* error) {
    175   struct read_socket_state* state =
    176       static_cast<struct read_socket_state*>(user_data);
    177   size_t read_bytes;
    178   int current_data;
    179 
    180   GPR_ASSERT(error == GRPC_ERROR_NONE);
    181 
    182   gpr_mu_lock(g_mu);
    183   current_data = state->read_bytes % 256;
    184   read_bytes = count_slices(state->incoming.slices, state->incoming.count,
    185                             &current_data);
    186   state->read_bytes += read_bytes;
    187   gpr_log(GPR_INFO, "Read %" PRIuPTR " bytes of %" PRIuPTR, read_bytes,
    188           state->target_read_bytes);
    189   if (state->read_bytes >= state->target_read_bytes) {
    190     GPR_ASSERT(
    191         GRPC_LOG_IF_ERROR("kick", grpc_pollset_kick(g_pollset, nullptr)));
    192     gpr_mu_unlock(g_mu);
    193   } else {
    194     grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb);
    195     gpr_mu_unlock(g_mu);
    196   }
    197 }
    198 
    199 /* Write to a socket, then read from it using the grpc_tcp API. */
    200 static void read_test(size_t num_bytes, size_t slice_size) {
    201   int sv[2];
    202   grpc_endpoint* ep;
    203   struct read_socket_state state;
    204   size_t written_bytes;
    205   grpc_millis deadline =
    206       grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
    207   grpc_core::ExecCtx exec_ctx;
    208 
    209   gpr_log(GPR_INFO, "Read test of size %" PRIuPTR ", slice size %" PRIuPTR,
    210           num_bytes, slice_size);
    211 
    212   create_sockets(sv);
    213 
    214   grpc_arg a[1];
    215   a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
    216   a[0].type = GRPC_ARG_INTEGER,
    217   a[0].value.integer = static_cast<int>(slice_size);
    218   grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
    219   ep =
    220       grpc_tcp_create(grpc_fd_create(sv[1], "read_test", false), &args, "test");
    221   grpc_endpoint_add_to_pollset(ep, g_pollset);
    222 
    223   written_bytes = fill_socket_partial(sv[0], num_bytes);
    224   gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes);
    225 
    226   state.ep = ep;
    227   state.read_bytes = 0;
    228   state.target_read_bytes = written_bytes;
    229   grpc_slice_buffer_init(&state.incoming);
    230   GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
    231 
    232   grpc_endpoint_read(ep, &state.incoming, &state.read_cb);
    233 
    234   gpr_mu_lock(g_mu);
    235   while (state.read_bytes < state.target_read_bytes) {
    236     grpc_pollset_worker* worker = nullptr;
    237     GPR_ASSERT(GRPC_LOG_IF_ERROR(
    238         "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
    239     gpr_mu_unlock(g_mu);
    240 
    241     gpr_mu_lock(g_mu);
    242   }
    243   GPR_ASSERT(state.read_bytes == state.target_read_bytes);
    244   gpr_mu_unlock(g_mu);
    245 
    246   grpc_slice_buffer_destroy_internal(&state.incoming);
    247   grpc_endpoint_destroy(ep);
    248 }
    249 
    250 /* Write to a socket until it fills up, then read from it using the grpc_tcp
    251    API. */
    252 static void large_read_test(size_t slice_size) {
    253   int sv[2];
    254   grpc_endpoint* ep;
    255   struct read_socket_state state;
    256   ssize_t written_bytes;
    257   grpc_millis deadline =
    258       grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
    259   grpc_core::ExecCtx exec_ctx;
    260 
    261   gpr_log(GPR_INFO, "Start large read test, slice size %" PRIuPTR, slice_size);
    262 
    263   create_sockets(sv);
    264 
    265   grpc_arg a[1];
    266   a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
    267   a[0].type = GRPC_ARG_INTEGER;
    268   a[0].value.integer = static_cast<int>(slice_size);
    269   grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
    270   ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test", false), &args,
    271                        "test");
    272   grpc_endpoint_add_to_pollset(ep, g_pollset);
    273 
    274   written_bytes = fill_socket(sv[0]);
    275   gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes);
    276 
    277   state.ep = ep;
    278   state.read_bytes = 0;
    279   state.target_read_bytes = static_cast<size_t>(written_bytes);
    280   grpc_slice_buffer_init(&state.incoming);
    281   GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
    282 
    283   grpc_endpoint_read(ep, &state.incoming, &state.read_cb);
    284 
    285   gpr_mu_lock(g_mu);
    286   while (state.read_bytes < state.target_read_bytes) {
    287     grpc_pollset_worker* worker = nullptr;
    288     GPR_ASSERT(GRPC_LOG_IF_ERROR(
    289         "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
    290     gpr_mu_unlock(g_mu);
    291 
    292     gpr_mu_lock(g_mu);
    293   }
    294   GPR_ASSERT(state.read_bytes == state.target_read_bytes);
    295   gpr_mu_unlock(g_mu);
    296 
    297   grpc_slice_buffer_destroy_internal(&state.incoming);
    298   grpc_endpoint_destroy(ep);
    299 }
    300 
    301 struct write_socket_state {
    302   grpc_endpoint* ep;
    303   int write_done;
    304 };
    305 
    306 static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size,
    307                                    size_t* num_blocks, uint8_t* current_data) {
    308   size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1u : 0u);
    309   grpc_slice* slices =
    310       static_cast<grpc_slice*>(gpr_malloc(sizeof(grpc_slice) * nslices));
    311   size_t num_bytes_left = num_bytes;
    312   unsigned i, j;
    313   unsigned char* buf;
    314   *num_blocks = nslices;
    315 
    316   for (i = 0; i < nslices; ++i) {
    317     slices[i] = grpc_slice_malloc(slice_size > num_bytes_left ? num_bytes_left
    318                                                               : slice_size);
    319     num_bytes_left -= GRPC_SLICE_LENGTH(slices[i]);
    320     buf = GRPC_SLICE_START_PTR(slices[i]);
    321     for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
    322       buf[j] = *current_data;
    323       (*current_data)++;
    324     }
    325   }
    326   GPR_ASSERT(num_bytes_left == 0);
    327   return slices;
    328 }
    329 
    330 static void write_done(void* user_data /* write_socket_state */,
    331                        grpc_error* error) {
    332   GPR_ASSERT(error == GRPC_ERROR_NONE);
    333   struct write_socket_state* state =
    334       static_cast<struct write_socket_state*>(user_data);
    335   gpr_mu_lock(g_mu);
    336   state->write_done = 1;
    337   GPR_ASSERT(
    338       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
    339   gpr_mu_unlock(g_mu);
    340 }
    341 
    342 void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
    343   unsigned char* buf = static_cast<unsigned char*>(gpr_malloc(read_size));
    344   ssize_t bytes_read;
    345   size_t bytes_left = num_bytes;
    346   int flags;
    347   int current = 0;
    348   int i;
    349   grpc_core::ExecCtx exec_ctx;
    350 
    351   flags = fcntl(fd, F_GETFL, 0);
    352   GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0);
    353 
    354   for (;;) {
    355     grpc_pollset_worker* worker = nullptr;
    356     gpr_mu_lock(g_mu);
    357     GPR_ASSERT(GRPC_LOG_IF_ERROR(
    358         "pollset_work",
    359         grpc_pollset_work(g_pollset, &worker,
    360                           grpc_timespec_to_millis_round_up(
    361                               grpc_timeout_milliseconds_to_deadline(10)))));
    362     gpr_mu_unlock(g_mu);
    363 
    364     do {
    365       bytes_read =
    366           read(fd, buf, bytes_left > read_size ? read_size : bytes_left);
    367     } while (bytes_read < 0 && errno == EINTR);
    368     GPR_ASSERT(bytes_read >= 0);
    369     for (i = 0; i < bytes_read; ++i) {
    370       GPR_ASSERT(buf[i] == current);
    371       current = (current + 1) % 256;
    372     }
    373     bytes_left -= static_cast<size_t>(bytes_read);
    374     if (bytes_left == 0) break;
    375   }
    376   flags = fcntl(fd, F_GETFL, 0);
    377   GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
    378 
    379   gpr_free(buf);
    380 }
    381 
    382 /* Verifier for timestamps callback for write_test */
    383 void timestamps_verifier(void* arg, grpc_core::Timestamps* ts,
    384                          grpc_error* error) {
    385   GPR_ASSERT(error == GRPC_ERROR_NONE);
    386   GPR_ASSERT(arg != nullptr);
    387   GPR_ASSERT(ts->sendmsg_time.clock_type == GPR_CLOCK_REALTIME);
    388   GPR_ASSERT(ts->scheduled_time.clock_type == GPR_CLOCK_REALTIME);
    389   GPR_ASSERT(ts->acked_time.clock_type == GPR_CLOCK_REALTIME);
    390   gpr_atm* done_timestamps = (gpr_atm*)arg;
    391   gpr_atm_rel_store(done_timestamps, static_cast<gpr_atm>(1));
    392 }
    393 
    394 /* Write to a socket using the grpc_tcp API, then drain it directly.
    395    Note that if the write does not complete immediately we need to drain the
    396    socket in parallel with the read. If collect_timestamps is true, it will
    397    try to get timestamps for the write. */
    398 static void write_test(size_t num_bytes, size_t slice_size,
    399                        bool collect_timestamps) {
    400   int sv[2];
    401   grpc_endpoint* ep;
    402   struct write_socket_state state;
    403   size_t num_blocks;
    404   grpc_slice* slices;
    405   uint8_t current_data = 0;
    406   grpc_slice_buffer outgoing;
    407   grpc_closure write_done_closure;
    408   grpc_millis deadline =
    409       grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
    410   grpc_core::ExecCtx exec_ctx;
    411 
    412   if (collect_timestamps && !grpc_event_engine_can_track_errors()) {
    413     return;
    414   }
    415 
    416   gpr_log(GPR_INFO,
    417           "Start write test with %" PRIuPTR " bytes, slice size %" PRIuPTR,
    418           num_bytes, slice_size);
    419 
    420   if (collect_timestamps) {
    421     create_inet_sockets(sv);
    422   } else {
    423     create_sockets(sv);
    424   }
    425 
    426   grpc_arg a[1];
    427   a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
    428   a[0].type = GRPC_ARG_INTEGER,
    429   a[0].value.integer = static_cast<int>(slice_size);
    430   grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
    431   ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", collect_timestamps),
    432                        &args, "test");
    433   grpc_endpoint_add_to_pollset(ep, g_pollset);
    434 
    435   state.ep = ep;
    436   state.write_done = 0;
    437 
    438   slices = allocate_blocks(num_bytes, slice_size, &num_blocks, &current_data);
    439 
    440   grpc_slice_buffer_init(&outgoing);
    441   grpc_slice_buffer_addn(&outgoing, slices, num_blocks);
    442   GRPC_CLOSURE_INIT(&write_done_closure, write_done, &state,
    443                     grpc_schedule_on_exec_ctx);
    444 
    445   gpr_atm done_timestamps;
    446   gpr_atm_rel_store(&done_timestamps, static_cast<gpr_atm>(0));
    447   grpc_endpoint_write(ep, &outgoing, &write_done_closure,
    448                       grpc_event_engine_can_track_errors() && collect_timestamps
    449                           ? (void*)&done_timestamps
    450                           : nullptr);
    451   drain_socket_blocking(sv[0], num_bytes, num_bytes);
    452   exec_ctx.Flush();
    453   gpr_mu_lock(g_mu);
    454   for (;;) {
    455     grpc_pollset_worker* worker = nullptr;
    456     if (state.write_done &&
    457         (!(grpc_event_engine_can_track_errors() && collect_timestamps) ||
    458          gpr_atm_acq_load(&done_timestamps) == static_cast<gpr_atm>(1))) {
    459       break;
    460     }
    461     GPR_ASSERT(GRPC_LOG_IF_ERROR(
    462         "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
    463     gpr_mu_unlock(g_mu);
    464     exec_ctx.Flush();
    465     gpr_mu_lock(g_mu);
    466   }
    467   gpr_mu_unlock(g_mu);
    468 
    469   grpc_slice_buffer_destroy_internal(&outgoing);
    470   grpc_endpoint_destroy(ep);
    471   gpr_free(slices);
    472 }
    473 
    474 void on_fd_released(void* arg, grpc_error* errors) {
    475   int* done = static_cast<int*>(arg);
    476   *done = 1;
    477   GPR_ASSERT(
    478       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
    479 }
    480 
    481 /* Do a read_test, then release fd and try to read/write again. Verify that
    482    grpc_tcp_fd() is available before the fd is released. */
    483 static void release_fd_test(size_t num_bytes, size_t slice_size) {
    484   int sv[2];
    485   grpc_endpoint* ep;
    486   struct read_socket_state state;
    487   size_t written_bytes;
    488   int fd;
    489   grpc_millis deadline =
    490       grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
    491   grpc_core::ExecCtx exec_ctx;
    492   grpc_closure fd_released_cb;
    493   int fd_released_done = 0;
    494   GRPC_CLOSURE_INIT(&fd_released_cb, &on_fd_released, &fd_released_done,
    495                     grpc_schedule_on_exec_ctx);
    496 
    497   gpr_log(GPR_INFO,
    498           "Release fd read_test of size %" PRIuPTR ", slice size %" PRIuPTR,
    499           num_bytes, slice_size);
    500 
    501   create_sockets(sv);
    502 
    503   grpc_arg a[1];
    504   a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
    505   a[0].type = GRPC_ARG_INTEGER;
    506   a[0].value.integer = static_cast<int>(slice_size);
    507   grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
    508   ep =
    509       grpc_tcp_create(grpc_fd_create(sv[1], "read_test", false), &args, "test");
    510   GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0);
    511   grpc_endpoint_add_to_pollset(ep, g_pollset);
    512 
    513   written_bytes = fill_socket_partial(sv[0], num_bytes);
    514   gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes);
    515 
    516   state.ep = ep;
    517   state.read_bytes = 0;
    518   state.target_read_bytes = written_bytes;
    519   grpc_slice_buffer_init(&state.incoming);
    520   GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
    521 
    522   grpc_endpoint_read(ep, &state.incoming, &state.read_cb);
    523 
    524   gpr_mu_lock(g_mu);
    525   while (state.read_bytes < state.target_read_bytes) {
    526     grpc_pollset_worker* worker = nullptr;
    527     GPR_ASSERT(GRPC_LOG_IF_ERROR(
    528         "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
    529     gpr_log(GPR_DEBUG, "wakeup: read=%" PRIdPTR " target=%" PRIdPTR,
    530             state.read_bytes, state.target_read_bytes);
    531     gpr_mu_unlock(g_mu);
    532     grpc_core::ExecCtx::Get()->Flush();
    533     gpr_mu_lock(g_mu);
    534   }
    535   GPR_ASSERT(state.read_bytes == state.target_read_bytes);
    536   gpr_mu_unlock(g_mu);
    537 
    538   grpc_slice_buffer_destroy_internal(&state.incoming);
    539   grpc_tcp_destroy_and_release_fd(ep, &fd, &fd_released_cb);
    540   grpc_core::ExecCtx::Get()->Flush();
    541   gpr_mu_lock(g_mu);
    542   while (!fd_released_done) {
    543     grpc_pollset_worker* worker = nullptr;
    544     GPR_ASSERT(GRPC_LOG_IF_ERROR(
    545         "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
    546     gpr_log(GPR_DEBUG, "wakeup: fd_released_done=%d", fd_released_done);
    547   }
    548   gpr_mu_unlock(g_mu);
    549   GPR_ASSERT(fd_released_done == 1);
    550   GPR_ASSERT(fd == sv[1]);
    551 
    552   written_bytes = fill_socket_partial(sv[0], num_bytes);
    553   drain_socket_blocking(fd, written_bytes, written_bytes);
    554   written_bytes = fill_socket_partial(fd, num_bytes);
    555   drain_socket_blocking(sv[0], written_bytes, written_bytes);
    556   close(fd);
    557 }
    558 
    559 void run_tests(void) {
    560   size_t i = 0;
    561 
    562   read_test(100, 8192);
    563   read_test(10000, 8192);
    564   read_test(10000, 137);
    565   read_test(10000, 1);
    566   large_read_test(8192);
    567   large_read_test(1);
    568 
    569   write_test(100, 8192, false);
    570   write_test(100, 1, false);
    571   write_test(100000, 8192, false);
    572   write_test(100000, 1, false);
    573   write_test(100000, 137, false);
    574 
    575   write_test(100, 8192, true);
    576   write_test(100, 1, true);
    577   write_test(100000, 8192, true);
    578   write_test(100000, 1, true);
    579   write_test(100, 137, true);
    580 
    581   for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
    582     write_test(40320, i, false);
    583     write_test(40320, i, true);
    584   }
    585 
    586   release_fd_test(100, 8192);
    587 }
    588 
    589 static void clean_up(void) {}
    590 
    591 static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
    592     size_t slice_size) {
    593   int sv[2];
    594   grpc_endpoint_test_fixture f;
    595   grpc_core::ExecCtx exec_ctx;
    596 
    597   create_sockets(sv);
    598   grpc_resource_quota* resource_quota =
    599       grpc_resource_quota_create("tcp_posix_test_socketpair");
    600   grpc_arg a[1];
    601   a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
    602   a[0].type = GRPC_ARG_INTEGER;
    603   a[0].value.integer = static_cast<int>(slice_size);
    604   grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
    605   f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client", false),
    606                                 &args, "test");
    607   f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server", false),
    608                                 &args, "test");
    609   grpc_resource_quota_unref_internal(resource_quota);
    610   grpc_endpoint_add_to_pollset(f.client_ep, g_pollset);
    611   grpc_endpoint_add_to_pollset(f.server_ep, g_pollset);
    612 
    613   return f;
    614 }
    615 
    616 static grpc_endpoint_test_config configs[] = {
    617     {"tcp/tcp_socketpair", create_fixture_tcp_socketpair, clean_up},
    618 };
    619 
    620 static void destroy_pollset(void* p, grpc_error* error) {
    621   grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
    622 }
    623 
    624 int main(int argc, char** argv) {
    625   grpc_closure destroyed;
    626   grpc_test_init(argc, argv);
    627   grpc_init();
    628   grpc_core::grpc_tcp_set_write_timestamps_callback(timestamps_verifier);
    629   {
    630     grpc_core::ExecCtx exec_ctx;
    631     g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
    632     grpc_pollset_init(g_pollset, &g_mu);
    633     grpc_endpoint_tests(configs[0], g_pollset, g_mu);
    634     run_tests();
    635     GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
    636                       grpc_schedule_on_exec_ctx);
    637     grpc_pollset_shutdown(g_pollset, &destroyed);
    638 
    639     grpc_core::ExecCtx::Get()->Flush();
    640   }
    641   grpc_shutdown();
    642   gpr_free(g_pollset);
    643 
    644   return 0;
    645 }
    646 
    647 #else /* GRPC_POSIX_SOCKET */
    648 
    649 int main(int argc, char** argv) { return 1; }
    650 
    651 #endif /* GRPC_POSIX_SOCKET */
    652