# routes/LoginController.py
"""Login-related HTTP routes.

Exposes captcha generation/validation, JWT cookie inspection, logout and
username/password login on a FastAPI ``APIRouter``.
"""
import base64
import json
import logging
import os
import random
import secrets
import string

import jwt
from captcha.image import ImageCaptcha
from fastapi import APIRouter, Request, Response, status, HTTPException

from utils.CommonUtil import *
from utils.CookieUtil import *
from utils.Database import *
from utils.JwtUtil import *
from utils.ParseRequest import *
from Config.Config import *

# Router instance that the application mounts.
router = APIRouter()

# Directory that contains this module (the bundled font is looked up here).
project_root = os.path.dirname(os.path.abspath(__file__))

# Captcha renderer configuration.
image = ImageCaptcha(
    width=100,
    height=30,
    font_sizes=[26],
    fonts=[os.path.join(project_root, 'DejaVuSans-Bold.ttf')],  # custom font path
)

# Append-only store of issued captchas: one JSON object {session_id: text} per line.
_CAPTCHA_SESSION_FILE = "./session/captcha_sessions.json"


def _is_safe_login_name(login_name: str) -> bool:
    """Return True when *login_name* cannot terminate a single-quoted SQL literal.

    Rejects the single quote, the backslash and all ASCII control characters.
    """
    return not any(ch in "'\\" or ch < " " for ch in login_name)


@router.get("/getCaptcha")
def get_captcha():
    """Issue a new 4-digit captcha.

    Returns:
        dict with ``image`` (base64 of the rendered captcha image) and
        ``session_id`` (hex id the client must send back for validation).
    """
    # The captcha is a security control, so draw the digits from the OS CSPRNG.
    captcha_text = ''.join(secrets.choice(string.digits) for _ in range(4))
    session_id = os.urandom(16).hex()

    # Persist the expected answer; create the directory on first use so the
    # open() below cannot fail just because "./session" is missing.
    os.makedirs(os.path.dirname(_CAPTCHA_SESSION_FILE), exist_ok=True)
    with open(_CAPTCHA_SESSION_FILE, "a") as session_file:
        json.dump({session_id: captcha_text}, session_file)
        session_file.write("\n")

    # Render the captcha image and return it base64-encoded.
    data = image.generate(captcha_text)
    captcha_image_base64 = base64.b64encode(data.read()).decode()

    return {"image": captcha_image_base64, "session_id": session_id}


# Check a submitted captcha answer against the stored one.
@router.post("/validateCaptcha")
async def validate_captcha(session_id: str, captcha: str):
    """Compare *captcha* (case-insensitively) with the text stored for *session_id*.

    Returns:
        dict with ``success`` and a human-readable ``message``.
    """
    sessions = {}
    try:
        with open(_CAPTCHA_SESSION_FILE, "r") as session_file:
            for line in session_file:
                line = line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    # One corrupt line must not invalidate every other session.
                    continue
                if isinstance(entry, dict):
                    sessions.update(entry)
    except FileNotFoundError:
        # No captcha has been issued yet: every answer is simply wrong.
        pass

    correct_captcha_text = sessions.get(session_id)
    if (
        not isinstance(correct_captcha_text, str)
        or not correct_captcha_text
        or correct_captcha_text.lower() != captcha.lower()
    ):
        return {"success": False, "message": "验证码错误"}

    return {"success": True, "message": "验证码正确"}


# Read and verify the token stored in the request cookie.
@router.get("/getToken")
async def get_token(request: Request):
    """Decode the ``auth_token`` cookie and report whether it is valid.

    Returns:
        dict with ``success``, ``message`` and, on success, ``token_data``
        (the decoded JWT claims).
    """
    token = CookieUtil.get_cookie(request, key="auth_token")
    if not token:
        logging.error("未找到 Token")
        return {"success": False, "message": "未找到 Token"}

    try:
        decoded_token = jwt.decode(token, JWT_SECRET_KEY, algorithms=['HS256'])
    except jwt.ExpiredSignatureError:
        logging.error("Token 过期")
        return {"success": False, "message": "Token 过期"}
    except jwt.InvalidTokenError:
        logging.error("无效的 Token")
        return {"success": False, "message": "无效的 Token"}

    logging.info(f"Token 解码成功: {decoded_token}")
    return {"success": True, "message": "Token 验证成功", "token_data": decoded_token}


# Logout
@router.get("/logout")
async def logout(request: Request, response: Response):
    """Remove the ``auth_token`` cookie if present.

    Always reports success; the message tells whether a token was found.
    """
    token = CookieUtil.get_cookie(request, key="auth_token")
    if token:
        CookieUtil.remove_cookie(response, key="auth_token", path="/")
        # Do not write the token itself to the log: it may still be valid.
        logging.info("Token cookie 已移除")
        return {"success": True, "message": "账号已登出!"}

    logging.error("未找到 Token")
    return {"success": True, "message": "未找到有效Token,账号已登出!"}


# Verify the user's credentials and issue a JWT.
@router.post("/login")
async def login(request: Request, response: Response):
    """Authenticate by ``username``/``password`` and set the ``auth_token`` cookie.

    The submitted password is MD5-hashed and compared with the ``pwdmd5``
    column of the matching enabled (``b_use = 1``) row.

    Returns:
        dict with ``success`` and ``message``; on success also ``data`` holding
        the ``token`` and ``user_data`` (the row without ``pwdmd5``).
    """
    username = await get_request_str_param(request, "username", True, True)
    password = await get_request_str_param(request, "password", True, True)

    if not username or not password:
        return {"success": False, "message": "用户名和密码不能为空"}

    # The login name is interpolated into a SQL string literal below, so refuse
    # anything that could break out of that literal (SQL injection).
    # NOTE(review): prefer bound parameters once the placeholder style accepted
    # by find_by_sql is confirmed.
    if not _is_safe_login_name(username):
        logging.warning("登录名包含非法字符,已拒绝")
        return {"success": False, "message": "用户名或密码错误"}

    password = md5_encrypt(password)

    select_user_sql: str = (
        "SELECT person_id, person_name, identity_id, login_name, xb, bureau_id, org_id, pwdmd5 "
        "FROM t_sys_loginperson WHERE login_name = '" + username + "' AND b_use = 1"
    )
    userlist = await find_by_sql(select_user_sql, ())
    user = userlist[0] if userlist else None
    # Keep the password hash out of the log.
    logged_user = {k: v for k, v in user.items() if k != 'pwdmd5'} if user else None
    logging.info(f"查询结果: {logged_user}")

    if not (user and user['pwdmd5'] == password):
        return {"success": False, "message": "用户名或密码错误"}

    token = create_access_token({"user_id": user['person_id'], "identity_id": user['identity_id']})
    CookieUtil.set_cookie(
        res=response,
        key="auth_token",
        value=token,
        secure=False,  # development setting; see NOTE below
        httponly=False,
        max_age=3600,  # cookie lifetime: 1 hour
        path="/",
    )
    # NOTE(review): secure=False / httponly=False look like development values —
    # confirm before deploying behind HTTPS.
    logging.info("Token 已成功设置到 cookie")
    user.pop('pwdmd5', None)  # never return the password hash to the client
    return {"success": True, "message": "登录成功", "data": {"token": token, "user_data": user}}