Files
QingLong/AI/WxMini/Utils/RedisUtil.py
2025-08-15 09:13:13 +08:00

78 lines
2.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import redis
from WxMini.Milvus.Config.MulvusConfig import *
# 初始化 Redis 连接池
pool = redis.ConnectionPool(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, password=REDIS_PASSWORD)
def set_tts_token(token, expire_time=60 * 60 * 4):
"""
设置 TTS Token 到 Redis并设置过期时间
:param token: TTS Token 字符串
:param expire_time: 过期时间(秒),默认 750 秒
"""
try:
# 从连接池获取 Redis 连接
r = redis.Redis(connection_pool=pool)
# 设置键值对,并设置过期时间
r.set('tts_token', token, ex=expire_time)
print(f"TTS Token 已成功设置,过期时间为 {expire_time}")
except Exception as e:
print(f"设置 TTS Token 失败: {e}")
def del_tts_token():
try:
# 从连接池获取 Redis 连接
r = redis.Redis(connection_pool=pool)
# 设置键值对,并设置过期时间
r.delete('tts_token')
print(f"TTS Token 已成功删除")
except Exception as e:
print(f"删除 TTS Token 失败: {e}")
def get_tts_token():
"""
从 Redis 获取 TTS Token
:return: TTS Token 字符串(如果不存在,返回 None
"""
try:
# 从连接池获取 Redis 连接
r = redis.Redis(connection_pool=pool)
# 获取 TTS Token
token = r.get('tts_token')
if token:
return token.decode('utf-8') # 将 bytes 转换为字符串
else:
print("TTS Token 不存在或已过期")
return None
except Exception as e:
print(f"获取 TTS Token 失败: {e}")
return None
def close_redis_connection():
"""
显式关闭 Redis 连接池
"""
try:
pool.disconnect()
print("Redis 连接池已关闭")
except Exception as e:
print(f"关闭 Redis 连接池失败: {e}")
if __name__ == '__main__':
# 示例:设置 TTS Token
#tts_token = "your_tts_token_here" # 替换为你的 TTS Token
#set_tts_token(tts_token)
# 示例:获取 TTS Token
#retrieved_token = get_tts_token()
#if retrieved_token:
# print("获取到的 TTS Token:", retrieved_token)
del_tts_token()
# 显式关闭 Redis 连接池
close_redis_connection()