Source file src/pkg/net/textproto/pipeline.go
1 // Copyright 2010 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 package textproto
6
7 import (
8 "sync"
9 )
10
11 // A Pipeline manages a pipelined in-order request/response sequence.
12 //
13 // To use a Pipeline p to manage multiple clients on a connection,
14 // each client should run:
15 //
16 // id := p.Next() // take a number
17 //
18 // p.StartRequest(id) // wait for turn to send request
19 // «send request»
20 // p.EndRequest(id) // notify Pipeline that request is sent
21 //
22 // p.StartResponse(id) // wait for turn to read response
23 // «read response»
24 // p.EndResponse(id) // notify Pipeline that response is read
25 //
26 // A pipelined server can use the same calls to ensure that
27 // responses computed in parallel are written in the correct order.
28 type Pipeline struct {
29 mu sync.Mutex
30 id uint
31 request sequencer
32 response sequencer
33 }
34
35 // Next returns the next id for a request/response pair.
36 func (p *Pipeline) Next() uint {
37 p.mu.Lock()
38 id := p.id
39 p.id++
40 p.mu.Unlock()
41 return id
42 }
43
44 // StartRequest blocks until it is time to send (or, if this is a server, receive)
45 // the request with the given id.
46 func (p *Pipeline) StartRequest(id uint) {
47 p.request.Start(id)
48 }
49
50 // EndRequest notifies p that the request with the given id has been sent
51 // (or, if this is a server, received).
52 func (p *Pipeline) EndRequest(id uint) {
53 p.request.End(id)
54 }
55
56 // StartResponse blocks until it is time to receive (or, if this is a server, send)
57 // the request with the given id.
58 func (p *Pipeline) StartResponse(id uint) {
59 p.response.Start(id)
60 }
61
62 // EndResponse notifies p that the response with the given id has been received
63 // (or, if this is a server, sent).
64 func (p *Pipeline) EndResponse(id uint) {
65 p.response.End(id)
66 }
67
// A sequencer schedules a sequence of numbered events that must
// happen in order, one after the other. The event numbering must start
// at 0 and increment without skipping. The event number wraps around
// safely as long as there are not 2^32 simultaneous events pending.
//
// The zero value of sequencer is ready to use.
type sequencer struct {
	mu   sync.Mutex
	id   uint                   // number of the event currently allowed to run
	wait map[uint]chan struct{} // waiters blocked in Start, keyed by event number
}

// Start waits until it is time for the event numbered id to begin.
// That is, except for the first event, it waits until End(id-1) has
// been called.
//
// At most one Start may be pending for a given id: a second call with the
// same id replaces the first waiter's channel, and the first waiter would
// never be released.
func (s *sequencer) Start(id uint) {
	s.mu.Lock()
	if s.id == id {
		s.mu.Unlock()
		return
	}
	c := make(chan struct{})
	if s.wait == nil {
		s.wait = make(map[uint]chan struct{})
	}
	s.wait[id] = c
	s.mu.Unlock()
	<-c // closed by End(id-1)
}

// End notifies the sequencer that the event numbered id has completed,
// allowing it to schedule the event numbered id+1. It is a run-time error
// to call End with an id that is not the number of the active event; in
// that case End panics with "out of sync", leaving the sequencer's state
// unchanged and its lock released.
func (s *sequencer) End(id uint) {
	s.mu.Lock()
	if s.id != id {
		// Release the lock before panicking so that a caller which
		// recovers does not leave the sequencer permanently locked.
		s.mu.Unlock()
		panic("out of sync")
	}
	id++
	s.id = id
	// Looking up a key in a nil map is safe and reports ok == false,
	// so there is no need to allocate the map here.
	c, ok := s.wait[id]
	if ok {
		delete(s.wait, id)
	}
	s.mu.Unlock()
	if ok {
		// Wake the waiter outside the lock. Closing never blocks, so End
		// does not have to rendezvous with the goroutine in Start.
		close(c)
	}
}
117 }