当前位置 博文首页 > 如何用python实现一个HTTP连接池

    如何用python实现一个HTTP连接池

    作者:终末之冬 时间:2021-02-10 18:04

    一. 连接池的原理

      首先, HTTP连接是基于TCP连接的, 与服务器之间进行HTTP通信, 本质就是与服务器之间建立了TCP连接后, 相互收发基于HTTP协议的数据包. 因此, 如果我们需要频繁地去请求某个服务器的资源, 我们就可以一直维持与个服务器的TCP连接不断开, 然后在需要请求资源的时候, 把连接拿出来用就行了.

      一个项目可能需要与服务器之间同时保持多个连接, 比如一个爬虫项目, 有的线程需要请求服务器的网页资源, 有的线程需要请求服务器的图片等资源, 而这些请求都可以建立在同一条TCP连接上.

      因此, 我们使用一个管理器来对这些连接进行管理, 任何程序需要使用这些连接时, 向管理器申请就可以了, 等到用完之后再将连接返回给管理器, 以供其他程序重复使用, 这个管理器就是连接池.

    二. 实现代码

    1. HTTPConnectionPool类

      基于上一章的分析, 连接池应该是一个收纳连接的容器, 同时对这些连接有管理能力:

    class HTTPConnectionPool:
    
     def __init__(self, host: str, port: int = None, max_size: int = None, idle_timeout: int = None) -> None:
      """
      :param host: pass
      :param port: pass
      :param max_size: 同时存在的最大连接数, 默认None->连接数无限,没了就创建
      :param idle_timeout: 单个连接单次最长空闲时间,超时自动关闭,默认None->不限时
      """
      self.host = host
      self.port = port
      self.max_size = max_size
      self.idle_timeout = idle_timeout
      self._lock = threading.Condition()
      self._pool = []
      # 这里的conn_num指的是总连接数,包括其它线程拿出去正在使用的连接
      self.conn_num = 0
      self.is_closed = False
    
     def acquire(self, blocking: bool = True, timeout: int = None) -> WrapperHTTPConnection:
      ...
    
     def release(self, conn: WrapperHTTPConnection) -> None:
      ...

      因此, 我们定义这样一个HTTPConnectionPool类, 使用一个列表来保存可用的连接. 对于外部来说, 只需要调用这个连接池对象的acquire和release方法就能取得和释放连接.

    2. 线程安全地管理连接

      对于线程池内部来说, 至少需要三个关于连接的操作: 从连接池中取得连接, 将连接放回连接池, 以及创建一个连接:

    def _get_connection(self) -> WrapperHTTPConnection:
     # 这个方法会把连接从_idle_conn移动到_used_conn列表中,并返回这个连接
     try:
      return self._pool.pop()
     except IndexError:
      raise EmptyPoolError
    
    
    def _put_connection(self, conn: WrapperHTTPConnection) -> None:
     self._pool.append(conn)
    
    
    def _create_connection(self) -> WrapperHTTPConnection:
     self.conn_num += 1
     return WrapperHTTPConnection(self, HTTPConnection(self.host, self.port))

      对于连接池外部来说, 主要有申请连接和释放连接这两个操作, 实际上这就是个简单的生产者消费者模型. 考虑到外部可能是多线程的环境, 我们使用threading.Condition来保证线程安全. 关于Condition的资料可以看这里.

    def acquire(self, blocking: bool = True, timeout: int = None) -> WrapperHTTPConnection:
     if self.is_closed:
      raise ConnectionPoolClosed
     with self._lock:
      if self.max_size is None or not self.is_full():
       # 在还能创建新连接的情况下,如果没有空闲连接,直接创建一个就行了
       if self.is_pool_empty():
        self._put_connection(self._create_connection())
      else:
       # 不能创建新连接的情况下,如果设置了blocking=False,没连接就报错
       # 否则,就基于timeout进行阻塞,直到超时或者有可用连接为止
       if not blocking:
        if self.is_pool_empty():
         raise EmptyPoolError
       elif timeout is None:
        while self.is_pool_empty():
         self._lock.wait()
       elif timeout < 0:
        raise ValueError("'timeout' must be a non-negative number")
       else:
        end_time = time.time() + timeout
        while self.is_pool_empty():
         remaining = end_time - time.time()
         if remaining <= 0:
          raise EmptyPoolError
         self._lock.wait(remaining)
      # 走到这一步了,池子里一定有空闲连接
      return self._get_connection()
    
    
    def release(self, conn: WrapperHTTPConnection) -> None:
     if self.is_closed:
      # 如果这个连接是在连接池关闭后才释放的,那就不用回连接池了,直接放生
      conn.close()
      return
     # 实际上,python列表的append操作是线程安全的,可以不加锁
     # 这里调用锁是为了通过notify方法通知其它正在wait的线程:现在有连接可用了
     with self._lock:
      if not conn.is_available:
       # 如果这个连接不可用了,就应该创建一个新连接放进去,因为可能还有其它线程在等着连接用       conn.close()       self.conn_num -= 1
       conn = self._create_connection()
      self._put_connection(conn)
      self._lock.notify()

      我们首先看看acquire方法, 这个方法其实就是在申请到锁之后调用内部的_get_connection方法获取连接, 这样就线程安全了. 需要注意的是, 如果当前的条件无法获取连接, 就会调用条件变量的wait方法, 及时释放锁并阻塞住当前线程. 然后, 当其它线程作为生产者调用release方法释放连接时, 会触发条件变量的notify方法, 从而唤醒一个阻塞在wait阶段的线程, 即消费者. 这个消费者再从池中取出刚放回去的线程, 这样整个生产者消费者模型就运转起来了.

    3. 上下文管理器

      对于一个程序来说, 它使用连接池的形式是获取连接->使用连接->释放连接. 因此, 我们应该通过with语句来管理这个连接, 以免在程序的最后遗漏掉释放连接这一步骤.

      基于这个原因, 我们通过一个WrapperHTTPConnection类来对HTTPConnection进行封装, 以实现上下文管理器的功能. HTTPConnection的代码可以看《用python实现一个HTTP客户端》这篇文章.

    class WrapperHTTPConnection:
    
     def __init__(self, pool: 'HTTPConnectionPool', conn: HTTPConnection) -> None:
      self.pool = pool
      self.conn = conn
      self.response = None
      self.is_available = True
    
     def __enter__(self) -> 'WrapperHTTPConnection':
      return self
    
     def __exit__(self, *exit_info: Any) -> None:
      # 如果response没读完并且连接需要复用,就弃用这个连接
      if not self.response.will_close and not self.response.is_closed():
       self.close()
      self.pool.release(self)
    
     def request(self, *args: Any, **kwargs: Any) -> HTTPResponse:
      self.conn.request(*args, **kwargs)
      self.response = self.conn.get_response()
      return self.response
    
     def close(self) -> None:
      self.conn.close()
      self.is_available = False

      同样的, 连接池可能也需要关闭, 因此我们给连接池也加上上下文管理器的功能:

    class HTTPConnectionPool:
     ...
    
     def close(self) -> None:
      if self.is_closed:
       return
      self.is_closed = True
      pool, self._pool = self._pool, None
      for conn in pool:
       conn.close()
    
     def __enter__(self) -> 'HTTPConnectionPool':
      return self
    
     def __exit__(self, *exit_info: Any) -> None:
      self.close()

      这样, 我们就可以通过with语句优雅地管理连接池了:

    with HTTPConnectionPool(**kwargs) as pool:
     with pool.acquire() as conn:
      res = conn.request('GET', '/')
      ...

    4. 定时清理连接

      如果一个连接池的所需连接数是随时间变化的, 那么就会出现一种情况: 在高峰期, 我们创建了非常多的连接, 然后进入低谷期之后, 连接过剩, 大量的连接处于空闲状态, 浪费资源. 因此, 我们可以设置一个定时任务, 定期清理空闲时间过长的连接, 减少连接池的资源占用.

      首先, 我们需要为连接对象添加一个last_time属性, 每当连接释放进入连接池后, 就修改这个属性的值为当前时间, 这样我们就能明确知道, 连接池内的每个空闲连接空闲了多久:

    class WrapperHTTPConnection:
     ...
    
     def __init__(self, pool: 'HTTPConnectionPool', conn: HTTPConnection) -> None:
      ...
      self.last_time = None
    
    
    class HTTPConnectionPool:
     ...
    
     def _put_connection(self, conn: WrapperHTTPConnection) -> None:
      conn.last_time = time.time()
      self._pool.append(conn)

       然后, 我们通过threading.Timer来实现一个定时任务:

    def start_clear_conn(self) -> None:
     if self.idle_timeout is None:
      # 如果空闲连接的超时时间为无限,那么就不应该清理连接
      return
     self.clear_idle_conn()
     self._clearer = threading.Timer(self.idle_timeout, self.start_clear_conn)
     self._clearer.start()
    
    
    def stop_clear_conn(self) -> None:
     if self._clearer is not None:
      self._clearer.cancel()

      threading.Timer只会执行一次定时任务, 因此, 我们需要在start_clear_conn中不断地把自己设置为定时任务. 这其实等同于新开了一个线程来执行start_clear_conn方法, 因此并不会出现递归过深问题. 不过需要注意的是, threading.Timer虽然不会阻塞当前线程, 但是却会阻止当前线程结束, 就算把它设置为守护线程都不行, 唯一可行的办法就是调用stop_clear_conn方法取消这个定时任务.

      最后, 我们定义clear_idle_conn方法来清理闲置时间超时的连接:

    def clear_idle_conn(self) -> None:
     if self.is_closed:
      raise ConnectionPoolClosed
     # 这里开一个新线程来清理空闲连接,避免了阻塞主线程导致的定时精度出错
     threading.Thread(target=self._clear_idle_conn).start()
    
    
    def _clear_idle_conn(self) -> None:
     if not self._lock.acquire(timeout=self.idle_timeout):
      # 因为是每隔self.idle_timeout秒检查一次
      # 如果过了self.idle_timeout秒还没申请到锁,下一次都开始了,本次也就不用继续了
      return
     current_time = time.time()
     if self.is_pool_empty():
      pass
     elif current_time - self._pool[-1].last_time >= self.idle_timeout:
      # 这里处理下面的二分法没法处理的边界情况,即所有连接都闲置超时的情况
      self.conn_num -= len(self._pool)
      self._pool.clear()
     else:
      # 通过二分法找出从左往右第一个不超时的连接的指针
      left, right = 0, len(self._pool) - 1
      while left < right:
       mid = (left + right) // 2
       if current_time - self._pool[mid].last_time >= self.idle_timeout:
        left = mid + 1
       else:
        right = mid
      self._pool = self._pool[left:]
      self.conn_num -= left
     self._lock.release()

      由于我们获取和释放连接都是从self._pool的尾部开始操作的, 因此self._pool这个容器是一个先进后出队列, 它里面放着的连接, 一定是越靠近头部的闲置时间越长, 从头到尾闲置时间依次递减. 基于这个原因, 我们使用二分法来找出列表中第一个没有闲置超时的连接, 然后把在它之前的连接一次性删除, 这样就能达到O(logN)的时间复杂度, 算是一种比较高效的方法. 需要注意的是, 如果连接池内所有的连接都是超时的, 那么这种方法是删不干净的, 需要对这种边界情况单独处理.

    三. 总结

    1. 完整代码及分析

      这个连接池的完整代码如下:

    import threading
    import time
    from typing import Any
    
    from client import HTTPConnection, HTTPResponse
    
    
    class WrapperHTTPConnection:
    
     def __init__(self, pool: 'HTTPConnectionPool', conn: HTTPConnection) -> None:
      self.pool = pool
      self.conn = conn
      self.response = None
      self.last_time = time.time()
      self.is_available = True
    
     def __enter__(self) -> 'WrapperHTTPConnection':
      return self
    
     def __exit__(self, *exit_info: Any) -> None:
      # 如果response没读完并且连接需要复用,就弃用这个连接
      if not self.response.will_close and not self.response.is_closed():
       self.close()
      self.pool.release(self)
    
     def request(self, *args: Any, **kwargs: Any) -> HTTPResponse:
      self.conn.request(*args, **kwargs)
      self.response = self.conn.get_response()
      return self.response
    
     def close(self) -> None:
      self.conn.close()
      self.is_available = False
    
    
    class HTTPConnectionPool:
    
     def __init__(self, host: str, port: int = None, max_size: int = None, idle_timeout: int = None) -> None:
      """
      :param host: pass
      :param port: pass
      :param max_size: 同时存在的最大连接数, 默认None->连接数无限,没了就创建
      :param idle_timeout: 单个连接单次最长空闲时间,超时自动关闭,默认None->不限时
      """
      self.host = host
      self.port = port
      self.max_size = max_size
      self.idle_timeout = idle_timeout
      self._lock = threading.Condition()
      self._pool = []
      # 这里的conn_num指的是总连接数,包括其它线程拿出去正在使用的连接
      self.conn_num = 0
      self.is_closed = False
      self._clearer = None
      self.start_clear_conn()
    
     def acquire(self, blocking: bool = True, timeout: int = None) -> WrapperHTTPConnection:
      if self.is_closed:
       raise ConnectionPoolClosed
      with self._lock:
       if self.max_size is None or not self.is_full():
        # 在还能创建新连接的情况下,如果没有空闲连接,直接创建一个就行了
        if self.is_pool_empty():
         self._put_connection(self._create_connection())
       else:
        # 不能创建新连接的情况下,如果设置了blocking=False,没连接就报错
        # 否则,就基于timeout进行阻塞,直到超时或者有可用连接为止
        if not blocking:
         if self.is_pool_empty():
          raise EmptyPoolError
        elif timeout is None:
         while self.is_pool_empty():
          self._lock.wait()
        elif timeout < 0:
         raise ValueError("'timeout' must be a non-negative number")
        else:
         end_time = time.time() + timeout
         while self.is_pool_empty():
          remaining = end_time - time.time()
          if remaining <= 0:
           raise EmptyPoolError
          self._lock.wait(remaining)
       # 走到这一步了,池子里一定有空闲连接
       return self._get_connection()
    
     def release(self, conn: WrapperHTTPConnection) -> None:
      if self.is_closed:
       # 如果这个连接是在连接池关闭后才释放的,那就不用回连接池了,直接放生
       conn.close()
       return
      # 实际上,python列表的append操作是线程安全的,可以不加锁
      # 这里调用锁是为了通过notify方法通知其它正在wait的线程:现在有连接可用了
      with self._lock:
       if not conn.is_available:
        # 如果这个连接不可用了,就应该创建一个新连接放进去,因为可能还有其它线程在等着连接用
        conn.close()
        self.conn_num -= 1
        conn = self._create_connection()
       self._put_connection(conn)
       self._lock.notify()
    
     def _get_connection(self) -> WrapperHTTPConnection:
      # 这个方法会把连接从_idle_conn移动到_used_conn列表中,并返回这个连接
      try:
       return self._pool.pop()
      except IndexError:
       raise EmptyPoolError
    
     def _put_connection(self, conn: WrapperHTTPConnection) -> None:
      conn.last_time = time.time()
      self._pool.append(conn)
    
     def _create_connection(self) -> WrapperHTTPConnection:
      self.conn_num += 1
      return WrapperHTTPConnection(self, HTTPConnection(self.host, self.port))
    
     def is_pool_empty(self) -> bool:
      # 这里指的是,空闲可用的连接是否为空
      return len(self._pool) == 0
    
     def is_full(self) -> bool:
      if self.max_size is None:
       return False
      return self.conn_num >= self.max_size
    
     def close(self) -> None:
      if self.is_closed:
       return
      self.is_closed = True
      self.stop_clear_conn()
      pool, self._pool = self._pool, None
      for conn in pool:
       conn.close()
    
     def clear_idle_conn(self) -> None:
      if self.is_closed:
       raise ConnectionPoolClosed
      # 这里开一个新线程来清理空闲连接,避免了阻塞主线程导致的定时精度出错
      threading.Thread(target=self._clear_idle_conn).start()
    
     def _clear_idle_conn(self) -> None:
      if not self._lock.acquire(timeout=self.idle_timeout):
       # 因为是每隔self.idle_timeout秒检查一次
       # 如果过了self.idle_timeout秒还没申请到锁,下一次都开始了,本次也就不用继续了
       return
      current_time = time.time()
      if self.is_pool_empty():
       pass
      elif current_time - self._pool[-1].last_time >= self.idle_timeout:
       # 这里处理下面的二分法没法处理的边界情况,即所有连接都闲置超时的情况
       self.conn_num -= len(self._pool)
       self._pool.clear()
      else:
       # 通过二分法找出从左往右第一个不超时的连接的指针
       left, right = 0, len(self._pool) - 1
       while left < right:
        mid = (left + right) // 2
        if current_time - self._pool[mid].last_time >= self.idle_timeout:
         left = mid + 1
        else:
         right = mid
       self._pool = self._pool[left:]
       self.conn_num -= left
      self._lock.release()
    
     def start_clear_conn(self) -> None:
      if self.idle_timeout is None:
       # 如果空闲连接的超时时间为无限,那么就不应该清理连接
       return
      self.clear_idle_conn()
      self._clearer = threading.Timer(self.idle_timeout, self.start_clear_conn)
      self._clearer.start()
    
     def stop_clear_conn(self) -> None:
      if self._clearer is not None:
       self._clearer.cancel()
    
     def __enter__(self) -> 'HTTPConnectionPool':
      return self
    
     def __exit__(self, *exit_info: Any) -> None:
      self.close()
    
    
    class EmptyPoolError(Exception):
     pass
    
    
    class ConnectionPoolClosed(Exception):
     pass

      首先, 这个连接池的核心就是对连接进行管理, 而这包含取出连接和释放连接两个过程. 因此这东西的本质就是一个生产者消费者模型, 取出线程时是消费者, 放入线程时是生产者, 使用threading自带的Condition对象就能完美解决线程安全问题, 使二者协同合作.

      解决获取连接和释放连接这个问题之后, 其实这个连接池就已经能用了. 但是如果涉及到更多细节方面的东西, 比如判断连接是否可用, 自动释放连接, 清理闲置连接等等, 就需要对这个连接进行封装, 为它添加更多的属性和方法, 这就引入了WrapperHTTPConnection这个类. 实现它的__enter___和__exit__方法之后, 就能使用上下文管理器来自动释放连接. 至于清理闲置连接, 通过last_time属性记录每个连接的最后释放时间, 然后在连接池中添加一个定时任务就行了.

    js
    下一篇:没有了