Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ coverage
npm-debug.log
yarn-error.log

# OS specific
.DS_Store

# Editors specific
.fleet
.idea
Expand Down
33 changes: 33 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ npm install @boringnode/queue
- **Priority Queues**: Process high-priority jobs first
- **Bulk Dispatch**: Efficiently dispatch thousands of jobs at once
- **Job Grouping**: Organize related jobs for monitoring
- **Job Deduplication**: Prevent duplicate jobs with custom IDs
- **Retry with Backoff**: Exponential, linear, or fixed backoff strategies
- **Job Timeout**: Fail or retry jobs that exceed a time limit
- **Job History**: Retain completed/failed jobs for debugging
Expand Down Expand Up @@ -131,6 +132,38 @@ await SendEmailJob.dispatchMany(recipients).group('newsletter-jan-2025')

The `groupId` is stored with job data and accessible via `job.data.groupId`.

## Job Deduplication

Prevent the same job from being pushed to the queue twice using `.dedup()`:

```typescript
// First dispatch - job is created
await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run()

// Second dispatch with same dedup ID - silently skipped
await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run()
```

The dedup ID is automatically prefixed with the job name, so different job types can use the same ID without conflicts:

```typescript
// These are two different jobs, no conflict
await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run()
await SendReceiptJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run()
```

Deduplication is atomic and race-condition-free for adapters that support storage-level uniqueness checks:

- **Redis**: Uses `HSETNX` (set-if-not-exists)
- **Knex**: Uses `INSERT ... ON CONFLICT DO NOTHING`
- **SyncAdapter**: Executes jobs inline and does not support deduplication

> [!NOTE]
> Without `.dedup()`, jobs use auto-generated UUIDs and are never deduplicated. The `.dedup()` method is only available on single dispatch, not `dispatchMany`.

> [!TIP]
> When job retention is enabled (`removeOnComplete: false`), completed jobs remain in storage. A re-dispatch with the same dedup ID will be silently skipped since the record still exists. With the default retention (`true`), completed jobs are removed immediately, so re-dispatch with the same ID succeeds normally.

## Job History & Retention

Keep completed and failed jobs for debugging:
Expand Down
12 changes: 11 additions & 1 deletion src/drivers/fake_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ export class FakeAdapter implements Adapter {
}

async pushOn(queue: string, jobData: JobData): Promise<void> {
if (jobData.dedup) {
const existing = await this.getJob(jobData.id, queue)
if (existing) return
}

this.#recordPush(queue, jobData)
this.#enqueue(queue, jobData)
}
Expand All @@ -168,7 +173,12 @@ export class FakeAdapter implements Adapter {
return this.pushLaterOn('default', jobData, delay)
}

pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void> {
async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void> {
if (jobData.dedup) {
const existing = await this.getJob(jobData.id, queue)
if (existing) return
}

this.#recordPush(queue, jobData, delay)
this.#schedulePush(queue, jobData, delay)

Expand Down
16 changes: 14 additions & 2 deletions src/drivers/knex_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -370,13 +370,19 @@ export class KnexAdapter implements Adapter {
const timestamp = Date.now()
const score = calculateScore(priority, timestamp)

await this.#connection(this.#jobsTable).insert({
const query = this.#connection(this.#jobsTable).insert({
id: jobData.id,
queue,
status: 'pending',
data: JSON.stringify(jobData),
score,
})

if (jobData.dedup) {
await query.onConflict(['id', 'queue']).ignore()
} else {
await query
}
}

async pushLater(jobData: JobData, delay: number): Promise<void> {
Expand All @@ -386,13 +392,19 @@ export class KnexAdapter implements Adapter {
async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void> {
const executeAt = Date.now() + delay

await this.#connection(this.#jobsTable).insert({
const query = this.#connection(this.#jobsTable).insert({
id: jobData.id,
queue,
status: 'delayed',
data: JSON.stringify(jobData),
execute_at: executeAt,
})

if (jobData.dedup) {
await query.onConflict(['id', 'queue']).ignore()
} else {
await query
}
}

async pushMany(jobs: JobData[]): Promise<void> {
Expand Down
48 changes: 46 additions & 2 deletions src/drivers/redis_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,26 @@ const PUSH_JOB_SCRIPT = `
return 1
`

/**
* Lua script for pushing a dedup job.
* Uses HSETNX to only store data if the job doesn't already exist.
* Only adds to pending ZSET if the job was newly created.
*/
const PUSH_DEDUP_JOB_SCRIPT = `
local data_key = KEYS[1]
local pending_key = KEYS[2]
local job_id = ARGV[1]
local job_data = ARGV[2]
local score = tonumber(ARGV[3])

local added = redis.call('HSETNX', data_key, job_id, job_data)
if added == 1 then
redis.call('ZADD', pending_key, score, job_id)
end

return added
`

/**
* Lua script for pushing a delayed job.
* Stores job data in the central hash and adds jobId to delayed ZSET.
Expand All @@ -52,6 +72,26 @@ const PUSH_DELAYED_JOB_SCRIPT = `
return 1
`

/**
* Lua script for pushing a dedup delayed job.
* Uses HSETNX to only store data if the job doesn't already exist.
* Only adds to delayed ZSET if the job was newly created.
*/
const PUSH_DEDUP_DELAYED_JOB_SCRIPT = `
local data_key = KEYS[1]
local delayed_key = KEYS[2]
local job_id = ARGV[1]
local job_data = ARGV[2]
local execute_at = tonumber(ARGV[3])

local added = redis.call('HSETNX', data_key, job_id, job_data)
if added == 1 then
redis.call('ZADD', delayed_key, execute_at, job_id)
end

return added
`

/**
* Lua script for atomic job acquisition.
* 1. Check and process delayed jobs
Expand Down Expand Up @@ -620,8 +660,10 @@ export class RedisAdapter implements Adapter {
const keys = this.#getKeys(queue)
const executeAt = Date.now() + delay

const script = jobData.dedup ? PUSH_DEDUP_DELAYED_JOB_SCRIPT : PUSH_DELAYED_JOB_SCRIPT

await this.#connection.eval(
PUSH_DELAYED_JOB_SCRIPT,
script,
2,
keys.data,
keys.delayed,
Expand All @@ -637,8 +679,10 @@ export class RedisAdapter implements Adapter {
const timestamp = Date.now()
const score = calculateScore(priority, timestamp)

const script = jobData.dedup ? PUSH_DEDUP_JOB_SCRIPT : PUSH_JOB_SCRIPT

await this.#connection.eval(
PUSH_JOB_SCRIPT,
script,
2,
keys.data,
keys.pending,
Expand Down
50 changes: 44 additions & 6 deletions src/job_dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ import { parse } from './utils.js'
*
* ```
* Job.dispatch(payload)
* .toQueue('emails') // optional: target queue
* .priority(1) // optional: 1-10, lower = higher priority
* .in('5m') // optional: delay before processing
* .with('redis') // optional: specific adapter
* .run() // dispatch the job
* .toQueue('emails') // optional: target queue
* .priority(1) // optional: 1-10, lower = higher priority
* .in('5m') // optional: delay before processing
* .dedup({ id: 'order-123' }) // optional: deduplication
* .with('redis') // optional: specific adapter
* .run() // dispatch the job
* ```
*
* @typeParam T - The payload type for this job
Expand Down Expand Up @@ -47,6 +48,7 @@ export class JobDispatcher<T> {
#delay?: Duration
#priority?: number
#groupId?: string
#dedup?: { id: string }

/**
* Create a new job dispatcher.
Expand Down Expand Up @@ -148,6 +150,41 @@ export class JobDispatcher<T> {
return this
}

/**
* Configure deduplication for this job.
*
* When deduplication is configured, the adapter will silently skip
* the job if one with the same dedup ID already exists in the queue.
* The ID is automatically prefixed with the job name to prevent
* collisions between different job types.
*
* @param options - Deduplication options
* @param options.id - Unique deduplication key
* @returns This dispatcher for chaining
*
* @example
* ```typescript
* // Prevent duplicate invoice jobs for the same order
* await SendInvoiceJob.dispatch({ orderId: 123 })
* .dedup({ id: 'order-123' })
* .run()
*
* // Second dispatch with same dedup ID is silently skipped
* await SendInvoiceJob.dispatch({ orderId: 123 })
* .dedup({ id: 'order-123' })
* .run()
* ```
*/
dedup(options: { id: string }): this {
if (!options.id) {
throw new Error('Dedup ID must be a non-empty string')
}

this.#dedup = options

return this
}

/**
* Use a specific adapter for this job.
*
Expand Down Expand Up @@ -181,7 +218,7 @@ export class JobDispatcher<T> {
* ```
*/
async run(): Promise<DispatchResult> {
const id = randomUUID()
const id = this.#dedup ? `${this.#name}::${this.#dedup.id}` : randomUUID()

debug('dispatching job %s with id %s using payload %s', this.#name, id, this.#payload)

Expand All @@ -197,6 +234,7 @@ export class JobDispatcher<T> {
priority: this.#priority,
groupId: this.#groupId,
createdAt: Date.now(),
...(this.#dedup ? { dedup: { id: this.#dedup.id } } : {}),
}

const message: JobDispatchMessage = { jobs: [jobData], queue: this.#queue, delay: parsedDelay }
Expand Down
11 changes: 11 additions & 0 deletions src/types/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,17 @@ export interface JobData {
* Injected by OTel plugin at dispatch time.
*/
traceContext?: Record<string, string>

/**
* Deduplication configuration for this job.
* When set, adapters use atomic insert-if-not-exists semantics
* to silently skip duplicate jobs with the same ID.
* Set automatically when `.dedup()` is called on the dispatcher.
*/
dedup?: {
/** The original dedup key provided by the caller (before name-prefixing). */
id: string
}
}

/**
Expand Down
12 changes: 11 additions & 1 deletion tests/_mocks/memory_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ export class MemoryAdapter implements Adapter {
}

async pushOn(queue: string, jobData: JobData): Promise<void> {
if (jobData.dedup) {
const existing = await this.getJob(jobData.id, queue)
if (existing) return
}

if (!this.#queues.has(queue)) {
this.#queues.set(queue, [])
}
Expand All @@ -62,7 +67,12 @@ export class MemoryAdapter implements Adapter {
return this.pushLaterOn('default', jobData, delay)
}

pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void> {
async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void> {
if (jobData.dedup) {
const existing = await this.getJob(jobData.id, queue)
if (existing) return
}

if (!this.#delayedJobs.has(queue)) {
this.#delayedJobs.set(queue, new Map())
}
Expand Down
Loading