152 lines
4.8 KiB
Python
152 lines
4.8 KiB
Python
import os
|
||
import json
|
||
import openpyxl
|
||
from typing import List, Dict, Any, Tuple, Optional
|
||
|
||
"""数据处理通用工具函数"""
|
||
|
||
def init_directories(data_dir: str) -> None:
|
||
"""初始化数据目录
|
||
|
||
Args:
|
||
data_dir: 数据保存目录路径
|
||
"""
|
||
os.makedirs(data_dir, exist_ok=True)
|
||
|
||
|
||
def process_value(value: Any) -> int | float:
|
||
"""处理单元格值,转换为合适的数值类型
|
||
|
||
支持处理空值、空字符串、####、带百分号的值和千分位逗号
|
||
|
||
Args:
|
||
value: 原始单元格值
|
||
|
||
Returns:
|
||
int | float: 转换后的数值,无法转换时返回0
|
||
"""
|
||
if value is None:
|
||
return 0
|
||
|
||
# 统一转换为字符串处理
|
||
str_value = str(value).strip()
|
||
if str_value == '' or str_value == '####':
|
||
return 0
|
||
|
||
try:
|
||
# 处理百分号
|
||
if '%' in str_value:
|
||
return float(str_value.replace('%', ''))
|
||
# 处理千分位逗号
|
||
if ',' in str_value:
|
||
str_value = str_value.replace(',', '')
|
||
# 区分整数和浮点数
|
||
return float(str_value) if '.' in str_value else int(str_value)
|
||
except (ValueError, TypeError):
|
||
return 0
|
||
|
||
|
||
def print_conversion_stats(conversion_records: List[Dict[str, str]], errors: List[str]) -> None:
|
||
"""打印名称转换统计信息
|
||
|
||
Args:
|
||
conversion_records: 转换记录列表
|
||
errors: 错误信息列表
|
||
"""
|
||
print("\n=== 名称转换记录 ===")
|
||
if conversion_records:
|
||
for record in conversion_records:
|
||
print(f"🔄 行 {record['row']}: {record['raw_name']} → {record['converted_name']}")
|
||
print(f"📊 共检测到 {len(conversion_records)} 项名称转换")
|
||
else:
|
||
print("📝 不存在名称转换的情况")
|
||
|
||
if errors:
|
||
print(f"⚠️ 发现 {len(errors)} 个区域名称转换失败:")
|
||
for error in errors:
|
||
print(f" - {error}")
|
||
else:
|
||
print("✅ 所有区域名称均成功转换为全称")
|
||
|
||
|
||
def convert_area_name(raw_name: Any, row_num: int) -> Tuple[str, str, List[Dict[str, str]], List[str]]:
|
||
"""转换区域名称并记录转换结果
|
||
|
||
Args:
|
||
raw_name: 原始区域名称
|
||
row_num: 行号
|
||
|
||
Returns:
|
||
Tuple包含:
|
||
- area_name: 转换后的区域名称
|
||
- area_code: 区域代码
|
||
- conversion_records: 更新的转换记录列表
|
||
- name_conversion_errors: 更新的错误列表
|
||
"""
|
||
from Util.AreaUtil import query_area_info
|
||
|
||
conversion_records: List[Dict[str, str]] = []
|
||
name_conversion_errors: List[str] = []
|
||
|
||
# 确保raw_name为字符串类型
|
||
str_raw_name = str(raw_name).strip() if raw_name is not None else '未知地区'
|
||
if not str_raw_name:
|
||
str_raw_name = '未知地区'
|
||
|
||
# 查询区域信息
|
||
area_info = query_area_info(str_raw_name)
|
||
if area_info:
|
||
area_name = area_info['full_name']
|
||
area_code = area_info['area_code']
|
||
|
||
# 记录名称转换
|
||
if str_raw_name != area_name:
|
||
conversion_records.append({
|
||
'row': row_num,
|
||
'raw_name': str_raw_name,
|
||
'converted_name': area_name
|
||
})
|
||
else:
|
||
area_name = str_raw_name
|
||
area_code = 'unknown'
|
||
name_conversion_errors.append(f"行 {row_num}: '{str_raw_name}'")
|
||
|
||
return area_name, area_code, conversion_records, name_conversion_errors
|
||
|
||
|
||
def save_to_json(data: List[Dict[str, Any]], json_path: str) -> None:
|
||
"""保存数据到JSON文件
|
||
|
||
Args:
|
||
data: 要保存的数据
|
||
json_path: JSON文件路径
|
||
"""
|
||
with open(json_path, 'w', encoding='utf-8') as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||
print(f"✅ 数据提取完成,已保存至:{json_path}")
|
||
print(f"📊 共处理 {len(data)} 条地区数据")
|
||
|
||
|
||
def load_workbook_sheet(excel_path: str, sheet_name: str) -> Optional[openpyxl.worksheet.worksheet.Worksheet]:
|
||
"""加载工作簿并选择指定工作表
|
||
|
||
Args:
|
||
excel_path: Excel文件路径
|
||
sheet_name: 工作表名称
|
||
|
||
Returns:
|
||
工作表对象,若加载失败则返回None
|
||
"""
|
||
try:
|
||
workbook = openpyxl.load_workbook(excel_path, read_only=True, data_only=True)
|
||
if sheet_name not in workbook.sheetnames:
|
||
print(f"❌ 错误:未找到'{sheet_name}'工作表")
|
||
workbook.close()
|
||
return None
|
||
return workbook[sheet_name]
|
||
except FileNotFoundError:
|
||
print(f"🔴 错误:Excel文件 '{excel_path}' 不存在")
|
||
return None
|
||
except Exception as e:
|
||
print(f"🔴 加载工作簿时发生错误:{str(e)}")
|
||
return None |