152 lines
4.8 KiB
Python
152 lines
4.8 KiB
Python
|
import os
|
|||
|
import json
|
|||
|
import openpyxl
|
|||
|
from typing import List, Dict, Any, Tuple, Optional
|
|||
|
|
|||
|
"""数据处理通用工具函数"""
|
|||
|
|
|||
|
def init_directories(data_dir: str) -> None:
|
|||
|
"""初始化数据目录
|
|||
|
|
|||
|
Args:
|
|||
|
data_dir: 数据保存目录路径
|
|||
|
"""
|
|||
|
os.makedirs(data_dir, exist_ok=True)
|
|||
|
|
|||
|
|
|||
|
def process_value(value: Any) -> int | float:
|
|||
|
"""处理单元格值,转换为合适的数值类型
|
|||
|
|
|||
|
支持处理空值、空字符串、####、带百分号的值和千分位逗号
|
|||
|
|
|||
|
Args:
|
|||
|
value: 原始单元格值
|
|||
|
|
|||
|
Returns:
|
|||
|
int | float: 转换后的数值,无法转换时返回0
|
|||
|
"""
|
|||
|
if value is None:
|
|||
|
return 0
|
|||
|
|
|||
|
# 统一转换为字符串处理
|
|||
|
str_value = str(value).strip()
|
|||
|
if str_value == '' or str_value == '####':
|
|||
|
return 0
|
|||
|
|
|||
|
try:
|
|||
|
# 处理百分号
|
|||
|
if '%' in str_value:
|
|||
|
return float(str_value.replace('%', ''))
|
|||
|
# 处理千分位逗号
|
|||
|
if ',' in str_value:
|
|||
|
str_value = str_value.replace(',', '')
|
|||
|
# 区分整数和浮点数
|
|||
|
return float(str_value) if '.' in str_value else int(str_value)
|
|||
|
except (ValueError, TypeError):
|
|||
|
return 0
|
|||
|
|
|||
|
|
|||
|
def print_conversion_stats(conversion_records: List[Dict[str, str]], errors: List[str]) -> None:
|
|||
|
"""打印名称转换统计信息
|
|||
|
|
|||
|
Args:
|
|||
|
conversion_records: 转换记录列表
|
|||
|
errors: 错误信息列表
|
|||
|
"""
|
|||
|
print("\n=== 名称转换记录 ===")
|
|||
|
if conversion_records:
|
|||
|
for record in conversion_records:
|
|||
|
print(f"🔄 行 {record['row']}: {record['raw_name']} → {record['converted_name']}")
|
|||
|
print(f"📊 共检测到 {len(conversion_records)} 项名称转换")
|
|||
|
else:
|
|||
|
print("📝 不存在名称转换的情况")
|
|||
|
|
|||
|
if errors:
|
|||
|
print(f"⚠️ 发现 {len(errors)} 个区域名称转换失败:")
|
|||
|
for error in errors:
|
|||
|
print(f" - {error}")
|
|||
|
else:
|
|||
|
print("✅ 所有区域名称均成功转换为全称")
|
|||
|
|
|||
|
|
|||
|
def convert_area_name(raw_name: Any, row_num: int) -> Tuple[str, str, List[Dict[str, str]], List[str]]:
|
|||
|
"""转换区域名称并记录转换结果
|
|||
|
|
|||
|
Args:
|
|||
|
raw_name: 原始区域名称
|
|||
|
row_num: 行号
|
|||
|
|
|||
|
Returns:
|
|||
|
Tuple包含:
|
|||
|
- area_name: 转换后的区域名称
|
|||
|
- area_code: 区域代码
|
|||
|
- conversion_records: 更新的转换记录列表
|
|||
|
- name_conversion_errors: 更新的错误列表
|
|||
|
"""
|
|||
|
from Util.AreaUtil import query_area_info
|
|||
|
|
|||
|
conversion_records: List[Dict[str, str]] = []
|
|||
|
name_conversion_errors: List[str] = []
|
|||
|
|
|||
|
# 确保raw_name为字符串类型
|
|||
|
str_raw_name = str(raw_name).strip() if raw_name is not None else '未知地区'
|
|||
|
if not str_raw_name:
|
|||
|
str_raw_name = '未知地区'
|
|||
|
|
|||
|
# 查询区域信息
|
|||
|
area_info = query_area_info(str_raw_name)
|
|||
|
if area_info:
|
|||
|
area_name = area_info['full_name']
|
|||
|
area_code = area_info['area_code']
|
|||
|
|
|||
|
# 记录名称转换
|
|||
|
if str_raw_name != area_name:
|
|||
|
conversion_records.append({
|
|||
|
'row': row_num,
|
|||
|
'raw_name': str_raw_name,
|
|||
|
'converted_name': area_name
|
|||
|
})
|
|||
|
else:
|
|||
|
area_name = str_raw_name
|
|||
|
area_code = 'unknown'
|
|||
|
name_conversion_errors.append(f"行 {row_num}: '{str_raw_name}'")
|
|||
|
|
|||
|
return area_name, area_code, conversion_records, name_conversion_errors
|
|||
|
|
|||
|
|
|||
|
def save_to_json(data: List[Dict[str, Any]], json_path: str) -> None:
|
|||
|
"""保存数据到JSON文件
|
|||
|
|
|||
|
Args:
|
|||
|
data: 要保存的数据
|
|||
|
json_path: JSON文件路径
|
|||
|
"""
|
|||
|
with open(json_path, 'w', encoding='utf-8') as f:
|
|||
|
json.dump(data, f, ensure_ascii=False, indent=2)
|
|||
|
print(f"✅ 数据提取完成,已保存至:{json_path}")
|
|||
|
print(f"📊 共处理 {len(data)} 条地区数据")
|
|||
|
|
|||
|
|
|||
|
def load_workbook_sheet(excel_path: str, sheet_name: str) -> Optional[openpyxl.worksheet.worksheet.Worksheet]:
|
|||
|
"""加载工作簿并选择指定工作表
|
|||
|
|
|||
|
Args:
|
|||
|
excel_path: Excel文件路径
|
|||
|
sheet_name: 工作表名称
|
|||
|
|
|||
|
Returns:
|
|||
|
工作表对象,若加载失败则返回None
|
|||
|
"""
|
|||
|
try:
|
|||
|
workbook = openpyxl.load_workbook(excel_path, read_only=True, data_only=True)
|
|||
|
if sheet_name not in workbook.sheetnames:
|
|||
|
print(f"❌ 错误:未找到'{sheet_name}'工作表")
|
|||
|
workbook.close()
|
|||
|
return None
|
|||
|
return workbook[sheet_name]
|
|||
|
except FileNotFoundError:
|
|||
|
print(f"🔴 错误:Excel文件 '{excel_path}' 不存在")
|
|||
|
return None
|
|||
|
except Exception as e:
|
|||
|
print(f"🔴 加载工作簿时发生错误:{str(e)}")
|
|||
|
return None
|