Add implementation plan for live streaming transcription
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,442 @@
|
||||
# Live Streaming Transcription Implementation Plan
|
||||
|
||||
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
||||
|
||||
**Goal:** Add `--stream` mode to `transcribe.py` that captures microphone audio, segments speech using VAD, and transcribes each segment in near-real-time.
|
||||
|
||||
**Architecture:** sounddevice InputStream callback pushes audio into a thread-safe buffer. A VAD state machine (energy-based RMS) detects speech segments. Completed segments are pushed onto a `queue.Queue` and consumed by a transcription thread that runs the Cohere ASR model and prints timestamped output. Ctrl+C triggers clean shutdown.
|
||||
|
||||
**Tech Stack:** Python 3.14, sounddevice, numpy, transformers (CohereAsrForConditionalGeneration), threading, queue
|
||||
|
||||
**Spec:** `docs/superpowers/specs/2026-05-29-live-streaming-transcription-design.md`
|
||||
|
||||
---
|
||||
|
||||
## File Structure
|
||||
|
||||
All changes are in a single file:
|
||||
|
||||
- **Modify:** `transcribe.py` — add `--stream` and `--lang` CLI flags, VAD logic, streaming capture loop, transcription consumer thread, clean shutdown handling. Grows from ~52 lines to ~170 lines.
|
||||
|
||||
No new files. No test files (this is a hardware-dependent demo script — verification is manual with a real microphone).
|
||||
|
||||
---
|
||||
|
||||
### Task 1: Refactor CLI argument parsing
|
||||
|
||||
**Files:**
|
||||
- Modify: `transcribe.py:1-52`
|
||||
|
||||
Currently the script uses raw `sys.argv` checks. Replace with `argparse` to cleanly support `--stream`, `--mic`, `--lang`, and the default demo mode.
|
||||
|
||||
- [ ] **Step 1: Replace sys.argv parsing with argparse**
|
||||
|
||||
Replace the bottom half of `transcribe.py` (lines 30-52) with argparse-based dispatch. Move model loading after argument parsing so `--help` doesn't trigger a slow model load.
|
||||
|
||||
```python
|
||||
import sys
|
||||
import argparse
|
||||
import numpy as np
|
||||
import sounddevice as sd
|
||||
from transformers import AutoProcessor, CohereAsrForConditionalGeneration
|
||||
from transformers.audio_utils import load_audio
|
||||
from huggingface_hub import hf_hub_download
|
||||
|
||||
MODEL_ID = "CohereLabs/cohere-transcribe-03-2026"
|
||||
SAMPLE_RATE = 16000
|
||||
|
||||
|
||||
def load_model():
|
||||
print("Loading model...")
|
||||
processor = AutoProcessor.from_pretrained(MODEL_ID)
|
||||
model = CohereAsrForConditionalGeneration.from_pretrained(
|
||||
MODEL_ID, device_map="auto"
|
||||
)
|
||||
return processor, model
|
||||
|
||||
|
||||
def transcribe_audio(processor, model, audio, language="en"):
|
||||
inputs = processor(audio, sampling_rate=SAMPLE_RATE, return_tensors="pt", language=language)
|
||||
inputs.to(model.device, dtype=model.dtype)
|
||||
outputs = model.generate(**inputs, max_new_tokens=256)
|
||||
return processor.decode(outputs, skip_special_tokens=True)
|
||||
|
||||
|
||||
def record_audio(duration):
|
||||
print(f"Recording for {duration} seconds...")
|
||||
audio = sd.rec(int(duration * SAMPLE_RATE), samplerate=SAMPLE_RATE, channels=1, dtype="float32")
|
||||
sd.wait()
|
||||
return audio.flatten()
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Cohere ASR Transcription")
|
||||
group = parser.add_mutually_exclusive_group()
|
||||
group.add_argument("--mic", type=int, nargs="?", const=5, metavar="SECONDS",
|
||||
help="Record from microphone for N seconds (default: 5)")
|
||||
group.add_argument("--stream", action="store_true",
|
||||
help="Live streaming transcription with VAD")
|
||||
parser.add_argument("--lang", default="en", help="Language code (default: en)")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.stream:
|
||||
processor, model = load_model()
|
||||
stream_transcribe(processor, model, args.lang)
|
||||
elif args.mic is not None:
|
||||
processor, model = load_model()
|
||||
try:
|
||||
mic_audio = record_audio(args.mic)
|
||||
print("Transcribing...")
|
||||
text = transcribe_audio(processor, model, mic_audio, args.lang)
|
||||
print(f"\nTranscription:\n{text}\n")
|
||||
except OSError as e:
|
||||
print(f"Microphone error: {e}")
|
||||
print("Hint: Run with nix-shell for PortAudio support")
|
||||
else:
|
||||
processor, model = load_model()
|
||||
print("Loading demo audio...")
|
||||
audio_file = hf_hub_download(repo_id=MODEL_ID, filename="demo/voxpopuli_test_en_demo.wav")
|
||||
audio = load_audio(audio_file, sampling_rate=SAMPLE_RATE)
|
||||
print("Transcribing...")
|
||||
text = transcribe_audio(processor, model, audio, args.lang)
|
||||
print(f"\nTranscription:\n{text}\n")
|
||||
|
||||
|
||||
def stream_transcribe(processor, model, language):
|
||||
print("TODO: streaming mode")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Verify existing modes still work**
|
||||
|
||||
Run the demo mode to confirm nothing is broken:
|
||||
|
||||
```bash
|
||||
uv run python transcribe.py
|
||||
```
|
||||
|
||||
Expected: loads model, downloads demo audio, prints transcription.
|
||||
|
||||
Run `--mic` mode:
|
||||
|
||||
```bash
|
||||
uv run python transcribe.py --mic 2
|
||||
```
|
||||
|
||||
Expected: records 2 seconds, transcribes, prints result.
|
||||
|
||||
Run `--help`:
|
||||
|
||||
```bash
|
||||
uv run python transcribe.py --help
|
||||
```
|
||||
|
||||
Expected: prints usage without loading the model.
|
||||
|
||||
- [ ] **Step 3: Commit**
|
||||
|
||||
```bash
|
||||
git add transcribe.py
|
||||
git commit -m "refactor: switch to argparse, add --stream and --lang flags"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 2: Implement silence calibration and VAD state machine
|
||||
|
||||
**Files:**
|
||||
- Modify: `transcribe.py` — add `calibrate_silence()` and `VADStateMachine` class
|
||||
|
||||
- [ ] **Step 1: Add silence calibration function**
|
||||
|
||||
Add this function above `stream_transcribe`:
|
||||
|
||||
```python
|
||||
def calibrate_silence(duration=0.5):
|
||||
print("Calibrating silence threshold...")
|
||||
audio = sd.rec(int(duration * SAMPLE_RATE), samplerate=SAMPLE_RATE, channels=1, dtype="float32")
|
||||
sd.wait()
|
||||
rms = np.sqrt(np.mean(audio ** 2))
|
||||
threshold = max(rms * 3, 0.01)
|
||||
print(f" Ambient RMS: {rms:.4f}, threshold: {threshold:.4f}")
|
||||
return threshold
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Add the VAD state machine**
|
||||
|
||||
Add this class above `stream_transcribe`. The VAD operates on 50ms frames (800 samples at 16kHz). It tracks state transitions between SILENCE and SPEAKING using consecutive frame counts and a configurable silence duration to end a segment.
|
||||
|
||||
```python
|
||||
FRAME_SIZE = 800 # 50ms at 16kHz
|
||||
PRE_ROLL_FRAMES = 6 # ~0.3s of audio before speech onset
|
||||
SILENCE_FRAMES = 16 # ~0.8s of silence to end a segment
|
||||
SPEECH_ONSET_FRAMES = 3 # ~150ms of speech to trigger
|
||||
MAX_SPEECH_SECONDS = 30 # force chunk boundary
|
||||
|
||||
|
||||
class VADStateMachine:
|
||||
def __init__(self, threshold):
|
||||
self.threshold = threshold
|
||||
self.speaking = False
|
||||
self.speech_frames = 0
|
||||
self.silence_frames = 0
|
||||
self.pre_roll = []
|
||||
self.segment = []
|
||||
self.segment_start_time = 0.0
|
||||
|
||||
def process_frame(self, frame, elapsed_time):
|
||||
"""Process one 50ms frame. Returns a (start_time, audio_array) tuple when a
|
||||
complete speech segment is detected, otherwise None."""
|
||||
rms = np.sqrt(np.mean(frame ** 2))
|
||||
is_loud = rms > self.threshold
|
||||
|
||||
if not self.speaking:
|
||||
self.pre_roll.append(frame)
|
||||
if len(self.pre_roll) > PRE_ROLL_FRAMES:
|
||||
self.pre_roll.pop(0)
|
||||
|
||||
if is_loud:
|
||||
self.speech_frames += 1
|
||||
if self.speech_frames >= SPEECH_ONSET_FRAMES:
|
||||
self.speaking = True
|
||||
self.silence_frames = 0
|
||||
self.segment = list(self.pre_roll)
|
||||
self.segment_start_time = max(0.0, elapsed_time - len(self.pre_roll) * FRAME_SIZE / SAMPLE_RATE)
|
||||
self.pre_roll = []
|
||||
else:
|
||||
self.speech_frames = 0
|
||||
return None
|
||||
|
||||
# Currently speaking
|
||||
self.segment.append(frame)
|
||||
|
||||
if is_loud:
|
||||
self.silence_frames = 0
|
||||
else:
|
||||
self.silence_frames += 1
|
||||
|
||||
segment_duration = len(self.segment) * FRAME_SIZE / SAMPLE_RATE
|
||||
if self.silence_frames >= SILENCE_FRAMES or segment_duration >= MAX_SPEECH_SECONDS:
|
||||
result = (self.segment_start_time, np.concatenate(self.segment))
|
||||
self.speaking = False
|
||||
self.speech_frames = 0
|
||||
self.silence_frames = 0
|
||||
self.segment = []
|
||||
self.pre_roll = []
|
||||
return result
|
||||
|
||||
return None
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Verify VAD with a quick smoke test**
|
||||
|
||||
Run a quick inline test to make sure the VAD detects speech:
|
||||
|
||||
```bash
|
||||
uv run python -c "
|
||||
import numpy as np
|
||||
from transcribe import VADStateMachine, FRAME_SIZE, SAMPLE_RATE
|
||||
|
||||
vad = VADStateMachine(threshold=0.01)
|
||||
|
||||
# Feed 10 silent frames
|
||||
for i in range(10):
|
||||
frame = np.zeros(FRAME_SIZE, dtype='float32')
|
||||
result = vad.process_frame(frame, i * FRAME_SIZE / SAMPLE_RATE)
|
||||
assert result is None
|
||||
|
||||
# Feed 5 loud frames (triggers speech after 3)
|
||||
for i in range(10, 15):
|
||||
frame = np.ones(FRAME_SIZE, dtype='float32') * 0.05
|
||||
result = vad.process_frame(frame, i * FRAME_SIZE / SAMPLE_RATE)
|
||||
assert result is None # speaking but not yet ended
|
||||
|
||||
# Feed 20 silent frames (triggers end after 16)
|
||||
for i in range(15, 35):
|
||||
frame = np.zeros(FRAME_SIZE, dtype='float32')
|
||||
result = vad.process_frame(frame, i * FRAME_SIZE / SAMPLE_RATE)
|
||||
if result is not None:
|
||||
start_time, audio = result
|
||||
duration = len(audio) / SAMPLE_RATE
|
||||
print(f'Segment detected: start={start_time:.2f}s, duration={duration:.2f}s')
|
||||
break
|
||||
else:
|
||||
raise AssertionError('No segment detected')
|
||||
|
||||
print('VAD smoke test passed')
|
||||
"
|
||||
```
|
||||
|
||||
Expected: prints segment info and "VAD smoke test passed".
|
||||
|
||||
- [ ] **Step 4: Commit**
|
||||
|
||||
```bash
|
||||
git add transcribe.py
|
||||
git commit -m "feat: add silence calibration and VAD state machine"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 3: Implement the streaming transcription loop
|
||||
|
||||
**Files:**
|
||||
- Modify: `transcribe.py` — replace `stream_transcribe` stub with full implementation
|
||||
|
||||
- [ ] **Step 1: Add imports at the top of the file**
|
||||
|
||||
Add these imports to the top of `transcribe.py` (after `import argparse`):
|
||||
|
||||
```python
|
||||
import queue
|
||||
import threading
|
||||
import time
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Implement stream_transcribe**
|
||||
|
||||
Replace the `stream_transcribe` stub with the full implementation. This function:
|
||||
1. Calibrates silence threshold
|
||||
2. Starts a transcription consumer thread
|
||||
3. Opens a sounddevice InputStream that feeds frames to the VAD
|
||||
4. When VAD emits a segment, pushes it onto the queue
|
||||
5. Handles Ctrl+C for clean shutdown
|
||||
|
||||
```python
|
||||
def stream_transcribe(processor, model, language):
|
||||
threshold = calibrate_silence()
|
||||
vad = VADStateMachine(threshold)
|
||||
seg_queue = queue.Queue()
|
||||
stop_event = threading.Event()
|
||||
start_time = time.monotonic()
|
||||
|
||||
def transcription_worker():
|
||||
while not stop_event.is_set() or not seg_queue.empty():
|
||||
try:
|
||||
seg_start, audio = seg_queue.get(timeout=0.5)
|
||||
except queue.Empty:
|
||||
continue
|
||||
minutes = int(seg_start) // 60
|
||||
seconds = int(seg_start) % 60
|
||||
text = transcribe_audio(processor, model, audio, language)
|
||||
if text.strip():
|
||||
print(f"[{minutes:02d}:{seconds:02d}] {text.strip()}")
|
||||
|
||||
worker = threading.Thread(target=transcription_worker, daemon=True)
|
||||
worker.start()
|
||||
|
||||
frame_buf = np.empty(0, dtype="float32")
|
||||
|
||||
def audio_callback(indata, frames, time_info, status):
|
||||
nonlocal frame_buf
|
||||
if stop_event.is_set():
|
||||
return
|
||||
frame_buf = np.append(frame_buf, indata[:, 0])
|
||||
while len(frame_buf) >= FRAME_SIZE:
|
||||
frame = frame_buf[:FRAME_SIZE]
|
||||
frame_buf = frame_buf[FRAME_SIZE:]
|
||||
elapsed = time.monotonic() - start_time
|
||||
result = vad.process_frame(frame, elapsed)
|
||||
if result is not None:
|
||||
seg_queue.put(result)
|
||||
|
||||
print("Listening... (Ctrl+C to stop)")
|
||||
stream = sd.InputStream(
|
||||
samplerate=SAMPLE_RATE, channels=1, dtype="float32",
|
||||
callback=audio_callback, blocksize=FRAME_SIZE,
|
||||
)
|
||||
|
||||
try:
|
||||
with stream:
|
||||
while True:
|
||||
time.sleep(0.1)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
stop_event.set()
|
||||
|
||||
# Flush any remaining speech segment
|
||||
if vad.speaking and vad.segment:
|
||||
elapsed = time.monotonic() - start_time
|
||||
seg_queue.put((vad.segment_start_time, np.concatenate(vad.segment)))
|
||||
|
||||
worker.join(timeout=30)
|
||||
print("\nDone.")
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Verify streaming mode starts and captures speech**
|
||||
|
||||
Run the streaming mode and speak a sentence into the microphone, then press Ctrl+C:
|
||||
|
||||
```bash
|
||||
uv run python transcribe.py --stream
|
||||
```
|
||||
|
||||
Expected output:
|
||||
```
|
||||
Loading model...
|
||||
Calibrating silence threshold...
|
||||
Ambient RMS: 0.00XX, threshold: 0.00XX
|
||||
Listening... (Ctrl+C to stop)
|
||||
[00:03] <your spoken words appear here>
|
||||
^C
|
||||
Done.
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Verify --lang flag works**
|
||||
|
||||
```bash
|
||||
uv run python transcribe.py --stream --lang en
|
||||
```
|
||||
|
||||
Expected: same as above, English transcription.
|
||||
|
||||
- [ ] **Step 5: Verify existing modes still work**
|
||||
|
||||
```bash
|
||||
uv run python transcribe.py --mic 3
|
||||
```
|
||||
|
||||
Expected: records 3 seconds, transcribes, prints result — same behavior as before.
|
||||
|
||||
- [ ] **Step 6: Commit**
|
||||
|
||||
```bash
|
||||
git add transcribe.py
|
||||
git commit -m "feat: implement live streaming transcription with VAD"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 4: End-to-end verification
|
||||
|
||||
No code changes in this task — just verification that everything works together.
|
||||
|
||||
- [ ] **Step 1: Test continuous conversation**
|
||||
|
||||
Run streaming mode and speak multiple sentences with natural pauses between them:
|
||||
|
||||
```bash
|
||||
uv run python transcribe.py --stream
|
||||
```
|
||||
|
||||
Verify:
|
||||
- Each sentence appears as a separate timestamped line
|
||||
- Timestamps roughly correspond to when you started speaking
|
||||
- No words are cut off at segment boundaries
|
||||
- Pauses within a sentence (< 0.8s) don't split the segment
|
||||
|
||||
- [ ] **Step 2: Test long speech (safety cap)**
|
||||
|
||||
Speak continuously for 30+ seconds without pausing. Verify the safety cap forces a chunk boundary and transcription still works.
|
||||
|
||||
- [ ] **Step 3: Test Ctrl+C with buffered speech**
|
||||
|
||||
Start speaking and immediately press Ctrl+C. Verify the buffered speech is flushed and transcribed before exit.
|
||||
|
||||
- [ ] **Step 4: Test quiet environment**
|
||||
|
||||
Run in a quiet room without speaking. Verify no spurious segments are detected.
|
||||
Reference in New Issue
Block a user