import base64
import hashlib
import hmac
import json
import os
import time
from datetime import datetime, timezone

import requests


class JmCommon:
    """Static helpers for signing and sending requests to the Volcengine visual API.

    All members are class-level attributes or static methods; the class is
    used as a namespace and is never instantiated by the code in this file.
    """

    # Request endpoint and related configuration.
    host = "visual.volcengineapi.com"
    path = "/"
    service = "cv"
    region = "cn-north-1"
    schema = "https"
    version = "2022-08-31"

    # API access credentials.
    # SECURITY(review): these credentials are committed in source. They should
    # be loaded from the environment or a secret store, and rotated.
    ak = "AKLTZjVlOGU1NzA1YWZkNDExMzkzYzY5YTNlOTRmMTMxODg"
    sk = "WkdabU9UTXdNVEJpTmpWbE5HVTJZVGxtTnpWbU5XSTBaRGN5TW1NMk5tRQ=="

    # Project paths: the root is six directory levels above this file's folder.
    project_root = os.path.abspath(
        os.path.join(os.path.dirname(__file__), "..", "..", "..", "..", "..", "..")
    )
    base_path = os.path.join(
        project_root,
        "src", "main", "python", "com", "dsideal", "aiSupport",
        "Util", "JiMeng", "Example",
    )

    # Characters that are emitted verbatim by sign_string_encoder.
    _UNRESERVED = frozenset(
        "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_.~"
    )

    @staticmethod
    def sign_string_encoder(source):
        """Percent-encode a string for use in the signed query string.

        Letters, digits and ``-_.~`` are kept as-is; every other character is
        encoded as the ``%XX`` escapes of its UTF-8 bytes (a space becomes
        ``%20``).

        Args:
            source: The text to encode, or ``None``.

        Returns:
            The encoded string, or ``None`` when ``source`` is ``None``.
        """
        if source is None:
            return None

        unreserved = JmCommon._UNRESERVED
        pieces = []
        # Work on UTF-8 bytes, not code points: formatting ord(char) directly
        # yields invalid escapes such as "%4E2D" for non-ASCII characters.
        for byte in source.encode("utf-8"):
            char = chr(byte)
            if char in unreserved:
                pieces.append(char)
            else:
                pieces.append(f"%{byte:02X}")
        return "".join(pieces)

    @staticmethod
    def hash_sha256(content):
        """Return the hexadecimal SHA-256 digest of ``content``.

        Args:
            content: Bytes-like data to hash.

        Returns:
            The digest as a lowercase hex string.

        Raises:
            Exception: If hashing fails (for example ``content`` is not
                bytes-like); the original error is chained as the cause.
        """
        try:
            return hashlib.sha256(content).hexdigest()
        except Exception as e:
            raise Exception(f"计算哈希值失败: {str(e)}") from e

    @staticmethod
    def hmac_sha256(key, content):
        """Compute the HMAC-SHA256 of ``content`` under ``key``.

        Args:
            key: The HMAC key as bytes.
            content: The message as ``str``; it is encoded as UTF-8.

        Returns:
            The raw digest bytes.

        Raises:
            Exception: If the computation fails; the original error is
                chained as the cause.
        """
        try:
            return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest()
        except Exception as e:
            raise Exception(f"计算HMAC-SHA256失败: {str(e)}") from e

    @staticmethod
    def gen_signing_secret_key_v4(date, region, service):
        """Derive the V4 signing key by chaining HMACs over date, region and service.

        Args:
            date: The short date string (``YYYYMMDD``) of the request.
            region: The region name.
            service: The service name.

        Returns:
            The derived signing key as raw bytes.
        """
        # The stored sk is Base64-decoded before use, mirroring the Java
        # implementation this was ported from.
        # NOTE(review): confirm against the Volcengine signing documentation
        # that decoding (rather than using the key text as-is) is intended.
        sk_bytes = base64.b64decode(JmCommon.sk)
        k_date = JmCommon.hmac_sha256(sk_bytes, date)
        k_region = JmCommon.hmac_sha256(k_date, region)
        k_service = JmCommon.hmac_sha256(k_region, service)
        return JmCommon.hmac_sha256(k_service, "request")

    @staticmethod
    def do_request(method, query_list, body, action):
        """Sign and send an HTTP request to the Volcengine API.

        Args:
            method: The HTTP method, e.g. ``"POST"``.
            query_list: A dict of extra query parameters, or ``None``/empty.
                It is copied, never mutated.
            body: The request body as bytes (a ``str`` is encoded as UTF-8);
                ``None`` is treated as an empty body.
            action: The value of the ``Action`` query parameter.

        Returns:
            The response body as text.

        Raises:
            Exception: If the request fails or the server answers with an
                HTTP error status; the original error is chained as the cause.
        """
        date = datetime.now(timezone.utc)

        if body is None:
            body = b""
        elif isinstance(body, str):
            # Hash and send exactly the same bytes.
            body = body.encode("utf-8")

        # SHA-256 of the request body, sent as a header and used in the signature.
        x_content_sha256 = JmCommon.hash_sha256(body)

        # Timestamp for the request header; its date part scopes the credential.
        x_date = date.strftime("%Y%m%dT%H%M%SZ")
        short_x_date = x_date[:8]
        content_type = "application/json"
        sign_header = "host;x-date;x-content-sha256;content-type"

        # Build the query parameters, adding the mandatory Action and Version.
        real_query_list = query_list.copy() if query_list else {}
        real_query_list["Action"] = action
        real_query_list["Version"] = JmCommon.version

        # The canonical query string requires parameters sorted by key.
        query_string = "&".join(
            f"{JmCommon.sign_string_encoder(key)}="
            f"{JmCommon.sign_string_encoder(str(value))}"
            for key, value in sorted(real_query_list.items())
        )

        # Canonical request that gets hashed into the string to sign.
        canonical_string = (
            f"{method}\n{JmCommon.path}\n{query_string}\n"
            f"host:{JmCommon.host}\n"
            f"x-date:{x_date}\n"
            f"x-content-sha256:{x_content_sha256}\n"
            f"content-type:{content_type}\n\n"
            f"{sign_header}\n{x_content_sha256}"
        )
        hash_canonical_string = JmCommon.hash_sha256(canonical_string.encode("utf-8"))

        # Credential scope and string to sign.
        credential_scope = f"{short_x_date}/{JmCommon.region}/{JmCommon.service}/request"
        sign_string = (
            f"HMAC-SHA256\n{x_date}\n{credential_scope}\n{hash_canonical_string}"
        )

        # Derive the signing key and compute the signature.
        sign_key = JmCommon.gen_signing_secret_key_v4(
            short_x_date, JmCommon.region, JmCommon.service
        )
        signature = hmac.new(
            sign_key, sign_string.encode("utf-8"), hashlib.sha256
        ).hexdigest()

        url = f"{JmCommon.schema}://{JmCommon.host}{JmCommon.path}?{query_string}"

        headers = {
            "Host": JmCommon.host,
            "X-Date": x_date,
            "X-Content-Sha256": x_content_sha256,
            "Content-Type": content_type,
            "Authorization": f"HMAC-SHA256 Credential={JmCommon.ak}/{credential_scope}, SignedHeaders={sign_header}, Signature={signature}",
        }

        try:
            response = requests.request(
                method=method,
                url=url,
                headers=headers,
                data=body,
                timeout=(30, 30),  # (connect timeout, read timeout) in seconds
            )
            # Raises for HTTP error statuses (4xx/5xx).
            response.raise_for_status()
            return response.text
        except requests.exceptions.RequestException as e:
            raise Exception(f"API请求失败: {str(e)}") from e

    @staticmethod
    def download_file(file_url, save_file_path):
        """Download a file from a URL and save it to the given path.

        Args:
            file_url: The URL to download from.
            save_file_path: Destination file path; missing parent directories
                are created.

        Raises:
            Exception: If the download or the write fails; the original error
                is chained as the cause.
        """
        try:
            # Make sure the target directory exists. A bare file name has an
            # empty dirname, and os.makedirs("") would raise.
            target_dir = os.path.dirname(save_file_path)
            if target_dir:
                os.makedirs(target_dir, exist_ok=True)

            response = requests.get(file_url, timeout=30)
            response.raise_for_status()

            with open(save_file_path, "wb") as f:
                f.write(response.content)

            file_size = os.path.getsize(save_file_path)
            print(f"文件下载成功,保存路径: {save_file_path}, 文件大小: {file_size}字节")
        except Exception as e:
            print(f"文件下载失败: {str(e)}")
            raise Exception(f"文件下载失败: {str(e)}") from e

    @staticmethod
    def query_task_result(task_id, req_key="jimeng_vgfm_t2v_l20"):
        """Query the result of an asynchronous task.

        Args:
            task_id: The identifier of the task to query.
            req_key: The ``req_key`` sent in the request body; defaults to the
                value this helper has always used.

        Returns:
            The API response parsed from JSON.

        Raises:
            Exception: If the underlying API request fails.
        """
        action = "CVSync2AsyncGetResult"

        # Request body for the query.
        req = {
            "task_id": task_id,
            "req_key": req_key,
        }

        response_body = JmCommon.do_request(
            "POST", {}, json.dumps(req).encode("utf-8"), action
        )
        return json.loads(response_body)