2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
__pycache__/
*.py[cod]
84 changes: 84 additions & 0 deletions qa_system/ARCHITECTURE_ANALYSIS.md
@@ -0,0 +1,84 @@
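# QA System Architecture and Data Flow Analysis

## 1. System Positioning

`qa_system` is a local, offline web prototype for speech quality inspection:
- The backend uses FastAPI to expose upload, asynchronous job, and result-polling endpoints.
- The core capability is FunASR offline ASR plus speaker diarization.
- The frontend is a single-page form that polls job status and displays the speaker log.

## 2. Code Structure

- `qa_system/app.py`: application entry point, endpoint definitions, job scheduling, result persistence, and status management.
- `qa_system/services/transcription.py`: the end-to-end offline transcription flow (audio preprocessing, model inference, diarization strategy selection, segment merging, log formatting).
- `qa_system/services/diarization.py`: the diarization abstraction layer. The current main path maps FunASR sentence-level speaker labels; a pyannote extension point is reserved.
- `qa_system/templates/index.html`: the upload and parameter input page.
- `qa_system/static/script.js`: submits jobs from the frontend and polls `/api/jobs/{job_id}`.
- `qa_system/tests/test_diarization.py`: verifies the diarizer's field mapping and merge logic.

## 3. Core Call Chain

1. The browser submits the form to `POST /api/jobs`.
2. The backend saves the uploaded files to `uploads/<job_id>/`.
3. The backend writes a `running` status to the in-memory `jobs[job_id]` and hands the actual processing to a thread pool.
4. A background thread calls `OfflineTranscriptionService.transcribe_batch()`, which:
   - scans for valid audio/video files;
   - converts each file to mono 16 kHz WAV bytes with ffmpeg;
   - calls FunASR `generate()`, which returns the text plus sentence-level timestamps and speaker information;
   - runs diarization according to the selected strategy (campp / pyannote);
   - merges adjacent short sentences from the same speaker;
   - produces a `TranscriptResult`.
5. The backend renders each file as a `speaker + timestamp + text` log and writes it to `results/<job_id>.txt`.
6. The in-memory status is updated to `done` (or `failed`).
7. The frontend polls `GET /api/jobs/{job_id}`, retrieves the result, and displays it.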
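
Steps 3 to 6 can be sketched with the standard library alone. This is a minimal illustration, not the code in `app.py`: `submit_job` and `fake_transcribe` are hypothetical names, and the transcription call is replaced by a placeholder so the snippet runs without FunASR or FastAPI.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime
from typing import Callable
from uuid import uuid4

# In-memory job registry, mirroring the `jobs` dict described above.
jobs: dict[str, dict] = {}
executor = ThreadPoolExecutor(max_workers=2)


def submit_job(work: Callable[[], str]) -> tuple[str, Future]:
    """Register a job as `running`, then run `work` on the thread pool."""
    job_id = uuid4().hex
    jobs[job_id] = {"status": "running", "created_at": datetime.now().isoformat()}

    def _run() -> None:
        try:
            jobs[job_id].update({"status": "done", "result": work()})
        except Exception as exc:  # any failure is reported through the job status
            jobs[job_id].update({"status": "failed", "error": str(exc)})

    return job_id, executor.submit(_run)


def fake_transcribe() -> str:
    # Hypothetical placeholder for the real transcription call.
    return "speaker0 [00:00:00.000 --> 00:00:01.200]: hello"


job_id, future = submit_job(fake_transcribe)
future.result()  # block until the background thread finishes
print(jobs[job_id]["status"])
```

The HTTP handler would return `job_id` immediately instead of waiting on the future; the wait here only makes the example deterministic.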

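## 4. Data Flow (Input -> Intermediate State -> Output)

### 4.1 Input Layer

- File input: `files` (multiple files)
- Parameter input:
  - `diarization` (`campp` / `pyannote`)
  - `merge_threshold_chars` (merge threshold for adjacent segments from the same speaker)
  - `hotwords` (ASR hotwords)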
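
One way to keep these parameters together and reject bad values early is a small frozen dataclass. The sketch below is an assumption for illustration: `JobOptions` and `SUPPORTED_STRATEGIES` do not exist in the repository, and the defaults are the ones `create_job` declares for its form fields.

```python
from dataclasses import dataclass

SUPPORTED_STRATEGIES = ("campp", "pyannote")


@dataclass(frozen=True)
class JobOptions:
    """Validated per-job parameters (hypothetical helper, not in the repository)."""

    diarization: str = "campp"
    merge_threshold_chars: int = 12
    hotwords: str = ""

    def __post_init__(self) -> None:
        if self.diarization not in SUPPORTED_STRATEGIES:
            raise ValueError(f"unsupported diarization strategy: {self.diarization!r}")
        if self.merge_threshold_chars < 0:
            raise ValueError("merge_threshold_chars must be non-negative")


options = JobOptions(diarization="campp", merge_threshold_chars=12, hotwords="fund redemption")
print(options)
```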

### 4.2 Intermediate State

- Files on disk: `uploads/<job_id>/<filename>`
- In-memory status: `jobs[job_id] = {status, created_at, ...}`
- Intermediate audio representation: WAV bytes output by ffmpeg (mono / 16 kHz)
- Intermediate model results:
  - `infer_result["text"]`
  - `infer_result["sentence_info"]` (contains `spk/start/end/text`)
- Domain objects: `SpeakerSegment`, `TranscriptResult`
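
The path from `sentence_info` to merged segments can be illustrated as follows. This is a standalone sketch: `Segment` stands in for `SpeakerSegment`, the sample data is made up, and unlike the repository's `merge_adjacent` this version copies segments before merging into them.

```python
from dataclasses import dataclass


@dataclass
class Segment:
    speaker: str
    start_ms: int
    end_ms: int
    text: str


def to_segments(sentence_info: list[dict]) -> list[Segment]:
    """Map each sentence dict (spk/start/end/text) to a Segment."""
    return [
        Segment(f"speaker{s['spk']}", int(s["start"]), int(s["end"]), s["text"])
        for s in sentence_info
    ]


def merge_adjacent(segments: list[Segment], merge_threshold_chars: int = 12) -> list[Segment]:
    """Append a segment to its predecessor when the speaker matches and the
    predecessor's text is still shorter than the threshold."""
    merged: list[Segment] = []
    for current in segments:
        previous = merged[-1] if merged else None
        if (
            previous is not None
            and current.speaker == previous.speaker
            and len(previous.text) < merge_threshold_chars
        ):
            previous.text += current.text
            previous.end_ms = current.end_ms
        else:
            # Copy so the caller's segments are not mutated by later merges.
            merged.append(Segment(current.speaker, current.start_ms, current.end_ms, current.text))
    return merged


# Made-up sample data in the shape described above.
sentence_info = [
    {"spk": 0, "start": 0, "end": 800, "text": "Hello,"},
    {"spk": 0, "start": 800, "end": 2100, "text": " this is the sales desk."},
    {"spk": 1, "start": 2300, "end": 3000, "text": "Hi."},
]
for seg in merge_adjacent(to_segments(sentence_info)):
    print(seg)
```

Note that the threshold is compared against the length of the text accumulated so far, so a short opening sentence absorbs the next sentence from the same speaker regardless of how long that next sentence is.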

### 4.3 Output Layer

- Text file: `results/<job_id>.txt`
- API output:
  - creating a job returns `job_id`
  - querying a job returns `running/done/failed` plus the result or error
- Page display: job status JSON plus the speaker log text
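
A row of the speaker log combines a speaker label, a time range, and the text. The sketch below shows one plausible rendering; `to_timestamp` and `render_row` are illustrative names, and the exact punctuation of the row is an assumption rather than the repository's format.

```python
def to_timestamp(ms: int) -> str:
    """Format milliseconds as HH:MM:SS.mmm."""
    total_seconds, millis = divmod(ms, 1000)
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{millis:03d}"


def render_row(speaker: str, start_ms: int, end_ms: int, text: str) -> str:
    """Render one log row as `speaker [start --> end]: text`."""
    return f"{speaker} [{to_timestamp(start_ms)} --> {to_timestamp(end_ms)}]: {text}"


print(render_row("speaker0", 0, 2100, "Hello, this is the sales desk."))
print(render_row("speaker1", 3723456, 3725000, "Hi."))
```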

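## 5. Key Design Points of qa_system

- **Offline first**: the main ASR path does not depend on online services; models are loaded from the local cache.
- **Decoupled strategy**: speaker diarization is wrapped behind the unified `Diarizer` interface, defaulting to CAM++ with pyannote as a switchable option.
- **Asynchronous experience**: the HTTP endpoint returns a `job_id` quickly, long-running work executes in a thread pool, and the frontend polls.
- **Interpretable artifacts**: output is organized by speaker and timeline, which suits manual quality inspection or post-processing.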

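## 6. Current Limitations and Engineering Risks

- `jobs` is an in-process dictionary: it is lost on restart and cannot be shared across instances.
- The number of worker threads is fixed (`max_workers=2`), so jobs queue noticeably under high concurrency.
- `service.diarizer.strategy` is modified per request on a shared singleton, which risks cross-talk between concurrent jobs.
- The pyannote branch currently raises `NotImplementedError`: it is selectable in the frontend but not usable.
- Uploaded files and job results have no lifecycle cleanup policy (risk of disk growth).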
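
The shared-singleton risk can be replayed deterministically. The sketch below is a single-threaded illustration of one possible interleaving between two jobs, with hypothetical stand-in names (`SharedDiarizer`, `run_with_shared_state`, `run_with_argument`); it does not import the real service.

```python
class SharedDiarizer:
    """Stand-in for a singleton whose strategy is set per job."""

    def __init__(self) -> None:
        self.strategy = "campp"


def run_with_shared_state(diarizer: SharedDiarizer) -> str:
    # Reads whatever strategy the most recent writer stored.
    return diarizer.strategy


def run_with_argument(strategy: str) -> str:
    # The strategy travels with the job, so other jobs cannot change it.
    return strategy


shared = SharedDiarizer()

# Job A and job B both write to the singleton before A reads it back.
shared.strategy = "campp"      # job A
shared.strategy = "pyannote"   # job B
seen_by_a = run_with_shared_state(shared)
print(seen_by_a)  # A asked for "campp" but observes B's value

# Passing the value explicitly keeps each job's choice intact.
print(run_with_argument("campp"), run_with_argument("pyannote"))
```

With two worker threads, the same ordering can occur whenever one job sets the strategy while another is still between its own write and the point where it reads the value.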

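## 7. Suggested Evolution

1. Move job state to persistent storage (SQLite/Redis plus a queue).
2. Instantiate the transcription service per job, or pass the strategy through as a function argument, to avoid shared state.
3. Probe for pyannote availability; when it is unavailable, disable the option in the frontend and show a hint.
4. Add job cancellation, timeouts, cleanup, and audit logging.
5. Add end-to-end tests (upload -> completion -> result format assertions).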
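
As a rough idea of what item 1 could look like with SQLite, the sketch below stores job state in a single table using the standard-library `sqlite3` module. `SqliteJobStore`, its method names, and the table layout are all assumptions made for illustration; a real migration would also need a queue and schema versioning.

```python
import json
import sqlite3
import threading
from datetime import datetime, timezone
from typing import Optional


class SqliteJobStore:
    """Minimal persistent job registry (hypothetical; not part of the repository)."""

    def __init__(self, path: str = "jobs.db") -> None:
        # check_same_thread=False lets worker threads reuse the connection;
        # the lock serializes access to it.
        self._conn = sqlite3.connect(path, check_same_thread=False)
        self._lock = threading.Lock()
        with self._lock, self._conn:
            self._conn.execute(
                "CREATE TABLE IF NOT EXISTS jobs ("
                "job_id TEXT PRIMARY KEY, status TEXT NOT NULL, "
                "created_at TEXT NOT NULL, payload TEXT NOT NULL)"
            )

    def create(self, job_id: str) -> None:
        now = datetime.now(timezone.utc).isoformat()
        with self._lock, self._conn:
            self._conn.execute(
                "INSERT INTO jobs VALUES (?, 'running', ?, '{}')", (job_id, now)
            )

    def finish(self, job_id: str, status: str, payload: dict) -> None:
        with self._lock, self._conn:
            self._conn.execute(
                "UPDATE jobs SET status = ?, payload = ? WHERE job_id = ?",
                (status, json.dumps(payload, ensure_ascii=False), job_id),
            )

    def get(self, job_id: str) -> Optional[dict]:
        with self._lock:
            row = self._conn.execute(
                "SELECT status, created_at, payload FROM jobs WHERE job_id = ?", (job_id,)
            ).fetchone()
        if row is None:
            return None
        status, created_at, payload = row
        return {"status": status, "created_at": created_at, **json.loads(payload)}


store = SqliteJobStore(":memory:")  # use a file path for real persistence
store.create("demo")
store.finish("demo", "done", {"result": "speaker0 [00:00:00.000 --> 00:00:01.200]: hello"})
print(store.get("demo"))
```

Unlike the in-process dictionary, a file-backed store survives a restart, and `created_at` is preserved when the job finishes because only `status` and `payload` are updated.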
33 changes: 33 additions & 0 deletions qa_system/README.md
@@ -0,0 +1,33 @@
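# Financial Sales Call Quality Inspection System (Secondary Development Edition)

This directory is a web-based quality inspection prototype built on the original project's core capabilities (FunASR offline transcription plus speaker diarization), aimed at the "phone order recording quality inspection" scenario.

## Features

1. Batch upload of recording files.
2. Speaker diarization (FunASR CAM++ by default, with an extension point reserved for pyannote).
3. Standard offline automatic transcription.
4. Speaker log output (speaker + timestamp + text) for sales/customer role analysis.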

## Getting Started

```bash
pip install fastapi uvicorn jinja2 python-multipart
# plus the original project's dependencies
pip install -U funasr modelscope ffmpeg-python pydub torch psutil

uvicorn qa_system.app:app --host 0.0.0.0 --port 8000 --reload
```

Open in a browser: `http://127.0.0.1:8000`
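
Besides the browser page, a job can be followed from a script by polling `GET /api/jobs/{job_id}`. The sketch below uses only the standard library; `fetch_job` and `wait_for_job` are illustrative names, and the interval and timeout defaults are arbitrary assumptions.

```python
import json
import time
import urllib.request
from typing import Callable

BASE_URL = "http://127.0.0.1:8000"


def fetch_job(job_id: str) -> dict:
    """GET /api/jobs/{job_id} and decode the JSON body."""
    with urllib.request.urlopen(f"{BASE_URL}/api/jobs/{job_id}") as resp:
        return json.load(resp)


def wait_for_job(
    job_id: str,
    fetch: Callable[[str], dict] = fetch_job,
    interval_s: float = 2.0,
    timeout_s: float = 600.0,
) -> dict:
    """Poll until the job leaves the `running` state or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        job = fetch(job_id)
        if job.get("status") != "running":
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job {job_id} still running after {timeout_s}s")
        time.sleep(interval_s)


# Usage, once the server is running and a job has been created:
# job = wait_for_job("<job_id returned by POST /api/jobs>")
# print(job["status"])
```

An unknown `job_id` makes the server answer with status 404, which `urllib.request.urlopen` surfaces as an `HTTPError`. The `fetch` parameter exists so the polling loop can be exercised without a live server.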

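## Architecture

- `app.py`: web endpoints, job scheduling, status polling.
- `services/transcription.py`: the main offline transcription flow (FunASR + ffmpeg).
- `services/diarization.py`: the speaker diarization abstraction layer (CAM++ / pyannote).
- `templates/ + static/`: the frontend pages.

## Notes on pyannote

The code keeps the pyannote interface abstraction, but full Pipeline initialization is not enabled by default. For production use, add the HuggingFace token, models, and caching strategy appropriate to your private environment.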
Empty file added qa_system/__init__.py
Empty file.
82 changes: 82 additions & 0 deletions qa_system/app.py
@@ -0,0 +1,82 @@
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from pathlib import Path
from uuid import uuid4

from fastapi import FastAPI, File, Form, UploadFile
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from starlette.requests import Request

from qa_system.services.transcription import OfflineTranscriptionService

BASE_DIR = Path(__file__).resolve().parent
UPLOAD_DIR = BASE_DIR / "uploads"
RESULT_DIR = BASE_DIR / "results"
UPLOAD_DIR.mkdir(exist_ok=True)
RESULT_DIR.mkdir(exist_ok=True)

app = FastAPI(title="Financial Sales Call Quality Inspection System")
app.mount("/static", StaticFiles(directory=str(BASE_DIR / "static")), name="static")
templates = Jinja2Templates(directory=str(BASE_DIR / "templates"))
executor = ThreadPoolExecutor(max_workers=2)
service = OfflineTranscriptionService()
# In-memory job registry: lost on restart and not shared across processes.
jobs: dict[str, dict] = {}


@app.get("/", response_class=HTMLResponse)
def index(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})


@app.post("/api/jobs")
async def create_job(
    files: list[UploadFile] = File(...),
    diarization: str = Form("campp"),
    merge_threshold_chars: int = Form(12),
    hotwords: str = Form(""),
):
    job_id = uuid4().hex
    job_dir = UPLOAD_DIR / job_id
    job_dir.mkdir(exist_ok=True)

    source_paths: list[str] = []
    for file in files:
        # Keep only the final path component so a filename containing
        # directory separators cannot escape job_dir.
        save_path = job_dir / Path(file.filename or "upload").name
        content = await file.read()
        save_path.write_bytes(content)
        source_paths.append(str(save_path))

    jobs[job_id] = {"status": "running", "created_at": datetime.now().isoformat()}

    def _run() -> None:
        try:
            # NOTE: this mutates the shared service singleton, so concurrent jobs
            # with different strategies can interfere with each other.
            service.diarizer.strategy = diarization
            transcripts = service.transcribe_batch(
                source_paths=source_paths,
                merge_threshold_chars=merge_threshold_chars,
                hotwords=hotwords,
            )
            result_file = RESULT_DIR / f"{job_id}.txt"
            dialogue_logs = [service.render_dialogue_log(t) for t in transcripts]
            result_file.write_text("\n\n".join(dialogue_logs), encoding="utf-8")
            # Update in place so created_at survives the status change.
            jobs[job_id].update(
                {
                    "status": "done",
                    "result": result_file.read_text(encoding="utf-8"),
                    "files": [t.source_file for t in transcripts],
                }
            )
        except Exception as exc:
            jobs[job_id].update({"status": "failed", "error": str(exc)})

    executor.submit(_run)
    return JSONResponse({"job_id": job_id, "status": jobs[job_id]["status"]})


@app.get("/api/jobs/{job_id}")
def get_job(job_id: str):
    if job_id not in jobs:
        return JSONResponse({"error": "job not found"}, status_code=404)
    return JSONResponse(jobs[job_id])
Empty file added qa_system/services/__init__.py
Empty file.
60 changes: 60 additions & 0 deletions qa_system/services/diarization.py
@@ -0,0 +1,60 @@
from __future__ import annotations

from dataclasses import dataclass
from importlib import import_module
from importlib.util import find_spec
from typing import List


@dataclass
class SpeakerSegment:
    speaker: str
    start_ms: int
    end_ms: int
    text: str


class Diarizer:
    """Diarization abstraction supporting FunASR CAM++ first, then pyannote."""

    def __init__(self, strategy: str = "campp") -> None:
        self.strategy = strategy

    def diarize_with_funasr(self, sentence_info: list[dict]) -> List[SpeakerSegment]:
        """Map FunASR sentence-level results (spk/start/end/text) to segments."""
        segments: List[SpeakerSegment] = []
        for sentence in sentence_info:
            segments.append(
                SpeakerSegment(
                    speaker=f"speaker{sentence['spk']}",
                    start_ms=int(sentence["start"]),
                    end_ms=int(sentence["end"]),
                    text=sentence["text"],
                )
            )
        return segments

    def diarize_with_pyannote(self, audio_path: str) -> List[SpeakerSegment]:
        """Placeholder for the pyannote path; always raises for now."""
        if find_spec("pyannote.audio") is None:
            raise RuntimeError("pyannote.audio is not installed; the pyannote strategy is unavailable")

        pipeline_module = import_module("pyannote.audio")
        Pipeline = getattr(pipeline_module, "Pipeline")  # reserved for the future integration

        raise NotImplementedError(
            "The pyannote strategy requires a HuggingFace token and model configuration; "
            "enable it after integrating in the production environment. "
            f"Current input file: {audio_path}"
        )

    def merge_adjacent(self, segments: List[SpeakerSegment], merge_threshold_chars: int = 12) -> List[SpeakerSegment]:
        """Merge a segment into its predecessor when both share a speaker and the
        predecessor's text is shorter than merge_threshold_chars.

        Merging mutates the SpeakerSegment objects passed in.
        """
        if not segments:
            return []

        merged = [segments[0]]
        for current in segments[1:]:
            previous = merged[-1]
            if current.speaker == previous.speaker and len(previous.text) < merge_threshold_chars:
                previous.text += current.text
                previous.end_ms = current.end_ms
            else:
                merged.append(current)
        return merged
101 changes: 101 additions & 0 deletions qa_system/services/transcription.py
@@ -0,0 +1,101 @@
from __future__ import annotations

import os
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, List

import ffmpeg
import psutil
import torch
from funasr import AutoModel

from qa_system.services.diarization import Diarizer, SpeakerSegment


@dataclass
class TranscriptResult:
    source_file: str
    full_text: str
    segments: List[SpeakerSegment]


class OfflineTranscriptionService:
    def __init__(self, model_root: Path | None = None, diarization_strategy: str = "campp") -> None:
        # Models are loaded from the local ModelScope cache so no network access is needed.
        home = Path.home()
        base = model_root or home / ".cache" / "modelscope" / "hub" / "models" / "iic"
        self.model = AutoModel(
            model=str(base / "speech_seaco_paraformer_large_asr_nat-zh-cn-16k-common-vocab8404-pytorch"),
            model_revision="v2.0.4",
            vad_model=str(base / "speech_fsmn_vad_zh-cn-16k-common-pytorch"),
            vad_model_revision="v2.0.4",
            punc_model=str(base / "punc_ct-transformer_zh-cn-common-vocab272727-pytorch"),
            punc_model_revision="v2.0.4",
            spk_model=str(base / "speech_campplus_sv_zh-cn_16k-common"),
            spk_model_revision="v2.0.4",
            ngpu=1 if torch.cuda.is_available() else 0,
            ncpu=psutil.cpu_count(),
            disable_pbar=True,
            disable_log=True,
            disable_update=True,
        )
        self.diarizer = Diarizer(strategy=diarization_strategy)

    @staticmethod
    def _to_time(ms: int) -> str:
        # Work from total milliseconds so the hour field does not wrap after 24 h.
        total_seconds, millis = divmod(int(ms), 1000)
        hours, remainder = divmod(total_seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{millis:03d}"

    @staticmethod
    def _iter_audio_files(paths: Iterable[str]) -> Iterable[str]:
        """Yield supported audio/video files, recursing into directories."""
        support_ext = {".mp3", ".m4a", ".aac", ".ogg", ".wav", ".flac", ".wma", ".aif", ".mp4", ".avi", ".mov", ".mkv"}
        for path in paths:
            p = Path(path)
            if p.is_file() and p.suffix.lower() in support_ext:
                yield str(p)
            if p.is_dir():
                for child in p.rglob("*"):
                    if child.is_file() and child.suffix.lower() in support_ext:
                        yield str(child)

    def transcribe_batch(self, source_paths: Iterable[str], merge_threshold_chars: int = 12, hotwords: str = "") -> List[TranscriptResult]:
        results: List[TranscriptResult] = []
        for audio in self._iter_audio_files(source_paths):
            # Decode to mono 16 kHz 16-bit PCM WAV and capture the bytes from stdout.
            audio_bytes, _ = (
                ffmpeg.input(audio, threads=0)
                .output("-", format="wav", acodec="pcm_s16le", ac=1, ar=16000)
                .run(cmd=["ffmpeg", "-nostdin"], capture_stdout=True, capture_stderr=True)
            )

            infer_result = self.model.generate(
                input=audio_bytes,
                batch_size_s=300,
                is_final=True,
                sentence_timestamp=True,
                hotword=hotwords,
            )[0]

            sentence_info = infer_result.get("sentence_info", [])
            if self.diarizer.strategy == "pyannote":
                # Currently raises NotImplementedError (see Diarizer.diarize_with_pyannote).
                segments = self.diarizer.diarize_with_pyannote(audio)
            else:
                segments = self.diarizer.diarize_with_funasr(sentence_info)
            segments = self.diarizer.merge_adjacent(segments, merge_threshold_chars)
            results.append(
                TranscriptResult(
                    source_file=audio,
                    full_text=infer_result.get("text", ""),
                    segments=segments,
                )
            )
        return results

    @classmethod
    def render_dialogue_log(cls, transcript: TranscriptResult) -> str:
        """Render one transcript as a header line followed by one row per segment."""
        rows = [f"# File: {os.path.basename(transcript.source_file)}"]
        for seg in transcript.segments:
            start = cls._to_time(seg.start_ms)
            end = cls._to_time(seg.end_ms)
            rows.append(f"{seg.speaker} [{start} --> {end}]:{seg.text}")
        return "\n".join(rows)