main
HuangHai 5 months ago
parent a95983aab6
commit 17d495c73a

@ -0,0 +1,30 @@
import mysql.connector
from AiService.Config.Config import *
class TaskModel:
def __init__(self):
# 初始化 MySQL 连接
self.connection = mysql.connector.connect(**MYSQL_CONFIG)
self.cursor = self.connection.cursor()
def insert_task(self, task_id: str, keyword: str):
"""
插入任务记录到 t_gen_tasks
"""
try:
query = """
INSERT INTO t_gen_tasks (task_id, keyword, status)
VALUES (%s, %s, %s)
"""
self.cursor.execute(query, (task_id, keyword, 'pending'))
self.connection.commit()
except Exception as e:
print(f"Failed to insert task: {str(e)}")
raise
def close(self):
"""
关闭 MySQL 连接
"""
self.cursor.close()
self.connection.close()

@ -5,8 +5,7 @@ from kafka import KafkaProducer, KafkaAdminClient
from kafka.admin import NewTopic
import uuid
import json
import mysql.connector
from Config.Config import KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, MYSQL_CONFIG
from AiService.Model.TaskModel import *
# 定义请求体模型
class TaskRequest(BaseModel):
@ -56,27 +55,6 @@ async def lifespan(app: FastAPI):
# 应用关闭时清理资源(可选)
print("Application shutdown.")
# 写入 MySQL 日志
def log_to_mysql(task_id: str, keyword: str):
try:
# 连接 MySQL
connection = mysql.connector.connect(**MYSQL_CONFIG)
cursor = connection.cursor()
# 插入日志
query = """
INSERT INTO t_gen_tasks (task_id, keyword, status)
VALUES (%s, %s, %s)
"""
cursor.execute(query, (task_id, keyword, 'pending'))
connection.commit()
# 关闭连接
cursor.close()
connection.close()
except Exception as e:
print(f"Failed to log to MySQL: {str(e)}")
# 定义接口
@app.post("/create-task")
def create_task(task_request: TaskRequest):
@ -95,7 +73,9 @@ def create_task(task_request: TaskRequest):
producer.flush() # 确保消息发送完成
# 将任务信息写入 MySQL
log_to_mysql(task_id, task_request.prompt)
tm = TaskModel()
tm.insert_task(task_id, task_request.prompt)
tm.close()
# 返回 task_id
return {"task_id": task_id}

Loading…
Cancel
Save