1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 /* FIXME: "posix" files shouldn't be depending on _GNU_SOURCE */ 20 #ifndef _GNU_SOURCE 21 #define _GNU_SOURCE 22 #endif 23 24 #ifndef SO_RXQ_OVFL 25 #define SO_RXQ_OVFL 40 26 #endif 27 28 #include <grpc/support/port_platform.h> 29 30 #include "src/core/lib/iomgr/port.h" 31 32 #ifdef GRPC_POSIX_SOCKET 33 34 #include "src/core/lib/iomgr/udp_server.h" 35 36 #include <errno.h> 37 #include <fcntl.h> 38 #include <limits.h> 39 #include <netinet/in.h> 40 #include <netinet/tcp.h> 41 #include <string.h> 42 #include <sys/socket.h> 43 #include <sys/stat.h> 44 #include <sys/types.h> 45 #include <unistd.h> 46 47 #include <grpc/grpc.h> 48 #include <grpc/support/alloc.h> 49 #include <grpc/support/log.h> 50 #include <grpc/support/string_util.h> 51 #include <grpc/support/sync.h> 52 #include <grpc/support/time.h> 53 #include "src/core/lib/channel/channel_args.h" 54 #include "src/core/lib/gpr/string.h" 55 #include "src/core/lib/gprpp/inlined_vector.h" 56 #include "src/core/lib/gprpp/memory.h" 57 #include "src/core/lib/iomgr/error.h" 58 #include "src/core/lib/iomgr/ev_posix.h" 59 #include "src/core/lib/iomgr/executor.h" 60 #include "src/core/lib/iomgr/resolve_address.h" 61 #include "src/core/lib/iomgr/sockaddr.h" 62 #include "src/core/lib/iomgr/sockaddr_utils.h" 63 #include "src/core/lib/iomgr/socket_factory_posix.h" 64 #include "src/core/lib/iomgr/socket_utils_posix.h" 65 #include "src/core/lib/iomgr/unix_sockets_posix.h" 66 67 /* A listener which implements basic features of Listening on a port for 68 * I/O events*/ 69 class GrpcUdpListener { 70 public: 71 GrpcUdpListener(grpc_udp_server* server, int fd, 72 const grpc_resolved_address* addr); 73 ~GrpcUdpListener(); 74 75 /* Called when grpc server starts to listening on the grpc_fd. */ 76 void StartListening(grpc_pollset** pollsets, size_t pollset_count, 77 GrpcUdpHandlerFactory* handler_factory); 78 79 /* Called when data is available to read from the socket. 80 * Return true if there is more data to read from fd. */ 81 void OnRead(grpc_error* error, void* do_read_arg); 82 83 /* Called when the socket is writeable. The given closure should be scheduled 84 * when the socket becomes blocked next time. */ 85 void OnCanWrite(grpc_error* error, void* do_write_arg); 86 87 /* Called when the grpc_fd is about to be orphaned (and the FD closed). */ 88 void OnFdAboutToOrphan(); 89 90 /* Called to orphan fd of this listener.*/ 91 void OrphanFd(); 92 93 /* Called when this listener is going to be destroyed. */ 94 void OnDestroy(); 95 96 int fd() const { return fd_; } 97 98 protected: 99 grpc_fd* emfd() const { return emfd_; } 100 101 gpr_mu* mutex() { return &mutex_; } 102 103 private: 104 /* event manager callback when reads are ready */ 105 static void on_read(void* arg, grpc_error* error); 106 static void on_write(void* arg, grpc_error* error); 107 108 static void do_read(void* arg, grpc_error* error); 109 static void do_write(void* arg, grpc_error* error); 110 // Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback 111 // interface. 112 static void fd_notify_on_write_wrapper(void* arg, grpc_error* error); 113 114 static void shutdown_fd(void* args, grpc_error* error); 115 116 int fd_; 117 grpc_fd* emfd_; 118 grpc_udp_server* server_; 119 grpc_resolved_address addr_; 120 grpc_closure read_closure_; 121 grpc_closure write_closure_; 122 // To be called when corresponding QuicGrpcServer closes all active 123 // connections. 124 grpc_closure orphan_fd_closure_; 125 grpc_closure destroyed_closure_; 126 // To be scheduled on another thread to actually read/write. 127 grpc_closure do_read_closure_; 128 grpc_closure do_write_closure_; 129 grpc_closure notify_on_write_closure_; 130 // True if orphan_cb is trigered. 131 bool orphan_notified_; 132 // True if grpc_fd_notify_on_write() is called after on_write() call. 133 bool notify_on_write_armed_; 134 // True if fd has been shutdown. 135 bool already_shutdown_; 136 // Object actually handles I/O events. Assigned in StartListening(). 137 GrpcUdpHandler* udp_handler_ = nullptr; 138 // To be notified on destruction. 139 GrpcUdpHandlerFactory* handler_factory_ = nullptr; 140 // Required to access above fields. 141 gpr_mu mutex_; 142 }; 143 144 GrpcUdpListener::GrpcUdpListener(grpc_udp_server* server, int fd, 145 const grpc_resolved_address* addr) 146 : fd_(fd), 147 server_(server), 148 orphan_notified_(false), 149 already_shutdown_(false) { 150 char* addr_str; 151 char* name; 152 grpc_sockaddr_to_string(&addr_str, addr, 1); 153 gpr_asprintf(&name, "udp-server-listener:%s", addr_str); 154 gpr_free(addr_str); 155 emfd_ = grpc_fd_create(fd, name, true); 156 memcpy(&addr_, addr, sizeof(grpc_resolved_address)); 157 GPR_ASSERT(emfd_); 158 gpr_free(name); 159 gpr_mu_init(&mutex_); 160 } 161 162 GrpcUdpListener::~GrpcUdpListener() { gpr_mu_destroy(&mutex_); } 163 164 /* the overall server */ 165 struct grpc_udp_server { 166 gpr_mu mu; 167 168 /* factory to use for creating and binding sockets, or NULL */ 169 grpc_socket_factory* socket_factory; 170 171 /* active port count: how many ports are actually still listening */ 172 size_t active_ports; 173 /* destroyed port count: how many ports are completely destroyed */ 174 size_t destroyed_ports; 175 176 /* is this server shutting down? (boolean) */ 177 int shutdown; 178 179 /* An array of listeners */ 180 grpc_core::InlinedVector<GrpcUdpListener, 16> listeners; 181 182 /* factory for use to create udp listeners */ 183 GrpcUdpHandlerFactory* handler_factory; 184 185 /* shutdown callback */ 186 grpc_closure* shutdown_complete; 187 188 /* all pollsets interested in new connections */ 189 grpc_pollset** pollsets; 190 /* number of pollsets in the pollsets array */ 191 size_t pollset_count; 192 /* opaque object to pass to callbacks */ 193 void* user_data; 194 195 /* latch has_so_reuseport during server creation */ 196 bool so_reuseport; 197 }; 198 199 static grpc_socket_factory* get_socket_factory(const grpc_channel_args* args) { 200 if (args) { 201 const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_SOCKET_FACTORY); 202 if (arg) { 203 GPR_ASSERT(arg->type == GRPC_ARG_POINTER); 204 return static_cast<grpc_socket_factory*>(arg->value.pointer.p); 205 } 206 } 207 return nullptr; 208 } 209 210 grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) { 211 grpc_udp_server* s = grpc_core::New<grpc_udp_server>(); 212 gpr_mu_init(&s->mu); 213 s->socket_factory = get_socket_factory(args); 214 if (s->socket_factory) { 215 grpc_socket_factory_ref(s->socket_factory); 216 } 217 s->active_ports = 0; 218 s->destroyed_ports = 0; 219 s->shutdown = 0; 220 s->so_reuseport = grpc_is_socket_reuse_port_supported(); 221 return s; 222 } 223 224 // static 225 void GrpcUdpListener::shutdown_fd(void* args, grpc_error* error) { 226 if (args == nullptr) { 227 // No-op if shutdown args are null. 228 return; 229 } 230 auto sp = static_cast<GrpcUdpListener*>(args); 231 gpr_mu_lock(sp->mutex()); 232 gpr_log(GPR_DEBUG, "shutdown fd %d", sp->fd_); 233 grpc_fd_shutdown(sp->emfd_, GRPC_ERROR_REF(error)); 234 sp->already_shutdown_ = true; 235 if (!sp->notify_on_write_armed_) { 236 // Re-arm write notification to notify listener with error. This is 237 // necessary to decrement active_ports. 238 sp->notify_on_write_armed_ = true; 239 grpc_fd_notify_on_write(sp->emfd_, &sp->write_closure_); 240 } 241 gpr_mu_unlock(sp->mutex()); 242 } 243 244 static void finish_shutdown(grpc_udp_server* s) { 245 if (s->shutdown_complete != nullptr) { 246 GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE); 247 } 248 249 gpr_mu_destroy(&s->mu); 250 251 gpr_log(GPR_DEBUG, "Destroy all listeners."); 252 for (size_t i = 0; i < s->listeners.size(); ++i) { 253 s->listeners[i].OnDestroy(); 254 } 255 256 if (s->socket_factory) { 257 grpc_socket_factory_unref(s->socket_factory); 258 } 259 260 grpc_core::Delete(s); 261 } 262 263 static void destroyed_port(void* server, grpc_error* error) { 264 grpc_udp_server* s = static_cast<grpc_udp_server*>(server); 265 gpr_mu_lock(&s->mu); 266 s->destroyed_ports++; 267 if (s->destroyed_ports == s->listeners.size()) { 268 gpr_mu_unlock(&s->mu); 269 finish_shutdown(s); 270 } else { 271 gpr_mu_unlock(&s->mu); 272 } 273 } 274 275 /* called when all listening endpoints have been shutdown, so no further 276 events will be received on them - at this point it's safe to destroy 277 things */ 278 static void deactivated_all_ports(grpc_udp_server* s) { 279 /* delete ALL the things */ 280 gpr_mu_lock(&s->mu); 281 282 GPR_ASSERT(s->shutdown); 283 284 if (s->listeners.size() == 0) { 285 gpr_mu_unlock(&s->mu); 286 finish_shutdown(s); 287 return; 288 } 289 for (size_t i = 0; i < s->listeners.size(); ++i) { 290 s->listeners[i].OrphanFd(); 291 } 292 gpr_mu_unlock(&s->mu); 293 } 294 295 void GrpcUdpListener::OrphanFd() { 296 gpr_log(GPR_DEBUG, "Orphan fd %d, emfd %p", fd_, emfd_); 297 grpc_unlink_if_unix_domain_socket(&addr_); 298 299 GRPC_CLOSURE_INIT(&destroyed_closure_, destroyed_port, server_, 300 grpc_schedule_on_exec_ctx); 301 /* Because at this point, all listening sockets have been shutdown already, no 302 * need to call OnFdAboutToOrphan() to notify the handler again. */ 303 grpc_fd_orphan(emfd_, &destroyed_closure_, nullptr, "udp_listener_shutdown"); 304 } 305 306 void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) { 307 gpr_mu_lock(&s->mu); 308 309 GPR_ASSERT(!s->shutdown); 310 s->shutdown = 1; 311 312 s->shutdown_complete = on_done; 313 314 gpr_log(GPR_DEBUG, "start to destroy udp_server"); 315 /* shutdown all fd's */ 316 if (s->active_ports) { 317 for (size_t i = 0; i < s->listeners.size(); ++i) { 318 GrpcUdpListener* sp = &s->listeners[i]; 319 sp->OnFdAboutToOrphan(); 320 } 321 gpr_mu_unlock(&s->mu); 322 } else { 323 gpr_mu_unlock(&s->mu); 324 deactivated_all_ports(s); 325 } 326 } 327 328 void GrpcUdpListener::OnFdAboutToOrphan() { 329 gpr_mu_lock(&mutex_); 330 grpc_unlink_if_unix_domain_socket(&addr_); 331 332 GRPC_CLOSURE_INIT(&destroyed_closure_, destroyed_port, server_, 333 grpc_schedule_on_exec_ctx); 334 if (!orphan_notified_ && udp_handler_ != nullptr) { 335 /* Singals udp_handler that the FD is about to be closed and 336 * should no longer be used. */ 337 GRPC_CLOSURE_INIT(&orphan_fd_closure_, shutdown_fd, this, 338 grpc_schedule_on_exec_ctx); 339 gpr_log(GPR_DEBUG, "fd %d about to be orphaned", fd_); 340 udp_handler_->OnFdAboutToOrphan(&orphan_fd_closure_, server_->user_data); 341 orphan_notified_ = true; 342 } 343 gpr_mu_unlock(&mutex_); 344 } 345 346 static int bind_socket(grpc_socket_factory* socket_factory, int sockfd, 347 const grpc_resolved_address* addr) { 348 return (socket_factory != nullptr) 349 ? grpc_socket_factory_bind(socket_factory, sockfd, addr) 350 : bind(sockfd, 351 reinterpret_cast<grpc_sockaddr*>( 352 const_cast<char*>(addr->addr)), 353 addr->len); 354 } 355 356 /* Prepare a recently-created socket for listening. */ 357 static int prepare_socket(grpc_socket_factory* socket_factory, int fd, 358 const grpc_resolved_address* addr, int rcv_buf_size, 359 int snd_buf_size, bool so_reuseport) { 360 grpc_resolved_address sockname_temp; 361 grpc_sockaddr* addr_ptr = 362 reinterpret_cast<grpc_sockaddr*>(const_cast<char*>(addr->addr)); 363 364 if (fd < 0) { 365 goto error; 366 } 367 368 if (grpc_set_socket_nonblocking(fd, 1) != GRPC_ERROR_NONE) { 369 gpr_log(GPR_ERROR, "Unable to set nonblocking %d: %s", fd, strerror(errno)); 370 goto error; 371 } 372 if (grpc_set_socket_cloexec(fd, 1) != GRPC_ERROR_NONE) { 373 gpr_log(GPR_ERROR, "Unable to set cloexec %d: %s", fd, strerror(errno)); 374 goto error; 375 } 376 377 if (grpc_set_socket_ip_pktinfo_if_possible(fd) != GRPC_ERROR_NONE) { 378 gpr_log(GPR_ERROR, "Unable to set ip_pktinfo."); 379 goto error; 380 } else if (addr_ptr->sa_family == AF_INET6) { 381 if (grpc_set_socket_ipv6_recvpktinfo_if_possible(fd) != GRPC_ERROR_NONE) { 382 gpr_log(GPR_ERROR, "Unable to set ipv6_recvpktinfo."); 383 goto error; 384 } 385 } 386 387 if (grpc_set_socket_sndbuf(fd, snd_buf_size) != GRPC_ERROR_NONE) { 388 gpr_log(GPR_ERROR, "Failed to set send buffer size to %d bytes", 389 snd_buf_size); 390 goto error; 391 } 392 393 if (grpc_set_socket_rcvbuf(fd, rcv_buf_size) != GRPC_ERROR_NONE) { 394 gpr_log(GPR_ERROR, "Failed to set receive buffer size to %d bytes", 395 rcv_buf_size); 396 goto error; 397 } 398 399 { 400 int get_overflow = 1; 401 if (0 != setsockopt(fd, SOL_SOCKET, SO_RXQ_OVFL, &get_overflow, 402 sizeof(get_overflow))) { 403 gpr_log(GPR_INFO, "Failed to set socket overflow support"); 404 } 405 } 406 407 if (so_reuseport && !grpc_is_unix_socket(addr) && 408 grpc_set_socket_reuse_port(fd, 1) != GRPC_ERROR_NONE) { 409 gpr_log(GPR_ERROR, "Failed to set SO_REUSEPORT for fd %d", fd); 410 goto error; 411 } 412 413 if (bind_socket(socket_factory, fd, addr) < 0) { 414 char* addr_str; 415 grpc_sockaddr_to_string(&addr_str, addr, 0); 416 gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno)); 417 gpr_free(addr_str); 418 goto error; 419 } 420 421 sockname_temp.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage)); 422 423 if (getsockname(fd, reinterpret_cast<grpc_sockaddr*>(sockname_temp.addr), 424 &sockname_temp.len) < 0) { 425 gpr_log(GPR_ERROR, "Unable to get the address socket %d is bound to: %s", 426 fd, strerror(errno)); 427 goto error; 428 } 429 430 return grpc_sockaddr_get_port(&sockname_temp); 431 432 error: 433 if (fd >= 0) { 434 close(fd); 435 } 436 return -1; 437 } 438 439 // static 440 void GrpcUdpListener::do_read(void* arg, grpc_error* error) { 441 GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg); 442 GPR_ASSERT(error == GRPC_ERROR_NONE); 443 /* TODO: the reason we hold server->mu here is merely to prevent fd 444 * shutdown while we are reading. However, it blocks do_write(). Switch to 445 * read lock if available. */ 446 gpr_mu_lock(sp->mutex()); 447 /* Tell the registered callback that data is available to read. */ 448 if (!sp->already_shutdown_ && sp->udp_handler_->Read()) { 449 /* There maybe more packets to read. Schedule read_more_cb_ closure to run 450 * after finishing this event loop. */ 451 GRPC_CLOSURE_SCHED(&sp->do_read_closure_, GRPC_ERROR_NONE); 452 } else { 453 /* Finish reading all the packets, re-arm the notification event so we can 454 * get another chance to read. Or fd already shutdown, re-arm to get a 455 * notification with shutdown error. */ 456 grpc_fd_notify_on_read(sp->emfd_, &sp->read_closure_); 457 } 458 gpr_mu_unlock(sp->mutex()); 459 } 460 461 // static 462 void GrpcUdpListener::on_read(void* arg, grpc_error* error) { 463 GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg); 464 sp->OnRead(error, arg); 465 } 466 467 void GrpcUdpListener::OnRead(grpc_error* error, void* do_read_arg) { 468 if (error != GRPC_ERROR_NONE) { 469 gpr_mu_lock(&server_->mu); 470 if (0 == --server_->active_ports && server_->shutdown) { 471 gpr_mu_unlock(&server_->mu); 472 deactivated_all_ports(server_); 473 } else { 474 gpr_mu_unlock(&server_->mu); 475 } 476 return; 477 } 478 479 /* Read once. If there is more data to read, off load the work to another 480 * thread to finish. */ 481 if (udp_handler_->Read()) { 482 /* There maybe more packets to read. Schedule read_more_cb_ closure to run 483 * after finishing this event loop. */ 484 GRPC_CLOSURE_INIT(&do_read_closure_, do_read, do_read_arg, 485 grpc_executor_scheduler(GRPC_EXECUTOR_LONG)); 486 GRPC_CLOSURE_SCHED(&do_read_closure_, GRPC_ERROR_NONE); 487 } else { 488 /* Finish reading all the packets, re-arm the notification event so we can 489 * get another chance to read. Or fd already shutdown, re-arm to get a 490 * notification with shutdown error. */ 491 grpc_fd_notify_on_read(emfd_, &read_closure_); 492 } 493 } 494 495 // static 496 // Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback interface. 497 void GrpcUdpListener::fd_notify_on_write_wrapper(void* arg, grpc_error* error) { 498 GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg); 499 gpr_mu_lock(sp->mutex()); 500 if (!sp->notify_on_write_armed_) { 501 grpc_fd_notify_on_write(sp->emfd_, &sp->write_closure_); 502 sp->notify_on_write_armed_ = true; 503 } 504 gpr_mu_unlock(sp->mutex()); 505 } 506 507 // static 508 void GrpcUdpListener::do_write(void* arg, grpc_error* error) { 509 GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg); 510 gpr_mu_lock(sp->mutex()); 511 if (sp->already_shutdown_) { 512 // If fd has been shutdown, don't write any more and re-arm notification. 513 grpc_fd_notify_on_write(sp->emfd_, &sp->write_closure_); 514 } else { 515 sp->notify_on_write_armed_ = false; 516 /* Tell the registered callback that the socket is writeable. */ 517 GPR_ASSERT(error == GRPC_ERROR_NONE); 518 GRPC_CLOSURE_INIT(&sp->notify_on_write_closure_, fd_notify_on_write_wrapper, 519 arg, grpc_schedule_on_exec_ctx); 520 sp->udp_handler_->OnCanWrite(sp->server_->user_data, 521 &sp->notify_on_write_closure_); 522 } 523 gpr_mu_unlock(sp->mutex()); 524 } 525 526 // static 527 void GrpcUdpListener::on_write(void* arg, grpc_error* error) { 528 GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg); 529 sp->OnCanWrite(error, arg); 530 } 531 532 void GrpcUdpListener::OnCanWrite(grpc_error* error, void* do_write_arg) { 533 if (error != GRPC_ERROR_NONE) { 534 gpr_mu_lock(&server_->mu); 535 if (0 == --server_->active_ports && server_->shutdown) { 536 gpr_mu_unlock(&server_->mu); 537 deactivated_all_ports(server_); 538 } else { 539 gpr_mu_unlock(&server_->mu); 540 } 541 return; 542 } 543 544 /* Schedule actual write in another thread. */ 545 GRPC_CLOSURE_INIT(&do_write_closure_, do_write, do_write_arg, 546 grpc_executor_scheduler(GRPC_EXECUTOR_LONG)); 547 548 GRPC_CLOSURE_SCHED(&do_write_closure_, GRPC_ERROR_NONE); 549 } 550 551 static int add_socket_to_server(grpc_udp_server* s, int fd, 552 const grpc_resolved_address* addr, 553 int rcv_buf_size, int snd_buf_size) { 554 gpr_log(GPR_DEBUG, "add socket %d to server", fd); 555 556 int port = prepare_socket(s->socket_factory, fd, addr, rcv_buf_size, 557 snd_buf_size, s->so_reuseport); 558 if (port >= 0) { 559 gpr_mu_lock(&s->mu); 560 s->listeners.emplace_back(s, fd, addr); 561 gpr_log(GPR_DEBUG, 562 "add socket %d to server for port %d, %zu listener(s) in total", fd, 563 port, s->listeners.size()); 564 gpr_mu_unlock(&s->mu); 565 } 566 return port; 567 } 568 569 int grpc_udp_server_add_port(grpc_udp_server* s, 570 const grpc_resolved_address* addr, 571 int rcv_buf_size, int snd_buf_size, 572 GrpcUdpHandlerFactory* handler_factory, 573 size_t num_listeners) { 574 if (num_listeners > 1 && !s->so_reuseport) { 575 gpr_log(GPR_ERROR, 576 "Try to have multiple listeners on same port, but SO_REUSEPORT is " 577 "not supported. Only create 1 listener."); 578 } 579 char* addr_str; 580 grpc_sockaddr_to_string(&addr_str, addr, 1); 581 gpr_log(GPR_DEBUG, "add address: %s to server", addr_str); 582 gpr_free(addr_str); 583 584 int allocated_port1 = -1; 585 int allocated_port2 = -1; 586 int fd; 587 grpc_dualstack_mode dsmode; 588 grpc_resolved_address addr6_v4mapped; 589 grpc_resolved_address wild4; 590 grpc_resolved_address wild6; 591 grpc_resolved_address addr4_copy; 592 grpc_resolved_address* allocated_addr = nullptr; 593 grpc_resolved_address sockname_temp; 594 int port = 0; 595 596 /* Check if this is a wildcard port, and if so, try to keep the port the same 597 as some previously created listener. */ 598 if (grpc_sockaddr_get_port(addr) == 0) { 599 /* Loop through existing listeners to find the port in use. */ 600 for (size_t i = 0; i < s->listeners.size(); ++i) { 601 sockname_temp.len = 602 static_cast<socklen_t>(sizeof(struct sockaddr_storage)); 603 if (0 == getsockname(s->listeners[i].fd(), 604 reinterpret_cast<grpc_sockaddr*>(sockname_temp.addr), 605 &sockname_temp.len)) { 606 port = grpc_sockaddr_get_port(&sockname_temp); 607 if (port > 0) { 608 /* Found such a port, update |addr| to reflects this port. */ 609 allocated_addr = static_cast<grpc_resolved_address*>( 610 gpr_malloc(sizeof(grpc_resolved_address))); 611 memcpy(allocated_addr, addr, sizeof(grpc_resolved_address)); 612 grpc_sockaddr_set_port(allocated_addr, port); 613 addr = allocated_addr; 614 break; 615 } 616 } 617 } 618 } 619 620 if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { 621 addr = &addr6_v4mapped; 622 } 623 624 s->handler_factory = handler_factory; 625 for (size_t i = 0; i < num_listeners; ++i) { 626 /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ 627 if (grpc_sockaddr_is_wildcard(addr, &port)) { 628 grpc_sockaddr_make_wildcards(port, &wild4, &wild6); 629 630 /* Try listening on IPv6 first. */ 631 addr = &wild6; 632 // TODO(rjshade): Test and propagate the returned grpc_error*: 633 GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory( 634 s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); 635 allocated_port1 = 636 add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size); 637 if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { 638 if (port == 0) { 639 /* This is the first time to bind to |addr|. If its port is still 640 * wildcard port, update |addr| with the ephermeral port returned by 641 * kernel. Thus |addr| can have a specific port in following 642 * iterations. */ 643 grpc_sockaddr_set_port(addr, allocated_port1); 644 port = allocated_port1; 645 } else if (allocated_port1 >= 0) { 646 /* The following sucessfully created socket should have same port as 647 * the first one. */ 648 GPR_ASSERT(port == allocated_port1); 649 } 650 /* A dualstack socket is created, no need to create corresponding IPV4 651 * socket. */ 652 continue; 653 } 654 655 /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */ 656 if (port == 0 && allocated_port1 > 0) { 657 /* |port| hasn't been assigned to an emphemeral port yet, |wild4| must 658 * have a wildcard port. Update it with the emphemeral port created 659 * during binding.*/ 660 grpc_sockaddr_set_port(&wild4, allocated_port1); 661 port = allocated_port1; 662 } 663 /* |wild4| should have been updated with an emphemeral port by now. Use 664 * this IPV4 address to create a IPV4 socket. */ 665 addr = &wild4; 666 } 667 668 // TODO(rjshade): Test and propagate the returned grpc_error*: 669 GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory( 670 s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); 671 if (fd < 0) { 672 gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno)); 673 } 674 if (dsmode == GRPC_DSMODE_IPV4 && 675 grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) { 676 addr = &addr4_copy; 677 } 678 allocated_port2 = 679 add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size); 680 if (port == 0) { 681 /* Update |addr| with the ephermeral port returned by kernel. So |addr| 682 * can have a specific port in following iterations. */ 683 grpc_sockaddr_set_port(addr, allocated_port2); 684 port = allocated_port2; 685 } else if (allocated_port2 >= 0) { 686 GPR_ASSERT(port == allocated_port2); 687 } 688 } 689 690 gpr_free(allocated_addr); 691 return port; 692 } 693 694 int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) { 695 if (port_index >= s->listeners.size()) { 696 return -1; 697 } 698 699 return s->listeners[port_index].fd(); 700 } 701 702 void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets, 703 size_t pollset_count, void* user_data) { 704 gpr_log(GPR_DEBUG, "grpc_udp_server_start"); 705 gpr_mu_lock(&s->mu); 706 GPR_ASSERT(s->active_ports == 0); 707 s->pollsets = pollsets; 708 s->user_data = user_data; 709 710 for (size_t i = 0; i < s->listeners.size(); ++i) { 711 s->listeners[i].StartListening(pollsets, pollset_count, s->handler_factory); 712 } 713 714 gpr_mu_unlock(&s->mu); 715 } 716 717 void GrpcUdpListener::StartListening(grpc_pollset** pollsets, 718 size_t pollset_count, 719 GrpcUdpHandlerFactory* handler_factory) { 720 gpr_mu_lock(&mutex_); 721 handler_factory_ = handler_factory; 722 udp_handler_ = handler_factory->CreateUdpHandler(emfd_, server_->user_data); 723 for (size_t i = 0; i < pollset_count; i++) { 724 grpc_pollset_add_fd(pollsets[i], emfd_); 725 } 726 GRPC_CLOSURE_INIT(&read_closure_, on_read, this, grpc_schedule_on_exec_ctx); 727 grpc_fd_notify_on_read(emfd_, &read_closure_); 728 729 GRPC_CLOSURE_INIT(&write_closure_, on_write, this, grpc_schedule_on_exec_ctx); 730 notify_on_write_armed_ = true; 731 grpc_fd_notify_on_write(emfd_, &write_closure_); 732 733 /* Registered for both read and write callbacks: increment active_ports 734 * twice to account for this, and delay free-ing of memory until both 735 * on_read and on_write have fired. */ 736 server_->active_ports += 2; 737 gpr_mu_unlock(&mutex_); 738 } 739 740 void GrpcUdpListener::OnDestroy() { 741 if (udp_handler_ != nullptr) { 742 handler_factory_->DestroyUdpHandler(udp_handler_); 743 } 744 } 745 746 #endif 747