import asyncio import json import re from CommonUtil import * # 流式生成数据的函数 async def generate_stream_markdown(course_name: str): """ 流式生成 Markdown 数据,并在控制台输出完整的 Markdown 内容 """ # 调用阿里云 API,启用流式响应 stream = client.chat.completions.create( model=MODEL_NAME, messages=[ {'role': 'system', 'content': '你是一个教学经验丰富的基础教育教师'}, {'role': 'user', 'content': '帮我设计一下' + course_name + '的课件提纲,用markdown格式返回。强调1、标签只能返回 #,##,###,-,其它标签一率不可以返回,这个非常重要!2、不要返回 ```markdown 或者 ``` 这样的内容! 3、每部分都有生成完整的一、二、三级内容,不能省略。'} ], stream=True, # 启用流式响应 timeout=6000, ) # 初始化完整的 Markdown 内容 full_markdown = "" # 逐字返回数据 for chunk in stream: if chunk.choices[0].delta.content: chunk_content = chunk.choices[0].delta.content full_markdown += chunk_content # 拼接 Markdown 内容 for char in chunk_content: yield char.encode("utf-8") await asyncio.sleep(0.05) # 控制逐字输出的速度 # 在控制台输出完整的 Markdown 内容 print("\n完整的 Markdown 内容:") print(full_markdown) def extract_level1_title(markdown_content): """ 从 Markdown 字符串中提取一级目录的文本内容 """ # 使用正则表达式匹配一级目录的标题 match = re.search(r'^#\s+(.+)$', markdown_content, re.MULTILINE) if match: return match.group(1) # 返回一级目录的文本内容 return None def extract_level2_titles(markdown_content): """ 从 Markdown 字符串中提取所有二级目录的文本内容,返回一个数组 """ # 使用正则表达式匹配所有二级目录的标题 matches = re.findall(r'^##\s+(.+)$', markdown_content, re.MULTILINE) # 去重,确保每个二级目录标题只出现一次 unique_matches = list(dict.fromkeys(matches)) return unique_matches # 返回去重后的二级目录标题数组 def extract_level2_and_level3(markdown_content): """ 遍历 Markdown 内容,记录所有二级目录、三级目录及其下的 - 内容 """ lines = markdown_content.splitlines() # 将内容按行分割 current_level3 = None level3_items = [] result = [] # 用于存储最终的结构化数据 def save_level3(): """保存当前三级目录及其内容""" nonlocal current_level3, level3_items if current_level3: result[-1]["children"].append({ "title": current_level3, "items": level3_items.copy() }) level3_items.clear() for line in lines: line = line.strip() # 去掉前后空格 if not line: # 跳过空行 continue if line.startswith("## "): # 二级目录 # 先处理之前未完成的三级目录 if current_level3: save_level3() current_level3 = None # 处理二级目录 current_level2 = line[3:].strip() # 去掉 "## " 取标题 result.append({ "title": current_level2, "children": [] # 用于存储三级目录 }) elif line.startswith("### "): # 三级目录 # 遇到新三级目录时,先处理前一个 if current_level3: save_level3() current_level3 = line[4:].strip() # 去掉 "### " 取标题 level3_items = [] elif line.startswith("- "): # 三级目录下的内容项 if current_level3 is None: current_level3 = "未命名章节" # 兜底处理 level3_items.append(line[2:].strip()) # 去掉 "- " 取内容 # 处理最后一个三级目录 if current_level3: save_level3() return result def convert_structure_to_json(structure): """ 将结构化数据转换为指定的 JSON 格式 """ result = [] for level2 in structure: # 处理二级目录 level2_title = level2["title"] level2_json = { "type": "transition", "data": { "title": level2_title, "text": level2_title } } result.append(level2_json) # 处理三级目录 for level3 in level2["children"]: level3_title = level3["title"] items = [{"title": item, "text": item} for item in level3["items"]] level3_json = { "type": "content", "data": { "title": level3_title, "text": level3_title, "items": items } } result.append(level3_json) return result # 运行应用 if __name__ == "__main__": # 使用 asyncio.run 运行异步函数 # asyncio.run(test_generate_stream_markdown()) # 读取Sample.md内容 with open("Sample.md", "r", encoding="utf-8") as file: markdown_content = file.read() listAll = [] # 一级名称 level1_title = extract_level1_title(markdown_content) json_obj = {"type": "cover", "data": {"title": level1_title, "text": ""}} listAll.append(json_obj) # 二级名称列表 contents = extract_level2_titles(markdown_content) json_obj = {"type": "contents", "data": {"items": contents}} listAll.append(json_obj) # 二级目录和三级目录 result = extract_level2_and_level3(markdown_content) json_obj = convert_structure_to_json(result) # 遍历结果并一个对象一行打印 for item in json_obj: listAll.append(item) print(listAll)