main
HuangHai 5 months ago
parent 8491098bd2
commit 56c087602d

@ -1,2 +1,5 @@
# 提交任务
curl -X POST "http://127.0.0.1:8000/create-task" -H "Content-Type: application/json" -d "{\"prompt\": \"Generate an image of a sunset\"}"
curl -X POST "http://127.0.0.1:8000/create-task" -H "Content-Type: application/json" -d "{\"prompt\": \"Generate an image of a sunset\"}"
# 查询任务
curl "http://127.0.0.1:8000/task-status/699f6d14-86da-459c-8cc4-952ce5451352"

@ -1,6 +1,5 @@
import mysql.connector
from AiService.Config.Config import *
import mysql.connector.pooling
from AiService.Config.Config import MYSQL_CONFIG
class TaskModel:
# 创建连接池
@ -30,6 +29,49 @@ class TaskModel:
print(f"Failed to insert task: {str(e)}")
raise
def update_task_status(self, task_id: str, status: str, result_url: str = None, error_message: str = None):
"""
更新任务状态
"""
try:
query = """
UPDATE t_gen_tasks
SET status = %s, result_url = %s, error_message = %s, complete_time = NOW()
WHERE task_id = %s
"""
self.cursor.execute(query, (status, result_url, error_message, task_id))
self.connection.commit()
except Exception as e:
print(f"Failed to update task status: {str(e)}")
raise
def get_task_status(self, task_id: str):
"""
查询任务状态
"""
try:
query = """
SELECT task_id, keyword, status, submit_time, complete_time, result_url, error_message
FROM t_gen_tasks
WHERE task_id = %s
"""
self.cursor.execute(query, (task_id,))
result = self.cursor.fetchone()
if result:
return {
"task_id": result[0],
"keyword": result[1],
"status": result[2],
"submit_time": result[3],
"complete_time": result[4],
"result_url": result[5],
"error_message": result[6]
}
return None
except Exception as e:
print(f"Failed to get task status: {str(e)}")
raise
def close(self):
"""
将连接归还到连接池

@ -5,6 +5,8 @@ from kafka import KafkaProducer, KafkaAdminClient
from kafka.admin import NewTopic
import uuid
import json
from AiService.Config.Config import KAFKA_TOPIC, KAFKA_BOOTSTRAP_SERVERS
from AiService.Model.TaskModel import *
# 定义请求体模型
@ -46,6 +48,21 @@ def ensure_topic_exists():
except Exception as e:
print(f"Failed to ensure topic exists: {str(e)}")
# 查询任务状态接口
@app.get("/task-status/{task_id}")
def get_task_status(task_id: str):
try:
tm = TaskModel()
task_status = tm.get_task_status(task_id)
tm.close()
if task_status:
return task_status
else:
raise HTTPException(status_code=404, detail="Task not found")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to get task status: {str(e)}")
# 定义 Lifespan 事件
@asynccontextmanager
async def lifespan(app: FastAPI):

@ -0,0 +1,40 @@
import json
import time
from kafka import KafkaConsumer
from AiService.Model.TaskModel import TaskModel
from AiService.Config.Config import KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC
def process_task(task_data: dict):
"""
模拟任务处理
"""
task_id = task_data.get("task_id")
keyword = task_data.get("prompt")
# 模拟处理任务(休息 3 秒)
print(f"Processing task {task_id} with keyword: {keyword}")
time.sleep(3)
# 更新任务状态为“已完成”
tm = TaskModel()
tm.update_task_status(task_id, "completed", result_url="http://example.com/result")
tm.close()
print(f"Task {task_id} completed.")
def start_worker():
"""
启动 Worker Kafka 中消费任务
"""
consumer = KafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)
print("Worker started. Waiting for tasks...")
for message in consumer:
task_data = message.value
process_task(task_data)
if __name__ == "__main__":
start_worker()
Loading…
Cancel
Save