This commit is contained in:
2025-08-19 15:26:41 +08:00
parent 670e9f1ec3
commit bf2f63b0b8
3 changed files with 46 additions and 86 deletions

View File

@@ -3,6 +3,9 @@ import os
import time import time
import base64 import base64
from JiMeng.Kit.JmCommon import JmCommon
from JiMeng.Kit.JmErrorCode import JmErrorCode
class JmTxt2Img: class JmTxt2Img:
action = "CVProcess" action = "CVProcess"

View File

@@ -5,7 +5,6 @@ import os
import time import time
from datetime import datetime, timezone from datetime import datetime, timezone
import requests import requests
import base64
class JmCommon: class JmCommon:
@@ -26,58 +25,26 @@ class JmCommon:
base_path = os.path.join(project_root, "src", "main", "python", "com", "dsideal", "aiSupport", "Util", "JiMeng", "Example") base_path = os.path.join(project_root, "src", "main", "python", "com", "dsideal", "aiSupport", "Util", "JiMeng", "Example")
@staticmethod @staticmethod
def sign_string_encoder(source): def sign(key, msg):
"""URL编码字符串用于签名"""
if source is None:
return None
# 不需要编码的字符
safe_chars = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_.~")
result = []
for char in source:
if char in safe_chars:
result.append(char)
elif char == ' ':
result.append("%20")
else:
# 对其他字符进行URL编码
result.append(f"%{ord(char):02X}")
return ''.join(result)
@staticmethod
def hash_sha256(content):
"""计算内容的SHA256哈希值"""
try:
md = hashlib.sha256()
md.update(content)
return md.hexdigest()
except Exception as e:
raise Exception(f"计算哈希值失败: {str(e)}")
@staticmethod
def hmac_sha256(key, content):
"""使用HMAC-SHA256算法计算消息认证码""" """使用HMAC-SHA256算法计算消息认证码"""
try: return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()
h = hmac.new(key, content.encode('utf-8'), hashlib.sha256)
return h.digest()
except Exception as e:
raise Exception(f"计算HMAC-SHA256失败: {str(e)}")
@staticmethod @staticmethod
def gen_signing_secret_key_v4(date, region, service): def get_signature_key(key, date_stamp, region_name, service_name):
"""生成V4版本的签名密钥""" """生成V4版本的签名密钥"""
try: k_date = JmCommon.sign(key.encode('utf-8'), date_stamp)
# 解码SK (Base64) k_region = JmCommon.sign(k_date, region_name)
sk_bytes = base64.b64decode(JmCommon.sk) k_service = JmCommon.sign(k_region, service_name)
k_date = JmCommon.hmac_sha256(sk_bytes, date) k_signing = JmCommon.sign(k_service, 'request')
k_region = JmCommon.hmac_sha256(k_date, region)
k_service = JmCommon.hmac_sha256(k_region, service)
k_signing = JmCommon.hmac_sha256(k_service, "request")
return k_signing return k_signing
except Exception as e:
raise Exception(f"生成签名密钥失败: {str(e)}") @staticmethod
def format_query(parameters):
"""格式化查询参数"""
request_parameters_init = ''
for key in sorted(parameters):
request_parameters_init += key + '=' + str(parameters[key]) + '&'
return request_parameters_init[:-1] if request_parameters_init else ''
@staticmethod @staticmethod
def do_request(method, query_list, body, action): def do_request(method, query_list, body, action):
@@ -86,59 +53,49 @@ class JmCommon:
if body is None: if body is None:
body = b'' body = b''
# 计算请求体的SHA256哈希值 # 格式化日期时间
x_content_sha256 = JmCommon.hash_sha256(body) current_date = date.strftime("%Y%m%dT%H%M%SZ")
date_stamp = date.strftime("%Y%m%d")
# 格式化日期时间,用于请求头
x_date = date.strftime("%Y%m%dT%H%M%SZ")
short_x_date = x_date[:8] # 提取日期部分,用于凭证范围
content_type = "application/json"
sign_header = "host;x-date;x-content-sha256;content-type"
# 构建查询参数字符串 # 构建查询参数字符串
real_query_list = query_list.copy() if query_list else {} real_query_list = query_list.copy() if query_list else {}
real_query_list["Action"] = action real_query_list["Action"] = action
real_query_list["Version"] = JmCommon.version real_query_list["Version"] = JmCommon.version
canonical_querystring = JmCommon.format_query(real_query_list)
# 排序查询参数 # 计算请求体哈希值
sorted_query = sorted(real_query_list.items()) if isinstance(body, bytes):
query_sb = [] body_str = body.decode('utf-8')
for key, value in sorted_query: else:
encoded_key = JmCommon.sign_string_encoder(key) body_str = str(body)
encoded_value = JmCommon.sign_string_encoder(str(value)) payload_hash = hashlib.sha256(body_str.encode('utf-8')).hexdigest()
query_sb.append(f"{encoded_key}={encoded_value}")
query_string = "&" .join(query_sb)
# 构建规范请求字符串,用于签名 # 设置请求头相关信息
canonical_string = f"{method}\n{JmCommon.path}\n{query_string}\n" content_type = "application/json"
canonical_string += f"host:{JmCommon.host}\n" signed_headers = 'content-type;host;x-content-sha256;x-date'
canonical_string += f"x-date:{x_date}\n" canonical_headers = f'content-type:{content_type}\nhost:{JmCommon.host}\nx-content-sha256:{payload_hash}\nx-date:{current_date}\n'
canonical_string += f"x-content-sha256:{x_content_sha256}\n"
canonical_string += f"content-type:{content_type}\n\n"
canonical_string += f"{sign_header}\n{x_content_sha256}"
# 计算规范请求字符串的哈希值 # 构建规范请求字符串
hash_canonical_string = JmCommon.hash_sha256(canonical_string.encode('utf-8')) canonical_request = f'{method}\n{JmCommon.path}\n{canonical_querystring}\n{canonical_headers}\n{signed_headers}\n{payload_hash}'
# 构建凭证范围和签名字符串 # 构建签名字符串
credential_scope = f"{short_x_date}/{JmCommon.region}/{JmCommon.service}/request" algorithm = 'HMAC-SHA256'
sign_string = f"HMAC-SHA256\n{x_date}\n{credential_scope}\n{hash_canonical_string}" credential_scope = f'{date_stamp}/{JmCommon.region}/{JmCommon.service}/request'
string_to_sign = f'{algorithm}\n{current_date}\n{credential_scope}\n{hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()}'
# 生成签名密钥并计算签名 # 生成签名密钥并计算签名
sign_key = JmCommon.gen_signing_secret_key_v4(short_x_date, JmCommon.region, JmCommon.service) signing_key = JmCommon.get_signature_key(JmCommon.sk, date_stamp, JmCommon.region, JmCommon.service)
signature = hmac.new(sign_key, sign_string.encode('utf-8'), hashlib.sha256).hexdigest() signature = hmac.new(signing_key, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()
# 构建URL # 构建URL
url = f"{JmCommon.schema}://{JmCommon.host}{JmCommon.path}?{query_string}" url = f'{JmCommon.schema}://{JmCommon.host}{JmCommon.path}?{canonical_querystring}'
# 设置请求头 # 设置请求头
headers = { headers = {
"Host": JmCommon.host, "X-Date": current_date,
"X-Date": x_date, "Authorization": f'{algorithm} Credential={JmCommon.ak}/{credential_scope}, SignedHeaders={signed_headers}, Signature={signature}',
"X-Content-Sha256": x_content_sha256, "X-Content-Sha256": payload_hash,
"Content-Type": content_type, "Content-Type": content_type
"Authorization": f"HMAC-SHA256 Credential={JmCommon.ak}/{credential_scope}, SignedHeaders={sign_header}, Signature={signature}"
} }
# 发送请求 # 发送请求