Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/constants/monday-error-codes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ export enum MondayErrorCodes {
MAX_CONCURRENCY_EXCEEDED = "MAX_CONCURRENCY_EXCEEDED",
}

export const TERMINAL_MONDAY_ERROR_CODES: ReadonlySet<string> = new Set([
export const NON_RETRYABLE_MONDAY_ERROR_CODES: ReadonlySet<string> = new Set([
"ColumnValueException",
"CorrectedValueException",
"CreateBoardException",
"CursorException",
"InvalidArgumentException",
"InvalidBoardIdException",
"InvalidColumnIdException",
Expand All @@ -31,3 +32,11 @@ export const RETRYABLE_HTTP_STATUS_CODES: ReadonlySet<number> = new Set([
StatusCodes.LOCKED,
StatusCodes.TOO_MANY_REQUESTS,
]);

// Errors that must fail completely even when partial data is present.
// The DLQ consumer in mdp-backend handles recovery for these (e.g. re-issuing
// a fresh cursor request). Forwarding partial data would delete the SQS message
// before the DLQ consumer can act on it.
export const NON_FORWARDABLE_PARTIAL_CODES: ReadonlySet<string> = new Set([
"CursorException",
]);
4 changes: 2 additions & 2 deletions src/consumers/sqs/sqs-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ export abstract class SqsConsumer extends BaseConsumer {

async handleMessage(message: Message): Promise<void | Message> {
const messageId = message.MessageId;
const data = JSON.parse(message.Body || "{}") as QueueJob;
await this.consume({ data, id: messageId });
const job = JSON.parse(message.Body || "{}") as QueueJob;
await this.consume({ ...job, id: messageId });
}

async start() {
Expand Down
87 changes: 50 additions & 37 deletions src/queue-processors/api-request/api-request-queue-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
buildErrorContext,
calculateBackoffDelay,
getRetryAfterSeconds,
isRateLimitMondayError,
isRetryableMondayError,
} from "../../utils/retry-utils";

Expand Down Expand Up @@ -40,13 +41,48 @@ export class ApiRequestQueueProcessor extends BaseQueueProcessor<
);
}

private shouldRetryApiCall(error: unknown): boolean {
private shouldRetryApiCall(error: unknown): {
retry: boolean;
bypass: boolean;
} {
if (error instanceof MondayError) {
const mondayErrors = error.mondayErrors || [];
return mondayErrors.some(isRetryableMondayError);
const retry = mondayErrors.some(isRetryableMondayError);
const bypass = retry && isRateLimitMondayError(mondayErrors);
return { retry, bypass };
}
// Unknown error: retryable but does NOT bypass maxRetries
return { retry: true, bypass: false };
}

private async uploadAndPushCallback(
job: ApiRequestQueueJob,
data: unknown,
status: "success" | "partial",
): Promise<void> {
const s3UploadResponse = await this.fileUploadTask.execute(data, uuidv4());
if (!s3UploadResponse.success || !s3UploadResponse.data) {
throw new JobError("File upload failed", true, s3UploadResponse);
}

const callbackJob: CallbackQueueJob = {
data: s3UploadResponse.data,
callbackUrl: job.callbackUrl ?? "",
payload: {
...(job.metadata && { metadata: job.metadata }),
result: s3UploadResponse.data,
status,
token: job.token,
query: job.query,
variables: job.variables,
},
headers: job.headers,
};

const pushResult = await this.pushToCallbackQueueTask.execute(callbackJob);
if (!pushResult.success) {
throw new JobError("Push to next queue failed", true, pushResult.error);
}
// unknown error, assume retryable
return true;
}

async performJob(job: ApiRequestQueueJob): Promise<void> {
Expand All @@ -56,9 +92,15 @@ export class ApiRequestQueueProcessor extends BaseQueueProcessor<
query,
variables,
);

if (apiResponse.partial && apiResponse.data) {
await this.uploadAndPushCallback(job, apiResponse.data, "partial");
return;
}

if (!apiResponse.success || !apiResponse.data) {
const shouldRetryApiCall = this.shouldRetryApiCall(apiResponse.error);
if (shouldRetryApiCall) {
const { retry, bypass } = this.shouldRetryApiCall(apiResponse.error);
if (retry) {
const retryAfter = this.getRetryAfterValue(
apiResponse.error as MondayError,
attempts,
Expand All @@ -68,7 +110,7 @@ export class ApiRequestQueueProcessor extends BaseQueueProcessor<
true,
buildErrorContext(apiResponse.error, attempts, retryAfter),
{ delay: retryAfter },
true,
bypass,
);
}
throw new JobError(
Expand All @@ -78,35 +120,6 @@ export class ApiRequestQueueProcessor extends BaseQueueProcessor<
);
}

const s3UploadResponse = await this.fileUploadTask.execute(
apiResponse.data,
uuidv4(),
);
if (!s3UploadResponse.success || !s3UploadResponse.data) {
throw new JobError("File upload failed", true, s3UploadResponse);
}

const callbackJob: CallbackQueueJob = {
data: s3UploadResponse.data,
callbackUrl: job.callbackUrl ?? "",
payload: {
...(job.metadata && { metadata: job.metadata }),
result: s3UploadResponse.data,
status: "success",
token: job.token,
query: job.query,
variables: job.variables,
},
headers: job.headers,
};
const pushToNextQueueResult =
await this.pushToCallbackQueueTask.execute(callbackJob);
if (!pushToNextQueueResult.success) {
throw new JobError(
"Push to next queue failed",
true,
pushToNextQueueResult.error,
);
}
await this.uploadAndPushCallback(job, apiResponse.data, "success");
}
}
10 changes: 5 additions & 5 deletions src/queue-processors/callback/callback-queue-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ import { BaseQueueProcessor } from "../base-queue-processor";
import { CallbackQueueJob } from "../../types/queue-types";
import { CallbackProcessorOptions } from "../../types/processor-types";
import { BaseCallEndpointTask } from "../../tasks/call-endpoint-task/base-call-endpoint-task";
import { StatusCodes } from "http-status-codes";
import { JobError } from "../../errors/job-error";
import { calculateBackoffDelay } from "../../utils/retry-utils";
import {
calculateBackoffDelay,
isRetryableCallbackStatus,
} from "../../utils/retry-utils";

export class CallbackQueueProcessor extends BaseQueueProcessor<
CallbackQueueJob,
Expand All @@ -25,9 +27,7 @@ export class CallbackQueueProcessor extends BaseQueueProcessor<
options,
);
if (!result.success) {
const { status } = result;
const shouldRetryJob =
status >= StatusCodes.INTERNAL_SERVER_ERROR || !status;
const shouldRetryJob = isRetryableCallbackStatus(result.status);
throw new JobError("Callback task failed", shouldRetryJob, result.error, {
delay: calculateBackoffDelay(job.attempts),
});
Expand Down
18 changes: 8 additions & 10 deletions src/tasks/call-monday-api-task/base-call-monday-api-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,18 @@ export abstract class BaseCallMondayApiTask
try {
const result = await this.executeMondayRequest(token, query, variables);
if (!result.success) {
return { success: false, error: result.error };
}
if (result.partial) {
return {
success: false,
error: result.error,
success: true,
partial: true,
data: result.data as CallMondayApiResultData,
};
}
return {
success: true,
data: result.data as CallMondayApiResultData,
};
return { success: true, data: result.data as CallMondayApiResultData };
} catch (error) {
return {
success: false,
error,
};
return { success: false, error };
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import { BaseCallMondayApiTask } from "./base-call-monday-api-task";
import { CallEndpointResult } from "../../types/task-types";
import { StatusCodes } from "http-status-codes";
import { MondayError } from "../../errors/monday-error";
import { MondayApiError } from "../../types/monday-types";
import {
isDlqRecoveryError,
isRetryableMondayError,
} from "../../utils/retry-utils";

export interface MondayClientCallMondayApiTaskOptions {
apiVersion?: string;
Expand All @@ -27,23 +32,48 @@ export class MondayClientCallMondayApiTask extends BaseCallMondayApiTask {
return toFailureResult(new Error("API key is required"));
}
try {
const client = new ApiClient({ token, apiVersion: this.apiVersion });
// errorPolicy:'all' makes rawRequest resolve with {data,errors} instead of throwing
// for application-level errors. Transport errors (non-200 HTTP) still throw ClientError.
// Source: https://developer.monday.com/api-reference/docs/javascript-sdk#error-handling
const client = new ApiClient({
token,
apiVersion: this.apiVersion,
requestConfig: { errorPolicy: "all" },
});
const { data, errors, extensions } = await client.rawRequest(
query,
variables,
);
if (errors && errors.length > 0) {
throw new MondayError("Monday.com API returned application errors", {
response: { data, errors },
mondayErrors: errors,
partialData: data,
});

if (!errors || errors.length === 0) {
return {
success: true,
status: StatusCodes.OK,
data: { data, errors, extensions },
};
}
return {
success: true,
status: StatusCodes.OK,
data: { data, errors, extensions },
};

const mondayErrors = errors as MondayApiError[];
const hasRetryableError = mondayErrors.some(isRetryableMondayError);
const hasDlqRecoveryError = mondayErrors.some(isDlqRecoveryError);

// Non-retryable errors with partial data → forward to caller as a partial result,
// unless the error requires DLQ recovery (e.g. CursorException).
if (!hasRetryableError && !hasDlqRecoveryError && data != null) {
return {
success: true,
status: StatusCodes.OK,
data: { data, errors: mondayErrors, extensions },
partial: true,
};
}

// Retryable errors or total failure → failure result for retry/DLQ routing
throw new MondayError("Monday.com API returned application errors", {
response: { data, errors: mondayErrors },
mondayErrors,
partialData: data,
});
} catch (error) {
return toFailureResult(error);
}
Expand Down
1 change: 1 addition & 0 deletions src/types/task-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ export interface TaskResult<TaskData = unknown> {
success: boolean;
data?: TaskData;
error?: unknown;
partial?: boolean;
}

export interface FileUploadResultData {
Expand Down
45 changes: 41 additions & 4 deletions src/utils/retry-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ import { StatusCodes } from "http-status-codes";
import { MondayError } from "../errors/monday-error";
import { MondayApiError } from "../types/monday-types";
import {
MondayErrorCodes,
NON_FORWARDABLE_PARTIAL_CODES,
RETRYABLE_HTTP_STATUS_CODES,
TERMINAL_MONDAY_ERROR_CODES,
NON_RETRYABLE_MONDAY_ERROR_CODES,
} from "../constants/monday-error-codes";

export { TERMINAL_MONDAY_ERROR_CODES } from "../constants/monday-error-codes";
export { NON_RETRYABLE_MONDAY_ERROR_CODES } from "../constants/monday-error-codes";

export function buildErrorContext(
error: unknown,
Expand Down Expand Up @@ -52,15 +54,50 @@ function isRetryableStatusCode(status: number): boolean {
}

function isTerminalCode(code: string | undefined): boolean {
return !!code && TERMINAL_MONDAY_ERROR_CODES.has(code);
return !!code && NON_RETRYABLE_MONDAY_ERROR_CODES.has(code);
}

export function isRetryableMondayError(error: MondayApiError): boolean {
const { status_code, code, error_code } = error.extensions ?? {};
// Denylist wins regardless of status_code (e.g. CursorException arrives with status_code: 200)
if (isTerminalCode(code ?? error_code)) return false;
if (typeof status_code === "number" && status_code > 0) {
return isRetryableStatusCode(status_code);
}
return !isTerminalCode(code ?? error_code);
return true;
}

export function isDlqRecoveryError(error: MondayApiError): boolean {
const { code, error_code } = error.extensions ?? {};
const c = code ?? error_code;
return !!c && NON_FORWARDABLE_PARTIAL_CODES.has(c);
}

export function isRateLimitMondayError(
mondayErrors: MondayApiError[],
): boolean {
return mondayErrors.some((e) => {
const { status_code, code, error_code } = e.extensions ?? {};
if (
typeof status_code === "number" &&
RETRYABLE_HTTP_STATUS_CODES.has(status_code)
) {
return true;
}
const c = code ?? error_code;
return (
c === MondayErrorCodes.COMPLEXITY_BUDGET_EXHAUSTED ||
c === MondayErrorCodes.MAX_CONCURRENCY_EXCEEDED
);
});
}

export function isRetryableCallbackStatus(status: number | undefined): boolean {
if (!status) return true;
return (
RETRYABLE_HTTP_STATUS_CODES.has(status) ||
status >= StatusCodes.INTERNAL_SERVER_ERROR
);
}

export function getRetryAfterSeconds(
Expand Down
Loading
Loading