diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md new file mode 100644 index 000000000..3969bdb47 --- /dev/null +++ b/.github/copilot-instructions.md @@ -0,0 +1,7 @@ +# Copilot Instructions + +Follow the repository's canonical engineering skills under +`docs/engineering/skills/`. + +For tests, read `docs/engineering/skills/testing.md` before adding, moving, or +reviewing test files. Do not duplicate or override that testing guidance here. diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index 797ca19b0..9f712acc4 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -43,6 +43,18 @@ jobs: - run: pip install ruff>=0.9.0 - run: ruff format --check . + quality-guards: + name: Quality guards + runs-on: ubuntu-latest + needs: check-fork + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: "3.14" + - name: Run quality guards + run: python scripts/run_quality_guards.py + check-changelog: runs-on: ubuntu-latest needs: check-fork @@ -113,23 +125,25 @@ jobs: - uses: actions/setup-python@v5 with: python-version: "3.14" + - uses: astral-sh/setup-uv@v5 + - run: uv sync --dev - name: Install optimized test deps - run: pip install modal pytest numpy pandas + run: uv pip install modal pytest numpy pandas - name: Ensure PR Modal environment exists - run: python .github/scripts/ensure_modal_environment.py + run: uv run python .github/scripts/ensure_modal_environment.py - name: Sync Modal secrets to PR environment - run: python .github/scripts/sync_modal_secrets.py + run: uv run python .github/scripts/sync_modal_secrets.py - name: Deploy Modal pipeline app to PR staging - run: modal deploy --env="${MODAL_ENVIRONMENT}" modal_app/pipeline.py + run: uv run modal deploy --env="${MODAL_ENVIRONMENT}" modal_app/pipeline.py - name: Deploy Modal local-area app to PR staging - run: modal deploy --env="${MODAL_ENVIRONMENT}" modal_app/local_area.py + run: uv run modal deploy --env="${MODAL_ENVIRONMENT}" modal_app/local_area.py - name: Deploy Modal H5 test harness to PR staging - run: modal deploy --env="${MODAL_ENVIRONMENT}" modal_app/h5_test_harness.py + run: uv run modal deploy --env="${MODAL_ENVIRONMENT}" modal_app/h5_test_harness.py - name: Run optimized integration tests against PR staging - run: python -m pytest tests/optimized/ -v + run: uv run pytest tests/optimized/ -v - name: Cleanup PR Modal environment if: always() - run: python .github/scripts/delete_modal_environment.py + run: uv run python .github/scripts/delete_modal_environment.py smoke-test: runs-on: ubuntu-latest @@ -179,6 +193,7 @@ jobs: fi integration-tests: + name: Build datasets and run integration tests on Modal runs-on: ubuntu-latest needs: [check-fork, lint, decide-test-scope] if: needs.decide-test-scope.outputs.run_integration == 'true' diff --git a/.github/workflows/push.yaml b/.github/workflows/push.yaml index 4a0cec793..005a6eaad 100644 --- a/.github/workflows/push.yaml +++ b/.github/workflows/push.yaml @@ -12,9 +12,9 @@ jobs: - run: pip install ruff>=0.9.0 - run: ruff format --check . 
- # ── Build and linear integration tests ────────────────────── - build-and-linear-integration-tests: - name: Build and linear integration tests + # ── Dataset build ─────────────────────────────────────────── + build-datasets: + name: Build datasets runs-on: ubuntu-latest needs: lint if: github.event.head_commit.message != 'Update package version' @@ -29,7 +29,7 @@ jobs: with: python-version: "3.14" - run: pip install modal - - name: Run linear integration tests on Modal + - name: Build datasets on Modal run: | modal run --env="${MODAL_ENVIRONMENT}" modal_app/data_build.py \ --upload \ diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 000000000..87120ff4b --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,14 @@ +# Codex Instructions + +These instructions apply repository-wide. + +## Skills system + +Canonical AI-facing engineering skills live under `docs/engineering/skills/`. +Use those files as the source of truth across Codex, Claude, Copilot, and other +AI tools. + +When adding, moving, or reviewing tests, read +`docs/engineering/skills/testing.md`. Do not put pytest files under +`policyengine_us_data/tests/`, do not import from `tests.conftest`, and do not +import helpers across test lanes. diff --git a/CLAUDE.md b/CLAUDE.md index 699744940..d9bfbf642 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -7,25 +7,33 @@ ## Testing +Canonical testing guidance lives in `docs/engineering/skills/testing.md`. If +this file conflicts with that skill, follow the skill and update this adapter. + ### Running Tests - `make test-unit` - Run unit tests only (fast, no data dependencies) - `make test-integration` - Run integration tests (requires built H5 datasets) - `make test` - Run all tests - `pytest tests/unit/ -v` - Unit tests directly - `pytest tests/integration/test_cps.py -v` - Specific integration test +- `python scripts/run_quality_guards.py` - Run layout/import quality guards ### Test Organization -Tests are in the top-level `tests/` directory, split into two sub-directories: +Tests are in the top-level `tests/` directory, split into these sub-directories: - **`tests/unit/`** — Self-contained tests that use synthetic data, mocks, patches, or checked-in fixtures. Run in seconds with no external dependencies. - `unit/datasets/` — unit tests for dataset code - `unit/calibration/` — unit tests for calibration code - **`tests/integration/`** — Tests that require built H5 datasets, HuggingFace downloads, Microsimulation objects, or database ETL. Named after the dataset they test. +- **`tests/optimized/`** — Tests that exercise deployed Modal/staging seams. 
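+
+A minimal placement sketch (hypothetical file and test names) of a test that
+belongs in the unit lane because it touches only synthetic data:
+
+```python
+# tests/unit/datasets/test_example_uprating.py (hypothetical)
+import numpy as np
+
+
+def test_uprating_scales_values():
+    # Synthetic inputs only: no H5 files, HuggingFace downloads, or
+    # Microsimulation objects, so this file stays under tests/unit/.
+    values = np.array([100.0, 200.0])
+    assert np.allclose(values * 1.1, [110.0, 220.0])
+```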
### Test Placement Rules +- **NEVER** put pytest files under `policyengine_us_data/tests/`; CI does not collect that tree - **NEVER** put tests that require H5 files or Microsimulation in `unit/` - **NEVER** put tests that use only synthetic data or mocks in `integration/` +- **NEVER** import from `tests.conftest`; fixtures are discovered automatically and helper functions belong in local support modules +- **NEVER** import helpers across test lanes, such as `tests.unit` from an integration test - Integration test files are named after their dataset dependency: `test_cps.py` tests `cps_2024.h5` - Sanity checks (value ranges, population counts) belong in the per-dataset integration test file, not in a separate sanity file - When adding a new integration test, add it to the existing per-dataset file if one exists diff --git a/changelog.d/760.added.md b/changelog.d/760.added.md new file mode 100644 index 000000000..bff3b7190 --- /dev/null +++ b/changelog.d/760.added.md @@ -0,0 +1 @@ +Added local H5 traceability metadata and scope fingerprinting for calibration artifacts. diff --git a/changelog.d/test-quality-guards.changed.md b/changelog.d/test-quality-guards.changed.md new file mode 100644 index 000000000..ae2d8afa9 --- /dev/null +++ b/changelog.d/test-quality-guards.changed.md @@ -0,0 +1 @@ +Add quality guards for test layout and document the testing skill for AI tooling. diff --git a/docs/engineering/skills/README.md b/docs/engineering/skills/README.md new file mode 100644 index 000000000..253d28031 --- /dev/null +++ b/docs/engineering/skills/README.md @@ -0,0 +1,13 @@ +# Engineering Skills + +This directory is the canonical source for AI-facing engineering rules. + +Tool-specific instruction files such as `AGENTS.md`, `CLAUDE.md`, and +`.github/copilot-instructions.md` should point here instead of duplicating +implementation-specific guidance. When a rule changes, update the skill here +first, then keep adapters thin. + +Current skills: + +- `testing.md`: test layout, fixture scope, helper placement, and quality guard + expectations. diff --git a/docs/engineering/skills/testing.md b/docs/engineering/skills/testing.md new file mode 100644 index 000000000..69a4d71e9 --- /dev/null +++ b/docs/engineering/skills/testing.md @@ -0,0 +1,54 @@ +# Testing Skill + +Use this skill whenever adding, moving, or reviewing tests. + +## Canonical Layout + +- Put unit tests under `tests/unit/`. +- Put data-dependent or runtime integration tests under `tests/integration/`. +- Put deployed Modal/staging tests under `tests/optimized/`. +- Do not add pytest files under `policyengine_us_data/tests/`; CI does not + collect that tree. + +## Fixtures And Helpers + +- Keep root `tests/conftest.py` empty or very lightweight. It must not import + cloud clients, Modal, Hugging Face, PolicyEngine runtime-heavy modules, or + package modules that transitively import those dependencies. +- Put domain-specific fixtures in the narrowest `conftest.py` that covers the + tests that use them. +- Put reusable helper functions in a local `support.py`, a local fixture module, + or `tests/support/`. +- Do not import from `tests.conftest`; pytest discovers fixtures automatically. +- Do not import across test lanes, for example from `tests.integration` into + `tests.unit` or from `tests.unit` into `tests.integration`. Move shared helpers + to `tests/support/` or colocate them with the tests. + +## Dependency Boundaries + +- Unit tests should not require real network credentials, Modal, Hugging Face, + or GCS. Mock those seams. 
+- Integration tests may require built data or heavier runtime setup, but should + be explicit about those requirements and skip cleanly when local artifacts are + unavailable. +- CI should run tests in an environment where project dependencies are installed + with `uv sync --dev` or an equivalent full test dependency install. A full + install is required, but it is not a substitute for fixture isolation. + +## Quality Guards + +Run this before opening or updating a PR: + +```bash +python scripts/run_quality_guards.py +``` + +The current guard enforces: + +- No package-internal pytest files under `policyengine_us_data/tests/`. +- No pytest files outside the approved top-level test lanes. +- No imports from `tests.conftest`. +- No imports across test lanes. + +When adding a new guard, register it in `scripts/run_quality_guards.py` so CI +continues to expose a single `Quality guards` job. diff --git a/modal_app/local_area.py b/modal_app/local_area.py index 0beafee5c..6ea07cdca 100644 --- a/modal_app/local_area.py +++ b/modal_app/local_area.py @@ -29,6 +29,10 @@ from modal_app.images import cpu_image as image # noqa: E402 from modal_app.resilience import reconcile_run_dir_fingerprint # noqa: E402 +from policyengine_us_data.calibration.local_h5.fingerprinting import ( # noqa: E402 + FingerprintingService, + PublishingInputBundle, +) from policyengine_us_data.calibration.local_h5.partitioning import ( # noqa: E402 partition_weighted_work_items, ) @@ -311,6 +315,63 @@ def get_version() -> str: return pyproject["project"]["version"] +def _build_publishing_input_bundle( + *, + weights_path: Path, + dataset_path: Path, + db_path: Path | None, + geography_path: Path | None, + calibration_package_path: Path | None, + run_config_path: Path | None, + run_id: str, + version: str, + n_clones: int | None, + seed: int, + legacy_blocks_path: Path | None = None, +) -> PublishingInputBundle: + """Build the normalized coordinator input bundle for one publish scope.""" + + return PublishingInputBundle( + weights_path=weights_path, + source_dataset_path=dataset_path, + target_db_path=db_path, + exact_geography_path=geography_path, + calibration_package_path=calibration_package_path, + run_config_path=run_config_path, + run_id=run_id, + version=version, + n_clones=n_clones, + seed=seed, + legacy_blocks_path=legacy_blocks_path, + ) + + +def _resolve_scope_fingerprint( + *, + inputs: PublishingInputBundle, + scope: str, + expected_fingerprint: str = "", +) -> str: + """Compute the scope fingerprint while preserving pinned resume values.""" + + service = FingerprintingService() + traceability = service.build_traceability(inputs=inputs, scope=scope) + computed_fingerprint = service.compute_scope_fingerprint(traceability) + if expected_fingerprint: + if expected_fingerprint != computed_fingerprint: + print( + "WARNING: Pinned fingerprint differs from current " + f"{scope} scope fingerprint. 
" + "Preserving pinned value for backward-compatible resume.\n" + f" Pinned: {expected_fingerprint}\n" + f" Current: {computed_fingerprint}" + ) + else: + print(f"Using pinned fingerprint from pipeline: {expected_fingerprint}") + return expected_fingerprint + return computed_fingerprint + + def partition_work( work_items: List[Dict], num_workers: int, @@ -836,45 +897,26 @@ def coordinate_publish( validate = False # Fingerprint-based cache invalidation - if expected_fingerprint: - fingerprint = expected_fingerprint - print(f"Using pinned fingerprint from pipeline: {fingerprint}") - else: - geography_path_expr = ( - f'Path("{geography_path}")' if geography_path.exists() else "None" - ) - package_path_expr = ( - f'Path("{calibration_package_path}")' - if calibration_package_path.exists() - else "None" - ) - fp_result = subprocess.run( - _python_cmd( - "-c", - f""" -from pathlib import Path -from policyengine_us_data.calibration.publish_local_area import ( - compute_input_fingerprint, -) -print( - compute_input_fingerprint( - Path("{weights_path}"), - Path("{dataset_path}"), - {n_clones}, + fingerprint_inputs = _build_publishing_input_bundle( + weights_path=weights_path, + dataset_path=dataset_path, + db_path=db_path, + geography_path=geography_path, + calibration_package_path=( + calibration_package_path if calibration_package_path.exists() else None + ), + run_config_path=config_json_path if config_json_path.exists() else None, + run_id=run_id, + version=version, + n_clones=n_clones, seed=42, - geography_path={geography_path_expr}, - calibration_package_path={package_path_expr}, + legacy_blocks_path=artifacts / "stacked_blocks.npy", + ) + fingerprint = _resolve_scope_fingerprint( + inputs=fingerprint_inputs, + scope="regional", + expected_fingerprint=expected_fingerprint, ) -) -""", - ), - capture_output=True, - text=True, - env=os.environ.copy(), - ) - if fp_result.returncode != 0: - raise RuntimeError(f"Failed to compute fingerprint: {fp_result.stderr}") - fingerprint = fp_result.stdout.strip() reconcile_action = reconcile_run_dir_fingerprint(run_dir, fingerprint) if reconcile_action == "resume": print(f"Inputs unchanged ({fingerprint}), resuming...") @@ -1123,6 +1165,22 @@ def coordinate_national_publish( "geography_assignment.npz": "national_geography_assignment.npz", }, ) + fingerprint_inputs = _build_publishing_input_bundle( + weights_path=weights_path, + dataset_path=dataset_path, + db_path=db_path, + geography_path=geography_path, + calibration_package_path=None, + run_config_path=config_json_path if config_json_path.exists() else None, + run_id=run_id, + version=version, + n_clones=n_clones, + seed=42, + ) + fingerprint = _resolve_scope_fingerprint( + inputs=fingerprint_inputs, + scope="national", + ) run_dir = staging_dir / run_id run_dir.mkdir(parents=True, exist_ok=True) @@ -1224,6 +1282,7 @@ def coordinate_national_publish( f"{version}. Run main_national_promote to publish." 
), "run_id": run_id, + "fingerprint": fingerprint, "national_validation": national_validation_output, } diff --git a/modal_app/pipeline.py b/modal_app/pipeline.py index fa83adc25..90359b37b 100644 --- a/modal_app/pipeline.py +++ b/modal_app/pipeline.py @@ -109,12 +109,31 @@ class RunMetadata: error: Optional[str] = None resume_history: list = field(default_factory=list) fingerprint: Optional[str] = None + regional_fingerprint: Optional[str] = None + + def __post_init__(self) -> None: + if self.regional_fingerprint is None and self.fingerprint is not None: + self.regional_fingerprint = self.fingerprint + if self.fingerprint is None and self.regional_fingerprint is not None: + self.fingerprint = self.regional_fingerprint def to_dict(self) -> dict: - return asdict(self) + data = asdict(self) + if ( + data.get("fingerprint") is None + and data.get("regional_fingerprint") is not None + ): + data["fingerprint"] = data["regional_fingerprint"] + return data @classmethod def from_dict(cls, data: dict) -> "RunMetadata": + data = dict(data) + if ( + data.get("regional_fingerprint") is None + and data.get("fingerprint") is not None + ): + data["regional_fingerprint"] = data["fingerprint"] return cls(**data) @@ -990,7 +1009,9 @@ def run_pipeline( n_clones=n_clones, validate=True, run_id=run_id, - expected_fingerprint=meta.fingerprint or "", + expected_fingerprint=( + meta.regional_fingerprint or meta.fingerprint or "" + ), ) print(f" → coordinate_publish fc: {regional_h5_handle.object_id}") @@ -1026,6 +1047,7 @@ def run_pipeline( if isinstance(regional_h5_result, dict) and regional_h5_result.get( "fingerprint" ): + meta.regional_fingerprint = regional_h5_result["fingerprint"] meta.fingerprint = regional_h5_result["fingerprint"] write_run_meta(meta, pipeline_volume) diff --git a/policyengine_us_data/calibration/local_h5/__init__.py b/policyengine_us_data/calibration/local_h5/__init__.py index f69663eb0..96ec7258f 100644 --- a/policyengine_us_data/calibration/local_h5/__init__.py +++ b/policyengine_us_data/calibration/local_h5/__init__.py @@ -3,5 +3,5 @@ Modules in this package should land only when they become active runtime seams rather than speculative placeholders. The current early slices introduce ``partitioning.py``, ``requests.py``, ``area_catalog.py``, -and ``geography_loader.py``. +``fingerprinting.py``, and ``geography_loader.py``. 
""" diff --git a/policyengine_us_data/calibration/local_h5/fingerprinting.py b/policyengine_us_data/calibration/local_h5/fingerprinting.py new file mode 100644 index 000000000..f141ac28f --- /dev/null +++ b/policyengine_us_data/calibration/local_h5/fingerprinting.py @@ -0,0 +1,258 @@ +"""Coordinator-owned provenance and resumability logic for local H5 publication.""" + +from __future__ import annotations + +import hashlib +import json +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Literal, Mapping + +from .geography_loader import CalibrationGeographyLoader + +FingerprintScope = Literal["regional", "national"] + + +@dataclass(frozen=True) +class PublishingInputBundle: + """File-system and run metadata needed to publish one H5 scope.""" + + weights_path: Path + source_dataset_path: Path + target_db_path: Path | None + exact_geography_path: Path | None + calibration_package_path: Path | None + run_config_path: Path | None + run_id: str + version: str + n_clones: int | None + seed: int + legacy_blocks_path: Path | None = None + + +@dataclass(frozen=True) +class ArtifactIdentity: + """Stable identity for one input artifact used by traceability and resume.""" + + logical_name: str + path: Path | None + sha256: str | None + size_bytes: int | None = None + metadata: Mapping[str, Any] = field(default_factory=dict) + + +@dataclass(frozen=True) +class TraceabilityBundle: + """Full provenance record for one publish scope.""" + + scope: FingerprintScope + weights: ArtifactIdentity + source_dataset: ArtifactIdentity + exact_geography: ArtifactIdentity | None = None + target_db: ArtifactIdentity | None = None + calibration_package: ArtifactIdentity | None = None + run_config: ArtifactIdentity | None = None + code_version: Mapping[str, Any] = field(default_factory=dict) + model_build: Mapping[str, Any] = field(default_factory=dict) + metadata: Mapping[str, Any] = field(default_factory=dict) + + def resumability_material(self) -> Mapping[str, Any]: + """Return the normalized subset that controls staged-output validity.""" + + geography_sha = None + if self.exact_geography is not None: + geography_sha = self.exact_geography.metadata.get("canonical_sha256") + if geography_sha is None: + geography_sha = self.exact_geography.sha256 + + return { + "scope": self.scope, + "weights_sha256": self.weights.sha256, + "source_dataset_sha256": self.source_dataset.sha256, + "exact_geography_sha256": geography_sha, + "target_db_sha256": ( + self.target_db.sha256 if self.target_db is not None else None + ), + "n_clones": self.metadata.get("n_clones"), + "seed": self.metadata.get("seed"), + "policyengine_us_locked_version": self.model_build.get("locked_version"), + "policyengine_us_git_commit": self.model_build.get("git_commit"), + } + + +class FingerprintingService: + """Build traceability bundles and derive scope fingerprints from them.""" + + def __init__( + self, + *, + geography_loader: CalibrationGeographyLoader | None = None, + ) -> None: + self._geography_loader = geography_loader or CalibrationGeographyLoader() + + def build_traceability( + self, + *, + inputs: PublishingInputBundle, + scope: FingerprintScope, + ) -> TraceabilityBundle: + """Build a traceability bundle from current publish inputs.""" + + run_config_payload = self._load_json(inputs.run_config_path) + return TraceabilityBundle( + scope=scope, + weights=self._build_artifact_identity("weights", inputs.weights_path), + source_dataset=self._build_artifact_identity( + "source_dataset", + 
inputs.source_dataset_path, + ), + exact_geography=self._build_geography_identity(inputs), + target_db=self._build_optional_artifact_identity( + "target_db", + inputs.target_db_path, + ), + calibration_package=self._build_optional_artifact_identity( + "calibration_package", + inputs.calibration_package_path, + ), + run_config=self._build_optional_artifact_identity( + "run_config", + inputs.run_config_path, + ), + code_version=self._extract_code_version(run_config_payload), + model_build=self._extract_model_build(run_config_payload), + metadata={ + "run_id": inputs.run_id, + "version": inputs.version, + "n_clones": inputs.n_clones, + "seed": inputs.seed, + }, + ) + + def compute_scope_fingerprint(self, traceability: TraceabilityBundle) -> str: + """Hash normalized resumability material into a short scope fingerprint.""" + + payload = json.dumps( + traceability.resumability_material(), + sort_keys=True, + separators=(",", ":"), + ).encode() + return hashlib.sha256(payload).hexdigest()[:16] + + def _build_artifact_identity( + self, + logical_name: str, + path: Path, + *, + metadata: Mapping[str, Any] | None = None, + ) -> ArtifactIdentity: + actual_path = Path(path) + if not actual_path.exists(): + raise FileNotFoundError( + f"Expected {logical_name} artifact at {actual_path}" + ) + return ArtifactIdentity( + logical_name=logical_name, + path=actual_path, + sha256=self._sha256_file(actual_path), + size_bytes=actual_path.stat().st_size, + metadata=dict(metadata or {}), + ) + + def _build_optional_artifact_identity( + self, + logical_name: str, + path: Path | None, + ) -> ArtifactIdentity | None: + if path is None: + return None + actual_path = Path(path) + if not actual_path.exists(): + return None + return self._build_artifact_identity(logical_name, actual_path) + + def _build_geography_identity( + self, + inputs: PublishingInputBundle, + ) -> ArtifactIdentity | None: + resolved = self._geography_loader.resolve_source( + weights_path=inputs.weights_path, + geography_path=inputs.exact_geography_path, + blocks_path=inputs.legacy_blocks_path, + calibration_package_path=inputs.calibration_package_path, + ) + if resolved is None: + return None + + metadata = { + "source_kind": resolved.kind, + "canonical_sha256": self._geography_loader.compute_canonical_checksum( + weights_path=inputs.weights_path, + n_records=self._infer_n_records( + weights_path=inputs.weights_path, + source_dataset_path=inputs.source_dataset_path, + n_clones=inputs.n_clones, + ), + n_clones=inputs.n_clones, + geography_path=inputs.exact_geography_path, + blocks_path=inputs.legacy_blocks_path, + calibration_package_path=inputs.calibration_package_path, + ), + } + return self._build_artifact_identity( + "exact_geography", + resolved.path, + metadata=metadata, + ) + + def _extract_code_version( + self, run_config_payload: Mapping[str, Any] + ) -> dict[str, Any]: + return { + "git_commit": run_config_payload.get("git_commit"), + "git_branch": run_config_payload.get("git_branch"), + "git_dirty": run_config_payload.get("git_dirty"), + } + + def _extract_model_build( + self, run_config_payload: Mapping[str, Any] + ) -> dict[str, Any]: + return { + "locked_version": run_config_payload.get("package_version"), + "git_commit": run_config_payload.get("git_commit"), + } + + def _load_json(self, path: Path | None) -> Mapping[str, Any]: + if path is None: + return {} + actual_path = Path(path) + if not actual_path.exists(): + return {} + with open(actual_path) as handle: + return json.load(handle) + + def _sha256_file(self, path: Path) -> 
str: + digest = hashlib.sha256() + with open(path, "rb") as handle: + for chunk in iter(lambda: handle.read(1 << 20), b""): + digest.update(chunk) + return f"sha256:{digest.hexdigest()}" + + def _infer_n_records( + self, + *, + weights_path: Path, + source_dataset_path: Path, + n_clones: int | None, + ) -> int: + if n_clones is not None: + import numpy as np + + weights = np.load(weights_path, mmap_mode="r") + if len(weights) % n_clones == 0: + return int(len(weights) // n_clones) + + from policyengine_us import Microsimulation + + simulation = Microsimulation(dataset=str(source_dataset_path)) + return int(len(simulation.calculate("household_id", map_to="household").values)) diff --git a/policyengine_us_data/calibration/publish_local_area.py b/policyengine_us_data/calibration/publish_local_area.py index b1946a8f3..e42d7a030 100644 --- a/policyengine_us_data/calibration/publish_local_area.py +++ b/policyengine_us_data/calibration/publish_local_area.py @@ -11,12 +11,15 @@ import json import shutil - import numpy as np from pathlib import Path from typing import List, Optional from policyengine_us import Microsimulation +from policyengine_us_data.calibration.local_h5.fingerprinting import ( + FingerprintingService, + PublishingInputBundle, +) from policyengine_us_data.calibration.local_h5.geography_loader import ( CalibrationGeographyLoader, ) @@ -57,50 +60,33 @@ def compute_input_fingerprint( seed: int = 42, geography_path: Optional[Path] = None, blocks_path: Optional[Path] = None, + target_db_path: Optional[Path] = None, + run_config_path: Optional[Path] = None, calibration_package_path: Optional[Path] = None, + scope: str = "regional", ) -> str: - import hashlib - - def _update_hash_from_file(h: "hashlib._Hash", path: Path) -> None: - with open(path, "rb") as f: - while chunk := f.read(8192): - h.update(chunk) - - def _infer_n_records() -> int: - if n_clones is not None: - weights = np.load(weights_path, mmap_mode="r") - if len(weights) % n_clones == 0: - return len(weights) // n_clones - sim = Microsimulation(dataset=str(dataset_path)) - return len(sim.calculate("household_id", map_to="household").values) - - loader = CalibrationGeographyLoader() - h = hashlib.sha256() - for p in [weights_path, dataset_path]: - _update_hash_from_file(h, p) - - resolved = loader.resolve_source( - weights_path=weights_path, - geography_path=geography_path, - blocks_path=blocks_path, - calibration_package_path=calibration_package_path, + service = FingerprintingService() + inputs = PublishingInputBundle( + weights_path=Path(weights_path), + source_dataset_path=Path(dataset_path), + target_db_path=Path(target_db_path) if target_db_path is not None else None, + exact_geography_path=( + Path(geography_path) if geography_path is not None else None + ), + calibration_package_path=( + Path(calibration_package_path) + if calibration_package_path is not None + else None + ), + run_config_path=Path(run_config_path) if run_config_path is not None else None, + run_id="", + version="", + n_clones=n_clones, + seed=seed, + legacy_blocks_path=Path(blocks_path) if blocks_path is not None else None, ) - if resolved is not None: - n_records = _infer_n_records() - h.update(f"geography_source:{resolved.kind}".encode()) - h.update( - loader.compute_canonical_checksum( - weights_path=weights_path, - n_records=n_records, - n_clones=n_clones, - geography_path=geography_path, - blocks_path=blocks_path, - calibration_package_path=calibration_package_path, - ).encode() - ) - else: - 
h.update(f"legacy_regeneration:{n_clones}:{seed}".encode()) - return h.hexdigest()[:16] + traceability = service.build_traceability(inputs=inputs, scope=scope) + return service.compute_scope_fingerprint(traceability) def load_calibration_geography( diff --git a/policyengine_us_data/tests/test_release_manifest.py b/policyengine_us_data/tests/test_release_manifest.py deleted file mode 100644 index 0f0b8f9df..000000000 --- a/policyengine_us_data/tests/test_release_manifest.py +++ /dev/null @@ -1,318 +0,0 @@ -import hashlib -from io import BytesIO -from pathlib import Path -from unittest.mock import MagicMock, patch - -from huggingface_hub import CommitOperationAdd - -from policyengine_us_data.utils.data_upload import upload_files_to_hf -from policyengine_us_data.utils.data_upload import publish_release_manifest_to_hf -from policyengine_us_data.utils.release_manifest import ( - RELEASE_MANIFEST_SCHEMA_VERSION, - build_release_manifest, -) -from policyengine_us_data.utils.trace_tro import TRACE_TRO_FILENAME - - -def _write_file(path: Path, content: bytes) -> Path: - path.parent.mkdir(parents=True, exist_ok=True) - path.write_bytes(content) - return path - - -def _sha256(content: bytes) -> str: - return hashlib.sha256(content).hexdigest() - - -def test_build_release_manifest_tracks_uploaded_artifacts(tmp_path): - national_bytes = b"national-dataset" - state_bytes = b"state-dataset" - national_path = _write_file( - tmp_path / "enhanced_cps_2024.h5", - national_bytes, - ) - state_path = _write_file(tmp_path / "AL.h5", state_bytes) - - manifest = build_release_manifest( - files_with_repo_paths=[ - (national_path, "enhanced_cps_2024.h5"), - (state_path, "states/AL.h5"), - ], - version="1.73.0", - repo_id="policyengine/policyengine-us-data", - model_package_version="1.634.4", - model_package_git_sha="deadbeef", - model_package_data_build_fingerprint="sha256:fingerprint", - created_at="2026-04-10T12:00:00Z", - ) - - assert manifest["data_package"] == { - "name": "policyengine-us-data", - "version": "1.73.0", - } - assert manifest["schema_version"] == RELEASE_MANIFEST_SCHEMA_VERSION - assert manifest["compatible_model_packages"] == [ - { - "name": "policyengine-us", - "specifier": "==1.634.4", - } - ] - assert manifest["build"] == { - "build_id": "policyengine-us-data-1.73.0", - "built_at": "2026-04-10T12:00:00Z", - "built_with_model_package": { - "name": "policyengine-us", - "version": "1.634.4", - "git_sha": "deadbeef", - "data_build_fingerprint": "sha256:fingerprint", - }, - } - assert manifest["default_datasets"] == {"national": "enhanced_cps_2024"} - - assert manifest["artifacts"]["enhanced_cps_2024"] == { - "kind": "microdata", - "path": "enhanced_cps_2024.h5", - "repo_id": "policyengine/policyengine-us-data", - "revision": "1.73.0", - "sha256": _sha256(national_bytes), - "size_bytes": len(national_bytes), - } - assert manifest["artifacts"]["states/AL"] == { - "kind": "microdata", - "path": "states/AL.h5", - "repo_id": "policyengine/policyengine-us-data", - "revision": "1.73.0", - "sha256": _sha256(state_bytes), - "size_bytes": len(state_bytes), - } - - -def test_build_release_manifest_merges_existing_release_same_version(tmp_path): - district_bytes = b"district-dataset" - district_path = _write_file(tmp_path / "NC-01.h5", district_bytes) - - existing_manifest = { - "data_package": { - "name": "policyengine-us-data", - "version": "1.73.0", - }, - "compatible_model_packages": [ - { - "name": "policyengine-us", - "specifier": "==1.634.4", - } - ], - "default_datasets": {"national": 
"enhanced_cps_2024"}, - "created_at": "2026-04-09T12:00:00Z", - "artifacts": { - "enhanced_cps_2024": { - "kind": "microdata", - "path": "enhanced_cps_2024.h5", - "repo_id": "policyengine/policyengine-us-data", - "revision": "1.73.0", - "sha256": "abc", - "size_bytes": 123, - } - }, - } - - manifest = build_release_manifest( - files_with_repo_paths=[(district_path, "districts/NC-01.h5")], - version="1.73.0", - repo_id="policyengine/policyengine-us-data", - model_package_version="1.634.4", - model_package_git_sha="deadbeef", - model_package_data_build_fingerprint="sha256:fingerprint", - existing_manifest=existing_manifest, - created_at="2026-04-10T12:00:00Z", - ) - - assert set(manifest["artifacts"]) == {"enhanced_cps_2024", "districts/NC-01"} - assert manifest["default_datasets"] == {"national": "enhanced_cps_2024"} - assert manifest["build"] == { - "build_id": "policyengine-us-data-1.73.0", - "built_at": "2026-04-10T12:00:00Z", - "built_with_model_package": { - "name": "policyengine-us", - "version": "1.634.4", - "git_sha": "deadbeef", - "data_build_fingerprint": "sha256:fingerprint", - }, - } - assert manifest["artifacts"]["districts/NC-01"]["sha256"] == _sha256(district_bytes) - - -def test_upload_files_to_hf_adds_release_manifest_operations(tmp_path): - dataset_path = _write_file( - tmp_path / "enhanced_cps_2024.h5", - b"national-dataset", - ) - - mock_api = MagicMock() - mock_api.create_commit.return_value = MagicMock(oid="commit-sha") - - with ( - patch("policyengine_us_data.utils.data_upload.HfApi", return_value=mock_api), - patch( - "policyengine_us_data.utils.data_upload.load_release_manifest_from_hf", - return_value=None, - ), - patch( - "policyengine_us_data.utils.data_upload._get_model_package_build_metadata", - return_value={ - "version": "1.634.4", - "git_sha": "deadbeef", - "data_build_fingerprint": "sha256:fingerprint", - }, - ), - patch.dict( - "policyengine_us_data.utils.data_upload.os.environ", - {"HUGGING_FACE_TOKEN": "token"}, - clear=False, - ), - ): - upload_files_to_hf( - files=[dataset_path], - version="1.73.0", - ) - - operations = mock_api.create_commit.call_args.kwargs["operations"] - operation_paths = [operation.path_in_repo for operation in operations] - - assert "enhanced_cps_2024.h5" in operation_paths - assert "release_manifest.json" in operation_paths - assert "releases/1.73.0/release_manifest.json" in operation_paths - assert TRACE_TRO_FILENAME in operation_paths - assert f"releases/1.73.0/{TRACE_TRO_FILENAME}" in operation_paths - - release_ops = [ - operation - for operation in operations - if operation.path_in_repo.endswith("release_manifest.json") - ] - assert len(release_ops) == 2 - for operation in release_ops: - assert isinstance(operation, CommitOperationAdd) - assert isinstance(operation.path_or_fileobj, BytesIO) - - trace_ops = [ - operation - for operation in operations - if operation.path_in_repo.endswith(".jsonld") - ] - assert len(trace_ops) == 2 - for operation in trace_ops: - assert isinstance(operation, CommitOperationAdd) - assert isinstance(operation.path_or_fileobj, BytesIO) - - -def test_upload_files_to_hf_does_not_tag_until_finalize(tmp_path): - dataset_path = _write_file( - tmp_path / "enhanced_cps_2024.h5", - b"national-dataset", - ) - - mock_api = MagicMock() - mock_api.create_commit.return_value = MagicMock(oid="commit-sha") - - with ( - patch("policyengine_us_data.utils.data_upload.HfApi", return_value=mock_api), - patch( - "policyengine_us_data.utils.data_upload.load_release_manifest_from_hf", - return_value=None, - ), - patch( 
- "policyengine_us_data.utils.data_upload._get_model_package_build_metadata", - return_value={ - "version": "1.634.4", - "git_sha": "deadbeef", - "data_build_fingerprint": "sha256:fingerprint", - }, - ), - patch.dict( - "policyengine_us_data.utils.data_upload.os.environ", - {"HUGGING_FACE_TOKEN": "token"}, - clear=False, - ), - ): - upload_files_to_hf( - files=[dataset_path], - version="1.73.0", - create_tag=False, - ) - - mock_api.create_tag.assert_not_called() - - -def test_publish_release_manifest_to_hf_can_finalize_and_tag(tmp_path): - state_path = _write_file( - tmp_path / "AL.h5", - b"state-dataset", - ) - - mock_api = MagicMock() - mock_api.create_commit.return_value = MagicMock(oid="final-commit-sha") - existing_manifest = { - "schema_version": RELEASE_MANIFEST_SCHEMA_VERSION, - "data_package": { - "name": "policyengine-us-data", - "version": "1.73.0", - }, - "compatible_model_packages": [], - "default_datasets": {"national": "enhanced_cps_2024"}, - "created_at": "2026-04-10T12:00:00Z", - "build": { - "build_id": "policyengine-us-data-1.73.0", - "built_at": "2026-04-10T12:00:00Z", - }, - "artifacts": { - "enhanced_cps_2024": { - "kind": "microdata", - "path": "enhanced_cps_2024.h5", - "repo_id": "policyengine/policyengine-us-data", - "revision": "1.73.0", - "sha256": "abc", - "size_bytes": 123, - } - }, - } - - with ( - patch("policyengine_us_data.utils.data_upload.HfApi", return_value=mock_api), - patch( - "policyengine_us_data.utils.data_upload.load_release_manifest_from_hf", - side_effect=lambda *args, **kwargs: ( - None if kwargs.get("revision") == "1.73.0" else existing_manifest - ), - ), - patch( - "policyengine_us_data.utils.data_upload._get_model_package_build_metadata", - return_value={ - "version": "1.634.4", - "git_sha": "deadbeef", - "data_build_fingerprint": "sha256:fingerprint", - }, - ), - patch.dict( - "policyengine_us_data.utils.data_upload.os.environ", - {"HUGGING_FACE_TOKEN": "token"}, - clear=False, - ), - ): - manifest = publish_release_manifest_to_hf( - [(state_path, "states/AL.h5")], - version="1.73.0", - create_tag=True, - ) - - mock_api.create_tag.assert_called_once() - assert manifest["build"] == { - "build_id": "policyengine-us-data-1.73.0", - "built_at": "2026-04-10T12:00:00Z", - "built_with_model_package": { - "name": "policyengine-us", - "version": "1.634.4", - "git_sha": "deadbeef", - "data_build_fingerprint": "sha256:fingerprint", - }, - } diff --git a/scripts/__init__.py b/scripts/__init__.py new file mode 100644 index 000000000..e3d9df4d4 --- /dev/null +++ b/scripts/__init__.py @@ -0,0 +1 @@ +"""Repository maintenance scripts.""" diff --git a/scripts/guards/__init__.py b/scripts/guards/__init__.py new file mode 100644 index 000000000..2b896a044 --- /dev/null +++ b/scripts/guards/__init__.py @@ -0,0 +1 @@ +"""Quality guard implementations.""" diff --git a/scripts/guards/test_layout.py b/scripts/guards/test_layout.py new file mode 100644 index 000000000..aef7a425f --- /dev/null +++ b/scripts/guards/test_layout.py @@ -0,0 +1,160 @@ +"""Guardrails for pytest layout and test helper imports.""" + +from __future__ import annotations + +import ast +import subprocess +from pathlib import Path + + +REPO_ROOT = Path(__file__).resolve().parents[2] +TEST_LANES = { + "tests/unit": Path("tests/unit"), + "tests/integration": Path("tests/integration"), + "tests/optimized": Path("tests/optimized"), +} +ALLOWED_TEST_ROOTS = tuple(TEST_LANES.values()) +PYTEST_FILE_PREFIX = "test_" +PYTEST_FILE_SUFFIX = "_test.py" + + +def _git_files() -> list[Path]: + try: + result = 
subprocess.run( + [ + "git", + "ls-files", + "--cached", + "--others", + "--exclude-standard", + ], + cwd=REPO_ROOT, + check=True, + capture_output=True, + text=True, + ) + except (FileNotFoundError, subprocess.CalledProcessError): + return [ + path.relative_to(REPO_ROOT) + for path in REPO_ROOT.rglob("*") + if path.is_file() + ] + + return [ + Path(line) + for line in result.stdout.splitlines() + if line and (REPO_ROOT / line).is_file() + ] + + +def _is_pytest_file(path: Path) -> bool: + return path.suffix == ".py" and ( + path.name.startswith(PYTEST_FILE_PREFIX) + or path.name.endswith(PYTEST_FILE_SUFFIX) + ) + + +def _is_under(path: Path, parent: Path) -> bool: + return path == parent or parent in path.parents + + +def _test_lane(path: Path) -> str | None: + for name, root in TEST_LANES.items(): + if _is_under(path, root): + return name + return None + + +def _module_root(module: str) -> str | None: + for name in TEST_LANES: + if module == name.replace("/", ".") or module.startswith( + f"{name.replace('/', '.')}." + ): + return name + return None + + +def _check_test_placement(files: list[Path]) -> list[str]: + violations = [] + for path in files: + if not _is_pytest_file(path): + continue + + if _is_under(path, Path("policyengine_us_data/tests")): + violations.append( + f"{path}: package-internal tests are not collected by CI; " + "move tests under tests/unit, tests/integration, or tests/optimized." + ) + continue + + if path.parts and path.parts[0] == "tests": + if not any(_is_under(path, root) for root in ALLOWED_TEST_ROOTS): + violations.append( + f"{path}: pytest files under tests/ must live under " + "tests/unit, tests/integration, or tests/optimized." + ) + + return violations + + +def _check_test_imports(files: list[Path]) -> list[str]: + violations = [] + for path in files: + if path.suffix != ".py" or not _is_under(path, Path("tests")): + continue + + source = (REPO_ROOT / path).read_text(encoding="utf-8") + try: + tree = ast.parse(source, filename=str(path)) + except SyntaxError as exc: + violations.append(f"{path}: could not parse Python source: {exc}") + continue + + current_lane = _test_lane(path) + for node in ast.walk(tree): + module_names: list[str] = [] + if isinstance(node, ast.ImportFrom) and node.module: + module_names.append(node.module) + elif isinstance(node, ast.Import): + module_names.extend(alias.name for alias in node.names) + + for module in module_names: + if module == "tests.conftest" or module.startswith("tests.conftest."): + violations.append( + f"{path}: import from {module!r} couples tests to global " + "pytest setup; move helpers into a local support module." + ) + continue + + imported_lane = _module_root(module) + if imported_lane and imported_lane != current_lane: + violations.append( + f"{path}: imports {module!r} across test lanes; move shared " + "helpers to tests/support or colocate them with the tests." 
+ ) + + return violations + + +def check() -> list[str]: + files = _git_files() + return [ + *_check_test_placement(files), + *_check_test_imports(files), + ] + + +def main() -> int: + violations = check() + if not violations: + print("test-layout guard passed") + return 0 + + print("test-layout guard failed:") + for violation in violations: + print(f" - {violation}") + return 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/run_quality_guards.py b/scripts/run_quality_guards.py new file mode 100644 index 000000000..135b8772b --- /dev/null +++ b/scripts/run_quality_guards.py @@ -0,0 +1,38 @@ +"""Run repository quality guards.""" + +from __future__ import annotations + +import sys +from collections.abc import Callable +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parents[1] +if str(REPO_ROOT) not in sys.path: + sys.path.insert(0, str(REPO_ROOT)) + +from scripts.guards import test_layout # noqa: E402 + + +Guard = tuple[str, Callable[[], list[str]]] + +GUARDS: tuple[Guard, ...] = (("test-layout", test_layout.check),) + + +def main() -> int: + failed = False + for name, check in GUARDS: + violations = check() + if not violations: + print(f"{name}: passed") + continue + + failed = True + print(f"{name}: failed") + for violation in violations: + print(f" - {violation}") + + return 1 if failed else 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/integration/test_xw_consistency.py b/tests/integration/test_xw_consistency.py index 49c60dd88..e6ee9e101 100644 --- a/tests/integration/test_xw_consistency.py +++ b/tests/integration/test_xw_consistency.py @@ -6,7 +6,7 @@ from any optimizer behavior. Usage: - pytest policyengine_us_data/tests/test_calibration/test_xw_consistency.py -v + pytest tests/integration/test_xw_consistency.py -v """ import tempfile diff --git a/tests/optimized/local_h5/__init__.py b/tests/optimized/local_h5/__init__.py new file mode 100644 index 000000000..99c72228c --- /dev/null +++ b/tests/optimized/local_h5/__init__.py @@ -0,0 +1 @@ +"""Optimized local H5 integration tests.""" diff --git a/tests/optimized/local_h5/fixtures.py b/tests/optimized/local_h5/fixtures.py new file mode 100644 index 000000000..7918f9cd0 --- /dev/null +++ b/tests/optimized/local_h5/fixtures.py @@ -0,0 +1,205 @@ +"""Shared tiny-artifact fixtures for optimized local H5 integration tests.""" + +from __future__ import annotations + +import json +import pickle +import shutil +import sqlite3 +from dataclasses import dataclass +from functools import lru_cache +from pathlib import Path + +import numpy as np + +from policyengine_us_data.calibration.clone_and_assign import ( + GeographyAssignment, + save_geography, +) +from policyengine_us_data.calibration.local_h5.requests import ( + AreaBuildRequest, + AreaFilter, +) + +FIXTURE_DATASET_PATH = ( + Path(__file__).resolve().parents[2] / "integration" / "test_fixture_50hh.h5" +) +DISTRICT_GEOID = "3701" +COUNTY_FIPS = "37183" +STATE_FIPS = 37 +N_CLONES = 1 +SEED = 42 +VERSION = "0.0.0" + + +@dataclass(frozen=True) +class LocalH5Artifacts: + dataset_path: Path + weights_path: Path + db_path: Path + run_config_path: Path + geography_path: Path + calibration_package_path: Path + geography: GeographyAssignment + n_records: int + n_clones: int + + +@lru_cache(maxsize=1) +def fixture_household_count() -> int: + from policyengine_us import Microsimulation + + sim = Microsimulation(dataset=str(FIXTURE_DATASET_PATH)) + try: + return int(len(sim.calculate("household_id", 
map_to="household").values)) + finally: + del sim + + +def base_geography(*, n_records: int, n_clones: int = N_CLONES) -> GeographyAssignment: + total_rows = n_records * n_clones + block_geoids = np.array( + [f"{COUNTY_FIPS}{i:06d}{i:04d}"[:15] for i in range(total_rows)], + dtype="U15", + ) + return GeographyAssignment( + block_geoid=block_geoids, + cd_geoid=np.full(total_rows, DISTRICT_GEOID, dtype="U4"), + county_fips=np.full(total_rows, COUNTY_FIPS, dtype="U5"), + state_fips=np.full(total_rows, STATE_FIPS, dtype=np.int32), + n_records=n_records, + n_clones=n_clones, + ) + + +def seed_local_h5_artifacts( + tmp_path: Path, + *, + n_clones: int = N_CLONES, +) -> LocalH5Artifacts: + artifact_dir = tmp_path / "artifacts" + if artifact_dir.exists(): + shutil.rmtree(artifact_dir) + artifact_dir.mkdir(parents=True, exist_ok=True) + + dataset_path = artifact_dir / "source.h5" + weights_path = artifact_dir / "calibration_weights.npy" + db_path = artifact_dir / "policy_data.db" + run_config_path = artifact_dir / "unified_run_config.json" + geography_path = artifact_dir / "geography_assignment.npz" + calibration_package_path = artifact_dir / "calibration_package.pkl" + + shutil.copy2(FIXTURE_DATASET_PATH, dataset_path) + n_records = fixture_household_count() + np.save(weights_path, np.ones(n_records * n_clones, dtype=np.float32)) + + geography = base_geography(n_records=n_records, n_clones=n_clones) + save_geography(geography, geography_path) + + with open(calibration_package_path, "wb") as handle: + pickle.dump( + { + "block_geoid": geography.block_geoid, + "cd_geoid": geography.cd_geoid, + "metadata": { + "git_commit": "deadbeefcafebabe", + "git_branch": "main", + "git_dirty": False, + "package_version": VERSION, + }, + }, + handle, + protocol=pickle.HIGHEST_PROTOCOL, + ) + + conn = sqlite3.connect(db_path) + try: + conn.execute( + """ + CREATE TABLE stratum_constraints ( + stratum_id INTEGER, + constraint_variable TEXT, + value TEXT + ) + """ + ) + conn.execute( + """ + INSERT INTO stratum_constraints (stratum_id, constraint_variable, value) + VALUES (?, ?, ?) 
+ """, + (1, "congressional_district_geoid", DISTRICT_GEOID), + ) + conn.commit() + finally: + conn.close() + + run_config_path.write_text( + json.dumps( + { + "git_commit": "deadbeefcafebabe", + "git_branch": "main", + "git_dirty": False, + "package_version": VERSION, + } + ) + ) + + return LocalH5Artifacts( + dataset_path=dataset_path, + weights_path=weights_path, + db_path=db_path, + run_config_path=run_config_path, + geography_path=geography_path, + calibration_package_path=calibration_package_path, + geography=geography, + n_records=n_records, + n_clones=n_clones, + ) + + +def build_request( + area_type: str, *, geography: GeographyAssignment +) -> AreaBuildRequest: + if area_type == "district": + return AreaBuildRequest( + area_type="district", + area_id="NC-01", + display_name="NC-01", + output_relative_path="districts/NC-01.h5", + filters=( + AreaFilter( + geography_field="cd_geoid", + op="in", + value=(DISTRICT_GEOID,), + ), + ), + validation_geo_level="district", + validation_geographic_ids=(DISTRICT_GEOID,), + ) + if area_type == "state": + return AreaBuildRequest( + area_type="state", + area_id="NC", + display_name="NC", + output_relative_path="states/NC.h5", + filters=( + AreaFilter( + geography_field="cd_geoid", + op="in", + value=(DISTRICT_GEOID,), + ), + ), + validation_geo_level="state", + validation_geographic_ids=(str(STATE_FIPS),), + ) + if area_type == "national": + return AreaBuildRequest( + area_type="national", + area_id="US", + display_name="US", + output_relative_path="national/US.h5", + validation_geo_level="national", + validation_geographic_ids=("US",), + ) + raise ValueError(f"Unsupported area_type for test fixture: {area_type}") diff --git a/tests/optimized/local_h5/test_modal_local_area_traceability.py b/tests/optimized/local_h5/test_modal_local_area_traceability.py new file mode 100644 index 000000000..a943b8dcf --- /dev/null +++ b/tests/optimized/local_h5/test_modal_local_area_traceability.py @@ -0,0 +1,64 @@ +from policyengine_us_data.calibration.local_h5.fingerprinting import ( + FingerprintingService, +) + +from tests.optimized.local_h5.fixtures import SEED, VERSION, seed_local_h5_artifacts +from tests.support.modal_local_area import load_local_area_module + + +def test_local_area_helpers_match_publish_traceability_contract(tmp_path): + local_area = load_local_area_module(stub_policyengine=False) + artifacts = seed_local_h5_artifacts(tmp_path) + + inputs = local_area._build_publishing_input_bundle( + weights_path=artifacts.weights_path, + dataset_path=artifacts.dataset_path, + db_path=artifacts.db_path, + geography_path=artifacts.geography_path, + calibration_package_path=artifacts.calibration_package_path, + run_config_path=artifacts.run_config_path, + run_id="run-123", + version=VERSION, + n_clones=artifacts.n_clones, + seed=SEED, + ) + + helper_fingerprint = local_area._resolve_scope_fingerprint( + inputs=inputs, + scope="regional", + ) + service = FingerprintingService() + service_fingerprint = service.compute_scope_fingerprint( + service.build_traceability(inputs=inputs, scope="regional") + ) + + assert helper_fingerprint == service_fingerprint + + +def test_local_area_scope_helper_distinguishes_regional_and_national(tmp_path): + local_area = load_local_area_module(stub_policyengine=False) + artifacts = seed_local_h5_artifacts(tmp_path) + + inputs = local_area._build_publishing_input_bundle( + weights_path=artifacts.weights_path, + dataset_path=artifacts.dataset_path, + db_path=artifacts.db_path, + geography_path=artifacts.geography_path, + 
calibration_package_path=artifacts.calibration_package_path, + run_config_path=artifacts.run_config_path, + run_id="run-123", + version=VERSION, + n_clones=artifacts.n_clones, + seed=SEED, + ) + + regional = local_area._resolve_scope_fingerprint( + inputs=inputs, + scope="regional", + ) + national = local_area._resolve_scope_fingerprint( + inputs=inputs, + scope="national", + ) + + assert regional != national diff --git a/tests/optimized/local_h5/test_traceability_contract.py b/tests/optimized/local_h5/test_traceability_contract.py new file mode 100644 index 000000000..b6b599082 --- /dev/null +++ b/tests/optimized/local_h5/test_traceability_contract.py @@ -0,0 +1,88 @@ +from policyengine_us_data.calibration.local_h5.fingerprinting import ( + FingerprintingService, + PublishingInputBundle, +) + +from tests.optimized.local_h5.fixtures import SEED, VERSION, seed_local_h5_artifacts + + +def _fingerprint_for(*, inputs, scope: str = "regional") -> str: + service = FingerprintingService() + return service.compute_scope_fingerprint( + service.build_traceability(inputs=inputs, scope=scope) + ) + + +def test_saved_geography_bundle_builds_traceability_with_stable_fingerprint(tmp_path): + artifacts = seed_local_h5_artifacts(tmp_path) + inputs = PublishingInputBundle( + weights_path=artifacts.weights_path, + source_dataset_path=artifacts.dataset_path, + target_db_path=artifacts.db_path, + exact_geography_path=artifacts.geography_path, + calibration_package_path=None, + run_config_path=artifacts.run_config_path, + run_id="run-123", + version=VERSION, + n_clones=artifacts.n_clones, + seed=SEED, + ) + + first = _fingerprint_for(inputs=inputs) + second = _fingerprint_for(inputs=inputs) + + assert first == second + + +def test_package_geography_bundle_builds_traceability_with_stable_fingerprint(tmp_path): + artifacts = seed_local_h5_artifacts(tmp_path) + inputs = PublishingInputBundle( + weights_path=artifacts.weights_path, + source_dataset_path=artifacts.dataset_path, + target_db_path=artifacts.db_path, + exact_geography_path=None, + calibration_package_path=artifacts.calibration_package_path, + run_config_path=artifacts.run_config_path, + run_id="run-123", + version=VERSION, + n_clones=artifacts.n_clones, + seed=SEED, + ) + + first = _fingerprint_for(inputs=inputs) + second = _fingerprint_for(inputs=inputs) + + assert first == second + + +def test_saved_and_package_geography_share_the_same_resumability_identity(tmp_path): + artifacts = seed_local_h5_artifacts(tmp_path) + saved_inputs = PublishingInputBundle( + weights_path=artifacts.weights_path, + source_dataset_path=artifacts.dataset_path, + target_db_path=artifacts.db_path, + exact_geography_path=artifacts.geography_path, + calibration_package_path=None, + run_config_path=artifacts.run_config_path, + run_id="run-123", + version=VERSION, + n_clones=artifacts.n_clones, + seed=SEED, + ) + package_inputs = PublishingInputBundle( + weights_path=artifacts.weights_path, + source_dataset_path=artifacts.dataset_path, + target_db_path=artifacts.db_path, + exact_geography_path=None, + calibration_package_path=artifacts.calibration_package_path, + run_config_path=artifacts.run_config_path, + run_id="run-123", + version=VERSION, + n_clones=artifacts.n_clones, + seed=SEED, + ) + + saved_fingerprint = _fingerprint_for(inputs=saved_inputs) + package_fingerprint = _fingerprint_for(inputs=package_inputs) + + assert saved_fingerprint == package_fingerprint diff --git a/tests/optimized/local_h5/test_worker_script_tiny_fixture.py 
b/tests/optimized/local_h5/test_worker_script_tiny_fixture.py new file mode 100644 index 000000000..6ad30fb4e --- /dev/null +++ b/tests/optimized/local_h5/test_worker_script_tiny_fixture.py @@ -0,0 +1,115 @@ +from __future__ import annotations + +import json +import subprocess +import sys +from pathlib import Path + +import pytest + +from tests.optimized.local_h5.fixtures import ( + build_request, + seed_local_h5_artifacts, +) + +pytest.importorskip("scipy") +pytest.importorskip("spm_calculator") + + +def _run_worker( + *, + request, + artifacts, + output_dir: Path, + use_saved_geography: bool = False, + use_package_geography: bool = False, +) -> dict: + cmd = [ + sys.executable, + "-m", + "modal_app.worker_script", + "--requests-json", + json.dumps([request.to_dict()]), + "--weights-path", + str(artifacts.weights_path), + "--dataset-path", + str(artifacts.dataset_path), + "--db-path", + str(artifacts.db_path), + "--output-dir", + str(output_dir), + "--n-clones", + str(artifacts.n_clones), + "--no-validate", + ] + if use_saved_geography: + cmd.extend(["--geography-path", str(artifacts.geography_path)]) + if use_package_geography: + cmd.extend( + [ + "--calibration-package-path", + str(artifacts.calibration_package_path), + ] + ) + + result = subprocess.run( + cmd, + capture_output=True, + text=True, + check=True, + ) + return json.loads(result.stdout) + + +def test_worker_builds_district_h5_from_saved_geography(tmp_path): + artifacts = seed_local_h5_artifacts(tmp_path / "district") + request = build_request("district", geography=artifacts.geography) + output_dir = tmp_path / "district-out" + + result = _run_worker( + request=request, + artifacts=artifacts, + output_dir=output_dir, + use_saved_geography=True, + ) + + assert result["failed"] == [] + assert result["errors"] == [] + assert result["completed"] == [f"district:{request.area_id}"] + assert (output_dir / request.output_relative_path).exists() + + +def test_worker_builds_state_h5_from_package_geography(tmp_path): + artifacts = seed_local_h5_artifacts(tmp_path / "state") + request = build_request("state", geography=artifacts.geography) + output_dir = tmp_path / "state-out" + + result = _run_worker( + request=request, + artifacts=artifacts, + output_dir=output_dir, + use_package_geography=True, + ) + + assert result["failed"] == [] + assert result["errors"] == [] + assert result["completed"] == [f"state:{request.area_id}"] + assert (output_dir / request.output_relative_path).exists() + + +def test_worker_builds_national_h5_from_package_geography(tmp_path): + artifacts = seed_local_h5_artifacts(tmp_path / "national") + request = build_request("national", geography=artifacts.geography) + output_dir = tmp_path / "national-out" + + result = _run_worker( + request=request, + artifacts=artifacts, + output_dir=output_dir, + use_package_geography=True, + ) + + assert result["failed"] == [] + assert result["errors"] == [] + assert result["completed"] == ["national:US"] + assert (output_dir / request.output_relative_path).exists() diff --git a/tests/support/__init__.py b/tests/support/__init__.py new file mode 100644 index 000000000..38361eaf5 --- /dev/null +++ b/tests/support/__init__.py @@ -0,0 +1 @@ +"""Shared test support helpers.""" diff --git a/tests/support/modal_local_area.py b/tests/support/modal_local_area.py new file mode 100644 index 000000000..683528e42 --- /dev/null +++ b/tests/support/modal_local_area.py @@ -0,0 +1,110 @@ +"""Helpers for importing `modal_app.local_area` in tests.""" + +import importlib +import sys +from 
contextlib import contextmanager +from types import ModuleType, SimpleNamespace + +__test__ = False + + +@contextmanager +def _patched_module_registry(overrides: dict[str, ModuleType]): + """Temporarily replace selected `sys.modules` entries for one import.""" + + sentinel = object() + previous = { + name: sys.modules.get(name, sentinel) + for name in [*overrides.keys(), "modal_app.local_area"] + } + + try: + for name, module in overrides.items(): + sys.modules[name] = module + sys.modules.pop("modal_app.local_area", None) + yield + finally: + for name, module in previous.items(): + if module is sentinel: + sys.modules.pop(name, None) + else: + sys.modules[name] = module + + +def load_local_area_module(*, stub_policyengine: bool = True): + """Import `modal_app.local_area` with scoped fake Modal dependencies.""" + + fake_modal = ModuleType("modal") + + class _FakeApp: + def __init__(self, *args, **kwargs): + pass + + def function(self, *args, **kwargs): + def decorator(func): + return func + + return decorator + + def local_entrypoint(self, *args, **kwargs): + def decorator(func): + return func + + return decorator + + fake_modal.App = _FakeApp + fake_modal.Secret = SimpleNamespace(from_name=lambda *args, **kwargs: object()) + fake_modal.Volume = SimpleNamespace(from_name=lambda *args, **kwargs: object()) + + fake_images = ModuleType("modal_app.images") + fake_images.cpu_image = object() + + fake_resilience = ModuleType("modal_app.resilience") + fake_resilience.reconcile_run_dir_fingerprint = lambda *args, **kwargs: None + + overrides = { + "modal": fake_modal, + "modal_app.images": fake_images, + "modal_app.resilience": fake_resilience, + } + + if stub_policyengine: + fake_policyengine = ModuleType("policyengine_us_data") + fake_calibration = ModuleType("policyengine_us_data.calibration") + fake_local_h5 = ModuleType("policyengine_us_data.calibration.local_h5") + fake_partitioning = ModuleType( + "policyengine_us_data.calibration.local_h5.partitioning" + ) + fake_fingerprinting = ModuleType( + "policyengine_us_data.calibration.local_h5.fingerprinting" + ) + fake_policyengine.__path__ = [] + fake_calibration.__path__ = [] + fake_local_h5.__path__ = [] + fake_partitioning.partition_weighted_work_items = lambda *args, **kwargs: [] + fake_fingerprinting.PublishingInputBundle = object + + class _FakeFingerprintingService: + def build_traceability(self, *args, **kwargs): + return object() + + def compute_scope_fingerprint(self, *args, **kwargs): + return "fake-fingerprint" + + fake_fingerprinting.FingerprintingService = _FakeFingerprintingService + overrides.update( + { + "policyengine_us_data": fake_policyengine, + "policyengine_us_data.calibration": fake_calibration, + "policyengine_us_data.calibration.local_h5": fake_local_h5, + "policyengine_us_data.calibration.local_h5.fingerprinting": ( + fake_fingerprinting + ), + "policyengine_us_data.calibration.local_h5.partitioning": ( + fake_partitioning + ), + } + ) + + with _patched_module_registry(overrides): + return importlib.import_module("modal_app.local_area") diff --git a/tests/support/pipeline_workspace.py b/tests/support/pipeline_workspace.py new file mode 100644 index 000000000..d8eb2e600 --- /dev/null +++ b/tests/support/pipeline_workspace.py @@ -0,0 +1,174 @@ +"""Shared fixture-scale pipeline workspace helpers. + +This module is intentionally test-only. Production code must not import it. 
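+
+A minimal usage sketch of the workspace defined below (illustrative only;
+``tmp_path`` is assumed to be a pytest fixture):
+
+    workspace = TinyPipelineWorkspace.create(tmp_path / "tiny-pipeline")
+    target = workspace.artifact_path("stage_1", "uprating_factors.csv")
+    target.write_text("fake uprating factors")
+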
+It defines the canonical tiny pipeline directory and artifact names that local +integration tests and Modal test harnesses can share as we move coverage +upstream from H5-only seams toward dataset build stages 1-5. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import ClassVar + +__test__ = False + + +STAGE_ARTIFACTS: dict[str, tuple[str, ...]] = { + "stage_1": ( + "uprating_factors.csv", + "acs_2022.h5", + "irs_puf_2015.h5", + ), + "stage_2": ( + "cps_2024.h5", + "puf_2024.h5", + ), + "stage_3": ("extended_cps_2024.h5",), + "stage_4": ( + "enhanced_cps_2024.h5", + "stratified_extended_cps_2024.h5", + ), + "stage_5": ( + "source_imputed_stratified_extended_cps_2024.h5", + "source_imputed_stratified_extended_cps.h5", + "small_enhanced_cps_2024.h5", + "sparse_enhanced_cps_2024.h5", + ), + "calibration": ( + "policy_data.db", + "calibration_package.pkl", + "calibration_weights.npy", + "geography_assignment.npz", + "unified_run_config.json", + ), + "h5_outputs": ( + "states/AL.h5", + "districts/NC-01.h5", + "national/US.h5", + ), +} + + +@dataclass(frozen=True) +class TinyPipelineWorkspace: + """Canonical on-disk layout for fixture-scale pipeline tests.""" + + root: Path + + STAGE_NAMES: ClassVar[tuple[str, ...]] = tuple(STAGE_ARTIFACTS) + TOP_LEVEL_DIRS: ClassVar[tuple[str, ...]] = ( + "inputs", + "stage_1", + "stage_2", + "stage_3", + "stage_4", + "stage_5", + "calibration", + "h5", + ) + H5_DIRS: ClassVar[tuple[str, ...]] = ( + "outputs", + "staging", + "diagnostics", + "manifests", + ) + + @classmethod + def create(cls, root: Path) -> "TinyPipelineWorkspace": + """Create an empty canonical workspace under ``root``.""" + + workspace = cls(root=root) + workspace.materialize() + return workspace + + @property + def inputs(self) -> Path: + return self.root / "inputs" + + @property + def stage_1(self) -> Path: + return self.root / "stage_1" + + @property + def stage_2(self) -> Path: + return self.root / "stage_2" + + @property + def stage_3(self) -> Path: + return self.root / "stage_3" + + @property + def stage_4(self) -> Path: + return self.root / "stage_4" + + @property + def stage_5(self) -> Path: + return self.root / "stage_5" + + @property + def calibration(self) -> Path: + return self.root / "calibration" + + @property + def h5(self) -> Path: + return self.root / "h5" + + @property + def h5_outputs(self) -> Path: + return self.h5 / "outputs" + + @property + def h5_staging(self) -> Path: + return self.h5 / "staging" + + @property + def h5_diagnostics(self) -> Path: + return self.h5 / "diagnostics" + + @property + def h5_manifests(self) -> Path: + return self.h5 / "manifests" + + def materialize(self) -> None: + """Create all canonical directories without writing artifacts.""" + + for dirname in self.TOP_LEVEL_DIRS: + (self.root / dirname).mkdir(parents=True, exist_ok=True) + for dirname in self.H5_DIRS: + (self.h5 / dirname).mkdir(parents=True, exist_ok=True) + + def stage_dir(self, stage: str) -> Path: + """Return the directory for a known stage name.""" + + if stage == "h5_outputs": + return self.h5_outputs + if stage not in STAGE_ARTIFACTS: + raise KeyError(f"Unknown tiny pipeline stage: {stage}") + return self.root / stage + + def artifact_path(self, stage: str, relative_path: str) -> Path: + """Return an artifact path and ensure nested parent dirs exist.""" + + path = self.stage_dir(stage) / relative_path + path.parent.mkdir(parents=True, exist_ok=True) + return path + + def expected_artifacts(self, stage: str) 
-> tuple[Path, ...]: + """Return expected artifact paths for a known stage.""" + + if stage not in STAGE_ARTIFACTS: + raise KeyError(f"Unknown tiny pipeline stage: {stage}") + return tuple( + self.artifact_path(stage, relative_path) + for relative_path in STAGE_ARTIFACTS[stage] + ) + + def all_expected_artifacts(self) -> dict[str, tuple[Path, ...]]: + """Return every currently defined expected artifact path by stage.""" + + return { + stage: self.expected_artifacts(stage) + for stage in STAGE_ARTIFACTS + } diff --git a/tests/unit/calibration/fixtures/test_local_h5_fingerprinting.py b/tests/unit/calibration/fixtures/test_local_h5_fingerprinting.py new file mode 100644 index 000000000..2ecffd000 --- /dev/null +++ b/tests/unit/calibration/fixtures/test_local_h5_fingerprinting.py @@ -0,0 +1,117 @@ +"""Fixture helpers for ``test_local_h5_fingerprinting.py``.""" + +from __future__ import annotations + +import importlib +import json +from pathlib import Path + +import h5py +import numpy as np + +from tests.unit.calibration.fixtures.test_local_h5_geography_loader import ( + write_saved_geography, +) + +__test__ = False + +_FINGERPRINTING_EXPORTS = None + + +def load_fingerprinting_exports(): + """Load the fingerprinting module without replacing shared package modules.""" + + global _FINGERPRINTING_EXPORTS + if _FINGERPRINTING_EXPORTS is not None: + return _FINGERPRINTING_EXPORTS + + module = importlib.import_module( + "policyengine_us_data.calibration.local_h5.fingerprinting" + ) + _FINGERPRINTING_EXPORTS = { + "module": module, + "ArtifactIdentity": module.ArtifactIdentity, + "FingerprintingService": module.FingerprintingService, + "PublishingInputBundle": module.PublishingInputBundle, + "TraceabilityBundle": module.TraceabilityBundle, + } + return _FINGERPRINTING_EXPORTS + + +def write_source_dataset( + path: Path, + *, + n_records: int, + person_records: int | None = None, +) -> None: + """Write a minimal HDF5 dataset with a ``person`` entity.""" + + person_count = person_records if person_records is not None else n_records + with h5py.File(path, "w") as handle: + person = handle.create_group("person") + person.create_dataset("person_id", data=np.arange(person_count, dtype=np.int32)) + + +def write_run_config(path: Path, *, package_version: str = "1.0.0") -> None: + """Write a minimal run-config payload with provenance fields.""" + + payload = { + "git_commit": "deadbeefcafebabe", + "git_branch": "main", + "git_dirty": False, + "package_version": package_version, + } + path.write_text(json.dumps(payload)) + + +def write_artifact_file(path: Path, content: bytes) -> None: + """Write one small binary artifact for traceability tests.""" + + path.write_bytes(content) + + +def make_publishing_inputs( + bundle_cls, + *, + tmp_path: Path, + n_records: int = 2, + person_records: int | None = None, + n_clones: int = 2, + seed: int = 42, + package_version: str = "1.0.0", +): + """Create a fully-populated publishing input bundle for tests.""" + + tmp_path.mkdir(parents=True, exist_ok=True) + weights_path = tmp_path / "calibration_weights.npy" + dataset_path = tmp_path / "source.h5" + db_path = tmp_path / "policy_data.db" + geography_path = tmp_path / "geography_assignment.npz" + run_config_path = tmp_path / "unified_run_config.json" + + np.save(weights_path, np.arange(n_records * n_clones, dtype=float) + 1.0) + write_source_dataset( + dataset_path, + n_records=n_records, + person_records=person_records, + ) + write_artifact_file(db_path, b"fake-db") + write_saved_geography( + geography_path, + 
n_records=n_records, + n_clones=n_clones, + ) + write_run_config(run_config_path, package_version=package_version) + + return bundle_cls( + weights_path=weights_path, + source_dataset_path=dataset_path, + target_db_path=db_path, + exact_geography_path=geography_path, + calibration_package_path=None, + run_config_path=run_config_path, + run_id="run-123", + version="1.2.3", + n_clones=n_clones, + seed=seed, + ) diff --git a/tests/unit/calibration/test_local_h5_fingerprinting.py b/tests/unit/calibration/test_local_h5_fingerprinting.py new file mode 100644 index 000000000..08d2b593b --- /dev/null +++ b/tests/unit/calibration/test_local_h5_fingerprinting.py @@ -0,0 +1,149 @@ +from tests.unit.calibration.fixtures.test_local_h5_fingerprinting import ( + load_fingerprinting_exports, + make_publishing_inputs, +) + + +exports = load_fingerprinting_exports() +FingerprintingService = exports["FingerprintingService"] +PublishingInputBundle = exports["PublishingInputBundle"] + + +def test_build_traceability_captures_artifact_identity_and_metadata(tmp_path): + inputs = make_publishing_inputs(PublishingInputBundle, tmp_path=tmp_path) + + service = FingerprintingService() + traceability = service.build_traceability(inputs=inputs, scope="regional") + + assert traceability.scope == "regional" + assert traceability.weights.path == inputs.weights_path + assert traceability.weights.sha256.startswith("sha256:") + assert traceability.source_dataset.sha256.startswith("sha256:") + assert traceability.exact_geography is not None + assert traceability.exact_geography.metadata["source_kind"] == "saved_geography" + assert traceability.exact_geography.metadata["canonical_sha256"].startswith( + "sha256:" + ) + assert traceability.target_db is not None + assert traceability.model_build["locked_version"] == "1.0.0" + assert traceability.metadata["n_clones"] == 2 + assert traceability.metadata["seed"] == 42 + + +def test_scope_fingerprint_differs_between_regional_and_national(tmp_path): + inputs = make_publishing_inputs(PublishingInputBundle, tmp_path=tmp_path) + + service = FingerprintingService() + regional = service.compute_scope_fingerprint( + service.build_traceability(inputs=inputs, scope="regional") + ) + national = service.compute_scope_fingerprint( + service.build_traceability(inputs=inputs, scope="national") + ) + + assert regional != national + + +def test_scope_fingerprint_is_stable_for_identical_inputs(tmp_path): + inputs = make_publishing_inputs(PublishingInputBundle, tmp_path=tmp_path) + + service = FingerprintingService() + first = service.compute_scope_fingerprint( + service.build_traceability(inputs=inputs, scope="regional") + ) + second = service.compute_scope_fingerprint( + service.build_traceability(inputs=inputs, scope="regional") + ) + + assert first == second + + +def test_scope_fingerprint_changes_when_relevant_provenance_changes(tmp_path): + first_inputs = make_publishing_inputs( + PublishingInputBundle, + tmp_path=tmp_path / "first", + ) + second_inputs = make_publishing_inputs( + PublishingInputBundle, + tmp_path=tmp_path / "second", + ) + second_inputs.target_db_path.write_bytes(b"changed-db") + + service = FingerprintingService() + first = service.compute_scope_fingerprint( + service.build_traceability(inputs=first_inputs, scope="regional") + ) + second = service.compute_scope_fingerprint( + service.build_traceability(inputs=second_inputs, scope="regional") + ) + + assert first != second + + +def test_traceability_uses_weight_derived_household_count_for_geography(tmp_path): + inputs = 
make_publishing_inputs( + PublishingInputBundle, + tmp_path=tmp_path, + n_records=2, + person_records=5, + n_clones=2, + ) + + service = FingerprintingService() + traceability = service.build_traceability(inputs=inputs, scope="regional") + + assert traceability.exact_geography is not None + assert traceability.exact_geography.metadata["canonical_sha256"].startswith( + "sha256:" + ) + + +def test_resumability_material_prefers_canonical_geography_checksum(tmp_path): + inputs = make_publishing_inputs(PublishingInputBundle, tmp_path=tmp_path) + + service = FingerprintingService() + traceability = service.build_traceability(inputs=inputs, scope="regional") + resumability = traceability.resumability_material() + + assert traceability.exact_geography is not None + assert ( + resumability["exact_geography_sha256"] + == traceability.exact_geography.metadata["canonical_sha256"] + ) + + +def test_traceability_handles_missing_optional_artifacts(tmp_path): + inputs = make_publishing_inputs(PublishingInputBundle, tmp_path=tmp_path) + standalone_weights_path = tmp_path / "standalone" / "weights.npy" + standalone_weights_path.parent.mkdir(parents=True, exist_ok=True) + standalone_weights_path.write_bytes(inputs.weights_path.read_bytes()) + inputs = PublishingInputBundle( + weights_path=standalone_weights_path, + source_dataset_path=inputs.source_dataset_path, + target_db_path=None, + exact_geography_path=None, + calibration_package_path=None, + run_config_path=None, + run_id=inputs.run_id, + version=inputs.version, + n_clones=inputs.n_clones, + seed=inputs.seed, + legacy_blocks_path=None, + ) + + service = FingerprintingService() + traceability = service.build_traceability(inputs=inputs, scope="regional") + + assert traceability.target_db is None + assert traceability.exact_geography is None + assert traceability.calibration_package is None + assert traceability.run_config is None + assert traceability.code_version == { + "git_commit": None, + "git_branch": None, + "git_dirty": None, + } + assert traceability.model_build == { + "locked_version": None, + "git_commit": None, + } diff --git a/tests/unit/calibration/test_publish_local_area.py b/tests/unit/calibration/test_publish_local_area.py index d4ebf685b..e6b5aa4d5 100644 --- a/tests/unit/calibration/test_publish_local_area.py +++ b/tests/unit/calibration/test_publish_local_area.py @@ -80,7 +80,10 @@ def test_compute_input_fingerprint_uses_loader_canonical_geography_identity( monkeypatch.setattr( "policyengine_us_data.calibration.publish_local_area.CalibrationGeographyLoader.resolve_source", - lambda self, **kwargs: SimpleNamespace(kind="saved_geography"), + lambda self, **kwargs: SimpleNamespace( + kind="saved_geography", + path=kwargs["geography_path"], + ), ) monkeypatch.setattr( "policyengine_us_data.calibration.publish_local_area.CalibrationGeographyLoader.compute_canonical_checksum", @@ -118,7 +121,10 @@ def test_compute_input_fingerprint_passes_calibration_package_path_to_loader( def fake_resolve_source(self, **kwargs): seen["resolve"] = kwargs - return SimpleNamespace(kind="calibration_package") + return SimpleNamespace( + kind="calibration_package", + path=kwargs["calibration_package_path"], + ) def fake_compute_canonical_checksum(self, **kwargs): seen["checksum"] = kwargs diff --git a/tests/unit/fixtures/test_modal_local_area.py b/tests/unit/fixtures/test_modal_local_area.py deleted file mode 100644 index 377e879ae..000000000 --- a/tests/unit/fixtures/test_modal_local_area.py +++ /dev/null @@ -1,88 +0,0 @@ -"""Fixture helpers for 
`test_modal_local_area.py`.""" - -import importlib -import sys -from contextlib import contextmanager -from types import ModuleType, SimpleNamespace - -__test__ = False - - -@contextmanager -def _patched_module_registry(overrides: dict[str, ModuleType]): - """Temporarily replace selected `sys.modules` entries for one import.""" - - sentinel = object() - previous = { - name: sys.modules.get(name, sentinel) - for name in [*overrides.keys(), "modal_app.local_area"] - } - - try: - for name, module in overrides.items(): - sys.modules[name] = module - sys.modules.pop("modal_app.local_area", None) - yield - finally: - for name, module in previous.items(): - if module is sentinel: - sys.modules.pop(name, None) - else: - sys.modules[name] = module - - -def load_local_area_module(): - """Import `modal_app.local_area` with scoped fake Modal dependencies.""" - - fake_modal = ModuleType("modal") - fake_policyengine = ModuleType("policyengine_us_data") - fake_calibration = ModuleType("policyengine_us_data.calibration") - fake_local_h5 = ModuleType("policyengine_us_data.calibration.local_h5") - fake_partitioning = ModuleType( - "policyengine_us_data.calibration.local_h5.partitioning" - ) - fake_policyengine.__path__ = [] - fake_calibration.__path__ = [] - fake_local_h5.__path__ = [] - - class _FakeApp: - def __init__(self, *args, **kwargs): - pass - - def function(self, *args, **kwargs): - def decorator(func): - return func - - return decorator - - def local_entrypoint(self, *args, **kwargs): - def decorator(func): - return func - - return decorator - - fake_modal.App = _FakeApp - fake_modal.Secret = SimpleNamespace(from_name=lambda *args, **kwargs: object()) - fake_modal.Volume = SimpleNamespace(from_name=lambda *args, **kwargs: object()) - - fake_images = ModuleType("modal_app.images") - fake_images.cpu_image = object() - - fake_resilience = ModuleType("modal_app.resilience") - fake_resilience.reconcile_run_dir_fingerprint = lambda *args, **kwargs: None - fake_partitioning.partition_weighted_work_items = lambda *args, **kwargs: [] - - with _patched_module_registry( - { - "modal": fake_modal, - "modal_app.images": fake_images, - "modal_app.resilience": fake_resilience, - "policyengine_us_data": fake_policyengine, - "policyengine_us_data.calibration": fake_calibration, - "policyengine_us_data.calibration.local_h5": fake_local_h5, - "policyengine_us_data.calibration.local_h5.partitioning": ( - fake_partitioning - ), - } - ): - return importlib.import_module("modal_app.local_area") diff --git a/tests/unit/test_modal_local_area.py b/tests/unit/test_modal_local_area.py index 0e3cd9fd6..2e745846a 100644 --- a/tests/unit/test_modal_local_area.py +++ b/tests/unit/test_modal_local_area.py @@ -1,4 +1,6 @@ -from tests.unit.fixtures.test_modal_local_area import load_local_area_module +from pathlib import Path + +from tests.support.modal_local_area import load_local_area_module def test_build_promote_national_publish_script_imports_version_manifest_helpers(): @@ -28,3 +30,164 @@ def test_build_promote_publish_script_finalizes_complete_release(): assert "should_finalize_local_area_release" in script assert "create_tag=should_finalize" in script assert "upload_manifest(" in script + + +def test_build_publishing_input_bundle_preserves_traceability_inputs(): + local_area = load_local_area_module(stub_policyengine=False) + + bundle = local_area._build_publishing_input_bundle( + weights_path=Path("/tmp/calibration_weights.npy"), + dataset_path=Path("/tmp/source.h5"), + db_path=Path("/tmp/policy_data.db"), + 
geography_path=Path("/tmp/geography_assignment.npz"), + calibration_package_path=Path("/tmp/calibration_package.pkl"), + run_config_path=Path("/tmp/unified_run_config.json"), + run_id="run-123", + version="1.2.3", + n_clones=4, + seed=42, + legacy_blocks_path=Path("/tmp/stacked_blocks.npy"), + ) + + assert bundle.weights_path == Path("/tmp/calibration_weights.npy") + assert bundle.source_dataset_path == Path("/tmp/source.h5") + assert bundle.target_db_path == Path("/tmp/policy_data.db") + assert bundle.exact_geography_path == Path("/tmp/geography_assignment.npz") + assert bundle.calibration_package_path == Path("/tmp/calibration_package.pkl") + assert bundle.run_config_path == Path("/tmp/unified_run_config.json") + assert bundle.run_id == "run-123" + assert bundle.version == "1.2.3" + assert bundle.n_clones == 4 + assert bundle.seed == 42 + assert bundle.legacy_blocks_path == Path("/tmp/stacked_blocks.npy") + + +def test_resolve_scope_fingerprint_computes_when_no_pin(monkeypatch): + local_area = load_local_area_module(stub_policyengine=False) + + seen = {} + + class FakeFingerprintingService: + def build_traceability(self, *, inputs, scope): + seen["inputs"] = inputs + seen["scope"] = scope + return {"scope": scope, "run_id": inputs.run_id} + + def compute_scope_fingerprint(self, traceability): + seen["traceability"] = traceability + return "computed-fingerprint" + + monkeypatch.setattr( + local_area, + "FingerprintingService", + FakeFingerprintingService, + ) + + bundle = local_area._build_publishing_input_bundle( + weights_path=Path("/tmp/calibration_weights.npy"), + dataset_path=Path("/tmp/source.h5"), + db_path=None, + geography_path=None, + calibration_package_path=None, + run_config_path=None, + run_id="run-123", + version="1.2.3", + n_clones=2, + seed=42, + ) + + fingerprint = local_area._resolve_scope_fingerprint( + inputs=bundle, + scope="regional", + ) + + assert fingerprint == "computed-fingerprint" + assert seen["inputs"] == bundle + assert seen["scope"] == "regional" + assert seen["traceability"] == {"scope": "regional", "run_id": "run-123"} + + +def test_resolve_scope_fingerprint_preserves_matching_pin(monkeypatch, capsys): + local_area = load_local_area_module(stub_policyengine=False) + + class FakeFingerprintingService: + def build_traceability(self, *, inputs, scope): + return scope + + def compute_scope_fingerprint(self, traceability): + return "pinned-fingerprint" + + monkeypatch.setattr( + local_area, + "FingerprintingService", + FakeFingerprintingService, + ) + + bundle = local_area._build_publishing_input_bundle( + weights_path=Path("/tmp/calibration_weights.npy"), + dataset_path=Path("/tmp/source.h5"), + db_path=None, + geography_path=None, + calibration_package_path=None, + run_config_path=None, + run_id="run-123", + version="1.2.3", + n_clones=2, + seed=42, + ) + + fingerprint = local_area._resolve_scope_fingerprint( + inputs=bundle, + scope="regional", + expected_fingerprint="pinned-fingerprint", + ) + + captured = capsys.readouterr() + assert fingerprint == "pinned-fingerprint" + assert "Using pinned fingerprint from pipeline" in captured.out + + +def test_resolve_scope_fingerprint_warns_and_preserves_mismatched_pin( + monkeypatch, capsys +): + local_area = load_local_area_module(stub_policyengine=False) + + class FakeFingerprintingService: + def build_traceability(self, *, inputs, scope): + return scope + + def compute_scope_fingerprint(self, traceability): + return "computed-fingerprint" + + monkeypatch.setattr( + local_area, + "FingerprintingService", + 
FakeFingerprintingService, + ) + + bundle = local_area._build_publishing_input_bundle( + weights_path=Path("/tmp/calibration_weights.npy"), + dataset_path=Path("/tmp/source.h5"), + db_path=None, + geography_path=None, + calibration_package_path=None, + run_config_path=None, + run_id="run-123", + version="1.2.3", + n_clones=2, + seed=42, + ) + + fingerprint = local_area._resolve_scope_fingerprint( + inputs=bundle, + scope="national", + expected_fingerprint="legacy-fingerprint", + ) + + captured = capsys.readouterr() + assert fingerprint == "legacy-fingerprint" + assert "Pinned fingerprint differs from current national scope fingerprint" in ( + captured.out + ) + assert "legacy-fingerprint" in captured.out + assert "computed-fingerprint" in captured.out diff --git a/tests/unit/test_pipeline.py b/tests/unit/test_pipeline.py index 5aaca8a47..5458c53a7 100644 --- a/tests/unit/test_pipeline.py +++ b/tests/unit/test_pipeline.py @@ -8,7 +8,7 @@ modal = pytest.importorskip("modal") -from modal_app.pipeline import ( +from modal_app.pipeline import ( # noqa: E402 RunMetadata, _step_completed, _record_step, @@ -63,6 +63,39 @@ def test_from_dict(self): assert meta.status == "completed" assert meta.step_timings["build_datasets"]["status"] == "completed" + def test_from_dict_maps_legacy_fingerprint_to_regional_scope(self): + meta = RunMetadata.from_dict( + { + "run_id": "test", + "branch": "main", + "sha": "abc12345deadbeef", + "version": "1.72.3", + "start_time": "2026-03-19T12:00:00Z", + "status": "running", + "fingerprint": "legacy-fingerprint", + } + ) + + assert meta.fingerprint == "legacy-fingerprint" + assert meta.regional_fingerprint == "legacy-fingerprint" + + def test_from_dict_keeps_explicit_regional_fingerprint_when_both_present(self): + meta = RunMetadata.from_dict( + { + "run_id": "test", + "branch": "main", + "sha": "abc12345deadbeef", + "version": "1.72.3", + "start_time": "2026-03-19T12:00:00Z", + "status": "running", + "fingerprint": "legacy-fingerprint", + "regional_fingerprint": "regional-fingerprint", + } + ) + + assert meta.fingerprint == "legacy-fingerprint" + assert meta.regional_fingerprint == "regional-fingerprint" + def test_roundtrip(self): meta = RunMetadata( run_id="1.72.3_abc12345_20260319_120000", @@ -79,6 +112,39 @@ def test_roundtrip(self): assert roundtripped.status == meta.status assert roundtripped.error == meta.error + def test_to_dict_keeps_legacy_fingerprint_alias_in_sync(self): + meta = RunMetadata( + run_id="test", + branch="main", + sha="abc", + version="1.0.0", + start_time="now", + status="running", + regional_fingerprint="regional-fp", + ) + + payload = meta.to_dict() + + assert payload["fingerprint"] == "regional-fp" + assert payload["regional_fingerprint"] == "regional-fp" + + def test_to_dict_preserves_distinct_explicit_regional_fingerprint(self): + meta = RunMetadata( + run_id="test", + branch="main", + sha="abc", + version="1.0.0", + start_time="now", + status="running", + fingerprint="legacy-fp", + regional_fingerprint="regional-fp", + ) + + payload = meta.to_dict() + + assert payload["fingerprint"] == "legacy-fp" + assert payload["regional_fingerprint"] == "regional-fp" + def test_step_timings_default_empty(self): meta = RunMetadata( run_id="test", diff --git a/tests/test_refresh_local_agi_state_targets.py b/tests/unit/test_refresh_local_agi_state_targets.py similarity index 98% rename from tests/test_refresh_local_agi_state_targets.py rename to tests/unit/test_refresh_local_agi_state_targets.py index 9b59c056d..c036e1caf 100644 --- 
a/tests/test_refresh_local_agi_state_targets.py +++ b/tests/unit/test_refresh_local_agi_state_targets.py @@ -5,7 +5,7 @@ import pandas as pd -REPO_ROOT = Path(__file__).resolve().parent.parent +REPO_ROOT = Path(__file__).resolve().parents[2] PACKAGE_ROOT = REPO_ROOT / "policyengine_us_data" MODULE_PATH = ( PACKAGE_ROOT diff --git a/tests/unit/test_tiny_pipeline_workspace.py b/tests/unit/test_tiny_pipeline_workspace.py new file mode 100644 index 000000000..da2aec7da --- /dev/null +++ b/tests/unit/test_tiny_pipeline_workspace.py @@ -0,0 +1,64 @@ +from pathlib import Path + +import pytest + +from tests.support.pipeline_workspace import ( + STAGE_ARTIFACTS, + TinyPipelineWorkspace, +) + + +def test_tiny_pipeline_workspace_creates_canonical_directories(tmp_path): + workspace = TinyPipelineWorkspace.create(tmp_path / "tiny-pipeline") + + expected_dirs = [ + workspace.inputs, + workspace.stage_1, + workspace.stage_2, + workspace.stage_3, + workspace.stage_4, + workspace.stage_5, + workspace.calibration, + workspace.h5_outputs, + workspace.h5_staging, + workspace.h5_diagnostics, + workspace.h5_manifests, + ] + + assert all(path.is_dir() for path in expected_dirs) + + +def test_tiny_pipeline_workspace_resolves_expected_artifacts(tmp_path): + workspace = TinyPipelineWorkspace.create(tmp_path / "tiny-pipeline") + + assert workspace.expected_artifacts("stage_1") == ( + workspace.stage_1 / "uprating_factors.csv", + workspace.stage_1 / "acs_2022.h5", + workspace.stage_1 / "irs_puf_2015.h5", + ) + assert workspace.expected_artifacts("h5_outputs") == ( + workspace.h5_outputs / "states" / "AL.h5", + workspace.h5_outputs / "districts" / "NC-01.h5", + workspace.h5_outputs / "national" / "US.h5", + ) + + # Nested expected artifact paths should be immediately writable by later + # fixture builders.
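+    # (artifact_path() creates each parent directory as a side effect of
+    # resolving the expected paths above.)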
+ for path in workspace.expected_artifacts("h5_outputs"): + assert path.parent.is_dir() + + +def test_tiny_pipeline_workspace_rejects_unknown_stage(tmp_path): + workspace = TinyPipelineWorkspace.create(tmp_path / "tiny-pipeline") + + with pytest.raises(KeyError, match="Unknown tiny pipeline stage"): + workspace.stage_dir("not-a-stage") + + +def test_tiny_pipeline_workspace_exposes_all_declared_artifacts(tmp_path): + workspace = TinyPipelineWorkspace.create(tmp_path / "tiny-pipeline") + + artifacts = workspace.all_expected_artifacts() + + assert set(artifacts) == set(STAGE_ARTIFACTS) + assert all(isinstance(path, Path) for paths in artifacts.values() for path in paths) diff --git a/policyengine_us_data/tests/test_trace_tro.py b/tests/unit/test_trace_tro.py similarity index 97% rename from policyengine_us_data/tests/test_trace_tro.py rename to tests/unit/test_trace_tro.py index fafd714c5..327e64412 100644 --- a/policyengine_us_data/tests/test_trace_tro.py +++ b/tests/unit/test_trace_tro.py @@ -1,4 +1,5 @@ import hashlib +import importlib.resources import json from pathlib import Path @@ -219,8 +220,9 @@ def test_tro_validates_against_shipped_schema(tmp_path): tro = build_trace_tro_from_release_manifest(manifest) - schema_path = ( - Path(__file__).resolve().parents[1] / "schemas" / "trace_tro.schema.json" + schema = json.loads( + importlib.resources.files("policyengine_us_data") + .joinpath("schemas/trace_tro.schema.json") + .read_text() ) - schema = json.loads(schema_path.read_text()) jsonschema.validate(instance=tro, schema=schema) diff --git a/tests/unit/version_manifest/__init__.py b/tests/unit/version_manifest/__init__.py new file mode 100644 index 000000000..8baf56aff --- /dev/null +++ b/tests/unit/version_manifest/__init__.py @@ -0,0 +1 @@ +"""Version manifest unit tests.""" diff --git a/tests/conftest.py b/tests/unit/version_manifest/conftest.py similarity index 62% rename from tests/conftest.py rename to tests/unit/version_manifest/conftest.py index fc97e2882..452224741 100644 --- a/tests/conftest.py +++ b/tests/unit/version_manifest/conftest.py @@ -1,12 +1,17 @@ -"""Shared fixtures and helpers for version manifest tests.""" +"""Shared fixtures for version manifest tests.""" -from __future__ import annotations - -import json from unittest.mock import MagicMock import pytest +from policyengine_us_data.utils.version_manifest import ( + HFVersionInfo, + GCSVersionInfo, + VersionManifest, + VersionRegistry, +) +from policyengine_us_data.utils.policyengine import PolicyEngineUSBuildInfo + # -- Fixtures ------------------------------------------------------ @@ -21,8 +26,6 @@ def sample_generations() -> dict[str, int]: @pytest.fixture def sample_hf_info() -> HFVersionInfo: - from policyengine_us_data.utils.version_manifest import HFVersionInfo - return HFVersionInfo( repo="policyengine/policyengine-us-data", commit="abc123def456", @@ -31,8 +34,6 @@ def sample_policyengine_us_info() -> PolicyEngineUSBuildInfo: - from policyengine_us_data.utils.policyengine import PolicyEngineUSBuildInfo - return PolicyEngineUSBuildInfo( version="1.587.0", locked_version="1.587.0", @@ -47,11 +48,6 @@ def sample_manifest( sample_hf_info: HFVersionInfo, sample_policyengine_us_info: PolicyEngineUSBuildInfo, ) -> VersionManifest: - from policyengine_us_data.utils.version_manifest import ( - GCSVersionInfo, - VersionManifest, - ) - return VersionManifest( version="1.72.3", created_at="2026-03-10T14:30:00Z", @@ -69,8 +65,6 @@ def sample_registry(
sample_manifest: VersionManifest, ) -> VersionRegistry: """A registry with one version entry.""" - from policyengine_us_data.utils.version_manifest import VersionRegistry - return VersionRegistry( current="1.72.3", versions=[sample_manifest], @@ -82,23 +76,3 @@ def mock_bucket() -> MagicMock: bucket = MagicMock() bucket.name = "policyengine-us-data" return bucket - - -# -- Helpers ------------------------------------------------------- - - -def make_mock_blob(generation: int) -> MagicMock: - blob = MagicMock() - blob.generation = generation - return blob - - -def setup_bucket_with_registry( - bucket: MagicMock, - registry: VersionRegistry, -) -> None: - """Configure a mock bucket to serve a registry.""" - registry_json = json.dumps(registry.to_dict()) - blob = MagicMock() - blob.download_as_text.return_value = registry_json - bucket.blob.return_value = blob diff --git a/tests/unit/version_manifest/support.py b/tests/unit/version_manifest/support.py new file mode 100644 index 000000000..591a6484b --- /dev/null +++ b/tests/unit/version_manifest/support.py @@ -0,0 +1,23 @@ +"""Support helpers for version manifest unit tests.""" + +import json +from unittest.mock import MagicMock + +from policyengine_us_data.utils.version_manifest import VersionRegistry + + +def make_mock_blob(generation: int) -> MagicMock: + blob = MagicMock() + blob.generation = generation + return blob + + +def setup_bucket_with_registry( + bucket: MagicMock, + registry: VersionRegistry, +) -> None: + """Configure a mock bucket to serve a registry.""" + registry_json = json.dumps(registry.to_dict()) + blob = MagicMock() + blob.download_as_text.return_value = registry_json + bucket.blob.return_value = blob diff --git a/tests/unit/test_version_manifest.py b/tests/unit/version_manifest/test_version_manifest.py similarity index 99% rename from tests/unit/test_version_manifest.py rename to tests/unit/version_manifest/test_version_manifest.py index 7e46f16c6..1c2eede0f 100644 --- a/tests/unit/test_version_manifest.py +++ b/tests/unit/version_manifest/test_version_manifest.py @@ -20,7 +20,7 @@ get_data_manifest, get_data_version, ) -from tests.conftest import ( +from .support import ( make_mock_blob, setup_bucket_with_registry, )