src/pkg/net/fd.go - The Go Programming Language

Golang

Source file src/pkg/net/fd.go

     1	// Copyright 2009 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	// +build darwin freebsd linux netbsd openbsd
     6	
     7	package net
     8	
     9	import (
    10		"errors"
    11		"io"
    12		"os"
    13		"sync"
    14		"syscall"
    15		"time"
    16	)
    17	
    18	// Network file descriptor.
    19	type netFD struct {
    20		// locking/lifetime of sysfd
    21		sysmu  sync.Mutex
    22		sysref int
    23	
    24		// must lock both sysmu and pollserver to write
    25		// can lock either to read
    26		closing bool
    27	
    28		// immutable until Close
    29		sysfd       int
    30		family      int
    31		sotype      int
    32		isConnected bool
    33		sysfile     *os.File
    34		cr          chan error
    35		cw          chan error
    36		net         string
    37		laddr       Addr
    38		raddr       Addr
    39	
    40		// owned by client
    41		rdeadline int64
    42		rio       sync.Mutex
    43		wdeadline int64
    44		wio       sync.Mutex
    45	
    46		// owned by fd wait server
    47		ncr, ncw int
    48	}
    49	
    50	// A pollServer helps FDs determine when to retry a non-blocking
    51	// read or write after they get EAGAIN.  When an FD needs to wait,
    52	// send the fd on s.cr (for a read) or s.cw (for a write) to pass the
    53	// request to the poll server.  Then receive on fd.cr/fd.cw.
    54	// When the pollServer finds that i/o on FD should be possible
    55	// again, it will send fd on fd.cr/fd.cw to wake any waiting processes.
    56	// This protocol is implemented as s.WaitRead() and s.WaitWrite().
    57	//
    58	// There is one subtlety: when sending on s.cr/s.cw, the
    59	// poll server is probably in a system call, waiting for an fd
    60	// to become ready.  It's not looking at the request channels.
    61	// To resolve this, the poll server waits not just on the FDs it has
    62	// been given but also its own pipe.  After sending on the
    63	// buffered channel s.cr/s.cw, WaitRead/WaitWrite writes a
    64	// byte to the pipe, causing the pollServer's poll system call to
    65	// return.  In response to the pipe being readable, the pollServer
    66	// re-polls its request channels.
    67	//
    68	// Note that the ordering is "send request" and then "wake up server".
    69	// If the operations were reversed, there would be a race: the poll
    70	// server might wake up and look at the request channel, see that it
    71	// was empty, and go back to sleep, all before the requester managed
    72	// to send the request.  Because the send must complete before the wakeup,
    73	// the request channel must be buffered.  A buffer of size 1 is sufficient
    74	// for any request load.  If many processes are trying to submit requests,
    75	// one will succeed, the pollServer will read the request, and then the
    76	// channel will be empty for the next process's request.  A larger buffer
    77	// might help batch requests.
    78	//
// To avoid races in closing, all fd operations are locked and
// refcounted. When netFD.Close() is called, it sets a closing flag and
// evicts the fd from the poll server, waking blocked I/O with errClosing.
// Only when the last reference is removed will the fd be closed.
    83	
    84	type pollServer struct {
    85		cr, cw     chan *netFD // buffered >= 1
    86		pr, pw     *os.File
    87		poll       *pollster // low-level OS hooks
    88		sync.Mutex           // controls pending and deadline
    89		pending    map[int]*netFD
    90		deadline   int64 // next deadline (nsec since 1970)
    91	}
    92	
    93	func (s *pollServer) AddFD(fd *netFD, mode int) error {
    94		s.Lock()
    95		intfd := fd.sysfd
    96		if intfd < 0 || fd.closing {
    97			// fd closed underfoot
    98			s.Unlock()
    99			return errClosing
   100		}
   101	
   102		var t int64
   103		key := intfd << 1
   104		if mode == 'r' {
   105			fd.ncr++
   106			t = fd.rdeadline
   107		} else {
   108			fd.ncw++
   109			key++
   110			t = fd.wdeadline
   111		}
   112		s.pending[key] = fd
   113		doWakeup := false
   114		if t > 0 && (s.deadline == 0 || t < s.deadline) {
   115			s.deadline = t
   116			doWakeup = true
   117		}
   118	
   119		wake, err := s.poll.AddFD(intfd, mode, false)
   120		if err != nil {
   121			panic("pollServer AddFD " + err.Error())
   122		}
   123		if wake {
   124			doWakeup = true
   125		}
   126		s.Unlock()
   127	
   128		if doWakeup {
   129			s.Wakeup()
   130		}
   131		return nil
   132	}
   133	
   134	// Evict evicts fd from the pending list, unblocking
   135	// any I/O running on fd.  The caller must have locked
   136	// pollserver.
   137	func (s *pollServer) Evict(fd *netFD) {
   138		if s.pending[fd.sysfd<<1] == fd {
   139			s.WakeFD(fd, 'r', errClosing)
   140			s.poll.DelFD(fd.sysfd, 'r')
   141			delete(s.pending, fd.sysfd<<1)
   142		}
   143		if s.pending[fd.sysfd<<1|1] == fd {
   144			s.WakeFD(fd, 'w', errClosing)
   145			s.poll.DelFD(fd.sysfd, 'w')
   146			delete(s.pending, fd.sysfd<<1|1)
   147		}
   148	}
   149	
   150	var wakeupbuf [1]byte
   151	
   152	func (s *pollServer) Wakeup() { s.pw.Write(wakeupbuf[0:]) }
   153	
   154	func (s *pollServer) LookupFD(fd int, mode int) *netFD {
   155		key := fd << 1
   156		if mode == 'w' {
   157			key++
   158		}
   159		netfd, ok := s.pending[key]
   160		if !ok {
   161			return nil
   162		}
   163		delete(s.pending, key)
   164		return netfd
   165	}
   166	
   167	func (s *pollServer) WakeFD(fd *netFD, mode int, err error) {
   168		if mode == 'r' {
   169			for fd.ncr > 0 {
   170				fd.ncr--
   171				fd.cr <- err
   172			}
   173		} else {
   174			for fd.ncw > 0 {
   175				fd.ncw--
   176				fd.cw <- err
   177			}
   178		}
   179	}
   180	
   181	func (s *pollServer) Now() int64 {
   182		return time.Now().UnixNano()
   183	}
   184	
   185	func (s *pollServer) CheckDeadlines() {
   186		now := s.Now()
   187		// TODO(rsc): This will need to be handled more efficiently,
   188		// probably with a heap indexed by wakeup time.
   189	
   190		var next_deadline int64
   191		for key, fd := range s.pending {
   192			var t int64
   193			var mode int
   194			if key&1 == 0 {
   195				mode = 'r'
   196			} else {
   197				mode = 'w'
   198			}
   199			if mode == 'r' {
   200				t = fd.rdeadline
   201			} else {
   202				t = fd.wdeadline
   203			}
   204			if t > 0 {
   205				if t <= now {
   206					delete(s.pending, key)
   207					if mode == 'r' {
   208						s.poll.DelFD(fd.sysfd, mode)
   209						fd.rdeadline = -1
   210					} else {
   211						s.poll.DelFD(fd.sysfd, mode)
   212						fd.wdeadline = -1
   213					}
   214					s.WakeFD(fd, mode, nil)
   215				} else if next_deadline == 0 || t < next_deadline {
   216					next_deadline = t
   217				}
   218			}
   219		}
   220		s.deadline = next_deadline
   221	}
   222	
   223	func (s *pollServer) Run() {
   224		var scratch [100]byte
   225		s.Lock()
   226		defer s.Unlock()
   227		for {
   228			var t = s.deadline
   229			if t > 0 {
   230				t = t - s.Now()
   231				if t <= 0 {
   232					s.CheckDeadlines()
   233					continue
   234				}
   235			}
   236			fd, mode, err := s.poll.WaitFD(s, t)
   237			if err != nil {
   238				print("pollServer WaitFD: ", err.Error(), "\n")
   239				return
   240			}
   241			if fd < 0 {
   242				// Timeout happened.
   243				s.CheckDeadlines()
   244				continue
   245			}
   246			if fd == int(s.pr.Fd()) {
   247				// Drain our wakeup pipe (we could loop here,
   248				// but it's unlikely that there are more than
   249				// len(scratch) wakeup calls).
   250				s.pr.Read(scratch[0:])
   251				s.CheckDeadlines()
   252			} else {
   253				netfd := s.LookupFD(fd, mode)
   254				if netfd == nil {
   255					// This can happen because the WaitFD runs without
   256					// holding s's lock, so there might be a pending wakeup
   257					// for an fd that has been evicted.  No harm done.
   258					continue
   259				}
   260				s.WakeFD(netfd, mode, nil)
   261			}
   262		}
   263	}
   264	
   265	func (s *pollServer) WaitRead(fd *netFD) error {
   266		err := s.AddFD(fd, 'r')
   267		if err == nil {
   268			err = <-fd.cr
   269		}
   270		return err
   271	}
   272	
   273	func (s *pollServer) WaitWrite(fd *netFD) error {
   274		err := s.AddFD(fd, 'w')
   275		if err == nil {
   276			err = <-fd.cw
   277		}
   278		return err
   279	}
   280	
   281	// Network FD methods.
   282	// All the network FDs use a single pollServer.
   283	
   284	var pollserver *pollServer
   285	var onceStartServer sync.Once
   286	
   287	func startServer() {
   288		p, err := newPollServer()
   289		if err != nil {
   290			print("Start pollServer: ", err.Error(), "\n")
   291		}
   292		pollserver = p
   293	}
   294	
   295	func newFD(fd, family, sotype int, net string) (*netFD, error) {
   296		onceStartServer.Do(startServer)
   297		if err := syscall.SetNonblock(fd, true); err != nil {
   298			return nil, err
   299		}
   300		netfd := &netFD{
   301			sysfd:  fd,
   302			family: family,
   303			sotype: sotype,
   304			net:    net,
   305		}
   306		netfd.cr = make(chan error, 1)
   307		netfd.cw = make(chan error, 1)
   308		return netfd, nil
   309	}
   310	
   311	func (fd *netFD) setAddr(laddr, raddr Addr) {
   312		fd.laddr = laddr
   313		fd.raddr = raddr
   314		var ls, rs string
   315		if laddr != nil {
   316			ls = laddr.String()
   317		}
   318		if raddr != nil {
   319			rs = raddr.String()
   320		}
   321		fd.sysfile = os.NewFile(uintptr(fd.sysfd), fd.net+":"+ls+"->"+rs)
   322	}
   323	
   324	func (fd *netFD) connect(ra syscall.Sockaddr) error {
   325		err := syscall.Connect(fd.sysfd, ra)
   326		if err == syscall.EINPROGRESS {
   327			if err = pollserver.WaitWrite(fd); err != nil {
   328				return err
   329			}
   330			var e int
   331			e, err = syscall.GetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_ERROR)
   332			if err != nil {
   333				return os.NewSyscallError("getsockopt", err)
   334			}
   335			if e != 0 {
   336				err = syscall.Errno(e)
   337			}
   338		}
   339		return err
   340	}
   341	
   342	var errClosing = errors.New("use of closed network connection")
   343	
   344	// Add a reference to this fd.
   345	// If closing==true, pollserver must be locked; mark the fd as closing.
   346	// Returns an error if the fd cannot be used.
   347	func (fd *netFD) incref(closing bool) error {
   348		if fd == nil {
   349			return errClosing
   350		}
   351		fd.sysmu.Lock()
   352		if fd.closing {
   353			fd.sysmu.Unlock()
   354			return errClosing
   355		}
   356		fd.sysref++
   357		if closing {
   358			fd.closing = true
   359		}
   360		fd.sysmu.Unlock()
   361		return nil
   362	}
   363	
   364	// Remove a reference to this FD and close if we've been asked to do so (and
   365	// there are no references left.
   366	func (fd *netFD) decref() {
   367		if fd == nil {
   368			return
   369		}
   370		fd.sysmu.Lock()
   371		fd.sysref--
   372		if fd.closing && fd.sysref == 0 && fd.sysfile != nil {
   373			fd.sysfile.Close()
   374			fd.sysfile = nil
   375			fd.sysfd = -1
   376		}
   377		fd.sysmu.Unlock()
   378	}
   379	
   380	func (fd *netFD) Close() error {
   381		pollserver.Lock() // needed for both fd.incref(true) and pollserver.Evict
   382		defer pollserver.Unlock()
   383		if err := fd.incref(true); err != nil {
   384			return err
   385		}
   386		// Unblock any I/O.  Once it all unblocks and returns,
   387		// so that it cannot be referring to fd.sysfd anymore,
   388		// the final decref will close fd.sysfd.  This should happen
   389		// fairly quickly, since all the I/O is non-blocking, and any
   390		// attempts to block in the pollserver will return errClosing.
   391		pollserver.Evict(fd)
   392		fd.decref()
   393		return nil
   394	}
   395	
   396	func (fd *netFD) shutdown(how int) error {
   397		if err := fd.incref(false); err != nil {
   398			return err
   399		}
   400		defer fd.decref()
   401		err := syscall.Shutdown(fd.sysfd, how)
   402		if err != nil {
   403			return &OpError{"shutdown", fd.net, fd.laddr, err}
   404		}
   405		return nil
   406	}
   407	
   408	func (fd *netFD) CloseRead() error {
   409		return fd.shutdown(syscall.SHUT_RD)
   410	}
   411	
   412	func (fd *netFD) CloseWrite() error {
   413		return fd.shutdown(syscall.SHUT_WR)
   414	}
   415	
   416	func (fd *netFD) Read(p []byte) (n int, err error) {
   417		fd.rio.Lock()
   418		defer fd.rio.Unlock()
   419		if err := fd.incref(false); err != nil {
   420			return 0, err
   421		}
   422		defer fd.decref()
   423		for {
   424			n, err = syscall.Read(int(fd.sysfd), p)
   425			if err == syscall.EAGAIN {
   426				err = errTimeout
   427				if fd.rdeadline >= 0 {
   428					if err = pollserver.WaitRead(fd); err == nil {
   429						continue
   430					}
   431				}
   432			}
   433			if err != nil {
   434				n = 0
   435			} else if n == 0 && err == nil && fd.sotype != syscall.SOCK_DGRAM {
   436				err = io.EOF
   437			}
   438			break
   439		}
   440		if err != nil && err != io.EOF {
   441			err = &OpError{"read", fd.net, fd.raddr, err}
   442		}
   443		return
   444	}
   445	
   446	func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) {
   447		fd.rio.Lock()
   448		defer fd.rio.Unlock()
   449		if err := fd.incref(false); err != nil {
   450			return 0, nil, err
   451		}
   452		defer fd.decref()
   453		for {
   454			n, sa, err = syscall.Recvfrom(fd.sysfd, p, 0)
   455			if err == syscall.EAGAIN {
   456				err = errTimeout
   457				if fd.rdeadline >= 0 {
   458					if err = pollserver.WaitRead(fd); err == nil {
   459						continue
   460					}
   461				}
   462			}
   463			if err != nil {
   464				n = 0
   465			}
   466			break
   467		}
   468		if err != nil && err != io.EOF {
   469			err = &OpError{"read", fd.net, fd.laddr, err}
   470		}
   471		return
   472	}
   473	
   474	func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err error) {
   475		fd.rio.Lock()
   476		defer fd.rio.Unlock()
   477		if err := fd.incref(false); err != nil {
   478			return 0, 0, 0, nil, err
   479		}
   480		defer fd.decref()
   481		for {
   482			n, oobn, flags, sa, err = syscall.Recvmsg(fd.sysfd, p, oob, 0)
   483			if err == syscall.EAGAIN {
   484				err = errTimeout
   485				if fd.rdeadline >= 0 {
   486					if err = pollserver.WaitRead(fd); err == nil {
   487						continue
   488					}
   489				}
   490			}
   491			if err == nil && n == 0 {
   492				err = io.EOF
   493			}
   494			break
   495		}
   496		if err != nil && err != io.EOF {
   497			err = &OpError{"read", fd.net, fd.laddr, err}
   498			return
   499		}
   500		return
   501	}
   502	
   503	func (fd *netFD) Write(p []byte) (int, error) {
   504		fd.wio.Lock()
   505		defer fd.wio.Unlock()
   506		if err := fd.incref(false); err != nil {
   507			return 0, err
   508		}
   509		defer fd.decref()
   510		if fd.sysfile == nil {
   511			return 0, syscall.EINVAL
   512		}
   513	
   514		var err error
   515		nn := 0
   516		for {
   517			var n int
   518			n, err = syscall.Write(int(fd.sysfd), p[nn:])
   519			if n > 0 {
   520				nn += n
   521			}
   522			if nn == len(p) {
   523				break
   524			}
   525			if err == syscall.EAGAIN {
   526				err = errTimeout
   527				if fd.wdeadline >= 0 {
   528					if err = pollserver.WaitWrite(fd); err == nil {
   529						continue
   530					}
   531				}
   532			}
   533			if err != nil {
   534				n = 0
   535				break
   536			}
   537			if n == 0 {
   538				err = io.ErrUnexpectedEOF
   539				break
   540			}
   541		}
   542		if err != nil {
   543			err = &OpError{"write", fd.net, fd.raddr, err}
   544		}
   545		return nn, err
   546	}
   547	
   548	func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) {
   549		fd.wio.Lock()
   550		defer fd.wio.Unlock()
   551		if err := fd.incref(false); err != nil {
   552			return 0, err
   553		}
   554		defer fd.decref()
   555		for {
   556			err = syscall.Sendto(fd.sysfd, p, 0, sa)
   557			if err == syscall.EAGAIN {
   558				err = errTimeout
   559				if fd.wdeadline >= 0 {
   560					if err = pollserver.WaitWrite(fd); err == nil {
   561						continue
   562					}
   563				}
   564			}
   565			break
   566		}
   567		if err == nil {
   568			n = len(p)
   569		} else {
   570			err = &OpError{"write", fd.net, fd.raddr, err}
   571		}
   572		return
   573	}
   574	
   575	func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err error) {
   576		fd.wio.Lock()
   577		defer fd.wio.Unlock()
   578		if err := fd.incref(false); err != nil {
   579			return 0, 0, err
   580		}
   581		defer fd.decref()
   582		for {
   583			err = syscall.Sendmsg(fd.sysfd, p, oob, sa, 0)
   584			if err == syscall.EAGAIN {
   585				err = errTimeout
   586				if fd.wdeadline >= 0 {
   587					if err = pollserver.WaitWrite(fd); err == nil {
   588						continue
   589					}
   590				}
   591			}
   592			break
   593		}
   594		if err == nil {
   595			n = len(p)
   596			oobn = len(oob)
   597		} else {
   598			err = &OpError{"write", fd.net, fd.raddr, err}
   599		}
   600		return
   601	}
   602	
   603	func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (netfd *netFD, err error) {
   604		if err := fd.incref(false); err != nil {
   605			return nil, err
   606		}
   607		defer fd.decref()
   608	
   609		// See ../syscall/exec.go for description of ForkLock.
   610		// It is okay to hold the lock across syscall.Accept
   611		// because we have put fd.sysfd into non-blocking mode.
   612		var s int
   613		var rsa syscall.Sockaddr
   614		for {
   615			syscall.ForkLock.RLock()
   616			s, rsa, err = syscall.Accept(fd.sysfd)
   617			if err != nil {
   618				syscall.ForkLock.RUnlock()
   619				if err == syscall.EAGAIN {
   620					err = errTimeout
   621					if fd.rdeadline >= 0 {
   622						if err = pollserver.WaitRead(fd); err == nil {
   623							continue
   624						}
   625					}
   626				} else if err == syscall.ECONNABORTED {
   627					// This means that a socket on the listen queue was closed
   628					// before we Accept()ed it; it's a silly error, so try again.
   629					continue
   630				}
   631				return nil, &OpError{"accept", fd.net, fd.laddr, err}
   632			}
   633			break
   634		}
   635		syscall.CloseOnExec(s)
   636		syscall.ForkLock.RUnlock()
   637	
   638		if netfd, err = newFD(s, fd.family, fd.sotype, fd.net); err != nil {
   639			syscall.Close(s)
   640			return nil, err
   641		}
   642		lsa, _ := syscall.Getsockname(netfd.sysfd)
   643		netfd.setAddr(toAddr(lsa), toAddr(rsa))
   644		return netfd, nil
   645	}
   646	
   647	func (fd *netFD) dup() (f *os.File, err error) {
   648		ns, err := syscall.Dup(fd.sysfd)
   649		if err != nil {
   650			return nil, &OpError{"dup", fd.net, fd.laddr, err}
   651		}
   652	
   653		// We want blocking mode for the new fd, hence the double negative.
   654		if err = syscall.SetNonblock(ns, false); err != nil {
   655			return nil, &OpError{"setnonblock", fd.net, fd.laddr, err}
   656		}
   657	
   658		return os.NewFile(uintptr(ns), fd.sysfile.Name()), nil
   659	}
   660	
   661	func closesocket(s int) error {
   662		return syscall.Close(s)
   663	}