|
|
# -*- coding: utf-8 -*-
|
|
|
import time
|
|
|
from pathlib import Path
|
|
|
from typing import Optional, Tuple
|
|
|
|
|
|
from openai import OpenAI, APIError
|
|
|
|
|
|
|
|
|
class ContentAnalyzer:
|
|
|
"""课程内容分析器(带Emoji进度展示)"""
|
|
|
|
|
|
def __init__(
|
|
|
self,
|
|
|
api_key: str = "sk-01d13a39e09844038322108ecdbd1bbc",
|
|
|
base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1",
|
|
|
model: str = "deepseek-r1"
|
|
|
):
|
|
|
"""
|
|
|
初始化分析器
|
|
|
:param api_key: 阿里云API密钥
|
|
|
:param base_url: API基础地址
|
|
|
:param model: 使用的模型名称
|
|
|
"""
|
|
|
self.client = OpenAI(api_key=api_key, base_url=base_url)
|
|
|
self.model = model
|
|
|
|
|
|
def _show_progress(self, emoji: str, message: str, level: int = 1, delay: float = 0.3):
|
|
|
"""进度展示方法
|
|
|
:param emoji: 表情符号
|
|
|
:param message: 进度信息
|
|
|
:param level: 缩进层级(0-无缩进,1-一级缩进)
|
|
|
:param delay: 展示后的延迟时间
|
|
|
"""
|
|
|
indent = " " * level
|
|
|
progress_line = f"{indent}{emoji} {message}"
|
|
|
print(progress_line)
|
|
|
time.sleep(delay) # 增加进度展示的节奏感
|
|
|
|
|
|
def analyze_content(
|
|
|
self,
|
|
|
content: str,
|
|
|
prompt_template: str = "帮我梳理:这节课分了几个部分,每部分的名称和开始的时间是多少:{}"
|
|
|
) -> Tuple[bool, str]:
|
|
|
"""
|
|
|
分析课程内容结构
|
|
|
:param content: 需要分析的文本内容
|
|
|
:param prompt_template: 提示词模板
|
|
|
:return: (成功标志, 分析结果或错误信息)
|
|
|
"""
|
|
|
try:
|
|
|
# 构建提示词
|
|
|
self._show_progress("📝", "正在构建提示词...", level=1)
|
|
|
full_prompt = prompt_template.format(content)
|
|
|
|
|
|
# 发送API请求
|
|
|
self._show_progress("📡", "正在发送API请求...", level=1)
|
|
|
start_time = time.time()
|
|
|
completion = self.client.chat.completions.create(
|
|
|
model=self.model,
|
|
|
messages=[{'role': 'user', 'content': full_prompt}]
|
|
|
)
|
|
|
cost_time = time.time() - start_time
|
|
|
|
|
|
# 处理响应
|
|
|
self._show_progress("✅", f"收到API响应(耗时{cost_time:.1f}s)", level=1)
|
|
|
if not completion.choices:
|
|
|
self._show_progress("❌", "响应内容为空", level=2)
|
|
|
return False, "API响应中未包含有效结果"
|
|
|
|
|
|
# 解析结果
|
|
|
self._show_progress("🔍", "正在解析结果内容...", level=1)
|
|
|
return True, completion.choices[0].message.content
|
|
|
|
|
|
except APIError as e:
|
|
|
self._show_progress("🚨", f"API异常: {str(e)}", level=2)
|
|
|
return False, f"API调用失败: {str(e)}"
|
|
|
except Exception as e:
|
|
|
self._show_progress("💥", f"未处理的异常: {str(e)}", level=2)
|
|
|
return False, f"未处理的异常: {str(e)}"
|
|
|
|
|
|
def analyze_file(
|
|
|
self,
|
|
|
file_path: Path,
|
|
|
output_path: Optional[Path] = None,
|
|
|
encoding: str = 'utf-8'
|
|
|
) -> Tuple[bool, str]:
|
|
|
"""
|
|
|
从文件读取内容并分析
|
|
|
:param file_path: 输入文件路径
|
|
|
:param output_path: 输出文件路径(可选)
|
|
|
:param encoding: 文件编码格式
|
|
|
:return: (成功标志, 最终结果或错误信息)
|
|
|
"""
|
|
|
try:
|
|
|
self._show_progress("📂", f"开始处理文件: {file_path}", level=0)
|
|
|
|
|
|
# 检查文件存在性
|
|
|
self._show_progress("🔎", "检查文件是否存在...", level=1)
|
|
|
if not file_path.exists():
|
|
|
self._show_progress("❌", "文件不存在", level=2)
|
|
|
return False, f"文件不存在: {file_path}"
|
|
|
|
|
|
# 读取文件内容
|
|
|
self._show_progress("📖", "正在读取文件内容...", level=1)
|
|
|
try:
|
|
|
content = file_path.read_text(encoding=encoding)
|
|
|
except UnicodeDecodeError:
|
|
|
self._show_progress("🔠", f"解码失败(当前编码: {encoding})", level=2)
|
|
|
return False, f"文件解码失败,请尝试使用正确的编码格式"
|
|
|
|
|
|
# 分析内容
|
|
|
self._show_progress("🧠", "开始分析内容...", level=1)
|
|
|
success, result = self.analyze_content(content)
|
|
|
|
|
|
if not success:
|
|
|
self._show_progress("⚠️", "内容分析失败", level=2)
|
|
|
return False, result
|
|
|
|
|
|
# 保存结果
|
|
|
if output_path:
|
|
|
self._show_progress("💾", f"正在保存结果到: {output_path}", level=1)
|
|
|
try:
|
|
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
output_path.write_text(result, encoding=encoding)
|
|
|
self._show_progress("✅", "文件保存成功", level=2)
|
|
|
except IOError as e:
|
|
|
self._show_progress("❌", f"保存失败: {str(e)}", level=2)
|
|
|
return False, f"结果保存失败: {str(e)}"
|
|
|
|
|
|
self._show_progress("🎉", "文件处理完成", level=0)
|
|
|
return True, result
|
|
|
|
|
|
except Exception as e:
|
|
|
self._show_progress("💣", f"意外错误: {str(e)}", level=2)
|
|
|
return False, f"文件处理异常: {str(e)}"
|
|
|
|
|
|
|
|
|
def show_banner():
|
|
|
"""显示启动横幅"""
|
|
|
print("\n" + "=" * 50)
|
|
|
print("🌟 课程内容智能分析系统 🌟".center(50))
|
|
|
print("=" * 50 + "\n")
|
|
|
|
|
|
|
|
|
def analyzer_action(input_file: str, output_file: Optional[str] = None):
|
|
|
"""执行分析流程
|
|
|
:param input_file: 输入文件路径
|
|
|
:param output_file: 输出文件路径(可选)
|
|
|
"""
|
|
|
show_banner()
|
|
|
|
|
|
analyzer = ContentAnalyzer()
|
|
|
input_path = Path(input_file)
|
|
|
output_path = Path(output_file) if output_file else None
|
|
|
|
|
|
success, result = analyzer.analyze_file(input_path, output_path)
|
|
|
|
|
|
print("\n" + "=" * 50)
|
|
|
if success:
|
|
|
print("✅ 最终分析结果:".center(50))
|
|
|
print("-" * 50)
|
|
|
print(result)
|
|
|
print("-" * 50)
|
|
|
if output_path:
|
|
|
print(f"结果已保存至:{output_path.resolve()}")
|
|
|
else:
|
|
|
print(f"❌ 分析失败:{result}")
|
|
|
print("=" * 50 + "\n")
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
# 示例配置
|
|
|
input_file = r"D:\dsWork\QingLong\AI\音频文本.txt"
|
|
|
output_file = r"D:\dsWork\QingLong\AI\分析结果.txt"
|
|
|
|
|
|
# 执行分析
|
|
|
analyzer_action(input_file, output_file) |