From 56c087602d1e4c063b483e1be5b71faff9accea3 Mon Sep 17 00:00:00 2001 From: HuangHai <10402852@qq.com> Date: Fri, 28 Feb 2025 15:26:56 +0800 Subject: [PATCH] 'commit' --- AI/AiService/Doc/测试文档.txt | 5 +- AI/AiService/Model/TaskModel.py | 46 +++++++++++++++++- .../__pycache__/TaskModel.cpython-310.pyc | Bin 1477 -> 2725 bytes AI/AiService/Server.py | 17 +++++++ AI/AiService/Worker/TaskWorker.py | 40 +++++++++++++++ AI/AiService/Worker/__init__.py | 0 6 files changed, 105 insertions(+), 3 deletions(-) create mode 100644 AI/AiService/Worker/TaskWorker.py create mode 100644 AI/AiService/Worker/__init__.py diff --git a/AI/AiService/Doc/测试文档.txt b/AI/AiService/Doc/测试文档.txt index 2f9bd1e4..8391e34d 100644 --- a/AI/AiService/Doc/测试文档.txt +++ b/AI/AiService/Doc/测试文档.txt @@ -1,2 +1,5 @@ # 提交任务 -curl -X POST "http://127.0.0.1:8000/create-task" -H "Content-Type: application/json" -d "{\"prompt\": \"Generate an image of a sunset\"}" \ No newline at end of file +curl -X POST "http://127.0.0.1:8000/create-task" -H "Content-Type: application/json" -d "{\"prompt\": \"Generate an image of a sunset\"}" + +# 查询任务 +curl "http://127.0.0.1:8000/task-status/699f6d14-86da-459c-8cc4-952ce5451352" \ No newline at end of file diff --git a/AI/AiService/Model/TaskModel.py b/AI/AiService/Model/TaskModel.py index b127c3c8..5f558c69 100644 --- a/AI/AiService/Model/TaskModel.py +++ b/AI/AiService/Model/TaskModel.py @@ -1,6 +1,5 @@ -import mysql.connector -from AiService.Config.Config import * import mysql.connector.pooling +from AiService.Config.Config import MYSQL_CONFIG class TaskModel: # 创建连接池 @@ -30,6 +29,49 @@ class TaskModel: print(f"Failed to insert task: {str(e)}") raise + def update_task_status(self, task_id: str, status: str, result_url: str = None, error_message: str = None): + """ + 更新任务状态 + """ + try: + query = """ + UPDATE t_gen_tasks + SET status = %s, result_url = %s, error_message = %s, complete_time = NOW() + WHERE task_id = %s + """ + self.cursor.execute(query, (status, result_url, error_message, task_id)) + self.connection.commit() + except Exception as e: + print(f"Failed to update task status: {str(e)}") + raise + + def get_task_status(self, task_id: str): + """ + 查询任务状态 + """ + try: + query = """ + SELECT task_id, keyword, status, submit_time, complete_time, result_url, error_message + FROM t_gen_tasks + WHERE task_id = %s + """ + self.cursor.execute(query, (task_id,)) + result = self.cursor.fetchone() + if result: + return { + "task_id": result[0], + "keyword": result[1], + "status": result[2], + "submit_time": result[3], + "complete_time": result[4], + "result_url": result[5], + "error_message": result[6] + } + return None + except Exception as e: + print(f"Failed to get task status: {str(e)}") + raise + def close(self): """ 将连接归还到连接池 diff --git a/AI/AiService/Model/__pycache__/TaskModel.cpython-310.pyc b/AI/AiService/Model/__pycache__/TaskModel.cpython-310.pyc index 0a62b1294187b7586f3c0bf597a4249225340179..5fbad08ecc95922a82b6e912b2662bfc3768843b 100644 GIT binary patch literal 2725 zcmai0?Q0xG7~h$FxtCmCnl_EGRrZ6GUYiE-i%?V=(rYay*OyBhT>?wonZ!-+Zj;$P z+Z>ltZIsdv6$*+_UlO%oK{SH+rIg~ov0pVY`4bc@KF{oC?>235j`{7p-aIqUZ+_2s z-Q9TwlI8q9w+N*_u+o`SsGNZ8zYM?>MyiTM;7hBtrdlddPAg1h+KR%o8`RPmsVGH# z8^$W-A_)r-$% zIR8w!Qo1k+pUJbSDTm{o4x8~#hfP122^xMeHT}})_{CDiJbcI>Hlv*G4>JqA&cGAR z!BaDCl`}K&Ot)=ttL-m4>b7w094b+KCO@RkbXi*_Bwss z9GKkU`$)%(a%e=`4)cQhjcQ;wL^bT@LU_Wiao?Yr;|}_LC!z5P2i}27a{xG?#~jdG z6nLPu$gK7R3)GkeM}Y-K%mNB5Fj6c~myK(NWP#Rr3D-(kaQD~E&(|Xs{B`TM&Fdc~ zUCr;3B96Xz?&RoXsngr3?n-GgcD4D2^l5wJaYJhNaT~a6p~?ep2X2i+TlwtN;WQPd z-YQ*yRZ&V@CTFFVZNAthNMnJ`1YC}dO#z+=^jaf}BPTAb1Pq}%ZO+W}dY4#NjDN+K(Bh1e7 zz*?|~uN5sxGB@RLmk_o(BAPCdOG3ZWQwXy!m)}yaCZ{OQ^cxUU;7Z6jT z>gF%2$(V9cjl^4bsvd|ss-M8O%dRUZ@QL&`WEo!bS`TUVw#|>`;-xLpa=F-x5gFeW zwjJheyXLV*70U(NUTn-%;~u@XK{Rgq6WZ34|JJ@5B;3t;Ukq#5rgeNEd

U8#G_u2RsUjFd1hNeI<>UEfl{}>oHMGZ6HU7;S{rbV)p1#5XIjX2z ygVL~(#IBLpEO7)Thl9yeWCRSHb?4$wWFIc{Gz-G)>(Md1iPZOT8(F15`sshV38(J> delta 435 zcmZ9HF-yZh6vyw~U1Cg=6pE!PI=GpFg?)eR z1wVm<-=LG5qqD9S@1jHT!TsL7d++`K+++E*;0W6`k&O4=OFyC$XKhrK6-OK*#6V^o z;qGru7&H+loQ@Hvx0q259pq{sYQ62k8&o}YoexbG+w!D)5`Heouuno7YN5_;4jglG zkNt%QxC3Tz%C*T;#Ws0ijF`nOsqai?Utm9f!Zm=K1tf%0HYN3}4y)PJ7*toJ@`kdY z5+-$ESnT%FvzFj~RP!Q{`t773TdtiA7rP^7wT?SS2Pcig_U?W&(PbyG70L?B3Q93w znS-U?h=lr4x+GyB6Jx+YXPD^UhF;RFVm{$&tav_1FQvPPW8b?HNwO{%SY;>ZZ28Ge i;Q6(Nh);w5Ut*Pi^R){hq9E>PskvzxxtRe5l<*H?;9JT7 diff --git a/AI/AiService/Server.py b/AI/AiService/Server.py index b6934169..f181da10 100644 --- a/AI/AiService/Server.py +++ b/AI/AiService/Server.py @@ -5,6 +5,8 @@ from kafka import KafkaProducer, KafkaAdminClient from kafka.admin import NewTopic import uuid import json + +from AiService.Config.Config import KAFKA_TOPIC, KAFKA_BOOTSTRAP_SERVERS from AiService.Model.TaskModel import * # 定义请求体模型 @@ -46,6 +48,21 @@ def ensure_topic_exists(): except Exception as e: print(f"Failed to ensure topic exists: {str(e)}") +# 查询任务状态接口 +@app.get("/task-status/{task_id}") +def get_task_status(task_id: str): + try: + tm = TaskModel() + task_status = tm.get_task_status(task_id) + tm.close() + + if task_status: + return task_status + else: + raise HTTPException(status_code=404, detail="Task not found") + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to get task status: {str(e)}") + # 定义 Lifespan 事件 @asynccontextmanager async def lifespan(app: FastAPI): diff --git a/AI/AiService/Worker/TaskWorker.py b/AI/AiService/Worker/TaskWorker.py new file mode 100644 index 00000000..41b45e00 --- /dev/null +++ b/AI/AiService/Worker/TaskWorker.py @@ -0,0 +1,40 @@ +import json +import time +from kafka import KafkaConsumer +from AiService.Model.TaskModel import TaskModel +from AiService.Config.Config import KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC + +def process_task(task_data: dict): + """ + 模拟任务处理 + """ + task_id = task_data.get("task_id") + keyword = task_data.get("prompt") + + # 模拟处理任务(休息 3 秒) + print(f"Processing task {task_id} with keyword: {keyword}") + time.sleep(3) + + # 更新任务状态为“已完成” + tm = TaskModel() + tm.update_task_status(task_id, "completed", result_url="http://example.com/result") + tm.close() + print(f"Task {task_id} completed.") + +def start_worker(): + """ + 启动 Worker,从 Kafka 中消费任务 + """ + consumer = KafkaConsumer( + KAFKA_TOPIC, + bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS, + value_deserializer=lambda v: json.loads(v.decode('utf-8')) + ) + + print("Worker started. Waiting for tasks...") + for message in consumer: + task_data = message.value + process_task(task_data) + +if __name__ == "__main__": + start_worker() \ No newline at end of file diff --git a/AI/AiService/Worker/__init__.py b/AI/AiService/Worker/__init__.py new file mode 100644 index 00000000..e69de29b