Files
YunNanProject/Util/DataUtil.py

152 lines
4.8 KiB
Python
Raw Normal View History

2025-09-10 13:53:34 +08:00
import os
import json
import openpyxl
from typing import List, Dict, Any, Tuple, Optional
"""数据处理通用工具函数"""
def init_directories(data_dir: str) -> None:
"""初始化数据目录
Args:
data_dir: 数据保存目录路径
"""
os.makedirs(data_dir, exist_ok=True)
def process_value(value: Any) -> int | float:
"""处理单元格值,转换为合适的数值类型
支持处理空值空字符串####、带百分号的值和千分位逗号
Args:
value: 原始单元格值
Returns:
int | float: 转换后的数值无法转换时返回0
"""
if value is None:
return 0
# 统一转换为字符串处理
str_value = str(value).strip()
if str_value == '' or str_value == '####':
return 0
try:
# 处理百分号
if '%' in str_value:
return float(str_value.replace('%', ''))
# 处理千分位逗号
if ',' in str_value:
str_value = str_value.replace(',', '')
# 区分整数和浮点数
return float(str_value) if '.' in str_value else int(str_value)
except (ValueError, TypeError):
return 0
def print_conversion_stats(conversion_records: List[Dict[str, str]], errors: List[str]) -> None:
"""打印名称转换统计信息
Args:
conversion_records: 转换记录列表
errors: 错误信息列表
"""
print("\n=== 名称转换记录 ===")
if conversion_records:
for record in conversion_records:
print(f"🔄 行 {record['row']}: {record['raw_name']}{record['converted_name']}")
print(f"📊 共检测到 {len(conversion_records)} 项名称转换")
else:
print("📝 不存在名称转换的情况")
if errors:
print(f"⚠️ 发现 {len(errors)} 个区域名称转换失败:")
for error in errors:
print(f" - {error}")
else:
print("✅ 所有区域名称均成功转换为全称")
def convert_area_name(raw_name: Any, row_num: int) -> Tuple[str, str, List[Dict[str, str]], List[str]]:
"""转换区域名称并记录转换结果
Args:
raw_name: 原始区域名称
row_num: 行号
Returns:
Tuple包含:
- area_name: 转换后的区域名称
- area_code: 区域代码
- conversion_records: 更新的转换记录列表
- name_conversion_errors: 更新的错误列表
"""
from Util.AreaUtil import query_area_info
conversion_records: List[Dict[str, str]] = []
name_conversion_errors: List[str] = []
# 确保raw_name为字符串类型
str_raw_name = str(raw_name).strip() if raw_name is not None else '未知地区'
if not str_raw_name:
str_raw_name = '未知地区'
# 查询区域信息
area_info = query_area_info(str_raw_name)
if area_info:
area_name = area_info['full_name']
area_code = area_info['area_code']
# 记录名称转换
if str_raw_name != area_name:
conversion_records.append({
'row': row_num,
'raw_name': str_raw_name,
'converted_name': area_name
})
else:
area_name = str_raw_name
area_code = 'unknown'
name_conversion_errors.append(f"{row_num}: '{str_raw_name}'")
return area_name, area_code, conversion_records, name_conversion_errors
def save_to_json(data: List[Dict[str, Any]], json_path: str) -> None:
"""保存数据到JSON文件
Args:
data: 要保存的数据
json_path: JSON文件路径
"""
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f"✅ 数据提取完成,已保存至:{json_path}")
print(f"📊 共处理 {len(data)} 条地区数据")
def load_workbook_sheet(excel_path: str, sheet_name: str) -> Optional[openpyxl.worksheet.worksheet.Worksheet]:
"""加载工作簿并选择指定工作表
Args:
excel_path: Excel文件路径
sheet_name: 工作表名称
Returns:
工作表对象若加载失败则返回None
"""
try:
workbook = openpyxl.load_workbook(excel_path, read_only=True, data_only=True)
if sheet_name not in workbook.sheetnames:
print(f"❌ 错误:未找到'{sheet_name}'工作表")
workbook.close()
return None
return workbook[sheet_name]
except FileNotFoundError:
print(f"🔴 错误Excel文件 '{excel_path}' 不存在")
return None
except Exception as e:
print(f"🔴 加载工作簿时发生错误:{str(e)}")
return None