Source file src/pkg/net/rpc/client.go
1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 package rpc
6
7 import (
8 "bufio"
9 "encoding/gob"
10 "errors"
11 "io"
12 "log"
13 "net"
14 "net/http"
15 "sync"
16 )
17
// ServerError is the error type for failures reported by the remote
// side of an RPC connection. Its value is the error message itself.
type ServerError string

// Error implements the error interface by returning the message text.
func (se ServerError) Error() string {
	msg := string(se)
	return msg
}
25
// ErrShutdown is the error reported for operations attempted on a
// client whose connection is shut down.
var ErrShutdown = errors.New("connection is shut down")
27
// Call holds the state of a single RPC, from invocation to completion.
type Call struct {
	// ServiceMethod names the service and method being invoked.
	ServiceMethod string
	// Args is the argument passed to the function (*struct).
	Args interface{}
	// Reply receives the result from the function (*struct).
	Reply interface{}
	// Error holds the error status once the call has completed.
	Error error
	// Done receives this Call when the call is complete.
	Done chan *Call
}
36
37 // Client represents an RPC Client.
38 // There may be multiple outstanding Calls associated
39 // with a single Client, and a Client may be used by
40 // multiple goroutines simultaneously.
41 type Client struct {
42 mutex sync.Mutex // protects pending, seq, request
43 sending sync.Mutex
44 request Request
45 seq uint64
46 codec ClientCodec
47 pending map[uint64]*Call
48 closing bool
49 shutdown bool
50 }
51
52 // A ClientCodec implements writing of RPC requests and
53 // reading of RPC responses for the client side of an RPC session.
54 // The client calls WriteRequest to write a request to the connection
55 // and calls ReadResponseHeader and ReadResponseBody in pairs
56 // to read responses. The client calls Close when finished with the
57 // connection. ReadResponseBody may be called with a nil
58 // argument to force the body of the response to be read and then
59 // discarded.
60 type ClientCodec interface {
61 WriteRequest(*Request, interface{}) error
62 ReadResponseHeader(*Response) error
63 ReadResponseBody(interface{}) error
64
65 Close() error
66 }
67
68 func (client *Client) send(call *Call) {
69 client.sending.Lock()
70 defer client.sending.Unlock()
71
72 // Register this call.
73 client.mutex.Lock()
74 if client.shutdown {
75 call.Error = ErrShutdown
76 client.mutex.Unlock()
77 call.done()
78 return
79 }
80 seq := client.seq
81 client.seq++
82 client.pending[seq] = call
83 client.mutex.Unlock()
84
85 // Encode and send the request.
86 client.request.Seq = seq
87 client.request.ServiceMethod = call.ServiceMethod
88 err := client.codec.WriteRequest(&client.request, call.Args)
89 if err != nil {
90 client.mutex.Lock()
91 delete(client.pending, seq)
92 client.mutex.Unlock()
93 call.Error = err
94 call.done()
95 }
96 }
97
98 func (client *Client) input() {
99 var err error
100 var response Response
101 for err == nil {
102 response = Response{}
103 err = client.codec.ReadResponseHeader(&response)
104 if err != nil {
105 if err == io.EOF && !client.closing {
106 err = io.ErrUnexpectedEOF
107 }
108 break
109 }
110 seq := response.Seq
111 client.mutex.Lock()
112 call := client.pending[seq]
113 delete(client.pending, seq)
114 client.mutex.Unlock()
115
116 if response.Error == "" {
117 err = client.codec.ReadResponseBody(call.Reply)
118 if err != nil {
119 call.Error = errors.New("reading body " + err.Error())
120 }
121 } else {
122 // We've got an error response. Give this to the request;
123 // any subsequent requests will get the ReadResponseBody
124 // error if there is one.
125 call.Error = ServerError(response.Error)
126 err = client.codec.ReadResponseBody(nil)
127 if err != nil {
128 err = errors.New("reading error body: " + err.Error())
129 }
130 }
131 call.done()
132 }
133 // Terminate pending calls.
134 client.sending.Lock()
135 client.mutex.Lock()
136 client.shutdown = true
137 closing := client.closing
138 for _, call := range client.pending {
139 call.Error = err
140 call.done()
141 }
142 client.mutex.Unlock()
143 client.sending.Unlock()
144 if err != io.EOF && !closing {
145 log.Println("rpc: client protocol error:", err)
146 }
147 }
148
149 func (call *Call) done() {
150 select {
151 case call.Done <- call:
152 // ok
153 default:
154 // We don't want to block here. It is the caller's responsibility to make
155 // sure the channel has enough buffer space. See comment in Go().
156 log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
157 }
158 }
159
160 // NewClient returns a new Client to handle requests to the
161 // set of services at the other end of the connection.
162 // It adds a buffer to the write side of the connection so
163 // the header and payload are sent as a unit.
164 func NewClient(conn io.ReadWriteCloser) *Client {
165 encBuf := bufio.NewWriter(conn)
166 client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
167 return NewClientWithCodec(client)
168 }
169
170 // NewClientWithCodec is like NewClient but uses the specified
171 // codec to encode requests and decode responses.
172 func NewClientWithCodec(codec ClientCodec) *Client {
173 client := &Client{
174 codec: codec,
175 pending: make(map[uint64]*Call),
176 }
177 go client.input()
178 return client
179 }
180
// gobClientCodec is a codec that encodes requests and decodes
// responses with encoding/gob.
type gobClientCodec struct {
	// rwc is the underlying connection.
	rwc io.ReadWriteCloser
	// dec decodes responses.
	dec *gob.Decoder
	// enc encodes requests.
	enc *gob.Encoder
	// encBuf is the buffered writer flushed after each request.
	encBuf *bufio.Writer
}
187
188 func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) {
189 if err = c.enc.Encode(r); err != nil {
190 return
191 }
192 if err = c.enc.Encode(body); err != nil {
193 return
194 }
195 return c.encBuf.Flush()
196 }
197
198 func (c *gobClientCodec) ReadResponseHeader(r *Response) error {
199 return c.dec.Decode(r)
200 }
201
202 func (c *gobClientCodec) ReadResponseBody(body interface{}) error {
203 return c.dec.Decode(body)
204 }
205
206 func (c *gobClientCodec) Close() error {
207 return c.rwc.Close()
208 }
209
210 // DialHTTP connects to an HTTP RPC server at the specified network address
211 // listening on the default HTTP RPC path.
212 func DialHTTP(network, address string) (*Client, error) {
213 return DialHTTPPath(network, address, DefaultRPCPath)
214 }
215
216 // DialHTTPPath connects to an HTTP RPC server
217 // at the specified network address and path.
218 func DialHTTPPath(network, address, path string) (*Client, error) {
219 var err error
220 conn, err := net.Dial(network, address)
221 if err != nil {
222 return nil, err
223 }
224 io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
225
226 // Require successful HTTP response
227 // before switching to RPC protocol.
228 resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
229 if err == nil && resp.Status == connected {
230 return NewClient(conn), nil
231 }
232 if err == nil {
233 err = errors.New("unexpected HTTP response: " + resp.Status)
234 }
235 conn.Close()
236 return nil, &net.OpError{
237 Op: "dial-http",
238 Net: network + " " + address,
239 Addr: nil,
240 Err: err,
241 }
242 }
243
244 // Dial connects to an RPC server at the specified network address.
245 func Dial(network, address string) (*Client, error) {
246 conn, err := net.Dial(network, address)
247 if err != nil {
248 return nil, err
249 }
250 return NewClient(conn), nil
251 }
252
253 func (client *Client) Close() error {
254 client.mutex.Lock()
255 if client.shutdown || client.closing {
256 client.mutex.Unlock()
257 return ErrShutdown
258 }
259 client.closing = true
260 client.mutex.Unlock()
261 return client.codec.Close()
262 }
263
264 // Go invokes the function asynchronously. It returns the Call structure representing
265 // the invocation. The done channel will signal when the call is complete by returning
266 // the same Call object. If done is nil, Go will allocate a new channel.
267 // If non-nil, done must be buffered or Go will deliberately crash.
268 func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
269 call := new(Call)
270 call.ServiceMethod = serviceMethod
271 call.Args = args
272 call.Reply = reply
273 if done == nil {
274 done = make(chan *Call, 10) // buffered.
275 } else {
276 // If caller passes done != nil, it must arrange that
277 // done has enough buffer for the number of simultaneous
278 // RPCs that will be using that channel. If the channel
279 // is totally unbuffered, it's best not to run at all.
280 if cap(done) == 0 {
281 log.Panic("rpc: done channel is unbuffered")
282 }
283 }
284 call.Done = done
285 client.send(call)
286 return call
287 }
288
289 // Call invokes the named function, waits for it to complete, and returns its error status.
290 func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
291 call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
292 return call.Error
293 }