Home | History | Annotate | Download | only in net
      1 // Copyright 2010 The Go Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style
      3 // license that can be found in the LICENSE file.
      4 
      5 package net
      6 
      7 import (
      8 	"io"
      9 	"sync"
     10 	"time"
     11 )
     12 
     13 // pipeDeadline is an abstraction for handling timeouts.
     14 type pipeDeadline struct {
     15 	mu     sync.Mutex // Guards timer and cancel
     16 	timer  *time.Timer
     17 	cancel chan struct{} // Must be non-nil
     18 }
     19 
     20 func makePipeDeadline() pipeDeadline {
     21 	return pipeDeadline{cancel: make(chan struct{})}
     22 }
     23 
     24 // set sets the point in time when the deadline will time out.
     25 // A timeout event is signaled by closing the channel returned by waiter.
     26 // Once a timeout has occurred, the deadline can be refreshed by specifying a
     27 // t value in the future.
     28 //
     29 // A zero value for t prevents timeout.
     30 func (d *pipeDeadline) set(t time.Time) {
     31 	d.mu.Lock()
     32 	defer d.mu.Unlock()
     33 
     34 	if d.timer != nil && !d.timer.Stop() {
     35 		<-d.cancel // Wait for the timer callback to finish and close cancel
     36 	}
     37 	d.timer = nil
     38 
     39 	// Time is zero, then there is no deadline.
     40 	closed := isClosedChan(d.cancel)
     41 	if t.IsZero() {
     42 		if closed {
     43 			d.cancel = make(chan struct{})
     44 		}
     45 		return
     46 	}
     47 
     48 	// Time in the future, setup a timer to cancel in the future.
     49 	if dur := time.Until(t); dur > 0 {
     50 		if closed {
     51 			d.cancel = make(chan struct{})
     52 		}
     53 		d.timer = time.AfterFunc(dur, func() {
     54 			close(d.cancel)
     55 		})
     56 		return
     57 	}
     58 
     59 	// Time in the past, so close immediately.
     60 	if !closed {
     61 		close(d.cancel)
     62 	}
     63 }
     64 
     65 // wait returns a channel that is closed when the deadline is exceeded.
     66 func (d *pipeDeadline) wait() chan struct{} {
     67 	d.mu.Lock()
     68 	defer d.mu.Unlock()
     69 	return d.cancel
     70 }
     71 
     72 func isClosedChan(c <-chan struct{}) bool {
     73 	select {
     74 	case <-c:
     75 		return true
     76 	default:
     77 		return false
     78 	}
     79 }
     80 
     81 type timeoutError struct{}
     82 
     83 func (timeoutError) Error() string   { return "deadline exceeded" }
     84 func (timeoutError) Timeout() bool   { return true }
     85 func (timeoutError) Temporary() bool { return true }
     86 
     87 type pipeAddr struct{}
     88 
     89 func (pipeAddr) Network() string { return "pipe" }
     90 func (pipeAddr) String() string  { return "pipe" }
     91 
     92 type pipe struct {
     93 	wrMu sync.Mutex // Serialize Write operations
     94 
     95 	// Used by local Read to interact with remote Write.
     96 	// Successful receive on rdRx is always followed by send on rdTx.
     97 	rdRx <-chan []byte
     98 	rdTx chan<- int
     99 
    100 	// Used by local Write to interact with remote Read.
    101 	// Successful send on wrTx is always followed by receive on wrRx.
    102 	wrTx chan<- []byte
    103 	wrRx <-chan int
    104 
    105 	once       sync.Once // Protects closing localDone
    106 	localDone  chan struct{}
    107 	remoteDone <-chan struct{}
    108 
    109 	readDeadline  pipeDeadline
    110 	writeDeadline pipeDeadline
    111 }
    112 
    113 // Pipe creates a synchronous, in-memory, full duplex
    114 // network connection; both ends implement the Conn interface.
    115 // Reads on one end are matched with writes on the other,
    116 // copying data directly between the two; there is no internal
    117 // buffering.
    118 func Pipe() (Conn, Conn) {
    119 	cb1 := make(chan []byte)
    120 	cb2 := make(chan []byte)
    121 	cn1 := make(chan int)
    122 	cn2 := make(chan int)
    123 	done1 := make(chan struct{})
    124 	done2 := make(chan struct{})
    125 
    126 	p1 := &pipe{
    127 		rdRx: cb1, rdTx: cn1,
    128 		wrTx: cb2, wrRx: cn2,
    129 		localDone: done1, remoteDone: done2,
    130 		readDeadline:  makePipeDeadline(),
    131 		writeDeadline: makePipeDeadline(),
    132 	}
    133 	p2 := &pipe{
    134 		rdRx: cb2, rdTx: cn2,
    135 		wrTx: cb1, wrRx: cn1,
    136 		localDone: done2, remoteDone: done1,
    137 		readDeadline:  makePipeDeadline(),
    138 		writeDeadline: makePipeDeadline(),
    139 	}
    140 	return p1, p2
    141 }
    142 
    143 func (*pipe) LocalAddr() Addr  { return pipeAddr{} }
    144 func (*pipe) RemoteAddr() Addr { return pipeAddr{} }
    145 
    146 func (p *pipe) Read(b []byte) (int, error) {
    147 	n, err := p.read(b)
    148 	if err != nil && err != io.EOF && err != io.ErrClosedPipe {
    149 		err = &OpError{Op: "read", Net: "pipe", Err: err}
    150 	}
    151 	return n, err
    152 }
    153 
    154 func (p *pipe) read(b []byte) (n int, err error) {
    155 	switch {
    156 	case isClosedChan(p.localDone):
    157 		return 0, io.ErrClosedPipe
    158 	case isClosedChan(p.remoteDone):
    159 		return 0, io.EOF
    160 	case isClosedChan(p.readDeadline.wait()):
    161 		return 0, timeoutError{}
    162 	}
    163 
    164 	select {
    165 	case bw := <-p.rdRx:
    166 		nr := copy(b, bw)
    167 		p.rdTx <- nr
    168 		return nr, nil
    169 	case <-p.localDone:
    170 		return 0, io.ErrClosedPipe
    171 	case <-p.remoteDone:
    172 		return 0, io.EOF
    173 	case <-p.readDeadline.wait():
    174 		return 0, timeoutError{}
    175 	}
    176 }
    177 
    178 func (p *pipe) Write(b []byte) (int, error) {
    179 	n, err := p.write(b)
    180 	if err != nil && err != io.ErrClosedPipe {
    181 		err = &OpError{Op: "write", Net: "pipe", Err: err}
    182 	}
    183 	return n, err
    184 }
    185 
    186 func (p *pipe) write(b []byte) (n int, err error) {
    187 	switch {
    188 	case isClosedChan(p.localDone):
    189 		return 0, io.ErrClosedPipe
    190 	case isClosedChan(p.remoteDone):
    191 		return 0, io.ErrClosedPipe
    192 	case isClosedChan(p.writeDeadline.wait()):
    193 		return 0, timeoutError{}
    194 	}
    195 
    196 	p.wrMu.Lock() // Ensure entirety of b is written together
    197 	defer p.wrMu.Unlock()
    198 	for once := true; once || len(b) > 0; once = false {
    199 		select {
    200 		case p.wrTx <- b:
    201 			nw := <-p.wrRx
    202 			b = b[nw:]
    203 			n += nw
    204 		case <-p.localDone:
    205 			return n, io.ErrClosedPipe
    206 		case <-p.remoteDone:
    207 			return n, io.ErrClosedPipe
    208 		case <-p.writeDeadline.wait():
    209 			return n, timeoutError{}
    210 		}
    211 	}
    212 	return n, nil
    213 }
    214 
    215 func (p *pipe) SetDeadline(t time.Time) error {
    216 	if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
    217 		return io.ErrClosedPipe
    218 	}
    219 	p.readDeadline.set(t)
    220 	p.writeDeadline.set(t)
    221 	return nil
    222 }
    223 
    224 func (p *pipe) SetReadDeadline(t time.Time) error {
    225 	if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
    226 		return io.ErrClosedPipe
    227 	}
    228 	p.readDeadline.set(t)
    229 	return nil
    230 }
    231 
    232 func (p *pipe) SetWriteDeadline(t time.Time) error {
    233 	if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
    234 		return io.ErrClosedPipe
    235 	}
    236 	p.writeDeadline.set(t)
    237 	return nil
    238 }
    239 
    240 func (p *pipe) Close() error {
    241 	p.once.Do(func() { close(p.localDone) })
    242 	return nil
    243 }
    244