src/pkg/net/textproto/pipeline.go - The Go Programming Language

Golang

Source file src/pkg/net/textproto/pipeline.go

     1	// Copyright 2010 The Go Authors.  All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	package textproto
     6	
     7	import (
     8		"sync"
     9	)
    10	
    11	// A Pipeline manages a pipelined in-order request/response sequence.
    12	//
    13	// To use a Pipeline p to manage multiple clients on a connection,
    14	// each client should run:
    15	//
    16	//	id := p.Next()	// take a number
    17	//
    18	//	p.StartRequest(id)	// wait for turn to send request
    19	//	«send request»
    20	//	p.EndRequest(id)	// notify Pipeline that request is sent
    21	//
    22	//	p.StartResponse(id)	// wait for turn to read response
    23	//	«read response»
    24	//	p.EndResponse(id)	// notify Pipeline that response is read
    25	//
    26	// A pipelined server can use the same calls to ensure that
    27	// responses computed in parallel are written in the correct order.
    28	type Pipeline struct {
    29		mu       sync.Mutex
    30		id       uint
    31		request  sequencer
    32		response sequencer
    33	}
    34	
    35	// Next returns the next id for a request/response pair.
    36	func (p *Pipeline) Next() uint {
    37		p.mu.Lock()
    38		id := p.id
    39		p.id++
    40		p.mu.Unlock()
    41		return id
    42	}
    43	
    44	// StartRequest blocks until it is time to send (or, if this is a server, receive)
    45	// the request with the given id.
    46	func (p *Pipeline) StartRequest(id uint) {
    47		p.request.Start(id)
    48	}
    49	
    50	// EndRequest notifies p that the request with the given id has been sent
    51	// (or, if this is a server, received).
    52	func (p *Pipeline) EndRequest(id uint) {
    53		p.request.End(id)
    54	}
    55	
    56	// StartResponse blocks until it is time to receive (or, if this is a server, send)
    57	// the request with the given id.
    58	func (p *Pipeline) StartResponse(id uint) {
    59		p.response.Start(id)
    60	}
    61	
    62	// EndResponse notifies p that the response with the given id has been received
    63	// (or, if this is a server, sent).
    64	func (p *Pipeline) EndResponse(id uint) {
    65		p.response.End(id)
    66	}
    67	
    68	// A sequencer schedules a sequence of numbered events that must
    69	// happen in order, one after the other.  The event numbering must start
    70	// at 0 and increment without skipping.  The event number wraps around
    71	// safely as long as there are not 2^32 simultaneous events pending.
    72	type sequencer struct {
    73		mu   sync.Mutex
    74		id   uint
    75		wait map[uint]chan uint
    76	}
    77	
    78	// Start waits until it is time for the event numbered id to begin.
    79	// That is, except for the first event, it waits until End(id-1) has
    80	// been called.
    81	func (s *sequencer) Start(id uint) {
    82		s.mu.Lock()
    83		if s.id == id {
    84			s.mu.Unlock()
    85			return
    86		}
    87		c := make(chan uint)
    88		if s.wait == nil {
    89			s.wait = make(map[uint]chan uint)
    90		}
    91		s.wait[id] = c
    92		s.mu.Unlock()
    93		<-c
    94	}
    95	
    96	// End notifies the sequencer that the event numbered id has completed,
    97	// allowing it to schedule the event numbered id+1.  It is a run-time error
    98	// to call End with an id that is not the number of the active event.
    99	func (s *sequencer) End(id uint) {
   100		s.mu.Lock()
   101		if s.id != id {
   102			panic("out of sync")
   103		}
   104		id++
   105		s.id = id
   106		if s.wait == nil {
   107			s.wait = make(map[uint]chan uint)
   108		}
   109		c, ok := s.wait[id]
   110		if ok {
   111			delete(s.wait, id)
   112		}
   113		s.mu.Unlock()
   114		if ok {
   115			c <- 1
   116		}
   117	}