Compare commits

..

1 Commits

Author SHA1 Message Date
tomatocream 96a47a60dc Initial commit 2026-05-31 01:03:23 +08:00
22 changed files with 21 additions and 2576 deletions
-1
View File
@@ -1 +0,0 @@
use flake
-14
View File
@@ -1,14 +0,0 @@
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
# Nix
.direnv/
result
-1
View File
@@ -1 +0,0 @@
3.14
+18
View File
@@ -0,0 +1,18 @@
MIT License
Copyright (c) 2026 tomatocream
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
associated documentation files (the "Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the
following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial
portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO
EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
USE OR OTHER DEALINGS IN THE SOFTWARE.
+3
View File
@@ -0,0 +1,3 @@
# cohere-transcribe
Live speech-to-text using the Cohere ASR model
@@ -1,442 +0,0 @@
# Live Streaming Transcription Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Add `--stream` mode to `transcribe.py` that captures microphone audio, segments speech using VAD, and transcribes each segment in near-real-time.
**Architecture:** sounddevice InputStream callback pushes audio into a thread-safe buffer. A VAD state machine (energy-based RMS) detects speech segments. Completed segments are pushed onto a `queue.Queue` and consumed by a transcription thread that runs the Cohere ASR model and prints timestamped output. Ctrl+C triggers clean shutdown.
**Tech Stack:** Python 3.14, sounddevice, numpy, transformers (CohereAsrForConditionalGeneration), threading, queue
**Spec:** `docs/superpowers/specs/2026-05-29-live-streaming-transcription-design.md`
---
## File Structure
All changes are in a single file:
- **Modify:** `transcribe.py` — add `--stream` and `--lang` CLI flags, VAD logic, streaming capture loop, transcription consumer thread, clean shutdown handling. Grows from ~52 lines to ~170 lines.
No new files. No test files (this is a hardware-dependent demo script — verification is manual with a real microphone).
---
### Task 1: Refactor CLI argument parsing
**Files:**
- Modify: `transcribe.py:1-52`
Currently the script uses raw `sys.argv` checks. Replace with `argparse` to cleanly support `--stream`, `--mic`, `--lang`, and the default demo mode.
- [ ] **Step 1: Replace sys.argv parsing with argparse**
Replace the bottom half of `transcribe.py` (lines 30-52) with argparse-based dispatch. Move model loading after argument parsing so `--help` doesn't trigger a slow model load.
```python
import sys
import argparse
import numpy as np
import sounddevice as sd
from transformers import AutoProcessor, CohereAsrForConditionalGeneration
from transformers.audio_utils import load_audio
from huggingface_hub import hf_hub_download
MODEL_ID = "CohereLabs/cohere-transcribe-03-2026"
SAMPLE_RATE = 16000
def load_model():
print("Loading model...")
processor = AutoProcessor.from_pretrained(MODEL_ID)
model = CohereAsrForConditionalGeneration.from_pretrained(
MODEL_ID, device_map="auto"
)
return processor, model
def transcribe_audio(processor, model, audio, language="en"):
inputs = processor(audio, sampling_rate=SAMPLE_RATE, return_tensors="pt", language=language)
inputs.to(model.device, dtype=model.dtype)
outputs = model.generate(**inputs, max_new_tokens=256)
return processor.decode(outputs, skip_special_tokens=True)
def record_audio(duration):
print(f"Recording for {duration} seconds...")
audio = sd.rec(int(duration * SAMPLE_RATE), samplerate=SAMPLE_RATE, channels=1, dtype="float32")
sd.wait()
return audio.flatten()
def main():
parser = argparse.ArgumentParser(description="Cohere ASR Transcription")
group = parser.add_mutually_exclusive_group()
group.add_argument("--mic", type=int, nargs="?", const=5, metavar="SECONDS",
help="Record from microphone for N seconds (default: 5)")
group.add_argument("--stream", action="store_true",
help="Live streaming transcription with VAD")
parser.add_argument("--lang", default="en", help="Language code (default: en)")
args = parser.parse_args()
if args.stream:
processor, model = load_model()
stream_transcribe(processor, model, args.lang)
elif args.mic is not None:
processor, model = load_model()
try:
mic_audio = record_audio(args.mic)
print("Transcribing...")
text = transcribe_audio(processor, model, mic_audio, args.lang)
print(f"\nTranscription:\n{text}\n")
except OSError as e:
print(f"Microphone error: {e}")
print("Hint: Run with nix-shell for PortAudio support")
else:
processor, model = load_model()
print("Loading demo audio...")
audio_file = hf_hub_download(repo_id=MODEL_ID, filename="demo/voxpopuli_test_en_demo.wav")
audio = load_audio(audio_file, sampling_rate=SAMPLE_RATE)
print("Transcribing...")
text = transcribe_audio(processor, model, audio, args.lang)
print(f"\nTranscription:\n{text}\n")
def stream_transcribe(processor, model, language):
print("TODO: streaming mode")
if __name__ == "__main__":
main()
```
- [ ] **Step 2: Verify existing modes still work**
Run the demo mode to confirm nothing is broken:
```bash
uv run python transcribe.py
```
Expected: loads model, downloads demo audio, prints transcription.
Run `--mic` mode:
```bash
uv run python transcribe.py --mic 2
```
Expected: records 2 seconds, transcribes, prints result.
Run `--help`:
```bash
uv run python transcribe.py --help
```
Expected: prints usage without loading the model.
- [ ] **Step 3: Commit**
```bash
git add transcribe.py
git commit -m "refactor: switch to argparse, add --stream and --lang flags"
```
---
### Task 2: Implement silence calibration and VAD state machine
**Files:**
- Modify: `transcribe.py` — add `calibrate_silence()` and `VADStateMachine` class
- [ ] **Step 1: Add silence calibration function**
Add this function above `stream_transcribe`:
```python
def calibrate_silence(duration=0.5):
print("Calibrating silence threshold...")
audio = sd.rec(int(duration * SAMPLE_RATE), samplerate=SAMPLE_RATE, channels=1, dtype="float32")
sd.wait()
rms = np.sqrt(np.mean(audio ** 2))
threshold = max(rms * 3, 0.01)
print(f" Ambient RMS: {rms:.4f}, threshold: {threshold:.4f}")
return threshold
```
- [ ] **Step 2: Add the VAD state machine**
Add this class above `stream_transcribe`. The VAD operates on 50ms frames (800 samples at 16kHz). It tracks transitions between SILENCE and SPEAKING with consecutive-frame counters: `SPEECH_ONSET_FRAMES` loud frames in a row start a segment, and `SILENCE_FRAMES` quiet frames in a row (or reaching the `MAX_SPEECH_SECONDS` cap) end it.
```python
FRAME_SIZE = 800 # 50ms at 16kHz
PRE_ROLL_FRAMES = 6 # ~0.3s of audio before speech onset
SILENCE_FRAMES = 16 # ~0.8s of silence to end a segment
SPEECH_ONSET_FRAMES = 3 # ~150ms of speech to trigger
MAX_SPEECH_SECONDS = 30 # force chunk boundary
class VADStateMachine:
def __init__(self, threshold):
self.threshold = threshold
self.speaking = False
self.speech_frames = 0
self.silence_frames = 0
self.pre_roll = []
self.segment = []
self.segment_start_time = 0.0
def process_frame(self, frame, elapsed_time):
"""Process one 50ms frame. Returns a (start_time, audio_array) tuple when a
complete speech segment is detected, otherwise None."""
rms = np.sqrt(np.mean(frame ** 2))
is_loud = rms > self.threshold
if not self.speaking:
self.pre_roll.append(frame)
if len(self.pre_roll) > PRE_ROLL_FRAMES:
self.pre_roll.pop(0)
if is_loud:
self.speech_frames += 1
if self.speech_frames >= SPEECH_ONSET_FRAMES:
self.speaking = True
self.silence_frames = 0
self.segment = list(self.pre_roll)
self.segment_start_time = max(0.0, elapsed_time - len(self.pre_roll) * FRAME_SIZE / SAMPLE_RATE)
self.pre_roll = []
else:
self.speech_frames = 0
return None
# Currently speaking
self.segment.append(frame)
if is_loud:
self.silence_frames = 0
else:
self.silence_frames += 1
segment_duration = len(self.segment) * FRAME_SIZE / SAMPLE_RATE
if self.silence_frames >= SILENCE_FRAMES or segment_duration >= MAX_SPEECH_SECONDS:
result = (self.segment_start_time, np.concatenate(self.segment))
self.speaking = False
self.speech_frames = 0
self.silence_frames = 0
self.segment = []
self.pre_roll = []
return result
return None
```
- [ ] **Step 3: Verify VAD with a quick smoke test**
Run a quick inline test to make sure the VAD detects speech:
```bash
uv run python -c "
import numpy as np
from transcribe import VADStateMachine, FRAME_SIZE, SAMPLE_RATE
vad = VADStateMachine(threshold=0.01)
# Feed 10 silent frames
for i in range(10):
frame = np.zeros(FRAME_SIZE, dtype='float32')
result = vad.process_frame(frame, i * FRAME_SIZE / SAMPLE_RATE)
assert result is None
# Feed 5 loud frames (triggers speech after 3)
for i in range(10, 15):
frame = np.ones(FRAME_SIZE, dtype='float32') * 0.05
result = vad.process_frame(frame, i * FRAME_SIZE / SAMPLE_RATE)
assert result is None # speaking but not yet ended
# Feed 20 silent frames (triggers end after 16)
for i in range(15, 35):
frame = np.zeros(FRAME_SIZE, dtype='float32')
result = vad.process_frame(frame, i * FRAME_SIZE / SAMPLE_RATE)
if result is not None:
start_time, audio = result
duration = len(audio) / SAMPLE_RATE
print(f'Segment detected: start={start_time:.2f}s, duration={duration:.2f}s')
break
else:
raise AssertionError('No segment detected')
print('VAD smoke test passed')
"
```
Expected: prints segment info and "VAD smoke test passed".
- [ ] **Step 4: Commit**
```bash
git add transcribe.py
git commit -m "feat: add silence calibration and VAD state machine"
```
---
### Task 3: Implement the streaming transcription loop
**Files:**
- Modify: `transcribe.py` — replace `stream_transcribe` stub with full implementation
- [ ] **Step 1: Add imports at the top of the file**
Add these imports to the top of `transcribe.py` (after `import argparse`):
```python
import queue
import threading
import time
```
- [ ] **Step 2: Implement stream_transcribe**
Replace the `stream_transcribe` stub with the full implementation. This function:
1. Calibrates silence threshold
2. Starts a transcription consumer thread
3. Opens a sounddevice InputStream that feeds frames to the VAD
4. When VAD emits a segment, pushes it onto the queue
5. Handles Ctrl+C for clean shutdown
```python
def stream_transcribe(processor, model, language):
threshold = calibrate_silence()
vad = VADStateMachine(threshold)
seg_queue = queue.Queue()
stop_event = threading.Event()
start_time = time.monotonic()
def transcription_worker():
while not stop_event.is_set() or not seg_queue.empty():
try:
seg_start, audio = seg_queue.get(timeout=0.5)
except queue.Empty:
continue
minutes = int(seg_start) // 60
seconds = int(seg_start) % 60
text = transcribe_audio(processor, model, audio, language)
if text.strip():
print(f"[{minutes:02d}:{seconds:02d}] {text.strip()}")
worker = threading.Thread(target=transcription_worker, daemon=True)
worker.start()
frame_buf = np.empty(0, dtype="float32")
def audio_callback(indata, frames, time_info, status):
nonlocal frame_buf
if stop_event.is_set():
return
frame_buf = np.append(frame_buf, indata[:, 0])
while len(frame_buf) >= FRAME_SIZE:
frame = frame_buf[:FRAME_SIZE]
frame_buf = frame_buf[FRAME_SIZE:]
elapsed = time.monotonic() - start_time
result = vad.process_frame(frame, elapsed)
if result is not None:
seg_queue.put(result)
print("Listening... (Ctrl+C to stop)")
stream = sd.InputStream(
samplerate=SAMPLE_RATE, channels=1, dtype="float32",
callback=audio_callback, blocksize=FRAME_SIZE,
)
try:
with stream:
while True:
time.sleep(0.1)
except KeyboardInterrupt:
pass
stop_event.set()
# Flush any remaining speech segment
if vad.speaking and vad.segment:
elapsed = time.monotonic() - start_time
seg_queue.put((vad.segment_start_time, np.concatenate(vad.segment)))
worker.join(timeout=30)
print("\nDone.")
```
- [ ] **Step 3: Verify streaming mode starts and captures speech**
Run the streaming mode and speak a sentence into the microphone, then press Ctrl+C:
```bash
uv run python transcribe.py --stream
```
Expected output:
```
Loading model...
Calibrating silence threshold...
Ambient RMS: 0.00XX, threshold: 0.00XX
Listening... (Ctrl+C to stop)
[00:03] <your spoken words appear here>
^C
Done.
```
- [ ] **Step 4: Verify --lang flag works**
```bash
uv run python transcribe.py --stream --lang en
```
Expected: same as above, English transcription.
- [ ] **Step 5: Verify existing modes still work**
```bash
uv run python transcribe.py --mic 3
```
Expected: records 3 seconds, transcribes, prints result — same behavior as before.
- [ ] **Step 6: Commit**
```bash
git add transcribe.py
git commit -m "feat: implement live streaming transcription with VAD"
```
---
### Task 4: End-to-end verification
No code changes in this task — just verification that everything works together.
- [ ] **Step 1: Test continuous conversation**
Run streaming mode and speak multiple sentences with natural pauses between them:
```bash
uv run python transcribe.py --stream
```
Verify:
- Each sentence appears as a separate timestamped line
- Timestamps roughly correspond to when you started speaking
- No words are cut off at segment boundaries
- Pauses within a sentence (< 0.8s) don't split the segment
- [ ] **Step 2: Test long speech (safety cap)**
Speak continuously for 30+ seconds without pausing. Verify the safety cap forces a chunk boundary and transcription still works.
- [ ] **Step 3: Test Ctrl+C with buffered speech**
Start speaking and immediately press Ctrl+C. Verify the buffered speech is flushed and transcribed before exit.
- [ ] **Step 4: Test quiet environment**
Run in a quiet room without speaking. Verify no spurious segments are detected.
@@ -1,81 +0,0 @@
# Live Streaming Microphone Transcription
## Summary
Add a `--stream` mode to `transcribe.py` that continuously captures audio from the microphone, detects speech segments using energy-based VAD, and transcribes each segment in near-real-time using the Cohere ASR model. Output scrolls as timestamped lines in the terminal. Ctrl+C stops the session.
## Context
- **Model**: CohereLabs/cohere-transcribe-03-2026, max 35s audio clips, 5s overlap for auto-chunking
- **Inference speed**: ~0.4s for 5-10s audio on GPU (0.04-0.08x real-time)
- **Microphone**: PD200X Podcast Microphone via PipeWire, 16kHz mono
- **Existing code**: `transcribe.py` has `--mic` (fixed duration) and demo file modes
## Architecture
### Audio Capture
`sounddevice.InputStream` with a callback streams 16kHz mono float32 audio into a thread-safe buffer. The callback appends raw samples; a separate consumer reads them.
### Voice Activity Detection
Energy-based VAD using RMS amplitude over 50ms frames (800 samples at 16kHz):
- **Threshold**: Calibrated from ~0.5s of ambient silence at startup (3x the measured ambient RMS), with a floor of 0.01 RMS (about -40 dBFS) so that a very quiet room cannot yield a near-zero threshold
- **State machine**: `SILENCE -> SPEAKING -> SILENCE`
- SILENCE -> SPEAKING: RMS exceeds threshold for >= 3 consecutive frames (~150ms)
- SPEAKING -> SILENCE: RMS stays below threshold for >= 0.8s
- **Pre-roll**: ~0.3s of audio before speech onset is included to avoid clipping word beginnings
- **Safety cap**: If speech exceeds 30s without a pause, force a chunk boundary (model max is 35s)
### Threading Model
Two threads communicating via `queue.Queue`:
1. **Audio thread** (sounddevice callback + VAD logic): captures audio, runs VAD state machine, pushes completed speech segments onto the queue
2. **Transcription thread**: pulls segments from the queue, runs `processor() -> model.generate() -> processor.decode()`, prints results
No state carried between segments. Each is transcribed independently.
### Output
Timestamped lines printed to stdout as each segment is transcribed:
```
[00:03] Good morning, this is a test of the live captioning system.
[00:08] The model seems to be picking up my voice pretty well.
```
### Shutdown
Ctrl+C sets a stop flag via signal handler. The audio stream stops, any buffered speech is flushed and transcribed, then the program exits cleanly.
## CLI Interface
```
uv run python transcribe.py --stream # stream, default language (en)
uv run python transcribe.py --stream --lang ja # stream in Japanese
uv run python transcribe.py --mic [duration] # existing fixed-duration mode
uv run python transcribe.py # existing demo file mode
```
### Startup Sequence
1. Print "Loading model..." and load model
2. Record ~0.5s of ambient audio, compute silence threshold
3. Print threshold info and "Listening... (Ctrl+C to stop)"
4. Begin streaming
## Dependencies
No new dependencies. Uses: `sounddevice`, `numpy`, `threading`, `queue`, `signal`, `time` (all already available).
## Code Organization
All new logic in `transcribe.py`. File grows from ~50 to ~150-180 lines. No new files.
## Constraints
- Model max input: 35s per chunk (safety cap at 30s)
- Sampling rate must be 16kHz
- Single-channel (mono) audio only
Generated
-27
View File
@@ -1,27 +0,0 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1779786838,
"narHash": "sha256-0geHoGiR5f8qiXg+gO4rSF6Up6Var+kKqiOv9AO/uUc=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "f44f7788c891fbe5542177df78374f8cdab10e8f",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixpkgs-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}
-31
View File
@@ -1,31 +0,0 @@
{
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";
};
outputs =
{ nixpkgs, ... }:
let
system = "x86_64-linux";
pkgs = import nixpkgs {
inherit system;
config.allowUnfree = true;
};
in
{
devShells.${system}.default = pkgs.mkShell {
packages = with pkgs; [
uv
python314
portaudio
cudaPackages.cudatoolkit
wtype
];
LD_LIBRARY_PATH = pkgs.lib.makeLibraryPath [
pkgs.portaudio
pkgs.cudaPackages.cudatoolkit
];
};
};
}
-29
View File
@@ -1,29 +0,0 @@
[project]
name = "cohere-transcribe"
version = "0.1.0"
description = "Live speech transcription using Cohere ASR"
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
"accelerate>=1.13.0",
"huggingface-hub>=1.16.1",
"librosa>=0.11.0",
"protobuf>=7.35.0",
"sentencepiece>=0.2.1",
"sounddevice>=0.5.5",
"soundfile>=0.13.1",
"torch>=2.12.0",
"transformers>=5.9.0",
"typer[all]>=0.15.0",
]
[project.scripts]
cohere = "cohere_transcribe.cli:main"
cohere-transcribe = "cohere_transcribe.cli:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/cohere_transcribe"]
View File
-35
View File
@@ -1,35 +0,0 @@
import subprocess
import sys
from typing import Protocol
class InputBackend(Protocol):
    """Structural interface for a sink that receives typed text and key presses."""

    def type_text(self, text: str) -> None:
        """Deliver ``text`` as literal characters."""

    def send_key(self, key: str) -> None:
        """Deliver one key press identified by its key name."""
class WtypeBackend:
    """Backend that delivers text and key presses by running the ``wtype`` command."""

    @staticmethod
    def _invoke(argv: list[str]) -> None:
        """Run one ``wtype`` command line, reporting failures on stderr.

        A missing executable and any ``subprocess`` error (non-zero exit,
        timeout) are printed rather than raised.
        """
        try:
            subprocess.run(argv, check=True, timeout=10)
        except FileNotFoundError:
            print("wtype not found — install it for keyboard injection", file=sys.stderr)
        except subprocess.SubprocessError as e:
            print(f"wtype error: {e}", file=sys.stderr)

    def type_text(self, text: str) -> None:
        """Type ``text`` literally; ``--`` stops it being parsed as options."""
        self._invoke(["wtype", "--", text])

    def send_key(self, key: str) -> None:
        """Press the key named ``key`` via ``wtype -k``."""
        self._invoke(["wtype", "-k", key])
class PrintBackend:
    """Backend that writes everything to stdout instead of injecting keystrokes."""

    # Key names that have a direct character equivalent on a terminal.
    _KEY_TEXT = {"Return": "\n", "Tab": "\t", "BackSpace": "\b"}

    def type_text(self, text: str) -> None:
        """Write ``text`` to stdout without a trailing newline, flushing immediately."""
        print(text, end="", flush=True)

    def send_key(self, key: str) -> None:
        """Write the character for ``key``, or ``[key]`` when it has none."""
        rendered = self._KEY_TEXT.get(key)
        if rendered is None:
            rendered = f"[{key}]"
        print(rendered, end="", flush=True)
-3
View File
@@ -1,3 +0,0 @@
from .cli import main
__all__ = ["main"]
-126
View File
@@ -1,126 +0,0 @@
import os
import subprocess
import sys
import time
import typer
from rich.console import Console
from ..daemon import STATE_FILE, is_running, read_state, stop_daemon
app = typer.Typer(help="Cohere live transcription — speaks into your keyboard.")
console = Console()
@app.command()
def on(
    language: str = typer.Option("en", "--lang", "-l", help="Language code"),
    pause: float = typer.Option(0.3, "--pause", "-p", help="Seconds of silence before sending text"),
    foreground: bool = typer.Option(False, "--fg", help="Run in foreground (don't daemonize)"),
):
    """Start transcribing and typing into your focused window."""
    # NOTE: the docstring above is the command's help text; keep extra notes in comments.
    #
    # Exits with status 1 when a daemon is already running or when the
    # background daemon has not reported itself alive within about 5 seconds.
    if is_running():
        console.print("[yellow]Already running.[/yellow]")
        raise typer.Exit(1)

    if foreground:
        # Imported lazily so the other commands do not pay for the daemon's imports.
        from ..daemon import run_daemon

        console.print("[green]Starting cohere (foreground)...[/green]")
        run_daemon(language, pause=pause)
        return

    console.print("[green]Starting cohere daemon...[/green]")
    state_dir = os.path.dirname(STATE_FILE)
    os.makedirs(state_dir, exist_ok=True)

    # Always forward --pause. Omitting it when it equals 0.3 made the daemon
    # fall back to its own, longer default, so background and foreground runs
    # used different silence windows for the same command line.
    cmd = [
        sys.executable, "-m", "cohere_transcribe.daemon_main",
        "--lang", language,
        "--pause", str(pause),
    ]

    # The child receives its own copy of the log descriptor, so the parent's
    # handle is closed as soon as the process has been spawned.
    with open(os.path.join(state_dir, "daemon.log"), "a") as log_file:
        subprocess.Popen(
            cmd,
            start_new_session=True,  # detach from this terminal's session
            stdin=subprocess.DEVNULL,
            stdout=log_file,
            stderr=subprocess.STDOUT,
        )

    # Poll for up to ~5 seconds until the daemon's state file names a live pid.
    started = False
    for _ in range(50):
        time.sleep(0.1)
        if is_running():
            started = True
            break

    if started:
        console.print("[green]Cohere is on — speak and it types.[/green]")
    else:
        console.print("[red]Failed to start daemon. Check ~/.local/state/cohere/daemon.log[/red]")
        raise typer.Exit(1)
@app.command()
def off():
    """Stop transcribing."""
    # Nothing to stop: report it and exit successfully.
    if not is_running():
        console.print("[yellow]Not running.[/yellow]")
        raise typer.Exit(0)

    stopped = stop_daemon()
    if not stopped:
        console.print("[red]Failed to stop daemon.[/red]")
        raise typer.Exit(1)
    console.print("[red]Cohere is off.[/red]")
@app.command()
def status():
    """Show whether cohere is running."""
    # Read the state before the liveness check, as the uptime comes from it.
    snapshot = read_state()
    if not is_running():
        console.print("[dim]OFF[/dim]")
        return

    uptime_seconds = time.time() - snapshot.get("started_at", 0)
    whole_minutes = int(uptime_seconds) // 60
    console.print(f"[green]ON[/green] — running for {whole_minutes}m")
@app.command()
def transcribe(
    audio_file: str = typer.Argument(None, help="Audio file to transcribe"),
    mic: int = typer.Option(None, "--mic", "-m", help="Record from mic for N seconds"),
    stream: bool = typer.Option(False, "--stream", "-s", help="Live streaming mode (prints to terminal)"),
    language: str = typer.Option("en", "--lang", "-l", help="Language code"),
    pause: float = typer.Option(0.3, "--pause", "-p", help="Seconds of silence before sending text"),
):
    """One-shot transcription (file, mic, or stream to terminal)."""
    # Source precedence: --stream, then --mic, then the positional audio file.
    # Heavy modules are imported inside the function so other commands stay fast.
    from ..model import load_model, transcribe_audio
    from ..vad import pause_seconds_to_frames

    if stream:
        from ..stream import stream_transcribe

        processor, model = load_model()
        frames_of_silence = pause_seconds_to_frames(pause)
        stream_transcribe(processor, model, language, silence_frames=frames_of_silence)
        return

    if mic is not None:
        from ..model import record_audio

        processor, model = load_model()
        try:
            recording = record_audio(mic)
            console.print("Transcribing...")
            transcript = transcribe_audio(processor, model, recording, language)
            console.print(f"\n{transcript}\n")
        except OSError as e:
            console.print(f"[red]Microphone error: {e}[/red]")
            raise typer.Exit(1)
        return

    if not audio_file:
        console.print("[yellow]Provide an audio file, --mic, or --stream[/yellow]")
        raise typer.Exit(1)

    from transformers.audio_utils import load_audio as load_audio_file
    from ..model import SAMPLE_RATE

    processor, model = load_model()
    samples = load_audio_file(audio_file, sampling_rate=SAMPLE_RATE)
    transcript = transcribe_audio(processor, model, samples, language)
    console.print(f"\n{transcript}\n")
def main():
    """Entry point: hand control to the Typer application."""
    app()
-55
View File
@@ -1,55 +0,0 @@
import re
from .backend import InputBackend
# Spoken phrases that trigger key presses instead of typed text. Each phrase
# maps to the ordered list of key names handed to the backend's send_key().
KEY_COMMANDS: dict[str, list[str]] = {
    "new line": ["Return"],
    "newline": ["Return"],
    "enter": ["Return"],
    "press enter": ["Return"],
    "new paragraph": ["Return", "Return"],
    "tab": ["Tab"],
    "backspace": ["BackSpace"],
}
# Spoken punctuation names and the literal character substituted for each.
PUNCTUATION: dict[str, str] = {
    "question mark": "?",
    "exclamation mark": "!",
    "exclamation point": "!",
    "period": ".",
    "full stop": ".",
    "comma": ",",
    "colon": ":",
    "semicolon": ";",
    "open quote": '"',
    "close quote": '"',
    "open paren": "(",
    "close paren": ")",
}
def _build_pattern(commands: dict) -> re.Pattern:
sorted_keys = sorted(commands.keys(), key=len, reverse=True)
escaped = [re.escape(k) for k in sorted_keys]
return re.compile(r"\b(" + "|".join(escaped) + r")\b", re.IGNORECASE)
# Matchers for the two phrase tables, compiled once at import time.
_KEY_PATTERN = _build_pattern(KEY_COMMANDS)
_PUNCT_PATTERN = _build_pattern(PUNCTUATION)
def process_and_output(text: str, backend: InputBackend) -> None:
    """Turn a transcript into backend calls, honouring spoken commands.

    Spoken punctuation names are replaced by their symbols, whitespace before
    closing punctuation is removed, and the result is split on key-command
    phrases. Command phrases become ``send_key`` calls; every other non-empty
    piece is typed followed by a single space.
    """

    def _spoken_to_symbol(match):
        return PUNCTUATION[match.group(1).lower()]

    with_symbols = _PUNCT_PATTERN.sub(_spoken_to_symbol, text)
    tightened = re.sub(r"\s+([?.!,;:)\"])", r"\1", with_symbols)

    # The pattern has a capture group, so split() keeps the command phrases
    # as their own items between the surrounding text pieces.
    for chunk in _KEY_PATTERN.split(tightened):
        stripped = chunk.strip()
        keys = KEY_COMMANDS.get(stripped.lower())
        if keys is not None:
            for key in keys:
                backend.send_key(key)
            continue
        if stripped:
            backend.type_text(stripped + " ")
-133
View File
@@ -1,133 +0,0 @@
import json
import os
import signal
import queue
import threading
import time
import numpy as np
import sounddevice as sd
from .backend import WtypeBackend
from .commands import process_and_output
from .model import SAMPLE_RATE, load_model, transcribe_audio
from .vad import DEFAULT_SILENCE_FRAMES, FRAME_SIZE, VADStateMachine, calibrate_silence, pause_seconds_to_frames
# Per-user directory that holds the daemon's state file and log.
STATE_DIR = os.path.expanduser("~/.local/state/cohere")
# JSON record written by _write_state(): pid, status and started_at.
STATE_FILE = os.path.join(STATE_DIR, "state.json")
# Log file path inside STATE_DIR.
LOG_FILE = os.path.join(STATE_DIR, "daemon.log")
def _write_state(pid: int, status: str):
    """Overwrite the state file with ``pid``, ``status`` and the current time.

    The state directory is created when missing. ``started_at`` is refreshed
    on every call, not only on the first one.
    """
    os.makedirs(STATE_DIR, exist_ok=True)
    with open(STATE_FILE, "w") as state_file:
        record = {"pid": pid, "status": status, "started_at": time.time()}
        state_file.write(json.dumps(record))
# Output backend used by run_daemon's transcription worker.
_backend = WtypeBackend()
def read_state() -> dict | None:
    """Return the parsed state file, or ``None`` if it is missing or not valid JSON."""
    try:
        with open(STATE_FILE) as state_file:
            raw = state_file.read()
        return json.loads(raw)
    except (FileNotFoundError, json.JSONDecodeError):
        return None
def is_running() -> bool:
    """Report whether the pid recorded in the state file can be signalled.

    Returns ``False`` when there is no state, no recorded pid, or the probe
    raises ``OSError``.
    """
    state = read_state()
    pid = state.get("pid") if state is not None else None
    if pid is None:
        return False
    try:
        # Signal 0 performs the existence/permission check without delivering anything.
        os.kill(pid, 0)
    except OSError:
        return False
    return True
def stop_daemon() -> bool:
    """Send SIGTERM to the recorded daemon and wait briefly for it to exit.

    Returns:
        ``True`` only once the process is confirmed gone (within about 2
        seconds). ``False`` when there is no recorded pid, the signal could
        not be delivered, or the process is still alive after the wait.

    Previously a process that outlived the wait was still reported as stopped;
    in that case the state file is now left untouched and ``False`` is returned.
    """
    state = read_state()
    if state is None:
        return False
    pid = state.get("pid")
    if pid is None:
        return False

    try:
        os.kill(pid, signal.SIGTERM)
    except OSError:
        # The signal could not be delivered; record the daemon as not running.
        _write_state(pid, "stopped")
        return False

    for _ in range(20):
        time.sleep(0.1)
        try:
            os.kill(pid, 0)  # signal 0: probe only
        except OSError:
            _write_state(pid, "stopped")
            return True

    # Still alive after the grace period: do not claim success.
    return False
def run_daemon(language: str = "en", pause: float | None = None):
    """Run the capture, VAD and transcription loop in this process until interrupted.

    Args:
        language: Language code passed to ``transcribe_audio``.
        pause: Seconds of silence that end a speech segment. ``None`` selects
            ``DEFAULT_SILENCE_FRAMES``; any number, including 0, is converted
            with ``pause_seconds_to_frames``.

    The state file is set to "starting", then "running" once the model is
    loaded and the threshold calibrated, and "stopped" on the way out. SIGTERM
    is mapped onto ``KeyboardInterrupt`` so it shares the Ctrl+C shutdown path.
    """
    pid = os.getpid()
    _write_state(pid, "starting")

    def handle_sigterm(signum, frame):
        raise KeyboardInterrupt

    signal.signal(signal.SIGTERM, handle_sigterm)

    # Compare with None, not truthiness, so an explicit pause of 0 is honoured
    # instead of silently falling back to the default.
    if pause is not None:
        silence_frames = pause_seconds_to_frames(pause)
    else:
        silence_frames = DEFAULT_SILENCE_FRAMES

    processor, model = load_model()
    threshold = calibrate_silence()
    vad = VADStateMachine(threshold, silence_frames=silence_frames)
    seg_queue: queue.Queue = queue.Queue()
    stop_event = threading.Event()
    start_time = time.monotonic()
    _write_state(pid, "running")

    def transcription_worker():
        # Keep draining after stop is requested so queued segments are not lost.
        while not stop_event.is_set() or not seg_queue.empty():
            try:
                _seg_start, audio = seg_queue.get(timeout=0.5)
            except queue.Empty:
                continue
            text = transcribe_audio(processor, model, audio, language)
            text = text.strip()
            if text:
                process_and_output(text, _backend)

    worker = threading.Thread(target=transcription_worker, daemon=True)
    worker.start()

    def audio_callback(indata, frames, time_info, status):
        if stop_event.is_set():
            return
        elapsed = time.monotonic() - start_time
        # Copy the channel so the VAD keeps its own data rather than a view of indata.
        result = vad.process_frame(indata[:, 0].copy(), elapsed)
        if result is not None:
            seg_queue.put(result)

    stream = sd.InputStream(
        samplerate=SAMPLE_RATE, channels=1, dtype="float32",
        callback=audio_callback, blocksize=FRAME_SIZE,
    )
    try:
        with stream:
            while True:
                time.sleep(0.1)
    except KeyboardInterrupt:
        pass

    # Enqueue the unfinished segment BEFORE signalling stop. The worker exits
    # once it sees "stop set and queue empty", so setting the event first left
    # a window in which the final segment could be dropped.
    if vad.speaking and vad.segment:
        seg_queue.put((vad.segment_start_time, np.concatenate(vad.segment)))
    stop_event.set()

    worker.join(timeout=30)
    _write_state(pid, "stopped")
-9
View File
@@ -1,9 +0,0 @@
import argparse
from .daemon import run_daemon
# Script body for the background daemon process; the CLI `on` command starts
# it with `python -m cohere_transcribe.daemon_main`.
parser = argparse.ArgumentParser()
parser.add_argument("--lang", default="en")
# None when omitted, so run_daemon applies its own default silence window.
parser.add_argument("--pause", type=float, default=None)
args = parser.parse_args()
# Runs the capture loop; returns only after the daemon has been interrupted.
run_daemon(args.lang, pause=args.pause)
-32
View File
@@ -1,32 +0,0 @@
import numpy as np
from transformers import AutoProcessor, CohereAsrForConditionalGeneration
from transformers.audio_utils import load_audio
# Model identifier passed to both from_pretrained() calls in load_model().
MODEL_ID = "CohereLabs/cohere-transcribe-03-2026"
# Sampling rate in Hz used for recording and handed to the processor.
SAMPLE_RATE = 16000
def load_model():
    """Load the processor and ASR model for ``MODEL_ID``.

    Returns:
        A ``(processor, model)`` tuple.
    """
    print("Loading model...")
    asr_processor = AutoProcessor.from_pretrained(MODEL_ID)
    asr_model = CohereAsrForConditionalGeneration.from_pretrained(MODEL_ID, device_map="auto")
    return asr_processor, asr_model
def transcribe_audio(processor, model, audio, language="en"):
    """Transcribe ``audio`` and return the decoded text as a single string.

    When the processor decodes to a list, its items are joined with spaces.
    """
    features = processor(audio, sampling_rate=SAMPLE_RATE, return_tensors="pt", language=language)
    # The return value is discarded, so this relies on .to() updating the
    # inputs in place. NOTE(review): confirm against the processor's output type.
    features.to(model.device, dtype=model.dtype)
    token_ids = model.generate(**features, max_new_tokens=256)
    decoded = processor.decode(token_ids, skip_special_tokens=True)
    if isinstance(decoded, list):
        return " ".join(decoded)
    return decoded
def record_audio(duration):
    """Record ``duration`` seconds of mono float32 audio and return it flattened."""
    # Imported here so loading this module does not itself require sounddevice.
    import sounddevice as sd

    print(f"Recording for {duration} seconds...")
    sample_count = int(duration * SAMPLE_RATE)
    recording = sd.rec(sample_count, samplerate=SAMPLE_RATE, channels=1, dtype="float32")
    sd.wait()  # block until the recording has finished
    return recording.flatten()
-64
View File
@@ -1,64 +0,0 @@
import sys
import queue
import threading
import time
import numpy as np
import sounddevice as sd
from .model import SAMPLE_RATE, transcribe_audio
from .vad import DEFAULT_SILENCE_FRAMES, FRAME_SIZE, VADStateMachine, calibrate_silence
def stream_transcribe(processor, model, language, silence_frames=DEFAULT_SILENCE_FRAMES):
    """Transcribe microphone speech live, printing one timestamped line per segment.

    Args:
        processor: Processor object passed through to ``transcribe_audio``.
        model: Model object passed through to ``transcribe_audio``.
        language: Language code passed to ``transcribe_audio``.
        silence_frames: Number of consecutive quiet frames that end a segment.

    Runs until Ctrl+C. Speech still being captured at that moment is queued
    and transcribed before the function returns, subject to a 30-second wait
    for the worker thread.
    """
    threshold = calibrate_silence()
    vad = VADStateMachine(threshold, silence_frames=silence_frames)
    seg_queue = queue.Queue()
    stop_event = threading.Event()
    start_time = time.monotonic()

    def transcription_worker():
        # Keep draining after stop is requested so queued segments are not lost.
        while not stop_event.is_set() or not seg_queue.empty():
            try:
                seg_start, audio = seg_queue.get(timeout=0.5)
            except queue.Empty:
                continue
            minutes = int(seg_start) // 60
            seconds = int(seg_start) % 60
            text = transcribe_audio(processor, model, audio, language)
            if text.strip():
                print(f"[{minutes:02d}:{seconds:02d}] {text.strip()}")

    worker = threading.Thread(target=transcription_worker, daemon=True)
    worker.start()

    def audio_callback(indata, frames, time_info, status):
        if stop_event.is_set():
            return
        elapsed = time.monotonic() - start_time
        # Copy the channel so the VAD keeps its own data rather than a view of indata.
        result = vad.process_frame(indata[:, 0].copy(), elapsed)
        if result is not None:
            seg_queue.put(result)

    print("Listening... (Ctrl+C to stop)")
    stream = sd.InputStream(
        samplerate=SAMPLE_RATE, channels=1, dtype="float32",
        callback=audio_callback, blocksize=FRAME_SIZE,
    )
    try:
        with stream:
            while True:
                time.sleep(0.1)
    except KeyboardInterrupt:
        pass

    # Enqueue the unfinished segment BEFORE signalling stop. The worker exits
    # once it sees "stop set and queue empty", so setting the event first left
    # a window in which the final segment could be dropped.
    if vad.speaking and vad.segment:
        seg_queue.put((vad.segment_start_time, np.concatenate(vad.segment)))
    stop_event.set()

    worker.join(timeout=30)
    if worker.is_alive():
        print("Warning: transcription worker did not finish in time.", file=sys.stderr)
    print("\nDone.")
-78
View File
@@ -1,78 +0,0 @@
import collections
import numpy as np
import sounddevice as sd
from .model import SAMPLE_RATE
# VAD tuning constants. The *_FRAMES values count frames of FRAME_SIZE samples.
FRAME_SIZE = 800 # 50ms at 16kHz
PRE_ROLL_FRAMES = 6 # ~0.3s of audio before speech onset
DEFAULT_SILENCE_FRAMES = 16 # ~0.8s of silence to end a segment
SPEECH_ONSET_FRAMES = 3 # ~150ms of speech to trigger
MAX_SPEECH_SECONDS = 30 # force chunk boundary
def pause_seconds_to_frames(seconds: float) -> int:
    """Convert a pause length in seconds to a whole number of frames (minimum 1)."""
    frame_seconds = FRAME_SIZE / SAMPLE_RATE
    frame_count = round(seconds / frame_seconds)
    return frame_count if frame_count > 1 else 1
def calibrate_silence(duration=0.5):
    """Record ``duration`` seconds of ambient sound and derive a loudness threshold.

    The threshold is three times the measured RMS, but never below 0.01.
    """
    print("Calibrating silence threshold...")
    sample_count = int(duration * SAMPLE_RATE)
    ambient = sd.rec(sample_count, samplerate=SAMPLE_RATE, channels=1, dtype="float32")
    sd.wait()  # block until the recording has finished
    rms = np.sqrt(np.mean(ambient ** 2))
    threshold = max(rms * 3, 0.01)
    print(f" Ambient RMS: {rms:.4f}, threshold: {threshold:.4f}")
    return threshold
class VADStateMachine:
    """Energy-based voice-activity detector that groups frames into segments.

    Frames are fed one at a time to :meth:`process_frame`. While idle, the
    most recent frames are kept in a bounded pre-roll buffer. After
    SPEECH_ONSET_FRAMES consecutive loud frames a segment is opened, seeded
    with the buffered frames. The segment is closed and returned once enough
    consecutive quiet frames have been seen or it reaches
    MAX_SPEECH_SECONDS.
    """

    def __init__(self, threshold, silence_frames=DEFAULT_SILENCE_FRAMES):
        """Create an idle detector.

        Args:
            threshold: RMS level a frame must exceed to count as loud.
            silence_frames: Consecutive quiet frames that end a segment.
        """
        # Configuration.
        self.threshold = threshold
        self.silence_limit = silence_frames
        # Counters and mode flag.
        self.speaking = False
        self.speech_frames = 0
        self.silence_frames = 0
        # Audio buffers and the timestamp of the segment being collected.
        self.pre_roll = collections.deque(maxlen=PRE_ROLL_FRAMES)
        self.segment = []
        self.segment_start_time = 0.0

    def process_frame(self, frame, elapsed_time):
        """Feed one audio frame into the detector.

        Args:
            frame: Array of samples for this frame.
            elapsed_time: Seconds elapsed on the caller's clock when the
                frame was delivered; used to timestamp segment starts.

        Returns:
            A ``(start_time, audio_array)`` tuple when this frame completes
            a speech segment, otherwise ``None``.
        """
        level = np.sqrt(np.mean(frame ** 2))
        loud = level > self.threshold

        if self.speaking:
            # Collecting a segment: every frame is kept, quiet ones included.
            self.segment.append(frame)
            self.silence_frames = 0 if loud else self.silence_frames + 1

            duration = len(self.segment) * FRAME_SIZE / SAMPLE_RATE
            if self.silence_frames >= self.silence_limit or duration >= MAX_SPEECH_SECONDS:
                finished = (self.segment_start_time, np.concatenate(self.segment))
                # Return to the idle state with fresh buffers and counters.
                self.segment = []
                self.pre_roll = collections.deque(maxlen=PRE_ROLL_FRAMES)
                self.silence_frames = 0
                self.speech_frames = 0
                self.speaking = False
                return finished
            return None

        # Idle: remember the frame so a later onset can include it.
        self.pre_roll.append(frame)
        if not loud:
            # A quiet frame breaks any run of loud frames.
            self.speech_frames = 0
            return None

        self.speech_frames += 1
        if self.speech_frames >= SPEECH_ONSET_FRAMES:
            buffered = list(self.pre_roll)
            # Back-date the start by the audio already held in the buffer.
            lead_in = len(buffered) * FRAME_SIZE / SAMPLE_RATE
            self.segment_start_time = max(0.0, elapsed_time - lead_in)
            self.segment = buffered
            self.pre_roll = collections.deque(maxlen=PRE_ROLL_FRAMES)
            self.silence_frames = 0
            self.speaking = True
        return None
-88
View File
@@ -1,88 +0,0 @@
"""Quick microphone tests. Run: uv run python test_mic.py"""
import numpy as np
import sounddevice as sd
import sys
import time
SAMPLE_RATE = 16000
def test_device_info():
    """Print details of the default input device and check it can record."""
    device_index = sd.default.device[0]  # first entry is the input device
    device = sd.query_devices(device_index)
    print(f"Default input device [{device_index}]: {device['name']}")
    print(f" Max input channels: {device['max_input_channels']}")
    print(f" Default sample rate: {device['default_samplerate']}")
    has_input = device["max_input_channels"] > 0
    assert has_input, "Default device has no input channels!"
    print(" PASS\n")
def test_record_1s():
    """Capture one second of audio and verify the signal is not all zeros."""
    print("Recording 1 second... (speak or make noise!)")
    recording = sd.rec(
        SAMPLE_RATE,
        samplerate=SAMPLE_RATE,
        channels=1,
        dtype="float32",
    )
    # Flatten only after the recording has finished; flatten() copies the data.
    sd.wait()
    samples = recording.flatten()

    sample_count = len(samples)
    peak_level = np.max(np.abs(samples))
    rms_level = np.sqrt(np.mean(samples ** 2))
    print(f" Samples: {sample_count}")
    print(f" Peak amplitude: {peak_level:.4f}")
    print(f" RMS: {rms_level:.6f}")

    assert sample_count == SAMPLE_RATE, f"Expected {SAMPLE_RATE} samples, got {sample_count}"
    assert peak_level > 0, "All zeros — mic not capturing anything"

    if peak_level >= 0.001:
        print(" Signal level looks good")
    else:
        print(" WARNING: Very low signal — mic might be muted or too far away")
    print(" PASS\n")
def test_record_levels():
    """Record three one-second chunks and print a level meter for each."""
    print("Recording 3 seconds — speak during seconds 2-3 for comparison...")
    for second in range(1, 4):
        chunk = sd.rec(
            SAMPLE_RATE,
            samplerate=SAMPLE_RATE,
            channels=1,
            dtype="float32",
        )
        # Flatten only after the recording has finished; flatten() copies the data.
        sd.wait()
        chunk = chunk.flatten()

        peak = np.max(np.abs(chunk))
        rms = np.sqrt(np.mean(chunk ** 2))
        # One '#' per 0.005 of peak amplitude, capped at 50 characters.
        meter = "#" * int(min(peak * 200, 50))
        print(f" Second {second}: peak={peak:.4f} rms={rms:.6f} |{meter}")
    print(" PASS\n")
def test_stream_callback():
    """Run an InputStream for one second and check the callback delivers audio.

    Passes when at least one callback fired and the total number of frames
    received is within 20% of one second's worth of samples.
    """
    block_sizes = []

    def on_audio(indata, frames, time_info, status):
        # Report any status flags, then record how many frames arrived.
        if status:
            print(f" Status: {status}")
        block_sizes.append(len(indata))

    print("Testing InputStream callback for 1 second...")
    stream = sd.InputStream(
        samplerate=SAMPLE_RATE,
        channels=1,
        dtype="float32",
        callback=on_audio,
        blocksize=800,
    )
    with stream:
        time.sleep(1)

    callback_count = len(block_sizes)
    total_frames = sum(block_sizes)
    expected = SAMPLE_RATE
    first_block = block_sizes[0] if block_sizes else 'N/A'
    print(f" Callbacks fired: {callback_count}")
    print(f" Total frames: {total_frames} (expected ~{expected})")
    print(f" Blocksize per callback: {first_block}")

    assert callback_count > 0, "No callbacks received!"
    deviation = abs(total_frames - expected)
    assert deviation < expected * 0.2, f"Frame count off by >20%"
    print(" PASS\n")
if __name__ == "__main__":
    # Run every microphone check in order; the first failing assert aborts.
    print("=== Microphone Tests ===\n")
    for run_check in (
        test_device_info,
        test_record_1s,
        test_record_levels,
        test_stream_callback,
    ):
        run_check()
    print("All tests passed!")
Generated
-1327
View File
File diff suppressed because it is too large Load Diff