import redis from WxMini.Milvus.Config.MulvusConfig import * # 初始化 Redis 连接池 pool = redis.ConnectionPool(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, password=REDIS_PASSWORD) def set_tts_token(token, expire_time=60 * 60 * 4): """ 设置 TTS Token 到 Redis,并设置过期时间 :param token: TTS Token 字符串 :param expire_time: 过期时间(秒),默认 750 秒 """ try: # 从连接池获取 Redis 连接 r = redis.Redis(connection_pool=pool) # 设置键值对,并设置过期时间 r.set('tts_token', token, ex=expire_time) print(f"TTS Token 已成功设置,过期时间为 {expire_time} 秒") except Exception as e: print(f"设置 TTS Token 失败: {e}") def del_tts_token(): try: # 从连接池获取 Redis 连接 r = redis.Redis(connection_pool=pool) # 设置键值对,并设置过期时间 r.delete('tts_token') print(f"TTS Token 已成功删除") except Exception as e: print(f"删除 TTS Token 失败: {e}") def get_tts_token(): """ 从 Redis 获取 TTS Token :return: TTS Token 字符串(如果不存在,返回 None) """ try: # 从连接池获取 Redis 连接 r = redis.Redis(connection_pool=pool) # 获取 TTS Token token = r.get('tts_token') if token: return token.decode('utf-8') # 将 bytes 转换为字符串 else: print("TTS Token 不存在或已过期") return None except Exception as e: print(f"获取 TTS Token 失败: {e}") return None def close_redis_connection(): """ 显式关闭 Redis 连接池 """ try: pool.disconnect() print("Redis 连接池已关闭") except Exception as e: print(f"关闭 Redis 连接池失败: {e}") if __name__ == '__main__': # 示例:设置 TTS Token #tts_token = "your_tts_token_here" # 替换为你的 TTS Token #set_tts_token(tts_token) # 示例:获取 TTS Token #retrieved_token = get_tts_token() #if retrieved_token: # print("获取到的 TTS Token:", retrieved_token) del_tts_token() # 显式关闭 Redis 连接池 close_redis_connection()