Administrator
发布于 2023-11-20 / 11 阅读
0
0

okhttp-原理解析-ConnectionPool

介绍

ConectionPool 是管理HTTP和HTTP/2连接的重用,以减少网络延迟。相同[地址]的HTTP请求可以共享[连接]。此类实现了保持打开以备将来使用的连接的策略。

ConnectionPool

以下可见,内部持有delegate(RealConnectionPool ),这是代理设计模式。创建了RealConnectionPool :最大空闲连接为5,最大空闲时长为5分钟时间

class ConnectionPool internal constructor(
  internal val delegate: RealConnectionPool //实际的连接
) {
  constructor(
    maxIdleConnections: Int,
    keepAliveDuration: Long,
    timeUnit: TimeUnit
  ) : this(RealConnectionPool(
      taskRunner = TaskRunner.INSTANCE,
      maxIdleConnections = maxIdleConnections,
      keepAliveDuration = keepAliveDuration,
      timeUnit = timeUnit
  ))

  //创建ConnectionPool :最大空闲连接为5,最大空闲时长为5分钟时间
  constructor() : this(5, 5, TimeUnit.MINUTES)

  /** 返回空闲连接的总量 */
  fun idleConnectionCount(): Int = delegate.idleConnectionCount()

  /** 返回pool中的总数量 */
  fun connectionCount(): Int = delegate.connectionCount()

  /** 关闭和移除所有空闲的链接 */
  fun evictAll() {
    delegate.evictAll()
  }
}

RealConnectionPool

RealConnectionPool 的成员变量

maxIdleConnections:Int

keepAliveDurationNs:Long

cleanupQueue:TaskQueue

cleanupTask:Task

connections:ConcurrentLinkedQueue<RealConnection>(java7.是分段锁)

class RealConnectionPool(
  taskRunner: TaskRunner,
  /** 1.最大空闲连接数量【每个地址对应自己的连接】*/
  private val maxIdleConnections: Int,
  keepAliveDuration: Long,
  timeUnit: TimeUnit
) {
//2.空闲时长(纳秒)
  private val keepAliveDurationNs: Long = timeUnit.toNanos(keepAliveDuration)

  //3.清理任务栈
  private val cleanupQueue: TaskQueue = taskRunner.newQueue()
 //5.清理任务
  private val cleanupTask = object : Task("$okHttpName ConnectionPool") {
    override fun runOnce() = cleanup(System.nanoTime())
  }

  /**
   * 在删除或添加时操作时需要持有锁 
   * 6.存放连接栈
   */
  private val connections = ConcurrentLinkedQueue<RealConnection>()

RealConnectionPool 关键方法:

callAcquirePooledConnection

方法目地:寻找连接池中是否有可复用的连接。

可复用的判断条件:该连接是isMultiplexed==true),且在分配限制内,noNewExchanges = false ,必须是http/2 ,共享IP地址,证书能覆盖该地址。

如果有call(RealCall),关联关系(call.connection = connection ,connection.calls.put(CallReference(this, callStackTrace))),并返回true;反之,return false 。

/**寻找连接池中是否有可复用的连接 */
fun callAcquirePooledConnection(
  address: Address,
  call: RealCall,
  routes: List<Route>?,
  requireMultiplexed: Boolean
): Boolean {
  for (connection in connections) {
    synchronized(connection) {
	  //可多路复用 且 连接是可多路复用(http2)
      if (requireMultiplexed && !connection.isMultiplexed) return@synchronized
		//connection 是否可分配
      if (!connection.isEligible(address, routes)) return@synchronized
      call.acquireConnectionNoEvents(connection) //关联关系
      return true //找到了
    }
  }
  return false
}

connectionBecameIdle(connection: RealConnection): Boolean

通知连接池【该连接】变空闲。当返回true,该连接被移除并关闭

fun connectionBecameIdle(connection: RealConnection): Boolean {
  connection.assertThreadHoldsLock()
  //连接不在使用 或 连接池的最大空闲数量 == 0 ,就将【连接】移除
  return if (connection.noNewExchanges || maxIdleConnections == 0) {
    connection.noNewExchanges = true 
    connections.remove(connection)
    if (connections.isEmpty()) cleanupQueue.cancelAll()
    true
  } else { //执行清理任务
    cleanupQueue.schedule(cleanupTask)
    false
  }
}

put(connection: RealConnection)

加入任务栈,并执行清理任务

fun put(connection: RealConnection) {
  connection.assertThreadHoldsLock()

  connections.add(connection)
  cleanupQueue.schedule(cleanupTask)
}

evictAll()

逐出所有连接。

遍历连接栈。每个连接,当它的calls 为空就移除,并关闭连接的socket。

遍历结束,发现连接栈为空,就取消清理任务栈中的所有任务

fun evictAll() {
  val i = connections.iterator()
  while (i.hasNext()) {
    val connection = i.next()
    val socketToClose = synchronized(connection) {
      if (connection.calls.isEmpty()) {
        i.remove()
        connection.noNewExchanges = true
        return@synchronized connection.socket()
      } else {
        return@synchronized null
      }
    }
    socketToClose?.closeQuietly()
  }

  if (connections.isEmpty()) cleanupQueue.cancelAll()
}

cleanup()

清除空闲过久的连接。

遍历连接栈(concurrentLinkedQueue)。计算inUseConnectionCount(使用中的连接数),idleConnectionCount(空闲连接连接数),longestIdleConnection(最久空闲的连接),longestIdleDurationNs (最久空闲连接的空闲时长)

遍历结束,四种情况

1.结束遍历后,如果空闲连接数 超过限制的(5个) 或 最久空闲连接的时长 超过限制(5分钟),就移除。并返回0(马上执行下次清理任务)

2.空闲连接数>0 ,返回下次被移除连接时间

3.使用中的连接数>0,空闲连接数==0 ,返回下次被移除连接时间

4.栈中无连接,返回-1,不执行

fun cleanup(now: Long): Long {
  var inUseConnectionCount = 0
  var idleConnectionCount = 0
  var longestIdleConnection: RealConnection? = null
  var longestIdleDurationNs = Long.MIN_VALUE

  // Find either a connection to evict, or the time that the next eviction is due.
  for (connection in connections) {
    synchronized(connection) {
      // 如果连接还在使用继续搜索
      if (pruneAndGetAllocationCount(connection, now) > 0) {//返回存活calls的数量
        inUseConnectionCount++
      } else {
        idleConnectionCount++

        // If the connection is ready to be evicted, we're done.
        val idleDurationNs = now - connection.idleAtNs
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs
          longestIdleConnection = connection
        } else {
          Unit
        }
      }
    }
  }

  when {
	//1.空闲连接数 超过限制的(5个) 或 最久空闲连接的时长 超过限制(5分钟)
    longestIdleDurationNs >= this.keepAliveDurationNs
        || idleConnectionCount > this.maxIdleConnections -> {
	 //选择连接驱逐(evict)。再次确认是否被驱逐
      val connection = longestIdleConnection!!
      synchronized(connection) {
        if (connection.calls.isNotEmpty()) return 0L // No longer idle.
        if (connection.idleAtNs + longestIdleDurationNs != now) return 0L // 不再是最久的
        connection.noNewExchanges = true
        connections.remove(longestIdleConnection)
      }

      connection.socket().closeQuietly() //socket 关闭
      if (connections.isEmpty()) cleanupQueue.cancelAll() 

      // Clean up again immediately. 清理任务再次马上执行
      return 0L
    }
	//2.空闲连接 > 0
    idleConnectionCount > 0 -> {
      // A connection will be ready to evict soon.
	  
      return keepAliveDurationNs - longestIdleDurationNs
    }
	//3.正在使用的连接 > 0
    inUseConnectionCount > 0 -> {
      // All connections are in use. It'll be at least the keep alive duration 'til we run
      // again.
      return keepAliveDurationNs //成员变量,保持alive时长
    }
	//4.没有连接,不执行
    else -> { 
      // No connections, idle or in use.
      return -1
    }
  }
}

pruneAndGetAllocationCount

cleanup()方法调用该方法

删除任何可能泄露的calls,并返回存活calls的数量。

如果连接正在跟踪调用,但应用程序代码已放弃,则调用会泄漏。依赖垃圾回收是不准确的,容易内存泄露


private fun pruneAndGetAllocationCount(connection: RealConnection, now: Long): Int {
  connection.assertThreadHoldsLock()

  val references = connection.calls // MutableList<Reference<RealCall>> , CallReference 继承 WeakReference

  var i = 0
  while (i < references.size) {
    val reference = references[i] 

    if (reference.get() != null) {
      i++
      continue
    }

    // 发现泄露call。这是application bug
    val callReference = reference as CallReference 
    val message = "A connection to ${connection.route().address.url} was leaked. " +
        "Did you forget to close a response body?"
    Platform.get().logCloseableLeak(message, callReference.callStackTrace)

    references.removeAt(i) //数组里面移除它,这里i不自增,继续循环
    connection.noNewExchanges = true //设为true, 将无新的交流,且要被移除pool

//这是最后一个call, 且移除。这个connection将可被驱逐
    if (references.isEmpty()) {
      connection.idleAtNs = now - keepAliveDurationNs
      return 0
    }
  }
 return references.size
}


评论