This article is based on okhttp-3.14.9.
Core classes involved
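Transmitter: the bridge between OkHttp's application layer and its network layer. It exposes high-level application primitives: a connection, a request, a response, and a stream.
ExchangeFinder: attempts to find a connection for a sequence of exchanges.
Route: the concrete route a connection uses to reach an abstract origin server. When creating a connection the client has several options (HTTP proxy, IP address); each route is one specific selection of these options.
Exchange: transmits a single HTTP request and response pair. It layers connection management and events on ExchangeCodec, which handles the actual I/O.
ExchangeCodec: encodes HTTP requests and decodes HTTP responses.
RouteSelector: selects the routes used to connect to an origin server. Each connection requires a choice of proxy server, IP address, and TLS mode. Connections may also be recycled.

This article analyzes ConnectInterceptor, the interceptor that opens a connection to the target server and passes it on to the next interceptor.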
/** Opens a connection to the target server and proceeds to the next interceptor. */
public final class ConnectInterceptor implements Interceptor {
  public final OkHttpClient client;

  public ConnectInterceptor(OkHttpClient client) {
    this.client = client;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    Transmitter transmitter = realChain.transmitter();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);

    return realChain.proceed(request, transmitter, exchange);
  }
}

The core of this code is the two calls transmitter.newExchange and realChain.proceed. Note that extensive health checks are requested for every method except GET. First, a look at the Exchange class:
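To make the hand-off concrete, here is a minimal, self-contained sketch of the chain pattern that ConnectInterceptor takes part in. It is not OkHttp code: ChainSketch, its nested Interceptor and Chain types, and the string standing in for an Exchange are all hypothetical names chosen for illustration. It only shows how one step creates the exchange and passes it to the next step via proceed.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of an interceptor chain; all names here are illustrative, not OkHttp's API.
final class ChainSketch {
  interface Interceptor {
    String intercept(Chain chain);
  }

  static final class Chain {
    final List<Interceptor> interceptors;
    final int index;        // position of the next interceptor to run
    final String method;    // stands in for the Request
    final String exchange;  // stands in for the Exchange; null until the connect step

    Chain(List<Interceptor> interceptors, int index, String method, String exchange) {
      this.interceptors = interceptors;
      this.index = index;
      this.method = method;
      this.exchange = exchange;
    }

    // Runs the next interceptor with a chain that carries the given exchange.
    String proceed(String exchange) {
      Interceptor next = interceptors.get(index);
      return next.intercept(new Chain(interceptors, index + 1, method, exchange));
    }
  }

  static String run(String method) {
    List<Interceptor> interceptors = new ArrayList<>();
    // Connect step: decide on health checks, create the exchange, proceed.
    interceptors.add(chain -> {
      boolean doExtensiveHealthChecks = !chain.method.equals("GET");
      String exchange = "exchange(extensiveChecks=" + doExtensiveHealthChecks + ")";
      return chain.proceed(exchange);
    });
    // Terminal step: uses the exchange it was handed.
    interceptors.add(chain -> chain.method + " via " + chain.exchange);
    return new Chain(interceptors, 0, method, null).proceed(null);
  }

  public static void main(String[] args) {
    System.out.println(run("GET"));
    System.out.println(run("POST"));
  }
}
```

The point of the design is that the connect step does no I/O on the request itself; it only prepares the exchange that later steps use.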
/**
 * Transmits a single HTTP request and a response pair. This layers connection management and events
 * on {@link ExchangeCodec}, which handles the actual I/O.
 */
public final class Exchange {
}

/** Encodes HTTP requests and decodes HTTP responses. */
public interface ExchangeCodec {
}

Exchange transmits a single HTTP request and response pair. It layers connection management and events on top of ExchangeCodec, which handles the actual I/O, that is, encoding HTTP requests and decoding HTTP responses.
public final class Transmitter {
  /** Returns a new exchange to carry a new request and response. */
  Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    synchronized (connectionPool) {
      if (noMoreExchanges) {
        throw new IllegalStateException("released");
      }
      if (exchange != null) {
        throw new IllegalStateException("cannot make a new request because the previous response "
            + "is still open: please call response.close()");
      }
    }

    ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
    Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);

    synchronized (connectionPool) {
      this.exchange = result;
      this.exchangeRequestDone = false;
      this.exchangeResponseDone = false;
      return result;
    }
  }
}

newExchange first checks that the Transmitter has not been released and that no previous exchange is still open. It then obtains an ExchangeCodec from exchangeFinder.find, wraps it in a new Exchange, records that exchange, and returns it.
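The guard logic in newExchange can be sketched in isolation. The class below is a hypothetical stand-in, not the real Transmitter: TransmitterSketch, exchangeComplete, and release are names invented here, and a plain Object replaces the Exchange. It keeps the same shape, though: check state under the lock, do the slow work outside the lock, then publish the result under the lock again.

```java
// Toy model of the newExchange state checks; names are illustrative, not OkHttp's API.
final class TransmitterSketch {
  private final Object lock = new Object();
  private boolean noMoreExchanges;
  private Object exchange;

  Object newExchange() {
    synchronized (lock) {
      if (noMoreExchanges) {
        throw new IllegalStateException("released");
      }
      if (exchange != null) {
        throw new IllegalStateException("previous response is still open");
      }
    }

    // Stands in for the (potentially slow) connection lookup, done without holding the lock.
    Object result = new Object();

    synchronized (lock) {
      exchange = result;
      return result;
    }
  }

  // Hypothetical helper: marks the current exchange as finished.
  void exchangeComplete() {
    synchronized (lock) {
      exchange = null;
    }
  }

  // Hypothetical helper: forbids any further exchanges.
  void release() {
    synchronized (lock) {
      noMoreExchanges = true;
    }
  }

  static boolean throwsIllegalState(Runnable action) {
    try {
      action.run();
      return false;
    } catch (IllegalStateException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    TransmitterSketch transmitter = new TransmitterSketch();
    transmitter.newExchange();
    System.out.println(throwsIllegalState(transmitter::newExchange));
  }
}
```

Holding the lock only around the state checks means a slow connection attempt does not block other users of the same lock.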
final class ExchangeFinder {
  public ExchangeCodec find(
      OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    int connectTimeout = chain.connectTimeoutMillis();
    int readTimeout = chain.readTimeoutMillis();
    int writeTimeout = chain.writeTimeoutMillis();
    int pingIntervalMillis = client.pingIntervalMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
      return resultConnection.newCodec(client, chain);
    } catch (RouteException e) {
      trackFailure();
      throw e;
    } catch (IOException e) {
      trackFailure();
      throw new RouteException(e);
    }
  }
}

find does two things:
1. It finds a healthy connection, a RealConnection.
2. It creates an ExchangeCodec from that RealConnection.

final class ExchangeFinder {
  /**
   * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
   * until a healthy connection is found.
   */
  private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
      boolean doExtensiveHealthChecks) throws IOException {
    while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          pingIntervalMillis, connectionRetryEnabled);

      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized (connectionPool) {
        if (candidate.successCount == 0 && !candidate.isMultiplexed()) {
          return candidate;
        }
      }

      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        candidate.noNewExchanges();
        continue;
      }

      return candidate;
    }
  }
}

findHealthyConnection looks up a connection and returns it if it is healthy. A brand new, non-multiplexed connection is returned without a health check. An unhealthy connection is marked so that it accepts no new exchanges, and the lookup repeats until a healthy connection is found.
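The retry loop can be illustrated with a small stand-alone sketch. This is a simplification under stated assumptions, not the library's implementation: HealthyLoopSketch and Conn are hypothetical, a queue of candidates stands in for repeated findConnection calls, a boolean field stands in for isHealthy, and the isMultiplexed condition is left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the "keep trying until healthy" loop; names are illustrative only.
final class HealthyLoopSketch {
  static final class Conn {
    final String name;
    final int successCount;  // 0 means the connection is brand new
    final boolean healthy;   // stands in for the isHealthy(...) check
    boolean noNewExchanges;  // set when the connection is rejected

    Conn(String name, int successCount, boolean healthy) {
      this.name = name;
      this.successCount = successCount;
      this.healthy = healthy;
    }
  }

  // Each removeFirst() stands in for one findConnection() call.
  // Throws NoSuchElementException if the candidates run out.
  static Conn findHealthy(Deque<Conn> candidates) {
    while (true) {
      Conn candidate = candidates.removeFirst();

      // A brand new connection skips the health check.
      if (candidate.successCount == 0) {
        return candidate;
      }

      // A reused connection that fails the check is retired and the loop starts over.
      if (!candidate.healthy) {
        candidate.noNewExchanges = true;
        continue;
      }

      return candidate;
    }
  }

  public static void main(String[] args) {
    Deque<Conn> candidates = new ArrayDeque<>();
    candidates.add(new Conn("stale", 3, false));
    candidates.add(new Conn("good", 2, true));
    System.out.println(findHealthy(candidates).name);
  }
}
```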
/**
 * Returns a connection to host a new stream. This prefers the existing connection if it exists,
 * then the pool, finally building a new connection.
 */
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
    int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
  boolean foundPooledConnection = false;
  RealConnection result = null;
  Route selectedRoute = null;
  RealConnection releasedConnection;
  Socket toClose;
  synchronized (connectionPool) {
    if (transmitter.isCanceled()) throw new IOException("Canceled");
    hasStreamFailure = false; // This is a fresh attempt.

    // Attempt to use an already-allocated connection. We need to be careful here because our
    // already-allocated connection may have been restricted from creating new exchanges.
    releasedConnection = transmitter.connection;
    toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
        ? transmitter.releaseConnectionNoEvents()
        : null;

    if (transmitter.connection != null) {
      // We had an already-allocated connection and it's good.
      result = transmitter.connection;
      releasedConnection = null;
    }

    if (result == null) {
      // Attempt to get a connection from the pool.
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
        foundPooledConnection = true;
        result = transmitter.connection;
      } else if (nextRouteToTry != null) {
        selectedRoute = nextRouteToTry;
        nextRouteToTry = null;
      } else if (retryCurrentRoute()) {
        selectedRoute = transmitter.connection.route();
      }
    }
  }
  closeQuietly(toClose);

  if (releasedConnection != null) {
    eventListener.connectionReleased(call, releasedConnection);
  }
  if (foundPooledConnection) {
    eventListener.connectionAcquired(call, result);
  }
  if (result != null) {
    // If we found an already-allocated or pooled connection, we're done.
    return result;
  }

  // If we need a route selection, make one. This is a blocking operation.
  boolean newRouteSelection = false;
  if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
    newRouteSelection = true;
    routeSelection = routeSelector.next();
  }

  List<Route> routes = null;
  synchronized (connectionPool) {
    if (transmitter.isCanceled()) throw new IOException("Canceled");

    if (newRouteSelection) {
      // Now that we have a set of IP addresses, make another attempt at getting a connection from
      // the pool. This could match due to connection coalescing.
      routes = routeSelection.getAll();
      if (connectionPool.transmitterAcquirePooledConnection(
          address, transmitter, routes, false)) {
        foundPooledConnection = true;
        result = transmitter.connection;
      }
    }

    if (!foundPooledConnection) {
      if (selectedRoute == null) {
        selectedRoute = routeSelection.next();
      }

      // Create a connection and assign it to this allocation immediately. This makes it possible
      // for an asynchronous cancel() to interrupt the handshake we're about to do.
      result = new RealConnection(connectionPool, selectedRoute);
      connectingConnection = result;
    }
  }

  // If we found a pooled connection on the 2nd time around, we're done.
  if (foundPooledConnection) {
    eventListener.connectionAcquired(call, result);
    return result;
  }

  // Do TCP + TLS handshakes. This is a blocking operation.
  result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
      connectionRetryEnabled, call, eventListener);
  connectionPool.routeDatabase.connected(result.route());

  Socket socket = null;
  synchronized (connectionPool) {
    connectingConnection = null;
    // Last attempt at connection coalescing, which only occurs if we attempted multiple
    // concurrent connections to the same host.
    if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
      // We lost the race! Close the connection we created and return the pooled connection.
      result.noNewExchanges = true;
      socket = result.socket();
      result = transmitter.connection;

      // It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
      // that case we will retry the route we just successfully connected with.
      nextRouteToTry = selectedRoute;
    } else {
      connectionPool.put(result);
      transmitter.acquireConnectionNoEvents(result);
    }
  }
  closeQuietly(socket);

  eventListener.connectionAcquired(call, result);
  return result;
}

What the method does: it returns a connection to host a new stream. It prefers the existing connection if there is one, then a connection from the pool, and only as a last resort builds a new connection.

1. Read transmitter.connection. If it is marked noNewExchanges it is released; if it is still non-null after that, it is assigned to result and returned.
2. Otherwise, try the connection pool with connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false). Here the routes argument is null and requireMultiplexed is false. If a connection is found, return it.
3. Otherwise a route has to be selected. routeSelection.getAll() returns all routes, and the pool is tried a second time with connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, false), again with requireMultiplexed set to false.
4. If the second pool lookup also finds nothing, create a new connection with new RealConnection and perform the TCP + TLS handshakes.
5. After the handshake, try the pool one last time, now requiring a multiplexed connection: connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true). If this succeeds, another call won the race, so the connection just created is closed and the pooled one is used instead. Otherwise the new connection is put into the pool.
6. Finally, return the connection.

public final class RealConnectionPool {
  /**
   * Attempts to acquire a recycled connection to {@code address} for {@code transmitter}. Returns
   * true if a connection was acquired.
   *
   * <p>If {@code routes} is non-null these are the resolved routes (ie. IP addresses) for the
   * connection. This is used to coalesce related domains to the same HTTP/2 connection, such as
   * {@code square.com} and {@code square.ca}.
   */
  boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
      @Nullable List<Route> routes, boolean requireMultiplexed) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (requireMultiplexed && !connection.isMultiplexed()) continue;
      if (!connection.isEligible(address, routes)) continue;
      transmitter.acquireConnectionNoEvents(connection);
      return true;
    }
    return false;
  }
}

This method tries to acquire a recycled connection and returns true when one was acquired. If routes is non-null, it holds the resolved routes (that is, IP addresses) for the connection. They are used to coalesce related domains, such as square.com and square.ca, onto the same HTTP/2 connection.
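The pool lookup and the effect of its routes and requireMultiplexed arguments can be sketched as follows. This is a deliberately simplified model, not OkHttp's logic: PoolSketch and Conn are hypothetical, hosts and IP addresses are plain strings, and the eligibility rule here (same host, or a multiplexed connection whose IP appears in the resolved routes) is an assumption that leaves out the other conditions the real isEligible applies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of acquiring a pooled connection; names and rules are illustrative only.
final class PoolSketch {
  static final class Conn {
    final String host;
    final String ip;
    final boolean multiplexed;  // true for an HTTP/2-style connection

    Conn(String host, String ip, boolean multiplexed) {
      this.host = host;
      this.ip = ip;
      this.multiplexed = multiplexed;
    }

    // Simplified eligibility: exact host match, or coalescing by IP for multiplexed connections.
    boolean isEligible(String targetHost, List<String> routeIps) {
      if (host.equals(targetHost)) return true;
      return multiplexed && routeIps != null && routeIps.contains(ip);
    }
  }

  // Returns the first usable pooled connection, or null if none qualifies.
  static Conn acquire(List<Conn> pool, String host, List<String> routeIps,
      boolean requireMultiplexed) {
    for (Conn connection : pool) {
      if (requireMultiplexed && !connection.multiplexed) continue;
      if (!connection.isEligible(host, routeIps)) continue;
      return connection;
    }
    return null;
  }

  public static void main(String[] args) {
    List<Conn> pool = new ArrayList<>();
    pool.add(new Conn("square.com", "1.2.3.4", true));

    // First lookup: no routes resolved yet, so a different host cannot match.
    System.out.println(acquire(pool, "square.ca", null, false) != null);
    // Second lookup: routes are known, so the connection can be coalesced by IP.
    System.out.println(acquire(pool, "square.ca", Arrays.asList("1.2.3.4"), false) != null);
  }
}
```

This mirrors why findConnection queries the pool more than once: each later lookup has more information (resolved routes) or a stricter requirement (multiplexed only) than the one before.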
To be continued. I will add more once I have read further; the connection logic is fairly hard to follow.