This commit is contained in:
2025-09-10 13:53:34 +08:00
parent 3009beeb3d
commit c1cf4c5816
4 changed files with 216 additions and 220 deletions

View File

@@ -1,9 +1,12 @@
import openpyxl
import json
import os
from typing import List, Dict, Any
from Config.Config import EXCEL_PATH
from Util.AreaUtil import query_area_info
from Util.DataUtil import (
init_directories, process_value, print_conversion_stats,
convert_area_name, save_to_json, load_workbook_sheet
)
# ======================= 配置常量 =======================
DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'Data')
@@ -22,124 +25,69 @@ DATA_COLUMNS = {
'birth_population': ('AP', 'AY') # 出生人口
}
# ======================= 工具函数 =======================
def init_directories() -> None:
"""初始化数据目录"""
os.makedirs(DATA_DIR, exist_ok=True)
def process_value(value: Any) -> int | float | int:
"""处理单元格值,转换为合适的数值类型"""
if value is None or str(value).strip() == '':
return 0
try:
if isinstance(value, str):
value = value.replace(',', '').strip()
return float(value) if '.' in str(value) else int(value)
except (ValueError, TypeError):
return 0
# ======================= 核心逻辑 =======================
def extract_area_data(sheet: openpyxl.worksheet.worksheet.Worksheet) -> List[Dict[str, Any]]:
"""从工作表提取区域数据"""
population_data: List[Dict[str, Any]] = []
conversion_records: List[Dict[str, str]] = []
name_conversion_errors: List[str] = []
# 计算区域名称列索引
region_col_idx = openpyxl.utils.column_index_from_string(REGION_NAME_COLUMN) - 1
# 遍历数据行
for row_num in range(START_ROW, sheet.max_row + 1):
row = sheet[row_num]
raw_name = str(row[openpyxl.utils.column_index_from_string(REGION_NAME_COLUMN)-1].value or '未知地区').strip()
if not raw_name:
# 获取区域名称并转换
raw_name = row[region_col_idx].value
area_name, area_code, conv_records, errors = convert_area_name(raw_name, row_num)
conversion_records.extend(conv_records)
name_conversion_errors.extend(errors)
if not area_name:
continue
# 区域名称转换
area_info = query_area_info(raw_name)
if area_info:
area_name = area_info['full_name']
area_code = area_info['area_code']
if raw_name != area_name:
conversion_records.append({
'row': row_num,
'raw_name': raw_name,
'converted_name': area_name
})
else:
area_name = raw_name
area_code = 'unknown'
name_conversion_errors.append(f"{row_num}: '{raw_name}'")
# 构建区域数据
area_data = {
'area_name': area_name,
'area_code': area_code,
'raw_name': raw_name
'raw_name': str(raw_name).strip() if raw_name else '未知地区'
}
# 提取各指标年度数据
for metric, (start_col, end_col) in DATA_COLUMNS.items():
start_idx = openpyxl.utils.column_index_from_string(start_col) - 1
end_idx = openpyxl.utils.column_index_from_string(end_col) - 1
year_data = {}
for col_idx, year in zip(range(start_idx, end_idx + 1), YEAR_RANGE):
cell_value = row[col_idx].value
year_data[str(year)] = process_value(cell_value)
area_data[metric] = year_data
population_data.append(area_data)
# 输出转换统计
print_conversion_stats(conversion_records, name_conversion_errors)
return population_data
def print_conversion_stats(conversion_records: List[Dict[str, str]], errors: List[str]) -> None:
"""打印名称转换统计信息"""
print("\n=== 名称转换记录 ===")
for record in conversion_records:
print(f"🔄 行 {record['row']}: {record['raw_name']}{record['converted_name']}")
print(f"📊 共检测到 {len(conversion_records)} 项名称转换")
if errors:
print(f"⚠️ 发现 {len(errors)} 个区域名称转换失败:")
for error in errors:
print(f" - {error}")
else:
print("✅ 所有区域名称均成功转换为全称")
# ======================= 主函数 =======================
def main() -> None:
"""人口数据提取主函数"""
init_directories()
try:
# 加载工作簿
workbook = openpyxl.load_workbook(EXCEL_PATH, read_only=True, data_only=True)
if SHEET_NAME not in workbook.sheetnames:
print(f"❌ 错误:未找到'{SHEET_NAME}'工作表")
return
# 提取并处理数据
sheet = workbook[SHEET_NAME]
population_data = extract_area_data(sheet)
# 保存结果
with open(JSON_PATH, 'w', encoding='utf-8') as f:
json.dump(population_data, f, ensure_ascii=False, indent=2)
print(f"✅ 人口数据提取完成,已保存至:{JSON_PATH}")
print(f"📊 共处理 {len(population_data)} 条地区数据")
except FileNotFoundError:
print(f"🔴 错误Excel文件 '{EXCEL_PATH}' 不存在")
except Exception as e:
print(f"🔴 处理数据时发生错误:{str(e)}")
finally:
try:
workbook.close()
except:
pass
init_directories(DATA_DIR)
# 加载工作
sheet = load_workbook_sheet(EXCEL_PATH, SHEET_NAME)
if not sheet:
return
# 提取并处理数据
population_data = extract_area_data(sheet)
# 保存结果
save_to_json(population_data, JSON_PATH)
if __name__ == '__main__':
main()