244 lines
8.3 KiB
Python
244 lines
8.3 KiB
Python
import openpyxl
|
||
import json
|
||
import os
|
||
from typing import List, Dict, Any, Tuple
|
||
|
||
from Config.Config import EXCEL_PATH
|
||
from Util.AreaUtil import query_area_info
|
||
|
||
# ======================= 配置常量 =======================
|
||
"""数据提取配置"""
|
||
# 数据保存目录
|
||
DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'Data')
|
||
# JSON输出路径
|
||
JSON_PATH = os.path.join(DATA_DIR, 'MaoRuXueLv.json')
|
||
# 工作表名称
|
||
SHEET_NAME = '毛入学率'
|
||
# 数据起始行
|
||
START_ROW = 5
|
||
# 区域名称所在列
|
||
REGION_NAME_COLUMN = 'B'
|
||
# 年份范围
|
||
YEAR_RANGE = [2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024]
|
||
|
||
# 数据列映射配置
|
||
DATA_COLUMNS = {
|
||
# 学前教育 - 交替列映射
|
||
'preschool_enrollment': {
|
||
'columns': ['D', 'F', 'H', 'J', 'L', 'N', 'P', 'R', 'T', 'V'],
|
||
'years': YEAR_RANGE
|
||
},
|
||
'preschool_enrollment_rate': {
|
||
'columns': ['E', 'G', 'I', 'K', 'M', 'O', 'Q', 'S', 'U', 'W'],
|
||
'years': YEAR_RANGE
|
||
},
|
||
|
||
# 小学教育
|
||
'primary_enrollment': {
|
||
'columns': ['X', 'Z', 'AB', 'AD', 'AF', 'AH', 'AJ', 'AL', 'AN', 'AP'],
|
||
'years': YEAR_RANGE
|
||
},
|
||
'primary_enrollment_rate': {
|
||
'columns': ['Y', 'AA', 'AC', 'AE', 'AG', 'AI', 'AK', 'AM', 'AO', 'AQ'],
|
||
'years': YEAR_RANGE
|
||
},
|
||
|
||
# 初中教育
|
||
'junior_high_enrollment': {
|
||
'columns': ['AR', 'AT', 'AV', 'AX', 'AZ', 'BB', 'BD', 'BF', 'BH', 'BJ'],
|
||
'years': YEAR_RANGE
|
||
},
|
||
'junior_high_enrollment_rate': {
|
||
'columns': ['AS', 'AU', 'AW', 'AY', 'BA', 'BC', 'BE', 'BG', 'BI', 'BK'],
|
||
'years': YEAR_RANGE
|
||
},
|
||
|
||
# 普通高中教育
|
||
'senior_high_enrollment': {
|
||
'columns': ['BL', 'BN', 'BP', 'BR', 'BT', 'BV', 'BX', 'BZ', 'CB', 'CD'],
|
||
'years': YEAR_RANGE
|
||
},
|
||
'senior_high_enrollment_rate': {
|
||
'columns': ['BM', 'BO', 'BQ', 'BS', 'BU', 'BW', 'BY', 'CA', 'CC', 'CE'],
|
||
'years': YEAR_RANGE
|
||
},
|
||
|
||
# 中职教育
|
||
'vocational_enrollment': {
|
||
'columns': ['CF', 'CH', 'CJ', 'CL', 'CN', 'CP', 'CR', 'CT', 'CV', 'CX'],
|
||
'years': YEAR_RANGE
|
||
},
|
||
'vocational_enrollment_rate': {
|
||
'columns': ['CG', 'CI', 'CK', 'CM', 'CO', 'CQ', 'CS', 'CU', 'CW', 'CY'],
|
||
'years': YEAR_RANGE
|
||
}
|
||
}
|
||
|
||
# ======================= 工具函数 =======================
|
||
def init_directories() -> None:
|
||
"""初始化数据目录
|
||
创建数据保存目录,如果目录已存在则不执行操作
|
||
"""
|
||
os.makedirs(DATA_DIR, exist_ok=True)
|
||
|
||
|
||
def process_value(value: Any) -> int | float | int:
|
||
"""处理单元格值,转换为合适的数值类型
|
||
|
||
Args:
|
||
value: 原始单元格值
|
||
|
||
Returns:
|
||
int | float | int: 转换后的数值,无法转换时返回0
|
||
"""
|
||
if value is None:
|
||
return 0
|
||
|
||
# 统一转换为字符串处理
|
||
str_value = str(value).strip()
|
||
if str_value == '' or str_value == '####':
|
||
return 0
|
||
|
||
try:
|
||
if '%' in str_value:
|
||
# 移除百分号并转换为小数
|
||
return float(str_value.replace('%', ''))
|
||
elif '.' in str_value:
|
||
return float(str_value)
|
||
else:
|
||
return int(str_value)
|
||
except (ValueError, TypeError):
|
||
return 0
|
||
|
||
|
||
def print_conversion_stats(conversion_records: List[Dict[str, str]], errors: List[str]) -> None:
|
||
"""打印名称转换统计信息
|
||
|
||
Args:
|
||
conversion_records: 转换记录列表
|
||
errors: 错误信息列表
|
||
"""
|
||
print("\n=== 名称转换记录 ===")
|
||
if conversion_records:
|
||
for record in conversion_records:
|
||
print(f"🔄 行 {record['row']}: {record['raw_name']} → {record['converted_name']}")
|
||
print(f"📊 共检测到 {len(conversion_records)} 项名称转换")
|
||
else:
|
||
print("📝 不存在名称转换的情况")
|
||
|
||
if errors:
|
||
print(f"⚠️ 发现 {len(errors)} 个区域名称转换失败:")
|
||
for error in errors:
|
||
print(f" - {error}")
|
||
else:
|
||
print("✅ 所有区域名称均成功转换为全称")
|
||
|
||
# ======================= 核心逻辑 =======================
|
||
def extract_enrollment_data(sheet: openpyxl.worksheet.worksheet.Worksheet) -> Tuple[List[Dict[str, Any]], List[Dict[str, str]], List[str]]:
|
||
"""从工作表提取毛入学率数据
|
||
|
||
Args:
|
||
sheet: 毛入学率工作表对象
|
||
|
||
Returns:
|
||
Tuple包含:
|
||
- enrollment_data: 提取的毛入学率数据列表
|
||
- conversion_records: 名称转换记录
|
||
- name_conversion_errors: 名称转换错误列表
|
||
"""
|
||
enrollment_data: List[Dict[str, Any]] = []
|
||
conversion_records: List[Dict[str, str]] = []
|
||
name_conversion_errors: List[str] = []
|
||
|
||
# 计算区域名称列索引
|
||
region_col_idx = openpyxl.utils.column_index_from_string(REGION_NAME_COLUMN) - 1
|
||
|
||
# 遍历数据行
|
||
for row_num, row in enumerate(sheet.iter_rows(min_row=START_ROW, values_only=True), start=START_ROW):
|
||
# 获取区域名称
|
||
raw_name = row[region_col_idx] if (len(row) > region_col_idx and row[region_col_idx] is not None) else '未知地区'
|
||
if not raw_name:
|
||
continue
|
||
|
||
# 区域名称转换
|
||
str_raw_name = str(raw_name).strip() if raw_name is not None else '未知地区'
|
||
area_info = query_area_info(str_raw_name)
|
||
|
||
if area_info:
|
||
area_name = area_info['full_name']
|
||
area_code = area_info['area_code']
|
||
|
||
# 记录名称转换
|
||
if str_raw_name != area_name:
|
||
conversion_records.append({
|
||
'row': row_num,
|
||
'raw_name': str_raw_name,
|
||
'converted_name': area_name
|
||
})
|
||
else:
|
||
area_name = str_raw_name
|
||
area_code = 'unknown'
|
||
name_conversion_errors.append(f"行 {row_num}: '{str_raw_name}'")
|
||
|
||
# 创建区域数据对象
|
||
area_data = {
|
||
'area_name': area_name,
|
||
'area_code': area_code,
|
||
'raw_name': str_raw_name # 保留原始名称用于调试
|
||
}
|
||
|
||
# 提取各指标年度数据
|
||
for metric, config in DATA_COLUMNS.items():
|
||
year_data = {}
|
||
if 'columns' in config and 'years' in config:
|
||
for col_name, year in zip(config['columns'], config['years']):
|
||
col_idx = openpyxl.utils.column_index_from_string(col_name) - 1
|
||
if col_idx < len(row):
|
||
value = row[col_idx]
|
||
year_data[str(year)] = process_value(value)
|
||
area_data[metric] = year_data
|
||
|
||
enrollment_data.append(area_data)
|
||
|
||
return enrollment_data, conversion_records, name_conversion_errors
|
||
|
||
# ======================= 主函数 =======================
|
||
def main() -> None:
|
||
"""主函数:执行毛入学率数据提取流程"""
|
||
try:
|
||
# 初始化目录
|
||
init_directories()
|
||
|
||
# 加载工作簿并选择工作表
|
||
workbook = openpyxl.load_workbook(EXCEL_PATH, read_only=True)
|
||
|
||
if SHEET_NAME not in workbook.sheetnames:
|
||
print(f"❌ 错误:未找到'{SHEET_NAME}'Sheet")
|
||
return
|
||
|
||
sheet = workbook[SHEET_NAME]
|
||
|
||
# 提取数据
|
||
enrollment_data, conversion_records, name_conversion_errors = extract_enrollment_data(sheet)
|
||
|
||
# 关闭工作簿释放资源
|
||
workbook.close()
|
||
|
||
# 保存为JSON文件
|
||
with open(JSON_PATH, 'w', encoding='utf-8') as f:
|
||
json.dump(enrollment_data, f, ensure_ascii=False, indent=2)
|
||
|
||
# 输出结果统计
|
||
print(f"✅ 毛入学率数据提取完成,已保存至:{JSON_PATH}")
|
||
print(f"📊 共处理 {len(enrollment_data)} 条地区数据")
|
||
print_conversion_stats(conversion_records, name_conversion_errors)
|
||
|
||
except FileNotFoundError:
|
||
print(f"🔴 错误:Excel文件 '{EXCEL_PATH}' 不存在")
|
||
except Exception as e:
|
||
print(f"🔴 处理数据时发生错误:{str(e)}")
|
||
|
||
if __name__ == '__main__':
|
||
main()
|
||
|