Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 77 additions & 0 deletions docs/plans/2026-06-13-log-tail-worker.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Plan: Reusable event-driven log-tail worker (`hex log-tail`)

**Date:** 2026-06-13
**Branch:** `feature/log-tail-worker` (off `develop`, GitFlow)
**Spec written against:** hex-harness `0.43.0`, foundation `develop` @ `3659a064`

## Goal

Ship a **generic, event-driven log-tail capability** in hex-foundation whose first consumer is capturing headroom proxy overhead (`STAGE_TIMINGS.total_pre_upstream`) into hex's unified telemetry + event bus — headroom treated as an opaque, swappable upstream. The reusable core is a `LineObserver` trait; headroom is one compiled impl. New upstreams = new impl, never a config DSL.

Origin: `me/decisions/headroom-overhead-telemetry-architecture-2026-06-13.md` (mrap-hex) + `docs/ideation/2026-06-13-headroom-overhead-telemetry-ideation.md`.

## Key design decision (forced by SDK reality)

The Rust worker SDK (`Worker::on_cron/on_event`) runs handlers on `spawn_blocking` — **handlers must return; no daemon/continuous-loop primitive exists.** A continuous, event-driven `tail -F` therefore cannot be a `*.worker.rs` cron handler without degrading to a poll (the thing we explicitly rejected). The hex-idiomatic continuous mechanism is the **`iii-exec` supervised daemon** — the exact pattern that runs the headroom proxy and console (`engine-workers.yaml`, `restart.on_crash: true`).

**Therefore:** implement `hex log-tail` as a long-running CLI subcommand (event-driven via `notify`/FSEvents — blocks on kernel FS events, no timer), supervised by iii-exec. This honors no-poll + reuses hex's supervised-daemon worker pattern. The "reusable worker" is the iii-exec stanza running `hex log-tail --observer <name>`.

**Testability discipline:** the brain is pure and unit-tested; the notify loop is thin glue.
- `LineObserver` trait + impls — pure, unit-tested.
- Offset/rotation **tail reader** (given file + prior offset → new lines, new offset, rotation flag) — pure, unit-tested with temp files (deterministic, no FS-event flakiness).
- Episode **coalescer** (state machine: burst of breaches → one episode) — pure, unit-tested.
- notify follow-loop — thin glue; exercised by a bounded integration test, not relied on for FS-event timing in CI.

## Architecture / files

New module `system/harness/src/log_tail/`:
- `mod.rs` — module exports + the `run()` entrypoint (follow loop driver) + `LogTailConfig`.
- `observer.rs` — `LineObserver` trait, `Observation { value_ms, stage, severity, fields }`, `Severity { Normal, Degraded, Stall }`, and `observer_registry(name) -> Option<Box<dyn LineObserver>>`.
- `reader.rs` — `TailReader`: open-at-offset, read new complete lines, detect rotation/truncation (len < offset OR inode change → restart at 0), return `(Vec<String>, new_offset)`. Pure, no notify.
- `coalesce.rs` — `EpisodeCoalescer`: feed `(ts, Observation)`; emits a `StallEpisode { duration_ms, count, peak_ms, dominant_stage }` when a burst closes (gap > `quiet_window`). Pure state machine.
- `emit.rs` — durable-first sink: `telemetry::record(...)` FIRST (durable SQLite), then `ops::emit(event, data, producer)` best-effort (lossy bus). Never lose the row to a bus drop.
- `observers/headroom.rs` — `HeadroomStageTimings: LineObserver`. Matches lines containing `STAGE_TIMINGS`, parses the embedded JSON, extracts `stages.total_pre_upstream` (+ `read_request_json`, `deep_copy`), classifies severity by thresholds, returns `Observation`.

Wiring:
- `lib.rs` — add `pub mod log_tail;`.
- `main.rs` — add `Commands::LogTail { path, observer, event, source, threshold_ms, quiet_ms, from_start }` (flat args) + a dispatch arm calling `log_tail::run(cfg)`.
- `Cargo.toml` — add `notify = "8"` (already in Cargo.lock @ 8.2.0 transitively → no new download).
- `system/iii/engine-workers.example.yaml` — add a commented `log-tail-headroom` iii-exec stanza documenting how to enable (instance enables in its own `engine-workers.yaml`).

Fixture + contract:
- `system/harness/tests/fixtures/headroom-stage-timings.jsonl` — a golden real `STAGE_TIMINGS` line.
- Contract test (inline in `observers/headroom.rs` tests) asserts the parser extracts `total_pre_upstream` from the golden line — fails loud if headroom's format drifts (the "telemetry dies silently" guard, idea #6).

## Event + telemetry contract

- Telemetry row (durable-first): `source="headroom-proxy"` (or `--source`), `event="overhead"` / `"stall_episode"`, `status` ∈ {`ok`,`degraded`,`stall`}, `duration_ms = total_pre_upstream_ms` (peak for episodes), `detail = compact JSON { stage, count, ... }`.
- Bus event (best-effort): name from `--event` (default `headroom.overhead`), `data = { severity, value_ms, stage, count, peak_ms, source }`. Consumers register `on_event`.
- Producer label: `--source` value, so swapping/dual-running upstreams stays attributable (producer_id baked in).

## Severity thresholds (headroom)

Normal < 500ms ≤ Degraded < 2000ms ≤ Stall. Only Degraded/Stall emit per-event; Normal feeds episode stats only. Defaults overridable via `--threshold-ms` (degraded) — keep flags minimal; no config DSL.

## Testing strategy / CI gates

CI: `cargo test --manifest-path system/harness/Cargo.toml` (canonical, per AGENTS.md) + release build in Docker. Local gates before push: `cargo build`, `cargo test`, `cargo clippy -- -D warnings`, `cargo fmt --check` (run all in the worktree).

Unit tests (pure, deterministic):
- `reader`: appends advance offset; rotation (truncate/shrink) restarts at 0; partial trailing line not emitted until newline.
- `coalesce`: single breach → episode of 1; burst within window → one episode with correct count/peak; gap closes episode.
- `observer/headroom`: parses golden line; classifies Normal/Degraded/Stall; ignores non-STAGE_TIMINGS lines; tolerates malformed JSON (no panic).
- `registry`: known name → Some, unknown → None.
- contract: golden fixture parses to expected total_pre_upstream.

## STOP / risk conditions

- If `notify` cannot be added cleanly (version conflict with the iii engine's transitive copy) → fall back to a `std` + short-interval read loop **inside the daemon** (still a continuous supervised process, latency bounded; document the deviation). Do NOT convert to a cron `*.worker.rs` poll.
- If `main.rs` wiring risks breaking the large central file → keep the variant + arm minimal; all logic in `log_tail/`.
- Code ≠ this plan's description of the SDK → re-verify against live code, never improvise.

## Out of scope (YAGNI — recorded, not built)

- Non-file sources (stream/pipe/socket) — trait stays source-agnostic; ship file only.
- `/metrics` or `/stats-history` taps (polling; rejected).
- SPC adaptive baseline + self-closing decision-file loop (idea #7) — strong follow-up, separate change once the base signal flows.
- Cost-correlation, back-pressure, generic "observe-any-upstream" config language.
4 changes: 4 additions & 0 deletions system/harness/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ sha2 = "0.10"
fs2 = "0.4"
rusqlite = { version = "0.31", features = ["bundled-full"] }
regex = "1"
# Cross-platform filesystem watcher (FSEvents/inotify/kqueue) for the event-driven
# `hex log-tail` daemon — already present transitively (8.x via the iii engine), so
# promoting it to a direct dep pulls no new version.
notify = "8"
sqlite-vec = "0.1"
# Tree-trim attempted (2026-05-21): `default-features = false, features = ["ort-download-binaries"]`
# broke the build — `TextEmbedding::try_new` is only available with default features enabled
Expand Down
1 change: 1 addition & 0 deletions system/harness/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub mod harness;
pub mod ledger;
pub mod lint_gates;
pub mod llm_cost;
pub mod log_tail;
pub mod reconciler;
pub mod capability_exec;
pub mod charter;
Expand Down
233 changes: 233 additions & 0 deletions system/harness/src/log_tail/coalesce.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
//! Episode coalescer. A single event-loop stall makes many queued requests all
//! report high latency at once; counting them as N independent incidents misleads
//! the "is this getting worse?" decision and floods alerting. This collapses a
//! burst of Degraded/Stall observations into ONE [`StallEpisode`], closed after
//! `quiet_window` elapses with no further breach.
//!
//! Pure state machine: the caller supplies monotonic timestamps ([`Instant`]),
//! so it is unit-testable without a real clock.

use super::observer::Severity;
use std::time::{Duration, Instant};

/// One coalesced stall episode (a contiguous burst of breaches).
#[derive(Debug, Clone, PartialEq)]
pub struct StallEpisode {
/// Number of breaching lines in the episode.
pub count: u64,
/// Worst latency seen in the episode (ms).
pub peak_ms: f64,
/// Worst severity seen in the episode (an all-Degraded burst stays Degraded;
/// any Stall makes the whole episode Stall).
pub peak_severity: Severity,
/// Dominant stage at the peak, when known.
pub dominant_stage: Option<String>,
}

pub struct EpisodeCoalescer {
quiet_window: Duration,
open: Option<OpenEpisode>,
}

struct OpenEpisode {
count: u64,
peak_ms: f64,
peak_severity: Severity,
dominant_stage: Option<String>,
last_breach: Instant,
}

impl EpisodeCoalescer {
pub fn new(quiet_window: Duration) -> Self {
EpisodeCoalescer {
quiet_window,
open: None,
}
}

/// Feed one breach (Degraded/Stall) observed at `now`. If this breach arrives
/// after the prior episode's quiet window already elapsed, that prior episode
/// is closed and returned (and a fresh one opens for this breach).
pub fn record_breach(
&mut self,
now: Instant,
value_ms: f64,
severity: Severity,
stage: Option<String>,
) -> Option<StallEpisode> {
let gap_exceeded = self
.open
.as_ref()
.is_some_and(|ep| now.duration_since(ep.last_breach) > self.quiet_window);
let closed = if gap_exceeded {
self.take_closed()
} else {
None
};

match &mut self.open {
Some(ep) => {
ep.count += 1;
if value_ms > ep.peak_ms {
ep.peak_ms = value_ms;
ep.dominant_stage = stage;
}
ep.peak_severity = ep.peak_severity.max(severity);
ep.last_breach = now;
}
None => {
self.open = Some(OpenEpisode {
count: 1,
peak_ms: value_ms,
peak_severity: severity,
dominant_stage: stage,
last_breach: now,
});
}
}
closed
}

/// Call when idle (no new breaches): closes and returns an open episode whose
/// quiet window has elapsed. Returns `None` if nothing is open or the window
/// has not yet passed.
pub fn tick(&mut self, now: Instant) -> Option<StallEpisode> {
let elapsed = self
.open
.as_ref()
.is_some_and(|ep| now.duration_since(ep.last_breach) > self.quiet_window);
if elapsed {
self.take_closed()
} else {
None
}
}

fn take_closed(&mut self) -> Option<StallEpisode> {
self.open.take().map(|ep| StallEpisode {
count: ep.count,
peak_ms: ep.peak_ms,
peak_severity: ep.peak_severity,
dominant_stage: ep.dominant_stage,
})
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn single_breach_then_quiet_closes_episode_of_one() {
let mut c = EpisodeCoalescer::new(Duration::from_millis(100));
let t0 = Instant::now();
assert_eq!(
c.record_breach(
t0,
600.0,
Severity::Degraded,
Some("read_request_json".into())
),
None
);
// Not yet elapsed.
assert_eq!(c.tick(t0 + Duration::from_millis(50)), None);
// Elapsed → closes.
let ep = c.tick(t0 + Duration::from_millis(200)).unwrap();
assert_eq!(ep.count, 1);
assert_eq!(ep.peak_ms, 600.0);
assert_eq!(ep.peak_severity, Severity::Degraded);
assert_eq!(ep.dominant_stage.as_deref(), Some("read_request_json"));
}

#[test]
fn burst_within_window_coalesces_into_one_with_peak_and_count() {
let mut c = EpisodeCoalescer::new(Duration::from_millis(100));
let t0 = Instant::now();
assert_eq!(
c.record_breach(t0, 600.0, Severity::Degraded, Some("a".into())),
None
);
assert_eq!(
c.record_breach(
t0 + Duration::from_millis(10),
8800.0,
Severity::Stall,
Some("deep_copy".into())
),
None
);
assert_eq!(
c.record_breach(
t0 + Duration::from_millis(20),
700.0,
Severity::Degraded,
Some("a".into())
),
None
);
let ep = c.tick(t0 + Duration::from_millis(200)).unwrap();
assert_eq!(
ep.count, 3,
"three breaches in one burst → one episode of 3"
);
assert_eq!(ep.peak_ms, 8800.0, "peak is the worst in the burst");
assert_eq!(
ep.peak_severity,
Severity::Stall,
"any Stall makes the episode Stall"
);
assert_eq!(
ep.dominant_stage.as_deref(),
Some("deep_copy"),
"dominant stage tracks the peak"
);
}

#[test]
fn all_degraded_burst_stays_degraded() {
let mut c = EpisodeCoalescer::new(Duration::from_millis(100));
let t0 = Instant::now();
c.record_breach(t0, 600.0, Severity::Degraded, None);
c.record_breach(
t0 + Duration::from_millis(10),
700.0,
Severity::Degraded,
None,
);
let ep = c.tick(t0 + Duration::from_millis(200)).unwrap();
assert_eq!(
ep.peak_severity,
Severity::Degraded,
"no Stall in the burst → episode is Degraded, not Stall"
);
}

#[test]
fn gap_closes_prior_and_opens_new() {
let mut c = EpisodeCoalescer::new(Duration::from_millis(100));
let t0 = Instant::now();
assert_eq!(c.record_breach(t0, 600.0, Severity::Degraded, None), None);
// Next breach after the quiet window → returns the closed prior episode.
let closed = c
.record_breach(
t0 + Duration::from_millis(300),
900.0,
Severity::Degraded,
None,
)
.expect("prior episode should close");
assert_eq!(closed.count, 1);
assert_eq!(closed.peak_ms, 600.0);
// The new episode is still open until its own quiet window passes.
let next = c.tick(t0 + Duration::from_millis(500)).unwrap();
assert_eq!(next.count, 1);
assert_eq!(next.peak_ms, 900.0);
}

#[test]
fn tick_with_nothing_open_is_none() {
let mut c = EpisodeCoalescer::new(Duration::from_millis(100));
assert_eq!(c.tick(Instant::now()), None);
}
}
53 changes: 53 additions & 0 deletions system/harness/src/log_tail/emit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
//! Durable-first sink. The iii STREAM event bus is at-most-once and drops events
//! on disconnect/restart — exactly the moment a stall happens. So write the
//! durable telemetry row FIRST (SQLite), then emit to the bus best-effort. The
//! record is never lost to a bus drop. (A discipline worth propagating to any
//! loss-sensitive hex producer, not just this one.)
//!
//! **Delivery is at-least-once, not exactly-once.** The offset checkpoint is
//! persisted once per drain batch, so a crash between an emit and a successful
//! checkpoint re-emits that batch on respawn. Telemetry consumers must be
//! idempotent on `(source, event, ts, duration_ms)`.

use crate::telemetry::{record_loud, TelemetryEvent};
use serde_json::Value;

/// Named arguments for [`emit_durable_first`]. A struct (not positional args)
/// because `source/event/bus_event/status` are all `&str` and `detail/data` are
/// both `Value` — a positional transposition (especially `detail`↔`data`, which
/// would silently route the bus payload into the durable row and vice-versa)
/// would compile clean. Named fields make that class of bug impossible.
pub struct EmitArgs<'a> {
/// Telemetry `source` + bus producer label (keeps upstreams attributable).
pub source: &'a str,
/// Telemetry `event` name (e.g. `overhead`, `stall_episode`).
pub event: &'a str,
/// Bus event name consumers subscribe to (e.g. `headroom.overhead`).
pub bus_event: &'a str,
/// Telemetry `status` (`degraded` / `stall`).
pub status: &'a str,
/// Headline latency stored as the row's `duration_ms`.
pub value_ms: f64,
/// Compact JSON stored in the durable telemetry row's `detail`.
pub detail: Value,
/// The (best-effort) bus event payload.
pub data: Value,
}

/// Write a durable telemetry row, then emit a best-effort bus event.
pub fn emit_durable_first(args: EmitArgs<'_>) {
// 1. Durable first (SQLite). `record_loud` logs on failure (S6: no quiet failures).
record_loud(&TelemetryEvent {
source: args.source.to_string(),
event: args.event.to_string(),
status: args.status.to_string(),
duration_ms: Some(args.value_ms.round() as i64),
exit_code: None,
detail: Some(args.detail.to_string()),
});

// 2. Bus best-effort. A drop here is acceptable — the row above is durable.
if let Err(e) = crate::ops::emit(args.bus_event, args.data, Some(args.source)) {
eprintln!("hex log-tail: bus emit failed (durable row already written): {e}");
}
}
Loading
Loading