You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

330 lines
12 KiB

1 year ago
import os
import os.path
from urllib.parse import urlparse
from Util import ConfigUtil
from Util.AesUtil import *
from Util.ComfyUIUtil import *
from Util.CommonUtil import *
from Util.OssUtil import uploadOss
from Util.PngUtil import *
1 year ago
def upload_img_file(file_path, model_id):
png_file = os.path.basename(file_path)
1 year ago
# 清除Exif信息
clearExif(file_path, file_path) # 清除Exif信息
# 上传到OSS
key = 'Images/User/' + str(model_id) + '/' + png_file
uploadOss(key, file_path)
# # 生成下载链接
url = 'http://hzkc.oss-cn-beijing.aliyuncs.com/' + key
# 删除本地文件
os.remove(file_path)
printf("成功生成1张图片,地址:" + url)
return url
1 year ago
# 文生图服务调用
1 year ago
def webui_txt_2_img(model_id, json_data, input_image, target_folder):
1 year ago
# 提示词
prompt = json_data['prompt']
# 图片
args = json_data['alwayson_scripts']['ControlNet']['args']
pose_image = json_data['pose_image']
args[0]['input_image'] = encode_image(input_image[0])
# 如果有特殊的姿态要求,那就用模板指定的姿态图
if len(pose_image) > 0:
if len(args) > 1:
args[1]['input_image'] = encode_image(pose_image)
else: # 如果没有特殊的姿态要求,那么就统一用一张图片
if len(args) > 1:
args[1]['input_image'] = encode_image(input_image[0])
# 提示词
json_data['prompt'] = prompt
# 开始生成图片
printf("开始生成图片...")
# 调用生图服务
response = submit_post(txt2img_url, json_data)
# 创建文件的完整路径
1 year ago
file_path = os.path.join(target_folder, str(uuid.uuid4()) + '.png')
1 year ago
save_encoded_image(response.json()['images'][0], file_path)
1 year ago
url = upload_img_file(file_path, model_id)
1 year ago
# 添加到结果集合中
res.append(url)
1 year ago
# 图生图服务调用
1 year ago
def webui_img_2_img(model_id, prompt_id, json_data, input_image, target_folder):
1 year ago
source_img = encode_image(input_image[0])
# 模型固定的参考图片
refer_source_img = r'./Image/' + str(model_id) + '/' + str(prompt_id) + '.jpg'
if not os.path.exists(refer_source_img):
refer_source_img = r'./Image/' + str(model_id) + '/' + str(prompt_id) + '.png'
# 都找不到,报错
if not os.path.exists(refer_source_img):
print("模型:" + str(model_id) + ",参考图:" + str(prompt_id) + "不存在,请检查!")
exit(0)
# 找到参考图
refer_img = encode_image(refer_source_img)
1 year ago
# 1.5与XL通用
1 year ago
# 在ControlNet的第一个图片中使用用户输入的图片用此图片的人脸进行换脸操作
json_data["alwayson_scripts"]["ControlNet"]["args"][0]["input_image"] = source_img
# 图生图,风格参考图是输入
json_data["init_images"] = [refer_img]
1 year ago
# XL 专享
if len(json_data["alwayson_scripts"]["ControlNet"]["args"]) == 2:
json_data["alwayson_scripts"]["ControlNet"]["args"][1]["input_image"] = refer_img
1 year ago
# 开始生成图片
printf("开始生成图片...")
# 调用生图服务
response = submit_post(img2img_url, json_data)
# 创建文件的完整路径
1 year ago
file_path = os.path.join(target_folder, str(uuid.uuid4()) + '.png')
1 year ago
save_encoded_image(response.json()['images'][0], file_path)
1 year ago
url = upload_img_file(file_path, model_id)
1 year ago
res.append(url) # 添加到结果集合中
1 year ago
# ComfyUI服务调用
1 year ago
def runComfyUI(model_id, json_data, input_image, target_folder):
# 生成一个唯一的客户端ID
client_id = str(uuid.uuid4())
# 上传图片
img = []
for x in input_image:
with open(x, "rb") as f:
1 year ago
y = upload_file(comfyui_address, f, "", True)
1 year ago
img.append(y)
# 如果存在多个姿势图和风格图那么在配置文件中写死比如pose=post_1.jpg style=style_1.jpg
# 同时由于ComfyUI需要把这个文件上传到服务器所以需要在json文件中指定哪个文件需要上传
# 填充输入
json_data = fill_input(json_data, img)
# 过滤器,哪些返回节点中获取到的图片是有用的
1 year ago
myfilter = []
# 遍历prompt_data中的所有节点
for key, value in json_data.items():
if 'inputs' in value:
if 'filename_prefix' in value['inputs']:
if value['inputs']['filename_prefix'] == 'ComfyUI':
myfilter.append(key)
1 year ago
# 生成
1 year ago
files = generate_clip(comfyui_address, json_data, client_id, target_folder, myfilter)
1 year ago
# 上传到云存储,回写数据接口
for file in files:
1 year ago
file_path = './Out/Images/User/' + str(model_id) + "/" + file + '.png'
url = upload_img_file(file_path, model_id)
1 year ago
res.append(url)
1 year ago
1 year ago
def download_image(image_url, file_path):
1 year ago
os.system('./Tools/wget ' + image_url + ' -O ' + file_path)
1 year ago
# 根据输入的图片集合,填充到
def fill_input(prompt_data, file_array):
# 是不是不存在配置节
if "source_images" not in prompt_data:
print("配置文件不包含source_images配置节程序无法继续")
exit(0)
# 是不是个数不匹配
if len(prompt_data["source_images"]) != len(file_array):
print("输入的图片数量与预替换的图片数量不一致,程序无法继续!")
exit(0)
# 替换
for i in range(len(prompt_data["source_images"])):
x = prompt_data["source_images"][i]
# 替换配置节内容
array = x.split(",")
prompt_data[array[0]][array[1]][array[2]] = file_array[i]
# 删除多余的配置节
if "source_images" in prompt_data:
del prompt_data["source_images"]
1 year ago
# 处理style,pose 的参考图片
if 'style_pose_images' in prompt_data:
for x in prompt_data['style_pose_images']:
key = x['key']
value = x['value']
with open(value, "rb") as f:
1 year ago
y = upload_file(comfyui_address, f, "", True)
1 year ago
# 需要
array = key.split(",")
prompt_data[array[0]][array[1]][array[2]] = y
1 year ago
# 删除多余的配置节
1 year ago
if "style_pose_images" in prompt_data:
del prompt_data["style_pose_images"]
1 year ago
1 year ago
return prompt_data
# 获取任务
def get_task():
# 获取签名
en_data = getenData(machine_id)
# 请求的地址
url = web_url + '/QingLong/HuiYa/getTask'
# 要发送的数据,可以是字典形式
data = {
'enData': en_data
}
# 发送POST请求
response = requests.post(url, data=data)
# 检查请求是否成功
if response.status_code == 200:
success = response.json()["success"]
if success == 1:
data = response.json()["data"]
# 任务编号
task_id = data['task_id']
# 模板ID
model_id = data["model_id"]
# prompt_template 模板内容
prompt_id = data['prompt_id']
# 源图片
source_img_url = data["source_img_url"]
# 模板类型
model_type_id = data["model_type_id"]
# json文件
json_file = r'./JSON/' + str(model_id) + '_' + str(prompt_id) + '.json'
# 为了扩展支持多张图片,所以这里需要对,号进行拆解
input_image = []
for x in source_img_url.split(','):
# 解析URL
parsed_url = urlparse(x)
# 获取URL路径中的最后一个部分通常就是文件名
filename = parsed_url.path.split('/')[-1]
# 下载这个图片
t_file = 'Temp/' + filename
input_image.append(t_file) # input_image扩展为数组支持多张图片
# 这里可以加一个优化:如果本地存在这个文件就不用再次去下载了
if not os.path.exists(t_file):
download_image(x, t_file)
# 获取提示词
with open(json_file, 'r', encoding='utf-8') as file:
vjson_data = json.load(file)
# 利用三元表达式输出
1 year ago
printf("发现需要处理的" + ("WebUI" if model_type_id in [1, 2] else "ComfyUI") + "任务,model_type_id=" + str(
1 year ago
model_type_id) + ",model_id=" + str(
1 year ago
model_id) + ",prompt_id=" + str(prompt_id) + ",task_id=" + str(task_id))
1 year ago
1 year ago
# 创建目标目录
output_path = "Out/Images/User/" + str(model_id) + "/"
if not os.path.exists(output_path):
os.makedirs(output_path)
1 year ago
# SD_TXT2IMG
1 year ago
if model_type_id == 1:
1 year ago
webui_txt_2_img(model_id=model_id, json_data=vjson_data, input_image=input_image,
target_folder=output_path)
1 year ago
1 year ago
# SD_IMG2IMG
1 year ago
if model_type_id == 2:
webui_img_2_img(model_id=model_id, prompt_id=prompt_id, json_data=vjson_data,
1 year ago
input_image=input_image, target_folder=output_path)
1 year ago
# COMFY_UI
1 year ago
if model_type_id == 3:
1 year ago
runComfyUI(model_id=model_id, json_data=vjson_data, input_image=input_image,
target_folder=output_path)
1 year ago
1 year ago
return task_id
elif success == 0:
printf("现在没有待处理的任务,休息3秒后再试...")
return 0
else:
message = response.json()["message"]
printf('发生了异常:' + message)
return success
else:
printf('请求失败,状态码:' + response.status_code)
return -1
# 回复服务器,生图任务完成
def finish_task(task_id, target_img_urls):
# 获取签名
en_data = getenData(machine_id)
# 请求的地址
url = web_url + '/QingLong/HuiYa/finishTask'
# 要发送的数据,可以是字典形式
data = {
'task_id': task_id,
'enData': en_data,
'target_img_urls': target_img_urls
}
# 发送POST请求
response = requests.post(url, data=data)
# 检查请求是否成功
if response.status_code == 200:
printf("成功完成数据回写!")
else:
printf('请求失败,状态码:' + str(response.status_code))
if __name__ == '__main__':
# 禁止代理
set_http_proxy("")
# 配置文件
config = ConfigUtil.getConfig()
# 处理机编号
machine_id = config['system']['machine_id']
1 year ago
# webui 服务器地址
1 year ago
webui_address = "http://{}".format(config['webui']['webui_address'])
1 year ago
# 文生图服务地址
1 year ago
txt2img_url = webui_address + config['webui']['txt2img_url']
1 year ago
# 图生图服务地址
1 year ago
img2img_url = webui_address + config['webui']['img2img_url']
1 year ago
1 year ago
# WEB服务器地址
1 year ago
web_url = config['webServer']['web_url']
# COMFYUI服务器地址
1 year ago
comfyui_address = config.get('comfyui', 'server_address')
1 year ago
while True:
try:
res = [] # 所有图片的地址路径
v_task_id = get_task()
# 回复上位生成完毕
if v_task_id > 0:
# 图片地址
v_target_img_urls = ''
for r in res:
v_target_img_urls = r + ',' + v_target_img_urls
v_target_img_urls = v_target_img_urls[0:len(v_target_img_urls) - 1]
# 生成完毕
finish_task(v_task_id, v_target_img_urls)
1 year ago
# 清理SD缓存
1 year ago
clear_webui_cache(webui_address)
get_webui_used(webui_address)
1 year ago
# 清理GPU缓存
1 year ago
clear_comfyui_cache(comfyui_address)
get_comfyui_used(comfyui_address)
1 year ago
1 year ago
elif v_task_id == 0:
# 本轮没有任务
time.sleep(3)
except Exception as err:
printf("发生异常将休息3秒后重试..." + str(err))
time.sleep(3)