|
|
|
|
import os
|
|
|
|
|
import os.path
|
|
|
|
|
from urllib.parse import urlparse
|
|
|
|
|
|
|
|
|
|
from Util import ConfigUtil
|
|
|
|
|
from Util.AesUtil import *
|
|
|
|
|
from Util.SDUtil import *
|
|
|
|
|
from Util.CommonUtil import *
|
|
|
|
|
from Util.OssUtil import uploadOss
|
|
|
|
|
from Util.PngUtil import *
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def upload_img_file(file_path, model_id):
|
|
|
|
|
png_file = os.path.basename(file_path)
|
|
|
|
|
# 清除Exif信息
|
|
|
|
|
clearExif(file_path, file_path) # 清除Exif信息
|
|
|
|
|
# 上传到OSS
|
|
|
|
|
key = 'Images/User/' + str(model_id) + '/' + png_file
|
|
|
|
|
uploadOss(key, file_path)
|
|
|
|
|
# # 生成下载链接
|
|
|
|
|
url = 'http://hzkc.oss-cn-beijing.aliyuncs.com/' + key
|
|
|
|
|
# 删除本地文件
|
|
|
|
|
os.remove(file_path)
|
|
|
|
|
printf("成功生成1张图片,地址:" + url)
|
|
|
|
|
return url
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 文生图服务调用
|
|
|
|
|
def webui_txt_2_img(model_id, json_data, input_image, target_folder):
|
|
|
|
|
# 提示词
|
|
|
|
|
prompt = json_data['prompt']
|
|
|
|
|
# 图片
|
|
|
|
|
args = json_data['alwayson_scripts']['ControlNet']['args']
|
|
|
|
|
|
|
|
|
|
# pretty_json = json.dumps(json_data, indent=4,ensure_ascii=False)
|
|
|
|
|
# print(pretty_json)
|
|
|
|
|
|
|
|
|
|
pose_image = json_data['pose_image']
|
|
|
|
|
args[0]['input_image'] = encode_image(input_image[0])
|
|
|
|
|
# 如果有特殊的姿态要求,那就用模板指定的姿态图
|
|
|
|
|
if len(pose_image) > 0:
|
|
|
|
|
if len(args) > 1:
|
|
|
|
|
args[1]['input_image'] = encode_image(pose_image)
|
|
|
|
|
else: # 如果没有特殊的姿态要求,那么就统一用一张图片
|
|
|
|
|
if len(args) > 1:
|
|
|
|
|
args[1]['input_image'] = encode_image(input_image[0])
|
|
|
|
|
|
|
|
|
|
# 提示词
|
|
|
|
|
json_data['prompt'] = prompt
|
|
|
|
|
# 开始生成图片
|
|
|
|
|
printf("开始生成图片...")
|
|
|
|
|
# 调用生图服务
|
|
|
|
|
response = submit_post(txt2img_url, json_data)
|
|
|
|
|
# 创建文件的完整路径
|
|
|
|
|
file_path = os.path.join(target_folder, str(uuid.uuid4()) + '.png')
|
|
|
|
|
save_encoded_image(response.json()['images'][0], file_path)
|
|
|
|
|
url = upload_img_file(file_path, model_id)
|
|
|
|
|
# 添加到结果集合中
|
|
|
|
|
res.append(url)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 图生图服务调用
|
|
|
|
|
def webui_img_2_img(model_id, prompt_id, json_data, input_image, target_folder):
|
|
|
|
|
source_img = encode_image(input_image[0])
|
|
|
|
|
# 模型固定的参考图片
|
|
|
|
|
refer_source_img = r'./Image/' + str(model_id) + '/' + str(prompt_id) + '.jpg'
|
|
|
|
|
if not os.path.exists(refer_source_img):
|
|
|
|
|
refer_source_img = r'./Image/' + str(model_id) + '/' + str(prompt_id) + '.png'
|
|
|
|
|
# 都找不到,报错
|
|
|
|
|
if not os.path.exists(refer_source_img):
|
|
|
|
|
print("模型:" + str(model_id) + ",参考图:" + str(prompt_id) + "不存在,请检查!")
|
|
|
|
|
exit(0)
|
|
|
|
|
|
|
|
|
|
# 找到参考图
|
|
|
|
|
refer_img = encode_image(refer_source_img)
|
|
|
|
|
|
|
|
|
|
# 1.5与XL通用
|
|
|
|
|
# 在ControlNet的第一个图片中使用用户输入的图片,用此图片的人脸进行换脸操作
|
|
|
|
|
json_data["alwayson_scripts"]["ControlNet"]["args"][0]["input_image"] = source_img
|
|
|
|
|
# 图生图,风格参考图是输入
|
|
|
|
|
json_data["init_images"] = [refer_img]
|
|
|
|
|
|
|
|
|
|
# XL 专享
|
|
|
|
|
if len(json_data["alwayson_scripts"]["ControlNet"]["args"]) == 2:
|
|
|
|
|
json_data["alwayson_scripts"]["ControlNet"]["args"][1]["input_image"] = refer_img
|
|
|
|
|
|
|
|
|
|
# 开始生成图片
|
|
|
|
|
printf("开始生成图片...")
|
|
|
|
|
|
|
|
|
|
# 调用生图服务
|
|
|
|
|
response = submit_post(img2img_url, json_data)
|
|
|
|
|
# 创建文件的完整路径
|
|
|
|
|
file_path = os.path.join(target_folder, str(uuid.uuid4()) + '.png')
|
|
|
|
|
save_encoded_image(response.json()['images'][0], file_path)
|
|
|
|
|
url = upload_img_file(file_path, model_id)
|
|
|
|
|
res.append(url) # 添加到结果集合中
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ComfyUI服务调用
|
|
|
|
|
def runComfyUI(model_id, json_data, input_image, target_folder):
|
|
|
|
|
# 生成一个唯一的客户端ID
|
|
|
|
|
client_id = str(uuid.uuid4())
|
|
|
|
|
|
|
|
|
|
# 上传图片
|
|
|
|
|
img = []
|
|
|
|
|
for x in input_image:
|
|
|
|
|
with open(x, "rb") as f:
|
|
|
|
|
y = upload_file(comfyui_address, f, "", True)
|
|
|
|
|
img.append(y)
|
|
|
|
|
# 如果存在多个姿势图和风格图,那么在配置文件中写死,比如pose=post_1.jpg style=style_1.jpg
|
|
|
|
|
# 同时,由于ComfyUI需要把这个文件上传到服务器,所以,需要在json文件中指定哪个文件需要上传
|
|
|
|
|
# 填充输入
|
|
|
|
|
json_data = fill_input(json_data, img)
|
|
|
|
|
# 过滤器,哪些返回节点中获取到的图片是有用的
|
|
|
|
|
myfilter = []
|
|
|
|
|
# 遍历prompt_data中的所有节点
|
|
|
|
|
for key, value in json_data.items():
|
|
|
|
|
if 'inputs' in value:
|
|
|
|
|
if 'filename_prefix' in value['inputs']:
|
|
|
|
|
if value['inputs']['filename_prefix'] == 'ComfyUI':
|
|
|
|
|
myfilter.append(key)
|
|
|
|
|
# 生成
|
|
|
|
|
files = generate_clip(comfyui_address, json_data, client_id, target_folder, myfilter)
|
|
|
|
|
|
|
|
|
|
# 上传到云存储,回写数据接口
|
|
|
|
|
for file in files:
|
|
|
|
|
file_path = './Out/Images/User/' + str(model_id) + "/" + file + '.png'
|
|
|
|
|
url = upload_img_file(file_path, model_id)
|
|
|
|
|
res.append(url)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def download_image(image_url, file_path):
|
|
|
|
|
# 发送GET请求
|
|
|
|
|
response = requests.get(image_url)
|
|
|
|
|
# 检查请求是否成功
|
|
|
|
|
if response.status_code == 200:
|
|
|
|
|
# 打开一个文件用于写入
|
|
|
|
|
with open(file_path, 'wb') as f:
|
|
|
|
|
# 将响应的内容写入文件中
|
|
|
|
|
f.write(response.content)
|
|
|
|
|
else:
|
|
|
|
|
printf('图片下载失败,状态码:' + str(response.status_code))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 根据输入的图片集合,填充到
|
|
|
|
|
def fill_input(prompt_data, file_array):
|
|
|
|
|
# 是不是不存在配置节
|
|
|
|
|
if "source_images" not in prompt_data:
|
|
|
|
|
print("配置文件不包含source_images配置节,程序无法继续!")
|
|
|
|
|
exit(0)
|
|
|
|
|
# 是不是个数不匹配
|
|
|
|
|
if len(prompt_data["source_images"]) != len(file_array):
|
|
|
|
|
print("输入的图片数量与预替换的图片数量不一致,程序无法继续!")
|
|
|
|
|
exit(0)
|
|
|
|
|
# 替换
|
|
|
|
|
for i in range(len(prompt_data["source_images"])):
|
|
|
|
|
x = prompt_data["source_images"][i]
|
|
|
|
|
# 替换配置节内容
|
|
|
|
|
array = x.split(",")
|
|
|
|
|
prompt_data[array[0]][array[1]][array[2]] = file_array[i]
|
|
|
|
|
|
|
|
|
|
# 删除多余的配置节
|
|
|
|
|
if "source_images" in prompt_data:
|
|
|
|
|
del prompt_data["source_images"]
|
|
|
|
|
|
|
|
|
|
# 处理style,pose 的参考图片
|
|
|
|
|
if 'style_pose_images' in prompt_data:
|
|
|
|
|
for x in prompt_data['style_pose_images']:
|
|
|
|
|
key = x['key']
|
|
|
|
|
value = x['value']
|
|
|
|
|
with open(value, "rb") as f:
|
|
|
|
|
y = upload_file(comfyui_address, f, "", True)
|
|
|
|
|
# 需要
|
|
|
|
|
array = key.split(",")
|
|
|
|
|
prompt_data[array[0]][array[1]][array[2]] = y
|
|
|
|
|
# 删除多余的配置节
|
|
|
|
|
if "style_pose_images" in prompt_data:
|
|
|
|
|
del prompt_data["style_pose_images"]
|
|
|
|
|
|
|
|
|
|
return prompt_data
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 获取任务
|
|
|
|
|
def get_task():
|
|
|
|
|
# 获取签名
|
|
|
|
|
en_data = getenData(machine_id)
|
|
|
|
|
# 请求的地址
|
|
|
|
|
url = web_url + '/QingLong/HuiYa/getTask'
|
|
|
|
|
# 要发送的数据,可以是字典形式
|
|
|
|
|
data = {
|
|
|
|
|
'enData': en_data
|
|
|
|
|
}
|
|
|
|
|
# 发送POST请求
|
|
|
|
|
response = requests.post(url, data=data)
|
|
|
|
|
|
|
|
|
|
# 检查请求是否成功
|
|
|
|
|
if response.status_code == 200:
|
|
|
|
|
success = response.json()["success"]
|
|
|
|
|
if success == 1:
|
|
|
|
|
# 清理服务器显存
|
|
|
|
|
release_sd(webui_address, comfyui_address)
|
|
|
|
|
|
|
|
|
|
data = response.json()["data"]
|
|
|
|
|
# 任务编号
|
|
|
|
|
task_id = data['task_id']
|
|
|
|
|
# 模板ID
|
|
|
|
|
model_id = data["model_id"]
|
|
|
|
|
# prompt_template 模板内容
|
|
|
|
|
prompt_id = data['prompt_id']
|
|
|
|
|
# 源图片
|
|
|
|
|
source_img_url = data["source_img_url"]
|
|
|
|
|
# 模板类型
|
|
|
|
|
model_type_id = data["model_type_id"]
|
|
|
|
|
|
|
|
|
|
# json文件
|
|
|
|
|
json_file = r'./JSON/' + str(model_id) + '/' + str(model_id) + '_' + str(prompt_id) + '.json'
|
|
|
|
|
|
|
|
|
|
# 为了扩展支持多张图片,所以这里需要对,号进行拆解
|
|
|
|
|
input_image = []
|
|
|
|
|
for x in source_img_url.split(','):
|
|
|
|
|
# 解析URL
|
|
|
|
|
parsed_url = urlparse(x)
|
|
|
|
|
# 获取URL路径中的最后一个部分,通常就是文件名
|
|
|
|
|
filename = parsed_url.path.split('/')[-1]
|
|
|
|
|
# 下载这个图片
|
|
|
|
|
t_file = 'Temp/' + filename
|
|
|
|
|
input_image.append(t_file) # input_image扩展为数组,支持多张图片
|
|
|
|
|
# 这里可以加一个优化:如果本地存在这个文件就不用再次去下载了
|
|
|
|
|
if not os.path.exists(t_file):
|
|
|
|
|
download_image(x, t_file)
|
|
|
|
|
|
|
|
|
|
# 获取提示词
|
|
|
|
|
with open(json_file, 'r', encoding='utf-8') as file:
|
|
|
|
|
vjson_data = json.load(file)
|
|
|
|
|
|
|
|
|
|
# 利用三元表达式输出
|
|
|
|
|
printf("发现需要处理的" + ("WebUI" if model_type_id in [1, 2] else "ComfyUI") + "任务,model_type_id=" + str(
|
|
|
|
|
model_type_id) + ",model_id=" + str(
|
|
|
|
|
model_id) + ",prompt_id=" + str(prompt_id) + ",task_id=" + str(task_id))
|
|
|
|
|
|
|
|
|
|
# 创建目标目录
|
|
|
|
|
output_path = "Out/Images/User/" + str(model_id) + "/"
|
|
|
|
|
if not os.path.exists(output_path):
|
|
|
|
|
os.makedirs(output_path)
|
|
|
|
|
|
|
|
|
|
# SD_TXT2IMG
|
|
|
|
|
if model_type_id == 1:
|
|
|
|
|
webui_txt_2_img(model_id=model_id, json_data=vjson_data, input_image=input_image,
|
|
|
|
|
target_folder=output_path)
|
|
|
|
|
|
|
|
|
|
# SD_IMG2IMG
|
|
|
|
|
if model_type_id == 2:
|
|
|
|
|
webui_img_2_img(model_id=model_id, prompt_id=prompt_id, json_data=vjson_data,
|
|
|
|
|
input_image=input_image, target_folder=output_path)
|
|
|
|
|
# COMFY_UI
|
|
|
|
|
if model_type_id == 3:
|
|
|
|
|
runComfyUI(model_id=model_id, json_data=vjson_data, input_image=input_image,
|
|
|
|
|
target_folder=output_path)
|
|
|
|
|
|
|
|
|
|
# 清理服务器显存
|
|
|
|
|
release_sd(webui_address, comfyui_address)
|
|
|
|
|
|
|
|
|
|
return task_id
|
|
|
|
|
elif success == 0:
|
|
|
|
|
printf("现在没有待处理的任务,休息3秒后再试...")
|
|
|
|
|
return 0
|
|
|
|
|
else:
|
|
|
|
|
message = response.json()["message"]
|
|
|
|
|
printf('发生了异常:' + message)
|
|
|
|
|
return success
|
|
|
|
|
else:
|
|
|
|
|
printf('请求失败,状态码:' + response.status_code)
|
|
|
|
|
return -1
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 回复服务器,生图任务完成
|
|
|
|
|
def finish_task(task_id, target_img_urls):
|
|
|
|
|
# 获取签名
|
|
|
|
|
en_data = getenData(machine_id)
|
|
|
|
|
|
|
|
|
|
# 请求的地址
|
|
|
|
|
url = web_url + '/QingLong/HuiYa/finishTask'
|
|
|
|
|
|
|
|
|
|
# 要发送的数据,可以是字典形式
|
|
|
|
|
data = {
|
|
|
|
|
'task_id': task_id,
|
|
|
|
|
'enData': en_data,
|
|
|
|
|
'target_img_urls': target_img_urls
|
|
|
|
|
}
|
|
|
|
|
# 发送POST请求
|
|
|
|
|
response = requests.post(url, data=data)
|
|
|
|
|
# 检查请求是否成功
|
|
|
|
|
if response.status_code == 200:
|
|
|
|
|
printf("成功完成数据回写!")
|
|
|
|
|
else:
|
|
|
|
|
printf('请求失败,状态码:' + str(response.status_code))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
|
# 禁止代理
|
|
|
|
|
set_http_proxy("")
|
|
|
|
|
|
|
|
|
|
# 配置文件
|
|
|
|
|
config = ConfigUtil.getConfig()
|
|
|
|
|
# 处理机编号
|
|
|
|
|
machine_id = config['system']['machine_id']
|
|
|
|
|
|
|
|
|
|
# webui 服务器地址
|
|
|
|
|
webui_address = config['webui']['webui_address']
|
|
|
|
|
# 文生图服务地址
|
|
|
|
|
txt2img_url = 'http://' + webui_address + config['webui']['txt2img_url']
|
|
|
|
|
# 图生图服务地址
|
|
|
|
|
img2img_url = 'http://' + webui_address + config['webui']['img2img_url']
|
|
|
|
|
|
|
|
|
|
# WEB服务器地址
|
|
|
|
|
web_url = config['webServer']['web_url']
|
|
|
|
|
# COMFYUI服务器地址
|
|
|
|
|
comfyui_address = config.get('comfyui', 'server_address')
|
|
|
|
|
# 反推接口地址
|
|
|
|
|
wd_url = 'http://' + webui_address + '/tagger/v1/interrogate'
|
|
|
|
|
#image_path = r'D:\KeCheng\BaiHu\Backup\mote2.png'
|
|
|
|
|
#print(get_wd_14(wd_url, image_path))
|
|
|
|
|
|
|
|
|
|
while True:
|
|
|
|
|
try:
|
|
|
|
|
res = [] # 所有图片的地址路径
|
|
|
|
|
v_task_id = get_task()
|
|
|
|
|
# 回复上位生成完毕
|
|
|
|
|
if v_task_id > 0:
|
|
|
|
|
# 图片地址
|
|
|
|
|
v_target_img_urls = ''
|
|
|
|
|
for r in res:
|
|
|
|
|
v_target_img_urls = r + ',' + v_target_img_urls
|
|
|
|
|
v_target_img_urls = v_target_img_urls[0:len(v_target_img_urls) - 1]
|
|
|
|
|
# 生成完毕
|
|
|
|
|
finish_task(v_task_id, v_target_img_urls)
|
|
|
|
|
elif v_task_id == 0:
|
|
|
|
|
# 本轮没有任务
|
|
|
|
|
time.sleep(3)
|
|
|
|
|
|
|
|
|
|
except Exception as err:
|
|
|
|
|
printf("发生异常,将休息3秒后重试..." + str(err))
|
|
|
|
|
time.sleep(3)
|