import asyncio
import json
import logging
from typing import Dict, List, Optional

from Util.MySQLUtil import init_mysql_pool

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Default location of the knowledge-point tree exported as JSON.
DEFAULT_JSON_PATH = "d:\\dsWork\\dsProject\\dsRag\\Neo4j\\小学数学知识点体系.json"

# Parameterized insert statement; one placeholder per stored column.
_INSERT_SQL = (
    "INSERT INTO knowledge_points "
    "(id, title, parent_id, is_leaf, prerequisite, related) "
    "VALUES (%s, %s, %s, %s, %s, %s)"
)


def _flatten_node(node: Dict, parent_id: Optional[str] = None) -> List[Dict]:
    """Flatten ``node`` and its descendants into row dicts, parent first."""
    rows = [
        {
            "id": node["value"],
            "title": node["title"],
            "parent_id": parent_id,
            "is_leaf": node["isLeaf"],
            "prerequisite": json.dumps(node.get("PREREQUISITE", [])),
            "related": json.dumps(node.get("RELATED_TO", [])),
        }
    ]
    if not node["isLeaf"]:
        # ``or []`` tolerates both a missing "children" key and an explicit null.
        for child in node.get("children") or []:
            rows.extend(_flatten_node(child, node["value"]))
    return rows


async def process_node(node: Dict, parent_id: Optional[str] = None) -> List[Dict]:
    """Process a single knowledge-point node and all of its descendants.

    Args:
        node: A tree node with the keys ``value``, ``title`` and ``isLeaf``,
            and optionally ``children``, ``PREREQUISITE`` and ``RELATED_TO``.
        parent_id: The ``value`` of the parent node, or ``None`` for a root.

    Returns:
        A flat list of row dicts in pre-order (each node precedes its
        children). ``prerequisite`` and ``related`` are JSON-encoded strings.

    Raises:
        KeyError: If ``value``, ``title`` or ``isLeaf`` is missing from a node.
    """
    # The traversal does no I/O, so it runs synchronously; this coroutine
    # wrapper is kept so existing ``await process_node(...)`` callers work.
    return _flatten_node(node, parent_id)


async def insert_knowledge_points(mysql_pool, knowledge_points: List[Dict]):
    """Insert the given knowledge points in a single transaction.

    Args:
        mysql_pool: Connection pool returned by ``init_mysql_pool``
            (assumed to expose an aiomysql-style ``acquire()`` - confirm).
        knowledge_points: Row dicts as produced by ``process_node``.

    Raises:
        Exception: Any database error is re-raised after the transaction
            has been rolled back.
    """
    if not knowledge_points:
        return

    rows = [
        (
            point["id"],
            point["title"],
            point["parent_id"],
            point["is_leaf"],
            point["prerequisite"],
            point["related"],
        )
        for point in knowledge_points
    ]

    async with mysql_pool.acquire() as conn:
        # Check the pooled connection before writing.
        await conn.ping()
        try:
            async with conn.cursor() as cur:
                await cur.executemany(_INSERT_SQL, rows)
            await conn.commit()
        except Exception:
            # Do not leave a half-written batch pending on the connection.
            await conn.rollback()
            raise


async def main(json_path: str = DEFAULT_JSON_PATH) -> None:
    """Load the knowledge-point tree from JSON and store it in MySQL.

    Args:
        json_path: Path of the JSON file whose ``data.tree`` entry holds the
            list of root nodes. Defaults to ``DEFAULT_JSON_PATH``.
    """
    # Read and flatten the input first, so a missing or malformed file
    # fails before any database resources are created.
    with open(json_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    knowledge_points: List[Dict] = []
    for node in data["data"]["tree"]:
        knowledge_points.extend(await process_node(node))

    # Initialize the MySQL connection pool
    mysql_pool = await init_mysql_pool()
    try:
        # Insert into the database
        await insert_knowledge_points(mysql_pool, knowledge_points)
    finally:
        # Always release the pool, even when the insert fails.
        mysql_pool.close()
        await mysql_pool.wait_closed()

    logger.info("成功插入 %d 条知识点数据", len(knowledge_points))


if __name__ == "__main__":
    asyncio.run(main())