src/pkg/net/rpc/client.go - The Go Programming Language

Golang

Source file src/pkg/net/rpc/client.go

     1	// Copyright 2009 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	package rpc
     6	
     7	import (
     8		"bufio"
     9		"encoding/gob"
    10		"errors"
    11		"io"
    12		"log"
    13		"net"
    14		"net/http"
    15		"sync"
    16	)
    17	
// ServerError represents an error that has been returned from
// the remote side of the RPC connection. Its value is the error
// text carried in the response header.
type ServerError string

// Error implements the error interface by returning the server's
// error text unchanged.
func (e ServerError) Error() string {
	return string(e)
}
    25	
// ErrShutdown is the error recorded on a Call that is sent after the
// client has shut down, and the error returned by Close when the client
// is already closing or shut down.
var ErrShutdown = errors.New("connection is shut down")
    27	
// Call represents an active RPC.
// When the call completes, the Call itself is sent on its Done channel
// with Error holding the outcome.
type Call struct {
	ServiceMethod string      // The name of the service and method to call.
	Args          interface{} // The argument to the function (*struct).
	Reply         interface{} // The reply from the function (*struct).
	Error         error       // After completion, the error status.
	Done          chan *Call  // Strobes when call is complete; receives this Call.
}
    36	
// Client represents an RPC Client.
// There may be multiple outstanding Calls associated
// with a single Client, and a Client may be used by
// multiple goroutines simultaneously.
type Client struct {
	mutex    sync.Mutex       // held while touching pending, seq and shutdown, and while Close sets closing
	sending  sync.Mutex       // held for the whole of send, so request is filled in and written by one caller at a time
	request  Request          // request header reused for every outgoing call; written by send
	seq      uint64           // sequence number that send assigns to the next call
	codec    ClientCodec      // encodes requests and decodes responses
	pending  map[uint64]*Call // calls awaiting a response, keyed by sequence number
	closing  bool             // set by Close: the user asked for the connection to be closed
	shutdown bool             // set by input once its read loop has ended
}
    51	
// A ClientCodec implements writing of RPC requests and
// reading of RPC responses for the client side of an RPC session.
// The client calls WriteRequest to write a request to the connection
// and calls ReadResponseHeader and ReadResponseBody in pairs
// to read responses.  The client calls Close when finished with the
// connection. ReadResponseBody may be called with a nil
// argument to force the body of the response to be read and then
// discarded.
type ClientCodec interface {
	// WriteRequest writes one request: the header followed by the
	// call's argument value.
	WriteRequest(*Request, interface{}) error
	// ReadResponseHeader reads the header of the next response.
	ReadResponseHeader(*Response) error
	// ReadResponseBody reads the body belonging to the most recently
	// read header into its argument, or discards it when the argument
	// is nil.
	ReadResponseBody(interface{}) error

	// Close is called by Client.Close when the client is finished
	// with the connection.
	Close() error
}
    67	
    68	func (client *Client) send(call *Call) {
    69		client.sending.Lock()
    70		defer client.sending.Unlock()
    71	
    72		// Register this call.
    73		client.mutex.Lock()
    74		if client.shutdown {
    75			call.Error = ErrShutdown
    76			client.mutex.Unlock()
    77			call.done()
    78			return
    79		}
    80		seq := client.seq
    81		client.seq++
    82		client.pending[seq] = call
    83		client.mutex.Unlock()
    84	
    85		// Encode and send the request.
    86		client.request.Seq = seq
    87		client.request.ServiceMethod = call.ServiceMethod
    88		err := client.codec.WriteRequest(&client.request, call.Args)
    89		if err != nil {
    90			client.mutex.Lock()
    91			delete(client.pending, seq)
    92			client.mutex.Unlock()
    93			call.Error = err
    94			call.done()
    95		}
    96	}
    97	
    98	func (client *Client) input() {
    99		var err error
   100		var response Response
   101		for err == nil {
   102			response = Response{}
   103			err = client.codec.ReadResponseHeader(&response)
   104			if err != nil {
   105				if err == io.EOF && !client.closing {
   106					err = io.ErrUnexpectedEOF
   107				}
   108				break
   109			}
   110			seq := response.Seq
   111			client.mutex.Lock()
   112			call := client.pending[seq]
   113			delete(client.pending, seq)
   114			client.mutex.Unlock()
   115	
   116			if response.Error == "" {
   117				err = client.codec.ReadResponseBody(call.Reply)
   118				if err != nil {
   119					call.Error = errors.New("reading body " + err.Error())
   120				}
   121			} else {
   122				// We've got an error response. Give this to the request;
   123				// any subsequent requests will get the ReadResponseBody
   124				// error if there is one.
   125				call.Error = ServerError(response.Error)
   126				err = client.codec.ReadResponseBody(nil)
   127				if err != nil {
   128					err = errors.New("reading error body: " + err.Error())
   129				}
   130			}
   131			call.done()
   132		}
   133		// Terminate pending calls.
   134		client.sending.Lock()
   135		client.mutex.Lock()
   136		client.shutdown = true
   137		closing := client.closing
   138		for _, call := range client.pending {
   139			call.Error = err
   140			call.done()
   141		}
   142		client.mutex.Unlock()
   143		client.sending.Unlock()
   144		if err != io.EOF && !closing {
   145			log.Println("rpc: client protocol error:", err)
   146		}
   147	}
   148	
   149	func (call *Call) done() {
   150		select {
   151		case call.Done <- call:
   152			// ok
   153		default:
   154			// We don't want to block here.  It is the caller's responsibility to make
   155			// sure the channel has enough buffer space. See comment in Go().
   156			log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
   157		}
   158	}
   159	
   160	// NewClient returns a new Client to handle requests to the
   161	// set of services at the other end of the connection.
   162	// It adds a buffer to the write side of the connection so
   163	// the header and payload are sent as a unit.
   164	func NewClient(conn io.ReadWriteCloser) *Client {
   165		encBuf := bufio.NewWriter(conn)
   166		client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
   167		return NewClientWithCodec(client)
   168	}
   169	
   170	// NewClientWithCodec is like NewClient but uses the specified
   171	// codec to encode requests and decode responses.
   172	func NewClientWithCodec(codec ClientCodec) *Client {
   173		client := &Client{
   174			codec:   codec,
   175			pending: make(map[uint64]*Call),
   176		}
   177		go client.input()
   178		return client
   179	}
   180	
// gobClientCodec is the ClientCodec that NewClient installs. It
// gob-encodes requests into a buffered writer and gob-decodes responses.
type gobClientCodec struct {
	rwc    io.ReadWriteCloser // the connection; closed by Close
	dec    *gob.Decoder       // decodes response headers and bodies
	enc    *gob.Encoder       // encodes request headers and bodies into encBuf
	encBuf *bufio.Writer      // buffers encoded output until WriteRequest flushes it
}
   187	
   188	func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) {
   189		if err = c.enc.Encode(r); err != nil {
   190			return
   191		}
   192		if err = c.enc.Encode(body); err != nil {
   193			return
   194		}
   195		return c.encBuf.Flush()
   196	}
   197	
// ReadResponseHeader decodes the next value from the gob decoder into r.
func (c *gobClientCodec) ReadResponseHeader(r *Response) error {
	return c.dec.Decode(r)
}
   201	
// ReadResponseBody decodes the next value from the gob decoder into body.
// The client passes a nil body when the value is to be read and discarded.
func (c *gobClientCodec) ReadResponseBody(body interface{}) error {
	return c.dec.Decode(body)
}
   205	
// Close closes the underlying connection and returns its error.
func (c *gobClientCodec) Close() error {
	return c.rwc.Close()
}
   209	
// DialHTTP connects to an HTTP RPC server at the specified network address
// listening on the default HTTP RPC path.
// It is shorthand for DialHTTPPath with DefaultRPCPath as the path.
func DialHTTP(network, address string) (*Client, error) {
	return DialHTTPPath(network, address, DefaultRPCPath)
}
   215	
   216	// DialHTTPPath connects to an HTTP RPC server 
   217	// at the specified network address and path.
   218	func DialHTTPPath(network, address, path string) (*Client, error) {
   219		var err error
   220		conn, err := net.Dial(network, address)
   221		if err != nil {
   222			return nil, err
   223		}
   224		io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
   225	
   226		// Require successful HTTP response
   227		// before switching to RPC protocol.
   228		resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
   229		if err == nil && resp.Status == connected {
   230			return NewClient(conn), nil
   231		}
   232		if err == nil {
   233			err = errors.New("unexpected HTTP response: " + resp.Status)
   234		}
   235		conn.Close()
   236		return nil, &net.OpError{
   237			Op:   "dial-http",
   238			Net:  network + " " + address,
   239			Addr: nil,
   240			Err:  err,
   241		}
   242	}
   243	
// Dial connects to an RPC server at the specified network address.
// It dials with net.Dial and wraps the resulting connection with
// NewClient; a dial error is returned unchanged with a nil Client.
func Dial(network, address string) (*Client, error) {
	conn, err := net.Dial(network, address)
	if err != nil {
		return nil, err
	}
	return NewClient(conn), nil
}
   252	
   253	func (client *Client) Close() error {
   254		client.mutex.Lock()
   255		if client.shutdown || client.closing {
   256			client.mutex.Unlock()
   257			return ErrShutdown
   258		}
   259		client.closing = true
   260		client.mutex.Unlock()
   261		return client.codec.Close()
   262	}
   263	
   264	// Go invokes the function asynchronously.  It returns the Call structure representing
   265	// the invocation.  The done channel will signal when the call is complete by returning
   266	// the same Call object.  If done is nil, Go will allocate a new channel.
   267	// If non-nil, done must be buffered or Go will deliberately crash.
   268	func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
   269		call := new(Call)
   270		call.ServiceMethod = serviceMethod
   271		call.Args = args
   272		call.Reply = reply
   273		if done == nil {
   274			done = make(chan *Call, 10) // buffered.
   275		} else {
   276			// If caller passes done != nil, it must arrange that
   277			// done has enough buffer for the number of simultaneous
   278			// RPCs that will be using that channel.  If the channel
   279			// is totally unbuffered, it's best not to run at all.
   280			if cap(done) == 0 {
   281				log.Panic("rpc: done channel is unbuffered")
   282			}
   283		}
   284		call.Done = done
   285		client.send(call)
   286		return call
   287	}
   288	
   289	// Call invokes the named function, waits for it to complete, and returns its error status.
   290	func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
   291		call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
   292		return call.Error
   293	}