Compare commits

..

14 Commits

Author SHA1 Message Date
tomatocream 58fa7526fb feat: add --normalize compressor + limiter for input audio
Adds a feedforward dynamic range compressor with a brick-wall limiter
applied in the audio callback. Quiet speech gets +12 dB makeup gain,
loud bursts are attenuated 4:1 above -20 dBFS, and the output is
hard-limited at -1 dBFS so nothing clips. Enabled via --normalize/-n
on `cohere on` and `cohere transcribe`.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-06 22:56:21 +08:00
tomatocream 853b5523e5 feat: add --device flag and devices command for mic selection
Lets the user pick an input device by index or name substring. Adds
`cohere devices` for listing. For devices that don't support 16kHz
natively (e.g. Sipeed MicArray hw at 48kHz), captures at the device's
native rate and resamples to 16kHz via scipy.signal.resample_poly.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-06 22:51:06 +08:00
tomatocream c487ba8c08 feat: filter short audio segments (mic bumps) and add debug notebook
Mic bumps produce transient spikes that pass VAD onset detection but
contain no real speech — the model hallucinates "thank you" from them.
Added MIN_SPEECH_SECONDS (0.3s) filter to discard segments where the
actual speech portion is too short.

Added a Jupyter notebook (notebooks/audio_debug.ipynb) for real-time
audio visualization: streams RMS + peak amplitude into a live Plotly
FigureWidget, then provides post-hoc waveform inspection, segment
playback, and side-by-side segment comparison.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-06-01 16:16:31 +08:00
tomatocream a727899ee5 Initial commit: add CLAUDE.md and transcribe.py 2026-05-31 01:05:48 +08:00
tomatocream 50f8d158c4 feat: add voice command processing and input backend interface
Introduce InputBackend protocol with WtypeBackend and PrintBackend,
and a command processor that translates spoken commands (enter, new line,
question mark, comma, etc.) into key presses and punctuation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-30 21:37:20 +08:00
tomatocream f083e424c9 feat: make silence pause duration configurable via --pause flag
Default is 0.3s for responsive typing. Configurable on both
`cohere on --pause` and `cohere transcribe --stream --pause`.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-30 21:12:26 +08:00
tomatocream 92d8ba28d0 feat: add Typer CLI with daemon mode and wtype keyboard injection
Replace argparse CLI with Typer-based CLI supporting `cohere on/off/status`
commands. The daemon runs transcription in the background and types into the
focused Wayland window via wtype. Adds wtype to flake.nix and fixes the
hatchling build backend.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-30 21:09:32 +08:00
tomatocream 8d517b3ea8 refactor: restructure project into src layout with proper packaging
Split monolithic transcribe.py into focused modules under
src/cohere_transcribe/ (model, vad, stream, cli), move tests into
tests/, add hatchling build system and CLI entry point, remove
unused shell.nix and main.py.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-30 00:45:56 +08:00
tomatocream cbea62b2a9 fix: add portaudio to LD_LIBRARY_PATH and add flake lockfile
Move LD_LIBRARY_PATH out of env block and include portaudio so
audio devices are discoverable at runtime. Add flake.lock and
a quick microphone test script.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-30 00:42:36 +08:00
tomatocream 843ec534d1 fix: handle processor.decode returning a list of strings
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-29 03:00:09 +08:00
tomatocream cf18335235 fix: simplify audio callback, use deque for pre-roll, add worker timeout warning
- Remove frame_buf accumulation: blocksize=FRAME_SIZE guarantees indata is
  exactly FRAME_SIZE samples, so buffering was unnecessary. Use indata[:, 0].copy()
  to avoid stale references from sounddevice's buffer reuse.
- Replace pre_roll list with collections.deque(maxlen=PRE_ROLL_FRAMES) to
  eliminate manual bounds-checking (pop(0)) on every frame.
- Warn to stderr if the transcription worker thread outlives its 30s join timeout.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 02:48:51 +08:00
tomatocream 747a4772b6 feat: implement live streaming transcription with VAD
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 02:46:13 +08:00
tomatocream d62fcdd1cd feat: add silence calibration and VAD state machine
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 02:45:09 +08:00
tomatocream 4605be5bc9 refactor: switch to argparse, add --stream and --lang flags
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 02:43:47 +08:00
21 changed files with 4798 additions and 30 deletions
+4
View File
@@ -8,3 +8,7 @@ wheels/
# Virtual environments # Virtual environments
.venv .venv
# Nix
.direnv/
result
+60
View File
@@ -0,0 +1,60 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project
cohere-transcribe — live speech-to-text using the Cohere ASR model (`CohereLabs/cohere-transcribe-03-2026`) via HuggingFace Transformers. Captures microphone audio, runs voice activity detection (VAD) to segment speech, transcribes each segment, and either prints text or injects it into the focused window via `wtype` (Wayland).
## Development Environment
Nix flake provides the dev shell (Python 3.14, portaudio, CUDA toolkit, wtype, uv). Direnv activates it automatically. Python deps managed by uv.
```bash
# Install/sync Python deps
uv sync
# Run the CLI (installed as entry point)
uv run cohere on # start daemon (background, types into focused window)
uv run cohere off # stop daemon
uv run cohere status
uv run cohere transcribe --stream # live transcribe to terminal
uv run cohere transcribe --mic 5 # record 5s then transcribe
uv run cohere transcribe file.wav # transcribe file
# Run mic tests
uv run python tests/test_mic.py
```
## Architecture
Two modes share the same model/VAD pipeline but differ in output:
- **Daemon mode** (`cohere on`): runs as a background process, transcribes speech segments and injects text into the focused window via `wtype`. State tracked in `~/.local/state/cohere/state.json`. The daemon is spawned by the CLI (`cli.py`) which launches `daemon_main.py` as a detached subprocess.
- **Stream/one-shot mode** (`cohere transcribe`): runs in foreground, prints transcriptions to stdout with timestamps.
### Key modules
- `model.py` — model loading (`load_model`) and transcription (`transcribe_audio`). Single source of truth for `MODEL_ID` and `SAMPLE_RATE` (16kHz).
- `vad.py` — RMS-based voice activity detection with `VADStateMachine`. Calibrates ambient noise threshold at startup. Configurable silence duration triggers segment boundaries.
- `stream.py` — streaming transcription loop: audio callback feeds VAD, completed segments go to a transcription worker thread via queue.
- `daemon.py` — same streaming pattern as `stream.py` but outputs via `wtype` instead of print. Also contains daemon lifecycle management (state file, PID tracking, start/stop).
- `cli/cli.py` — Typer CLI with `on`/`off`/`status`/`transcribe` commands.
- `transcribe.py` — original standalone script (not part of the package).
### Data flow
```
Microphone → sounddevice.InputStream (50ms frames)
→ VADStateMachine.process_frame()
→ speech segment detected → Queue
→ transcription_worker thread → transcribe_audio()
→ output (wtype or print)
```
## Conventions
- Package uses src layout (`src/cohere_transcribe/`), built with hatchling.
- Entry points: `cohere` and `cohere-transcribe` both map to `cohere_transcribe.cli:main`.
- VAD constants are in `vad.py` (frame size, pre-roll, silence limits, max segment length).
- Daemon state lives at `~/.local/state/cohere/` (state.json, daemon.log).
Generated
+27
View File
@@ -0,0 +1,27 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1779786838,
"narHash": "sha256-0geHoGiR5f8qiXg+gO4rSF6Up6Var+kKqiOv9AO/uUc=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "f44f7788c891fbe5542177df78374f8cdab10e8f",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixpkgs-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}
+2 -2
View File
@@ -19,13 +19,13 @@
python314 python314
portaudio portaudio
cudaPackages.cudatoolkit cudaPackages.cudatoolkit
wtype
]; ];
env = {
LD_LIBRARY_PATH = pkgs.lib.makeLibraryPath [ LD_LIBRARY_PATH = pkgs.lib.makeLibraryPath [
pkgs.portaudio
pkgs.cudaPackages.cudatoolkit pkgs.cudaPackages.cudatoolkit
]; ];
}; };
}; };
};
} }
-6
View File
@@ -1,6 +0,0 @@
def main():
print("Hello from cohere!")
if __name__ == "__main__":
main()
File diff suppressed because one or more lines are too long
+22 -2
View File
@@ -1,7 +1,7 @@
[project] [project]
name = "cohere" name = "cohere-transcribe"
version = "0.1.0" version = "0.1.0"
description = "Add your description here" description = "Live speech transcription using Cohere ASR"
readme = "README.md" readme = "README.md"
requires-python = ">=3.14" requires-python = ">=3.14"
dependencies = [ dependencies = [
@@ -14,4 +14,24 @@ dependencies = [
"soundfile>=0.13.1", "soundfile>=0.13.1",
"torch>=2.12.0", "torch>=2.12.0",
"transformers>=5.9.0", "transformers>=5.9.0",
"typer[all]>=0.15.0",
]
[project.scripts]
cohere = "cohere_transcribe.cli:main"
cohere-transcribe = "cohere_transcribe.cli:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/cohere_transcribe"]
[dependency-groups]
dev = [
"anywidget>=0.11.0",
"ipywidgets>=8.1.8",
"jupyterlab>=4.5.7",
"plotly>=6.7.0",
] ]
-15
View File
@@ -1,15 +0,0 @@
{ pkgs ? import <nixpkgs> { config.allowUnfree = true; } }:
pkgs.mkShell {
buildInputs = with pkgs; [
portaudio
cudaPackages.cudatoolkit
uv
python314
];
shellHook = ''
export LD_LIBRARY_PATH="${pkgs.cudaPackages.cudatoolkit}/lib:$LD_LIBRARY_PATH"
echo "Dev shell ready - microphone input enabled"
'';
}
View File
+35
View File
@@ -0,0 +1,35 @@
import subprocess
import sys
from typing import Protocol
class InputBackend(Protocol):
def type_text(self, text: str) -> None: ...
def send_key(self, key: str) -> None: ...
class WtypeBackend:
def type_text(self, text: str) -> None:
try:
subprocess.run(["wtype", "--", text], check=True, timeout=10)
except FileNotFoundError:
print("wtype not found — install it for keyboard injection", file=sys.stderr)
except subprocess.SubprocessError as e:
print(f"wtype error: {e}", file=sys.stderr)
def send_key(self, key: str) -> None:
try:
subprocess.run(["wtype", "-k", key], check=True, timeout=10)
except FileNotFoundError:
print("wtype not found — install it for keyboard injection", file=sys.stderr)
except subprocess.SubprocessError as e:
print(f"wtype error: {e}", file=sys.stderr)
class PrintBackend:
def type_text(self, text: str) -> None:
print(text, end="", flush=True)
def send_key(self, key: str) -> None:
key_map = {"Return": "\n", "Tab": "\t", "BackSpace": "\b"}
print(key_map.get(key, f"[{key}]"), end="", flush=True)
+3
View File
@@ -0,0 +1,3 @@
from .cli import main
__all__ = ["main"]
+166
View File
@@ -0,0 +1,166 @@
import os
import subprocess
import sys
import time
import typer
from rich.console import Console
from ..daemon import STATE_FILE, is_running, read_state, stop_daemon
app = typer.Typer(help="Cohere live transcription — speaks into your keyboard.")
console = Console()
def _parse_device(value: str | None):
if value is None:
return None
try:
return int(value)
except ValueError:
return value
@app.command()
def on(
language: str = typer.Option("en", "--lang", "-l", help="Language code"),
pause: float = typer.Option(0.3, "--pause", "-p", help="Seconds of silence before sending text"),
device: str = typer.Option(None, "--device", "-d", help="Input device index or name substring (see `cohere devices`)"),
normalize: bool = typer.Option(False, "--normalize", "-n", help="Enable compressor + limiter to even out loudness"),
foreground: bool = typer.Option(False, "--fg", help="Run in foreground (don't daemonize)"),
):
"""Start transcribing and typing into your focused window."""
if is_running():
console.print("[yellow]Already running.[/yellow]")
raise typer.Exit(1)
if foreground:
from ..daemon import run_daemon
console.print("[green]Starting cohere (foreground)...[/green]")
run_daemon(language, pause=pause, device=_parse_device(device), normalize=normalize)
return
console.print("[green]Starting cohere daemon...[/green]")
os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True)
cmd = [sys.executable, "-m", "cohere_transcribe.daemon_main", "--lang", language]
if pause != 0.3:
cmd += ["--pause", str(pause)]
if device is not None:
cmd += ["--device", device]
if normalize:
cmd += ["--normalize"]
subprocess.Popen(
cmd,
start_new_session=True,
stdin=subprocess.DEVNULL,
stdout=open(os.path.join(os.path.dirname(STATE_FILE), "daemon.log"), "a"),
stderr=subprocess.STDOUT,
)
for _ in range(50):
time.sleep(0.1)
if is_running():
break
if is_running():
console.print("[green]Cohere is on — speak and it types.[/green]")
else:
console.print("[red]Failed to start daemon. Check ~/.local/state/cohere/daemon.log[/red]")
raise typer.Exit(1)
@app.command()
def off():
"""Stop transcribing."""
if not is_running():
console.print("[yellow]Not running.[/yellow]")
raise typer.Exit(0)
if stop_daemon():
console.print("[red]Cohere is off.[/red]")
else:
console.print("[red]Failed to stop daemon.[/red]")
raise typer.Exit(1)
@app.command()
def status():
"""Show whether cohere is running."""
state = read_state()
running = is_running()
if running:
started = state.get("started_at", 0)
elapsed = time.time() - started
minutes = int(elapsed) // 60
console.print(f"[green]ON[/green] — running for {minutes}m")
else:
console.print("[dim]OFF[/dim]")
@app.command()
def transcribe(
audio_file: str = typer.Argument(None, help="Audio file to transcribe"),
mic: int = typer.Option(None, "--mic", "-m", help="Record from mic for N seconds"),
stream: bool = typer.Option(False, "--stream", "-s", help="Live streaming mode (prints to terminal)"),
language: str = typer.Option("en", "--lang", "-l", help="Language code"),
pause: float = typer.Option(0.3, "--pause", "-p", help="Seconds of silence before sending text"),
device: str = typer.Option(None, "--device", "-d", help="Input device index or name substring (see `cohere devices`)"),
normalize: bool = typer.Option(False, "--normalize", "-n", help="Enable compressor + limiter to even out loudness"),
):
"""One-shot transcription (file, mic, or stream to terminal)."""
from ..model import load_model, transcribe_audio
from ..vad import pause_seconds_to_frames
dev = _parse_device(device)
if stream:
from ..stream import stream_transcribe
processor, model = load_model()
stream_transcribe(processor, model, language, silence_frames=pause_seconds_to_frames(pause), device=dev, normalize=normalize)
elif mic is not None:
from ..model import record_audio
processor, model = load_model()
try:
audio = record_audio(mic, device=dev, normalize=normalize)
console.print("Transcribing...")
text = transcribe_audio(processor, model, audio, language)
console.print(f"\n{text}\n")
except OSError as e:
console.print(f"[red]Microphone error: {e}[/red]")
raise typer.Exit(1)
elif audio_file:
from transformers.audio_utils import load_audio as load_audio_file
from ..model import SAMPLE_RATE
processor, model = load_model()
audio = load_audio_file(audio_file, sampling_rate=SAMPLE_RATE)
text = transcribe_audio(processor, model, audio, language)
console.print(f"\n{text}\n")
else:
console.print("[yellow]Provide an audio file, --mic, or --stream[/yellow]")
raise typer.Exit(1)
@app.command()
def devices():
"""List available audio input devices."""
import sounddevice as sd
default_in = sd.default.device[0]
for idx, dev in enumerate(sd.query_devices()):
if dev["max_input_channels"] <= 0:
continue
marker = "[green]*[/green]" if idx == default_in else " "
hostapi = sd.query_hostapis(dev["hostapi"])["name"]
console.print(
f"{marker} [bold]{idx:>2}[/bold] {dev['name']} "
f"[dim]({dev['max_input_channels']}ch, {int(dev['default_samplerate'])}Hz, {hostapi})[/dim]"
)
console.print(
"\n[dim]Tip: indices can shift between runs on PipeWire. "
"Prefer [bold]-d pipewire[/bold] (uses PipeWire's default source) or pass a name substring like [bold]-d Sipeed[/bold].[/dim]"
)
def main():
app()
+55
View File
@@ -0,0 +1,55 @@
import re
from .backend import InputBackend
KEY_COMMANDS: dict[str, list[str]] = {
"new line": ["Return"],
"newline": ["Return"],
"enter": ["Return"],
"press enter": ["Return"],
"new paragraph": ["Return", "Return"],
"tab": ["Tab"],
"backspace": ["BackSpace"],
}
PUNCTUATION: dict[str, str] = {
"question mark": "?",
"exclamation mark": "!",
"exclamation point": "!",
"period": ".",
"full stop": ".",
"comma": ",",
"colon": ":",
"semicolon": ";",
"open quote": '"',
"close quote": '"',
"open paren": "(",
"close paren": ")",
}
def _build_pattern(commands: dict) -> re.Pattern:
sorted_keys = sorted(commands.keys(), key=len, reverse=True)
escaped = [re.escape(k) for k in sorted_keys]
return re.compile(r"\b(" + "|".join(escaped) + r")\b", re.IGNORECASE)
_KEY_PATTERN = _build_pattern(KEY_COMMANDS)
_PUNCT_PATTERN = _build_pattern(PUNCTUATION)
def process_and_output(text: str, backend: InputBackend) -> None:
text = _PUNCT_PATTERN.sub(lambda m: PUNCTUATION[m.group(1).lower()], text)
text = re.sub(r"\s+([?.!,;:)\"])", r"\1", text)
parts = _KEY_PATTERN.split(text)
for part in parts:
cmd = part.strip().lower()
if cmd in KEY_COMMANDS:
for key in KEY_COMMANDS[cmd]:
backend.send_key(key)
else:
cleaned = part.strip()
if cleaned:
backend.type_text(cleaned + " ")
+52
View File
@@ -0,0 +1,52 @@
import math
import numpy as np
from .model import SAMPLE_RATE
class Compressor:
"""Feedforward dynamic range compressor + brick-wall limiter for speech.
Per sample: track an attack/release-smoothed envelope of |x|, compute gain
reduction above the threshold, apply makeup gain, then hard-limit to the ceiling.
"""
def __init__(
self,
threshold_db: float = -20.0,
ratio: float = 4.0,
attack_ms: float = 5.0,
release_ms: float = 80.0,
makeup_db: float = 12.0,
ceiling: float = 10 ** (-1.0 / 20), # -1 dBFS
sample_rate: int = SAMPLE_RATE,
):
self.threshold_db = threshold_db
self.ratio = ratio
self.makeup_gain = 10 ** (makeup_db / 20)
self.ceiling = ceiling
self.knee = 1.0 - 1.0 / ratio
self.a_att = math.exp(-1.0 / (attack_ms * 0.001 * sample_rate))
self.a_rel = math.exp(-1.0 / (release_ms * 0.001 * sample_rate))
self.envelope = 0.0
def process(self, x: np.ndarray) -> np.ndarray:
abs_x = np.abs(x)
env_out = np.empty_like(x)
e = self.envelope
a_att = self.a_att
a_rel = self.a_rel
for i in range(len(x)):
target = abs_x[i]
coef = a_att if target > e else a_rel
e = coef * e + (1.0 - coef) * target
env_out[i] = e
self.envelope = e
env_db = 20.0 * np.log10(np.maximum(env_out, 1e-10))
over = env_db - self.threshold_db
gr_db = np.where(over > 0, over * self.knee, 0.0)
gain = 10 ** (-gr_db / 20.0) * self.makeup_gain
y = x * gain
return np.clip(y, -self.ceiling, self.ceiling).astype(np.float32)
+152
View File
@@ -0,0 +1,152 @@
import json
import os
import signal
import queue
import threading
import time
import numpy as np
import sounddevice as sd
from .backend import WtypeBackend
from .commands import process_and_output
from .compressor import Compressor
from .model import SAMPLE_RATE, load_model, transcribe_audio
from .vad import (
DEFAULT_SILENCE_FRAMES,
FRAME_SIZE,
VADStateMachine,
calibrate_silence,
describe_input_device,
pause_seconds_to_frames,
resample_to_target,
resolve_input_rate,
)
STATE_DIR = os.path.expanduser("~/.local/state/cohere")
STATE_FILE = os.path.join(STATE_DIR, "state.json")
LOG_FILE = os.path.join(STATE_DIR, "daemon.log")
def _write_state(pid: int, status: str):
os.makedirs(STATE_DIR, exist_ok=True)
with open(STATE_FILE, "w") as f:
json.dump({"pid": pid, "status": status, "started_at": time.time()}, f)
_backend = WtypeBackend()
def read_state() -> dict | None:
try:
with open(STATE_FILE) as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return None
def is_running() -> bool:
state = read_state()
if state is None:
return False
pid = state.get("pid")
if pid is None:
return False
try:
os.kill(pid, 0)
return True
except OSError:
return False
def stop_daemon() -> bool:
state = read_state()
if state is None:
return False
pid = state.get("pid")
if pid is None:
return False
try:
os.kill(pid, signal.SIGTERM)
for _ in range(20):
time.sleep(0.1)
try:
os.kill(pid, 0)
except OSError:
break
_write_state(pid, "stopped")
return True
except OSError:
_write_state(pid, "stopped")
return False
def run_daemon(language: str = "en", pause: float | None = None, device=None, normalize: bool = False):
pid = os.getpid()
_write_state(pid, "starting")
def handle_sigterm(signum, frame):
raise KeyboardInterrupt
signal.signal(signal.SIGTERM, handle_sigterm)
silence_frames = pause_seconds_to_frames(pause) if pause else DEFAULT_SILENCE_FRAMES
processor, model = load_model()
print(f"Using input device: {describe_input_device(device)}")
comp = Compressor() if normalize else None
if comp:
print(" Normalization: compressor+limiter enabled")
threshold = calibrate_silence(device=device, compressor=comp)
capture_rate = resolve_input_rate(device)
capture_blocksize = FRAME_SIZE * capture_rate // SAMPLE_RATE
vad = VADStateMachine(threshold, silence_frames=silence_frames)
seg_queue: queue.Queue = queue.Queue()
stop_event = threading.Event()
start_time = time.monotonic()
_write_state(pid, "running")
def transcription_worker():
while not stop_event.is_set() or not seg_queue.empty():
try:
_seg_start, audio = seg_queue.get(timeout=0.5)
except queue.Empty:
continue
text = transcribe_audio(processor, model, audio, language)
text = text.strip()
if text:
process_and_output(text, _backend)
worker = threading.Thread(target=transcription_worker, daemon=True)
worker.start()
def audio_callback(indata, frames, time_info, status):
if stop_event.is_set():
return
elapsed = time.monotonic() - start_time
frame = resample_to_target(indata[:, 0].copy(), capture_rate)
if comp is not None:
frame = comp.process(frame)
result = vad.process_frame(frame, elapsed)
if result is not None:
seg_queue.put(result)
stream = sd.InputStream(
samplerate=capture_rate, channels=1, dtype="float32",
callback=audio_callback, blocksize=capture_blocksize, device=device,
)
try:
with stream:
while True:
time.sleep(0.1)
except KeyboardInterrupt:
pass
stop_event.set()
if vad.speaking and vad.segment:
seg_queue.put((vad.segment_start_time, np.concatenate(vad.segment)))
worker.join(timeout=30)
_write_state(pid, "stopped")
+21
View File
@@ -0,0 +1,21 @@
import argparse
from .daemon import run_daemon
def _parse_device(value):
if value is None:
return None
try:
return int(value)
except ValueError:
return value
parser = argparse.ArgumentParser()
parser.add_argument("--lang", default="en")
parser.add_argument("--pause", type=float, default=None)
parser.add_argument("--device", default=None)
parser.add_argument("--normalize", action="store_true")
args = parser.parse_args()
run_daemon(args.lang, pause=args.pause, device=_parse_device(args.device), normalize=args.normalize)
+38
View File
@@ -0,0 +1,38 @@
import numpy as np
from transformers import AutoProcessor, CohereAsrForConditionalGeneration
from transformers.audio_utils import load_audio
MODEL_ID = "CohereLabs/cohere-transcribe-03-2026"
SAMPLE_RATE = 16000
def load_model():
print("Loading model...")
processor = AutoProcessor.from_pretrained(MODEL_ID)
model = CohereAsrForConditionalGeneration.from_pretrained(
MODEL_ID, device_map="auto"
)
return processor, model
def transcribe_audio(processor, model, audio, language="en"):
inputs = processor(audio, sampling_rate=SAMPLE_RATE, return_tensors="pt", language=language)
inputs.to(model.device, dtype=model.dtype)
outputs = model.generate(**inputs, max_new_tokens=256)
texts = processor.decode(outputs, skip_special_tokens=True)
return " ".join(texts) if isinstance(texts, list) else texts
def record_audio(duration, device=None, normalize=False):
import sounddevice as sd
from .vad import resolve_input_rate, resample_to_target
print(f"Recording for {duration} seconds...")
rate = resolve_input_rate(device)
audio = sd.rec(int(duration * rate), samplerate=rate, channels=1, dtype="float32", device=device)
sd.wait()
audio = resample_to_target(audio.flatten(), rate)
if normalize:
from .compressor import Compressor
audio = Compressor().process(audio)
return audio
+74
View File
@@ -0,0 +1,74 @@
import sys
import queue
import threading
import time
import numpy as np
import sounddevice as sd
from .compressor import Compressor
from .model import SAMPLE_RATE, transcribe_audio
from .vad import DEFAULT_SILENCE_FRAMES, FRAME_SIZE, VADStateMachine, calibrate_silence, describe_input_device, resample_to_target, resolve_input_rate
def stream_transcribe(processor, model, language, silence_frames=DEFAULT_SILENCE_FRAMES, device=None, normalize=False):
print(f"Using input device: {describe_input_device(device)}")
comp = Compressor() if normalize else None
if comp:
print(" Normalization: compressor+limiter enabled")
threshold = calibrate_silence(device=device, compressor=comp)
capture_rate = resolve_input_rate(device)
capture_blocksize = FRAME_SIZE * capture_rate // SAMPLE_RATE
vad = VADStateMachine(threshold, silence_frames=silence_frames)
seg_queue = queue.Queue()
stop_event = threading.Event()
start_time = time.monotonic()
def transcription_worker():
while not stop_event.is_set() or not seg_queue.empty():
try:
seg_start, audio = seg_queue.get(timeout=0.5)
except queue.Empty:
continue
minutes = int(seg_start) // 60
seconds = int(seg_start) % 60
text = transcribe_audio(processor, model, audio, language)
if text.strip():
print(f"[{minutes:02d}:{seconds:02d}] {text.strip()}")
worker = threading.Thread(target=transcription_worker, daemon=True)
worker.start()
def audio_callback(indata, frames, time_info, status):
if stop_event.is_set():
return
elapsed = time.monotonic() - start_time
frame = resample_to_target(indata[:, 0].copy(), capture_rate)
if comp is not None:
frame = comp.process(frame)
result = vad.process_frame(frame, elapsed)
if result is not None:
seg_queue.put(result)
print("Listening... (Ctrl+C to stop)")
stream = sd.InputStream(
samplerate=capture_rate, channels=1, dtype="float32",
callback=audio_callback, blocksize=capture_blocksize, device=device,
)
try:
with stream:
while True:
time.sleep(0.1)
except KeyboardInterrupt:
pass
stop_event.set()
if vad.speaking and vad.segment:
seg_queue.put((vad.segment_start_time, np.concatenate(vad.segment)))
worker.join(timeout=30)
if worker.is_alive():
print("Warning: transcription worker did not finish in time.", file=sys.stderr)
print("\nDone.")
+126
View File
@@ -0,0 +1,126 @@
import collections
from math import gcd
import numpy as np
import sounddevice as sd
from .model import SAMPLE_RATE
FRAME_SIZE = 800 # 50ms at 16kHz
PRE_ROLL_FRAMES = 6 # ~0.3s of audio before speech onset
DEFAULT_SILENCE_FRAMES = 16 # ~0.8s of silence to end a segment
SPEECH_ONSET_FRAMES = 3 # ~150ms of speech to trigger
MAX_SPEECH_SECONDS = 30 # force chunk boundary
MIN_LOUD_FRAMES = 8 # need at least ~400ms of loud frames to count as speech
def pause_seconds_to_frames(seconds: float) -> int:
return max(1, round(seconds / (FRAME_SIZE / SAMPLE_RATE)))
def _query_input(device):
resolved = device if device is not None else sd.default.device[0]
info = sd.query_devices(resolved)
if info["max_input_channels"] < 1:
raise ValueError(
f"Device {device!r} ({info['name']}) is not an input device. "
f"Run `cohere devices` to see current input indices — they can shift between runs on PipeWire."
)
return info
def describe_input_device(device) -> str:
info = _query_input(device)
return info["name"]
def resolve_input_rate(device) -> int:
"""Pick a samplerate the device will accept. Prefer SAMPLE_RATE; if the device
refuses (e.g. raw ALSA hw: that doesn't resample), fall back to its native rate."""
info = _query_input(device)
try:
sd.check_input_settings(device=device, samplerate=SAMPLE_RATE, channels=1, dtype="float32")
return SAMPLE_RATE
except sd.PortAudioError:
rate = int(info["default_samplerate"])
print(f" Device doesn't support {SAMPLE_RATE}Hz; capturing at {rate}Hz and resampling.")
return rate
def resample_to_target(audio: np.ndarray, src_rate: int) -> np.ndarray:
if src_rate == SAMPLE_RATE:
return audio
from scipy.signal import resample_poly
g = gcd(SAMPLE_RATE, src_rate)
return resample_poly(audio, SAMPLE_RATE // g, src_rate // g).astype(np.float32)
def calibrate_silence(duration=0.5, device=None, compressor=None):
print("Calibrating silence threshold...")
rate = resolve_input_rate(device)
audio = sd.rec(int(duration * rate), samplerate=rate, channels=1, dtype="float32", device=device)
sd.wait()
audio = resample_to_target(audio.flatten(), rate)
if compressor is not None:
audio = compressor.process(audio)
rms = np.sqrt(np.mean(audio ** 2))
threshold = max(rms * 3, 0.01)
print(f" Ambient RMS: {rms:.4f}, threshold: {threshold:.4f}")
return threshold
class VADStateMachine:
def __init__(self, threshold, silence_frames=DEFAULT_SILENCE_FRAMES):
self.threshold = threshold
self.silence_limit = silence_frames
self.speaking = False
self.speech_frames = 0
self.silence_frames = 0
self.loud_frames = 0
self.pre_roll = collections.deque(maxlen=PRE_ROLL_FRAMES)
self.segment = []
self.segment_start_time = 0.0
def process_frame(self, frame, elapsed_time):
"""Process one 50ms frame. Returns a (start_time, audio_array) tuple when a
complete speech segment is detected, otherwise None."""
rms = np.sqrt(np.mean(frame ** 2))
is_loud = rms > self.threshold
if not self.speaking:
self.pre_roll.append(frame)
if is_loud:
self.speech_frames += 1
if self.speech_frames >= SPEECH_ONSET_FRAMES:
self.speaking = True
self.silence_frames = 0
self.segment = list(self.pre_roll)
self.segment_start_time = max(0.0, elapsed_time - len(self.pre_roll) * FRAME_SIZE / SAMPLE_RATE)
self.pre_roll = collections.deque(maxlen=PRE_ROLL_FRAMES)
else:
self.speech_frames = 0
return None
self.segment.append(frame)
if is_loud:
self.silence_frames = 0
self.loud_frames += 1
else:
self.silence_frames += 1
segment_duration = len(self.segment) * FRAME_SIZE / SAMPLE_RATE
if self.silence_frames >= self.silence_limit or segment_duration >= MAX_SPEECH_SECONDS:
result = None
if self.loud_frames >= MIN_LOUD_FRAMES:
result = (self.segment_start_time, np.concatenate(self.segment))
self.speaking = False
self.speech_frames = 0
self.silence_frames = 0
self.loud_frames = 0
self.segment = []
self.pre_roll = collections.deque(maxlen=PRE_ROLL_FRAMES)
return result
return None
+88
View File
@@ -0,0 +1,88 @@
"""Quick microphone tests. Run: uv run python test_mic.py"""
import numpy as np
import sounddevice as sd
import sys
import time
SAMPLE_RATE = 16000
def test_device_info():
"""Show which device will be used for recording."""
default_input = sd.default.device[0]
info = sd.query_devices(default_input)
print(f"Default input device [{default_input}]: {info['name']}")
print(f" Max input channels: {info['max_input_channels']}")
print(f" Default sample rate: {info['default_samplerate']}")
assert info["max_input_channels"] > 0, "Default device has no input channels!"
print(" PASS\n")
def test_record_1s():
"""Record 1 second and check we got non-silent audio."""
print("Recording 1 second... (speak or make noise!)")
audio = sd.rec(SAMPLE_RATE, samplerate=SAMPLE_RATE, channels=1, dtype="float32")
sd.wait()
audio = audio.flatten()
peak = np.max(np.abs(audio))
rms = np.sqrt(np.mean(audio ** 2))
print(f" Samples: {len(audio)}")
print(f" Peak amplitude: {peak:.4f}")
print(f" RMS: {rms:.6f}")
assert len(audio) == SAMPLE_RATE, f"Expected {SAMPLE_RATE} samples, got {len(audio)}"
assert peak > 0, "All zeros — mic not capturing anything"
if peak < 0.001:
print(" WARNING: Very low signal — mic might be muted or too far away")
else:
print(" Signal level looks good")
print(" PASS\n")
def test_record_levels():
"""Record 3 seconds in 1-second chunks, show live levels."""
print("Recording 3 seconds — speak during seconds 2-3 for comparison...")
for i in range(3):
audio = sd.rec(SAMPLE_RATE, samplerate=SAMPLE_RATE, channels=1, dtype="float32")
sd.wait()
audio = audio.flatten()
rms = np.sqrt(np.mean(audio ** 2))
peak = np.max(np.abs(audio))
bar = "#" * int(min(peak * 200, 50))
print(f" Second {i+1}: peak={peak:.4f} rms={rms:.6f} |{bar}")
print(" PASS\n")
def test_stream_callback():
"""Test that InputStream callback fires correctly."""
frames_received = []
def callback(indata, frames, time_info, status):
if status:
print(f" Status: {status}")
frames_received.append(len(indata))
print("Testing InputStream callback for 1 second...")
with sd.InputStream(samplerate=SAMPLE_RATE, channels=1, dtype="float32",
callback=callback, blocksize=800):
time.sleep(1)
total_frames = sum(frames_received)
expected = SAMPLE_RATE
print(f" Callbacks fired: {len(frames_received)}")
print(f" Total frames: {total_frames} (expected ~{expected})")
print(f" Blocksize per callback: {frames_received[0] if frames_received else 'N/A'}")
assert len(frames_received) > 0, "No callbacks received!"
assert abs(total_frames - expected) < expected * 0.2, f"Frame count off by >20%"
print(" PASS\n")
if __name__ == "__main__":
print("=== Microphone Tests ===\n")
test_device_info()
test_record_1s()
test_record_levels()
test_stream_callback()
print("All tests passed!")
Generated
+1084 -2
View File
File diff suppressed because it is too large Load Diff