""" 成本费用凭证API路由 """ from typing import Any, List, Optional from fastapi import APIRouter, Depends, HTTPException, Query, status from sqlalchemy import select, func, and_, or_ from sqlalchemy.ext.asyncio import AsyncSession from loguru import logger from app.database import get_async_session from app.models.expense import Expense from app.schemas.expense import ( ExpenseCreate, ExpenseUpdate, ExpenseResponse, ExpenseListResponse, ) router = APIRouter() @router.get("", response_model=ExpenseListResponse) async def list_expenses( page: int = Query(1, ge=1, description="页码"), size: int = Query(10, ge=1, le=100, description="每页数量"), expense_id: str = Query(None, description="费用ID"), expense_type: str = Query(None, description="费用类型"), payee_name: str = Query(None, description="收款方名称"), payment_status: str = Query(None, description="支付状态"), db: AsyncSession = Depends(get_async_session), ): """ 获取成本费用凭证列表(分页查询) """ logger.info(f"获取成本费用凭证列表: page={page}, size={size}") # 构建查询 query = select(Expense) # 添加过滤条件 conditions = [] if expense_id: conditions.append(Expense.expense_id.ilike(f"%{expense_id}%")) if expense_type: conditions.append(Expense.expense_type == expense_type) if payee_name: conditions.append(Expense.payee_name.ilike(f"%{payee_name}%")) if payment_status: conditions.append(Expense.payment_status == payment_status) if conditions: query = query.where(and_(*conditions)) # 获取总数 count_query = select(func.count()).select_from(Expense) if conditions: count_query = count_query.where(and_(*conditions)) total_result = await db.execute(count_query) total = total_result.scalar() # 分页 query = query.offset((page - 1) * size).limit(size) # 执行查询 result = await db.execute(query) records = result.scalars().all() # 转换为响应格式 response_records = [] for expense in records: response_records.append({ "id": expense.id, "expense_id": expense.expense_id, "voucher_no": expense.voucher_no, "expense_type": expense.expense_type, "expense_category": expense.expense_category, "payer_name": expense.payer_name, "payer_account_no": expense.payer_account_no, "payee_name": expense.payee_name, "payee_account_no": expense.payee_account_no, "payee_bank_name": expense.payee_bank_name, "expense_date": expense.expense_date.isoformat() if expense.expense_date else None, "expense_amount": expense.expense_amount, "tax_amount": expense.tax_amount, "tax_rate": expense.tax_rate, "payment_method": expense.payment_method, "payment_status": expense.payment_status, "accounting_status": expense.accounting_status, "fiscal_year": expense.fiscal_year, "fiscal_period": expense.fiscal_period, "is_large_amount": expense.is_large_amount, "is_cross_border": expense.is_cross_border, "expense_description": expense.expense_description, "related_contract_id": expense.related_contract_id, }) return ExpenseListResponse( records=response_records, total=total, page=page, size=size, ) @router.get("/{expense_id}", response_model=ExpenseResponse) async def get_expense(expense_id: str, db: AsyncSession = Depends(get_async_session)): """ 根据ID获取成本费用凭证详细信息 """ logger.info(f"获取成本费用凭证详情: {expense_id}") query = select(Expense).where(Expense.expense_id == expense_id) result = await db.execute(query) expense = result.scalar_one_or_none() if not expense: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"成本费用凭证不存在: {expense_id}", ) # 转换为响应格式 response = { "id": expense.id, "expense_id": expense.expense_id, "voucher_no": expense.voucher_no, "expense_type": expense.expense_type, "expense_category": expense.expense_category, "payer_name": expense.payer_name, "payer_account_no": expense.payer_account_no, "payee_name": expense.payee_name, "payee_account_no": expense.payee_account_no, "payee_bank_name": expense.payee_bank_name, "expense_date": expense.expense_date.isoformat() if expense.expense_date else None, "expense_amount": expense.expense_amount, "tax_amount": expense.tax_amount, "tax_rate": expense.tax_rate, "payment_method": expense.payment_method, "payment_status": expense.payment_status, "accounting_status": expense.accounting_status, "fiscal_year": expense.fiscal_year, "fiscal_period": expense.fiscal_period, "is_large_amount": expense.is_large_amount, "is_cross_border": expense.is_cross_border, "expense_description": expense.expense_description, "related_contract_id": expense.related_contract_id, } return response @router.post("", response_model=ExpenseResponse, status_code=status.HTTP_201_CREATED) async def create_expense(expense: ExpenseCreate, db: AsyncSession = Depends(get_async_session)): """ 创建新的成本费用凭证 """ logger.info(f"创建成本费用凭证: {expense.voucher_no}") # 生成费用ID query = select(func.count()).select_from(Expense) result = await db.execute(query) count = result.scalar() expense_id = f"EXP{2024}{(count + 1):06d}" # 创建新费用 new_expense = Expense( expense_id=expense_id, voucher_no=expense.voucher_no, expense_type=expense.expense_type, expense_category=expense.expense_category, payer_name=expense.payer_name, payer_account_no=expense.payer_account_no, payee_name=expense.payee_name, payee_account_no=expense.payee_account_no, payee_bank_name=expense.payee_bank_name, expense_date=expense.expense_date, expense_amount=expense.expense_amount, tax_amount=expense.tax_amount, tax_rate=expense.tax_rate, payment_method=expense.payment_method, payment_status=expense.payment_status, accounting_status=expense.accounting_status, fiscal_year=expense.fiscal_year, fiscal_period=expense.fiscal_period, is_large_amount=expense.is_large_amount, is_cross_border=expense.is_cross_border, expense_description=expense.expense_description, related_contract_id=expense.related_contract_id, ) db.add(new_expense) await db.commit() await db.refresh(new_expense) # 转换为响应格式 response = { "id": new_expense.id, "expense_id": new_expense.expense_id, "voucher_no": new_expense.voucher_no, "expense_type": new_expense.expense_type, "expense_category": new_expense.expense_category, "payer_name": new_expense.payer_name, "payer_account_no": new_expense.payer_account_no, "payee_name": new_expense.payee_name, "payee_account_no": new_expense.payee_account_no, "payee_bank_name": new_expense.payee_bank_name, "expense_date": new_expense.expense_date.isoformat() if new_expense.expense_date else None, "expense_amount": new_expense.expense_amount, "tax_amount": new_expense.tax_amount, "tax_rate": new_expense.tax_rate, "payment_method": new_expense.payment_method, "payment_status": new_expense.payment_status, "accounting_status": new_expense.accounting_status, "fiscal_year": new_expense.fiscal_year, "fiscal_period": new_expense.fiscal_period, "is_large_amount": new_expense.is_large_amount, "is_cross_border": new_expense.is_cross_border, "expense_description": new_expense.expense_description, "related_contract_id": new_expense.related_contract_id, } return response @router.put("/{expense_id}", response_model=ExpenseResponse) async def update_expense( expense_id: str, expense_update: ExpenseUpdate, db: AsyncSession = Depends(get_async_session), ): """ 更新成本费用凭证 """ logger.info(f"更新成本费用凭证: {expense_id}") query = select(Expense).where(Expense.expense_id == expense_id) result = await db.execute(query) expense = result.scalar_one_or_none() if not expense: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"成本费用凭证不存在: {expense_id}", ) # 更新字段 update_data = expense_update.model_dump(exclude_unset=True) for key, value in update_data.items(): if hasattr(expense, key): setattr(expense, key, value) await db.commit() await db.refresh(expense) # 转换为响应格式 response = { "id": expense.id, "expense_id": expense.expense_id, "voucher_no": expense.voucher_no, "expense_type": expense.expense_type, "expense_category": expense.expense_category, "payer_name": expense.payer_name, "payer_account_no": expense.payer_account_no, "payee_name": expense.payee_name, "payee_account_no": expense.payee_account_no, "payee_bank_name": expense.payee_bank_name, "expense_date": expense.expense_date.isoformat() if expense.expense_date else None, "expense_amount": expense.expense_amount, "tax_amount": expense.tax_amount, "tax_rate": expense.tax_rate, "payment_method": expense.payment_method, "payment_status": expense.payment_status, "accounting_status": expense.accounting_status, "fiscal_year": expense.fiscal_year, "fiscal_period": expense.fiscal_period, "is_large_amount": expense.is_large_amount, "is_cross_border": expense.is_cross_border, "expense_description": expense.expense_description, "related_contract_id": expense.related_contract_id, } return response @router.delete("/{expense_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_expense(expense_id: str, db: AsyncSession = Depends(get_async_session)): """ 删除成本费用凭证(软删除) """ logger.info(f"删除成本费用凭证: {expense_id}") query = select(Expense).where(Expense.expense_id == expense_id) result = await db.execute(query) expense = result.scalar_one_or_none() if not expense: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"成本费用凭证不存在: {expense_id}", ) # 软删除 - 设置为已冲销状态 expense.accounting_status = "reversed" await db.commit() return None