import os import json import openpyxl from typing import List, Dict, Any, Tuple, Optional """数据处理通用工具函数""" def init_directories(data_dir: str) -> None: """初始化数据目录 Args: data_dir: 数据保存目录路径 """ os.makedirs(data_dir, exist_ok=True) def process_value(value: Any) -> int | float: """处理单元格值,转换为合适的数值类型 支持处理空值、空字符串、####、带百分号的值和千分位逗号 Args: value: 原始单元格值 Returns: int | float: 转换后的数值,无法转换时返回0 """ if value is None: return 0 # 统一转换为字符串处理 str_value = str(value).strip() if str_value == '' or str_value == '####': return 0 try: # 处理百分号 if '%' in str_value: return float(str_value.replace('%', '')) # 处理千分位逗号 if ',' in str_value: str_value = str_value.replace(',', '') # 区分整数和浮点数 return float(str_value) if '.' in str_value else int(str_value) except (ValueError, TypeError): return 0 def print_conversion_stats(conversion_records: List[Dict[str, str]], errors: List[str]) -> None: """打印名称转换统计信息 Args: conversion_records: 转换记录列表 errors: 错误信息列表 """ print("\n=== 名称转换记录 ===") if conversion_records: for record in conversion_records: print(f"🔄 行 {record['row']}: {record['raw_name']} → {record['converted_name']}") print(f"📊 共检测到 {len(conversion_records)} 项名称转换") else: print("📝 不存在名称转换的情况") if errors: print(f"⚠️ 发现 {len(errors)} 个区域名称转换失败:") for error in errors: print(f" - {error}") else: print("✅ 所有区域名称均成功转换为全称") def convert_area_name(raw_name: Any, row_num: int) -> Tuple[str, str, List[Dict[str, str]], List[str]]: """转换区域名称并记录转换结果 Args: raw_name: 原始区域名称 row_num: 行号 Returns: Tuple包含: - area_name: 转换后的区域名称 - area_code: 区域代码 - conversion_records: 更新的转换记录列表 - name_conversion_errors: 更新的错误列表 """ from Util.AreaUtil import query_area_info conversion_records: List[Dict[str, str]] = [] name_conversion_errors: List[str] = [] # 确保raw_name为字符串类型 str_raw_name = str(raw_name).strip() if raw_name is not None else '未知地区' if not str_raw_name: str_raw_name = '未知地区' # 查询区域信息 area_info = query_area_info(str_raw_name) if area_info: area_name = area_info['full_name'] area_code = area_info['area_code'] # 记录名称转换 if str_raw_name != area_name: conversion_records.append({ 'row': row_num, 'raw_name': str_raw_name, 'converted_name': area_name }) else: area_name = str_raw_name area_code = 'unknown' name_conversion_errors.append(f"行 {row_num}: '{str_raw_name}'") return area_name, area_code, conversion_records, name_conversion_errors def save_to_json(data: List[Dict[str, Any]], json_path: str) -> None: """保存数据到JSON文件 Args: data: 要保存的数据 json_path: JSON文件路径 """ with open(json_path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) print(f"✅ 数据提取完成,已保存至:{json_path}") print(f"📊 共处理 {len(data)} 条地区数据") def load_workbook_sheet(excel_path: str, sheet_name: str) -> Optional[openpyxl.worksheet.worksheet.Worksheet]: """加载工作簿并选择指定工作表 Args: excel_path: Excel文件路径 sheet_name: 工作表名称 Returns: 工作表对象,若加载失败则返回None """ try: workbook = openpyxl.load_workbook(excel_path, read_only=True, data_only=True) if sheet_name not in workbook.sheetnames: print(f"❌ 错误:未找到'{sheet_name}'工作表") workbook.close() return None return workbook[sheet_name] except FileNotFoundError: print(f"🔴 错误:Excel文件 '{excel_path}' 不存在") return None except Exception as e: print(f"🔴 加载工作簿时发生错误:{str(e)}") return None