Source file src/pkg/net/rpc/client.go
1 // Copyright 2009 The Go Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style 3 // license that can be found in the LICENSE file. 4 5 package rpc 6 7 import ( 8 "bufio" 9 "encoding/gob" 10 "errors" 11 "io" 12 "log" 13 "net" 14 "net/http" 15 "sync" 16 ) 17 18 // ServerError represents an error that has been returned from 19 // the remote side of the RPC connection. 20 type ServerError string 21 22 func (e ServerError) Error() string { 23 return string(e) 24 } 25 26 var ErrShutdown = errors.New("connection is shut down") 27 28 // Call represents an active RPC. 29 type Call struct { 30 ServiceMethod string // The name of the service and method to call. 31 Args interface{} // The argument to the function (*struct). 32 Reply interface{} // The reply from the function (*struct). 33 Error error // After completion, the error status. 34 Done chan *Call // Strobes when call is complete. 35 } 36 37 // Client represents an RPC Client. 38 // There may be multiple outstanding Calls associated 39 // with a single Client, and a Client may be used by 40 // multiple goroutines simultaneously. 41 type Client struct { 42 mutex sync.Mutex // protects pending, seq, request 43 sending sync.Mutex 44 request Request 45 seq uint64 46 codec ClientCodec 47 pending map[uint64]*Call 48 closing bool 49 shutdown bool 50 } 51 52 // A ClientCodec implements writing of RPC requests and 53 // reading of RPC responses for the client side of an RPC session. 54 // The client calls WriteRequest to write a request to the connection 55 // and calls ReadResponseHeader and ReadResponseBody in pairs 56 // to read responses. The client calls Close when finished with the 57 // connection. ReadResponseBody may be called with a nil 58 // argument to force the body of the response to be read and then 59 // discarded. 60 type ClientCodec interface { 61 WriteRequest(*Request, interface{}) error 62 ReadResponseHeader(*Response) error 63 ReadResponseBody(interface{}) error 64 65 Close() error 66 } 67 68 func (client *Client) send(call *Call) { 69 client.sending.Lock() 70 defer client.sending.Unlock() 71 72 // Register this call. 73 client.mutex.Lock() 74 if client.shutdown { 75 call.Error = ErrShutdown 76 client.mutex.Unlock() 77 call.done() 78 return 79 } 80 seq := client.seq 81 client.seq++ 82 client.pending[seq] = call 83 client.mutex.Unlock() 84 85 // Encode and send the request. 86 client.request.Seq = seq 87 client.request.ServiceMethod = call.ServiceMethod 88 err := client.codec.WriteRequest(&client.request, call.Args) 89 if err != nil { 90 client.mutex.Lock() 91 delete(client.pending, seq) 92 client.mutex.Unlock() 93 call.Error = err 94 call.done() 95 } 96 } 97 98 func (client *Client) input() { 99 var err error 100 var response Response 101 for err == nil { 102 response = Response{} 103 err = client.codec.ReadResponseHeader(&response) 104 if err != nil { 105 if err == io.EOF && !client.closing { 106 err = io.ErrUnexpectedEOF 107 } 108 break 109 } 110 seq := response.Seq 111 client.mutex.Lock() 112 call := client.pending[seq] 113 delete(client.pending, seq) 114 client.mutex.Unlock() 115 116 if response.Error == "" { 117 err = client.codec.ReadResponseBody(call.Reply) 118 if err != nil { 119 call.Error = errors.New("reading body " + err.Error()) 120 } 121 } else { 122 // We've got an error response. Give this to the request; 123 // any subsequent requests will get the ReadResponseBody 124 // error if there is one. 125 call.Error = ServerError(response.Error) 126 err = client.codec.ReadResponseBody(nil) 127 if err != nil { 128 err = errors.New("reading error body: " + err.Error()) 129 } 130 } 131 call.done() 132 } 133 // Terminate pending calls. 134 client.sending.Lock() 135 client.mutex.Lock() 136 client.shutdown = true 137 closing := client.closing 138 for _, call := range client.pending { 139 call.Error = err 140 call.done() 141 } 142 client.mutex.Unlock() 143 client.sending.Unlock() 144 if err != io.EOF && !closing { 145 log.Println("rpc: client protocol error:", err) 146 } 147 } 148 149 func (call *Call) done() { 150 select { 151 case call.Done <- call: 152 // ok 153 default: 154 // We don't want to block here. It is the caller's responsibility to make 155 // sure the channel has enough buffer space. See comment in Go(). 156 log.Println("rpc: discarding Call reply due to insufficient Done chan capacity") 157 } 158 } 159 160 // NewClient returns a new Client to handle requests to the 161 // set of services at the other end of the connection. 162 // It adds a buffer to the write side of the connection so 163 // the header and payload are sent as a unit. 164 func NewClient(conn io.ReadWriteCloser) *Client { 165 encBuf := bufio.NewWriter(conn) 166 client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf} 167 return NewClientWithCodec(client) 168 } 169 170 // NewClientWithCodec is like NewClient but uses the specified 171 // codec to encode requests and decode responses. 172 func NewClientWithCodec(codec ClientCodec) *Client { 173 client := &Client{ 174 codec: codec, 175 pending: make(map[uint64]*Call), 176 } 177 go client.input() 178 return client 179 } 180 181 type gobClientCodec struct { 182 rwc io.ReadWriteCloser 183 dec *gob.Decoder 184 enc *gob.Encoder 185 encBuf *bufio.Writer 186 } 187 188 func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) { 189 if err = c.enc.Encode(r); err != nil { 190 return 191 } 192 if err = c.enc.Encode(body); err != nil { 193 return 194 } 195 return c.encBuf.Flush() 196 } 197 198 func (c *gobClientCodec) ReadResponseHeader(r *Response) error { 199 return c.dec.Decode(r) 200 } 201 202 func (c *gobClientCodec) ReadResponseBody(body interface{}) error { 203 return c.dec.Decode(body) 204 } 205 206 func (c *gobClientCodec) Close() error { 207 return c.rwc.Close() 208 } 209 210 // DialHTTP connects to an HTTP RPC server at the specified network address 211 // listening on the default HTTP RPC path. 212 func DialHTTP(network, address string) (*Client, error) { 213 return DialHTTPPath(network, address, DefaultRPCPath) 214 } 215 216 // DialHTTPPath connects to an HTTP RPC server 217 // at the specified network address and path. 218 func DialHTTPPath(network, address, path string) (*Client, error) { 219 var err error 220 conn, err := net.Dial(network, address) 221 if err != nil { 222 return nil, err 223 } 224 io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n") 225 226 // Require successful HTTP response 227 // before switching to RPC protocol. 228 resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"}) 229 if err == nil && resp.Status == connected { 230 return NewClient(conn), nil 231 } 232 if err == nil { 233 err = errors.New("unexpected HTTP response: " + resp.Status) 234 } 235 conn.Close() 236 return nil, &net.OpError{ 237 Op: "dial-http", 238 Net: network + " " + address, 239 Addr: nil, 240 Err: err, 241 } 242 } 243 244 // Dial connects to an RPC server at the specified network address. 245 func Dial(network, address string) (*Client, error) { 246 conn, err := net.Dial(network, address) 247 if err != nil { 248 return nil, err 249 } 250 return NewClient(conn), nil 251 } 252 253 func (client *Client) Close() error { 254 client.mutex.Lock() 255 if client.shutdown || client.closing { 256 client.mutex.Unlock() 257 return ErrShutdown 258 } 259 client.closing = true 260 client.mutex.Unlock() 261 return client.codec.Close() 262 } 263 264 // Go invokes the function asynchronously. It returns the Call structure representing 265 // the invocation. The done channel will signal when the call is complete by returning 266 // the same Call object. If done is nil, Go will allocate a new channel. 267 // If non-nil, done must be buffered or Go will deliberately crash. 268 func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call { 269 call := new(Call) 270 call.ServiceMethod = serviceMethod 271 call.Args = args 272 call.Reply = reply 273 if done == nil { 274 done = make(chan *Call, 10) // buffered. 275 } else { 276 // If caller passes done != nil, it must arrange that 277 // done has enough buffer for the number of simultaneous 278 // RPCs that will be using that channel. If the channel 279 // is totally unbuffered, it's best not to run at all. 280 if cap(done) == 0 { 281 log.Panic("rpc: done channel is unbuffered") 282 } 283 } 284 call.Done = done 285 client.send(call) 286 return call 287 } 288 289 // Call invokes the named function, waits for it to complete, and returns its error status. 290 func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error { 291 call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done 292 return call.Error 293 }