This commit is contained in:
2025-09-10 13:58:30 +08:00
parent b6580d5c59
commit 54fae48629

View File

@@ -1,34 +1,36 @@
import openpyxl # 添加缺少的导入
import json
import os
import traceback
from typing import List, Dict, Any, Tuple
import openpyxl
from Config.Config import EXCEL_PATH
from Util.AreaUtil import query_area_info
from Util.DataUtil import (
init_directories, process_value, print_conversion_stats,
convert_area_name, save_to_json, load_workbook_sheet
)
# 创建数据保存目录
# ======================== 配置常量 ========================
# 数据目录和JSON路径
DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'Data')
os.makedirs(DATA_DIR, exist_ok=True)
JSON_PATH = os.path.join(DATA_DIR, 'ClassCount.json') # 班级数JSON路径
JSON_PATH = os.path.join(DATA_DIR, 'ClassCount.json')
file_name = EXCEL_PATH
class_data = []
name_conversion_errors = [] # 记录转换失败的名称
conversion_records = [] # 定义转换记录变量
# 工作表名称
SHEET_NAME = '招生数'
try:
# 加载工作簿并选择招生数Sheet
workbook = openpyxl.load_workbook(file_name, read_only=True)
if '招生数' not in workbook.sheetnames:
print("❌ 错误:未找到'招生数'Sheet")
exit(1)
sheet = workbook['招生数']
# 区域名称所在列
REGION_NAME_COLUMN = 'B'
# 定义数据列范围与英文属性映射
data_columns = {
# 起始行(跳过表头)
START_ROW = 4
# 年份范围
YEAR_RANGE = [2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024]
# 数据列映射配置
DATA_COLUMNS = {
# 学前教育每年份3列城区/镇区/乡村)
'preschool_classes': {
'years': [2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024],
'years': YEAR_RANGE,
'columns': [
{'year': 2015, 'urban': 'D', 'town': 'E', 'rural': 'F'},
{'year': 2016, 'urban': 'H', 'town': 'I', 'rural': 'J'},
@@ -46,7 +48,7 @@ try:
# 小学教育
'primary_classes': {
'years': [2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024],
'years': YEAR_RANGE,
'columns': [
{'year': 2015, 'urban': 'AR', 'town': 'AS', 'rural': 'AT'},
{'year': 2016, 'urban': 'AV', 'town': 'AW', 'rural': 'AX'},
@@ -64,7 +66,7 @@ try:
# 初中教育
'junior_high_classes': {
'years': [2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024],
'years': YEAR_RANGE,
'columns': [
{'year': 2015, 'urban': 'CF', 'town': 'CG', 'rural': 'CH'},
{'year': 2016, 'urban': 'CJ', 'town': 'CK', 'rural': 'CL'},
@@ -82,7 +84,7 @@ try:
# 高中教育
'senior_high_classes': {
'years': [2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024],
'years': YEAR_RANGE,
'columns': [
{'year': 2015, 'urban': 'DT', 'town': 'DU', 'rural': 'DV'},
{'year': 2016, 'urban': 'DX', 'town': 'DY', 'rural': 'DZ'},
@@ -98,9 +100,9 @@ try:
'categories': ['urban', 'town', 'rural']
},
# 中职教育(特殊每年1列)
# 中职教育(单值列)
'vocational_classes': {
'years': [2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024],
'years': YEAR_RANGE,
'columns': [
{'year': 2015, 'column': 'FH'},
{'year': 2016, 'column': 'FI'},
@@ -116,40 +118,46 @@ try:
}
}
# 遍历数据行跳过前3行表头根据实际情况调整
for row_num, row in enumerate(sheet.iter_rows(min_row=4, values_only=True), start=4):
# 区域名称从B列获取索引1
# ======================== 核心逻辑 ========================
def extract_class_data(sheet: openpyxl.worksheet.worksheet.Worksheet) -> Tuple[List[Dict[str, Any]], List[Dict[str, str]], List[str]]:
"""提取班级数据并处理区域名称转换
Args:
sheet: 工作表对象
Returns:
Tuple包含:
- class_data: 提取的班级数据列表
- conversion_records: 区域名称转换记录
- errors: 错误信息列表
"""
class_data: List[Dict[str, Any]] = []
conversion_records: List[Dict[str, str]] = []
errors: List[str] = []
for row_num, row in enumerate(sheet.iter_rows(min_row=START_ROW, values_only=True), start=START_ROW):
# 获取区域名称B列
raw_name = row[1] if (len(row) > 1 and row[1] is not None) else '未知地区'
if not raw_name: # 跳过空行
continue
# 区域名称转换
str_raw_name = str(raw_name).strip() if raw_name is not None else '未知地区'
area_info = query_area_info(str_raw_name)
if area_info:
area_name = area_info['full_name']
area_code = area_info['area_code']
if raw_name != area_name:
conversion_records.append({
'row': row_num,
'raw_name': raw_name,
'converted_name': area_name
})
else:
area_name = raw_name
area_code = 'unknown'
name_conversion_errors.append(f"{row_num}: '{raw_name}'")
# 转换区域名称
area_name, area_code, row_conversions, row_errors = convert_area_name(raw_name, row_num)
conversion_records.extend(row_conversions)
errors.extend(row_errors)
area_data = {
# 创建区域数据对象
area_info = {
'area_name': area_name,
'area_code': area_code,
'raw_name': raw_name
}
# 提取各教育阶段班级数据
for stage, config in data_columns.items():
for stage, config in DATA_COLUMNS.items():
stage_data = {}
# 处理多类别教育阶段
# 处理多类别教育阶段(学前/小学/初中/高中)
if 'categories' in config:
for year_config in config['columns']:
year = year_config['year']
@@ -157,17 +165,8 @@ try:
for category in config['categories']:
col_name = year_config[category]
col_idx = openpyxl.utils.column_index_from_string(col_name) - 1
if col_idx < len(row):
value = row[col_idx]
# 数据清洗与转换
if value is None:
year_data[category] = 0
else:
str_value = str(value).strip()
if str_value in ['', '####']:
year_data[category] = 0
else:
year_data[category] = int(str_value) if str_value.isdigit() else 0
value = row[col_idx] if col_idx < len(row) else None
year_data[category] = process_value(value)
stage_data[str(year)] = year_data
# 处理中职教育(单值)
else:
@@ -175,39 +174,39 @@ try:
year = year_config['year']
col_name = year_config['column']
col_idx = openpyxl.utils.column_index_from_string(col_name) - 1
if col_idx < len(row):
value = row[col_idx]
if value is None:
stage_data[str(year)] = 0
else:
str_value = str(value).strip()
stage_data[str(year)] = int(str_value) if str_value.isdigit() else 0
area_data[stage] = stage_data
value = row[col_idx] if col_idx < len(row) else None
stage_data[str(year)] = process_value(value)
area_info[stage] = stage_data
class_data.append(area_data)
class_data.append(area_info)
workbook.close()
return class_data, conversion_records, errors
# 保存JSON文件
with open(JSON_PATH, 'w', encoding='utf-8') as f:
json.dump(class_data, f, ensure_ascii=False, indent=2)
# ======================== 主函数 ========================
# 输出统计信息
print(f"✅ 班级数数据提取完成,已保存至:{JSON_PATH}")
print(f"📊 共处理 {len(class_data)} 条地区数据")
print("\n=== 名称转换记录 ===")
if conversion_records:
for record in conversion_records:
print(f"🔄 行 {record['row']}: {record['raw_name']}{record['converted_name']}")
print(f"📊 共检测到 {len(conversion_records)} 项名称转换")
else:
print("📝 不存在名称转换的情况")
if name_conversion_errors:
print(f"⚠️ 发现 {len(name_conversion_errors)} 个区域名称转换失败:")
for error in name_conversion_errors:
print(f" - {error}")
def main() -> None:
"""主函数:加载数据并执行提取流程"""
try:
# 初始化目录
init_directories(DATA_DIR)
# 加载工作表
sheet = load_workbook_sheet(EXCEL_PATH, SHEET_NAME)
if not sheet:
return
# 提取班级数据
class_data, conversion_records, errors = extract_class_data(sheet)
# 打印转换统计
print_conversion_stats(conversion_records, errors)
# 保存数据到JSON
save_to_json(class_data, JSON_PATH)
except FileNotFoundError:
print(f"🔴 错误Excel文件 '{file_name}' 不存在")
except Exception as e:
print(f"🔴 处理数据时发生错误:{str(e)}{traceback.format_exc()}")
print(f"🔴 处理数据时发生错误:{str(e)}")
print(traceback.format_exc())
if __name__ == '__main__':
main()