deep-risk/backend/app/api/v1/auth.py
2025-12-14 20:08:27 +08:00

113 lines
3.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Authentication-related API routes.
"""
from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status, Request
from fastapi.security import OAuth2PasswordRequestForm
from loguru import logger
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database import get_async_session
from app.models.user import User
from app.utils.helpers import create_access_token, create_refresh_token, verify_token
router = APIRouter()
def _unauthorized(detail: str) -> HTTPException:
    """Build the 401 response shared by every failed-login branch."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


@router.post("/login")
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_async_session),
    request: Request = None
):
    """
    Authenticate a user and issue an access/refresh token pair.

    Args:
        form_data: OAuth2 password form carrying ``username`` and ``password``.
        db: Async database session used to look the user up.
        request: Incoming request; currently unused by this handler.

    Returns:
        A dict with ``access_token``, ``token_type``, ``expires_in`` (seconds),
        ``refresh_token`` and a ``user_info`` dict describing the user.

    Raises:
        HTTPException: 401 when the user does not exist, the password does not
            match, or the credentials are valid but the account is inactive.
    """
    logger.info(f"Login attempt for user: {form_data.username}")

    # Look the user up by username.
    user = await User.get_by_username(db, form_data.username)
    if not user:
        logger.warning(f"User not found: {form_data.username}")
        raise _unauthorized("Incorrect username or password")

    # Verify the password BEFORE looking at the account state: reporting
    # "User is inactive" to a caller who has not proven knowledge of the
    # password would disclose that the account exists and is disabled.
    is_valid = await user.check_password(form_data.password)
    if not is_valid:
        logger.warning(f"Invalid password for user: {form_data.username}")
        raise _unauthorized("Incorrect username or password")

    # Credentials are correct; only now is the account state revealed.
    if not user.status:
        logger.warning(f"User inactive: {form_data.username}")
        raise _unauthorized("User is inactive")

    # Issue tokens using the user ID (not the username) as the subject.
    access_token_expires = timedelta(minutes=settings.JWT_EXPIRE_MINUTES)
    access_token = create_access_token(
        subject=str(user.id), expires_delta=access_token_expires
    )
    refresh_token = create_refresh_token(subject=str(user.id))

    logger.info(f"User logged in successfully: {form_data.username}")
    return {
        "access_token": access_token,
        "token_type": "bearer",
        # Clients expect the lifetime in seconds; settings store minutes.
        "expires_in": settings.JWT_EXPIRE_MINUTES * 60,
        "refresh_token": refresh_token,
        "user_info": {
            "id": user.id,
            "username": user.username,
            "nickname": user.nickname,
            "email": user.email,
            "entity_id": user.entity_id,
            "entity_type": user.entity_type,
        },
    }
@router.post("/refresh")
async def refresh_token(refresh_token: str):
    """
    Exchange a refresh token for a new access token.

    Args:
        refresh_token: The refresh token to validate.

    Returns:
        A dict with the new ``access_token``, ``token_type`` and
        ``expires_in`` (access-token lifetime in seconds).

    Raises:
        HTTPException: 401 when ``verify_token`` rejects the refresh token.
    """
    # The login endpoint issues tokens whose subject is the user ID, so the
    # value returned here is presumably that ID rather than a username.
    # NOTE(review): the user's current existence/active state is not
    # re-checked here -- confirm whether that is intended.
    subject = verify_token(refresh_token, "refresh")
    if not subject:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token",
            # A 401 response must carry a challenge; matches the login route.
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token = create_access_token(
        subject=subject,
        expires_delta=timedelta(minutes=settings.JWT_EXPIRE_MINUTES),
    )
    return {
        "access_token": access_token,
        "token_type": "bearer",
        # Same field the login response exposes, so clients can schedule
        # the next refresh without decoding the token.
        "expires_in": settings.JWT_EXPIRE_MINUTES * 60,
    }
@router.post("/logout")
async def logout():
    """
    Acknowledge a logout request.

    The handler takes no input and only returns a fixed confirmation
    payload; nothing else is done in this function.
    """
    confirmation = {"message": "Successfully logged out"}
    return confirmation