492 lines
15 KiB
Python
492 lines
15 KiB
Python
"""
|
||
执行计划
|
||
定义检测任务的执行计划,包括执行顺序、分组和并发策略
|
||
"""
|
||
from typing import List, Dict, Any, Optional
|
||
from datetime import datetime
|
||
from dataclasses import dataclass, field
|
||
from enum import Enum
|
||
from loguru import logger
|
||
|
||
from app.models.risk_detection import DetectionRule
|
||
from .dependency_resolver import DependencyResolver, DependencyGraph
|
||
|
||
|
||
class ExecutionMode(str, Enum):
    """Scheduling strategy for the rules of a plan or stage.

    Because the enum mixes in ``str``, every member compares equal to its
    raw string value (e.g. ``ExecutionMode.PARALLEL == "parallel"``).
    """

    # Rules run one after another.
    SEQUENTIAL = "sequential"
    # Rules run concurrently.
    PARALLEL = "parallel"
    # Mixed mode: ordering derived from rule dependencies.
    HYBRID = "hybrid"
|
||
|
||
|
||
@dataclass
class ExecutionNode:
    """One node of an execution plan: the smallest unit of work (a single rule).

    ``dependencies`` and ``dependents`` store node IDs, not rule IDs.
    """

    node_id: str
    rule_id: str
    rule: DetectionRule
    # Execution level of this node.
    level: int = 0
    # IDs of the nodes this node depends on.
    dependencies: List[str] = field(default_factory=list)
    # IDs of the nodes that depend on this node.
    dependents: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Serialise the node into a plain dictionary.

        ``algorithm_code`` and ``algorithm_name`` are read from the wrapped
        rule (``algorithm_name`` comes from ``rule.rule_name``). The two ID
        lists are returned as the node's own list objects, not copies.
        """
        wrapped_rule = self.rule
        return dict(
            node_id=self.node_id,
            rule_id=self.rule_id,
            algorithm_code=wrapped_rule.algorithm_code,
            algorithm_name=wrapped_rule.rule_name,
            level=self.level,
            dependencies=self.dependencies,
            dependents=self.dependents,
        )
|
||
|
||
|
||
@dataclass
class ExecutionStage:
    """A group of execution nodes scheduled together under one execution mode."""

    stage_id: str
    stage_name: str
    # Nodes in this stage (holds nodes rather than raw rules).
    nodes: List[ExecutionNode]
    execution_mode: ExecutionMode = ExecutionMode.PARALLEL
    # Execution level of this stage.
    level: int = 0
    # IDs of the stages this stage depends on.
    depends_on: List[str] = field(default_factory=list)
    # Execution context.
    context: Dict[str, Any] = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.now)

    @property
    def rules(self) -> List[DetectionRule]:
        """Rules wrapped by this stage's nodes, in node order (compatibility accessor)."""
        return [member.rule for member in self.nodes]

    @property
    def rule_count(self) -> int:
        """Number of rules (one per node) in this stage."""
        node_total = len(self.nodes)
        return node_total

    def to_dict(self) -> Dict[str, Any]:
        """Serialise the stage, including each node's own dict form."""
        return dict(
            stage_id=self.stage_id,
            stage_name=self.stage_name,
            level=self.level,
            execution_mode=self.execution_mode.value,
            rule_count=self.rule_count,
            depends_on=self.depends_on,
            context=self.context,
            nodes=[member.to_dict() for member in self.nodes],
        )
|
||
|
||
|
||
@dataclass
class ExecutionPlan:
    """Execution plan: an ordered list of stages plus identifying metadata."""

    plan_id: str
    task_id: str
    entity_id: str
    entity_type: str
    period: str
    stages: List[ExecutionStage]
    execution_mode: ExecutionMode = ExecutionMode.HYBRID
    total_rules: int = 0
    # Highest stage level; supplied by the creator, not derived from ``stages``.
    max_level: int = 0
    created_at: datetime = field(default_factory=datetime.now)

    def get_stage(self, stage_id: str) -> Optional[ExecutionStage]:
        """Return the first stage whose ID is ``stage_id``, or ``None``."""
        return next((stage for stage in self.stages if stage.stage_id == stage_id), None)

    def get_stages_by_level(self, level: int) -> List[ExecutionStage]:
        """Return all stages at ``level``, in plan order."""
        return [stage for stage in self.stages if stage.level == level]

    def get_next_stage(self, completed_stages: List[str]) -> Optional[ExecutionStage]:
        """Return the first not-yet-completed stage whose dependencies are all completed.

        Args:
            completed_stages: IDs of stages that have already finished.

        Returns:
            The next runnable stage in plan order, or ``None`` if there is none.
        """
        # Set membership: avoids an O(n) list scan per dependency check.
        done = set(completed_stages)
        for stage in self.stages:
            if stage.stage_id in done:
                continue
            if all(dep in done for dep in stage.depends_on):
                return stage
        return None

    def get_execution_summary(self) -> Dict[str, Any]:
        """Return a compact summary of the plan.

        ``stages_per_level`` maps every level in ``0..max_level`` to the number
        of stages at that level (0 when there are none). ``max_parallelism`` is
        the largest number of stages sharing one level, or 0 for an empty plan.
        """
        # Count stages per level in a single pass instead of rescanning
        # ``self.stages`` once per level (previously done twice).
        counts: Dict[int, int] = {}
        for stage in self.stages:
            counts[stage.level] = counts.get(stage.level, 0) + 1

        return {
            "plan_id": self.plan_id,
            "task_id": self.task_id,
            "entity_id": self.entity_id,
            "entity_type": self.entity_type,
            "period": self.period,
            "execution_mode": self.execution_mode.value,
            "total_stages": len(self.stages),
            "total_rules": self.total_rules,
            "max_level": self.max_level,
            "stages_per_level": {
                level: counts.get(level, 0)
                for level in range(self.max_level + 1)
            },
            # ``default=0`` covers an empty plan, and also the case where
            # ``max_level`` disagrees with the stages (the old ``max()`` over an
            # empty level range raised ValueError).
            "max_parallelism": max(counts.values(), default=0),
        }

    def to_dict(self) -> Dict[str, Any]:
        """Serialise the plan, its stages and its summary into a dictionary."""
        return {
            "plan_id": self.plan_id,
            "task_id": self.task_id,
            "entity_id": self.entity_id,
            "entity_type": self.entity_type,
            "period": self.period,
            "execution_mode": self.execution_mode.value,
            "total_stages": len(self.stages),
            "total_rules": self.total_rules,
            "max_level": self.max_level,
            "stages": [stage.to_dict() for stage in self.stages],
            "summary": self.get_execution_summary(),
            "created_at": self.created_at.isoformat(),
        }
|
||
|
||
|
||
class ExecutionPlanner:
    """Execution planner.

    Turns a list of detection rules into an ``ExecutionPlan``. Depending on
    the requested ``ExecutionMode`` the plan is a single sequential stage, a
    single parallel stage, or (hybrid) one parallel stage per dependency
    level reported by ``DependencyResolver``.
    """

    def __init__(self):
        self.dependency_resolver = DependencyResolver()

    def create_plan(
        self,
        task_id: str,
        entity_id: str,
        entity_type: str,
        period: str,
        rules: List[DetectionRule],
        execution_mode: ExecutionMode = ExecutionMode.HYBRID
    ) -> ExecutionPlan:
        """Create an execution plan.

        Args:
            task_id: Task ID; the plan ID is ``f"plan_{task_id}"``.
            entity_id: Entity ID.
            entity_type: Entity type.
            period: Detection period.
            rules: Rules to schedule.
            execution_mode: Scheduling mode, given either as an
                ``ExecutionMode`` member or as its string value
                (e.g. ``"parallel"``).

        Returns:
            ExecutionPlan

        Raises:
            ValueError: If ``execution_mode`` is not a valid ``ExecutionMode``.
        """
        # ExecutionMode is a str enum, so a plain "parallel" already matched
        # the dispatch below but crashed on ``.value`` in the log call.
        # Normalising here accepts both forms.
        execution_mode = ExecutionMode(execution_mode)

        logger.info(
            f"开始创建执行计划:任务ID={task_id}, "
            f"实体ID={entity_id}, 规则数={len(rules)}, 模式={execution_mode.value}"
        )

        # Pick the plan-building strategy for the requested mode.
        if execution_mode == ExecutionMode.SEQUENTIAL:
            plan = self._create_sequential_plan(
                task_id, entity_id, entity_type, period, rules
            )
        elif execution_mode == ExecutionMode.PARALLEL:
            plan = self._create_parallel_plan(
                task_id, entity_id, entity_type, period, rules
            )
        else:  # HYBRID
            plan = self._create_hybrid_plan(
                task_id, entity_id, entity_type, period, rules
            )

        logger.info(
            f"执行计划创建完成:{plan.plan_id}, "
            f"阶段数={len(plan.stages)}, 规则数={plan.total_rules}, "
            f"最大层级={plan.max_level}"
        )

        return plan

    @staticmethod
    def _build_context(
        task_id: str,
        entity_id: str,
        entity_type: str,
        period: str
    ) -> Dict[str, Any]:
        """Return a new execution-context dict for the stages of one plan."""
        return {
            'task_id': task_id,
            'entity_id': entity_id,
            'entity_type': entity_type,
            'period': period,
            'parameters': {},
        }

    def _create_single_stage_plan(
        self,
        task_id: str,
        entity_id: str,
        entity_type: str,
        period: str,
        rules: List[DetectionRule],
        mode: ExecutionMode,
        stage_name: str
    ) -> ExecutionPlan:
        """Put every rule into a single stage ``stage_0`` executed with ``mode``.

        This is the shared implementation of the sequential and parallel
        strategies. Nodes are named ``node_<index>``.
        """
        nodes = [
            ExecutionNode(
                node_id=f"node_{i}",
                rule_id=rule.rule_id,
                rule=rule,
                # Sequential plans store each rule's position as its node
                # level; parallel plans keep every node on level 0.
                level=i if mode == ExecutionMode.SEQUENTIAL else 0
            )
            for i, rule in enumerate(rules)
        ]

        stage = ExecutionStage(
            stage_id="stage_0",
            stage_name=stage_name,
            nodes=nodes,
            execution_mode=mode,
            level=0,
            context=self._build_context(task_id, entity_id, entity_type, period)
        )

        return ExecutionPlan(
            plan_id=f"plan_{task_id}",
            task_id=task_id,
            entity_id=entity_id,
            entity_type=entity_type,
            period=period,
            stages=[stage],
            execution_mode=mode,
            total_rules=len(rules),
            max_level=0
        )

    def _create_sequential_plan(
        self,
        task_id: str,
        entity_id: str,
        entity_type: str,
        period: str,
        rules: List[DetectionRule]
    ) -> ExecutionPlan:
        """Create a sequential plan: one stage whose rules run in order."""
        return self._create_single_stage_plan(
            task_id, entity_id, entity_type, period, rules,
            ExecutionMode.SEQUENTIAL, "串行执行"
        )

    def _create_parallel_plan(
        self,
        task_id: str,
        entity_id: str,
        entity_type: str,
        period: str,
        rules: List[DetectionRule]
    ) -> ExecutionPlan:
        """Create a parallel plan: one stage whose rules all run in parallel."""
        return self._create_single_stage_plan(
            task_id, entity_id, entity_type, period, rules,
            ExecutionMode.PARALLEL, "并行执行"
        )

    def _create_hybrid_plan(
        self,
        task_id: str,
        entity_id: str,
        entity_type: str,
        period: str,
        rules: List[DetectionRule]
    ) -> ExecutionPlan:
        """Create a hybrid plan driven by rule dependencies.

        1. Analyse rule dependencies with ``DependencyResolver.analyze``.
        2. Group rules by the levels returned by ``get_levels()``.
        3. Each level becomes one stage whose rules run in parallel.
        4. Stages run level by level: stage ``k`` depends on every earlier stage.

        If ``get_levels()`` yields no levels, the plan has no stages and
        ``max_level`` is -1.
        """
        # 1. Analyse dependencies.
        dep_graph = self.dependency_resolver.analyze(rules)

        # 2. Obtain execution levels (each an iterable of rule IDs).
        levels = dep_graph.get_levels()

        rule_dict = {rule.rule_id: rule for rule in rules}

        # The same context dict is passed to every stage of this plan, so all
        # stages share one object.
        context = self._build_context(task_id, entity_id, entity_type, period)

        # 3. One parallel stage per level.
        stages = []
        for level_idx, level_rules in enumerate(levels):
            nodes = []
            for rule_id in level_rules:
                rule = rule_dict[rule_id]
                dep_node = dep_graph.get_node(rule_id)
                nodes.append(
                    ExecutionNode(
                        node_id=f"node_{rule_id}",
                        rule_id=rule_id,
                        rule=rule,
                        level=level_idx,
                        dependencies=[f"node_{dep}" for dep in dep_node.dependencies],
                        dependents=[f"node_{dep}" for dep in dep_node.dependents]
                    )
                )

            stages.append(
                ExecutionStage(
                    stage_id=f"stage_{level_idx}",
                    stage_name=f"层级 {level_idx}",
                    nodes=nodes,
                    execution_mode=ExecutionMode.PARALLEL,  # same level runs in parallel
                    level=level_idx,
                    # Every earlier stage; empty for level 0.
                    depends_on=[f"stage_{i}" for i in range(level_idx)],
                    context=context
                )
            )

        return ExecutionPlan(
            plan_id=f"plan_{task_id}",
            task_id=task_id,
            entity_id=entity_id,
            entity_type=entity_type,
            period=period,
            stages=stages,
            execution_mode=ExecutionMode.HYBRID,
            total_rules=len(rules),
            max_level=len(stages) - 1
        )

    def optimize_plan(self, plan: ExecutionPlan) -> ExecutionPlan:
        """Optimise an execution plan.

        Currently a no-op: it logs the request and returns ``plan`` unchanged.
        Intended strategies (not yet implemented):

        1. Merge small stages (1-2 rules) with an adjacent stage.
        2. Split large stages (more than 20 rules) into several batches.
        3. Reorder rules so that faster rules run first.

        Returns:
            The (currently unmodified) ExecutionPlan.
        """
        logger.info(f"开始优化执行计划:{plan.plan_id}")

        # TODO: implement the optimisation strategies listed above.
        return plan
|
||
|
||
|
||
class ExecutionPlanBuilder:
    """Manual execution-plan builder (kept for hand-assembled plans).

    A more flexible alternative to ``ExecutionPlanner``. Stages are appended
    one at a time. They receive IDs ``stage_0``, ``stage_1``, ... and a
    ``level`` equal to their insertion index.
    """

    def __init__(self, task_id: str, entity_id: str, entity_type: str, period: str):
        self.task_id = task_id
        self.entity_id = entity_id
        self.entity_type = entity_type
        self.period = period
        self.stages: List[ExecutionStage] = []

    def add_stage(
        self,
        stage_name: str,
        nodes: List[ExecutionNode],
        execution_mode: ExecutionMode = ExecutionMode.PARALLEL,
        depends_on: Optional[List[str]] = None,
    ) -> "ExecutionPlanBuilder":
        """Append an execution stage.

        Args:
            stage_name: Display name of the stage.
            nodes: Nodes to run in this stage (the list is copied).
            execution_mode: How the stage's nodes are scheduled.
            depends_on: IDs of stages that must finish first (copied);
                ``None`` means no dependencies.

        Returns:
            ``self``, so calls can be chained.
        """
        stage_id = f"stage_{len(self.stages)}"

        stage = ExecutionStage(
            stage_id=stage_id,
            stage_name=stage_name,
            # Copy caller-owned lists so that mutating them afterwards cannot
            # silently change a stage that was already added.
            nodes=list(nodes),
            execution_mode=execution_mode,
            level=len(self.stages),
            depends_on=list(depends_on) if depends_on else [],
        )

        self.stages.append(stage)
        logger.info(
            f"添加执行阶段: {stage_name}, "
            f"规则数量: {len(stage.nodes)}, "
            f"模式: {execution_mode.value}"
        )

        return self

    def build(self) -> ExecutionPlan:
        """Build an execution plan from the stages added so far.

        The plan receives a snapshot of the current stage list. Calling
        ``add_stage`` afterwards therefore does not alter an already-built
        plan, whose ``total_rules`` and ``max_level`` would otherwise go stale.
        """
        plan_id = f"plan_{self.task_id}"
        stages = list(self.stages)

        plan = ExecutionPlan(
            plan_id=plan_id,
            task_id=self.task_id,
            entity_id=self.entity_id,
            entity_type=self.entity_type,
            period=self.period,
            stages=stages,
            execution_mode=ExecutionMode.HYBRID,
            total_rules=sum(stage.rule_count for stage in stages),
            # -1 when no stage has been added.
            max_level=len(stages) - 1
        )

        logger.info(
            f"构建执行计划完成: {plan_id}, "
            f"阶段数: {len(stages)}, "
            f"规则总数: {plan.total_rules}"
        )

        return plan
|
||
|