Source file src/pkg/net/fd.go
1 // Copyright 2009 The Go Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style 3 // license that can be found in the LICENSE file. 4 5 // +build darwin freebsd linux netbsd openbsd 6 7 package net 8 9 import ( 10 "errors" 11 "io" 12 "os" 13 "sync" 14 "syscall" 15 "time" 16 ) 17 18 // Network file descriptor. 19 type netFD struct { 20 // locking/lifetime of sysfd 21 sysmu sync.Mutex 22 sysref int 23 24 // must lock both sysmu and pollserver to write 25 // can lock either to read 26 closing bool 27 28 // immutable until Close 29 sysfd int 30 family int 31 sotype int 32 isConnected bool 33 sysfile *os.File 34 cr chan error 35 cw chan error 36 net string 37 laddr Addr 38 raddr Addr 39 40 // owned by client 41 rdeadline int64 42 rio sync.Mutex 43 wdeadline int64 44 wio sync.Mutex 45 46 // owned by fd wait server 47 ncr, ncw int 48 } 49 50 // A pollServer helps FDs determine when to retry a non-blocking 51 // read or write after they get EAGAIN. When an FD needs to wait, 52 // send the fd on s.cr (for a read) or s.cw (for a write) to pass the 53 // request to the poll server. Then receive on fd.cr/fd.cw. 54 // When the pollServer finds that i/o on FD should be possible 55 // again, it will send fd on fd.cr/fd.cw to wake any waiting processes. 56 // This protocol is implemented as s.WaitRead() and s.WaitWrite(). 57 // 58 // There is one subtlety: when sending on s.cr/s.cw, the 59 // poll server is probably in a system call, waiting for an fd 60 // to become ready. It's not looking at the request channels. 61 // To resolve this, the poll server waits not just on the FDs it has 62 // been given but also its own pipe. After sending on the 63 // buffered channel s.cr/s.cw, WaitRead/WaitWrite writes a 64 // byte to the pipe, causing the pollServer's poll system call to 65 // return. In response to the pipe being readable, the pollServer 66 // re-polls its request channels. 
67 // 68 // Note that the ordering is "send request" and then "wake up server". 69 // If the operations were reversed, there would be a race: the poll 70 // server might wake up and look at the request channel, see that it 71 // was empty, and go back to sleep, all before the requester managed 72 // to send the request. Because the send must complete before the wakeup, 73 // the request channel must be buffered. A buffer of size 1 is sufficient 74 // for any request load. If many processes are trying to submit requests, 75 // one will succeed, the pollServer will read the request, and then the 76 // channel will be empty for the next process's request. A larger buffer 77 // might help batch requests. 78 // 79 // To avoid races in closing, all fd operations are locked and 80 // refcounted. when netFD.Close() is called, it calls syscall.Shutdown 81 // and sets a closing flag. Only when the last reference is removed 82 // will the fd be closed. 83 84 type pollServer struct { 85 cr, cw chan *netFD // buffered >= 1 86 pr, pw *os.File 87 poll *pollster // low-level OS hooks 88 sync.Mutex // controls pending and deadline 89 pending map[int]*netFD 90 deadline int64 // next deadline (nsec since 1970) 91 } 92 93 func (s *pollServer) AddFD(fd *netFD, mode int) error { 94 s.Lock() 95 intfd := fd.sysfd 96 if intfd < 0 || fd.closing { 97 // fd closed underfoot 98 s.Unlock() 99 return errClosing 100 } 101 102 var t int64 103 key := intfd << 1 104 if mode == 'r' { 105 fd.ncr++ 106 t = fd.rdeadline 107 } else { 108 fd.ncw++ 109 key++ 110 t = fd.wdeadline 111 } 112 s.pending[key] = fd 113 doWakeup := false 114 if t > 0 && (s.deadline == 0 || t < s.deadline) { 115 s.deadline = t 116 doWakeup = true 117 } 118 119 wake, err := s.poll.AddFD(intfd, mode, false) 120 if err != nil { 121 panic("pollServer AddFD " + err.Error()) 122 } 123 if wake { 124 doWakeup = true 125 } 126 s.Unlock() 127 128 if doWakeup { 129 s.Wakeup() 130 } 131 return nil 132 } 133 134 // Evict evicts fd from the 
pending list, unblocking 135 // any I/O running on fd. The caller must have locked 136 // pollserver. 137 func (s *pollServer) Evict(fd *netFD) { 138 if s.pending[fd.sysfd<<1] == fd { 139 s.WakeFD(fd, 'r', errClosing) 140 s.poll.DelFD(fd.sysfd, 'r') 141 delete(s.pending, fd.sysfd<<1) 142 } 143 if s.pending[fd.sysfd<<1|1] == fd { 144 s.WakeFD(fd, 'w', errClosing) 145 s.poll.DelFD(fd.sysfd, 'w') 146 delete(s.pending, fd.sysfd<<1|1) 147 } 148 } 149 150 var wakeupbuf [1]byte 151 152 func (s *pollServer) Wakeup() { s.pw.Write(wakeupbuf[0:]) } 153 154 func (s *pollServer) LookupFD(fd int, mode int) *netFD { 155 key := fd << 1 156 if mode == 'w' { 157 key++ 158 } 159 netfd, ok := s.pending[key] 160 if !ok { 161 return nil 162 } 163 delete(s.pending, key) 164 return netfd 165 } 166 167 func (s *pollServer) WakeFD(fd *netFD, mode int, err error) { 168 if mode == 'r' { 169 for fd.ncr > 0 { 170 fd.ncr-- 171 fd.cr <- err 172 } 173 } else { 174 for fd.ncw > 0 { 175 fd.ncw-- 176 fd.cw <- err 177 } 178 } 179 } 180 181 func (s *pollServer) Now() int64 { 182 return time.Now().UnixNano() 183 } 184 185 func (s *pollServer) CheckDeadlines() { 186 now := s.Now() 187 // TODO(rsc): This will need to be handled more efficiently, 188 // probably with a heap indexed by wakeup time. 
189 190 var next_deadline int64 191 for key, fd := range s.pending { 192 var t int64 193 var mode int 194 if key&1 == 0 { 195 mode = 'r' 196 } else { 197 mode = 'w' 198 } 199 if mode == 'r' { 200 t = fd.rdeadline 201 } else { 202 t = fd.wdeadline 203 } 204 if t > 0 { 205 if t <= now { 206 delete(s.pending, key) 207 if mode == 'r' { 208 s.poll.DelFD(fd.sysfd, mode) 209 fd.rdeadline = -1 210 } else { 211 s.poll.DelFD(fd.sysfd, mode) 212 fd.wdeadline = -1 213 } 214 s.WakeFD(fd, mode, nil) 215 } else if next_deadline == 0 || t < next_deadline { 216 next_deadline = t 217 } 218 } 219 } 220 s.deadline = next_deadline 221 } 222 223 func (s *pollServer) Run() { 224 var scratch [100]byte 225 s.Lock() 226 defer s.Unlock() 227 for { 228 var t = s.deadline 229 if t > 0 { 230 t = t - s.Now() 231 if t <= 0 { 232 s.CheckDeadlines() 233 continue 234 } 235 } 236 fd, mode, err := s.poll.WaitFD(s, t) 237 if err != nil { 238 print("pollServer WaitFD: ", err.Error(), "\n") 239 return 240 } 241 if fd < 0 { 242 // Timeout happened. 243 s.CheckDeadlines() 244 continue 245 } 246 if fd == int(s.pr.Fd()) { 247 // Drain our wakeup pipe (we could loop here, 248 // but it's unlikely that there are more than 249 // len(scratch) wakeup calls). 250 s.pr.Read(scratch[0:]) 251 s.CheckDeadlines() 252 } else { 253 netfd := s.LookupFD(fd, mode) 254 if netfd == nil { 255 // This can happen because the WaitFD runs without 256 // holding s's lock, so there might be a pending wakeup 257 // for an fd that has been evicted. No harm done. 258 continue 259 } 260 s.WakeFD(netfd, mode, nil) 261 } 262 } 263 } 264 265 func (s *pollServer) WaitRead(fd *netFD) error { 266 err := s.AddFD(fd, 'r') 267 if err == nil { 268 err = <-fd.cr 269 } 270 return err 271 } 272 273 func (s *pollServer) WaitWrite(fd *netFD) error { 274 err := s.AddFD(fd, 'w') 275 if err == nil { 276 err = <-fd.cw 277 } 278 return err 279 } 280 281 // Network FD methods. 282 // All the network FDs use a single pollServer. 
283 284 var pollserver *pollServer 285 var onceStartServer sync.Once 286 287 func startServer() { 288 p, err := newPollServer() 289 if err != nil { 290 print("Start pollServer: ", err.Error(), "\n") 291 } 292 pollserver = p 293 } 294 295 func newFD(fd, family, sotype int, net string) (*netFD, error) { 296 onceStartServer.Do(startServer) 297 if err := syscall.SetNonblock(fd, true); err != nil { 298 return nil, err 299 } 300 netfd := &netFD{ 301 sysfd: fd, 302 family: family, 303 sotype: sotype, 304 net: net, 305 } 306 netfd.cr = make(chan error, 1) 307 netfd.cw = make(chan error, 1) 308 return netfd, nil 309 } 310 311 func (fd *netFD) setAddr(laddr, raddr Addr) { 312 fd.laddr = laddr 313 fd.raddr = raddr 314 var ls, rs string 315 if laddr != nil { 316 ls = laddr.String() 317 } 318 if raddr != nil { 319 rs = raddr.String() 320 } 321 fd.sysfile = os.NewFile(uintptr(fd.sysfd), fd.net+":"+ls+"->"+rs) 322 } 323 324 func (fd *netFD) connect(ra syscall.Sockaddr) error { 325 err := syscall.Connect(fd.sysfd, ra) 326 if err == syscall.EINPROGRESS { 327 if err = pollserver.WaitWrite(fd); err != nil { 328 return err 329 } 330 var e int 331 e, err = syscall.GetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_ERROR) 332 if err != nil { 333 return os.NewSyscallError("getsockopt", err) 334 } 335 if e != 0 { 336 err = syscall.Errno(e) 337 } 338 } 339 return err 340 } 341 342 var errClosing = errors.New("use of closed network connection") 343 344 // Add a reference to this fd. 345 // If closing==true, pollserver must be locked; mark the fd as closing. 346 // Returns an error if the fd cannot be used. 
// incref adds a reference to fd so that sysfd stays open while an
// operation uses it. If closing is true, it also marks fd as closing;
// the caller must then hold pollserver's lock. It returns errClosing if
// fd is nil or already closing.
func (fd *netFD) incref(closing bool) error {
	if fd == nil {
		return errClosing
	}
	fd.sysmu.Lock()
	if fd.closing {
		fd.sysmu.Unlock()
		return errClosing
	}
	fd.sysref++
	if closing {
		fd.closing = true
	}
	fd.sysmu.Unlock()
	return nil
}

// decref removes a reference taken by incref. When fd is closing and
// this was the last reference, the underlying file is closed and sysfd
// is set to -1.
//
// NOTE(review): the close only happens when sysfile is non-nil. In this
// file sysfile is set only by setAddr, so a netFD closed before setAddr
// never closes sysfd here. Confirm that callers close the raw socket
// themselves on that path before changing this.
func (fd *netFD) decref() {
	if fd == nil {
		return
	}
	fd.sysmu.Lock()
	fd.sysref--
	if fd.closing && fd.sysref == 0 && fd.sysfile != nil {
		fd.sysfile.Close()
		fd.sysfile = nil
		fd.sysfd = -1
	}
	fd.sysmu.Unlock()
}

// Close marks fd as closing and evicts it from the poll server, waking
// blocked I/O with errClosing. It then drops its own reference. The
// descriptor is closed by whichever decref releases the last reference.
// It returns errClosing if fd is already closing.
func (fd *netFD) Close() error {
	pollserver.Lock() // needed for both fd.incref(true) and pollserver.Evict
	defer pollserver.Unlock()
	if err := fd.incref(true); err != nil {
		return err
	}
	// Unblock any I/O. Once it all unblocks and returns,
	// so that it cannot be referring to fd.sysfd anymore,
	// the final decref will close fd.sysfd. This should happen
	// fairly quickly, since all the I/O is non-blocking, and any
	// attempts to block in the pollserver will return errClosing.
	pollserver.Evict(fd)
	fd.decref()
	return nil
}

// shutdown calls shutdown(2) on the socket with the given how value,
// wrapping failures in *OpError.
func (fd *netFD) shutdown(how int) error {
	if err := fd.incref(false); err != nil {
		return err
	}
	defer fd.decref()
	err := syscall.Shutdown(fd.sysfd, how)
	if err != nil {
		return &OpError{"shutdown", fd.net, fd.laddr, err}
	}
	return nil
}

// CloseRead shuts down the reading side of the socket.
func (fd *netFD) CloseRead() error {
	return fd.shutdown(syscall.SHUT_RD)
}

// CloseWrite shuts down the writing side of the socket.
func (fd *netFD) CloseWrite() error {
	return fd.shutdown(syscall.SHUT_WR)
}

// Read reads into p. While the socket would block, it waits in the poll
// server as long as fd.rdeadline is not negative; otherwise it fails
// with errTimeout. A zero-byte read on a non-datagram socket is
// reported as io.EOF. Other errors are wrapped in *OpError.
func (fd *netFD) Read(p []byte) (n int, err error) {
	fd.rio.Lock()
	defer fd.rio.Unlock()
	if err := fd.incref(false); err != nil {
		return 0, err
	}
	defer fd.decref()
	for {
		n, err = syscall.Read(fd.sysfd, p)
		if err == syscall.EAGAIN {
			err = errTimeout
			if fd.rdeadline >= 0 {
				if err = pollserver.WaitRead(fd); err == nil {
					continue
				}
			}
		}
		if err != nil {
			n = 0
		} else if n == 0 && fd.sotype != syscall.SOCK_DGRAM {
			err = io.EOF
		}
		break
	}
	if err != nil && err != io.EOF {
		err = &OpError{"read", fd.net, fd.raddr, err}
	}
	return
}

// ReadFrom receives into p and returns the sender's address. It uses the
// same waiting and timeout rules as Read.
func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) {
	fd.rio.Lock()
	defer fd.rio.Unlock()
	if err := fd.incref(false); err != nil {
		return 0, nil, err
	}
	defer fd.decref()
	for {
		n, sa, err = syscall.Recvfrom(fd.sysfd, p, 0)
		if err == syscall.EAGAIN {
			err = errTimeout
			if fd.rdeadline >= 0 {
				if err = pollserver.WaitRead(fd); err == nil {
					continue
				}
			}
		}
		if err != nil {
			n = 0
		}
		break
	}
	if err != nil && err != io.EOF {
		err = &OpError{"read", fd.net, fd.laddr, err}
	}
	return
}

// ReadMsg receives data into p and ancillary data into oob. It uses the
// same waiting and timeout rules as Read.
func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err error) {
	fd.rio.Lock()
	defer fd.rio.Unlock()
	if err := fd.incref(false); err != nil {
		return 0, 0, 0, nil, err
	}
	defer fd.decref()
	for {
		n, oobn, flags, sa, err = syscall.Recvmsg(fd.sysfd, p, oob, 0)
		if err == syscall.EAGAIN {
			err = errTimeout
			if fd.rdeadline >= 0 {
				if err = pollserver.WaitRead(fd); err == nil {
					continue
				}
			}
		}
		// As in Read, a zero-byte result means EOF only on non-datagram
		// sockets; a zero-length datagram is a valid message.
		if err == nil && n == 0 && fd.sotype != syscall.SOCK_DGRAM {
			err = io.EOF
		}
		break
	}
	if err != nil && err != io.EOF {
		err = &OpError{"read", fd.net, fd.laddr, err}
	}
	return
}

// Write writes all of p, waiting in the poll server whenever the socket
// would block and fd.wdeadline is not negative. It returns the number
// of bytes written. A write that makes no progress without an error
// yields io.ErrUnexpectedEOF. Errors are wrapped in *OpError.
func (fd *netFD) Write(p []byte) (int, error) {
	fd.wio.Lock()
	defer fd.wio.Unlock()
	if err := fd.incref(false); err != nil {
		return 0, err
	}
	defer fd.decref()
	if fd.sysfile == nil {
		return 0, syscall.EINVAL
	}

	var err error
	nn := 0
	for {
		var n int
		n, err = syscall.Write(fd.sysfd, p[nn:])
		if n > 0 {
			nn += n
		}
		if nn == len(p) {
			break
		}
		if err == syscall.EAGAIN {
			err = errTimeout
			if fd.wdeadline >= 0 {
				if err = pollserver.WaitWrite(fd); err == nil {
					continue
				}
			}
		}
		if err != nil {
			break
		}
		if n == 0 {
			err = io.ErrUnexpectedEOF
			break
		}
	}
	if err != nil {
		err = &OpError{"write", fd.net, fd.raddr, err}
	}
	return nn, err
}

// WriteTo sends p to sa as a single Sendto call, retrying after the poll
// server reports writability. On success it reports len(p) bytes written.
func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) {
	fd.wio.Lock()
	defer fd.wio.Unlock()
	if err := fd.incref(false); err != nil {
		return 0, err
	}
	defer fd.decref()
	for {
		err = syscall.Sendto(fd.sysfd, p, 0, sa)
		if err == syscall.EAGAIN {
			err = errTimeout
			if fd.wdeadline >= 0 {
				if err = pollserver.WaitWrite(fd); err == nil {
					continue
				}
			}
		}
		break
	}
	if err == nil {
		n = len(p)
	} else {
		err = &OpError{"write", fd.net, fd.raddr, err}
	}
	return
}

// WriteMsg sends p and ancillary data oob to sa as a single Sendmsg
// call. On success it reports len(p) and len(oob) bytes written.
func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err error) {
	fd.wio.Lock()
	defer fd.wio.Unlock()
	if err := fd.incref(false); err != nil {
		return 0, 0, err
	}
	defer fd.decref()
	for {
		err = syscall.Sendmsg(fd.sysfd, p, oob, sa, 0)
		if err == syscall.EAGAIN {
			err = errTimeout
			if fd.wdeadline >= 0 {
				if err = pollserver.WaitWrite(fd); err == nil {
					continue
				}
			}
		}
		break
	}
	if err == nil {
		n = len(p)
		oobn = len(oob)
	} else {
		err = &OpError{"write", fd.net, fd.raddr, err}
	}
	return
}

// accept waits for a connection on the listening socket fd and returns
// it as a new netFD. Its addresses are set through toAddr. ECONNABORTED
// is retried. The accepted socket is marked close-on-exec while
// syscall.ForkLock is held.
func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (netfd *netFD, err error) {
	if err := fd.incref(false); err != nil {
		return nil, err
	}
	defer fd.decref()

	// See ../syscall/exec.go for description of ForkLock.
	// It is okay to hold the lock across syscall.Accept
	// because we have put fd.sysfd into non-blocking mode.
	var s int
	var rsa syscall.Sockaddr
	for {
		syscall.ForkLock.RLock()
		s, rsa, err = syscall.Accept(fd.sysfd)
		if err != nil {
			syscall.ForkLock.RUnlock()
			if err == syscall.EAGAIN {
				err = errTimeout
				if fd.rdeadline >= 0 {
					if err = pollserver.WaitRead(fd); err == nil {
						continue
					}
				}
			} else if err == syscall.ECONNABORTED {
				// This means that a socket on the listen queue was closed
				// before we Accept()ed it; it's a silly error, so try again.
				continue
			}
			return nil, &OpError{"accept", fd.net, fd.laddr, err}
		}
		break
	}
	syscall.CloseOnExec(s)
	syscall.ForkLock.RUnlock()

	if netfd, err = newFD(s, fd.family, fd.sotype, fd.net); err != nil {
		syscall.Close(s)
		return nil, err
	}
	lsa, _ := syscall.Getsockname(netfd.sysfd)
	netfd.setAddr(toAddr(lsa), toAddr(rsa))
	return netfd, nil
}

// dup returns an *os.File for a duplicate of fd's socket, switched back
// to blocking mode. Like the sockets made by accept, the duplicate is
// marked close-on-exec under syscall.ForkLock, so a concurrent fork/exec
// cannot leak it into a child. If switching modes fails, the duplicate
// is closed instead of leaked.
func (fd *netFD) dup() (f *os.File, err error) {
	syscall.ForkLock.RLock()
	ns, err := syscall.Dup(fd.sysfd)
	if err != nil {
		syscall.ForkLock.RUnlock()
		return nil, &OpError{"dup", fd.net, fd.laddr, err}
	}
	syscall.CloseOnExec(ns)
	syscall.ForkLock.RUnlock()

	// We want blocking mode for the new fd, hence the double negative.
	if err = syscall.SetNonblock(ns, false); err != nil {
		syscall.Close(ns)
		return nil, &OpError{"setnonblock", fd.net, fd.laddr, err}
	}

	return os.NewFile(uintptr(ns), fd.sysfile.Name()), nil
}

// closesocket closes the raw socket descriptor s.
func closesocket(s int) error {
	return syscall.Close(s)
}