This commit is contained in:
2025-08-19 15:06:49 +08:00
parent a28d041a71
commit 670e9f1ec3
5 changed files with 300 additions and 13 deletions

View File

@@ -0,0 +1,89 @@
import json
import os
import time
import base64
from com.dsideal.Ai.Util.JiMeng.Kit.JmCommon import JmCommon
from com.dsideal.Ai.Util.JiMeng.Kit.JmErrorCode import JmErrorCode
class JmTxt2Img:
action = "CVProcess"
req_key = "jimeng_high_aes_general_v21_L"
@staticmethod
def generate_image(prompt, save_img_path):
"""生成图片"""
# 创建请求体
req = {
"req_key": JmTxt2Img.req_key,
"prompt": prompt
}
response_body = JmCommon.do_request("POST", {}, json.dumps(req).encode('utf-8'), JmTxt2Img.action)
jo = json.loads(response_body)
# 检查响应状态码
code = jo.get("code")
if not JmErrorCode.is_success(code):
error_msg = JmErrorCode.get_message_by_code(code)
print(f"生成图片失败: 错误码={code}, 错误信息={error_msg}")
raise Exception(f"生成图片失败: {error_msg}")
# 获取图片Base64数据
img_base64 = jo.get("data", {}).get("binary_data_base64", [""])[0]
# 确保目录存在
os.makedirs(os.path.dirname(save_img_path), exist_ok=True)
# 对 Base64 字符串进行解码并保存为文件
try:
# 注意有些Base64字符串可能有前缀需要去除
if img_base64.startswith('data:image'):
img_base64 = img_base64.split(',')[1]
bytes_data = base64.b64decode(img_base64)
with open(save_img_path, 'wb') as f:
f.write(bytes_data)
print(f"文件保存成功!文件位置: {save_img_path}")
except Exception as e:
print(f"保存图片失败: {str(e)}")
raise Exception(f"保存图片失败: {str(e)}")
@staticmethod
def main():
"""主函数,用于测试"""
# 示例提示词
prompt = "鸿门宴,室内场景图。非真人,是一幅画。远景镜头,让人一看就知道是在讲鸿门宴,"
prompt += "1、要亮"
prompt += "2、不要有人出现"
prompt += "3、有大厅、墙面桌子,不要有台阶,否则项庄没机会刺杀刘邦"
prompt += "4、西周时期特点"
prompt += "5、宏大雄伟,颜色鲜明,不要灰色调的,不要有雾霾之类的"
prompt += "6、超高清画质"
# 保存图片路径 - 使用绝对路径避免相对路径问题
save_image_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..", "..", "..", "..", "Text2Img.jpg"))
print(f"保存图片路径:{save_image_path}")
# 添加重试逻辑处理API并发限制错误
retry_count = 0
max_retries = 5 # 减少重试次数以便更快排查问题
retry_interval = 5000 # 重试间隔(毫秒)
while True:
try:
JmTxt2Img.generate_image(prompt, save_image_path)
# 成功生成图片,跳出循环
break
except Exception as e:
print(f"生成图片异常: {str(e)}")
retry_count += 1
if retry_count < max_retries:
print(f"等待{retry_interval}毫秒后重试...")
time.sleep(retry_interval / 1000)
else:
print(f"已达到最大重试次数: {max_retries}")
raise e # 达到最大重试次数,抛出异常
if __name__ == "__main__":
JmTxt2Img.main()

View File

@@ -0,0 +1,193 @@
import hashlib
import hmac
import json
import os
import time
from datetime import datetime, timezone
import requests
import base64
class JmCommon:
# 请求域名和相关配置
host = "visual.volcengineapi.com"
path = "/"
service = "cv"
region = "cn-north-1"
schema = "https"
version = "2022-08-31"
# API访问凭证 - 请替换为您自己的凭证
ak = "AKLTZjVlOGU1NzA1YWZkNDExMzkzYzY5YTNlOTRmMTMxODg"
sk = "WkdabU9UTXdNVEJpTmpWbE5HVTJZVGxtTnpWbU5XSTBaRGN5TW1NMk5tRQ=="
# 项目路径
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..", "..", "..", ".."))
base_path = os.path.join(project_root, "src", "main", "python", "com", "dsideal", "aiSupport", "Util", "JiMeng", "Example")
@staticmethod
def sign_string_encoder(source):
"""URL编码字符串用于签名"""
if source is None:
return None
# 不需要编码的字符
safe_chars = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_.~")
result = []
for char in source:
if char in safe_chars:
result.append(char)
elif char == ' ':
result.append("%20")
else:
# 对其他字符进行URL编码
result.append(f"%{ord(char):02X}")
return ''.join(result)
@staticmethod
def hash_sha256(content):
"""计算内容的SHA256哈希值"""
try:
md = hashlib.sha256()
md.update(content)
return md.hexdigest()
except Exception as e:
raise Exception(f"计算哈希值失败: {str(e)}")
@staticmethod
def hmac_sha256(key, content):
"""使用HMAC-SHA256算法计算消息认证码"""
try:
h = hmac.new(key, content.encode('utf-8'), hashlib.sha256)
return h.digest()
except Exception as e:
raise Exception(f"计算HMAC-SHA256失败: {str(e)}")
@staticmethod
def gen_signing_secret_key_v4(date, region, service):
"""生成V4版本的签名密钥"""
try:
# 解码SK (Base64)
sk_bytes = base64.b64decode(JmCommon.sk)
k_date = JmCommon.hmac_sha256(sk_bytes, date)
k_region = JmCommon.hmac_sha256(k_date, region)
k_service = JmCommon.hmac_sha256(k_region, service)
k_signing = JmCommon.hmac_sha256(k_service, "request")
return k_signing
except Exception as e:
raise Exception(f"生成签名密钥失败: {str(e)}")
@staticmethod
def do_request(method, query_list, body, action):
"""发送HTTP请求到火山引擎API"""
date = datetime.now(timezone.utc)
if body is None:
body = b''
# 计算请求体的SHA256哈希值
x_content_sha256 = JmCommon.hash_sha256(body)
# 格式化日期时间,用于请求头
x_date = date.strftime("%Y%m%dT%H%M%SZ")
short_x_date = x_date[:8] # 提取日期部分,用于凭证范围
content_type = "application/json"
sign_header = "host;x-date;x-content-sha256;content-type"
# 构建查询参数字符串
real_query_list = query_list.copy() if query_list else {}
real_query_list["Action"] = action
real_query_list["Version"] = JmCommon.version
# 排序查询参数
sorted_query = sorted(real_query_list.items())
query_sb = []
for key, value in sorted_query:
encoded_key = JmCommon.sign_string_encoder(key)
encoded_value = JmCommon.sign_string_encoder(str(value))
query_sb.append(f"{encoded_key}={encoded_value}")
query_string = "&" .join(query_sb)
# 构建规范请求字符串,用于签名
canonical_string = f"{method}\n{JmCommon.path}\n{query_string}\n"
canonical_string += f"host:{JmCommon.host}\n"
canonical_string += f"x-date:{x_date}\n"
canonical_string += f"x-content-sha256:{x_content_sha256}\n"
canonical_string += f"content-type:{content_type}\n\n"
canonical_string += f"{sign_header}\n{x_content_sha256}"
# 计算规范请求字符串的哈希值
hash_canonical_string = JmCommon.hash_sha256(canonical_string.encode('utf-8'))
# 构建凭证范围和签名字符串
credential_scope = f"{short_x_date}/{JmCommon.region}/{JmCommon.service}/request"
sign_string = f"HMAC-SHA256\n{x_date}\n{credential_scope}\n{hash_canonical_string}"
# 生成签名密钥并计算签名
sign_key = JmCommon.gen_signing_secret_key_v4(short_x_date, JmCommon.region, JmCommon.service)
signature = hmac.new(sign_key, sign_string.encode('utf-8'), hashlib.sha256).hexdigest()
# 构建URL
url = f"{JmCommon.schema}://{JmCommon.host}{JmCommon.path}?{query_string}"
# 设置请求头
headers = {
"Host": JmCommon.host,
"X-Date": x_date,
"X-Content-Sha256": x_content_sha256,
"Content-Type": content_type,
"Authorization": f"HMAC-SHA256 Credential={JmCommon.ak}/{credential_scope}, SignedHeaders={sign_header}, Signature={signature}"
}
# 发送请求
try:
print(f"请求URL: {url}")
print(f"请求头: {headers}")
response = requests.request(
method=method,
url=url,
headers=headers,
data=body,
timeout=(30, 30) # 连接超时和读取超时
)
print(f"响应状态码: {response.status_code}")
print(f"响应内容: {response.text}")
response.raise_for_status() # 如果状态码不是200抛出异常
return response.text
except requests.exceptions.RequestException as e:
raise Exception(f"API请求失败: {str(e)}")
@staticmethod
def download_file(file_url, save_file_path):
"""从URL下载文件到指定路径"""
try:
# 确保目录存在
os.makedirs(os.path.dirname(save_file_path), exist_ok=True)
# 下载文件
response = requests.get(file_url, timeout=30)
response.raise_for_status()
with open(save_file_path, 'wb') as f:
f.write(response.content)
file_size = os.path.getsize(save_file_path)
print(f"文件下载成功,保存路径: {save_file_path}, 文件大小: {file_size}字节")
except Exception as e:
print(f"文件下载失败: {str(e)}")
raise Exception(f"文件下载失败: {str(e)}")
@staticmethod
def query_task_result(task_id):
"""查询异步任务结果"""
req_key = "jimeng_vgfm_t2v_l20"
action = "CVSync2AsyncGetResult"
# 创建请求体
req = {
"task_id": task_id,
"req_key": req_key
}
response_body = JmCommon.do_request("POST", {}, json.dumps(req).encode('utf-8'), action)
return json.loads(response_body)

View File

@@ -3,9 +3,6 @@ import os
import time import time
import base64 import base64
from JiMeng.Kit.JmCommon import JmCommon
from JiMeng.Kit.JmErrorCode import JmErrorCode
class JmTxt2Img: class JmTxt2Img:
action = "CVProcess" action = "CVProcess"
@@ -61,13 +58,13 @@ class JmTxt2Img:
prompt += "5、宏大雄伟,颜色鲜明,不要灰色调的,不要有雾霾之类的" prompt += "5、宏大雄伟,颜色鲜明,不要灰色调的,不要有雾霾之类的"
prompt += "6、超高清画质" prompt += "6、超高清画质"
# 保存图片路径 # 保存图片路径 - 使用绝对路径避免相对路径问题
save_image_path = os.path.join(JmCommon.base_path, "Text2Img.jpg") save_image_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..", "..", "..", "..", "Text2Img.jpg"))
print(f"保存图片路径:{save_image_path}") print(f"保存图片路径:{save_image_path}")
# 添加重试逻辑处理API并发限制错误 # 添加重试逻辑处理API并发限制错误
retry_count = 0 retry_count = 0
max_retries = 1000 # 最大重试次数 max_retries = 5 # 减少重试次数以便更快排查问题
retry_interval = 5000 # 重试间隔(毫秒) retry_interval = 5000 # 重试间隔(毫秒)
while True: while True:

View File

@@ -17,7 +17,7 @@ class JmCommon:
schema = "https" schema = "https"
version = "2022-08-31" version = "2022-08-31"
# API访问凭证 # API访问凭证 - 请替换为您自己的凭证
ak = "AKLTZjVlOGU1NzA1YWZkNDExMzkzYzY5YTNlOTRmMTMxODg" ak = "AKLTZjVlOGU1NzA1YWZkNDExMzkzYzY5YTNlOTRmMTMxODg"
sk = "WkdabU9UTXdNVEJpTmpWbE5HVTJZVGxtTnpWbU5XSTBaRGN5TW1NMk5tRQ==" sk = "WkdabU9UTXdNVEJpTmpWbE5HVTJZVGxtTnpWbU5XSTBaRGN5TW1NMk5tRQ=="
@@ -68,12 +68,16 @@ class JmCommon:
@staticmethod @staticmethod
def gen_signing_secret_key_v4(date, region, service): def gen_signing_secret_key_v4(date, region, service):
"""生成V4版本的签名密钥""" """生成V4版本的签名密钥"""
# 注意这里sk需要解码因为Java代码中sk是Base64编码的 try:
sk_bytes = base64.b64decode(JmCommon.sk) # 解码SK (Base64)
k_date = JmCommon.hmac_sha256(sk_bytes, date) sk_bytes = base64.b64decode(JmCommon.sk)
k_region = JmCommon.hmac_sha256(k_date, region) k_date = JmCommon.hmac_sha256(sk_bytes, date)
k_service = JmCommon.hmac_sha256(k_region, service) k_region = JmCommon.hmac_sha256(k_date, region)
return JmCommon.hmac_sha256(k_service, "request") k_service = JmCommon.hmac_sha256(k_region, service)
k_signing = JmCommon.hmac_sha256(k_service, "request")
return k_signing
except Exception as e:
raise Exception(f"生成签名密钥失败: {str(e)}")
@staticmethod @staticmethod
def do_request(method, query_list, body, action): def do_request(method, query_list, body, action):
@@ -139,6 +143,8 @@ class JmCommon:
# 发送请求 # 发送请求
try: try:
print(f"请求URL: {url}")
print(f"请求头: {headers}")
response = requests.request( response = requests.request(
method=method, method=method,
url=url, url=url,
@@ -146,6 +152,8 @@ class JmCommon:
data=body, data=body,
timeout=(30, 30) # 连接超时和读取超时 timeout=(30, 30) # 连接超时和读取超时
) )
print(f"响应状态码: {response.status_code}")
print(f"响应内容: {response.text}")
response.raise_for_status() # 如果状态码不是200抛出异常 response.raise_for_status() # 如果状态码不是200抛出异常
return response.text return response.text
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e: