From 63a30675d57fbe0960d544e7c2738373c5d2a6f3 Mon Sep 17 00:00:00 2001 From: Joseph Sauto Date: Wed, 20 May 2026 10:42:33 -0400 Subject: [PATCH] feat: Enhance error handling and processing logic in API request queue - Refactor error code handling in to introduce NON_RETRYABLE_MONDAY_ERROR_CODES and NON_FORWARDABLE_PARTIAL_CODES. - Update to spread job data when consuming messages. - Modify to handle partial data and implement new upload and push callback logic. - Introduce new utility functions in for better error classification and handling. - Enhance tests for , , and to cover new error handling scenarios and partial data forwarding. - Update to return partial results when applicable. - Ensure that retry logic respects new error classifications and bypass conditions. --- src/constants/monday-error-codes.ts | 11 +- src/consumers/sqs/sqs-consumer.ts | 4 +- .../api-request-queue-processor.ts | 87 ++++---- .../callback/callback-queue-processor.ts | 10 +- .../base-call-monday-api-task.ts | 18 +- .../monday-client-call-monday-api-task.ts | 54 +++-- src/types/task-types.ts | 1 + src/utils/retry-utils.ts | 45 ++++- test/api-request-queue-processor.test.ts | 181 +++++++++++++++++ test/callback-queue-processor.test.ts | 38 ++++ ...monday-client-call-monday-api-task.test.ts | 187 ++++++++++++++++++ test/retry-utils.test.ts | 153 +++++++++++++- test/sqs-consumer.test.ts | 17 +- 13 files changed, 722 insertions(+), 84 deletions(-) diff --git a/src/constants/monday-error-codes.ts b/src/constants/monday-error-codes.ts index 9f98106..e1157af 100644 --- a/src/constants/monday-error-codes.ts +++ b/src/constants/monday-error-codes.ts @@ -5,10 +5,11 @@ export enum MondayErrorCodes { MAX_CONCURRENCY_EXCEEDED = "MAX_CONCURRENCY_EXCEEDED", } -export const TERMINAL_MONDAY_ERROR_CODES: ReadonlySet = new Set([ +export const NON_RETRYABLE_MONDAY_ERROR_CODES: ReadonlySet = new Set([ "ColumnValueException", "CorrectedValueException", "CreateBoardException", + "CursorException", "InvalidArgumentException", "InvalidBoardIdException", "InvalidColumnIdException", @@ -31,3 +32,11 @@ export const RETRYABLE_HTTP_STATUS_CODES: ReadonlySet = new Set([ StatusCodes.LOCKED, StatusCodes.TOO_MANY_REQUESTS, ]); + +// Errors that must fail completely even when partial data is present. +// The DLQ consumer in mdp-backend handles recovery for these (e.g. re-issuing +// a fresh cursor request). Forwarding partial data would delete the SQS message +// before the DLQ consumer can act on it. +export const NON_FORWARDABLE_PARTIAL_CODES: ReadonlySet = new Set([ + "CursorException", +]); diff --git a/src/consumers/sqs/sqs-consumer.ts b/src/consumers/sqs/sqs-consumer.ts index 0434156..c8e37da 100644 --- a/src/consumers/sqs/sqs-consumer.ts +++ b/src/consumers/sqs/sqs-consumer.ts @@ -42,8 +42,8 @@ export abstract class SqsConsumer extends BaseConsumer { async handleMessage(message: Message): Promise { const messageId = message.MessageId; - const data = JSON.parse(message.Body || "{}") as QueueJob; - await this.consume({ data, id: messageId }); + const job = JSON.parse(message.Body || "{}") as QueueJob; + await this.consume({ ...job, id: messageId }); } async start() { diff --git a/src/queue-processors/api-request/api-request-queue-processor.ts b/src/queue-processors/api-request/api-request-queue-processor.ts index d241496..0ba97d3 100644 --- a/src/queue-processors/api-request/api-request-queue-processor.ts +++ b/src/queue-processors/api-request/api-request-queue-processor.ts @@ -11,6 +11,7 @@ import { buildErrorContext, calculateBackoffDelay, getRetryAfterSeconds, + isRateLimitMondayError, isRetryableMondayError, } from "../../utils/retry-utils"; @@ -40,13 +41,48 @@ export class ApiRequestQueueProcessor extends BaseQueueProcessor< ); } - private shouldRetryApiCall(error: unknown): boolean { + private shouldRetryApiCall(error: unknown): { + retry: boolean; + bypass: boolean; + } { if (error instanceof MondayError) { const mondayErrors = error.mondayErrors || []; - return mondayErrors.some(isRetryableMondayError); + const retry = mondayErrors.some(isRetryableMondayError); + const bypass = retry && isRateLimitMondayError(mondayErrors); + return { retry, bypass }; + } + // Unknown error: retryable but does NOT bypass maxRetries + return { retry: true, bypass: false }; + } + + private async uploadAndPushCallback( + job: ApiRequestQueueJob, + data: unknown, + status: "success" | "partial", + ): Promise { + const s3UploadResponse = await this.fileUploadTask.execute(data, uuidv4()); + if (!s3UploadResponse.success || !s3UploadResponse.data) { + throw new JobError("File upload failed", true, s3UploadResponse); + } + + const callbackJob: CallbackQueueJob = { + data: s3UploadResponse.data, + callbackUrl: job.callbackUrl ?? "", + payload: { + ...(job.metadata && { metadata: job.metadata }), + result: s3UploadResponse.data, + status, + token: job.token, + query: job.query, + variables: job.variables, + }, + headers: job.headers, + }; + + const pushResult = await this.pushToCallbackQueueTask.execute(callbackJob); + if (!pushResult.success) { + throw new JobError("Push to next queue failed", true, pushResult.error); } - // unknown error, assume retryable - return true; } async performJob(job: ApiRequestQueueJob): Promise { @@ -56,9 +92,15 @@ export class ApiRequestQueueProcessor extends BaseQueueProcessor< query, variables, ); + + if (apiResponse.partial && apiResponse.data) { + await this.uploadAndPushCallback(job, apiResponse.data, "partial"); + return; + } + if (!apiResponse.success || !apiResponse.data) { - const shouldRetryApiCall = this.shouldRetryApiCall(apiResponse.error); - if (shouldRetryApiCall) { + const { retry, bypass } = this.shouldRetryApiCall(apiResponse.error); + if (retry) { const retryAfter = this.getRetryAfterValue( apiResponse.error as MondayError, attempts, @@ -68,7 +110,7 @@ export class ApiRequestQueueProcessor extends BaseQueueProcessor< true, buildErrorContext(apiResponse.error, attempts, retryAfter), { delay: retryAfter }, - true, + bypass, ); } throw new JobError( @@ -78,35 +120,6 @@ export class ApiRequestQueueProcessor extends BaseQueueProcessor< ); } - const s3UploadResponse = await this.fileUploadTask.execute( - apiResponse.data, - uuidv4(), - ); - if (!s3UploadResponse.success || !s3UploadResponse.data) { - throw new JobError("File upload failed", true, s3UploadResponse); - } - - const callbackJob: CallbackQueueJob = { - data: s3UploadResponse.data, - callbackUrl: job.callbackUrl ?? "", - payload: { - ...(job.metadata && { metadata: job.metadata }), - result: s3UploadResponse.data, - status: "success", - token: job.token, - query: job.query, - variables: job.variables, - }, - headers: job.headers, - }; - const pushToNextQueueResult = - await this.pushToCallbackQueueTask.execute(callbackJob); - if (!pushToNextQueueResult.success) { - throw new JobError( - "Push to next queue failed", - true, - pushToNextQueueResult.error, - ); - } + await this.uploadAndPushCallback(job, apiResponse.data, "success"); } } diff --git a/src/queue-processors/callback/callback-queue-processor.ts b/src/queue-processors/callback/callback-queue-processor.ts index 16037fc..3421fcc 100644 --- a/src/queue-processors/callback/callback-queue-processor.ts +++ b/src/queue-processors/callback/callback-queue-processor.ts @@ -2,9 +2,11 @@ import { BaseQueueProcessor } from "../base-queue-processor"; import { CallbackQueueJob } from "../../types/queue-types"; import { CallbackProcessorOptions } from "../../types/processor-types"; import { BaseCallEndpointTask } from "../../tasks/call-endpoint-task/base-call-endpoint-task"; -import { StatusCodes } from "http-status-codes"; import { JobError } from "../../errors/job-error"; -import { calculateBackoffDelay } from "../../utils/retry-utils"; +import { + calculateBackoffDelay, + isRetryableCallbackStatus, +} from "../../utils/retry-utils"; export class CallbackQueueProcessor extends BaseQueueProcessor< CallbackQueueJob, @@ -25,9 +27,7 @@ export class CallbackQueueProcessor extends BaseQueueProcessor< options, ); if (!result.success) { - const { status } = result; - const shouldRetryJob = - status >= StatusCodes.INTERNAL_SERVER_ERROR || !status; + const shouldRetryJob = isRetryableCallbackStatus(result.status); throw new JobError("Callback task failed", shouldRetryJob, result.error, { delay: calculateBackoffDelay(job.attempts), }); diff --git a/src/tasks/call-monday-api-task/base-call-monday-api-task.ts b/src/tasks/call-monday-api-task/base-call-monday-api-task.ts index d3df928..06108f3 100644 --- a/src/tasks/call-monday-api-task/base-call-monday-api-task.ts +++ b/src/tasks/call-monday-api-task/base-call-monday-api-task.ts @@ -21,20 +21,18 @@ export abstract class BaseCallMondayApiTask try { const result = await this.executeMondayRequest(token, query, variables); if (!result.success) { + return { success: false, error: result.error }; + } + if (result.partial) { return { - success: false, - error: result.error, + success: true, + partial: true, + data: result.data as CallMondayApiResultData, }; } - return { - success: true, - data: result.data as CallMondayApiResultData, - }; + return { success: true, data: result.data as CallMondayApiResultData }; } catch (error) { - return { - success: false, - error, - }; + return { success: false, error }; } } } diff --git a/src/tasks/call-monday-api-task/monday-client-call-monday-api-task.ts b/src/tasks/call-monday-api-task/monday-client-call-monday-api-task.ts index 25cc45e..ac58fa3 100644 --- a/src/tasks/call-monday-api-task/monday-client-call-monday-api-task.ts +++ b/src/tasks/call-monday-api-task/monday-client-call-monday-api-task.ts @@ -5,6 +5,11 @@ import { BaseCallMondayApiTask } from "./base-call-monday-api-task"; import { CallEndpointResult } from "../../types/task-types"; import { StatusCodes } from "http-status-codes"; import { MondayError } from "../../errors/monday-error"; +import { MondayApiError } from "../../types/monday-types"; +import { + isDlqRecoveryError, + isRetryableMondayError, +} from "../../utils/retry-utils"; export interface MondayClientCallMondayApiTaskOptions { apiVersion?: string; @@ -27,23 +32,48 @@ export class MondayClientCallMondayApiTask extends BaseCallMondayApiTask { return toFailureResult(new Error("API key is required")); } try { - const client = new ApiClient({ token, apiVersion: this.apiVersion }); + // errorPolicy:'all' makes rawRequest resolve with {data,errors} instead of throwing + // for application-level errors. Transport errors (non-200 HTTP) still throw ClientError. + // Source: https://developer.monday.com/api-reference/docs/javascript-sdk#error-handling + const client = new ApiClient({ + token, + apiVersion: this.apiVersion, + requestConfig: { errorPolicy: "all" }, + }); const { data, errors, extensions } = await client.rawRequest( query, variables, ); - if (errors && errors.length > 0) { - throw new MondayError("Monday.com API returned application errors", { - response: { data, errors }, - mondayErrors: errors, - partialData: data, - }); + + if (!errors || errors.length === 0) { + return { + success: true, + status: StatusCodes.OK, + data: { data, errors, extensions }, + }; } - return { - success: true, - status: StatusCodes.OK, - data: { data, errors, extensions }, - }; + + const mondayErrors = errors as MondayApiError[]; + const hasRetryableError = mondayErrors.some(isRetryableMondayError); + const hasDlqRecoveryError = mondayErrors.some(isDlqRecoveryError); + + // Non-retryable errors with partial data → forward to caller as a partial result, + // unless the error requires DLQ recovery (e.g. CursorException). + if (!hasRetryableError && !hasDlqRecoveryError && data != null) { + return { + success: true, + status: StatusCodes.OK, + data: { data, errors: mondayErrors, extensions }, + partial: true, + }; + } + + // Retryable errors or total failure → failure result for retry/DLQ routing + throw new MondayError("Monday.com API returned application errors", { + response: { data, errors: mondayErrors }, + mondayErrors, + partialData: data, + }); } catch (error) { return toFailureResult(error); } diff --git a/src/types/task-types.ts b/src/types/task-types.ts index 76c8822..13668c1 100644 --- a/src/types/task-types.ts +++ b/src/types/task-types.ts @@ -2,6 +2,7 @@ export interface TaskResult { success: boolean; data?: TaskData; error?: unknown; + partial?: boolean; } export interface FileUploadResultData { diff --git a/src/utils/retry-utils.ts b/src/utils/retry-utils.ts index fbaf685..436378b 100644 --- a/src/utils/retry-utils.ts +++ b/src/utils/retry-utils.ts @@ -2,11 +2,13 @@ import { StatusCodes } from "http-status-codes"; import { MondayError } from "../errors/monday-error"; import { MondayApiError } from "../types/monday-types"; import { + MondayErrorCodes, + NON_FORWARDABLE_PARTIAL_CODES, RETRYABLE_HTTP_STATUS_CODES, - TERMINAL_MONDAY_ERROR_CODES, + NON_RETRYABLE_MONDAY_ERROR_CODES, } from "../constants/monday-error-codes"; -export { TERMINAL_MONDAY_ERROR_CODES } from "../constants/monday-error-codes"; +export { NON_RETRYABLE_MONDAY_ERROR_CODES } from "../constants/monday-error-codes"; export function buildErrorContext( error: unknown, @@ -52,15 +54,50 @@ function isRetryableStatusCode(status: number): boolean { } function isTerminalCode(code: string | undefined): boolean { - return !!code && TERMINAL_MONDAY_ERROR_CODES.has(code); + return !!code && NON_RETRYABLE_MONDAY_ERROR_CODES.has(code); } export function isRetryableMondayError(error: MondayApiError): boolean { const { status_code, code, error_code } = error.extensions ?? {}; + // Denylist wins regardless of status_code (e.g. CursorException arrives with status_code: 200) + if (isTerminalCode(code ?? error_code)) return false; if (typeof status_code === "number" && status_code > 0) { return isRetryableStatusCode(status_code); } - return !isTerminalCode(code ?? error_code); + return true; +} + +export function isDlqRecoveryError(error: MondayApiError): boolean { + const { code, error_code } = error.extensions ?? {}; + const c = code ?? error_code; + return !!c && NON_FORWARDABLE_PARTIAL_CODES.has(c); +} + +export function isRateLimitMondayError( + mondayErrors: MondayApiError[], +): boolean { + return mondayErrors.some((e) => { + const { status_code, code, error_code } = e.extensions ?? {}; + if ( + typeof status_code === "number" && + RETRYABLE_HTTP_STATUS_CODES.has(status_code) + ) { + return true; + } + const c = code ?? error_code; + return ( + c === MondayErrorCodes.COMPLEXITY_BUDGET_EXHAUSTED || + c === MondayErrorCodes.MAX_CONCURRENCY_EXCEEDED + ); + }); +} + +export function isRetryableCallbackStatus(status: number | undefined): boolean { + if (!status) return true; + return ( + RETRYABLE_HTTP_STATUS_CODES.has(status) || + status >= StatusCodes.INTERNAL_SERVER_ERROR + ); } export function getRetryAfterSeconds( diff --git a/test/api-request-queue-processor.test.ts b/test/api-request-queue-processor.test.ts index 98b3b57..754232d 100644 --- a/test/api-request-queue-processor.test.ts +++ b/test/api-request-queue-processor.test.ts @@ -288,6 +288,80 @@ describe("ApiRequestQueueProcessor", () => { expect(delay).toBeLessThanOrEqual(2); }); + test("unknown/network error respects maxRetries — does NOT bypass the limit", async () => { + // G2: non-MondayError must not set bypassRetryLimit=true; it should obey maxRetries + const processorWithZeroRetries = new ApiRequestQueueProcessor({ + callMondayApiTask: mockCallMondayApiTask, + fileUploadTask: mockFileUploadTask, + pushToCallbackQueueTask: mockPushToCallbackQueueTask, + requeueTask: mockRequeueTask, + maxRetries: 0, + shouldRetryOnGeneralError: false, + }); + + (mockCallMondayApiTask.execute as Mock).mockResolvedValue({ + success: false, + error: new Error("ECONNRESET"), + }); + + const outcome = await processorWithZeroRetries.process({ ...validJob, attempts: 5 }); + + expect(outcome.status).toBe("failed"); + expect(mockRequeueTask.execute).not.toHaveBeenCalled(); + }); + + test("rate-limit MondayError still bypasses maxRetries", async () => { + // G2: rate-limit signals retain bypassRetryLimit=true + const processorWithZeroRetries = new ApiRequestQueueProcessor({ + callMondayApiTask: mockCallMondayApiTask, + fileUploadTask: mockFileUploadTask, + pushToCallbackQueueTask: mockPushToCallbackQueueTask, + requeueTask: mockRequeueTask, + maxRetries: 0, + shouldRetryOnGeneralError: false, + }); + + (mockCallMondayApiTask.execute as Mock).mockResolvedValue({ + success: false, + error: new MondayError("Rate limited", { + mondayErrors: [{ extensions: { code: "COMPLEXITY_BUDGET_EXHAUSTED", status_code: 429, retry_in_seconds: 30 } }], + }), + }); + (mockRequeueTask.execute as Mock).mockResolvedValue({ + success: true, + data: { messageId: "bypass-ok", success: true }, + }); + + const outcome = await processorWithZeroRetries.process({ ...validJob, attempts: 5 }); + + expect(outcome.status).toBe("requeued"); + expect(mockRequeueTask.execute).toHaveBeenCalled(); + }); + + test("retryable 500 MondayError does NOT bypass maxRetries", async () => { + // G2: server errors are retryable but not rate-limit signals — they respect maxRetries + const processorWithZeroRetries = new ApiRequestQueueProcessor({ + callMondayApiTask: mockCallMondayApiTask, + fileUploadTask: mockFileUploadTask, + pushToCallbackQueueTask: mockPushToCallbackQueueTask, + requeueTask: mockRequeueTask, + maxRetries: 0, + shouldRetryOnGeneralError: false, + }); + + (mockCallMondayApiTask.execute as Mock).mockResolvedValue({ + success: false, + error: new MondayError("Server error", { + mondayErrors: [{ extensions: { status_code: 500 } }], + }), + }); + + const outcome = await processorWithZeroRetries.process({ ...validJob, attempts: 5 }); + + expect(outcome.status).toBe("failed"); + expect(mockRequeueTask.execute).not.toHaveBeenCalled(); + }); + test("should bypass maxRetries for retryable Monday errors", async () => { const processorWithZeroRetries = new ApiRequestQueueProcessor({ callMondayApiTask: mockCallMondayApiTask, @@ -329,6 +403,34 @@ describe("ApiRequestQueueProcessor", () => { ); }); + test("should not requeue CursorException — must reach DLQ so dlq-consumer can issue a fresh boardItems request", async () => { + // CursorException arrives with status_code: 200 from Monday. Without the denylist-first fix, + // isRetryableMondayError short-circuited on status_code=200 → retryable=true → infinite requeue + // → SQS receive-count never hit → DLQ consumer starved → export stuck in_progress forever. + const cursorError = new MondayError("Monday.com API returned application errors", { + mondayErrors: [ + { + message: "CursorExpiredError: The cursor provided for pagination has expired.", + extensions: { + code: "CursorException", + status_code: 200, + error_code: "CURSOR_EXPIRED", + }, + }, + ], + }); + + (mockCallMondayApiTask.execute as Mock).mockResolvedValue({ + success: false, + error: cursorError, + }); + + const outcome = await processor.process(validJob); + + expect(outcome.status).toBe("failed"); + expect(mockRequeueTask.execute).not.toHaveBeenCalled(); + }); + test("should route partial-error MondayError (FIELD_MINUTE_RATE_LIMIT_EXCEEDED) into requeue with retry_in_seconds delay", async () => { const partialError = new MondayError( "Monday.com API returned application errors", @@ -405,6 +507,85 @@ describe("ApiRequestQueueProcessor", () => { }); }); + describe("process - partial data forwarding (G1)", () => { + test("forwards partial data to callback queue with status:'partial' when task returns partial:true", async () => { + const partialData = { boards: [{ id: "1" }] }; + (mockCallMondayApiTask.execute as Mock).mockResolvedValue({ + success: true, + partial: true, + data: partialData, + }); + (mockFileUploadTask.execute as Mock).mockResolvedValue({ + success: true, + data: { fileUrl: "https://s3.amazonaws.com/bucket/partial.json" }, + }); + (mockPushToCallbackQueueTask.execute as Mock).mockResolvedValue({ + success: true, + }); + + const outcome = await processor.process(validJob); + + expect(outcome.status).toBe("success"); + expect(mockPushToCallbackQueueTask.execute).toHaveBeenCalledWith( + expect.objectContaining({ + payload: expect.objectContaining({ status: "partial" }), + }), + ); + expect(mockRequeueTask.execute).not.toHaveBeenCalled(); + }); + + test("partial path still uploads to S3 before pushing callback", async () => { + const partialData = { boards: [{ id: "2" }] }; + (mockCallMondayApiTask.execute as Mock).mockResolvedValue({ + success: true, + partial: true, + data: partialData, + }); + (mockFileUploadTask.execute as Mock).mockResolvedValue({ + success: true, + data: { fileUrl: "https://s3.amazonaws.com/bucket/partial.json" }, + }); + (mockPushToCallbackQueueTask.execute as Mock).mockResolvedValue({ + success: true, + }); + + await processor.process(validJob); + + expect(mockFileUploadTask.execute).toHaveBeenCalledWith( + partialData, + expect.any(String), + ); + expect(mockPushToCallbackQueueTask.execute).toHaveBeenCalledWith( + expect.objectContaining({ + payload: expect.objectContaining({ + status: "partial", + result: { fileUrl: "https://s3.amazonaws.com/bucket/partial.json" }, + }), + }), + ); + }); + + test("partial path retries if S3 upload fails", async () => { + (mockCallMondayApiTask.execute as Mock).mockResolvedValue({ + success: true, + partial: true, + data: { boards: [] }, + }); + (mockFileUploadTask.execute as Mock).mockResolvedValue({ + success: false, + error: "S3 down", + }); + (mockRequeueTask.execute as Mock).mockResolvedValue({ + success: true, + data: { messageId: "retry-partial-s3", success: true }, + }); + + const outcome = await processor.process(validJob); + + expect(outcome.status).toBe("requeued"); + }); + }); + describe("process - file upload failures", () => { test("should retry on file upload failure", async () => { const mockApiResponse: TaskResult = { diff --git a/test/callback-queue-processor.test.ts b/test/callback-queue-processor.test.ts index 78054dc..2e1b958 100644 --- a/test/callback-queue-processor.test.ts +++ b/test/callback-queue-processor.test.ts @@ -213,6 +213,44 @@ describe("CallbackQueueProcessor", () => { } }); + test("should retry on 429 Too Many Requests from callback endpoint", async () => { + const mockCallbackResponse: CallEndpointResult = { + success: false, + status: 429, + error: new Error("Too Many Requests"), + }; + (mockCallEndpointTask.execute as Mock).mockResolvedValue(mockCallbackResponse); + (mockRequeueTask.execute as Mock).mockResolvedValue({ + success: true, + data: { messageId: "retry-msg-429", success: true }, + }); + + const outcome = await processor.process(validJob); + + expect(outcome.status).toBe("requeued"); + expect(mockRequeueTask.execute).toHaveBeenCalledWith( + { ...validJob, attempts: 1 }, + { delay: expect.any(Number) }, + ); + }); + + test("should retry on 423 Locked from callback endpoint", async () => { + const mockCallbackResponse: CallEndpointResult = { + success: false, + status: 423, + error: new Error("Locked"), + }; + (mockCallEndpointTask.execute as Mock).mockResolvedValue(mockCallbackResponse); + (mockRequeueTask.execute as Mock).mockResolvedValue({ + success: true, + data: { messageId: "retry-msg-423", success: true }, + }); + + const outcome = await processor.process(validJob); + + expect(outcome.status).toBe("requeued"); + }); + test("should not retry on 4xx client errors", async () => { const clientErrorStatusCodes = [400, 401, 403, 404, 422]; diff --git a/test/monday-client-call-monday-api-task.test.ts b/test/monday-client-call-monday-api-task.test.ts index 237be07..42ee4ec 100644 --- a/test/monday-client-call-monday-api-task.test.ts +++ b/test/monday-client-call-monday-api-task.test.ts @@ -27,6 +27,7 @@ import { import { MondayError } from "../src/errors/monday-error"; import { isRetryableMondayError, + getRetryAfterSeconds, } from "../src/utils/retry-utils"; function makeClientError(response: { @@ -273,4 +274,190 @@ describe("MondayClientCallMondayApiTask.executeMondayRequest", () => { expect(result.status).toBe(503); expect(result.error).toBeInstanceOf(MondayError); }); + + // fixture 12.2b — non-retryable error with partial data → partial success forwarded to caller + test("returns partial success when all errors are non-retryable and data is present", async () => { + rawRequestMock.mockResolvedValue({ + data: { boards: [{ id: "1" }] }, + errors: [ + { + message: "Column not found", + extensions: { code: "InvalidColumnIdException", status_code: 400 }, + }, + ], + extensions: undefined, + headers: new Headers(), + status: 200, + }); + + const task = new MondayClientCallMondayApiTask(); + const result = await task.executeMondayRequest("token", "query { boards { id } }"); + + expect(result.success).toBe(true); + expect(result.partial).toBe(true); + expect((result.data as { data: unknown }).data).toEqual({ boards: [{ id: "1" }] }); + }); + + // fixture 12.2c — non-retryable error with null data → total failure (no forwarding) + test("returns failure when all errors are non-retryable and data is null", async () => { + rawRequestMock.mockResolvedValue({ + data: null, + errors: [ + { + message: "Unauthorized", + extensions: { code: "UserUnauthorizedException", status_code: 403 }, + }, + ], + extensions: undefined, + headers: new Headers(), + status: 200, + }); + + const task = new MondayClientCallMondayApiTask(); + const result = await task.executeMondayRequest("token", "query { boards { id } }"); + + expect(result.success).toBe(false); + expect(result.partial).toBeUndefined(); + expect(result.error).toBeInstanceOf(MondayError); + }); + + // fixture 12.2d — mixed retryable + non-retryable errors with data → retry path (not partial forward) + test("retries when mixed errors include a retryable one, even with partial data present", async () => { + rawRequestMock.mockResolvedValue({ + data: { boards: [{ id: "1" }] }, + errors: [ + { message: "Rate limited", extensions: { code: "FIELD_MINUTE_RATE_LIMIT_EXCEEDED", status_code: 429, retry_in_seconds: 10 } }, + { message: "Column not found", extensions: { code: "InvalidColumnIdException", status_code: 400 } }, + ], + extensions: undefined, + headers: new Headers(), + status: 200, + }); + + const task = new MondayClientCallMondayApiTask(); + const result = await task.executeMondayRequest("token", "query { boards { id } }"); + + expect(result.success).toBe(false); + expect(result.partial).toBeUndefined(); + const mondayError = result.error as MondayError; + expect(mondayError).toBeInstanceOf(MondayError); + expect(mondayError.mondayErrors?.some(isRetryableMondayError)).toBe(true); + }); + + // fixture 12.3 — full 429 ClientError thrown by rawRequest + test("classifies 429 ClientError thrown by rawRequest as retryable with retry_in_seconds delay", async () => { + const clientError = new ClientError( + { + data: null, + errors: [ + { + message: "Too Many Requests", + extensions: { status_code: 429, retry_in_seconds: 10, code: "RATE_LIMIT" }, + }, + ] as never, + status: 429, + headers: new Headers(), + } as never, + { query: "test" } as never, + ); + rawRequestMock.mockRejectedValue(clientError); + + const task = new MondayClientCallMondayApiTask(); + const result = await task.executeMondayRequest("token", "query { boards { id } }"); + + expect(result.success).toBe(false); + expect(result.status).toBe(429); + const mondayError = result.error as MondayError; + expect(mondayError).toBeInstanceOf(MondayError); + expect(mondayError.mondayErrors?.some(isRetryableMondayError)).toBe(true); + expect(getRetryAfterSeconds(mondayError.mondayErrors)).toBe(10); + }); + + // fixture 12.4 — rawRequest resolves with CursorException partial (per-board cursor expired) + test("throws MondayError when rawRequest resolves with CursorException on one of N aliased boards", async () => { + rawRequestMock.mockResolvedValue({ + data: { + board_1234567890: { items: [], cursor: "next" }, + board_9876543210: null, + }, + errors: [ + { + message: "CursorExpiredError: The cursor provided for pagination has expired.", + path: ["board_9876543210"], + extensions: { code: "CursorException", status_code: 200, error_code: "CURSOR_EXPIRED" }, + }, + ], + extensions: undefined, + headers: new Headers(), + status: 200, + }); + + const task = new MondayClientCallMondayApiTask(); + const result = await task.executeMondayRequest("token", "query { ... }"); + + expect(result.success).toBe(false); + const mondayError = result.error as MondayError; + expect(mondayError).toBeInstanceOf(MondayError); + // CursorException is terminal — retrying with an expired cursor is futile, DLQ consumer handles recovery + expect(mondayError.mondayErrors?.some(isRetryableMondayError)).toBe(false); + expect(mondayError.partialData).toMatchObject({ board_1234567890: { cursor: "next" } }); + }); + + // fixture 12.5 — InvalidBoardIdException ClientError thrown by rawRequest (terminal) + test("classifies InvalidBoardIdException ClientError as terminal (shouldRetry=false)", async () => { + const clientError = new ClientError( + { + data: null, + errors: [ + { + message: "Invalid board id", + extensions: { code: "InvalidBoardIdException", status_code: 400 }, + }, + ] as never, + status: 400, + headers: new Headers(), + } as never, + { query: "test" } as never, + ); + rawRequestMock.mockRejectedValue(clientError); + + const task = new MondayClientCallMondayApiTask(); + const result = await task.executeMondayRequest("token", "query { boards { id } }"); + + expect(result.success).toBe(false); + expect(result.status).toBe(400); + const mondayError = result.error as MondayError; + expect(mondayError).toBeInstanceOf(MondayError); + expect(mondayError.mondayErrors?.some(isRetryableMondayError)).toBe(false); + }); + + // fixture 12.6 — rawRequest resolves with data:null + COMPLEXITY_BUDGET_EXHAUSTED + test("throws MondayError with retry_in_seconds when rawRequest resolves with COMPLEXITY_BUDGET_EXHAUSTED and null data", async () => { + rawRequestMock.mockResolvedValue({ + data: null, + errors: [ + { + message: "Complexity budget exhausted", + extensions: { + code: "COMPLEXITY_BUDGET_EXHAUSTED", + complexity: 5000000, + complexity_budget_left: 0, + retry_in_seconds: 60, + }, + }, + ], + extensions: undefined, + headers: new Headers(), + status: 200, + }); + + const task = new MondayClientCallMondayApiTask(); + const result = await task.executeMondayRequest("token", "query { boards { id } }"); + + expect(result.success).toBe(false); + const mondayError = result.error as MondayError; + expect(mondayError).toBeInstanceOf(MondayError); + expect(mondayError.mondayErrors?.some(isRetryableMondayError)).toBe(true); + expect(getRetryAfterSeconds(mondayError.mondayErrors)).toBe(60); + }); }); diff --git a/test/retry-utils.test.ts b/test/retry-utils.test.ts index ea65216..0869447 100644 --- a/test/retry-utils.test.ts +++ b/test/retry-utils.test.ts @@ -2,8 +2,11 @@ import { expect, test, describe } from "vitest"; import { StatusCodes } from "http-status-codes"; import { getRetryAfterSeconds, + isDlqRecoveryError, isRetryableMondayError, - TERMINAL_MONDAY_ERROR_CODES, + isRateLimitMondayError, + isRetryableCallbackStatus, + NON_RETRYABLE_MONDAY_ERROR_CODES, } from "../src/utils/retry-utils"; import { MondayApiError, MondayErrorExtensions } from "../src/types/monday-types"; @@ -55,19 +58,29 @@ describe("isRetryableMondayError", () => { }); describe("Code denylist (self-testing via exported Set)", () => { - test.each([...TERMINAL_MONDAY_ERROR_CODES])( + test.each([...NON_RETRYABLE_MONDAY_ERROR_CODES])( "code %s with no status_code is terminal", (code) => { expect(isRetryableMondayError(err({ code }))).toBe(false); }, ); - test.each([...TERMINAL_MONDAY_ERROR_CODES])( + test.each([...NON_RETRYABLE_MONDAY_ERROR_CODES])( "error_code %s with no status_code is terminal", (code) => { expect(isRetryableMondayError(err({ error_code: code }))).toBe(false); }, ); + + test("CursorException is terminal even when status_code is 200 (regression: status_code must not override denylist)", () => { + // CursorException arrives with status_code: 200; old code short-circuited on status_code and + // returned retryable=true, starving the DLQ consumer that handles cursor-reissue recovery. + expect( + isRetryableMondayError( + err({ code: "CursorException", status_code: StatusCodes.OK }), + ), + ).toBe(false); + }); }); describe("Field-name handling (code vs error_code)", () => { @@ -173,6 +186,140 @@ describe("isRetryableMondayError", () => { }); }); +describe("isDlqRecoveryError", () => { + test("CursorException via code field is a DLQ recovery error", () => { + expect(isDlqRecoveryError(err({ code: "CursorException" }))).toBe(true); + }); + + test("CursorException via error_code field is a DLQ recovery error", () => { + expect(isDlqRecoveryError(err({ error_code: "CursorException" }))).toBe(true); + }); + + test("code takes precedence over error_code", () => { + expect(isDlqRecoveryError(err({ code: "CursorException", error_code: "SOMETHING_ELSE" }))).toBe(true); + }); + + test("non-DLQ error code is not a DLQ recovery error", () => { + expect(isDlqRecoveryError(err({ code: "InvalidArgumentException" }))).toBe(false); + }); + + test("missing extensions is not a DLQ recovery error", () => { + expect(isDlqRecoveryError(err({ noExtensions: true }))).toBe(false); + }); + + test("empty extensions is not a DLQ recovery error", () => { + expect(isDlqRecoveryError(err({}))).toBe(false); + }); +}); + +describe("isRateLimitMondayError", () => { + test("COMPLEXITY_BUDGET_EXHAUSTED with status 429 is a rate-limit signal", () => { + expect( + isRateLimitMondayError([ + err({ code: "COMPLEXITY_BUDGET_EXHAUSTED", status_code: StatusCodes.TOO_MANY_REQUESTS }), + ]), + ).toBe(true); + }); + + test("MAX_CONCURRENCY_EXCEEDED with status 429 is a rate-limit signal", () => { + expect( + isRateLimitMondayError([ + err({ code: "MAX_CONCURRENCY_EXCEEDED", status_code: StatusCodes.TOO_MANY_REQUESTS }), + ]), + ).toBe(true); + }); + + test("status_code 429 alone (novel code) is a rate-limit signal", () => { + expect( + isRateLimitMondayError([err({ status_code: StatusCodes.TOO_MANY_REQUESTS })]), + ).toBe(true); + }); + + test("status_code 423 (locked) is a rate-limit signal", () => { + expect( + isRateLimitMondayError([err({ status_code: StatusCodes.LOCKED })]), + ).toBe(true); + }); + + test("COMPLEXITY_BUDGET_EXHAUSTED with no status_code is a rate-limit signal", () => { + expect( + isRateLimitMondayError([err({ code: "COMPLEXITY_BUDGET_EXHAUSTED" })]), + ).toBe(true); + }); + + test("MAX_CONCURRENCY_EXCEEDED with no status_code is a rate-limit signal", () => { + expect( + isRateLimitMondayError([err({ code: "MAX_CONCURRENCY_EXCEEDED" })]), + ).toBe(true); + }); + + test("status_code 500 is NOT a rate-limit signal", () => { + expect( + isRateLimitMondayError([err({ status_code: StatusCodes.INTERNAL_SERVER_ERROR })]), + ).toBe(false); + }); + + test("denylist error (InvalidArgumentException) with no status is NOT a rate-limit signal", () => { + expect( + isRateLimitMondayError([err({ code: "InvalidArgumentException" })]), + ).toBe(false); + }); + + test("unknown retryable error (no status, no rate-limit code) is NOT a rate-limit signal", () => { + expect(isRateLimitMondayError([err({ code: "SOMETHING_NEW" })])).toBe(false); + }); + + test("returns true when at least one of multiple errors is a rate-limit signal", () => { + expect( + isRateLimitMondayError([ + err({ code: "INTERNAL_SERVER_ERROR", status_code: StatusCodes.INTERNAL_SERVER_ERROR }), + err({ code: "COMPLEXITY_BUDGET_EXHAUSTED" }), + ]), + ).toBe(true); + }); + + test("empty array returns false", () => { + expect(isRateLimitMondayError([])).toBe(false); + }); +}); + +describe("isRetryableCallbackStatus", () => { + test.each([ + [StatusCodes.INTERNAL_SERVER_ERROR, "500"], + [StatusCodes.BAD_GATEWAY, "502"], + [StatusCodes.SERVICE_UNAVAILABLE, "503"], + [StatusCodes.GATEWAY_TIMEOUT, "504"], + ])("status %i is retryable", (status) => { + expect(isRetryableCallbackStatus(status)).toBe(true); + }); + + test("status 429 is retryable", () => { + expect(isRetryableCallbackStatus(StatusCodes.TOO_MANY_REQUESTS)).toBe(true); + }); + + test("status 423 is retryable", () => { + expect(isRetryableCallbackStatus(StatusCodes.LOCKED)).toBe(true); + }); + + test.each([ + [StatusCodes.BAD_REQUEST, "400"], + [StatusCodes.UNAUTHORIZED, "401"], + [StatusCodes.FORBIDDEN, "403"], + [StatusCodes.NOT_FOUND, "404"], + [StatusCodes.UNPROCESSABLE_ENTITY, "422"], + ])("status %i is NOT retryable", (status) => { + expect(isRetryableCallbackStatus(status)).toBe(false); + }); + + test("undefined status is retryable (network error)", () => { + expect(isRetryableCallbackStatus(undefined)).toBe(true); + }); + + test("null-ish 0 status is retryable (unset)", () => { + expect(isRetryableCallbackStatus(0)).toBe(true); + }); +}); + describe("getRetryAfterSeconds", () => { test("returns undefined when input is undefined", () => { expect(getRetryAfterSeconds(undefined)).toBeUndefined(); diff --git a/test/sqs-consumer.test.ts b/test/sqs-consumer.test.ts index 421e807..5326861 100644 --- a/test/sqs-consumer.test.ts +++ b/test/sqs-consumer.test.ts @@ -222,7 +222,7 @@ describe("SqsConsumer", () => { await actualHandleMessage(message); expect(mockProcessor.process).toHaveBeenCalledWith({ - data: validJob, + ...validJob, id: "msg-123", }); }); @@ -240,7 +240,6 @@ describe("SqsConsumer", () => { expect(mockProcessor.process).toHaveBeenCalledWith({ id: "msg-456", - data: {}, }); }); @@ -257,7 +256,6 @@ describe("SqsConsumer", () => { expect(mockProcessor.process).toHaveBeenCalledWith({ id: "msg-456", - data: {}, }); }); @@ -274,7 +272,6 @@ describe("SqsConsumer", () => { expect(mockProcessor.process).toHaveBeenCalledWith({ id: "msg-456", - data: {}, }); }); @@ -299,7 +296,7 @@ describe("SqsConsumer", () => { await actualHandleMessage(message); expect(mockProcessor.process).toHaveBeenCalledWith({ - data: complexJob, + ...complexJob, id: "msg-complex", }); }); @@ -320,7 +317,7 @@ describe("SqsConsumer", () => { ); expect(mockProcessor.process).toHaveBeenCalledWith({ - data: validJob, + ...validJob, id: "msg-error", }); }); @@ -485,8 +482,8 @@ describe("SqsConsumer", () => { await actualHandleMessage(message); expect(mockProcessor.process).toHaveBeenCalledWith({ - data: validJob, - id: "sqs-message-id-123", // Should use SQS message ID, not original job ID + ...validJob, + id: "sqs-message-id-123", // SQS MessageId overrides any id in the job body }); }); @@ -502,7 +499,7 @@ describe("SqsConsumer", () => { await actualHandleMessage(message); expect(mockProcessor.process).toHaveBeenCalledWith({ - data: validJob, + ...validJob, id: undefined, }); }); @@ -532,7 +529,7 @@ describe("SqsConsumer", () => { await actualHandleMessage(message); expect(mockProcessor.process).toHaveBeenCalledWith({ - data: validJob, + ...validJob, id: "msg-integration", }); });