From c1cf4c581695c46551669e7031ce6b9a09c6466c Mon Sep 17 00:00:00 2001 From: HuangHai <10402852@qq.com> Date: Wed, 10 Sep 2025 13:53:34 +0800 Subject: [PATCH] 'commit' --- Tools/T1_RenKou.py | 124 +++++------------ Tools/T2_MaoRuXueLv.py | 160 ++++------------------ Util/DataUtil.py | 152 ++++++++++++++++++++ Util/__pycache__/DataUtil.cpython-310.pyc | Bin 0 -> 4353 bytes 4 files changed, 216 insertions(+), 220 deletions(-) create mode 100644 Util/DataUtil.py create mode 100644 Util/__pycache__/DataUtil.cpython-310.pyc diff --git a/Tools/T1_RenKou.py b/Tools/T1_RenKou.py index 6da2301..06f4716 100644 --- a/Tools/T1_RenKou.py +++ b/Tools/T1_RenKou.py @@ -1,9 +1,12 @@ import openpyxl -import json import os from typing import List, Dict, Any + from Config.Config import EXCEL_PATH -from Util.AreaUtil import query_area_info +from Util.DataUtil import ( + init_directories, process_value, print_conversion_stats, + convert_area_name, save_to_json, load_workbook_sheet +) # ======================= 配置常量 ======================= DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'Data') @@ -22,124 +25,69 @@ DATA_COLUMNS = { 'birth_population': ('AP', 'AY') # 出生人口 } -# ======================= 工具函数 ======================= -def init_directories() -> None: - """初始化数据目录""" - os.makedirs(DATA_DIR, exist_ok=True) - - -def process_value(value: Any) -> int | float | int: - """处理单元格值,转换为合适的数值类型""" - if value is None or str(value).strip() == '': - return 0 - try: - if isinstance(value, str): - value = value.replace(',', '').strip() - return float(value) if '.' in str(value) else int(value) - except (ValueError, TypeError): - return 0 - # ======================= 核心逻辑 ======================= def extract_area_data(sheet: openpyxl.worksheet.worksheet.Worksheet) -> List[Dict[str, Any]]: """从工作表提取区域数据""" population_data: List[Dict[str, Any]] = [] conversion_records: List[Dict[str, str]] = [] name_conversion_errors: List[str] = [] - + + # 计算区域名称列索引 + region_col_idx = openpyxl.utils.column_index_from_string(REGION_NAME_COLUMN) - 1 + # 遍历数据行 for row_num in range(START_ROW, sheet.max_row + 1): row = sheet[row_num] - raw_name = str(row[openpyxl.utils.column_index_from_string(REGION_NAME_COLUMN)-1].value or '未知地区').strip() - if not raw_name: + + # 获取区域名称并转换 + raw_name = row[region_col_idx].value + area_name, area_code, conv_records, errors = convert_area_name(raw_name, row_num) + conversion_records.extend(conv_records) + name_conversion_errors.extend(errors) + + if not area_name: continue - - # 区域名称转换 - area_info = query_area_info(raw_name) - if area_info: - area_name = area_info['full_name'] - area_code = area_info['area_code'] - if raw_name != area_name: - conversion_records.append({ - 'row': row_num, - 'raw_name': raw_name, - 'converted_name': area_name - }) - else: - area_name = raw_name - area_code = 'unknown' - name_conversion_errors.append(f"行 {row_num}: '{raw_name}'") - + # 构建区域数据 area_data = { 'area_name': area_name, 'area_code': area_code, - 'raw_name': raw_name + 'raw_name': str(raw_name).strip() if raw_name else '未知地区' } - + # 提取各指标年度数据 for metric, (start_col, end_col) in DATA_COLUMNS.items(): start_idx = openpyxl.utils.column_index_from_string(start_col) - 1 end_idx = openpyxl.utils.column_index_from_string(end_col) - 1 year_data = {} - + for col_idx, year in zip(range(start_idx, end_idx + 1), YEAR_RANGE): cell_value = row[col_idx].value year_data[str(year)] = process_value(cell_value) - + area_data[metric] = year_data - + population_data.append(area_data) - + # 输出转换统计 print_conversion_stats(conversion_records, name_conversion_errors) return population_data - -def print_conversion_stats(conversion_records: List[Dict[str, str]], errors: List[str]) -> None: - """打印名称转换统计信息""" - print("\n=== 名称转换记录 ===") - for record in conversion_records: - print(f"🔄 行 {record['row']}: {record['raw_name']} → {record['converted_name']}") - print(f"📊 共检测到 {len(conversion_records)} 项名称转换") - - if errors: - print(f"⚠️ 发现 {len(errors)} 个区域名称转换失败:") - for error in errors: - print(f" - {error}") - else: - print("✅ 所有区域名称均成功转换为全称") - # ======================= 主函数 ======================= def main() -> None: """人口数据提取主函数""" - init_directories() - try: - # 加载工作簿 - workbook = openpyxl.load_workbook(EXCEL_PATH, read_only=True, data_only=True) - if SHEET_NAME not in workbook.sheetnames: - print(f"❌ 错误:未找到'{SHEET_NAME}'工作表") - return - - # 提取并处理数据 - sheet = workbook[SHEET_NAME] - population_data = extract_area_data(sheet) - - # 保存结果 - with open(JSON_PATH, 'w', encoding='utf-8') as f: - json.dump(population_data, f, ensure_ascii=False, indent=2) - - print(f"✅ 人口数据提取完成,已保存至:{JSON_PATH}") - print(f"📊 共处理 {len(population_data)} 条地区数据") - - except FileNotFoundError: - print(f"🔴 错误:Excel文件 '{EXCEL_PATH}' 不存在") - except Exception as e: - print(f"🔴 处理数据时发生错误:{str(e)}") - finally: - try: - workbook.close() - except: - pass + init_directories(DATA_DIR) + + # 加载工作表 + sheet = load_workbook_sheet(EXCEL_PATH, SHEET_NAME) + if not sheet: + return + + # 提取并处理数据 + population_data = extract_area_data(sheet) + + # 保存结果 + save_to_json(population_data, JSON_PATH) if __name__ == '__main__': main() \ No newline at end of file diff --git a/Tools/T2_MaoRuXueLv.py b/Tools/T2_MaoRuXueLv.py index 3f64b35..2025f2f 100644 --- a/Tools/T2_MaoRuXueLv.py +++ b/Tools/T2_MaoRuXueLv.py @@ -4,21 +4,17 @@ import os from typing import List, Dict, Any, Tuple from Config.Config import EXCEL_PATH -from Util.AreaUtil import query_area_info +from Util.DataUtil import ( + init_directories, process_value, print_conversion_stats, + convert_area_name, save_to_json, load_workbook_sheet +) # ======================= 配置常量 ======================= -"""数据提取配置""" -# 数据保存目录 DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'Data') -# JSON输出路径 JSON_PATH = os.path.join(DATA_DIR, 'MaoRuXueLv.json') -# 工作表名称 SHEET_NAME = '毛入学率' -# 数据起始行 START_ROW = 5 -# 区域名称所在列 REGION_NAME_COLUMN = 'B' -# 年份范围 YEAR_RANGE = [2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024] # 数据列映射配置 @@ -74,78 +70,9 @@ DATA_COLUMNS = { } } -# ======================= 工具函数 ======================= -def init_directories() -> None: - """初始化数据目录 - 创建数据保存目录,如果目录已存在则不执行操作 - """ - os.makedirs(DATA_DIR, exist_ok=True) - - -def process_value(value: Any) -> int | float | int: - """处理单元格值,转换为合适的数值类型 - - Args: - value: 原始单元格值 - - Returns: - int | float | int: 转换后的数值,无法转换时返回0 - """ - if value is None: - return 0 - - # 统一转换为字符串处理 - str_value = str(value).strip() - if str_value == '' or str_value == '####': - return 0 - - try: - if '%' in str_value: - # 移除百分号并转换为小数 - return float(str_value.replace('%', '')) - elif '.' in str_value: - return float(str_value) - else: - return int(str_value) - except (ValueError, TypeError): - return 0 - - -def print_conversion_stats(conversion_records: List[Dict[str, str]], errors: List[str]) -> None: - """打印名称转换统计信息 - - Args: - conversion_records: 转换记录列表 - errors: 错误信息列表 - """ - print("\n=== 名称转换记录 ===") - if conversion_records: - for record in conversion_records: - print(f"🔄 行 {record['row']}: {record['raw_name']} → {record['converted_name']}") - print(f"📊 共检测到 {len(conversion_records)} 项名称转换") - else: - print("📝 不存在名称转换的情况") - - if errors: - print(f"⚠️ 发现 {len(errors)} 个区域名称转换失败:") - for error in errors: - print(f" - {error}") - else: - print("✅ 所有区域名称均成功转换为全称") - # ======================= 核心逻辑 ======================= def extract_enrollment_data(sheet: openpyxl.worksheet.worksheet.Worksheet) -> Tuple[List[Dict[str, Any]], List[Dict[str, str]], List[str]]: - """从工作表提取毛入学率数据 - - Args: - sheet: 毛入学率工作表对象 - - Returns: - Tuple包含: - - enrollment_data: 提取的毛入学率数据列表 - - conversion_records: 名称转换记录 - - name_conversion_errors: 名称转换错误列表 - """ + """从工作表提取毛入学率数据""" enrollment_data: List[Dict[str, Any]] = [] conversion_records: List[Dict[str, str]] = [] name_conversion_errors: List[str] = [] @@ -157,34 +84,20 @@ def extract_enrollment_data(sheet: openpyxl.worksheet.worksheet.Worksheet) -> Tu for row_num, row in enumerate(sheet.iter_rows(min_row=START_ROW, values_only=True), start=START_ROW): # 获取区域名称 raw_name = row[region_col_idx] if (len(row) > region_col_idx and row[region_col_idx] is not None) else '未知地区' - if not raw_name: + + # 转换区域名称 + area_name, area_code, conv_records, errors = convert_area_name(raw_name, row_num) + conversion_records.extend(conv_records) + name_conversion_errors.extend(errors) + + if not area_name: continue - # 区域名称转换 - str_raw_name = str(raw_name).strip() if raw_name is not None else '未知地区' - area_info = query_area_info(str_raw_name) - - if area_info: - area_name = area_info['full_name'] - area_code = area_info['area_code'] - - # 记录名称转换 - if str_raw_name != area_name: - conversion_records.append({ - 'row': row_num, - 'raw_name': str_raw_name, - 'converted_name': area_name - }) - else: - area_name = str_raw_name - area_code = 'unknown' - name_conversion_errors.append(f"行 {row_num}: '{str_raw_name}'") - # 创建区域数据对象 area_data = { 'area_name': area_name, 'area_code': area_code, - 'raw_name': str_raw_name # 保留原始名称用于调试 + 'raw_name': str(raw_name).strip() if raw_name else '未知地区' } # 提取各指标年度数据 @@ -205,38 +118,21 @@ def extract_enrollment_data(sheet: openpyxl.worksheet.worksheet.Worksheet) -> Tu # ======================= 主函数 ======================= def main() -> None: """主函数:执行毛入学率数据提取流程""" - try: - # 初始化目录 - init_directories() - - # 加载工作簿并选择工作表 - workbook = openpyxl.load_workbook(EXCEL_PATH, read_only=True) - - if SHEET_NAME not in workbook.sheetnames: - print(f"❌ 错误:未找到'{SHEET_NAME}'Sheet") - return - - sheet = workbook[SHEET_NAME] - - # 提取数据 - enrollment_data, conversion_records, name_conversion_errors = extract_enrollment_data(sheet) - - # 关闭工作簿释放资源 - workbook.close() - - # 保存为JSON文件 - with open(JSON_PATH, 'w', encoding='utf-8') as f: - json.dump(enrollment_data, f, ensure_ascii=False, indent=2) - - # 输出结果统计 - print(f"✅ 毛入学率数据提取完成,已保存至:{JSON_PATH}") - print(f"📊 共处理 {len(enrollment_data)} 条地区数据") - print_conversion_stats(conversion_records, name_conversion_errors) - - except FileNotFoundError: - print(f"🔴 错误:Excel文件 '{EXCEL_PATH}' 不存在") - except Exception as e: - print(f"🔴 处理数据时发生错误:{str(e)}") + init_directories(DATA_DIR) + + # 加载工作表 + sheet = load_workbook_sheet(EXCEL_PATH, SHEET_NAME) + if not sheet: + return + + # 提取数据 + enrollment_data, conversion_records, name_conversion_errors = extract_enrollment_data(sheet) + + # 保存结果 + save_to_json(enrollment_data, JSON_PATH) + + # 输出转换统计 + print_conversion_stats(conversion_records, name_conversion_errors) if __name__ == '__main__': main() diff --git a/Util/DataUtil.py b/Util/DataUtil.py new file mode 100644 index 0000000..96945ed --- /dev/null +++ b/Util/DataUtil.py @@ -0,0 +1,152 @@ +import os +import json +import openpyxl +from typing import List, Dict, Any, Tuple, Optional + +"""数据处理通用工具函数""" + +def init_directories(data_dir: str) -> None: + """初始化数据目录 + + Args: + data_dir: 数据保存目录路径 + """ + os.makedirs(data_dir, exist_ok=True) + + +def process_value(value: Any) -> int | float: + """处理单元格值,转换为合适的数值类型 + + 支持处理空值、空字符串、####、带百分号的值和千分位逗号 + + Args: + value: 原始单元格值 + + Returns: + int | float: 转换后的数值,无法转换时返回0 + """ + if value is None: + return 0 + + # 统一转换为字符串处理 + str_value = str(value).strip() + if str_value == '' or str_value == '####': + return 0 + + try: + # 处理百分号 + if '%' in str_value: + return float(str_value.replace('%', '')) + # 处理千分位逗号 + if ',' in str_value: + str_value = str_value.replace(',', '') + # 区分整数和浮点数 + return float(str_value) if '.' in str_value else int(str_value) + except (ValueError, TypeError): + return 0 + + +def print_conversion_stats(conversion_records: List[Dict[str, str]], errors: List[str]) -> None: + """打印名称转换统计信息 + + Args: + conversion_records: 转换记录列表 + errors: 错误信息列表 + """ + print("\n=== 名称转换记录 ===") + if conversion_records: + for record in conversion_records: + print(f"🔄 行 {record['row']}: {record['raw_name']} → {record['converted_name']}") + print(f"📊 共检测到 {len(conversion_records)} 项名称转换") + else: + print("📝 不存在名称转换的情况") + + if errors: + print(f"⚠️ 发现 {len(errors)} 个区域名称转换失败:") + for error in errors: + print(f" - {error}") + else: + print("✅ 所有区域名称均成功转换为全称") + + +def convert_area_name(raw_name: Any, row_num: int) -> Tuple[str, str, List[Dict[str, str]], List[str]]: + """转换区域名称并记录转换结果 + + Args: + raw_name: 原始区域名称 + row_num: 行号 + + Returns: + Tuple包含: + - area_name: 转换后的区域名称 + - area_code: 区域代码 + - conversion_records: 更新的转换记录列表 + - name_conversion_errors: 更新的错误列表 + """ + from Util.AreaUtil import query_area_info + + conversion_records: List[Dict[str, str]] = [] + name_conversion_errors: List[str] = [] + + # 确保raw_name为字符串类型 + str_raw_name = str(raw_name).strip() if raw_name is not None else '未知地区' + if not str_raw_name: + str_raw_name = '未知地区' + + # 查询区域信息 + area_info = query_area_info(str_raw_name) + if area_info: + area_name = area_info['full_name'] + area_code = area_info['area_code'] + + # 记录名称转换 + if str_raw_name != area_name: + conversion_records.append({ + 'row': row_num, + 'raw_name': str_raw_name, + 'converted_name': area_name + }) + else: + area_name = str_raw_name + area_code = 'unknown' + name_conversion_errors.append(f"行 {row_num}: '{str_raw_name}'") + + return area_name, area_code, conversion_records, name_conversion_errors + + +def save_to_json(data: List[Dict[str, Any]], json_path: str) -> None: + """保存数据到JSON文件 + + Args: + data: 要保存的数据 + json_path: JSON文件路径 + """ + with open(json_path, 'w', encoding='utf-8') as f: + json.dump(data, f, ensure_ascii=False, indent=2) + print(f"✅ 数据提取完成,已保存至:{json_path}") + print(f"📊 共处理 {len(data)} 条地区数据") + + +def load_workbook_sheet(excel_path: str, sheet_name: str) -> Optional[openpyxl.worksheet.worksheet.Worksheet]: + """加载工作簿并选择指定工作表 + + Args: + excel_path: Excel文件路径 + sheet_name: 工作表名称 + + Returns: + 工作表对象,若加载失败则返回None + """ + try: + workbook = openpyxl.load_workbook(excel_path, read_only=True, data_only=True) + if sheet_name not in workbook.sheetnames: + print(f"❌ 错误:未找到'{sheet_name}'工作表") + workbook.close() + return None + return workbook[sheet_name] + except FileNotFoundError: + print(f"🔴 错误:Excel文件 '{excel_path}' 不存在") + return None + except Exception as e: + print(f"🔴 加载工作簿时发生错误:{str(e)}") + return None \ No newline at end of file diff --git a/Util/__pycache__/DataUtil.cpython-310.pyc b/Util/__pycache__/DataUtil.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..1b54fc9851486f10d796c37aca796cdc3eccc364 GIT binary patch literal 4353 zcmZ`+`*YmX71q^jwd=LxIF3mmwBi}I)1=T&OUFQ4Xn;)%(4BK>eww4SVuUE^J z6l0Ivac~|Eu@e{afW(fyh43mQ4zC1~km*0rKR`c%wQK(bKWEy~?_90D>j2W|>ge8c z&V7C7+)qwxYf8ekqvK!C{&|BWeL)}1R~jF`!sGr2g(X?6C}ra}%QEpTXJx$QqB5vv z)j=(*4eD8aFp*6R8d-x$Ny$>|q@`M{U$rz#->+sIaT)o!uVk4dOCtR!k%t(2|W znw_+h{qkmMvt+g4JBc^h>Q>rng;tx@2HSOZ3+xl5yVfSQ=vOyOuxPi|!(yG)QPhB7!7pgl7 zeO{K@^IK*C>1=KGFly8uw4W8u8g8X z=WTd}+TMK8x7Vx$H7D_im6(P=tKdsqx8z=|N+6B1c~+4|S->o1Kz2LvRtIF>MeQT^ zSX#jLO8jn54%oZ`bSV{iob8f&B}*4A+eN)j^`p#6h`I+!JJ#~l6-kvm%>zD$pJWo< zPXp1t(vXxf{EgVI==J@z>4VYKiRi$1b^dBJcJ<4vlS|j$txo-8@$#q9^u+Sm3$@ex zvF>Q9UAz*VJ<`~v>hTNJ$$hbl+S{K(tm*2A`N6q53spYX#sBbu4yZvHyyUpn696_dOs11WZ2rM>1rcm$hKz^ zq2hWx)KC=4p(ebC32v8*`93>T&}W$X6G7o|&K({mcMg|hF(dPJ@R&`)GAE`ZKo}RT z)K_bnJAJn6#;xm7Tt^qb({Kk`v`<97lq|2AW!4N10cs40~C2CkakFa**GerP7Y*C*({wwa)P7@Nr#jm zX;vqT=M3l>9s+xuK`%M0P;E&?ZAy4#{7J|y&_EB+NXHrhLodlu1#PW?0&jpcygg*C zqmZ!L=!@h`yT2a;tR8+jnp%jar)qC5#M@N6GPiWTvUt5x-FKm}_iMrO&E;6x|E2Q_ z$T88xsin$W!rH_YuT;0WeEij=3m0P7O)5UEaq5vr9xG>}&A2Xw4f3F>dzweX3 z#mnzRlb=R&bIoSa>lc?kJon|*(<7>BZZrMuWIS`gtR5b#&Kz#mk7f^5C#Itp=Ng&e zz+2cBO0rs+6Y!OyUCOAOK+A7OLFWUZG1SBo-;DPzVU=szE!WF??gOxNH=!^RDl^!9 ztP7A=5g0IsGzlS=YIU(>Itg%112IjOMl;%a=eFXxYJw<7djpF=yMjBP{sK~{N z&v+;DCFNnIB9)MRN3}q6ZxwGj(8k%Q9_ajMflfg%n!sDf+X%EEF`)3D2Fg5gEPYvt z8KnZ~mf8btG+`OgQRh`F84&bSfrheWRwIod5g36IsQoHHp7!&|GWFEdv^UWwAIJDw z!TihBvoqg?_7#||X2&K+s~E4Z6XKvUxhBniBWsG&KAJobP5-lT`ffAx+|CQku7pyY zN}Ca~+F4z@&#`O_N(^W5%4@axeN7royIxOOIN8s9SUtG_B*)YK)?nRcQb;v}W+i-A zt>ZXuieQn-K@8SC=i7WZC#F&;^*cR2(SYjAJGHrU(aZuynoaim#iE#Jm=t<6iHyR( z2SounZbAwZercfO43&IJsTB4jO1HW72HTp^vaNzNJc{)qfo~*lm^{+%i=cvE03u<+F4rWloUdh7+9FxlGHI7&B~&oQaRp&xJRrHZ4Iw;Ydcq`6u?z~H_T)P8JY@k{KmdIixlJr6m%7z ztF!Dj8{V2|X2^y{G~esr_H)5EOF&7X^vn~>blM^xrtOmJb32!J`wE3nFO)31Cgwr$3BsTm7^i1SM_tDI$Mds8c&!gy8}pt zDh(RjdC=Hs+RG3=UvV&ZD77_y=03NGVAxB<{_*uqRb&3OcA?Z~QgNrY0^=a3( zuJ+a==nm-2hYjVXshL~-7SeB!5T8<*(7$OhL8ys;9G{ExF*?l{p+QhB4?kZFTfxh% z+z@`g{^~dbbzD(~as_%rt*_{~cDVkDLecJVyeAyLWIY~pY8sFD85MaIjN>5;fj^8Q zG*%rl`)06jSkq%pY|9HUaw)mXsdknE!7#w9Q!EWhXKkzlk1;0yk3R{OOed$c;tx{s zOBA8*4VMx3oWh<{4&)RDocJEssStUe2pK0r!@H;;IvG;C~LRkpzZ literal 0 HcmV?d00001