main
HuangHai 5 months ago
parent 7ba782e265
commit a95983aab6

@ -0,0 +1,12 @@
# Kafka 配置
KAFKA_BOOTSTRAP_SERVERS = '10.10.14.250:9092'
KAFKA_TOPIC = 'user-tasks'
# MySQL 配置
MYSQL_CONFIG = {
'host': '10.10.14.210',
'port': 22066,
'user': 'root',
'password': 'DsideaL147258369',
'database': 'ai_db'
}

@ -13,7 +13,7 @@ services:
- KAFKA_CFG_NODE_ID=1
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.10.14.250:9092,CONTROLLER://10.10.14.250:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.10.14.250:9092,CONTROLLER://10.10.14.250:9093 # 注意修改IP地址
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER # 添加控制器监听器名称
- KAFKA_CFG_LOG_DIRS=/bitnami/kafka/data
volumes:

@ -0,0 +1,2 @@
# 提交任务
curl -X POST "http://127.0.0.1:8000/create-task" -H "Content-Type: application/json" -d "{\"prompt\": \"Generate an image of a sunset\"}"

@ -0,0 +1,109 @@
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from kafka import KafkaProducer, KafkaAdminClient
from kafka.admin import NewTopic
import uuid
import json
import mysql.connector
from Config.Config import KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, MYSQL_CONFIG
# 定义请求体模型
class TaskRequest(BaseModel):
prompt: str # 用户输入的提示词
# 初始化 FastAPI 应用
app = FastAPI()
# 创建 Kafka 生产者
producer = KafkaProducer(
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
value_serializer=lambda v: json.dumps(v).encode('utf-8') # 将消息序列化为 JSON
)
# 检查并创建主题
def ensure_topic_exists():
try:
# 创建 Kafka AdminClient
admin_client = KafkaAdminClient(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
# 获取所有主题
existing_topics = admin_client.list_topics()
# 如果主题不存在,则创建
if KAFKA_TOPIC not in existing_topics:
topic = NewTopic(
name=KAFKA_TOPIC,
num_partitions=1, # 分区数
replication_factor=1 # 副本数
)
admin_client.create_topics([topic])
print(f"Topic '{KAFKA_TOPIC}' created successfully.")
else:
print(f"Topic '{KAFKA_TOPIC}' already exists.")
# 关闭 AdminClient
admin_client.close()
except Exception as e:
print(f"Failed to ensure topic exists: {str(e)}")
# 定义 Lifespan 事件
@asynccontextmanager
async def lifespan(app: FastAPI):
# 应用启动时检查并创建主题
ensure_topic_exists()
yield
# 应用关闭时清理资源(可选)
print("Application shutdown.")
# 写入 MySQL 日志
def log_to_mysql(task_id: str, keyword: str):
try:
# 连接 MySQL
connection = mysql.connector.connect(**MYSQL_CONFIG)
cursor = connection.cursor()
# 插入日志
query = """
INSERT INTO t_gen_tasks (task_id, keyword, status)
VALUES (%s, %s, %s)
"""
cursor.execute(query, (task_id, keyword, 'pending'))
connection.commit()
# 关闭连接
cursor.close()
connection.close()
except Exception as e:
print(f"Failed to log to MySQL: {str(e)}")
# 定义接口
@app.post("/create-task")
def create_task(task_request: TaskRequest):
try:
# 生成唯一的 task_id
task_id = str(uuid.uuid4())
# 构造任务信息
task_data = {
"task_id": task_id,
"prompt": task_request.prompt
}
# 将任务信息写入 Kafka
producer.send(KAFKA_TOPIC, value=task_data)
producer.flush() # 确保消息发送完成
# 将任务信息写入 MySQL
log_to_mysql(task_id, task_request.prompt)
# 返回 task_id
return {"task_id": task_id}
except Exception as e:
# 处理异常
raise HTTPException(status_code=500, detail=f"Failed to create task: {str(e)}")
# 启动应用
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

@ -4,7 +4,7 @@ from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='10.10.14.250:9092')
# 发送消息
producer.send('test-topic', b'Hello, Kafka!')
producer.send('test-topic', b'Hello, Kafka ,ChangChun!')
producer.flush()
# 关闭生产者

@ -1 +1,4 @@
kafka-python==2.0.5
fastapi==0.115.9
uvicorn==0.34.0
mysql-connector-python==9.2.0

Loading…
Cancel
Save