Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Compose stack with a single Phoenix service. Its working directory
# (PHOENIX_WORKING_DIR) lives on the named volume `phoenix_data`, which is
# declared at the bottom of this file and mounted at /phoenix_data.
# NOTE(review): the Compose Specification treats the top-level `version` key
# as informational only — confirm before relying on it or removing it.
version: "3.9"

services:
  phoenix:
    # NOTE(review): `latest` is a floating tag — consider pinning a release.
    image: arizephoenix/phoenix:latest
    container_name: factor-phoenix
    ports:
      - "6006:6006"  # UI + HTTP collector
      - "4317:4317"  # OTLP gRPC collector
    environment:
      - PHOENIX_WORKING_DIR=/phoenix_data
    volumes:
      - phoenix_data:/phoenix_data
    restart: unless-stopped
    healthcheck:
      # NOTE(review): assumes `wget` is available inside the image and that
      # /healthz is served on port 6006 — confirm against the image in use.
      test: ["CMD", "wget", "--spider", "-q", "http://localhost:6006/healthz"]
      interval: 10s
      timeout: 5s
      retries: 3

volumes:
  phoenix_data:
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ dependencies = [
"httpx>=0.27.0",
"opentelemetry-api>=1.27.0",
"opentelemetry-sdk>=1.27.0",
"opentelemetry-exporter-otlp-proto-http>=1.27.0",
"arize-phoenix-otel>=0.6.0",
]

[project.optional-dependencies]
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ python-multipart>=0.0.9
httpx>=0.27.0
opentelemetry-api>=1.27.0
opentelemetry-sdk>=1.27.0
opentelemetry-exporter-otlp-proto-http>=1.27.0
arize-phoenix-otel>=0.6.0
pytest>=8.3.0
pytest-asyncio>=0.24.0
moto>=5.0.0
13 changes: 11 additions & 2 deletions src/factor/agents/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

from factor.agents.prompts import ANALYSIS_PROMPT
from factor.config import settings
from factor.harness.circuit_breaker import CircuitBreaker
from factor.harness.model_wrapper import GuardedBedrockModel
from factor.tools.detection import detect_provision_type
from factor.tools.scoring import score_risk
from factor.tools.gaps import find_gaps
Expand All @@ -17,22 +19,29 @@
logger = logging.getLogger(__name__)


def create_analysis_agent(breaker: CircuitBreaker | None = None) -> Agent:
    """Create and return the Analysis Agent.

    The Analysis Agent handles provision classification, risk scoring,
    gap analysis, and cross-document comparison.

    Args:
        breaker: Optional circuit breaker for financial guardrails. When
            given, the Bedrock model is wrapped in ``GuardedBedrockModel``
            together with this breaker; when ``None`` the plain
            ``BedrockModel`` is passed to the agent.

    Returns:
        The configured ``Agent``.
    """
    model = BedrockModel(
        model_id=settings.bedrock_model_id,
        region_name=settings.aws_region,
    )

    # Callers that pass no breaker keep the unwrapped model.
    if breaker is not None:
        model = GuardedBedrockModel(model, breaker)

    agent = Agent(
        model=model,
        system_prompt=ANALYSIS_PROMPT,
        tools=[detect_provision_type, score_risk, find_gaps, compare_across_documents],
    )

    logger.info(
        "Created Analysis Agent with model=%s guarded=%s",
        settings.bedrock_model_id,
        breaker is not None,
    )
    return agent
15 changes: 12 additions & 3 deletions src/factor/agents/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

from factor.agents.prompts import COORDINATOR_PROMPT
from factor.config import settings
from factor.harness.circuit_breaker import CircuitBreaker
from factor.harness.model_wrapper import GuardedBedrockModel
from factor.tools.parsing import parse_pdf, parse_docx
from factor.tools.chunking import chunk_provisions
from factor.tools.detection import detect_provision_type
Expand Down Expand Up @@ -185,17 +187,23 @@ def generate_report(analysis_results: dict, output_dir: str = "./reports") -> di
return report


def create_coordinator_agent() -> Agent:
def create_coordinator_agent(breaker: CircuitBreaker | None = None) -> Agent:
"""Create and return the Coordinator Agent.

The Coordinator orchestrates the full due diligence pipeline:
ingest → analyze → search knowledge → report.
ingest -> analyze -> search knowledge -> report.

Args:
breaker: Optional circuit breaker for financial guardrails.
"""
model = BedrockModel(
model_id=settings.bedrock_model_id,
region_name=settings.aws_region,
)

if breaker is not None:
model = GuardedBedrockModel(model, breaker)

agent = Agent(
model=model,
system_prompt=COORDINATOR_PROMPT,
Expand All @@ -207,5 +215,6 @@ def create_coordinator_agent() -> Agent:
],
)

logger.info("Created Coordinator Agent with model=%s", settings.bedrock_model_id)
logger.info("Created Coordinator Agent with model=%s guarded=%s",
settings.bedrock_model_id, breaker is not None)
return agent
13 changes: 11 additions & 2 deletions src/factor/agents/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,37 @@

from factor.agents.prompts import INGESTION_PROMPT
from factor.config import settings
from factor.harness.circuit_breaker import CircuitBreaker
from factor.harness.model_wrapper import GuardedBedrockModel
from factor.tools.parsing import parse_pdf, parse_docx
from factor.tools.chunking import chunk_provisions

logger = logging.getLogger(__name__)


def create_ingestion_agent(breaker: CircuitBreaker | None = None) -> Agent:
    """Create and return the Ingestion Agent.

    The Ingestion Agent handles document parsing (PDF, DOCX) and
    provision chunking using anchor patterns.

    Args:
        breaker: Optional circuit breaker for financial guardrails. When
            given, the Bedrock model is wrapped in ``GuardedBedrockModel``
            together with this breaker; when ``None`` the plain
            ``BedrockModel`` is passed to the agent.

    Returns:
        The configured ``Agent``.
    """
    model = BedrockModel(
        model_id=settings.bedrock_model_id,
        region_name=settings.aws_region,
    )

    # Callers that pass no breaker keep the unwrapped model.
    if breaker is not None:
        model = GuardedBedrockModel(model, breaker)

    agent = Agent(
        model=model,
        system_prompt=INGESTION_PROMPT,
        tools=[parse_pdf, parse_docx, chunk_provisions],
    )

    logger.info(
        "Created Ingestion Agent with model=%s guarded=%s",
        settings.bedrock_model_id,
        breaker is not None,
    )
    return agent
13 changes: 11 additions & 2 deletions src/factor/agents/knowledge.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,38 @@

from factor.agents.prompts import KNOWLEDGE_PROMPT
from factor.config import settings
from factor.harness.circuit_breaker import CircuitBreaker
from factor.harness.model_wrapper import GuardedBedrockModel
from factor.tools.rag import search_synthetic_knowledge
from factor.tools.classification import classify_domain
from factor.tools.citations import extract_citations

logger = logging.getLogger(__name__)


def create_knowledge_agent(breaker: CircuitBreaker | None = None) -> Agent:
    """Create and return the Knowledge Agent.

    The Knowledge Agent searches the synthetic legal knowledge base,
    classifies provisions by domain, and extracts/labels citations.

    Args:
        breaker: Optional circuit breaker for financial guardrails. When
            given, the Bedrock model is wrapped in ``GuardedBedrockModel``
            together with this breaker; when ``None`` the plain
            ``BedrockModel`` is passed to the agent.

    Returns:
        The configured ``Agent``.
    """
    model = BedrockModel(
        model_id=settings.bedrock_model_id,
        region_name=settings.aws_region,
    )

    # Callers that pass no breaker keep the unwrapped model.
    if breaker is not None:
        model = GuardedBedrockModel(model, breaker)

    agent = Agent(
        model=model,
        system_prompt=KNOWLEDGE_PROMPT,
        tools=[search_synthetic_knowledge, classify_domain, extract_citations],
    )

    logger.info(
        "Created Knowledge Agent with model=%s guarded=%s",
        settings.bedrock_model_id,
        breaker is not None,
    )
    return agent
13 changes: 11 additions & 2 deletions src/factor/agents/reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,36 @@

from factor.agents.prompts import REPORTING_PROMPT
from factor.config import settings
from factor.harness.circuit_breaker import CircuitBreaker
from factor.harness.model_wrapper import GuardedBedrockModel
from factor.tools.export import build_risk_report, export_excel, export_html

logger = logging.getLogger(__name__)


def create_reporting_agent(breaker: CircuitBreaker | None = None) -> Agent:
    """Create and return the Reporting Agent.

    The Reporting Agent assembles structured risk reports and exports
    them in multiple formats (JSON, Excel, HTML).

    Args:
        breaker: Optional circuit breaker for financial guardrails. When
            given, the Bedrock model is wrapped in ``GuardedBedrockModel``
            together with this breaker; when ``None`` the plain
            ``BedrockModel`` is passed to the agent.

    Returns:
        The configured ``Agent``.
    """
    model = BedrockModel(
        model_id=settings.bedrock_model_id,
        region_name=settings.aws_region,
    )

    # Callers that pass no breaker keep the unwrapped model.
    if breaker is not None:
        model = GuardedBedrockModel(model, breaker)

    agent = Agent(
        model=model,
        system_prompt=REPORTING_PROMPT,
        tools=[build_risk_report, export_excel, export_html],
    )

    logger.info(
        "Created Reporting Agent with model=%s guarded=%s",
        settings.bedrock_model_id,
        breaker is not None,
    )
    return agent
64 changes: 64 additions & 0 deletions src/factor/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

from factor import DISCLAIMER, __version__
from factor.config import settings
from factor.harness.guardrail import get_guardrail
from factor.harness.exceptions import CircuitBreakerTripped
from factor.tools.chunking import chunk_provisions
from factor.tools.detection import detect_provision_type
from factor.tools.scoring import score_risk
Expand Down Expand Up @@ -66,6 +68,11 @@ async def configure_logging():
logging.getLogger("httpx").setLevel(logging.WARNING)
logger.info("Logging configured: level=%s", settings.factor_log_level)

if settings.phoenix_enabled:
from factor.aws.observability import init_tracing
init_tracing("factor")
logger.info("Phoenix telemetry initialized")


@app.get("/api/v1/health")
async def health_check():
Expand Down Expand Up @@ -120,10 +127,20 @@ async def analyze_documents(files: list[UploadFile] = File(...)):

session_store.create_session(session_id, [f.filename or "" for f in files])

guardrail = get_guardrail()
breaker = guardrail.register_session(session_id) if settings.guardrail_enabled else None

async def event_stream() -> AsyncGenerator[dict, None]:
try:
yield {"event": "session", "data": json.dumps({"session_id": session_id, "disclaimer": DISCLAIMER})}

if breaker:
yield {"event": "guardrail", "data": json.dumps({
"stage": "initialized",
"budget_usd": settings.guardrail_session_budget_usd,
"max_steps": settings.guardrail_max_steps,
})}

yield {"event": "status", "data": json.dumps({"stage": "ingestion", "message": "Parsing documents..."})}

all_provisions = {}
Expand Down Expand Up @@ -164,6 +181,9 @@ async def event_stream() -> AsyncGenerator[dict, None]:
risk["document_id"] = doc_id
all_risk_scores.append(risk)

if breaker:
breaker.record_step(action="score_risk", meta={"doc_id": doc_id})

gaps = find_gaps(detected_provisions=detected_types, doc_type="unknown")
for gap in gaps:
gap["document_id"] = doc_id
Expand All @@ -186,9 +206,28 @@ async def event_stream() -> AsyncGenerator[dict, None]:

session_store.store_result(session_id, report)

if breaker:
yield {"event": "guardrail", "data": json.dumps({
"stage": "completed",
**breaker.status(),
})}

yield {"event": "report", "data": json.dumps(report)}
yield {"event": "done", "data": json.dumps({"session_id": session_id, "disclaimer": DISCLAIMER})}

except CircuitBreakerTripped as exc:
logger.warning("Circuit breaker halted session %s: %s", session_id, exc)
session_store.update_status(session_id, "halted")
yield {"event": "guardrail_halt", "data": json.dumps({
"halted": True,
**exc.status,
"message": str(exc),
"disclaimer": DISCLAIMER,
})}

finally:
if settings.guardrail_enabled:
guardrail.remove_session(session_id)
shutil.rmtree(upload_dir, ignore_errors=True)

return EventSourceResponse(event_stream())
Expand Down Expand Up @@ -259,6 +298,31 @@ async def export_report(
return {"path": path, "format": format, "disclaimer": DISCLAIMER}


@app.get("/api/v1/sessions/{session_id}/budget")
async def get_session_budget(session_id: str):
    """Get real-time budget and guardrail status for a session.

    Args:
        session_id: Identifier of the session whose guardrail status is
            requested.

    Returns:
        The status mapping reported by the guardrail for this session, with
        a ``"disclaimer"`` entry added.

    Raises:
        HTTPException: 404 when the guardrail reports no status for
            ``session_id``.
    """
    guardrail = get_guardrail()
    status = guardrail.session_status(session_id)
    if status is None:
        raise HTTPException(status_code=404, detail="No active guardrail for this session")
    # Build a fresh dict rather than writing into `status`: that object comes
    # from the guardrail and may be shared with its internal bookkeeping.
    return {**status, "disclaimer": DISCLAIMER}


@app.get("/api/v1/guardrail/status")
async def guardrail_overview():
    """Report guardrail configuration and the status of all active sessions.

    Returns:
        A dict with the guardrail and Phoenix settings flags, the Phoenix
        OTLP endpoint, the default session budget, whatever
        ``guardrail.all_sessions()`` reports, and the disclaimer text.
    """
    registry = get_guardrail()
    overview = {
        "enabled": settings.guardrail_enabled,
        "phoenix_enabled": settings.phoenix_enabled,
        "phoenix_endpoint": settings.phoenix_otlp_endpoint,
        "default_budget_usd": settings.guardrail_session_budget_usd,
        "active_sessions": registry.all_sessions(),
        "disclaimer": DISCLAIMER,
    }
    return overview


@app.get("/api/v1/knowledge/search")
async def search_knowledge(
q: str = Query(..., min_length=1),
Expand Down
Loading
Loading