159 lines
6.3 KiB
Python
159 lines
6.3 KiB
Python
# routes/LoginController.py
|
||
import base64
|
||
import os
|
||
import json
|
||
import random
|
||
import string
|
||
import jwt
|
||
|
||
from captcha.image import ImageCaptcha
|
||
from fastapi import APIRouter, Request, Response, status, HTTPException
|
||
from Util.CommonUtil import *
|
||
from Util.CookieUtil import *
|
||
from Util.Database import *
|
||
from Util.JwtUtil import *
|
||
from Util.ParseRequest import *
|
||
from Config.Config import *
|
||
|
||
# Router instance on which the endpoints of this module are registered.
router = APIRouter()

# Absolute path of the directory that contains this module. Despite the name,
# this is the module's own directory; the CAPTCHA font is looked up inside it.
project_root = os.path.dirname(os.path.abspath(__file__))

# Shared CAPTCHA renderer: 100x30 canvas, a single font size of 26, and a
# custom font file loaded from the directory computed above.
_captcha_font = os.path.join(project_root, 'DejaVuSans-Bold.ttf')
image = ImageCaptcha(
    width=100,
    height=30,
    fonts=[_captcha_font],
    font_sizes=[26],
)
|
||
|
||
@router.get("/getCaptcha")
def get_captcha():
    """Issue a new 4-digit CAPTCHA challenge.

    A random 4-digit text and a random 32-hex-character session id are
    generated, the ``{session_id: text}`` pair is appended as one JSON line to
    ``./session/captcha_sessions.json``, and the rendered image is returned
    base64-encoded.

    Returns:
        dict: ``{"image": <base64 string>, "session_id": <hex string>}``.
    """
    # Imported locally so this block does not depend on a file-level import.
    # ``secrets`` draws from the OS CSPRNG; ``random`` is predictable and
    # should not be used for a security challenge.
    import secrets

    captcha_text = ''.join(secrets.choice(string.digits) for _ in range(4))
    session_id = os.urandom(16).hex()

    # Persist the expected answer so /validateCaptcha can look it up later.
    # NOTE(review): this file is append-only and is never pruned here, so it
    # grows with every request — consider an expiring store.
    session_path = "./session/captcha_sessions.json"
    # Create the directory on first use so a fresh deployment does not fail
    # with FileNotFoundError.
    os.makedirs(os.path.dirname(session_path), exist_ok=True)
    with open(session_path, "a", encoding="utf-8") as session_file:
        # Emit the record and its newline in a single write so concurrent
        # requests are less likely to interleave partial lines.
        session_file.write(json.dumps({session_id: captcha_text}) + "\n")

    # Render the challenge; the result is read as a binary stream.
    data = image.generate(captcha_text)
    captcha_image_base64 = base64.b64encode(data.read()).decode()

    return {"image": captcha_image_base64, "session_id": session_id}
|
||
|
||
# Check a submitted CAPTCHA answer against the stored challenge.
@router.post("/validateCaptcha")
async def validate_captcha(session_id : str, captcha : str):
    """Validate a CAPTCHA answer for the given session id.

    The session file written by ``/getCaptcha`` is read line by line (one JSON
    object per line) and merged into a single mapping; the answer stored under
    ``session_id`` is then compared case-insensitively with ``captcha``.

    Args:
        session_id: Identifier returned together with the CAPTCHA image.
        captcha: Text entered by the user.

    Returns:
        dict: ``{"success": bool, "message": str}``.
    """
    sessions = {}
    try:
        with open("./session/captcha_sessions.json", "r", encoding="utf-8") as session_file:
            for line in session_file:
                line = line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    # Skip only the damaged line; one bad record must not
                    # invalidate every other outstanding challenge.
                    continue
                if isinstance(entry, dict):
                    sessions.update(entry)
    except FileNotFoundError:
        # No challenge has been issued yet, so no answer can be correct.
        sessions = {}

    # NOTE(review): a solved challenge is not removed from the file, so the
    # same session_id/answer pair can be replayed — consider one-time use.
    correct_captcha_text = sessions.get(session_id)

    if not correct_captcha_text or correct_captcha_text.lower() != captcha.lower():
        return {"success": False, "message": "验证码错误"}
    return {"success": True, "message": "验证码正确"}
|
||
|
||
# Read the token stored in the cookie and report its decoded contents.
@router.get("/getToken")
async def get_token(request: Request):
    """Decode the JWT stored in the ``auth_token`` cookie.

    Args:
        request: Incoming request; the cookie is read through
            ``CookieUtil.get_cookie``.

    Returns:
        dict: On success ``{"success": True, "message": ..., "token_data":
        <decoded claims>}``; otherwise ``{"success": False, "message": ...}``
        describing whether the token was missing, expired, or invalid.
    """
    token = CookieUtil.get_cookie(request, key="auth_token")
    # The raw token is deliberately not printed or logged: it is a bearer
    # credential and must not end up in stdout or log files.
    if not token:
        logging.error("未找到 Token")
        return {"success": False, "message": "未找到 Token"}

    try:
        decoded_token = jwt.decode(token, JWT_SECRET_KEY, algorithms=['HS256'])
    except jwt.ExpiredSignatureError:
        # Must be handled before InvalidTokenError so that expiry gets its
        # own message.
        logging.error("Token 过期")
        return {"success": False, "message": "Token 过期"}
    except jwt.InvalidTokenError:
        logging.error("无效的 Token")
        return {"success": False, "message": "无效的 Token"}

    logging.info(f"Token 解码成功: {decoded_token}")
    return {"success": True, "message": "Token 验证成功", "token_data": decoded_token}
|
||
|
||
|
||
# Log out
@router.get("/logout")
async def logout(request: Request, response: Response):
    """Clear the ``auth_token`` cookie.

    Args:
        request: Incoming request, used to check whether the cookie is set.
        response: Outgoing response on which the cookie removal is recorded.

    Returns:
        dict: ``{"success": True, "message": ...}`` in both cases; the message
        tells whether a token was actually present.
    """
    token = CookieUtil.get_cookie(request, key="auth_token")
    if not token:
        logging.error("<UNK> Token")
        return {"success": True, "message": "未找到有效Token,账号已登出!"}

    CookieUtil.remove_cookie(response, key="auth_token", path="/")
    # The token value is intentionally left out of the log line: removing the
    # cookie does not revoke the JWT, so a logged copy could still be replayed
    # until it expires.
    logging.info("Token <UNK> cookie")
    return {"success": True, "message": "账号已登出!"}
|
||
|
||
|
||
|
||
# Verify the user's credentials and issue a JWT.
@router.post("/login")
async def login(request: Request, response: Response):
    """Authenticate a user by login name and password.

    The submitted password is hashed with ``md5_encrypt`` and compared with
    the ``pwdmd5`` column of the matching active (``b_use = 1``) row in
    ``t_sys_loginperson``. On success a token is created, stored in the
    ``auth_token`` cookie for one hour, and returned with the user row.

    Args:
        request: Incoming request carrying ``username`` and ``password``.
        response: Outgoing response on which the cookie is set.

    Returns:
        dict: ``{"success": True, "message": ..., "data": {"token": ...,
        "user_data": ...}}`` on success, otherwise ``{"success": False,
        "message": ...}``.
    """
    username = await get_request_str_param(request, "username", True, True)
    password = await get_request_str_param(request, "password", True, True)

    if not username or not password:
        return {"success": False, "message": "用户名和密码不能为空"}

    password = md5_encrypt(password)

    # The login name is passed as a bound parameter instead of being spliced
    # into the SQL text, which closes the SQL-injection hole.
    # NOTE(review): ``$1`` is the PostgreSQL/asyncpg positional placeholder;
    # if find_by_sql wraps a driver that expects ``%s`` or ``?``, adjust the
    # placeholder — confirm against Util.Database.
    select_user_sql: str = (
        "SELECT person_id, person_name, identity_id, login_name, xb, bureau_id, org_id, pwdmd5 "
        "FROM t_sys_loginperson WHERE login_name = $1 AND b_use = 1"
    )
    userlist = await find_by_sql(select_user_sql, (username,))
    user = userlist[0] if userlist else None

    # Detach the password hash before anything is logged or returned so it
    # never reaches the log file or the client.
    stored_hash = user.pop('pwdmd5', None) if user else None
    logging.info(f"查询结果: {user}")

    # The stored value is the MD5 of the password, so hashes are compared.
    if not user or stored_hash != password:
        return {"success": False, "message": "用户名或密码错误"}

    token = create_access_token({"user_id": user['person_id'], "identity_id": user['identity_id']})
    # NOTE(review): secure=False / httponly=False are development settings;
    # they leave the cookie readable by scripts and sendable over plain HTTP.
    CookieUtil.set_cookie(
        res=response,
        key="auth_token",
        value=token,
        secure=False,
        httponly=False,
        max_age=3600,  # cookie lifetime: one hour
        path="/",
    )
    # The token itself is a bearer credential and is kept out of the log.
    logging.info("Token 已成功设置到 cookie")
    return {"success": True, "message": "登录成功", "data": {"token": token, "user_data": user}}
|
||
|
||
|
||
# [Base-Login-3] Look up a person's ID by telephone number.
@router.get("/getPersonIdByTelephone")
async def get_person_id_by_telephone(request: Request):
    """Return the ``person_id`` of the active account with the given phone.

    Args:
        request: Incoming request carrying the ``telephone`` parameter.

    Returns:
        dict: ``{"success": True, "message": ..., "data": {"person_id": ...}}``
        when an active (``b_use = 1``) row matches, otherwise
        ``{"success": False, "message": ...}``.
    """
    telephone = await get_request_str_param(request, "telephone", True, True)
    if not telephone:
        return {"success": False, "message": "手机号不能为空"}

    # Bind the phone number as a parameter rather than concatenating it into
    # the statement, so request input can no longer alter the SQL.
    # NOTE(review): ``$1`` is the PostgreSQL/asyncpg positional placeholder;
    # if find_by_sql wraps a driver that expects ``%s`` or ``?``, adjust the
    # placeholder — confirm against Util.Database.
    select_user_sql: str = "SELECT person_id FROM t_sys_loginperson WHERE telephone = $1 AND b_use = 1"
    userlist = await find_by_sql(select_user_sql, (telephone,))
    user = userlist[0] if userlist else None
    if not user:
        return {"success": False, "message": "未查询到相关信息"}
    return {"success": True, "message": "查询成功", "data": {"person_id": user['person_id']}}
|
||
|
||
|
||
|
||
# [Base-Login-4] Forgotten-password reset, performed without being logged in.
@router.post("/resetPassword")
async def reset_password(request: Request):
    """Overwrite the password of the account identified by ``person_id``.

    Both the plain password (``original_pwd``) and its ``md5_encrypt`` hash
    (``pwdmd5``) are written to ``t_sys_loginperson``.

    Args:
        request: Incoming request carrying ``person_id`` and ``password``.

    Returns:
        dict: ``{"success": bool, "message": str}``.
    """
    # NOTE(review): nothing in this handler verifies that the caller owns the
    # account (no token, SMS code, or old password), so anyone who knows a
    # person_id can reset its password — confirm this is guarded elsewhere.
    person_id = await get_request_str_param(request, "person_id", True, True)
    password = await get_request_str_param(request, "password", True, True)
    if not person_id or not password:
        return {"success": False, "message": "用户ID和新密码不能为空"}

    password_md5 = md5_encrypt(password)

    # All request-supplied values are bound as parameters instead of being
    # concatenated into the statement, which removes the SQL injection.
    # NOTE(review): ``$n`` is the PostgreSQL/asyncpg positional placeholder;
    # if execute_sql wraps a driver that expects ``%s`` or ``?``, adjust it.
    # person_id is bound as the string received from the request; if the
    # column is numeric and the driver is type-strict, convert it first —
    # confirm against Util.Database and the table schema.
    # NOTE(review): original_pwd keeps the password in plain text; retained
    # only because the existing schema expects it.
    update_user_sql: str = "UPDATE t_sys_loginperson SET original_pwd = $1, pwdmd5 = $2 WHERE person_id = $3"
    await execute_sql(update_user_sql, (password, password_md5, person_id))
    return {"success": True, "message": "密码修改成功"}