Home | History | Annotate | Download | only in iomgr
      1 /*
      2  *
      3  * Copyright 2015 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 /* FIXME: "posix" files shouldn't be depending on _GNU_SOURCE */
     20 #ifndef _GNU_SOURCE
     21 #define _GNU_SOURCE
     22 #endif
     23 
     24 #ifndef SO_RXQ_OVFL
     25 #define SO_RXQ_OVFL 40
     26 #endif
     27 
     28 #include <grpc/support/port_platform.h>
     29 
     30 #include "src/core/lib/iomgr/port.h"
     31 
     32 #ifdef GRPC_POSIX_SOCKET
     33 
     34 #include "src/core/lib/iomgr/udp_server.h"
     35 
     36 #include <errno.h>
     37 #include <fcntl.h>
     38 #include <limits.h>
     39 #include <netinet/in.h>
     40 #include <netinet/tcp.h>
     41 #include <string.h>
     42 #include <sys/socket.h>
     43 #include <sys/stat.h>
     44 #include <sys/types.h>
     45 #include <unistd.h>
     46 
     47 #include <grpc/grpc.h>
     48 #include <grpc/support/alloc.h>
     49 #include <grpc/support/log.h>
     50 #include <grpc/support/string_util.h>
     51 #include <grpc/support/sync.h>
     52 #include <grpc/support/time.h>
     53 #include "src/core/lib/channel/channel_args.h"
     54 #include "src/core/lib/gpr/string.h"
     55 #include "src/core/lib/gprpp/inlined_vector.h"
     56 #include "src/core/lib/gprpp/memory.h"
     57 #include "src/core/lib/iomgr/error.h"
     58 #include "src/core/lib/iomgr/ev_posix.h"
     59 #include "src/core/lib/iomgr/executor.h"
     60 #include "src/core/lib/iomgr/resolve_address.h"
     61 #include "src/core/lib/iomgr/sockaddr.h"
     62 #include "src/core/lib/iomgr/sockaddr_utils.h"
     63 #include "src/core/lib/iomgr/socket_factory_posix.h"
     64 #include "src/core/lib/iomgr/socket_utils_posix.h"
     65 #include "src/core/lib/iomgr/unix_sockets_posix.h"
     66 
     67 /* A listener which implements basic features of Listening on a port for
     68  * I/O events*/
     69 class GrpcUdpListener {
     70  public:
     71   GrpcUdpListener(grpc_udp_server* server, int fd,
     72                   const grpc_resolved_address* addr);
     73   ~GrpcUdpListener();
     74 
     75   /* Called when grpc server starts to listening on the grpc_fd. */
     76   void StartListening(grpc_pollset** pollsets, size_t pollset_count,
     77                       GrpcUdpHandlerFactory* handler_factory);
     78 
     79   /* Called when data is available to read from the socket.
     80    * Return true if there is more data to read from fd. */
     81   void OnRead(grpc_error* error, void* do_read_arg);
     82 
     83   /* Called when the socket is writeable. The given closure should be scheduled
     84    * when the socket becomes blocked next time. */
     85   void OnCanWrite(grpc_error* error, void* do_write_arg);
     86 
     87   /* Called when the grpc_fd is about to be orphaned (and the FD closed). */
     88   void OnFdAboutToOrphan();
     89 
     90   /* Called to orphan fd of this listener.*/
     91   void OrphanFd();
     92 
     93   /* Called when this listener is going to be destroyed. */
     94   void OnDestroy();
     95 
     96   int fd() const { return fd_; }
     97 
     98  protected:
     99   grpc_fd* emfd() const { return emfd_; }
    100 
    101   gpr_mu* mutex() { return &mutex_; }
    102 
    103  private:
    104   /* event manager callback when reads are ready */
    105   static void on_read(void* arg, grpc_error* error);
    106   static void on_write(void* arg, grpc_error* error);
    107 
    108   static void do_read(void* arg, grpc_error* error);
    109   static void do_write(void* arg, grpc_error* error);
    110   // Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback
    111   // interface.
    112   static void fd_notify_on_write_wrapper(void* arg, grpc_error* error);
    113 
    114   static void shutdown_fd(void* args, grpc_error* error);
    115 
    116   int fd_;
    117   grpc_fd* emfd_;
    118   grpc_udp_server* server_;
    119   grpc_resolved_address addr_;
    120   grpc_closure read_closure_;
    121   grpc_closure write_closure_;
    122   // To be called when corresponding QuicGrpcServer closes all active
    123   // connections.
    124   grpc_closure orphan_fd_closure_;
    125   grpc_closure destroyed_closure_;
    126   // To be scheduled on another thread to actually read/write.
    127   grpc_closure do_read_closure_;
    128   grpc_closure do_write_closure_;
    129   grpc_closure notify_on_write_closure_;
    130   // True if orphan_cb is trigered.
    131   bool orphan_notified_;
    132   // True if grpc_fd_notify_on_write() is called after on_write() call.
    133   bool notify_on_write_armed_;
    134   // True if fd has been shutdown.
    135   bool already_shutdown_;
    136   // Object actually handles I/O events. Assigned in StartListening().
    137   GrpcUdpHandler* udp_handler_ = nullptr;
    138   // To be notified on destruction.
    139   GrpcUdpHandlerFactory* handler_factory_ = nullptr;
    140   // Required to access above fields.
    141   gpr_mu mutex_;
    142 };
    143 
    144 GrpcUdpListener::GrpcUdpListener(grpc_udp_server* server, int fd,
    145                                  const grpc_resolved_address* addr)
    146     : fd_(fd),
    147       server_(server),
    148       orphan_notified_(false),
    149       already_shutdown_(false) {
    150   char* addr_str;
    151   char* name;
    152   grpc_sockaddr_to_string(&addr_str, addr, 1);
    153   gpr_asprintf(&name, "udp-server-listener:%s", addr_str);
    154   gpr_free(addr_str);
    155   emfd_ = grpc_fd_create(fd, name, true);
    156   memcpy(&addr_, addr, sizeof(grpc_resolved_address));
    157   GPR_ASSERT(emfd_);
    158   gpr_free(name);
    159   gpr_mu_init(&mutex_);
    160 }
    161 
    162 GrpcUdpListener::~GrpcUdpListener() { gpr_mu_destroy(&mutex_); }
    163 
    164 /* the overall server */
    165 struct grpc_udp_server {
    166   gpr_mu mu;
    167 
    168   /* factory to use for creating and binding sockets, or NULL */
    169   grpc_socket_factory* socket_factory;
    170 
    171   /* active port count: how many ports are actually still listening */
    172   size_t active_ports;
    173   /* destroyed port count: how many ports are completely destroyed */
    174   size_t destroyed_ports;
    175 
    176   /* is this server shutting down? (boolean) */
    177   int shutdown;
    178 
    179   /* An array of listeners */
    180   grpc_core::InlinedVector<GrpcUdpListener, 16> listeners;
    181 
    182   /* factory for use to create udp listeners */
    183   GrpcUdpHandlerFactory* handler_factory;
    184 
    185   /* shutdown callback */
    186   grpc_closure* shutdown_complete;
    187 
    188   /* all pollsets interested in new connections */
    189   grpc_pollset** pollsets;
    190   /* number of pollsets in the pollsets array */
    191   size_t pollset_count;
    192   /* opaque object to pass to callbacks */
    193   void* user_data;
    194 
    195   /* latch has_so_reuseport during server creation */
    196   bool so_reuseport;
    197 };
    198 
    199 static grpc_socket_factory* get_socket_factory(const grpc_channel_args* args) {
    200   if (args) {
    201     const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_SOCKET_FACTORY);
    202     if (arg) {
    203       GPR_ASSERT(arg->type == GRPC_ARG_POINTER);
    204       return static_cast<grpc_socket_factory*>(arg->value.pointer.p);
    205     }
    206   }
    207   return nullptr;
    208 }
    209 
    210 grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) {
    211   grpc_udp_server* s = grpc_core::New<grpc_udp_server>();
    212   gpr_mu_init(&s->mu);
    213   s->socket_factory = get_socket_factory(args);
    214   if (s->socket_factory) {
    215     grpc_socket_factory_ref(s->socket_factory);
    216   }
    217   s->active_ports = 0;
    218   s->destroyed_ports = 0;
    219   s->shutdown = 0;
    220   s->so_reuseport = grpc_is_socket_reuse_port_supported();
    221   return s;
    222 }
    223 
    224 // static
    225 void GrpcUdpListener::shutdown_fd(void* args, grpc_error* error) {
    226   if (args == nullptr) {
    227     // No-op if shutdown args are null.
    228     return;
    229   }
    230   auto sp = static_cast<GrpcUdpListener*>(args);
    231   gpr_mu_lock(sp->mutex());
    232   gpr_log(GPR_DEBUG, "shutdown fd %d", sp->fd_);
    233   grpc_fd_shutdown(sp->emfd_, GRPC_ERROR_REF(error));
    234   sp->already_shutdown_ = true;
    235   if (!sp->notify_on_write_armed_) {
    236     // Re-arm write notification to notify listener with error. This is
    237     // necessary to decrement active_ports.
    238     sp->notify_on_write_armed_ = true;
    239     grpc_fd_notify_on_write(sp->emfd_, &sp->write_closure_);
    240   }
    241   gpr_mu_unlock(sp->mutex());
    242 }
    243 
    244 static void finish_shutdown(grpc_udp_server* s) {
    245   if (s->shutdown_complete != nullptr) {
    246     GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE);
    247   }
    248 
    249   gpr_mu_destroy(&s->mu);
    250 
    251   gpr_log(GPR_DEBUG, "Destroy all listeners.");
    252   for (size_t i = 0; i < s->listeners.size(); ++i) {
    253     s->listeners[i].OnDestroy();
    254   }
    255 
    256   if (s->socket_factory) {
    257     grpc_socket_factory_unref(s->socket_factory);
    258   }
    259 
    260   grpc_core::Delete(s);
    261 }
    262 
    263 static void destroyed_port(void* server, grpc_error* error) {
    264   grpc_udp_server* s = static_cast<grpc_udp_server*>(server);
    265   gpr_mu_lock(&s->mu);
    266   s->destroyed_ports++;
    267   if (s->destroyed_ports == s->listeners.size()) {
    268     gpr_mu_unlock(&s->mu);
    269     finish_shutdown(s);
    270   } else {
    271     gpr_mu_unlock(&s->mu);
    272   }
    273 }
    274 
    275 /* called when all listening endpoints have been shutdown, so no further
    276    events will be received on them - at this point it's safe to destroy
    277    things */
    278 static void deactivated_all_ports(grpc_udp_server* s) {
    279   /* delete ALL the things */
    280   gpr_mu_lock(&s->mu);
    281 
    282   GPR_ASSERT(s->shutdown);
    283 
    284   if (s->listeners.size() == 0) {
    285     gpr_mu_unlock(&s->mu);
    286     finish_shutdown(s);
    287     return;
    288   }
    289   for (size_t i = 0; i < s->listeners.size(); ++i) {
    290     s->listeners[i].OrphanFd();
    291   }
    292   gpr_mu_unlock(&s->mu);
    293 }
    294 
    295 void GrpcUdpListener::OrphanFd() {
    296   gpr_log(GPR_DEBUG, "Orphan fd %d, emfd %p", fd_, emfd_);
    297   grpc_unlink_if_unix_domain_socket(&addr_);
    298 
    299   GRPC_CLOSURE_INIT(&destroyed_closure_, destroyed_port, server_,
    300                     grpc_schedule_on_exec_ctx);
    301   /* Because at this point, all listening sockets have been shutdown already, no
    302    * need to call OnFdAboutToOrphan() to notify the handler again. */
    303   grpc_fd_orphan(emfd_, &destroyed_closure_, nullptr, "udp_listener_shutdown");
    304 }
    305 
    306 void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) {
    307   gpr_mu_lock(&s->mu);
    308 
    309   GPR_ASSERT(!s->shutdown);
    310   s->shutdown = 1;
    311 
    312   s->shutdown_complete = on_done;
    313 
    314   gpr_log(GPR_DEBUG, "start to destroy udp_server");
    315   /* shutdown all fd's */
    316   if (s->active_ports) {
    317     for (size_t i = 0; i < s->listeners.size(); ++i) {
    318       GrpcUdpListener* sp = &s->listeners[i];
    319       sp->OnFdAboutToOrphan();
    320     }
    321     gpr_mu_unlock(&s->mu);
    322   } else {
    323     gpr_mu_unlock(&s->mu);
    324     deactivated_all_ports(s);
    325   }
    326 }
    327 
    328 void GrpcUdpListener::OnFdAboutToOrphan() {
    329   gpr_mu_lock(&mutex_);
    330   grpc_unlink_if_unix_domain_socket(&addr_);
    331 
    332   GRPC_CLOSURE_INIT(&destroyed_closure_, destroyed_port, server_,
    333                     grpc_schedule_on_exec_ctx);
    334   if (!orphan_notified_ && udp_handler_ != nullptr) {
    335     /* Singals udp_handler that the FD is about to be closed and
    336      * should no longer be used. */
    337     GRPC_CLOSURE_INIT(&orphan_fd_closure_, shutdown_fd, this,
    338                       grpc_schedule_on_exec_ctx);
    339     gpr_log(GPR_DEBUG, "fd %d about to be orphaned", fd_);
    340     udp_handler_->OnFdAboutToOrphan(&orphan_fd_closure_, server_->user_data);
    341     orphan_notified_ = true;
    342   }
    343   gpr_mu_unlock(&mutex_);
    344 }
    345 
    346 static int bind_socket(grpc_socket_factory* socket_factory, int sockfd,
    347                        const grpc_resolved_address* addr) {
    348   return (socket_factory != nullptr)
    349              ? grpc_socket_factory_bind(socket_factory, sockfd, addr)
    350              : bind(sockfd,
    351                     reinterpret_cast<grpc_sockaddr*>(
    352                         const_cast<char*>(addr->addr)),
    353                     addr->len);
    354 }
    355 
    356 /* Prepare a recently-created socket for listening. */
    357 static int prepare_socket(grpc_socket_factory* socket_factory, int fd,
    358                           const grpc_resolved_address* addr, int rcv_buf_size,
    359                           int snd_buf_size, bool so_reuseport) {
    360   grpc_resolved_address sockname_temp;
    361   grpc_sockaddr* addr_ptr =
    362       reinterpret_cast<grpc_sockaddr*>(const_cast<char*>(addr->addr));
    363 
    364   if (fd < 0) {
    365     goto error;
    366   }
    367 
    368   if (grpc_set_socket_nonblocking(fd, 1) != GRPC_ERROR_NONE) {
    369     gpr_log(GPR_ERROR, "Unable to set nonblocking %d: %s", fd, strerror(errno));
    370     goto error;
    371   }
    372   if (grpc_set_socket_cloexec(fd, 1) != GRPC_ERROR_NONE) {
    373     gpr_log(GPR_ERROR, "Unable to set cloexec %d: %s", fd, strerror(errno));
    374     goto error;
    375   }
    376 
    377   if (grpc_set_socket_ip_pktinfo_if_possible(fd) != GRPC_ERROR_NONE) {
    378     gpr_log(GPR_ERROR, "Unable to set ip_pktinfo.");
    379     goto error;
    380   } else if (addr_ptr->sa_family == AF_INET6) {
    381     if (grpc_set_socket_ipv6_recvpktinfo_if_possible(fd) != GRPC_ERROR_NONE) {
    382       gpr_log(GPR_ERROR, "Unable to set ipv6_recvpktinfo.");
    383       goto error;
    384     }
    385   }
    386 
    387   if (grpc_set_socket_sndbuf(fd, snd_buf_size) != GRPC_ERROR_NONE) {
    388     gpr_log(GPR_ERROR, "Failed to set send buffer size to %d bytes",
    389             snd_buf_size);
    390     goto error;
    391   }
    392 
    393   if (grpc_set_socket_rcvbuf(fd, rcv_buf_size) != GRPC_ERROR_NONE) {
    394     gpr_log(GPR_ERROR, "Failed to set receive buffer size to %d bytes",
    395             rcv_buf_size);
    396     goto error;
    397   }
    398 
    399   {
    400     int get_overflow = 1;
    401     if (0 != setsockopt(fd, SOL_SOCKET, SO_RXQ_OVFL, &get_overflow,
    402                         sizeof(get_overflow))) {
    403       gpr_log(GPR_INFO, "Failed to set socket overflow support");
    404     }
    405   }
    406 
    407   if (so_reuseport && !grpc_is_unix_socket(addr) &&
    408       grpc_set_socket_reuse_port(fd, 1) != GRPC_ERROR_NONE) {
    409     gpr_log(GPR_ERROR, "Failed to set SO_REUSEPORT for fd %d", fd);
    410     goto error;
    411   }
    412 
    413   if (bind_socket(socket_factory, fd, addr) < 0) {
    414     char* addr_str;
    415     grpc_sockaddr_to_string(&addr_str, addr, 0);
    416     gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno));
    417     gpr_free(addr_str);
    418     goto error;
    419   }
    420 
    421   sockname_temp.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
    422 
    423   if (getsockname(fd, reinterpret_cast<grpc_sockaddr*>(sockname_temp.addr),
    424                   &sockname_temp.len) < 0) {
    425     gpr_log(GPR_ERROR, "Unable to get the address socket %d is bound to: %s",
    426             fd, strerror(errno));
    427     goto error;
    428   }
    429 
    430   return grpc_sockaddr_get_port(&sockname_temp);
    431 
    432 error:
    433   if (fd >= 0) {
    434     close(fd);
    435   }
    436   return -1;
    437 }
    438 
    439 // static
    440 void GrpcUdpListener::do_read(void* arg, grpc_error* error) {
    441   GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg);
    442   GPR_ASSERT(error == GRPC_ERROR_NONE);
    443   /* TODO: the reason we hold server->mu here is merely to prevent fd
    444    * shutdown while we are reading. However, it blocks do_write(). Switch to
    445    * read lock if available. */
    446   gpr_mu_lock(sp->mutex());
    447   /* Tell the registered callback that data is available to read. */
    448   if (!sp->already_shutdown_ && sp->udp_handler_->Read()) {
    449     /* There maybe more packets to read. Schedule read_more_cb_ closure to run
    450      * after finishing this event loop. */
    451     GRPC_CLOSURE_SCHED(&sp->do_read_closure_, GRPC_ERROR_NONE);
    452   } else {
    453     /* Finish reading all the packets, re-arm the notification event so we can
    454      * get another chance to read. Or fd already shutdown, re-arm to get a
    455      * notification with shutdown error. */
    456     grpc_fd_notify_on_read(sp->emfd_, &sp->read_closure_);
    457   }
    458   gpr_mu_unlock(sp->mutex());
    459 }
    460 
    461 // static
    462 void GrpcUdpListener::on_read(void* arg, grpc_error* error) {
    463   GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg);
    464   sp->OnRead(error, arg);
    465 }
    466 
    467 void GrpcUdpListener::OnRead(grpc_error* error, void* do_read_arg) {
    468   if (error != GRPC_ERROR_NONE) {
    469     gpr_mu_lock(&server_->mu);
    470     if (0 == --server_->active_ports && server_->shutdown) {
    471       gpr_mu_unlock(&server_->mu);
    472       deactivated_all_ports(server_);
    473     } else {
    474       gpr_mu_unlock(&server_->mu);
    475     }
    476     return;
    477   }
    478 
    479   /* Read once. If there is more data to read, off load the work to another
    480    * thread to finish. */
    481   if (udp_handler_->Read()) {
    482     /* There maybe more packets to read. Schedule read_more_cb_ closure to run
    483      * after finishing this event loop. */
    484     GRPC_CLOSURE_INIT(&do_read_closure_, do_read, do_read_arg,
    485                       grpc_executor_scheduler(GRPC_EXECUTOR_LONG));
    486     GRPC_CLOSURE_SCHED(&do_read_closure_, GRPC_ERROR_NONE);
    487   } else {
    488     /* Finish reading all the packets, re-arm the notification event so we can
    489      * get another chance to read. Or fd already shutdown, re-arm to get a
    490      * notification with shutdown error. */
    491     grpc_fd_notify_on_read(emfd_, &read_closure_);
    492   }
    493 }
    494 
    495 // static
    496 // Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback interface.
    497 void GrpcUdpListener::fd_notify_on_write_wrapper(void* arg, grpc_error* error) {
    498   GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg);
    499   gpr_mu_lock(sp->mutex());
    500   if (!sp->notify_on_write_armed_) {
    501     grpc_fd_notify_on_write(sp->emfd_, &sp->write_closure_);
    502     sp->notify_on_write_armed_ = true;
    503   }
    504   gpr_mu_unlock(sp->mutex());
    505 }
    506 
    507 // static
    508 void GrpcUdpListener::do_write(void* arg, grpc_error* error) {
    509   GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg);
    510   gpr_mu_lock(sp->mutex());
    511   if (sp->already_shutdown_) {
    512     // If fd has been shutdown, don't write any more and re-arm notification.
    513     grpc_fd_notify_on_write(sp->emfd_, &sp->write_closure_);
    514   } else {
    515     sp->notify_on_write_armed_ = false;
    516     /* Tell the registered callback that the socket is writeable. */
    517     GPR_ASSERT(error == GRPC_ERROR_NONE);
    518     GRPC_CLOSURE_INIT(&sp->notify_on_write_closure_, fd_notify_on_write_wrapper,
    519                       arg, grpc_schedule_on_exec_ctx);
    520     sp->udp_handler_->OnCanWrite(sp->server_->user_data,
    521                                  &sp->notify_on_write_closure_);
    522   }
    523   gpr_mu_unlock(sp->mutex());
    524 }
    525 
    526 // static
    527 void GrpcUdpListener::on_write(void* arg, grpc_error* error) {
    528   GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg);
    529   sp->OnCanWrite(error, arg);
    530 }
    531 
    532 void GrpcUdpListener::OnCanWrite(grpc_error* error, void* do_write_arg) {
    533   if (error != GRPC_ERROR_NONE) {
    534     gpr_mu_lock(&server_->mu);
    535     if (0 == --server_->active_ports && server_->shutdown) {
    536       gpr_mu_unlock(&server_->mu);
    537       deactivated_all_ports(server_);
    538     } else {
    539       gpr_mu_unlock(&server_->mu);
    540     }
    541     return;
    542   }
    543 
    544   /* Schedule actual write in another thread. */
    545   GRPC_CLOSURE_INIT(&do_write_closure_, do_write, do_write_arg,
    546                     grpc_executor_scheduler(GRPC_EXECUTOR_LONG));
    547 
    548   GRPC_CLOSURE_SCHED(&do_write_closure_, GRPC_ERROR_NONE);
    549 }
    550 
    551 static int add_socket_to_server(grpc_udp_server* s, int fd,
    552                                 const grpc_resolved_address* addr,
    553                                 int rcv_buf_size, int snd_buf_size) {
    554   gpr_log(GPR_DEBUG, "add socket %d to server", fd);
    555 
    556   int port = prepare_socket(s->socket_factory, fd, addr, rcv_buf_size,
    557                             snd_buf_size, s->so_reuseport);
    558   if (port >= 0) {
    559     gpr_mu_lock(&s->mu);
    560     s->listeners.emplace_back(s, fd, addr);
    561     gpr_log(GPR_DEBUG,
    562             "add socket %d to server for port %d, %zu listener(s) in total", fd,
    563             port, s->listeners.size());
    564     gpr_mu_unlock(&s->mu);
    565   }
    566   return port;
    567 }
    568 
    569 int grpc_udp_server_add_port(grpc_udp_server* s,
    570                              const grpc_resolved_address* addr,
    571                              int rcv_buf_size, int snd_buf_size,
    572                              GrpcUdpHandlerFactory* handler_factory,
    573                              size_t num_listeners) {
    574   if (num_listeners > 1 && !s->so_reuseport) {
    575     gpr_log(GPR_ERROR,
    576             "Try to have multiple listeners on same port, but SO_REUSEPORT is "
    577             "not supported. Only create 1 listener.");
    578   }
    579   char* addr_str;
    580   grpc_sockaddr_to_string(&addr_str, addr, 1);
    581   gpr_log(GPR_DEBUG, "add address: %s to server", addr_str);
    582   gpr_free(addr_str);
    583 
    584   int allocated_port1 = -1;
    585   int allocated_port2 = -1;
    586   int fd;
    587   grpc_dualstack_mode dsmode;
    588   grpc_resolved_address addr6_v4mapped;
    589   grpc_resolved_address wild4;
    590   grpc_resolved_address wild6;
    591   grpc_resolved_address addr4_copy;
    592   grpc_resolved_address* allocated_addr = nullptr;
    593   grpc_resolved_address sockname_temp;
    594   int port = 0;
    595 
    596   /* Check if this is a wildcard port, and if so, try to keep the port the same
    597      as some previously created listener. */
    598   if (grpc_sockaddr_get_port(addr) == 0) {
    599     /* Loop through existing listeners to find the port in use. */
    600     for (size_t i = 0; i < s->listeners.size(); ++i) {
    601       sockname_temp.len =
    602           static_cast<socklen_t>(sizeof(struct sockaddr_storage));
    603       if (0 == getsockname(s->listeners[i].fd(),
    604                            reinterpret_cast<grpc_sockaddr*>(sockname_temp.addr),
    605                            &sockname_temp.len)) {
    606         port = grpc_sockaddr_get_port(&sockname_temp);
    607         if (port > 0) {
    608           /* Found such a port, update |addr| to reflects this port. */
    609           allocated_addr = static_cast<grpc_resolved_address*>(
    610               gpr_malloc(sizeof(grpc_resolved_address)));
    611           memcpy(allocated_addr, addr, sizeof(grpc_resolved_address));
    612           grpc_sockaddr_set_port(allocated_addr, port);
    613           addr = allocated_addr;
    614           break;
    615         }
    616       }
    617     }
    618   }
    619 
    620   if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
    621     addr = &addr6_v4mapped;
    622   }
    623 
    624   s->handler_factory = handler_factory;
    625   for (size_t i = 0; i < num_listeners; ++i) {
    626     /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
    627     if (grpc_sockaddr_is_wildcard(addr, &port)) {
    628       grpc_sockaddr_make_wildcards(port, &wild4, &wild6);
    629 
    630       /* Try listening on IPv6 first. */
    631       addr = &wild6;
    632       // TODO(rjshade): Test and propagate the returned grpc_error*:
    633       GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory(
    634           s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd));
    635       allocated_port1 =
    636           add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size);
    637       if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
    638         if (port == 0) {
    639           /* This is the first time to bind to |addr|. If its port is still
    640            * wildcard port, update |addr| with the ephermeral port returned by
    641            * kernel. Thus |addr| can have a specific port in following
    642            * iterations. */
    643           grpc_sockaddr_set_port(addr, allocated_port1);
    644           port = allocated_port1;
    645         } else if (allocated_port1 >= 0) {
    646           /* The following sucessfully created socket should have same port as
    647            * the first one. */
    648           GPR_ASSERT(port == allocated_port1);
    649         }
    650         /* A dualstack socket is created, no need to create corresponding IPV4
    651          * socket. */
    652         continue;
    653       }
    654 
    655       /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */
    656       if (port == 0 && allocated_port1 > 0) {
    657         /* |port| hasn't been assigned to an emphemeral port yet, |wild4| must
    658          * have a wildcard port. Update it with the emphemeral port created
    659          * during binding.*/
    660         grpc_sockaddr_set_port(&wild4, allocated_port1);
    661         port = allocated_port1;
    662       }
    663       /* |wild4| should have been updated with an emphemeral port by now. Use
    664        * this IPV4 address to create a IPV4 socket. */
    665       addr = &wild4;
    666     }
    667 
    668     // TODO(rjshade): Test and propagate the returned grpc_error*:
    669     GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory(
    670         s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd));
    671     if (fd < 0) {
    672       gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
    673     }
    674     if (dsmode == GRPC_DSMODE_IPV4 &&
    675         grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
    676       addr = &addr4_copy;
    677     }
    678     allocated_port2 =
    679         add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size);
    680     if (port == 0) {
    681       /* Update |addr| with the ephermeral port returned by kernel. So |addr|
    682        * can have a specific port in following iterations. */
    683       grpc_sockaddr_set_port(addr, allocated_port2);
    684       port = allocated_port2;
    685     } else if (allocated_port2 >= 0) {
    686       GPR_ASSERT(port == allocated_port2);
    687     }
    688   }
    689 
    690   gpr_free(allocated_addr);
    691   return port;
    692 }
    693 
    694 int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) {
    695   if (port_index >= s->listeners.size()) {
    696     return -1;
    697   }
    698 
    699   return s->listeners[port_index].fd();
    700 }
    701 
    702 void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets,
    703                            size_t pollset_count, void* user_data) {
    704   gpr_log(GPR_DEBUG, "grpc_udp_server_start");
    705   gpr_mu_lock(&s->mu);
    706   GPR_ASSERT(s->active_ports == 0);
    707   s->pollsets = pollsets;
    708   s->user_data = user_data;
    709 
    710   for (size_t i = 0; i < s->listeners.size(); ++i) {
    711     s->listeners[i].StartListening(pollsets, pollset_count, s->handler_factory);
    712   }
    713 
    714   gpr_mu_unlock(&s->mu);
    715 }
    716 
    717 void GrpcUdpListener::StartListening(grpc_pollset** pollsets,
    718                                      size_t pollset_count,
    719                                      GrpcUdpHandlerFactory* handler_factory) {
    720   gpr_mu_lock(&mutex_);
    721   handler_factory_ = handler_factory;
    722   udp_handler_ = handler_factory->CreateUdpHandler(emfd_, server_->user_data);
    723   for (size_t i = 0; i < pollset_count; i++) {
    724     grpc_pollset_add_fd(pollsets[i], emfd_);
    725   }
    726   GRPC_CLOSURE_INIT(&read_closure_, on_read, this, grpc_schedule_on_exec_ctx);
    727   grpc_fd_notify_on_read(emfd_, &read_closure_);
    728 
    729   GRPC_CLOSURE_INIT(&write_closure_, on_write, this, grpc_schedule_on_exec_ctx);
    730   notify_on_write_armed_ = true;
    731   grpc_fd_notify_on_write(emfd_, &write_closure_);
    732 
    733   /* Registered for both read and write callbacks: increment active_ports
    734    * twice to account for this, and delay free-ing of memory until both
    735    * on_read and on_write have fired. */
    736   server_->active_ports += 2;
    737   gpr_mu_unlock(&mutex_);
    738 }
    739 
    740 void GrpcUdpListener::OnDestroy() {
    741   if (udp_handler_ != nullptr) {
    742     handler_factory_->DestroyUdpHandler(udp_handler_);
    743   }
    744 }
    745 
    746 #endif
    747