221 lines
7.5 KiB
Python
221 lines
7.5 KiB
Python
"""
|
|
风险检测相关数据模型
|
|
"""
|
|
from sqlalchemy import (
|
|
Column,
|
|
String,
|
|
DateTime,
|
|
Text,
|
|
Boolean,
|
|
Integer,
|
|
ForeignKey,
|
|
Float,
|
|
Enum as SqlEnum,
|
|
JSON,
|
|
)
|
|
from sqlalchemy.orm import relationship
|
|
from sqlalchemy.sql import func
|
|
from app.database import Base
|
|
import enum
|
|
|
|
|
|
class RiskLevel(str, enum.Enum):
    """Risk level enumeration.

    Members also subclass ``str``, so each one compares equal to its plain
    string value. For this enum every value is identical to the member name.
    """

    CRITICAL = "CRITICAL"
    HIGH = "HIGH"
    MEDIUM = "MEDIUM"
    LOW = "LOW"
    NONE = "NONE"
    UNKNOWN = "UNKNOWN"
|
|
|
|
|
|
class RuleStatus(str, enum.Enum):
    """Rule status enumeration.

    Members also subclass ``str`` and carry lower-case string values.
    """

    # NOTE(review): no model column in the visible part of this file uses
    # this enum — confirm whether it is consumed elsewhere.
    ACTIVE = "active"
    INACTIVE = "inactive"
    DRAFT = "draft"
|
|
|
|
|
|
class TaskType(str, enum.Enum):
    """Task type enumeration.

    Members also subclass ``str`` and carry lower-case string values.
    """

    PERIODIC = "periodic"
    ON_DEMAND = "on_demand"
    BATCH = "batch"
|
|
|
|
|
|
class TaskStatus(str, enum.Enum):
    """Task status enumeration.

    Members also subclass ``str`` and carry lower-case string values.
    """

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
|
|
|
|
|
|
class DetectionRule(Base):
    """Risk detection rule, mapped to the ``risk_detection_rules`` table.

    Besides the integer primary key, each row has a unique string
    ``rule_id``, a rule name, the code of the algorithm it uses, an optional
    description, an enabled flag and a JSON column of algorithm parameters.
    """

    __tablename__ = "risk_detection_rules"

    # Integer surrogate key plus the unique string identifier of the rule.
    id = Column(Integer, primary_key=True, autoincrement=True, index=True)
    rule_id = Column(String(64), nullable=False, unique=True, index=True)

    # Rule definition.
    rule_name = Column(String(128), nullable=False, comment="规则名称")
    algorithm_code = Column(String(64), nullable=False, comment="算法编码")
    description = Column(Text, comment="规则描述")
    is_enabled = Column(Boolean, default=True, comment="是否启用")
    parameters = Column(JSON, comment="算法参数")

    # created_at is filled by the database on INSERT (server default).
    # updated_at has no insert default; only ``onupdate`` is configured, so
    # it stays NULL until the row is first updated.
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

    # Relationships; each is paired with the ``rule`` attribute on the
    # target model through ``back_populates``.
    executions = relationship("RuleExecution", back_populates="rule")
    results = relationship("DetectionResult", back_populates="rule")
|
|
|
|
|
|
class DetectionTask(Base):
    """Detection task, mapped to the ``risk_detection_tasks`` table.

    A row records the task's identity and type, the entity type and period
    it covers, progress counters, a JSON summary, its parameters, start and
    completion timestamps, and an error message.
    """

    __tablename__ = "risk_detection_tasks"

    # Integer surrogate key plus the unique string identifier of the task.
    id = Column(Integer, primary_key=True, autoincrement=True, index=True)
    task_id = Column(String(64), nullable=False, unique=True, index=True)
    task_name = Column(String(128), nullable=False, comment="任务名称")

    # NOTE(review): SQLAlchemy's Enum type persists the enum member *names*
    # (e.g. "PERIODIC") by default, not the lower-case values — confirm this
    # is the intended storage format.
    task_type = Column(SqlEnum(TaskType), nullable=False)
    status = Column(SqlEnum(TaskStatus), default=TaskStatus.PENDING)

    # Entity information.
    entity_type = Column(String(32), nullable=False, comment="实体类型")
    period = Column(String(16), nullable=False, comment="检测期间")

    # Execution statistics; every counter defaults to 0.
    total_entities = Column(Integer, default=0, comment="总实体数")
    processed_entities = Column(Integer, default=0, comment="已处理实体数")
    result_count = Column(Integer, default=0, comment="结果数量")

    # Summary information.
    summary = Column(JSON, comment="汇总信息")
    retry_count = Column(Integer, default=0, comment="重试次数")

    # Task parameters.
    parameters = Column(JSON, comment="任务参数")

    # Timing information (both nullable).
    started_at = Column(DateTime(timezone=True), comment="任务开始时间")
    completed_at = Column(DateTime(timezone=True), comment="任务结束时间")

    # Error information.
    error_message = Column(Text, comment="错误信息")

    # Filled by the database on INSERT (server default).
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    # Relationships; each is paired with the ``task`` attribute on the
    # target model through ``back_populates``.
    executions = relationship("TaskExecution", back_populates="task")
    rule_executions = relationship("RuleExecution", back_populates="task")
    results = relationship("DetectionResult", back_populates="task")
|
|
|
|
|
|
class RuleExecution(Base):
    """Rule execution record, mapped to the ``risk_rule_executions`` table.

    A row links a task and a rule to one entity and stores the resulting
    risk level, score, description, suggestion and raw risk data, together
    with execution status, timing and error information.
    """

    __tablename__ = "risk_rule_executions"

    # Integer surrogate key plus the unique string identifier of the record.
    id = Column(Integer, primary_key=True, autoincrement=True, index=True)
    execution_id = Column(String(64), nullable=False, unique=True, index=True)

    # Both foreign keys point at the string ``task_id`` / ``rule_id`` columns
    # of the parent tables, not at their integer ``id`` columns.
    task_id = Column(
        String(64),
        ForeignKey("risk_detection_tasks.task_id"),
        index=True,
    )
    rule_id = Column(
        String(64),
        ForeignKey("risk_detection_rules.rule_id"),
        index=True,
    )

    # Entity information.
    entity_id = Column(String(64), nullable=False, comment="实体ID")
    entity_type = Column(String(32), nullable=False, comment="实体类型")

    # Risk information.
    risk_level = Column(SqlEnum(RiskLevel), nullable=False)
    risk_score = Column(Float, default=0.0, comment="风险评分")
    description = Column(Text, comment="风险描述")
    suggestion = Column(Text, comment="处理建议")
    risk_data = Column(JSON, comment="风险数据")

    # Execution status is a free-form string here (not an enum column);
    # it defaults to "completed".
    status = Column(String(32), default="completed", comment="执行状态")

    # Timing information; started_at is mandatory, completed_at is nullable.
    started_at = Column(DateTime(timezone=True), nullable=False)
    completed_at = Column(DateTime(timezone=True), comment="完成时间")

    # Error information.
    error_message = Column(Text, comment="错误信息")

    # Filled by the database on INSERT (server default).
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    # Relationships, paired through ``back_populates`` with
    # ``DetectionTask.rule_executions`` and ``DetectionRule.executions``.
    task = relationship("DetectionTask", back_populates="rule_executions")
    rule = relationship("DetectionRule", back_populates="executions")
|
|
|
|
|
|
class TaskExecution(Base):
    """Task execution record, mapped to the ``risk_task_executions`` table.

    A row ties a task to one entity and period and stores the associated
    rule IDs, execution parameters, status, result count, timing and error
    information.
    """

    __tablename__ = "risk_task_executions"

    # Integer surrogate key plus the unique string identifier of the record.
    id = Column(Integer, primary_key=True, autoincrement=True, index=True)
    execution_id = Column(String(64), nullable=False, unique=True, index=True)

    # Points at the string ``task_id`` column of the task table, not at its
    # integer ``id`` column.
    task_id = Column(
        String(64),
        ForeignKey("risk_detection_tasks.task_id"),
        index=True,
    )

    # Entity information.
    entity_id = Column(String(64), nullable=False, comment="实体ID")
    entity_type = Column(String(32), nullable=False, comment="实体类型")
    period = Column(String(16), nullable=False, comment="检测期间")

    # Rule information: a list of rule IDs kept in a single 255-character
    # string column. NOTE(review): the serialization format is not visible
    # here — confirm against the code that writes it.
    rule_ids = Column(String(255), comment="关联规则ID列表")

    # Execution parameters.
    parameters = Column(JSON, comment="执行参数")

    # Execution status is a free-form string here (not an enum column);
    # it defaults to "pending".
    status = Column(String(32), default="pending", comment="执行状态")
    result_count = Column(Integer, default=0, comment="结果数量")

    # Timing information (both nullable).
    started_at = Column(DateTime(timezone=True), comment="开始时间")
    completed_at = Column(DateTime(timezone=True), comment="结束时间")

    # Error information.
    error_message = Column(Text, comment="错误信息")

    # Filled by the database on INSERT (server default).
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    # Relationship, paired through ``back_populates`` with
    # ``DetectionTask.executions``.
    task = relationship("DetectionTask", back_populates="executions")
|
|
|
|
|
|
class DetectionResult(Base):
    """Detection result, mapped to the ``risk_detection_results`` table.

    A row links a task and a rule to one entity and stores the risk level,
    score and category, descriptive details, supporting evidence, a status
    string and a false-positive flag.
    """

    __tablename__ = "risk_detection_results"

    id = Column(Integer, primary_key=True, autoincrement=True, index=True)

    # Both foreign keys point at the string ``task_id`` / ``rule_id`` columns
    # of the parent tables, not at their integer ``id`` columns.
    task_id = Column(
        String(64),
        ForeignKey("risk_detection_tasks.task_id"),
        index=True,
    )
    rule_id = Column(
        String(64),
        ForeignKey("risk_detection_rules.rule_id"),
        index=True,
    )

    # Entity information.
    entity_id = Column(String(64), nullable=False, comment="实体ID")
    entity_type = Column(String(32), nullable=False, comment="实体类型")

    # Risk information. The column comment gives the score range as 0-100;
    # nothing in this model enforces that range.
    risk_level = Column(SqlEnum(RiskLevel), nullable=False)
    risk_score = Column(Float, default=0.0, comment="风险评分(0-100)")
    risk_category = Column(String(64), comment="风险分类")

    # Risk details.
    description = Column(Text, comment="风险描述")
    suggestion = Column(Text, comment="处理建议")
    risk_data = Column(JSON, comment="风险数据")

    # Evidence chain.
    evidence = Column(JSON, comment="证据信息")

    # Status: a free-form string defaulting to "active", plus a
    # false-positive flag defaulting to False.
    status = Column(String(32), default="active", comment="状态")
    is_false_positive = Column(Boolean, default=False, comment="是否为误报")

    # Filled by the database on INSERT (server default).
    detected_at = Column(DateTime(timezone=True), server_default=func.now())

    # Relationships, paired through ``back_populates`` with
    # ``DetectionTask.results`` and ``DetectionRule.results``.
    task = relationship("DetectionTask", back_populates="results")
    rule = relationship("DetectionRule", back_populates="results")
|