# -*- coding: utf-8 -*- """ 请求限速器模块 """ import threading import time from functools import wraps from typing import Callable, Tuple class TokenBucket: """ 令牌桶 """ def __init__(self, max_tokens: int, refill_rate: float): """ 初始化 :param refill_rate: 令牌填充速率,单位为个/秒 :param max_tokens: 最大令牌数,单位为个 """ self.max_tokens = max_tokens # 初始化当前令牌数 self.tokens = int(round(self.max_tokens * 0.5)) self.refill_rate = refill_rate # 初始化上一次填充令牌的时间戳(使用单调时间戳) self.refill_timestamp = time.monotonic() # 初始化线程锁(所有线程共用) self.thread_lock = threading.Lock() def _refill(self) -> None: """ 填充令牌 :return: 无 """ with self.thread_lock: # 本次填充令牌的时间戳 refill_timestamp = time.monotonic() # 重新计算令牌桶中令牌数 self.tokens = min( self.max_tokens, max( 0, self.tokens + self.refill_rate * (refill_timestamp - self.refill_timestamp), ), ) self.refill_timestamp = refill_timestamp def consume(self) -> Tuple[bool, float]: """ 消耗令牌 :return: 消耗令牌是否成功与否和等待时长 """ # 填充令牌 self._refill() with self.thread_lock: if self.tokens >= 1: self.tokens -= 1 return True, 0 # 等待时长 wait_time = min( 1 / self.refill_rate, max( 0, 1 / self.refill_rate - (time.monotonic() - self.refill_timestamp), ), ) return False, wait_time def restrict( max_tokens: int = 5, refill_rate: float = 5.0, ) -> Callable: """ 请求限速器 :param max_tokens: 最大令牌数 :param refill_rate: 令牌填充速率,单位为个/秒 :return: 限速器装饰器 """ # 初始化所有被装饰的函数创建令牌桶限流存储 buckets = {} def decorator(func: Callable) -> Callable: @wraps(func) def wrapper(*args, **kwargs): # 若当前被装饰的函数不在所有被装饰的函数创建令牌桶存储则为当前被装饰的函数实例化令牌桶 if func not in buckets: # 初始化令牌桶 buckets[func] = TokenBucket( refill_rate=refill_rate, max_tokens=max_tokens ) # 重试次数 retries = 0 bucket = buckets[func] while retries <= 10: # 消耗令牌 success, wait_time = bucket.consume() # 若消耗令牌成功则返回被装饰的函数,否则等待 if success: return func(*args, **kwargs) time.sleep(wait_time * 2) retries += 1 raise Exception("request too frequently") return wrapper return decorator