topfans/docs/superpowers/plans/2026-06-17-plan-b-admin-fastapi-vue.md
2026-06-22 17:19:48 +08:00


Plan B: TopFans-activity-admin admin console (FastAPI + Vue3)

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Parent plan: docs/superpowers/plans/2026-06-17-moderation-report-feedback-system.md

Goal: In the TopFans-activity-admin repository, build the moderator admin console: a FastAPI backend plus a Vue3 frontend.

Architecture:

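  • FastAPI route layer under /api/admin/moderation/*; it does not depend on the topfans moderationService RPC
  • Reads and writes all moderation_* tables directly through the shared PostgreSQL top-fans database
  • Reuses the existing verify_token middleware
  • Business takedown/ban: dual write inside one transaction (soft delete on the business table + moderation_target_status UPSERT)
  • Error codes 50008 vs 50009 vs 50014 are distinguished in the handler layer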
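
The dual-write bullet above can be illustrated with a small, self-contained sketch. It swaps PostgreSQL for the standard-library sqlite3 module and uses trimmed stand-in tables, so the column sets and the helper names make_db and takedown_asset are assumptions for illustration, not the project's schema or code.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    """Create an in-memory database with simplified stand-in tables."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE assets (
            id INTEGER PRIMARY KEY,
            is_active INTEGER NOT NULL DEFAULT 1,
            deleted_at INTEGER
        );
        CREATE TABLE moderation_target_status (
            target_type TEXT NOT NULL,
            target_id INTEGER NOT NULL,
            last_action_type TEXT NOT NULL,
            updated_at INTEGER NOT NULL,
            PRIMARY KEY (target_type, target_id)
        );
    """)
    conn.execute("INSERT INTO assets (id) VALUES (1)")
    conn.commit()
    return conn


def takedown_asset(conn: sqlite3.Connection, asset_id: int, now: int) -> int:
    """Soft-delete the asset and upsert its moderation status in one transaction.

    Returns the number of business rows that were actually soft-deleted (0 or 1).
    """
    with conn:  # commits on success, rolls back if any statement raises
        cur = conn.execute(
            "UPDATE assets SET is_active = 0, deleted_at = ? "
            "WHERE id = ? AND is_active = 1",
            (now, asset_id),
        )
        conn.execute(
            """INSERT INTO moderation_target_status
                   (target_type, target_id, last_action_type, updated_at)
               VALUES ('asset', ?, 'takedown', ?)
               ON CONFLICT (target_type, target_id)
               DO UPDATE SET last_action_type = excluded.last_action_type,
                             updated_at = excluded.updated_at""",
            (asset_id, now),
        )
        return cur.rowcount
```

Because both statements run inside the same `with conn:` block, a failure in the UPSERT would also undo the soft delete, which is the property the plan asks for.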

Tech Stack: Python 3.11+ / FastAPI / SQLAlchemy / Pydantic / pytest / Vue3 + Element Plus

Repository: /Users/liulujian/Documents/code/TopFans-activity-admin

Prerequisite: the 9 tables from Plan A Phase 1 have already been migrated to top-fans


Phase B.1: Models + Schema + CRUD

Task B.1.1: Create the ORM models

Files:

  • Create: backend/models/moderation.py

Following the table structures in spec §3, write 9 SQLAlchemy ORM models: ReportCategory / FeedbackCategory / Report / ReportEvidence / Feedback / FeedbackEvidence / ModerationAction / ModerationTargetStatus / AdminAuditLog

from sqlalchemy import Column, BigInteger, String, Integer, Boolean, DateTime, Text, ForeignKey, JSON
from sqlalchemy.dialects.postgresql import JSONB
from .base import Base

class Report(Base):
    __tablename__ = "reports"
    id = Column(BigInteger, primary_key=True)
    reporter_id = Column(BigInteger, nullable=False)
    star_id = Column(BigInteger)
    target_type = Column(String(30), nullable=False)
    target_id = Column(BigInteger, nullable=False)
    target_snapshot = Column(JSONB, nullable=False)
    triggered_auto_hide = Column(Boolean, default=False, nullable=False)
    category_code = Column(String(50), ForeignKey("report_categories.code", ondelete="RESTRICT", onupdate="CASCADE"), nullable=False)
    description = Column(String(500))
    is_anonymous = Column(Boolean, default=False, nullable=False)
    status = Column(String(20), default="pending", nullable=False)
    is_auto_hidden = Column(Boolean, default=False, nullable=False)
    claimed_by = Column(BigInteger)
    claimed_at = Column(BigInteger)
    resolved_action = Column(String(20))
    resolved_by = Column(BigInteger)
    resolved_at = Column(BigInteger)
    resolution_note = Column(String(500))
    created_at = Column(BigInteger, nullable=False)
    updated_at = Column(BigInteger, nullable=False)

Follow the same pattern for: ReportCategory / FeedbackCategory / ReportEvidence / Feedback / FeedbackEvidence / ModerationAction / ModerationTargetStatus / AdminAuditLog

Task B.1.2: Create the Pydantic schemas

Files:

  • Create: backend/schemas/moderation.py
from pydantic import BaseModel, Field
from typing import Optional, List

class ReportListItem(BaseModel):
    id: int
    target_type: str
    target_id: int
    category_code: str
    status: str
    reporter_id: int
    created_at: int
    resolved_at: Optional[int] = None
    resolved_action: Optional[str] = None

    class Config:
        orm_mode = True  # needed for ReportListItem.from_orm(...) in the handler (Pydantic v1)

class ReportDetail(BaseModel):
    report: ReportListItem
    description: Optional[str]
    target_snapshot: dict
    evidence: List[dict]
    actions: List[dict]
    claimed_by: Optional[int]
    claimed_at: Optional[int]

class ClaimRequest(BaseModel):
    pass

class ActionRequest(BaseModel):
    action: str = Field(..., regex="^(takedown|ban|warn|dismiss|restore)$")
    note: str = Field(..., max_length=500)
    send_notification: bool = True
    restore: Optional[bool] = None  # used only by dismiss path C

class BulkDismissRequest(BaseModel):
    report_ids: List[int] = Field(..., min_items=1, max_items=500)
    reason: str = Field(..., max_length=200)

class ForceReleaseRequest(BaseModel):
    reason: str = Field(..., max_length=200)

Task B.1.3: Create the CRUD functions

Files:

  • Create: backend/crud/moderation_crud.py

Key functions (per the endpoints in spec §5.2):

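import logging
from typing import Dict, List, Optional, Tuple

from sqlalchemy import func, insert, or_, select, update
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession

from ..models.moderation import ModerationAction, ModerationTargetStatus, Report
# Asset, User, and send_notification_internal are assumed to come from existing
# project modules; their import paths are not part of this plan.

logger = logging.getLogger(__name__)

async def list_reports(db: AsyncSession, *, status: Optional[str], category: Optional[str],
                       target_type: Optional[str], keyword: Optional[str],
                       page: int = 1, page_size: int = 20) -> Tuple[List[Report], int]:
    q = select(Report)
    if status: q = q.where(Report.status == status)
    if category: q = q.where(Report.category_code == category)
    if target_type: q = q.where(Report.target_type == target_type)
    if keyword:
        # Match the description text, or the target id when the keyword is numeric
        conds = [Report.description.ilike(f"%{keyword}%")]
        if keyword.isdigit():
            conds.append(Report.target_id == int(keyword))
        q = q.where(or_(*conds))
    total = await db.scalar(select(func.count()).select_from(q.subquery()))
    rows = (await db.execute(q.order_by(Report.created_at.desc()).offset((page-1)*page_size).limit(page_size))).scalars().all()
    return rows, total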

async def claim_report(db: AsyncSession, *, report_id: int, admin_id: int, now: int) -> int:
    """Return the number of affected rows (0 or 1)."""
    res = await db.execute(
        update(Report)
        .where(Report.id == report_id, Report.status.in_(["pending", "auto_hidden"]))
        .values(status="reviewing", claimed_by=admin_id, claimed_at=now, updated_at=now)
    )
    await db.commit()
    return res.rowcount

async def execute_action(db: AsyncSession, *, report_id: int, action: str, admin_id: int,
                         note: str, now: int, send_notification: bool, restore: Optional[bool] = None) -> Dict:
    """Triple write inside one transaction: business-table soft delete/restore + mts UPSERT + reports UPDATE.
       Returns: {affected: int, target_type, target_id, action_log_id}"""
    # 1. SELECT the report to get target_type/target_id/origin_status
    report = await db.get(Report, report_id)
    if report is None or report.status != "reviewing" or report.claimed_by != admin_id:
        return {"affected": 0, "error": "not_claimed"}  # the handler maps this to 50008/50009

    target_type = report.target_type
    target_id = report.target_id
    origin_is_auto_hidden = report.is_auto_hidden

    # 2. Soft-delete or restore the business table row
    affected = 0
    if action == "takedown":
        if target_type == "asset":
            res = await db.execute(update(Asset).where(Asset.id == target_id, Asset.is_active == True)
                                    .values(is_active=False, deleted_at=now))
        else:
            res = await db.execute(update(User).where(User.id == target_id, User.is_active == True)
                                    .values(is_active=False, deleted_at=now))
        affected = res.rowcount
    elif action == "ban":
        # user_profile only
        res = await db.execute(update(User).where(User.id == target_id, User.is_active == True)
                                .values(is_active=False, deleted_at=now))
        affected = res.rowcount
    elif action == "warn":
        # warn does not soft-delete the user; it only updates mts
        affected = 0  # warn leaves the business tables untouched
    elif action == "dismiss":
        # path C: dismiss + previously auto_hidden + restore=True → restore the business table row
        if origin_is_auto_hidden and restore:
            if target_type == "asset":
                res = await db.execute(update(Asset).where(Asset.id == target_id, Asset.is_active == False)
                                        .values(is_active=True, deleted_at=None))
            else:
                res = await db.execute(update(User).where(User.id == target_id, User.is_active == False)
                                        .values(is_active=True, deleted_at=None))
            affected = res.rowcount
    elif action == "restore":
        # v2.2 B6 + v2.5 W6: recover from a mistaken ban/takedown; write mts conditionally via a CTE
        # write mts last_action_type='restore' only when user_unbanned=1
        if target_type == "user_profile":
            res = await db.execute(update(User).where(User.id == target_id, User.is_active == False)
                                    .values(is_active=True, deleted_at=None).returning(User.id))
            unbanned = res.fetchone()
            if unbanned:
                # UPSERT mts with last_action_type='restore'
                stmt = pg_insert(ModerationTargetStatus).values(
                    target_type=target_type, target_id=target_id, last_action_type="restore",
                    source="admin", operator_admin_id=admin_id, reason=f"unban: {note}",
                    created_at=now, updated_at=now,
                ).on_conflict_do_update(
                    index_elements=["target_type", "target_id"],
                    set_=dict(last_action_type="restore", source="admin",
                              operator_admin_id=admin_id, reason=f"unban: {note}", updated_at=now)
                )
                await db.execute(stmt)
        # also update reports.resolved_action='restore'
        await db.execute(update(Report).where(Report.id == report_id)
                        .values(resolved_action="restore", resolution_note=f"unban: {note}",
                                resolved_by=admin_id, resolved_at=now, updated_at=now))

    # 3. UPSERT mts
    last_action_type = action if action != "dismiss" else ("restore" if (origin_is_auto_hidden and restore) else "dismiss")
    # Plain dicts instead of an ORM instance: instance.__dict__ also carries _sa_instance_state
    mts_values = dict(
        target_type=target_type, target_id=target_id,
        last_action_type=last_action_type, source="admin",
        operator_admin_id=admin_id, reason=note,
        created_at=now, updated_at=now,
    )
    mts_updates = dict(last_action_type=last_action_type, source="admin",
                       operator_admin_id=admin_id, reason=note, updated_at=now)
    if action == "warn":
        # warn flow: is_warned=true, warn_count=warn_count+1, last_warned_at=now;
        # every other action leaves the warn columns unchanged
        mts_values.update(is_warned=True, warn_count=1, last_warned_at=now)
        mts_updates.update(is_warned=True, warn_count=ModerationTargetStatus.warn_count + 1,
                           last_warned_at=now)
    stmt = pg_insert(ModerationTargetStatus).values(**mts_values).on_conflict_do_update(
        index_elements=["target_type", "target_id"],
        set_=mts_updates,
    )
    await db.execute(stmt)

    # 4. UPDATE reports.status = 'resolved' or 'dismissed'
    new_status = "resolved" if action in ["takedown", "ban", "warn", "restore"] else "dismissed"
    await db.execute(update(Report).where(Report.id == report_id)
                    .values(status=new_status, resolved_action=action, resolved_by=admin_id,
                            resolved_at=now, claimed_by=None, claimed_at=None, resolution_note=note, updated_at=now))

    # 5. Write the moderation_actions log row
    await db.execute(insert(ModerationAction).values(
        report_id=report_id, admin_id=admin_id, action_type=action,
        target_type=target_type, target_id=target_id, note=note,
        success=True, created_at=now,
    ))

    await db.commit()

    # 6. Notification (asynchronous; a failure must not affect the main flow)
    if send_notification:
        try:
            await send_notification_internal(target_type, target_id, action, note)
        except Exception as e:
            logger.warning(f"notification failed: {e}")

    return {"affected": affected, "target_type": target_type, "target_id": target_id}
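
claim_report above relies on a conditional UPDATE acting as a compare-and-set: the WHERE clause only matches while the ticket is still pending or auto_hidden, so at most one admin's statement changes the row. A minimal sketch of that behavior, assuming sqlite3 in place of PostgreSQL and a trimmed stand-in reports table (make_claim_db and claim are hypothetical names):

```python
import sqlite3


def make_claim_db() -> sqlite3.Connection:
    """In-memory stand-in for the reports table with one pending ticket."""
    conn = sqlite3.connect(":memory:")
    conn.execute("""
        CREATE TABLE reports (
            id INTEGER PRIMARY KEY,
            status TEXT NOT NULL DEFAULT 'pending',
            claimed_by INTEGER,
            claimed_at INTEGER
        )
    """)
    conn.execute("INSERT INTO reports (id) VALUES (1)")
    conn.commit()
    return conn


def claim(conn: sqlite3.Connection, report_id: int, admin_id: int, now: int) -> int:
    """Try to claim a ticket; return 1 if this call won the claim, else 0."""
    with conn:
        cur = conn.execute(
            """UPDATE reports
               SET status = 'reviewing', claimed_by = ?, claimed_at = ?
               WHERE id = ? AND status IN ('pending', 'auto_hidden')""",
            (admin_id, now, report_id),
        )
    return cur.rowcount
```

A second claim on the same ticket returns 0 because the status is already reviewing, which is the case the handler then has to classify into an error code.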

Notes:

  • warn flow: UPSERT mts SET is_warned=true, warn_count=warn_count+1, last_warned_at=now
  • dismiss path D: do not restore the business table; mts.last_action_type='dismiss' overwrites the original autohide
  • dismiss path C: restore the business table + set mts.last_action_type='restore' + bulk-clean older auto_hidden tickets for the same target
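
The rules in these notes can be checked in isolation if the decision about which mts fields to write is separated from the SQL. The sketch below is one way to do that; plan_mts_update and the warn_count_delta key are hypothetical names introduced only for illustration.

```python
from typing import Any, Dict, Optional


def plan_mts_update(action: str, origin_is_auto_hidden: bool,
                    restore: Optional[bool], now: int) -> Dict[str, Any]:
    """Return the fields an mts UPSERT would set for one admin action.

    "warn_count_delta" is a hypothetical key meaning "add this to warn_count".
    """
    if action == "dismiss":
        # path C restores a previously auto-hidden target; path D leaves it hidden
        if origin_is_auto_hidden and restore:
            last_action_type = "restore"
        else:
            last_action_type = "dismiss"
    else:
        last_action_type = action

    fields: Dict[str, Any] = {"last_action_type": last_action_type, "updated_at": now}
    if action == "warn":
        # only the warn flow touches the warn columns
        fields.update(is_warned=True, warn_count_delta=1, last_warned_at=now)
    return fields
```

For example, `plan_mts_update("dismiss", True, True, now)` yields last_action_type 'restore' (path C), while the same call with `restore=False` yields 'dismiss' (path D).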

Task B.1.4: Write the remaining CRUD functions

  • release_report / dismiss_path_a (pending → dismissed) / bulk_dismiss / force_release / withdraw_report / clear_warn / list_categories / upsert_category / delete_category / get_stats
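
As a rough illustration of one of these functions, the sketch below shows a bulk dismiss that applies the pending → dismissed transition of dismiss_path_a to many tickets in a single statement. The plan does not define bulk_dismiss's exact semantics, so restricting it to pending tickets is an assumption, as are the trimmed table and the use of sqlite3 instead of PostgreSQL.

```python
import sqlite3
from typing import List


def make_reports_db() -> sqlite3.Connection:
    """In-memory stand-in for the reports table, trimmed to the columns used here."""
    conn = sqlite3.connect(":memory:")
    conn.execute("""
        CREATE TABLE reports (
            id INTEGER PRIMARY KEY,
            status TEXT NOT NULL,
            resolved_by INTEGER,
            resolved_at INTEGER,
            resolution_note TEXT
        )
    """)
    conn.executemany(
        "INSERT INTO reports (id, status) VALUES (?, ?)",
        [(1, "pending"), (2, "pending"), (3, "reviewing"), (4, "resolved")],
    )
    conn.commit()
    return conn


def bulk_dismiss(conn: sqlite3.Connection, report_ids: List[int],
                 reason: str, admin_id: int, now: int) -> int:
    """Dismiss every listed ticket that is still pending; return how many rows changed."""
    if not report_ids:
        return 0
    placeholders = ",".join("?" for _ in report_ids)
    with conn:
        cur = conn.execute(
            f"""UPDATE reports
                SET status = 'dismissed', resolved_by = ?, resolved_at = ?,
                    resolution_note = ?
                WHERE status = 'pending' AND id IN ({placeholders})""",
            (admin_id, now, reason, *report_ids),
        )
    return cur.rowcount
```

Returning the affected row count lets the caller report how many of the requested ids were skipped because they had already left the pending state.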

Phase B.2: FastAPI Handler

Task B.2.1: Write the handlers

Files:

  • Create: backend/handlers/moderation_admin.py
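from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession

from .verify_token import verify_token
from ..crud import moderation_crud
from ..models.moderation import Report
from ..schemas.moderation import *
# get_db and now_ms are assumed to come from existing project helpers;
# their import paths are not part of this plan.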

router = APIRouter(prefix="/api/admin/moderation", tags=["moderation"])

@router.get("/reports")
async def list_reports(
    status: Optional[str] = None,
    category: Optional[str] = None,
    target_type: Optional[str] = None,
    keyword: Optional[str] = None,
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    admin: dict = Depends(verify_token),
):
    rows, total = await moderation_crud.list_reports(db, status=status, category=category,
                                                      target_type=target_type, keyword=keyword,
                                                      page=page, page_size=page_size)
    return {"code": 200, "data": {"reports": [ReportListItem.from_orm(r).dict() for r in rows], "total": total}}

@router.post("/reports/{report_id}/claim")
async def claim_report(
    report_id: int,
    db: AsyncSession = Depends(get_db),
    admin: dict = Depends(verify_token),
):
    affected = await moderation_crud.claim_report(db, report_id=report_id, admin_id=admin["admin_id"], now=now_ms())
    if affected == 0:
        # the handler distinguishes 50007/50008/50009
        report = await db.get(Report, report_id)
        if report is None:
            raise HTTPException(404, detail={"code": 50007, "message": "工单不存在"})
        if report.status == "reviewing" and report.claimed_by != admin["admin_id"]:
            raise HTTPException(409, detail={
                "code": 50008, "message": "工单已被他人认领",
                "claimed_by": report.claimed_by, "claimed_at": report.claimed_at,
            })
        if report.status in ["resolved", "dismissed"]:
            raise HTTPException(400, detail={"code": 50009, "message": "工单已结案"})
        # status is pending but the claim still failed (abnormal case)
        raise HTTPException(409, detail={"code": 50014, "message": "工单已被释放或结案,请刷新"})
    return {"code": 200, "data": {"claimed": True}}

@router.post("/reports/{report_id}/actions")
async def execute_action(
    report_id: int,
    payload: ActionRequest,
    db: AsyncSession = Depends(get_db),
    admin: dict = Depends(verify_token),
):
    result = await moderation_crud.execute_action(
        db, report_id=report_id, action=payload.action,
        admin_id=admin["admin_id"], note=payload.note,
        now=now_ms(), send_notification=payload.send_notification,
        restore=payload.restore,
    )
    if "error" in result:
        raise HTTPException(409, detail={"code": 50008, "message": "工单已被他人认领或结案"})
    return {"code": 200, "data": result}
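
The handlers above call now_ms(), which this plan does not define. A minimal sketch, assuming it is meant to return the current Unix time in milliseconds to match the BigInteger timestamp columns:

```python
import time


def now_ms() -> int:
    """Current Unix time in whole milliseconds (assumed meaning of now_ms)."""
    return time.time_ns() // 1_000_000
```

Computing the value once per request and passing it down, as the handlers do with `now=now_ms()`, keeps every row written in the same transaction on an identical timestamp.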

Task B.2.2: Register the router

Files:

  • Modify: backend/router/__init__.py
from ..handlers.moderation_admin import router as moderation_admin_router
admin_router.include_router(moderation_admin_router)

Task B.2.3: Start the server + verify with curl

cd /Users/liulujian/Documents/code/TopFans-activity-admin
uvicorn backend.main:app --reload
curl http://localhost:8000/api/admin/moderation/reports -H "Authorization: Bearer $ADMIN_TOKEN"
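
The same check can be scripted from Python when filters need to be varied. The sketch below only builds the request; build_list_request is a hypothetical helper, and the default base URL simply mirrors the curl command above.

```python
import urllib.parse
import urllib.request


def build_list_request(token: str, base_url: str = "http://localhost:8000",
                       **filters) -> urllib.request.Request:
    """Build a GET request for the report list, skipping filters that are None."""
    query = urllib.parse.urlencode(
        {key: value for key, value in filters.items() if value is not None}
    )
    url = f"{base_url}/api/admin/moderation/reports"
    if query:
        url = f"{url}?{query}"
    return urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})
```

Passing the result to `urllib.request.urlopen(...)` sends it, for example `urllib.request.urlopen(build_list_request(token, status="pending", page=1))` against a running server.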

Phase B.3: Unit tests

Task B.3.1: Write the tests

Files:

  • Create: backend/handlers/test_moderation_admin.py

Key tests:

  • List / detail / claim happy path
  • The 4 dismiss path branches
  • Distinguishing 50008 vs 50009 vs 50014
  • Authentication (no token → 401)
  • bulk_dismiss / force_release / clear_warn
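
The error-code cases in this list lend themselves to a table-driven test once the branch logic is isolated in a pure function. The sketch below assumes a hypothetical helper classify_claim_failure and a ReportState stand-in for the ORM row; neither exists in the plan, and the branch order simply mirrors the claim handler.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class ReportState:
    """Stand-in for the fields of a Report row that the claim handler inspects."""
    status: str
    claimed_by: Optional[int] = None


def classify_claim_failure(report: Optional[ReportState], admin_id: int) -> Tuple[int, int]:
    """Map a failed claim to (HTTP status, business error code)."""
    if report is None:
        return 404, 50007
    if report.status == "reviewing" and report.claimed_by != admin_id:
        return 409, 50008
    if report.status in ("resolved", "dismissed"):
        return 400, 50009
    return 409, 50014


# (description, report, admin_id, expected (http_status, code))
CASES = [
    ("missing ticket", None, 1, (404, 50007)),
    ("claimed by another admin", ReportState("reviewing", claimed_by=2), 1, (409, 50008)),
    ("already resolved", ReportState("resolved"), 1, (400, 50009)),
    ("already dismissed", ReportState("dismissed"), 1, (400, 50009)),
    ("pending but the claim still failed", ReportState("pending"), 1, (409, 50014)),
]


def failed_cases() -> List[str]:
    """Return the descriptions of cases whose classification differs from the expectation."""
    return [desc for desc, report, admin_id, expected in CASES
            if classify_claim_failure(report, admin_id) != expected]
```

Under pytest, the same CASES table could feed `pytest.mark.parametrize`, with one assertion per row.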

Phase B.4: Vue3 frontend

Task B.4.1: API wrapper

Files:

  • Create: frontend/src/api/moderation.js

Wrap the endpoints listed in spec §5.2.

Task B.4.2: Report ticket list

Files:

  • Create: frontend/src/views/moderation/ReportList.vue

Element Plus components:

  • el-form filter bar: status select / category select / target_type select / keyword input
  • el-table list: id/target_type/target_id/category/status[el-tag]/created_at
  • el-pagination for paging
  • Detail button → navigates to /moderation/reports/:id

Task B.4.3: Report detail

Files:

  • Create: frontend/src/views/moderation/ReportDetail.vue

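  • el-descriptions info card (target_snapshot JSON shown formatted)

  • el-image preview for evidence images

  • el-timeline for the action log

  • Action button group (shown or hidden by status):

    • pending / auto_hidden: Claim
    • reviewing (claimed by the current admin): Take down / Ban / Warn / Dismiss (path B/C/D options) / Release
  • el-dialog for entering action/note/send_notification/restore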

Task B.4.4: Feedback list/detail

Same pattern as the report pages.

Task B.4.5: Category CRUD pages

  • ReportCategoryConfig.vue / FeedbackCategoryConfig.vue
  • el-table listing + el-dialog for add/edit + delete confirmation

Task B.4.6: Routes + menu

Files:

  • Modify: frontend/src/router/index.js
  • Modify: frontend/src/layout/Layout.vue

Register 6 routes + menu items.


Self-review checklist (Plan B)

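  • All FastAPI /api/admin/moderation/* routes are registered
  • Error codes 50007/50008/50009/50014 are distinguished correctly in the handler
  • The triple write inside one transaction (business table + mts + reports) is atomic
  • All 4 dismiss paths are covered
  • The warn flow does not soft-delete the user; it only updates mts
  • The restore flow writes mts conditionally via a CTE (v2.5 W6)
  • Vue3 routes + menu are registered
  • All pytest tests PASS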