""" 执行计划 定义检测任务的执行计划,包括执行顺序、分组和并发策略 """ from typing import List, Dict, Any, Optional from datetime import datetime from dataclasses import dataclass, field from enum import Enum from loguru import logger from app.models.risk_detection import DetectionRule from .dependency_resolver import DependencyResolver, DependencyGraph class ExecutionMode(str, Enum): """执行模式""" SEQUENTIAL = "sequential" # 串行执行 PARALLEL = "parallel" # 并行执行 HYBRID = "hybrid" # 混合模式(基于依赖关系) @dataclass class ExecutionNode: """ 执行节点 代表执行计划中的一个最小执行单元(单个规则) """ node_id: str rule_id: str rule: DetectionRule level: int = 0 # 执行层级 dependencies: List[str] = field(default_factory=list) # 依赖的节点ID列表 dependents: List[str] = field(default_factory=list) # 依赖本节点的节点ID列表 def to_dict(self) -> Dict[str, Any]: """转换为字典""" return { "node_id": self.node_id, "rule_id": self.rule_id, "algorithm_code": self.rule.algorithm_code, "algorithm_name": self.rule.rule_name, "level": self.level, "dependencies": self.dependencies, "dependents": self.dependents, } @dataclass class ExecutionStage: """执行阶段""" stage_id: str stage_name: str nodes: List[ExecutionNode] # 改为nodes而不是rules execution_mode: ExecutionMode = ExecutionMode.PARALLEL level: int = 0 # 执行层级 depends_on: List[str] = field(default_factory=list) # 依赖的阶段ID列表 context: Dict[str, Any] = field(default_factory=dict) # 执行上下文 created_at: datetime = field(default_factory=datetime.now) @property def rules(self) -> List[DetectionRule]: """获取该阶段的所有规则(兼容性属性)""" return [node.rule for node in self.nodes] @property def rule_count(self) -> int: """规则数量""" return len(self.nodes) def to_dict(self) -> Dict[str, Any]: """转换为字典""" return { "stage_id": self.stage_id, "stage_name": self.stage_name, "level": self.level, "execution_mode": self.execution_mode.value, "rule_count": self.rule_count, "depends_on": self.depends_on, "context": self.context, "nodes": [node.to_dict() for node in self.nodes], } @dataclass class ExecutionPlan: """执行计划""" plan_id: str task_id: str entity_id: str entity_type: str period: str stages: List[ExecutionStage] execution_mode: ExecutionMode = ExecutionMode.HYBRID total_rules: int = 0 max_level: int = 0 created_at: datetime = field(default_factory=datetime.now) def get_stage(self, stage_id: str) -> Optional[ExecutionStage]: """获取指定阶段""" for stage in self.stages: if stage.stage_id == stage_id: return stage return None def get_stages_by_level(self, level: int) -> List[ExecutionStage]: """获取指定层级的所有阶段""" return [stage for stage in self.stages if stage.level == level] def get_next_stage(self, completed_stages: List[str]) -> Optional[ExecutionStage]: """获取下一个可执行的阶段""" for stage in self.stages: # 检查是否已完成 if stage.stage_id in completed_stages: continue # 检查依赖是否满足 if all(dep in completed_stages for dep in stage.depends_on): return stage return None def get_execution_summary(self) -> Dict[str, Any]: """获取执行计划摘要""" return { "plan_id": self.plan_id, "task_id": self.task_id, "entity_id": self.entity_id, "entity_type": self.entity_type, "period": self.period, "execution_mode": self.execution_mode.value, "total_stages": len(self.stages), "total_rules": self.total_rules, "max_level": self.max_level, "stages_per_level": { level: len([s for s in self.stages if s.level == level]) for level in range(self.max_level + 1) }, "max_parallelism": max( len([s for s in self.stages if s.level == level]) for level in range(self.max_level + 1) ) if self.stages else 0, } def to_dict(self) -> Dict[str, Any]: """转换为字典""" return { "plan_id": self.plan_id, "task_id": self.task_id, "entity_id": self.entity_id, "entity_type": self.entity_type, "period": self.period, "execution_mode": self.execution_mode.value, "total_stages": len(self.stages), "total_rules": self.total_rules, "max_level": self.max_level, "stages": [stage.to_dict() for stage in self.stages], "summary": self.get_execution_summary(), "created_at": self.created_at.isoformat(), } class ExecutionPlanner: """ 执行计划器 根据规则依赖关系自动生成优化的执行计划 """ def __init__(self): self.dependency_resolver = DependencyResolver() def create_plan( self, task_id: str, entity_id: str, entity_type: str, period: str, rules: List[DetectionRule], execution_mode: ExecutionMode = ExecutionMode.HYBRID ) -> ExecutionPlan: """ 创建执行计划 Args: task_id: 任务ID entity_id: 实体ID entity_type: 实体类型 period: 检测期间 rules: 规则列表 execution_mode: 执行模式 Returns: ExecutionPlan """ logger.info( f"开始创建执行计划:任务ID={task_id}, " f"实体ID={entity_id}, 规则数={len(rules)}, 模式={execution_mode.value}" ) # 根据执行模式选择不同的计划生成策略 if execution_mode == ExecutionMode.SEQUENTIAL: plan = self._create_sequential_plan( task_id, entity_id, entity_type, period, rules ) elif execution_mode == ExecutionMode.PARALLEL: plan = self._create_parallel_plan( task_id, entity_id, entity_type, period, rules ) else: # HYBRID plan = self._create_hybrid_plan( task_id, entity_id, entity_type, period, rules ) logger.info( f"执行计划创建完成:{plan.plan_id}, " f"阶段数={len(plan.stages)}, 规则数={plan.total_rules}, " f"最大层级={plan.max_level}" ) return plan def _create_sequential_plan( self, task_id: str, entity_id: str, entity_type: str, period: str, rules: List[DetectionRule] ) -> ExecutionPlan: """创建串行执行计划(所有规则顺序执行)""" plan_id = f"plan_{task_id}" # 创建执行节点 nodes = [ ExecutionNode( node_id=f"node_{i}", rule_id=rule.rule_id, rule=rule, level=i ) for i, rule in enumerate(rules) ] # 创建执行上下文 context = { 'task_id': task_id, 'entity_id': entity_id, 'entity_type': entity_type, 'period': period, 'parameters': {} } # 创建单个阶段包含所有节点 stage = ExecutionStage( stage_id="stage_0", stage_name="串行执行", nodes=nodes, execution_mode=ExecutionMode.SEQUENTIAL, level=0, context=context ) return ExecutionPlan( plan_id=plan_id, task_id=task_id, entity_id=entity_id, entity_type=entity_type, period=period, stages=[stage], execution_mode=ExecutionMode.SEQUENTIAL, total_rules=len(rules), max_level=0 ) def _create_parallel_plan( self, task_id: str, entity_id: str, entity_type: str, period: str, rules: List[DetectionRule] ) -> ExecutionPlan: """创建并行执行计划(所有规则并行执行)""" plan_id = f"plan_{task_id}" # 创建执行节点(所有节点在同一层级) nodes = [ ExecutionNode( node_id=f"node_{i}", rule_id=rule.rule_id, rule=rule, level=0 ) for i, rule in enumerate(rules) ] # 创建执行上下文 context = { 'task_id': task_id, 'entity_id': entity_id, 'entity_type': entity_type, 'period': period, 'parameters': {} } # 创建单个阶段包含所有节点 stage = ExecutionStage( stage_id="stage_0", stage_name="并行执行", nodes=nodes, execution_mode=ExecutionMode.PARALLEL, level=0, context=context ) return ExecutionPlan( plan_id=plan_id, task_id=task_id, entity_id=entity_id, entity_type=entity_type, period=period, stages=[stage], execution_mode=ExecutionMode.PARALLEL, total_rules=len(rules), max_level=0 ) def _create_hybrid_plan( self, task_id: str, entity_id: str, entity_type: str, period: str, rules: List[DetectionRule] ) -> ExecutionPlan: """ 创建混合执行计划(基于依赖关系的智能并行) 1. 分析规则依赖关系 2. 按层级分组规则 3. 同层级规则并行执行 4. 不同层级串行执行 """ plan_id = f"plan_{task_id}" # 1. 分析依赖关系 dep_graph = self.dependency_resolver.analyze(rules) # 2. 获取执行层级 levels = dep_graph.get_levels() # 3. 为每个层级创建执行节点和阶段 stages = [] rule_dict = {rule.rule_id: rule for rule in rules} # 创建执行上下文 context = { 'task_id': task_id, 'entity_id': entity_id, 'entity_type': entity_type, 'period': period, 'parameters': {} } for level_idx, level_rules in enumerate(levels): # 创建该层级的执行节点 nodes = [] for rule_id in level_rules: rule = rule_dict[rule_id] dep_node = dep_graph.get_node(rule_id) node = ExecutionNode( node_id=f"node_{rule_id}", rule_id=rule_id, rule=rule, level=level_idx, dependencies=[f"node_{dep}" for dep in dep_node.dependencies], dependents=[f"node_{dep}" for dep in dep_node.dependents] ) nodes.append(node) # 创建该层级的执行阶段 stage = ExecutionStage( stage_id=f"stage_{level_idx}", stage_name=f"层级 {level_idx}", nodes=nodes, execution_mode=ExecutionMode.PARALLEL, # 同层级并行 level=level_idx, depends_on=[f"stage_{i}" for i in range(level_idx)] if level_idx > 0 else [], context=context ) stages.append(stage) return ExecutionPlan( plan_id=plan_id, task_id=task_id, entity_id=entity_id, entity_type=entity_type, period=period, stages=stages, execution_mode=ExecutionMode.HYBRID, total_rules=len(rules), max_level=len(stages) - 1 ) def optimize_plan(self, plan: ExecutionPlan) -> ExecutionPlan: """ 优化执行计划 优化策略: 1. 合并小阶段:如果某个阶段只有1-2个规则,考虑与相邻阶段合并 2. 拆分大阶段:如果某个阶段规则过多(>20个),考虑拆分为多个批次 3. 重排规则顺序:优先执行耗时短的规则 Returns: 优化后的ExecutionPlan """ logger.info(f"开始优化执行计划:{plan.plan_id}") # TODO: 实现优化逻辑 # 当前返回原计划 return plan class ExecutionPlanBuilder: """ 执行计划构建器(保留用于手动构建) 提供更灵活的手动构建方式 """ def __init__(self, task_id: str, entity_id: str, entity_type: str, period: str): self.task_id = task_id self.entity_id = entity_id self.entity_type = entity_type self.period = period self.stages: List[ExecutionStage] = [] def add_stage( self, stage_name: str, nodes: List[ExecutionNode], execution_mode: ExecutionMode = ExecutionMode.PARALLEL, depends_on: Optional[List[str]] = None, ) -> "ExecutionPlanBuilder": """添加执行阶段""" stage_id = f"stage_{len(self.stages)}" stage = ExecutionStage( stage_id=stage_id, stage_name=stage_name, nodes=nodes, execution_mode=execution_mode, level=len(self.stages), depends_on=depends_on or [], ) self.stages.append(stage) logger.info( f"添加执行阶段: {stage_name}, " f"规则数量: {len(nodes)}, " f"模式: {execution_mode.value}" ) return self def build(self) -> ExecutionPlan: """构建执行计划""" plan_id = f"plan_{self.task_id}" plan = ExecutionPlan( plan_id=plan_id, task_id=self.task_id, entity_id=self.entity_id, entity_type=self.entity_type, period=self.period, stages=self.stages, execution_mode=ExecutionMode.HYBRID, total_rules=sum(stage.rule_count for stage in self.stages), max_level=len(self.stages) - 1 ) logger.info( f"构建执行计划完成: {plan_id}, " f"阶段数: {len(self.stages)}, " f"规则总数: {plan.total_rules}" ) return plan