import os.path

import redis
import yaml


class redis_pool:
    """Lazily created singleton that owns one blocking Redis connection pool.

    Callers should use ``redis_pool.get_conn()`` rather than instantiating
    the class directly; the first call builds the pool from ``config.yaml``
    and every later call reuses it.

    NOTE: the lazy initialisation in ``get_conn`` is not guarded by a lock,
    so two threads racing on the very first call may each build a pool.
    """

    # The single shared instance; stays None until get_conn() is first called.
    instance = None

    def __init__(self):
        """Read the Redis settings from ``config.yaml`` and build the pool.

        The file is looked up at ``../config.yaml`` relative to the directory
        containing this module. It must provide a ``redis`` mapping with the
        keys ``host``, ``port``, ``password``, ``db`` and ``timeout``.

        Raises:
            OSError: if the configuration file cannot be opened.
            yaml.YAMLError: if the file is not valid (safe) YAML.
            KeyError: if the ``redis`` section or one of its keys is missing.
        """
        # Directory that contains this module.
        current_dir = os.path.dirname(__file__)
        # config.yaml is addressed relative to that directory, one level up.
        config_file = os.path.join(current_dir, '../config.yaml')

        # safe_load only builds plain Python objects (dict/list/str/int/...),
        # which is all a settings file needs, and cannot be abused to
        # construct arbitrary objects. The encoding is fixed to UTF-8 so the
        # result does not depend on the platform's locale default.
        with open(config_file, 'r', encoding='utf-8') as file:
            config = yaml.safe_load(file)

        redis_config = config['redis']
        self.pool = redis.BlockingConnectionPool(
            host=redis_config['host'],
            port=redis_config['port'],
            password=redis_config['password'],
            db=redis_config['db'],
            decode_responses=True,
            max_connections=6,
            # Passed through as the pool's ``timeout``; per the redis-py
            # documentation this is how long a caller waits for a free
            # connection once all of them are in use.
            timeout=redis_config['timeout'],
        )

    def __get_connection(self):
        """Return a Redis client bound to this instance's connection pool."""
        return redis.StrictRedis(connection_pool=self.pool)

    @classmethod
    def get_conn(cls):
        """Return a Redis client backed by the shared connection pool.

        The shared ``redis_pool`` instance (and with it the pool) is created
        on the first call and reused afterwards.
        """
        if redis_pool.instance is None:
            redis_pool.instance = redis_pool()
        # Name mangling resolves this to _redis_pool__get_connection, which
        # works because the call is made from inside the class body.
        return redis_pool.instance.__get_connection()