rec_vad_vosk_agc.py
#!/usr/bin/env python3
"""Record voice activity, apply light AGC, and transcribe with Whisper (English only)."""

import argparse
import collections
import json
import os
import queue
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional, Tuple

import numpy as np
import sounddevice as sd
import soundfile as sf
import webrtcvad

# Paths
OUTPUT_DIR = Path.home() / "rec_vad_vosk"
AUDIO_DIR = OUTPUT_DIR / "audio"
LOG_PATH = OUTPUT_DIR / "transcript.jsonl"

# Audio capture
SAMPLE_RATE = 16000
CHANNELS = 1
FRAME_MS = 30
FRAME_LEN = SAMPLE_RATE * FRAME_MS // 1000

# WebRTC VAD
VAD_AGGRESSIVENESS = 3  # 0–3; higher is stricter
PRE_FRAMES = 8
SPEECH_START_FRAMES = 3
SPEECH_END_FRAMES = 15
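# With 30 ms frames these defaults mean: roughly 90 ms of voiced audio
# (SPEECH_START_FRAMES) inside the ~240 ms pre-roll ring (PRE_FRAMES) opens a
# segment, and ~450 ms of consecutive non-speech (SPEECH_END_FRAMES) closes it.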

# Segments
SEG_PADDING_MS = 200
MIN_SEG_SEC = 0.6
MAX_SEG_SEC = 20  # hard cut to avoid runaway segments

# Frame gating
MIN_FRAME_RMS = 0.004  # drop very quiet / steady noise frames
HUMAN_FREQ_LOW = 80
HUMAN_FREQ_HIGH = 3400
MIN_HUMAN_BAND_RATIO = 0.15

# Automatic gain control
TARGET_RMS = 0.10
MIN_GAIN = 0.25
MAX_GAIN = 8.0
PEAK_CLIP_LIMIT = 0.99
AGC_EPS = 1e-5

# Transcription
WHISPER_MODEL_NAME = os.environ.get("WHISPER_MODEL_NAME", "base.en")
WHISPER_DEVICE = os.environ.get("WHISPER_DEVICE", "cpu")
WHISPER_LANGUAGE = "en"
MIN_TEXT_LEN = 1
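# WHISPER_MODEL_NAME / WHISPER_DEVICE are read from the environment above, so
# e.g. WHISPER_MODEL_NAME=small.en or WHISPER_DEVICE=cuda can be swapped in
# without editing the file; "base.en" on CPU is the default.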


@dataclass
class SegmentInfo:
    timestamp: float
    path: Path
    duration: float
    rms_before: float
    peak_before: float
    gain: float
    rms_after: float
    peak_after: float


STOP = threading.Event()
AUDIO_Q: "queue.Queue[SegmentInfo]" = queue.Queue()
vad = webrtcvad.Vad(VAD_AGGRESSIVENESS)


def log(msg: str) -> None:
    print(f"[{datetime.now()}] {msg}", flush=True)


def ensure_dirs() -> None:
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    AUDIO_DIR.mkdir(parents=True, exist_ok=True)


def write_jsonl(obj: dict) -> None:
    with LOG_PATH.open("a", encoding="utf-8") as f:
        f.write(json.dumps(obj, ensure_ascii=False) + "\n")


def load_whisper_model():
    try:
        import whisper
    except ImportError as exc:
        raise SystemExit(
            "Whisper backend requires the 'whisper' package. "
            "Install it with 'pip install -U openai-whisper'."
        ) from exc
    log(f"Loading Whisper '{WHISPER_MODEL_NAME}' on {WHISPER_DEVICE} (English only)")
    return whisper.load_model(WHISPER_MODEL_NAME, device=WHISPER_DEVICE)


def concat_and_pad(frames: list[np.ndarray]) -> Optional[np.ndarray]:
    if not frames:
        return None
    audio = np.concatenate(frames).astype(np.float32) / 32768.0
    pad = int(SAMPLE_RATE * SEG_PADDING_MS / 1000)
    if pad:
        audio = np.pad(audio, (pad, pad))
    return audio


def apply_agc(audio: np.ndarray) -> Optional[Tuple[np.ndarray, Tuple[float, float, float, float, float]]]:
    cur_rms = float(np.sqrt(np.mean(audio ** 2)))
    if cur_rms < AGC_EPS:
        return None
    cur_peak = float(np.max(np.abs(audio)))
    gain = float(np.clip(TARGET_RMS / cur_rms, MIN_GAIN, MAX_GAIN))
    processed = audio * gain
    np.clip(processed, -PEAK_CLIP_LIMIT, PEAK_CLIP_LIMIT, out=processed)
    new_rms = float(np.sqrt(np.mean(processed ** 2)))
    new_peak = float(np.max(np.abs(processed)))
    return processed, (cur_rms, cur_peak, gain, new_rms, new_peak)
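# Worked example (hypothetical numbers): a segment with RMS 0.02 gets gain
# 0.10 / 0.02 = 5.0, inside [MIN_GAIN, MAX_GAIN]; a very quiet one with RMS
# 0.001 would want gain 100 but is capped at MAX_GAIN = 8.0. After scaling,
# samples are hard-clipped to +/-PEAK_CLIP_LIMIT.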


def human_band_ratio(audio: np.ndarray) -> float:
    fft = np.fft.rfft(audio)
    power = np.abs(fft) ** 2
    freqs = np.fft.rfftfreq(len(audio), 1 / SAMPLE_RATE)
    mask = (freqs >= HUMAN_FREQ_LOW) & (freqs <= HUMAN_FREQ_HIGH)
    if not mask.any():
        return 0.0
    total = float(np.sum(power))
    return float(np.sum(power[mask]) / total) if total > AGC_EPS else 0.0
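# human_band_ratio is a crude spectral gate: it measures what fraction of a
# frame's FFT power falls in the 80–3400 Hz voice band; in record_loop, frames
# below MIN_HUMAN_BAND_RATIO are treated as non-speech even if the VAD fires.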


def save_segment(frames: list[np.ndarray]) -> Optional[SegmentInfo]:
    audio = concat_and_pad(frames)
    if audio is None:
        return None
    agc_result = apply_agc(audio)
    if agc_result is None:
        log("Segment dropped: near silent (RMS too low).")
        return None
    processed, stats = agc_result
    cur_rms, cur_peak, gain, new_rms, new_peak = stats
    duration = len(processed) / SAMPLE_RATE
    if duration < MIN_SEG_SEC:
        log(f"Segment dropped: too short ({duration:.2f}s).")
        return None
    timestamp = time.time()
    stamp = datetime.fromtimestamp(timestamp).strftime("%Y%m%d_%H%M%S_%f")
    path = AUDIO_DIR / f"seg_{stamp}.wav"
    sf.write(path, processed, SAMPLE_RATE)
    info = SegmentInfo(
        timestamp=timestamp,
        path=path,
        duration=duration,
        rms_before=cur_rms,
        peak_before=cur_peak,
        gain=gain,
        rms_after=new_rms,
        peak_after=new_peak,
    )
    log(
        f"Saved {path} | dur={duration:.2f}s gain={gain:.2f} "
        f"rms {cur_rms:.3f}->{new_rms:.3f} peak {cur_peak:.3f}->{new_peak:.3f}"
    )
    return info
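# record_loop below is a small state machine: a deque of the last PRE_FRAMES
# frames provides pre-roll; once enough of them are voiced it starts collecting
# into voiced_frames, and either SPEECH_END_FRAMES of silence or the
# MAX_SEG_SEC cap flushes the segment through save_segment onto AUDIO_Q for the
# transcriber thread.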


def record_loop(device: Optional[object]) -> None:
    log(
        f"Starting recorder (WebRTC VAD, speech segments). "
        f"Input device: {device if device is not None else 'default'}"
    )
    ring = collections.deque(maxlen=PRE_FRAMES)
    triggered = False
    voiced_frames: list[np.ndarray] = []
    unvoiced_count = 0
    seg_start_ts: Optional[float] = None
    try:
        with sd.InputStream(
            samplerate=SAMPLE_RATE,
            channels=CHANNELS,
            dtype="int16",
            blocksize=FRAME_LEN,
            device=device,
        ) as stream:
            while not STOP.is_set():
                frame, overflowed = stream.read(FRAME_LEN)
                if overflowed:
                    log("WARN: audio overflow")
                frame = frame[:, 0] if frame.ndim == 2 else frame
                frame_i16 = frame.astype(np.int16)
                frame_f32 = frame_i16.astype(np.float32) / 32768.0
                frame_rms = float(np.sqrt(np.mean(frame_f32 ** 2)))
                band_ratio = human_band_ratio(frame_f32) if frame_rms > AGC_EPS else 0.0
                vad_flag = vad.is_speech(frame_i16.tobytes(), SAMPLE_RATE)
                is_speech = (
                    vad_flag
                    and frame_rms >= MIN_FRAME_RMS
                    and band_ratio >= MIN_HUMAN_BAND_RATIO
                )
                if not triggered:
                    ring.append((frame_i16, is_speech))
                    voiced = sum(1 for _, v in ring if v)
                    if voiced >= SPEECH_START_FRAMES:
                        triggered = True
                        seg_start_ts = time.time()
                        voiced_frames = [f for f, _ in ring]
                        ring.clear()
                        unvoiced_count = 0
                        log("Speech start")
                else:
                    voiced_frames.append(frame_i16)
                    if seg_start_ts:
                        elapsed = time.time() - seg_start_ts
                    else:
                        elapsed = 0
                    if seg_start_ts and elapsed >= MAX_SEG_SEC:
                        triggered = False
                        unvoiced_count = 0
                        seg_start_ts = None
                        info = save_segment(voiced_frames)
                        if info:
                            AUDIO_Q.put(info)
                        voiced_frames = []
                        ring.clear()
                        log(f"Segment auto-cut at {elapsed:.1f}s (max length).")
                        continue
                    if is_speech:
                        unvoiced_count = 0
                    else:
                        unvoiced_count += 1
                        if unvoiced_count >= SPEECH_END_FRAMES:
                            triggered = False
                            unvoiced_count = 0
                            seg_start_ts = None
                            info = save_segment(voiced_frames)
                            if info:
                                AUDIO_Q.put(info)
                            voiced_frames = []
                            ring.clear()
        # Flush any in-progress segment once the stream closes (e.g. on stop).
        if triggered and voiced_frames:
            info = save_segment(voiced_frames)
            if info:
                AUDIO_Q.put(info)
            seg_start_ts = None
    except Exception as exc:  # pragma: no cover - runtime only
        log(f"Recorder error: {exc}")
    log("Recorder stopped")


def transcribe_segment(path: Path, model) -> Tuple[str, dict]:
    result = model.transcribe(
        str(path),
        language=WHISPER_LANGUAGE,
        task="transcribe",
        verbose=False,
    )
    text = result.get("text", "").strip()
    return text, result


def transcribe_loop(model) -> None:
    log("Transcriber started")
    while not (STOP.is_set() and AUDIO_Q.empty()):
        try:
            segment = AUDIO_Q.get(timeout=1)
        except queue.Empty:
            continue
        log(f"Transcribing {segment.path} …")
        try:
            text, result = transcribe_segment(segment.path, model)
        except Exception as exc:  # pragma: no cover - runtime only
            log(f"Whisper error {segment.path}: {exc}")
            continue
        if len(text) < MIN_TEXT_LEN:
            log(f"No transcript for {segment.path} (len={len(text)}), skipping log.")
            continue
        write_jsonl({
            "timestamp": segment.timestamp,
            "audio": str(segment.path),
            "text": text,
            "duration": segment.duration,
            "backend": "whisper-en",
            "agc": {
                "gain": segment.gain,
                "rms_before": segment.rms_before,
                "rms_after": segment.rms_after,
                "peak_before": segment.peak_before,
                "peak_after": segment.peak_after,
            },
        })
        log(f"Done: {text!r}")
    log("Transcriber stopped")


def list_input_devices() -> None:
    log("Available input devices:")
    try:
        devices = sd.query_devices()
    except Exception as exc:
        log(f"Could not query devices: {exc}")
        return
    for idx, dev in enumerate(devices):
        if dev.get("max_input_channels", 0) <= 0:
            continue
        log(
            f"[{idx}] {dev.get('name')} "
            f"(inputs={dev.get('max_input_channels')}, "
            f"default_sr={dev.get('default_samplerate')})"
        )


def coerce_device(device_arg: Optional[str]):
    if device_arg is None:
        return None
    try:
        return int(device_arg)
    except (TypeError, ValueError):
        return device_arg


def parse_args():
    parser = argparse.ArgumentParser(
        description="Record speech with WebRTC VAD + Whisper transcription (English only)."
    )
    parser.add_argument(
        "--device",
        help="Input device index or name (as reported by sounddevice).",
    )
    parser.add_argument(
        "--list-devices",
        action="store_true",
        help="List available input devices and exit.",
    )
    return parser.parse_args()


def main() -> None:
    args = parse_args()
    ensure_dirs()
    if args.list_devices:
        list_input_devices()
        return
    device = coerce_device(args.device)
    try:
        sd.check_input_settings(
            device=device,
            samplerate=SAMPLE_RATE,
            channels=CHANNELS,
            dtype="int16",
        )
    except Exception as exc:
        raise SystemExit(f"Invalid input device {device!r}: {exc}") from exc
    model = load_whisper_model()
    log(f"Output dir: {OUTPUT_DIR}")
    log(f"Input device: {device if device is not None else 'default'}")
    log("Press Ctrl+C to stop.")
    recorder = threading.Thread(target=record_loop, args=(device,), daemon=True)
    transcriber = threading.Thread(target=transcribe_loop, args=(model,), daemon=True)
    recorder.start()
    transcriber.start()
    try:
        while recorder.is_alive():
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    # Signal both threads to stop; also covers the case where the recorder exits on its own.
    STOP.set()
    recorder.join(timeout=5)
    transcriber.join(timeout=5)
    log("Stopped.")


if __name__ == "__main__":
    main()