This commit is contained in:
2025-09-12 21:31:45 +08:00
parent 14df704f6c
commit a3813e77d6
13 changed files with 356 additions and 815 deletions

View File

@@ -9,16 +9,18 @@ router = APIRouter(prefix="/RuYuanZaiYuan", tags=["入校、在校人数统计"]
@router.get("/school/chart") @router.get("/school/chart")
async def get_education_chart_config( async def get_education_chart_config(
education_stage: str = Query(default="preschool", pattern="^(preschool|primary|junior|senior|vocational)$", education_stage: str = Query(default="preschool", pattern="^(preschool|primary|junior|senior|vocational)$",
description="教育阶段: preschool(学前), primary(小学), junior(初中), senior(高中), vocational(中职)") description="教育阶段: preschool(学前), primary(小学), junior(初中), senior(高中), vocational(中职)"),
area_name: str = Query(default="云南省", description="区域名称,默认为云南省")
): ):
return RuYuanZaiYuanModel.generate_preschool_education_config(education_stage) return RuYuanZaiYuanModel.generate_preschool_education_config(education_stage, area_name)
@router.get("/school/inschool/chart") @router.get("/school/inschool/chart")
async def get_in_school_chart_config( async def get_in_school_chart_config(
education_stage: str = Query(default="preschool", pattern="^(preschool|primary|junior|senior|vocational)$", education_stage: str = Query(default="preschool", pattern="^(preschool|primary|junior|senior|vocational)$",
description="教育阶段: preschool(学前), primary(小学), junior(初中), senior(高中), vocational(中职)") description="教育阶段: preschool(学前), primary(小学), junior(初中), senior(高中), vocational(中职)"),
area_name: str = Query(default="云南省", description="区域名称,默认为云南省")
): ):
return RuYuanZaiYuanModel.generate_in_school_education_config(education_stage) return RuYuanZaiYuanModel.generate_in_school_education_config(education_stage, area_name)

View File

@@ -1,4 +1,6 @@
import json import json
import os
from Util.YuCeUtil import YuCeUtil
class RuYuanZaiYuanModel: class RuYuanZaiYuanModel:
@@ -28,46 +30,118 @@ class RuYuanZaiYuanModel:
return [], [] return [], []
@staticmethod @staticmethod
def generate_preschool_education_config(education_stage='preschool'): def generate_preschool_education_config(education_stage='preschool', area_name='云南省'):
# 验证教育阶段参数 # 验证教育阶段参数
if education_stage not in RuYuanZaiYuanModel.EDUCATION_STAGES: if education_stage not in RuYuanZaiYuanModel.EDUCATION_STAGES:
education_stage = 'preschool' # 默认使用学前 education_stage = 'preschool' # 默认使用学前
# 获取学前教育相关数据 # 获取学前教育相关数据
enrollment_data, in_school_data = RuYuanZaiYuanModel.load_student_data() enrollment_data, in_school_data = RuYuanZaiYuanModel.load_student_data()
# 提取云南省级数据 # 提取指定区域数据
yunnan_enroll = next((item for item in enrollment_data if item["area_name"] == "云南省"), None) area_enroll = next((item for item in enrollment_data if item["area_name"] == area_name), None)
if not yunnan_enroll: if not area_enroll:
return {} return {}
# # 构建学前教育数据 # 构建学前教育数据
urban_data = [] # 城区数据 urban_data = [] # 城区数据
town_data = [] # 镇区数据 town_data = [] # 镇区数据
rural_data = [] # 乡村数据 rural_data = [] # 乡村数据
total_enroll = [] # 总人数 total_enroll = [] # 总人数
# 提取年份数据(2015-2024) # 提取年份数据(2015-2035)
years = [str(year) for year in range(2015, 2025)] years = [str(year) for year in range(2015, 2036)]
# 初始化预测工具(只针对学前教育进行预测)
forecast_util = None
forecast_results = {}
forecast_urban_enrollment = {}
if education_stage == 'preschool':
# 获取数据目录
data_directory = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'Data')
# 创建预测实例,使用传入的区域名称
forecast_util = YuCeUtil(data_directory, area_name)
# 运行预测
forecast_util.run_forecast()
# 获取预测结果
forecast_results = forecast_util.forecast_results
forecast_urban_enrollment = forecast_util.forecast_urban_enrollment
for year in years: for year in years:
# 使用传入的教育阶段参数 # 使用传入的教育阶段参数
enroll_data = yunnan_enroll["education_data"].get(education_stage, {}).get(year, {}) if int(year) <= 2024:
# 2015-2024年使用实际数据
enroll_data = area_enroll["education_data"].get(education_stage, {}).get(year, {})
# 特殊处理中职数据格式只有total字段 # 特殊处理中职数据格式只有total字段
if education_stage == 'vocational': if education_stage == 'vocational':
total_value = enroll_data.get("total", 0) total_value = enroll_data.get("total", 0)
urban_data.append(0) # 中职没有城区数据 urban_data.append(0) # 中职没有城区数据
town_data.append(0) # 中职没有镇区数据 town_data.append(0) # 中职没有镇区数据
rural_data.append(0) # 中职没有乡村数据 rural_data.append(0) # 中职没有乡村数据
total_enroll.append(total_value / 10000) # 转换为万人 total_enroll.append(total_value / 10000) # 转换为万人
else:
urban_data.append(enroll_data.get("urban", 0) / 10000) # 转换为万人
town_data.append(enroll_data.get("town", 0) / 10000) # 转换为万人
rural_data.append(enroll_data.get("rural", 0) / 10000) # 转换为万人
# 计算总和作为总人数
calculated_total = enroll_data.get("urban", 0) + enroll_data.get("town", 0) + enroll_data.get("rural", 0)
total_enroll.append(calculated_total / 10000) # 转换为万人
else: else:
urban_data.append(enroll_data.get("urban", 0) / 10000) # 转换为万人 # 2025-2035年使用预测数据
town_data.append(enroll_data.get("town", 0) / 10000) # 转换为万人 if education_stage == 'preschool' and forecast_util:
rural_data.append(enroll_data.get("rural", 0) / 10000) # 转换为万人 if year in forecast_results:
# 计算总和作为总人数 total_value = forecast_results[year]
calculated_total = enroll_data.get("urban", 0) + enroll_data.get("town", 0) + enroll_data.get("rural", 0) # 获取城区招生数
total_enroll.append(calculated_total / 10000) # 转换为万人 urban_value = forecast_urban_enrollment.get(int(year), {}).get('urban', 0)
remaining_value = total_value - urban_value
# 假设乡镇和农村按历史比例分配剩余部分
# 查找最近一年的乡镇和农村比例
recent_year = str(int(year) - 1)
if recent_year in area_enroll["education_data"].get(education_stage, {}):
recent_data = area_enroll["education_data"].get(education_stage, {}).get(recent_year, {})
recent_town = recent_data.get("town", 0)
recent_rural = recent_data.get("rural", 0)
recent_remaining = recent_town + recent_rural
if recent_remaining > 0:
town_value = int(remaining_value * (recent_town / recent_remaining))
rural_value = remaining_value - town_value
else:
town_value = int(remaining_value / 2)
rural_value = remaining_value - town_value
else:
town_value = int(remaining_value / 2)
rural_value = remaining_value - town_value
urban_data.append(urban_value / 10000)
town_data.append(town_value / 10000)
rural_data.append(rural_value / 10000)
total_enroll.append(total_value / 10000)
else:
# 没有预测数据,使用前一年数据
if len(total_enroll) > 0:
urban_data.append(urban_data[-1])
town_data.append(town_data[-1])
rural_data.append(rural_data[-1])
total_enroll.append(total_enroll[-1])
else:
urban_data.append(0)
town_data.append(0)
rural_data.append(0)
total_enroll.append(0)
else:
# 非学前教育或没有预测工具,使用前一年数据
if len(total_enroll) > 0:
urban_data.append(urban_data[-1])
town_data.append(town_data[-1])
rural_data.append(rural_data[-1])
total_enroll.append(total_enroll[-1])
else:
urban_data.append(0)
town_data.append(0)
rural_data.append(0)
total_enroll.append(0)
# 添加2022年基数的粉色折线 # 添加2022年基数的粉色折线
base_year = "2022" base_year = "2022"
@@ -75,7 +149,7 @@ class RuYuanZaiYuanModel:
base_index = years.index(base_year) if base_year in years else 0 base_index = years.index(base_year) if base_year in years else 0
# 获取2022年的总人数作为基数 # 获取2022年的总人数作为基数
base_value = total_enroll[base_index] if base_index < len(total_enroll) else 0 base_value = total_enroll[base_index] if base_index < len(total_enroll) else 0
# 创建2022年基数折线数据2022-2024年) # 创建2022年基数折线数据2022-2035年)
base_2022_line = [] base_2022_line = []
for i, year in enumerate(years): for i, year in enumerate(years):
# 只在2022年及之后显示基数线 # 只在2022年及之后显示基数线
@@ -95,7 +169,7 @@ class RuYuanZaiYuanModel:
return data return data
@staticmethod @staticmethod
def generate_in_school_education_config(education_stage='preschool'): def generate_in_school_education_config(education_stage='preschool', area_name='云南省'):
# 验证教育阶段参数 # 验证教育阶段参数
if education_stage not in RuYuanZaiYuanModel.EDUCATION_STAGES: if education_stage not in RuYuanZaiYuanModel.EDUCATION_STAGES:
education_stage = 'preschool' # 默认使用学前 education_stage = 'preschool' # 默认使用学前
@@ -103,10 +177,10 @@ class RuYuanZaiYuanModel:
# 获取在校生相关数据 # 获取在校生相关数据
enrollment_data, in_school_data = RuYuanZaiYuanModel.load_student_data() enrollment_data, in_school_data = RuYuanZaiYuanModel.load_student_data()
# 提取云南省级数据 # 提取指定区域数据
yunnan_in_school = next((item for item in in_school_data if item["area_name"] == "云南省"), None) area_in_school = next((item for item in in_school_data if item["area_name"] == area_name), None)
if not yunnan_in_school: if not area_in_school:
return {} return {}
# 构建在校生数据 # 构建在校生数据
@@ -115,27 +189,96 @@ class RuYuanZaiYuanModel:
rural_data = [] # 乡村数据 rural_data = [] # 乡村数据
total_in_school = [] # 总人数 total_in_school = [] # 总人数
# 提取年份数据(2015-2024) # 提取年份数据(2015-2035)
years = [str(year) for year in range(2015, 2025)] years = [str(year) for year in range(2015, 2036)]
# 初始化预测工具(只针对学前教育进行预测)
forecast_util = None
enrollment_in_school = {}
if education_stage == 'preschool':
# 获取数据目录
data_directory = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'Data')
# 创建预测实例,使用传入的区域名称
forecast_util = YuCeUtil(data_directory, area_name)
# 运行预测
forecast_util.run_forecast()
# 获取预测结果
enrollment_in_school = forecast_util.enrollment_in_school
for year in years: for year in years:
# 使用传入的教育阶段参数 # 使用传入的教育阶段参数
in_school_year_data = yunnan_in_school["student_data"].get(education_stage, {}).get(year, {}) if int(year) <= 2024:
# 2015-2024年使用实际数据
in_school_year_data = area_in_school["student_data"].get(education_stage, {}).get(year, {})
# 特殊处理中职数据格式只有total字段 # 特殊处理中职数据格式只有total字段
if education_stage == 'vocational': if education_stage == 'vocational':
total_value = in_school_year_data.get("total", 0) total_value = in_school_year_data.get("total", 0)
urban_data.append(0) # 中职没有城区数据 urban_data.append(0) # 中职没有城区数据
town_data.append(0) # 中职没有镇区数据 town_data.append(0) # 中职没有镇区数据
rural_data.append(0) # 中职没有乡村数据 rural_data.append(0) # 中职没有乡村数据
total_in_school.append(total_value / 10000) # 转换为万人 total_in_school.append(total_value / 10000) # 转换为万人
else:
urban_data.append(in_school_year_data.get("urban", 0) / 10000) # 转换为万人
town_data.append(in_school_year_data.get("town", 0) / 10000) # 转换为万人
rural_data.append(in_school_year_data.get("rural", 0) / 10000) # 转换为万人
# 计算总和作为总人数
calculated_total = in_school_year_data.get("urban", 0) + in_school_year_data.get("town", 0) + in_school_year_data.get("rural", 0)
total_in_school.append(calculated_total / 10000) # 转换为万人
else: else:
urban_data.append(in_school_year_data.get("urban", 0) / 10000) # 转换为万人 # 2025-2035年使用预测数据
town_data.append(in_school_year_data.get("town", 0) / 10000) # 转换为万人 if education_stage == 'preschool' and forecast_util:
rural_data.append(in_school_year_data.get("rural", 0) / 10000) # 转换为万人 if int(year) in enrollment_in_school:
# 计算总和作为总人数 total_value = enrollment_in_school[int(year)]
calculated_total = in_school_year_data.get("urban", 0) + in_school_year_data.get("town", 0) + in_school_year_data.get("rural", 0) # 获取城区比例(使用最近一年的城区比例)
total_in_school.append(calculated_total / 10000) # 转换为万人 recent_year = str(int(year) - 1)
if recent_year in area_in_school["student_data"].get(education_stage, {}):
recent_data = area_in_school["student_data"].get(education_stage, {}).get(recent_year, {})
recent_urban = recent_data.get("urban", 0)
recent_total = recent_data.get("urban", 0) + recent_data.get("town", 0) + recent_data.get("rural", 0)
if recent_total > 0:
urban_ratio = recent_urban / recent_total
else:
urban_ratio = 0.5
else:
urban_ratio = 0.5
# 计算城乡分布
urban_value = int(total_value * urban_ratio)
remaining_value = total_value - urban_value
# 假设乡镇和农村按5:5分配剩余部分
town_value = int(remaining_value * 0.5)
rural_value = remaining_value - town_value
urban_data.append(urban_value / 10000)
town_data.append(town_value / 10000)
rural_data.append(rural_value / 10000)
total_in_school.append(total_value / 10000)
else:
# 没有预测数据,使用前一年数据
if len(total_in_school) > 0:
urban_data.append(urban_data[-1])
town_data.append(town_data[-1])
rural_data.append(rural_data[-1])
total_in_school.append(total_in_school[-1])
else:
urban_data.append(0)
town_data.append(0)
rural_data.append(0)
total_in_school.append(0)
else:
# 非学前教育或没有预测工具,使用前一年数据
if len(total_in_school) > 0:
urban_data.append(urban_data[-1])
town_data.append(town_data[-1])
rural_data.append(rural_data[-1])
total_in_school.append(total_in_school[-1])
else:
urban_data.append(0)
town_data.append(0)
rural_data.append(0)
total_in_school.append(0)
# 添加2022年基数的粉色折线 # 添加2022年基数的粉色折线
base_year = "2022" base_year = "2022"
@@ -143,7 +286,7 @@ class RuYuanZaiYuanModel:
base_index = years.index(base_year) if base_year in years else 0 base_index = years.index(base_year) if base_year in years else 0
# 获取2022年的总人数作为基数 # 获取2022年的总人数作为基数
base_value = total_in_school[base_index] if base_index < len(total_in_school) else 0 base_value = total_in_school[base_index] if base_index < len(total_in_school) else 0
# 创建2022年基数折线数据2022-2024年) # 创建2022年基数折线数据2022-2035年)
base_2022_line = [] base_2022_line = []
for i, year in enumerate(years): for i, year in enumerate(years):
# 只在2022年及之后显示基数线 # 只在2022年及之后显示基数线

Binary file not shown.

View File

@@ -1,149 +0,0 @@
import os
class EducationResourceForecast:
def __init__(self):
# 数据文件路径
self.data_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'Data')
# 教育阶段映射
self.education_stages = {
'preschool': '学前',
'primary': '小学',
'junior': '初中',
'senior': '高中',
'vocational': '中职'
}
# 预测年份范围
self.forecast_years = list(range(2023, 2030)) # 2023-2029年
# 加载数据
self.teacher_data = self._load_data('TeacherCount.json')
self.school_area_data = self._load_data('SchoolArea.json')
# 存储预测结果
self.forecast_results = {stage: [] for stage in self.education_stages.keys()}
def _load_data(self, filename):
"""加载JSON数据文件"""
import json
file_path = os.path.join(self.data_dir, filename)
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# 数据文件是数组格式,我们只需要云南省的数据
for item in data:
if item.get('area_name') == '云南省':
return item
return {}
except Exception as e:
print(f"加载文件 {filename} 失败: {e}")
return {}
def _get_teacher_data_by_stage(self, stage):
"""获取特定教育阶段的教师数据"""
# 教师数据字段映射
teacher_field_map = {
'preschool': 'preschool_teachers',
'primary': 'primary_teachers',
'junior': 'junior_teachers',
'senior': 'senior_teachers',
'vocational': 'vocational_teachers'
}
field = teacher_field_map.get(stage)
if not field or field not in self.teacher_data:
return []
teacher_data = []
# 提取2015-2024年的专任教师数据
for year in range(2015, 2025):
year_data = self.teacher_data[field].get(str(year), {})
# 优先使用专任教师数量
teacher_count = year_data.get('total_teacher', 0)
teacher_data.append({
'year': year,
'teacher_count': teacher_count
})
return teacher_data
def _get_school_area_data_by_stage(self, stage):
"""获取特定教育阶段的学校面积数据"""
# 学校面积字段映射
area_field_map = {
'preschool': 'preschool_area',
'primary': 'primary_area',
'junior': 'junior_high_area', # 修正字段名
'senior': 'senior_high_area', # 修正字段名
'vocational': 'vocational_area'
}
field = area_field_map.get(stage)
if not field or field not in self.school_area_data:
return []
area_data = []
# 提取2015-2024年的学校面积数据
for year in range(2015, 2025):
year_data = self.school_area_data[field].get(str(year), {})
# 优先使用教学面积如果为0则使用学校总面积
teaching_area = year_data.get('teaching_total', 0)
if teaching_area == 0:
teaching_area = year_data.get('school_total', 0)
area_data.append({
'year': year,
'teaching_area': teaching_area
})
return area_data
def forecast(self):
"""执行预测"""
for stage in self.education_stages.keys():
# 获取历史数据
teacher_history = self._get_teacher_data_by_stage(stage)
area_history = self._get_school_area_data_by_stage(stage)
# 生成最终结果
for year in self.forecast_years:
if year <= 2024:
# 2023-2024年使用实际数据
# 查找实际教师数据
teacher_item = next((item for item in teacher_history if item['year'] == year), None)
teacher_count = teacher_item['teacher_count'] if teacher_item else 0
# 查找实际面积数据
area_item = next((item for item in area_history if item['year'] == year), None)
teaching_area = area_item['teaching_area'] if area_item else 0
else:
# 2025-2029年预测数据设为0
teacher_count = 0
teaching_area = 0
# 添加结果,师资缺口/富余字段暂时设为0
self.forecast_results[stage].append({
'year': year,
'teacher_count': round(teacher_count / 10000, 2), # 转换为万人
'teacher_gap': 0, # 师资缺口/富余暂时设为0
'teaching_area': round(teaching_area / 10000, 2), # 转换为万平方米
'is_forecast': year >= 2025 # 标记是否为预测数据
})
def print_results(self):
"""打印预测结果"""
for stage, chinese_name in self.education_stages.items():
print(f"\n{chinese_name}教育阶段资源配置预测2023-2029年:")
print("{:<8} {:<12} {:<16} {:<20} {:<10}".format("年份", "专任教师(万人)", "师资缺口/富余", "教学及辅助用房面积(万㎡)", "数据类型"))
for item in self.forecast_results[stage]:
data_type = "预测" if item['is_forecast'] else "实际"
print("{:<8} {:<12.2f} {:<16.2f} {:<20.2f} {:<10}".format(
item['year'],
item['teacher_count'],
item['teacher_gap'],
item['teaching_area'],
data_type
))
if __name__ == "__main__":
print("开始执行教育资源配置发展预测...")
forecast = EducationResourceForecast()
forecast.forecast()
forecast.print_results()
print("\n预测完成!")

View File

@@ -1,274 +0,0 @@
import json
import os
from datetime import datetime
class PopulationStatistics:
def __init__(self, data_file):
"""初始化人口统计类,加载数据文件"""
self.data_file = data_file
self.data = self.load_data()
self.years = list(range(2015, 2025)) # 2015-2024年
def load_data(self):
"""加载人口数据JSON文件"""
try:
with open(self.data_file, 'r', encoding='utf-8') as f:
return json.load(f)
except FileNotFoundError:
print(f"错误:找不到数据文件 {self.data_file}")
return []
except json.JSONDecodeError:
print(f"错误:数据文件 {self.data_file} 格式不正确")
return []
def get_province_data(self):
"""获取云南省全省数据"""
for item in self.data:
if item['area_name'] == '云南省' and item['area_code'] == '530000000':
return item
return None
def get_city_data(self, city_name):
"""获取指定城市的数据"""
for item in self.data:
if item['area_name'] == city_name:
return item
return None
def get_all_cities(self):
"""获取所有地市级城市列表"""
cities = []
# 地市级城市的area_code格式为53XX00000
for item in self.data:
if len(item['area_code']) == 9 and item['area_code'][-5:] == '00000' and item['area_code'] != '530000000':
cities.append(item)
return cities
def calculate_total_population_by_year(self, level='province'):
"""按年份计算总人口
level: 'province'(全省), 'city'(昆明市), 'all_cities'(所有地级市)
"""
result = {}
if level == 'province':
province_data = self.get_province_data()
if province_data:
result = province_data['total_population']
elif level == 'city':
city_data = self.get_city_data('昆明市')
if city_data:
result = city_data['total_population']
elif level == 'all_cities':
cities = self.get_all_cities()
for year in self.years:
total = 0
for city in cities:
if str(year) in city['total_population']:
total += city['total_population'][str(year)]
result[str(year)] = total
return result
def calculate_urbanization_trend(self, area_name='云南省'):
"""计算指定地区的城镇化率趋势"""
area_data = None
if area_name == '云南省':
area_data = self.get_province_data()
else:
area_data = self.get_city_data(area_name)
if area_data and 'urbanization_rate' in area_data:
return area_data['urbanization_rate']
return {}
def compare_city_populations(self, year=2023):
"""比较指定年份各城市的人口数量"""
cities = self.get_all_cities()
result = {}
for city in cities:
if str(year) in city['total_population']:
result[city['area_name']] = city['total_population'][str(year)]
# 按人口数量排序
return dict(sorted(result.items(), key=lambda x: x[1], reverse=True))
def calculate_population_growth(self, area_name='云南省', start_year=2020, end_year=2023):
"""计算指定地区在指定时间段内的人口增长率"""
area_data = None
if area_name == '云南省':
area_data = self.get_province_data()
else:
area_data = self.get_city_data(area_name)
if not area_data:
return None
start_pop = area_data['total_population'].get(str(start_year), 0)
end_pop = area_data['total_population'].get(str(end_year), 0)
if start_pop == 0:
return None
growth_rate = ((end_pop - start_pop) / start_pop) * 100
return growth_rate
def analyze_birth_rate(self, area_name='云南省'):
"""分析指定地区的出生率趋势"""
area_data = None
if area_name == '云南省':
area_data = self.get_province_data()
else:
area_data = self.get_city_data(area_name)
if not area_data or 'birth_population' not in area_data or 'total_population' not in area_data:
return {}
birth_rates = {}
for year in self.years:
year_str = str(year)
if year_str in area_data['birth_population'] and year_str in area_data['total_population']:
birth_pop = area_data['birth_population'][year_str]
total_pop = area_data['total_population'][year_str]
# 注意:总人口单位是万人,出生人口单位是个,需要统一单位
if total_pop > 0 and birth_pop > 0:
birth_rate = (birth_pop / (total_pop * 10000)) * 1000 # 千分比
birth_rates[year] = birth_rate
return birth_rates
def save_results_to_file(self, results, filename):
"""将统计结果保存到文件"""
# 创建结果目录
result_dir = os.path.join(os.path.dirname(self.data_file), '../Results')
os.makedirs(result_dir, exist_ok=True)
# 生成带时间戳的文件名
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
full_filename = f"{filename}_{timestamp}.txt"
full_path = os.path.join(result_dir, full_filename)
# 保存结果
with open(full_path, 'w', encoding='utf-8') as f:
f.write(f"人口统计结果 - 生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
for key, value in results.items():
if isinstance(value, dict):
f.write(f"{key}:\n")
for k, v in value.items():
f.write(f" {k}: {v}\n")
else:
f.write(f"{key}: {value}\n")
print(f"统计结果已保存至:{full_path}")
return full_path
# 主函数演示如何使用这个类
if __name__ == "__main__":
# 数据文件路径
data_file = os.path.join(os.path.dirname(__file__), '../Data/RenKou.json')
# 创建统计类实例
stats = PopulationStatistics(data_file)
# 检查数据是否成功加载
if not stats.data:
print("无法加载数据,程序退出。")
exit(1)
# 主菜单
print("云南省人口数据统计分析工具")
print("========================")
while True:
print("\n请选择要执行的统计分析:")
print("1. 查看云南省历年总人口统计")
print("2. 查看昆明市历年总人口统计")
print("3. 分析云南省城镇化率趋势")
print("4. 分析昆明市城镇化率趋势")
print("5. 比较2023年各城市人口数量")
print("6. 计算云南省2020-2023年人口增长率")
print("7. 分析云南省出生率趋势")
print("8. 分析昆明市出生率趋势")
print("9. 导出所有统计结果")
print("0. 退出程序")
choice = input("请输入您的选择0-9")
if choice == '0':
print("感谢使用,再见!")
break
elif choice == '1':
result = stats.calculate_total_population_by_year('province')
print("\n云南省历年总人口统计(单位:万人):")
for year, population in result.items():
print(f"{year}年: {population}")
elif choice == '2':
result = stats.calculate_total_population_by_year('city')
print("\n昆明市历年总人口统计(单位:万人):")
for year, population in result.items():
print(f"{year}年: {population}")
elif choice == '3':
result = stats.calculate_urbanization_trend('云南省')
print("\n云南省历年城镇化率(单位:%")
for year, rate in result.items():
print(f"{year}年: {rate}")
elif choice == '4':
result = stats.calculate_urbanization_trend('昆明市')
print("\n昆明市历年城镇化率(单位:%")
for year, rate in result.items():
print(f"{year}年: {rate}")
elif choice == '5':
result = stats.compare_city_populations(2023)
print("\n2023年各城市人口数量排名单位万人")
for i, (city, population) in enumerate(result.items(), 1):
print(f"{i}. {city}: {population}")
elif choice == '6':
rate = stats.calculate_population_growth('云南省', 2020, 2023)
if rate is not None:
print(f"\n云南省2020-2023年人口增长率: {rate:.2f}%")
else:
print("\n无法计算人口增长率")
elif choice == '7':
result = stats.analyze_birth_rate('云南省')
print("\n云南省历年出生率(单位:‰):")
for year, rate in result.items():
print(f"{year}年: {rate:.2f}")
elif choice == '8':
result = stats.analyze_birth_rate('昆明市')
print("\n昆明市历年出生率(单位:‰):")
for year, rate in result.items():
print(f"{year}年: {rate:.2f}")
elif choice == '9':
# 收集所有统计结果
all_results = {
"云南省历年总人口统计(万人)": stats.calculate_total_population_by_year('province'),
"昆明市历年总人口统计(万人)": stats.calculate_total_population_by_year('city'),
"云南省历年城镇化率(%": stats.calculate_urbanization_trend('云南省'),
"昆明市历年城镇化率(%": stats.calculate_urbanization_trend('昆明市'),
"2023年各城市人口数量排名万人": stats.compare_city_populations(2023),
"云南省2020-2023年人口增长率%": stats.calculate_population_growth('云南省', 2020, 2023),
"云南省历年出生率(‰)": stats.analyze_birth_rate('云南省'),
"昆明市历年出生率(‰)": stats.analyze_birth_rate('昆明市')
}
# 保存结果到文件
file_path = stats.save_results_to_file(all_results, "population_statistics")
print(f"\n所有统计结果已保存至文件:{file_path}")
else:
print("\n无效的选择,请重新输入。")
# 等待用户按Enter键继续
input("\n按Enter键继续...")

View File

@@ -1,52 +0,0 @@
import json
import os
# 获取脚本所在目录
script_dir = os.path.dirname(os.path.abspath(__file__))
# 构建数据文件的绝对路径
file_path = os.path.join(script_dir, '../Data/RenKou.json')
# 读取JSON数据
with open(file_path, 'r', encoding='utf-8') as f:
population_data = json.load(f)
# 查找云南省的数据
for area in population_data:
if area.get('area_name') == '云南省':
yunnan_data = area
break
# 提取2023年的人口数据
year = '2023'
# 总人口(万人)
total_population = yunnan_data.get('total_population', {}).get(year, 0)
# 新生人口(万人,注意原数据是整数,需要转换为万人)
birth_population = yunnan_data.get('birth_population', {}).get(year, 0)
# 转换为万人(如果是整数)
if isinstance(birth_population, int):
birth_population = round(birth_population / 10000, 1)
# 城镇人口(万人)
urban_population = yunnan_data.get('urban_population', {}).get(year, 0)
# 乡村人口(万人)
rural_population = yunnan_data.get('rural_population', {}).get(year, 0)
# 城镇化率(%
urbanization_rate = yunnan_data.get('urbanization_rate', {}).get(year, 0)
# 打印结果
print("云南省 2023年人口概览")
print("-------------------")
print(f"总人口 {total_population:,} 万人")
print(f"新生人口 {birth_population} 万人")
print(f"城镇人口 {urban_population:,} 万人")
print(f"乡村人口 {rural_population:,} 万人")
print(f"城镇化率 {urbanization_rate:.2f} %")
# 验证数据一致性
if abs(total_population - (urban_population + rural_population)) > 0.01:
print("\n注意:总人口与城镇人口+乡村人口存在差异")
print(f"差异值: {abs(total_population - (urban_population + rural_population)):.2f} 万人")

View File

@@ -1,167 +0,0 @@
import requests
import json
import time
# API基础URL
BASE_URL = "http://localhost:8100/RuYuanZaiYuan"
EDUCATION_DATA_BASE_URL = "http://localhost:8100/EducationData"
# 支持的教育阶段
test_education_stages = [
("preschool", "学前"),
("primary", "小学"),
("junior", "初中"),
("senior", "高中"),
("vocational", "中职") # 添加中职教育阶段
]
# 测试年份
test_years = [2023, 2022, 2024]
def test_school_chart_interface():
"""测试获取教育阶段图表数据接口"""
print("\n===== 测试 /school/chart 接口 =====")
for stage_code, stage_name in test_education_stages:
url = f"{BASE_URL}/school/chart?education_stage={stage_code}"
print(f"\n测试 {stage_name}({stage_code}) 数据:")
try:
response = requests.get(url)
response.raise_for_status() # 如果状态码不是200抛出异常
data = response.json()
print(f"响应状态码: {response.status_code}")
print(f"响应数据: {json.dumps(data, ensure_ascii=False, indent=2)}")
# 验证返回数据的基本结构
assert "xAxis_data" in data, "返回数据缺少xAxis_data字段"
assert "series_data_0" in data, "返回数据缺少series_data_0字段(城区数据)"
assert "series_data_1" in data, "返回数据缺少series_data_1字段(镇区数据)"
assert "series_data_2" in data, "返回数据缺少series_data_2字段(乡村数据)"
assert "series_data_3" in data, "返回数据缺少series_data_3字段(总人数数据)"
assert "series_data_4" in data, "返回数据缺少series_data_4字段(2022年基数数据)"
assert "education_stage" in data, "返回数据缺少education_stage字段"
assert data["education_stage"] == stage_name, f"教育阶段名称不匹配,期望: {stage_name},实际: {data['education_stage']}"
print(f"{stage_name} 数据验证通过")
except requests.exceptions.RequestException as e:
print(f"❌ 请求失败: {e}")
except AssertionError as e:
print(f"❌ 数据验证失败: {e}")
# 添加短暂延迟,避免请求过于频繁
time.sleep(0.5)
# @router.get("/school/inschool/chart")
# async def get_in_school_chart_config(
# education_stage: str = Query(default="preschool", pattern="^(preschool|primary|junior|senior)$",
# description="教育阶段: preschool(学前), primary(小学), junior(初中), senior(高中)")
# ):
# return RuYuanZaiYuanModel.generate_in_school_education_config(education_stage)
#
def test_education_data_by_year():
"""测试获取指定年份教育数据接口"""
print("\n===== 测试 /EducationData/byYear 接口 =====")
# 测试不同年份的数据
for year in test_years:
url = f"{EDUCATION_DATA_BASE_URL}/byYear?year={year}"
print(f"\n测试 {year} 年数据:")
try:
response = requests.get(url)
response.raise_for_status() # 如果状态码不是200抛出异常
data = response.json()
print(f"响应状态码: {response.status_code}")
print(f"响应数据: {json.dumps(data, ensure_ascii=False, indent=2)}")
# 验证返回数据的基本结构
assert "code" in data, "返回数据缺少code字段"
assert "message" in data, "返回数据缺少message字段"
assert "data" in data, "返回数据缺少data字段"
assert data["code"] == 200, f"状态码不匹配,期望: 200实际: {data['code']}"
assert isinstance(data["data"], list), "data字段应为列表类型"
# 验证数据内容
if data["data"]:
# 检查每个数据项的结构
for item in data["data"]:
assert "education_stage" in item, "数据项缺少education_stage字段"
assert "school_count" in item, "数据项缺少school_count字段"
assert "teacher_count_10k" in item, "数据项缺少teacher_count_10k字段"
assert "enrollment_count_10k" in item, "数据项缺少enrollment_count_10k字段"
assert "admission_count_10k" in item, "数据项缺少admission_count_10k字段"
# 检查数值类型
assert isinstance(item["school_count"], (int, float)), "school_count应为数值类型"
assert isinstance(item["teacher_count_10k"], (int, float)), "teacher_count_10k应为数值类型"
assert isinstance(item["enrollment_count_10k"], (int, float)), "enrollment_count_10k应为数值类型"
assert isinstance(item["admission_count_10k"], (int, float)), "admission_count_10k应为数值类型"
print(f"{year} 年数据验证通过,共包含 {len(data['data'])} 个教育阶段的数据")
else:
print(f"⚠️ {year} 年未返回数据")
except requests.exceptions.RequestException as e:
print(f"❌ 请求失败: {e}")
except AssertionError as e:
print(f"❌ 数据验证失败: {e}")
# 添加短暂延迟,避免请求过于频繁
time.sleep(0.5)
# 测试边界值
print("\n===== 测试边界值 =====")
# 测试最小值年份
min_year = 2015
url = f"{EDUCATION_DATA_BASE_URL}/byYear?year={min_year}"
print(f"\n测试最小值年份 {min_year}:")
try:
response = requests.get(url)
response.raise_for_status()
print(f"响应状态码: {response.status_code}")
print(f"✅ 最小值年份测试通过")
except requests.exceptions.RequestException as e:
print(f"❌ 请求失败: {e}")
# 测试最大值年份
max_year = 2028
url = f"{EDUCATION_DATA_BASE_URL}/byYear?year={max_year}"
print(f"\n测试最大值年份 {max_year}:")
try:
response = requests.get(url)
response.raise_for_status()
print(f"响应状态码: {response.status_code}")
print(f"✅ 最大值年份测试通过")
except requests.exceptions.RequestException as e:
print(f"❌ 请求失败: {e}")
# 测试超出范围的年份
invalid_year = 2030
url = f"{EDUCATION_DATA_BASE_URL}/byYear?year={invalid_year}"
print(f"\n测试超出范围的年份 {invalid_year}:")
try:
response = requests.get(url)
if response.status_code == 422: # FastAPI的验证错误状态码
print(f"响应状态码: {response.status_code}")
print(f"✅ 超出范围年份正确返回验证错误")
else:
print(f"❌ 期望状态码422实际: {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"❌ 请求失败: {e}")
if __name__ == "__main__":
# 测试原有接口
# test_school_chart_interface()
# 测试新添加的教育数据接口
test_education_data_by_year()

View File

@@ -1,124 +0,0 @@
import requests
import json
import time
# API基础URL
EDUCATION_DATA_BASE_URL = "http://localhost:8100/EducationData"
# 测试年份
test_years = [2023, 2022, 2024]
def test_population_data_by_year():
"""测试获取指定年份人口数据接口"""
print("\n===== 测试 /EducationData/populationByYear 接口 =====")
# 测试不同年份的数据
for year in test_years:
url = f"{EDUCATION_DATA_BASE_URL}/populationByYear?year={year}"
print(f"\n测试 {year} 年人口数据:")
try:
response = requests.get(url)
response.raise_for_status() # 如果状态码不是200抛出异常
data = response.json()
print(f"响应状态码: {response.status_code}")
print(f"响应数据: {json.dumps(data, ensure_ascii=False, indent=2)}")
# 验证返回数据的基本结构
assert "code" in data, "返回数据缺少code字段"
assert "message" in data, "返回数据缺少message字段"
assert "data" in data, "返回数据缺少data字段"
assert data["code"] == 200, f"状态码不匹配,期望: 200实际: {data['code']}"
# 验证数据内容
if data["data"]:
# 检查数据项的结构
population_data = data["data"]
assert "year" in population_data, "数据缺少year字段"
assert "total_population" in population_data, "数据缺少total_population字段"
assert "birth_population" in population_data, "数据缺少birth_population字段"
assert "urban_population" in population_data, "数据缺少urban_population字段"
assert "rural_population" in population_data, "数据缺少rural_population字段"
assert "urbanization_rate" in population_data, "数据缺少urbanization_rate字段"
# 检查数值类型
assert isinstance(population_data["year"], int), "year应为整数类型"
assert isinstance(population_data["total_population"], (int, float)), "total_population应为数值类型"
assert isinstance(population_data["birth_population"], (int, float)), "birth_population应为数值类型"
assert isinstance(population_data["urban_population"], (int, float)), "urban_population应为数值类型"
assert isinstance(population_data["rural_population"], (int, float)), "rural_population应为数值类型"
assert isinstance(population_data["urbanization_rate"], (int, float)), "urbanization_rate应为数值类型"
# 检查年份是否匹配
assert population_data["year"] == year, f"年份不匹配,期望: {year},实际: {population_data['year']}"
print(f"{year} 年人口数据验证通过")
else:
print(f"⚠️ {year} 年未返回人口数据")
except requests.exceptions.RequestException as e:
print(f"❌ 请求失败: {e}")
except AssertionError as e:
print(f"❌ 数据验证失败: {e}")
# 添加短暂延迟,避免请求过于频繁
time.sleep(0.5)
# 测试边界值
print("\n===== 测试边界值 =====")
# 测试最小值年份
min_year = 2015
url = f"{EDUCATION_DATA_BASE_URL}/populationByYear?year={min_year}"
print(f"\n测试最小值年份 {min_year}:")
try:
response = requests.get(url)
response.raise_for_status()
data = response.json()
print(f"响应状态码: {response.status_code}")
if data["code"] == 200 or data["code"] == 404:
print(f"✅ 最小值年份测试通过")
else:
print(f"❌ 状态码异常: {data['code']}")
except requests.exceptions.RequestException as e:
print(f"❌ 请求失败: {e}")
# 测试最大值年份
max_year = 2028
url = f"{EDUCATION_DATA_BASE_URL}/populationByYear?year={max_year}"
print(f"\n测试最大值年份 {max_year}:")
try:
response = requests.get(url)
response.raise_for_status()
data = response.json()
print(f"响应状态码: {response.status_code}")
if data["code"] == 200 or data["code"] == 404:
print(f"✅ 最大值年份测试通过")
else:
print(f"❌ 状态码异常: {data['code']}")
except requests.exceptions.RequestException as e:
print(f"❌ 请求失败: {e}")
# 测试超出范围的年份
invalid_year = 2030
url = f"{EDUCATION_DATA_BASE_URL}/populationByYear?year={invalid_year}"
print(f"\n测试超出范围的年份 {invalid_year}:")
try:
response = requests.get(url)
if response.status_code == 422: # FastAPI的验证错误状态码
print(f"响应状态码: {response.status_code}")
print(f"✅ 超出范围年份正确返回验证错误")
else:
data = response.json()
print(f"响应状态码: {response.status_code}")
print(f"响应数据: {json.dumps(data, ensure_ascii=False, indent=2)}")
print(f"❌ 期望状态码422实际: {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"❌ 请求失败: {e}")
if __name__ == "__main__":
# 测试新添加的人口数据接口
test_population_data_by_year()

View File

@@ -0,0 +1,162 @@
import unittest
import requests
class TestRuYuanZaiYuanInterfaces(unittest.TestCase):
# 配置API基础URL
BASE_URL = "http://localhost:8100" # 根据实际运行的服务地址和端口修改
def setUp(self):
# 测试前的初始化工作
pass
def tearDown(self):
# 测试后的清理工作
pass
def test_get_education_chart_config_default(self):
"""测试默认参数下的入园/招生数据接口"""
url = f"{self.BASE_URL}/RuYuanZaiYuan/school/chart"
response = requests.get(url)
self.assertEqual(response.status_code, 200)
data = response.json()
# 验证返回数据格式
self.assertIn("xAxis_data", data)
self.assertIn("series_data_0", data)
self.assertIn("series_data_1", data)
self.assertIn("series_data_2", data)
self.assertIn("series_data_3", data)
self.assertIn("series_data_4", data)
self.assertIn("education_stage", data)
# 默认应该是学前教育
self.assertEqual(data["education_stage"], "学前")
# 年份应该包含2015-2035
self.assertIn("2015", data["xAxis_data"])
self.assertIn("2024", data["xAxis_data"])
self.assertIn("2035", data["xAxis_data"])
def test_get_in_school_chart_config_default(self):
"""测试默认参数下的在园/在校数据接口"""
url = f"{self.BASE_URL}/RuYuanZaiYuan/school/inschool/chart"
response = requests.get(url)
self.assertEqual(response.status_code, 200)
data = response.json()
# 验证返回数据格式
self.assertIn("xAxis_data", data)
self.assertIn("series_data_0", data)
self.assertIn("series_data_1", data)
self.assertIn("series_data_2", data)
self.assertIn("series_data_3", data)
self.assertIn("series_data_4", data)
self.assertIn("education_stage", data)
# 默认应该是学前教育
self.assertEqual(data["education_stage"], "学前")
# 年份应该包含2015-2035
self.assertIn("2015", data["xAxis_data"])
self.assertIn("2024", data["xAxis_data"])
self.assertIn("2035", data["xAxis_data"])
def test_with_different_education_stages(self):
"""测试不同教育阶段参数"""
stages = ["preschool", "primary", "junior", "senior", "vocational"]
stage_names = {"preschool": "学前", "primary": "小学", "junior": "初中", "senior": "高中", "vocational": "中职"}
for stage in stages:
# 测试入园/招生接口
url = f"{self.BASE_URL}/RuYuanZaiYuan/school/chart?education_stage={stage}"
response = requests.get(url)
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertEqual(data["education_stage"], stage_names[stage])
# 测试在园/在校接口
url = f"{self.BASE_URL}/RuYuanZaiYuan/school/inschool/chart?education_stage={stage}"
response = requests.get(url)
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertEqual(data["education_stage"], stage_names[stage])
def test_with_different_area_names(self):
"""测试不同区域名称参数"""
# 注意:这里使用的区域名称需要在数据文件中存在,否则会返回空数据
# 以下测试假设"云南省"和"昆明市"在数据中存在
areas = ["云南省", "昆明市"]
for area in areas:
# 测试入园/招生接口
url = f"{self.BASE_URL}/RuYuanZaiYuan/school/chart?area_name={area}"
response = requests.get(url)
self.assertEqual(response.status_code, 200)
data = response.json()
# 如果区域不存在,会返回空字典
if data: # 只在数据存在时进行验证
self.assertIn("xAxis_data", data)
self.assertIn("series_data_3", data) # 总人数数据
# 测试在园/在校接口
url = f"{self.BASE_URL}/RuYuanZaiYuan/school/inschool/chart?area_name={area}"
response = requests.get(url)
self.assertEqual(response.status_code, 200)
data = response.json()
# 如果区域不存在,会返回空字典
if data: # 只在数据存在时进行验证
self.assertIn("xAxis_data", data)
self.assertIn("series_data_3", data) # 总人数数据
def test_with_invalid_parameters(self):
"""测试无效参数"""
# 测试无效的教育阶段
url = f"{self.BASE_URL}/RuYuanZaiYuan/school/chart?education_stage=invalid_stage"
response = requests.get(url)
# FastAPI会自动验证参数格式不符合pattern的参数会返回422
self.assertEqual(response.status_code, 422)
url = f"{self.BASE_URL}/RuYuanZaiYuan/school/inschool/chart?education_stage=invalid_stage"
response = requests.get(url)
self.assertEqual(response.status_code, 422)
# 测试不存在的区域名称
# 注意不存在的区域名称不会返回422而是返回空数据
url = f"{self.BASE_URL}/RuYuanZaiYuan/school/chart?area_name=不存在的区域"
response = requests.get(url)
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertEqual(data, {})
url = f"{self.BASE_URL}/RuYuanZaiYuan/school/inschool/chart?area_name=不存在的区域"
response = requests.get(url)
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertEqual(data, {})
def test_data_consistency(self):
"""测试数据一致性"""
# 测试相同参数下两次请求返回相同数据
url = f"{self.BASE_URL}/RuYuanZaiYuan/school/chart"
response1 = requests.get(url)
response2 = requests.get(url)
self.assertEqual(response1.json(), response2.json())
url = f"{self.BASE_URL}/RuYuanZaiYuan/school/inschool/chart"
response1 = requests.get(url)
response2 = requests.get(url)
self.assertEqual(response1.json(), response2.json())
def test_with_combined_parameters(self):
"""测试组合参数"""
# 同时指定教育阶段和区域名称
url = f"{self.BASE_URL}/RuYuanZaiYuan/school/chart?education_stage=primary&area_name=云南省"
response = requests.get(url)
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertEqual(data["education_stage"], "小学")
url = f"{self.BASE_URL}/RuYuanZaiYuan/school/inschool/chart?education_stage=junior&area_name=云南省"
response = requests.get(url)
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertEqual(data["education_stage"], "初中")
if __name__ == "__main__":
unittest.main()

Binary file not shown.