You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

159 lines
6.3 KiB

# routes/LoginController.py
import base64
import os
import json
import random
import string
import jwt
from captcha.image import ImageCaptcha
from fastapi import APIRouter, Request, Response, status, HTTPException
from utils.CommonUtil import *
from utils.CookieUtil import *
from utils.Database import *
from utils.JwtUtil import *
from utils.ParseRequest import *
from Config.Config import *
# Router instance that the endpoints in this module register themselves on.
router = APIRouter()

# Absolute directory containing this module; the captcha font is looked up
# relative to it.
project_root = os.path.dirname(os.path.abspath(__file__))

# Captcha renderer configuration: enlarged canvas (100 x 30), a single
# enlarged font size, and a custom font file located next to this module.
image = ImageCaptcha(
    width=100,
    height=30,
    font_sizes=[26],
    fonts=[os.path.join(project_root, 'DejaVuSans-Bold.ttf')],
)
@router.get("/getCaptcha")
def get_captcha():
    """Generate a 4-digit image captcha and record its expected answer.

    The answer is appended, keyed by a freshly generated session id, as one
    JSON object per line to ``./session/captcha_sessions.json`` (relative to
    the process working directory).

    Returns:
        dict: ``{"image": <base64 text of the bytes read from
        image.generate()>, "session_id": <hex string of 16 random bytes>}``.
    """
    # Function-scope import keeps this block self-contained with respect to
    # the file's import list.
    import secrets

    # A captcha is an anti-automation control, so pick the digits with the
    # OS-backed CSPRNG instead of the predictable generator behind `random`.
    captcha_text = ''.join(secrets.choice(string.digits) for _ in range(4))
    session_id = os.urandom(16).hex()

    # Make sure the target directory exists; opening the file in append mode
    # raises FileNotFoundError when the directory is missing.
    session_path = "./session/captcha_sessions.json"
    os.makedirs(os.path.dirname(session_path), exist_ok=True)

    # Persist the expected answer as one JSON object per line.
    with open(session_path, "a", encoding="utf-8") as session_file:
        session_file.write(json.dumps({session_id: captcha_text}) + "\n")

    # NOTE(review): entries are only ever appended here; nothing visible in
    # this file expires or removes them, so the session file keeps growing.

    # Render the captcha and return it base64-encoded together with the id
    # the client must send back for validation.
    data = image.generate(captcha_text)
    captcha_image_base64 = base64.b64encode(data.read()).decode()
    return {"image": captcha_image_base64, "session_id": session_id}
# Check a user-supplied captcha answer against the stored one.
@router.post("/validateCaptcha")
async def validate_captcha(session_id: str, captcha: str):
    """Compare ``captcha`` with the answer stored for ``session_id``.

    The stored answers are read from ``./session/captcha_sessions.json``,
    which holds one ``{session_id: answer}`` JSON object per line. The
    comparison is case-insensitive.

    Args:
        session_id: Identifier returned together with the captcha image.
        captcha: The answer typed by the user.

    Returns:
        dict: ``{"success": bool, "message": str}``. ``success`` is False
        when the session id is unknown or the answer does not match.
    """
    sessions = {}
    try:
        with open("./session/captcha_sessions.json", "r", encoding="utf-8") as session_file:
            for line in session_file:
                line = line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    # Skip only the damaged line; a single corrupt record
                    # must not invalidate every other stored captcha.
                    continue
                if isinstance(entry, dict):
                    sessions.update(entry)
    except FileNotFoundError:
        # No captcha has been stored yet: treat every answer as wrong
        # instead of failing the request with an unhandled exception.
        pass

    # NOTE(review): the matched entry is not removed after a successful
    # check, so a solved captcha can be submitted again with the same id.
    correct_captcha_text = sessions.get(session_id)
    if not isinstance(correct_captcha_text, str) or not correct_captcha_text:
        return {"success": False, "message": "验证码错误"}
    if correct_captcha_text.lower() != captcha.lower():
        return {"success": False, "message": "验证码错误"}
    return {"success": True, "message": "验证码正确"}
# Read the auth token from the request cookie and report whether it is valid.
@router.get("/getToken")
async def get_token(request: Request):
    """Decode the JWT stored in the ``auth_token`` cookie.

    The token is decoded with ``JWT_SECRET_KEY`` using the HS256 algorithm.

    Args:
        request: Incoming request whose cookies are inspected.

    Returns:
        dict: ``{"success": True, "message": ..., "token_data": <claims>}``
        when the token decodes, otherwise ``{"success": False,
        "message": ...}`` for a missing, expired or invalid token.
    """
    token = CookieUtil.get_cookie(request, key="auth_token")
    # The raw token is a bearer credential, so it is deliberately not
    # printed or logged here.
    if not token:
        logging.error("未找到 Token")
        return {"success": False, "message": "未找到 Token"}

    try:
        decoded_token = jwt.decode(token, JWT_SECRET_KEY, algorithms=['HS256'])
    except jwt.ExpiredSignatureError:
        # Handled before InvalidTokenError so that expiry gets its own message.
        logging.error("Token 过期")
        return {"success": False, "message": "Token 过期"}
    except jwt.InvalidTokenError:
        logging.error("无效的 Token")
        return {"success": False, "message": "无效的 Token"}

    logging.info(f"Token 解码成功: {decoded_token}")
    return {"success": True, "message": "Token 验证成功", "token_data": decoded_token}
# Log out by clearing the auth cookie.
@router.get("/logout")
async def logout(request: Request, response: Response):
    """Remove the ``auth_token`` cookie from the response, if one was sent.

    Args:
        request: Incoming request whose cookies are inspected.
        response: Outgoing response on which the cookie is removed.

    Returns:
        dict: Always ``{"success": True, "message": ...}``; the message
        differs depending on whether a token cookie was present.
    """
    current_token = CookieUtil.get_cookie(request, key="auth_token")

    # Nothing to clear: still report a successful logout.
    if not current_token:
        logging.error("<UNK> Token")
        return {"success": True, "message": "未找到有效Token,账号已登出!"}

    CookieUtil.remove_cookie(response, key="auth_token", path="/")
    # NOTE(review): the raw token value is written to the log on this line.
    logging.info(f"Token <UNK> cookie: {current_token}")
    return {"success": True, "message": "账号已登出!"}
# Authenticate a user by login name and password and issue a JWT cookie.
@router.post("/login")
async def login(request: Request, response: Response):
    """Verify the submitted credentials and set the ``auth_token`` cookie.

    The ``username`` and ``password`` request parameters are read via
    ``get_request_str_param``. The password is hashed with ``md5_encrypt``
    and compared with the ``pwdmd5`` column of the matching enabled
    (``b_use = 1``) row in ``t_sys_loginperson``.

    Args:
        request: Incoming request carrying ``username`` and ``password``.
        response: Outgoing response on which the token cookie is set.

    Returns:
        dict: On success ``{"success": True, "message": ..., "data":
        {"token": <jwt>, "user_data": <user row without pwdmd5>}}``;
        otherwise ``{"success": False, "message": ...}``.
    """
    username = await get_request_str_param(request, "username", True, True)
    password = await get_request_str_param(request, "password", True, True)
    if not username or not password:
        return {"success": False, "message": "用户名和密码不能为空"}

    password = md5_encrypt(password)

    # Bind the login name as a query parameter instead of concatenating it
    # into the statement; concatenation allowed SQL injection through
    # `username`.
    # NOTE(review): assumes find_by_sql forwards the params tuple to a driver
    # that uses the `%s` paramstyle - confirm against utils.Database.
    select_user_sql: str = (
        "SELECT person_id, person_name, identity_id, login_name, xb, bureau_id, org_id, pwdmd5 "
        "FROM t_sys_loginperson WHERE login_name = %s AND b_use = 1"
    )
    userlist = await find_by_sql(select_user_sql, (username,))
    user = userlist[0] if userlist else None

    # Detach the stored hash before anything is logged or returned, so the
    # password hash never reaches the log or the response body.
    stored_pwdmd5 = user.pop('pwdmd5', None) if user else None
    logging.info(f"查询结果: {user}")

    # The stored value is the MD5-hashed form of the user's password.
    if not user or stored_pwdmd5 != password:
        return {"success": False, "message": "用户名或密码错误"}

    token = create_access_token({"user_id": user['person_id'], "identity_id": user['identity_id']})
    CookieUtil.set_cookie(
        res=response,
        key="auth_token",
        value=token,
        secure=False,    # development setting: cookie is also sent over plain HTTP
        httponly=False,
        max_age=3600,    # cookie lifetime: one hour
        path="/",        # cookie path
    )
    # The token is a bearer credential, so only the event is logged.
    logging.info("Token 已成功设置到 cookie")
    return {"success": True, "message": "登录成功", "data": {"token": token, "user_data": user}}
# [Base-Login-3] Look up a person's ID by telephone number.
@router.get("/getPersonIdByTelephone")
async def get_person_id_by_telephone(request: Request):
    """Return the ``person_id`` of the enabled account with this telephone.

    Args:
        request: Incoming request carrying the ``telephone`` parameter.

    Returns:
        dict: ``{"success": True, "message": ..., "data": {"person_id":
        ...}}`` when a row with ``b_use = 1`` matches, otherwise
        ``{"success": False, "message": ...}``.
    """
    telephone = await get_request_str_param(request, "telephone", True, True)
    if not telephone:
        return {"success": False, "message": "手机号不能为空"}

    # Bind the telephone as a query parameter instead of concatenating it
    # into the statement; concatenation allowed SQL injection.
    # NOTE(review): assumes find_by_sql forwards the params tuple to a driver
    # that uses the `%s` paramstyle - confirm against utils.Database.
    select_user_sql: str = "SELECT person_id FROM t_sys_loginperson WHERE telephone = %s and b_use = 1 "
    userlist = await find_by_sql(select_user_sql, (telephone,))
    user = userlist[0] if userlist else None

    if not user:
        return {"success": False, "message": "未查询到相关信息"}
    return {"success": True, "message": "查询成功", "data": {"person_id": user['person_id']}}
# [Base-Login-4] Forgotten-password reset, callable without being logged in.
@router.post("/resetPassword")
async def reset_password(request: Request):
    """Overwrite the stored password of the given person.

    Writes the submitted password to ``original_pwd`` and its
    ``md5_encrypt`` result to ``pwdmd5`` for the row in
    ``t_sys_loginperson`` whose ``person_id`` matches.

    Args:
        request: Incoming request carrying ``person_id`` and ``password``.

    Returns:
        dict: ``{"success": False, "message": ...}`` when a parameter is
        missing, otherwise ``{"success": True, "message": ...}``.
    """
    person_id = await get_request_str_param(request, "person_id", True, True)
    password = await get_request_str_param(request, "password", True, True)
    if not person_id or not password:
        return {"success": False, "message": "用户ID和新密码不能为空"}

    password_md5 = md5_encrypt(password)

    # NOTE(review): nothing in this handler verifies that the caller owns the
    # account (no token, code or old-password check is visible here), and the
    # plaintext password is stored in original_pwd - both need a follow-up.

    # Bind all values as query parameters instead of concatenating them into
    # the statement; concatenation allowed SQL injection through both
    # `person_id` and `password`.
    # NOTE(review): assumes execute_sql accepts a params tuple as its second
    # argument and that the driver uses the `%s` paramstyle - confirm against
    # utils.Database before merging.
    update_user_sql: str = "UPDATE t_sys_loginperson SET original_pwd = %s, pwdmd5 = %s WHERE person_id = %s"
    await execute_sql(update_user_sql, (password, password_md5, person_id))

    # NOTE(review): success is reported without checking that a row matched.
    return {"success": True, "message": "密码修改成功"}