From 931ae731a1572c2d508cf06931e3df78fe5ae660 Mon Sep 17 00:00:00 2001 From: Mike Rapadas Date: Sat, 13 Jun 2026 04:39:02 -0400 Subject: [PATCH] =?UTF-8?q?feat(log-tail):=20event-driven=20log-tail=20wor?= =?UTF-8?q?ker;=20headroom=20overhead=20=E2=86=92=20telemetry?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `hex log-tail`, a generic event-driven log-tail daemon that captures an opaque upstream's signal into hex's unified telemetry + event bus, behind a boundary hex owns. First consumer: headroom proxy overhead (STAGE_TIMINGS.total_pre_upstream), so the "tolerate the stall" decision stays evidence-based. Design: the Rust worker SDK has no daemon primitive (handlers must return), so a continuous tail can't be a *.worker.rs cron handler without becoming a poll. This ships as an iii-exec-supervised CLI daemon (the headroom-proxy pattern), notify-driven (FSEvents/inotify/kqueue — no polling). Reusable seam: the only upstream-specific code is the `LineObserver` trait impl selected by --observer (a typed Rust trait, not a config DSL). New upstream = new impl + registry entry. `HeadroomStageTimings` is the first. - Generic engine: notify tail with inode+size rotation detection, byte-offset checkpoint in iii-state (no replay / no drop across restart), stall-episode coalescing, durable-first emit (telemetry SQLite first, bus best-effort — at-least-once; consumers idempotent on source/event/ts/duration_ms). - Pure cores (reader/coalesce/observer) unit-tested; the notify loop is thin glue. - Golden-fixture contract test fails loud if headroom's log format drifts. - engine-workers.example.yaml documents the headroom tailer stanza (disabled by default; the instance enables it in its own engine-workers.yaml). Hardened via a 4-dimension adversarial review (correctness, reliability, standards, maintainability) — fixed inode-blind rotation (silent data loss), episodes never flushing under steady traffic, quiet-ms busy-spin, swallowed state errors (S6), and an emit arg-transposition risk. Verified: cargo build + 437 lib tests pass (0 failed); rename-guard OK; release build green. total_pre_upstream confirmed in ms (avoided a 1000x unit bug). Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 1 + docs/plans/2026-06-13-log-tail-worker.md | 77 +++++ system/harness/Cargo.toml | 4 + system/harness/src/lib.rs | 1 + system/harness/src/log_tail/coalesce.rs | 233 +++++++++++++ system/harness/src/log_tail/emit.rs | 53 +++ system/harness/src/log_tail/headroom.rs | 147 +++++++++ system/harness/src/log_tail/mod.rs | 310 ++++++++++++++++++ system/harness/src/log_tail/observer.rs | 87 +++++ system/harness/src/log_tail/reader.rs | 271 +++++++++++++++ system/harness/src/main.rs | 50 +++ .../fixtures/headroom-stage-timings.jsonl | 1 + system/iii/engine-workers.example.yaml | 22 ++ 13 files changed, 1257 insertions(+) create mode 100644 docs/plans/2026-06-13-log-tail-worker.md create mode 100644 system/harness/src/log_tail/coalesce.rs create mode 100644 system/harness/src/log_tail/emit.rs create mode 100644 system/harness/src/log_tail/headroom.rs create mode 100644 system/harness/src/log_tail/mod.rs create mode 100644 system/harness/src/log_tail/observer.rs create mode 100644 system/harness/src/log_tail/reader.rs create mode 100644 system/harness/tests/fixtures/headroom-stage-timings.jsonl diff --git a/Cargo.lock b/Cargo.lock index 66e992cf..1e4d2746 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1836,6 +1836,7 @@ dependencies = [ "iii", "iii-sdk", "libc", + "notify", "regex", "rusqlite", "rustls-pemfile", diff --git a/docs/plans/2026-06-13-log-tail-worker.md b/docs/plans/2026-06-13-log-tail-worker.md new file mode 100644 index 00000000..cec9f7e2 --- /dev/null +++ b/docs/plans/2026-06-13-log-tail-worker.md @@ -0,0 +1,77 @@ +# Plan: Reusable event-driven log-tail worker (`hex log-tail`) + +**Date:** 2026-06-13 +**Branch:** `feature/log-tail-worker` (off `develop`, GitFlow) +**Spec written against:** hex-harness `0.43.0`, foundation `develop` @ `3659a064` + +## Goal + +Ship a **generic, event-driven log-tail capability** in hex-foundation whose first consumer is capturing headroom proxy overhead (`STAGE_TIMINGS.total_pre_upstream`) into hex's unified telemetry + event bus — headroom treated as an opaque, swappable upstream. The reusable core is a `LineObserver` trait; headroom is one compiled impl. New upstreams = new impl, never a config DSL. + +Origin: `me/decisions/headroom-overhead-telemetry-architecture-2026-06-13.md` (mrap-hex) + `docs/ideation/2026-06-13-headroom-overhead-telemetry-ideation.md`. + +## Key design decision (forced by SDK reality) + +The Rust worker SDK (`Worker::on_cron/on_event`) runs handlers on `spawn_blocking` — **handlers must return; no daemon/continuous-loop primitive exists.** A continuous, event-driven `tail -F` therefore cannot be a `*.worker.rs` cron handler without degrading to a poll (the thing we explicitly rejected). The hex-idiomatic continuous mechanism is the **`iii-exec` supervised daemon** — the exact pattern that runs the headroom proxy and console (`engine-workers.yaml`, `restart.on_crash: true`). + +**Therefore:** implement `hex log-tail` as a long-running CLI subcommand (event-driven via `notify`/FSEvents — blocks on kernel FS events, no timer), supervised by iii-exec. This honors no-poll + reuses hex's supervised-daemon worker pattern. The "reusable worker" is the iii-exec stanza running `hex log-tail --observer `. + +**Testability discipline:** the brain is pure and unit-tested; the notify loop is thin glue. +- `LineObserver` trait + impls — pure, unit-tested. +- Offset/rotation **tail reader** (given file + prior offset → new lines, new offset, rotation flag) — pure, unit-tested with temp files (deterministic, no FS-event flakiness). +- Episode **coalescer** (state machine: burst of breaches → one episode) — pure, unit-tested. +- notify follow-loop — thin glue; exercised by a bounded integration test, not relied on for FS-event timing in CI. + +## Architecture / files + +New module `system/harness/src/log_tail/`: +- `mod.rs` — module exports + the `run()` entrypoint (follow loop driver) + `LogTailConfig`. +- `observer.rs` — `LineObserver` trait, `Observation { value_ms, stage, severity, fields }`, `Severity { Normal, Degraded, Stall }`, and `observer_registry(name) -> Option>`. +- `reader.rs` — `TailReader`: open-at-offset, read new complete lines, detect rotation/truncation (len < offset OR inode change → restart at 0), return `(Vec, new_offset)`. Pure, no notify. +- `coalesce.rs` — `EpisodeCoalescer`: feed `(ts, Observation)`; emits a `StallEpisode { duration_ms, count, peak_ms, dominant_stage }` when a burst closes (gap > `quiet_window`). Pure state machine. +- `emit.rs` — durable-first sink: `telemetry::record(...)` FIRST (durable SQLite), then `ops::emit(event, data, producer)` best-effort (lossy bus). Never lose the row to a bus drop. +- `observers/headroom.rs` — `HeadroomStageTimings: LineObserver`. Matches lines containing `STAGE_TIMINGS`, parses the embedded JSON, extracts `stages.total_pre_upstream` (+ `read_request_json`, `deep_copy`), classifies severity by thresholds, returns `Observation`. + +Wiring: +- `lib.rs` — add `pub mod log_tail;`. +- `main.rs` — add `Commands::LogTail { path, observer, event, source, threshold_ms, quiet_ms, from_start }` (flat args) + a dispatch arm calling `log_tail::run(cfg)`. +- `Cargo.toml` — add `notify = "8"` (already in Cargo.lock @ 8.2.0 transitively → no new download). +- `system/iii/engine-workers.example.yaml` — add a commented `log-tail-headroom` iii-exec stanza documenting how to enable (instance enables in its own `engine-workers.yaml`). + +Fixture + contract: +- `system/harness/tests/fixtures/headroom-stage-timings.jsonl` — a golden real `STAGE_TIMINGS` line. +- Contract test (inline in `observers/headroom.rs` tests) asserts the parser extracts `total_pre_upstream` from the golden line — fails loud if headroom's format drifts (the "telemetry dies silently" guard, idea #6). + +## Event + telemetry contract + +- Telemetry row (durable-first): `source="headroom-proxy"` (or `--source`), `event="overhead"` / `"stall_episode"`, `status` ∈ {`ok`,`degraded`,`stall`}, `duration_ms = total_pre_upstream_ms` (peak for episodes), `detail = compact JSON { stage, count, ... }`. +- Bus event (best-effort): name from `--event` (default `headroom.overhead`), `data = { severity, value_ms, stage, count, peak_ms, source }`. Consumers register `on_event`. +- Producer label: `--source` value, so swapping/dual-running upstreams stays attributable (producer_id baked in). + +## Severity thresholds (headroom) + +Normal < 500ms ≤ Degraded < 2000ms ≤ Stall. Only Degraded/Stall emit per-event; Normal feeds episode stats only. Defaults overridable via `--threshold-ms` (degraded) — keep flags minimal; no config DSL. + +## Testing strategy / CI gates + +CI: `cargo test --manifest-path system/harness/Cargo.toml` (canonical, per AGENTS.md) + release build in Docker. Local gates before push: `cargo build`, `cargo test`, `cargo clippy -- -D warnings`, `cargo fmt --check` (run all in the worktree). + +Unit tests (pure, deterministic): +- `reader`: appends advance offset; rotation (truncate/shrink) restarts at 0; partial trailing line not emitted until newline. +- `coalesce`: single breach → episode of 1; burst within window → one episode with correct count/peak; gap closes episode. +- `observer/headroom`: parses golden line; classifies Normal/Degraded/Stall; ignores non-STAGE_TIMINGS lines; tolerates malformed JSON (no panic). +- `registry`: known name → Some, unknown → None. +- contract: golden fixture parses to expected total_pre_upstream. + +## STOP / risk conditions + +- If `notify` cannot be added cleanly (version conflict with the iii engine's transitive copy) → fall back to a `std` + short-interval read loop **inside the daemon** (still a continuous supervised process, latency bounded; document the deviation). Do NOT convert to a cron `*.worker.rs` poll. +- If `main.rs` wiring risks breaking the large central file → keep the variant + arm minimal; all logic in `log_tail/`. +- Code ≠ this plan's description of the SDK → re-verify against live code, never improvise. + +## Out of scope (YAGNI — recorded, not built) + +- Non-file sources (stream/pipe/socket) — trait stays source-agnostic; ship file only. +- `/metrics` or `/stats-history` taps (polling; rejected). +- SPC adaptive baseline + self-closing decision-file loop (idea #7) — strong follow-up, separate change once the base signal flows. +- Cost-correlation, back-pressure, generic "observe-any-upstream" config language. diff --git a/system/harness/Cargo.toml b/system/harness/Cargo.toml index 4d016e0e..db2a3309 100644 --- a/system/harness/Cargo.toml +++ b/system/harness/Cargo.toml @@ -35,6 +35,10 @@ sha2 = "0.10" fs2 = "0.4" rusqlite = { version = "0.31", features = ["bundled-full"] } regex = "1" +# Cross-platform filesystem watcher (FSEvents/inotify/kqueue) for the event-driven +# `hex log-tail` daemon — already present transitively (8.x via the iii engine), so +# promoting it to a direct dep pulls no new version. +notify = "8" sqlite-vec = "0.1" # Tree-trim attempted (2026-05-21): `default-features = false, features = ["ort-download-binaries"]` # broke the build — `TextEmbedding::try_new` is only available with default features enabled diff --git a/system/harness/src/lib.rs b/system/harness/src/lib.rs index 5728270f..aabae1c2 100644 --- a/system/harness/src/lib.rs +++ b/system/harness/src/lib.rs @@ -15,6 +15,7 @@ pub mod harness; pub mod ledger; pub mod lint_gates; pub mod llm_cost; +pub mod log_tail; pub mod reconciler; pub mod capability_exec; pub mod charter; diff --git a/system/harness/src/log_tail/coalesce.rs b/system/harness/src/log_tail/coalesce.rs new file mode 100644 index 00000000..0d7e82fb --- /dev/null +++ b/system/harness/src/log_tail/coalesce.rs @@ -0,0 +1,233 @@ +//! Episode coalescer. A single event-loop stall makes many queued requests all +//! report high latency at once; counting them as N independent incidents misleads +//! the "is this getting worse?" decision and floods alerting. This collapses a +//! burst of Degraded/Stall observations into ONE [`StallEpisode`], closed after +//! `quiet_window` elapses with no further breach. +//! +//! Pure state machine: the caller supplies monotonic timestamps ([`Instant`]), +//! so it is unit-testable without a real clock. + +use super::observer::Severity; +use std::time::{Duration, Instant}; + +/// One coalesced stall episode (a contiguous burst of breaches). +#[derive(Debug, Clone, PartialEq)] +pub struct StallEpisode { + /// Number of breaching lines in the episode. + pub count: u64, + /// Worst latency seen in the episode (ms). + pub peak_ms: f64, + /// Worst severity seen in the episode (an all-Degraded burst stays Degraded; + /// any Stall makes the whole episode Stall). + pub peak_severity: Severity, + /// Dominant stage at the peak, when known. + pub dominant_stage: Option, +} + +pub struct EpisodeCoalescer { + quiet_window: Duration, + open: Option, +} + +struct OpenEpisode { + count: u64, + peak_ms: f64, + peak_severity: Severity, + dominant_stage: Option, + last_breach: Instant, +} + +impl EpisodeCoalescer { + pub fn new(quiet_window: Duration) -> Self { + EpisodeCoalescer { + quiet_window, + open: None, + } + } + + /// Feed one breach (Degraded/Stall) observed at `now`. If this breach arrives + /// after the prior episode's quiet window already elapsed, that prior episode + /// is closed and returned (and a fresh one opens for this breach). + pub fn record_breach( + &mut self, + now: Instant, + value_ms: f64, + severity: Severity, + stage: Option, + ) -> Option { + let gap_exceeded = self + .open + .as_ref() + .is_some_and(|ep| now.duration_since(ep.last_breach) > self.quiet_window); + let closed = if gap_exceeded { + self.take_closed() + } else { + None + }; + + match &mut self.open { + Some(ep) => { + ep.count += 1; + if value_ms > ep.peak_ms { + ep.peak_ms = value_ms; + ep.dominant_stage = stage; + } + ep.peak_severity = ep.peak_severity.max(severity); + ep.last_breach = now; + } + None => { + self.open = Some(OpenEpisode { + count: 1, + peak_ms: value_ms, + peak_severity: severity, + dominant_stage: stage, + last_breach: now, + }); + } + } + closed + } + + /// Call when idle (no new breaches): closes and returns an open episode whose + /// quiet window has elapsed. Returns `None` if nothing is open or the window + /// has not yet passed. + pub fn tick(&mut self, now: Instant) -> Option { + let elapsed = self + .open + .as_ref() + .is_some_and(|ep| now.duration_since(ep.last_breach) > self.quiet_window); + if elapsed { + self.take_closed() + } else { + None + } + } + + fn take_closed(&mut self) -> Option { + self.open.take().map(|ep| StallEpisode { + count: ep.count, + peak_ms: ep.peak_ms, + peak_severity: ep.peak_severity, + dominant_stage: ep.dominant_stage, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn single_breach_then_quiet_closes_episode_of_one() { + let mut c = EpisodeCoalescer::new(Duration::from_millis(100)); + let t0 = Instant::now(); + assert_eq!( + c.record_breach( + t0, + 600.0, + Severity::Degraded, + Some("read_request_json".into()) + ), + None + ); + // Not yet elapsed. + assert_eq!(c.tick(t0 + Duration::from_millis(50)), None); + // Elapsed → closes. + let ep = c.tick(t0 + Duration::from_millis(200)).unwrap(); + assert_eq!(ep.count, 1); + assert_eq!(ep.peak_ms, 600.0); + assert_eq!(ep.peak_severity, Severity::Degraded); + assert_eq!(ep.dominant_stage.as_deref(), Some("read_request_json")); + } + + #[test] + fn burst_within_window_coalesces_into_one_with_peak_and_count() { + let mut c = EpisodeCoalescer::new(Duration::from_millis(100)); + let t0 = Instant::now(); + assert_eq!( + c.record_breach(t0, 600.0, Severity::Degraded, Some("a".into())), + None + ); + assert_eq!( + c.record_breach( + t0 + Duration::from_millis(10), + 8800.0, + Severity::Stall, + Some("deep_copy".into()) + ), + None + ); + assert_eq!( + c.record_breach( + t0 + Duration::from_millis(20), + 700.0, + Severity::Degraded, + Some("a".into()) + ), + None + ); + let ep = c.tick(t0 + Duration::from_millis(200)).unwrap(); + assert_eq!( + ep.count, 3, + "three breaches in one burst → one episode of 3" + ); + assert_eq!(ep.peak_ms, 8800.0, "peak is the worst in the burst"); + assert_eq!( + ep.peak_severity, + Severity::Stall, + "any Stall makes the episode Stall" + ); + assert_eq!( + ep.dominant_stage.as_deref(), + Some("deep_copy"), + "dominant stage tracks the peak" + ); + } + + #[test] + fn all_degraded_burst_stays_degraded() { + let mut c = EpisodeCoalescer::new(Duration::from_millis(100)); + let t0 = Instant::now(); + c.record_breach(t0, 600.0, Severity::Degraded, None); + c.record_breach( + t0 + Duration::from_millis(10), + 700.0, + Severity::Degraded, + None, + ); + let ep = c.tick(t0 + Duration::from_millis(200)).unwrap(); + assert_eq!( + ep.peak_severity, + Severity::Degraded, + "no Stall in the burst → episode is Degraded, not Stall" + ); + } + + #[test] + fn gap_closes_prior_and_opens_new() { + let mut c = EpisodeCoalescer::new(Duration::from_millis(100)); + let t0 = Instant::now(); + assert_eq!(c.record_breach(t0, 600.0, Severity::Degraded, None), None); + // Next breach after the quiet window → returns the closed prior episode. + let closed = c + .record_breach( + t0 + Duration::from_millis(300), + 900.0, + Severity::Degraded, + None, + ) + .expect("prior episode should close"); + assert_eq!(closed.count, 1); + assert_eq!(closed.peak_ms, 600.0); + // The new episode is still open until its own quiet window passes. + let next = c.tick(t0 + Duration::from_millis(500)).unwrap(); + assert_eq!(next.count, 1); + assert_eq!(next.peak_ms, 900.0); + } + + #[test] + fn tick_with_nothing_open_is_none() { + let mut c = EpisodeCoalescer::new(Duration::from_millis(100)); + assert_eq!(c.tick(Instant::now()), None); + } +} diff --git a/system/harness/src/log_tail/emit.rs b/system/harness/src/log_tail/emit.rs new file mode 100644 index 00000000..f75d4a1f --- /dev/null +++ b/system/harness/src/log_tail/emit.rs @@ -0,0 +1,53 @@ +//! Durable-first sink. The iii STREAM event bus is at-most-once and drops events +//! on disconnect/restart — exactly the moment a stall happens. So write the +//! durable telemetry row FIRST (SQLite), then emit to the bus best-effort. The +//! record is never lost to a bus drop. (A discipline worth propagating to any +//! loss-sensitive hex producer, not just this one.) +//! +//! **Delivery is at-least-once, not exactly-once.** The offset checkpoint is +//! persisted once per drain batch, so a crash between an emit and a successful +//! checkpoint re-emits that batch on respawn. Telemetry consumers must be +//! idempotent on `(source, event, ts, duration_ms)`. + +use crate::telemetry::{record_loud, TelemetryEvent}; +use serde_json::Value; + +/// Named arguments for [`emit_durable_first`]. A struct (not positional args) +/// because `source/event/bus_event/status` are all `&str` and `detail/data` are +/// both `Value` — a positional transposition (especially `detail`↔`data`, which +/// would silently route the bus payload into the durable row and vice-versa) +/// would compile clean. Named fields make that class of bug impossible. +pub struct EmitArgs<'a> { + /// Telemetry `source` + bus producer label (keeps upstreams attributable). + pub source: &'a str, + /// Telemetry `event` name (e.g. `overhead`, `stall_episode`). + pub event: &'a str, + /// Bus event name consumers subscribe to (e.g. `headroom.overhead`). + pub bus_event: &'a str, + /// Telemetry `status` (`degraded` / `stall`). + pub status: &'a str, + /// Headline latency stored as the row's `duration_ms`. + pub value_ms: f64, + /// Compact JSON stored in the durable telemetry row's `detail`. + pub detail: Value, + /// The (best-effort) bus event payload. + pub data: Value, +} + +/// Write a durable telemetry row, then emit a best-effort bus event. +pub fn emit_durable_first(args: EmitArgs<'_>) { + // 1. Durable first (SQLite). `record_loud` logs on failure (S6: no quiet failures). + record_loud(&TelemetryEvent { + source: args.source.to_string(), + event: args.event.to_string(), + status: args.status.to_string(), + duration_ms: Some(args.value_ms.round() as i64), + exit_code: None, + detail: Some(args.detail.to_string()), + }); + + // 2. Bus best-effort. A drop here is acceptable — the row above is durable. + if let Err(e) = crate::ops::emit(args.bus_event, args.data, Some(args.source)) { + eprintln!("hex log-tail: bus emit failed (durable row already written): {e}"); + } +} diff --git a/system/harness/src/log_tail/headroom.rs b/system/harness/src/log_tail/headroom.rs new file mode 100644 index 00000000..886e26f1 --- /dev/null +++ b/system/harness/src/log_tail/headroom.rs @@ -0,0 +1,147 @@ +//! `HeadroomStageTimings` — the first [`LineObserver`]. Parses headroom's +//! per-request `STAGE_TIMINGS {...}` log line and reports `total_pre_upstream` +//! (the proxy's single-threaded event-loop overhead, in **milliseconds**). +//! +//! headroom stays opaque/swappable: we read only the log line it already writes — +//! its documented stable parse surface (`headroom perf` depends on it). No proxy +//! code is touched, no internals are reached into. The contract test below pins +//! the format against a captured golden line so silent format drift fails loud. + +use super::observer::{LineObserver, Observation, Severity}; +use serde_json::Value; + +/// The marker that precedes the JSON payload in a headroom STAGE_TIMINGS log line. +const MARKER: &str = "STAGE_TIMINGS "; + +/// Stages that run synchronously on headroom's event loop before upstream hand-off +/// — i.e. the stages that can block `/livez`. The largest is reported as dominant. +const BLOCKING_STAGES: &[&str] = &["read_request_json", "deep_copy", "compression_first_stage"]; + +pub struct HeadroomStageTimings { + /// Degraded floor (ms). Below this is Normal. + pub degraded_ms: f64, + /// Stall floor (ms). At/above this is Stall. + pub stall_ms: f64, +} + +impl Default for HeadroomStageTimings { + fn default() -> Self { + // 0.86% of requests exceed 500ms (the degraded floor); the observed tail + // reaches several seconds (the stall floor). Both overridable via the CLI. + HeadroomStageTimings { + degraded_ms: 500.0, + stall_ms: 2000.0, + } + } +} + +impl LineObserver for HeadroomStageTimings { + fn name(&self) -> &'static str { + "headroom-stage-timings" + } + + fn observe(&self, line: &str) -> Option { + // Fast reject: only STAGE_TIMINGS lines carry stage data. + let after_marker = &line[line.find(MARKER)? + MARKER.len()..]; + // The payload is a JSON object after the marker (the log line prefixes a + // timestamp/logger header, which we skip by locating the first '{'). + let json_start = after_marker.find('{')?; + let v: Value = serde_json::from_str(after_marker[json_start..].trim()).ok()?; + let stages = v.get("stages")?.as_object()?; + + // `total_pre_upstream` is already in milliseconds (verified against the + // `headroom_stage_timing_ms_*` Prometheus series — do NOT scale). + let value_ms = stages.get("total_pre_upstream")?.as_f64()?; + + let dominant = BLOCKING_STAGES + .iter() + .filter_map(|k| stages.get(*k).and_then(|x| x.as_f64()).map(|s| (*k, s))) + .max_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal)) + .map(|(k, _)| k.to_string()); + + Some(Observation { + value_ms, + stage: dominant, + severity: Severity::classify(value_ms, self.degraded_ms, self.stall_ms), + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn obs() -> HeadroomStageTimings { + HeadroomStageTimings::default() + } + + #[test] + fn ignores_non_stage_timings_lines() { + assert!(obs() + .observe("2026-06-13 - headroom.proxy - INFO - some other log") + .is_none()); + assert!(obs().observe("").is_none()); + } + + #[test] + fn malformed_json_does_not_panic_and_returns_none() { + assert!(obs() + .observe("[req] STAGE_TIMINGS {not valid json") + .is_none()); + assert!(obs().observe("[req] STAGE_TIMINGS {}").is_none()); // no `stages` + assert!(obs() + .observe(r#"[req] STAGE_TIMINGS {"stages": {}}"#) + .is_none()); // no total_pre_upstream + } + + #[test] + fn parses_value_in_ms_without_scaling() { + let line = r#"... STAGE_TIMINGS {"stages": {"total_pre_upstream": 1.54, "read_request_json": 0.6, "deep_copy": 0.1}}"#; + let o = obs().observe(line).unwrap(); + assert!( + (o.value_ms - 1.54).abs() < 1e-9, + "value must be ms verbatim, got {}", + o.value_ms + ); + assert_eq!(o.severity, Severity::Normal); + assert_eq!( + o.stage.as_deref(), + Some("read_request_json"), + "dominant blocking stage" + ); + } + + #[test] + fn classifies_degraded_and_stall() { + let degraded = r#"x STAGE_TIMINGS {"stages": {"total_pre_upstream": 750.0}}"#; + assert_eq!( + obs().observe(degraded).unwrap().severity, + Severity::Degraded + ); + let stall = r#"x STAGE_TIMINGS {"stages": {"total_pre_upstream": 8800.0}}"#; + assert_eq!(obs().observe(stall).unwrap().severity, Severity::Stall); + } + + /// Contract test: a real captured STAGE_TIMINGS line must still parse. If + /// headroom changes its log format on upgrade, this fails loud BEFORE the + /// telemetry silently goes dark (the silent-telemetry-death guard). + #[test] + fn golden_fixture_parses() { + let fixture = concat!( + env!("CARGO_MANIFEST_DIR"), + "/tests/fixtures/headroom-stage-timings.jsonl" + ); + let content = std::fs::read_to_string(fixture) + .unwrap_or_else(|e| panic!("cannot read golden fixture {fixture}: {e}")); + let line = content + .lines() + .next() + .expect("fixture has at least one line"); + let o = obs() + .observe(line) + .expect("golden STAGE_TIMINGS line must parse — headroom log format may have drifted"); + // The captured line's total_pre_upstream is ~1.54 ms (a Normal request); + // the contract is that it parses to a finite ms value, not its exact magnitude. + assert!(o.value_ms.is_finite() && o.value_ms >= 0.0); + } +} diff --git a/system/harness/src/log_tail/mod.rs b/system/harness/src/log_tail/mod.rs new file mode 100644 index 00000000..6eae8209 --- /dev/null +++ b/system/harness/src/log_tail/mod.rs @@ -0,0 +1,310 @@ +//! `hex log-tail` — a generic, event-driven log-tail daemon. +//! +//! Follows an append-only log file via `notify` (FSEvents / inotify / kqueue — +//! blocks on kernel FS events, no polling), parses each new line through a +//! pluggable [`observer::LineObserver`], coalesces stall bursts into episodes, +//! and emits **durable-first** to hex telemetry + the event bus. +//! +//! ## Why a CLI daemon and not a `*.worker.rs` +//! The Rust worker SDK runs handlers on `spawn_blocking` — they must return; there +//! is no daemon/continuous-loop primitive. A continuous `tail -F` in a cron handler +//! would degrade to a poll. The hex-idiomatic continuous mechanism is the +//! `iii-exec` supervised daemon (the same pattern that runs the headroom proxy): +//! run `hex log-tail …` as a supervised long-lived process. +//! +//! ## Reusable seam +//! The only upstream-specific code is the [`observer::LineObserver`] impl selected +//! by `--observer`. A new upstream = a new impl + a [`observer_registry`] entry — +//! never a runtime config DSL (a typed Rust trait, checked at compile time). + +pub mod coalesce; +pub mod emit; +pub mod headroom; +pub mod observer; +pub mod reader; + +use coalesce::EpisodeCoalescer; +use emit::EmitArgs; +use notify::Watcher; +use observer::{LineObserver, Severity}; +use reader::TailReader; +use serde_json::json; +use std::time::{Duration, Instant}; + +/// iii-state scope for the persisted byte-offset checkpoint. +const STATE_SCOPE: &str = "log-tail"; + +/// Names every registered observer understands — the single source for the +/// `--observer` registry below and the not-found error message. +pub const KNOWN_OBSERVERS: &[&str] = &["headroom-stage-timings"]; + +/// Smallest accepted quiet window (ms). Guards against `--quiet-ms 0`, which would +/// turn `recv_timeout` into a 100%-CPU busy-spin under iii-exec supervision. +const MIN_QUIET_MS: u64 = 50; + +/// Daemon configuration (one tailer instance). +pub struct LogTailConfig { + /// Absolute path of the log file to follow. + pub path: String, + /// Observer impl name (see [`KNOWN_OBSERVERS`]). + pub observer: String, + /// Bus event name to emit (e.g. `headroom.overhead`). + pub event: String, + /// Telemetry `source` + bus producer label (e.g. `headroom-proxy`). + pub source: String, + /// Degraded latency floor (ms). Stall floor = 4× this. + pub threshold_ms: f64, + /// Quiet window (ms) that closes a stall episode (clamped to >= 50). + pub quiet_ms: u64, + /// Read the whole file from byte 0 (default: only new lines from EOF). + pub from_start: bool, +} + +/// Resolve an observer impl by name. The reusable registry — extend here (and in +/// [`KNOWN_OBSERVERS`]) when a new upstream needs tailing. +pub fn observer_registry(name: &str, threshold_ms: f64) -> Option> { + match name { + "headroom-stage-timings" => Some(Box::new(headroom::HeadroomStageTimings { + degraded_ms: threshold_ms, + stall_ms: threshold_ms * 4.0, + })), + _ => None, + } +} + +/// Run the daemon. Blocks until the watcher channel disconnects. Intended to run +/// under iii-exec supervision. Returns a process exit code (non-zero = the +/// supervisor should respawn). +pub fn run(cfg: LogTailConfig) -> i32 { + let observer = match observer_registry(&cfg.observer, cfg.threshold_ms) { + Some(o) => o, + None => { + eprintln!( + "hex log-tail: unknown observer '{}' (known: {})", + cfg.observer, + KNOWN_OBSERVERS.join(", ") + ); + return 2; + } + }; + + let mut reader = TailReader::new(&cfg.path, cfg.from_start); + // Restore the checkpoint if present — resume without replay or drop. A state + // error is loud (S6): we proceed from EOF but the operator must see the outage. + match crate::ops::state_get(STATE_SCOPE, &checkpoint_key(&cfg)) { + Ok(Some(v)) => { + if let Some(off) = v.as_u64() { + reader.set_offset(off); + } + } + Ok(None) => {} + Err(e) => { + eprintln!("hex log-tail: failed to restore checkpoint (starting from EOF): {e}"); + } + } + + let parent = match reader.parent_dir() { + Some(p) => p.to_path_buf(), + None => { + eprintln!("hex log-tail: path '{}' has no parent directory", cfg.path); + return 2; + } + }; + + let quiet = Duration::from_millis(cfg.quiet_ms.max(MIN_QUIET_MS)); + let mut coalescer = EpisodeCoalescer::new(quiet); + + // Arm the watcher BEFORE the initial drain so lines appended during setup are + // not missed (they'd otherwise wait for the next unrelated FS event). + let (tx, rx) = std::sync::mpsc::channel(); + let mut watcher = match notify::recommended_watcher(move |res| { + let _ = tx.send(res); + }) { + Ok(w) => w, + Err(e) => { + eprintln!("hex log-tail: cannot create file watcher: {e}"); + return 1; + } + }; + // Watch the parent directory (non-recursive) so log rotation — the file being + // recreated — is observed, not just appends to the current inode. + if let Err(e) = watcher.watch(&parent, notify::RecursiveMode::NonRecursive) { + eprintln!("hex log-tail: cannot watch '{}': {e}", parent.display()); + return 1; + } + + eprintln!( + "hex log-tail: following {} via observer '{}' → event '{}' (durable source '{}')", + cfg.path, + observer.name(), + cfg.event, + cfg.source + ); + + // Initial catch-up drain (watcher already armed → no startup race). + drain(&mut reader, observer.as_ref(), &mut coalescer, &cfg); + + loop { + match rx.recv_timeout(quiet) { + // A real FS event → drain. (Inner notify errors are surfaced, not + // silently treated as events — S6.) + Ok(Ok(_event)) => drain(&mut reader, observer.as_ref(), &mut coalescer, &cfg), + Ok(Err(e)) => eprintln!("hex log-tail: notify backend error (continuing): {e}"), + // Idle for `quiet` — flush any open stall episode. + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { + if let Some(ep) = coalescer.tick(Instant::now()) { + emit_episode(&cfg, &ep); + } + } + // Abnormal watcher-thread death (normal teardown is SIGTERM, which never + // reaches here). Non-zero so iii-exec respawns rather than silently dying. + Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => { + eprintln!("hex log-tail: watcher channel disconnected (watcher died); exiting non-zero for respawn"); + return 1; + } + } + } +} + +fn checkpoint_key(cfg: &LogTailConfig) -> String { + // Key by source so concurrent tailers (different upstreams) never collide. + format!("offset:{}", cfg.source) +} + +fn drain( + reader: &mut TailReader, + observer: &dyn LineObserver, + coalescer: &mut EpisodeCoalescer, + cfg: &LogTailConfig, +) { + let before = reader.offset(); + let lines = match reader.read_new() { + Ok(l) => l, + Err(e) => { + eprintln!("hex log-tail: read error on {}: {e}", cfg.path); + return; + } + }; + + for line in &lines { + let Some(o) = observer.observe(line) else { + continue; + }; + // Normal lines are ignored entirely — only Degraded/Stall drive events and + // episode coalescing (there is no Normal-feed path on the coalescer). + if o.severity == Severity::Normal { + continue; + } + emit::emit_durable_first(EmitArgs { + source: &cfg.source, + event: "overhead", + bus_event: &cfg.event, + status: o.severity.status(), + value_ms: o.value_ms, + detail: json!({ "stage": o.stage }), + data: json!({ + "severity": o.severity.status(), + "value_ms": o.value_ms, + "stage": o.stage, + "source": cfg.source, + }), + }); + if let Some(ep) = + coalescer.record_breach(Instant::now(), o.value_ms, o.severity, o.stage.clone()) + { + emit_episode(cfg, &ep); + } + } + + // Persist the checkpoint AFTER emitting (at-least-once: a crash before this + // re-emits the batch on respawn rather than dropping it). Only when the offset + // actually advanced — avoids spamming iii-state on no-op drains. Loud on + // failure (S6): a silent checkpoint loss grows the replay/duplicate window. + let after = reader.offset(); + if after != before { + if let Err(e) = crate::ops::state_set(STATE_SCOPE, &checkpoint_key(cfg), &json!(after)) { + eprintln!("hex log-tail: checkpoint persist failed (offset {after}): {e}"); + } + } + + // Flush an episode whose quiet window has elapsed. Done on every drain (not + // only on the idle Timeout arm) so steady Normal traffic after a burst still + // closes the episode instead of leaving it open forever. + if let Some(ep) = coalescer.tick(Instant::now()) { + emit_episode(cfg, &ep); + } +} + +fn emit_episode(cfg: &LogTailConfig, ep: &coalesce::StallEpisode) { + emit::emit_durable_first(EmitArgs { + source: &cfg.source, + event: "stall_episode", + bus_event: &cfg.event, + // Reflect the worst severity in the burst (an all-Degraded episode stays + // "degraded"; any Stall makes it "stall"). + status: ep.peak_severity.status(), + value_ms: ep.peak_ms, + detail: json!({ "count": ep.count, "peak_ms": ep.peak_ms, "dominant_stage": ep.dominant_stage }), + data: json!({ + "kind": "stall_episode", + "count": ep.count, + "peak_ms": ep.peak_ms, + "severity": ep.peak_severity.status(), + "dominant_stage": ep.dominant_stage, + "source": cfg.source, + }), + }); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn registry_resolves_headroom_and_rejects_unknown() { + assert!(observer_registry("headroom-stage-timings", 500.0).is_some()); + assert!(observer_registry("does-not-exist", 500.0).is_none()); + } + + #[test] + fn known_observers_all_resolve() { + for name in KNOWN_OBSERVERS { + assert!( + observer_registry(name, 500.0).is_some(), + "{name} must resolve" + ); + } + } + + #[test] + fn registry_threshold_sets_stall_at_four_x() { + let o = observer_registry("headroom-stage-timings", 500.0).unwrap(); + // Degraded at 500, so 1999 is Degraded and 2000 (4×500) is Stall. + assert_eq!( + o.observe(r#"x STAGE_TIMINGS {"stages":{"total_pre_upstream":1999.0}}"#) + .unwrap() + .severity, + Severity::Degraded + ); + assert_eq!( + o.observe(r#"x STAGE_TIMINGS {"stages":{"total_pre_upstream":2000.0}}"#) + .unwrap() + .severity, + Severity::Stall + ); + } + + #[test] + fn checkpoint_key_is_source_scoped() { + let cfg = LogTailConfig { + path: "/tmp/x".into(), + observer: "headroom-stage-timings".into(), + event: "headroom.overhead".into(), + source: "headroom-proxy".into(), + threshold_ms: 500.0, + quiet_ms: 3000, + from_start: false, + }; + assert_eq!(checkpoint_key(&cfg), "offset:headroom-proxy"); + } +} diff --git a/system/harness/src/log_tail/observer.rs b/system/harness/src/log_tail/observer.rs new file mode 100644 index 00000000..906c4473 --- /dev/null +++ b/system/harness/src/log_tail/observer.rs @@ -0,0 +1,87 @@ +//! The reusable tap seam. A [`LineObserver`] turns one raw log line into an +//! optional [`Observation`]. This is the ONLY upstream-specific surface: adding a +//! new upstream is a new impl (sibling file) + a registry entry in +//! [`super::observer_registry`] — never a runtime config DSL. Observers are pure: +//! no I/O, no emit, no clock — so they are trivially unit-tested. + +/// Severity of a single observed line, by latency band. Declaration order is the +/// `Ord` order (`Normal < Degraded < Stall`), so episodes can track the peak. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub enum Severity { + /// Below the degraded floor — no per-line event and no coalescer feed. + Normal, + /// At or above the degraded floor, below the stall floor. + Degraded, + /// At or above the stall floor. + Stall, +} + +impl Severity { + /// Telemetry `status` string for this severity. + pub fn status(&self) -> &'static str { + match self { + Severity::Normal => "ok", + Severity::Degraded => "degraded", + Severity::Stall => "stall", + } + } + + /// Classify a latency (ms) against a degraded floor and a stall floor. + /// `stall_ms` is expected to be >= `degraded_ms`. + pub fn classify(value_ms: f64, degraded_ms: f64, stall_ms: f64) -> Severity { + if value_ms >= stall_ms { + Severity::Stall + } else if value_ms >= degraded_ms { + Severity::Degraded + } else { + Severity::Normal + } + } +} + +/// One parsed, classified data point from a single log line. +#[derive(Debug, Clone, PartialEq)] +pub struct Observation { + /// The headline latency this observer measures, in **milliseconds**. + pub value_ms: f64, + /// The dominant/most-relevant stage name, when the source breaks work into + /// stages (used for attribution); `None` otherwise. + pub stage: Option, + /// Severity by latency band. + pub severity: Severity, +} + +/// The reusable tap. One impl per upstream log format. Pure — no I/O, no emit. +pub trait LineObserver: Send { + /// Stable identifier used by the registry and the `--observer` flag. + fn name(&self) -> &'static str; + + /// Parse + classify one raw line. `None` = the line is not relevant (skip it). + /// MUST NOT panic on malformed input — return `None` instead (a corrupt or + /// truncated line must never take the daemon down). + fn observe(&self, line: &str) -> Option; +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn classify_bands() { + assert_eq!(Severity::classify(10.0, 500.0, 2000.0), Severity::Normal); + assert_eq!(Severity::classify(500.0, 500.0, 2000.0), Severity::Degraded); + assert_eq!( + Severity::classify(1999.9, 500.0, 2000.0), + Severity::Degraded + ); + assert_eq!(Severity::classify(2000.0, 500.0, 2000.0), Severity::Stall); + assert_eq!(Severity::classify(8800.0, 500.0, 2000.0), Severity::Stall); + } + + #[test] + fn status_strings() { + assert_eq!(Severity::Normal.status(), "ok"); + assert_eq!(Severity::Degraded.status(), "degraded"); + assert_eq!(Severity::Stall.status(), "stall"); + } +} diff --git a/system/harness/src/log_tail/reader.rs b/system/harness/src/log_tail/reader.rs new file mode 100644 index 00000000..682ceca4 --- /dev/null +++ b/system/harness/src/log_tail/reader.rs @@ -0,0 +1,271 @@ +//! Offset-tracking tail reader. Given a path and a byte offset, returns the +//! complete new lines appended since that offset plus the advanced offset. +//! Detects rotation/truncation two ways: file shorter than the offset (truncate), +//! OR the inode changed (logrotate move-and-recreate, even when the new file has +//! already grown past the old offset). Either case restarts from byte 0. +//! +//! Pure filesystem I/O — no `notify`, no emit — so it is deterministically +//! testable with temp files, without depending on FS-event timing. + +use std::fs::File; +use std::io::{BufRead, BufReader, Seek, SeekFrom}; +use std::path::{Path, PathBuf}; + +/// Sentinel: "seek to EOF on the first read" (i.e. only new lines, no checkpoint). +const SEEK_EOF: u64 = u64::MAX; + +pub struct TailReader { + path: PathBuf, + offset: u64, + /// Inode of the file last read, to detect rotation that size alone misses + /// (a new inode at the same path that already grew past `offset`). + last_inode: Option, +} + +/// Inode of an open file, when the platform exposes one (Unix). `None` elsewhere, +/// where rotation falls back to the size-shrink check only. +fn inode_of(f: &File) -> Option { + #[cfg(unix)] + { + use std::os::unix::fs::MetadataExt; + f.metadata().ok().map(|m| m.ino()) + } + #[cfg(not(unix))] + { + let _ = f; + None + } +} + +impl TailReader { + /// `from_start = true` reads the whole file from byte 0; `false` starts at EOF + /// on the first read (only newly-appended lines are seen) unless a checkpoint + /// is restored via [`TailReader::set_offset`]. + pub fn new(path: impl Into, from_start: bool) -> Self { + TailReader { + path: path.into(), + offset: if from_start { 0 } else { SEEK_EOF }, + last_inode: None, + } + } + + /// Current byte offset (the checkpoint to persist). The "seek to EOF" sentinel + /// reports as 0 (nothing meaningful to persist before the first read). + pub fn offset(&self) -> u64 { + if self.offset == SEEK_EOF { + 0 + } else { + self.offset + } + } + + /// Restore a persisted byte offset (resume across restarts without replay/drop). + pub fn set_offset(&mut self, offset: u64) { + self.offset = offset; + } + + /// The parent directory of the tailed file (watched for rotation/creation). + pub fn parent_dir(&self) -> Option<&Path> { + self.path.parent() + } + + /// Read all complete new lines since the current offset. A trailing partial + /// line (no newline yet) is NOT returned and NOT counted — it is re-read once + /// completed. A missing file yields `Ok(vec![])` (not an error: the log may not + /// exist yet). Rotation (size shrink OR inode change) restarts from byte 0. + pub fn read_new(&mut self) -> std::io::Result> { + let f = match File::open(&self.path) { + Ok(f) => f, + Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()), + Err(e) => return Err(e), + }; + let len = f.metadata()?.len(); + let inode = inode_of(&f); + + // First read with no checkpoint and from_start=false: start at EOF. + if self.offset == SEEK_EOF { + self.offset = len; + self.last_inode = inode; + return Ok(Vec::new()); + } + + // Rotation detection: + // - inode changed (logrotate move+recreate) → a different file at the path, + // - OR the file shrank below our offset (copytruncate / truncation). + // Either way, restart from the beginning of the current file. + let inode_changed = match (self.last_inode, inode) { + (Some(prev), Some(cur)) => prev != cur, + _ => false, // unknown on either side → fall back to size check only + }; + if inode_changed || len < self.offset { + self.offset = 0; + } + self.last_inode = inode; + + if len == self.offset { + return Ok(Vec::new()); + } + + let mut file = f; + file.seek(SeekFrom::Start(self.offset))?; + let mut reader = BufReader::new(file); + let mut lines = Vec::new(); + let mut consumed: u64 = 0; + loop { + let mut buf = Vec::new(); + // `read_until` is byte-oriented (no UTF-8 assumption) and fills `buf` + // up to and including the delimiter — exactly the semantics we want. + let n = reader.read_until(b'\n', &mut buf)?; + if n == 0 { + break; + } + if buf.last() == Some(&b'\n') { + // Complete line — count it and strip the trailing newline(s). + consumed += n as u64; + while matches!(buf.last(), Some(b'\n') | Some(b'\r')) { + buf.pop(); + } + lines.push(String::from_utf8_lossy(&buf).into_owned()); + } else { + // Partial trailing line (no newline yet) — leave it for next read. + break; + } + } + self.offset += consumed; + Ok(lines) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Write; + + fn tmp() -> PathBuf { + let mut p = std::env::temp_dir(); + let uniq = format!( + "hex-logtail-test-{}-{}.log", + std::process::id(), + // monotonic-ish unique suffix without external crates + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_nanos()) + .unwrap_or(0) + ); + p.push(uniq); + p + } + + #[test] + fn appends_advance_offset_and_only_complete_lines() { + let path = tmp(); + std::fs::write(&path, b"line one\nline two\n").unwrap(); + let mut r = TailReader::new(&path, true); + let lines = r.read_new().unwrap(); + assert_eq!(lines, vec!["line one".to_string(), "line two".to_string()]); + let off = r.offset(); + assert_eq!(off, 18); + + // Append a partial line (no newline): not returned yet. + { + let mut f = std::fs::OpenOptions::new() + .append(true) + .open(&path) + .unwrap(); + f.write_all(b"partial").unwrap(); + } + assert!(r.read_new().unwrap().is_empty()); + assert_eq!(r.offset(), off, "partial line must not advance the offset"); + + // Complete it: now it appears. + { + let mut f = std::fs::OpenOptions::new() + .append(true) + .open(&path) + .unwrap(); + f.write_all(b" done\n").unwrap(); + } + assert_eq!(r.read_new().unwrap(), vec!["partial done".to_string()]); + std::fs::remove_file(&path).ok(); + } + + #[test] + fn from_eof_skips_existing_then_reads_new() { + let path = tmp(); + std::fs::write(&path, b"old\n").unwrap(); + let mut r = TailReader::new(&path, false); + // First read seeks to EOF — existing content is skipped. + assert!(r.read_new().unwrap().is_empty()); + { + let mut f = std::fs::OpenOptions::new() + .append(true) + .open(&path) + .unwrap(); + f.write_all(b"new\n").unwrap(); + } + assert_eq!(r.read_new().unwrap(), vec!["new".to_string()]); + std::fs::remove_file(&path).ok(); + } + + #[test] + fn truncation_restarts_from_zero() { + let path = tmp(); + std::fs::write(&path, b"a\nb\n").unwrap(); + let mut r = TailReader::new(&path, true); + assert_eq!(r.read_new().unwrap().len(), 2); + // Same inode, shorter content (copytruncate) → len (2) < offset (4) → restart. + let f = std::fs::OpenOptions::new() + .write(true) + .truncate(true) + .open(&path) + .unwrap(); + f.set_len(0).unwrap(); + drop(f); + std::fs::write(&path, b"c\n").unwrap(); + assert_eq!(r.read_new().unwrap(), vec!["c".to_string()]); + std::fs::remove_file(&path).ok(); + } + + #[cfg(unix)] + #[test] + fn inode_change_restarts_even_when_new_file_is_larger() { + // The size-only check misses this: the rotated file is LONGER than the old + // offset, so only the inode comparison catches it. + let path = tmp(); + std::fs::write(&path, b"x\n").unwrap(); // offset will be 2 + let mut r = TailReader::new(&path, true); + assert_eq!(r.read_new().unwrap(), vec!["x".to_string()]); + assert_eq!(r.offset(), 2); + + // Rotate: remove and recreate at the same path (new inode), already grown + // past the old offset of 2. + std::fs::remove_file(&path).unwrap(); + std::fs::write(&path, b"alpha\nbeta\n").unwrap(); // len 11 > offset 2 + let lines = r.read_new().unwrap(); + assert_eq!( + lines, + vec!["alpha".to_string(), "beta".to_string()], + "inode change must restart from 0 even though the new file is larger" + ); + std::fs::remove_file(&path).ok(); + } + + #[test] + fn missing_file_is_not_an_error() { + let path = tmp(); // never created + let mut r = TailReader::new(&path, true); + assert!(r.read_new().unwrap().is_empty()); + } + + #[test] + fn strips_crlf() { + let path = tmp(); + std::fs::write(&path, b"win\r\nunix\n").unwrap(); + let mut r = TailReader::new(&path, true); + assert_eq!( + r.read_new().unwrap(), + vec!["win".to_string(), "unix".to_string()] + ); + std::fs::remove_file(&path).ok(); + } +} diff --git a/system/harness/src/main.rs b/system/harness/src/main.rs index 1e8e2437..e9a116d3 100644 --- a/system/harness/src/main.rs +++ b/system/harness/src/main.rs @@ -99,6 +99,36 @@ enum Commands { #[command(subcommand)] command: StateCommands, }, + /// Follow a log file (event-driven) and emit observed signals into telemetry + the bus. + /// + /// A long-running, iii-exec-supervised daemon (not a cron worker). The reusable + /// core is the `--observer` impl; headroom is the first one. + #[command(display_order = 14)] + LogTail { + /// Absolute path of the log file to follow. + #[arg(long)] + path: String, + /// Observer impl that parses each line (e.g. `headroom-stage-timings`). + #[arg(long)] + observer: String, + /// Bus event name to emit for breaches/episodes. Generic default; the + /// headroom deployment sets `--event headroom.overhead` explicitly. + #[arg(long, default_value = "log.overhead")] + event: String, + /// Telemetry `source` + bus producer label (keeps upstreams attributable). + #[arg(long)] + source: String, + /// Degraded latency floor in ms (stall floor = 4×). Default 500. + #[arg(long, default_value_t = 500.0)] + threshold_ms: f64, + /// Quiet window in ms that closes a stall episode. Default 3000; clamped + /// to a minimum of 50 (0 would busy-spin the watcher loop). + #[arg(long, default_value_t = 3000)] + quiet_ms: u64, + /// Read the whole file from the start (default: only new lines from EOF). + #[arg(long)] + from_start: bool, + }, /// Telemetry store: query and emit events from the native SQLite log #[command(display_order = 6)] Telemetry { @@ -810,6 +840,26 @@ fn main() { } }, }, + Commands::LogTail { + path, + observer, + event, + source, + threshold_ms, + quiet_ms, + from_start, + } => { + let code = hex::log_tail::run(hex::log_tail::LogTailConfig { + path, + observer, + event, + source, + threshold_ms, + quiet_ms, + from_start, + }); + std::process::exit(code); + } Commands::Integration { command } => { if let IntegrationCommands::Template = command { integration::template(); diff --git a/system/harness/tests/fixtures/headroom-stage-timings.jsonl b/system/harness/tests/fixtures/headroom-stage-timings.jsonl new file mode 100644 index 00000000..7815facc --- /dev/null +++ b/system/harness/tests/fixtures/headroom-stage-timings.jsonl @@ -0,0 +1 @@ +2026-06-13 04:10:14,711 - headroom.proxy - INFO - [hr_1781338212_000857] STAGE_TIMINGS {"event": "stage_timings", "path": "anthropic_messages", "request_id": "hr_1781338212_000857", "session_id": "f9e686aabaa74b2e9671f5851a39d839", "stages": {"pre_upstream_wait": 0.037999998312443495, "read_request_json": 0.5993749982735608, "deep_copy": 0.006166999810375273, "compression_first_stage": null, "memory_context": null, "upstream_connect": 2209.739375000936, "upstream_first_byte": 2209.739375000936, "total_pre_upstream": 1.541665998956887}} diff --git a/system/iii/engine-workers.example.yaml b/system/iii/engine-workers.example.yaml index 3a67703f..967777ca 100644 --- a/system/iii/engine-workers.example.yaml +++ b/system/iii/engine-workers.example.yaml @@ -72,3 +72,25 @@ # sampling_ratio: 1.0 # metrics_enabled: true # logs_enabled: true +# +# # Event-driven log-tail daemon (`hex log-tail`): capture headroom's per-request +# # overhead (STAGE_TIMINGS.total_pre_upstream) into hex telemetry + the event bus, +# # treating headroom as an opaque, swappable upstream (reads only the log file it +# # already writes — no proxy code touched). The reusable core is `--observer`; +# # headroom-stage-timings is the first impl. notify-driven (no polling). +# # Breaches (>500ms) and coalesced stall episodes land as `headroom.overhead` bus +# # events + durable telemetry rows (source=headroom-proxy). Consume with a worker +# # registered `on_event("headroom.overhead", …)`; query `hex telemetry failures`. +# - name: log-tail-headroom +# type: iii-exec +# config: +# exec: +# - pkill -f 'hex log-tail .*headroom-stage-timings' 2>/dev/null; true +# # Absolute path — the hex binary lives at $HEX_DIR/.hex/bin/hex and is not +# # guaranteed on PATH in iii-exec's minimal spawn shell (cf. the headroom / +# # console stanzas above, which also use absolute paths). +# - $HEX_DIR/.hex/bin/hex log-tail --path "$HOME/.headroom/logs/proxy.log" --observer headroom-stage-timings --event headroom.overhead --source headroom-proxy +# restart: +# on_crash: true +# backoff_secs: 5 +# max_backoff_secs: 60