diff --git a/src/app/api/cron/tripwire-asn-update/route.ts b/src/app/api/cron/tripwire-asn-update/route.ts
index 4d6f575..f92aa27 100644
--- a/src/app/api/cron/tripwire-asn-update/route.ts
+++ b/src/app/api/cron/tripwire-asn-update/route.ts
@@ -6,8 +6,10 @@
 // whatever ASN db is currently in blob.
 
 import { NextResponse, type NextRequest } from "next/server"
-import { syncGeoipToBlob } from "@/lib/tripwire/sync-geoip"
-import { checkCronAuth, makeCronLogger } from "@/lib/cron-helpers"
+import { revalidateTag } from "next/cache"
+import { syncGeoipToBlob, ASN_BLOB_TAG } from "@/lib/tripwire/sync-geoip"
+import { checkCronAuth } from "@/lib/cron-helpers"
+import { log } from "@/lib/log"
 
 export const runtime = "nodejs"
 export const dynamic = "force-dynamic"
@@ -18,11 +20,14 @@ export async function GET(req: NextRequest): Promise<NextResponse> {
   if (authError) return authError
 
   const startedAt = Date.now()
-  const log = makeCronLogger("cron.tripwire_asn_update", startedAt)
+  const cronLog = log.child({ event: "cron.tripwire_asn_update" })
 
-  log({ step: "start" })
+  cronLog.info({ step: "start" })
   const result = await syncGeoipToBlob()
-  log({ step: "done", ...result })
+  // Invalidate the build-stats fetch cache so the next build-stats run
+  // picks up the freshly-uploaded mmdb instead of serving the stale one.
+  revalidateTag(ASN_BLOB_TAG, "max")
+  cronLog.info({ step: "done", elapsed_ms: Date.now() - startedAt, ...result })
 
   return NextResponse.json({
     ok: true,
diff --git a/src/app/api/cron/tripwire-build-stats/route.ts b/src/app/api/cron/tripwire-build-stats/route.ts
index 697a88c..4965275 100644
--- a/src/app/api/cron/tripwire-build-stats/route.ts
+++ b/src/app/api/cron/tripwire-build-stats/route.ts
@@ -8,7 +8,9 @@
 import { NextResponse, type NextRequest } from "next/server"
 import { revalidateTag } from "next/cache"
 import { buildAggregates, publishAggregates } from "@/lib/tripwire/stats"
-import { checkCronAuth, makeCronLogger } from "@/lib/cron-helpers"
+import { STATS_BLOB_TAG } from "@/lib/tripwire/aggregate-shape"
+import { checkCronAuth } from "@/lib/cron-helpers"
+import { log } from "@/lib/log"
 
 export const runtime = "nodejs"
 export const dynamic = "force-dynamic"
@@ -19,21 +21,22 @@ export async function GET(req: NextRequest): Promise<NextResponse> {
   if (authError) return authError
 
   const startedAt = Date.now()
-  const log = makeCronLogger("cron.tripwire_build_stats", startedAt)
+  const cronLog = log.child({ event: "cron.tripwire_build_stats" })
 
-  log({ step: "build_start" })
+  cronLog.info({ step: "build_start" })
   const aggregates = await buildAggregates()
-  log({
+  cronLog.info({
     step: "build_done",
+    elapsed_ms: Date.now() - startedAt,
     total: aggregates.lifetime.totalEvents,
     ips: aggregates.lifetime.distinctIps,
     asns: aggregates.lifetime.distinctAsns,
   })
 
-  log({ step: "publish_start" })
+  cronLog.info({ step: "publish_start" })
   await publishAggregates(aggregates)
-  revalidateTag("tripwire-aggregates", "max")
-  log({ step: "publish_done" })
+  revalidateTag(STATS_BLOB_TAG, "max")
+  cronLog.info({ step: "publish_done", elapsed_ms: Date.now() - startedAt })
 
   return NextResponse.json({
     ok: true,
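The tag pairing above is the core of this PR's caching story: the reader attaches a tag to its fetch, and the writer revalidates that same tag right after publishing. A minimal standalone sketch of the shape, not code from this repo; `REPORT_TAG`, `readReport`, and `publishReport` are hypothetical names, and it uses the single-argument `revalidateTag` (the routes above additionally pass a `"max"` cache profile):

```ts
import { revalidateTag } from "next/cache"

const REPORT_TAG = "example-report" // hypothetical tag name

// Reader: served from Next's data cache until the tag is revalidated.
async function readReport(url: string): Promise<unknown> {
  const res = await fetch(url, { next: { tags: [REPORT_TAG] } })
  if (!res.ok) throw new Error(`fetch failed: ${res.status}`)
  return res.json()
}

// Writer: upload first, then flip readers over to the fresh copy.
async function publishReport(upload: () => Promise<void>): Promise<void> {
  await upload()
  revalidateTag(REPORT_TAG)
}
```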
diff --git a/src/app/api/cron/tripwire-ingest/route.ts b/src/app/api/cron/tripwire-ingest/route.ts
index 9421e85..97ef833 100644
--- a/src/app/api/cron/tripwire-ingest/route.ts
+++ b/src/app/api/cron/tripwire-ingest/route.ts
@@ -7,7 +7,8 @@
 
 import { NextResponse, type NextRequest } from "next/server"
 import { ingestNewEvents } from "@/lib/tripwire/ingest"
-import { checkCronAuth, makeCronLogger } from "@/lib/cron-helpers"
+import { checkCronAuth } from "@/lib/cron-helpers"
+import { log } from "@/lib/log"
 
 export const runtime = "nodejs"
 export const dynamic = "force-dynamic"
@@ -20,14 +21,14 @@ export async function GET(req: NextRequest): Promise<NextResponse> {
   if (authError) return authError
 
   const startedAt = Date.now()
-  const log = makeCronLogger("cron.tripwire_ingest", startedAt)
+  const cronLog = log.child({ event: "cron.tripwire_ingest" })
 
-  log({ step: "start" })
+  cronLog.info({ step: "start" })
   const result = await ingestNewEvents({
-    onProgress: log,
+    onProgress: (e) => cronLog.info(e),
     deadlineMs: startedAt + INGEST_DEADLINE_MS,
   })
-  log({ step: "done", ...result })
+  cronLog.info({ step: "done", elapsed_ms: Date.now() - startedAt, ...result })
 
   return NextResponse.json({
     ok: true,
diff --git a/src/lib/blob-stream.ts b/src/lib/blob-stream.ts
deleted file mode 100644
index cb4dc7a..0000000
--- a/src/lib/blob-stream.ts
+++ /dev/null
@@ -1,30 +0,0 @@
-// src/lib/blob-stream.ts
-//
-// Drain a Web ReadableStream returned by @vercel/blob's get() into a
-// Buffer / string.
-//
-// We had been doing `Buffer.from(await new Response(stream).arrayBuffer())`
-// and the equivalent `.text()`. Both work in Bun and locally in `next dev`.
-// In production on Vercel Fluid Compute (Node.js), wrapping the blob stream
-// in a Web `Response` and reading it back hangs forever — the cron's
-// 12 MB ASN db fetch never produced a `drain_done` log line, the function
-// just sat at full elapsed_ms until the 300s platform timeout.
-//
-// Pulling chunks straight off the reader avoids whatever `Response`-wrap
-// quirk is at play and is also less indirection.
-
-export async function streamToBuffer(stream: ReadableStream<Uint8Array>): Promise<Buffer> {
-  const reader = stream.getReader()
-  const chunks: Uint8Array[] = []
-  for (;;) {
-    const { done, value } = await reader.read()
-    if (done) break
-    if (value) chunks.push(value)
-  }
-  return Buffer.concat(chunks)
-}
-
-export async function streamToText(stream: ReadableStream<Uint8Array>): Promise<string> {
-  const buf = await streamToBuffer(stream)
-  return buf.toString("utf8")
-}
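With blob-stream.ts gone, every reader in this PR drains blob bodies through a plain fetch whose Response stays in scope across the drain. Distilled to its essentials as a sketch; `readBlobText` is a hypothetical name, while the `BLOB_READ_WRITE_TOKEN` env var and the `head()` call mirror what the real call sites later in this diff do:

```ts
import { head } from "@vercel/blob"

// head() resolves the blob's stable URL; the body is then fetched directly
// and drained while `res` is still referenced, which is the whole point.
async function readBlobText(pathname: string): Promise<string> {
  const token = process.env.BLOB_READ_WRITE_TOKEN
  if (!token) throw new Error("BLOB_READ_WRITE_TOKEN is not set")
  const meta = await head(pathname)
  const res = await fetch(meta.url, {
    headers: { authorization: `Bearer ${token}` },
    cache: "no-store",
  })
  if (!res.ok) throw new Error(`blob fetch failed: ${res.status}`)
  return res.text() // drain before the Response can go out of scope
}
```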
diff --git a/src/lib/cron-helpers.ts b/src/lib/cron-helpers.ts
index e42977d..090db8e 100644
--- a/src/lib/cron-helpers.ts
+++ b/src/lib/cron-helpers.ts
@@ -1,15 +1,16 @@
 // src/lib/cron-helpers.ts
 //
 // Shared boilerplate for the tripwire cron route handlers. Each route
-// does ONE thing — this just removes the repeated auth check and the
-// structured-log scaffolding from those handlers.
+// does ONE thing — this just removes the repeated auth check. Logging
+// goes through the singleton in src/lib/log.ts.
 
 import { NextResponse, type NextRequest } from "next/server"
+import { log } from "@/lib/log"
 
 export function checkCronAuth(req: NextRequest): NextResponse | null {
   const secret = process.env.CRON_SECRET
   if (!secret) {
-    console.error("[cron] CRON_SECRET not configured")
+    log.error({ event: "cron.auth", reason: "no_secret" })
     return NextResponse.json({ ok: false, error: "not_configured" }, { status: 500 })
   }
   if (req.headers.get("authorization") !== `Bearer ${secret}`) {
@@ -17,16 +18,3 @@ export function checkCronAuth(req: NextRequest): NextResponse | null {
   }
   return null
 }
-
-export type CronLogger = (fields: Record<string, unknown>) => void
-
-export function makeCronLogger(eventName: string, startedAt: number): CronLogger {
-  return (fields) =>
-    console.log(
-      JSON.stringify({
-        event: eventName,
-        elapsed_ms: Date.now() - startedAt,
-        ...fields,
-      }),
-    )
-}
diff --git a/src/lib/log.test.ts b/src/lib/log.test.ts
new file mode 100644
index 0000000..0a773cf
--- /dev/null
+++ b/src/lib/log.test.ts
@@ -0,0 +1,102 @@
+// src/lib/log.test.ts
+import { describe, test, expect, mock } from "bun:test"
+import { consoleLogger } from "./log"
+
+function captureConsole(): { lines: unknown[][]; restore: () => void } {
+  const lines: unknown[][] = []
+  const original = console.log
+  const spy = mock((...args: unknown[]) => {
+    lines.push(args)
+  })
+  console.log = spy
+  return { lines, restore: () => { console.log = original } }
+}
+
+describe("consoleLogger", () => {
+  test("emits one JSON line per call with time + level + fields", () => {
+    const { lines, restore } = captureConsole()
+    try {
+      const log = consoleLogger({ level: "debug" })
+      log.info({ event: "boot", step: "ok" })
+      expect(lines).toHaveLength(1)
+      expect(lines[0]).toHaveLength(1)
+      const record = JSON.parse(lines[0][0] as string)
+      expect(record.level).toBe("info")
+      expect(record.event).toBe("boot")
+      expect(record.step).toBe("ok")
+      expect(typeof record.time).toBe("string")
+      expect(Number.isNaN(Date.parse(record.time))).toBe(false)
+    } finally {
+      restore()
+    }
+  })
+
+  test("filters records below the threshold", () => {
+    const { lines, restore } = captureConsole()
+    try {
+      const log = consoleLogger({ level: "warn" })
+      log.debug({ x: 1 })
+      log.info({ x: 2 })
+      log.warn({ x: 3 })
+      log.error({ x: 4 })
+      const levels = lines.map((l) => JSON.parse(l[0] as string).level)
+      expect(levels).toEqual(["warn", "error"])
+    } finally {
+      restore()
+    }
+  })
+
+  test("silent mutes everything", () => {
+    const { lines, restore } = captureConsole()
+    try {
+      const log = consoleLogger({ level: "silent" })
+      log.error({ x: 1 })
+      expect(lines).toHaveLength(0)
+    } finally {
+      restore()
+    }
+  })
+
+  test("child merges bindings; later bindings win on key collision", () => {
+    const { lines, restore } = captureConsole()
+    try {
+      const root = consoleLogger({ level: "debug", bindings: { service: "tripwire", env: "test" } })
+      const child = root.child({ event: "cron.ingest", env: "prod" })
+      child.info({ step: "start" })
+      const record = JSON.parse(lines[0][0] as string)
+      expect(record.service).toBe("tripwire")
+      expect(record.event).toBe("cron.ingest")
+      expect(record.env).toBe("prod")
+      expect(record.step).toBe("start")
+    } finally {
+      restore()
+    }
+  })
+
+  test("call-site fields override parent + child bindings", () => {
+    const { lines, restore } = captureConsole()
+    try {
+      const log = consoleLogger({ level: "debug", bindings: { event: "from-root" } }).child({
+        event: "from-child",
+      })
+      log.info({ event: "from-call" })
+      const record = JSON.parse(lines[0][0] as string)
+      expect(record.event).toBe("from-call")
+    } finally {
+      restore()
+    }
+  })
+
+  test("child inherits the parent level", () => {
+    const { lines, restore } = captureConsole()
+    try {
+      const log = consoleLogger({ level: "warn" }).child({ scope: "x" })
+      log.info({ skip: true })
+      log.warn({ keep: true })
+      expect(lines).toHaveLength(1)
+      expect(JSON.parse(lines[0][0] as string).keep).toBe(true)
+    } finally {
+      restore()
+    }
+  })
+})
diff --git a/src/lib/log.ts b/src/lib/log.ts
new file mode 100644
index 0000000..afd9faa
--- /dev/null
+++ b/src/lib/log.ts
@@ -0,0 +1,83 @@
+// src/lib/log.ts
+//
+// Tiny structured logger. One `Logger` interface, one provider that
+// writes JSON lines via console.log. Vercel auto-parses JSON in stdout
+// into searchable fields in runtime logs and forwards the same to log
+// drains, so this is the idiomatic shape for this platform.
+//
+// Why a wrapper at all instead of raw `console.log(JSON.stringify(...))`:
+// the interface gives us levels, bound context (`child`), and a single
+// place to swap implementations later (an HTTP drain, a no-op for tests,
+// a prefixed logger for CLI scripts) without touching call sites.
+//
+// What this is not: it has no transports, no formatters, no redaction,
+// no async I/O. If we ever need any of those, they're additive.
+
+export type Level = "debug" | "info" | "warn" | "error" | "silent"
+
+export interface Logger {
+  debug(fields: Record<string, unknown>): void
+  info(fields: Record<string, unknown>): void
+  warn(fields: Record<string, unknown>): void
+  error(fields: Record<string, unknown>): void
+  child(bindings: Record<string, unknown>): Logger
+}
+
+const RANK: Record<Exclude<Level, "silent">, number> = {
+  debug: 10,
+  info: 20,
+  warn: 30,
+  error: 40,
+}
+
+interface ConsoleLoggerOptions {
+  level?: Level
+  bindings?: Record<string, unknown>
+}
+
+export function consoleLogger(opts: ConsoleLoggerOptions = {}): Logger {
+  const level = opts.level ?? "info"
+  const bindings = opts.bindings ?? {}
+  const threshold = level === "silent" ? Number.POSITIVE_INFINITY : RANK[level]
+
+  function emit(name: Exclude<Level, "silent">, fields: Record<string, unknown>): void {
+    if (RANK[name] < threshold) return
+    console.log(
+      JSON.stringify({
+        time: new Date().toISOString(),
+        level: name,
+        ...bindings,
+        ...fields,
+      }),
+    )
+  }
+
+  return {
+    debug: (f) => emit("debug", f),
+    info: (f) => emit("info", f),
+    warn: (f) => emit("warn", f),
+    error: (f) => emit("error", f),
+    child: (b) => consoleLogger({ level, bindings: { ...bindings, ...b } }),
+  }
+}
+
+function readLevel(): Level {
+  // Default: quiet on production, debug everywhere else (preview, dev).
+  // Crons run identical code in every environment, so a preview deploy
+  // gets the per-step trace without needing a manual env var. LOG_LEVEL
+  // overrides if set explicitly.
+  const fallback = process.env.VERCEL_ENV === "production" ? "info" : "debug"
+  const raw = (process.env.LOG_LEVEL ?? fallback).toLowerCase()
+  if (
+    raw === "debug" ||
+    raw === "info" ||
+    raw === "warn" ||
+    raw === "error" ||
+    raw === "silent"
+  ) {
+    return raw
+  }
+  return "info"
+}
+
+export const log: Logger = consoleLogger({ level: readLevel() })
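For orientation, this is the call-site shape the header comment describes. A hypothetical job, not code from this repo; the event name and field values are illustrative:

```ts
import { log } from "@/lib/log"

// Bind context once; every line emitted through jobLog carries it.
const jobLog = log.child({ event: "example.job" }) // hypothetical event name

jobLog.info({ step: "start" })
jobLog.debug({ step: "detail", items: 3 }) // dropped when the level is "info"
jobLog.error({ step: "failed", reason: "timeout" })

// Each call prints one JSON line, e.g.:
// {"time":"2026-05-03T12:00:00.000Z","level":"info","event":"example.job","step":"start"}
```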
diff --git a/src/lib/tripwire/aggregate-shape.ts b/src/lib/tripwire/aggregate-shape.ts
index 5ebb552..9fd0cde 100644
--- a/src/lib/tripwire/aggregate-shape.ts
+++ b/src/lib/tripwire/aggregate-shape.ts
@@ -8,6 +8,10 @@
 // before this split.
 
 export const STATS_BLOB_KEY = "stats/tripwire-aggregates.json"
+// Next.js fetch-cache tag. The page-side loader fetches with this tag;
+// the build-stats cron calls revalidateTag after a successful publish,
+// so warm pages flip to fresh aggregates without polling on a TTL.
+export const STATS_BLOB_TAG = "tripwire-aggregates"
 export const DEFAULT_TOP_PATHS = 100
 
 export interface Aggregates {
diff --git a/src/lib/tripwire/aggregates.test.ts b/src/lib/tripwire/aggregates.test.ts
index e77cdf1..f88b3ae 100644
--- a/src/lib/tripwire/aggregates.test.ts
+++ b/src/lib/tripwire/aggregates.test.ts
@@ -1,7 +1,7 @@
 // src/lib/tripwire/aggregates.test.ts
 import { describe, test, expect, beforeEach, mock } from "bun:test"
 import * as blob from "@vercel/blob"
-import type { Aggregates } from "@/lib/tripwire/aggregate-shape"
+import { STATS_BLOB_TAG, type Aggregates } from "@/lib/tripwire/aggregate-shape"
 
 const SAMPLE: Aggregates = {
   generatedAt: "2026-05-02T00:00:00.000Z",
@@ -21,53 +21,67 @@ const SAMPLE: Aggregates = {
   byAsn: [],
 }
 
-interface GetCall {
-  pathname: string
-  options: Record<string, unknown>
-}
-const getCalls: GetCall[] = []
-type GetMode = "ok" | "bad-status"
-let getMode: GetMode = "ok"
+const FAKE_URL = "https://store.private.blob.vercel-storage.com/stats/tripwire-aggregates.json"
+
+interface HeadCall { pathname: string }
+interface FetchCall { url: string; init: RequestInit | undefined }
+
+const headCalls: HeadCall[] = []
+const fetchCalls: FetchCall[] = []
+type FetchMode = "ok" | "bad-status"
+let fetchMode: FetchMode = "ok"
 
 mock.module("@vercel/blob", () => ({
   ...blob,
-  get: async (pathname: string, options: Record<string, unknown>) => {
-    getCalls.push({ pathname, options })
-    if (getMode === "bad-status") return { stream: null, statusCode: 404 }
-    const text = JSON.stringify(SAMPLE)
-    const stream = new ReadableStream({
-      start(c) {
-        c.enqueue(new TextEncoder().encode(text))
-        c.close()
-      },
-    })
-    return { stream, statusCode: 200 }
+  head: async (pathname: string) => {
+    headCalls.push({ pathname })
+    return { url: FAKE_URL, pathname }
   },
 }))
 
+const realFetch = globalThis.fetch
+globalThis.fetch = (async (input: RequestInfo | URL, init?: RequestInit) => {
+  fetchCalls.push({ url: String(input), init })
+  if (fetchMode === "bad-status") {
+    return new Response("nope", { status: 404, statusText: "Not Found" })
+  }
+  return new Response(JSON.stringify(SAMPLE), {
+    status: 200,
+    headers: { "content-type": "application/json" },
+  })
+}) as typeof fetch
+
+process.env.BLOB_READ_WRITE_TOKEN = "vercel_blob_rw_test_token"
+
 const { getAggregates, _resetAggregatesCacheForTests } = await import("./aggregates")
 
 beforeEach(() => {
   _resetAggregatesCacheForTests()
-  getCalls.length = 0
-  getMode = "ok"
+  headCalls.length = 0
+  fetchCalls.length = 0
+  fetchMode = "ok"
 })
 
 describe("getAggregates", () => {
   test("cache miss fetches and parses the blob", async () => {
     const result = await getAggregates()
     expect(result).toEqual(SAMPLE)
-    expect(getCalls).toHaveLength(1)
-    expect(getCalls[0].pathname).toBe("stats/tripwire-aggregates.json")
-    expect(getCalls[0].options).toMatchObject({ access: "private" })
+    expect(headCalls).toHaveLength(1)
+    expect(headCalls[0].pathname).toBe("stats/tripwire-aggregates.json")
+    expect(fetchCalls).toHaveLength(1)
+    expect(fetchCalls[0].url).toBe(FAKE_URL)
+    const headers = new Headers(fetchCalls[0].init?.headers)
+    expect(headers.get("authorization")).toBe("Bearer vercel_blob_rw_test_token")
+    const next = (fetchCalls[0].init as { next?: { tags?: string[] } } | undefined)?.next
+    expect(next?.tags).toEqual([STATS_BLOB_TAG])
   })
 
   test("cache hit within TTL skips the fetch", async () => {
     await getAggregates()
-    expect(getCalls).toHaveLength(1)
+    expect(fetchCalls).toHaveLength(1)
     const result = await getAggregates()
     expect(result).toEqual(SAMPLE)
-    expect(getCalls).toHaveLength(1)
+    expect(fetchCalls).toHaveLength(1)
   })
 
   test("expired TTL triggers a fresh fetch", async () => {
@@ -76,17 +90,22 @@ describe("getAggregates", () => {
     Date.now = () => now
     try {
       await getAggregates()
-      expect(getCalls).toHaveLength(1)
+      expect(fetchCalls).toHaveLength(1)
       now += 2 * 60 * 1000 + 1
       await getAggregates()
-      expect(getCalls).toHaveLength(2)
+      expect(fetchCalls).toHaveLength(2)
     } finally {
       Date.now = realNow
     }
   })
 
-  test("throws when get() returns a non-200 status", async () => {
-    getMode = "bad-status"
+  test("throws when fetch returns a non-200 status", async () => {
+    fetchMode = "bad-status"
     await expect(getAggregates()).rejects.toThrow(/status 404/)
   })
 })
+
+// Restore real fetch so other test files aren't affected.
+process.on("beforeExit", () => {
+  globalThis.fetch = realFetch
+})
diff --git a/src/lib/tripwire/aggregates.ts b/src/lib/tripwire/aggregates.ts
index 798f2bd..e727f5d 100644
--- a/src/lib/tripwire/aggregates.ts
+++ b/src/lib/tripwire/aggregates.ts
@@ -1,18 +1,26 @@
 // src/lib/tripwire/aggregates.ts
 //
 // Page-side analytics blob loader. The build-stats cron republishes
-// stats/tripwire-aggregates.json every ~15 minutes. We hold the parsed
-// JSON in a module-level singleton with a 2-minute TTL so a warm
-// Fluid Compute instance only pays for the blob fetch once per window.
-// On TTL expiry the next request triggers a fresh fetch.
+// stats/tripwire-aggregates.json every ~15 minutes and calls
+// revalidateTag(STATS_BLOB_TAG) right after, so the page-side fetch is
+// served from Next's data cache until that exact moment of invalidation.
+//
+// We also keep a 2-minute module-level singleton in front of the fetch:
+// the data cache lives in the regional edge, the singleton lives in the
+// running instance. The singleton absorbs bursty page traffic on a warm
+// instance without crossing the network at all. Stale data is fine for
+// up to 2 minutes — the cron only runs every 15.
 //
 // On any fetch error we throw — `src/app/x/tripwire/error.tsx` surfaces
 // a retry button. We deliberately don't fall back to stale data; a hard
 // failure is better than silently lying about freshness.
 
-import { get } from "@vercel/blob"
-import { streamToText } from "@/lib/blob-stream"
-import { STATS_BLOB_KEY, type Aggregates } from "@/lib/tripwire/aggregate-shape"
+import { head } from "@vercel/blob"
+import {
+  STATS_BLOB_KEY,
+  STATS_BLOB_TAG,
+  type Aggregates,
+} from "@/lib/tripwire/aggregate-shape"
 
 const TTL_MS = 2 * 60 * 1000
 
@@ -22,14 +30,18 @@ export async function getAggregates(): Promise<Aggregates> {
   if (cached && Date.now() - cached.fetchedAt < TTL_MS) {
     return cached.data
   }
-  const file = await get(STATS_BLOB_KEY, { access: "private" })
-  if (!file || file.statusCode !== 200) {
-    throw new Error(
-      `blob get failed (status ${file?.statusCode ?? "no response"})`,
-    )
+  const token = process.env.BLOB_READ_WRITE_TOKEN
+  if (!token) throw new Error("BLOB_READ_WRITE_TOKEN is not set")
+
+  const meta = await head(STATS_BLOB_KEY)
+  const res = await fetch(meta.url, {
+    headers: { authorization: `Bearer ${token}` },
+    next: { tags: [STATS_BLOB_TAG] },
+  })
+  if (!res.ok) {
+    throw new Error(`blob fetch failed (status ${res.status} ${res.statusText})`)
   }
-  const text = await streamToText(file.stream)
-  const data = JSON.parse(text) as Aggregates
+  const data = (await res.json()) as Aggregates
  cached = { data, fetchedAt: Date.now() }
   return data
 }
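The instance-local half of that two-layer design is just a TTL memo in front of a loader. A generic sketch of the shape, under the assumption that a short window of staleness is acceptable; `getCached` and `loadFresh` are hypothetical names, and the TTL mirrors the real one:

```ts
const TTL_MS = 2 * 60 * 1000

let cached: { data: unknown; fetchedAt: number } | null = null

// Warm instances answer from memory; cold or expired ones fall through to
// the loader, which may itself be served by Next's tagged data cache.
async function getCached<T>(loadFresh: () => Promise<T>): Promise<T> {
  if (cached && Date.now() - cached.fetchedAt < TTL_MS) {
    return cached.data as T
  }
  const data = await loadFresh()
  cached = { data, fetchedAt: Date.now() }
  return data
}
```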
"no response"})`, - ) + const token = process.env.BLOB_READ_WRITE_TOKEN + if (!token) throw new Error("BLOB_READ_WRITE_TOKEN is not set") + + const meta = await head(STATS_BLOB_KEY) + const res = await fetch(meta.url, { + headers: { authorization: `Bearer ${token}` }, + next: { tags: [STATS_BLOB_TAG] }, + }) + if (!res.ok) { + throw new Error(`blob fetch failed (status ${res.status} ${res.statusText})`) } - const text = await streamToText(file.stream) - const data = JSON.parse(text) as Aggregates + const data = (await res.json()) as Aggregates cached = { data, fetchedAt: Date.now() } return data } diff --git a/src/lib/tripwire/ingest.test.ts b/src/lib/tripwire/ingest.test.ts new file mode 100644 index 0000000..9310641 --- /dev/null +++ b/src/lib/tripwire/ingest.test.ts @@ -0,0 +1,30 @@ +// src/lib/tripwire/ingest.test.ts +import { describe, test, expect } from "bun:test" +import { recentDatePrefixes } from "./ingest" + +describe("recentDatePrefixes", () => { + test("returns today and yesterday in UTC, today first", () => { + const now = new Date("2026-05-03T12:34:56.000Z") + expect(recentDatePrefixes(now)).toEqual([ + "events/2026-05-03/", + "events/2026-05-02/", + ]) + }) + + test("crosses month boundary correctly", () => { + const now = new Date("2026-06-01T00:30:00.000Z") + expect(recentDatePrefixes(now)).toEqual([ + "events/2026-06-01/", + "events/2026-05-31/", + ]) + }) + + test("uses UTC, not local time, just before midnight UTC", () => { + // Late on the 3rd UTC -> today=03, yesterday=02 regardless of host TZ. + const now = new Date("2026-05-03T23:59:59.000Z") + expect(recentDatePrefixes(now)).toEqual([ + "events/2026-05-03/", + "events/2026-05-02/", + ]) + }) +}) diff --git a/src/lib/tripwire/ingest.ts b/src/lib/tripwire/ingest.ts index 36a1180..d1bf0d6 100644 --- a/src/lib/tripwire/ingest.ts +++ b/src/lib/tripwire/ingest.ts @@ -11,12 +11,13 @@ // Pure library: no console.log, no process.exit. Callers (the CLI script // and the cron route) decide how to log and surface results. -import { list, get } from "@vercel/blob" import { inArray } from "drizzle-orm" import { getDb, schema } from "@/db" -import { streamToText } from "@/lib/blob-stream" +import { log } from "@/lib/log" import { isTripwireEvent, type TripwireEvent } from "@/lib/tripwire/patterns" +const ilog = log.child({ event: "tripwire.ingest" }) + const DEFAULT_BATCH = 200 const ID_LOOKUP_CHUNK = 1000 @@ -59,25 +60,84 @@ function idFromPathname(pathname: string): string | null { return match ? match[2] : null } +interface BlobListPage { + blobs: Array<{ pathname: string; url: string; size: number; uploadedAt: string }> + cursor?: string + hasMore: boolean +} + +// Direct call to Vercel Blob's list API. We bypass @vercel/blob's list() +// for the same reason we bypass get(): the SDK ends in `apiResponse.json()` +// after the Response object goes out of scope, which under Bun on Vercel +// can leave the body stream stuck waiting for EOF. By keeping our own +// Response in scope across the .json() drain, the request completes. 
diff --git a/src/lib/tripwire/ingest.ts b/src/lib/tripwire/ingest.ts
index 36a1180..d1bf0d6 100644
--- a/src/lib/tripwire/ingest.ts
+++ b/src/lib/tripwire/ingest.ts
@@ -11,12 +11,13 @@
 // Pure library: no console.log, no process.exit. Callers (the CLI script
 // and the cron route) decide how to log and surface results.
 
-import { list, get } from "@vercel/blob"
 import { inArray } from "drizzle-orm"
 import { getDb, schema } from "@/db"
-import { streamToText } from "@/lib/blob-stream"
+import { log } from "@/lib/log"
 import { isTripwireEvent, type TripwireEvent } from "@/lib/tripwire/patterns"
 
+const ilog = log.child({ event: "tripwire.ingest" })
+
 const DEFAULT_BATCH = 200
 const ID_LOOKUP_CHUNK = 1000
 
@@ -59,25 +60,84 @@ function idFromPathname(pathname: string): string | null {
   return match ? match[2] : null
 }
 
+interface BlobListPage {
+  blobs: Array<{ pathname: string; url: string; size: number; uploadedAt: string }>
+  cursor?: string
+  hasMore: boolean
+}
+
+// Direct call to Vercel Blob's list API. We bypass @vercel/blob's list()
+// for the same reason we bypass get(): the SDK ends in `apiResponse.json()`
+// after the Response object goes out of scope, which under Bun on Vercel
+// can leave the body stream stuck waiting for EOF. By keeping our own
+// Response in scope across the .json() drain, the request completes.
+async function listBlobsPage(prefix: string, cursor: string | undefined): Promise<BlobListPage> {
+  const token = process.env.BLOB_READ_WRITE_TOKEN
+  if (!token) throw new Error("BLOB_READ_WRITE_TOKEN is not set")
+  const params = new URLSearchParams({ prefix })
+  if (cursor) params.set("cursor", cursor)
+  const res = await fetch(`https://vercel.com/api/blob/?${params}`, {
+    headers: {
+      authorization: `Bearer ${token}`,
+      "x-api-version": "12",
+    },
+    cache: "no-store",
+  })
+  if (!res.ok) {
+    throw new Error(`blob list failed: ${res.status} ${res.statusText}`)
+  }
+  return (await res.json()) as BlobListPage
+}
+
+// Bound the listing to the trailing INGEST_WINDOW_DAYS UTC dates. Cron runs
+// every 5 minutes, so a 2-day window leaves 24h+ of slack against any cron
+// outage. Events older than the window won't be auto-ingested by the cron;
+// the CLI script (scripts/tripwire/ingest-events.ts) still walks the full
+// events/ prefix and can backfill manually if a longer outage happens.
+const INGEST_WINDOW_DAYS = 2
+
+export function recentDatePrefixes(now: Date): string[] {
+  const out: string[] = []
+  for (let i = 0; i < INGEST_WINDOW_DAYS; i++) {
+    const d = new Date(now.getTime() - i * 24 * 60 * 60 * 1000)
+    out.push(`events/${d.toISOString().slice(0, 10)}/`)
+  }
+  return out
+}
+
 async function listAllBlobs(
   log: (e: IngestLogEvent) => void,
 ): Promise<{ refs: BlobRef[]; unrecognized: number }> {
   const refs: BlobRef[] = []
   let unrecognized = 0
-  let cursor: string | undefined
-  do {
-    const page = await list({ prefix: "events/", cursor })
-    for (const blob of page.blobs) {
-      const id = idFromPathname(blob.pathname)
-      if (!id) {
-        unrecognized++
-        log({ step: "list.unrecognized_blob", pathname: blob.pathname })
-        continue
+  for (const prefix of recentDatePrefixes(new Date())) {
+    let cursor: string | undefined
+    let page = 0
+    do {
+      page++
+      const t0 = Date.now()
+      ilog.debug({ step: "list.page_start", prefix, page, cursor: cursor ?? null })
+      const result = await listBlobsPage(prefix, cursor)
+      ilog.debug({
+        step: "list.page_done",
+        prefix,
+        page,
+        elapsed_ms: Date.now() - t0,
+        blobs: result.blobs.length,
+        has_cursor: Boolean(result.cursor),
+      })
+      for (const blob of result.blobs) {
+        const id = idFromPathname(blob.pathname)
+        if (!id) {
+          unrecognized++
+          log({ step: "list.unrecognized_blob", pathname: blob.pathname })
+          continue
+        }
+        refs.push({ pathname: blob.pathname, url: blob.url, id })
       }
-      refs.push({ pathname: blob.pathname, url: blob.url, id })
-    }
-    cursor = page.cursor
-  } while (cursor)
+      cursor = result.cursor
+    } while (cursor)
+  }
 
   return { refs, unrecognized }
 }
 
@@ -85,12 +145,35 @@ async function fetchEvent(
   url: string,
   log: (e: IngestLogEvent) => void,
 ): Promise<TripwireEvent | null> {
-  const file = await get(url, { access: "private" })
-  if (!file || file.statusCode !== 200) {
-    log({ step: "fetch_event.bad_status", url, statusCode: file?.statusCode ?? null })
+  const token = process.env.BLOB_READ_WRITE_TOKEN
+  if (!token) throw new Error("BLOB_READ_WRITE_TOKEN is not set")
+
+  // Direct fetch instead of @vercel/blob's get(): get() returns
+  // response.body and lets the Response go out of scope, which under
+  // Bun on Vercel can leave the body stream stuck waiting for EOF.
+  // Each event JSON is read once, so no caching.
+  const t0 = Date.now()
+  ilog.debug({ step: "fetch_event.fetch_start", url })
+  const res = await fetch(url, {
+    headers: { authorization: `Bearer ${token}` },
+    cache: "no-store",
+  })
+  ilog.debug({
+    step: "fetch_event.fetch_done",
+    elapsed_ms: Date.now() - t0,
+    status: res.status,
+  })
+  if (!res.ok) {
+    log({ step: "fetch_event.bad_status", url, statusCode: res.status })
     return null
   }
-  const text = await streamToText(file.stream)
+  const t1 = Date.now()
+  const text = await res.text()
+  ilog.debug({
+    step: "fetch_event.drain_done",
+    elapsed_ms: Date.now() - t1,
+    bytes: text.length,
+  })
   let parsed: unknown
   try {
     parsed = JSON.parse(text)
@@ -131,10 +214,18 @@ async function existingIds(ids: string[]): Promise<Set<string>> {
   const db = getDb()
   for (let i = 0; i < ids.length; i += ID_LOOKUP_CHUNK) {
     const chunk = ids.slice(i, i + ID_LOOKUP_CHUNK)
+    const t0 = Date.now()
+    ilog.debug({ step: "dedup.chunk_start", offset: i, size: chunk.length })
     const rows = await db
       .select({ id: schema.tripwireEvents.id })
       .from(schema.tripwireEvents)
       .where(inArray(schema.tripwireEvents.id, chunk))
+    ilog.debug({
+      step: "dedup.chunk_done",
+      offset: i,
+      elapsed_ms: Date.now() - t0,
+      matched: rows.length,
+    })
     for (const r of rows) out.add(r.id)
   }
   return out
@@ -143,7 +234,10 @@ async function existingIds(ids: string[]): Promise<Set<string>> {
 async function insertBatch(rows: schema.NewTripwireEventRow[]): Promise<number> {
   if (rows.length === 0) return 0
   const db = getDb()
+  const t0 = Date.now()
+  ilog.debug({ step: "insert.start", rows: rows.length })
   await db.insert(schema.tripwireEvents).values(rows).onConflictDoNothing()
+  ilog.debug({ step: "insert.done", rows: rows.length, elapsed_ms: Date.now() - t0 })
   return rows.length
 }
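The cron route passes `deadlineMs: startedAt + INGEST_DEADLINE_MS` into ingestNewEvents, but the loop that honors it isn't part of this diff. Purely as an illustration of the shape such a deadline check usually takes, with all names hypothetical and no claim about the library's actual control flow:

```ts
// Hypothetical deadline-bounded batch walk. Check the clock before each
// unit of work so a run stops cleanly instead of straddling the platform
// timeout; the caller can re-run and pick up where this left off.
async function processUntilDeadline<T>(
  items: T[],
  deadlineMs: number,
  handle: (item: T) => Promise<void>,
): Promise<{ processed: number; deadlineHit: boolean }> {
  let processed = 0
  for (const item of items) {
    if (Date.now() >= deadlineMs) return { processed, deadlineHit: true }
    await handle(item)
    processed++
  }
  return { processed, deadlineHit: false }
}
```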
diff --git a/src/lib/tripwire/stats.ts b/src/lib/tripwire/stats.ts
index 55aac01..6daf0af 100644
--- a/src/lib/tripwire/stats.ts
+++ b/src/lib/tripwire/stats.ts
@@ -11,18 +11,19 @@
 // it across cron invocations and only the first cold instance pays the
 // ~10MB blob fetch.
 
-import { get, put } from "@vercel/blob"
+import { head, put } from "@vercel/blob"
 import { Reader, type Asn, type ReaderModel } from "@maxmind/geoip2-node"
 import { sql } from "drizzle-orm"
 import { getDb } from "@/db"
-import { streamToBuffer } from "@/lib/blob-stream"
+import { log } from "@/lib/log"
+import { ASN_BLOB_KEY, ASN_BLOB_TAG } from "@/lib/tripwire/sync-geoip"
 import {
   STATS_BLOB_KEY,
   DEFAULT_TOP_PATHS,
   type Aggregates,
 } from "@/lib/tripwire/aggregate-shape"
 
-const ASN_BLOB_KEY = "geoip/GeoLite2-ASN.mmdb"
+const slog = log.child({ event: "tripwire.stats" })
 
 // Re-export so existing callers can keep importing from "@/lib/tripwire/stats".
 // The page-side loader imports STATS_BLOB_KEY straight from aggregate-shape so
@@ -32,16 +33,51 @@ export { STATS_BLOB_KEY, DEFAULT_TOP_PATHS, type Aggregates }
 
 let cachedAsnReader: ReaderModel | null = null
 
 async function getAsnReader(): Promise<ReaderModel> {
-  if (cachedAsnReader) return cachedAsnReader
-  const file = await get(ASN_BLOB_KEY, { access: "private" })
-  if (!file || file.statusCode !== 200) {
+  if (cachedAsnReader) {
+    slog.debug({ step: "asn.cache_hit" })
+    return cachedAsnReader
+  }
+  const token = process.env.BLOB_READ_WRITE_TOKEN
+  if (!token) throw new Error("BLOB_READ_WRITE_TOKEN is not set")
+
+  // head() resolves the (stable) blob URL for the pathname. The body is
+  // small JSON metadata, so it doesn't trip the large-body stream hang we
+  // hit when calling get() on the 12MB mmdb directly.
+  const tHead = Date.now()
+  slog.debug({ step: "asn.head_start", key: ASN_BLOB_KEY })
+  const meta = await head(ASN_BLOB_KEY)
+  slog.debug({ step: "asn.head_done", elapsed_ms: Date.now() - tHead, url: meta.url })
+
+  // Direct fetch with the bearer token, tagged for the Next.js data cache.
+  // tripwire-asn-update calls revalidateTag(ASN_BLOB_TAG) after a fresh put,
+  // so we only pay for the 12MB drain when the mmdb actually changed.
+  const tFetch = Date.now()
+  slog.debug({ step: "asn.fetch_start" })
+  const res = await fetch(meta.url, {
+    headers: { authorization: `Bearer ${token}` },
+    next: { tags: [ASN_BLOB_TAG] },
+  })
+  slog.debug({
+    step: "asn.fetch_done",
+    elapsed_ms: Date.now() - tFetch,
+    status: res.status,
+    cache: res.headers.get("x-vercel-cache"),
+  })
+  if (!res.ok) {
     throw new Error(
-      `Failed to fetch ${ASN_BLOB_KEY} from blob (status: ${file?.statusCode ?? "no response"}). ` +
+      `Failed to fetch ${ASN_BLOB_KEY} (status: ${res.status} ${res.statusText}). ` +
       `Run the tripwire-asn-update cron / sync-geoip-to-blob.ts to populate it.`,
     )
   }
-  const buf = await streamToBuffer(file.stream)
+
+  const tBuf = Date.now()
+  slog.debug({ step: "asn.array_buffer_start" })
+  const buf = Buffer.from(await res.arrayBuffer())
+  slog.debug({ step: "asn.array_buffer_done", elapsed_ms: Date.now() - tBuf, bytes: buf.length })
+
+  const tOpen = Date.now()
   cachedAsnReader = Reader.openBuffer(buf)
+  slog.debug({ step: "asn.reader_open_done", elapsed_ms: Date.now() - tOpen })
   return cachedAsnReader
 }
 
@@ -83,6 +119,8 @@ export async function buildAggregates(
 ): Promise<Aggregates> {
   const db = getDb()
 
+  const tQ1 = Date.now()
+  slog.debug({ step: "sql.lifetime.start" })
   const lifetimeResult = await db.execute(sql`
     SELECT
       COUNT(*)::int AS total_events,
       COUNT(DISTINCT ip)::int AS distinct_ips,
       MIN(ts) AS earliest_ts,
      COUNT(DISTINCT path)::int AS distinct_paths
     FROM tripwire_events
   `)
+  slog.debug({ step: "sql.lifetime.done", elapsed_ms: Date.now() - tQ1 })
 
   const lifetime = lifetimeResult.rows[0]
   if (!lifetime || lifetime.total_events === 0) {
     throw new Error("no events in tripwire_events; run ingest first")
   }
 
+  const tQ2 = Date.now()
+  slog.debug({ step: "sql.byCategory.start" })
   const byCategory = await db.execute(sql`
     SELECT category, COUNT(*)::int AS count
     FROM tripwire_events
     WHERE category IS NOT NULL
     GROUP BY category
     ORDER BY count DESC, category ASC
   `)
+  slog.debug({ step: "sql.byCategory.done", elapsed_ms: Date.now() - tQ2, rows: byCategory.rows.length })
 
+  const tQ3 = Date.now()
+  slog.debug({ step: "sql.byUaFamily.start" })
   const byUaFamily = await db.execute(sql`
     SELECT COALESCE(ua_family, 'unknown') AS ua, COUNT(*)::int AS count
     FROM tripwire_events
     GROUP BY ua
     ORDER BY count DESC, ua ASC
   `)
+  slog.debug({ step: "sql.byUaFamily.done", elapsed_ms: Date.now() - tQ3, rows: byUaFamily.rows.length })
 
+  const tQ4 = Date.now()
+  slog.debug({ step: "sql.byDay.start" })
   const byDay = await db.execute(sql`
     SELECT TO_CHAR(ts AT TIME ZONE 'UTC', 'YYYY-MM-DD') AS date,
       COUNT(*)::int AS count
     FROM tripwire_events
     GROUP BY date
     ORDER BY date ASC
   `)
+  slog.debug({ step: "sql.byDay.done", elapsed_ms: Date.now() - tQ4, rows: byDay.rows.length })
 
+  const tQ5 = Date.now()
+  slog.debug({ step: "sql.topPaths.start", limit: topPathsLimit })
   const topPaths = await db.execute(sql`
     SELECT path,
       COUNT(*)::int AS count,
       COUNT(DISTINCT ip)::int AS distinct_ips
     FROM tripwire_events
     GROUP BY path
     ORDER BY count DESC, path ASC
     LIMIT ${topPathsLimit}
   `)
+  slog.debug({ step: "sql.topPaths.done", elapsed_ms: Date.now() - tQ5, rows: topPaths.rows.length })
step: "sql.topPaths.done", elapsed_ms: Date.now() - tQ5, rows: topPaths.rows.length }) // ASN enrichment at query time: fold each event's IP through the mmdb // and roll up. Lifetime.distinctAsns is computed from the rolled-up map // rather than from a SQL DISTINCT — the column-stored value is no // longer the source of truth for ASN since we stopped writing it during // ingest. + const tQ6 = Date.now() + slog.debug({ step: "sql.ipCounts.start" }) const ipCountsResult = await db.execute(sql` SELECT ip, COUNT(*)::int AS count FROM tripwire_events WHERE ip IS NOT NULL AND ip <> '' GROUP BY ip `) + slog.debug({ step: "sql.ipCounts.done", elapsed_ms: Date.now() - tQ6, rows: ipCountsResult.rows.length }) + const reader = await getAsnReader() + + const tEnrich = Date.now() + slog.debug({ step: "asn.enrich.start", ips: ipCountsResult.rows.length }) const asnTotals = new Map() for (const row of ipCountsResult.rows) { const lookup = lookupAsn(reader, row.ip) @@ -153,6 +211,11 @@ export async function buildAggregates( const byAsn = [...asnTotals.entries()] .map(([asn, v]) => ({ asn, name: v.name, count: v.count })) .sort((a, b) => (b.count - a.count) || a.asn.localeCompare(b.asn)) + slog.debug({ + step: "asn.enrich.done", + elapsed_ms: Date.now() - tEnrich, + asns: byAsn.length, + }) const earliestDate = new Date(lifetime.earliest_ts) const daysSinceFirst = Math.max( @@ -184,10 +247,14 @@ export async function buildAggregates( } export async function publishAggregates(agg: Aggregates): Promise { - await put(STATS_BLOB_KEY, JSON.stringify(agg, null, 2), { + const body = JSON.stringify(agg, null, 2) + const t0 = Date.now() + slog.debug({ step: "publish.put_start", key: STATS_BLOB_KEY, bytes: body.length }) + await put(STATS_BLOB_KEY, body, { access: "private", contentType: "application/json", addRandomSuffix: false, allowOverwrite: true, }) + slog.debug({ step: "publish.put_done", elapsed_ms: Date.now() - t0 }) } diff --git a/src/lib/tripwire/sync-geoip.ts b/src/lib/tripwire/sync-geoip.ts index e20c0dc..b5c9c0f 100644 --- a/src/lib/tripwire/sync-geoip.ts +++ b/src/lib/tripwire/sync-geoip.ts @@ -11,10 +11,17 @@ import { put } from "@vercel/blob" import { gunzipSync } from "node:zlib" +import { log } from "@/lib/log" + +const glog = log.child({ event: "tripwire.sync_geoip" }) const DOWNLOAD_URL = "https://download.maxmind.com/geoip/databases/GeoLite2-ASN/download?suffix=tar.gz" export const ASN_BLOB_KEY = "geoip/GeoLite2-ASN.mmdb" +// Next.js fetch-cache tag. Build-stats fetches the mmdb with this tag; +// this cron calls revalidateTag after a successful upload, so subsequent +// build-stats runs get the new mmdb without paying for the 12MB drain. +export const ASN_BLOB_TAG = "asn-mmdb" const MMDB_NAME = "GeoLite2-ASN.mmdb" export interface SyncGeoipResult { @@ -28,16 +35,31 @@ async function downloadTarball( licenseKey: string, ): Promise { const auth = Buffer.from(`${accountId}:${licenseKey}`).toString("base64") + const t0 = Date.now() + glog.debug({ step: "maxmind.fetch_start", url: DOWNLOAD_URL }) const res = await fetch(DOWNLOAD_URL, { headers: { Authorization: `Basic ${auth}` }, }) + glog.debug({ + step: "maxmind.fetch_headers", + elapsed_ms: Date.now() - t0, + status: res.status, + content_length: res.headers.get("content-length"), + }) if (!res.ok) { const body = await res.text().catch(() => "") throw new Error( `MaxMind download failed: ${res.status} ${res.statusText}. 
diff --git a/src/lib/tripwire/sync-geoip.ts b/src/lib/tripwire/sync-geoip.ts
index e20c0dc..b5c9c0f 100644
--- a/src/lib/tripwire/sync-geoip.ts
+++ b/src/lib/tripwire/sync-geoip.ts
@@ -11,10 +11,17 @@
 
 import { put } from "@vercel/blob"
 import { gunzipSync } from "node:zlib"
+import { log } from "@/lib/log"
+
+const glog = log.child({ event: "tripwire.sync_geoip" })
 
 const DOWNLOAD_URL =
   "https://download.maxmind.com/geoip/databases/GeoLite2-ASN/download?suffix=tar.gz"
 
 export const ASN_BLOB_KEY = "geoip/GeoLite2-ASN.mmdb"
+// Next.js fetch-cache tag. Build-stats fetches the mmdb with this tag;
+// this cron calls revalidateTag after a successful upload, so subsequent
+// build-stats runs get the new mmdb without paying for the 12MB drain.
+export const ASN_BLOB_TAG = "asn-mmdb"
 const MMDB_NAME = "GeoLite2-ASN.mmdb"
 
 export interface SyncGeoipResult {
@@ -28,16 +35,31 @@ async function downloadTarball(
   licenseKey: string,
 ): Promise<Buffer> {
   const auth = Buffer.from(`${accountId}:${licenseKey}`).toString("base64")
+  const t0 = Date.now()
+  glog.debug({ step: "maxmind.fetch_start", url: DOWNLOAD_URL })
   const res = await fetch(DOWNLOAD_URL, {
     headers: { Authorization: `Basic ${auth}` },
   })
+  glog.debug({
+    step: "maxmind.fetch_headers",
+    elapsed_ms: Date.now() - t0,
+    status: res.status,
+    content_length: res.headers.get("content-length"),
+  })
   if (!res.ok) {
     const body = await res.text().catch(() => "")
     throw new Error(
       `MaxMind download failed: ${res.status} ${res.statusText}. ${body.slice(0, 200)}`,
     )
   }
-  return Buffer.from(await res.arrayBuffer())
+  const t1 = Date.now()
+  const buf = Buffer.from(await res.arrayBuffer())
+  glog.debug({
+    step: "maxmind.fetch_body_done",
+    elapsed_ms: Date.now() - t1,
+    bytes: buf.length,
+  })
+  return buf
 }
 
 // POSIX ustar header. We only need name, size, typeflag, prefix.
@@ -94,14 +116,25 @@ export async function syncGeoipToBlob(): Promise<SyncGeoipResult> {
   }
 
   const tarball = await downloadTarball(accountId, licenseKey)
+
+  const tExtract = Date.now()
+  glog.debug({ step: "extract.start", bytes: tarball.length })
   const mmdb = extractFileFromTarGz(tarball, MMDB_NAME)
+  glog.debug({
+    step: "extract.done",
+    elapsed_ms: Date.now() - tExtract,
+    mmdb_bytes: mmdb.length,
+  })
 
+  const tPut = Date.now()
+  glog.debug({ step: "blob.put_start", key: ASN_BLOB_KEY, bytes: mmdb.length })
   await put(ASN_BLOB_KEY, mmdb, {
     access: "private",
     contentType: "application/octet-stream",
     addRandomSuffix: false,
     allowOverwrite: true,
   })
+  glog.debug({ step: "blob.put_done", elapsed_ms: Date.now() - tPut })
 
   return {
     tarballBytes: tarball.length,