Files
cohere-transcribe/docs/superpowers/plans/2026-05-29-live-streaming-transcription.md
T
2026-05-29 02:42:00 +08:00

14 KiB

Live Streaming Transcription Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Add --stream mode to transcribe.py that captures microphone audio, segments speech using VAD, and transcribes each segment in near-real-time.

Architecture: sounddevice InputStream callback pushes audio into a thread-safe buffer. A VAD state machine (energy-based RMS) detects speech segments. Completed segments are pushed onto a queue.Queue and consumed by a transcription thread that runs the Cohere ASR model and prints timestamped output. Ctrl+C triggers clean shutdown.

Tech Stack: Python 3.14, sounddevice, numpy, transformers (CohereAsrForConditionalGeneration), threading, queue

Spec: docs/superpowers/specs/2026-05-29-live-streaming-transcription-design.md


File Structure

All changes are in a single file:

  • Modify: transcribe.py — add --stream and --lang CLI flags, VAD logic, streaming capture loop, transcription consumer thread, clean shutdown handling. Grows from ~52 lines to ~170 lines.

No new files. No test files (this is a hardware-dependent demo script — verification is manual with a real microphone).


Task 1: Refactor CLI argument parsing

Files:

  • Modify: transcribe.py:1-52

Currently the script uses raw sys.argv checks. Replace with argparse to cleanly support --stream, --mic, --lang, and the default demo mode.

  • Step 1: Replace sys.argv parsing with argparse

Replace the bottom half of transcribe.py (lines 30-52) with argparse-based dispatch. Move model loading after argument parsing so --help doesn't trigger a slow model load.

import sys
import argparse
import numpy as np
import sounddevice as sd
from transformers import AutoProcessor, CohereAsrForConditionalGeneration
from transformers.audio_utils import load_audio
from huggingface_hub import hf_hub_download

MODEL_ID = "CohereLabs/cohere-transcribe-03-2026"
SAMPLE_RATE = 16000


def load_model():
    print("Loading model...")
    processor = AutoProcessor.from_pretrained(MODEL_ID)
    model = CohereAsrForConditionalGeneration.from_pretrained(
        MODEL_ID, device_map="auto"
    )
    return processor, model


def transcribe_audio(processor, model, audio, language="en"):
    inputs = processor(audio, sampling_rate=SAMPLE_RATE, return_tensors="pt", language=language)
    inputs.to(model.device, dtype=model.dtype)
    outputs = model.generate(**inputs, max_new_tokens=256)
    return processor.decode(outputs, skip_special_tokens=True)


def record_audio(duration):
    print(f"Recording for {duration} seconds...")
    audio = sd.rec(int(duration * SAMPLE_RATE), samplerate=SAMPLE_RATE, channels=1, dtype="float32")
    sd.wait()
    return audio.flatten()


def main():
    parser = argparse.ArgumentParser(description="Cohere ASR Transcription")
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--mic", type=int, nargs="?", const=5, metavar="SECONDS",
                       help="Record from microphone for N seconds (default: 5)")
    group.add_argument("--stream", action="store_true",
                       help="Live streaming transcription with VAD")
    parser.add_argument("--lang", default="en", help="Language code (default: en)")
    args = parser.parse_args()

    if args.stream:
        processor, model = load_model()
        stream_transcribe(processor, model, args.lang)
    elif args.mic is not None:
        processor, model = load_model()
        try:
            mic_audio = record_audio(args.mic)
            print("Transcribing...")
            text = transcribe_audio(processor, model, mic_audio, args.lang)
            print(f"\nTranscription:\n{text}\n")
        except OSError as e:
            print(f"Microphone error: {e}")
            print("Hint: Run with nix-shell for PortAudio support")
    else:
        processor, model = load_model()
        print("Loading demo audio...")
        audio_file = hf_hub_download(repo_id=MODEL_ID, filename="demo/voxpopuli_test_en_demo.wav")
        audio = load_audio(audio_file, sampling_rate=SAMPLE_RATE)
        print("Transcribing...")
        text = transcribe_audio(processor, model, audio, args.lang)
        print(f"\nTranscription:\n{text}\n")


def stream_transcribe(processor, model, language):
    print("TODO: streaming mode")


if __name__ == "__main__":
    main()
  • Step 2: Verify existing modes still work

Run the demo mode to confirm nothing is broken:

uv run python transcribe.py

Expected: loads model, downloads demo audio, prints transcription.

Run --mic mode:

uv run python transcribe.py --mic 2

Expected: records 2 seconds, transcribes, prints result.

Run --help:

uv run python transcribe.py --help

Expected: prints usage without loading the model.

  • Step 3: Commit
git add transcribe.py
git commit -m "refactor: switch to argparse, add --stream and --lang flags"

Task 2: Implement silence calibration and VAD state machine

Files:

  • Modify: transcribe.py — add calibrate_silence() and VADStateMachine class

  • Step 1: Add silence calibration function

Add this function above stream_transcribe:

def calibrate_silence(duration=0.5):
    print("Calibrating silence threshold...")
    audio = sd.rec(int(duration * SAMPLE_RATE), samplerate=SAMPLE_RATE, channels=1, dtype="float32")
    sd.wait()
    rms = np.sqrt(np.mean(audio ** 2))
    threshold = max(rms * 3, 0.01)
    print(f"  Ambient RMS: {rms:.4f}, threshold: {threshold:.4f}")
    return threshold
  • Step 2: Add the VAD state machine

Add this class above stream_transcribe. The VAD operates on 50ms frames (800 samples at 16kHz). It tracks state transitions between SILENCE and SPEAKING using consecutive frame counts and a configurable silence duration to end a segment.

FRAME_SIZE = 800          # 50ms at 16kHz
PRE_ROLL_FRAMES = 6       # ~0.3s of audio before speech onset
SILENCE_FRAMES = 16       # ~0.8s of silence to end a segment
SPEECH_ONSET_FRAMES = 3   # ~150ms of speech to trigger
MAX_SPEECH_SECONDS = 30   # force chunk boundary


class VADStateMachine:
    def __init__(self, threshold):
        self.threshold = threshold
        self.speaking = False
        self.speech_frames = 0
        self.silence_frames = 0
        self.pre_roll = []
        self.segment = []
        self.segment_start_time = 0.0

    def process_frame(self, frame, elapsed_time):
        """Process one 50ms frame. Returns a (start_time, audio_array) tuple when a
        complete speech segment is detected, otherwise None."""
        rms = np.sqrt(np.mean(frame ** 2))
        is_loud = rms > self.threshold

        if not self.speaking:
            self.pre_roll.append(frame)
            if len(self.pre_roll) > PRE_ROLL_FRAMES:
                self.pre_roll.pop(0)

            if is_loud:
                self.speech_frames += 1
                if self.speech_frames >= SPEECH_ONSET_FRAMES:
                    self.speaking = True
                    self.silence_frames = 0
                    self.segment = list(self.pre_roll)
                    self.segment_start_time = max(0.0, elapsed_time - len(self.pre_roll) * FRAME_SIZE / SAMPLE_RATE)
                    self.pre_roll = []
            else:
                self.speech_frames = 0
            return None

        # Currently speaking
        self.segment.append(frame)

        if is_loud:
            self.silence_frames = 0
        else:
            self.silence_frames += 1

        segment_duration = len(self.segment) * FRAME_SIZE / SAMPLE_RATE
        if self.silence_frames >= SILENCE_FRAMES or segment_duration >= MAX_SPEECH_SECONDS:
            result = (self.segment_start_time, np.concatenate(self.segment))
            self.speaking = False
            self.speech_frames = 0
            self.silence_frames = 0
            self.segment = []
            self.pre_roll = []
            return result

        return None
  • Step 3: Verify VAD with a quick smoke test

Run a quick inline test to make sure the VAD detects speech:

uv run python -c "
import numpy as np
from transcribe import VADStateMachine, FRAME_SIZE, SAMPLE_RATE

vad = VADStateMachine(threshold=0.01)

# Feed 10 silent frames
for i in range(10):
    frame = np.zeros(FRAME_SIZE, dtype='float32')
    result = vad.process_frame(frame, i * FRAME_SIZE / SAMPLE_RATE)
    assert result is None

# Feed 5 loud frames (triggers speech after 3)
for i in range(10, 15):
    frame = np.ones(FRAME_SIZE, dtype='float32') * 0.05
    result = vad.process_frame(frame, i * FRAME_SIZE / SAMPLE_RATE)
    assert result is None  # speaking but not yet ended

# Feed 20 silent frames (triggers end after 16)
for i in range(15, 35):
    frame = np.zeros(FRAME_SIZE, dtype='float32')
    result = vad.process_frame(frame, i * FRAME_SIZE / SAMPLE_RATE)
    if result is not None:
        start_time, audio = result
        duration = len(audio) / SAMPLE_RATE
        print(f'Segment detected: start={start_time:.2f}s, duration={duration:.2f}s')
        break
else:
    raise AssertionError('No segment detected')

print('VAD smoke test passed')
"

Expected: prints segment info and "VAD smoke test passed".

  • Step 4: Commit
git add transcribe.py
git commit -m "feat: add silence calibration and VAD state machine"

Task 3: Implement the streaming transcription loop

Files:

  • Modify: transcribe.py — replace stream_transcribe stub with full implementation

  • Step 1: Add imports at the top of the file

Add these imports to the top of transcribe.py (after import argparse):

import queue
import threading
import time
  • Step 2: Implement stream_transcribe

Replace the stream_transcribe stub with the full implementation. This function:

  1. Calibrates silence threshold
  2. Starts a transcription consumer thread
  3. Opens a sounddevice InputStream that feeds frames to the VAD
  4. When VAD emits a segment, pushes it onto the queue
  5. Handles Ctrl+C for clean shutdown
def stream_transcribe(processor, model, language):
    threshold = calibrate_silence()
    vad = VADStateMachine(threshold)
    seg_queue = queue.Queue()
    stop_event = threading.Event()
    start_time = time.monotonic()

    def transcription_worker():
        while not stop_event.is_set() or not seg_queue.empty():
            try:
                seg_start, audio = seg_queue.get(timeout=0.5)
            except queue.Empty:
                continue
            minutes = int(seg_start) // 60
            seconds = int(seg_start) % 60
            text = transcribe_audio(processor, model, audio, language)
            if text.strip():
                print(f"[{minutes:02d}:{seconds:02d}] {text.strip()}")

    worker = threading.Thread(target=transcription_worker, daemon=True)
    worker.start()

    frame_buf = np.empty(0, dtype="float32")

    def audio_callback(indata, frames, time_info, status):
        nonlocal frame_buf
        if stop_event.is_set():
            return
        frame_buf = np.append(frame_buf, indata[:, 0])
        while len(frame_buf) >= FRAME_SIZE:
            frame = frame_buf[:FRAME_SIZE]
            frame_buf = frame_buf[FRAME_SIZE:]
            elapsed = time.monotonic() - start_time
            result = vad.process_frame(frame, elapsed)
            if result is not None:
                seg_queue.put(result)

    print("Listening... (Ctrl+C to stop)")
    stream = sd.InputStream(
        samplerate=SAMPLE_RATE, channels=1, dtype="float32",
        callback=audio_callback, blocksize=FRAME_SIZE,
    )

    try:
        with stream:
            while True:
                time.sleep(0.1)
    except KeyboardInterrupt:
        pass

    stop_event.set()

    # Flush any remaining speech segment
    if vad.speaking and vad.segment:
        elapsed = time.monotonic() - start_time
        seg_queue.put((vad.segment_start_time, np.concatenate(vad.segment)))

    worker.join(timeout=30)
    print("\nDone.")
  • Step 3: Verify streaming mode starts and captures speech

Run the streaming mode and speak a sentence into the microphone, then press Ctrl+C:

uv run python transcribe.py --stream

Expected output:

Loading model...
Calibrating silence threshold...
  Ambient RMS: 0.00XX, threshold: 0.00XX
Listening... (Ctrl+C to stop)
[00:03] <your spoken words appear here>
^C
Done.
  • Step 4: Verify --lang flag works
uv run python transcribe.py --stream --lang en

Expected: same as above, English transcription.

  • Step 5: Verify existing modes still work
uv run python transcribe.py --mic 3

Expected: records 3 seconds, transcribes, prints result — same behavior as before.

  • Step 6: Commit
git add transcribe.py
git commit -m "feat: implement live streaming transcription with VAD"

Task 4: End-to-end verification

No code changes in this task — just verification that everything works together.

  • Step 1: Test continuous conversation

Run streaming mode and speak multiple sentences with natural pauses between them:

uv run python transcribe.py --stream

Verify:

  • Each sentence appears as a separate timestamped line

  • Timestamps roughly correspond to when you started speaking

  • No words are cut off at segment boundaries

  • Pauses within a sentence (< 0.8s) don't split the segment

  • Step 2: Test long speech (safety cap)

Speak continuously for 30+ seconds without pausing. Verify the safety cap forces a chunk boundary and transcription still works.

  • Step 3: Test Ctrl+C with buffered speech

Start speaking and immediately press Ctrl+C. Verify the buffered speech is flushed and transcribed before exit.

  • Step 4: Test quiet environment

Run in a quiet room without speaking. Verify no spurious segments are detected.