src/pkg/io/pipe.go - The Go Programming Language

Golang

Source file src/pkg/io/pipe.go

     1	// Copyright 2009 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	// Pipe adapter to connect code expecting an io.Reader
     6	// with code expecting an io.Writer.
     7	
     8	package io
     9	
    10	import (
    11		"errors"
    12		"sync"
    13	)
    14	
// ErrClosedPipe is the error used for read or write operations on a closed pipe.
// Within this file it is returned by reads once the read end has been closed,
// reported by writes when the write end has been closed, and recorded as the
// default error when the read end is closed without an explicit one.
var ErrClosedPipe = errors.New("io: read/write on closed pipe")
    17	
// pipeResult pairs an int with an error; presumably the (n, err) result of a
// read or write.
// NOTE(review): nothing in the visible part of this file references this
// type — confirm whether it is still needed.
type pipeResult struct {
	n   int   // presumably a byte count — confirm against callers
	err error // error accompanying n, if any
}
    22	
// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
//
// The pipe has no buffer of its own: data aliases the slice handed to the
// pending write, and read copies directly out of it. A nil data means that
// no write is currently pending. rwait and wwait must have their L field
// pointed at l before use; Pipe takes care of that.
type pipe struct {
	rl    sync.Mutex // gates readers one at a time
	wl    sync.Mutex // gates writers one at a time
	l     sync.Mutex // protects remaining fields
	data  []byte     // data remaining in pending write; nil when no write is pending
	rwait sync.Cond  // waiting reader; signaled when data arrives or either end closes
	wwait sync.Cond  // waiting writer; signaled when data is drained or either end closes
	rerr  error      // if reader closed, error to give writes
	werr  error      // if writer closed, error to give reads
}
    34	
    35	func (p *pipe) read(b []byte) (n int, err error) {
    36		// One reader at a time.
    37		p.rl.Lock()
    38		defer p.rl.Unlock()
    39	
    40		p.l.Lock()
    41		defer p.l.Unlock()
    42		for {
    43			if p.rerr != nil {
    44				return 0, ErrClosedPipe
    45			}
    46			if p.data != nil {
    47				break
    48			}
    49			if p.werr != nil {
    50				return 0, p.werr
    51			}
    52			p.rwait.Wait()
    53		}
    54		n = copy(b, p.data)
    55		p.data = p.data[n:]
    56		if len(p.data) == 0 {
    57			p.data = nil
    58			p.wwait.Signal()
    59		}
    60		return
    61	}
    62	
// zero backs the empty, non-nil slice that write substitutes for a nil
// argument, because the pipe uses a nil data slice to mean "no pending write".
var zero [0]byte
    64	
    65	func (p *pipe) write(b []byte) (n int, err error) {
    66		// pipe uses nil to mean not available
    67		if b == nil {
    68			b = zero[:]
    69		}
    70	
    71		// One writer at a time.
    72		p.wl.Lock()
    73		defer p.wl.Unlock()
    74	
    75		p.l.Lock()
    76		defer p.l.Unlock()
    77		p.data = b
    78		p.rwait.Signal()
    79		for {
    80			if p.data == nil {
    81				break
    82			}
    83			if p.rerr != nil {
    84				err = p.rerr
    85				break
    86			}
    87			if p.werr != nil {
    88				err = ErrClosedPipe
    89			}
    90			p.wwait.Wait()
    91		}
    92		n = len(b) - len(p.data)
    93		p.data = nil // in case of rerr or werr
    94		return
    95	}
    96	
    97	func (p *pipe) rclose(err error) {
    98		if err == nil {
    99			err = ErrClosedPipe
   100		}
   101		p.l.Lock()
   102		defer p.l.Unlock()
   103		p.rerr = err
   104		p.rwait.Signal()
   105		p.wwait.Signal()
   106	}
   107	
   108	func (p *pipe) wclose(err error) {
   109		if err == nil {
   110			err = EOF
   111		}
   112		p.l.Lock()
   113		defer p.l.Unlock()
   114		p.werr = err
   115		p.rwait.Signal()
   116		p.wwait.Signal()
   117	}
   118	
// A PipeReader is the read half of a pipe.
// Its methods delegate to the pipe state it shares with the PipeWriter
// returned by the same call to Pipe.
type PipeReader struct {
	p *pipe // state shared with the write half
}
   123	
// Read implements the standard Read interface:
// it reads data from the pipe, blocking until a writer
// arrives or the write end is closed.
// If the write end is closed with an error, that error is
// returned as err; otherwise err is EOF.
// If the read end itself has been closed, Read returns ErrClosedPipe.
func (r *PipeReader) Read(data []byte) (n int, err error) {
	return r.p.read(data)
}
   132	
// Close closes the reader; subsequent writes to the
// write half of the pipe will return the error ErrClosedPipe.
// It is equivalent to CloseWithError(nil) and always returns nil.
func (r *PipeReader) Close() error {
	return r.CloseWithError(nil)
}
   138	
// CloseWithError closes the reader; subsequent writes
// to the write half of the pipe will return the error err.
// If err is nil, writes return ErrClosedPipe instead.
// CloseWithError always returns nil.
func (r *PipeReader) CloseWithError(err error) error {
	r.p.rclose(err)
	return nil
}
   145	
// A PipeWriter is the write half of a pipe.
// Its methods delegate to the pipe state it shares with the PipeReader
// returned by the same call to Pipe.
type PipeWriter struct {
	p *pipe // state shared with the read half
}
   150	
// Write implements the standard Write interface:
// it writes data to the pipe, blocking until readers
// have consumed all the data or the read end is closed.
// If the read end is closed with an error, that error is
// returned as err; if it is closed without one, err is ErrClosedPipe.
// n reports how many bytes of data readers consumed.
func (w *PipeWriter) Write(data []byte) (n int, err error) {
	return w.p.write(data)
}
   159	
// Close closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and EOF.
// It is equivalent to CloseWithError(nil) and always returns nil.
func (w *PipeWriter) Close() error {
	return w.CloseWithError(nil)
}
   165	
// CloseWithError closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and the error err.
// If err is nil, reads return EOF instead.
// CloseWithError always returns nil.
func (w *PipeWriter) CloseWithError(err error) error {
	w.p.wclose(err)
	return nil
}
   172	
   173	// Pipe creates a synchronous in-memory pipe.
   174	// It can be used to connect code expecting an io.Reader
   175	// with code expecting an io.Writer.
   176	// Reads on one end are matched with writes on the other,
   177	// copying data directly between the two; there is no internal buffering.
   178	// It is safe to call Read and Write in parallel with each other or with
   179	// Close. Close will complete once pending I/O is done. Parallel calls to
   180	// Read, and parallel calls to Write, are also safe:
   181	// the individual calls will be gated sequentially.
   182	func Pipe() (*PipeReader, *PipeWriter) {
   183		p := new(pipe)
   184		p.rwait.L = &p.l
   185		p.wwait.L = &p.l
   186		r := &PipeReader{p}
   187		w := &PipeWriter{p}
   188		return r, w
   189	}