Source file src/pkg/io/pipe.go
1 // Copyright 2009 The Go Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style 3 // license that can be found in the LICENSE file. 4 5 // Pipe adapter to connect code expecting an io.Reader 6 // with code expecting an io.Writer. 7 8 package io 9 10 import ( 11 "errors" 12 "sync" 13 ) 14 15 // ErrClosedPipe is the error used for read or write operations on a closed pipe. 16 var ErrClosedPipe = errors.New("io: read/write on closed pipe") 17 18 type pipeResult struct { 19 n int 20 err error 21 } 22 23 // A pipe is the shared pipe structure underlying PipeReader and PipeWriter. 24 type pipe struct { 25 rl sync.Mutex // gates readers one at a time 26 wl sync.Mutex // gates writers one at a time 27 l sync.Mutex // protects remaining fields 28 data []byte // data remaining in pending write 29 rwait sync.Cond // waiting reader 30 wwait sync.Cond // waiting writer 31 rerr error // if reader closed, error to give writes 32 werr error // if writer closed, error to give reads 33 } 34 35 func (p *pipe) read(b []byte) (n int, err error) { 36 // One reader at a time. 37 p.rl.Lock() 38 defer p.rl.Unlock() 39 40 p.l.Lock() 41 defer p.l.Unlock() 42 for { 43 if p.rerr != nil { 44 return 0, ErrClosedPipe 45 } 46 if p.data != nil { 47 break 48 } 49 if p.werr != nil { 50 return 0, p.werr 51 } 52 p.rwait.Wait() 53 } 54 n = copy(b, p.data) 55 p.data = p.data[n:] 56 if len(p.data) == 0 { 57 p.data = nil 58 p.wwait.Signal() 59 } 60 return 61 } 62 63 var zero [0]byte 64 65 func (p *pipe) write(b []byte) (n int, err error) { 66 // pipe uses nil to mean not available 67 if b == nil { 68 b = zero[:] 69 } 70 71 // One writer at a time. 72 p.wl.Lock() 73 defer p.wl.Unlock() 74 75 p.l.Lock() 76 defer p.l.Unlock() 77 p.data = b 78 p.rwait.Signal() 79 for { 80 if p.data == nil { 81 break 82 } 83 if p.rerr != nil { 84 err = p.rerr 85 break 86 } 87 if p.werr != nil { 88 err = ErrClosedPipe 89 } 90 p.wwait.Wait() 91 } 92 n = len(b) - len(p.data) 93 p.data = nil // in case of rerr or werr 94 return 95 } 96 97 func (p *pipe) rclose(err error) { 98 if err == nil { 99 err = ErrClosedPipe 100 } 101 p.l.Lock() 102 defer p.l.Unlock() 103 p.rerr = err 104 p.rwait.Signal() 105 p.wwait.Signal() 106 } 107 108 func (p *pipe) wclose(err error) { 109 if err == nil { 110 err = EOF 111 } 112 p.l.Lock() 113 defer p.l.Unlock() 114 p.werr = err 115 p.rwait.Signal() 116 p.wwait.Signal() 117 } 118 119 // A PipeReader is the read half of a pipe. 120 type PipeReader struct { 121 p *pipe 122 } 123 124 // Read implements the standard Read interface: 125 // it reads data from the pipe, blocking until a writer 126 // arrives or the write end is closed. 127 // If the write end is closed with an error, that error is 128 // returned as err; otherwise err is EOF. 129 func (r *PipeReader) Read(data []byte) (n int, err error) { 130 return r.p.read(data) 131 } 132 133 // Close closes the reader; subsequent writes to the 134 // write half of the pipe will return the error ErrClosedPipe. 135 func (r *PipeReader) Close() error { 136 return r.CloseWithError(nil) 137 } 138 139 // CloseWithError closes the reader; subsequent writes 140 // to the write half of the pipe will return the error err. 141 func (r *PipeReader) CloseWithError(err error) error { 142 r.p.rclose(err) 143 return nil 144 } 145 146 // A PipeWriter is the write half of a pipe. 147 type PipeWriter struct { 148 p *pipe 149 } 150 151 // Write implements the standard Write interface: 152 // it writes data to the pipe, blocking until readers 153 // have consumed all the data or the read end is closed. 154 // If the read end is closed with an error, that err is 155 // returned as err; otherwise err is ErrClosedPipe. 156 func (w *PipeWriter) Write(data []byte) (n int, err error) { 157 return w.p.write(data) 158 } 159 160 // Close closes the writer; subsequent reads from the 161 // read half of the pipe will return no bytes and EOF. 162 func (w *PipeWriter) Close() error { 163 return w.CloseWithError(nil) 164 } 165 166 // CloseWithError closes the writer; subsequent reads from the 167 // read half of the pipe will return no bytes and the error err. 168 func (w *PipeWriter) CloseWithError(err error) error { 169 w.p.wclose(err) 170 return nil 171 } 172 173 // Pipe creates a synchronous in-memory pipe. 174 // It can be used to connect code expecting an io.Reader 175 // with code expecting an io.Writer. 176 // Reads on one end are matched with writes on the other, 177 // copying data directly between the two; there is no internal buffering. 178 // It is safe to call Read and Write in parallel with each other or with 179 // Close. Close will complete once pending I/O is done. Parallel calls to 180 // Read, and parallel calls to Write, are also safe: 181 // the individual calls will be gated sequentially. 182 func Pipe() (*PipeReader, *PipeWriter) { 183 p := new(pipe) 184 p.rwait.L = &p.l 185 p.wwait.L = &p.l 186 r := &PipeReader{p} 187 w := &PipeWriter{p} 188 return r, w 189 }