# Live Streaming Transcription Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Add `--stream` mode to `transcribe.py` that captures microphone audio, segments speech using VAD, and transcribes each segment in near-real-time.

**Architecture:** A sounddevice InputStream callback buffers incoming audio into 50ms frames and runs each frame through a VAD state machine (energy-based RMS) that detects speech segments. Completed segments are pushed onto a thread-safe `queue.Queue` and consumed by a transcription thread that runs the Cohere ASR model and prints timestamped output. Ctrl+C triggers clean shutdown.

**Tech Stack:** Python 3.14, sounddevice, numpy, transformers (CohereAsrForConditionalGeneration), threading, queue

**Spec:** `docs/superpowers/specs/2026-05-29-live-streaming-transcription-design.md`

---

## File Structure

All changes are in a single file:

- **Modify:** `transcribe.py`: add `--stream` and `--lang` CLI flags, VAD logic, streaming capture loop, transcription consumer thread, clean shutdown handling. Grows from ~52 lines to ~170 lines.

No new files. No test files (this is a hardware-dependent demo script, so verification is manual with a real microphone).

---

### Task 1: Refactor CLI argument parsing

**Files:**
- Modify: `transcribe.py:1-52`

Currently the script uses raw `sys.argv` checks. Replace them with `argparse` to cleanly support `--stream`, `--mic`, `--lang`, and the default demo mode.

- [ ] **Step 1: Replace sys.argv parsing with argparse**

Replace the bottom half of `transcribe.py` (lines 30-52) with argparse-based dispatch. Move model loading after argument parsing so `--help` doesn't trigger a slow model load.
```python
import sys
import argparse

import numpy as np
import sounddevice as sd
from transformers import AutoProcessor, CohereAsrForConditionalGeneration
from transformers.audio_utils import load_audio
from huggingface_hub import hf_hub_download

MODEL_ID = "CohereLabs/cohere-transcribe-03-2026"
SAMPLE_RATE = 16000


def load_model():
    print("Loading model...")
    processor = AutoProcessor.from_pretrained(MODEL_ID)
    model = CohereAsrForConditionalGeneration.from_pretrained(
        MODEL_ID, device_map="auto"
    )
    return processor, model


def transcribe_audio(processor, model, audio, language="en"):
    inputs = processor(audio, sampling_rate=SAMPLE_RATE, return_tensors="pt", language=language)
    inputs.to(model.device, dtype=model.dtype)
    outputs = model.generate(**inputs, max_new_tokens=256)
    return processor.decode(outputs, skip_special_tokens=True)


def record_audio(duration):
    print(f"Recording for {duration} seconds...")
    audio = sd.rec(int(duration * SAMPLE_RATE), samplerate=SAMPLE_RATE, channels=1, dtype="float32")
    sd.wait()
    return audio.flatten()


def main():
    parser = argparse.ArgumentParser(description="Cohere ASR Transcription")
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--mic", type=int, nargs="?", const=5, metavar="SECONDS",
                       help="Record from microphone for N seconds (default: 5)")
    group.add_argument("--stream", action="store_true",
                       help="Live streaming transcription with VAD")
    parser.add_argument("--lang", default="en", help="Language code (default: en)")
    args = parser.parse_args()

    if args.stream:
        processor, model = load_model()
        stream_transcribe(processor, model, args.lang)
    elif args.mic is not None:
        processor, model = load_model()
        try:
            mic_audio = record_audio(args.mic)
            print("Transcribing...")
            text = transcribe_audio(processor, model, mic_audio, args.lang)
            print(f"\nTranscription:\n{text}\n")
        except OSError as e:
            print(f"Microphone error: {e}")
            print("Hint: Run with nix-shell for PortAudio support")
    else:
        processor, model = load_model()
        print("Loading demo audio...")
        audio_file = hf_hub_download(repo_id=MODEL_ID, filename="demo/voxpopuli_test_en_demo.wav")
        audio = load_audio(audio_file, sampling_rate=SAMPLE_RATE)
        print("Transcribing...")
        text = transcribe_audio(processor, model, audio, args.lang)
        print(f"\nTranscription:\n{text}\n")


def stream_transcribe(processor, model, language):
    print("TODO: streaming mode")


if __name__ == "__main__":
    main()
```

- [ ] **Step 2: Verify existing modes still work**

Run the demo mode to confirm nothing is broken:

```bash
uv run python transcribe.py
```

Expected: loads model, downloads demo audio, prints transcription.

Run `--mic` mode:

```bash
uv run python transcribe.py --mic 2
```

Expected: records 2 seconds, transcribes, prints result.

Run `--help`:

```bash
uv run python transcribe.py --help
```

Expected: prints usage without loading the model.

- [ ] **Step 3: Commit**

```bash
git add transcribe.py
git commit -m "refactor: switch to argparse, add --stream and --lang flags"
```

---

### Task 2: Implement silence calibration and VAD state machine

**Files:**
- Modify: `transcribe.py`: add `calibrate_silence()` and `VADStateMachine` class

- [ ] **Step 1: Add silence calibration function**

Add this function above `stream_transcribe`:

```python
def calibrate_silence(duration=0.5):
    print("Calibrating silence threshold...")
    audio = sd.rec(int(duration * SAMPLE_RATE), samplerate=SAMPLE_RATE, channels=1, dtype="float32")
    sd.wait()
    rms = np.sqrt(np.mean(audio ** 2))
    threshold = max(rms * 3, 0.01)
    print(f"  Ambient RMS: {rms:.4f}, threshold: {threshold:.4f}")
    return threshold
```

- [ ] **Step 2: Add the VAD state machine**

Add this class above `stream_transcribe`. The VAD operates on 50ms frames (800 samples at 16kHz). It tracks state transitions between SILENCE and SPEAKING using consecutive frame counts and a configurable silence duration to end a segment.
```python
FRAME_SIZE = 800  # 50ms at 16kHz
PRE_ROLL_FRAMES = 6  # ~0.3s retained when speech starts (includes the onset frames)
SILENCE_FRAMES = 16  # ~0.8s of silence to end a segment
SPEECH_ONSET_FRAMES = 3  # ~150ms of speech to trigger
MAX_SPEECH_SECONDS = 30  # force chunk boundary


class VADStateMachine:
    def __init__(self, threshold):
        self.threshold = threshold
        self.speaking = False
        self.speech_frames = 0
        self.silence_frames = 0
        self.pre_roll = []
        self.segment = []
        self.segment_start_time = 0.0

    def process_frame(self, frame, elapsed_time):
        """Process one 50ms frame.

        Returns a (start_time, audio_array) tuple when a complete speech
        segment is detected, otherwise None.
        """
        rms = np.sqrt(np.mean(frame ** 2))
        is_loud = rms > self.threshold

        if not self.speaking:
            # Keep a rolling window of recent frames so the segment can
            # include audio from just before the onset was confirmed.
            self.pre_roll.append(frame)
            if len(self.pre_roll) > PRE_ROLL_FRAMES:
                self.pre_roll.pop(0)
            if is_loud:
                self.speech_frames += 1
                if self.speech_frames >= SPEECH_ONSET_FRAMES:
                    self.speaking = True
                    self.silence_frames = 0
                    self.segment = list(self.pre_roll)
                    self.segment_start_time = max(
                        0.0, elapsed_time - len(self.pre_roll) * FRAME_SIZE / SAMPLE_RATE
                    )
                    self.pre_roll = []
            else:
                self.speech_frames = 0
            return None

        # Currently speaking
        self.segment.append(frame)
        if is_loud:
            self.silence_frames = 0
        else:
            self.silence_frames += 1

        segment_duration = len(self.segment) * FRAME_SIZE / SAMPLE_RATE
        if self.silence_frames >= SILENCE_FRAMES or segment_duration >= MAX_SPEECH_SECONDS:
            result = (self.segment_start_time, np.concatenate(self.segment))
            self.speaking = False
            self.speech_frames = 0
            self.silence_frames = 0
            self.segment = []
            self.pre_roll = []
            return result
        return None
```

- [ ] **Step 3: Verify VAD with a quick smoke test**

Run a quick inline test to make sure the VAD detects speech:

```bash
uv run python -c "
import numpy as np
from transcribe import VADStateMachine, FRAME_SIZE, SAMPLE_RATE

vad = VADStateMachine(threshold=0.01)

# Feed 10 silent frames
for i in range(10):
    frame = np.zeros(FRAME_SIZE, dtype='float32')
    result = vad.process_frame(frame, i * FRAME_SIZE / SAMPLE_RATE)
    assert result is None

# Feed 5 loud frames (triggers speech after 3)
for i in range(10, 15):
    frame = np.ones(FRAME_SIZE, dtype='float32') * 0.05
    result = vad.process_frame(frame, i * FRAME_SIZE / SAMPLE_RATE)
    assert result is None  # speaking but not yet ended

# Feed 20 silent frames (triggers end after 16)
for i in range(15, 35):
    frame = np.zeros(FRAME_SIZE, dtype='float32')
    result = vad.process_frame(frame, i * FRAME_SIZE / SAMPLE_RATE)
    if result is not None:
        start_time, audio = result
        duration = len(audio) / SAMPLE_RATE
        print(f'Segment detected: start={start_time:.2f}s, duration={duration:.2f}s')
        break
else:
    raise AssertionError('No segment detected')

print('VAD smoke test passed')
"
```

Expected: prints `Segment detected: start=0.30s, duration=1.20s` (6 pre-roll frames, 2 further loud frames, and 16 silent frames) followed by "VAD smoke test passed".

- [ ] **Step 4: Commit**

```bash
git add transcribe.py
git commit -m "feat: add silence calibration and VAD state machine"
```

---

### Task 3: Implement the streaming transcription loop

**Files:**
- Modify: `transcribe.py`: replace `stream_transcribe` stub with full implementation

- [ ] **Step 1: Add imports at the top of the file**

Add these imports to the top of `transcribe.py` (after `import argparse`):

```python
import queue
import threading
import time
```

- [ ] **Step 2: Implement stream_transcribe**

Replace the `stream_transcribe` stub with the full implementation. This function:

1. Calibrates silence threshold
2. Starts a transcription consumer thread
3. Opens a sounddevice InputStream that feeds frames to the VAD
4. When VAD emits a segment, pushes it onto the queue
5. Handles Ctrl+C for clean shutdown

```python
def stream_transcribe(processor, model, language):
    threshold = calibrate_silence()
    vad = VADStateMachine(threshold)
    seg_queue = queue.Queue()
    stop_event = threading.Event()
    start_time = time.monotonic()

    def transcription_worker():
        # Keep draining after stop is requested until the queue is empty.
        while not stop_event.is_set() or not seg_queue.empty():
            try:
                seg_start, audio = seg_queue.get(timeout=0.5)
            except queue.Empty:
                continue
            minutes = int(seg_start) // 60
            seconds = int(seg_start) % 60
            text = transcribe_audio(processor, model, audio, language)
            if text.strip():
                print(f"[{minutes:02d}:{seconds:02d}] {text.strip()}")

    worker = threading.Thread(target=transcription_worker, daemon=True)
    worker.start()

    frame_buf = np.empty(0, dtype="float32")

    def audio_callback(indata, frames, time_info, status):
        nonlocal frame_buf
        if stop_event.is_set():
            return
        frame_buf = np.append(frame_buf, indata[:, 0])
        while len(frame_buf) >= FRAME_SIZE:
            frame = frame_buf[:FRAME_SIZE]
            frame_buf = frame_buf[FRAME_SIZE:]
            elapsed = time.monotonic() - start_time
            result = vad.process_frame(frame, elapsed)
            if result is not None:
                seg_queue.put(result)

    print("Listening... (Ctrl+C to stop)")
    stream = sd.InputStream(
        samplerate=SAMPLE_RATE,
        channels=1,
        dtype="float32",
        callback=audio_callback,
        blocksize=FRAME_SIZE,
    )

    try:
        with stream:
            while True:
                time.sleep(0.1)
    except KeyboardInterrupt:
        pass

    # The stream is closed at this point, so the callback no longer touches
    # the VAD. Queue any in-progress segment BEFORE signalling stop; otherwise
    # the worker can see an empty queue, exit, and drop the flushed segment.
    if vad.speaking and vad.segment:
        seg_queue.put((vad.segment_start_time, np.concatenate(vad.segment)))
    stop_event.set()

    worker.join(timeout=30)
    print("\nDone.")
```

- [ ] **Step 3: Verify streaming mode starts and captures speech**

Run the streaming mode and speak a sentence into the microphone, then press Ctrl+C:

```bash
uv run python transcribe.py --stream
```

Expected output (the threshold is never below the 0.01 floor set in `calibrate_silence`):

```
Loading model...
Calibrating silence threshold...
  Ambient RMS: 0.00XX, threshold: 0.0XXX
Listening... (Ctrl+C to stop)
[00:03]
^C
Done.
```

- [ ] **Step 4: Verify --lang flag works**

```bash
uv run python transcribe.py --stream --lang en
```

Expected: same as above, English transcription.

- [ ] **Step 5: Verify existing modes still work**

```bash
uv run python transcribe.py --mic 3
```

Expected: records 3 seconds, transcribes, prints result, with the same behavior as before.

- [ ] **Step 6: Commit**

```bash
git add transcribe.py
git commit -m "feat: implement live streaming transcription with VAD"
```

---

### Task 4: End-to-end verification

No code changes in this task, only verification that everything works together.

- [ ] **Step 1: Test continuous conversation**

Run streaming mode and speak multiple sentences with natural pauses between them:

```bash
uv run python transcribe.py --stream
```

Verify:
- Each sentence appears as a separate timestamped line
- Timestamps roughly correspond to when you started speaking
- No words are cut off at segment boundaries
- Pauses within a sentence (< 0.8s) don't split the segment

- [ ] **Step 2: Test long speech (safety cap)**

Speak continuously for 30+ seconds without pausing. Verify the safety cap forces a chunk boundary and transcription still works.

- [ ] **Step 3: Test Ctrl+C with buffered speech**

Start speaking and immediately press Ctrl+C. Verify the buffered speech is flushed and transcribed before exit.

- [ ] **Step 4: Test quiet environment**

Run in a quiet room without speaking. Verify no spurious segments are detected.
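
The quiet-room check can be approximated offline before reaching for a microphone. The block below is a minimal sketch and not part of `transcribe.py`: it uses only the standard library, reuses the `max(rms * 3, 0.01)` rule from `calibrate_silence()`, and replaces microphone input with uniform random samples. The helpers `noise_frames` and `count_onsets` and the 0.002 noise amplitude are illustrative assumptions, and `count_onsets` models only the onset half of the VAD (three consecutive loud frames), not segment termination.

```python
import math
import random

FRAME_SIZE = 800          # 50ms at 16kHz, as in the plan
SPEECH_ONSET_FRAMES = 3   # consecutive loud frames needed to start a segment


def rms(samples):
    """Root-mean-square level of a sequence of float samples."""
    return math.sqrt(sum(s * s for s in samples) / len(samples))


def threshold_from_ambient(ambient_rms):
    """Same rule as calibrate_silence(): 3x ambient RMS, floored at 0.01."""
    return max(ambient_rms * 3, 0.01)


def count_onsets(frames, threshold):
    """Count runs of at least SPEECH_ONSET_FRAMES consecutive loud frames."""
    onsets = 0
    run = 0
    for frame in frames:
        if rms(frame) > threshold:
            run += 1
            if run == SPEECH_ONSET_FRAMES:
                onsets += 1  # counted once per run
        else:
            run = 0
    return onsets


def noise_frames(n_frames, amplitude, rng):
    """Hypothetical stand-in for room noise: uniform samples in [-amplitude, amplitude]."""
    return [
        [rng.uniform(-amplitude, amplitude) for _ in range(FRAME_SIZE)]
        for _ in range(n_frames)
    ]


rng = random.Random(0)  # fixed seed so the run is repeatable

# 0.5s of simulated ambient audio, matching the calibration duration
calibration = noise_frames(10, 0.002, rng)
ambient = rms([s for frame in calibration for s in frame])
threshold = threshold_from_ambient(ambient)

# 5s of the same simulated noise should never trigger an onset
quiet = noise_frames(100, 0.002, rng)
print(f"threshold={threshold:.4f}, onsets={count_onsets(quiet, threshold)}")
# prints: threshold=0.0100, onsets=0
```

With samples bounded by 0.002, three times the ambient RMS stays below the 0.01 floor, so the floor sets the threshold and no quiet frame can exceed it. A real room with fans or keyboard noise may behave differently, which is why the manual check in Step 4 is still needed.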