Parallelize chunked matrix building behind a coordinator class (follow-up to #753) #818

@juaristi22

Description

Summary

UnifiedMatrixBuilder.build_matrix_chunked() (introduced in #753) resolved the ~49.6 GB RSS OOM of the non-chunked path on full-CPS national runs, but it does not scale to production. At default settings a full run produces ~207 chunks of 25,000 clone-household columns each and executes them serially on one 8-CPU / 64 GB Modal container, which cannot finish inside the 14 h timeout (remote_calibration_runner.py:440). The final-assembly step at unified_matrix_builder.py:3489–3506 also reintroduces a memory peak by loading every per-chunk COO shard into three Python lists before np.concatenate, partially undoing the gain from chunking.

We want the chunked builder to finish full-scale production runs in a fraction of current wall time and under a memory budget that doesn't regress vs. per-chunk peak.

Context

Production scale (confirmed in-repo):

| Quantity | Value | Source |
| --- | --- | --- |
| Stratified households (CPS records) | ~12,000 | docs/methodology.md:422; calibration/create_stratified_cps.py:365 (`__main__` default target = 12_000) |
| Default clones per household | 430 | docs/methodology.md:422; calibration/unified_calibration.py:65 (`DEFAULT_N_CLONES`) |
| Total clone-household columns | ~5.16M (12,000 × 430) | docs/methodology.md:422 ("approximately 5.2 million total records entering calibration") |
| Calibration targets (rows) | 3,842 | docs/internals/README.md:130 |
| Default chunk size | 25,000 cols | calibration/unified_matrix_builder.py:3071 |
| Chunks per production run | ~207 | derived |
| Per-worker Modal ceiling | 8 CPU / 64 GB / 14 h | modal_app/remote_calibration_runner.py:440 |
| Existing H5 worker concurrency | max_containers=50 | modal_app/local_area.py:452 |

Note on prior confusion: the create_stratified_cps_dataset function has target_households=30_000 as its signature default, but that path is only taken by programmatic callers with no args. The production __main__ entry point (what data_build.py invokes) hard-codes target = 12_000.

Current build_matrix_chunked() (unified_matrix_builder.py:3065–3519) is a ~450-line method that owns partition, per-chunk materialization, per-chunk Microsimulation construction, COO shard writing, resume manifest validation, progress logging, and final CSR assembly. This matches the refactor doc's observation that UnifiedMatrixBuilder is overloaded (refactor §6: target-repo work, uprating, simulation, geography indexing, constraint evaluation, take-up handling, sparse assembly, and now orchestration).

Per-chunk work is structurally independent: the only cross-chunk state is constraint_cache (built once at line 3136, immutable thereafter) and the geography arrays (read-only). This is embarrassingly parallel.

Goals

  1. Full-CPS chunked matrix build finishes end-to-end on Modal within the current 14 h ceiling, with substantial wall-time headroom for future growth.
  2. Peak memory during final assembly bounded by final-CSR size, not ~2× via list-then-concatenate.
  3. Parallelization introduced in a way that is consistent with the refactor direction — behind a small, testable class — not as a new 500-line branch inside build_matrix_chunked.
  4. No new Modal apps, volumes, or concurrency budget (see "Infrastructure impact" below).

Non-goals

  • Full Phase 4 extraction of MatrixAssembler / SimulationBatchEvaluator / TargetRepository / ConstraintEvaluator. That is a larger effort tracked by the refactor plan. This issue carves out the coordination layer only, which the refactor doc treats as a natural seam.
  • Changes to the non-chunked build_matrix() path. It will continue to exist for small/local callers.
  • Changes to L0 fitting, target querying, or take-up logic.
  • Schema/contract changes to calibration_package.pkl — that's Phase 1 of the refactor.

Proposed direction

Extract a coordinator class — proposed name ChunkedMatrixAssembler — in policyengine_us_data/calibration/. The class owns:

  • chunk partitioning (given n_records, n_clones, chunk_size);
  • dispatch of per-chunk work to an injected ChunkDispatcher;
  • resume machinery (chunk_manifest.json + lineage signature, already implemented);
  • per-chunk progress / ETA logging (already implemented);
  • streaming final assembly into CSR (new — replaces the current list-then-concat).
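The partitioning responsibility is small enough to pin down now. A minimal sketch (the function name and tuple return shape are illustrative, not the final API): splitting the n_records × n_clones clone-household columns into contiguous `[start, stop)` ranges of at most chunk_size columns.

```python
def partition_chunks(n_records: int, n_clones: int, chunk_size: int):
    """Split the n_records * n_clones clone-household columns into
    contiguous [start, stop) column ranges of at most chunk_size."""
    n_cols = n_records * n_clones
    return [
        (start, min(start + chunk_size, n_cols))
        for start in range(0, n_cols, chunk_size)
    ]
```

At the production defaults (12,000 households × 430 clones, chunk_size 25,000) this yields the ~207 chunks quoted above, with a final partial chunk of 10,000 columns.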

The per-chunk kernel (materialize subset H5 → build Microsimulation → compute variables → write COO shard) is factored into a pure callable so the same function runs in-process today and inside a Modal worker tomorrow. The refactor's design principle applies: the kernel knows arrays, the dispatcher knows where they execute.

┌─────────────────────────────────────────────────────────┐
│ UnifiedMatrixBuilder.build_matrix_chunked(...) ← facade │
└────────────┬────────────────────────────────────────────┘
             │ delegates
             ▼
     ┌─────────────────────────────┐
     │ ChunkedMatrixAssembler      │
     │  • partition                │
     │  • dispatcher.submit()      │
     │  • resume manifest          │
     │  • streaming CSR assembly   │
     └───────┬─────────────────────┘
             │ uses
             ▼
     ┌─────────────────────────────┐     ┌──────────────────────────┐
     │ ChunkDispatcher (interface) │◀────│ ChunkKernel (pure)       │
     │                             │     │  (current per-chunk body)│
     │  LocalChunkDispatcher       │     └──────────────────────────┘
     │  ModalChunkDispatcher       │
     └─────────────────────────────┘
  • LocalChunkDispatcher runs kernels in the current process — the existing serial path, preserved for small runs, tests, and environments without Modal.
  • ModalChunkDispatcher spawns build_matrix_chunk_worker Modal functions batched by chunks_per_worker, following the build_areas_worker pattern (modal_app/local_area.py:442–454, max_containers=50, modal.Volume for shard coordination, .spawn() / .get() idiom). Shards land at /staging/{run_id}/matrix_chunks/chunk_XXXXXX.npz; the existing chunk_manifest.json machinery (lines 3197–3218) works unchanged.
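The dispatcher seam above can be sketched as a small protocol. Names and signatures here are proposals, not existing code; the point is that the coordinator only ever sees `submit()`, so swapping local for Modal execution (or a fake in tests) requires no orchestration changes.

```python
from dataclasses import dataclass
from typing import Callable, List, Protocol, Sequence


@dataclass(frozen=True)
class ChunkSpec:
    index: int      # chunk ordinal, used for chunk_XXXXXX.npz naming
    col_start: int  # first clone-household column in the chunk
    col_stop: int   # one past the last column


# The kernel is a pure callable: spec in, shard path out. No Modal and no
# builder state, so the same function runs in-process or in a worker.
ChunkKernel = Callable[[ChunkSpec], str]


class ChunkDispatcher(Protocol):
    def submit(
        self, kernel: ChunkKernel, chunks: Sequence[ChunkSpec]
    ) -> List[str]: ...


class LocalChunkDispatcher:
    """Runs kernels serially in the current process (the existing path)."""

    def submit(
        self, kernel: ChunkKernel, chunks: Sequence[ChunkSpec]
    ) -> List[str]:
        return [kernel(spec) for spec in chunks]
```

A `FakeChunkDispatcher` for unit tests (Open Questions §4) is the same shape with a canned kernel; `ModalChunkDispatcher` would batch specs by `chunks_per_worker` and use `.spawn()` / `.get()`.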

CLI: add --parallel / --no-parallel and --num-matrix-workers to unified_calibration. These flags are only meaningful when the chunked path is active — on the single-pass build_matrix path there's nothing to fan out. When --parallel is passed without --chunked-matrix, silently ignore rather than error. Default --no-parallel during initial rollout; flip the default after the benchmark (see "Validation" below) confirms the parallel path is correct and stable.
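The flag semantics can be sketched with stdlib argparse (flag names are from this issue; the wiring into unified_calibration's actual parser is left open):

```python
import argparse

# Hypothetical flag wiring; defaults follow the rollout plan
# (--no-parallel until the benchmark lands).
parser = argparse.ArgumentParser()
parser.add_argument("--chunked-matrix", action="store_true")
parser.add_argument(
    "--parallel", action=argparse.BooleanOptionalAction, default=False
)
parser.add_argument("--num-matrix-workers", type=int, default=50)

args = parser.parse_args(
    ["--chunked-matrix", "--parallel", "--num-matrix-workers", "50"]
)
# --parallel without --chunked-matrix is a silent no-op:
use_parallel = args.parallel and args.chunked_matrix
```

`argparse.BooleanOptionalAction` gives the paired `--parallel` / `--no-parallel` forms for free.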

Fan-out default: start at num_workers=50 to match the existing H5-worker convention (modal_app/local_area.py:452) and the num_workers input already wired through pipeline.yaml. At ~207 chunks that's ~4 chunks/worker — slightly finer-grained than H5 building but consistent. The per-chunk benchmark (Open Questions §1) can push this down if cold-start cost turns out to dominate or up if per-chunk wall time is large; 50 is the operating default, not a hard target.

Streaming final assembly replaces the current three-list concat. Two-pass approach: (1) walk shard headers to sum nnz, (2) preallocate data, indices, indptr arrays once and fill by iterating shards. Peak memory becomes the final CSR, not 2× it.
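The two passes can be sketched as follows. This is illustrative: shards are modeled as in-memory dicts with `row` / `col` / `data` arrays, where production code would reopen each `chunk_XXXXXX.npz` per pass. The scatter trick relies on sorting each shard's entries by row and using a per-row write cursor, so the only full-size allocations are the final CSR arrays themselves.

```python
import numpy as np


def assemble_csr_streaming(shards, n_rows):
    """Two-pass CSR assembly from per-chunk COO shards.

    Pass 1 sums per-row nnz to build indptr; pass 2 preallocates
    data/indices once and scatters each shard into place, so peak memory
    is the final CSR plus one shard, not three concatenated COO lists.
    """
    # Pass 1: per-row nonzero counts -> indptr.
    row_counts = np.zeros(n_rows, dtype=np.int64)
    for s in shards:
        row_counts += np.bincount(s["row"], minlength=n_rows)
    indptr = np.zeros(n_rows + 1, dtype=np.int64)
    np.cumsum(row_counts, out=indptr[1:])

    # Pass 2: preallocate once, fill using a per-row write cursor.
    nnz = int(indptr[-1])
    indices = np.empty(nnz, dtype=np.int64)
    data = np.empty(nnz, dtype=np.float64)
    cursor = indptr[:-1].copy()
    for s in shards:
        order = np.argsort(s["row"], kind="stable")
        rows = s["row"][order]
        # Within-row position of each entry (rows is sorted, so
        # searchsorted gives each row's first occurrence).
        within = np.arange(len(rows)) - np.searchsorted(rows, rows)
        dest = cursor[rows] + within
        indices[dest] = s["col"][order]
        data[dest] = s["data"][order]
        cursor += np.bincount(rows, minlength=n_rows)
    return data, indices, indptr
```

Because chunks partition the columns into ascending contiguous blocks, iterating shards in chunk order leaves each CSR row's columns sorted, matching the "stable COO ordering" caveat in the acceptance criteria.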

Relationship to the refactor plan

This issue implements a small, deliberate subset of refactor Phase 4. Specifically:

  • ChunkedMatrixAssembler is a precursor to the full MatrixAssembler extraction. It takes on orchestration but does not yet absorb target-repo / uprater / constraint-evaluator / simulation-batch-evaluator responsibilities from UnifiedMatrixBuilder. Those remain inside the builder for now.
  • ChunkDispatcher matches the refactor's push to make execution boundaries (local vs Modal) replaceable rather than hard-coded in orchestration code.
  • Per-chunk kernel purification aligns with refactor §376–378 ("side effects are too deeply embedded"). It does not finish the job, but it stops making it worse.

This keeps the refactor migration forward-compatible: when Phase 4 starts extracting MatrixAssembler proper, ChunkedMatrixAssembler becomes the orchestration caller for it, not a competitor.

Infrastructure impact

Zero net change to Modal footprint:

  • Apps: modal_app/pipeline.py registers one modal.App("policyengine-us-data-pipeline"). New @app.function decorators register inside it — no new apps.
  • Volumes: new shards live in the existing pipeline-artifacts (or local-area-staging) volume under a new /staging/{run_id}/matrix_chunks/ path. Ephemeral per-run; no durable growth.
  • Concurrency budget: max_containers=50 reused from the H5-worker ceiling, not added on top.
  • Billing: total CPU-seconds and GB-seconds are each invariant to container count for the same total work. 1 worker × 64 GB × 14 h ≈ 50 workers × 64 GB × ~17 min ≈ 896 GB-hours. Cold-start overhead ≈ 20 s × 50 × 8 CPU ≈ 2 CPU-hours (rounding error). Net expected effect is neutral-to-favorable, because today's sequential run can burn the full 14 h allocation on incomplete work that has to be re-run.

Prerequisite: make Modal pipeline boot again

Independent of parallelization, the deployed policyengine-us-data-pipeline Modal app has not successfully executed a single function call since #765 due to a container-boot import failure. Evidence: all six fc-... spawns for commits merged after #765 (fc-01KPF4SQNGRS82ZYM81FZ0DWCX, fc-01KPNAJGTJC6W1CHD6W3G0BS8T, fc-01KPPWKY5HJ07C9WEEZHGBN5VX, fc-01KPPZ6V3W2VZ9BJNT949CZKWA, fc-01KPQVWZQZ1ZKWCJ0RCBB35FMH, fc-01KPS3XADWAFFBVE01NRWACFR2) log the same trace in modal app logs ap-m5QsBgnRqhJtRMRKfA8myq --env main:

File "/root/policyengine-us-data/modal_app/local_area.py", line 32, ...
File "/root/policyengine-us-data/policyengine_us_data/__init__.py", line 3, in <module>
    from .geography import ZIP_CODE_DATASET
File "/root/policyengine-us-data/policyengine_us_data/geography/__init__.py", line 2, in <module>
    import pandas as pd
ModuleNotFoundError: No module named 'pandas'

Root cause: modal_app/images.py::_base_image runs uv sync --frozen which installs deps into /root/policyengine-us-data/.venv/, but Modal boots the container with system Python, which has only uv.

Two aligned fixes, both consistent with the refactor doc:

  1. Short-term (same issue / same PR): add .env({"VIRTUAL_ENV": ..., "PATH": "/root/.../.venv/bin:...", "PYTHONPATH": "/root/.../.venv/lib/python3.14/site-packages"}) after .run_commands(... uv sync ...) in modal_app/images.py. Unblocks boot without touching import semantics.
  2. Long-term (tracked by the refactor plan): make policyengine_us_data/__init__.py and policyengine_us_data/geography/__init__.py import-safe in minimal environments — no module-scope pandas, no dataset reads at package load. This is refactor §414–430 ("import and dependency boundaries"). Out of scope here, referenced for continuity.
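A config-fragment sketch of the short-term fix, assuming the image is assembled in modal_app/images.py and that uv syncs into /root/policyengine-us-data/.venv; `base_image` and the python3.14 site-packages path are placeholders taken from the issue text, not verified against the repo:

```python
# Sketch only: point the container's runtime at the uv-managed venv so the
# system Python that Modal boots with can resolve the synced dependencies.
image = (
    base_image
    .run_commands("cd /root/policyengine-us-data && uv sync --frozen")
    .env({
        "VIRTUAL_ENV": "/root/policyengine-us-data/.venv",
        "PATH": "/root/policyengine-us-data/.venv/bin:/usr/local/bin:/usr/bin:/bin",
        "PYTHONPATH": "/root/policyengine-us-data/.venv/lib/python3.14/site-packages",
    })
)
```

Setting PATH so `.venv/bin/python` shadows system Python is the key line; VIRTUAL_ENV and PYTHONPATH are belt-and-braces for tools that inspect them.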

The short-term fix must land in the same PR as (or before) the parallelization work, because the parallelization cannot be verified end-to-end on Modal without it.

Open questions (resolve before implementation)

  1. Per-chunk benchmark to sanity-check the 50-worker default. Production chunk count is ~207. The test fixture is 50 households / 100 cols (tests/integration/test_chunked_matrix_builder.py:180) — ~52,000× smaller than production, so we cannot extrapolate per-chunk wall time from tests. Proposed: after the venv fix lands and the first build_matrix_chunk_worker is deployable, run one worker on one production-scale 25K-col chunk on Modal, record wall time + peak RSS + shard nnz. With those numbers, verify that num_workers=50 (i.e. ~4 chunks per worker) gives acceptable wall time without cold-start overhead swamping useful work. Only adjust if the benchmark clearly says to.
  2. Where does ChunkedMatrixAssembler live? Candidates: policyengine_us_data/calibration/chunked_matrix_assembler.py (new file) or keep it inside unified_matrix_builder.py below the existing class. Refactor doc prefers separate files per responsibility; I'd follow that.
  3. Should the coordinator run outside the Modal container entirely? Currently the whole pipeline (including the matrix-builder step) runs inside one Modal function. If the coordinator spawns workers, it still needs to be inside some container to call .spawn(). Simplest is to keep the coordinator inside the existing matrix-building Modal function and let it fan out to workers — no architectural change. Flagging for confirmation.
  4. Test strategy for ModalChunkDispatcher. Refactor doc prescribes unit tests with fakes. Proposed: a FakeChunkDispatcher used in unit tests to exercise ChunkedMatrixAssembler partition/collect/assemble logic without Modal; one real Modal smoke integration test gated behind an env flag.

Acceptance criteria

  • ChunkedMatrixAssembler class exists in policyengine_us_data/calibration/ with unit tests covering: partition correctness, resume/skip correctness with fake completed shards, streaming CSR assembly correctness on hand-built COO fixtures.
  • ChunkDispatcher interface with LocalChunkDispatcher and ModalChunkDispatcher implementations. FakeChunkDispatcher used in unit tests.
  • UnifiedMatrixBuilder.build_matrix_chunked() is a facade that delegates to ChunkedMatrixAssembler. Public signature unchanged. All existing build_matrix_chunked unit and integration tests pass against the facade with no edits.
  • Full-CPS parallel run completes on Modal via workflow_dispatch on pipeline.yaml with the new --parallel flag (in combination with --chunked-matrix). Wall time documented in the PR description.
  • --parallel without --chunked-matrix is a silent no-op (no behavior change vs. the non-chunked path).
  • Output CSR on a small fixture (existing tests/integration/test_chunked_matrix_builder.py fixture) is byte-identical (or nnz/row-sum/column-sum identical modulo stable COO ordering) between LocalChunkDispatcher and ModalChunkDispatcher.
  • Final-assembly peak memory profiled on a medium fixture is within ~1.1× of final-CSR size (streaming works).
  • Modal venv fix merged so the deployed app can boot (part of same PR unless a standalone fix lands first).
  • Towncrier changelog fragment added under changelog.d/ (one .changed.md for the parallelization, one .fixed.md for the venv activation).

Validation plan

Same as the "Verification" block of the earlier plan draft:

  1. Unit: new tests + existing test_chunked_matrix_builder.py green against the facade-over-coordinator layout.
  2. Modal deploy smoke: modal deploy modal_app/pipeline.py succeeds; workflow_dispatch run boots past module imports; modal app logs ap-m5QsBgnRqhJtRMRKfA8myq --env main shows the pipeline progressing into real work.
  3. Single-chunk benchmark: one production-scale 25K-col chunk on Modal → record wall time, peak RSS, nnz → pick chunks_per_worker.
  4. End-to-end parallel run: full ~207-chunk build across num_workers containers → CSR shape / nnz / row-sums match a serial baseline on a reduced fixture.
  5. Resume: interrupt mid-flight, re-dispatch with same run_id, confirm completed chunks skip.
  6. Memory profile: peak RSS during assembly ≤ ~1.1× final-CSR size on a medium fixture.

Related

  • National calibration fixes and chunked matrix builder #753 — introduced the chunked path this issue scales.
  • Refactor doc: US Data Pipeline Refactor.md (Phase 4 extraction of UnifiedMatrixBuilder; §414–430 on import-time dependency boundaries).
  • modal_app/local_area.py:442–454 — reference pattern (build_areas_worker + max_containers=50).
  • modal_app/pipeline.py — existing .spawn() / .get() usages and run_id/volume conventions.
  • Container boot regression (referenced under "Prerequisite"): fc-01KPF4SQNGRS82ZYM81FZ0DWCX, fc-01KPNAJGTJC6W1CHD6W3G0BS8T, fc-01KPPWKY5HJ07C9WEEZHGBN5VX, fc-01KPPZ6V3W2VZ9BJNT949CZKWA, fc-01KPQVWZQZ1ZKWCJ0RCBB35FMH, fc-01KPS3XADWAFFBVE01NRWACFR2.
