This commit is contained in:
2025-09-11 21:34:02 +08:00
parent 9349d582b6
commit ec35818b13
6 changed files with 271 additions and 5 deletions

View File

@@ -31,4 +31,37 @@ async def get_education_data_by_year(
}
@router.get("/populationByYear")
async def get_population_data_by_year(
year: int = Query(default=2023, ge=2015, le=2028,
description="年份: 2015-2028范围内的年份")
):
"""获取指定年份的云南省人口数据"""
try:
# 调用EducationDataModel的方法获取人口数据
data = EducationDataModel.get_population_data_by_year(year)
if data:
# 返回包含状态和数据的响应
return {
"code": 200,
"message": "success",
"data": data
}
else:
# 未找到数据的情况
return {
"code": 404,
"message": f"未找到{year}年的云南省人口数据",
"data": None
}
except Exception as e:
# 异常处理
return {
"code": 500,
"message": f"获取数据失败: {str(e)}",
"data": None
}

View File

@@ -20,6 +20,7 @@ class EducationDataModel:
TEACHER_COUNT_FILE = os.path.join(DATA_DIR, 'TeacherCount.json')
ENROLLMENT_COUNT_FILE = os.path.join(DATA_DIR, 'ZaiXiaoShengCount.json')
ADMISSION_COUNT_FILE = os.path.join(DATA_DIR, 'ZhaoShengCount.json')
POPULATION_FILE = os.path.join(DATA_DIR, 'RenKou.json') # 新增:人口数据文件路径
@staticmethod
def load_json_file(file_path):
@@ -247,6 +248,58 @@ class EducationDataModel:
return result
# 新增:人口数据相关方法
@staticmethod
def get_population_data_by_year(year):
"""获取指定年份的云南省人口数据"""
# 加载人口数据文件
population_data = EducationDataModel.load_json_file(EducationDataModel.POPULATION_FILE)
# 获取云南省数据
yunnan_data = EducationDataModel.get_province_data(population_data)
if not yunnan_data:
print(f"未获取到云南省{year}年的人口数据")
return None
year_str = str(year)
result = {
'year': year,
'total_population': yunnan_data.get('total_population', {}).get(year_str, 0), # 万人
'birth_population': yunnan_data.get('birth_population', {}).get(year_str, 0), # 原数据单位为个
'urban_population': yunnan_data.get('urban_population', {}).get(year_str, 0), # 万人
'rural_population': yunnan_data.get('rural_population', {}).get(year_str, 0), # 万人
'urbanization_rate': yunnan_data.get('urbanization_rate', {}).get(year_str, 0) # %
}
# 将新生人口转换为万人(如果是整数)
if isinstance(result['birth_population'], int):
result['birth_population'] = round(result['birth_population'] / 10000, 1)
return result
@staticmethod
def print_population_data(population_data):
"""打印人口数据"""
if not population_data:
print("没有可打印的人口数据")
return
year = population_data.get('year', '未知')
print(f"云南省 {year}年人口概览")
print("-------------------")
print(f"总人口 {population_data.get('total_population', 0):,} 万人")
print(f"新生人口 {population_data.get('birth_population', 0)} 万人")
print(f"城镇人口 {population_data.get('urban_population', 0):,} 万人")
print(f"乡村人口 {population_data.get('rural_population', 0):,} 万人")
print(f"城镇化率 {population_data.get('urbanization_rate', 0):.2f} %")
# 验证数据一致性
total = population_data.get('total_population', 0)
urban_rural_sum = population_data.get('urban_population', 0) + population_data.get('rural_population', 0)
if abs(total - urban_rural_sum) > 0.01:
print("\n注意:总人口与城镇人口+乡村人口存在差异")
print(f"差异值: {abs(total - urban_rural_sum):.2f} 万人")
@staticmethod
def print_education_data(data, year):
"""打印教育数据"""
@@ -268,20 +321,24 @@ class EducationDataModel:
for file_path in [EducationDataModel.SCHOOL_COUNT_FILE,
EducationDataModel.TEACHER_COUNT_FILE,
EducationDataModel.ENROLLMENT_COUNT_FILE,
EducationDataModel.ADMISSION_COUNT_FILE]:
EducationDataModel.ADMISSION_COUNT_FILE,
EducationDataModel.POPULATION_FILE]: # 新增:检查人口数据文件
if os.path.exists(file_path):
print(f"找到数据文件: {file_path}")
else:
print(f"警告:数据文件不存在: {file_path}")
# 获取数据
# 获取并打印教育数据
education_data = EducationDataModel.get_education_data_by_year(year)
# 打印数据
if education_data:
EducationDataModel.print_education_data(education_data, year)
else:
print("未获取到任何数据")
print("未获取到任何教育数据")
# 新增:获取并打印人口数据
print("\n" + "=" * 50)
population_data = EducationDataModel.get_population_data_by_year(year)
EducationDataModel.print_population_data(population_data)
except Exception as e:
print(f"获取数据时发生错误: {e}")

View File

@@ -0,0 +1,52 @@
import json
import os
# 获取脚本所在目录
script_dir = os.path.dirname(os.path.abspath(__file__))
# 构建数据文件的绝对路径
file_path = os.path.join(script_dir, '../Data/RenKou.json')
# 读取JSON数据
with open(file_path, 'r', encoding='utf-8') as f:
population_data = json.load(f)
# 查找云南省的数据
for area in population_data:
if area.get('area_name') == '云南省':
yunnan_data = area
break
# 提取2023年的人口数据
year = '2023'
# 总人口(万人)
total_population = yunnan_data.get('total_population', {}).get(year, 0)
# 新生人口(万人,注意原数据是整数,需要转换为万人)
birth_population = yunnan_data.get('birth_population', {}).get(year, 0)
# 转换为万人(如果是整数)
if isinstance(birth_population, int):
birth_population = round(birth_population / 10000, 1)
# 城镇人口(万人)
urban_population = yunnan_data.get('urban_population', {}).get(year, 0)
# 乡村人口(万人)
rural_population = yunnan_data.get('rural_population', {}).get(year, 0)
# 城镇化率(%
urbanization_rate = yunnan_data.get('urbanization_rate', {}).get(year, 0)
# 打印结果
print("云南省 2023年人口概览")
print("-------------------")
print(f"总人口 {total_population:,} 万人")
print(f"新生人口 {birth_population} 万人")
print(f"城镇人口 {urban_population:,} 万人")
print(f"乡村人口 {rural_population:,} 万人")
print(f"城镇化率 {urbanization_rate:.2f} %")
# 验证数据一致性
if abs(total_population - (urban_population + rural_population)) > 0.01:
print("\n注意:总人口与城镇人口+乡村人口存在差异")
print(f"差异值: {abs(total_population - (urban_population + rural_population)):.2f} 万人")

View File

@@ -0,0 +1,124 @@
import requests
import json
import time
# API基础URL
EDUCATION_DATA_BASE_URL = "http://localhost:8100/EducationData"
# 测试年份
test_years = [2023, 2022, 2024]
def test_population_data_by_year():
"""测试获取指定年份人口数据接口"""
print("\n===== 测试 /EducationData/populationByYear 接口 =====")
# 测试不同年份的数据
for year in test_years:
url = f"{EDUCATION_DATA_BASE_URL}/populationByYear?year={year}"
print(f"\n测试 {year} 年人口数据:")
try:
response = requests.get(url)
response.raise_for_status() # 如果状态码不是200抛出异常
data = response.json()
print(f"响应状态码: {response.status_code}")
print(f"响应数据: {json.dumps(data, ensure_ascii=False, indent=2)}")
# 验证返回数据的基本结构
assert "code" in data, "返回数据缺少code字段"
assert "message" in data, "返回数据缺少message字段"
assert "data" in data, "返回数据缺少data字段"
assert data["code"] == 200, f"状态码不匹配,期望: 200实际: {data['code']}"
# 验证数据内容
if data["data"]:
# 检查数据项的结构
population_data = data["data"]
assert "year" in population_data, "数据缺少year字段"
assert "total_population" in population_data, "数据缺少total_population字段"
assert "birth_population" in population_data, "数据缺少birth_population字段"
assert "urban_population" in population_data, "数据缺少urban_population字段"
assert "rural_population" in population_data, "数据缺少rural_population字段"
assert "urbanization_rate" in population_data, "数据缺少urbanization_rate字段"
# 检查数值类型
assert isinstance(population_data["year"], int), "year应为整数类型"
assert isinstance(population_data["total_population"], (int, float)), "total_population应为数值类型"
assert isinstance(population_data["birth_population"], (int, float)), "birth_population应为数值类型"
assert isinstance(population_data["urban_population"], (int, float)), "urban_population应为数值类型"
assert isinstance(population_data["rural_population"], (int, float)), "rural_population应为数值类型"
assert isinstance(population_data["urbanization_rate"], (int, float)), "urbanization_rate应为数值类型"
# 检查年份是否匹配
assert population_data["year"] == year, f"年份不匹配,期望: {year},实际: {population_data['year']}"
print(f"{year} 年人口数据验证通过")
else:
print(f"⚠️ {year} 年未返回人口数据")
except requests.exceptions.RequestException as e:
print(f"❌ 请求失败: {e}")
except AssertionError as e:
print(f"❌ 数据验证失败: {e}")
# 添加短暂延迟,避免请求过于频繁
time.sleep(0.5)
# 测试边界值
print("\n===== 测试边界值 =====")
# 测试最小值年份
min_year = 2015
url = f"{EDUCATION_DATA_BASE_URL}/populationByYear?year={min_year}"
print(f"\n测试最小值年份 {min_year}:")
try:
response = requests.get(url)
response.raise_for_status()
data = response.json()
print(f"响应状态码: {response.status_code}")
if data["code"] == 200 or data["code"] == 404:
print(f"✅ 最小值年份测试通过")
else:
print(f"❌ 状态码异常: {data['code']}")
except requests.exceptions.RequestException as e:
print(f"❌ 请求失败: {e}")
# 测试最大值年份
max_year = 2028
url = f"{EDUCATION_DATA_BASE_URL}/populationByYear?year={max_year}"
print(f"\n测试最大值年份 {max_year}:")
try:
response = requests.get(url)
response.raise_for_status()
data = response.json()
print(f"响应状态码: {response.status_code}")
if data["code"] == 200 or data["code"] == 404:
print(f"✅ 最大值年份测试通过")
else:
print(f"❌ 状态码异常: {data['code']}")
except requests.exceptions.RequestException as e:
print(f"❌ 请求失败: {e}")
# 测试超出范围的年份
invalid_year = 2030
url = f"{EDUCATION_DATA_BASE_URL}/populationByYear?year={invalid_year}"
print(f"\n测试超出范围的年份 {invalid_year}:")
try:
response = requests.get(url)
if response.status_code == 422: # FastAPI的验证错误状态码
print(f"响应状态码: {response.status_code}")
print(f"✅ 超出范围年份正确返回验证错误")
else:
data = response.json()
print(f"响应状态码: {response.status_code}")
print(f"响应数据: {json.dumps(data, ensure_ascii=False, indent=2)}")
print(f"❌ 期望状态码422实际: {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"❌ 请求失败: {e}")
if __name__ == "__main__":
# 测试新添加的人口数据接口
test_population_data_by_year()