1 // Copyright 2009 The Go Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style 3 // license that can be found in the LICENSE file. 4 5 // Pipe adapter to connect code expecting an io.Reader 6 // with code expecting an io.Writer. 7 8 package io 9 10 import ( 11 "errors" 12 "sync" 13 ) 14 15 // ErrClosedPipe is the error used for read or write operations on a closed pipe. 16 var ErrClosedPipe = errors.New("io: read/write on closed pipe") 17 18 type pipeResult struct { 19 n int 20 err error 21 } 22 23 // A pipe is the shared pipe structure underlying PipeReader and PipeWriter. 24 type pipe struct { 25 rl sync.Mutex // gates readers one at a time 26 wl sync.Mutex // gates writers one at a time 27 l sync.Mutex // protects remaining fields 28 data []byte // data remaining in pending write 29 rwait sync.Cond // waiting reader 30 wwait sync.Cond // waiting writer 31 rerr error // if reader closed, error to give writes 32 werr error // if writer closed, error to give reads 33 } 34 35 func (p *pipe) read(b []byte) (n int, err error) { 36 // One reader at a time. 37 p.rl.Lock() 38 defer p.rl.Unlock() 39 40 p.l.Lock() 41 defer p.l.Unlock() 42 for { 43 if p.rerr != nil { 44 return 0, ErrClosedPipe 45 } 46 if p.data != nil { 47 break 48 } 49 if p.werr != nil { 50 return 0, p.werr 51 } 52 p.rwait.Wait() 53 } 54 n = copy(b, p.data) 55 p.data = p.data[n:] 56 if len(p.data) == 0 { 57 p.data = nil 58 p.wwait.Signal() 59 } 60 return 61 } 62 63 var zero [0]byte 64 65 func (p *pipe) write(b []byte) (n int, err error) { 66 // pipe uses nil to mean not available 67 if b == nil { 68 b = zero[:] 69 } 70 71 // One writer at a time. 72 p.wl.Lock() 73 defer p.wl.Unlock() 74 75 p.l.Lock() 76 defer p.l.Unlock() 77 if p.werr != nil { 78 err = ErrClosedPipe 79 return 80 } 81 p.data = b 82 p.rwait.Signal() 83 for { 84 if p.data == nil { 85 break 86 } 87 if p.rerr != nil { 88 err = p.rerr 89 break 90 } 91 if p.werr != nil { 92 err = ErrClosedPipe 93 } 94 p.wwait.Wait() 95 } 96 n = len(b) - len(p.data) 97 p.data = nil // in case of rerr or werr 98 return 99 } 100 101 func (p *pipe) rclose(err error) { 102 if err == nil { 103 err = ErrClosedPipe 104 } 105 p.l.Lock() 106 defer p.l.Unlock() 107 p.rerr = err 108 p.rwait.Signal() 109 p.wwait.Signal() 110 } 111 112 func (p *pipe) wclose(err error) { 113 if err == nil { 114 err = EOF 115 } 116 p.l.Lock() 117 defer p.l.Unlock() 118 p.werr = err 119 p.rwait.Signal() 120 p.wwait.Signal() 121 } 122 123 // A PipeReader is the read half of a pipe. 124 type PipeReader struct { 125 p *pipe 126 } 127 128 // Read implements the standard Read interface: 129 // it reads data from the pipe, blocking until a writer 130 // arrives or the write end is closed. 131 // If the write end is closed with an error, that error is 132 // returned as err; otherwise err is EOF. 133 func (r *PipeReader) Read(data []byte) (n int, err error) { 134 return r.p.read(data) 135 } 136 137 // Close closes the reader; subsequent writes to the 138 // write half of the pipe will return the error ErrClosedPipe. 139 func (r *PipeReader) Close() error { 140 return r.CloseWithError(nil) 141 } 142 143 // CloseWithError closes the reader; subsequent writes 144 // to the write half of the pipe will return the error err. 145 func (r *PipeReader) CloseWithError(err error) error { 146 r.p.rclose(err) 147 return nil 148 } 149 150 // A PipeWriter is the write half of a pipe. 151 type PipeWriter struct { 152 p *pipe 153 } 154 155 // Write implements the standard Write interface: 156 // it writes data to the pipe, blocking until readers 157 // have consumed all the data or the read end is closed. 158 // If the read end is closed with an error, that err is 159 // returned as err; otherwise err is ErrClosedPipe. 160 func (w *PipeWriter) Write(data []byte) (n int, err error) { 161 return w.p.write(data) 162 } 163 164 // Close closes the writer; subsequent reads from the 165 // read half of the pipe will return no bytes and EOF. 166 func (w *PipeWriter) Close() error { 167 return w.CloseWithError(nil) 168 } 169 170 // CloseWithError closes the writer; subsequent reads from the 171 // read half of the pipe will return no bytes and the error err, 172 // or EOF if err is nil. 173 // 174 // CloseWithError always returns nil. 175 func (w *PipeWriter) CloseWithError(err error) error { 176 w.p.wclose(err) 177 return nil 178 } 179 180 // Pipe creates a synchronous in-memory pipe. 181 // It can be used to connect code expecting an io.Reader 182 // with code expecting an io.Writer. 183 // Reads on one end are matched with writes on the other, 184 // copying data directly between the two; there is no internal buffering. 185 // It is safe to call Read and Write in parallel with each other or with 186 // Close. Close will complete once pending I/O is done. Parallel calls to 187 // Read, and parallel calls to Write, are also safe: 188 // the individual calls will be gated sequentially. 189 func Pipe() (*PipeReader, *PipeWriter) { 190 p := new(pipe) 191 p.rwait.L = &p.l 192 p.wwait.L = &p.l 193 r := &PipeReader{p} 194 w := &PipeWriter{p} 195 return r, w 196 } 197