main
HuangHai 5 months ago
parent 43b739a4cd
commit 46b8665528

@ -2,8 +2,58 @@ import uvicorn
from fastapi import FastAPI, Body
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, PlainTextResponse
import asyncio
import socket
from openai import OpenAI
from MarkdownToJsonUtil import *
# 阿里云中用来调用 deepseek v3 的密钥
MODEL_API_KEY = "sk-01d13a39e09844038322108ecdbd1bbc"
MODEL_NAME = "deepseek-v3"
#MODEL_NAME = "qwen-plus"
# 初始化 OpenAI 客户端
client = OpenAI(
api_key=MODEL_API_KEY,
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
# 获取本机所有 IPv4 地址
def get_local_ips():
ips = []
hostname = socket.gethostname()
try:
# 获取所有 IP 地址
addrs = socket.getaddrinfo(hostname, None, family=socket.AF_INET) # 只获取 IPv4 地址
for addr in addrs:
ip = addr[4][0]
if ip not in ips:
ips.append(ip)
except Exception as e:
print(f"获取 IP 地址失败: {e}")
return ips
# 流式生成数据的函数
async def generate_stream_markdown(course_name: str):
# 调用阿里云 API启用流式响应
stream = client.chat.completions.create(
model=MODEL_NAME,
messages=[
{'role': 'system', 'content': '你是一个教学经验丰富的基础教育教师'},
{'role': 'user', 'content': '帮我设计一下' + course_name + '的课件提纲用markdown格式返回。不要返回 ```markdown 或者 ``` 这样的内容!'}
],
stream=True, # 启用流式响应
timeout=6000,
)
# 逐字返回数据
for chunk in stream:
if chunk.choices[0].delta.content:
for char in chunk.choices[0].delta.content:
yield char.encode("utf-8")
await asyncio.sleep(0.05) # 控制逐字输出的速度
from Util import *
app = FastAPI()
# 添加 CORS 中间件
@ -39,7 +89,7 @@ async def aippt_outline(
@app.post("/api/tools/aippt") # 修改为 POST 方法
async def aippt(content: str = Body(..., embed=True, description="Markdown 内容")): # 使用 Body 接收请求体参数
return StreamingResponse(
convert_markdown_to_json(content), # 传入 content
getMyJson(content), # 传入 content
media_type="text/plain", # 使用 text/plain 格式
headers={
"Cache-Control": "no-cache",

@ -0,0 +1,97 @@
import markdown_to_json
import json
import asyncio
def markdown_to_dict(markdown_content):
"""
Markdown 内容转换为 Python 字典
"""
# 将 Markdown 转换为 JSON 字符串
json_content = markdown_to_json.jsonify(markdown_content)
# 解码 Unicode 转义
json_content = json_content.encode('utf-8').decode('unicode_escape')
# 将 JSON 字符串转换为字典
return json.loads(json_content)
def extract_level1(json_dict):
"""
提取一级目录生成指定格式的 JSON 对象列表
"""
# 获取第一个一级目录的名称
level1_title = next(iter(json_dict.keys()), None)
if level1_title:
return [{"type": "cover", "data": {"title": level1_title, "text": level1_title}}]
return []
def extract_level2_and_level3(json_dict, level1_title=None):
"""
提取指定一级目录下的二级目录及其三级目录内容生成指定格式的 JSON 对象列表
"""
# 如果没有指定一级目录,则使用第一个一级目录
if level1_title is None:
level1_title = next(iter(json_dict.keys()), None)
if level1_title and level1_title in json_dict:
result = []
for level2_title, level2_content in json_dict[level1_title].items():
# 输出二级目录
result.append({"type": "transition", "data": {"title": level2_title, "text": level2_title}})
# 输出三级目录内容
if isinstance(level2_content, dict):
for level3_title, level3_items in level2_content.items():
items = [{"title": item, "text": item} for item in level3_items]
result.append({
"type": "content",
"data": {
"title": level3_title,
"items": items
}
})
return result
return []
def extract_contents(json_dict, level1_title=None):
"""
提取所有二级目录名称生成目录部分的 JSON 对象
"""
# 如果没有指定一级目录,则使用第一个一级目录
if level1_title is None:
level1_title = next(iter(json_dict.keys()), None)
if level1_title and level1_title in json_dict:
# 获取所有二级目录名称
level2_titles = list(json_dict[level1_title].keys())
return {"type": "contents", "data": {"items": level2_titles}}
return {"type": "contents", "data": {"items": []}}
async def getMyJson(markdown_content):
"""
生成一个 AsyncIterable逐行返回 JSON 字符串
"""
# 将 Markdown 转换为字典
json_dict = markdown_to_dict(markdown_content)
# 提取一级目录
level1_json = extract_level1(json_dict)
for item in level1_json:
yield json.dumps(item, ensure_ascii=False)
await asyncio.sleep(0.5) # 控制逐行输出的速度
# 生成目录部分
contents_json = extract_contents(json_dict)
yield json.dumps(contents_json, ensure_ascii=False)
await asyncio.sleep(0.5)
# 提取二级目录及其三级目录内容
level2_and_level3_json = extract_level2_and_level3(json_dict)
for item in level2_and_level3_json:
yield json.dumps(item, ensure_ascii=False)
await asyncio.sleep(0.5)
# 添加结束标记
yield '{"type": "end" }'

@ -0,0 +1,27 @@
{"type": "cover", "data": { "title": "大学生职业生涯规划", "text": "助力大学生规划美好职业未来" } }
{"type": "contents", "data": { "items": [ "认识职业生涯规划", "自我评估", "职业环境分析", "设定职业目标", "制定行动计划", "评估与调整", "常见误区与应对" ] } }
{"type": "transition", "data": { "title": "认识规划", "text": "了解职业生涯规划定义与重要性" } }
{"type": "content", "data": { "title": "规划定义", "items": [ { "title": "计划安排", "text": "职业生涯规划是明确职业发展的一系列计划和安排,涵盖自我认知、职业探索等多方面,帮助大学生理清职业发展路径。" }, { "title": "多方面涵盖", "text": "它不仅仅是确定一份工作,还涉及到对自身的全面了解和对职业市场的深入探索,为未来职业发展奠定基础。" } ] } }
{"type": "content", "data": { "title": "规划重要性", "items": [ { "title": "明确方向", "text": "帮助大学生明确职业方向,避免在求职过程中盲目选择,使大学生能够有针对性地进行学习和实践。" }, { "title": "提升竞争力", "text": "通过规划可以提前做好职业准备,提升就业竞争力,增加在就业市场中的优势。" }, { "title": "增加满意度", "text": "合理的规划能让大学生找到适合自己的职业,从而增加职业满意度和成就感,提升工作幸福感。" } ] } }
{"type": "transition", "data": { "title": "自我评估", "text": "剖析自身性格、兴趣与能力" } }
{"type": "content", "data": { "title": "性格特点分析", "items": [ { "title": "性格倾向", "text": "分析自己外向或内向的性格倾向,不同性格在职业中有不同的适配性。外向性格适合销售等岗位,内向性格适合研发等工作。" }, { "title": "职业适配", "text": "了解性格与职业的适配性,能让大学生在选择职业时更贴合自身特点,提高职业满意度和工作效率。" } ] } }
{"type": "content", "data": { "title": "兴趣爱好探索", "items": [ { "title": "兴趣领域", "text": "探索个人的兴趣领域,兴趣是最好的老师,它能让大学生在工作中更有动力和热情。" }, { "title": "兴趣结合", "text": "找到兴趣与职业的结合点,将兴趣转化为职业优势,有助于在职业生涯中取得更好的发展。" } ] } }
{"type": "content", "data": { "title": "能力优势盘点", "items": [ { "title": "专业技能", "text": "盘点专业技能水平,明确自己在专业领域的优势和不足,以便有针对性地进行提升。" }, { "title": "通用能力", "text": "评估沟通、团队协作等通用能力,这些能力在不同职业中都非常重要,能帮助大学生更好地适应工作环境。" } ] } }
{"type": "transition", "data": { "title": "职业环境分析", "text": "洞察行业、职业与企业需求" } }
{"type": "content", "data": { "title": "行业发展趋势", "items": [ { "title": "新兴行业", "text": "关注新兴行业的崛起,如人工智能、大数据等,这些行业蕴含着巨大的发展潜力和就业机会。" }, { "title": "传统变革", "text": "了解传统行业的变革,传统行业在科技的推动下不断转型升级,也创造了新的职业需求。" } ] } }
{"type": "content", "data": { "title": "职业需求状况", "items": [ { "title": "人才数量", "text": "了解各职业的人才需求数量,选择人才需求大的职业,就业机会相对更多。" }, { "title": "发展前景", "text": "分析职业的发展前景,选择有良好发展前景的职业,能为个人的职业生涯带来更多的上升空间。" } ] } }
{"type": "content", "data": { "title": "企业招聘要求", "items": [ { "title": "素质要求", "text": "明确不同企业对人才的素质要求,包括专业知识、技能、品德等方面,以便有针对性地提升自己。" }, { "title": "企业文化", "text": "了解企业的文化和价值观,选择与自己价值观相符的企业,能更好地融入企业,发挥自己的优势。" } ] } }
{"type": "transition", "data": { "title": "设定职业目标", "text": "确立短、中、长期职业目标" } }

@ -1,60 +0,0 @@
import markdown_to_json
import json
def markdown_to_dict(markdown_content):
"""
Markdown 内容转换为 Python 字典
"""
# 将 Markdown 转换为 JSON 字符串
json_content = markdown_to_json.jsonify(markdown_content)
# 解码 Unicode 转义
json_content = json_content.encode('utf-8').decode('unicode_escape')
# 将 JSON 字符串转换为字典
return json.loads(json_content)
def extract_level1(json_dict):
"""
提取一级目录生成指定格式的 JSON 对象列表
"""
# 获取第一个一级目录的名称
level1_title = next(iter(json_dict.keys()), None)
if level1_title:
return [{"type": "cover", "data": {"title": level1_title, "text": level1_title}}]
return []
def extract_level2(json_dict, level1_title=None):
"""
提取指定一级目录下的二级目录生成指定格式的 JSON 对象列表
"""
# 如果没有指定一级目录,则使用第一个一级目录
if level1_title is None:
level1_title = next(iter(json_dict.keys()), None)
if level1_title and level1_title in json_dict:
return [{"type": "transition", "data": {"title": title, "text": title}} for title in
json_dict[level1_title].keys()]
return []
if __name__ == '__main__':
# 打开文本文件 Sample.md
with open("Sample.md", "r", encoding="utf-8") as file:
# 读取 Markdown 文件的内容
markdown_content = file.read()
# 将 Markdown 转换为字典
json_dict = markdown_to_dict(markdown_content)
# 提取一级目录
level1_json = extract_level1(json_dict)
print("一级目录:")
for item in level1_json:
print(json.dumps(item, ensure_ascii=False))
# 提取二级目录(自动获取一级目录名称)
level2_json = extract_level2(json_dict)
print("\n二级目录:")
for item in level2_json:
print(json.dumps(item, ensure_ascii=False))

@ -1,109 +0,0 @@
import asyncio
import json
import socket
from typing import AsyncGenerator
from openai import OpenAI
# 阿里云中用来调用 deepseek v3 的密钥
MODEL_API_KEY = "sk-01d13a39e09844038322108ecdbd1bbc"
MODEL_NAME = "deepseek-v3"
#MODEL_NAME = "qwen-plus"
# 初始化 OpenAI 客户端
client = OpenAI(
api_key=MODEL_API_KEY,
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
# 获取本机所有 IPv4 地址
def get_local_ips():
ips = []
hostname = socket.gethostname()
try:
# 获取所有 IP 地址
addrs = socket.getaddrinfo(hostname, None, family=socket.AF_INET) # 只获取 IPv4 地址
for addr in addrs:
ip = addr[4][0]
if ip not in ips:
ips.append(ip)
except Exception as e:
print(f"获取 IP 地址失败: {e}")
return ips
# 流式生成数据的函数
async def generate_stream_markdown(course_name: str):
# 调用阿里云 API启用流式响应
stream = client.chat.completions.create(
model=MODEL_NAME,
messages=[
{'role': 'system', 'content': '你是一个教学经验丰富的基础教育教师'},
{'role': 'user', 'content': '帮我设计一下' + course_name + '的课件提纲用markdown格式返回。不要返回 ```markdown 或者 ``` 这样的内容!'}
],
stream=True, # 启用流式响应
timeout=6000,
)
# 逐字返回数据
for chunk in stream:
if chunk.choices[0].delta.content:
for char in chunk.choices[0].delta.content:
yield char.encode("utf-8")
await asyncio.sleep(0.05) # 控制逐字输出的速度
def generate_prompt(source: str) -> str:
# 打开 prompt.txt 文件,读取内容
with open("prompt.txt", "r", encoding="utf-8") as file:
prompt = file.read()
prompt = prompt.replace("{{source}}", source)
return prompt
async def convert_markdown_to_json(source: str) -> AsyncGenerator[bytes, None]:
# 生成提示词
prompt = generate_prompt(source)
# 调用 AI 模型,启用流式响应
stream = client.chat.completions.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个专业的 Markdown 内容解析器。"},
{"role": "user", "content": prompt}
],
stream=True, # 启用流式响应
max_tokens=2000,
temperature=0.5
)
# 缓存当前内容
buffer = ""
# 将同步的 Stream 转换为异步迭代器
async def async_iter_stream():
for chunk in stream:
yield chunk
# 逐块处理流式数据
async for chunk in async_iter_stream():
if chunk.choices[0].delta.content:
# 将新内容添加到缓冲区
buffer += chunk.choices[0].delta.content
# 检查是否有换行符(\n 或 \r
while "\n" in buffer or "\r" in buffer:
# 找到第一个换行符的位置
newline_index = buffer.find("\n") if "\n" in buffer else buffer.find("\r")
# 提取并返回完整的一行
line = buffer[:newline_index]
if line: # 确保 line 不为空
yield line.encode("utf-8")
# 移除已处理的部分
buffer = buffer[newline_index + 1:]
# 返回缓冲区中剩余的内容(最后一行)
if buffer:
yield buffer.encode("utf-8")

@ -1,27 +0,0 @@
你是一个专业的 Markdown 内容解析器,能够将 Markdown 格式的文本转换为文本字符串。请根据以下规则处理 Markdown 内容:
1. 每生成一个 JSON 对象后,必须添加一个换行符(\n确保每个 JSON 对象独占一行。这是最重要的规则,请务必遵守。
2. 第一级目录按下面的格式返回title 就是一级目录文字text 你帮着生成简短的描述语言:
{"type": "cover", "data": { "title": "说明文阅读技巧课件提纲", "text": "掌握说明文阅读方法,提高理解能力" } }\n
3. 整理所有二级目录,按下面格式返回:
{"type": "contents", "data": { "items": [ "说明文概述", "明确说明对象", "把握说明顺序", "分析说明方法及其作用", "理解说明文的语言特点", "归纳文章结构", "练习巩固" ] } }\n
4. 每个二级目录按下面的格式返回同样title 是二级目录文字text 你帮着生成简短的描述语言:
{"type": "transition", "data": { "title": "进入说明文概述", "text": "了解说明文的基本概念和特征" } }\r\n
5. 每个二级下的三级目录按下面格式返回同样title 是二级目录文字text 你帮着生成简短的描述语言:
{"type": "content", "data": { "title": "说明文概述", "items": [{ "title": "定义", "text": "以说明为主要表达方式,介绍事物或事理的文章" }, { "title": "内容的知识性", "text": "提供丰富的知识信息,帮助读者了解新事物" }, { "title": "语言的准确性", "text": "用词准确,表达严谨,确保信息传递无误" }, { "title": "结构的条理性", "text": "层次分明,逻辑清晰,便于读者理解" } ] } }\n
6. 最后按下面字符串固定输出:
{"type": "end" }\n
7. 不要输出:```json 和 ```。
8. 不要生成 **xx** 这样的 markdown 中加重的符号。
以下是需要处理的 Markdown 内容:
{{source}}
请将上述 Markdown 内容转换为以下文本字符串格式:
Loading…
Cancel
Save