main
黄海 5 months ago
parent 498baaf6bf
commit 6649c9d743

@ -1,13 +1,13 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from pathlib import Path from pathlib import Path
from typing import Optional, Tuple from typing import Optional, Tuple, Iterator
from openai import OpenAI, APIError, APITimeoutError from openai import OpenAI, APIError, APITimeoutError
import time import time
import httpx import httpx
class ContentAnalyzer: class ContentAnalyzer:
"""课程内容分析器(长超时版""" """课程内容分析器(流式版本"""
def __init__( def __init__(
self, self,
@ -15,7 +15,7 @@ class ContentAnalyzer:
base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1", base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1",
model: str = "deepseek-r1", model: str = "deepseek-r1",
max_retries: int = 10, max_retries: int = 10,
initial_timeout: int = 300 # 初始超时改为300秒 initial_timeout: int = 300
): ):
self._show_progress("🔧", "初始化分析器...", level=0) self._show_progress("🔧", "初始化分析器...", level=0)
self.client = OpenAI(api_key=api_key, base_url=base_url) self.client = OpenAI(api_key=api_key, base_url=base_url)
@ -32,56 +32,64 @@ class ContentAnalyzer:
def _check_network(self): def _check_network(self):
try: try:
with httpx.Client(timeout=30) as client: # 网络检查超时延长到30秒 with httpx.Client(timeout=30) as client:
client.get("https://dashscope.aliyuncs.com") client.get("https://dashscope.aliyuncs.com")
self._show_progress("🌐", "网络连接正常", level=1) self._show_progress("🌐", "网络连接正常", level=1)
except Exception as e: except Exception as e:
self._show_progress("", f"网络异常: {str(e)}", level=1) self._show_progress("", f"网络异常: {str(e)}", level=1)
raise raise
def analyze_content( def _retry_delay(self, attempt: int) -> int:
"""指数退避延迟"""
return min(2 ** attempt, 60) # 最大延迟60秒
def analyze_content_stream(
self, self,
content: str, content: str,
prompt_template: str = "帮我梳理:这节课分了几个部分,每部分的名称和开始的时间是多少:{}" prompt_template: str = "帮我梳理:这节课分了几个部分,每部分的名称和开始的时间是多少:{}"
) -> Tuple[bool, str]: ) -> Iterator[Tuple[bool, str]]:
"""流式分析内容"""
for attempt in range(self.max_retries + 1): for attempt in range(self.max_retries + 1):
try: try:
# 动态计算超时时间60s, 65s, 70s...
current_timeout = self.initial_timeout + attempt * 5 current_timeout = self.initial_timeout + attempt * 5
self._show_progress("⏱️", f"尝试 {attempt + 1}/{self.max_retries} (超时: {current_timeout}s)") self._show_progress("⏱️", f"尝试 {attempt + 1}/{self.max_retries} (超时: {current_timeout}s)", level=2)
full_prompt = prompt_template.format(content) full_prompt = prompt_template.format(content)
completion = self.client.chat.completions.create( stream = self.client.chat.completions.create(
model=self.model, model=self.model,
messages=[{'role': 'user', 'content': full_prompt}], messages=[{'role': 'user', 'content': full_prompt}],
timeout=current_timeout timeout=current_timeout,
stream=True # 启用流式模式
) )
if not completion.choices: buffer = []
self._show_progress("⚠️", "API返回空响应", level=2) for chunk in stream:
return False, "API响应中未包含有效结果" if chunk.choices and chunk.choices[0].delta.content:
content_chunk = chunk.choices[0].delta.content
buffer.append(content_chunk)
yield True, content_chunk # 实时返回每个片段
self._show_progress("", "请求成功", level=2) # 返回完整结果
return True, completion.choices[0].message.content if buffer:
yield True, ''.join(buffer)
return
except APITimeoutError: except APITimeoutError as e:
if attempt < self.max_retries: if attempt < self.max_retries:
delay = 2 ** attempt delay = self._retry_delay(attempt)
self._show_progress("", f"{delay}s后重试...", level=2) self._show_progress("", f"{delay}s后重试...", level=2)
time.sleep(delay) time.sleep(delay)
else: else:
self._show_progress("", "超过最大重试次数", level=2) yield False, f"API请求超时已重试{self.max_retries}"
return False, f"API请求超时已重试{self.max_retries}" return
except APIError as e: except APIError as e:
self._show_progress("🚨", f"API错误: {str(e)}", level=2) yield False, f"API错误: {str(e)}"
return False, f"API调用失败: {str(e)}" return
except Exception as e: except Exception as e:
self._show_progress("💥", f"意外错误: {str(e)}", level=2) yield False, f"未处理的异常: {str(e)}"
return False, f"未处理的异常: {str(e)}" return
return False, "未知错误"
def analyze_file( def analyze_file(
self, self,
@ -89,16 +97,16 @@ class ContentAnalyzer:
output_path: Optional[Path] = None, output_path: Optional[Path] = None,
encoding: str = 'utf-8' encoding: str = 'utf-8'
) -> Tuple[bool, str]: ) -> Tuple[bool, str]:
"""处理文件全流程""" """处理文件全流程(流式版本)"""
try: try:
self._show_progress("📂", f"开始处理文件: {file_path}", level=0) self._show_progress("📂", f"开始处理文件: {file_path}", level=0)
# 检查文件 # 文件验证
self._show_progress("🔍", "验证文件...", level=1) self._show_progress("🔍", "验证文件...", level=1)
if not file_path.exists(): if not file_path.exists():
self._show_progress("", "文件不存在", level=2) self._show_progress("", "文件不存在", level=2)
return False, f"文件不存在: {file_path}" return False, f"文件不存在: {file_path}"
if file_path.stat().st_size > 10 * 1024 * 1024: # 10MB警告 if file_path.stat().st_size > 10 * 1024 * 1024:
self._show_progress("⚠️", "注意:大文件可能影响处理速度", level=2) self._show_progress("⚠️", "注意:大文件可能影响处理速度", level=2)
# 读取内容 # 读取内容
@ -109,45 +117,56 @@ class ContentAnalyzer:
self._show_progress("🔠", "解码失败尝试GBK编码...", level=2) self._show_progress("🔠", "解码失败尝试GBK编码...", level=2)
content = file_path.read_text(encoding='gbk') content = file_path.read_text(encoding='gbk')
# 分析内容 # 流式分析
self._show_progress("🧠", "开始分析...", level=1) self._show_progress("🧠", "开始流式分析...", level=1)
success, result = self.analyze_content(content) result_buffer = []
has_error = False
error_msg = ""
for status, chunk in self.analyze_content_stream(content):
if not status:
has_error = True
error_msg = chunk
break
print(chunk, end='', flush=True) # 实时输出
result_buffer.append(chunk)
if not success: if has_error:
return False, result self._show_progress("", f"分析失败: {error_msg}", level=1)
return False, error_msg
final_result = ''.join(result_buffer)
# 保存结果 # 保存结果
if output_path: if output_path:
self._show_progress("💾", f"保存到: {output_path}", level=1) self._show_progress("💾", f"保存到: {output_path}", level=1)
try: try:
output_path.parent.mkdir(parents=True, exist_ok=True) output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(result, encoding=encoding) output_path.write_text(final_result, encoding=encoding)
self._show_progress("", "保存成功", level=2) self._show_progress("", "保存成功", level=2)
except Exception as e: except Exception as e:
self._show_progress("", f"保存失败: {str(e)}", level=2) self._show_progress("", f"保存失败: {str(e)}", level=2)
return False, f"结果保存失败: {str(e)}" return False, f"结果保存失败: {str(e)}"
self._show_progress("🎉", "处理完成!", level=0) self._show_progress("🎉", "处理完成!", level=0)
return True, result return True, final_result
except Exception as e: except Exception as e:
self._show_progress("💣", f"严重错误: {str(e)}", level=1) self._show_progress("💣", f"严重错误: {str(e)}", level=1)
return False, f"文件处理失败: {str(e)}" return False, f"文件处理失败: {str(e)}"
def analyzer_action(input_file,output_file): def analyzer_action(input_file, output_file):
print("\n" + "=" * 50) print("\n" + "=" * 50)
print(" 🚀 长春云校视频课程智能打标记系统 ".center(50, "")) print(" 🚀 长春云校视频课程智能打标记系统 ".center(50, ""))
print("=" * 50) print("=" * 50)
analyzer = ContentAnalyzer(initial_timeout=300) # 显式设置初始超时 analyzer = ContentAnalyzer(initial_timeout=300)
success, result = analyzer.analyze_file(input_file, output_file) success, result = analyzer.analyze_file(Path(input_file), Path(output_file))
print("\n" + "=" * 50) print("\n" + "=" * 50)
if success: if success:
print("✅ 分析成功!结果如下:\n") print("\n✅ 分析成功!结果已保存至:", output_file)
print(result)
else: else:
print(f"❌ 分析失败:{result}") print(f"\n❌ 分析失败:{result}")
print("=" * 50) print("=" * 50)
Loading…
Cancel
Save