import logging from Util.ObsUtil import ObsUploader prompt = """ ### 几何图形识别专家指令(输入:纯几何图形照片) **任务目标** 精确提取图形中的几何元素及其空间关系,为GeoGebra重建建立数学模型 ## 一、坐标系建立规则(必须遵守) 1. 原点设定: - 若存在明显顶点,选最左下角的点为原点(0,0) - 若图形对称,选对称中心为原点 - 示例:原点O = 三角形ABC的顶点A 2. 坐标轴定向: - 优先顺序:水平线段 > 垂直线段 > 最长线段 - 具体规则: if 存在水平线段: 以该线段为x轴正方向 elif 存在垂直结构: 以最左侧垂直线为y轴 else: 以最长线段为基准轴 ## 二、元素列举 1. 按点,线,三角形,四边形,梯形,平行四边形,矩形,正方形,圆等由简单到复杂的顺序列举所有图形 2. 详细描述元素之间的关系,比如点D在线段AB上 3. 详细描述元素之间的位置关系,比如D 在A点正上方,B在CD边的上方中间位置 """ from openai import OpenAI from Config.Config import ALY_LLM_BASE_URL, ALY_LLM_API_KEY, OBS_AK, OBS_SERVER, OBS_SK, OBS_PREFIX, OBS_BUCKET logger = logging.getLogger(__name__) # 批量处理图片 def batch_qvq(output_dir, img_list): img_url_list = [] for file_path in img_list: # 创建上传器实例 uploader = ObsUploader(OBS_AK, OBS_SK, "https://" + OBS_SERVER) # 上传参数 object_key = OBS_PREFIX + "/" + file_path # 执行上传 success, result = uploader.upload_file(OBS_BUCKET, object_key, file_path) # 处理结果 if success: logger.info(f'{file_path}上传成功!') # 获取上传文件的 URL file_url = f"https://{OBS_BUCKET}.{OBS_SERVER}/{object_key}" img_url_list.append(file_url) else: logger.error(f'{file_path}上传失败!') if 'errorCode' in result: logger.info(f'错误代码: {result["errorCode"]}') logger.info(f'错误信息: {result["errorMessage"]}') else: logger.error(f'错误信息: {result["error"]}') # 多张图片开始解析 answer_content = "" for img_url in img_url_list: answer_content = answer_content + qvq_single(img_url) # 保存结果到JSON文件 qvq_result = f"{output_dir}/QvqResult.json" with open(qvq_result, "w", encoding='utf-8') as f: f.write(answer_content) return qvq_result def qvq_single(image_url): # 初始化OpenAI客户端 client = OpenAI( api_key=ALY_LLM_API_KEY, base_url=ALY_LLM_BASE_URL ) reasoning_content = "" # 定义完整思考过程 answer_content = "" # 定义完整回复 is_answering = False # 判断是否结束思考过程并开始回复 # 创建聊天完成请求 completion = client.chat.completions.create( model="qvq-max", # 此处以 qvq-max 为例,可按需更换模型名称 messages=[ { "role": "user", "content": [ { "type": "image_url", "image_url": { "url": image_url }, }, {"type": "text", "text": prompt}, ], }, ], stream=True, ) print("\n" + "=" * 20 + "思考过程" + "=" * 20 + "\n") for chunk in completion: # 如果chunk.choices为空,则打印usage if not chunk.choices: print("\nUsage:") print(chunk.usage) else: delta = chunk.choices[0].delta # 打印思考过程 if hasattr(delta, 'reasoning_content') and delta.reasoning_content != None: print(delta.reasoning_content, end='', flush=True) reasoning_content += delta.reasoning_content else: # 开始回复 if delta.content != "" and is_answering is False: print("\n" + "=" * 20 + "完整回复" + "=" * 20 + "\n") is_answering = True # 打印回复过程 print(delta.content, end='', flush=True) answer_content += delta.content # print("=" * 20 + "完整思考过程" + "=" * 20 + "\n") # print(reasoning_content) # print("=" * 20 + "完整回复" + "=" * 20 + "\n") answer_content = answer_content.replace("```json", "") answer_content = answer_content.replace("```", "") return answer_content if __name__ == '__main__': img_url = "https://dsideal.obs.cn-north-1.myhuaweicloud.com/HuangHai/extracted/26c367a3c3cf20e62e1d6c7904f6abf6/image_1.png" ans = qvq_single(img_url) print(ans)