Source file src/pkg/net/rpc/server.go
1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 /*
6 Package rpc provides access to the exported methods of an object across a
7 network or other I/O connection. A server registers an object, making it visible
8 as a service with the name of the type of the object. After registration, exported
9 methods of the object will be accessible remotely. A server may register multiple
10 objects (services) of different types but it is an error to register multiple
11 objects of the same type.
12
13 Only methods that satisfy these criteria will be made available for remote access;
14 other methods will be ignored:
15
16 - the method is exported.
17 - the method has two arguments, both exported (or builtin) types.
18 - the method's second argument is a pointer.
19 - the method has return type error.
20
21 In effect, the method must look schematically like
22
23 func (t *T) MethodName(argType T1, replyType *T2) error
24
25 where T, T1 and T2 can be marshaled by encoding/gob.
26 These requirements apply even if a different codec is used.
27 (In future, these requirements may soften for custom codecs.)
28
29 The method's first argument represents the arguments provided by the caller; the
30 second argument represents the result parameters to be returned to the caller.
31 The method's return value, if non-nil, is passed back as a string that the client
32 sees as if created by errors.New.
33
34 The server may handle requests on a single connection by calling ServeConn. More
35 typically it will create a network listener and call Accept or, for an HTTP
36 listener, HandleHTTP and http.Serve.
37
38 A client wishing to use the service establishes a connection and then invokes
39 NewClient on the connection. The convenience function Dial (DialHTTP) performs
40 both steps for a raw network connection (an HTTP connection). The resulting
41 Client object has two methods, Call and Go, that specify the service and method to
42 call, a pointer containing the arguments, and a pointer to receive the result
43 parameters.
44
45 The Call method waits for the remote call to complete while the Go method
46 launches the call asynchronously and signals completion using the Call
47 structure's Done channel.
48
49 Unless an explicit codec is set up, package encoding/gob is used to
50 transport the data.
51
52 Here is a simple example. A server wishes to export an object of type Arith:
53
54 package server
55
56 type Args struct {
57 A, B int
58 }
59
60 type Quotient struct {
61 Quo, Rem int
62 }
63
64 type Arith int
65
66 func (t *Arith) Multiply(args *Args, reply *int) error {
67 *reply = args.A * args.B
68 return nil
69 }
70
71 func (t *Arith) Divide(args *Args, quo *Quotient) error {
72 if args.B == 0 {
73 return errors.New("divide by zero")
74 }
75 quo.Quo = args.A / args.B
76 quo.Rem = args.A % args.B
77 return nil
78 }
79
80 The server calls (for HTTP service):
81
82 arith := new(Arith)
83 rpc.Register(arith)
84 rpc.HandleHTTP()
85 l, e := net.Listen("tcp", ":1234")
86 if e != nil {
87 log.Fatal("listen error:", e)
88 }
89 go http.Serve(l, nil)
90
91 At this point, clients can see a service "Arith" with methods "Arith.Multiply" and
92 "Arith.Divide". To invoke one, a client first dials the server:
93
94 client, err := rpc.DialHTTP("tcp", serverAddress + ":1234")
95 if err != nil {
96 log.Fatal("dialing:", err)
97 }
98
99 Then it can make a remote call:
100
101 // Synchronous call
102 args := &server.Args{7,8}
103 var reply int
104 err = client.Call("Arith.Multiply", args, &reply)
105 if err != nil {
106 log.Fatal("arith error:", err)
107 }
108 fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply)
109
110 or
111
112 // Asynchronous call
113 quotient := new(Quotient)
114 divCall := client.Go("Arith.Divide", args, "ient, nil)
115 replyCall := <-divCall.Done // will be equal to divCall
116 // check errors, print, etc.
117
118 A server implementation will often provide a simple, type-safe wrapper for the
119 client.
120 */
121 package rpc
122
123 import (
124 "bufio"
125 "encoding/gob"
126 "errors"
127 "io"
128 "log"
129 "net"
130 "net/http"
131 "reflect"
132 "strings"
133 "sync"
134 "unicode"
135 "unicode/utf8"
136 )
137
138 const (
139 // Defaults used by HandleHTTP
140 DefaultRPCPath = "/_goRPC_"
141 DefaultDebugPath = "/debug/rpc"
142 )
143
144 // Precompute the reflect type for error. Can't use error directly
145 // because Typeof takes an empty interface value. This is annoying.
146 var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
147
148 type methodType struct {
149 sync.Mutex // protects counters
150 method reflect.Method
151 ArgType reflect.Type
152 ReplyType reflect.Type
153 numCalls uint
154 }
155
156 type service struct {
157 name string // name of service
158 rcvr reflect.Value // receiver of methods for the service
159 typ reflect.Type // type of the receiver
160 method map[string]*methodType // registered methods
161 }
162
163 // Request is a header written before every RPC call. It is used internally
164 // but documented here as an aid to debugging, such as when analyzing
165 // network traffic.
166 type Request struct {
167 ServiceMethod string // format: "Service.Method"
168 Seq uint64 // sequence number chosen by client
169 next *Request // for free list in Server
170 }
171
172 // Response is a header written before every RPC return. It is used internally
173 // but documented here as an aid to debugging, such as when analyzing
174 // network traffic.
175 type Response struct {
176 ServiceMethod string // echoes that of the Request
177 Seq uint64 // echoes that of the request
178 Error string // error, if any.
179 next *Response // for free list in Server
180 }
181
182 // Server represents an RPC Server.
183 type Server struct {
184 mu sync.Mutex // protects the serviceMap
185 serviceMap map[string]*service
186 reqLock sync.Mutex // protects freeReq
187 freeReq *Request
188 respLock sync.Mutex // protects freeResp
189 freeResp *Response
190 }
191
192 // NewServer returns a new Server.
193 func NewServer() *Server {
194 return &Server{serviceMap: make(map[string]*service)}
195 }
196
197 // DefaultServer is the default instance of *Server.
198 var DefaultServer = NewServer()
199
200 // Is this an exported - upper case - name?
201 func isExported(name string) bool {
202 rune, _ := utf8.DecodeRuneInString(name)
203 return unicode.IsUpper(rune)
204 }
205
206 // Is this type exported or a builtin?
207 func isExportedOrBuiltinType(t reflect.Type) bool {
208 for t.Kind() == reflect.Ptr {
209 t = t.Elem()
210 }
211 // PkgPath will be non-empty even for an exported type,
212 // so we need to check the type name as well.
213 return isExported(t.Name()) || t.PkgPath() == ""
214 }
215
216 // Register publishes in the server the set of methods of the
217 // receiver value that satisfy the following conditions:
218 // - exported method
219 // - two arguments, both pointers to exported structs
220 // - one return value, of type error
221 // It returns an error if the receiver is not an exported type or has no
222 // suitable methods.
223 // The client accesses each method using a string of the form "Type.Method",
224 // where Type is the receiver's concrete type.
225 func (server *Server) Register(rcvr interface{}) error {
226 return server.register(rcvr, "", false)
227 }
228
229 // RegisterName is like Register but uses the provided name for the type
230 // instead of the receiver's concrete type.
231 func (server *Server) RegisterName(name string, rcvr interface{}) error {
232 return server.register(rcvr, name, true)
233 }
234
235 func (server *Server) register(rcvr interface{}, name string, useName bool) error {
236 server.mu.Lock()
237 defer server.mu.Unlock()
238 if server.serviceMap == nil {
239 server.serviceMap = make(map[string]*service)
240 }
241 s := new(service)
242 s.typ = reflect.TypeOf(rcvr)
243 s.rcvr = reflect.ValueOf(rcvr)
244 sname := reflect.Indirect(s.rcvr).Type().Name()
245 if useName {
246 sname = name
247 }
248 if sname == "" {
249 log.Fatal("rpc: no service name for type", s.typ.String())
250 }
251 if !isExported(sname) && !useName {
252 s := "rpc Register: type " + sname + " is not exported"
253 log.Print(s)
254 return errors.New(s)
255 }
256 if _, present := server.serviceMap[sname]; present {
257 return errors.New("rpc: service already defined: " + sname)
258 }
259 s.name = sname
260 s.method = make(map[string]*methodType)
261
262 // Install the methods
263 for m := 0; m < s.typ.NumMethod(); m++ {
264 method := s.typ.Method(m)
265 mtype := method.Type
266 mname := method.Name
267 // Method must be exported.
268 if method.PkgPath != "" {
269 continue
270 }
271 // Method needs three ins: receiver, *args, *reply.
272 if mtype.NumIn() != 3 {
273 log.Println("method", mname, "has wrong number of ins:", mtype.NumIn())
274 continue
275 }
276 // First arg need not be a pointer.
277 argType := mtype.In(1)
278 if !isExportedOrBuiltinType(argType) {
279 log.Println(mname, "argument type not exported:", argType)
280 continue
281 }
282 // Second arg must be a pointer.
283 replyType := mtype.In(2)
284 if replyType.Kind() != reflect.Ptr {
285 log.Println("method", mname, "reply type not a pointer:", replyType)
286 continue
287 }
288 // Reply type must be exported.
289 if !isExportedOrBuiltinType(replyType) {
290 log.Println("method", mname, "reply type not exported:", replyType)
291 continue
292 }
293 // Method needs one out.
294 if mtype.NumOut() != 1 {
295 log.Println("method", mname, "has wrong number of outs:", mtype.NumOut())
296 continue
297 }
298 // The return type of the method must be error.
299 if returnType := mtype.Out(0); returnType != typeOfError {
300 log.Println("method", mname, "returns", returnType.String(), "not error")
301 continue
302 }
303 s.method[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
304 }
305
306 if len(s.method) == 0 {
307 s := "rpc Register: type " + sname + " has no exported methods of suitable type"
308 log.Print(s)
309 return errors.New(s)
310 }
311 server.serviceMap[s.name] = s
312 return nil
313 }
314
315 // A value sent as a placeholder for the server's response value when the server
316 // receives an invalid request. It is never decoded by the client since the Response
317 // contains an error when it is used.
318 var invalidRequest = struct{}{}
319
320 func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
321 resp := server.getResponse()
322 // Encode the response header
323 resp.ServiceMethod = req.ServiceMethod
324 if errmsg != "" {
325 resp.Error = errmsg
326 reply = invalidRequest
327 }
328 resp.Seq = req.Seq
329 sending.Lock()
330 err := codec.WriteResponse(resp, reply)
331 if err != nil {
332 log.Println("rpc: writing response:", err)
333 }
334 sending.Unlock()
335 server.freeResponse(resp)
336 }
337
338 func (m *methodType) NumCalls() (n uint) {
339 m.Lock()
340 n = m.numCalls
341 m.Unlock()
342 return n
343 }
344
345 func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
346 mtype.Lock()
347 mtype.numCalls++
348 mtype.Unlock()
349 function := mtype.method.Func
350 // Invoke the method, providing a new value for the reply.
351 returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
352 // The return value for the method is an error.
353 errInter := returnValues[0].Interface()
354 errmsg := ""
355 if errInter != nil {
356 errmsg = errInter.(error).Error()
357 }
358 server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
359 server.freeRequest(req)
360 }
361
362 type gobServerCodec struct {
363 rwc io.ReadWriteCloser
364 dec *gob.Decoder
365 enc *gob.Encoder
366 encBuf *bufio.Writer
367 }
368
369 func (c *gobServerCodec) ReadRequestHeader(r *Request) error {
370 return c.dec.Decode(r)
371 }
372
373 func (c *gobServerCodec) ReadRequestBody(body interface{}) error {
374 return c.dec.Decode(body)
375 }
376
377 func (c *gobServerCodec) WriteResponse(r *Response, body interface{}) (err error) {
378 if err = c.enc.Encode(r); err != nil {
379 return
380 }
381 if err = c.enc.Encode(body); err != nil {
382 return
383 }
384 return c.encBuf.Flush()
385 }
386
387 func (c *gobServerCodec) Close() error {
388 return c.rwc.Close()
389 }
390
391 // ServeConn runs the server on a single connection.
392 // ServeConn blocks, serving the connection until the client hangs up.
393 // The caller typically invokes ServeConn in a go statement.
394 // ServeConn uses the gob wire format (see package gob) on the
395 // connection. To use an alternate codec, use ServeCodec.
396 func (server *Server) ServeConn(conn io.ReadWriteCloser) {
397 buf := bufio.NewWriter(conn)
398 srv := &gobServerCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(buf), buf}
399 server.ServeCodec(srv)
400 }
401
402 // ServeCodec is like ServeConn but uses the specified codec to
403 // decode requests and encode responses.
404 func (server *Server) ServeCodec(codec ServerCodec) {
405 sending := new(sync.Mutex)
406 for {
407 service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
408 if err != nil {
409 if err != io.EOF {
410 log.Println("rpc:", err)
411 }
412 if !keepReading {
413 break
414 }
415 // send a response if we actually managed to read a header.
416 if req != nil {
417 server.sendResponse(sending, req, invalidRequest, codec, err.Error())
418 server.freeRequest(req)
419 }
420 continue
421 }
422 go service.call(server, sending, mtype, req, argv, replyv, codec)
423 }
424 codec.Close()
425 }
426
427 // ServeRequest is like ServeCodec but synchronously serves a single request.
428 // It does not close the codec upon completion.
429 func (server *Server) ServeRequest(codec ServerCodec) error {
430 sending := new(sync.Mutex)
431 service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
432 if err != nil {
433 if !keepReading {
434 return err
435 }
436 // send a response if we actually managed to read a header.
437 if req != nil {
438 server.sendResponse(sending, req, invalidRequest, codec, err.Error())
439 server.freeRequest(req)
440 }
441 return err
442 }
443 service.call(server, sending, mtype, req, argv, replyv, codec)
444 return nil
445 }
446
447 func (server *Server) getRequest() *Request {
448 server.reqLock.Lock()
449 req := server.freeReq
450 if req == nil {
451 req = new(Request)
452 } else {
453 server.freeReq = req.next
454 *req = Request{}
455 }
456 server.reqLock.Unlock()
457 return req
458 }
459
460 func (server *Server) freeRequest(req *Request) {
461 server.reqLock.Lock()
462 req.next = server.freeReq
463 server.freeReq = req
464 server.reqLock.Unlock()
465 }
466
467 func (server *Server) getResponse() *Response {
468 server.respLock.Lock()
469 resp := server.freeResp
470 if resp == nil {
471 resp = new(Response)
472 } else {
473 server.freeResp = resp.next
474 *resp = Response{}
475 }
476 server.respLock.Unlock()
477 return resp
478 }
479
480 func (server *Server) freeResponse(resp *Response) {
481 server.respLock.Lock()
482 resp.next = server.freeResp
483 server.freeResp = resp
484 server.respLock.Unlock()
485 }
486
487 func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
488 service, mtype, req, keepReading, err = server.readRequestHeader(codec)
489 if err != nil {
490 if !keepReading {
491 return
492 }
493 // discard body
494 codec.ReadRequestBody(nil)
495 return
496 }
497
498 // Decode the argument value.
499 argIsValue := false // if true, need to indirect before calling.
500 if mtype.ArgType.Kind() == reflect.Ptr {
501 argv = reflect.New(mtype.ArgType.Elem())
502 } else {
503 argv = reflect.New(mtype.ArgType)
504 argIsValue = true
505 }
506 // argv guaranteed to be a pointer now.
507 if err = codec.ReadRequestBody(argv.Interface()); err != nil {
508 return
509 }
510 if argIsValue {
511 argv = argv.Elem()
512 }
513
514 replyv = reflect.New(mtype.ReplyType.Elem())
515 return
516 }
517
518 func (server *Server) readRequestHeader(codec ServerCodec) (service *service, mtype *methodType, req *Request, keepReading bool, err error) {
519 // Grab the request header.
520 req = server.getRequest()
521 err = codec.ReadRequestHeader(req)
522 if err != nil {
523 req = nil
524 if err == io.EOF || err == io.ErrUnexpectedEOF {
525 return
526 }
527 err = errors.New("rpc: server cannot decode request: " + err.Error())
528 return
529 }
530
531 // We read the header successfully. If we see an error now,
532 // we can still recover and move on to the next request.
533 keepReading = true
534
535 serviceMethod := strings.Split(req.ServiceMethod, ".")
536 if len(serviceMethod) != 2 {
537 err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
538 return
539 }
540 // Look up the request.
541 server.mu.Lock()
542 service = server.serviceMap[serviceMethod[0]]
543 server.mu.Unlock()
544 if service == nil {
545 err = errors.New("rpc: can't find service " + req.ServiceMethod)
546 return
547 }
548 mtype = service.method[serviceMethod[1]]
549 if mtype == nil {
550 err = errors.New("rpc: can't find method " + req.ServiceMethod)
551 }
552 return
553 }
554
555 // Accept accepts connections on the listener and serves requests
556 // for each incoming connection. Accept blocks; the caller typically
557 // invokes it in a go statement.
558 func (server *Server) Accept(lis net.Listener) {
559 for {
560 conn, err := lis.Accept()
561 if err != nil {
562 log.Fatal("rpc.Serve: accept:", err.Error()) // TODO(r): exit?
563 }
564 go server.ServeConn(conn)
565 }
566 }
567
568 // Register publishes the receiver's methods in the DefaultServer.
569 func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }
570
571 // RegisterName is like Register but uses the provided name for the type
572 // instead of the receiver's concrete type.
573 func RegisterName(name string, rcvr interface{}) error {
574 return DefaultServer.RegisterName(name, rcvr)
575 }
576
577 // A ServerCodec implements reading of RPC requests and writing of
578 // RPC responses for the server side of an RPC session.
579 // The server calls ReadRequestHeader and ReadRequestBody in pairs
580 // to read requests from the connection, and it calls WriteResponse to
581 // write a response back. The server calls Close when finished with the
582 // connection. ReadRequestBody may be called with a nil
583 // argument to force the body of the request to be read and discarded.
584 type ServerCodec interface {
585 ReadRequestHeader(*Request) error
586 ReadRequestBody(interface{}) error
587 WriteResponse(*Response, interface{}) error
588
589 Close() error
590 }
591
592 // ServeConn runs the DefaultServer on a single connection.
593 // ServeConn blocks, serving the connection until the client hangs up.
594 // The caller typically invokes ServeConn in a go statement.
595 // ServeConn uses the gob wire format (see package gob) on the
596 // connection. To use an alternate codec, use ServeCodec.
597 func ServeConn(conn io.ReadWriteCloser) {
598 DefaultServer.ServeConn(conn)
599 }
600
601 // ServeCodec is like ServeConn but uses the specified codec to
602 // decode requests and encode responses.
603 func ServeCodec(codec ServerCodec) {
604 DefaultServer.ServeCodec(codec)
605 }
606
607 // ServeRequest is like ServeCodec but synchronously serves a single request.
608 // It does not close the codec upon completion.
609 func ServeRequest(codec ServerCodec) error {
610 return DefaultServer.ServeRequest(codec)
611 }
612
613 // Accept accepts connections on the listener and serves requests
614 // to DefaultServer for each incoming connection.
615 // Accept blocks; the caller typically invokes it in a go statement.
616 func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
617
618 // Can connect to RPC service using HTTP CONNECT to rpcPath.
619 var connected = "200 Connected to Go RPC"
620
621 // ServeHTTP implements an http.Handler that answers RPC requests.
622 func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
623 if req.Method != "CONNECT" {
624 w.Header().Set("Content-Type", "text/plain; charset=utf-8")
625 w.WriteHeader(http.StatusMethodNotAllowed)
626 io.WriteString(w, "405 must CONNECT\n")
627 return
628 }
629 conn, _, err := w.(http.Hijacker).Hijack()
630 if err != nil {
631 log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
632 return
633 }
634 io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
635 server.ServeConn(conn)
636 }
637
638 // HandleHTTP registers an HTTP handler for RPC messages on rpcPath,
639 // and a debugging handler on debugPath.
640 // It is still necessary to invoke http.Serve(), typically in a go statement.
641 func (server *Server) HandleHTTP(rpcPath, debugPath string) {
642 http.Handle(rpcPath, server)
643 http.Handle(debugPath, debugHTTP{server})
644 }
645
646 // HandleHTTP registers an HTTP handler for RPC messages to DefaultServer
647 // on DefaultRPCPath and a debugging handler on DefaultDebugPath.
648 // It is still necessary to invoke http.Serve(), typically in a go statement.
649 func HandleHTTP() {
650 DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath)
651 }