import os import os.path from urllib.parse import urlparse from Util import ConfigUtil from Util.AesUtil import * from Util.ComfyUIUtil import * from Util.CommonUtil import * from Util.OssUtil import uploadOss from Util.PngUtil import * # 文生图服务调用 def webui_txt_2_img(model_id, json_data, input_image): # 提示词 prompt = json_data['prompt'] # 图片 args = json_data['alwayson_scripts']['ControlNet']['args'] pose_image = json_data['pose_image'] args[0]['input_image'] = encode_image(input_image[0]) # 如果有特殊的姿态要求,那就用模板指定的姿态图 if len(pose_image) > 0: if len(args) > 1: args[1]['input_image'] = encode_image(pose_image) else: # 如果没有特殊的姿态要求,那么就统一用一张图片 if len(args) > 1: args[1]['input_image'] = encode_image(input_image[0]) # 宽度与高度 width = json_data['width'] height = json_data['height'] # 提示词 json_data['prompt'] = prompt # 开始生成图片 printf("开始生成图片...") # 设置目标文件夹 target_folder = "Out" # 创建以当前模型命名的目录 folder = os.path.join(target_folder, 'Images/User/' + str(model_id) + '/') if not os.path.exists(folder): os.makedirs(folder) # 调用生图服务 response = submit_post(txt2img_url, json_data) # 创建文件名 png_file = str(uuid.uuid4()) + '.png' # 创建文件的完整路径 file_path = os.path.join(folder, png_file) save_encoded_image(response.json()['images'][0], file_path) # 清除Exif信息 clearExif(file_path, file_path) # 清除Exif信息 # 上传到OSS key = 'Images/User/' + str(model_id) + '/' + png_file uploadOss(key, file_path) # # 生成下载链接 url = 'http://hzkc.oss-cn-beijing.aliyuncs.com/' + key # 删除本地文件 os.remove(file_path) printf("成功生成1张图片,宽度:" + str(width) + ",高度:" + str(height) + " 地址:" + url) # 添加到结果集合中 res.append(url) # 图生图服务调用 def webui_img_2_img(model_id, prompt_id, json_data, input_image): # 提示词 prompt = json_data['prompt'] # 图片 # 用户输入的图片 source_img = encode_image(input_image[0]) # 模型固定的参考图片 refer_source_img = r'./Image/' + str(model_id) + '/' + str(prompt_id) + '.jpg' if not os.path.exists(refer_source_img): refer_source_img = r'./Image/' + str(model_id) + '/' + str(prompt_id) + '.png' # 都找不到,报错 if not os.path.exists(refer_source_img): print("模型:" + str(model_id) + ",参考图:" + str(prompt_id) + "不存在,请检查!") exit(0) # 找到参考图 refer_img = encode_image(refer_source_img) # 在ControlNet的第一个图片中使用用户输入的图片,用此图片的人脸进行换脸操作 json_data["alwayson_scripts"]["ControlNet"]["args"][0]["input_image"] = source_img # 图生图,风格参考图是输入 json_data["init_images"] = [refer_img] json_data["alwayson_scripts"]["ControlNet"]["args"][1]["input_image"] = refer_img # 宽度与高度 width = json_data['width'] height = json_data['height'] # 开始生成图片 printf("开始生成图片...") # 设置目标文件夹 target_folder = "Out" # 创建以当前模型命名的目录 folder = os.path.join(target_folder, 'Images/User/' + str(model_id) + '/') if not os.path.exists(folder): os.makedirs(folder) # 调用生图服务 response = submit_post(img2img_url, json_data) # 创建文件名 png_file = str(uuid.uuid4()) + '.png' # 创建文件的完整路径 file_path = os.path.join(folder, png_file) save_encoded_image(response.json()['images'][0], file_path) # 清除Exif信息 clearExif(file_path, file_path) # 清除Exif信息 # 上传到OSS key = 'Images/User/' + str(model_id) + '/' + png_file uploadOss(key, file_path) # # 生成下载链接 url = 'http://hzkc.oss-cn-beijing.aliyuncs.com/' + key # 删除本地文件 os.remove(file_path) printf("成功生成1张图片,宽度:" + str(width) + ",高度:" + str(height) + " 地址:" + url) # 添加到结果集合中 res.append(url) def download_image(image_url, file_path): # 发送GET请求 response = requests.get(image_url) # 检查请求是否成功 if response.status_code == 200: # 打开一个文件用于写入 with open(file_path, 'wb') as f: # 将响应的内容写入文件中 f.write(response.content) else: printf('图片下载失败,状态码:' + str(response.status_code)) # 指定目录下有多少个指定前缀的文件 def getPrefixFileCount(directory_path, prefix): # 使用os.listdir()获取目录下的所有文件和文件夹 entries = os.listdir(directory_path) # 使用os.path.isfile()检查每个条目是否是文件 files = [entry for entry in entries if os.path.isfile(os.path.join(directory_path, entry))] cnt = 0 for file in files: if file.startswith(prefix): cnt = cnt + 1 return cnt def getMyFilter(prompt_data): myfilter = [] # 遍历prompt_data中的所有节点 for key, value in prompt_data.items(): if 'inputs' in value: if 'filename_prefix' in value['inputs']: if value['inputs']['filename_prefix'] == 'ComfyUI': myfilter.append(key) return myfilter # 根据输入的图片集合,填充到 def fill_input(prompt_data, file_array): # 是不是不存在配置节 if "source_images" not in prompt_data: print("配置文件不包含source_images配置节,程序无法继续!") exit(0) # 是不是个数不匹配 if len(prompt_data["source_images"]) != len(file_array): print("输入的图片数量与预替换的图片数量不一致,程序无法继续!") exit(0) # 替换 for i in range(len(prompt_data["source_images"])): x = prompt_data["source_images"][i] # 替换配置节内容 array = x.split(",") prompt_data[array[0]][array[1]][array[2]] = file_array[i] # 删除多余的配置节 if "source_images" in prompt_data: del prompt_data["source_images"] return prompt_data def runComfyUI(model_id, json_data, input_image): # 创建目标目录 output_path = "Out/Images/User/" + str(model_id) + "/" if not os.path.exists(output_path): os.makedirs(output_path) # 生成一个唯一的客户端ID client_id = str(uuid.uuid4()) # 上传图片 img = [] for x in input_image: with open(x, "rb") as f: y = upload_file(server_address, f, "", True) img.append(y) # 填充输入 json_data = fill_input(json_data, img) # 过滤器,哪些返回节点中获取到的图片是有用的 myfilter = getMyFilter(json_data) # 生成 files = generate_clip(server_address, json_data, client_id, output_path, myfilter) # 上传到云存储,回写数据接口 for file in files: png_file = file + '.png' file_path = './Out/Images/User/' + str(model_id) + "/" + png_file # 清除Exif信息 clearExif(file_path, file_path) # 清除Exif信息 # 上传到OSS key = 'Images/User/' + str(model_id) + '/' + png_file uploadOss(key, file_path) # # 生成下载链接 url = 'http://hzkc.oss-cn-beijing.aliyuncs.com/' + key res.append(url) # 获取任务 def get_task(): # 获取签名 en_data = getenData(machine_id) # 请求的地址 url = web_url + '/QingLong/HuiYa/getTask' # 要发送的数据,可以是字典形式 data = { 'enData': en_data } # 发送POST请求 response = requests.post(url, data=data) # 检查请求是否成功 if response.status_code == 200: success = response.json()["success"] if success == 1: data = response.json()["data"] # 任务编号 task_id = data['task_id'] # 模板ID model_id = data["model_id"] # prompt_template 模板内容 prompt_id = data['prompt_id'] # 源图片 source_img_url = data["source_img_url"] # 模板类型 model_type_id = data["model_type_id"] # json文件 json_file = r'./JSON/' + str(model_id) + '_' + str(prompt_id) + '.json' # 为了扩展支持多张图片,所以这里需要对,号进行拆解 input_image = [] for x in source_img_url.split(','): # 解析URL parsed_url = urlparse(x) # 获取URL路径中的最后一个部分,通常就是文件名 filename = parsed_url.path.split('/')[-1] # 下载这个图片 t_file = 'Temp/' + filename input_image.append(t_file) # input_image扩展为数组,支持多张图片 # 这里可以加一个优化:如果本地存在这个文件就不用再次去下载了 if not os.path.exists(t_file): download_image(x, t_file) # 获取提示词 with open(json_file, 'r', encoding='utf-8') as file: vjson_data = json.load(file) # 利用三元表达式输出 printf("发现需要处理的" + ("WebUI" if model_type_id in [1, 3] else "ComfyUI") + "任务,model_id=" + str( model_id) + ",prompt_id=" + str(prompt_id) + ",task_id=" + str(task_id)) # SD_TXT2IMG if model_type_id == 1: webui_txt_2_img(model_id=model_id, json_data=vjson_data, input_image=input_image) # COMFY_UI if model_type_id == 2: runComfyUI(model_id=model_id, json_data=vjson_data, input_image=input_image) # SD_IMG2IMG if model_type_id == 3: webui_img_2_img(model_id=model_id, prompt_id=prompt_id, json_data=vjson_data, input_image=input_image) return task_id elif success == 0: printf("现在没有待处理的任务,休息3秒后再试...") return 0 else: message = response.json()["message"] printf('发生了异常:' + message) return success else: printf('请求失败,状态码:' + response.status_code) return -1 # 回复服务器,生图任务完成 def finish_task(task_id, target_img_urls): # 获取签名 en_data = getenData(machine_id) # 请求的地址 url = web_url + '/QingLong/HuiYa/finishTask' # 要发送的数据,可以是字典形式 data = { 'task_id': task_id, 'enData': en_data, 'target_img_urls': target_img_urls } # 发送POST请求 response = requests.post(url, data=data) # 检查请求是否成功 if response.status_code == 200: printf("成功完成数据回写!") else: printf('请求失败,状态码:' + str(response.status_code)) if __name__ == '__main__': # 禁止代理 set_http_proxy("") # 配置文件 config = ConfigUtil.getConfig() # 处理机编号 machine_id = config['system']['machine_id'] # 文生图服务地址 txt2img_url = config['webui']['txt2img_url'] # 图生图服务地址 img2img_url = config['webui']['img2img_url'] # WEB服务器地址 web_url = config['webServer']['web_url'] # # COMFYUI服务器地址 server_address = config.get('comfyui', 'server_address') while True: try: res = [] # 所有图片的地址路径 v_task_id = get_task() # 回复上位生成完毕 if v_task_id > 0: # 图片地址 v_target_img_urls = '' for r in res: v_target_img_urls = r + ',' + v_target_img_urls v_target_img_urls = v_target_img_urls[0:len(v_target_img_urls) - 1] # 生成完毕 finish_task(v_task_id, v_target_img_urls) elif v_task_id == 0: # 本轮没有任务 time.sleep(3) except Exception as err: printf("发生异常,将休息3秒后重试..." + str(err)) time.sleep(3)