import json import os import os.path from urllib.parse import urlparse from Util import ConfigUtil from Util.AesUtil import * from Util.ComfyUIUtil import * from Util.CommonUtil import * from Util.OssUtil import uploadOss from Util.PngUtil import * def upload_img_file(file_path, model_id): png_file = os.path.basename(file_path) # 清除Exif信息 clearExif(file_path, file_path) # 清除Exif信息 # 上传到OSS key = 'Images/User/' + str(model_id) + '/' + png_file uploadOss(key, file_path) # # 生成下载链接 url = 'http://hzkc.oss-cn-beijing.aliyuncs.com/' + key # 删除本地文件 os.remove(file_path) printf("成功生成1张图片,地址:" + url) return url # 文生图服务调用 def webui_txt_2_img(model_id, json_data, input_image, target_folder): # 提示词 prompt = json_data['prompt'] # 图片 args = json_data['alwayson_scripts']['ControlNet']['args'] #pretty_json = json.dumps(json_data, indent=4,ensure_ascii=False) #print(pretty_json) pose_image = json_data['pose_image'] args[0]['input_image'] = encode_image(input_image[0]) # 如果有特殊的姿态要求,那就用模板指定的姿态图 if len(pose_image) > 0: if len(args) > 1: args[1]['input_image'] = encode_image(pose_image) else: # 如果没有特殊的姿态要求,那么就统一用一张图片 if len(args) > 1: args[1]['input_image'] = encode_image(input_image[0]) # 提示词 json_data['prompt'] = prompt # 开始生成图片 printf("开始生成图片...") # 调用生图服务 response = submit_post(txt2img_url, json_data) # 创建文件的完整路径 file_path = os.path.join(target_folder, str(uuid.uuid4()) + '.png') save_encoded_image(response.json()['images'][0], file_path) url = upload_img_file(file_path, model_id) # 添加到结果集合中 res.append(url) # 图生图服务调用 def webui_img_2_img(model_id, prompt_id, json_data, input_image, target_folder): source_img = encode_image(input_image[0]) # 模型固定的参考图片 refer_source_img = r'./Image/' + str(model_id) + '/' + str(prompt_id) + '.jpg' if not os.path.exists(refer_source_img): refer_source_img = r'./Image/' + str(model_id) + '/' + str(prompt_id) + '.png' # 都找不到,报错 if not os.path.exists(refer_source_img): print("模型:" + str(model_id) + ",参考图:" + str(prompt_id) + "不存在,请检查!") exit(0) # 找到参考图 refer_img = encode_image(refer_source_img) # 1.5与XL通用 # 在ControlNet的第一个图片中使用用户输入的图片,用此图片的人脸进行换脸操作 json_data["alwayson_scripts"]["ControlNet"]["args"][0]["input_image"] = source_img # 图生图,风格参考图是输入 json_data["init_images"] = [refer_img] # XL 专享 if len(json_data["alwayson_scripts"]["ControlNet"]["args"]) == 2: json_data["alwayson_scripts"]["ControlNet"]["args"][1]["input_image"] = refer_img # 开始生成图片 printf("开始生成图片...") # 调用生图服务 response = submit_post(img2img_url, json_data) # 创建文件的完整路径 file_path = os.path.join(target_folder, str(uuid.uuid4()) + '.png') save_encoded_image(response.json()['images'][0], file_path) url = upload_img_file(file_path, model_id) res.append(url) # 添加到结果集合中 # ComfyUI服务调用 def runComfyUI(model_id, json_data, input_image, target_folder): # 生成一个唯一的客户端ID client_id = str(uuid.uuid4()) # 上传图片 img = [] for x in input_image: with open(x, "rb") as f: y = upload_file(comfyui_address, f, "", True) img.append(y) # 如果存在多个姿势图和风格图,那么在配置文件中写死,比如pose=post_1.jpg style=style_1.jpg # 同时,由于ComfyUI需要把这个文件上传到服务器,所以,需要在json文件中指定哪个文件需要上传 # 填充输入 json_data = fill_input(json_data, img) # 过滤器,哪些返回节点中获取到的图片是有用的 myfilter = [] # 遍历prompt_data中的所有节点 for key, value in json_data.items(): if 'inputs' in value: if 'filename_prefix' in value['inputs']: if value['inputs']['filename_prefix'] == 'ComfyUI': myfilter.append(key) # 生成 files = generate_clip(comfyui_address, json_data, client_id, target_folder, myfilter) # 上传到云存储,回写数据接口 for file in files: file_path = './Out/Images/User/' + str(model_id) + "/" + file + '.png' url = upload_img_file(file_path, model_id) res.append(url) def download_image(image_url, file_path): os.system('./Tools/wget ' + image_url + ' -O ' + file_path) # 根据输入的图片集合,填充到 def fill_input(prompt_data, file_array): # 是不是不存在配置节 if "source_images" not in prompt_data: print("配置文件不包含source_images配置节,程序无法继续!") exit(0) # 是不是个数不匹配 if len(prompt_data["source_images"]) != len(file_array): print("输入的图片数量与预替换的图片数量不一致,程序无法继续!") exit(0) # 替换 for i in range(len(prompt_data["source_images"])): x = prompt_data["source_images"][i] # 替换配置节内容 array = x.split(",") prompt_data[array[0]][array[1]][array[2]] = file_array[i] # 删除多余的配置节 if "source_images" in prompt_data: del prompt_data["source_images"] # 处理style,pose 的参考图片 if 'style_pose_images' in prompt_data: for x in prompt_data['style_pose_images']: key = x['key'] value = x['value'] with open(value, "rb") as f: y = upload_file(comfyui_address, f, "", True) # 需要 array = key.split(",") prompt_data[array[0]][array[1]][array[2]] = y # 删除多余的配置节 if "style_pose_images" in prompt_data: del prompt_data["style_pose_images"] return prompt_data # 获取任务 def get_task(): # 获取签名 en_data = getenData(machine_id) # 请求的地址 url = web_url + '/QingLong/HuiYa/getTask' # 要发送的数据,可以是字典形式 data = { 'enData': en_data } # 发送POST请求 response = requests.post(url, data=data) # 检查请求是否成功 if response.status_code == 200: success = response.json()["success"] if success == 1: data = response.json()["data"] # 任务编号 task_id = data['task_id'] # 模板ID model_id = data["model_id"] # prompt_template 模板内容 prompt_id = data['prompt_id'] # 源图片 source_img_url = data["source_img_url"] # 模板类型 model_type_id = data["model_type_id"] # json文件 json_file = r'./JSON/' + str(model_id) + '_' + str(prompt_id) + '.json' # 为了扩展支持多张图片,所以这里需要对,号进行拆解 input_image = [] for x in source_img_url.split(','): # 解析URL parsed_url = urlparse(x) # 获取URL路径中的最后一个部分,通常就是文件名 filename = parsed_url.path.split('/')[-1] # 下载这个图片 t_file = 'Temp/' + filename input_image.append(t_file) # input_image扩展为数组,支持多张图片 # 这里可以加一个优化:如果本地存在这个文件就不用再次去下载了 if not os.path.exists(t_file): download_image(x, t_file) # 获取提示词 with open(json_file, 'r', encoding='utf-8') as file: vjson_data = json.load(file) # 利用三元表达式输出 printf("发现需要处理的" + ("WebUI" if model_type_id in [1, 2] else "ComfyUI") + "任务,model_type_id=" + str( model_type_id) + ",model_id=" + str( model_id) + ",prompt_id=" + str(prompt_id) + ",task_id=" + str(task_id)) # 创建目标目录 output_path = "Out/Images/User/" + str(model_id) + "/" if not os.path.exists(output_path): os.makedirs(output_path) # SD_TXT2IMG if model_type_id == 1: webui_txt_2_img(model_id=model_id, json_data=vjson_data, input_image=input_image, target_folder=output_path) # SD_IMG2IMG if model_type_id == 2: webui_img_2_img(model_id=model_id, prompt_id=prompt_id, json_data=vjson_data, input_image=input_image, target_folder=output_path) # COMFY_UI if model_type_id == 3: runComfyUI(model_id=model_id, json_data=vjson_data, input_image=input_image, target_folder=output_path) return task_id elif success == 0: printf("现在没有待处理的任务,休息3秒后再试...") return 0 else: message = response.json()["message"] printf('发生了异常:' + message) return success else: printf('请求失败,状态码:' + response.status_code) return -1 # 回复服务器,生图任务完成 def finish_task(task_id, target_img_urls): # 获取签名 en_data = getenData(machine_id) # 请求的地址 url = web_url + '/QingLong/HuiYa/finishTask' # 要发送的数据,可以是字典形式 data = { 'task_id': task_id, 'enData': en_data, 'target_img_urls': target_img_urls } # 发送POST请求 response = requests.post(url, data=data) # 检查请求是否成功 if response.status_code == 200: printf("成功完成数据回写!") else: printf('请求失败,状态码:' + str(response.status_code)) if __name__ == '__main__': # 禁止代理 set_http_proxy("") # 配置文件 config = ConfigUtil.getConfig() # 处理机编号 machine_id = config['system']['machine_id'] # webui 服务器地址 webui_address = config['webui']['webui_address'] # 文生图服务地址 txt2img_url = 'http://' + webui_address + config['webui']['txt2img_url'] # 图生图服务地址 img2img_url = 'http://' + webui_address + config['webui']['img2img_url'] # WEB服务器地址 web_url = config['webServer']['web_url'] # COMFYUI服务器地址 comfyui_address = config.get('comfyui', 'server_address') while True: try: res = [] # 所有图片的地址路径 v_task_id = get_task() # 回复上位生成完毕 if v_task_id > 0: # 图片地址 v_target_img_urls = '' for r in res: v_target_img_urls = r + ',' + v_target_img_urls v_target_img_urls = v_target_img_urls[0:len(v_target_img_urls) - 1] # 生成完毕 finish_task(v_task_id, v_target_img_urls) # 清理SD缓存 clear_webui_cache(webui_address) get_webui_used(webui_address) # 清理GPU缓存 clear_comfyui_cache(comfyui_address) get_comfyui_used(comfyui_address) elif v_task_id == 0: # 本轮没有任务 time.sleep(3) except Exception as err: printf("发生异常,将休息3秒后重试..." + str(err)) time.sleep(3)