src/pkg/net/rpc/server.go - The Go Programming Language

Golang

Source file src/pkg/net/rpc/server.go

     1	// Copyright 2009 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	/*
     6		Package rpc provides access to the exported methods of an object across a
     7		network or other I/O connection.  A server registers an object, making it visible
     8		as a service with the name of the type of the object.  After registration, exported
     9		methods of the object will be accessible remotely.  A server may register multiple
    10		objects (services) of different types but it is an error to register multiple
    11		objects of the same type.
    12	
    13		Only methods that satisfy these criteria will be made available for remote access;
    14		other methods will be ignored:
    15	
    16			- the method is exported.
    17			- the method has two arguments, both exported (or builtin) types.
    18			- the method's second argument is a pointer.
    19			- the method has return type error.
    20	
    21		In effect, the method must look schematically like
    22	
    23			func (t *T) MethodName(argType T1, replyType *T2) error
    24	
    25		where T, T1 and T2 can be marshaled by encoding/gob.
    26		These requirements apply even if a different codec is used.
    27		(In future, these requirements may soften for custom codecs.)
    28	
    29		The method's first argument represents the arguments provided by the caller; the
    30		second argument represents the result parameters to be returned to the caller.
    31		The method's return value, if non-nil, is passed back as a string that the client
    32		sees as if created by errors.New.
    33	
    34		The server may handle requests on a single connection by calling ServeConn.  More
    35		typically it will create a network listener and call Accept or, for an HTTP
    36		listener, HandleHTTP and http.Serve.
    37	
    38		A client wishing to use the service establishes a connection and then invokes
    39		NewClient on the connection.  The convenience function Dial (DialHTTP) performs
    40		both steps for a raw network connection (an HTTP connection).  The resulting
    41		Client object has two methods, Call and Go, that specify the service and method to
    42		call, a pointer containing the arguments, and a pointer to receive the result
    43		parameters.
    44	
    45		The Call method waits for the remote call to complete while the Go method
    46		launches the call asynchronously and signals completion using the Call
    47		structure's Done channel.
    48	
    49		Unless an explicit codec is set up, package encoding/gob is used to
    50		transport the data.
    51	
    52		Here is a simple example.  A server wishes to export an object of type Arith:
    53	
    54			package server
    55	
    56			type Args struct {
    57				A, B int
    58			}
    59	
    60			type Quotient struct {
    61				Quo, Rem int
    62			}
    63	
    64			type Arith int
    65	
    66			func (t *Arith) Multiply(args *Args, reply *int) error {
    67				*reply = args.A * args.B
    68				return nil
    69			}
    70	
    71			func (t *Arith) Divide(args *Args, quo *Quotient) error {
    72				if args.B == 0 {
    73					return errors.New("divide by zero")
    74				}
    75				quo.Quo = args.A / args.B
    76				quo.Rem = args.A % args.B
    77				return nil
    78			}
    79	
    80		The server calls (for HTTP service):
    81	
    82			arith := new(Arith)
    83			rpc.Register(arith)
    84			rpc.HandleHTTP()
    85			l, e := net.Listen("tcp", ":1234")
    86			if e != nil {
    87				log.Fatal("listen error:", e)
    88			}
    89			go http.Serve(l, nil)
    90	
    91		At this point, clients can see a service "Arith" with methods "Arith.Multiply" and
    92		"Arith.Divide".  To invoke one, a client first dials the server:
    93	
    94			client, err := rpc.DialHTTP("tcp", serverAddress + ":1234")
    95			if err != nil {
    96				log.Fatal("dialing:", err)
    97			}
    98	
    99		Then it can make a remote call:
   100	
   101			// Synchronous call
   102			args := &server.Args{7,8}
   103			var reply int
   104			err = client.Call("Arith.Multiply", args, &reply)
   105			if err != nil {
   106				log.Fatal("arith error:", err)
   107			}
   108			fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply)
   109	
   110		or
   111	
   112			// Asynchronous call
   113			quotient := new(Quotient)
		divCall := client.Go("Arith.Divide", args, quotient, nil)
   115			replyCall := <-divCall.Done	// will be equal to divCall
   116			// check errors, print, etc.
   117	
   118		A server implementation will often provide a simple, type-safe wrapper for the
   119		client.
   120	*/
   121	package rpc
   122	
   123	import (
   124		"bufio"
   125		"encoding/gob"
   126		"errors"
   127		"io"
   128		"log"
   129		"net"
   130		"net/http"
   131		"reflect"
   132		"strings"
   133		"sync"
   134		"unicode"
   135		"unicode/utf8"
   136	)
   137	
const (
	// Defaults used by HandleHTTP

	// DefaultRPCPath is the path on which the package-level HandleHTTP
	// registers the RPC handler.
	DefaultRPCPath   = "/_goRPC_"
	// DefaultDebugPath is the path on which the package-level HandleHTTP
	// registers the debugging handler.
	DefaultDebugPath = "/debug/rpc"
)
   143	
// Precompute the reflect type for error.  Can't use error directly
// because TypeOf takes an empty interface value, so we take the type of a
// nil *error and ask for its element type instead.
// register compares each candidate method's result type against this value.
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
   147	
// methodType describes one registered RPC method together with a counter of
// how many times it has been invoked.
type methodType struct {
	sync.Mutex // protects counters
	// method is the reflected method; call invokes its Func.
	method     reflect.Method
	// ArgType is the type of the method's first (argument) parameter.
	ArgType    reflect.Type
	// ReplyType is the type of the method's second (reply) parameter;
	// register only accepts pointer types here.
	ReplyType  reflect.Type
	// numCalls is incremented on every call and read through NumCalls.
	numCalls   uint
}
   155	
// service is one registered receiver together with the set of its methods
// that register found suitable for remote invocation.
type service struct {
	name   string                 // name of service
	rcvr   reflect.Value          // receiver of methods for the service
	typ    reflect.Type           // type of the receiver
	method map[string]*methodType // registered methods, keyed by method name
}
   162	
// Request is a header written before every RPC call.  It is used internally
// but documented here as an aid to debugging, such as when analyzing
// network traffic.
//
// The server recycles Request values through a free list (see getRequest and
// freeRequest); the unexported next field is the link in that list.
type Request struct {
	ServiceMethod string   // format: "Service.Method"
	Seq           uint64   // sequence number chosen by client
	next          *Request // for free list in Server
}
   171	
// Response is a header written before every RPC return.  It is used internally
// but documented here as an aid to debugging, such as when analyzing
// network traffic.
//
// The server recycles Response values through a free list (see getResponse
// and freeResponse); the unexported next field is the link in that list.
type Response struct {
	ServiceMethod string    // echoes that of the Request
	Seq           uint64    // echoes that of the request
	Error         string    // error, if any.
	next          *Response // for free list in Server
}
   181	
// Server represents an RPC Server.
// It holds the registered services and two free lists used to recycle
// Request and Response headers between calls.
type Server struct {
	mu         sync.Mutex // protects the serviceMap
	serviceMap map[string]*service // registered services, keyed by service name
	reqLock    sync.Mutex // protects freeReq
	freeReq    *Request // head of the free list of Request headers
	respLock   sync.Mutex // protects freeResp
	freeResp   *Response // head of the free list of Response headers
}
   191	
   192	// NewServer returns a new Server.
   193	func NewServer() *Server {
   194		return &Server{serviceMap: make(map[string]*service)}
   195	}
   196	
// DefaultServer is the default instance of *Server.
// The package-level Register, RegisterName, ServeConn, ServeCodec,
// ServeRequest, Accept and HandleHTTP functions all operate on it.
var DefaultServer = NewServer()
   199	
   200	// Is this an exported - upper case - name?
   201	func isExported(name string) bool {
   202		rune, _ := utf8.DecodeRuneInString(name)
   203		return unicode.IsUpper(rune)
   204	}
   205	
   206	// Is this type exported or a builtin?
   207	func isExportedOrBuiltinType(t reflect.Type) bool {
   208		for t.Kind() == reflect.Ptr {
   209			t = t.Elem()
   210		}
   211		// PkgPath will be non-empty even for an exported type,
   212		// so we need to check the type name as well.
   213		return isExported(t.Name()) || t.PkgPath() == ""
   214	}
   215	
   216	// Register publishes in the server the set of methods of the
   217	// receiver value that satisfy the following conditions:
   218	//	- exported method
   219	//	- two arguments, both pointers to exported structs
   220	//	- one return value, of type error
   221	// It returns an error if the receiver is not an exported type or has no
   222	// suitable methods.
   223	// The client accesses each method using a string of the form "Type.Method",
   224	// where Type is the receiver's concrete type.
   225	func (server *Server) Register(rcvr interface{}) error {
   226		return server.register(rcvr, "", false)
   227	}
   228	
   229	// RegisterName is like Register but uses the provided name for the type 
   230	// instead of the receiver's concrete type.
   231	func (server *Server) RegisterName(name string, rcvr interface{}) error {
   232		return server.register(rcvr, name, true)
   233	}
   234	
   235	func (server *Server) register(rcvr interface{}, name string, useName bool) error {
   236		server.mu.Lock()
   237		defer server.mu.Unlock()
   238		if server.serviceMap == nil {
   239			server.serviceMap = make(map[string]*service)
   240		}
   241		s := new(service)
   242		s.typ = reflect.TypeOf(rcvr)
   243		s.rcvr = reflect.ValueOf(rcvr)
   244		sname := reflect.Indirect(s.rcvr).Type().Name()
   245		if useName {
   246			sname = name
   247		}
   248		if sname == "" {
   249			log.Fatal("rpc: no service name for type", s.typ.String())
   250		}
   251		if !isExported(sname) && !useName {
   252			s := "rpc Register: type " + sname + " is not exported"
   253			log.Print(s)
   254			return errors.New(s)
   255		}
   256		if _, present := server.serviceMap[sname]; present {
   257			return errors.New("rpc: service already defined: " + sname)
   258		}
   259		s.name = sname
   260		s.method = make(map[string]*methodType)
   261	
   262		// Install the methods
   263		for m := 0; m < s.typ.NumMethod(); m++ {
   264			method := s.typ.Method(m)
   265			mtype := method.Type
   266			mname := method.Name
   267			// Method must be exported.
   268			if method.PkgPath != "" {
   269				continue
   270			}
   271			// Method needs three ins: receiver, *args, *reply.
   272			if mtype.NumIn() != 3 {
   273				log.Println("method", mname, "has wrong number of ins:", mtype.NumIn())
   274				continue
   275			}
   276			// First arg need not be a pointer.
   277			argType := mtype.In(1)
   278			if !isExportedOrBuiltinType(argType) {
   279				log.Println(mname, "argument type not exported:", argType)
   280				continue
   281			}
   282			// Second arg must be a pointer.
   283			replyType := mtype.In(2)
   284			if replyType.Kind() != reflect.Ptr {
   285				log.Println("method", mname, "reply type not a pointer:", replyType)
   286				continue
   287			}
   288			// Reply type must be exported.
   289			if !isExportedOrBuiltinType(replyType) {
   290				log.Println("method", mname, "reply type not exported:", replyType)
   291				continue
   292			}
   293			// Method needs one out.
   294			if mtype.NumOut() != 1 {
   295				log.Println("method", mname, "has wrong number of outs:", mtype.NumOut())
   296				continue
   297			}
   298			// The return type of the method must be error.
   299			if returnType := mtype.Out(0); returnType != typeOfError {
   300				log.Println("method", mname, "returns", returnType.String(), "not error")
   301				continue
   302			}
   303			s.method[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
   304		}
   305	
   306		if len(s.method) == 0 {
   307			s := "rpc Register: type " + sname + " has no exported methods of suitable type"
   308			log.Print(s)
   309			return errors.New(s)
   310		}
   311		server.serviceMap[s.name] = s
   312		return nil
   313	}
   314	
// A value sent as a placeholder for the server's response value when the server
// receives an invalid request. It is never decoded by the client since the Response
// contains an error when it is used.
// sendResponse also substitutes it for the reply whenever the error message
// is non-empty.
var invalidRequest = struct{}{}
   319	
   320	func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
   321		resp := server.getResponse()
   322		// Encode the response header
   323		resp.ServiceMethod = req.ServiceMethod
   324		if errmsg != "" {
   325			resp.Error = errmsg
   326			reply = invalidRequest
   327		}
   328		resp.Seq = req.Seq
   329		sending.Lock()
   330		err := codec.WriteResponse(resp, reply)
   331		if err != nil {
   332			log.Println("rpc: writing response:", err)
   333		}
   334		sending.Unlock()
   335		server.freeResponse(resp)
   336	}
   337	
   338	func (m *methodType) NumCalls() (n uint) {
   339		m.Lock()
   340		n = m.numCalls
   341		m.Unlock()
   342		return n
   343	}
   344	
   345	func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
   346		mtype.Lock()
   347		mtype.numCalls++
   348		mtype.Unlock()
   349		function := mtype.method.Func
   350		// Invoke the method, providing a new value for the reply.
   351		returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
   352		// The return value for the method is an error.
   353		errInter := returnValues[0].Interface()
   354		errmsg := ""
   355		if errInter != nil {
   356			errmsg = errInter.(error).Error()
   357		}
   358		server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
   359		server.freeRequest(req)
   360	}
   361	
// gobServerCodec is the codec ServeConn uses: requests are decoded and
// responses encoded with encoding/gob.
// ServeConn builds it with a positional literal, so the field order matters.
type gobServerCodec struct {
	rwc    io.ReadWriteCloser // the connection; closed by Close
	dec    *gob.Decoder       // decodes request headers and bodies
	enc    *gob.Encoder       // encodes response headers and bodies
	encBuf *bufio.Writer      // flushed at the end of WriteResponse
}
   368	
   369	func (c *gobServerCodec) ReadRequestHeader(r *Request) error {
   370		return c.dec.Decode(r)
   371	}
   372	
   373	func (c *gobServerCodec) ReadRequestBody(body interface{}) error {
   374		return c.dec.Decode(body)
   375	}
   376	
   377	func (c *gobServerCodec) WriteResponse(r *Response, body interface{}) (err error) {
   378		if err = c.enc.Encode(r); err != nil {
   379			return
   380		}
   381		if err = c.enc.Encode(body); err != nil {
   382			return
   383		}
   384		return c.encBuf.Flush()
   385	}
   386	
   387	func (c *gobServerCodec) Close() error {
   388		return c.rwc.Close()
   389	}
   390	
   391	// ServeConn runs the server on a single connection.
   392	// ServeConn blocks, serving the connection until the client hangs up.
   393	// The caller typically invokes ServeConn in a go statement.
   394	// ServeConn uses the gob wire format (see package gob) on the
   395	// connection.  To use an alternate codec, use ServeCodec.
   396	func (server *Server) ServeConn(conn io.ReadWriteCloser) {
   397		buf := bufio.NewWriter(conn)
   398		srv := &gobServerCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(buf), buf}
   399		server.ServeCodec(srv)
   400	}
   401	
   402	// ServeCodec is like ServeConn but uses the specified codec to
   403	// decode requests and encode responses.
   404	func (server *Server) ServeCodec(codec ServerCodec) {
   405		sending := new(sync.Mutex)
   406		for {
   407			service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
   408			if err != nil {
   409				if err != io.EOF {
   410					log.Println("rpc:", err)
   411				}
   412				if !keepReading {
   413					break
   414				}
   415				// send a response if we actually managed to read a header.
   416				if req != nil {
   417					server.sendResponse(sending, req, invalidRequest, codec, err.Error())
   418					server.freeRequest(req)
   419				}
   420				continue
   421			}
   422			go service.call(server, sending, mtype, req, argv, replyv, codec)
   423		}
   424		codec.Close()
   425	}
   426	
   427	// ServeRequest is like ServeCodec but synchronously serves a single request.
   428	// It does not close the codec upon completion.
   429	func (server *Server) ServeRequest(codec ServerCodec) error {
   430		sending := new(sync.Mutex)
   431		service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
   432		if err != nil {
   433			if !keepReading {
   434				return err
   435			}
   436			// send a response if we actually managed to read a header.
   437			if req != nil {
   438				server.sendResponse(sending, req, invalidRequest, codec, err.Error())
   439				server.freeRequest(req)
   440			}
   441			return err
   442		}
   443		service.call(server, sending, mtype, req, argv, replyv, codec)
   444		return nil
   445	}
   446	
   447	func (server *Server) getRequest() *Request {
   448		server.reqLock.Lock()
   449		req := server.freeReq
   450		if req == nil {
   451			req = new(Request)
   452		} else {
   453			server.freeReq = req.next
   454			*req = Request{}
   455		}
   456		server.reqLock.Unlock()
   457		return req
   458	}
   459	
   460	func (server *Server) freeRequest(req *Request) {
   461		server.reqLock.Lock()
   462		req.next = server.freeReq
   463		server.freeReq = req
   464		server.reqLock.Unlock()
   465	}
   466	
   467	func (server *Server) getResponse() *Response {
   468		server.respLock.Lock()
   469		resp := server.freeResp
   470		if resp == nil {
   471			resp = new(Response)
   472		} else {
   473			server.freeResp = resp.next
   474			*resp = Response{}
   475		}
   476		server.respLock.Unlock()
   477		return resp
   478	}
   479	
   480	func (server *Server) freeResponse(resp *Response) {
   481		server.respLock.Lock()
   482		resp.next = server.freeResp
   483		server.freeResp = resp
   484		server.respLock.Unlock()
   485	}
   486	
   487	func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
   488		service, mtype, req, keepReading, err = server.readRequestHeader(codec)
   489		if err != nil {
   490			if !keepReading {
   491				return
   492			}
   493			// discard body
   494			codec.ReadRequestBody(nil)
   495			return
   496		}
   497	
   498		// Decode the argument value.
   499		argIsValue := false // if true, need to indirect before calling.
   500		if mtype.ArgType.Kind() == reflect.Ptr {
   501			argv = reflect.New(mtype.ArgType.Elem())
   502		} else {
   503			argv = reflect.New(mtype.ArgType)
   504			argIsValue = true
   505		}
   506		// argv guaranteed to be a pointer now.
   507		if err = codec.ReadRequestBody(argv.Interface()); err != nil {
   508			return
   509		}
   510		if argIsValue {
   511			argv = argv.Elem()
   512		}
   513	
   514		replyv = reflect.New(mtype.ReplyType.Elem())
   515		return
   516	}
   517	
   518	func (server *Server) readRequestHeader(codec ServerCodec) (service *service, mtype *methodType, req *Request, keepReading bool, err error) {
   519		// Grab the request header.
   520		req = server.getRequest()
   521		err = codec.ReadRequestHeader(req)
   522		if err != nil {
   523			req = nil
   524			if err == io.EOF || err == io.ErrUnexpectedEOF {
   525				return
   526			}
   527			err = errors.New("rpc: server cannot decode request: " + err.Error())
   528			return
   529		}
   530	
   531		// We read the header successfully.  If we see an error now,
   532		// we can still recover and move on to the next request.
   533		keepReading = true
   534	
   535		serviceMethod := strings.Split(req.ServiceMethod, ".")
   536		if len(serviceMethod) != 2 {
   537			err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
   538			return
   539		}
   540		// Look up the request.
   541		server.mu.Lock()
   542		service = server.serviceMap[serviceMethod[0]]
   543		server.mu.Unlock()
   544		if service == nil {
   545			err = errors.New("rpc: can't find service " + req.ServiceMethod)
   546			return
   547		}
   548		mtype = service.method[serviceMethod[1]]
   549		if mtype == nil {
   550			err = errors.New("rpc: can't find method " + req.ServiceMethod)
   551		}
   552		return
   553	}
   554	
   555	// Accept accepts connections on the listener and serves requests
   556	// for each incoming connection.  Accept blocks; the caller typically
   557	// invokes it in a go statement.
   558	func (server *Server) Accept(lis net.Listener) {
   559		for {
   560			conn, err := lis.Accept()
   561			if err != nil {
   562				log.Fatal("rpc.Serve: accept:", err.Error()) // TODO(r): exit?
   563			}
   564			go server.ServeConn(conn)
   565		}
   566	}
   567	
   568	// Register publishes the receiver's methods in the DefaultServer.
   569	func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }
   570	
   571	// RegisterName is like Register but uses the provided name for the type 
   572	// instead of the receiver's concrete type.
   573	func RegisterName(name string, rcvr interface{}) error {
   574		return DefaultServer.RegisterName(name, rcvr)
   575	}
   576	
// A ServerCodec implements reading of RPC requests and writing of
// RPC responses for the server side of an RPC session.
// The server calls ReadRequestHeader and ReadRequestBody in pairs
// to read requests from the connection, and it calls WriteResponse to
// write a response back.  The server calls Close when finished with the
// connection. ReadRequestBody may be called with a nil
// argument to force the body of the request to be read and discarded.
type ServerCodec interface {
	// ReadRequestHeader fills in the header of the next request.
	ReadRequestHeader(*Request) error
	// ReadRequestBody decodes the request body into its argument, which
	// the server passes as a pointer, or as nil to discard the body.
	ReadRequestBody(interface{}) error
	// WriteResponse writes a response header followed by its body.
	// The server serializes calls for one session with a mutex.
	WriteResponse(*Response, interface{}) error

	// Close is called by ServeCodec once its read loop has ended.
	Close() error
}
   591	
   592	// ServeConn runs the DefaultServer on a single connection.
   593	// ServeConn blocks, serving the connection until the client hangs up.
   594	// The caller typically invokes ServeConn in a go statement.
   595	// ServeConn uses the gob wire format (see package gob) on the
   596	// connection.  To use an alternate codec, use ServeCodec.
   597	func ServeConn(conn io.ReadWriteCloser) {
   598		DefaultServer.ServeConn(conn)
   599	}
   600	
   601	// ServeCodec is like ServeConn but uses the specified codec to
   602	// decode requests and encode responses.
   603	func ServeCodec(codec ServerCodec) {
   604		DefaultServer.ServeCodec(codec)
   605	}
   606	
   607	// ServeRequest is like ServeCodec but synchronously serves a single request.
   608	// It does not close the codec upon completion.
   609	func ServeRequest(codec ServerCodec) error {
   610		return DefaultServer.ServeRequest(codec)
   611	}
   612	
   613	// Accept accepts connections on the listener and serves requests
   614	// to DefaultServer for each incoming connection.  
   615	// Accept blocks; the caller typically invokes it in a go statement.
   616	func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
   617	
// Can connect to RPC service using HTTP CONNECT to rpcPath.
// connected is the status text ServeHTTP writes, after "HTTP/1.0 ", on the
// hijacked connection before it starts serving RPC requests.
var connected = "200 Connected to Go RPC"
   620	
   621	// ServeHTTP implements an http.Handler that answers RPC requests.
   622	func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
   623		if req.Method != "CONNECT" {
   624			w.Header().Set("Content-Type", "text/plain; charset=utf-8")
   625			w.WriteHeader(http.StatusMethodNotAllowed)
   626			io.WriteString(w, "405 must CONNECT\n")
   627			return
   628		}
   629		conn, _, err := w.(http.Hijacker).Hijack()
   630		if err != nil {
   631			log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
   632			return
   633		}
   634		io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
   635		server.ServeConn(conn)
   636	}
   637	
   638	// HandleHTTP registers an HTTP handler for RPC messages on rpcPath,
   639	// and a debugging handler on debugPath.
   640	// It is still necessary to invoke http.Serve(), typically in a go statement.
   641	func (server *Server) HandleHTTP(rpcPath, debugPath string) {
   642		http.Handle(rpcPath, server)
   643		http.Handle(debugPath, debugHTTP{server})
   644	}
   645	
   646	// HandleHTTP registers an HTTP handler for RPC messages to DefaultServer
   647	// on DefaultRPCPath and a debugging handler on DefaultDebugPath.
   648	// It is still necessary to invoke http.Serve(), typically in a go statement.
   649	func HandleHTTP() {
   650		DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath)
   651	}