diff --git a/docs/superpowers/plans/2026-05-29-live-streaming-transcription.md b/docs/superpowers/plans/2026-05-29-live-streaming-transcription.md new file mode 100644 index 0000000..bc5f032 --- /dev/null +++ b/docs/superpowers/plans/2026-05-29-live-streaming-transcription.md @@ -0,0 +1,442 @@ +# Live Streaming Transcription Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add `--stream` mode to `transcribe.py` that captures microphone audio, segments speech using VAD, and transcribes each segment in near-real-time. + +**Architecture:** sounddevice InputStream callback pushes audio into a thread-safe buffer. A VAD state machine (energy-based RMS) detects speech segments. Completed segments are pushed onto a `queue.Queue` and consumed by a transcription thread that runs the Cohere ASR model and prints timestamped output. Ctrl+C triggers clean shutdown. + +**Tech Stack:** Python 3.14, sounddevice, numpy, transformers (CohereAsrForConditionalGeneration), threading, queue + +**Spec:** `docs/superpowers/specs/2026-05-29-live-streaming-transcription-design.md` + +--- + +## File Structure + +All changes are in a single file: + +- **Modify:** `transcribe.py` — add `--stream` and `--lang` CLI flags, VAD logic, streaming capture loop, transcription consumer thread, clean shutdown handling. Grows from ~52 lines to ~170 lines. + +No new files. No test files (this is a hardware-dependent demo script — verification is manual with a real microphone). + +--- + +### Task 1: Refactor CLI argument parsing + +**Files:** +- Modify: `transcribe.py:1-52` + +Currently the script uses raw `sys.argv` checks. Replace with `argparse` to cleanly support `--stream`, `--mic`, `--lang`, and the default demo mode. + +- [ ] **Step 1: Replace sys.argv parsing with argparse** + +Replace the bottom half of `transcribe.py` (lines 30-52) with argparse-based dispatch. Move model loading after argument parsing so `--help` doesn't trigger a slow model load. + +```python +import sys +import argparse +import numpy as np +import sounddevice as sd +from transformers import AutoProcessor, CohereAsrForConditionalGeneration +from transformers.audio_utils import load_audio +from huggingface_hub import hf_hub_download + +MODEL_ID = "CohereLabs/cohere-transcribe-03-2026" +SAMPLE_RATE = 16000 + + +def load_model(): + print("Loading model...") + processor = AutoProcessor.from_pretrained(MODEL_ID) + model = CohereAsrForConditionalGeneration.from_pretrained( + MODEL_ID, device_map="auto" + ) + return processor, model + + +def transcribe_audio(processor, model, audio, language="en"): + inputs = processor(audio, sampling_rate=SAMPLE_RATE, return_tensors="pt", language=language) + inputs.to(model.device, dtype=model.dtype) + outputs = model.generate(**inputs, max_new_tokens=256) + return processor.decode(outputs, skip_special_tokens=True) + + +def record_audio(duration): + print(f"Recording for {duration} seconds...") + audio = sd.rec(int(duration * SAMPLE_RATE), samplerate=SAMPLE_RATE, channels=1, dtype="float32") + sd.wait() + return audio.flatten() + + +def main(): + parser = argparse.ArgumentParser(description="Cohere ASR Transcription") + group = parser.add_mutually_exclusive_group() + group.add_argument("--mic", type=int, nargs="?", const=5, metavar="SECONDS", + help="Record from microphone for N seconds (default: 5)") + group.add_argument("--stream", action="store_true", + help="Live streaming transcription with VAD") + parser.add_argument("--lang", default="en", help="Language code (default: en)") + args = parser.parse_args() + + if args.stream: + processor, model = load_model() + stream_transcribe(processor, model, args.lang) + elif args.mic is not None: + processor, model = load_model() + try: + mic_audio = record_audio(args.mic) + print("Transcribing...") + text = transcribe_audio(processor, model, mic_audio, args.lang) + print(f"\nTranscription:\n{text}\n") + except OSError as e: + print(f"Microphone error: {e}") + print("Hint: Run with nix-shell for PortAudio support") + else: + processor, model = load_model() + print("Loading demo audio...") + audio_file = hf_hub_download(repo_id=MODEL_ID, filename="demo/voxpopuli_test_en_demo.wav") + audio = load_audio(audio_file, sampling_rate=SAMPLE_RATE) + print("Transcribing...") + text = transcribe_audio(processor, model, audio, args.lang) + print(f"\nTranscription:\n{text}\n") + + +def stream_transcribe(processor, model, language): + print("TODO: streaming mode") + + +if __name__ == "__main__": + main() +``` + +- [ ] **Step 2: Verify existing modes still work** + +Run the demo mode to confirm nothing is broken: + +```bash +uv run python transcribe.py +``` + +Expected: loads model, downloads demo audio, prints transcription. + +Run `--mic` mode: + +```bash +uv run python transcribe.py --mic 2 +``` + +Expected: records 2 seconds, transcribes, prints result. + +Run `--help`: + +```bash +uv run python transcribe.py --help +``` + +Expected: prints usage without loading the model. + +- [ ] **Step 3: Commit** + +```bash +git add transcribe.py +git commit -m "refactor: switch to argparse, add --stream and --lang flags" +``` + +--- + +### Task 2: Implement silence calibration and VAD state machine + +**Files:** +- Modify: `transcribe.py` — add `calibrate_silence()` and `VADStateMachine` class + +- [ ] **Step 1: Add silence calibration function** + +Add this function above `stream_transcribe`: + +```python +def calibrate_silence(duration=0.5): + print("Calibrating silence threshold...") + audio = sd.rec(int(duration * SAMPLE_RATE), samplerate=SAMPLE_RATE, channels=1, dtype="float32") + sd.wait() + rms = np.sqrt(np.mean(audio ** 2)) + threshold = max(rms * 3, 0.01) + print(f" Ambient RMS: {rms:.4f}, threshold: {threshold:.4f}") + return threshold +``` + +- [ ] **Step 2: Add the VAD state machine** + +Add this class above `stream_transcribe`. The VAD operates on 50ms frames (800 samples at 16kHz). It tracks state transitions between SILENCE and SPEAKING using consecutive frame counts and a configurable silence duration to end a segment. + +```python +FRAME_SIZE = 800 # 50ms at 16kHz +PRE_ROLL_FRAMES = 6 # ~0.3s of audio before speech onset +SILENCE_FRAMES = 16 # ~0.8s of silence to end a segment +SPEECH_ONSET_FRAMES = 3 # ~150ms of speech to trigger +MAX_SPEECH_SECONDS = 30 # force chunk boundary + + +class VADStateMachine: + def __init__(self, threshold): + self.threshold = threshold + self.speaking = False + self.speech_frames = 0 + self.silence_frames = 0 + self.pre_roll = [] + self.segment = [] + self.segment_start_time = 0.0 + + def process_frame(self, frame, elapsed_time): + """Process one 50ms frame. Returns a (start_time, audio_array) tuple when a + complete speech segment is detected, otherwise None.""" + rms = np.sqrt(np.mean(frame ** 2)) + is_loud = rms > self.threshold + + if not self.speaking: + self.pre_roll.append(frame) + if len(self.pre_roll) > PRE_ROLL_FRAMES: + self.pre_roll.pop(0) + + if is_loud: + self.speech_frames += 1 + if self.speech_frames >= SPEECH_ONSET_FRAMES: + self.speaking = True + self.silence_frames = 0 + self.segment = list(self.pre_roll) + self.segment_start_time = max(0.0, elapsed_time - len(self.pre_roll) * FRAME_SIZE / SAMPLE_RATE) + self.pre_roll = [] + else: + self.speech_frames = 0 + return None + + # Currently speaking + self.segment.append(frame) + + if is_loud: + self.silence_frames = 0 + else: + self.silence_frames += 1 + + segment_duration = len(self.segment) * FRAME_SIZE / SAMPLE_RATE + if self.silence_frames >= SILENCE_FRAMES or segment_duration >= MAX_SPEECH_SECONDS: + result = (self.segment_start_time, np.concatenate(self.segment)) + self.speaking = False + self.speech_frames = 0 + self.silence_frames = 0 + self.segment = [] + self.pre_roll = [] + return result + + return None +``` + +- [ ] **Step 3: Verify VAD with a quick smoke test** + +Run a quick inline test to make sure the VAD detects speech: + +```bash +uv run python -c " +import numpy as np +from transcribe import VADStateMachine, FRAME_SIZE, SAMPLE_RATE + +vad = VADStateMachine(threshold=0.01) + +# Feed 10 silent frames +for i in range(10): + frame = np.zeros(FRAME_SIZE, dtype='float32') + result = vad.process_frame(frame, i * FRAME_SIZE / SAMPLE_RATE) + assert result is None + +# Feed 5 loud frames (triggers speech after 3) +for i in range(10, 15): + frame = np.ones(FRAME_SIZE, dtype='float32') * 0.05 + result = vad.process_frame(frame, i * FRAME_SIZE / SAMPLE_RATE) + assert result is None # speaking but not yet ended + +# Feed 20 silent frames (triggers end after 16) +for i in range(15, 35): + frame = np.zeros(FRAME_SIZE, dtype='float32') + result = vad.process_frame(frame, i * FRAME_SIZE / SAMPLE_RATE) + if result is not None: + start_time, audio = result + duration = len(audio) / SAMPLE_RATE + print(f'Segment detected: start={start_time:.2f}s, duration={duration:.2f}s') + break +else: + raise AssertionError('No segment detected') + +print('VAD smoke test passed') +" +``` + +Expected: prints segment info and "VAD smoke test passed". + +- [ ] **Step 4: Commit** + +```bash +git add transcribe.py +git commit -m "feat: add silence calibration and VAD state machine" +``` + +--- + +### Task 3: Implement the streaming transcription loop + +**Files:** +- Modify: `transcribe.py` — replace `stream_transcribe` stub with full implementation + +- [ ] **Step 1: Add imports at the top of the file** + +Add these imports to the top of `transcribe.py` (after `import argparse`): + +```python +import queue +import threading +import time +``` + +- [ ] **Step 2: Implement stream_transcribe** + +Replace the `stream_transcribe` stub with the full implementation. This function: +1. Calibrates silence threshold +2. Starts a transcription consumer thread +3. Opens a sounddevice InputStream that feeds frames to the VAD +4. When VAD emits a segment, pushes it onto the queue +5. Handles Ctrl+C for clean shutdown + +```python +def stream_transcribe(processor, model, language): + threshold = calibrate_silence() + vad = VADStateMachine(threshold) + seg_queue = queue.Queue() + stop_event = threading.Event() + start_time = time.monotonic() + + def transcription_worker(): + while not stop_event.is_set() or not seg_queue.empty(): + try: + seg_start, audio = seg_queue.get(timeout=0.5) + except queue.Empty: + continue + minutes = int(seg_start) // 60 + seconds = int(seg_start) % 60 + text = transcribe_audio(processor, model, audio, language) + if text.strip(): + print(f"[{minutes:02d}:{seconds:02d}] {text.strip()}") + + worker = threading.Thread(target=transcription_worker, daemon=True) + worker.start() + + frame_buf = np.empty(0, dtype="float32") + + def audio_callback(indata, frames, time_info, status): + nonlocal frame_buf + if stop_event.is_set(): + return + frame_buf = np.append(frame_buf, indata[:, 0]) + while len(frame_buf) >= FRAME_SIZE: + frame = frame_buf[:FRAME_SIZE] + frame_buf = frame_buf[FRAME_SIZE:] + elapsed = time.monotonic() - start_time + result = vad.process_frame(frame, elapsed) + if result is not None: + seg_queue.put(result) + + print("Listening... (Ctrl+C to stop)") + stream = sd.InputStream( + samplerate=SAMPLE_RATE, channels=1, dtype="float32", + callback=audio_callback, blocksize=FRAME_SIZE, + ) + + try: + with stream: + while True: + time.sleep(0.1) + except KeyboardInterrupt: + pass + + stop_event.set() + + # Flush any remaining speech segment + if vad.speaking and vad.segment: + elapsed = time.monotonic() - start_time + seg_queue.put((vad.segment_start_time, np.concatenate(vad.segment))) + + worker.join(timeout=30) + print("\nDone.") +``` + +- [ ] **Step 3: Verify streaming mode starts and captures speech** + +Run the streaming mode and speak a sentence into the microphone, then press Ctrl+C: + +```bash +uv run python transcribe.py --stream +``` + +Expected output: +``` +Loading model... +Calibrating silence threshold... + Ambient RMS: 0.00XX, threshold: 0.00XX +Listening... (Ctrl+C to stop) +[00:03] +^C +Done. +``` + +- [ ] **Step 4: Verify --lang flag works** + +```bash +uv run python transcribe.py --stream --lang en +``` + +Expected: same as above, English transcription. + +- [ ] **Step 5: Verify existing modes still work** + +```bash +uv run python transcribe.py --mic 3 +``` + +Expected: records 3 seconds, transcribes, prints result — same behavior as before. + +- [ ] **Step 6: Commit** + +```bash +git add transcribe.py +git commit -m "feat: implement live streaming transcription with VAD" +``` + +--- + +### Task 4: End-to-end verification + +No code changes in this task — just verification that everything works together. + +- [ ] **Step 1: Test continuous conversation** + +Run streaming mode and speak multiple sentences with natural pauses between them: + +```bash +uv run python transcribe.py --stream +``` + +Verify: +- Each sentence appears as a separate timestamped line +- Timestamps roughly correspond to when you started speaking +- No words are cut off at segment boundaries +- Pauses within a sentence (< 0.8s) don't split the segment + +- [ ] **Step 2: Test long speech (safety cap)** + +Speak continuously for 30+ seconds without pausing. Verify the safety cap forces a chunk boundary and transcription still works. + +- [ ] **Step 3: Test Ctrl+C with buffered speech** + +Start speaking and immediately press Ctrl+C. Verify the buffered speech is flushed and transcribed before exit. + +- [ ] **Step 4: Test quiet environment** + +Run in a quiet room without speaking. Verify no spurious segments are detected.