# -*- coding: utf-8 -*-
import time
# Imported before the star import below so that any ``Path`` exported by
# Config keeps taking precedence, exactly as it did before this import existed.
from pathlib import Path
from typing import Optional, Tuple, Iterator

import httpx
from openai import OpenAI, APIError, APITimeoutError

from Config import *


class ContentAnalyzer:
    """Course content analyzer (streaming version).

    Wraps an OpenAI-compatible chat-completions client, streams the model's
    answer chunk by chunk, and prints timestamped progress messages to stdout.
    """

    def __init__(
        self,
        api_key: str = MODEL_API_KEY,
        base_url: str = MODEL_API_URL,
        model: str = MODEL_NAME,
        max_retries: int = 10,
        initial_timeout: int = 300
    ):
        """Create the API client and run a connectivity check.

        Args:
            api_key: API key passed to the ``OpenAI`` client.
            base_url: Base URL passed to the ``OpenAI`` client.
            model: Model name sent with every completion request.
            max_retries: Number of retries after the first attempt when a
                request times out (total attempts = ``max_retries + 1``).
            initial_timeout: Timeout in seconds for the first attempt; each
                later attempt adds 5 seconds.

        Raises:
            Exception: Whatever ``_check_network`` re-raises when the
                connectivity probe fails.
        """
        self._show_progress("🔧", "初始化分析器...", level=0)
        self.client = OpenAI(api_key=api_key, base_url=base_url)
        self.model = model
        self.max_retries = max_retries
        self.initial_timeout = initial_timeout
        self._check_network()
        self._show_progress("✅", "分析器准备就绪", level=0)

    def _show_progress(self, emoji: str, message: str, level: int = 1):
        """Print a progress line prefixed with an indent, emoji and HH:MM:SS time.

        Args:
            emoji: Marker printed before the timestamp.
            message: Text to display.
            level: Nesting depth; the indent string is repeated this many times.
        """
        indent = " " * level
        timestamp = time.strftime("%H:%M:%S")
        print(f"{indent}{emoji} [{timestamp}] {message}")

    def _check_network(self):
        """Probe network connectivity with a single HTTP GET.

        The request goes to a fixed URL with a 30-second timeout. The response
        status is not inspected, so any completed response counts as success.

        Raises:
            Exception: The original exception from the request is logged and
                re-raised unchanged.
        """
        try:
            with httpx.Client(timeout=30) as client:
                client.get("https://dashscope.aliyuncs.com")
            self._show_progress("🌐", "网络连接正常", level=1)
        except Exception as e:
            self._show_progress("❌", f"网络异常: {str(e)}", level=1)
            raise

    def _retry_delay(self, attempt: int) -> int:
        """Return the exponential back-off delay in seconds, capped at 60.

        Args:
            attempt: Zero-based index of the attempt that just failed.
        """
        return min(2 ** attempt, 60)

    def analyze_content_stream(
        self,
        content: str,
        prompt_template: str = "帮我梳理:这节课分了几个部分,每部分的名称和开始的时间是多少:{}",
        *,
        yield_full_result: bool = True
    ) -> Iterator[Tuple[bool, str]]:
        """Stream the model's analysis of ``content``.

        Args:
            content: Text substituted into ``prompt_template``.
            prompt_template: ``str.format`` template with one positional
                placeholder for ``content``.
            yield_full_result: When true (the default, matching the historic
                behavior) the concatenation of all chunks is yielded as one
                extra final item after the stream ends. Pass ``False`` when
                the caller accumulates the chunks itself; otherwise the text
                would be counted twice.

        Yields:
            ``(True, text)`` for every streamed fragment (and, optionally, the
            final full text), or a single ``(False, error_message)`` after
            which the generator stops.
        """
        total_attempts = self.max_retries + 1
        for attempt in range(total_attempts):
            try:
                current_timeout = self.initial_timeout + attempt * 5
                # The denominator is the real number of attempts so the last
                # attempt no longer reads e.g. "11/10".
                self._show_progress(
                    "⏱️",
                    f"尝试 {attempt + 1}/{total_attempts} (超时: {current_timeout}s)",
                    level=2
                )

                full_prompt = prompt_template.format(content)
                stream = self.client.chat.completions.create(
                    model=self.model,
                    messages=[{'role': 'user', 'content': full_prompt}],
                    timeout=current_timeout,
                    stream=True  # enable streaming mode
                )

                buffer = []
                for chunk in stream:
                    # Skip chunks that carry no choices or no text content.
                    if chunk.choices and chunk.choices[0].delta.content:
                        content_chunk = chunk.choices[0].delta.content
                        buffer.append(content_chunk)
                        yield True, content_chunk  # hand each fragment over immediately

                # Optionally emit the complete text as a trailing item.
                if yield_full_result and buffer:
                    yield True, ''.join(buffer)
                return

            except APITimeoutError:
                # Only timeouts are retried; every other error ends the stream.
                if attempt < self.max_retries:
                    delay = self._retry_delay(attempt)
                    self._show_progress("⏳", f"{delay}s后重试...", level=2)
                    time.sleep(delay)
                else:
                    yield False, f"API请求超时,已重试{self.max_retries}次"
                    return
            except APIError as e:
                yield False, f"API错误: {str(e)}"
                return
            except Exception as e:
                yield False, f"未处理的异常: {str(e)}"
                return

    def analyze_file(
        self,
        file_path: Path,
        output_path: Optional[Path] = None,
        encoding: str = 'utf-8'
    ) -> Tuple[bool, str]:
        """Read a file, stream its analysis to stdout, and optionally save it.

        Args:
            file_path: File whose text is analyzed.
            output_path: Where to write the result; nothing is written when
                this is ``None``. Missing parent directories are created.
            encoding: Encoding used to read the input (with a GBK fallback on
                decode failure) and to write the output.

        Returns:
            ``(True, result_text)`` on success, ``(False, error_message)``
            otherwise. Exceptions are converted into the failure tuple.
        """
        try:
            self._show_progress("📂", f"开始处理文件: {file_path}", level=0)

            # Validate the input file.
            self._show_progress("🔍", "验证文件...", level=1)
            if not file_path.exists():
                self._show_progress("❌", "文件不存在", level=2)
                return False, f"文件不存在: {file_path}"

            # Files above 10 MiB only trigger a warning; processing continues.
            if file_path.stat().st_size > 10 * 1024 * 1024:
                self._show_progress("⚠️", "注意:大文件可能影响处理速度", level=2)

            # Read the content, falling back to GBK if decoding fails.
            self._show_progress("📖", "读取文件内容...", level=1)
            try:
                content = file_path.read_text(encoding=encoding)
            except UnicodeDecodeError:
                self._show_progress("🔠", "解码失败,尝试GBK编码...", level=2)
                content = file_path.read_text(encoding='gbk')

            # Streamed analysis.
            self._show_progress("🧠", "开始流式分析...", level=1)
            result_buffer = []
            has_error = False
            error_msg = ""

            # The chunks are accumulated here, so the generator's trailing
            # "full text" item is disabled; consuming it as well would print
            # and store the whole answer a second time.
            for status, chunk in self.analyze_content_stream(
                content, yield_full_result=False
            ):
                if not status:
                    has_error = True
                    error_msg = chunk
                    break
                print(chunk, end='', flush=True)  # live output
                result_buffer.append(chunk)

            if has_error:
                self._show_progress("❌", f"分析失败: {error_msg}", level=1)
                return False, error_msg

            final_result = ''.join(result_buffer)

            # Persist the result when an output path was supplied.
            if output_path:
                self._show_progress("💾", f"保存到: {output_path}", level=1)
                try:
                    output_path.parent.mkdir(parents=True, exist_ok=True)
                    output_path.write_text(final_result, encoding=encoding)
                    self._show_progress("✅", "保存成功", level=2)
                except Exception as e:
                    self._show_progress("❌", f"保存失败: {str(e)}", level=2)
                    return False, f"结果保存失败: {str(e)}"

            self._show_progress("🎉", "处理完成!", level=0)
            return True, final_result

        except Exception as e:
            self._show_progress("💣", f"严重错误: {str(e)}", level=1)
            return False, f"文件处理失败: {str(e)}"


def analyzer_action(input_file, output_file):
    """Run the full analysis for one file and print a banner plus the outcome.

    Args:
        input_file: Path (anything ``Path`` accepts) of the file to analyze.
        output_file: Path (anything ``Path`` accepts) where the result is saved.
    """
    print("\n" + "=" * 50)
    print(" 🚀 长春云校视频课程智能打标记系统 ".center(50, "✨"))
    print("=" * 50)

    analyzer = ContentAnalyzer(initial_timeout=300)
    success, result = analyzer.analyze_file(Path(input_file), Path(output_file))

    print("\n" + "=" * 50)
    if success:
        print("\n✅ 分析成功!结果已保存至:", output_file)
    else:
        print(f"\n❌ 分析失败:{result}")
    print("=" * 50)