Administrator
发布于 2023-11-20 / 10 阅读
0
0

okhttp-原理解析-调用流程

调用过程

public final OkHttpClient client = new OkHttpClient.Builder()

                .addInterceptor(new HttpLoggingInterceptor()) 

                .cache(new Cache(cacheDir, cacheSize)) 

                .build();

//new request 

//client.newCall(request).execute()

//或 client.newCall(request).enqueue()

okhttpClient适合单例,它初始化了很多局部变量,比如Dispatcher,Cache ,CookieJar,等

构建OkHttpClient用到了建造器模式

RealCall.execute()

execute()是同步方法。实际是getResponseWithInterceptorChain()在执行真正的请求

@Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    timeout.enter();
    eventListener.callStart(this);
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } catch (IOException e) {
      e = timeoutExit(e);
      eventListener.callFailed(this, e);
      throw e;
    } finally {
      client.dispatcher().finished(this);
    }
  }

RealCall.enqueue()

enqueue(),是异步方法

enqueue(Callback responseCallback),**观察者模式**

@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));//开始执行
  }

AsyncCall 是个Runnable ,Dispatcher 内部有个ThreadPoolExecutor 执行线程

Dispatcher

其中Dispatcher将重点讲解,有三个集合,存储不同状态的Call。

 /** Ready async calls in the order they'll be run. */
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()
  
  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private val runningSyncCalls = ArrayDeque<RealCall>()

根据Call 状态放到不同的队列中。

Dispatcher 内部有个线程池:核心线程池0,最大Max的线程池

@get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
      }
      return executorServiceOrNull!!
    }

client.dispatcher().enqueue(new AsyncCall(responseCallback));

会加入readyAsyncCalls 中,再执行promoteAndExecute方法

void enqueue(AsyncCall call) {
    synchronized (this) {
      readyAsyncCalls.add(call);
    }
    promoteAndExecute();
  }

进入promoteAndExecute

private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));

	//遍历readyAsyncCalls,找到可执行的方法;将找到的AsynCall 加入到runningAsyncCalls
    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();

        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
        if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.

        i.remove();
        executableCalls.add(asyncCall);
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }

	//挨个执行AsynCall
    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      asyncCall.executeOn(executorService()); 
    }

    return isRunning;
  }

AsynCall.executeOn(executorService)

内部是让Dispatcher内部线程池执行Asyncall线程的

void executeOn(ExecutorService executorService) {
      assert (!Thread.holdsLock(client.dispatcher()));
      boolean success = false;
      try {
        executorService.execute(this); //线程池执行线程
        success = true;
      } catch (RejectedExecutionException e) {
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        eventListener.callFailed(RealCall.this, ioException);
        responseCallback.onFailure(RealCall.this, ioException); //异常时监听函数回调
      } finally {
        if (!success) {
          client.dispatcher().finished(this); // This call is no longer running!
        }
      }
    }

当线程执行的时候,AsyncCall 的run方法会调用execute();

AsyncCall .execute()内部也调用了getResponseWithInterceptorChain();

@Override protected void execute() {
      boolean signalledCallback = false;
      timeout.enter();
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        e = timeoutExit(e);
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }

## 重头戏来了:getResponseWithInterceptorChain()

构建拦截器栈,挨个执行拦截器的intercept(chain)并返回response

拦截器这里用到了**责任链模式**

InterceptorChain调用proceed() 挨个调用intercept(chain);且 intercept(chain)每次传递的chain是根据上一个状态的chain.copy来的,且少一层Interceptor的处理。

每个拦截器会先处理request对象,然后执行china.proceed(request) ,该方法放回response再处理

 @Throws(IOException::class)
  internal fun getResponseWithInterceptorChain(): Response {
    //构建拦截器栈
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)
	//将拦截器栈放到拦截器责任链中(责任链模式)
    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
    	//它会挨个拦截器.intercept(chain)执行过去
      val response = chain.proceed(originalRequest) //返回结果
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }

拦截器执行顺序图

9e0e39d497ef4f35913011206b190178.png


评论