From 4417a905c14eea28102e94daecd5a821ee442475 Mon Sep 17 00:00:00 2001 From: Kenny Daniel Date: Sun, 26 Apr 2026 22:17:59 -0700 Subject: [PATCH] Abort parquet worker --- src/lib/workers/parquetWorker.ts | 12 +++++++++++ src/lib/workers/parquetWorkerClient.ts | 28 +++++++++++++++++++++++--- src/lib/workers/types.ts | 17 ++++++++++++---- 3 files changed, 50 insertions(+), 7 deletions(-) diff --git a/src/lib/workers/parquetWorker.ts b/src/lib/workers/parquetWorker.ts index 4985f6e..7512013 100644 --- a/src/lib/workers/parquetWorker.ts +++ b/src/lib/workers/parquetWorker.ts @@ -5,6 +5,7 @@ import type { ChunkMessage, ClientMessage, CompleteMessage, PageMessage, Parquet import { fromToAsyncBuffer } from './utils.js' const cache = new Map>() +const aborted = new Set() function postCompleteMessage ({ queryId, rows }: Omit) { self.postMessage({ kind: 'onComplete', queryId, rows }) @@ -29,30 +30,41 @@ function postParquetQueryResultMessage ({ queryId, rows }: Omit { + if (data.kind === 'abort') { + aborted.add(data.queryId) + return + } const { queryId, from, kind, options } = data const file = await fromToAsyncBuffer(from, cache) try { if (kind === 'parquetReadObjects') { const rows = (await parquetReadObjects({ ...options, rowFormat: 'object', file, compressors, onChunk, onPage })) as Rows + if (aborted.delete(queryId)) return postParquetReadObjectsResultMessage({ queryId, rows }) } else if (kind === 'parquetQuery') { const rows = await parquetQuery({ ...options, file, compressors, onChunk, onPage }) + if (aborted.delete(queryId)) return postParquetQueryResultMessage({ queryId, rows }) } else { await parquetRead({ ...options, rowFormat: 'object', file, compressors, onComplete, onChunk, onPage }) + if (aborted.delete(queryId)) return postParquetReadResultMessage({ queryId }) } } catch (error) { + if (aborted.delete(queryId)) return postErrorMessage({ error: error as Error, queryId }) } function onComplete(rows: Rows) { + if (aborted.has(queryId)) return postCompleteMessage({ queryId, rows }) } function onChunk(chunk: ColumnData) { + if (aborted.has(queryId)) return postChunkMessage({ chunk, queryId }) } function onPage(page: SubColumnData) { + if (aborted.has(queryId)) return postPageMessage({ page, queryId }) } } diff --git a/src/lib/workers/parquetWorkerClient.ts b/src/lib/workers/parquetWorkerClient.ts index cfdc4f6..b3dd491 100644 --- a/src/lib/workers/parquetWorkerClient.ts +++ b/src/lib/workers/parquetWorkerClient.ts @@ -68,6 +68,25 @@ function getWorker() { return worker } +/** Wires an AbortSignal to the queryId: posts an abort message to the worker + * and rejects the promise. The worker will suppress the result for this id. */ +function wireAbort(worker: Worker, queryId: number, signal: AbortSignal | undefined, reject: (e: Error) => void): boolean { + if (!signal) return false + if (signal.aborted) { + pendingAgents.delete(queryId) + worker.postMessage({ queryId, kind: 'abort' } satisfies ClientMessage) + reject(new DOMException('Aborted', 'AbortError')) + return true + } + signal.addEventListener('abort', () => { + if (!pendingAgents.has(queryId)) return + pendingAgents.delete(queryId) + worker.postMessage({ queryId, kind: 'abort' } satisfies ClientMessage) + reject(new DOMException('Aborted', 'AbortError')) + }, { once: true }) + return false +} + /** * Presents almost the same interface as parquetRead, but runs in a worker. * This is useful for reading large parquet files without blocking the main thread. @@ -78,11 +97,12 @@ function getWorker() { * Note that it only supports 'rowFormat: object' (the default). */ export function parquetReadWorker(options: ParquetReadWorkerOptions): Promise { - const { onComplete, onChunk, onPage, from, ...serializableOptions } = options + const { onComplete, onChunk, onPage, from, signal, ...serializableOptions } = options return new Promise((resolve, reject) => { const queryId = nextQueryId++ pendingAgents.set(queryId, { parquetReadResolve: resolve, reject, onComplete, onChunk, onPage }) const worker = getWorker() + if (wireAbort(worker, queryId, signal, reject)) return const message: ClientMessage = { queryId, from, kind: 'parquetRead', options: serializableOptions } worker.postMessage(message) }) @@ -98,11 +118,12 @@ export function parquetReadWorker(options: ParquetReadWorkerOptions): Promise { - const { onChunk, onPage, from, ...serializableOptions } = options + const { onChunk, onPage, from, signal, ...serializableOptions } = options return new Promise((resolve, reject) => { const queryId = nextQueryId++ pendingAgents.set(queryId, { parquetReadObjectsResolve: resolve, reject, onChunk, onPage }) const worker = getWorker() + if (wireAbort(worker, queryId, signal, reject)) return const message: ClientMessage = { queryId, from, kind: 'parquetReadObjects', options: serializableOptions } worker.postMessage(message) }) @@ -118,11 +139,12 @@ export function parquetReadObjectsWorker(options: ParquetReadObjectsWorkerOption * Note that it only supports 'rowFormat: object' (the default). */ export function parquetQueryWorker(options: ParquetQueryWorkerOptions): Promise { - const { onComplete, onChunk, onPage, from, ...serializableOptions } = options + const { onComplete, onChunk, onPage, from, signal, ...serializableOptions } = options return new Promise((resolve, reject) => { const queryId = nextQueryId++ pendingAgents.set(queryId, { parquetQueryResolve: resolve, reject, onComplete, onChunk, onPage }) const worker = getWorker() + if (wireAbort(worker, queryId, signal, reject)) return const message: ClientMessage = { queryId, from, kind: 'parquetQuery', options: serializableOptions } worker.postMessage(message) }) diff --git a/src/lib/workers/types.ts b/src/lib/workers/types.ts index 89c901e..bd7bf95 100644 --- a/src/lib/workers/types.ts +++ b/src/lib/workers/types.ts @@ -32,6 +32,12 @@ export interface ParquetReadWorkerOptions extends Omit void + /** + * Aborting the signal posts an abort message to the worker. The in-flight + * read continues to completion (hyparquet has no AbortSignal support), but + * the result is suppressed and the returned promise rejects with AbortError. + */ + signal?: AbortSignal } /** * Options for the worker version of parquetReadObjects @@ -64,17 +70,20 @@ export interface From { } export interface ParquetReadClientMessage extends QueryId, From { kind: 'parquetRead' - options: Omit + options: Omit } export interface ParquetReadObjectsClientMessage extends QueryId, From { kind: 'parquetReadObjects' - options: Omit + options: Omit } export interface ParquetQueryClientMessage extends QueryId, From { kind: 'parquetQuery' - options: Omit + options: Omit } -export type ClientMessage = ParquetQueryClientMessage | ParquetReadObjectsClientMessage | ParquetReadClientMessage +export interface AbortClientMessage extends QueryId { + kind: 'abort' +} +export type ClientMessage = ParquetQueryClientMessage | ParquetReadObjectsClientMessage | ParquetReadClientMessage | AbortClientMessage /** * Messages sent by the worker to the client