This commit is contained in:
2025-09-10 14:21:19 +08:00
parent c78ff28a53
commit 7799142c32

View File

@@ -1,32 +1,28 @@
import openpyxl # 添加缺少的导入
import json
import os
import traceback
from typing import List, Dict, Any, Tuple
import openpyxl
from openpyxl.utils import column_index_from_string
from openpyxl.worksheet.worksheet import Worksheet
from Config.Config import EXCEL_PATH
from Util.AreaUtil import query_area_info
from Util.DataUtil import (
init_directories, process_value, print_conversion_stats,
convert_area_name, save_to_json, load_workbook_sheet
)
# 创建数据保存目录
# ======================== 配置常量 ======================== #
# 数据目录与输出路径
DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'Data')
os.makedirs(DATA_DIR, exist_ok=True)
JSON_PATH = os.path.join(DATA_DIR, 'TeacherCount.json') # 修改为教师数的JSON路径
JSON_PATH = os.path.join(DATA_DIR, 'TeacherCount.json')
file_name = EXCEL_PATH
teacher_data = []
name_conversion_errors = [] # 记录转换失败的名称
conversion_records = [] # 定义转换记录变量
# Excel配置
SHEET_NAME = '教职工数、专任教师数'
REGION_NAME_COLUMN = 'B' # 区域名称所在列
START_ROW = 5 # 数据起始行从第5行开始
try:
# 加载工作簿并选择教职工数、专任教师数Sheet
workbook = openpyxl.load_workbook(file_name, data_only=True)
if '教职工数、专任教师数' not in workbook.sheetnames:
print("❌ 错误:未找到'教职工数、专任教师数'Sheet")
exit(1)
sheet = workbook['教职工数、专任教师数']
# 定义数据列范围与英文属性映射
data_columns = {
# 学前教育每年份8列教职工总数/城区/镇区/乡村,专任教师总数/城区/镇区/乡村)
# 教育阶段数据列配置
EDUCATION_STAGES = {
'preschool_teachers': {
'years': [2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024],
'columns': [
@@ -42,8 +38,6 @@ try:
{'year': 2024, 'total_staff': 'BX', 'urban_staff': 'BY', 'town_staff': 'BZ', 'rural_staff': 'CA', 'total_teacher': 'CB', 'urban_teacher': 'CC', 'town_teacher': 'CD', 'rural_teacher': 'CE'}
]
},
# 小学教育
'primary_teachers': {
'years': [2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024],
'columns': [
@@ -59,8 +53,6 @@ try:
{'year': 2024, 'total_staff': 'EZ', 'urban_staff': 'FA', 'town_staff': 'FB', 'rural_staff': 'FC', 'total_teacher': 'FD', 'urban_teacher': 'FE', 'town_teacher': 'FF', 'rural_teacher': 'FG'}
]
},
# 初中教育
'junior_high_teachers': {
'years': [2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024],
'columns': [
@@ -76,8 +68,6 @@ try:
{'year': 2024, 'total_staff': 'IB', 'urban_staff': 'IC', 'town_staff': 'ID', 'rural_staff': 'IE', 'total_teacher': 'IF', 'urban_teacher': 'IG', 'town_teacher': 'IH', 'rural_teacher': 'II'}
]
},
# 高中教育
'senior_high_teachers': {
'years': [2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024],
'columns': [
@@ -93,8 +83,6 @@ try:
{'year': 2024, 'total_staff': 'LD', 'urban_staff': 'LE', 'town_staff': 'LF', 'rural_staff': 'LG', 'total_teacher': 'LH', 'urban_teacher': 'LI', 'town_teacher': 'LJ', 'rural_teacher': 'LK'}
]
},
# 中职教育特殊每年2列
'vocational_teachers': {
'years': [2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024],
'columns': [
@@ -112,76 +100,35 @@ try:
}
}
# 遍历数据行跳过前4行表头从第5行开始
for row_num, row in enumerate(sheet.iter_rows(min_row=5, values_only=True), start=5):
# 区域名称从B列获取索引1
raw_name = row[1] if (len(row) > 1 and row[1] is not None) else '未知地区'
if not raw_name: # 跳过空行
continue
# 区域名称转换
str_raw_name = str(raw_name).strip() if raw_name is not None else '未知地区'
area_info = query_area_info(str_raw_name)
if area_info and isinstance(area_info, dict) and 'full_name' in area_info and 'area_code' in area_info:
area_name = area_info['full_name']
area_code = area_info['area_code']
if raw_name != area_name:
conversion_records.append({
'row': row_num,
'raw_name': raw_name,
'converted_name': area_name
})
else:
area_name = raw_name
area_code = 'unknown'
name_conversion_errors.append(f"{row_num}: '{raw_name}'")
def extract_stage_data(row: Tuple[Any, ...], year_config: Dict[str, str]) -> Dict[str, int]:
"""提取单个教育阶段单一年份的教师数据
area_data = {
'area_name': area_name,
'area_code': area_code,
'raw_name': raw_name
}
Args:
row: 工作表行数据
year_config: 年份配置字典
# 提取各教育阶段教师数据
for stage, config in data_columns.items():
stage_data = {}
for year_config in config['columns']:
year = year_config['year']
Returns:
包含教职工和教师数据的字典
"""
year_data = {}
# 处理教职工数据
staff_cols = ['total_staff', 'urban_staff', 'town_staff', 'rural_staff']
has_staff_categories = all(col in year_config for col in staff_cols)
if has_staff_categories:
# 处理分类职工数据
# 处理分类职工数据
for col in staff_cols:
col_name = year_config[col]
col_idx = openpyxl.utils.column_index_from_string(col_name) - 1
if col_idx < len(row):
value = row[col_idx]
# 数据清洗与转换
if value is None:
year_data[col] = 0
col_idx = column_index_from_string(col_name) - 1
value = row[col_idx] if col_idx < len(row) else None
year_data[col] = process_value(value)
else:
str_value = str(value).strip()
if str_value in ['', '####']:
year_data[col] = 0
else:
year_data[col] = int(str_value) if str_value.isdigit() else 0
else:
# 处理中职教职工总数
# 处理中职职工总数
col_name = year_config['total_staff']
col_idx = openpyxl.utils.column_index_from_string(col_name) - 1
if col_idx < len(row):
value = row[col_idx]
if value is None:
year_data['total_staff'] = 0
else:
str_value = str(value).strip()
if str_value in ['', '####']:
year_data['total_staff'] = 0
else:
year_data['total_staff'] = int(str_value) if str_value.isdigit() else 0
col_idx = column_index_from_string(col_name) - 1
value = row[col_idx] if col_idx < len(row) else None
year_data['total_staff'] = process_value(value)
# 处理专任教师数据
teacher_cols = ['total_teacher', 'urban_teacher', 'town_teacher', 'rural_teacher']
@@ -191,65 +138,94 @@ try:
# 处理分类专任教师数据
for col in teacher_cols:
col_name = year_config[col]
col_idx = openpyxl.utils.column_index_from_string(col_name) - 1
if col_idx < len(row):
value = row[col_idx]
# 数据清洗与转换
if value is None:
year_data[col] = 0
else:
str_value = str(value).strip()
if str_value in ['', '####']:
year_data[col] = 0
else:
year_data[col] = int(str_value) if str_value.isdigit() else 0
col_idx = column_index_from_string(col_name) - 1
value = row[col_idx] if col_idx < len(row) else None
year_data[col] = process_value(value)
else:
# 处理中职专任教师总数
col_name = year_config['total_teacher']
col_idx = openpyxl.utils.column_index_from_string(col_name) - 1
if col_idx < len(row):
value = row[col_idx]
if value is None:
year_data['total_teacher'] = 0
else:
str_value = str(value).strip()
if str_value in ['', '####']:
year_data['total_teacher'] = 0
else:
year_data['total_teacher'] = int(str_value) if str_value.isdigit() else 0
col_idx = column_index_from_string(col_name) - 1
value = row[col_idx] if col_idx < len(row) else None
year_data['total_teacher'] = process_value(value)
stage_data[str(year)] = year_data
return year_data
def extract_teacher_data(sheet: Worksheet) -> Tuple[List[Dict[str, Any]], List[Dict[str, str]], List[str], int]:
"""提取所有区域的教师数据
Args:
sheet: 工作表对象
Returns:
教师数据列表、转换记录列表、错误列表、处理总数
"""
teacher_data = []
conversion_records = []
name_conversion_errors = []
processed_count = 0
region_col_index = column_index_from_string(REGION_NAME_COLUMN) - 1
print(f"✅ 开始处理教师数据,共{sheet.max_row}行数据")
# 遍历行数据
for row_idx, row in enumerate(sheet.iter_rows(min_row=START_ROW, values_only=True), start=START_ROW):
processed_count += 1
# 区域名称处理
raw_name = row[region_col_index] if (len(row) > region_col_index and row[region_col_index] is not None) else '未知地区'
if not raw_name: # 跳过空行
continue
# 区域名称转换
area_name, area_code, new_conversion, new_errors = convert_area_name(raw_name, row_idx)
conversion_records.extend(new_conversion)
name_conversion_errors.extend(new_errors)
# 创建区域数据对象
area_data = {
'area_name': area_name,
'area_code': area_code,
'raw_name': str(raw_name).strip(),
}
# 提取各教育阶段数据
for stage, config in EDUCATION_STAGES.items():
stage_data = {}
for year_config in config['columns']:
year = year_config['year']
stage_data[str(year)] = extract_stage_data(row, year_config)
area_data[stage] = stage_data
teacher_data.append(area_data)
workbook.close()
return teacher_data, conversion_records, name_conversion_errors, processed_count
# 保存JSON文件
with open(JSON_PATH, 'w', encoding='utf-8') as f:
json.dump(teacher_data, f, ensure_ascii=False, indent=2)
# 输出统计信息
print(f"✅ 教师数据提取完成,已保存至:{JSON_PATH}")
print(f"📊 共处理 {len(teacher_data)} 条地区数据")
print("\n=== 名称转换记录 ===")
if conversion_records:
for record in conversion_records:
print(f"🔄 行 {record['row']}: {record['raw_name']}{record['converted_name']}")
print(f"📊 共检测到 {len(conversion_records)} 项名称转换")
else:
print("📝 不存在名称转换的情况")
if name_conversion_errors:
print(f"⚠️ 发现 {len(name_conversion_errors)} 个区域名称转换失败:")
for error in name_conversion_errors:
print(f" - {error}")
def main() -> None:
"""主函数:教师数据提取主流程"""
try:
# 初始化目录
init_directories(DATA_DIR)
# 加载工作表
sheet = load_workbook_sheet(EXCEL_PATH, SHEET_NAME)
if not sheet:
print("❌ 无法加载工作表,程序退出")
return
# 提取教师数据
teacher_data, conversion_records, name_conversion_errors, processed_count = extract_teacher_data(sheet)
# 保存数据到JSON
save_to_json(teacher_data, JSON_PATH)
# 打印转换统计
print_conversion_stats(conversion_records, name_conversion_errors)
except FileNotFoundError:
print(f"🔴 错误Excel文件 '{file_name}' 不存在")
except Exception as e:
print(f"🔴 处理数据时发生错误:{str(e)}{traceback.format_exc()}")
finally:
try:
workbook.close()
except:
pass
if __name__ == "__main__":
main()