From c78ff28a5392f21a3c084d16ac070d144a8d74ee Mon Sep 17 00:00:00 2001 From: HuangHai <10402852@qq.com> Date: Wed, 10 Sep 2025 14:09:58 +0800 Subject: [PATCH] 'commit' --- Tools/T6_ZaiXiaoShengCount.py | 335 ++++++++++++++++++---------------- 1 file changed, 174 insertions(+), 161 deletions(-) diff --git a/Tools/T6_ZaiXiaoShengCount.py b/Tools/T6_ZaiXiaoShengCount.py index ff4b49b..48ba204 100644 --- a/Tools/T6_ZaiXiaoShengCount.py +++ b/Tools/T6_ZaiXiaoShengCount.py @@ -1,19 +1,41 @@ -import openpyxl -import json import os +import json import traceback +from typing import List, Dict, Any, Tuple from openpyxl.utils import column_index_from_string +from openpyxl.workbook import Workbook +from openpyxl.worksheet.worksheet import Worksheet from Config.Config import EXCEL_PATH from Util.AreaUtil import query_area_info +from Util.DataUtil import ( + init_directories, + process_value, + print_conversion_stats, + convert_area_name, + save_to_json, + load_workbook_sheet +) -# 创建数据保存目录 +# ======================== 配置常量 ======================== # +# 数据目录和JSON路径 DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'Data') -os.makedirs(DATA_DIR, exist_ok=True) -JSON_PATH = os.path.join(DATA_DIR, 'ZaiXiaoShengCount.json') # 在校生数JSON路径 +JSON_PATH = os.path.join(DATA_DIR, 'ZaiXiaoShengCount.json') + +# 工作表名称 +SHEET_NAME = '在校生数' + +# 区域名称所在列 +REGION_NAME_COLUMN = 'B' + +# 数据起始行 +START_ROW = 5 + +# 年份范围 +YEAR_RANGE = range(2015, 2025) # 教育阶段配置 - 在校生数(2015-2024年) -education_stages = [ +EDUCATION_STAGES = [ { 'name': 'preschool', 'chinese_name': '学前教育', @@ -96,173 +118,164 @@ education_stages = [ } ] -def process_value(value): - """处理单元格值,转换为适当类型""" - if value is None: - return 0 - - # 转换为字符串处理 - str_value = str(value).strip() - - # 处理空字符串 - if not str_value: - return 0 - - # 处理特殊标记 - if str_value in ['####', 'NA', 'N/A', '—', ' ']: - return 0 - - # 处理数字 - try: - # 移除千分位逗号 - if ',' in str_value: - str_value = str_value.replace(',', '') - return int(float(str_value)) - except ValueError: - return 0 +# ======================== 核心逻辑 ======================== # +def extract_stage_data(row: Tuple[Any, ...], stage: Dict[str, Any]) -> Dict[str, Dict[str, int]]: + """ + 提取单个教育阶段的在校生数据 + :param row: Excel行数据 + :param stage: 教育阶段配置 + :return: 格式化的阶段数据 + """ + stage_data = {} + for year_config in stage['columns']: + year = year_config['year'] + year_data = {} -def main(): - file_name = EXCEL_PATH + # 处理多类别教育阶段(学前到高中) + if 'urban' in year_config: + # 城区 + urban_col = column_index_from_string(year_config['urban']) - 1 + urban_val = row[urban_col] if len(row) > urban_col else None + year_data['urban'] = process_value(urban_val) + + # 镇区 + town_col = column_index_from_string(year_config['town']) - 1 + town_val = row[town_col] if len(row) > town_col else None + year_data['town'] = process_value(town_val) + + # 乡村 + rural_col = column_index_from_string(year_config['rural']) - 1 + rural_val = row[rural_col] if len(row) > rural_col else None + year_data['rural'] = process_value(rural_val) + + # 总计 + total_col = column_index_from_string(year_config['total']) - 1 + total_val = row[total_col] if len(row) > total_col else None + year_data['total'] = process_value(total_val) + # 处理中职教育(单值) + else: + total_col = column_index_from_string(year_config['total']) - 1 + total_val = row[total_col] if len(row) > total_col else None + year_data['total'] = process_value(total_val) + + stage_data[str(year)] = year_data + return stage_data + + +# 修改函数定义,更新返回类型注解为4个值 +def extract_student_data(sheet: Worksheet) -> Tuple[List[Dict[str, Any]], List[str], List[str], int]: + """ + 提取所有区域的在校生数据 + :param sheet: Excel工作表对象 + :return: 在校生数据列表、转换错误列表、处理总数 + """ student_data = [] name_conversion_errors = [] conversion_records = [] processed_count = 0 - - try: - # 加载工作簿并选择在校生数Sheet - workbook = openpyxl.load_workbook(file_name, data_only=True) - if '在校生数' not in workbook.sheetnames: - print("❌ 错误:未找到'在校生数'Sheet") - return - sheet = workbook['在校生数'] - - print(f"✅ 成功加载Excel文件:{file_name}") - print(f"✅ 开始处理在校生数数据,共{sheet.max_row}行数据") - - # 遍历行数据 - for row_idx, row in enumerate(sheet.iter_rows(values_only=True), start=1): - # 跳过前4行表头 - if row_idx < 5: + region_col_index = column_index_from_string(REGION_NAME_COLUMN) - 1 + + print(f"✅ 开始处理在校生数数据,共{sheet.max_row}行数据") + + # 遍历行数据 + for row_idx, row in enumerate(sheet.iter_rows(values_only=True), start=1): + # 跳过表头行 + if row_idx < START_ROW: + continue + + try: + # 检查行是否有足够的列 + if len(row) <= region_col_index: + print(f"⚠️ 第{row_idx}行数据不足,跳过") continue - - # 从B列获取区域名称(索引1) - try: - # 检查行是否有足够的列 - if len(row) < 2: - print(f"⚠️ 第{row_idx}行数据不足,跳过") - continue - - raw_name = row[1] - if raw_name is None: - print(f"⚠️ 第{row_idx}行B列区域名称为空,跳过该行") - continue - - raw_name = str(raw_name).strip() - if not raw_name: - print(f"⚠️ 第{row_idx}行B列区域名称为空字符串,跳过该行") - continue - - # 查询区域信息 - area_info = query_area_info(raw_name) - area_name = raw_name - area_code = 'unknown' - - # 验证区域信息 - if isinstance(area_info, dict): - if 'full_name' in area_info and 'area_code' in area_info: - area_name = area_info['full_name'] - area_code = area_info['area_code'] - conversion_records.append(f"✅ 第{row_idx}行: {raw_name} → {area_name}") - processed_count += 1 - else: - name_conversion_errors.append(f"第{row_idx}行: {raw_name} (缺少必要字段)") - conversion_records.append(f"❌ 第{row_idx}行: {raw_name} (格式错误)") - else: - name_conversion_errors.append(f"第{row_idx}行: {raw_name}") - conversion_records.append(f"❌ 第{row_idx}行: {raw_name} (未找到匹配)") - - # 创建区域数据对象 - area_data = { - 'area_name': area_name, - 'area_code': area_code, + + # 提取区域名称 + raw_name = row[region_col_index] + # 修复:接收四个返回值并合并结果 + area_name, area_code, new_conversion, new_errors = convert_area_name(raw_name, row_idx) + conversion_records.extend(new_conversion) + name_conversion_errors.extend(new_errors) + is_valid = len(new_errors) == 0 + + # 记录转换结果 + if is_valid: + # 将字符串记录改为字典格式 + conversion_records.append({ + 'row': row_idx, 'raw_name': raw_name, - 'student_data': {} - } - - # 提取各教育阶段在校生数据 - for stage in education_stages: - stage_name = stage['name'] - stage_data = {} - - for year_config in stage['columns']: - year = year_config['year'] - year_data = {} - - # 处理多类别教育阶段(学前到高中) - if 'urban' in year_config: - # 城区 - urban_col = column_index_from_string(year_config['urban']) - 1 - urban_val = row[urban_col] if len(row) > urban_col else None - year_data['urban'] = process_value(urban_val) - - # 镇区 - town_col = column_index_from_string(year_config['town']) - 1 - town_val = row[town_col] if len(row) > town_col else None - year_data['town'] = process_value(town_val) - - # 乡村 - rural_col = column_index_from_string(year_config['rural']) - 1 - rural_val = row[rural_col] if len(row) > rural_col else None - year_data['rural'] = process_value(rural_val) - - # 总计 - total_col = column_index_from_string(year_config['total']) - 1 - total_val = row[total_col] if len(row) > total_col else None - year_data['total'] = process_value(total_val) - # 处理中职教育(单值) - else: - total_col = column_index_from_string(year_config['total']) - 1 - total_val = row[total_col] if len(row) > total_col else None - year_data['total'] = process_value(total_val) - - stage_data[str(year)] = year_data - - area_data['student_data'][stage_name] = stage_data - - student_data.append(area_data) - - # 进度提示 - if processed_count % 10 == 0 and processed_count > 0: - print(f"🔄 已处理{processed_count}条数据...") - - except Exception as e: - print(f"🔴 处理第{row_idx}行时发生错误:{str(e)}") + 'converted_name': area_name, + 'status': 'success' + }) + processed_count += 1 + else: + error_msg = f"第{row_idx}行: {raw_name}" + name_conversion_errors.append(error_msg) + # 将字符串记录改为字典格式 + conversion_records.append({ + 'row': row_idx, + 'raw_name': raw_name, + 'converted_name': None, + 'status': 'error' + }) continue - - # 保存数据到JSON文件 - with open(JSON_PATH, 'w', encoding='utf-8') as f: - json.dump(student_data, f, ensure_ascii=False, indent=2) - - print("\n=== 数据处理完成 ===") - print(f"📊 共处理 {processed_count} 条地区数据") - print(f"✅ 区域名称转换成功: {processed_count - len(name_conversion_errors)}") - if name_conversion_errors: - print(f"❌ 区域名称转换失败: {len(name_conversion_errors)}个") - for error in name_conversion_errors[:5]: - print(f" - {error}") - if len(name_conversion_errors) > 5: - print(f" - ... 等{len(name_conversion_errors)-5}个错误") - + + # 创建区域数据对象 + area_data = { + 'area_name': area_name, + 'area_code': area_code, + 'raw_name': str(raw_name).strip(), + 'student_data': {} + } + + # 提取各教育阶段数据 + for stage in EDUCATION_STAGES: + stage_name = stage['name'] + area_data['student_data'][stage_name] = extract_stage_data(row, stage) + + student_data.append(area_data) + + # 进度提示 + if processed_count % 10 == 0 and processed_count > 0: + print(f"🔄 已处理{processed_count}条数据...") + + except Exception as e: + print(f"🔴 处理第{row_idx}行时发生错误:{str(e)}") + continue + + # 修改return语句,添加name_conversion_errors返回值 + return student_data, conversion_records, name_conversion_errors, processed_count + + +def main() -> None: + """主函数:执行在校生数数据处理流程""" + try: + # 初始化目录 + init_directories(DATA_DIR) # 修复:移除列表括号,直接传入路径字符串 + + # 加载Excel工作表 + sheet = load_workbook_sheet(EXCEL_PATH, SHEET_NAME) + if not sheet: + print(f"❌ 错误:未找到'{SHEET_NAME}'工作表") + return + + # 提取数据 + # 修复:调整返回值顺序,获取转换记录列表 + student_data, conversion_records, name_conversion_errors, processed_count = extract_student_data(sheet) + + # 保存数据到JSON + save_to_json(student_data, JSON_PATH) + + # 打印转换统计 + # 修复:传入转换记录列表而非处理数量 + print_conversion_stats(conversion_records, name_conversion_errors) print(f"💾 数据已保存至 {JSON_PATH}") - + except FileNotFoundError: - print(f"🔴 错误:Excel文件 '{file_name}' 不存在") + print(f"🔴 错误:Excel文件 '{EXCEL_PATH}' 不存在") except Exception as e: print(f"🔴 处理数据时发生错误:{str(e)}{traceback.format_exc()}") - finally: - if 'workbook' in locals(): - workbook.close() - + + if __name__ == "__main__": main()