Source file src/pkg/net/http/transport.go
1 // Copyright 2011 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 // HTTP client implementation. See RFC 2616.
6 //
7 // This is the low-level Transport implementation of RoundTripper.
8 // The high-level interface is in client.go.
9
10 package http
11
12 import (
13 "bufio"
14 "compress/gzip"
15 "crypto/tls"
16 "encoding/base64"
17 "errors"
18 "fmt"
19 "io"
20 "io/ioutil"
21 "log"
22 "net"
23 "net/url"
24 "os"
25 "strings"
26 "sync"
27 )
28
// DefaultTransport is the default implementation of Transport and is
// used by DefaultClient. It establishes network connections as needed,
// keeps idle connections cached for reuse by later requests, and uses
// HTTP proxies as directed by the $HTTP_PROXY and $NO_PROXY (or
// $http_proxy and $no_proxy) environment variables.
var DefaultTransport RoundTripper = &Transport{Proxy: ProxyFromEnvironment}
35
// DefaultMaxIdleConnsPerHost is the default value of Transport's
// MaxIdleConnsPerHost. It applies when that field is left at zero.
const DefaultMaxIdleConnsPerHost = 2
39
// Transport is an implementation of RoundTripper that supports http,
// https, and http proxies (for either http or https with CONNECT).
// Transport can also cache connections for future re-use.
type Transport struct {
	lk       sync.Mutex                // guards idleConn and altProto
	idleConn map[string][]*persistConn // idle connections, keyed by connectMethod.String()
	altProto map[string]RoundTripper   // nil or map of URI scheme => RoundTripper

	// TODO: tunable on global max cached connections
	// TODO: tunable on timeout on cached connections
	// TODO: optional pipelining

	// Proxy specifies a function to return a proxy for a given
	// Request. If the function returns a non-nil error, the
	// request is aborted with the provided error.
	// If Proxy is nil or returns a nil *URL, no proxy is used.
	Proxy func(*Request) (*url.URL, error)

	// Dial specifies the dial function for creating TCP
	// connections.
	// If Dial is nil, net.Dial is used.
	Dial func(net, addr string) (c net.Conn, err error)

	// TLSClientConfig specifies the TLS configuration to use with
	// tls.Client. If nil, the default configuration is used.
	TLSClientConfig *tls.Config

	// DisableKeepAlives, if true, makes the Transport close each
	// connection instead of keeping it idle for re-use.
	DisableKeepAlives bool

	// DisableCompression, if true, stops the Transport from adding
	// "Accept-Encoding: gzip" to requests that did not set
	// Accept-Encoding themselves. Gzipped responses are only decoded
	// transparently when the Transport added that header.
	DisableCompression bool

	// MaxIdleConnsPerHost, if non-zero, controls the maximum number
	// of idle (keep-alive) connections to keep per host. If zero,
	// DefaultMaxIdleConnsPerHost is used. If negative, no idle
	// connections are kept.
	MaxIdleConnsPerHost int
}
75
76 // ProxyFromEnvironment returns the URL of the proxy to use for a
77 // given request, as indicated by the environment variables
78 // $HTTP_PROXY and $NO_PROXY (or $http_proxy and $no_proxy).
79 // An error is returned if the proxy environment is invalid.
80 // A nil URL and nil error are returned if no proxy is defined in the
81 // environment, or a proxy should not be used for the given request.
82 func ProxyFromEnvironment(req *Request) (*url.URL, error) {
83 proxy := getenvEitherCase("HTTP_PROXY")
84 if proxy == "" {
85 return nil, nil
86 }
87 if !useProxy(canonicalAddr(req.URL)) {
88 return nil, nil
89 }
90 proxyURL, err := url.Parse(proxy)
91 if err != nil || proxyURL.Scheme == "" {
92 if u, err := url.Parse("http://" + proxy); err == nil {
93 proxyURL = u
94 err = nil
95 }
96 }
97 if err != nil {
98 return nil, fmt.Errorf("invalid proxy address %q: %v", proxy, err)
99 }
100 return proxyURL, nil
101 }
102
103 // ProxyURL returns a proxy function (for use in a Transport)
104 // that always returns the same URL.
105 func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) {
106 return func(*Request) (*url.URL, error) {
107 return fixedURL, nil
108 }
109 }
110
// transportRequest is a wrapper around a *Request that adds
// optional extra headers to write.
type transportRequest struct {
	*Request        // original request, not to be mutated
	extra    Header // extra headers to write, or nil; allocated on first use by extraHeaders
}
117
118 func (tr *transportRequest) extraHeaders() Header {
119 if tr.extra == nil {
120 tr.extra = make(Header)
121 }
122 return tr.extra
123 }
124
125 // RoundTrip implements the RoundTripper interface.
126 func (t *Transport) RoundTrip(req *Request) (resp *Response, err error) {
127 if req.URL == nil {
128 return nil, errors.New("http: nil Request.URL")
129 }
130 if req.Header == nil {
131 return nil, errors.New("http: nil Request.Header")
132 }
133 if req.URL.Scheme != "http" && req.URL.Scheme != "https" {
134 t.lk.Lock()
135 var rt RoundTripper
136 if t.altProto != nil {
137 rt = t.altProto[req.URL.Scheme]
138 }
139 t.lk.Unlock()
140 if rt == nil {
141 return nil, &badStringError{"unsupported protocol scheme", req.URL.Scheme}
142 }
143 return rt.RoundTrip(req)
144 }
145 treq := &transportRequest{Request: req}
146 cm, err := t.connectMethodForRequest(treq)
147 if err != nil {
148 return nil, err
149 }
150
151 // Get the cached or newly-created connection to either the
152 // host (for http or https), the http proxy, or the http proxy
153 // pre-CONNECTed to https server. In any case, we'll be ready
154 // to send it requests.
155 pconn, err := t.getConn(cm)
156 if err != nil {
157 return nil, err
158 }
159
160 return pconn.roundTrip(treq)
161 }
162
163 // RegisterProtocol registers a new protocol with scheme.
164 // The Transport will pass requests using the given scheme to rt.
165 // It is rt's responsibility to simulate HTTP request semantics.
166 //
167 // RegisterProtocol can be used by other packages to provide
168 // implementations of protocol schemes like "ftp" or "file".
169 func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
170 if scheme == "http" || scheme == "https" {
171 panic("protocol " + scheme + " already registered")
172 }
173 t.lk.Lock()
174 defer t.lk.Unlock()
175 if t.altProto == nil {
176 t.altProto = make(map[string]RoundTripper)
177 }
178 if _, exists := t.altProto[scheme]; exists {
179 panic("protocol " + scheme + " already registered")
180 }
181 t.altProto[scheme] = rt
182 }
183
184 // CloseIdleConnections closes any connections which were previously
185 // connected from previous requests but are now sitting idle in
186 // a "keep-alive" state. It does not interrupt any connections currently
187 // in use.
188 func (t *Transport) CloseIdleConnections() {
189 t.lk.Lock()
190 defer t.lk.Unlock()
191 if t.idleConn == nil {
192 return
193 }
194 for _, conns := range t.idleConn {
195 for _, pconn := range conns {
196 pconn.close()
197 }
198 }
199 t.idleConn = make(map[string][]*persistConn)
200 }
201
202 //
203 // Private implementation past this point.
204 //
205
// getenvEitherCase looks up the environment variable k, preferring
// its upper-case spelling and falling back to the lower-case one
// when the upper-case variable is unset or empty.
func getenvEitherCase(k string) string {
	upper := os.Getenv(strings.ToUpper(k))
	if upper != "" {
		return upper
	}
	return os.Getenv(strings.ToLower(k))
}
212
213 func (t *Transport) connectMethodForRequest(treq *transportRequest) (*connectMethod, error) {
214 cm := &connectMethod{
215 targetScheme: treq.URL.Scheme,
216 targetAddr: canonicalAddr(treq.URL),
217 }
218 if t.Proxy != nil {
219 var err error
220 cm.proxyURL, err = t.Proxy(treq.Request)
221 if err != nil {
222 return nil, err
223 }
224 }
225 return cm, nil
226 }
227
228 // proxyAuth returns the Proxy-Authorization header to set
229 // on requests, if applicable.
230 func (cm *connectMethod) proxyAuth() string {
231 if cm.proxyURL == nil {
232 return ""
233 }
234 if u := cm.proxyURL.User; u != nil {
235 return "Basic " + base64.URLEncoding.EncodeToString([]byte(u.String()))
236 }
237 return ""
238 }
239
240 // putIdleConn adds pconn to the list of idle persistent connections awaiting
241 // a new request.
242 // If pconn is no longer needed or not in a good state, putIdleConn
243 // returns false.
244 func (t *Transport) putIdleConn(pconn *persistConn) bool {
245 t.lk.Lock()
246 defer t.lk.Unlock()
247 if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
248 pconn.close()
249 return false
250 }
251 if pconn.isBroken() {
252 return false
253 }
254 key := pconn.cacheKey
255 max := t.MaxIdleConnsPerHost
256 if max == 0 {
257 max = DefaultMaxIdleConnsPerHost
258 }
259 if len(t.idleConn[key]) >= max {
260 pconn.close()
261 return false
262 }
263 t.idleConn[key] = append(t.idleConn[key], pconn)
264 return true
265 }
266
267 func (t *Transport) getIdleConn(cm *connectMethod) (pconn *persistConn) {
268 t.lk.Lock()
269 defer t.lk.Unlock()
270 if t.idleConn == nil {
271 t.idleConn = make(map[string][]*persistConn)
272 }
273 key := cm.String()
274 for {
275 pconns, ok := t.idleConn[key]
276 if !ok {
277 return nil
278 }
279 if len(pconns) == 1 {
280 pconn = pconns[0]
281 delete(t.idleConn, key)
282 } else {
283 // 2 or more cached connections; pop last
284 // TODO: queue?
285 pconn = pconns[len(pconns)-1]
286 t.idleConn[key] = pconns[0 : len(pconns)-1]
287 }
288 if !pconn.isBroken() {
289 return
290 }
291 }
292 return
293 }
294
295 func (t *Transport) dial(network, addr string) (c net.Conn, err error) {
296 if t.Dial != nil {
297 return t.Dial(network, addr)
298 }
299 return net.Dial(network, addr)
300 }
301
302 // getConn dials and creates a new persistConn to the target as
303 // specified in the connectMethod. This includes doing a proxy CONNECT
304 // and/or setting up TLS. If this doesn't return an error, the persistConn
305 // is ready to write requests to.
306 func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) {
307 if pc := t.getIdleConn(cm); pc != nil {
308 return pc, nil
309 }
310
311 conn, err := t.dial("tcp", cm.addr())
312 if err != nil {
313 if cm.proxyURL != nil {
314 err = fmt.Errorf("http: error connecting to proxy %s: %v", cm.proxyURL, err)
315 }
316 return nil, err
317 }
318
319 pa := cm.proxyAuth()
320
321 pconn := &persistConn{
322 t: t,
323 cacheKey: cm.String(),
324 conn: conn,
325 reqch: make(chan requestAndChan, 50),
326 }
327
328 switch {
329 case cm.proxyURL == nil:
330 // Do nothing.
331 case cm.targetScheme == "http":
332 pconn.isProxy = true
333 if pa != "" {
334 pconn.mutateHeaderFunc = func(h Header) {
335 h.Set("Proxy-Authorization", pa)
336 }
337 }
338 case cm.targetScheme == "https":
339 connectReq := &Request{
340 Method: "CONNECT",
341 URL: &url.URL{Opaque: cm.targetAddr},
342 Host: cm.targetAddr,
343 Header: make(Header),
344 }
345 if pa != "" {
346 connectReq.Header.Set("Proxy-Authorization", pa)
347 }
348 connectReq.Write(conn)
349
350 // Read response.
351 // Okay to use and discard buffered reader here, because
352 // TLS server will not speak until spoken to.
353 br := bufio.NewReader(conn)
354 resp, err := ReadResponse(br, connectReq)
355 if err != nil {
356 conn.Close()
357 return nil, err
358 }
359 if resp.StatusCode != 200 {
360 f := strings.SplitN(resp.Status, " ", 2)
361 conn.Close()
362 return nil, errors.New(f[1])
363 }
364 }
365
366 if cm.targetScheme == "https" {
367 // Initiate TLS and check remote host name against certificate.
368 conn = tls.Client(conn, t.TLSClientConfig)
369 if err = conn.(*tls.Conn).Handshake(); err != nil {
370 return nil, err
371 }
372 if t.TLSClientConfig == nil || !t.TLSClientConfig.InsecureSkipVerify {
373 if err = conn.(*tls.Conn).VerifyHostname(cm.tlsHost()); err != nil {
374 return nil, err
375 }
376 }
377 pconn.conn = conn
378 }
379
380 pconn.br = bufio.NewReader(pconn.conn)
381 pconn.bw = bufio.NewWriter(pconn.conn)
382 go pconn.readLoop()
383 return pconn, nil
384 }
385
386 // useProxy returns true if requests to addr should use a proxy,
387 // according to the NO_PROXY or no_proxy environment variable.
388 // addr is always a canonicalAddr with a host and port.
389 func useProxy(addr string) bool {
390 if len(addr) == 0 {
391 return true
392 }
393 host, _, err := net.SplitHostPort(addr)
394 if err != nil {
395 return false
396 }
397 if host == "localhost" {
398 return false
399 }
400 if ip := net.ParseIP(host); ip != nil {
401 if ip.IsLoopback() {
402 return false
403 }
404 }
405
406 no_proxy := getenvEitherCase("NO_PROXY")
407 if no_proxy == "*" {
408 return false
409 }
410
411 addr = strings.ToLower(strings.TrimSpace(addr))
412 if hasPort(addr) {
413 addr = addr[:strings.LastIndex(addr, ":")]
414 }
415
416 for _, p := range strings.Split(no_proxy, ",") {
417 p = strings.ToLower(strings.TrimSpace(p))
418 if len(p) == 0 {
419 continue
420 }
421 if hasPort(p) {
422 p = p[:strings.LastIndex(p, ":")]
423 }
424 if addr == p || (p[0] == '.' && (strings.HasSuffix(addr, p) || addr == p[1:])) {
425 return false
426 }
427 }
428 return true
429 }
430
// connectMethod describes how a target is reached. Its String form
// is the map key under which persistent TCP connections are kept
// alive for subsequent HTTP requests.
//
// The key is the proxy URL (empty when there is no proxy), the target
// scheme and the target address, joined with "|":
//
//	Cache key form                          Description
//	-----------------                       -------------------------
//	|http|foo.com:80                        http directly to server, no proxy
//	|https|foo.com:443                      https directly to server, no proxy
//	http://proxy.com|https|foo.com:443      http to proxy, then CONNECT to foo.com
//	http://proxy.com|http|foo.com:80        http to proxy, http to foo.com
//
// Note: no support to https to the proxy yet.
type connectMethod struct {
	proxyURL     *url.URL // nil for no proxy, else full proxy URL
	targetScheme string   // "http" or "https"
	targetAddr   string   // address of the target server, as produced by canonicalAddr
}

// String returns the cache key for ck: proxy URL, target scheme and
// target address separated by "|". The proxy part is empty when no
// proxy is set.
func (ck *connectMethod) String() string {
	var parts [3]string
	if ck.proxyURL != nil {
		parts[0] = ck.proxyURL.String()
	}
	parts[1] = ck.targetScheme
	parts[2] = ck.targetAddr
	return strings.Join(parts[:], "|")
}
458
459 // addr returns the first hop "host:port" to which we need to TCP connect.
460 func (cm *connectMethod) addr() string {
461 if cm.proxyURL != nil {
462 return canonicalAddr(cm.proxyURL)
463 }
464 return cm.targetAddr
465 }
466
467 // tlsHost returns the host name to match against the peer's
468 // TLS certificate.
469 func (cm *connectMethod) tlsHost() string {
470 h := cm.targetAddr
471 if hasPort(h) {
472 h = h[:strings.LastIndex(h, ":")]
473 }
474 return h
475 }
476
// persistConn wraps a connection, usually a persistent one
// (but may be used for non-keep-alive requests as well)
type persistConn struct {
	t        *Transport          // the Transport that created this connection
	cacheKey string              // its connectMethod.String()
	conn     net.Conn            // the dialed connection, or the TLS connection layered on it for https targets
	br       *bufio.Reader       // from conn
	bw       *bufio.Writer       // to conn
	reqch    chan requestAndChan // written by roundTrip(); read by readLoop()
	isProxy  bool                // set for http targets reached through a proxy; passed to Request.write

	// mutateHeaderFunc is an optional func to modify extra
	// headers on each outbound request before it's written. (the
	// original Request given to RoundTrip is not modified)
	mutateHeaderFunc func(Header)

	lk                   sync.Mutex // guards numExpectedResponses and broken
	numExpectedResponses int        // incremented by roundTrip before writing a request, decremented once its response arrives
	broken               bool       // an error has happened on this connection; marked broken so it's not reused.
}
497
498 func (pc *persistConn) isBroken() bool {
499 pc.lk.Lock()
500 defer pc.lk.Unlock()
501 return pc.broken
502 }
503
// remoteSideClosedFunc, when non-nil, decides for remoteSideClosed
// whether an error other than io.EOF means the peer closed the
// connection. When nil, only io.EOF counts.
var remoteSideClosedFunc func(error) bool

// remoteSideClosed reports whether err indicates that the remote
// side closed the connection. io.EOF always does; any other error
// is passed to remoteSideClosedFunc if that hook is set.
func remoteSideClosed(err error) bool {
	switch {
	case err == io.EOF:
		return true
	case remoteSideClosedFunc == nil:
		return false
	}
	return remoteSideClosedFunc(err)
}
515
// readLoop reads responses from the connection until the connection
// can no longer be used. getConn starts it in its own goroutine.
//
// Each iteration waits for data (or an error) on the connection,
// takes the pending request from reqch, reads the matching response
// and delivers it on that request's channel. When the connection is
// to be kept alive it is handed back to the Transport's idle pool,
// either immediately (no response body) or once the body has been
// fully read or closed.
func (pc *persistConn) readLoop() {
	alive := true
	var lastbody io.ReadCloser // last response body, if any, read on this connection

	for alive {
		// Block until the peer sends something or the connection fails.
		pb, err := pc.br.Peek(1)

		pc.lk.Lock()
		if pc.numExpectedResponses == 0 {
			// No request is waiting for a response, so whatever
			// arrived (data or error) ends this connection.
			pc.closeLocked()
			pc.lk.Unlock()
			if len(pb) > 0 {
				log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v",
					string(pb), err)
			}
			return
		}
		pc.lk.Unlock()

		rc := <-pc.reqch

		// Advance past the previous response's body, if the
		// caller hasn't done so.
		if lastbody != nil {
			lastbody.Close() // assumed idempotent
			lastbody = nil
		}
		resp, err := ReadResponse(pc.br, rc.req)

		if err != nil {
			pc.close()
		} else {
			hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0
			// Decode gzip transparently, but only when the Transport
			// itself asked for it (rc.addedGzip).
			if rc.addedGzip && hasBody && resp.Header.Get("Content-Encoding") == "gzip" {
				resp.Header.Del("Content-Encoding")
				resp.Header.Del("Content-Length")
				resp.ContentLength = -1
				gzReader, zerr := gzip.NewReader(resp.Body)
				if zerr != nil {
					pc.close()
					err = zerr
				} else {
					// Reads come from the gzip reader; Close closes
					// the gzip reader and then the original body.
					resp.Body = &readFirstCloseBoth{&discardOnCloseReadCloser{gzReader}, resp.Body}
				}
			}
			resp.Body = &bodyEOFSignal{body: resp.Body}
		}

		// err is tested first, so resp is not dereferenced on the
		// error path.
		if err != nil || resp.Close || rc.req.Close {
			alive = false
		}

		// NOTE(review): unlike the hasBody computed above, this one
		// does not exclude HEAD requests, so a HEAD response with a
		// non-zero ContentLength takes the wait-for-body path below.
		// Confirm that this is intended.
		hasBody := resp != nil && resp.ContentLength != 0
		var waitForBodyRead chan bool
		if alive {
			if hasBody {
				lastbody = resp.Body
				waitForBodyRead = make(chan bool)
				// Runs once the body hits EOF or is closed: offer the
				// connection for reuse, then release this loop.
				resp.Body.(*bodyEOFSignal).fn = func() {
					if !pc.t.putIdleConn(pc) {
						alive = false
					}
					waitForBodyRead <- true
				}
			} else {
				// When there's no response body, we immediately
				// reuse the TCP connection (putIdleConn), but
				// we need to prevent ClientConn.Read from
				// closing the Response.Body on the next
				// loop, otherwise it might close the body
				// before the client code has had a chance to
				// read it (even though it'll just be 0, EOF).
				lastbody = nil

				if !pc.t.putIdleConn(pc) {
					alive = false
				}
			}
		}

		rc.ch <- responseAndError{resp, err}

		// Wait for the just-returned response body to be fully consumed
		// before we race and peek on the underlying bufio reader.
		if waitForBodyRead != nil {
			<-waitForBodyRead
		}
	}
}
605
// responseAndError is the result readLoop sends back to roundTrip
// for one request: the response that was read and the error, if any,
// that occurred while reading it.
type responseAndError struct {
	res *Response
	err error
}
610
// requestAndChan is what roundTrip sends to readLoop for each
// request: the request that was written and the channel on which
// its response is to be delivered.
type requestAndChan struct {
	req *Request
	ch  chan responseAndError

	// did the Transport (as opposed to the client code) add an
	// Accept-Encoding gzip header? only if we set it do
	// we transparently decode the gzip.
	addedGzip bool
}
620
621 func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
622 if pc.mutateHeaderFunc != nil {
623 pc.mutateHeaderFunc(req.extraHeaders())
624 }
625
626 // Ask for a compressed version if the caller didn't set their
627 // own value for Accept-Encoding. We only attempted to
628 // uncompress the gzip stream if we were the layer that
629 // requested it.
630 requestedGzip := false
631 if !pc.t.DisableCompression && req.Header.Get("Accept-Encoding") == "" {
632 // Request gzip only, not deflate. Deflate is ambiguous and
633 // not as universally supported anyway.
634 // See: http://www.gzip.org/zlib/zlib_faq.html#faq38
635 requestedGzip = true
636 req.extraHeaders().Set("Accept-Encoding", "gzip")
637 }
638
639 pc.lk.Lock()
640 pc.numExpectedResponses++
641 pc.lk.Unlock()
642
643 err = req.Request.write(pc.bw, pc.isProxy, req.extra)
644 if err != nil {
645 pc.close()
646 return
647 }
648 pc.bw.Flush()
649
650 ch := make(chan responseAndError, 1)
651 pc.reqch <- requestAndChan{req.Request, ch, requestedGzip}
652 re := <-ch
653 pc.lk.Lock()
654 pc.numExpectedResponses--
655 pc.lk.Unlock()
656
657 return re.res, re.err
658 }
659
// close marks the connection broken and closes it. It takes pc.lk
// itself; callers that already hold the lock use closeLocked.
func (pc *persistConn) close() {
	pc.lk.Lock()
	defer pc.lk.Unlock()
	pc.closeLocked()
}
665
// closeLocked marks the connection broken so it is not reused,
// closes the underlying connection and drops mutateHeaderFunc.
// The caller must hold pc.lk.
func (pc *persistConn) closeLocked() {
	pc.broken = true
	pc.conn.Close() // error ignored; the connection is being discarded either way
	pc.mutateHeaderFunc = nil
}
671
// portMap gives the port that canonicalAddr appends for a URL scheme
// when the URL's host does not name one.
var portMap = map[string]string{
	"http":  "80",
	"https": "443",
}
676
677 // canonicalAddr returns url.Host but always with a ":port" suffix
678 func canonicalAddr(url *url.URL) string {
679 addr := url.Host
680 if !hasPort(addr) {
681 return addr + ":" + portMap[url.Scheme]
682 }
683 return addr
684 }
685
// responseIsKeepAlive is a placeholder that currently reports false
// for every response.
func responseIsKeepAlive(res *Response) bool {
	// TODO: implement. for now just always shutting down the connection.
	return false
}
690
// bodyEOFSignal wraps a ReadCloser and runs fn (if non-nil) at most
// once: either when a Read on the wrapped body reports io.EOF, or
// when Close closes the wrapped body without error. In both cases fn
// runs just before that Read or Close call returns.
type bodyEOFSignal struct {
	body     io.ReadCloser
	fn       func()
	isClosed bool
}

// signal runs fn if it is still pending and then clears it, so that
// it can never run a second time.
func (es *bodyEOFSignal) signal() {
	if es.fn == nil {
		return
	}
	es.fn()
	es.fn = nil
}

// Read reads from the wrapped body and fires the signal on io.EOF.
// It panics if data is read after Close has been called.
func (es *bodyEOFSignal) Read(p []byte) (int, error) {
	n, err := es.body.Read(p)
	if n > 0 && es.isClosed {
		panic("http: unexpected bodyEOFSignal Read after Close; see issue 1725")
	}
	if err == io.EOF {
		es.signal()
	}
	return n, err
}

// Close closes the wrapped body on the first call and fires the
// signal if that close succeeded. Later calls do nothing and return
// nil.
func (es *bodyEOFSignal) Close() error {
	if es.isClosed {
		return nil
	}
	es.isClosed = true
	if err := es.body.Close(); err != nil {
		return err
	}
	es.signal()
	return nil
}
724
// readFirstCloseBoth reads from its embedded ReadCloser and, on
// Close, closes both the ReadCloser and the additional Closer.
type readFirstCloseBoth struct {
	io.ReadCloser
	io.Closer
}

// Close closes the ReadCloser and then the Closer. Both are always
// closed; the ReadCloser's error takes precedence when both fail.
func (r *readFirstCloseBoth) Close() error {
	readerErr := r.ReadCloser.Close()
	closerErr := r.Closer.Close()
	if readerErr != nil {
		return readerErr
	}
	return closerErr
}
740
// discardOnCloseReadCloser is a ReadCloser whose Close first reads
// and throws away everything left in the wrapped reader.
type discardOnCloseReadCloser struct {
	io.ReadCloser
}

// Close drains the wrapped reader and then closes it, returning the
// result of that close. Errors met while draining are ignored; they
// likely mean the input is invalid or already closed.
func (d *discardOnCloseReadCloser) Close() error {
	src := d.ReadCloser
	_, _ = io.Copy(ioutil.Discard, src)
	return src.Close()
}