@cheeseonamonkey
Created December 6, 2025 07:51
rec_vad_vosk_agc.py
#!/usr/bin/env python3
"""Record voice activity, apply light AGC, and transcribe with Whisper (English only)."""
import argparse
import json
import collections
import os
import queue
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional, Tuple
import numpy as np
import sounddevice as sd
import soundfile as sf
import webrtcvad
# Paths
OUTPUT_DIR = Path.home() / "rec_vad_vosk"
AUDIO_DIR = OUTPUT_DIR / "audio"
LOG_PATH = OUTPUT_DIR / "transcript.jsonl"
# Audio capture
SAMPLE_RATE = 16000
CHANNELS = 1
FRAME_MS = 30
FRAME_LEN = SAMPLE_RATE * FRAME_MS // 1000
# WebRTC VAD
VAD_AGGRESSIVENESS = 3 # 0–3; higher is stricter
PRE_FRAMES = 8
SPEECH_START_FRAMES = 3
SPEECH_END_FRAMES = 15
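# With 30 ms frames these defaults mean: keep ~240 ms of pre-roll (8 frames),
# trigger after ~90 ms of detected speech (3 frames), and close a segment
# after ~450 ms without speech (15 frames).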
# Segments
SEG_PADDING_MS = 200
MIN_SEG_SEC = 0.6
MAX_SEG_SEC = 20 # hard cut to avoid runaway segments
# Frame gating
MIN_FRAME_RMS = 0.004 # drop very quiet / steady noise frames
HUMAN_FREQ_LOW = 80
HUMAN_FREQ_HIGH = 3400
MIN_HUMAN_BAND_RATIO = 0.15
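# A frame only counts as speech when the WebRTC VAD fires AND its RMS and its
# 80-3400 Hz energy ratio clear the thresholds above (see record_loop).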
# Automatic gain control
TARGET_RMS = 0.10
MIN_GAIN = 0.25
MAX_GAIN = 8.0
PEAK_CLIP_LIMIT = 0.99
AGC_EPS = 1e-5
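# Per segment, apply_agc scales samples by clip(TARGET_RMS / rms, MIN_GAIN,
# MAX_GAIN) and then hard-clips the result to +/-PEAK_CLIP_LIMIT.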
# Transcription
WHISPER_MODEL_NAME = os.environ.get("WHISPER_MODEL_NAME", "base.en")
WHISPER_DEVICE = os.environ.get("WHISPER_DEVICE", "cpu")
WHISPER_LANGUAGE = "en"
MIN_TEXT_LEN = 1


@dataclass
class SegmentInfo:
    timestamp: float
    path: Path
    duration: float
    rms_before: float
    peak_before: float
    gain: float
    rms_after: float
    peak_after: float


STOP = threading.Event()
AUDIO_Q: "queue.Queue[SegmentInfo]" = queue.Queue()
vad = webrtcvad.Vad(VAD_AGGRESSIVENESS)


def log(msg: str) -> None:
    print(f"[{datetime.now()}] {msg}", flush=True)


def ensure_dirs() -> None:
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    AUDIO_DIR.mkdir(parents=True, exist_ok=True)


def write_jsonl(obj: dict) -> None:
    with LOG_PATH.open("a", encoding="utf-8") as f:
        f.write(json.dumps(obj, ensure_ascii=False) + "\n")


def load_whisper_model():
    try:
        import whisper
    except ImportError as exc:
        raise SystemExit(
            "Whisper backend requires the 'whisper' package. "
            "Install it with 'pip install -U openai-whisper'."
        ) from exc
    log(f"Loading Whisper '{WHISPER_MODEL_NAME}' on {WHISPER_DEVICE} (English only)")
    return whisper.load_model(WHISPER_MODEL_NAME, device=WHISPER_DEVICE)


def concat_and_pad(frames: list[np.ndarray]) -> Optional[np.ndarray]:
    if not frames:
        return None
    audio = np.concatenate(frames).astype(np.float32) / 32768.0
    pad = int(SAMPLE_RATE * SEG_PADDING_MS / 1000)
    if pad:
        audio = np.pad(audio, (pad, pad))
    return audio


def apply_agc(audio: np.ndarray) -> Optional[Tuple[np.ndarray, Tuple[float, float, float, float, float]]]:
    cur_rms = float(np.sqrt(np.mean(audio ** 2)))
    if cur_rms < AGC_EPS:
        return None
    cur_peak = float(np.max(np.abs(audio)))
    gain = float(np.clip(TARGET_RMS / cur_rms, MIN_GAIN, MAX_GAIN))
    processed = audio * gain
    np.clip(processed, -PEAK_CLIP_LIMIT, PEAK_CLIP_LIMIT, out=processed)
    new_rms = float(np.sqrt(np.mean(processed ** 2)))
    new_peak = float(np.max(np.abs(processed)))
    return processed, (cur_rms, cur_peak, gain, new_rms, new_peak)


def human_band_ratio(audio: np.ndarray) -> float:
    fft = np.fft.rfft(audio)
    power = np.abs(fft) ** 2
    freqs = np.fft.rfftfreq(len(audio), 1 / SAMPLE_RATE)
    mask = (freqs >= HUMAN_FREQ_LOW) & (freqs <= HUMAN_FREQ_HIGH)
    if not mask.any():
        return 0.0
    total = float(np.sum(power))
    return float(np.sum(power[mask]) / total) if total > AGC_EPS else 0.0


def save_segment(frames: list[np.ndarray]) -> Optional[SegmentInfo]:
    audio = concat_and_pad(frames)
    if audio is None:
        return None
    agc_result = apply_agc(audio)
    if agc_result is None:
        log("Segment dropped: near silent (RMS too low).")
        return None
    processed, stats = agc_result
    cur_rms, cur_peak, gain, new_rms, new_peak = stats
    duration = len(processed) / SAMPLE_RATE
    if duration < MIN_SEG_SEC:
        log(f"Segment dropped: too short ({duration:.2f}s).")
        return None
    timestamp = time.time()
    stamp = datetime.fromtimestamp(timestamp).strftime("%Y%m%d_%H%M%S_%f")
    path = AUDIO_DIR / f"seg_{stamp}.wav"
    sf.write(path, processed, SAMPLE_RATE)
    info = SegmentInfo(
        timestamp=timestamp,
        path=path,
        duration=duration,
        rms_before=cur_rms,
        peak_before=cur_peak,
        gain=gain,
        rms_after=new_rms,
        peak_after=new_peak,
    )
    log(
        f"Saved {path} | dur={duration:.2f}s gain={gain:.2f} "
        f"rms {cur_rms:.3f}->{new_rms:.3f} peak {cur_peak:.3f}->{new_peak:.3f}"
    )
    return info
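

# The capture loop below is a small state machine:
#   idle      - keep the last PRE_FRAMES frames in a ring buffer; once
#               SPEECH_START_FRAMES of them are speech, flush the ring into a
#               new segment and switch to "triggered".
#   triggered - append every frame; finish the segment after
#               SPEECH_END_FRAMES consecutive non-speech frames, or hard-cut
#               at MAX_SEG_SEC, then queue the saved WAV for transcription.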
def record_loop(device: Optional[object]) -> None:
    log(
        f"Starting recorder (WebRTC VAD, speech segments). "
        f"Input device: {device if device is not None else 'default'}"
    )
    ring = collections.deque(maxlen=PRE_FRAMES)
    triggered = False
    voiced_frames: list[np.ndarray] = []
    unvoiced_count = 0
    seg_start_ts: Optional[float] = None
    try:
        with sd.InputStream(
            samplerate=SAMPLE_RATE,
            channels=CHANNELS,
            dtype="int16",
            blocksize=FRAME_LEN,
            device=device,
        ) as stream:
            while not STOP.is_set():
                frame, overflowed = stream.read(FRAME_LEN)
                if overflowed:
                    log("WARN: audio overflow")
                frame = frame[:, 0] if frame.ndim == 2 else frame
                frame_i16 = frame.astype(np.int16)
                frame_f32 = frame_i16.astype(np.float32) / 32768.0
                frame_rms = float(np.sqrt(np.mean(frame_f32 ** 2)))
                band_ratio = human_band_ratio(frame_f32) if frame_rms > AGC_EPS else 0.0
                vad_flag = vad.is_speech(frame_i16.tobytes(), SAMPLE_RATE)
                is_speech = (
                    vad_flag
                    and frame_rms >= MIN_FRAME_RMS
                    and band_ratio >= MIN_HUMAN_BAND_RATIO
                )
                if not triggered:
                    ring.append((frame_i16, is_speech))
                    voiced = sum(1 for _, v in ring if v)
                    if voiced >= SPEECH_START_FRAMES:
                        triggered = True
                        seg_start_ts = time.time()
                        voiced_frames = [f for f, _ in ring]
                        ring.clear()
                        unvoiced_count = 0
                        log("Speech start")
                else:
                    voiced_frames.append(frame_i16)
                    if seg_start_ts:
                        elapsed = time.time() - seg_start_ts
                    else:
                        elapsed = 0
                    if seg_start_ts and elapsed >= MAX_SEG_SEC:
                        triggered = False
                        unvoiced_count = 0
                        seg_start_ts = None
                        info = save_segment(voiced_frames)
                        if info:
                            AUDIO_Q.put(info)
                        voiced_frames = []
                        ring.clear()
                        log(f"Segment auto-cut at {elapsed:.1f}s (max length).")
                        continue
                    if is_speech:
                        unvoiced_count = 0
                    else:
                        unvoiced_count += 1
                        if unvoiced_count >= SPEECH_END_FRAMES:
                            triggered = False
                            unvoiced_count = 0
                            seg_start_ts = None
                            info = save_segment(voiced_frames)
                            if info:
                                AUDIO_Q.put(info)
                            voiced_frames = []
                            ring.clear()
        if triggered and voiced_frames:
            info = save_segment(voiced_frames)
            if info:
                AUDIO_Q.put(info)
            seg_start_ts = None
    except Exception as exc:  # pragma: no cover - runtime only
        log(f"Recorder error: {exc}")
    log("Recorder stopped")


def transcribe_segment(path: Path, model) -> Tuple[str, dict]:
    result = model.transcribe(
        str(path),
        language=WHISPER_LANGUAGE,
        task="transcribe",
        verbose=False,
    )
    text = result.get("text", "").strip()
    return text, result


def transcribe_loop(model) -> None:
    log("Transcriber started")
    while not (STOP.is_set() and AUDIO_Q.empty()):
        try:
            segment = AUDIO_Q.get(timeout=1)
        except queue.Empty:
            continue
        log(f"Transcribing {segment.path} …")
        try:
            text, result = transcribe_segment(segment.path, model)
        except Exception as exc:  # pragma: no cover - runtime only
            log(f"Whisper error {segment.path}: {exc}")
            continue
        if len(text) < MIN_TEXT_LEN:
            log(f"No transcript for {segment.path} (len={len(text)}), skipping log.")
            continue
        write_jsonl({
            "timestamp": segment.timestamp,
            "audio": str(segment.path),
            "text": text,
            "duration": segment.duration,
            "backend": "whisper-en",
            "agc": {
                "gain": segment.gain,
                "rms_before": segment.rms_before,
                "rms_after": segment.rms_after,
                "peak_before": segment.peak_before,
                "peak_after": segment.peak_after,
            },
        })
        log(f"Done: {text!r}")
    log("Transcriber stopped")


def list_input_devices() -> None:
    log("Available input devices:")
    try:
        devices = sd.query_devices()
    except Exception as exc:
        log(f"Could not query devices: {exc}")
        return
    for idx, dev in enumerate(devices):
        if dev.get("max_input_channels", 0) <= 0:
            continue
        log(
            f"[{idx}] {dev.get('name')} "
            f"(inputs={dev.get('max_input_channels')}, "
            f"default_sr={dev.get('default_samplerate')})"
        )


def coerce_device(device_arg: Optional[str]):
    if device_arg is None:
        return None
    try:
        return int(device_arg)
    except (TypeError, ValueError):
        return device_arg


def parse_args():
    parser = argparse.ArgumentParser(
        description="Record speech with WebRTC VAD + Whisper transcription (English only)."
    )
    parser.add_argument(
        "--device",
        help="Input device index or name (as reported by sounddevice).",
    )
    parser.add_argument(
        "--list-devices",
        action="store_true",
        help="List available input devices and exit.",
    )
    return parser.parse_args()


def main() -> None:
    args = parse_args()
    ensure_dirs()
    if args.list_devices:
        list_input_devices()
        return
    device = coerce_device(args.device)
    try:
        sd.check_input_settings(
            device=device,
            samplerate=SAMPLE_RATE,
            channels=CHANNELS,
            dtype="int16",
        )
    except Exception as exc:
        raise SystemExit(f"Invalid input device {device!r}: {exc}") from exc
    model = load_whisper_model()
    log(f"Output dir: {OUTPUT_DIR}")
    log(f"Input device: {device if device is not None else 'default'}")
    log("Press Ctrl+C to stop.")
    recorder = threading.Thread(target=record_loop, args=(device,), daemon=True)
    transcriber = threading.Thread(target=transcribe_loop, args=(model,), daemon=True)
    recorder.start()
    transcriber.start()
    try:
        while recorder.is_alive():
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Signal both workers to stop, whether we exit via Ctrl+C or because
        # the recorder thread died on its own.
        STOP.set()
    recorder.join(timeout=5)
    transcriber.join(timeout=5)
    log("Stopped.")


if __name__ == "__main__":
    main()