diff --git a/AI/AiService/Config/Config.py b/AI/AiService/Config/Config.py
new file mode 100644
index 00000000..90c30fa8
--- /dev/null
+++ b/AI/AiService/Config/Config.py
@@ -0,0 +1,12 @@
+# Kafka configuration
+KAFKA_BOOTSTRAP_SERVERS = '10.10.14.250:9092'
+KAFKA_TOPIC = 'user-tasks'
+
+# MySQL configuration
+MYSQL_CONFIG = {
+    'host': '10.10.14.210',
+    'port': 22066,
+    'user': 'root',
+    'password': 'DsideaL147258369',
+    'database': 'ai_db'
+}
\ No newline at end of file
diff --git a/AI/AiService/Config/__init__.py b/AI/AiService/Config/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/AI/AiService/Config/__pycache__/Config.cpython-310.pyc b/AI/AiService/Config/__pycache__/Config.cpython-310.pyc
new file mode 100644
index 00000000..d0a94ccb
Binary files /dev/null and b/AI/AiService/Config/__pycache__/Config.cpython-310.pyc differ
diff --git a/AI/AiService/Config/__pycache__/__init__.cpython-310.pyc b/AI/AiService/Config/__pycache__/__init__.cpython-310.pyc
new file mode 100644
index 00000000..372515ba
Binary files /dev/null and b/AI/AiService/Config/__pycache__/__init__.cpython-310.pyc differ
diff --git a/AI/AiService/Doc/docker-compose.yml b/AI/AiService/Doc/docker-compose.yml
index 881c8f9c..9a821baf 100644
--- a/AI/AiService/Doc/docker-compose.yml
+++ b/AI/AiService/Doc/docker-compose.yml
@@ -13,7 +13,7 @@ services:
       - KAFKA_CFG_NODE_ID=1
       - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
       - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
-      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.10.14.250:9092,CONTROLLER://10.10.14.250:9093
+      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.10.14.250:9092,CONTROLLER://10.10.14.250:9093 # Note: change this IP address
       - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER # add the controller listener name
       - KAFKA_CFG_LOG_DIRS=/bitnami/kafka/data
     volumes:
diff --git a/AI/AiService/Doc/测试文档.txt b/AI/AiService/Doc/测试文档.txt
new file mode 100644
index 00000000..2f9bd1e4
--- /dev/null
+++ b/AI/AiService/Doc/测试文档.txt
@@ -0,0 +1,2 @@
+# Submit a task
+curl -X POST "http://127.0.0.1:8000/create-task" -H "Content-Type: application/json" -d "{\"prompt\": \"Generate an image of a sunset\"}"
\ No newline at end of file
diff --git a/AI/AiService/Start.py b/AI/AiService/Start.py
new file mode 100644
index 00000000..0794aea8
--- /dev/null
+++ b/AI/AiService/Start.py
@@ -0,0 +1,109 @@
+from contextlib import asynccontextmanager
+from fastapi import FastAPI, HTTPException
+from pydantic import BaseModel
+from kafka import KafkaProducer, KafkaAdminClient
+from kafka.admin import NewTopic
+import uuid
+import json
+import mysql.connector
+from Config.Config import KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, MYSQL_CONFIG
+
+# Request body model
+class TaskRequest(BaseModel):
+    prompt: str  # prompt text supplied by the user
+
+# Create the Kafka producer
+producer = KafkaProducer(
+    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
+    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # serialize messages as JSON
+)
+
+# Check for the topic and create it if it does not exist
+def ensure_topic_exists():
+    try:
+        # Create a Kafka AdminClient
+        admin_client = KafkaAdminClient(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
+
+        # List all existing topics
+        existing_topics = admin_client.list_topics()
+
+        # Create the topic if it is missing
+        if KAFKA_TOPIC not in existing_topics:
+            topic = NewTopic(
+                name=KAFKA_TOPIC,
+                num_partitions=1,      # number of partitions
+                replication_factor=1   # replication factor
+            )
+            admin_client.create_topics([topic])
+            print(f"Topic '{KAFKA_TOPIC}' created successfully.")
+        else:
+            print(f"Topic '{KAFKA_TOPIC}' already exists.")
+
+        # Close the AdminClient
+        admin_client.close()
+    except Exception as e:
+        print(f"Failed to ensure topic exists: {str(e)}")
+
+# Lifespan events
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    # On startup, make sure the topic exists
+    ensure_topic_exists()
+    yield
+    # Optional cleanup on shutdown
+    print("Application shutdown.")
+
+# Initialize the FastAPI application and register the lifespan handler
+app = FastAPI(lifespan=lifespan)
+
+# Write a task record to MySQL
+def log_to_mysql(task_id: str, keyword: str):
+    try:
+        # Connect to MySQL
+        connection = mysql.connector.connect(**MYSQL_CONFIG)
+        cursor = connection.cursor()
+
+        # Insert the task record
+        query = """
+            INSERT INTO t_gen_tasks (task_id, keyword, status)
+            VALUES (%s, %s, %s)
+        """
+        cursor.execute(query, (task_id, keyword, 'pending'))
+        connection.commit()
+
+        # Close the connection
+        cursor.close()
+        connection.close()
+    except Exception as e:
+        print(f"Failed to log to MySQL: {str(e)}")
+
+# Task creation endpoint
+@app.post("/create-task")
+def create_task(task_request: TaskRequest):
+    try:
+        # Generate a unique task_id
+        task_id = str(uuid.uuid4())
+
+        # Build the task payload
+        task_data = {
+            "task_id": task_id,
+            "prompt": task_request.prompt
+        }
+
+        # Publish the task to Kafka
+        producer.send(KAFKA_TOPIC, value=task_data)
+        producer.flush()  # make sure the message has been sent
+
+        # Record the task in MySQL
+        log_to_mysql(task_id, task_request.prompt)
+
+        # Return the task_id
+        return {"task_id": task_id}
+    except Exception as e:
+        # Surface the failure to the caller
+        raise HTTPException(status_code=500, detail=f"Failed to create task: {str(e)}")
+
+# Run the application
+if __name__ == "__main__":
+    import uvicorn
+    uvicorn.run(app, host="0.0.0.0", port=8000)
\ No newline at end of file
diff --git a/AI/AiService/Test/kafka_Producer.py b/AI/AiService/Test/kafka_Producer.py
index f99cf474..318fa671 100644
--- a/AI/AiService/Test/kafka_Producer.py
+++ b/AI/AiService/Test/kafka_Producer.py
@@ -4,7 +4,7 @@ from kafka import KafkaProducer
 producer = KafkaProducer(bootstrap_servers='10.10.14.250:9092')
 
 # Send a message
-producer.send('test-topic', b'Hello, Kafka!')
+producer.send('test-topic', b'Hello, Kafka, ChangChun!')
 producer.flush()
 
 # Close the producer
diff --git a/AI/AiService/__init__.py b/AI/AiService/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/AI/AiService/requirements.txt b/AI/AiService/requirements.txt
index 5233a3df..bb55b44d 100644
--- a/AI/AiService/requirements.txt
+++ b/AI/AiService/requirements.txt
@@ -1 +1,4 @@
 kafka-python==2.0.5
+fastapi==0.115.9
+uvicorn==0.34.0
+mysql-connector-python==9.2.0
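
Note that this changeset only wires up the producer side: /create-task publishes to the user-tasks topic and inserts a pending row into t_gen_tasks, but nothing in the diff consumes the topic. For a quick end-to-end check alongside Test/kafka_Producer.py, a throwaway consumer like the sketch below could be used. It is an assumption, not code from this commit; the file name (Test/kafka_Consumer.py), the consumer group id, and the offset-reset choice are placeholders.

# Test/kafka_Consumer.py (hypothetical helper, not part of this commit)
import json

from kafka import KafkaConsumer

# Read task messages produced by the /create-task endpoint
consumer = KafkaConsumer(
    'user-tasks',                      # topic written by Start.py
    bootstrap_servers='10.10.14.250:9092',
    group_id='task-debug',             # placeholder consumer group
    auto_offset_reset='earliest',      # start from the beginning of the topic
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)

# Print each task's id and prompt as it arrives
for message in consumer:
    task = message.value
    print(task["task_id"], task["prompt"])

Run it in one terminal and submit the curl request from Doc/测试文档.txt in another; each submitted task should be printed as it is consumed.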