import os import random from Util.PngUtil import * from Util import ConfigUtil from Util.SDUtil import * from Util.CommonUtil import * from Util.OssUtil import uploadOss # 获取Model对象 def getModel(model_id): file_name = r'../JSON/' + str(model_id) + '.json' # 打开JSON文件 if not os.path.exists(file_name): printf("文件:" + file_name + "不存在,跳过!") return with open(file_name, 'r', encoding='utf-8') as file: data = json.load(file) return data # 获取模型有多少个提示词 def getPromptCount(): if 'prompts' in data: return len(data['prompts']) return 1 # 运行一次提示词 def runWebUIOnce(model_id, task_type_code, input_image, prompt_index, batch_size): # 生成多少张 data['batch_size'] = batch_size # 图片 args = data['alwayson_scripts']['ControlNet']['args'] pose_image = data['pose_image'] args[0]['input_image'] = encode_image(input_image[0]) # 如果有特殊的姿态要求,那就用模板指定的姿态图 if len(pose_image) > 0: if len(args) > 1: args[1]['input_image'] = encode_image(pose_image) else: # 如果没有特殊的姿态要求,那么就统一用一张图片 if len(args) > 1: args[1]['input_image'] = encode_image(input_image[0]) # 宽度与高度 width = data['width'] height = data['height'] # 提示词 data['prompt'] = data["prompts"][prompt_index] # 开始生成图片 printf("开始生成图片...") # 设置目标文件夹 target_folder = "../Out" # 创建以当前日期命名的目录 folder = os.path.join(target_folder, 'Images/' + task_type_code + "/" + str(model_id) + '/') if not os.path.exists(folder): os.makedirs(folder) # 调用生图服务 response = submit_post(txt2img_url, data) # 保存为图片 for i in range(0, batch_size): # 创建文件名 png_file = str(uuid.uuid4()) + '.png' # 创建文件的完整路径 file_path = os.path.join(folder, png_file) save_encoded_image(response.json()['images'][i], file_path) # 清除Exif信息 clearExif(file_path, file_path) # 清除Exif信息 # 上传到OSS key = 'Images/' + task_type_code + "/" + str(model_id) + '/' + png_file # uploadOss(key, file_path) # # 生成下载链接 url = 'http://hzkc.oss-cn-beijing.aliyuncs.com/' + key # 生成预览链接 previewSuffix = '?x-oss-process=image/resize,w_150,limit_0' # 删除本地文件 # os.remove(file_path) printf("成功生成第" + str(i + 1) + "张图片,宽度:" + str(width) + ",高度:" + str(height) + " 地址:" + url) printf("本轮图片生成成功, 共完成" + str(batch_size) + "张图片") # 获取模型名称 def getModelName(): return data['override_settings']['sd_model_checkpoint'] def generate_tasks(total_tasks, prompt_count): # 确保提示词的数量大于0 if prompt_count <= 0: raise ValueError("提示词的数量必须大于0") # 初始化任务分配列表,每个提示词分配0个任务 task_allocation = [0] * prompt_count # 循环直到所有任务都被分配 while sum(task_allocation) < total_tasks: # 随机选择一个提示词 prompt_index = random.randint(0, prompt_count - 1) # 为选中的提示词分配一个随机数量的任务,确保总数不会超过total_tasks additional_tasks = min(1, total_tasks - sum(task_allocation)) task_allocation[prompt_index] += additional_tasks return task_allocation # 整体的生成任务 def runWebUI(model_id, task_type_code, input_image, total_tasks): global data data = getModel(model_id) # data是全局变量 # 示例使用 prompt_count = getPromptCount() # 提示词数量 task_allocation = generate_tasks(total_tasks, prompt_count) for i in range(len(task_allocation)): if task_allocation[i] == 0: continue printf(f"模型 {getModelName()},第{i + 1}个提示词,生成{task_allocation[i]}张图片") runWebUIOnce(model_id, task_type_code, input_image, i, task_allocation[i]) if __name__ == '__main__': config = ConfigUtil.getConfig() # 处理机编号 machine_id = config['system']['machine_id'] # 生图服务地址 txt2img_url = config['webui']['txt2img_url'] # WEB服务器地址 web_url = config['webui']['web_url'] # 配置文件 server_address = config.get('comfyui', 'server_address') model_id = 21 task_type_code = 'System' input_image = ['../Image/xiaoqiao.jpg'] total_count = 10 runWebUI(model_id, task_type_code, input_image, total_count) # 原来的input_image是单张图片,现在修改为支持多张图片