You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

510 lines
20 KiB

3 months ago
import time
import aiohttp
import asyncio
from tabulate import tabulate
from typing import Dict, List
from core.utils.llm import create_instance as create_llm_instance
from core.utils.tts import create_instance as create_tts_instance
import statistics
from config.settings import load_config
import inspect
import os
import logging
# Set the global log level to WARNING to suppress INFO-level noise from providers.
logging.basicConfig(level=logging.WARNING)
class AsyncPerformanceTester:
    """Asynchronous benchmark runner for the configured LLM and TTS modules."""

    def __init__(self):
        """Load the project config, resolve the test sentences, and set up result buckets."""
        self.config = load_config()
        # Fallback prompts used when the config has no "module_test.test_sentences".
        default_sentences = [
            "你好,请介绍一下你自己",
            "What's the weather like today?",
            "请用100字概括量子计算的基本原理和应用前景",
        ]
        module_test_cfg = self.config.get("module_test", {})
        self.test_sentences = module_test_cfg.get("test_sentences", default_sentences)
        # Aggregated results, filled in by run(): per-module stats plus ranked pairings.
        self.results = {"llm": {}, "tts": {}, "combinations": []}
async def _check_ollama_service(self, base_url: str, model_name: str) -> bool:
"""异步检查Ollama服务状态"""
async with aiohttp.ClientSession() as session:
try:
# 检查服务是否可用
async with session.get(f"{base_url}/api/version") as response:
if response.status != 200:
print(f"🚫 Ollama服务未启动或无法访问: {base_url}")
return False
# 检查模型是否存在
async with session.get(f"{base_url}/api/tags") as response:
if response.status == 200:
data = await response.json()
models = data.get("models", [])
if not any(model["name"] == model_name for model in models):
print(
f"🚫 Ollama模型 {model_name} 未找到,请先使用 ollama pull {model_name} 下载"
)
return False
else:
print(f"🚫 无法获取Ollama模型列表")
return False
return True
except Exception as e:
print(f"🚫 无法连接到Ollama服务: {str(e)}")
return False
async def _test_tts(self, tts_name: str, config: Dict) -> Dict:
"""异步测试单个TTS性能"""
try:
logging.getLogger("core.providers.tts.base").setLevel(logging.WARNING)
token_fields = ["access_token", "api_key", "token"]
if any(
field in config
and any(x in config[field] for x in ["你的", "placeholder"])
for field in token_fields
):
print(f"⏭️ TTS {tts_name} 未配置access_token/api_key已跳过")
return {"name": tts_name, "type": "tts", "errors": 1}
module_type = config.get("type", tts_name)
tts = create_tts_instance(module_type, config, delete_audio_file=True)
print(f"🎵 测试 TTS: {tts_name}")
tmp_file = tts.generate_filename()
await tts.text_to_speak("连接测试", tmp_file)
if not tmp_file or not os.path.exists(tmp_file):
print(f"{tts_name} 连接失败")
return {"name": tts_name, "type": "tts", "errors": 1}
total_time = 0
test_count = len(self.test_sentences[:2])
for i, sentence in enumerate(self.test_sentences[:2], 1):
start = time.time()
tmp_file = tts.generate_filename()
await tts.text_to_speak(sentence, tmp_file)
duration = time.time() - start
total_time += duration
if tmp_file and os.path.exists(tmp_file):
print(f"{tts_name} [{i}/{test_count}]")
else:
print(f"{tts_name} [{i}/{test_count}]")
return {"name": tts_name, "type": "tts", "errors": 1}
return {
"name": tts_name,
"type": "tts",
"avg_time": total_time / test_count,
"errors": 0,
}
except Exception as e:
print(f"⚠️ {tts_name} 测试失败: {str(e)}")
return {"name": tts_name, "type": "tts", "errors": 1}
async def _test_llm(self, llm_name: str, config: Dict) -> Dict:
"""异步测试单个LLM性能"""
try:
# 对于Ollama跳过api_key检查并进行特殊处理
if llm_name == "Ollama":
base_url = config.get("base_url", "http://localhost:11434")
model_name = config.get("model_name")
if not model_name:
print(f"🚫 Ollama未配置model_name")
return {"name": llm_name, "type": "llm", "errors": 1}
if not await self._check_ollama_service(base_url, model_name):
return {"name": llm_name, "type": "llm", "errors": 1}
else:
if "api_key" in config and any(
x in config["api_key"] for x in ["你的", "placeholder", "sk-xxx"]
):
print(f"🚫 跳过未配置的LLM: {llm_name}")
return {"name": llm_name, "type": "llm", "errors": 1}
# 获取实际类型(兼容旧配置)
module_type = config.get("type", llm_name)
llm = create_llm_instance(module_type, config)
# 统一使用UTF-8编码
test_sentences = [
s.encode("utf-8").decode("utf-8") for s in self.test_sentences
]
# 创建所有句子的测试任务
sentence_tasks = []
for sentence in test_sentences:
sentence_tasks.append(
self._test_single_sentence(llm_name, llm, sentence)
)
# 并发执行所有句子测试
sentence_results = await asyncio.gather(*sentence_tasks)
# 处理结果
valid_results = [r for r in sentence_results if r is not None]
if not valid_results:
print(f"⚠️ {llm_name} 无有效数据,可能配置错误")
return {"name": llm_name, "type": "llm", "errors": 1}
first_token_times = [r["first_token_time"] for r in valid_results]
response_times = [r["response_time"] for r in valid_results]
# 过滤异常数据
mean = statistics.mean(response_times)
stdev = statistics.stdev(response_times) if len(response_times) > 1 else 0
filtered_times = [t for t in response_times if t <= mean + 3 * stdev]
if len(filtered_times) < len(test_sentences) * 0.5:
print(f"⚠️ {llm_name} 有效数据不足,可能网络不稳定")
return {"name": llm_name, "type": "llm", "errors": 1}
return {
"name": llm_name,
"type": "llm",
"avg_response": sum(response_times) / len(response_times),
"avg_first_token": sum(first_token_times) / len(first_token_times),
"std_first_token": (
statistics.stdev(first_token_times)
if len(first_token_times) > 1
else 0
),
"std_response": (
statistics.stdev(response_times) if len(response_times) > 1 else 0
),
"errors": 0,
}
except Exception as e:
print(f"LLM {llm_name} 测试失败: {str(e)}")
return {"name": llm_name, "type": "llm", "errors": 1}
async def _test_single_sentence(self, llm_name: str, llm, sentence: str) -> Dict:
"""测试单个句子的性能"""
try:
print(f"📝 {llm_name} 开始测试: {sentence[:20]}...")
sentence_start = time.time()
first_token_received = False
first_token_time = None
async def process_response():
nonlocal first_token_received, first_token_time
for chunk in llm.response(
"perf_test", [{"role": "user", "content": sentence}]
):
if not first_token_received and chunk.strip() != "":
first_token_time = time.time() - sentence_start
first_token_received = True
print(f"{llm_name} 首个Token: {first_token_time:.3f}s")
yield chunk
response_chunks = []
async for chunk in process_response():
response_chunks.append(chunk)
response_time = time.time() - sentence_start
print(f"{llm_name} 完成响应: {response_time:.3f}s")
if first_token_time is None:
first_token_time = (
response_time # 如果没有检测到first token使用总响应时间
)
return {
"name": llm_name,
"type": "llm",
"first_token_time": first_token_time,
"response_time": response_time,
}
except Exception as e:
print(f"⚠️ {llm_name} 句子测试失败: {str(e)}")
return None
def _generate_combinations(self):
"""生成最佳组合建议"""
valid_llms = [
k
for k, v in self.results["llm"].items()
if v["errors"] == 0 and v["avg_first_token"] >= 0.05
]
valid_tts = [k for k, v in self.results["tts"].items() if v["errors"] == 0]
# 找出基准值
min_first_token = (
min([self.results["llm"][llm]["avg_first_token"] for llm in valid_llms])
if valid_llms
else 1
)
min_tts_time = (
min([self.results["tts"][tts]["avg_time"] for tts in valid_tts])
if valid_tts
else 1
)
for llm in valid_llms:
for tts in valid_tts:
# 计算相对性能分数(越小越好)
llm_score = (
self.results["llm"][llm]["avg_first_token"] / min_first_token
)
tts_score = self.results["tts"][tts]["avg_time"] / min_tts_time
# 计算稳定性分数(标准差/平均值,越小越稳定)
llm_stability = (
self.results["llm"][llm]["std_first_token"]
/ self.results["llm"][llm]["avg_first_token"]
)
# 综合得分(考虑性能和稳定性)
# 性能权重0.7稳定性权重0.3
llm_final_score = llm_score * 0.7 + llm_stability * 0.3
# 总分 = LLM得分(70%) + TTS得分(30%)
total_score = llm_final_score * 0.7 + tts_score * 0.3
self.results["combinations"].append(
{
"llm": llm,
"tts": tts,
"score": total_score,
"details": {
"llm_first_token": self.results["llm"][llm][
"avg_first_token"
],
"llm_stability": llm_stability,
"tts_time": self.results["tts"][tts]["avg_time"],
},
}
)
# 分数越小越好
self.results["combinations"].sort(key=lambda x: x["score"])
    def _print_results(self):
        """Print the benchmark report: an LLM ranking table, a TTS ranking
        table, and the five best-scoring LLM+TTS combinations, all rendered
        as GitHub-style tables via tabulate."""
        llm_table = []
        for name, data in self.results["llm"].items():
            if data["errors"] == 0:
                # Stability = coefficient of variation of first-token latency
                # (lower is steadier).
                stability = data["std_first_token"] / data["avg_first_token"]
                llm_table.append(
                    [
                        name,  # no fixed width needed; tabulate handles alignment
                        f"{data['avg_first_token']:.3f}",
                        f"{data['avg_response']:.3f}",
                        f"{stability:.3f}",
                    ]
                )
        if llm_table:
            print("\nLLM 性能排行:")
            print(
                tabulate(
                    llm_table,
                    headers=["模型名称", "首字耗时", "总耗时", "稳定性"],
                    tablefmt="github",
                    colalign=("left", "right", "right", "right"),
                    # Values are pre-formatted strings; keep numparse off so
                    # tabulate doesn't reformat them.
                    disable_numparse=True,
                )
            )
        else:
            print("\n⚠️ 没有可用的LLM模块进行测试。")
        tts_table = []
        for name, data in self.results["tts"].items():
            if data["errors"] == 0:
                tts_table.append([name, f"{data['avg_time']:.3f}"])  # no fixed width needed
        if tts_table:
            print("\nTTS 性能排行:")
            print(
                tabulate(
                    tts_table,
                    headers=["模型名称", "合成耗时"],
                    tablefmt="github",
                    colalign=("left", "right"),
                    disable_numparse=True,
                )
            )
        else:
            print("\n⚠️ 没有可用的TTS模块进行测试。")
        if self.results["combinations"]:
            print("\n推荐配置组合 (得分越小越好):")
            combo_table = []
            # Combinations are pre-sorted ascending; show only the top five.
            for combo in self.results["combinations"][:5]:
                combo_table.append(
                    [
                        f"{combo['llm']} + {combo['tts']}",  # no fixed width needed
                        f"{combo['score']:.3f}",
                        f"{combo['details']['llm_first_token']:.3f}",
                        f"{combo['details']['llm_stability']:.3f}",
                        f"{combo['details']['tts_time']:.3f}",
                    ]
                )
            print(
                tabulate(
                    combo_table,
                    headers=[
                        "组合方案",
                        "综合得分",
                        "LLM首字耗时",
                        "稳定性",
                        "TTS合成耗时",
                    ],
                    tablefmt="github",
                    colalign=("left", "right", "right", "right", "right"),
                    disable_numparse=True,
                )
            )
        else:
            print("\n⚠️ 没有可用的模块组合建议。")
def _process_results(self, all_results):
"""处理测试结果"""
for result in all_results:
if result["errors"] == 0:
if result["type"] == "llm":
self.results["llm"][result["name"]] = result
else:
self.results["tts"][result["name"]] = result
async def run(self):
"""执行全量异步测试"""
print("🔍 开始筛选可用模块...")
# 创建所有测试任务
all_tasks = []
# LLM测试任务
for llm_name, config in self.config.get("LLM", {}).items():
# 检查配置有效性
if llm_name == "CozeLLM":
if any(x in config.get("bot_id", "") for x in ["你的"]) or any(
x in config.get("user_id", "") for x in ["你的"]
):
print(f"⏭️ LLM {llm_name} 未配置bot_id/user_id已跳过")
continue
elif "api_key" in config and any(
x in config["api_key"] for x in ["你的", "placeholder", "sk-xxx"]
):
print(f"⏭️ LLM {llm_name} 未配置api_key已跳过")
continue
# 对于Ollama先检查服务状态
if llm_name == "Ollama":
base_url = config.get("base_url", "http://localhost:11434")
model_name = config.get("model_name")
if not model_name:
print(f"🚫 Ollama未配置model_name")
continue
if not await self._check_ollama_service(base_url, model_name):
continue
print(f"📋 添加LLM测试任务: {llm_name}")
module_type = config.get("type", llm_name)
llm = create_llm_instance(module_type, config)
# 为每个句子创建独立任务
for sentence in self.test_sentences:
sentence = sentence.encode("utf-8").decode("utf-8")
all_tasks.append(self._test_single_sentence(llm_name, llm, sentence))
# TTS测试任务
for tts_name, config in self.config.get("TTS", {}).items():
token_fields = ["access_token", "api_key", "token"]
if any(
field in config
and any(x in config[field] for x in ["你的", "placeholder"])
for field in token_fields
):
print(f"⏭️ TTS {tts_name} 未配置access_token/api_key已跳过")
continue
print(f"🎵 添加TTS测试任务: {tts_name}")
all_tasks.append(self._test_tts(tts_name, config))
print(
f"\n✅ 找到 {len([t for t in all_tasks if 'test_single_sentence' in str(t)]) / len(self.test_sentences):.0f} 个可用LLM模块"
)
print(
f"✅ 找到 {len([t for t in all_tasks if '_test_tts' in str(t)])} 个可用TTS模块"
)
print("\n⏳ 开始并发测试所有模块...\n")
# 并发执行所有测试任务
all_results = await asyncio.gather(*all_tasks, return_exceptions=True)
# 处理LLM结果
llm_results = {}
for result in [
r
for r in all_results
if r and isinstance(r, dict) and r.get("type") == "llm"
]:
llm_name = result["name"]
if llm_name not in llm_results:
llm_results[llm_name] = {
"name": llm_name,
"type": "llm",
"first_token_times": [],
"response_times": [],
"errors": 0,
}
llm_results[llm_name]["first_token_times"].append(
result["first_token_time"]
)
llm_results[llm_name]["response_times"].append(result["response_time"])
# 计算LLM平均值和标准差
for llm_name, data in llm_results.items():
if len(data["first_token_times"]) >= len(self.test_sentences) * 0.5:
self.results["llm"][llm_name] = {
"name": llm_name,
"type": "llm",
"avg_response": sum(data["response_times"])
/ len(data["response_times"]),
"avg_first_token": sum(data["first_token_times"])
/ len(data["first_token_times"]),
"std_first_token": (
statistics.stdev(data["first_token_times"])
if len(data["first_token_times"]) > 1
else 0
),
"std_response": (
statistics.stdev(data["response_times"])
if len(data["response_times"]) > 1
else 0
),
"errors": 0,
}
# 处理TTS结果
for result in [
r
for r in all_results
if r and isinstance(r, dict) and r.get("type") == "tts"
]:
if result["errors"] == 0:
self.results["tts"][result["name"]] = result
# 生成组合建议并打印结果
print("\n📊 生成测试报告...")
self._generate_combinations()
self._print_results()
async def main():
    """Entry point: construct the tester and run the full benchmark suite."""
    await AsyncPerformanceTester().run()


if __name__ == "__main__":
    asyncio.run(main())