Home | History | Annotate | Download | only in io
      1 // Copyright 2009 The Go Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style
      3 // license that can be found in the LICENSE file.
      4 
      5 // Pipe adapter to connect code expecting an io.Reader
      6 // with code expecting an io.Writer.
      7 
      8 package io
      9 
     10 import (
     11 	"errors"
     12 	"sync"
     13 	"sync/atomic"
     14 )
     15 
     16 // atomicError is a type-safe atomic value for errors.
     17 // We use a struct{ error } to ensure consistent use of a concrete type.
     18 type atomicError struct{ v atomic.Value }
     19 
     20 func (a *atomicError) Store(err error) {
     21 	a.v.Store(struct{ error }{err})
     22 }
     23 func (a *atomicError) Load() error {
     24 	err, _ := a.v.Load().(struct{ error })
     25 	return err.error
     26 }
     27 
// ErrClosedPipe is the error used for read or write operations on a closed pipe.
// It is also the error recorded for the read half when it is closed
// without an explicit error.
var ErrClosedPipe = errors.New("io: read/write on closed pipe")
     30 
     31 // A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
     32 type pipe struct {
     33 	wrMu sync.Mutex // Serializes Write operations
     34 	wrCh chan []byte
     35 	rdCh chan int
     36 
     37 	once sync.Once // Protects closing done
     38 	done chan struct{}
     39 	rerr atomicError
     40 	werr atomicError
     41 }
     42 
     43 func (p *pipe) Read(b []byte) (n int, err error) {
     44 	select {
     45 	case <-p.done:
     46 		return 0, p.readCloseError()
     47 	default:
     48 	}
     49 
     50 	select {
     51 	case bw := <-p.wrCh:
     52 		nr := copy(b, bw)
     53 		p.rdCh <- nr
     54 		return nr, nil
     55 	case <-p.done:
     56 		return 0, p.readCloseError()
     57 	}
     58 }
     59 
     60 func (p *pipe) readCloseError() error {
     61 	rerr := p.rerr.Load()
     62 	if werr := p.werr.Load(); rerr == nil && werr != nil {
     63 		return werr
     64 	}
     65 	return ErrClosedPipe
     66 }
     67 
     68 func (p *pipe) CloseRead(err error) error {
     69 	if err == nil {
     70 		err = ErrClosedPipe
     71 	}
     72 	p.rerr.Store(err)
     73 	p.once.Do(func() { close(p.done) })
     74 	return nil
     75 }
     76 
     77 func (p *pipe) Write(b []byte) (n int, err error) {
     78 	select {
     79 	case <-p.done:
     80 		return 0, p.writeCloseError()
     81 	default:
     82 		p.wrMu.Lock()
     83 		defer p.wrMu.Unlock()
     84 	}
     85 
     86 	for once := true; once || len(b) > 0; once = false {
     87 		select {
     88 		case p.wrCh <- b:
     89 			nw := <-p.rdCh
     90 			b = b[nw:]
     91 			n += nw
     92 		case <-p.done:
     93 			return n, p.writeCloseError()
     94 		}
     95 	}
     96 	return n, nil
     97 }
     98 
     99 func (p *pipe) writeCloseError() error {
    100 	werr := p.werr.Load()
    101 	if rerr := p.rerr.Load(); werr == nil && rerr != nil {
    102 		return rerr
    103 	}
    104 	return ErrClosedPipe
    105 }
    106 
    107 func (p *pipe) CloseWrite(err error) error {
    108 	if err == nil {
    109 		err = EOF
    110 	}
    111 	p.werr.Store(err)
    112 	p.once.Do(func() { close(p.done) })
    113 	return nil
    114 }
    115 
    116 // A PipeReader is the read half of a pipe.
    117 type PipeReader struct {
    118 	p *pipe
    119 }
    120 
    121 // Read implements the standard Read interface:
    122 // it reads data from the pipe, blocking until a writer
    123 // arrives or the write end is closed.
    124 // If the write end is closed with an error, that error is
    125 // returned as err; otherwise err is EOF.
    126 func (r *PipeReader) Read(data []byte) (n int, err error) {
    127 	return r.p.Read(data)
    128 }
    129 
    130 // Close closes the reader; subsequent writes to the
    131 // write half of the pipe will return the error ErrClosedPipe.
    132 func (r *PipeReader) Close() error {
    133 	return r.CloseWithError(nil)
    134 }
    135 
    136 // CloseWithError closes the reader; subsequent writes
    137 // to the write half of the pipe will return the error err.
    138 func (r *PipeReader) CloseWithError(err error) error {
    139 	return r.p.CloseRead(err)
    140 }
    141 
    142 // A PipeWriter is the write half of a pipe.
    143 type PipeWriter struct {
    144 	p *pipe
    145 }
    146 
    147 // Write implements the standard Write interface:
    148 // it writes data to the pipe, blocking until one or more readers
    149 // have consumed all the data or the read end is closed.
    150 // If the read end is closed with an error, that err is
    151 // returned as err; otherwise err is ErrClosedPipe.
    152 func (w *PipeWriter) Write(data []byte) (n int, err error) {
    153 	return w.p.Write(data)
    154 }
    155 
    156 // Close closes the writer; subsequent reads from the
    157 // read half of the pipe will return no bytes and EOF.
    158 func (w *PipeWriter) Close() error {
    159 	return w.CloseWithError(nil)
    160 }
    161 
    162 // CloseWithError closes the writer; subsequent reads from the
    163 // read half of the pipe will return no bytes and the error err,
    164 // or EOF if err is nil.
    165 //
    166 // CloseWithError always returns nil.
    167 func (w *PipeWriter) CloseWithError(err error) error {
    168 	return w.p.CloseWrite(err)
    169 }
    170 
    171 // Pipe creates a synchronous in-memory pipe.
    172 // It can be used to connect code expecting an io.Reader
    173 // with code expecting an io.Writer.
    174 //
    175 // Reads and Writes on the pipe are matched one to one
    176 // except when multiple Reads are needed to consume a single Write.
    177 // That is, each Write to the PipeWriter blocks until it has satisfied
    178 // one or more Reads from the PipeReader that fully consume
    179 // the written data.
    180 // The data is copied directly from the Write to the corresponding
    181 // Read (or Reads); there is no internal buffering.
    182 //
    183 // It is safe to call Read and Write in parallel with each other or with Close.
    184 // Parallel calls to Read and parallel calls to Write are also safe:
    185 // the individual calls will be gated sequentially.
    186 func Pipe() (*PipeReader, *PipeWriter) {
    187 	p := &pipe{
    188 		wrCh: make(chan []byte),
    189 		rdCh: make(chan int),
    190 		done: make(chan struct{}),
    191 	}
    192 	return &PipeReader{p}, &PipeWriter{p}
    193 }
    194