okhttp源码分析(一)

OkHttp用法

1.同步

1
2
3
4
5
6
OkHttpClient client = new OkHttpClient.Builder().connectTimeout(5, TimeUnit.SECONDS).build();
Request request = new Request.Builder()
.url("http://www.baidu.com")
.build();
Call call = client.newCall(request);
call.execute();

2.异步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
OkHttpClient client = new OkHttpClient.Builder().connectTimeout(5, TimeUnit.SECONDS).build();
Request request = new Request.Builder()
.url("http://www.baidu.com")
.build();
Call call = client.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});

请求源码分析(Dispatcher)

1.创建client对象

设置连接超时时间、读取超时时间、拦截器,创建dispatcher、连接池等OkHttpClient client = new OkHttpClient.Builder().build();

1
2
3
4
5
6
7
public Builder() {
//dispatcher中会创建三个队列来保存call对象,在4、5中介绍
dispatcher = new Dispatcher();
connectionPool = new ConnectionPool();
......
}
1
2
3
public OkHttpClient build() {
return new OkHttpClient(this);
}

2.创建Request对象

设置请求方法、请求链接、请求头部等Request request = new Request.Builder().url("http://www.baidu.com").build();

1
2
3
4
5
6
7
8
9
10
11
12
13
public Builder() {
this.method = "GET";
this.headers = new Headers.Builder();
}
public Builder url(HttpUrl url) {
if (url == null) throw new NullPointerException("url == null");
this.url = url;
return this;
}
public Request build() {
if (url == null) throw new IllegalStateException("url == null");
return new Request(this);
}

3.创建Call对象

将request、client封装为一次请求Call call = client.newCall(request);

1
2
3
public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false);
}
1
2
3
4
5
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.eventListener = client.eventListenerFactory().create(call);
return call;
}
1
2
3
4
5
6
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}

4.同步请求call.execute();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public Response execute() throws IOException {
//同步,设置executed,保证一个call对象只被执行一次,保证线程安全
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
//请求开始执行
eventListener.callStart(this);
try {
//调用dispatcher对象执行call
client.dispatcher().executed(this);
//通过拦截器链获取请求结果
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
//执行结束后通过dispatcher结束掉当前call对象
client.dispatcher().finished(this);
}
}

1.dispatcher的excuted方法将call对象放入runningSyncCalls队列中(runningSyncCalls为dispatcher中三个队列之一,用来保存执行中的同步请求)

1
2
3
4
5
6
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}

2.dispatcher的finish方法将call对象从runningSyncCalls队列中移除,并更新当前运行的call的数量(运行中的同步+异步call)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
//移除call,未移除就抛出异常
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
//为false,不执行
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
//当所有call执行结束且空闲时要执行的runable不为空
idleCallback.run();
}
}

5.异步请求call.enqueue(mCallBack)

1.将responseCallback封装入AsyncCall对象,调用dispatcher的enqueue方法

1
2
3
4
5
6
7
8
9
public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

2.接着介绍dispatcher中其他两个队列及线程池

1
2
3
4
5
6
//保存就绪的异步call
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//保存正在执行的异步call
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//可以为空,只同步执行时不需要
private @Nullable ExecutorService executorService;

当执行异步call时,会执行executorService()方法设置executorService线程池,该线程池无核心线程,非核心线程数量不限,线程超过60秒就被回收,无队列(SynchronousQueue无法放入),直接用非核心线程执行

1
2
3
4
5
6
7
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}

3.调用enqueue方法后,判断当前是否能执行call,可以执行就将AsyncCall对象放入runningAsyncCalls队列,并将其加入线程池中进行执行,如果不可以就放入就绪队列,等待条件满足后执行

1
2
3
4
5
6
7
8
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}

4.AsyncCall
AsyncCall继承自Runable接口,可以直接加入线程池中excute()方法会在NamedRunnable的void run()方法中调用,因此responseCallback.onFailure(RealCall.this, e);responseCallback.onResponse(RealCall.this, response);即异步请求传入的接口均在子线程中调用,不可操作UI

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
...
protected void execute() {
//标记callBack是否被调用
boolean signalledCallback = false;
try {
//通过拦截器获取response
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
//调用请求失败方法,线程池中
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
//调用请求成功方法,线程池中
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}

5.dispatcher.finished(AsyncCall);

1
2
3
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}

从runningAsyncCalls中移除call,调用promoteCalls()方法从readyAsyncCalls中移入call执行,更新runningCallsCount,call数量为0且空闲runable不为空即执行该runable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
//为true
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}

Dispatcher总结

  1. 包含三个call队列,保存执行中的同步call,执行中的异步call,准备就绪的异步call
  2. 包含一个线程池,无核心线程,队列不可存放,无线非核心线程,有runable需要execute即创建新线程,或复用空闲中的线程,空闲线程超过60秒被回收
  3. 同步中,dispatcher用来存放执行中的call,finish执行中的call,调整执行中的call的数量
  4. 异步中,dispatcher用来存放执行中的call,finish执行中的call,调整执行中的call的数量,促进准备就绪的call执行