Source file src/pkg/net/fd.go
1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 // +build darwin freebsd linux netbsd openbsd
6
7 package net
8
9 import (
10 "errors"
11 "io"
12 "os"
13 "sync"
14 "syscall"
15 "time"
16 )
17
18 // Network file descriptor.
19 type netFD struct {
20 // locking/lifetime of sysfd
21 sysmu sync.Mutex
22 sysref int
23
24 // must lock both sysmu and pollserver to write
25 // can lock either to read
26 closing bool
27
28 // immutable until Close
29 sysfd int
30 family int
31 sotype int
32 isConnected bool
33 sysfile *os.File
34 cr chan error
35 cw chan error
36 net string
37 laddr Addr
38 raddr Addr
39
40 // owned by client
41 rdeadline int64
42 rio sync.Mutex
43 wdeadline int64
44 wio sync.Mutex
45
46 // owned by fd wait server
47 ncr, ncw int
48 }
49
// A pollServer helps FDs determine when to retry a non-blocking
// read or write after they get EAGAIN. When an FD needs to wait,
// WaitRead/WaitWrite call AddFD, which records the fd in s.pending
// (under s's lock) and registers it with the low-level poller; the
// caller then receives on fd.cr/fd.cw. When the pollServer finds that
// I/O on the FD should be possible again, WakeFD sends a nil error on
// fd.cr/fd.cw to wake every waiting goroutine (errClosing is sent
// instead when the fd is evicted by Close).
57 //
// There is one subtlety: when a request is registered, the poll
// server is probably in a system call, waiting for an fd to become
// ready, and is not looking at s.pending. To resolve this, the poll
// server waits not just on the FDs it has been given but also on its
// own pipe. After registering the request, AddFD calls Wakeup, which
// writes a byte to the pipe, whenever the poller asks for a wakeup or
// the new request moves the next deadline earlier. This makes the
// pollServer's poll system call return; in response to the pipe
// being readable, the pollServer drains it and re-checks its deadlines.
67 //
// Note that the ordering is "register request" and then "wake up server".
// If the operations were reversed, there would be a race: the poll
// server might wake up, find nothing new to wait on, and go back to
// sleep, all before the requester managed to register the request.
// AddFD therefore updates s.pending and the poller while holding s's
// lock, and only calls Wakeup after releasing it. (The s.cr/s.cw
// request channels declared on pollServer, buffered >= 1, are not read
// or written by the functions in this file.)
78 //
// To avoid races in closing, all fd operations are locked and
// refcounted. When netFD.Close() is called, it sets a closing flag
// (via incref(true)) and evicts the fd from the poll server, waking any
// blocked waiters with errClosing. Only when the last reference is
// removed (in decref) will the fd be closed.
83
84 type pollServer struct {
85 cr, cw chan *netFD // buffered >= 1
86 pr, pw *os.File
87 poll *pollster // low-level OS hooks
88 sync.Mutex // controls pending and deadline
89 pending map[int]*netFD
90 deadline int64 // next deadline (nsec since 1970)
91 }
92
93 func (s *pollServer) AddFD(fd *netFD, mode int) error {
94 s.Lock()
95 intfd := fd.sysfd
96 if intfd < 0 || fd.closing {
97 // fd closed underfoot
98 s.Unlock()
99 return errClosing
100 }
101
102 var t int64
103 key := intfd << 1
104 if mode == 'r' {
105 fd.ncr++
106 t = fd.rdeadline
107 } else {
108 fd.ncw++
109 key++
110 t = fd.wdeadline
111 }
112 s.pending[key] = fd
113 doWakeup := false
114 if t > 0 && (s.deadline == 0 || t < s.deadline) {
115 s.deadline = t
116 doWakeup = true
117 }
118
119 wake, err := s.poll.AddFD(intfd, mode, false)
120 if err != nil {
121 panic("pollServer AddFD " + err.Error())
122 }
123 if wake {
124 doWakeup = true
125 }
126 s.Unlock()
127
128 if doWakeup {
129 s.Wakeup()
130 }
131 return nil
132 }
133
134 // Evict evicts fd from the pending list, unblocking
135 // any I/O running on fd. The caller must have locked
136 // pollserver.
137 func (s *pollServer) Evict(fd *netFD) {
138 if s.pending[fd.sysfd<<1] == fd {
139 s.WakeFD(fd, 'r', errClosing)
140 s.poll.DelFD(fd.sysfd, 'r')
141 delete(s.pending, fd.sysfd<<1)
142 }
143 if s.pending[fd.sysfd<<1|1] == fd {
144 s.WakeFD(fd, 'w', errClosing)
145 s.poll.DelFD(fd.sysfd, 'w')
146 delete(s.pending, fd.sysfd<<1|1)
147 }
148 }
149
150 var wakeupbuf [1]byte
151
152 func (s *pollServer) Wakeup() { s.pw.Write(wakeupbuf[0:]) }
153
154 func (s *pollServer) LookupFD(fd int, mode int) *netFD {
155 key := fd << 1
156 if mode == 'w' {
157 key++
158 }
159 netfd, ok := s.pending[key]
160 if !ok {
161 return nil
162 }
163 delete(s.pending, key)
164 return netfd
165 }
166
167 func (s *pollServer) WakeFD(fd *netFD, mode int, err error) {
168 if mode == 'r' {
169 for fd.ncr > 0 {
170 fd.ncr--
171 fd.cr <- err
172 }
173 } else {
174 for fd.ncw > 0 {
175 fd.ncw--
176 fd.cw <- err
177 }
178 }
179 }
180
181 func (s *pollServer) Now() int64 {
182 return time.Now().UnixNano()
183 }
184
185 func (s *pollServer) CheckDeadlines() {
186 now := s.Now()
187 // TODO(rsc): This will need to be handled more efficiently,
188 // probably with a heap indexed by wakeup time.
189
190 var next_deadline int64
191 for key, fd := range s.pending {
192 var t int64
193 var mode int
194 if key&1 == 0 {
195 mode = 'r'
196 } else {
197 mode = 'w'
198 }
199 if mode == 'r' {
200 t = fd.rdeadline
201 } else {
202 t = fd.wdeadline
203 }
204 if t > 0 {
205 if t <= now {
206 delete(s.pending, key)
207 if mode == 'r' {
208 s.poll.DelFD(fd.sysfd, mode)
209 fd.rdeadline = -1
210 } else {
211 s.poll.DelFD(fd.sysfd, mode)
212 fd.wdeadline = -1
213 }
214 s.WakeFD(fd, mode, nil)
215 } else if next_deadline == 0 || t < next_deadline {
216 next_deadline = t
217 }
218 }
219 }
220 s.deadline = next_deadline
221 }
222
223 func (s *pollServer) Run() {
224 var scratch [100]byte
225 s.Lock()
226 defer s.Unlock()
227 for {
228 var t = s.deadline
229 if t > 0 {
230 t = t - s.Now()
231 if t <= 0 {
232 s.CheckDeadlines()
233 continue
234 }
235 }
236 fd, mode, err := s.poll.WaitFD(s, t)
237 if err != nil {
238 print("pollServer WaitFD: ", err.Error(), "\n")
239 return
240 }
241 if fd < 0 {
242 // Timeout happened.
243 s.CheckDeadlines()
244 continue
245 }
246 if fd == int(s.pr.Fd()) {
247 // Drain our wakeup pipe (we could loop here,
248 // but it's unlikely that there are more than
249 // len(scratch) wakeup calls).
250 s.pr.Read(scratch[0:])
251 s.CheckDeadlines()
252 } else {
253 netfd := s.LookupFD(fd, mode)
254 if netfd == nil {
255 // This can happen because the WaitFD runs without
256 // holding s's lock, so there might be a pending wakeup
257 // for an fd that has been evicted. No harm done.
258 continue
259 }
260 s.WakeFD(netfd, mode, nil)
261 }
262 }
263 }
264
265 func (s *pollServer) WaitRead(fd *netFD) error {
266 err := s.AddFD(fd, 'r')
267 if err == nil {
268 err = <-fd.cr
269 }
270 return err
271 }
272
273 func (s *pollServer) WaitWrite(fd *netFD) error {
274 err := s.AddFD(fd, 'w')
275 if err == nil {
276 err = <-fd.cw
277 }
278 return err
279 }
280
281 // Network FD methods.
282 // All the network FDs use a single pollServer.
283
284 var pollserver *pollServer
285 var onceStartServer sync.Once
286
287 func startServer() {
288 p, err := newPollServer()
289 if err != nil {
290 print("Start pollServer: ", err.Error(), "\n")
291 }
292 pollserver = p
293 }
294
295 func newFD(fd, family, sotype int, net string) (*netFD, error) {
296 onceStartServer.Do(startServer)
297 if err := syscall.SetNonblock(fd, true); err != nil {
298 return nil, err
299 }
300 netfd := &netFD{
301 sysfd: fd,
302 family: family,
303 sotype: sotype,
304 net: net,
305 }
306 netfd.cr = make(chan error, 1)
307 netfd.cw = make(chan error, 1)
308 return netfd, nil
309 }
310
311 func (fd *netFD) setAddr(laddr, raddr Addr) {
312 fd.laddr = laddr
313 fd.raddr = raddr
314 var ls, rs string
315 if laddr != nil {
316 ls = laddr.String()
317 }
318 if raddr != nil {
319 rs = raddr.String()
320 }
321 fd.sysfile = os.NewFile(uintptr(fd.sysfd), fd.net+":"+ls+"->"+rs)
322 }
323
324 func (fd *netFD) connect(ra syscall.Sockaddr) error {
325 err := syscall.Connect(fd.sysfd, ra)
326 if err == syscall.EINPROGRESS {
327 if err = pollserver.WaitWrite(fd); err != nil {
328 return err
329 }
330 var e int
331 e, err = syscall.GetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_ERROR)
332 if err != nil {
333 return os.NewSyscallError("getsockopt", err)
334 }
335 if e != 0 {
336 err = syscall.Errno(e)
337 }
338 }
339 return err
340 }
341
// errClosing is returned by operations on a netFD that has been closed
// or is in the process of closing.
var (
	errClosing = errors.New("use of closed network connection")
)
343
344 // Add a reference to this fd.
345 // If closing==true, pollserver must be locked; mark the fd as closing.
346 // Returns an error if the fd cannot be used.
347 func (fd *netFD) incref(closing bool) error {
348 if fd == nil {
349 return errClosing
350 }
351 fd.sysmu.Lock()
352 if fd.closing {
353 fd.sysmu.Unlock()
354 return errClosing
355 }
356 fd.sysref++
357 if closing {
358 fd.closing = true
359 }
360 fd.sysmu.Unlock()
361 return nil
362 }
363
364 // Remove a reference to this FD and close if we've been asked to do so (and
365 // there are no references left.
366 func (fd *netFD) decref() {
367 if fd == nil {
368 return
369 }
370 fd.sysmu.Lock()
371 fd.sysref--
372 if fd.closing && fd.sysref == 0 && fd.sysfile != nil {
373 fd.sysfile.Close()
374 fd.sysfile = nil
375 fd.sysfd = -1
376 }
377 fd.sysmu.Unlock()
378 }
379
380 func (fd *netFD) Close() error {
381 pollserver.Lock() // needed for both fd.incref(true) and pollserver.Evict
382 defer pollserver.Unlock()
383 if err := fd.incref(true); err != nil {
384 return err
385 }
386 // Unblock any I/O. Once it all unblocks and returns,
387 // so that it cannot be referring to fd.sysfd anymore,
388 // the final decref will close fd.sysfd. This should happen
389 // fairly quickly, since all the I/O is non-blocking, and any
390 // attempts to block in the pollserver will return errClosing.
391 pollserver.Evict(fd)
392 fd.decref()
393 return nil
394 }
395
396 func (fd *netFD) shutdown(how int) error {
397 if err := fd.incref(false); err != nil {
398 return err
399 }
400 defer fd.decref()
401 err := syscall.Shutdown(fd.sysfd, how)
402 if err != nil {
403 return &OpError{"shutdown", fd.net, fd.laddr, err}
404 }
405 return nil
406 }
407
408 func (fd *netFD) CloseRead() error {
409 return fd.shutdown(syscall.SHUT_RD)
410 }
411
412 func (fd *netFD) CloseWrite() error {
413 return fd.shutdown(syscall.SHUT_WR)
414 }
415
416 func (fd *netFD) Read(p []byte) (n int, err error) {
417 fd.rio.Lock()
418 defer fd.rio.Unlock()
419 if err := fd.incref(false); err != nil {
420 return 0, err
421 }
422 defer fd.decref()
423 for {
424 n, err = syscall.Read(int(fd.sysfd), p)
425 if err == syscall.EAGAIN {
426 err = errTimeout
427 if fd.rdeadline >= 0 {
428 if err = pollserver.WaitRead(fd); err == nil {
429 continue
430 }
431 }
432 }
433 if err != nil {
434 n = 0
435 } else if n == 0 && err == nil && fd.sotype != syscall.SOCK_DGRAM {
436 err = io.EOF
437 }
438 break
439 }
440 if err != nil && err != io.EOF {
441 err = &OpError{"read", fd.net, fd.raddr, err}
442 }
443 return
444 }
445
446 func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) {
447 fd.rio.Lock()
448 defer fd.rio.Unlock()
449 if err := fd.incref(false); err != nil {
450 return 0, nil, err
451 }
452 defer fd.decref()
453 for {
454 n, sa, err = syscall.Recvfrom(fd.sysfd, p, 0)
455 if err == syscall.EAGAIN {
456 err = errTimeout
457 if fd.rdeadline >= 0 {
458 if err = pollserver.WaitRead(fd); err == nil {
459 continue
460 }
461 }
462 }
463 if err != nil {
464 n = 0
465 }
466 break
467 }
468 if err != nil && err != io.EOF {
469 err = &OpError{"read", fd.net, fd.laddr, err}
470 }
471 return
472 }
473
474 func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err error) {
475 fd.rio.Lock()
476 defer fd.rio.Unlock()
477 if err := fd.incref(false); err != nil {
478 return 0, 0, 0, nil, err
479 }
480 defer fd.decref()
481 for {
482 n, oobn, flags, sa, err = syscall.Recvmsg(fd.sysfd, p, oob, 0)
483 if err == syscall.EAGAIN {
484 err = errTimeout
485 if fd.rdeadline >= 0 {
486 if err = pollserver.WaitRead(fd); err == nil {
487 continue
488 }
489 }
490 }
491 if err == nil && n == 0 {
492 err = io.EOF
493 }
494 break
495 }
496 if err != nil && err != io.EOF {
497 err = &OpError{"read", fd.net, fd.laddr, err}
498 return
499 }
500 return
501 }
502
503 func (fd *netFD) Write(p []byte) (int, error) {
504 fd.wio.Lock()
505 defer fd.wio.Unlock()
506 if err := fd.incref(false); err != nil {
507 return 0, err
508 }
509 defer fd.decref()
510 if fd.sysfile == nil {
511 return 0, syscall.EINVAL
512 }
513
514 var err error
515 nn := 0
516 for {
517 var n int
518 n, err = syscall.Write(int(fd.sysfd), p[nn:])
519 if n > 0 {
520 nn += n
521 }
522 if nn == len(p) {
523 break
524 }
525 if err == syscall.EAGAIN {
526 err = errTimeout
527 if fd.wdeadline >= 0 {
528 if err = pollserver.WaitWrite(fd); err == nil {
529 continue
530 }
531 }
532 }
533 if err != nil {
534 n = 0
535 break
536 }
537 if n == 0 {
538 err = io.ErrUnexpectedEOF
539 break
540 }
541 }
542 if err != nil {
543 err = &OpError{"write", fd.net, fd.raddr, err}
544 }
545 return nn, err
546 }
547
548 func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) {
549 fd.wio.Lock()
550 defer fd.wio.Unlock()
551 if err := fd.incref(false); err != nil {
552 return 0, err
553 }
554 defer fd.decref()
555 for {
556 err = syscall.Sendto(fd.sysfd, p, 0, sa)
557 if err == syscall.EAGAIN {
558 err = errTimeout
559 if fd.wdeadline >= 0 {
560 if err = pollserver.WaitWrite(fd); err == nil {
561 continue
562 }
563 }
564 }
565 break
566 }
567 if err == nil {
568 n = len(p)
569 } else {
570 err = &OpError{"write", fd.net, fd.raddr, err}
571 }
572 return
573 }
574
575 func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err error) {
576 fd.wio.Lock()
577 defer fd.wio.Unlock()
578 if err := fd.incref(false); err != nil {
579 return 0, 0, err
580 }
581 defer fd.decref()
582 for {
583 err = syscall.Sendmsg(fd.sysfd, p, oob, sa, 0)
584 if err == syscall.EAGAIN {
585 err = errTimeout
586 if fd.wdeadline >= 0 {
587 if err = pollserver.WaitWrite(fd); err == nil {
588 continue
589 }
590 }
591 }
592 break
593 }
594 if err == nil {
595 n = len(p)
596 oobn = len(oob)
597 } else {
598 err = &OpError{"write", fd.net, fd.raddr, err}
599 }
600 return
601 }
602
603 func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (netfd *netFD, err error) {
604 if err := fd.incref(false); err != nil {
605 return nil, err
606 }
607 defer fd.decref()
608
609 // See ../syscall/exec.go for description of ForkLock.
610 // It is okay to hold the lock across syscall.Accept
611 // because we have put fd.sysfd into non-blocking mode.
612 var s int
613 var rsa syscall.Sockaddr
614 for {
615 syscall.ForkLock.RLock()
616 s, rsa, err = syscall.Accept(fd.sysfd)
617 if err != nil {
618 syscall.ForkLock.RUnlock()
619 if err == syscall.EAGAIN {
620 err = errTimeout
621 if fd.rdeadline >= 0 {
622 if err = pollserver.WaitRead(fd); err == nil {
623 continue
624 }
625 }
626 } else if err == syscall.ECONNABORTED {
627 // This means that a socket on the listen queue was closed
628 // before we Accept()ed it; it's a silly error, so try again.
629 continue
630 }
631 return nil, &OpError{"accept", fd.net, fd.laddr, err}
632 }
633 break
634 }
635 syscall.CloseOnExec(s)
636 syscall.ForkLock.RUnlock()
637
638 if netfd, err = newFD(s, fd.family, fd.sotype, fd.net); err != nil {
639 syscall.Close(s)
640 return nil, err
641 }
642 lsa, _ := syscall.Getsockname(netfd.sysfd)
643 netfd.setAddr(toAddr(lsa), toAddr(rsa))
644 return netfd, nil
645 }
646
647 func (fd *netFD) dup() (f *os.File, err error) {
648 ns, err := syscall.Dup(fd.sysfd)
649 if err != nil {
650 return nil, &OpError{"dup", fd.net, fd.laddr, err}
651 }
652
653 // We want blocking mode for the new fd, hence the double negative.
654 if err = syscall.SetNonblock(ns, false); err != nil {
655 return nil, &OpError{"setnonblock", fd.net, fd.laddr, err}
656 }
657
658 return os.NewFile(uintptr(ns), fd.sysfile.Name()), nil
659 }
660
// closesocket closes the socket descriptor s and reports any error
// returned by the close system call.
func closesocket(s int) error {
	err := syscall.Close(s)
	return err
}