This commit is contained in:
2025-09-10 14:09:58 +08:00
parent 4d77d0a881
commit c78ff28a53

View File

@@ -1,19 +1,41 @@
import openpyxl
import json
import os import os
import json
import traceback import traceback
from typing import List, Dict, Any, Tuple
from openpyxl.utils import column_index_from_string from openpyxl.utils import column_index_from_string
from openpyxl.workbook import Workbook
from openpyxl.worksheet.worksheet import Worksheet
from Config.Config import EXCEL_PATH from Config.Config import EXCEL_PATH
from Util.AreaUtil import query_area_info from Util.AreaUtil import query_area_info
from Util.DataUtil import (
init_directories,
process_value,
print_conversion_stats,
convert_area_name,
save_to_json,
load_workbook_sheet
)
# 创建数据保存目录 # ======================== 配置常量 ======================== #
# 数据目录和JSON路径
DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'Data') DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'Data')
os.makedirs(DATA_DIR, exist_ok=True) JSON_PATH = os.path.join(DATA_DIR, 'ZaiXiaoShengCount.json')
JSON_PATH = os.path.join(DATA_DIR, 'ZaiXiaoShengCount.json') # 在校生数JSON路径
# 工作表名称
SHEET_NAME = '在校生数'
# 区域名称所在列
REGION_NAME_COLUMN = 'B'
# 数据起始行
START_ROW = 5
# 年份范围
YEAR_RANGE = range(2015, 2025)
# 教育阶段配置 - 在校生数(2015-2024年) # 教育阶段配置 - 在校生数(2015-2024年)
education_stages = [ EDUCATION_STAGES = [
{ {
'name': 'preschool', 'name': 'preschool',
'chinese_name': '学前教育', 'chinese_name': '学前教育',
@@ -96,173 +118,164 @@ education_stages = [
} }
] ]
def process_value(value): # ======================== 核心逻辑 ======================== #
"""处理单元格值,转换为适当类型""" def extract_stage_data(row: Tuple[Any, ...], stage: Dict[str, Any]) -> Dict[str, Dict[str, int]]:
if value is None: """
return 0 提取单个教育阶段的在校生数据
:param row: Excel行数据
# 转换为字符串处理 :param stage: 教育阶段配置
str_value = str(value).strip() :return: 格式化的阶段数据
"""
# 处理空字符串 stage_data = {}
if not str_value: for year_config in stage['columns']:
return 0 year = year_config['year']
year_data = {}
# 处理特殊标记
if str_value in ['####', 'NA', 'N/A', '', ' ']:
return 0
# 处理数字
try:
# 移除千分位逗号
if ',' in str_value:
str_value = str_value.replace(',', '')
return int(float(str_value))
except ValueError:
return 0
def main(): # 处理多类别教育阶段(学前到高中)
file_name = EXCEL_PATH if 'urban' in year_config:
# 城区
urban_col = column_index_from_string(year_config['urban']) - 1
urban_val = row[urban_col] if len(row) > urban_col else None
year_data['urban'] = process_value(urban_val)
# 镇区
town_col = column_index_from_string(year_config['town']) - 1
town_val = row[town_col] if len(row) > town_col else None
year_data['town'] = process_value(town_val)
# 乡村
rural_col = column_index_from_string(year_config['rural']) - 1
rural_val = row[rural_col] if len(row) > rural_col else None
year_data['rural'] = process_value(rural_val)
# 总计
total_col = column_index_from_string(year_config['total']) - 1
total_val = row[total_col] if len(row) > total_col else None
year_data['total'] = process_value(total_val)
# 处理中职教育(单值)
else:
total_col = column_index_from_string(year_config['total']) - 1
total_val = row[total_col] if len(row) > total_col else None
year_data['total'] = process_value(total_val)
stage_data[str(year)] = year_data
return stage_data
# 修改函数定义更新返回类型注解为4个值
def extract_student_data(sheet: Worksheet) -> Tuple[List[Dict[str, Any]], List[str], List[str], int]:
"""
提取所有区域的在校生数据
:param sheet: Excel工作表对象
:return: 在校生数据列表、转换错误列表、处理总数
"""
student_data = [] student_data = []
name_conversion_errors = [] name_conversion_errors = []
conversion_records = [] conversion_records = []
processed_count = 0 processed_count = 0
region_col_index = column_index_from_string(REGION_NAME_COLUMN) - 1
try:
# 加载工作簿并选择在校生数Sheet print(f"✅ 开始处理在校生数数据,共{sheet.max_row}行数据")
workbook = openpyxl.load_workbook(file_name, data_only=True)
if '在校生数' not in workbook.sheetnames: # 遍历行数据
print("❌ 错误:未找到'在校生数'Sheet") for row_idx, row in enumerate(sheet.iter_rows(values_only=True), start=1):
return # 跳过表头行
sheet = workbook['在校生数'] if row_idx < START_ROW:
continue
print(f"✅ 成功加载Excel文件{file_name}")
print(f"✅ 开始处理在校生数数据,共{sheet.max_row}行数据") try:
# 检查行是否有足够的列
# 遍历行数据 if len(row) <= region_col_index:
for row_idx, row in enumerate(sheet.iter_rows(values_only=True), start=1): print(f"⚠️ 第{row_idx}行数据不足,跳过")
# 跳过前4行表头
if row_idx < 5:
continue continue
# 从B列获取区域名称索引1 # 取区域名称
try: raw_name = row[region_col_index]
# 检查行是否有足够的列 # 修复:接收四个返回值并合并结果
if len(row) < 2: area_name, area_code, new_conversion, new_errors = convert_area_name(raw_name, row_idx)
print(f"⚠️ 第{row_idx}行数据不足,跳过") conversion_records.extend(new_conversion)
continue name_conversion_errors.extend(new_errors)
is_valid = len(new_errors) == 0
raw_name = row[1]
if raw_name is None: # 记录转换结果
print(f"⚠️ 第{row_idx}行B列区域名称为空跳过该行") if is_valid:
continue # 将字符串记录改为字典格式
conversion_records.append({
raw_name = str(raw_name).strip() 'row': row_idx,
if not raw_name:
print(f"⚠️ 第{row_idx}行B列区域名称为空字符串跳过该行")
continue
# 查询区域信息
area_info = query_area_info(raw_name)
area_name = raw_name
area_code = 'unknown'
# 验证区域信息
if isinstance(area_info, dict):
if 'full_name' in area_info and 'area_code' in area_info:
area_name = area_info['full_name']
area_code = area_info['area_code']
conversion_records.append(f"✅ 第{row_idx}行: {raw_name}{area_name}")
processed_count += 1
else:
name_conversion_errors.append(f"{row_idx}行: {raw_name} (缺少必要字段)")
conversion_records.append(f"❌ 第{row_idx}行: {raw_name} (格式错误)")
else:
name_conversion_errors.append(f"{row_idx}行: {raw_name}")
conversion_records.append(f"❌ 第{row_idx}行: {raw_name} (未找到匹配)")
# 创建区域数据对象
area_data = {
'area_name': area_name,
'area_code': area_code,
'raw_name': raw_name, 'raw_name': raw_name,
'student_data': {} 'converted_name': area_name,
} 'status': 'success'
})
# 提取各教育阶段在校生数据 processed_count += 1
for stage in education_stages: else:
stage_name = stage['name'] error_msg = f"{row_idx}行: {raw_name}"
stage_data = {} name_conversion_errors.append(error_msg)
# 将字符串记录改为字典格式
for year_config in stage['columns']: conversion_records.append({
year = year_config['year'] 'row': row_idx,
year_data = {} 'raw_name': raw_name,
'converted_name': None,
# 处理多类别教育阶段(学前到高中) 'status': 'error'
if 'urban' in year_config: })
# 城区
urban_col = column_index_from_string(year_config['urban']) - 1
urban_val = row[urban_col] if len(row) > urban_col else None
year_data['urban'] = process_value(urban_val)
# 镇区
town_col = column_index_from_string(year_config['town']) - 1
town_val = row[town_col] if len(row) > town_col else None
year_data['town'] = process_value(town_val)
# 乡村
rural_col = column_index_from_string(year_config['rural']) - 1
rural_val = row[rural_col] if len(row) > rural_col else None
year_data['rural'] = process_value(rural_val)
# 总计
total_col = column_index_from_string(year_config['total']) - 1
total_val = row[total_col] if len(row) > total_col else None
year_data['total'] = process_value(total_val)
# 处理中职教育(单值)
else:
total_col = column_index_from_string(year_config['total']) - 1
total_val = row[total_col] if len(row) > total_col else None
year_data['total'] = process_value(total_val)
stage_data[str(year)] = year_data
area_data['student_data'][stage_name] = stage_data
student_data.append(area_data)
# 进度提示
if processed_count % 10 == 0 and processed_count > 0:
print(f"🔄 已处理{processed_count}条数据...")
except Exception as e:
print(f"🔴 处理第{row_idx}行时发生错误:{str(e)}")
continue continue
# 保存数据到JSON文件 # 创建区域数据对象
with open(JSON_PATH, 'w', encoding='utf-8') as f: area_data = {
json.dump(student_data, f, ensure_ascii=False, indent=2) 'area_name': area_name,
'area_code': area_code,
print("\n=== 数据处理完成 ===") 'raw_name': str(raw_name).strip(),
print(f"📊 共处理 {processed_count} 条地区数据") 'student_data': {}
print(f"✅ 区域名称转换成功: {processed_count - len(name_conversion_errors)}") }
if name_conversion_errors:
print(f"❌ 区域名称转换失败: {len(name_conversion_errors)}") # 提取各教育阶段数据
for error in name_conversion_errors[:5]: for stage in EDUCATION_STAGES:
print(f" - {error}") stage_name = stage['name']
if len(name_conversion_errors) > 5: area_data['student_data'][stage_name] = extract_stage_data(row, stage)
print(f" - ... 等{len(name_conversion_errors)-5}个错误")
student_data.append(area_data)
# 进度提示
if processed_count % 10 == 0 and processed_count > 0:
print(f"🔄 已处理{processed_count}条数据...")
except Exception as e:
print(f"🔴 处理第{row_idx}行时发生错误:{str(e)}")
continue
# 修改return语句添加name_conversion_errors返回值
return student_data, conversion_records, name_conversion_errors, processed_count
def main() -> None:
"""主函数:执行在校生数数据处理流程"""
try:
# 初始化目录
init_directories(DATA_DIR) # 修复:移除列表括号,直接传入路径字符串
# 加载Excel工作表
sheet = load_workbook_sheet(EXCEL_PATH, SHEET_NAME)
if not sheet:
print(f"❌ 错误:未找到'{SHEET_NAME}'工作表")
return
# 提取数据
# 修复:调整返回值顺序,获取转换记录列表
student_data, conversion_records, name_conversion_errors, processed_count = extract_student_data(sheet)
# 保存数据到JSON
save_to_json(student_data, JSON_PATH)
# 打印转换统计
# 修复:传入转换记录列表而非处理数量
print_conversion_stats(conversion_records, name_conversion_errors)
print(f"💾 数据已保存至 {JSON_PATH}") print(f"💾 数据已保存至 {JSON_PATH}")
except FileNotFoundError: except FileNotFoundError:
print(f"🔴 错误Excel文件 '{file_name}' 不存在") print(f"🔴 错误Excel文件 '{EXCEL_PATH}' 不存在")
except Exception as e: except Exception as e:
print(f"🔴 处理数据时发生错误:{str(e)}{traceback.format_exc()}") print(f"🔴 处理数据时发生错误:{str(e)}{traceback.format_exc()}")
finally:
if 'workbook' in locals():
workbook.close()
if __name__ == "__main__": if __name__ == "__main__":
main() main()