You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

159 lines
6.3 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# routes/LoginController.py
import base64
import os
import json
import random
import string
import jwt
from captcha.image import ImageCaptcha
from fastapi import APIRouter, Request, Response, status, HTTPException
from utils.CommonUtil import *
from utils.CookieUtil import *
from utils.Database import *
from utils.JwtUtil import *
from utils.ParseRequest import *
from Config.Config import *
# 创建一个路由实例
router = APIRouter()
# 获取项目根目录
project_root = os.path.dirname(os.path.abspath(__file__))
# 配置验证码
image = ImageCaptcha(
width=100, height=30, # 增加宽度和高度
font_sizes=[26], # 增加字体大小
fonts=[os.path.join(project_root, 'DejaVuSans-Bold.ttf')] # 设置自定义字体路径
)
@router.get("/getCaptcha")
def get_captcha():
captcha_text = ''.join(random.choices(string.digits, k=4)) # 生成4个数字的验证码
session_id = os.urandom(16).hex()
# 将验证码文本存储在session中
session_data = {session_id: captcha_text}
with open("./session/captcha_sessions.json", "a") as session_file:
json.dump(session_data, session_file)
session_file.write("\n")
# 生成验证码图片
data = image.generate(captcha_text)
captcha_image_base64 = base64.b64encode(data.read()).decode()
return {"image": captcha_image_base64, "session_id": session_id}
# 验证用户并生成JWT令牌的接口
@router.post("/validateCaptcha")
async def validate_captcha(session_id : str, captcha : str):
try:
with open("./session/captcha_sessions.json", "r") as session_file:
sessions = {}
for line in session_file:
sessions.update(json.loads(line))
except json.JSONDecodeError:
sessions = {}
correct_captcha_text = sessions.get(session_id)
if not correct_captcha_text or correct_captcha_text.lower() != captcha.lower():
return {"success": False, "message": "验证码错误"}
return {"success": True, "message": "验证码正确"}
# 获取cookie中的token的方法
@router.get("/getToken")
async def get_token(request: Request):
token = CookieUtil.get_cookie(request, key="auth_token")
print(token)
if token:
try:
decoded_token = jwt.decode(token, JWT_SECRET_KEY, algorithms=['HS256'])
logging.info(f"Token 解码成功: {decoded_token}")
return {"success": True, "message": "Token 验证成功", "token_data": decoded_token}
except jwt.ExpiredSignatureError:
logging.error("Token 过期")
return {"success": False, "message": "Token 过期"}
except jwt.InvalidTokenError:
logging.error("无效的 Token")
return {"success": False, "message": "无效的 Token"}
else:
logging.error("未找到 Token")
return {"success": False, "message": "未找到 Token"}
# 登出
@router.get("/logout")
async def logout(request: Request, response: Response):
token = CookieUtil.get_cookie(request, key="auth_token")
if token:
CookieUtil.remove_cookie(response, key="auth_token", path="/" )
logging.info(f"Token <UNK> cookie: {token}")
return {"success": True, "message": "账号已登出!"}
else:
logging.error("<UNK> Token")
return {"success": True, "message": "未找到有效Token,账号已登出!"}
# 验证用户并生成JWT令牌的接口
@router.post("/login")
async def login(request: Request, response: Response):
username = await get_request_str_param(request, "username", True, True)
password = await get_request_str_param(request, "password", True, True)
if not username or not password:
return {"success": False, "message": "用户名和密码不能为空"}
password = md5_encrypt(password)
select_user_sql: str = "SELECT person_id, person_name, identity_id, login_name, xb, bureau_id, org_id, pwdmd5 FROM t_sys_loginperson WHERE login_name = '" + username + "' AND b_use = 1"
userlist = await find_by_sql(select_user_sql,())
user = userlist[0] if userlist else None
logging.info(f"查询结果: {user}")
if user and user['pwdmd5'] == password: # 验证的cas用户密码md5加密的版本
token = create_access_token({"user_id": user['person_id'], "identity_id": user['identity_id']})
CookieUtil.set_cookie(
res=response,
key="auth_token",
value=token,
secure=False, # 在开发环境中,确保 secure=False
httponly=False,
max_age=3600, # 设置cookie的有效时间为1小时
path="/" # 设置cookie的路径
)
logging.info(f"Token 已成功设置到 cookie: {token}")
user.pop('pwdmd5', None) # 移除密码字段
return {"success": True, "message": "登录成功", "data": {"token": token, "user_data": user}}
else:
return {"success": False, "message": "用户名或密码错误"}
# 【Base-Login-3】通过手机号获取Person的ID
@router.get("/getPersonIdByTelephone")
async def get_person_id_by_telephone(request: Request):
telephone = await get_request_str_param(request, "telephone", True, True)
if not telephone:
return {"success": False, "message": "手机号不能为空"}
select_user_sql: str = "SELECT person_id FROM t_sys_loginperson WHERE telephone = '" + telephone + "' and b_use = 1 "
userlist = await find_by_sql(select_user_sql,())
user = userlist[0] if userlist else None
if user:
return {"success": True, "message": "查询成功", "data": {"person_id": user['person_id']}}
else:
return {"success": False, "message": "未查询到相关信息"}
# 【Base-Login-4】忘记密码重设不登录的状态
@router.post("/resetPassword")
async def reset_password(request: Request):
person_id = await get_request_str_param(request, "person_id", True, True)
password = await get_request_str_param(request, "password", True, True)
if not person_id or not password:
return {"success": False, "message": "用户ID和新密码不能为空"}
password_md5 = md5_encrypt(password)
update_user_sql: str = "UPDATE t_sys_loginperson SET original_pwd = '" + password + "', pwdmd5 = '" + password_md5 + "' WHERE person_id = '" + person_id + "'"
await execute_sql(update_user_sql)
return {"success": True, "message": "密码修改成功"}