You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

172 lines
6.8 KiB

5 months ago
# -*- coding: utf-8 -*-
5 months ago
from pathlib import Path
5 months ago
from typing import Optional, Tuple, Iterator
5 months ago
from openai import OpenAI, APIError, APITimeoutError
import time
import httpx
5 months ago
class ContentAnalyzer:
    """Course-content analyzer that streams its answer from a chat-completion API.

    The analyzer sends the text of a lesson transcript to an OpenAI-compatible
    endpoint and streams the model's answer back. Progress is reported on
    stdout through ``_show_progress``.
    """

    # Default prompt; ``{}`` is replaced by the content to analyze.
    _DEFAULT_PROMPT_TEMPLATE = "帮我梳理:这节课分了几个部分,每部分的名称和开始的时间是多少:{}"

    def __init__(
        self,
        # NOTE(security): this default is a credential committed to source
        # control and should be treated as leaked. It is kept only so existing
        # callers that rely on the default keep working; rotate the key and
        # pass ``api_key`` explicitly (e.g. loaded from the environment).
        api_key: str = "sk-01d13a39e09844038322108ecdbd1bbc",
        base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1",
        model: str = "deepseek-r1",
        max_retries: int = 10,
        initial_timeout: int = 300
    ):
        """Create the API client and verify connectivity.

        Args:
            api_key: API key handed to the ``OpenAI`` client.
            base_url: Base URL of the OpenAI-compatible endpoint.
            model: Model name sent with every completion request.
            max_retries: Number of retries after a timed-out request, so at
                most ``max_retries + 1`` attempts are made.
            initial_timeout: Timeout in seconds of the first attempt; each
                further attempt adds 5 seconds.

        Raises:
            Exception: Whatever the connectivity probe raised.
        """
        self._show_progress("🔧", "初始化分析器...", level=0)
        self.client = OpenAI(api_key=api_key, base_url=base_url)
        self.model = model
        self.max_retries = max_retries
        self.initial_timeout = initial_timeout
        self._check_network()
        self._show_progress("", "分析器准备就绪", level=0)

    def _show_progress(self, emoji: str, message: str, level: int = 1):
        """Print a timestamped progress line indented by ``level`` spaces."""
        indent = " " * level
        timestamp = time.strftime("%H:%M:%S")
        print(f"{indent}{emoji} [{timestamp}] {message}")

    def _check_network(self):
        """Issue one GET request as a connectivity probe and re-raise on failure.

        NOTE(review): the probe always targets dashscope.aliyuncs.com, even
        when a different ``base_url`` was configured - confirm this is intended.
        """
        try:
            with httpx.Client(timeout=30) as client:
                client.get("https://dashscope.aliyuncs.com")
            self._show_progress("🌐", "网络连接正常", level=1)
        except Exception as e:
            self._show_progress("", f"网络异常: {str(e)}", level=1)
            raise

    def _retry_delay(self, attempt: int) -> int:
        """Return the exponential back-off delay in seconds, capped at 60."""
        return min(2 ** attempt, 60)

    def _stream_fragments(
        self,
        content: str,
        prompt_template: str
    ) -> Iterator[Tuple[bool, str]]:
        """Yield ``(True, fragment)`` per streamed piece, or one ``(False, error)``.

        A timed-out request is retried with back-off only while nothing has
        been yielded yet: fragments already handed to the consumer cannot be
        taken back, so retrying mid-stream would repeat partial output.
        """
        total_attempts = self.max_retries + 1
        for attempt in range(total_attempts):
            emitted = False
            try:
                current_timeout = self.initial_timeout + attempt * 5
                self._show_progress(
                    "⏱️",
                    f"尝试 {attempt + 1}/{total_attempts} (超时: {current_timeout}s)",
                    level=2
                )
                full_prompt = prompt_template.format(content)
                stream = self.client.chat.completions.create(
                    model=self.model,
                    messages=[{'role': 'user', 'content': full_prompt}],
                    timeout=current_timeout,
                    stream=True
                )
                for chunk in stream:
                    # Chunks without choices or without text are skipped.
                    if chunk.choices and chunk.choices[0].delta.content:
                        emitted = True
                        yield True, chunk.choices[0].delta.content
                return
            except APITimeoutError:
                if emitted:
                    yield False, "API请求超时: 流式响应中途中断"
                    return
                if attempt < self.max_retries:
                    delay = self._retry_delay(attempt)
                    self._show_progress("", f"{delay}s后重试...", level=2)
                    time.sleep(delay)
                else:
                    yield False, f"API请求超时已重试{self.max_retries}"
                    return
            except APIError as e:
                yield False, f"API错误: {str(e)}"
                return
            except Exception as e:
                yield False, f"未处理的异常: {str(e)}"
                return

    def analyze_content_stream(
        self,
        content: str,
        prompt_template: str = _DEFAULT_PROMPT_TEMPLATE
    ) -> Iterator[Tuple[bool, str]]:
        """Stream the analysis of ``content``.

        Yields:
            ``(True, fragment)`` for every streamed fragment, followed by one
            final ``(True, full_text)`` holding the concatenation of all
            fragments (omitted when nothing was streamed). On failure a single
            ``(False, error_message)`` is yielded and the stream ends.

        Consumers that accumulate the fragments themselves must not also
        append the final aggregate item.
        """
        collected = []
        for ok, text in self._stream_fragments(content, prompt_template):
            if not ok:
                yield False, text
                return
            collected.append(text)
            yield True, text
        if collected:
            yield True, ''.join(collected)

    def analyze_file(
        self,
        file_path: Path,
        output_path: Optional[Path] = None,
        encoding: str = 'utf-8'
    ) -> Tuple[bool, str]:
        """Read a file, stream its analysis to stdout and optionally save it.

        Args:
            file_path: File whose text is analyzed.
            output_path: Where to write the result; parent directories are
                created. Nothing is written when ``None``.
            encoding: Encoding used to read the input (falling back to GBK on
                a decode error) and to write the output.

        Returns:
            ``(True, result_text)`` on success, ``(False, error_message)``
            otherwise. Exceptions are reported through the return value.
        """
        try:
            self._show_progress("📂", f"开始处理文件: {file_path}", level=0)

            # Validate the input file.
            self._show_progress("🔍", "验证文件...", level=1)
            if not file_path.exists():
                self._show_progress("", "文件不存在", level=2)
                return False, f"文件不存在: {file_path}"
            if file_path.stat().st_size > 10 * 1024 * 1024:
                self._show_progress("⚠️", "注意:大文件可能影响处理速度", level=2)

            # Read the content, retrying with GBK when decoding fails.
            self._show_progress("📖", "读取文件内容...", level=1)
            try:
                content = file_path.read_text(encoding=encoding)
            except UnicodeDecodeError:
                self._show_progress("🔠", "解码失败尝试GBK编码...", level=2)
                content = file_path.read_text(encoding='gbk')

            # Stream the analysis. Only incremental fragments are consumed
            # here, so the result is not duplicated by the aggregate item that
            # analyze_content_stream appends at the end.
            self._show_progress("🧠", "开始流式分析...", level=1)
            fragments = []
            for ok, text in self._stream_fragments(
                content, self._DEFAULT_PROMPT_TEMPLATE
            ):
                if not ok:
                    self._show_progress("", f"分析失败: {text}", level=1)
                    return False, text
                print(text, end='', flush=True)  # live output
                fragments.append(text)
            final_result = ''.join(fragments)

            # Save the result.
            if output_path:
                self._show_progress("💾", f"保存到: {output_path}", level=1)
                try:
                    output_path.parent.mkdir(parents=True, exist_ok=True)
                    output_path.write_text(final_result, encoding=encoding)
                    self._show_progress("", "保存成功", level=2)
                except Exception as e:
                    self._show_progress("", f"保存失败: {str(e)}", level=2)
                    return False, f"结果保存失败: {str(e)}"

            self._show_progress("🎉", "处理完成!", level=0)
            return True, final_result
        except Exception as e:
            self._show_progress("💣", f"严重错误: {str(e)}", level=1)
            return False, f"文件处理失败: {str(e)}"
5 months ago
5 months ago
def analyzer_action(input_file, output_file):
    """Analyze ``input_file`` and write the result to ``output_file``.

    Prints a banner, runs ``ContentAnalyzer.analyze_file`` on the two paths
    and reports success or the failure message on stdout.

    Args:
        input_file: Path (str or path-like) of the file to analyze.
        output_file: Path (str or path-like) the result is written to.
    """
    banner = "=" * 50

    print("\n" + banner)
    # str.center() requires a fill string of exactly one character; the
    # previous empty fill raised TypeError before any work was done. The
    # default (space) padding is used instead.
    print(" 🚀 长春云校视频课程智能打标记系统 ".center(50))
    print(banner)

    analyzer = ContentAnalyzer(initial_timeout=300)
    success, result = analyzer.analyze_file(Path(input_file), Path(output_file))

    print("\n" + banner)
    if success:
        print("\n✅ 分析成功!结果已保存至:", output_file)
    else:
        print(f"\n❌ 分析失败:{result}")
    print(banner)