feat(indexer): improve batch ingestion and queue visibility #497

Closed
hedhoud wants to merge 2 commits into refactor/hexagonal from refactor/batch-upload-api

hedhoud (Collaborator) commented on Jun 16, 2026

Why

Benchmarking and UI-driven ingestion need two things: submit several files without one request per file, and understand what active indexing tasks are actually doing. This keeps OpenRAG's safer model intact: every accepted file still becomes its own indexing task, and one bad file does not block the rest.

Behavior

  • Adds a multipart batch upload endpoint for partition files.
  • Returns one result per file with accepted or failed status.
  • Uses partial success semantics so valid files are queued even when another item is invalid or already exists.
  • Records the current indexing stage for active tasks.
  • Exposes stage details in task status and queue task listings.
  • Adds an active_stages queue summary so we can see pressure in parsing, captioning, embedding, and inserting.
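
The partial success behavior described above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `ItemResult`, `queue_batch`, and the `enqueue` callback are hypothetical names, and the sketch assumes an expected rejection (such as an existing file) surfaces as a `ValueError`.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ItemResult:
    file_id: str
    status: str  # "accepted" or "failed"
    detail: Optional[str] = None


def queue_batch(file_ids: list[str], enqueue: Callable[[str], None]) -> list[ItemResult]:
    """Queue each file on its own; a rejected file never aborts the rest."""
    results = []
    for file_id in file_ids:
        try:
            enqueue(file_id)  # hypothetical: creates one indexing task per file
            results.append(ItemResult(file_id, "accepted"))
        except ValueError as exc:
            # Expected rejection (e.g. file already exists): record it and continue.
            results.append(ItemResult(file_id, "failed", str(exc)))
    return results


# Example: the second file already exists, the other two are still queued.
existing = {"b.pdf"}
queued = []


def enqueue(file_id: str) -> None:
    if file_id in existing:
        raise ValueError("file already exists")
    queued.append(file_id)


results = queue_batch(["a.pdf", "b.pdf", "c.pdf"], enqueue)
print([(r.file_id, r.status) for r in results])
```

The point of the design is that the loop owns the error boundary, so each file gets its own result entry regardless of what happens to its neighbours.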

Validation

  • uv run pytest tests/unit/api -q
  • uv run pytest tests/unit/services/workers/test_pipeline_builder.py tests/unit/services/workers/test_indexer_worker.py tests/unit/services/orchestrators/test_job_service.py tests/unit/services/orchestrators/test_indexing_service.py -q
  • uv run ruff format --check openrag tests
  • uv run ruff check openrag tests

Note: a broader local services run still hits an existing ignored .env side effect in model endpoint seeding; that failure is unrelated to this PR.

Summary by CodeRabbit

  • New Features
    • Added an admin batch upload endpoint (POST /partition/{partition}/files) supporting multiple multipart files with JSON item metadata
    • Per-file results now include accepted/failed status, task status URLs, and detailed failure information
    • Batch uploads enforce strict file-to-item mapping, detect duplicate file IDs, validate workspace IDs, and verify uploaded formats/mimetypes
    • Task status responses and task listings now expose stage information (current stage, failed stage, stage durations, and stage history)
  • Tests
    • Added unit coverage for batch upload success/partial success and invalid input handling, plus task stage visibility.
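
As an illustration of the strict file-to-item mapping and duplicate file ID detection mentioned above, here is a small sketch. `BatchInputError` and `parse_items` are hypothetical names, the `file_id` key is taken from the review snippets, and the sketch assumes a duplicate rejects the whole request; the PR may instead report it per item.

```python
import json


class BatchInputError(ValueError):
    """Hypothetical error type; a router could map it to HTTP 400."""


def parse_items(items_json: str, filenames: list[str]) -> list[dict]:
    """Parse the JSON item metadata and check it lines up with the uploaded files."""
    try:
        items = json.loads(items_json)
    except json.JSONDecodeError as exc:
        raise BatchInputError("items must be valid JSON") from exc
    if not isinstance(items, list):
        raise BatchInputError("items must be a JSON array")
    # Strict mapping: exactly one item per uploaded file.
    if len(items) != len(filenames):
        raise BatchInputError(f"expected {len(filenames)} items, got {len(items)}")
    seen = set()
    for item in items:
        if not isinstance(item, dict) or "file_id" not in item:
            raise BatchInputError("each item needs a file_id")
        if item["file_id"] in seen:
            raise BatchInputError(f"duplicate file_id: {item['file_id']}")
        seen.add(item["file_id"])
    return items


items = parse_items('[{"file_id": "a"}, {"file_id": "b"}]', ["a.pdf", "b.pdf"])
print([item["file_id"] for item in items])
```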

coderabbitai Bot commented Jun 16, 2026

Walkthrough

Adds a POST /partition/{partition}/files batch upload endpoint to the admin indexing router with three new Pydantic schemas and four internal helper functions. Additionally introduces comprehensive stage tracking throughout the indexing pipeline, capturing timing and history for each stage (PARSING, CHUNKING, EMBEDDING, INSERTING) and exposing this metadata via task status and job queue APIs.

Changes

Batch Upload Endpoint

| Layer / File(s) | Summary |
| --- | --- |
| Batch upload Pydantic schemas (openrag/api/schemas/admin/common.py) | Adds BatchUploadItem, BatchUploadResult (with Literal status), and BatchUploadResponse (with accepted/failed counts and results list); extends imports with Field and Literal. |
| Helper functions and POST /files endpoint (openrag/api/routers/admin/indexing.py) | Updates imports, adds PydanticValidationError alias, then four internal helpers (_parse_batch_upload_items, _validate_workspace_ids, _validate_upload_format, _queue_uploaded_file) that orchestrate parsing, validation, duplicate detection, and persistence. The add_files endpoint aggregates per-file results and returns BatchUploadResponse with 201 or 207 status. |
| Unit tests for batch upload (tests/unit/api/routers/admin/test_batch_upload_api.py) | Adds FakeIndexingService, _build_app helper with dependency overrides, and two async tests verifying partial success (207) and items/files count mismatch (400). |
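
A rough sketch of how per-file results might be aggregated into the response and status code follows. It uses standard-library dataclasses as a stand-in for the Pydantic models, and the field names `accepted`, `failed`, `results`, and `task_status_url`, the `/tasks/1` URL, and the `build_response` helper are assumptions. The rule "201 when every file is accepted, otherwise 207" is also an assumption inferred from the summary above.

```python
from dataclasses import dataclass, field
from typing import Literal, Optional


@dataclass
class BatchUploadResult:
    file_id: str
    status: Literal["accepted", "failed"]
    task_status_url: Optional[str] = None  # assumed field name
    detail: Optional[str] = None


@dataclass
class BatchUploadResponse:
    accepted: int
    failed: int
    results: list[BatchUploadResult] = field(default_factory=list)


def build_response(results: list[BatchUploadResult]) -> tuple[int, BatchUploadResponse]:
    """Count accepted and failed items and pick the HTTP status code."""
    accepted = sum(1 for r in results if r.status == "accepted")
    response = BatchUploadResponse(accepted, len(results) - accepted, results)
    # Assumption: 201 Created when nothing failed, 207 Multi-Status otherwise.
    status_code = 201 if response.failed == 0 else 207
    return status_code, response


results = [
    BatchUploadResult("a.pdf", "accepted", task_status_url="/tasks/1"),  # hypothetical URL
    BatchUploadResult("b.pdf", "failed", detail="file already exists"),
]
status_code, response = build_response(results)
print(status_code, response.accepted, response.failed)
```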

Stage Tracking and Visibility

| Layer / File(s) | Summary |
| --- | --- |
| Dispatcher interface and service delegation (openrag/core/indexing/dispatcher.py, openrag/services/orchestrators/indexing_service.py, openrag/services/workers/dispatcher.py) | Adds get_task_info abstract method to dispatcher and implements delegation chain to retrieve full task metadata snapshots. |
| Task state management with stage tracking (openrag/services/workers/task_state.py) | Extends TaskInfo dataclass with stage fields (current_stage, current_stage_started_at, failed_stage, stage_durations, stage_history). Adds internal helpers for stage duration computation and snapshot generation. Updates state transitions to close/record stages on terminal events. Introduces public start_stage, finish_current_stage, and get_info methods. |
| Stage reporting in pipeline and worker (openrag/services/workers/pipeline_builder.py, openrag/services/workers/indexer_actor.py) | Adds _report_stage callback mechanism in pipeline that emits stage names before each step. Worker defines report_stage to forward calls to task state manager and injects it into the pipeline. Worker calls finish_current_stage after pipeline completion. |
| Task status and job queue API enrichment (openrag/api/routers/admin/indexing.py, openrag/api/routers/admin/jobs.py, openrag/services/orchestrators/job_service.py) | Updates get_task_status to populate stage fields from service.get_task_info. Updates list_tasks endpoint to include stage fields with defaults. Updates JobService to compute active_stages count aggregation and return enriched task rows. |
| Tests for stage tracking and visibility (tests/unit/api/routers/admin/test_task_stage_visibility.py, tests/unit/services/orchestrators/test_indexing_service.py, tests/unit/services/orchestrators/test_job_service.py, tests/unit/services/workers/test_indexer_worker.py, tests/unit/services/workers/test_pipeline_builder.py) | Comprehensive test coverage: task status endpoint returns stage metadata, dispatcher doubles implement get_task_info, job service validates active stages rollup and task row enrichment, worker and pipeline tests verify stage lifecycle calls and emission order. |
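
To make the stage bookkeeping concrete, here is a simplified sketch. The field names come from the summary above, but everything else is assumed: in the PR the public methods live on the task state manager rather than on the dataclass, and the choices to close the previous stage implicitly in `start_stage` and to store stage names (not richer records) in `stage_history` are illustrative only. The `fail` method and `active_stages` function are hypothetical helpers.

```python
import time
from collections import Counter
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class TaskInfo:
    current_stage: Optional[str] = None
    current_stage_started_at: Optional[float] = None
    failed_stage: Optional[str] = None
    stage_durations: dict = field(default_factory=dict)
    stage_history: list = field(default_factory=list)

    def start_stage(self, stage: str, now: Optional[float] = None) -> None:
        now = time.monotonic() if now is None else now
        self.finish_current_stage(now)  # assumption: starting a stage closes the previous one
        self.current_stage = stage
        self.current_stage_started_at = now
        self.stage_history.append(stage)

    def finish_current_stage(self, now: Optional[float] = None) -> None:
        if self.current_stage is None:
            return
        now = time.monotonic() if now is None else now
        elapsed = now - self.current_stage_started_at
        previous = self.stage_durations.get(self.current_stage, 0.0)
        self.stage_durations[self.current_stage] = previous + elapsed
        self.current_stage = None
        self.current_stage_started_at = None

    def fail(self, now: Optional[float] = None) -> None:
        # Terminal event: remember where the task failed, then close the stage.
        self.failed_stage = self.current_stage
        self.finish_current_stage(now)


def active_stages(tasks: list[TaskInfo]) -> dict:
    """Count how many active tasks are currently in each stage."""
    return dict(Counter(t.current_stage for t in tasks if t.current_stage))


first, second = TaskInfo(), TaskInfo()
first.start_stage("PARSING", now=0.0)
first.start_stage("CHUNKING", now=2.0)
second.start_stage("PARSING", now=1.0)
summary = active_stages([first, second])
print(summary, first.stage_durations)
first.fail(now=5.0)
print(first.failed_stage, first.stage_durations)
```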

Sequence Diagram

sequenceDiagram
  participant Client
  participant add_files as POST /partition/{partition}/files
  participant Validation as Validation Helpers
  participant IndexingService
  participant Worker as IndexerWorker
  participant Pipeline as IndexingPipeline
  participant TaskState as TaskStateManager

  Client->>add_files: multipart files + JSON items
  add_files->>Validation: parse and validate items, format, workspaces
  add_files->>IndexingService: check duplicates, enqueue file
  IndexingService->>Worker: process_file task
  Worker->>TaskState: start_stage("PARSING")
  Worker->>Pipeline: run with stage_reporter
  Pipeline->>TaskState: start_stage("CHUNKING")
  Pipeline->>TaskState: start_stage("EMBEDDING")
  Pipeline->>TaskState: start_stage("INSERTING")
  Pipeline-->>Worker: completion
  Worker->>TaskState: finish_current_stage
  Worker->>TaskState: set_state(COMPLETED)
  add_files-->>Client: BatchUploadResponse with task URLs
  
  Client->>Client: request task status
  Client->>IndexingService: get_task_status(task_id)
  IndexingService->>TaskState: get_info(task_id)
  TaskState-->>IndexingService: {state, current_stage, failed_stage, stage_durations, stage_history}
  IndexingService-->>Client: enriched task status with stage metadata
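
The reporting flow in the diagram can be approximated with an injected async callback. This is a sketch under assumptions: `run_pipeline` and its placeholder steps are hypothetical, and here the pipeline emits every stage name itself, as the walkthrough describes, whereas the diagram shows the worker emitting PARSING.

```python
import asyncio
from typing import Awaitable, Callable

StageReporter = Callable[[str], Awaitable[None]]


async def run_pipeline(document: str, report_stage: StageReporter) -> list[str]:
    """Emit the stage name before each step; the steps are placeholders."""
    await report_stage("PARSING")
    text = document.strip()
    await report_stage("CHUNKING")
    chunks = text.split()
    await report_stage("EMBEDDING")
    _vectors = [len(chunk) for chunk in chunks]  # placeholder for real embeddings
    await report_stage("INSERTING")
    return chunks  # placeholder for a real insert into the index


async def main() -> tuple[list[str], list[str]]:
    seen: list[str] = []

    async def report_stage(stage: str) -> None:
        # A worker would forward this to the task state manager instead.
        seen.append(stage)

    chunks = await run_pipeline("  hello stage tracking ", report_stage)
    return seen, chunks


seen, chunks = asyncio.run(main())
print(seen)
```

Injecting the reporter keeps the pipeline unaware of where stage updates go, which is what lets tests assert on emission order with a plain list.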

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • linagora/openrag#419: Main PR's stage metadata support builds on the Phase-9 worker staging refactor that introduced the stage-based indexing pipeline and stage-aware worker modules.
  • linagora/openrag#396: Introduced the IndexingDispatcher abstraction that this PR extends with get_task_info() and stage-aware task details.

Suggested labels

feat

Poem

🐇 Files hop through stages—parsing, chunking, then they fly,
Each one tracked with timestamps, durations piling high!
A batch endpoint and lifecycle—workers report their way,
Stage by stage the metadata blooms for all to see and say.
Two cohorts dance together: upload and insight combined,
One batch endpoint to upload, one sprint to track the grind! 🌟

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 25.40%, which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The pull request title accurately summarizes the main changes: it introduces both batch ingestion (new batch upload API endpoint) and queue visibility (enriched stage metadata in task status responses). |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


coderabbitai Bot left a comment

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@openrag/api/routers/admin/indexing.py`:
- Around line 332-334: The batch upload flow at lines 332-334 extracts metadata
from item.metadata and validates only the file format with
_validate_upload_format, but it does not apply the metadata validation rules
that the single-file add_file endpoint uses via Depends(validate_metadata). To
fix this, apply the same metadata validation logic to the item.metadata
dictionary before calling _queue_uploaded_file. Identify the validation function
or logic used by the validate_metadata dependency in the add_file endpoint and
invoke it on the extracted metadata to ensure batch uploads and single uploads
enforce the same metadata validation rules.
- Around line 368-375: The BatchUploadResult being constructed in the exception
handler at line 374 exposes the raw exception string via the detail parameter,
which can leak internal implementation details to clients. Replace the str(exc)
argument passed to the detail parameter with a generic, user-friendly error
message that does not expose backend specifics (such as file paths or internal
service names). The exception details should remain in the server-side
logger.exception call for debugging purposes, but client responses must contain
sanitized error messages.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: db864dae-c6a8-4277-a856-d8fac08de973

📥 Commits

Reviewing files that changed from the base of the PR and between 14301a4 and 9272ef5.

📒 Files selected for processing (3)
  • openrag/api/routers/admin/indexing.py
  • openrag/api/schemas/admin/common.py
  • tests/unit/api/routers/admin/test_batch_upload_api.py

Comment on lines +332 to +334
metadata = dict(item.metadata or {})
_validate_upload_format(file, metadata, config)
task_status_url = await _queue_uploaded_file(

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Validate batch metadata with the same rules as single-file upload.

Line 332 currently trusts item.metadata as-is, and line 333 only checks file format. This bypasses the metadata validation path used by add_file (Depends(validate_metadata)), so malformed metadata can be queued in batch but rejected in single-upload flows.
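
One way to close this gap is to route both paths through a single validator, sketched below. The actual rules behind `validate_metadata` are not shown in this conversation, so the rules here (non-empty string keys, scalar values) are purely illustrative, and `check_batch_item` is a hypothetical helper.

```python
from typing import Any, Optional

SCALAR_TYPES = (str, int, float, bool, type(None))


def validate_metadata(metadata: dict[str, Any]) -> dict[str, Any]:
    """Illustrative rules only: non-empty string keys and scalar values."""
    for key, value in metadata.items():
        if not isinstance(key, str) or not key:
            raise ValueError("metadata keys must be non-empty strings")
        if not isinstance(value, SCALAR_TYPES):
            raise ValueError(f"metadata value for {key!r} must be a scalar")
    return dict(metadata)


def check_batch_item(item: dict[str, Any]) -> tuple[str, Optional[str]]:
    """Apply the shared validator to one batch item before it is queued."""
    try:
        validate_metadata(dict(item.get("metadata") or {}))
    except ValueError as exc:
        # The failure stays scoped to this item, matching partial success semantics.
        return "failed", str(exc)
    return "accepted", None


print(check_batch_item({"file_id": "a", "metadata": {"lang": "en"}}))
print(check_batch_item({"file_id": "b", "metadata": {"tags": ["x"]}}))
```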

Comment on lines +368 to +375
        except Exception as exc:
            logger.exception("Failed to queue batch upload item.", file_id=raw_file_id, error=str(exc))
            results.append(
                BatchUploadResult(
                    file_id=raw_file_id,
                    status="failed",
                    detail=str(exc),
                )

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Do not return raw exception strings to clients.

Line 374 exposes str(exc) in response payloads. Unexpected exceptions can leak internal paths or backend details.

Suggested patch
         except Exception as exc:
             logger.exception("Failed to queue batch upload item.", file_id=raw_file_id, error=str(exc))
             results.append(
                 BatchUploadResult(
                     file_id=raw_file_id,
                     status="failed",
-                    detail=str(exc),
+                    detail="Failed to queue uploaded file",
                 )
             )

hedhoud changed the title from "feat(indexer): add batch upload API" to "feat(indexer): improve batch ingestion and queue visibility" on Jun 16, 2026
coderabbitai Bot added the "feat" (Add a new feature) label on Jun 16, 2026

coderabbitai Bot left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@openrag/services/workers/indexer_actor.py`:
- Around line 61-63: The report_stage function directly awaits the
TaskStateManager remote call (self._tsm.start_stage.remote()) without the
centralized resilience timeout wrapper, which can cause stage reporting to stall
if the actor hangs. Refactor report_stage and all other TaskStateManager
lifecycle calls in the file to use call_ray_actor_with_timeout() from
openrag/services/workers/ray_utils, replacing the direct await calls and
matching the resilience pattern already established in WorkerDispatcher.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: a8b15229-b1d2-4798-b123-93c26bb5f719

📥 Commits

Reviewing files that changed from the base of the PR and between 9272ef5 and f5676ec.

📒 Files selected for processing (14)
  • openrag/api/routers/admin/indexing.py
  • openrag/api/routers/admin/jobs.py
  • openrag/core/indexing/dispatcher.py
  • openrag/services/orchestrators/indexing_service.py
  • openrag/services/orchestrators/job_service.py
  • openrag/services/workers/dispatcher.py
  • openrag/services/workers/indexer_actor.py
  • openrag/services/workers/pipeline_builder.py
  • openrag/services/workers/task_state.py
  • tests/unit/api/routers/admin/test_task_stage_visibility.py
  • tests/unit/services/orchestrators/test_indexing_service.py
  • tests/unit/services/orchestrators/test_job_service.py
  • tests/unit/services/workers/test_indexer_worker.py
  • tests/unit/services/workers/test_pipeline_builder.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • openrag/api/routers/admin/indexing.py

Comment on lines +61 to +63
async def report_stage(stage: str) -> None:
    await self._tsm.start_stage.remote(task_id, stage)

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify direct TaskStateManager remote awaits in the changed worker path
rg -n "await self\._tsm\.[a-z_]+\.remote\(" openrag/services/workers/indexer_actor.py

# Verify whether the centralized timeout helper is used in this file
rg -n "call_ray_actor_with_timeout|services\.workers\.ray_utils|components\.ray_utils" openrag/services/workers/indexer_actor.py

Repository: linagora/openrag

Length of output: 411


🏁 Script executed:

# Read the file to verify exact line numbers and context
head -100 openrag/services/workers/indexer_actor.py | tail -50

Repository: linagora/openrag

Length of output: 2084


🏁 Script executed:

# Check if call_ray_actor_with_timeout exists in components/ray_utils
rg -n "def call_ray_actor_with_timeout" openrag/

Repository: linagora/openrag

Length of output: 140


🏁 Script executed:

# Verify the actual lines 61-63, 76, and 88 in context
sed -n '55,95p' openrag/services/workers/indexer_actor.py | cat -n

Repository: linagora/openrag

Length of output: 2158


🏁 Script executed:

# Check line 76 and surrounding context
sed -n '70,85p' openrag/services/workers/indexer_actor.py | cat -n

Repository: linagora/openrag

Length of output: 845


🏁 Script executed:

# Check imports at the top of the file
head -30 openrag/services/workers/indexer_actor.py | cat -n

Repository: linagora/openrag

Length of output: 1319


🏁 Script executed:

# Check if call_ray_actor_with_timeout is imported
rg -n "^from|^import" openrag/services/workers/indexer_actor.py | head -20

Repository: linagora/openrag

Length of output: 369


🏁 Script executed:

# Get the exact line 76 in the file
sed -n '76p' openrag/services/workers/indexer_actor.py

Repository: linagora/openrag

Length of output: 108


🏁 Script executed:

# Get broader context to understand what's at line 76
sed -n '74,78p' openrag/services/workers/indexer_actor.py | cat -n

Repository: linagora/openrag

Length of output: 303


🏁 Script executed:

# Get all lines with TaskStateManager remote calls in context
rg -n "await self\._tsm\.[a-z_]+\.remote\(" openrag/services/workers/indexer_actor.py -B 1 -A 1

Repository: linagora/openrag

Length of output: 678


🏁 Script executed:

# Check what WorkerDispatcher does as reference pattern
rg -n "call_ray_actor_with_timeout" openrag/services/workers/dispatcher.py -B 2 -A 2 | head -40

Repository: linagora/openrag

Length of output: 340


🏁 Script executed:

# Verify the correct import path for the timeout utility
fd "ray_utils.py" openrag/

Repository: linagora/openrag

Length of output: 98


🏁 Script executed:

# Check if components/ray_utils exists and what it contains
cat openrag/components/ray_utils.py | head -100

Repository: linagora/openrag

Length of output: 124


🏁 Script executed:

# Get the function signature of call_ray_actor_with_timeout
sed -n '70,90p' openrag/services/workers/ray_utils.py | cat -n

Repository: linagora/openrag

Length of output: 988


🏁 Script executed:

# Check what the actual line 76 reference might be about - verify all lines with immediate context
sed -n '59,95p' openrag/services/workers/indexer_actor.py | cat -n

Repository: linagora/openrag

Length of output: 1911


🏁 Script executed:

# Double-check what exact code is at line 76 with full context
sed -n '75,77p' openrag/services/workers/indexer_actor.py | cat -n

Repository: linagora/openrag

Length of output: 191


Route all TaskStateManager actor calls through the timeout wrapper.

Lines 59, 62, 88, 89, and 93 directly await TaskStateManager remote calls without the centralized resilience wrapper. If that actor slows or hangs, stage reporting and state transitions can stall file processing. Use call_ray_actor_with_timeout() from openrag/services/workers/ray_utils for all these TaskStateManager lifecycle calls, matching the resilience pattern used in WorkerDispatcher.
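
The general shape of such a wrapper can be sketched with `asyncio.wait_for`. This is not the signature of `call_ray_actor_with_timeout`, which is not shown in this conversation; `call_with_timeout` is a hypothetical stand-in, and the sketch assumes the remote call yields an awaitable, as in the reviewed `await self._tsm.start_stage.remote(...)`.

```python
import asyncio
from typing import Any, Awaitable


async def call_with_timeout(awaitable: Awaitable[Any], timeout_s: float, what: str) -> Any:
    """Await a call but give up after timeout_s seconds instead of stalling."""
    try:
        return await asyncio.wait_for(awaitable, timeout=timeout_s)
    except asyncio.TimeoutError as exc:
        raise RuntimeError(f"{what} timed out after {timeout_s}s") from exc


async def main() -> tuple[str, str]:
    # A responsive actor call returns normally.
    fast = await call_with_timeout(asyncio.sleep(0, result="ok"), 0.5, "start_stage")
    # A hung actor call is cut off rather than blocking file processing.
    try:
        await call_with_timeout(asyncio.sleep(1), 0.01, "start_stage")
    except RuntimeError as exc:
        return fast, str(exc)
    return fast, "no timeout"


fast, message = asyncio.run(main())
print(fast, message)
```

Whether a timed-out stage report should fail the task or only be logged is a separate design decision that the review comment does not settle.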

Source: Coding guidelines

hedhoud closed this on Jun 18, 2026

hedhoud (Collaborator, Author) commented on Jun 18, 2026

Andrzej is currently working on the Admin UI, and he plans to integrate this functionality automatically.

Labels

feat Add a new feature
