(*Transport).roundTrip
方法会调用t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
初始化TLSClientConfig
以及h2transport
,而这两者都和HTTP2.0有着紧密的联系。
TLSClientConfig: 初始化client支持的http协议, 并在tls握手时告知server。
h2transport: 如果本次请求是http2,那么h2transport会接管连接,请求和响应的处理逻辑。
下面看看源码:
func (t *Transport) onceSetNextProtoDefaults() { // ...此处省略代码... t2, err := http2configureTransport(t) if err != nil { log.Printf("Error enabling Transport HTTP/2 support: %v", err) return } t.h2transport = t2 // ...此处省略代码... } func http2configureTransport(t1 *Transport) (*http2Transport, error) { connPool := new(http2clientConnPool) t2 := &http2Transport{ ConnPool: http2noDialClientConnPool{connPool}, t1: t1, } connPool.t = t2 if err := http2registerHTTPSProtocol(t1, http2noDialH2RoundTripper{t2}); err != nil { return nil, err } if t1.TLSClientConfig == nil { t1.TLSClientConfig = new(tls.Config) } if !http2strSliceContains(t1.TLSClientConfig.NextProtos, "h2") { t1.TLSClientConfig.NextProtos = append([]string{"h2"}, t1.TLSClientConfig.NextProtos...) } if !http2strSliceContains(t1.TLSClientConfig.NextProtos, "http/1.1") { t1.TLSClientConfig.NextProtos = append(t1.TLSClientConfig.NextProtos, "http/1.1") } upgradeFn := func(authority string, c *tls.Conn) RoundTripper { addr := http2authorityAddr("https", authority) if used, err := connPool.addConnIfNeeded(addr, t2, c); err != nil { go c.Close() return http2erringRoundTripper{err} } else if !used { // Turns out we don't need this c. // For example, two goroutines made requests to the same host // at the same time, both kicking off TCP dials. (since protocol // was unknown) go c.Close() } return t2 } if m := t1.TLSNextProto; len(m) == 0 { t1.TLSNextProto = map[string]func(string, *tls.Conn) RoundTripper{ "h2": upgradeFn, } } else { m["h2"] = upgradeFn } return t2, nil }
笔者将上述的源码简单拆解为以下几个步骤:
新建一个
http2clientConnPool
并复制给t2,以后http2的请求会优先从该连接池中获取连接。初始化
TLSClientConfig
,并将支持的h2
和http1.1
协议添加到TLSClientConfig.NextProtos
中。定义一个
h2
的upgradeFn
存储到t1.TLSNextProto
里。
鉴于前一篇文章对新建连接前的步骤有了较为详细的介绍,所以这里直接看和server建立连接的部分源码,即(*Transport).dialConn
方法:
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) { // ...此处省略代码... if cm.scheme() == "https" && t.hasCustomTLSDialer() { // ...此处省略代码... } else { conn, err := t.dial(ctx, "tcp", cm.addr()) if err != nil { return nil, wrapErr(err) } pconn.conn = conn if cm.scheme() == "https" { var firstTLSHost string if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil { return nil, wrapErr(err) } if err = pconn.addTLS(firstTLSHost, trace); err != nil { return nil, wrapErr(err) } } } // Proxy setup. // ...此处省略代码... if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" { if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok { return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: next(cm.targetAddr, pconn.conn.(*tls.Conn))}, nil } } // ...此处省略代码... }
笔者对上述的源码描述如下:
调用
t.dial(ctx, "tcp", cm.addr())
创建TCP连接。如果是https的请求, 则对请求建立安全的tls传输通道。
检查tls的握手状态,如果和server协商的
NegotiatedProtocol
协议不为空,且client的t.TLSNextProto
有该协议,则返回alt不为空的持久连接(HTTP1.1不会进入if条件里)。
笔者对上述的第三点进行展开。经笔者在本地debug验证,当client和server都支持http2时,s.NegotiatedProtocol
的值为h2
且s.NegotiatedProtocolIsMutual
的值为true
。
在上面分析http2configureTransport
函数时,我们知道TLSNextProto
注册了一个key为h2
的函数,所以调用next
实际就是调用前面的upgradeFn
函数。
upgradeFn
会调用connPool.addConnIfNeeded
向http2的连接池添加一个tls传输通道,并最终返回前面已经创建好的t2
即http2Transport
。
func (p *http2clientConnPool) addConnIfNeeded(key string, t *http2Transport, c *tls.Conn) (used bool, err error) { p.mu.Lock() // ...此处省略代码... // 主要用于判断是否有必要像连接池添加新的连接 // 判断连接池中是否已有同host连接,如果有且该链接能够处理新的请求则直接返回 call, dup := p.addConnCalls[key] if !dup { // ...此处省略代码... call = &http2addConnCall{ p: p, done: make(chan struct{}), } p.addConnCalls[key] = call go call.run(t, key, c) } p.mu.Unlock() <-call.done if call.err != nil { return false, call.err } return !dup, nil } func (c *http2addConnCall) run(t *http2Transport, key string, tc *tls.Conn) { cc, err := t.NewClientConn(tc) p := c.p p.mu.Lock() if err != nil { c.err = err } else { p.addConnLocked(key, cc) } delete(p.addConnCalls, key) p.mu.Unlock() close(c.done) }
分析上述的源码我们能够得到两点结论:
执行完
upgradeFn
之后,(*Transport).dialConn返回的持久化连接中alt字段已经不是nil了。t.NewClientConn(tc)
新建出来的连接会保存在http2的连接池即http2clientConnPool
中,下一小结将对NewClientConn展开分析。
最后我们回到(*Transport).roundTrip方法并分析其中的关键源码:
func (t *Transport) roundTrip(req *Request) (*Response, error) { t.nextProtoOnce.Do(t.onceSetNextProtoDefaults) // ...此处省略代码... for { select { case <-ctx.Done(): req.closeBody() return nil, ctx.Err() default: } // ...此处省略代码... pconn, err := t.getConn(treq, cm) if err != nil { t.setReqCanceler(req, nil) req.closeBody() return nil, err } var resp *Response if pconn.alt != nil { // HTTP/2 path. t.setReqCanceler(req, nil) // not cancelable with CancelRequest resp, err = pconn.alt.RoundTrip(req) } else { resp, err = pconn.roundTrip(treq) } if err == nil { return resp, nil } // ...此处省略代码... } }
结合前面的分析,pconn.alt
在server和client都支持http2协议的情况下是不为nil的。所以,http2的请求会走pconn.alt.RoundTrip(req)
分支,也就是说http2的请求流程就被http2Transport
接管啦。
(*http2Transport).NewClientConn内部会调用t.newClientConn(c, t.disableKeepAlives())
。
因为本节内容较多,所以笔者不再一次性贴出源码,而是按关键步骤分析并分块儿贴出源码。
1、初始化一个http2ClientConn
:
cc := &http2ClientConn{ t: t, tconn: c, readerDone: make(chan struct{}), nextStreamID: 1, maxFrameSize: 16 << 10, // spec default initialWindowSize: 65535, // spec default maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough. peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead. streams: make(map[uint32]*http2clientStream), singleUse: singleUse, wantSettingsAck: true, pings: make(map[[8]byte]chan struct{}), }
上面的源码新建了一个默认的http2ClientConn。
initialWindowSize:初始化窗口大小为65535,这个值之后会初始化每一个数据流可发送的数据窗口大小。
maxConcurrentStreams:表示每个连接上允许最多有多少个数据流同时传输数据。
streams:当前连接上的数据流。
singleUse: 控制http2的连接是否允许多个数据流共享,其值由t.disableKeepAlives()
控制。
2、创建一个条件锁并且新建Writer&Reader。
cc.cond = sync.NewCond(&cc.mu) cc.flow.add(int32(http2initialWindowSize)) cc.bw = bufio.NewWriter(http2stickyErrWriter{c, &cc.werr}) cc.br = bufio.NewReader(c)
新建Writer&Reader没什么好说的,需要注意的是cc.flow.add(int32(http2initialWindowSize))
。
cc.flow.add
将当前连接的可写流控制窗口大小设置为http2initialWindowSize
,即65535。
3、新建一个读写数据帧的Framer。
cc.fr = http2NewFramer(cc.bw, cc.br) cc.fr.ReadMetaHeaders = hpack.NewDecoder(http2initialHeaderTableSize, nil) cc.fr.MaxHeaderListSize = t.maxHeaderListSize()
4、向server发送开场白,并发送一些初始化数据帧。
initialSettings := []http2Setting{ {ID: http2SettingEnablePush, Val: 0}, {ID: http2SettingInitialWindowSize, Val: http2transportDefaultStreamFlow}, } if max := t.maxHeaderListSize(); max != 0 { initialSettings = append(initialSettings, http2Setting{ID: http2SettingMaxHeaderListSize, Val: max}) } cc.bw.Write(http2clientPreface) cc.fr.WriteSettings(initialSettings...) cc.fr.WriteWindowUpdate(0, http2transportDefaultConnFlow) cc.inflow.add(http2transportDefaultConnFlow + http2initialWindowSize) cc.bw.Flush()
client向server发送的开场白内容如下:
const ( // client首先想server发送以PRI开头的一串字符串。 http2ClientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" ) var ( http2clientPreface = []byte(http2ClientPreface) )
发送完开场白后,client向server发送SETTINGS
数据帧。
http2SettingEnablePush: 告知server客户端是否开启push功能。
http2SettingInitialWindowSize:告知server客户端可接受的最大数据窗口是http2transportDefaultStreamFlow
(4M)。
发送完SETTINGS数据帧后,发送WINDOW_UPDATE数据帧, 因为第一个参数为0即streamID为0,则是告知server此连接可接受的最大数据窗口为http2transportDefaultConnFlow
(1G)。
发送完WINDOW_UPDATE数据帧后,将client的可读流控制窗口大小设置为http2transportDefaultConnFlow + http2initialWindowSize
。
5、开启读循环并返回
go cc.readLoop()(*http2Transport).RoundTrip
(*http2Transport).RoundTrip只是一个入口函数,它会调用(*http2Transport). RoundTripOpt方法。
(*http2Transport). RoundTripOpt有两个步骤比较关键:
t.connPool().GetClientConn(req, addr)
: 在http2的连接池里面获取一个可用连接,其中连接池的类型为http2noDialClientConnPool
,参考http2configureTransport
函数。
cc.roundTrip(req)
: 通过获取到的可用连接发送请求并返回响应。
根据实际的debug结果(http2noDialClientConnPool).GetClientConn最终会调用(*http2clientConnPool).getClientConn(req *Request, addr string, dialOnMiss bool)
。
通过(http2noDialClientConnPool).GetClientConn获取连接时传递给(*http2clientConnPool).getClientConn方法的第三个参数始终为false
,该参数为false时代表着即使无法正常获取可用连接,也不在这个环节重新发起拨号流程。
在(*http2clientConnPool).getClientConn中会遍历同地址的连接,并判断连接的状态从而获取一个可以处理请求的连接。
for _, cc := range p.conns[addr] { if st := cc.idleState(); st.canTakeNewRequest { if p.shouldTraceGetConn(st) { http2traceGetConn(req, addr) } p.mu.Unlock() return cc, nil } }
cc.idleState()
判断当前连接池中的连接能否处理新的请求:
1、当前连接是否能被多个请求共享,如果仅单个请求使用且已经有一个数据流,则当前连接不能处理新的请求。
if cc.singleUse && cc.nextStreamID > 1 { return }
2、以下几点均为true时,才代表当前连接能够处理新的请求:
连接状态正常,即未关闭并且不处于正在关闭的状态。
当前连接正在处理的数据流小于
maxConcurrentStreams
。下一个要处理的数据流 + 当前连接处于等待状态的请求*2 < math.MaxInt32。
当前连接没有长时间处于空闲状态(主要通过
cc.tooIdleLocked()
判断)。
st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay && int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 && !cc.tooIdleLocked()
当从链接池成功获取到一个可以处理请求的连接,就可以和server进行数据交互,即(*http2ClientConn).roundTrip
流程。
1、在真正开始处理请求前,还要进行header检查,http2对http1.1的某些header是不支持的,笔者就不对这个逻辑进行分析了,直接上源码:
func http2checkConnHeaders(req *Request) error { if v := req.Header.Get("Upgrade"); v != "" { return fmt.Errorf("http2: invalid Upgrade request header: %q", req.Header["Upgrade"]) } if vv := req.Header["Transfer-Encoding"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "chunked") { return fmt.Errorf("http2: invalid Transfer-Encoding request header: %q", vv) } if vv := req.Header["Connection"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && !strings.EqualFold(vv[0], "close") && !strings.EqualFold(vv[0], "keep-alive")) { return fmt.Errorf("http2: invalid Connection request header: %q", vv) } return nil } func http2commaSeparatedTrailers(req *Request) (string, error) { keys := make([]string, 0, len(req.Trailer)) for k := range req.Trailer { k = CanonicalHeaderKey(k) switch k { case "Transfer-Encoding", "Trailer", "Content-Length": return "", &http2badStringError{"invalid Trailer key", k} } keys = append(keys, k) } if len(keys) > 0 { sort.Strings(keys) return strings.Join(keys, ","), nil } return "", nil }
2、调用(*http2ClientConn).awaitOpenSlotForRequest
,一直等到当前连接处理的数据流小于maxConcurrentStreams
, 如果此函数返回错误,则本次请求失败。
2.1、double check当前连接可用。
if cc.closed || !cc.canTakeNewRequestLocked() { if waitingForConn != nil { close(waitingForConn) } return http2errClientConnUnusable }
2.2、如果当前连接处理的数据流小于maxConcurrentStreams
则直接返回nil。笔者相信大部分逻辑走到这儿就返回了。
if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) { if waitingForConn != nil { close(waitingForConn) } return nil }
2.3、如果当前连接处理的数据流确实已经达到上限,则开始进入等待流程。
if waitingForConn == nil { waitingForConn = make(chan struct{}) go func() { if err := http2awaitRequestCancel(req, waitingForConn); err != nil { cc.mu.Lock() waitingForConnErr = err cc.cond.Broadcast() cc.mu.Unlock() } }() } cc.pendingRequests++ cc.cond.Wait() cc.pendingRequests--
通过上面的逻辑知道,当前连接处理的数据流达到上限后有两种情况,一是等待请求被取消,二是等待其他请求结束。如果有其他数据流结束并唤醒当前等待的请求,则重复2.1、2.2和2.3的步骤。
3、调用cc.newStream()
在连接上创建一个数据流(创建数据流是线程安全的,因为源码中在调用awaitOpenSlotForRequest
之前先加锁,直到写入请求的header之后才释放锁)。
func (cc *http2ClientConn) newStream() *http2clientStream { cs := &http2clientStream{ cc: cc, ID: cc.nextStreamID, resc: make(chan http2resAndError, 1), peerReset: make(chan struct{}), done: make(chan struct{}), } cs.flow.add(int32(cc.initialWindowSize)) cs.flow.setConnFlow(&cc.flow) cs.inflow.add(http2transportDefaultStreamFlow) cs.inflow.setConnFlow(&cc.inflow) cc.nextStreamID += 2 cc.streams[cs.ID] = cs return cs }
笔者对上述代码简单描述如下:
新建一个
http2clientStream
,数据流ID为cc.nextStreamID
,新建数据流后,cc.nextStreamID +=2
。数据流通过
http2resAndError
管道接收请求的响应。初始化当前数据流的可写流控制窗口大小为
cc.initialWindowSize
,并保存连接的可写流控制指针。初始化当前数据流的可读流控制窗口大小为
http2transportDefaultStreamFlow
,并保存连接的可读流控制指针。最后将新建的数据流注册到当前连接中。
4、调用cc.t.getBodyWriterState(cs, body)
会返回一个http2bodyWriterState
结构体。通过该结构体可以知道请求body是否发送成功。
func (t *http2Transport) getBodyWriterState(cs *http2clientStream, body io.Reader) (s http2bodyWriterState) { s.cs = cs if body == nil { return } resc := make(chan error, 1) s.resc = resc s.fn = func() { cs.cc.mu.Lock() cs.startedWrite = true cs.cc.mu.Unlock() resc <- cs.writeRequestBody(body, cs.req.Body) } s.delay = t.expectContinueTimeout() if s.delay == 0 || !httpguts.HeaderValuesContainsToken( cs.req.Header["Expect"], "100-continue") { return } // 此处省略代码,因为绝大部分请求都不会设置100-continue的标头 return }
s.fn
: 标记当前数据流开始写入数据,并且将请求body的发送结果写入s.resc
管道(本文暂不对writeRequestBody
展开分析,下篇文章会对其进行分析)。
5、因为是多个请求共享一个连接,那么向连接写入数据帧时需要加锁,比如加锁写入请求头。
cc.wmu.Lock() endStream := !hasBody && !hasTrailers werr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs) cc.wmu.Unlock()
6、如果有请求body,则开始写入请求body,没有请求body则设置响应header的超时时间(有请求body时,响应header的超时时间需要在请求body写完之后设置)。
if hasBody { bodyWriter.scheduleBodyWrite() } else { http2traceWroteRequest(cs.trace, nil) if d := cc.responseHeaderTimeout(); d != 0 { timer := time.NewTimer(d) defer timer.Stop() respHeaderTimer = timer.C } }
scheduleBodyWrite
的内容如下:
func (s http2bodyWriterState) scheduleBodyWrite() { if s.timer == nil { // We're not doing a delayed write (see // getBodyWriterState), so just start the writing // goroutine immediately. go s.fn() return } http2traceWait100Continue(s.cs.trace) if s.timer.Stop() { s.timer.Reset(s.delay) } }
因为笔者的请求header中没有携带100-continue
标头,所以在前面的getBodyWriterState
函数中初始化的s.timer为nil即调用scheduleBodyWrite
会立即开始发送请求body。
7、轮询管道获取响应结果。
在看轮询源码之前,先看一个简单的函数:
handleReadLoopResponse := func(re http2resAndError) (*Response, bool, error) { res := re.res if re.err != nil || res.StatusCode > 299 { bodyWriter.cancel() cs.abortRequestBodyWrite(http2errStopReqBodyWrite) } if re.err != nil { cc.forgetStreamID(cs.ID) return nil, cs.getStartedWrite(), re.err } res.Request = req res.TLS = cc.tlsState return res, false, nil }
该函数主要就是判断读到的响应是否正常,并根据响应的结果构造(*http2ClientConn).roundTrip
的返回值。
了解了handleReadLoopResponse
之后,下面就看看轮询的逻辑:
for { select { case re := <-readLoopResCh: return handleReadLoopResponse(re) // 此处省略代码(包含请求取消,请求超时等管道的轮询) case err := <-bodyWriter.resc: // Prefer the read loop's response, if available. Issue 16102. select { case re := <-readLoopResCh: return handleReadLoopResponse(re) default: } if err != nil { cc.forgetStreamID(cs.ID) return nil, cs.getStartedWrite(), err } bodyWritten = true if d := cc.responseHeaderTimeout(); d != 0 { timer := time.NewTimer(d) defer timer.Stop() respHeaderTimer = timer.C } } }
笔者仅对上面的第二种情况即请求body发送完成进行描述:
能否读到响应,如果能够读取响应则直接返回。
判断请求body是否发送成功,如果发送失败,直接返回。
如果请求body发送成功,则设置响应header的超时时间。
本文主要描述了两个方面的内容:
确认client和server都支持http2协议,并构建一个http2的连接,同时开启该连接的读循环。
通过http2连接池获取一个http2连接,并发送请求和读取响应。