feat(indexer): improve batch ingestion and queue visibility#497
feat(indexer): improve batch ingestion and queue visibility#497hedhoud wants to merge 2 commits into
Conversation
📝 WalkthroughWalkthroughAdds a ChangesBatch Upload Endpoint
Stage Tracking and Visibility
Sequence DiagramsequenceDiagram
participant Client
participant add_files as POST /partition/{partition}/files
participant Validation as Validation Helpers
participant IndexingService
participant Worker as IndexerWorker
participant Pipeline as IndexingPipeline
participant TaskState as TaskStateManager
Client->>add_files: multipart files + JSON items
add_files->>Validation: parse and validate items, format, workspaces
add_files->>IndexingService: check duplicates, enqueue file
IndexingService->>Worker: process_file task
Worker->>TaskState: start_stage("PARSING")
Worker->>Pipeline: run with stage_reporter
Pipeline->>TaskState: start_stage("CHUNKING")
Pipeline->>TaskState: start_stage("EMBEDDING")
Pipeline->>TaskState: start_stage("INSERTING")
Pipeline-->>Worker: completion
Worker->>TaskState: finish_current_stage
Worker->>TaskState: set_state(COMPLETED)
add_files-->>Client: BatchUploadResponse with task URLs
Client->>Client: request task status
Client->>IndexingService: get_task_status(task_id)
IndexingService->>TaskState: get_info(task_id)
TaskState-->>IndexingService: {state, current_stage, failed_stage, stage_durations, stage_history}
IndexingService-->>Client: enriched task status with stage metadata
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested labels
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@openrag/api/routers/admin/indexing.py`:
- Around line 332-334: The batch upload flow at lines 332-334 extracts metadata
from item.metadata and validates only the file format with
_validate_upload_format, but it does not apply the metadata validation rules
that the single-file add_file endpoint uses via Depends(validate_metadata). To
fix this, apply the same metadata validation logic to the item.metadata
dictionary before calling _queue_uploaded_file. Identify the validation function
or logic used by the validate_metadata dependency in the add_file endpoint and
invoke it on the extracted metadata to ensure batch uploads and single uploads
enforce the same metadata validation rules.
- Around line 368-375: The BatchUploadResult being constructed in the exception
handler at line 374 exposes the raw exception string via the detail parameter,
which can leak internal implementation details to clients. Replace the str(exc)
argument passed to the detail parameter with a generic, user-friendly error
message that does not expose backend specifics (such as file paths or internal
service names). The exception details should remain in the server-side
logger.exception call for debugging purposes, but client responses must contain
sanitized error messages.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: db864dae-c6a8-4277-a856-d8fac08de973
📒 Files selected for processing (3)
openrag/api/routers/admin/indexing.pyopenrag/api/schemas/admin/common.pytests/unit/api/routers/admin/test_batch_upload_api.py
| metadata = dict(item.metadata or {}) | ||
| _validate_upload_format(file, metadata, config) | ||
| task_status_url = await _queue_uploaded_file( |
There was a problem hiding this comment.
Validate batch metadata with the same rules as single-file upload.
Line 332 currently trusts item.metadata as-is, and line 333 only checks file format. This bypasses the metadata validation path used by add_file (Depends(validate_metadata)), so malformed metadata can be queued in batch but rejected in single-upload flows.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@openrag/api/routers/admin/indexing.py` around lines 332 - 334, The batch
upload flow at lines 332-334 extracts metadata from item.metadata and validates
only the file format with _validate_upload_format, but it does not apply the
metadata validation rules that the single-file add_file endpoint uses via
Depends(validate_metadata). To fix this, apply the same metadata validation
logic to the item.metadata dictionary before calling _queue_uploaded_file.
Identify the validation function or logic used by the validate_metadata
dependency in the add_file endpoint and invoke it on the extracted metadata to
ensure batch uploads and single uploads enforce the same metadata validation
rules.
| except Exception as exc: | ||
| logger.exception("Failed to queue batch upload item.", file_id=raw_file_id, error=str(exc)) | ||
| results.append( | ||
| BatchUploadResult( | ||
| file_id=raw_file_id, | ||
| status="failed", | ||
| detail=str(exc), | ||
| ) |
There was a problem hiding this comment.
Do not return raw exception strings to clients.
Line 374 exposes str(exc) in response payloads. Unexpected exceptions can leak internal paths or backend details.
Suggested patch
except Exception as exc:
logger.exception("Failed to queue batch upload item.", file_id=raw_file_id, error=str(exc))
results.append(
BatchUploadResult(
file_id=raw_file_id,
status="failed",
- detail=str(exc),
+ detail="Failed to queue uploaded file",
)
)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@openrag/api/routers/admin/indexing.py` around lines 368 - 375, The
BatchUploadResult being constructed in the exception handler at line 374 exposes
the raw exception string via the detail parameter, which can leak internal
implementation details to clients. Replace the str(exc) argument passed to the
detail parameter with a generic, user-friendly error message that does not
expose backend specifics (such as file paths or internal service names). The
exception details should remain in the server-side logger.exception call for
debugging purposes, but client responses must contain sanitized error messages.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@openrag/services/workers/indexer_actor.py`:
- Around line 61-63: The report_stage function directly awaits the
TaskStateManager remote call (self._tsm.start_stage.remote()) without the
centralized resilience timeout wrapper, which can cause stage reporting to stall
if the actor hangs. Refactor report_stage and all other TaskStateManager
lifecycle calls in the file to use call_ray_actor_with_timeout() from
openrag/services/workers/ray_utils, replacing the direct await calls and
matching the resilience pattern already established in WorkerDispatcher.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: a8b15229-b1d2-4798-b123-93c26bb5f719
📒 Files selected for processing (14)
openrag/api/routers/admin/indexing.pyopenrag/api/routers/admin/jobs.pyopenrag/core/indexing/dispatcher.pyopenrag/services/orchestrators/indexing_service.pyopenrag/services/orchestrators/job_service.pyopenrag/services/workers/dispatcher.pyopenrag/services/workers/indexer_actor.pyopenrag/services/workers/pipeline_builder.pyopenrag/services/workers/task_state.pytests/unit/api/routers/admin/test_task_stage_visibility.pytests/unit/services/orchestrators/test_indexing_service.pytests/unit/services/orchestrators/test_job_service.pytests/unit/services/workers/test_indexer_worker.pytests/unit/services/workers/test_pipeline_builder.py
🚧 Files skipped from review as they are similar to previous changes (1)
- openrag/api/routers/admin/indexing.py
| async def report_stage(stage: str) -> None: | ||
| await self._tsm.start_stage.remote(task_id, stage) | ||
|
|
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify direct TaskStateManager remote awaits in the changed worker path
rg -n "await self\._tsm\.[a-z_]+\.remote\(" openrag/services/workers/indexer_actor.py
# Verify whether the centralized timeout helper is used in this file
rg -n "call_ray_actor_with_timeout|services\.workers\.ray_utils|components\.ray_utils" openrag/services/workers/indexer_actor.pyRepository: linagora/openrag
Length of output: 411
🏁 Script executed:
# Read the file to verify exact line numbers and context
head -100 openrag/services/workers/indexer_actor.py | tail -50Repository: linagora/openrag
Length of output: 2084
🏁 Script executed:
# Check if call_ray_actor_with_timeout exists in components/ray_utils
rg -n "def call_ray_actor_with_timeout" openrag/Repository: linagora/openrag
Length of output: 140
🏁 Script executed:
# Verify the actual lines 61-63, 76, and 88 in context
sed -n '55,95p' openrag/services/workers/indexer_actor.py | cat -nRepository: linagora/openrag
Length of output: 2158
🏁 Script executed:
# Check line 76 and surrounding context
sed -n '70,85p' openrag/services/workers/indexer_actor.py | cat -nRepository: linagora/openrag
Length of output: 845
🏁 Script executed:
# Check imports at the top of the file
head -30 openrag/services/workers/indexer_actor.py | cat -nRepository: linagora/openrag
Length of output: 1319
🏁 Script executed:
# Check if call_ray_actor_with_timeout is imported
rg -n "^from|^import" openrag/services/workers/indexer_actor.py | head -20Repository: linagora/openrag
Length of output: 369
🏁 Script executed:
# Get the exact line 76 in the file
sed -n '76p' openrag/services/workers/indexer_actor.pyRepository: linagora/openrag
Length of output: 108
🏁 Script executed:
# Get broader context to understand what's at line 76
sed -n '74,78p' openrag/services/workers/indexer_actor.py | cat -nRepository: linagora/openrag
Length of output: 303
🏁 Script executed:
# Get all lines with TaskStateManager remote calls in context
rg -n "await self\._tsm\.[a-z_]+\.remote\(" openrag/services/workers/indexer_actor.py -B 1 -A 1Repository: linagora/openrag
Length of output: 678
🏁 Script executed:
# Check what WorkerDispatcher does as reference pattern
rg -n "call_ray_actor_with_timeout" openrag/services/workers/dispatcher.py -B 2 -A 2 | head -40Repository: linagora/openrag
Length of output: 340
🏁 Script executed:
# Verify the correct import path for the timeout utility
fd "ray_utils.py" openrag/Repository: linagora/openrag
Length of output: 98
🏁 Script executed:
# Check if components/ray_utils exists and what it contains
cat openrag/components/ray_utils.py | head -100Repository: linagora/openrag
Length of output: 124
🏁 Script executed:
# Get the function signature of call_ray_actor_with_timeout
sed -n '70,90p' openrag/services/workers/ray_utils.py | cat -nRepository: linagora/openrag
Length of output: 988
🏁 Script executed:
# Check what the actual line 76 reference might be about - verify all lines with immediate context
sed -n '59,95p' openrag/services/workers/indexer_actor.py | cat -nRepository: linagora/openrag
Length of output: 1911
🏁 Script executed:
# Double-check what exact code is at line 76 with full context
sed -n '75,77p' openrag/services/workers/indexer_actor.py | cat -nRepository: linagora/openrag
Length of output: 191
Route all TaskStateManager actor calls through the timeout wrapper.
Lines 59, 62, 88, 89, and 93 directly await TaskStateManager remote calls without the centralized resilience wrapper. If that actor slows or hangs, stage reporting and state transitions can stall file processing. Use call_ray_actor_with_timeout() from openrag/services/workers/ray_utils for all these TaskStateManager lifecycle calls, matching the resilience pattern used in WorkerDispatcher.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@openrag/services/workers/indexer_actor.py` around lines 61 - 63, The
report_stage function directly awaits the TaskStateManager remote call
(self._tsm.start_stage.remote()) without the centralized resilience timeout
wrapper, which can cause stage reporting to stall if the actor hangs. Refactor
report_stage and all other TaskStateManager lifecycle calls in the file to use
call_ray_actor_with_timeout() from openrag/services/workers/ray_utils, replacing
the direct await calls and matching the resilience pattern already established
in WorkerDispatcher.
Source: Coding guidelines
|
Andrzej is currently working on the Admin UI, and he plans to integrate this functionality automatically. |
Why
Benchmarking and UI-driven ingestion need two things: submit several files without one request per file, and understand what active indexing tasks are actually doing. This keeps OpenRAG's safer model intact: every accepted file still becomes its own indexing task, and one bad file does not block the rest.
Behavior
active_stagesqueue summary so we can see pressure in parsing, captioning, embedding, and inserting.Validation
uv run pytest tests/unit/api -quv run pytest tests/unit/services/workers/test_pipeline_builder.py tests/unit/services/workers/test_indexer_worker.py tests/unit/services/orchestrators/test_job_service.py tests/unit/services/orchestrators/test_indexing_service.py -quv run ruff format --check openrag testsuv run ruff check openrag testsNote: a broader local services run still hits an existing ignored
.envside effect in model endpoint seeding; that failure is unrelated to this PR.Summary by CodeRabbit
POST /partition/{partition}/files) supporting multiple multipart files with JSON item metadata