Home | History | Annotate | Download | only in io
      1 // Copyright 2009 The Go Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style
      3 // license that can be found in the LICENSE file.
      4 
      5 // Pipe adapter to connect code expecting an io.Reader
      6 // with code expecting an io.Writer.
      7 
      8 package io
      9 
     10 import (
     11 	"errors"
     12 	"sync"
     13 )
     14 
     15 // ErrClosedPipe is the error used for read or write operations on a closed pipe.
     16 var ErrClosedPipe = errors.New("io: read/write on closed pipe")
     17 
     18 // A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
     19 type pipe struct {
     20 	rl    sync.Mutex // gates readers one at a time
     21 	wl    sync.Mutex // gates writers one at a time
     22 	l     sync.Mutex // protects remaining fields
     23 	data  []byte     // data remaining in pending write
     24 	rwait sync.Cond  // waiting reader
     25 	wwait sync.Cond  // waiting writer
     26 	rerr  error      // if reader closed, error to give writes
     27 	werr  error      // if writer closed, error to give reads
     28 }
     29 
     30 func (p *pipe) read(b []byte) (n int, err error) {
     31 	// One reader at a time.
     32 	p.rl.Lock()
     33 	defer p.rl.Unlock()
     34 
     35 	p.l.Lock()
     36 	defer p.l.Unlock()
     37 	for {
     38 		if p.rerr != nil {
     39 			return 0, ErrClosedPipe
     40 		}
     41 		if p.data != nil {
     42 			break
     43 		}
     44 		if p.werr != nil {
     45 			return 0, p.werr
     46 		}
     47 		p.rwait.Wait()
     48 	}
     49 	n = copy(b, p.data)
     50 	p.data = p.data[n:]
     51 	if len(p.data) == 0 {
     52 		p.data = nil
     53 		p.wwait.Signal()
     54 	}
     55 	return
     56 }
     57 
     58 var zero [0]byte
     59 
     60 func (p *pipe) write(b []byte) (n int, err error) {
     61 	// pipe uses nil to mean not available
     62 	if b == nil {
     63 		b = zero[:]
     64 	}
     65 
     66 	// One writer at a time.
     67 	p.wl.Lock()
     68 	defer p.wl.Unlock()
     69 
     70 	p.l.Lock()
     71 	defer p.l.Unlock()
     72 	if p.werr != nil {
     73 		err = ErrClosedPipe
     74 		return
     75 	}
     76 	p.data = b
     77 	p.rwait.Signal()
     78 	for {
     79 		if p.data == nil {
     80 			break
     81 		}
     82 		if p.rerr != nil {
     83 			err = p.rerr
     84 			break
     85 		}
     86 		if p.werr != nil {
     87 			err = ErrClosedPipe
     88 			break
     89 		}
     90 		p.wwait.Wait()
     91 	}
     92 	n = len(b) - len(p.data)
     93 	p.data = nil // in case of rerr or werr
     94 	return
     95 }
     96 
     97 func (p *pipe) rclose(err error) {
     98 	if err == nil {
     99 		err = ErrClosedPipe
    100 	}
    101 	p.l.Lock()
    102 	defer p.l.Unlock()
    103 	p.rerr = err
    104 	p.rwait.Signal()
    105 	p.wwait.Signal()
    106 }
    107 
    108 func (p *pipe) wclose(err error) {
    109 	if err == nil {
    110 		err = EOF
    111 	}
    112 	p.l.Lock()
    113 	defer p.l.Unlock()
    114 	p.werr = err
    115 	p.rwait.Signal()
    116 	p.wwait.Signal()
    117 }
    118 
    119 // A PipeReader is the read half of a pipe.
    120 type PipeReader struct {
    121 	p *pipe
    122 }
    123 
    124 // Read implements the standard Read interface:
    125 // it reads data from the pipe, blocking until a writer
    126 // arrives or the write end is closed.
    127 // If the write end is closed with an error, that error is
    128 // returned as err; otherwise err is EOF.
    129 func (r *PipeReader) Read(data []byte) (n int, err error) {
    130 	return r.p.read(data)
    131 }
    132 
    133 // Close closes the reader; subsequent writes to the
    134 // write half of the pipe will return the error ErrClosedPipe.
    135 func (r *PipeReader) Close() error {
    136 	return r.CloseWithError(nil)
    137 }
    138 
    139 // CloseWithError closes the reader; subsequent writes
    140 // to the write half of the pipe will return the error err.
    141 func (r *PipeReader) CloseWithError(err error) error {
    142 	r.p.rclose(err)
    143 	return nil
    144 }
    145 
    146 // A PipeWriter is the write half of a pipe.
    147 type PipeWriter struct {
    148 	p *pipe
    149 }
    150 
    151 // Write implements the standard Write interface:
    152 // it writes data to the pipe, blocking until one or more readers
    153 // have consumed all the data or the read end is closed.
    154 // If the read end is closed with an error, that err is
    155 // returned as err; otherwise err is ErrClosedPipe.
    156 func (w *PipeWriter) Write(data []byte) (n int, err error) {
    157 	return w.p.write(data)
    158 }
    159 
    160 // Close closes the writer; subsequent reads from the
    161 // read half of the pipe will return no bytes and EOF.
    162 func (w *PipeWriter) Close() error {
    163 	return w.CloseWithError(nil)
    164 }
    165 
    166 // CloseWithError closes the writer; subsequent reads from the
    167 // read half of the pipe will return no bytes and the error err,
    168 // or EOF if err is nil.
    169 //
    170 // CloseWithError always returns nil.
    171 func (w *PipeWriter) CloseWithError(err error) error {
    172 	w.p.wclose(err)
    173 	return nil
    174 }
    175 
    176 // Pipe creates a synchronous in-memory pipe.
    177 // It can be used to connect code expecting an io.Reader
    178 // with code expecting an io.Writer.
    179 //
    180 // Reads and Writes on the pipe are matched one to one
    181 // except when multiple Reads are needed to consume a single Write.
    182 // That is, each Write to the PipeWriter blocks until it has satisfied
    183 // one or more Reads from the PipeReader that fully consume
    184 // the written data.
    185 // The data is copied directly from the Write to the corresponding
    186 // Read (or Reads); there is no internal buffering.
    187 //
    188 // It is safe to call Read and Write in parallel with each other or with Close.
    189 // Parallel calls to Read and parallel calls to Write are also safe:
    190 // the individual calls will be gated sequentially.
    191 func Pipe() (*PipeReader, *PipeWriter) {
    192 	p := new(pipe)
    193 	p.rwait.L = &p.l
    194 	p.wwait.L = &p.l
    195 	r := &PipeReader{p}
    196 	w := &PipeWriter{p}
    197 	return r, w
    198 }
    199