diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b8bbc7..13df062 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -101,6 +101,7 @@ All notable changes to this project should be documented in this file. - Removed redundant `from __future__ import annotations` from 18 files (Python 3.14+ doesn't need it), by @devdanzin. - Removed unnecessary shebangs from 23 package and test files, by @devdanzin. - Enabled ruff isort rule (`I`) and auto-sorted imports across 41 files; added `_dump_unparse_diagnostics` to `mutators/__init__.py` `__all__`, by @devdanzin. +- Extracted per-mode handlers (`_handle_ancestry`, `_handle_descendants`, `_handle_mrca`, `_handle_forest`) and `_print_mode_stats` from `lineage.py main()`, by @devdanzin. - Extracted `_record_timeout_metadata()` and `_record_new_find()` from `_execute_single_mutation` in `orchestrator.py`, reducing the inner mutation loop to its essential logic, by @devdanzin. ### Enhanced diff --git a/lafleur/lineage.py b/lafleur/lineage.py index cea547b..8baf761 100644 --- a/lafleur/lineage.py +++ b/lafleur/lineage.py @@ -2056,6 +2056,145 @@ def resolve_target( return filename, "file" +# --------------------------------------------------------------------------- +# Per-mode handlers +# --------------------------------------------------------------------------- + + +@dataclass +class ModeResult: + """Result of a mode handler, carrying the subgraph and mode-specific state.""" + + subgraph: Subgraph + chain: list[str] | None = None + role_to_file: dict[str, str | None] | None = None + crash_dir: Path | None = None + mrca_node: str | None = None + mrca_targets: list[str] = field(default_factory=list) + + +def _handle_ancestry( + args: argparse.Namespace, + graph: LineageGraph, + per_file_coverage: dict[str, Any], +) -> ModeResult: + """Handle the 'ancestry' mode: trace a file back to its seed.""" + target, target_type = resolve_target(args.target, per_file_coverage) + multi_lineage = getattr(args, "multi_lineage", False) + attack_only = getattr(args, "attack_only", False) + + if target_type == "crash_dir" and isinstance(target, Path): + crash_dir = target + if multi_lineage or not attack_only: + subgraph = extract_session_ancestry( + graph, crash_dir, per_file_coverage, attack_only=attack_only + ) + role_to_file = resolve_session_scripts(crash_dir, per_file_coverage) + return ModeResult(subgraph=subgraph, role_to_file=role_to_file, crash_dir=crash_dir) + else: + role_to_file = resolve_session_scripts(crash_dir, per_file_coverage) + warmup = role_to_file.get("warmup") + if warmup and warmup in graph.metadata: + subgraph = extract_ancestry(graph, warmup) + chain = _extract_ancestry_chain(graph, warmup) + return ModeResult( + subgraph=subgraph, + chain=chain, + role_to_file=role_to_file, + crash_dir=crash_dir, + ) + else: + print("Error: no resolvable warmup file.", file=sys.stderr) + sys.exit(1) + else: + assert isinstance(target, str) + subgraph = extract_ancestry(graph, target) + chain = _extract_ancestry_chain(graph, target) + return ModeResult(subgraph=subgraph, chain=chain) + + +def _handle_descendants( + args: argparse.Namespace, + graph: LineageGraph, + per_file_coverage: dict[str, Any], +) -> ModeResult: + """Handle the 'descendants' mode: show what a file produced.""" + root, _ = resolve_target(args.root, per_file_coverage) + assert isinstance(root, str) + subgraph = extract_descendants( + graph, + root, + max_depth=args.max_depth, + collapse_sterile=not args.no_collapse_sterile, + ) + return ModeResult(subgraph=subgraph) + + +def _handle_mrca( + args: argparse.Namespace, + graph: LineageGraph, + per_file_coverage: dict[str, Any], +) -> ModeResult: + """Handle the 'mrca' mode: find most recent common ancestor.""" + mrca_targets_raw = [resolve_target(t, per_file_coverage) for t in args.targets] + mrca_targets = [str(t) for t, _ in mrca_targets_raw] + subgraph = extract_mrca(graph, mrca_targets) + mrca_node: str | None = None + for node, role in subgraph.special_nodes.items(): + if role == "mrca": + mrca_node = node + break + return ModeResult(subgraph=subgraph, mrca_node=mrca_node, mrca_targets=mrca_targets) + + +def _handle_forest(args: argparse.Namespace, graph: LineageGraph) -> ModeResult: + """Handle the 'forest' mode: overview of all lineage trees.""" + subgraph = extract_forest( + graph, + max_depth=args.max_depth, + min_descendants=args.min_descendants, + min_strahler=args.min_strahler, + collapse_sterile=not args.no_collapse_sterile, + ) + return ModeResult(subgraph=subgraph) + + +def _print_mode_stats( + mode: str, + result: ModeResult, + graph: LineageGraph, + metrics: TreeMetrics | None, + total_seeds: int, +) -> None: + """Print mode-specific statistics to stderr.""" + if mode == "ancestry": + if result.role_to_file is not None and result.crash_dir is not None: + print_session_ancestry_stats( + result.crash_dir, result.role_to_file, result.subgraph, graph + ) + elif result.chain is not None: + print_ancestry_stats(result.chain, graph) + elif mode == "descendants": + print_descendants_stats(result.subgraph, graph, metrics=metrics) + elif mode == "mrca": + print_mrca_stats(result.mrca_targets, result.mrca_node, result.subgraph, graph) + elif mode == "forest": + filtered_count = total_seeds - len( + [ + n + for n in result.subgraph.nodes + if not n.startswith("__") and graph.parent.get(n) is None + ] + ) + print_forest_stats( + result.subgraph, + graph, + metrics, + total_seeds=total_seeds, + filtered_count=filtered_count, + ) + + # --------------------------------------------------------------------------- # CLI entry point # --------------------------------------------------------------------------- @@ -2244,67 +2383,19 @@ def main() -> None: graph = build_adjacency_graph(per_file_coverage) # Extract subgraph based on mode - mrca_node: str | None = None - mrca_targets: list[str] = [] - chain: list[str] | None = None - role_to_file: dict[str, str | None] | None = None - crash_dir: Path | None = None total_seeds = len(graph.roots) - - if args.mode == "ancestry": - target, target_type = resolve_target(args.target, per_file_coverage) - multi_lineage = getattr(args, "multi_lineage", False) - attack_only = getattr(args, "attack_only", False) - - if target_type == "crash_dir" and isinstance(target, Path): - crash_dir = target - if multi_lineage or not attack_only: - subgraph = extract_session_ancestry( - graph, crash_dir, per_file_coverage, attack_only=attack_only - ) - role_to_file = resolve_session_scripts(crash_dir, per_file_coverage) - else: - # attack-only: find the warmup/parent and trace it - role_to_file = resolve_session_scripts(crash_dir, per_file_coverage) - warmup = role_to_file.get("warmup") - if warmup and warmup in graph.metadata: - subgraph = extract_ancestry(graph, warmup) - chain = _extract_ancestry_chain(graph, warmup) - else: - print("Error: no resolvable warmup file.", file=sys.stderr) - sys.exit(1) - else: - assert isinstance(target, str) - subgraph = extract_ancestry(graph, target) - chain = _extract_ancestry_chain(graph, target) - elif args.mode == "descendants": - root, _ = resolve_target(args.root, per_file_coverage) - assert isinstance(root, str) - subgraph = extract_descendants( - graph, - root, - max_depth=args.max_depth, - collapse_sterile=not args.no_collapse_sterile, - ) - elif args.mode == "mrca": - mrca_targets_raw = [resolve_target(t, per_file_coverage) for t in args.targets] - mrca_targets = [str(t) for t, _ in mrca_targets_raw] - subgraph = extract_mrca(graph, mrca_targets) - for node, role in subgraph.special_nodes.items(): - if role == "mrca": - mrca_node = node - break - elif args.mode == "forest": - subgraph = extract_forest( - graph, - max_depth=args.max_depth, - min_descendants=args.min_descendants, - min_strahler=args.min_strahler, - collapse_sterile=not args.no_collapse_sterile, - ) - else: + handlers = { + "ancestry": lambda: _handle_ancestry(args, graph, per_file_coverage), + "descendants": lambda: _handle_descendants(args, graph, per_file_coverage), + "mrca": lambda: _handle_mrca(args, graph, per_file_coverage), + "forest": lambda: _handle_forest(args, graph), + } + handler = handlers.get(args.mode) + if handler is None: parser.print_help() sys.exit(0) + result = handler() + subgraph = result.subgraph # Attach crash nodes if requested if args.include_crashes: @@ -2382,22 +2473,7 @@ def main() -> None: print(output) # Print statistics to stderr - if args.mode == "ancestry": - if role_to_file is not None and crash_dir is not None: - print_session_ancestry_stats(crash_dir, role_to_file, subgraph, graph) - elif chain is not None: - print_ancestry_stats(chain, graph) - elif args.mode == "descendants": - print_descendants_stats(subgraph, graph, metrics=metrics) - elif args.mode == "mrca": - print_mrca_stats(mrca_targets, mrca_node, subgraph, graph) - elif args.mode == "forest": - filtered_count = total_seeds - len( - [n for n in subgraph.nodes if not n.startswith("__") and graph.parent.get(n) is None] - ) - print_forest_stats( - subgraph, graph, metrics, total_seeds=total_seeds, filtered_count=filtered_count - ) + _print_mode_stats(args.mode, result, graph, metrics, total_seeds) def _extract_ancestry_chain(graph: LineageGraph, target: str) -> list[str]: