src/pkg/net/http/transport.go - The Go Programming Language

Golang

Source file src/pkg/net/http/transport.go

     1	// Copyright 2011 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	// HTTP client implementation. See RFC 2616.
     6	// 
     7	// This is the low-level Transport implementation of RoundTripper.
     8	// The high-level interface is in client.go.
     9	
    10	package http
    11	
    12	import (
    13		"bufio"
    14		"compress/gzip"
    15		"crypto/tls"
    16		"encoding/base64"
    17		"errors"
    18		"fmt"
    19		"io"
    20		"io/ioutil"
    21		"log"
    22		"net"
    23		"net/url"
    24		"os"
    25		"strings"
    26		"sync"
    27	)
    28	
    29	// DefaultTransport is the default implementation of Transport and is
    30	// used by DefaultClient.  It establishes a new network connection for
    31	// each call to Do and uses HTTP proxies as directed by the
    32	// $HTTP_PROXY and $NO_PROXY (or $http_proxy and $no_proxy)
    33	// environment variables.
    34	var DefaultTransport RoundTripper = &Transport{Proxy: ProxyFromEnvironment}
    35	
    36	// DefaultMaxIdleConnsPerHost is the default value of Transport's
    37	// MaxIdleConnsPerHost.
    38	const DefaultMaxIdleConnsPerHost = 2
    39	
    40	// Transport is an implementation of RoundTripper that supports http,
    41	// https, and http proxies (for either http or https with CONNECT).
    42	// Transport can also cache connections for future re-use.
    43	type Transport struct {
    44		lk       sync.Mutex
    45		idleConn map[string][]*persistConn
    46		altProto map[string]RoundTripper // nil or map of URI scheme => RoundTripper
    47	
    48		// TODO: tunable on global max cached connections
    49		// TODO: tunable on timeout on cached connections
    50		// TODO: optional pipelining
    51	
    52		// Proxy specifies a function to return a proxy for a given
    53		// Request. If the function returns a non-nil error, the
    54		// request is aborted with the provided error.
    55		// If Proxy is nil or returns a nil *URL, no proxy is used.
    56		Proxy func(*Request) (*url.URL, error)
    57	
    58		// Dial specifies the dial function for creating TCP
    59		// connections.
    60		// If Dial is nil, net.Dial is used.
    61		Dial func(net, addr string) (c net.Conn, err error)
    62	
    63		// TLSClientConfig specifies the TLS configuration to use with
    64		// tls.Client. If nil, the default configuration is used.
    65		TLSClientConfig *tls.Config
    66	
    67		DisableKeepAlives  bool
    68		DisableCompression bool
    69	
    70		// MaxIdleConnsPerHost, if non-zero, controls the maximum idle
    71		// (keep-alive) to keep to keep per-host.  If zero,
    72		// DefaultMaxIdleConnsPerHost is used.
    73		MaxIdleConnsPerHost int
    74	}
    75	
    76	// ProxyFromEnvironment returns the URL of the proxy to use for a
    77	// given request, as indicated by the environment variables
    78	// $HTTP_PROXY and $NO_PROXY (or $http_proxy and $no_proxy).
    79	// An error is returned if the proxy environment is invalid.
    80	// A nil URL and nil error are returned if no proxy is defined in the
    81	// environment, or a proxy should not be used for the given request.
    82	func ProxyFromEnvironment(req *Request) (*url.URL, error) {
    83		proxy := getenvEitherCase("HTTP_PROXY")
    84		if proxy == "" {
    85			return nil, nil
    86		}
    87		if !useProxy(canonicalAddr(req.URL)) {
    88			return nil, nil
    89		}
    90		proxyURL, err := url.Parse(proxy)
    91		if err != nil || proxyURL.Scheme == "" {
    92			if u, err := url.Parse("http://" + proxy); err == nil {
    93				proxyURL = u
    94				err = nil
    95			}
    96		}
    97		if err != nil {
    98			return nil, fmt.Errorf("invalid proxy address %q: %v", proxy, err)
    99		}
   100		return proxyURL, nil
   101	}
   102	
   103	// ProxyURL returns a proxy function (for use in a Transport)
   104	// that always returns the same URL.
   105	func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) {
   106		return func(*Request) (*url.URL, error) {
   107			return fixedURL, nil
   108		}
   109	}
   110	
   111	// transportRequest is a wrapper around a *Request that adds
   112	// optional extra headers to write.
   113	type transportRequest struct {
   114		*Request        // original request, not to be mutated
   115		extra    Header // extra headers to write, or nil
   116	}
   117	
   118	func (tr *transportRequest) extraHeaders() Header {
   119		if tr.extra == nil {
   120			tr.extra = make(Header)
   121		}
   122		return tr.extra
   123	}
   124	
   125	// RoundTrip implements the RoundTripper interface.
   126	func (t *Transport) RoundTrip(req *Request) (resp *Response, err error) {
   127		if req.URL == nil {
   128			return nil, errors.New("http: nil Request.URL")
   129		}
   130		if req.Header == nil {
   131			return nil, errors.New("http: nil Request.Header")
   132		}
   133		if req.URL.Scheme != "http" && req.URL.Scheme != "https" {
   134			t.lk.Lock()
   135			var rt RoundTripper
   136			if t.altProto != nil {
   137				rt = t.altProto[req.URL.Scheme]
   138			}
   139			t.lk.Unlock()
   140			if rt == nil {
   141				return nil, &badStringError{"unsupported protocol scheme", req.URL.Scheme}
   142			}
   143			return rt.RoundTrip(req)
   144		}
   145		treq := &transportRequest{Request: req}
   146		cm, err := t.connectMethodForRequest(treq)
   147		if err != nil {
   148			return nil, err
   149		}
   150	
   151		// Get the cached or newly-created connection to either the
   152		// host (for http or https), the http proxy, or the http proxy
   153		// pre-CONNECTed to https server.  In any case, we'll be ready
   154		// to send it requests.
   155		pconn, err := t.getConn(cm)
   156		if err != nil {
   157			return nil, err
   158		}
   159	
   160		return pconn.roundTrip(treq)
   161	}
   162	
   163	// RegisterProtocol registers a new protocol with scheme.
   164	// The Transport will pass requests using the given scheme to rt.
   165	// It is rt's responsibility to simulate HTTP request semantics.
   166	//
   167	// RegisterProtocol can be used by other packages to provide
   168	// implementations of protocol schemes like "ftp" or "file".
   169	func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
   170		if scheme == "http" || scheme == "https" {
   171			panic("protocol " + scheme + " already registered")
   172		}
   173		t.lk.Lock()
   174		defer t.lk.Unlock()
   175		if t.altProto == nil {
   176			t.altProto = make(map[string]RoundTripper)
   177		}
   178		if _, exists := t.altProto[scheme]; exists {
   179			panic("protocol " + scheme + " already registered")
   180		}
   181		t.altProto[scheme] = rt
   182	}
   183	
   184	// CloseIdleConnections closes any connections which were previously
   185	// connected from previous requests but are now sitting idle in
   186	// a "keep-alive" state. It does not interrupt any connections currently
   187	// in use.
   188	func (t *Transport) CloseIdleConnections() {
   189		t.lk.Lock()
   190		defer t.lk.Unlock()
   191		if t.idleConn == nil {
   192			return
   193		}
   194		for _, conns := range t.idleConn {
   195			for _, pconn := range conns {
   196				pconn.close()
   197			}
   198		}
   199		t.idleConn = make(map[string][]*persistConn)
   200	}
   201	
   202	//
   203	// Private implementation past this point.
   204	//
   205	
   206	func getenvEitherCase(k string) string {
   207		if v := os.Getenv(strings.ToUpper(k)); v != "" {
   208			return v
   209		}
   210		return os.Getenv(strings.ToLower(k))
   211	}
   212	
   213	func (t *Transport) connectMethodForRequest(treq *transportRequest) (*connectMethod, error) {
   214		cm := &connectMethod{
   215			targetScheme: treq.URL.Scheme,
   216			targetAddr:   canonicalAddr(treq.URL),
   217		}
   218		if t.Proxy != nil {
   219			var err error
   220			cm.proxyURL, err = t.Proxy(treq.Request)
   221			if err != nil {
   222				return nil, err
   223			}
   224		}
   225		return cm, nil
   226	}
   227	
   228	// proxyAuth returns the Proxy-Authorization header to set
   229	// on requests, if applicable.
   230	func (cm *connectMethod) proxyAuth() string {
   231		if cm.proxyURL == nil {
   232			return ""
   233		}
   234		if u := cm.proxyURL.User; u != nil {
   235			return "Basic " + base64.URLEncoding.EncodeToString([]byte(u.String()))
   236		}
   237		return ""
   238	}
   239	
   240	// putIdleConn adds pconn to the list of idle persistent connections awaiting
   241	// a new request.
   242	// If pconn is no longer needed or not in a good state, putIdleConn
   243	// returns false.
   244	func (t *Transport) putIdleConn(pconn *persistConn) bool {
   245		t.lk.Lock()
   246		defer t.lk.Unlock()
   247		if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
   248			pconn.close()
   249			return false
   250		}
   251		if pconn.isBroken() {
   252			return false
   253		}
   254		key := pconn.cacheKey
   255		max := t.MaxIdleConnsPerHost
   256		if max == 0 {
   257			max = DefaultMaxIdleConnsPerHost
   258		}
   259		if len(t.idleConn[key]) >= max {
   260			pconn.close()
   261			return false
   262		}
   263		t.idleConn[key] = append(t.idleConn[key], pconn)
   264		return true
   265	}
   266	
   267	func (t *Transport) getIdleConn(cm *connectMethod) (pconn *persistConn) {
   268		t.lk.Lock()
   269		defer t.lk.Unlock()
   270		if t.idleConn == nil {
   271			t.idleConn = make(map[string][]*persistConn)
   272		}
   273		key := cm.String()
   274		for {
   275			pconns, ok := t.idleConn[key]
   276			if !ok {
   277				return nil
   278			}
   279			if len(pconns) == 1 {
   280				pconn = pconns[0]
   281				delete(t.idleConn, key)
   282			} else {
   283				// 2 or more cached connections; pop last
   284				// TODO: queue?
   285				pconn = pconns[len(pconns)-1]
   286				t.idleConn[key] = pconns[0 : len(pconns)-1]
   287			}
   288			if !pconn.isBroken() {
   289				return
   290			}
   291		}
   292		return
   293	}
   294	
   295	func (t *Transport) dial(network, addr string) (c net.Conn, err error) {
   296		if t.Dial != nil {
   297			return t.Dial(network, addr)
   298		}
   299		return net.Dial(network, addr)
   300	}
   301	
   302	// getConn dials and creates a new persistConn to the target as
   303	// specified in the connectMethod.  This includes doing a proxy CONNECT
   304	// and/or setting up TLS.  If this doesn't return an error, the persistConn
   305	// is ready to write requests to.
   306	func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) {
   307		if pc := t.getIdleConn(cm); pc != nil {
   308			return pc, nil
   309		}
   310	
   311		conn, err := t.dial("tcp", cm.addr())
   312		if err != nil {
   313			if cm.proxyURL != nil {
   314				err = fmt.Errorf("http: error connecting to proxy %s: %v", cm.proxyURL, err)
   315			}
   316			return nil, err
   317		}
   318	
   319		pa := cm.proxyAuth()
   320	
   321		pconn := &persistConn{
   322			t:        t,
   323			cacheKey: cm.String(),
   324			conn:     conn,
   325			reqch:    make(chan requestAndChan, 50),
   326		}
   327	
   328		switch {
   329		case cm.proxyURL == nil:
   330			// Do nothing.
   331		case cm.targetScheme == "http":
   332			pconn.isProxy = true
   333			if pa != "" {
   334				pconn.mutateHeaderFunc = func(h Header) {
   335					h.Set("Proxy-Authorization", pa)
   336				}
   337			}
   338		case cm.targetScheme == "https":
   339			connectReq := &Request{
   340				Method: "CONNECT",
   341				URL:    &url.URL{Opaque: cm.targetAddr},
   342				Host:   cm.targetAddr,
   343				Header: make(Header),
   344			}
   345			if pa != "" {
   346				connectReq.Header.Set("Proxy-Authorization", pa)
   347			}
   348			connectReq.Write(conn)
   349	
   350			// Read response.
   351			// Okay to use and discard buffered reader here, because
   352			// TLS server will not speak until spoken to.
   353			br := bufio.NewReader(conn)
   354			resp, err := ReadResponse(br, connectReq)
   355			if err != nil {
   356				conn.Close()
   357				return nil, err
   358			}
   359			if resp.StatusCode != 200 {
   360				f := strings.SplitN(resp.Status, " ", 2)
   361				conn.Close()
   362				return nil, errors.New(f[1])
   363			}
   364		}
   365	
   366		if cm.targetScheme == "https" {
   367			// Initiate TLS and check remote host name against certificate.
   368			conn = tls.Client(conn, t.TLSClientConfig)
   369			if err = conn.(*tls.Conn).Handshake(); err != nil {
   370				return nil, err
   371			}
   372			if t.TLSClientConfig == nil || !t.TLSClientConfig.InsecureSkipVerify {
   373				if err = conn.(*tls.Conn).VerifyHostname(cm.tlsHost()); err != nil {
   374					return nil, err
   375				}
   376			}
   377			pconn.conn = conn
   378		}
   379	
   380		pconn.br = bufio.NewReader(pconn.conn)
   381		pconn.bw = bufio.NewWriter(pconn.conn)
   382		go pconn.readLoop()
   383		return pconn, nil
   384	}
   385	
   386	// useProxy returns true if requests to addr should use a proxy,
   387	// according to the NO_PROXY or no_proxy environment variable.
   388	// addr is always a canonicalAddr with a host and port.
   389	func useProxy(addr string) bool {
   390		if len(addr) == 0 {
   391			return true
   392		}
   393		host, _, err := net.SplitHostPort(addr)
   394		if err != nil {
   395			return false
   396		}
   397		if host == "localhost" {
   398			return false
   399		}
   400		if ip := net.ParseIP(host); ip != nil {
   401			if ip.IsLoopback() {
   402				return false
   403			}
   404		}
   405	
   406		no_proxy := getenvEitherCase("NO_PROXY")
   407		if no_proxy == "*" {
   408			return false
   409		}
   410	
   411		addr = strings.ToLower(strings.TrimSpace(addr))
   412		if hasPort(addr) {
   413			addr = addr[:strings.LastIndex(addr, ":")]
   414		}
   415	
   416		for _, p := range strings.Split(no_proxy, ",") {
   417			p = strings.ToLower(strings.TrimSpace(p))
   418			if len(p) == 0 {
   419				continue
   420			}
   421			if hasPort(p) {
   422				p = p[:strings.LastIndex(p, ":")]
   423			}
   424			if addr == p || (p[0] == '.' && (strings.HasSuffix(addr, p) || addr == p[1:])) {
   425				return false
   426			}
   427		}
   428		return true
   429	}
   430	
   431	// connectMethod is the map key (in its String form) for keeping persistent
   432	// TCP connections alive for subsequent HTTP requests.
   433	//
   434	// A connect method may be of the following types:
   435	//
   436	// Cache key form                Description
   437	// -----------------             -------------------------
   438	// ||http|foo.com                http directly to server, no proxy
   439	// ||https|foo.com               https directly to server, no proxy
   440	// http://proxy.com|https|foo.com  http to proxy, then CONNECT to foo.com
   441	// http://proxy.com|http           http to proxy, http to anywhere after that
   442	//
   443	// Note: no support to https to the proxy yet.
   444	//
   445	type connectMethod struct {
   446		proxyURL     *url.URL // nil for no proxy, else full proxy URL
   447		targetScheme string   // "http" or "https"
   448		targetAddr   string   // Not used if proxy + http targetScheme (4th example in table)
   449	}
   450	
   451	func (ck *connectMethod) String() string {
   452		proxyStr := ""
   453		if ck.proxyURL != nil {
   454			proxyStr = ck.proxyURL.String()
   455		}
   456		return strings.Join([]string{proxyStr, ck.targetScheme, ck.targetAddr}, "|")
   457	}
   458	
   459	// addr returns the first hop "host:port" to which we need to TCP connect.
   460	func (cm *connectMethod) addr() string {
   461		if cm.proxyURL != nil {
   462			return canonicalAddr(cm.proxyURL)
   463		}
   464		return cm.targetAddr
   465	}
   466	
   467	// tlsHost returns the host name to match against the peer's
   468	// TLS certificate.
   469	func (cm *connectMethod) tlsHost() string {
   470		h := cm.targetAddr
   471		if hasPort(h) {
   472			h = h[:strings.LastIndex(h, ":")]
   473		}
   474		return h
   475	}
   476	
   477	// persistConn wraps a connection, usually a persistent one
   478	// (but may be used for non-keep-alive requests as well)
   479	type persistConn struct {
   480		t        *Transport
   481		cacheKey string // its connectMethod.String()
   482		conn     net.Conn
   483		br       *bufio.Reader       // from conn
   484		bw       *bufio.Writer       // to conn
   485		reqch    chan requestAndChan // written by roundTrip(); read by readLoop()
   486		isProxy  bool
   487	
   488		// mutateHeaderFunc is an optional func to modify extra
   489		// headers on each outbound request before it's written. (the
   490		// original Request given to RoundTrip is not modified)
   491		mutateHeaderFunc func(Header)
   492	
   493		lk                   sync.Mutex // guards numExpectedResponses and broken
   494		numExpectedResponses int
   495		broken               bool // an error has happened on this connection; marked broken so it's not reused.
   496	}
   497	
   498	func (pc *persistConn) isBroken() bool {
   499		pc.lk.Lock()
   500		defer pc.lk.Unlock()
   501		return pc.broken
   502	}
   503	
   504	var remoteSideClosedFunc func(error) bool // or nil to use default
   505	
   506	func remoteSideClosed(err error) bool {
   507		if err == io.EOF {
   508			return true
   509		}
   510		if remoteSideClosedFunc != nil {
   511			return remoteSideClosedFunc(err)
   512		}
   513		return false
   514	}
   515	
   516	func (pc *persistConn) readLoop() {
   517		alive := true
   518		var lastbody io.ReadCloser // last response body, if any, read on this connection
   519	
   520		for alive {
   521			pb, err := pc.br.Peek(1)
   522	
   523			pc.lk.Lock()
   524			if pc.numExpectedResponses == 0 {
   525				pc.closeLocked()
   526				pc.lk.Unlock()
   527				if len(pb) > 0 {
   528					log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v",
   529						string(pb), err)
   530				}
   531				return
   532			}
   533			pc.lk.Unlock()
   534	
   535			rc := <-pc.reqch
   536	
   537			// Advance past the previous response's body, if the
   538			// caller hasn't done so.
   539			if lastbody != nil {
   540				lastbody.Close() // assumed idempotent
   541				lastbody = nil
   542			}
   543			resp, err := ReadResponse(pc.br, rc.req)
   544	
   545			if err != nil {
   546				pc.close()
   547			} else {
   548				hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0
   549				if rc.addedGzip && hasBody && resp.Header.Get("Content-Encoding") == "gzip" {
   550					resp.Header.Del("Content-Encoding")
   551					resp.Header.Del("Content-Length")
   552					resp.ContentLength = -1
   553					gzReader, zerr := gzip.NewReader(resp.Body)
   554					if zerr != nil {
   555						pc.close()
   556						err = zerr
   557					} else {
   558						resp.Body = &readFirstCloseBoth{&discardOnCloseReadCloser{gzReader}, resp.Body}
   559					}
   560				}
   561				resp.Body = &bodyEOFSignal{body: resp.Body}
   562			}
   563	
   564			if err != nil || resp.Close || rc.req.Close {
   565				alive = false
   566			}
   567	
   568			hasBody := resp != nil && resp.ContentLength != 0
   569			var waitForBodyRead chan bool
   570			if alive {
   571				if hasBody {
   572					lastbody = resp.Body
   573					waitForBodyRead = make(chan bool)
   574					resp.Body.(*bodyEOFSignal).fn = func() {
   575						if !pc.t.putIdleConn(pc) {
   576							alive = false
   577						}
   578						waitForBodyRead <- true
   579					}
   580				} else {
   581					// When there's no response body, we immediately
   582					// reuse the TCP connection (putIdleConn), but
   583					// we need to prevent ClientConn.Read from
   584					// closing the Response.Body on the next
   585					// loop, otherwise it might close the body
   586					// before the client code has had a chance to
   587					// read it (even though it'll just be 0, EOF).
   588					lastbody = nil
   589	
   590					if !pc.t.putIdleConn(pc) {
   591						alive = false
   592					}
   593				}
   594			}
   595	
   596			rc.ch <- responseAndError{resp, err}
   597	
   598			// Wait for the just-returned response body to be fully consumed
   599			// before we race and peek on the underlying bufio reader.
   600			if waitForBodyRead != nil {
   601				<-waitForBodyRead
   602			}
   603		}
   604	}
   605	
   606	type responseAndError struct {
   607		res *Response
   608		err error
   609	}
   610	
   611	type requestAndChan struct {
   612		req *Request
   613		ch  chan responseAndError
   614	
   615		// did the Transport (as opposed to the client code) add an
   616		// Accept-Encoding gzip header? only if it we set it do
   617		// we transparently decode the gzip.
   618		addedGzip bool
   619	}
   620	
   621	func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
   622		if pc.mutateHeaderFunc != nil {
   623			pc.mutateHeaderFunc(req.extraHeaders())
   624		}
   625	
   626		// Ask for a compressed version if the caller didn't set their
   627		// own value for Accept-Encoding. We only attempted to
   628		// uncompress the gzip stream if we were the layer that
   629		// requested it.
   630		requestedGzip := false
   631		if !pc.t.DisableCompression && req.Header.Get("Accept-Encoding") == "" {
   632			// Request gzip only, not deflate. Deflate is ambiguous and 
   633			// not as universally supported anyway.
   634			// See: http://www.gzip.org/zlib/zlib_faq.html#faq38
   635			requestedGzip = true
   636			req.extraHeaders().Set("Accept-Encoding", "gzip")
   637		}
   638	
   639		pc.lk.Lock()
   640		pc.numExpectedResponses++
   641		pc.lk.Unlock()
   642	
   643		err = req.Request.write(pc.bw, pc.isProxy, req.extra)
   644		if err != nil {
   645			pc.close()
   646			return
   647		}
   648		pc.bw.Flush()
   649	
   650		ch := make(chan responseAndError, 1)
   651		pc.reqch <- requestAndChan{req.Request, ch, requestedGzip}
   652		re := <-ch
   653		pc.lk.Lock()
   654		pc.numExpectedResponses--
   655		pc.lk.Unlock()
   656	
   657		return re.res, re.err
   658	}
   659	
   660	func (pc *persistConn) close() {
   661		pc.lk.Lock()
   662		defer pc.lk.Unlock()
   663		pc.closeLocked()
   664	}
   665	
   666	func (pc *persistConn) closeLocked() {
   667		pc.broken = true
   668		pc.conn.Close()
   669		pc.mutateHeaderFunc = nil
   670	}
   671	
   672	var portMap = map[string]string{
   673		"http":  "80",
   674		"https": "443",
   675	}
   676	
   677	// canonicalAddr returns url.Host but always with a ":port" suffix
   678	func canonicalAddr(url *url.URL) string {
   679		addr := url.Host
   680		if !hasPort(addr) {
   681			return addr + ":" + portMap[url.Scheme]
   682		}
   683		return addr
   684	}
   685	
   686	func responseIsKeepAlive(res *Response) bool {
   687		// TODO: implement.  for now just always shutting down the connection.
   688		return false
   689	}
   690	
   691	// bodyEOFSignal wraps a ReadCloser but runs fn (if non-nil) at most
   692	// once, right before the final Read() or Close() call returns, but after
   693	// EOF has been seen.
   694	type bodyEOFSignal struct {
   695		body     io.ReadCloser
   696		fn       func()
   697		isClosed bool
   698	}
   699	
   700	func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
   701		n, err = es.body.Read(p)
   702		if es.isClosed && n > 0 {
   703			panic("http: unexpected bodyEOFSignal Read after Close; see issue 1725")
   704		}
   705		if err == io.EOF && es.fn != nil {
   706			es.fn()
   707			es.fn = nil
   708		}
   709		return
   710	}
   711	
   712	func (es *bodyEOFSignal) Close() (err error) {
   713		if es.isClosed {
   714			return nil
   715		}
   716		es.isClosed = true
   717		err = es.body.Close()
   718		if err == nil && es.fn != nil {
   719			es.fn()
   720			es.fn = nil
   721		}
   722		return
   723	}
   724	
   725	type readFirstCloseBoth struct {
   726		io.ReadCloser
   727		io.Closer
   728	}
   729	
   730	func (r *readFirstCloseBoth) Close() error {
   731		if err := r.ReadCloser.Close(); err != nil {
   732			r.Closer.Close()
   733			return err
   734		}
   735		if err := r.Closer.Close(); err != nil {
   736			return err
   737		}
   738		return nil
   739	}
   740	
   741	// discardOnCloseReadCloser consumes all its input on Close.
   742	type discardOnCloseReadCloser struct {
   743		io.ReadCloser
   744	}
   745	
   746	func (d *discardOnCloseReadCloser) Close() error {
   747		io.Copy(ioutil.Discard, d.ReadCloser) // ignore errors; likely invalid or already closed
   748		return d.ReadCloser.Close()
   749	}