Home | History | Annotate | Download | only in io
      1 // Copyright 2009 The Go Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style
      3 // license that can be found in the LICENSE file.
      4 
      5 // Pipe adapter to connect code expecting an io.Reader
      6 // with code expecting an io.Writer.
      7 
      8 package io
      9 
     10 import (
     11 	"errors"
     12 	"sync"
     13 )
     14 
     15 // ErrClosedPipe is the error used for read or write operations on a closed pipe.
     16 var ErrClosedPipe = errors.New("io: read/write on closed pipe")
     17 
     18 type pipeResult struct {
     19 	n   int
     20 	err error
     21 }
     22 
     23 // A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
     24 type pipe struct {
     25 	rl    sync.Mutex // gates readers one at a time
     26 	wl    sync.Mutex // gates writers one at a time
     27 	l     sync.Mutex // protects remaining fields
     28 	data  []byte     // data remaining in pending write
     29 	rwait sync.Cond  // waiting reader
     30 	wwait sync.Cond  // waiting writer
     31 	rerr  error      // if reader closed, error to give writes
     32 	werr  error      // if writer closed, error to give reads
     33 }
     34 
     35 func (p *pipe) read(b []byte) (n int, err error) {
     36 	// One reader at a time.
     37 	p.rl.Lock()
     38 	defer p.rl.Unlock()
     39 
     40 	p.l.Lock()
     41 	defer p.l.Unlock()
     42 	for {
     43 		if p.rerr != nil {
     44 			return 0, ErrClosedPipe
     45 		}
     46 		if p.data != nil {
     47 			break
     48 		}
     49 		if p.werr != nil {
     50 			return 0, p.werr
     51 		}
     52 		p.rwait.Wait()
     53 	}
     54 	n = copy(b, p.data)
     55 	p.data = p.data[n:]
     56 	if len(p.data) == 0 {
     57 		p.data = nil
     58 		p.wwait.Signal()
     59 	}
     60 	return
     61 }
     62 
     63 var zero [0]byte
     64 
     65 func (p *pipe) write(b []byte) (n int, err error) {
     66 	// pipe uses nil to mean not available
     67 	if b == nil {
     68 		b = zero[:]
     69 	}
     70 
     71 	// One writer at a time.
     72 	p.wl.Lock()
     73 	defer p.wl.Unlock()
     74 
     75 	p.l.Lock()
     76 	defer p.l.Unlock()
     77 	if p.werr != nil {
     78 		err = ErrClosedPipe
     79 		return
     80 	}
     81 	p.data = b
     82 	p.rwait.Signal()
     83 	for {
     84 		if p.data == nil {
     85 			break
     86 		}
     87 		if p.rerr != nil {
     88 			err = p.rerr
     89 			break
     90 		}
     91 		if p.werr != nil {
     92 			err = ErrClosedPipe
     93 		}
     94 		p.wwait.Wait()
     95 	}
     96 	n = len(b) - len(p.data)
     97 	p.data = nil // in case of rerr or werr
     98 	return
     99 }
    100 
    101 func (p *pipe) rclose(err error) {
    102 	if err == nil {
    103 		err = ErrClosedPipe
    104 	}
    105 	p.l.Lock()
    106 	defer p.l.Unlock()
    107 	p.rerr = err
    108 	p.rwait.Signal()
    109 	p.wwait.Signal()
    110 }
    111 
    112 func (p *pipe) wclose(err error) {
    113 	if err == nil {
    114 		err = EOF
    115 	}
    116 	p.l.Lock()
    117 	defer p.l.Unlock()
    118 	p.werr = err
    119 	p.rwait.Signal()
    120 	p.wwait.Signal()
    121 }
    122 
    123 // A PipeReader is the read half of a pipe.
    124 type PipeReader struct {
    125 	p *pipe
    126 }
    127 
    128 // Read implements the standard Read interface:
    129 // it reads data from the pipe, blocking until a writer
    130 // arrives or the write end is closed.
    131 // If the write end is closed with an error, that error is
    132 // returned as err; otherwise err is EOF.
    133 func (r *PipeReader) Read(data []byte) (n int, err error) {
    134 	return r.p.read(data)
    135 }
    136 
    137 // Close closes the reader; subsequent writes to the
    138 // write half of the pipe will return the error ErrClosedPipe.
    139 func (r *PipeReader) Close() error {
    140 	return r.CloseWithError(nil)
    141 }
    142 
    143 // CloseWithError closes the reader; subsequent writes
    144 // to the write half of the pipe will return the error err.
    145 func (r *PipeReader) CloseWithError(err error) error {
    146 	r.p.rclose(err)
    147 	return nil
    148 }
    149 
    150 // A PipeWriter is the write half of a pipe.
    151 type PipeWriter struct {
    152 	p *pipe
    153 }
    154 
    155 // Write implements the standard Write interface:
    156 // it writes data to the pipe, blocking until readers
    157 // have consumed all the data or the read end is closed.
    158 // If the read end is closed with an error, that err is
    159 // returned as err; otherwise err is ErrClosedPipe.
    160 func (w *PipeWriter) Write(data []byte) (n int, err error) {
    161 	return w.p.write(data)
    162 }
    163 
    164 // Close closes the writer; subsequent reads from the
    165 // read half of the pipe will return no bytes and EOF.
    166 func (w *PipeWriter) Close() error {
    167 	return w.CloseWithError(nil)
    168 }
    169 
    170 // CloseWithError closes the writer; subsequent reads from the
    171 // read half of the pipe will return no bytes and the error err,
    172 // or EOF if err is nil.
    173 //
    174 // CloseWithError always returns nil.
    175 func (w *PipeWriter) CloseWithError(err error) error {
    176 	w.p.wclose(err)
    177 	return nil
    178 }
    179 
    180 // Pipe creates a synchronous in-memory pipe.
    181 // It can be used to connect code expecting an io.Reader
    182 // with code expecting an io.Writer.
    183 // Reads on one end are matched with writes on the other,
    184 // copying data directly between the two; there is no internal buffering.
    185 // It is safe to call Read and Write in parallel with each other or with
    186 // Close. Close will complete once pending I/O is done. Parallel calls to
    187 // Read, and parallel calls to Write, are also safe:
    188 // the individual calls will be gated sequentially.
    189 func Pipe() (*PipeReader, *PipeWriter) {
    190 	p := new(pipe)
    191 	p.rwait.L = &p.l
    192 	p.wwait.L = &p.l
    193 	r := &PipeReader{p}
    194 	w := &PipeWriter{p}
    195 	return r, w
    196 }
    197