《主流网络请求框架 OkHttp 全方位详析》掘金地点
1、OkHttp先容
OkHttp 是一个开源的 HTTP 客户端库,由 Square 公司开辟并维护。它提供了轻便、高效的 API,用于处理惩罚网络请求和响应。OkHttp 基于 Java 编写,同时也提供了对 Kotlin 的精良支持。
OkHttp 提供了以下主要特性:
- 轻便易用的 API:OkHttp 提供了轻便而强大的 API,使得进行 HTTP 请求变得非常容易。通过构建 Request 对象和使用 Call 对象来发起同步或异步的网络请求。
- 支持 HTTP/2 和 SPDY:OkHttp 支持现代的 HTTP 协议,包罗 HTTP/2 和 SPDY,从而提供更快速和更有效率的网络通信。
- 连接池和缓存:OkHttp 内置了连接池和响应缓存,可以有效地管理和复用网络连接,并提供可设置的缓存机制,减少重复的网络请求。
- 拦截器:OkHttp 提供了拦截器的机制,允许开辟者在发送请求和接收响应的过程中进行自定义处理惩罚,例如添加公共参数、日记记录等。
- 支持 GZIP 压缩:OkHttp 支持接受息争压 GZIP 压缩的响应数据,减小网络传输的数据量,提升网络性能。
- 适配 Android 平台:OkHttp 在 Android 开辟中得到广泛应用,它提供了专门针对 Android 平台的优化,包罗性能、安全和稳固性方面的思量。
2、OkHttp基本使用与请求流程
基本使用
- val client = OkHttpClient.Builder()
- .callTimeout(5000L, java.util.concurrent.TimeUnit.MILLISECONDS)
- .connectTimeout(5000L, java.util.concurrent.TimeUnit.MILLISECONDS)
- .readTimeout(5000L, java.util.concurrent.TimeUnit.MILLISECONDS)
- .writeTimeout(5000L, java.util.concurrent.TimeUnit.MILLISECONDS)
- .retryOnConnectionFailure(true)
- .followRedirects(true)
- .followSslRedirects(true)
- .cache(null) // 设置缓存
- .authenticator(null) // 设置身份验证器
- .certificatePinner(null) // 设置证书锁定器
- .connectionPool(null) // 设置连接池
- .connectionSpecs(listOf()) // 设置连接规范
- .cookieJar(null) // 设置 Cookie 管理器
- .dispatcher(null) // 设置分发器
- .dns(null) // 设置 DNS 解析器
- .eventListenerFactory(null) // 设置事件监听器工厂
- .proxy(null) // 设置代理
- .protocols(listOf()) // 设置支持的协议
- .proxyAuthenticator(null) // 设置代理身份验证器
- .proxySelector(null) // 设置代理选择器
- .socketFactory(null) // 设置 Socket 工厂
- .sslSocketFactory(null) // 设置 SSL Socket 工厂
- .hostnameVerifier(null) // 设置主机名验证器
- .proxy(proxy) // 设置代理
- .build()
- val request = Request.Builder()
- .url(url)
- .header("xxx", "xxx")
- .addHeader("xxx", "xxx")
- .post(RequestBody.create(null, "XXX")) // 使用 POST 方法并传入请求体,不写默认为 GET 方法
- .cacheControl(okhttp3.CacheControl.FORCE_NETWORK) // 设置缓存控制
- .tag("custom-tag") // 设置标记
- .build()
- val call = client.newCall(request)
- // 构造 Call 对象之后就可以同步或异步请求,并处理结果了
- // 1、同步
- client.newCall(call).execute().use { response ->
- if (response.isSuccessful){
- Log.v("同步请求响应:${response.body?.string()}")
- }else{
- Log.e("同步请求失败")
- }
- }
- // 2、异步
- client.newCall(call).enqueue(object : Callback {
- override fun onFailure(call: Call, e: IOException) {
- Log.e("异步请求失败: ${e.message}")
- }
- override fun onResponse(call: Call, response: Response) {
- Log.v("异步请求响应:${response.body?.string()}")
- }
- })
复制代码 请求流程
3、分发器Dispatcher
3.1、异步请求分发流程
首先我们要知道异步请求有两个队列,ready 队列和 running 队列,前者用来记录等候执行的请求,后者用来记录正在执行的请求:
- // ready 队列
- private val readyAsyncCalls = ArrayDeque<AsyncCall>()
- // running 队列
- private val runningAsyncCalls = ArrayDeque<AsyncCall>()
复制代码 我们来看看 enqueue() 方法:
- internal fun enqueue(call: AsyncCall) {
- synchronized(this) {
- readyAsyncCalls.add(call)
- // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
- // the same host.
- if (!call.call.forWebSocket) {
- val existingCall = findExistingCallWithHost(call.host)
- if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
- }
- }
- promoteAndExecute()
- }
复制代码 此中 synchronized() 代码段中,先将请求加入 ready 队列,随后判定当前请求是否为 WebSocket,如果不是就调用 findExistingCallWithHost() 方法,在 running 和 ready 队列中查找与当前请求 Host 相同的请求,如果找到了,就让相同 Host 请求中的 callsPerHost 变量共享同一个对象,这个对象是 AtomicInteger 类型,用于对请求做一些限制,在下文有解释,可以先往下看。
synchronized() 代码段之外,执行的 promoteAndExecute() 方法是分发器分发异步请求的关键,先来看看源码:
- private fun promoteAndExecute(): Boolean {
- this.assertThreadDoesntHoldLock()
- val executableCalls = mutableListOf<AsyncCall>()
- val isRunning: Boolean
- synchronized(this) {
- val i = readyAsyncCalls.iterator()
- while (i.hasNext()) {
- val asyncCall = i.next()
- if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
- if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
- i.remove()
- asyncCall.callsPerHost.incrementAndGet()
- executableCalls.add(asyncCall)
- runningAsyncCalls.add(asyncCall)
- }
- isRunning = runningCallsCount() > 0
- }
- for (i in 0 until executableCalls.size) {
- val asyncCall = executableCalls[i]
- asyncCall.executeOn(executorService)
- }
- return isRunning
- }
复制代码 synchronized() 代码段中主要完成了检查 ready 队列,并将符合条件可执行的使命添加到 running 队列和 executableCalls 队列中,这个 executableCalls 生存的才是这次要执行的请求,同步代码块之外,就遍历 executableCalls 执行请求。
主要过程可以用下图表现:
此中 ThreadPool 阶段较为重要,是请求效率提升的关键,我们先往返首一下 Java 线程池的机制:
当一个使命通过 execute(Runnable) 方法添加到线程池时,有两种情况:
- 一、线程数量小于 corePoolSize,则新建线程(核心)来处理惩罚被添加的使命。
- 二、线程数量大于或等于 corePoolSize,则新使命被添加到等候队列,若添加失败:
- 线程数量小于 maximumPoolSize,新建线程执行新使命。
- 线程数量等于 maximumPoolSize,使用 RejectedExecutionHandler 拒绝计谋。
相识了 Java 线程池机制后,我们来看下方 OkHttp 源码,下方的 ExecutorService 对象就是 OkHttp 中的线程池,此中为了提升请求效率,则不能让新使命被添加到等候队列,而是要新建线程执行新使命,因此将 corePoolSize 设置为0,并传入无界队列 SynchronousQueue 使得每次添加新使命到队列都失败,则会新建线程去执行新使命。
- @get:JvmName("executorService") val executorService: ExecutorService
- get() {
- if (executorServiceOrNull == null) {
- executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
- SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
- }
- return executorServiceOrNull!!
- }
复制代码 3.2、异步分发限制
限制 1:正在运行的异步请求数不超过最大值,默认是64
- // 上面代码中:
- if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
复制代码 限制 2:正在运行的异步请求中相同 Host 的数量不超过最大值,默认是5,防止单个客户端与服务端连接太多造成巨大压力
- // 上面代码中:
- if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
复制代码 此中 callsPerHost 是 AtomicInteger 类型数据,记录的是全部请求中有多少个 Host 相同。
3.3、分发器处理惩罚同步请求
相识了分发器处理惩罚异步请求的过程之后,同步请求就很简朴了,同步请求只有一个 running 队列:
- // running 队列
- private val runningSyncCalls = ArrayDeque<RealCall>()
复制代码 Dispatcher 中的 execute() 方法仅将 Call 对象添加到 running 队列中:
- @Synchronized internal fun executed(call: RealCall) {
- runningSyncCalls.add(call)
- }
- override fun execute(): Response {
- check(executed.compareAndSet(false, true)) { "Already Executed" }
- timeout.enter()
- callStart()
- try {
- client.dispatcher.executed(this) // 这里调用的就是上方的 executed 方法
- return getResponseWithInterceptorChain()
- } finally {
- client.dispatcher.finished(this)
- }
- }
复制代码 此中值得注意的是上述代码块 finally 中的 client.dispatcher.finished(this),在新版本的 OkHttp 中,异步和同步的 finished() 方法都会触发 promoteAndExecute() 方法,遍历检查异步请求中的等候队列:
- internal fun finished(call: RealCall) {
- finished(runningSyncCalls, call)
- }
- private fun <T> finished(calls: Deque<T>, call: T) {
- val idleCallback: Runnable?
- synchronized(this) {
- if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
- idleCallback = this.idleCallback
- }
- val isRunning = promoteAndExecute() // 异步分发遍历检查
- if (!isRunning && idleCallback != null) {
- idleCallback.run()
- }
- }
复制代码 4、OkHttp拦截器责任链设计模式与功能概述
设计特点:请求从前去后,响应从后往前,根据 interceptors 中的拦截器顺序执行拦截,有点类似于递归的输出情势 。
功能概述:
- 重试重定向拦截器:在交给下一个拦截器之前,负责判定用户是否取消了请求;在获得响应之后,会根据响应码判定是否需要重定向,如果满足条件那么就会重启执行全部拦截器。
- 桥接拦截器:在交给下一个拦截器之前,负责将 HTTP 协议必备的请求头加入此中(如 Host)并添加一些默认的行为(如 GZIP 压缩);在获得响应之后,调用生存 cookie 接口并分析 GZIP 数据。
- 缓存拦截器:在交给下一个拦截器之前,读取并判定是否使用缓存;在获得响应之后判定是否缓存。
- 连接拦截器:在交给下一个拦截器之前,负责找到或新建一个连接,并获得对应的 socket 流;在获得响应之后不进行额外的处理惩罚。
- 请求服务器拦截器:与服务器进行通信,向服务器发送请求,分析读取响应数据。
5、OkHttp五大拦截器
5.1、重试重定向拦截器 (RetryAndFollowUpInterceptor)
这个拦截器主要是在请求出错之后重试或根据响应码重定向,先来看看 intercept() 方法中重试部分的源码:
- override fun intercept(chain: Interceptor.Chain): Response {
- ......
- ......
- try {
- if (call.isCanceled()) {
- throw IOException("Canceled")
- }
- try {
- response = realChain.proceed(request)
- newExchangeFinder = true
- } catch (e: RouteException) {
- // The attempt to connect via a route failed. The request will not have been sent.
- if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
- throw e.firstConnectException.withSuppressed(recoveredFailures)
- } else {
- recoveredFailures += e.firstConnectException
- }
- newExchangeFinder = false
- continue
- } catch (e: IOException) {
- // An attempt to communicate with a server failed. The request may have been sent.
- if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
- throw e.withSuppressed(recoveredFailures)
- } else {
- recoveredFailures += e
- }
- newExchangeFinder = false
- continue
- }
- ......
- ......
- }
复制代码 此中有非常之后就运行到 catch 代码块内进行重试,并根据 recover() 方法判定是否满足重试限制,如果满足,则将重试次数加一,并 continue 进入到下一次重试。
重试限制
上面提到了执行重试要根据 recover() 方法判定是否满足重试限制,那么执行重试有什么限制呢?让我们来看看 recover() 方法的源码:
- private fun recover(
- e: IOException,
- call: RealCall,
- userRequest: Request,
- requestSendStarted: Boolean
- ): Boolean {
- // OkHttpClient 配置不重试
- // The application layer has forbidden retries.
- if (!client.retryOnConnectionFailure) return false
- // 1、如果是 IO 异常(非 http2 中断异常)表示请求可能发出
- // 2、如果请求体配置为只能被使用一次(默认为 false,可重复使用)
- // We can't send the request body again.
- if (requestSendStarted && requestIsOneShot(e, userRequest)) return false
- // 异常不重试:协议异常、IO 中断异常(除 Socket 读写超时之外),ssl 认证异常
- // This exception is fatal.
- if (!isRecoverable(e, requestSendStarted)) return false
- // 判断是否有更多的路线,没有则不重试,即时前面的条件都满足
- // No more routes to attempt.
- if (!call.retryAfterFailure()) return false
- // For failure recovery, use the same route selector with a new connection.
- return true
- }
复制代码 以上限制条件按顺序可表现为下图:
重定向规则
执行完上述重试代码块之后,会判定是否需要重定向,那么在 RetryAndFollowUpInterceptor 中,什么情况才会触发重定向呢?我们来看看重试重定向拦截器 intercept() 方法中重定向部分的源码:
- override fun intercept(chain: Interceptor.Chain): Response {
- ......
- ......
- val exchange = call.interceptorScopedExchange
- val followUp = followUpRequest(response, exchange)
- if (followUp == null) {
- if (exchange != null && exchange.isDuplex) {
- call.timeoutEarlyExit()
- }
- closeActiveExchange = false
- return response
- }
- val followUpBody = followUp.body
- if (followUpBody != null && followUpBody.isOneShot()) {
- closeActiveExchange = false
- return response
- }
- response.body?.closeQuietly()
- if (++followUpCount > MAX_FOLLOW_UPS) {
- throw ProtocolException("Too many follow-up requests: $followUpCount")
- }
- request = followUp // 这里用 RetryAndFollowUpInterceptor 内部成员 request 接收 followUp 之后,又会运行到外部循环,进行重定向
- priorResponse = response
-
- ......
- ......
- }
复制代码 此中,最关键的一句是val followUp = followUpRequest(response, exchange),它的返回值决定了 followUp 是否为 null,如果为 null,则不会重定向。
那么,让我们来看看 followUpRequest() 方法的源码:
- private fun followUpRequest(userResponse: Response, exchange: Exchange?): Request? {
- val route = exchange?.connection?.route()
- val responseCode = userResponse.code
- val method = userResponse.request.method
- when (responseCode) {
- HTTP_PROXY_AUTH -> {
- ......
- }
- HTTP_UNAUTHORIZED -> ......
- HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> {
- ......
- }
- HTTP_CLIENT_TIMEOUT -> {
- ......
- }
- HTTP_UNAVAILABLE -> {
- ......
- }
- HTTP_MISDIRECTED_REQUEST -> {
- ......
- }
- else -> return null
- }
- }
复制代码 此方法中主要是根据响应码来判定是否满足重定向条件,由上到下依次如下表格所示:
响应码分析执行重定向要满足的条件HTTP_PROXY_AUTH(407)代理需要授权,如付费代理,需要验证身份通过 proxyAuthenticator 获得到了 Request。
例如添加 Proxy-Authorization 请求头HTTP_UNAUTHORIZED(401)服务器需要授权,如某些接口需要登录才华使用(不安全,基本上没用了)通过 authenticator 获得到了 Request。
例如添加 Authorization 请求头HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER(3XX)重定向响应OkHttpClient 设置了允许重定向(OkHttpClient.Builder().followRedirects(true),默认为 true)HTTP_CLIENT_TIMEOUT(408)请求超时1、OkHttpClient 设置了重试(默认允许重试)
2、本次请求的结果不是响应408的重试结果,即不能一连两次响应408,否则第二次不再重试
3、服务器未响应 Retry-After 大概响应 Retry-After:0HTTP_UNAVAILABLE(503)服务不可用1、本次请求结果不是响应503的重试结果,和上述408相似
2、服务器明确响应 Rerty-After:0,即立即重试HTTP_MISDIRECTED_REQUEST(421)从当前客户端所在的 IP 地点到服务器的连接数超过了服务器许可的最大范围自动再次使用另一个连接对象发起请求 5.2、桥接拦截器 (BridgeInterceptor)
桥接拦截器主要是设置了默认请求头,比如 Host 是 HTTP 中必备的请求头,但是我们平时并不会给 request 设置 Host 请求头,然而我们依然能用,缘故原由就是在桥接拦截器为我们自动设置了 Host 请求头。
来看看 BridgeInterceptor 中 intercept 源码:
- override fun intercept(chain: Interceptor.Chain): Response {
- val userRequest = chain.request()
- val requestBuilder = userRequest.newBuilder()
- val body = userRequest.body
- if (body != null) {
- val contentType = body.contentType()
- if (contentType != null) {
- requestBuilder.header("Content-Type", contentType.toString())
- }
- val contentLength = body.contentLength()
- if (contentLength != -1L) {
- requestBuilder.header("Content-Length", contentLength.toString())
- requestBuilder.removeHeader("Transfer-Encoding")
- } else {
- requestBuilder.header("Transfer-Encoding", "chunked")
- requestBuilder.removeHeader("Content-Length")
- }
- }
- if (userRequest.header("Host") == null) {
- requestBuilder.header("Host", userRequest.url.toHostHeader())
- }
- if (userRequest.header("Connection") == null) {
- requestBuilder.header("Connection", "Keep-Alive")
- }
- // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
- // the transfer stream.
- var transparentGzip = false
- if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
- transparentGzip = true
- requestBuilder.header("Accept-Encoding", "gzip")
- }
- val cookies = cookieJar.loadForRequest(userRequest.url)
- if (cookies.isNotEmpty()) {
- requestBuilder.header("Cookie", cookieHeader(cookies))
- }
- if (userRequest.header("User-Agent") == null) {
- requestBuilder.header("User-Agent", userAgent)
- }
- val networkResponse = chain.proceed(requestBuilder.build())
- cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
- val responseBuilder = networkResponse.newBuilder()
- .request(userRequest)
- if (transparentGzip &&
- "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
- networkResponse.promisesBody()) {
- val responseBody = networkResponse.body
- if (responseBody != null) {
- val gzipSource = GzipSource(responseBody.source())
- val strippedHeaders = networkResponse.headers.newBuilder()
- .removeAll("Content-Encoding")
- .removeAll("Content-Length")
- .build()
- responseBuilder.headers(strippedHeaders)
- val contentType = networkResponse.header("Content-Type")
- responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
- }
- }
- return responseBuilder.build()
- }
复制代码 此中除了设置请求头,还有比较重要的三个作用:
- 发出请求之前执行val cookies = cookieJar.loadForRequest(userRequest.url)加载 cookies。
- 获取响应之后执行cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)生存 cookies。
- 如果服务端响应的数据是 GZIP 格式压缩的,则执行val gzipSource = GzipSource(responseBody.source())进行 GZIP 解压。
5.3、缓存拦截器 (CacheInterceptor)
在相识缓存拦截器之前,先来简朴相识下 HTTP 的缓存机制,主要分为以下两种:
- 强缓存:客户端不会将请求发送给服务器,而是将本地缓存响应出去。强缓存是使用 HTTP 的响应头中的 Expires 大概 Cache-Control两个字段控制的,用来表现资源的缓存时间。
- 协商缓存:客户端会将请求发送给服务器。服务器根据请求头中的 Last-Modify/If-Modify-Since 或 Etag/If-None-Match 来判定协商结果,如果协商成功,即资源尚未改变,则返回304,告诉客户端可以从缓存中加载资源,如果不成功,则返回新的资源。
相识完以上机制之后,我们可以看看缓存拦截器中是怎么处理惩罚本地缓存和响应数据的,源码:
- override fun intercept(chain: Interceptor.Chain): Response {
- val call = chain.call()
- val cacheCandidate = cache?.get(chain.request())
- val now = System.currentTimeMillis()
- val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
- val networkRequest = strategy.networkRequest
- val cacheResponse = strategy.cacheResponse
- cache?.trackResponse(strategy)
- val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE
- if (cacheCandidate != null && cacheResponse == null) {
- // The cache candidate wasn't applicable. Close it.
- cacheCandidate.body?.closeQuietly()
- }
- // If we're forbidden from using the network and the cache is insufficient, fail.
- if (networkRequest == null && cacheResponse == null) {
- ......
- }
- // If we don't need the network, we're done.
- if (networkRequest == null) {
- ......
- }
- if (cacheResponse != null) {
- listener.cacheConditionalHit(call, cacheResponse)
- } else if (cache != null) {
- listener.cacheMiss(call)
- }
- var networkResponse: Response? = null
- try {
- networkResponse = chain.proceed(networkRequest)
- } finally {
- // If we're crashing on I/O or otherwise, don't leak the cache body.
- if (networkResponse == null && cacheCandidate != null) {
- cacheCandidate.body?.closeQuietly()
- }
- }
- // If we have a cache response too, then we're doing a conditional get.
- if (cacheResponse != null) {
- if (networkResponse?.code == HTTP_NOT_MODIFIED) {
- ......
- } else {
- cacheResponse.body?.closeQuietly()
- }
- }
- val response = networkResponse!!.newBuilder()
- .cacheResponse(stripBody(cacheResponse))
- .networkResponse(stripBody(networkResponse))
- .build()
- if (cache != null) {
- ......
- }
- return response
- }
复制代码 此中最关键的代码是:
- val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
- val networkRequest = strategy.networkRequest
- val cacheResponse = strategy.cacheResponse
复制代码 获取缓存计谋 strategy,并根据此中 networkRequest 和 cacheResponse 变量的值进行后续操作,处理惩罚规则如下表格所示:
networkRequestcacheResponse处理惩罚方式NullNot Null直接使用缓存(强缓存)Not NullNull向服务器发起请求(协商缓存或普通请求)NullNull要求使用缓存,但是没有缓存,则 OkHttp 直接在本地创建一个响应,返回504Not NullNot Null发起请求,若得到响应为304(表现资源未修改),则更新缓存并返回(协商缓存) 5.4、连接拦截器 (ConnectInterceptor)
连接拦截器主要是获取一个连接,如果无法用缓存,则会用这个连接进行网络请求,ConnectInterceptor 的 intercept() 方法代码只有四行:
- override fun intercept(chain: Interceptor.Chain): Response {
- val realChain = chain as RealInterceptorChain
- val exchange = realChain.call.initExchange(chain)
- val connectedChain = realChain.copy(exchange = exchange)
- return connectedChain.proceed(realChain.request)
- }
复制代码 学习连接拦截器主要有两个要点,新建连接和连接池:
新建连接
新建连接主要是通过执行 val exchange = realChain.call.initExchange(chain) 这行代码,进入 initExchange() 方法内部:
- internal fun initExchange(chain: RealInterceptorChain): Exchange {
- synchronized(this) {
- check(expectMoreExchanges) { "released" }
- check(!responseBodyOpen)
- check(!requestBodyOpen)
- }
- val exchangeFinder = this.exchangeFinder!!
- val codec = exchangeFinder.find(client, chain)
- val result = Exchange(this, eventListener, exchangeFinder, codec)
- this.interceptorScopedExchange = result
- this.exchange = result
- synchronized(this) {
- this.requestBodyOpen = true
- this.responseBodyOpen = true
- }
- if (canceled) throw IOException("Canceled")
- return result
- }
复制代码 此中比较重要的是 val codec = exchangeFinder.find(client, chain),这里的 exchangeFinder 用的是重试重定向拦截器中 call.enterNetworkInterceptorExchange(request, newExchangeFinder) 这句代码传入的 newExchangeFinder,find() 方法内部判定请求的 HTTP 协议版本并返回连接。
连接池
连接池就是一个生存连接的容器,提供了 get() 和 put() 方法取出连接和生存连接,可以在创建 OkHttpClient 时自定义连接池:
- val cp = ConnectionPool(5, 5, TimeUnit.MINUTES)
- OkHttpClient.Builder().connectionPool(cp)
复制代码 ConnectionPool 构造方法中传入的三个参数分别为:
- maxIdleConnections 连接池最大允许的空闲连接数
- keepAliveDuration 连接最大允许的空闲时间
- timeUnit 时间单位
为什么要设置最大允许空闲连接数和最大允许空闲时间呢,这是为了实现连接池中的重要机制:cleanUp(),下面我们来相识一下这个机制:
- fun cleanup(now: Long): Long {
- var inUseConnectionCount = 0
- var idleConnectionCount = 0
- var longestIdleConnection: RealConnection? = null
- var longestIdleDurationNs = Long.MIN_VALUE
- // Find either a connection to evict, or the time that the next eviction is due.
- for (connection in connections) {
- synchronized(connection) {
- // If the connection is in use, keep searching.
- if (pruneAndGetAllocationCount(connection, now) > 0) {
- inUseConnectionCount++
- } else {
- idleConnectionCount++
- // If the connection is ready to be evicted, we're done.
- val idleDurationNs = now - connection.idleAtNs
- if (idleDurationNs > longestIdleDurationNs) {
- longestIdleDurationNs = idleDurationNs
- longestIdleConnection = connection
- } else {
- Unit
- }
- }
- }
- }
- when {
- longestIdleDurationNs >= this.keepAliveDurationNs
- || idleConnectionCount > this.maxIdleConnections -> {
- // We've chosen a connection to evict. Confirm it's still okay to be evict, then close it.
- val connection = longestIdleConnection!!
- synchronized(connection) {
- if (connection.calls.isNotEmpty()) return 0L // No longer idle.
- if (connection.idleAtNs + longestIdleDurationNs != now) return 0L // No longer oldest.
- connection.noNewExchanges = true
- connections.remove(longestIdleConnection)
- }
- connection.socket().closeQuietly()
- if (connections.isEmpty()) cleanupQueue.cancelAll()
- // Clean up again immediately.
- return 0L
- }
- idleConnectionCount > 0 -> {
- // A connection will be ready to evict soon.
- return keepAliveDurationNs - longestIdleDurationNs
- }
- inUseConnectionCount > 0 -> {
- // All connections are in use. It'll be at least the keep alive duration 'til we run
- // again.
- return keepAliveDurationNs
- }
- else -> {
- // No connections, idle or in use.
- return -1
- }
- }
- }
复制代码- private val cleanupTask = object : Task("$okHttpName ConnectionPool") {
- override fun runOnce() = cleanup(System.nanoTime())
- }
复制代码- fun put(connection: RealConnection) {
- connection.assertThreadHoldsLock()
- connections.add(connection)
- cleanupQueue.schedule(cleanupTask)
- }
复制代码 从这三段代码中可以看到连接池内部有一个 cleanupTask 变量来生存定时整理使命,而每次调用连接池的 put() 方法时,都会启动定时整理使命,内部的整理逻辑就是根据 ConnectionPool 的构造函数中传入的三个参数决定的。
5.5、请求服务拦截器 (CallServerInterceptor)
这个拦截器作用主要是向服务器发起请求并天生 Response 对象给客户端。intercept() 方法主要源码:
- override fun intercept(chain: Interceptor.Chain): Response {
- ......
- ......
- try {
- // 1)
- exchange.writeRequestHeaders(request)
- // 2)
- if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
- ......
- ......
- } else {
- exchange.noRequestBody()
- }
- if (requestBody == null || !requestBody.isDuplex()) {
- exchange.finishRequest()
- }
- } catch (e: IOException) {
- if (e is ConnectionShutdownException) {
- throw e // No request was sent so there's no response to read.
- }
- if (!exchange.hasFailure) {
- throw e // Don't attempt to read the response; we failed to send the request.
- }
- sendRequestException = e
- }
- // 3)
- try {
- if (responseBuilder == null) {
- responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
- if (invokeStartEvent) {
- exchange.responseHeadersStart()
- invokeStartEvent = false
- }
- }
- var response = responseBuilder
- .request(request)
- .handshake(exchange.connection.handshake())
- .sentRequestAtMillis(sentRequestMillis)
- .receivedResponseAtMillis(System.currentTimeMillis())
- .build()
- var code = response.code
- ......
- ......
- } catch (e: IOException) {
- if (sendRequestException != null) {
- sendRequestException.addSuppressed(e)
- throw sendRequestException
- }
- throw e
- }
- }
复制代码 主要可以分为以下三个部分:
1)发送请求头
exchange 就是 ConnectInterceptor 中获取的连接对象,用这个对象调用 exchange.writeRequestHeaders(request) 方法向服务器写入请求头。
2)判定是否需要发送请求体,GET 请求和 HEAD 请求直接跳过这步
发送完请求头之后,如果是 POST 请求或其他需要请求体的请求,就会发送请求体,此中 if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) 就是来进行这步判定。
3)读取结果并天生 Response 对象
之后就根据响应头和响应体以及 HTTP 协议版本天生对应的 Response 对象并返回给用户使用。
6、自定义拦截器
自定义拦截器就是我们在创建 OkHttpClient 时可以自定义添加的拦截器,可以用来打印日记大概查看网络状态等。
- OkHttpClient.Builder()
- .addInterceptor {}
- .addNetworkInterceptor {}
复制代码 值得注意的是,在自定义拦截器的内部肯定要执行 chain.proceed(Request) 方法,由于我们前面提到了,拦截器是责任链设计模式,一层一层通报,如果不调用 proceed() 方法,则请求链会断开,请求也就不能正确执行。
我们知道不论同步还是异步请求,都会调用到 getResponseWithInterceptorChain() 方法获取拦截器链完成请求:
- internal fun getResponseWithInterceptorChain(): Response {
- // Build a full stack of interceptors.
- val interceptors = mutableListOf<Interceptor>()
- interceptors += client.interceptors // 自定义应用拦截器
- interceptors += RetryAndFollowUpInterceptor(client)
- interceptors += BridgeInterceptor(client.cookieJar)
- interceptors += CacheInterceptor(client.cache)
- interceptors += ConnectInterceptor
- if (!forWebSocket) {
- interceptors += client.networkInterceptors // 自定义网络拦截器
- }
- interceptors += CallServerInterceptor(forWebSocket)
- ......
- ......
- }
复制代码 根据此方法中拦截器的添加顺序,我们添加的自定义拦截器有以下特点:
- 应用拦截器是第一个得到请求,最后一个得到响应,因此我们可以在这里对 request 进行操作。
- 网络拦截器是倒数第二个得到请求,第二个得到响应,在这个拦截器中得到的是最后真正发给服务器的 request,得到响应后可以对 response 进行操作。
|