deep-risk/backend/app/models/risk_detection.py
2025-12-14 20:08:27 +08:00

221 lines
7.5 KiB
Python

"""
风险检测相关数据模型
"""
from sqlalchemy import (
Column,
String,
DateTime,
Text,
Boolean,
Integer,
ForeignKey,
Float,
Enum as SqlEnum,
JSON,
)
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from app.database import Base
import enum
class RiskLevel(str, enum.Enum):
"""风险等级枚举"""
CRITICAL = "CRITICAL"
HIGH = "HIGH"
MEDIUM = "MEDIUM"
LOW = "LOW"
NONE = "NONE"
UNKNOWN = "UNKNOWN"
class RuleStatus(str, enum.Enum):
"""规则状态枚举"""
ACTIVE = "active"
INACTIVE = "inactive"
DRAFT = "draft"
class TaskType(str, enum.Enum):
"""任务类型枚举"""
PERIODIC = "periodic"
ON_DEMAND = "on_demand"
BATCH = "batch"
class TaskStatus(str, enum.Enum):
"""任务状态枚举"""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
class DetectionRule(Base):
"""风险检测规则表"""
__tablename__ = "risk_detection_rules"
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
rule_id = Column(String(64), unique=True, index=True, nullable=False)
rule_name = Column(String(128), nullable=False, comment="规则名称")
algorithm_code = Column(String(64), nullable=False, comment="算法编码")
description = Column(Text, comment="规则描述")
is_enabled = Column(Boolean, default=True, comment="是否启用")
parameters = Column(JSON, comment="算法参数")
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# 关联关系
executions = relationship("RuleExecution", back_populates="rule")
results = relationship("DetectionResult", back_populates="rule")
class DetectionTask(Base):
"""检测任务表"""
__tablename__ = "risk_detection_tasks"
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
task_id = Column(String(64), unique=True, index=True, nullable=False)
task_name = Column(String(128), nullable=False, comment="任务名称")
task_type = Column(SqlEnum(TaskType), nullable=False)
status = Column(SqlEnum(TaskStatus), default=TaskStatus.PENDING)
# 实体信息
entity_type = Column(String(32), nullable=False, comment="实体类型")
period = Column(String(16), nullable=False, comment="检测期间")
# 执行统计
total_entities = Column(Integer, default=0, comment="总实体数")
processed_entities = Column(Integer, default=0, comment="已处理实体数")
result_count = Column(Integer, default=0, comment="结果数量")
# 汇总信息
summary = Column(JSON, comment="汇总信息")
retry_count = Column(Integer, default=0, comment="重试次数")
# 参数
parameters = Column(JSON, comment="任务参数")
# 时间信息
started_at = Column(DateTime(timezone=True), comment="任务开始时间")
completed_at = Column(DateTime(timezone=True), comment="任务结束时间")
# 错误信息
error_message = Column(Text, comment="错误信息")
created_at = Column(DateTime(timezone=True), server_default=func.now())
# 关联关系
executions = relationship("TaskExecution", back_populates="task")
rule_executions = relationship("RuleExecution", back_populates="task")
results = relationship("DetectionResult", back_populates="task")
class RuleExecution(Base):
"""规则执行记录表"""
__tablename__ = "risk_rule_executions"
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
execution_id = Column(String(64), unique=True, index=True, nullable=False)
task_id = Column(String(64), ForeignKey("risk_detection_tasks.task_id"), index=True)
rule_id = Column(String(64), ForeignKey("risk_detection_rules.rule_id"), index=True)
# 实体信息
entity_id = Column(String(64), nullable=False, comment="实体ID")
entity_type = Column(String(32), nullable=False, comment="实体类型")
# 风险信息
risk_level = Column(SqlEnum(RiskLevel), nullable=False)
risk_score = Column(Float, default=0.0, comment="风险评分")
description = Column(Text, comment="风险描述")
suggestion = Column(Text, comment="处理建议")
risk_data = Column(JSON, comment="风险数据")
# 执行信息
status = Column(String(32), default="completed", comment="执行状态")
# 时间信息
started_at = Column(DateTime(timezone=True), nullable=False)
completed_at = Column(DateTime(timezone=True), comment="完成时间")
# 错误信息
error_message = Column(Text, comment="错误信息")
created_at = Column(DateTime(timezone=True), server_default=func.now())
# 关联关系
task = relationship("DetectionTask", back_populates="rule_executions")
rule = relationship("DetectionRule", back_populates="executions")
class TaskExecution(Base):
"""任务执行记录表"""
__tablename__ = "risk_task_executions"
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
execution_id = Column(String(64), unique=True, index=True, nullable=False)
task_id = Column(String(64), ForeignKey("risk_detection_tasks.task_id"), index=True)
# 实体信息
entity_id = Column(String(64), nullable=False, comment="实体ID")
entity_type = Column(String(32), nullable=False, comment="实体类型")
period = Column(String(16), nullable=False, comment="检测期间")
# 规则信息
rule_ids = Column(String(255), comment="关联规则ID列表")
# 参数
parameters = Column(JSON, comment="执行参数")
# 执行信息
status = Column(String(32), default="pending", comment="执行状态")
result_count = Column(Integer, default=0, comment="结果数量")
# 时间信息
started_at = Column(DateTime(timezone=True), comment="开始时间")
completed_at = Column(DateTime(timezone=True), comment="结束时间")
# 错误信息
error_message = Column(Text, comment="错误信息")
created_at = Column(DateTime(timezone=True), server_default=func.now())
# 关联关系
task = relationship("DetectionTask", back_populates="executions")
class DetectionResult(Base):
"""检测结果表"""
__tablename__ = "risk_detection_results"
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
task_id = Column(String(64), ForeignKey("risk_detection_tasks.task_id"), index=True)
rule_id = Column(String(64), ForeignKey("risk_detection_rules.rule_id"), index=True)
entity_id = Column(String(64), nullable=False, comment="实体ID")
entity_type = Column(String(32), nullable=False, comment="实体类型")
# 风险信息
risk_level = Column(SqlEnum(RiskLevel), nullable=False)
risk_score = Column(Float, default=0.0, comment="风险评分(0-100)")
risk_category = Column(String(64), comment="风险分类")
# 风险详情
description = Column(Text, comment="风险描述")
suggestion = Column(Text, comment="处理建议")
risk_data = Column(JSON, comment="风险数据")
# 证据链
evidence = Column(JSON, comment="证据信息")
# 状态
status = Column(String(32), default="active", comment="状态")
is_false_positive = Column(Boolean, default=False, comment="是否为误报")
detected_at = Column(DateTime(timezone=True), server_default=func.now())
# 关联关系
task = relationship("DetectionTask", back_populates="results")
rule = relationship("DetectionRule", back_populates="results")