import asyncio import json import socket from typing import AsyncGenerator from openai import OpenAI # 阿里云中用来调用 deepseek v3 的密钥 MODEL_API_KEY = "sk-01d13a39e09844038322108ecdbd1bbc" MODEL_NAME = "deepseek-v3" #MODEL_NAME = "qwen-plus" # 初始化 OpenAI 客户端 client = OpenAI( api_key=MODEL_API_KEY, base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", ) # 获取本机所有 IPv4 地址 def get_local_ips(): ips = [] hostname = socket.gethostname() try: # 获取所有 IP 地址 addrs = socket.getaddrinfo(hostname, None, family=socket.AF_INET) # 只获取 IPv4 地址 for addr in addrs: ip = addr[4][0] if ip not in ips: ips.append(ip) except Exception as e: print(f"获取 IP 地址失败: {e}") return ips # 流式生成数据的函数 async def generate_stream_markdown(course_name: str): # 调用阿里云 API,启用流式响应 stream = client.chat.completions.create( model=MODEL_NAME, messages=[ {'role': 'system', 'content': '你是一个教学经验丰富的基础教育教师'}, {'role': 'user', 'content': '帮我设计一下' + course_name + '的课件提纲,用markdown格式返回。不要返回 ```markdown 或者 ``` 这样的内容!'} ], stream=True, # 启用流式响应 timeout=6000, ) # 逐字返回数据 for chunk in stream: if chunk.choices[0].delta.content: for char in chunk.choices[0].delta.content: yield char.encode("utf-8") await asyncio.sleep(0.05) # 控制逐字输出的速度 def generate_prompt(source: str) -> str: # 打开 prompt.txt 文件,读取内容 with open("prompt.txt", "r", encoding="utf-8") as file: prompt = file.read() prompt = prompt.replace("{{source}}", source) return prompt async def convert_markdown_to_json(source: str) -> AsyncGenerator[bytes, None]: # 生成提示词 prompt = generate_prompt(source) # 调用 AI 模型,启用流式响应 stream = client.chat.completions.create( model=MODEL_NAME, messages=[ {"role": "system", "content": "你是一个专业的 Markdown 内容解析器。"}, {"role": "user", "content": prompt} ], stream=True, # 启用流式响应 max_tokens=2000, temperature=0.5 ) # 缓存当前内容 buffer = "" # 将同步的 Stream 转换为异步迭代器 async def async_iter_stream(): for chunk in stream: yield chunk # 逐块处理流式数据 async for chunk in async_iter_stream(): if chunk.choices[0].delta.content: # 将新内容添加到缓冲区 buffer += chunk.choices[0].delta.content # 检查是否有换行符(\n 或 \r) while "\n" in buffer or "\r" in buffer: # 找到第一个换行符的位置 newline_index = buffer.find("\n") if "\n" in buffer else buffer.find("\r") # 提取并返回完整的一行 line = buffer[:newline_index] if line: # 确保 line 不为空 yield line.encode("utf-8") # 移除已处理的部分 buffer = buffer[newline_index + 1:] # 返回缓冲区中剩余的内容(最后一行) if buffer: yield buffer.encode("utf-8")