Source file src/pkg/net/rpc/server.go
1 // Copyright 2009 The Go Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style 3 // license that can be found in the LICENSE file. 4 5 /* 6 Package rpc provides access to the exported methods of an object across a 7 network or other I/O connection. A server registers an object, making it visible 8 as a service with the name of the type of the object. After registration, exported 9 methods of the object will be accessible remotely. A server may register multiple 10 objects (services) of different types but it is an error to register multiple 11 objects of the same type. 12 13 Only methods that satisfy these criteria will be made available for remote access; 14 other methods will be ignored: 15 16 - the method is exported. 17 - the method has two arguments, both exported (or builtin) types. 18 - the method's second argument is a pointer. 19 - the method has return type error. 20 21 In effect, the method must look schematically like 22 23 func (t *T) MethodName(argType T1, replyType *T2) error 24 25 where T, T1 and T2 can be marshaled by encoding/gob. 26 These requirements apply even if a different codec is used. 27 (In future, these requirements may soften for custom codecs.) 28 29 The method's first argument represents the arguments provided by the caller; the 30 second argument represents the result parameters to be returned to the caller. 31 The method's return value, if non-nil, is passed back as a string that the client 32 sees as if created by errors.New. 33 34 The server may handle requests on a single connection by calling ServeConn. More 35 typically it will create a network listener and call Accept or, for an HTTP 36 listener, HandleHTTP and http.Serve. 37 38 A client wishing to use the service establishes a connection and then invokes 39 NewClient on the connection. The convenience function Dial (DialHTTP) performs 40 both steps for a raw network connection (an HTTP connection). The resulting 41 Client object has two methods, Call and Go, that specify the service and method to 42 call, a pointer containing the arguments, and a pointer to receive the result 43 parameters. 44 45 The Call method waits for the remote call to complete while the Go method 46 launches the call asynchronously and signals completion using the Call 47 structure's Done channel. 48 49 Unless an explicit codec is set up, package encoding/gob is used to 50 transport the data. 51 52 Here is a simple example. A server wishes to export an object of type Arith: 53 54 package server 55 56 type Args struct { 57 A, B int 58 } 59 60 type Quotient struct { 61 Quo, Rem int 62 } 63 64 type Arith int 65 66 func (t *Arith) Multiply(args *Args, reply *int) error { 67 *reply = args.A * args.B 68 return nil 69 } 70 71 func (t *Arith) Divide(args *Args, quo *Quotient) error { 72 if args.B == 0 { 73 return errors.New("divide by zero") 74 } 75 quo.Quo = args.A / args.B 76 quo.Rem = args.A % args.B 77 return nil 78 } 79 80 The server calls (for HTTP service): 81 82 arith := new(Arith) 83 rpc.Register(arith) 84 rpc.HandleHTTP() 85 l, e := net.Listen("tcp", ":1234") 86 if e != nil { 87 log.Fatal("listen error:", e) 88 } 89 go http.Serve(l, nil) 90 91 At this point, clients can see a service "Arith" with methods "Arith.Multiply" and 92 "Arith.Divide". To invoke one, a client first dials the server: 93 94 client, err := rpc.DialHTTP("tcp", serverAddress + ":1234") 95 if err != nil { 96 log.Fatal("dialing:", err) 97 } 98 99 Then it can make a remote call: 100 101 // Synchronous call 102 args := &server.Args{7,8} 103 var reply int 104 err = client.Call("Arith.Multiply", args, &reply) 105 if err != nil { 106 log.Fatal("arith error:", err) 107 } 108 fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply) 109 110 or 111 112 // Asynchronous call 113 quotient := new(Quotient) 114 divCall := client.Go("Arith.Divide", args, "ient, nil) 115 replyCall := <-divCall.Done // will be equal to divCall 116 // check errors, print, etc. 117 118 A server implementation will often provide a simple, type-safe wrapper for the 119 client. 120 */ 121 package rpc 122 123 import ( 124 "bufio" 125 "encoding/gob" 126 "errors" 127 "io" 128 "log" 129 "net" 130 "net/http" 131 "reflect" 132 "strings" 133 "sync" 134 "unicode" 135 "unicode/utf8" 136 ) 137 138 const ( 139 // Defaults used by HandleHTTP 140 DefaultRPCPath = "/_goRPC_" 141 DefaultDebugPath = "/debug/rpc" 142 ) 143 144 // Precompute the reflect type for error. Can't use error directly 145 // because Typeof takes an empty interface value. This is annoying. 146 var typeOfError = reflect.TypeOf((*error)(nil)).Elem() 147 148 type methodType struct { 149 sync.Mutex // protects counters 150 method reflect.Method 151 ArgType reflect.Type 152 ReplyType reflect.Type 153 numCalls uint 154 } 155 156 type service struct { 157 name string // name of service 158 rcvr reflect.Value // receiver of methods for the service 159 typ reflect.Type // type of the receiver 160 method map[string]*methodType // registered methods 161 } 162 163 // Request is a header written before every RPC call. It is used internally 164 // but documented here as an aid to debugging, such as when analyzing 165 // network traffic. 166 type Request struct { 167 ServiceMethod string // format: "Service.Method" 168 Seq uint64 // sequence number chosen by client 169 next *Request // for free list in Server 170 } 171 172 // Response is a header written before every RPC return. It is used internally 173 // but documented here as an aid to debugging, such as when analyzing 174 // network traffic. 175 type Response struct { 176 ServiceMethod string // echoes that of the Request 177 Seq uint64 // echoes that of the request 178 Error string // error, if any. 179 next *Response // for free list in Server 180 } 181 182 // Server represents an RPC Server. 183 type Server struct { 184 mu sync.Mutex // protects the serviceMap 185 serviceMap map[string]*service 186 reqLock sync.Mutex // protects freeReq 187 freeReq *Request 188 respLock sync.Mutex // protects freeResp 189 freeResp *Response 190 } 191 192 // NewServer returns a new Server. 193 func NewServer() *Server { 194 return &Server{serviceMap: make(map[string]*service)} 195 } 196 197 // DefaultServer is the default instance of *Server. 198 var DefaultServer = NewServer() 199 200 // Is this an exported - upper case - name? 201 func isExported(name string) bool { 202 rune, _ := utf8.DecodeRuneInString(name) 203 return unicode.IsUpper(rune) 204 } 205 206 // Is this type exported or a builtin? 207 func isExportedOrBuiltinType(t reflect.Type) bool { 208 for t.Kind() == reflect.Ptr { 209 t = t.Elem() 210 } 211 // PkgPath will be non-empty even for an exported type, 212 // so we need to check the type name as well. 213 return isExported(t.Name()) || t.PkgPath() == "" 214 } 215 216 // Register publishes in the server the set of methods of the 217 // receiver value that satisfy the following conditions: 218 // - exported method 219 // - two arguments, both pointers to exported structs 220 // - one return value, of type error 221 // It returns an error if the receiver is not an exported type or has no 222 // suitable methods. 223 // The client accesses each method using a string of the form "Type.Method", 224 // where Type is the receiver's concrete type. 225 func (server *Server) Register(rcvr interface{}) error { 226 return server.register(rcvr, "", false) 227 } 228 229 // RegisterName is like Register but uses the provided name for the type 230 // instead of the receiver's concrete type. 231 func (server *Server) RegisterName(name string, rcvr interface{}) error { 232 return server.register(rcvr, name, true) 233 } 234 235 func (server *Server) register(rcvr interface{}, name string, useName bool) error { 236 server.mu.Lock() 237 defer server.mu.Unlock() 238 if server.serviceMap == nil { 239 server.serviceMap = make(map[string]*service) 240 } 241 s := new(service) 242 s.typ = reflect.TypeOf(rcvr) 243 s.rcvr = reflect.ValueOf(rcvr) 244 sname := reflect.Indirect(s.rcvr).Type().Name() 245 if useName { 246 sname = name 247 } 248 if sname == "" { 249 log.Fatal("rpc: no service name for type", s.typ.String()) 250 } 251 if !isExported(sname) && !useName { 252 s := "rpc Register: type " + sname + " is not exported" 253 log.Print(s) 254 return errors.New(s) 255 } 256 if _, present := server.serviceMap[sname]; present { 257 return errors.New("rpc: service already defined: " + sname) 258 } 259 s.name = sname 260 s.method = make(map[string]*methodType) 261 262 // Install the methods 263 for m := 0; m < s.typ.NumMethod(); m++ { 264 method := s.typ.Method(m) 265 mtype := method.Type 266 mname := method.Name 267 // Method must be exported. 268 if method.PkgPath != "" { 269 continue 270 } 271 // Method needs three ins: receiver, *args, *reply. 272 if mtype.NumIn() != 3 { 273 log.Println("method", mname, "has wrong number of ins:", mtype.NumIn()) 274 continue 275 } 276 // First arg need not be a pointer. 277 argType := mtype.In(1) 278 if !isExportedOrBuiltinType(argType) { 279 log.Println(mname, "argument type not exported:", argType) 280 continue 281 } 282 // Second arg must be a pointer. 283 replyType := mtype.In(2) 284 if replyType.Kind() != reflect.Ptr { 285 log.Println("method", mname, "reply type not a pointer:", replyType) 286 continue 287 } 288 // Reply type must be exported. 289 if !isExportedOrBuiltinType(replyType) { 290 log.Println("method", mname, "reply type not exported:", replyType) 291 continue 292 } 293 // Method needs one out. 294 if mtype.NumOut() != 1 { 295 log.Println("method", mname, "has wrong number of outs:", mtype.NumOut()) 296 continue 297 } 298 // The return type of the method must be error. 299 if returnType := mtype.Out(0); returnType != typeOfError { 300 log.Println("method", mname, "returns", returnType.String(), "not error") 301 continue 302 } 303 s.method[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType} 304 } 305 306 if len(s.method) == 0 { 307 s := "rpc Register: type " + sname + " has no exported methods of suitable type" 308 log.Print(s) 309 return errors.New(s) 310 } 311 server.serviceMap[s.name] = s 312 return nil 313 } 314 315 // A value sent as a placeholder for the server's response value when the server 316 // receives an invalid request. It is never decoded by the client since the Response 317 // contains an error when it is used. 318 var invalidRequest = struct{}{} 319 320 func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) { 321 resp := server.getResponse() 322 // Encode the response header 323 resp.ServiceMethod = req.ServiceMethod 324 if errmsg != "" { 325 resp.Error = errmsg 326 reply = invalidRequest 327 } 328 resp.Seq = req.Seq 329 sending.Lock() 330 err := codec.WriteResponse(resp, reply) 331 if err != nil { 332 log.Println("rpc: writing response:", err) 333 } 334 sending.Unlock() 335 server.freeResponse(resp) 336 } 337 338 func (m *methodType) NumCalls() (n uint) { 339 m.Lock() 340 n = m.numCalls 341 m.Unlock() 342 return n 343 } 344 345 func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) { 346 mtype.Lock() 347 mtype.numCalls++ 348 mtype.Unlock() 349 function := mtype.method.Func 350 // Invoke the method, providing a new value for the reply. 351 returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv}) 352 // The return value for the method is an error. 353 errInter := returnValues[0].Interface() 354 errmsg := "" 355 if errInter != nil { 356 errmsg = errInter.(error).Error() 357 } 358 server.sendResponse(sending, req, replyv.Interface(), codec, errmsg) 359 server.freeRequest(req) 360 } 361 362 type gobServerCodec struct { 363 rwc io.ReadWriteCloser 364 dec *gob.Decoder 365 enc *gob.Encoder 366 encBuf *bufio.Writer 367 } 368 369 func (c *gobServerCodec) ReadRequestHeader(r *Request) error { 370 return c.dec.Decode(r) 371 } 372 373 func (c *gobServerCodec) ReadRequestBody(body interface{}) error { 374 return c.dec.Decode(body) 375 } 376 377 func (c *gobServerCodec) WriteResponse(r *Response, body interface{}) (err error) { 378 if err = c.enc.Encode(r); err != nil { 379 return 380 } 381 if err = c.enc.Encode(body); err != nil { 382 return 383 } 384 return c.encBuf.Flush() 385 } 386 387 func (c *gobServerCodec) Close() error { 388 return c.rwc.Close() 389 } 390 391 // ServeConn runs the server on a single connection. 392 // ServeConn blocks, serving the connection until the client hangs up. 393 // The caller typically invokes ServeConn in a go statement. 394 // ServeConn uses the gob wire format (see package gob) on the 395 // connection. To use an alternate codec, use ServeCodec. 396 func (server *Server) ServeConn(conn io.ReadWriteCloser) { 397 buf := bufio.NewWriter(conn) 398 srv := &gobServerCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(buf), buf} 399 server.ServeCodec(srv) 400 } 401 402 // ServeCodec is like ServeConn but uses the specified codec to 403 // decode requests and encode responses. 404 func (server *Server) ServeCodec(codec ServerCodec) { 405 sending := new(sync.Mutex) 406 for { 407 service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec) 408 if err != nil { 409 if err != io.EOF { 410 log.Println("rpc:", err) 411 } 412 if !keepReading { 413 break 414 } 415 // send a response if we actually managed to read a header. 416 if req != nil { 417 server.sendResponse(sending, req, invalidRequest, codec, err.Error()) 418 server.freeRequest(req) 419 } 420 continue 421 } 422 go service.call(server, sending, mtype, req, argv, replyv, codec) 423 } 424 codec.Close() 425 } 426 427 // ServeRequest is like ServeCodec but synchronously serves a single request. 428 // It does not close the codec upon completion. 429 func (server *Server) ServeRequest(codec ServerCodec) error { 430 sending := new(sync.Mutex) 431 service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec) 432 if err != nil { 433 if !keepReading { 434 return err 435 } 436 // send a response if we actually managed to read a header. 437 if req != nil { 438 server.sendResponse(sending, req, invalidRequest, codec, err.Error()) 439 server.freeRequest(req) 440 } 441 return err 442 } 443 service.call(server, sending, mtype, req, argv, replyv, codec) 444 return nil 445 } 446 447 func (server *Server) getRequest() *Request { 448 server.reqLock.Lock() 449 req := server.freeReq 450 if req == nil { 451 req = new(Request) 452 } else { 453 server.freeReq = req.next 454 *req = Request{} 455 } 456 server.reqLock.Unlock() 457 return req 458 } 459 460 func (server *Server) freeRequest(req *Request) { 461 server.reqLock.Lock() 462 req.next = server.freeReq 463 server.freeReq = req 464 server.reqLock.Unlock() 465 } 466 467 func (server *Server) getResponse() *Response { 468 server.respLock.Lock() 469 resp := server.freeResp 470 if resp == nil { 471 resp = new(Response) 472 } else { 473 server.freeResp = resp.next 474 *resp = Response{} 475 } 476 server.respLock.Unlock() 477 return resp 478 } 479 480 func (server *Server) freeResponse(resp *Response) { 481 server.respLock.Lock() 482 resp.next = server.freeResp 483 server.freeResp = resp 484 server.respLock.Unlock() 485 } 486 487 func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) { 488 service, mtype, req, keepReading, err = server.readRequestHeader(codec) 489 if err != nil { 490 if !keepReading { 491 return 492 } 493 // discard body 494 codec.ReadRequestBody(nil) 495 return 496 } 497 498 // Decode the argument value. 499 argIsValue := false // if true, need to indirect before calling. 500 if mtype.ArgType.Kind() == reflect.Ptr { 501 argv = reflect.New(mtype.ArgType.Elem()) 502 } else { 503 argv = reflect.New(mtype.ArgType) 504 argIsValue = true 505 } 506 // argv guaranteed to be a pointer now. 507 if err = codec.ReadRequestBody(argv.Interface()); err != nil { 508 return 509 } 510 if argIsValue { 511 argv = argv.Elem() 512 } 513 514 replyv = reflect.New(mtype.ReplyType.Elem()) 515 return 516 } 517 518 func (server *Server) readRequestHeader(codec ServerCodec) (service *service, mtype *methodType, req *Request, keepReading bool, err error) { 519 // Grab the request header. 520 req = server.getRequest() 521 err = codec.ReadRequestHeader(req) 522 if err != nil { 523 req = nil 524 if err == io.EOF || err == io.ErrUnexpectedEOF { 525 return 526 } 527 err = errors.New("rpc: server cannot decode request: " + err.Error()) 528 return 529 } 530 531 // We read the header successfully. If we see an error now, 532 // we can still recover and move on to the next request. 533 keepReading = true 534 535 serviceMethod := strings.Split(req.ServiceMethod, ".") 536 if len(serviceMethod) != 2 { 537 err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod) 538 return 539 } 540 // Look up the request. 541 server.mu.Lock() 542 service = server.serviceMap[serviceMethod[0]] 543 server.mu.Unlock() 544 if service == nil { 545 err = errors.New("rpc: can't find service " + req.ServiceMethod) 546 return 547 } 548 mtype = service.method[serviceMethod[1]] 549 if mtype == nil { 550 err = errors.New("rpc: can't find method " + req.ServiceMethod) 551 } 552 return 553 } 554 555 // Accept accepts connections on the listener and serves requests 556 // for each incoming connection. Accept blocks; the caller typically 557 // invokes it in a go statement. 558 func (server *Server) Accept(lis net.Listener) { 559 for { 560 conn, err := lis.Accept() 561 if err != nil { 562 log.Fatal("rpc.Serve: accept:", err.Error()) // TODO(r): exit? 563 } 564 go server.ServeConn(conn) 565 } 566 } 567 568 // Register publishes the receiver's methods in the DefaultServer. 569 func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) } 570 571 // RegisterName is like Register but uses the provided name for the type 572 // instead of the receiver's concrete type. 573 func RegisterName(name string, rcvr interface{}) error { 574 return DefaultServer.RegisterName(name, rcvr) 575 } 576 577 // A ServerCodec implements reading of RPC requests and writing of 578 // RPC responses for the server side of an RPC session. 579 // The server calls ReadRequestHeader and ReadRequestBody in pairs 580 // to read requests from the connection, and it calls WriteResponse to 581 // write a response back. The server calls Close when finished with the 582 // connection. ReadRequestBody may be called with a nil 583 // argument to force the body of the request to be read and discarded. 584 type ServerCodec interface { 585 ReadRequestHeader(*Request) error 586 ReadRequestBody(interface{}) error 587 WriteResponse(*Response, interface{}) error 588 589 Close() error 590 } 591 592 // ServeConn runs the DefaultServer on a single connection. 593 // ServeConn blocks, serving the connection until the client hangs up. 594 // The caller typically invokes ServeConn in a go statement. 595 // ServeConn uses the gob wire format (see package gob) on the 596 // connection. To use an alternate codec, use ServeCodec. 597 func ServeConn(conn io.ReadWriteCloser) { 598 DefaultServer.ServeConn(conn) 599 } 600 601 // ServeCodec is like ServeConn but uses the specified codec to 602 // decode requests and encode responses. 603 func ServeCodec(codec ServerCodec) { 604 DefaultServer.ServeCodec(codec) 605 } 606 607 // ServeRequest is like ServeCodec but synchronously serves a single request. 608 // It does not close the codec upon completion. 609 func ServeRequest(codec ServerCodec) error { 610 return DefaultServer.ServeRequest(codec) 611 } 612 613 // Accept accepts connections on the listener and serves requests 614 // to DefaultServer for each incoming connection. 615 // Accept blocks; the caller typically invokes it in a go statement. 616 func Accept(lis net.Listener) { DefaultServer.Accept(lis) } 617 618 // Can connect to RPC service using HTTP CONNECT to rpcPath. 619 var connected = "200 Connected to Go RPC" 620 621 // ServeHTTP implements an http.Handler that answers RPC requests. 622 func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) { 623 if req.Method != "CONNECT" { 624 w.Header().Set("Content-Type", "text/plain; charset=utf-8") 625 w.WriteHeader(http.StatusMethodNotAllowed) 626 io.WriteString(w, "405 must CONNECT\n") 627 return 628 } 629 conn, _, err := w.(http.Hijacker).Hijack() 630 if err != nil { 631 log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error()) 632 return 633 } 634 io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n") 635 server.ServeConn(conn) 636 } 637 638 // HandleHTTP registers an HTTP handler for RPC messages on rpcPath, 639 // and a debugging handler on debugPath. 640 // It is still necessary to invoke http.Serve(), typically in a go statement. 641 func (server *Server) HandleHTTP(rpcPath, debugPath string) { 642 http.Handle(rpcPath, server) 643 http.Handle(debugPath, debugHTTP{server}) 644 } 645 646 // HandleHTTP registers an HTTP handler for RPC messages to DefaultServer 647 // on DefaultRPCPath and a debugging handler on DefaultDebugPath. 648 // It is still necessary to invoke http.Serve(), typically in a go statement. 649 func HandleHTTP() { 650 DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath) 651 }