1 // Copyright 2010 The Go Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style 3 // license that can be found in the LICENSE file. 4 5 package net 6 7 import ( 8 "context" 9 "internal/race" 10 "os" 11 "runtime" 12 "sync" 13 "syscall" 14 "unsafe" 15 ) 16 17 var ( 18 initErr error 19 ioSync uint64 20 ) 21 22 // CancelIo Windows API cancels all outstanding IO for a particular 23 // socket on current thread. To overcome that limitation, we run 24 // special goroutine, locked to OS single thread, that both starts 25 // and cancels IO. It means, there are 2 unavoidable thread switches 26 // for every IO. 27 // Some newer versions of Windows has new CancelIoEx API, that does 28 // not have that limitation and can be used from any thread. This 29 // package uses CancelIoEx API, if present, otherwise it fallback 30 // to CancelIo. 31 32 var ( 33 canCancelIO bool // determines if CancelIoEx API is present 34 skipSyncNotif bool 35 hasLoadSetFileCompletionNotificationModes bool 36 ) 37 38 func sysInit() { 39 var d syscall.WSAData 40 e := syscall.WSAStartup(uint32(0x202), &d) 41 if e != nil { 42 initErr = os.NewSyscallError("wsastartup", e) 43 } 44 canCancelIO = syscall.LoadCancelIoEx() == nil 45 hasLoadSetFileCompletionNotificationModes = syscall.LoadSetFileCompletionNotificationModes() == nil 46 if hasLoadSetFileCompletionNotificationModes { 47 // It's not safe to use FILE_SKIP_COMPLETION_PORT_ON_SUCCESS if non IFS providers are installed: 48 // http://support.microsoft.com/kb/2568167 49 skipSyncNotif = true 50 protos := [2]int32{syscall.IPPROTO_TCP, 0} 51 var buf [32]syscall.WSAProtocolInfo 52 len := uint32(unsafe.Sizeof(buf)) 53 n, err := syscall.WSAEnumProtocols(&protos[0], &buf[0], &len) 54 if err != nil { 55 skipSyncNotif = false 56 } else { 57 for i := int32(0); i < n; i++ { 58 if buf[i].ServiceFlags1&syscall.XP1_IFS_HANDLES == 0 { 59 skipSyncNotif = false 60 break 61 } 62 } 63 } 64 } 65 } 66 67 // canUseConnectEx reports whether we can 
// canUseConnectEx reports whether we can use the ConnectEx Windows API call
// for the given network type.
func canUseConnectEx(net string) bool {
	// ConnectEx windows API does not support connectionless sockets,
	// so only the TCP network names qualify.
	return net == "tcp" || net == "tcp4" || net == "tcp6"
}

// operation contains a superset of the data needed to perform any
// asynchronous IO on a netFD.
type operation struct {
	// Used by the IOCP interface. It must be the first field
	// of the struct, as our code relies on it.
	o syscall.Overlapped

	// fields used by runtime.netpoll
	runtimeCtx uintptr
	mode       int32
	errno      int32
	qty        uint32

	// fields used only by net package
	fd     *netFD
	errc   chan error
	buf    syscall.WSABuf
	sa     syscall.Sockaddr
	rsa    *syscall.RawSockaddrAny
	rsan   int32
	handle syscall.Handle
	flags  uint32
	bufs   []syscall.WSABuf
}

// InitBuf makes o.buf describe buf. For an empty buf the pointer is
// left nil, so element 0 of a zero-length slice is never addressed.
func (o *operation) InitBuf(buf []byte) {
	var first *byte
	if len(buf) > 0 {
		first = &buf[0]
	}
	o.buf = syscall.WSABuf{Len: uint32(len(buf)), Buf: first}
}

// InitBufs fills o.bufs with one WSABuf per element of *buf. It reuses the
// existing o.bufs storage when there is one and otherwise allocates
// exactly len(*buf) capacity. Empty elements get a nil pointer.
func (o *operation) InitBufs(buf *Buffers) {
	if o.bufs != nil {
		o.bufs = o.bufs[:0]
	} else {
		o.bufs = make([]syscall.WSABuf, 0, len(*buf))
	}
	for _, b := range *buf {
		wsaBuf := syscall.WSABuf{Len: uint32(len(b))}
		if len(b) != 0 {
			wsaBuf.Buf = &b[0]
		}
		o.bufs = append(o.bufs, wsaBuf)
	}
}

// ClearBufs drops every pointer into the Buffers captured by InitBufs,
// so the garbage collector can release them, and truncates o.bufs to
// zero length while keeping its capacity for reuse.
func (o *operation) ClearBufs() {
	for i := 0; i < len(o.bufs); i++ {
		o.bufs[i].Buf = nil
	}
	o.bufs = o.bufs[:0]
}

// ioSrv executes net IO requests.
type ioSrv struct {
	// req feeds ProcessRemoteIO. startServer creates it only when the
	// CancelIoEx API is unavailable.
	req chan ioSrvReq
}

// ioSrvReq is one request handled by ProcessRemoteIO.
type ioSrvReq struct {
	o      *operation
	submit func(o *operation) error // if nil, cancel the operation
}
Results of all operations will be sent 147 // back to their requesters via channel supplied in request. 148 // It is used only when the CancelIoEx API is unavailable. 149 func (s *ioSrv) ProcessRemoteIO() { 150 runtime.LockOSThread() 151 defer runtime.UnlockOSThread() 152 for r := range s.req { 153 if r.submit != nil { 154 r.o.errc <- r.submit(r.o) 155 } else { 156 r.o.errc <- syscall.CancelIo(r.o.fd.sysfd) 157 } 158 } 159 } 160 161 // ExecIO executes a single IO operation o. It submits and cancels 162 // IO in the current thread for systems where Windows CancelIoEx API 163 // is available. Alternatively, it passes the request onto 164 // runtime netpoll and waits for completion or cancels request. 165 func (s *ioSrv) ExecIO(o *operation, name string, submit func(o *operation) error) (int, error) { 166 fd := o.fd 167 // Notify runtime netpoll about starting IO. 168 err := fd.pd.prepare(int(o.mode)) 169 if err != nil { 170 return 0, err 171 } 172 // Start IO. 173 if canCancelIO { 174 err = submit(o) 175 } else { 176 // Send request to a special dedicated thread, 177 // so it can stop the IO with CancelIO later. 178 s.req <- ioSrvReq{o, submit} 179 err = <-o.errc 180 } 181 switch err { 182 case nil: 183 // IO completed immediately 184 if o.fd.skipSyncNotif { 185 // No completion message will follow, so return immediately. 186 return int(o.qty), nil 187 } 188 // Need to get our completion message anyway. 189 case syscall.ERROR_IO_PENDING: 190 // IO started, and we have to wait for its completion. 191 err = nil 192 default: 193 return 0, err 194 } 195 // Wait for our request to complete. 196 err = fd.pd.wait(int(o.mode)) 197 if err == nil { 198 // All is good. Extract our IO results and return. 199 if o.errno != 0 { 200 err = syscall.Errno(o.errno) 201 return 0, err 202 } 203 return int(o.qty), nil 204 } 205 // IO is interrupted by "close" or "timeout" 206 netpollErr := err 207 switch netpollErr { 208 case errClosing, errTimeout: 209 // will deal with those. 
210 default: 211 panic("net: unexpected runtime.netpoll error: " + netpollErr.Error()) 212 } 213 // Cancel our request. 214 if canCancelIO { 215 err := syscall.CancelIoEx(fd.sysfd, &o.o) 216 // Assuming ERROR_NOT_FOUND is returned, if IO is completed. 217 if err != nil && err != syscall.ERROR_NOT_FOUND { 218 // TODO(brainman): maybe do something else, but panic. 219 panic(err) 220 } 221 } else { 222 s.req <- ioSrvReq{o, nil} 223 <-o.errc 224 } 225 // Wait for cancelation to complete. 226 fd.pd.waitCanceled(int(o.mode)) 227 if o.errno != 0 { 228 err = syscall.Errno(o.errno) 229 if err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled 230 err = netpollErr 231 } 232 return 0, err 233 } 234 // We issued a cancelation request. But, it seems, IO operation succeeded 235 // before the cancelation request run. We need to treat the IO operation as 236 // succeeded (the bytes are actually sent/recv from network). 237 return int(o.qty), nil 238 } 239 240 // Start helper goroutines. 241 var rsrv, wsrv *ioSrv 242 var onceStartServer sync.Once 243 244 func startServer() { 245 rsrv = new(ioSrv) 246 wsrv = new(ioSrv) 247 if !canCancelIO { 248 // Only CancelIo API is available. Lets start two special goroutines 249 // locked to an OS thread, that both starts and cancels IO. One will 250 // process read requests, while other will do writes. 251 rsrv.req = make(chan ioSrvReq) 252 go rsrv.ProcessRemoteIO() 253 wsrv.req = make(chan ioSrvReq) 254 go wsrv.ProcessRemoteIO() 255 } 256 } 257 258 // Network file descriptor. 
259 type netFD struct { 260 // locking/lifetime of sysfd + serialize access to Read and Write methods 261 fdmu fdMutex 262 263 // immutable until Close 264 sysfd syscall.Handle 265 family int 266 sotype int 267 isStream bool 268 isConnected bool 269 skipSyncNotif bool 270 net string 271 laddr Addr 272 raddr Addr 273 274 rop operation // read operation 275 wop operation // write operation 276 277 // wait server 278 pd pollDesc 279 } 280 281 func newFD(sysfd syscall.Handle, family, sotype int, net string) (*netFD, error) { 282 if initErr != nil { 283 return nil, initErr 284 } 285 onceStartServer.Do(startServer) 286 return &netFD{sysfd: sysfd, family: family, sotype: sotype, net: net, isStream: sotype == syscall.SOCK_STREAM}, nil 287 } 288 289 func (fd *netFD) init() error { 290 if err := fd.pd.init(fd); err != nil { 291 return err 292 } 293 if hasLoadSetFileCompletionNotificationModes { 294 // We do not use events, so we can skip them always. 295 flags := uint8(syscall.FILE_SKIP_SET_EVENT_ON_HANDLE) 296 // It's not safe to skip completion notifications for UDP: 297 // http://blogs.technet.com/b/winserverperformance/archive/2008/06/26/designing-applications-for-high-performance-part-iii.aspx 298 if skipSyncNotif && fd.net == "tcp" { 299 flags |= syscall.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS 300 } 301 err := syscall.SetFileCompletionNotificationModes(fd.sysfd, flags) 302 if err == nil && flags&syscall.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS != 0 { 303 fd.skipSyncNotif = true 304 } 305 } 306 // Disable SIO_UDP_CONNRESET behavior. 
307 // http://support.microsoft.com/kb/263823 308 switch fd.net { 309 case "udp", "udp4", "udp6": 310 ret := uint32(0) 311 flag := uint32(0) 312 size := uint32(unsafe.Sizeof(flag)) 313 err := syscall.WSAIoctl(fd.sysfd, syscall.SIO_UDP_CONNRESET, (*byte)(unsafe.Pointer(&flag)), size, nil, 0, &ret, nil, 0) 314 if err != nil { 315 return os.NewSyscallError("wsaioctl", err) 316 } 317 } 318 fd.rop.mode = 'r' 319 fd.wop.mode = 'w' 320 fd.rop.fd = fd 321 fd.wop.fd = fd 322 fd.rop.runtimeCtx = fd.pd.runtimeCtx 323 fd.wop.runtimeCtx = fd.pd.runtimeCtx 324 if !canCancelIO { 325 fd.rop.errc = make(chan error) 326 fd.wop.errc = make(chan error) 327 } 328 return nil 329 } 330 331 func (fd *netFD) setAddr(laddr, raddr Addr) { 332 fd.laddr = laddr 333 fd.raddr = raddr 334 runtime.SetFinalizer(fd, (*netFD).Close) 335 } 336 337 func (fd *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) error { 338 // Do not need to call fd.writeLock here, 339 // because fd is not yet accessible to user, 340 // so no concurrent operations are possible. 341 if err := fd.init(); err != nil { 342 return err 343 } 344 if deadline, ok := ctx.Deadline(); ok && !deadline.IsZero() { 345 fd.setWriteDeadline(deadline) 346 defer fd.setWriteDeadline(noDeadline) 347 } 348 if !canUseConnectEx(fd.net) { 349 err := connectFunc(fd.sysfd, ra) 350 return os.NewSyscallError("connect", err) 351 } 352 // ConnectEx windows API requires an unconnected, previously bound socket. 353 if la == nil { 354 switch ra.(type) { 355 case *syscall.SockaddrInet4: 356 la = &syscall.SockaddrInet4{} 357 case *syscall.SockaddrInet6: 358 la = &syscall.SockaddrInet6{} 359 default: 360 panic("unexpected type in connect") 361 } 362 if err := syscall.Bind(fd.sysfd, la); err != nil { 363 return os.NewSyscallError("bind", err) 364 } 365 } 366 // Call ConnectEx API. 
367 o := &fd.wop 368 o.sa = ra 369 370 // Wait for the goroutine converting context.Done into a write timeout 371 // to exist, otherwise our caller might cancel the context and 372 // cause fd.setWriteDeadline(aLongTimeAgo) to cancel a successful dial. 373 done := make(chan bool) // must be unbuffered 374 defer func() { done <- true }() 375 go func() { 376 select { 377 case <-ctx.Done(): 378 // Force the runtime's poller to immediately give 379 // up waiting for writability. 380 fd.setWriteDeadline(aLongTimeAgo) 381 <-done 382 case <-done: 383 } 384 }() 385 386 _, err := wsrv.ExecIO(o, "ConnectEx", func(o *operation) error { 387 return connectExFunc(o.fd.sysfd, o.sa, nil, 0, nil, &o.o) 388 }) 389 if err != nil { 390 select { 391 case <-ctx.Done(): 392 return mapErr(ctx.Err()) 393 default: 394 if _, ok := err.(syscall.Errno); ok { 395 err = os.NewSyscallError("connectex", err) 396 } 397 return err 398 } 399 } 400 // Refresh socket properties. 401 return os.NewSyscallError("setsockopt", syscall.Setsockopt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_UPDATE_CONNECT_CONTEXT, (*byte)(unsafe.Pointer(&fd.sysfd)), int32(unsafe.Sizeof(fd.sysfd)))) 402 } 403 404 func (fd *netFD) destroy() { 405 if fd.sysfd == syscall.InvalidHandle { 406 return 407 } 408 // Poller may want to unregister fd in readiness notification mechanism, 409 // so this must be executed before closeFunc. 
410 fd.pd.close() 411 closeFunc(fd.sysfd) 412 fd.sysfd = syscall.InvalidHandle 413 // no need for a finalizer anymore 414 runtime.SetFinalizer(fd, nil) 415 } 416 417 func (fd *netFD) Close() error { 418 if !fd.fdmu.increfAndClose() { 419 return errClosing 420 } 421 // unblock pending reader and writer 422 fd.pd.evict() 423 fd.decref() 424 return nil 425 } 426 427 func (fd *netFD) shutdown(how int) error { 428 if err := fd.incref(); err != nil { 429 return err 430 } 431 defer fd.decref() 432 return syscall.Shutdown(fd.sysfd, how) 433 } 434 435 func (fd *netFD) closeRead() error { 436 return fd.shutdown(syscall.SHUT_RD) 437 } 438 439 func (fd *netFD) closeWrite() error { 440 return fd.shutdown(syscall.SHUT_WR) 441 } 442 443 func (fd *netFD) Read(buf []byte) (int, error) { 444 if err := fd.readLock(); err != nil { 445 return 0, err 446 } 447 defer fd.readUnlock() 448 o := &fd.rop 449 o.InitBuf(buf) 450 n, err := rsrv.ExecIO(o, "WSARecv", func(o *operation) error { 451 return syscall.WSARecv(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil) 452 }) 453 if race.Enabled { 454 race.Acquire(unsafe.Pointer(&ioSync)) 455 } 456 if len(buf) != 0 { 457 err = fd.eofError(n, err) 458 } 459 if _, ok := err.(syscall.Errno); ok { 460 err = os.NewSyscallError("wsarecv", err) 461 } 462 return n, err 463 } 464 465 func (fd *netFD) readFrom(buf []byte) (int, syscall.Sockaddr, error) { 466 if len(buf) == 0 { 467 return 0, nil, nil 468 } 469 if err := fd.readLock(); err != nil { 470 return 0, nil, err 471 } 472 defer fd.readUnlock() 473 o := &fd.rop 474 o.InitBuf(buf) 475 n, err := rsrv.ExecIO(o, "WSARecvFrom", func(o *operation) error { 476 if o.rsa == nil { 477 o.rsa = new(syscall.RawSockaddrAny) 478 } 479 o.rsan = int32(unsafe.Sizeof(*o.rsa)) 480 return syscall.WSARecvFrom(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, o.rsa, &o.rsan, &o.o, nil) 481 }) 482 err = fd.eofError(n, err) 483 if _, ok := err.(syscall.Errno); ok { 484 err = os.NewSyscallError("wsarecvfrom", err) 485 } 486 if err != 
nil { 487 return n, nil, err 488 } 489 sa, _ := o.rsa.Sockaddr() 490 return n, sa, nil 491 } 492 493 func (fd *netFD) Write(buf []byte) (int, error) { 494 if err := fd.writeLock(); err != nil { 495 return 0, err 496 } 497 defer fd.writeUnlock() 498 if race.Enabled { 499 race.ReleaseMerge(unsafe.Pointer(&ioSync)) 500 } 501 o := &fd.wop 502 o.InitBuf(buf) 503 n, err := wsrv.ExecIO(o, "WSASend", func(o *operation) error { 504 return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &o.qty, 0, &o.o, nil) 505 }) 506 if _, ok := err.(syscall.Errno); ok { 507 err = os.NewSyscallError("wsasend", err) 508 } 509 return n, err 510 } 511 512 func (c *conn) writeBuffers(v *Buffers) (int64, error) { 513 if !c.ok() { 514 return 0, syscall.EINVAL 515 } 516 n, err := c.fd.writeBuffers(v) 517 if err != nil { 518 return n, &OpError{Op: "WSASend", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err} 519 } 520 return n, nil 521 } 522 523 func (fd *netFD) writeBuffers(buf *Buffers) (int64, error) { 524 if len(*buf) == 0 { 525 return 0, nil 526 } 527 if err := fd.writeLock(); err != nil { 528 return 0, err 529 } 530 defer fd.writeUnlock() 531 if race.Enabled { 532 race.ReleaseMerge(unsafe.Pointer(&ioSync)) 533 } 534 o := &fd.wop 535 o.InitBufs(buf) 536 n, err := wsrv.ExecIO(o, "WSASend", func(o *operation) error { 537 return syscall.WSASend(o.fd.sysfd, &o.bufs[0], uint32(len(*buf)), &o.qty, 0, &o.o, nil) 538 }) 539 o.ClearBufs() 540 if _, ok := err.(syscall.Errno); ok { 541 err = os.NewSyscallError("wsasend", err) 542 } 543 testHookDidWritev(n) 544 buf.consume(int64(n)) 545 return int64(n), err 546 } 547 548 func (fd *netFD) writeTo(buf []byte, sa syscall.Sockaddr) (int, error) { 549 if len(buf) == 0 { 550 return 0, nil 551 } 552 if err := fd.writeLock(); err != nil { 553 return 0, err 554 } 555 defer fd.writeUnlock() 556 o := &fd.wop 557 o.InitBuf(buf) 558 o.sa = sa 559 n, err := wsrv.ExecIO(o, "WSASendto", func(o *operation) error { 560 return syscall.WSASendto(o.fd.sysfd, &o.buf, 1, 
&o.qty, 0, o.sa, &o.o, nil) 561 }) 562 if _, ok := err.(syscall.Errno); ok { 563 err = os.NewSyscallError("wsasendto", err) 564 } 565 return n, err 566 } 567 568 func (fd *netFD) acceptOne(rawsa []syscall.RawSockaddrAny, o *operation) (*netFD, error) { 569 // Get new socket. 570 s, err := sysSocket(fd.family, fd.sotype, 0) 571 if err != nil { 572 return nil, err 573 } 574 575 // Associate our new socket with IOCP. 576 netfd, err := newFD(s, fd.family, fd.sotype, fd.net) 577 if err != nil { 578 closeFunc(s) 579 return nil, err 580 } 581 if err := netfd.init(); err != nil { 582 fd.Close() 583 return nil, err 584 } 585 586 // Submit accept request. 587 o.handle = s 588 o.rsan = int32(unsafe.Sizeof(rawsa[0])) 589 _, err = rsrv.ExecIO(o, "AcceptEx", func(o *operation) error { 590 return acceptFunc(o.fd.sysfd, o.handle, (*byte)(unsafe.Pointer(&rawsa[0])), 0, uint32(o.rsan), uint32(o.rsan), &o.qty, &o.o) 591 }) 592 if err != nil { 593 netfd.Close() 594 if _, ok := err.(syscall.Errno); ok { 595 err = os.NewSyscallError("acceptex", err) 596 } 597 return nil, err 598 } 599 600 // Inherit properties of the listening socket. 601 err = syscall.Setsockopt(s, syscall.SOL_SOCKET, syscall.SO_UPDATE_ACCEPT_CONTEXT, (*byte)(unsafe.Pointer(&fd.sysfd)), int32(unsafe.Sizeof(fd.sysfd))) 602 if err != nil { 603 netfd.Close() 604 return nil, os.NewSyscallError("setsockopt", err) 605 } 606 runtime.KeepAlive(fd) 607 return netfd, nil 608 } 609 610 func (fd *netFD) accept() (*netFD, error) { 611 if err := fd.readLock(); err != nil { 612 return nil, err 613 } 614 defer fd.readUnlock() 615 616 o := &fd.rop 617 var netfd *netFD 618 var err error 619 var rawsa [2]syscall.RawSockaddrAny 620 for { 621 netfd, err = fd.acceptOne(rawsa[:], o) 622 if err == nil { 623 break 624 } 625 // Sometimes we see WSAECONNRESET and ERROR_NETNAME_DELETED is 626 // returned here. These happen if connection reset is received 627 // before AcceptEx could complete. 
These errors relate to new 628 // connection, not to AcceptEx, so ignore broken connection and 629 // try AcceptEx again for more connections. 630 nerr, ok := err.(*os.SyscallError) 631 if !ok { 632 return nil, err 633 } 634 errno, ok := nerr.Err.(syscall.Errno) 635 if !ok { 636 return nil, err 637 } 638 switch errno { 639 case syscall.ERROR_NETNAME_DELETED, syscall.WSAECONNRESET: 640 // ignore these and try again 641 default: 642 return nil, err 643 } 644 } 645 646 // Get local and peer addr out of AcceptEx buffer. 647 var lrsa, rrsa *syscall.RawSockaddrAny 648 var llen, rlen int32 649 syscall.GetAcceptExSockaddrs((*byte)(unsafe.Pointer(&rawsa[0])), 650 0, uint32(o.rsan), uint32(o.rsan), &lrsa, &llen, &rrsa, &rlen) 651 lsa, _ := lrsa.Sockaddr() 652 rsa, _ := rrsa.Sockaddr() 653 654 netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa)) 655 return netfd, nil 656 } 657 658 // Unimplemented functions. 659 660 func (fd *netFD) dup() (*os.File, error) { 661 // TODO: Implement this 662 return nil, syscall.EWINDOWS 663 } 664 665 func (fd *netFD) readMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err error) { 666 return 0, 0, 0, nil, syscall.EWINDOWS 667 } 668 669 func (fd *netFD) writeMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err error) { 670 return 0, 0, syscall.EWINDOWS 671 } 672