"""
================================================================================
MIT License
Copyright (c) 2023 Craig Carlson
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
================================================================================
This is a basic script that uses OpenAI Whisper variants to read from both the
microphone and Stereo Mix and write transcriptions to a file. There is no
speaker diarization, but microphone and system audio are tagged as such.
Additionally, the audio streams are denoised and split on silence (detected via
a moving average of decibels, with the threshold tuned to human speech).
Stopping the script is a bit janky: it is currently designed to hard-exit when
Ctrl+C is pressed in a Windows console.
"""
import sys
import threading
from collections import deque
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from queue import Empty, Queue
from time import perf_counter, sleep
import noisereduce as nr
import numpy as np
import pyaudio as pa
import torch
import win32api
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor
# from transformers.models.whisper.english_normalizer import \
#     EnglishTextNormalizer
FORMAT = pa.paInt16
NP_FORMAT = np.int16
CHANNELS = 1
CHUNK = 1024
RATE = 16000 # int(p.get_default_input_device_info()['defaultSampleRate'])
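# Note (not in the original gist): 16 kHz mono matches the sample rate that
# Whisper-family models expect, so the captured audio needs no resampling.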
np.seterr('raise')


@dataclass
class AudioSegment:
    clip: np.ndarray
    ts: datetime
    tag: str


class StreamingMovingAverage:
    def __init__(self, window_size):
        # Pre-fill the window with zeros so process() always has a value to
        # evict; otherwise the deque would only ever hold one element and the
        # "moving average" would just be the latest value.
        self.values = deque([0] * window_size, maxlen=window_size)
        self.sum = 0

    @property
    def average(self) -> float:
        return float(self.sum) / len(self.values)

    def process(self, value):
        # Slide the window: drop the oldest value, add the newest.
        self.sum += value - self.values.popleft()
        self.values.append(value)
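
# Illustrative behaviour (not in the original gist): with window_size=3 the
# zero pre-fill means three calls to process(30) move .average through
# 10.0, 20.0 and 30.0 as the padding zeros are evicted.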


def convert_to_decibel(arr: np.ndarray):
    ref = 1.0
    absarr = np.abs(arr).astype(np.float64)
    # Guard before taking the log: np.where evaluates both branches eagerly,
    # and with np.seterr('raise') a log10(0) on a silent chunk would raise.
    safe = np.where(absarr > 1e-10, absarr, ref)
    return np.where(absarr > 1e-10, 20 * np.log10(safe / ref), -60.0)
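
# Illustrative check (not in the original gist): a full-scale int16 sample of
# 32767 maps to 20 * log10(32767) ~= 90.3 dB, so the 40 dB speech thresholds
# used below correspond to mid-range amplitudes.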


def get_noise(device_index: int):
    """Record a short clip from the device to use as a noise profile."""
    audio = pa.PyAudio()
    stream = audio.open(format=FORMAT, channels=CHANNELS,
                        rate=RATE, input=True,
                        frames_per_buffer=CHUNK,
                        input_device_index=device_index)
    try:
        frames = []
        RECORD_SECONDS = 0.25
        for _ in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
            data = stream.read(CHUNK)
            frames.append(np.frombuffer(data, dtype=NP_FORMAT))
        return np.hstack(frames).astype(NP_FORMAT)
    finally:
        # stop recording and release the device
        stream.stop_stream()
        stream.close()
        audio.terminate()


class SpeechRecorder():
    """Record a 16 kHz audio sample of speech."""

    def __init__(self, stop_event: threading.Event, device_index: int = None, noise_sample=None) -> None:
        # Only capture a fresh noise profile when one was not supplied.
        if noise_sample is None:
            self.noise_sample = get_noise(device_index)
        else:
            self.noise_sample = noise_sample
        self.audio_clip = np.array([], dtype=NP_FORMAT)
        self.last_detect_time = None
        self.db_avg = StreamingMovingAverage(10)
        self.device_index = device_index
        self.stop_event = stop_event

    def callback(self, in_data, frame_count, time_info, flag):
        # read-only view of the raw int16 buffer
        data = np.frombuffer(in_data, dtype=NP_FORMAT)
        fdb = np.median(convert_to_decibel(data))
        self.db_avg.process(fdb)
        out_data = nr.reduce_noise(y=data, sr=16000, y_noise=self.noise_sample,
                                   stationary=True, n_fft=512)
        ctime = perf_counter()
        # increased sensitivity for the first spike of speech
        if self.last_detect_time is None and fdb >= 40:
            self.last_detect_time = ctime
        if self.db_avg.average >= 40:
            self.last_detect_time = ctime
        if self.last_detect_time is not None:
            # end the clip after roughly one second of silence
            if ctime - self.last_detect_time > 1:
                return out_data.tobytes(), pa.paComplete
            self.audio_clip = np.append(self.audio_clip, out_data)
        return out_data.tobytes(), pa.paContinue
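
    # Note (not in the original gist): for an input-only stream PyAudio ignores
    # the returned out_data; what matters here is the paComplete flag, which
    # ends the stream once speech has been followed by ~1 s of silence.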

    def record_sample(self):
        audio = pa.PyAudio()
        stream = audio.open(format=FORMAT,
                            rate=RATE,
                            channels=CHANNELS,
                            input=True,
                            output=False,
                            frames_per_buffer=CHUNK,
                            input_device_index=self.device_index,
                            stream_callback=self.callback)
        try:
            stream.start_stream()
            start_time = perf_counter()
            # main input loop; the callback ends the stream on sustained silence
            while stream.is_active() and not self.stop_event.is_set():
                # constrain each audio sample to under 30 seconds max
                if perf_counter() - start_time > 29:
                    break
                sleep(0.05)
            stream.stop_stream()
        finally:
            stream.close()
            audio.terminate()


# define our torch configuration
device = "cuda:0" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
model_id = "distil-whisper/distil-medium.en"
# model_id = "distil-whisper/distil-large-v2"
# load the model + processor
model = AutoModelForSpeechSeq2Seq.from_pretrained(
    model_id, torch_dtype=torch_dtype, use_safetensors=True, low_cpu_mem_usage=True)
model = model.to(device)
processor = AutoProcessor.from_pretrained(model_id)
# normalizer = EnglishTextNormalizer(
#     processor.tokenizer.english_spelling_normalizer)


def process_audio(audio_segment: AudioSegment, logfile):
    """Transcribe one audio clip and append the result to the log file."""
    input_features = processor(audio=audio_segment.clip,
                               sampling_rate=16000,
                               return_tensors="pt")
    input_features = input_features.to(device, dtype=torch_dtype)
    pred_ids = model.generate(**input_features, max_new_tokens=128)
    transcription = processor.batch_decode(
        pred_ids, skip_special_tokens=True)[0]
    print(transcription)
    logfile.write(
        f"{audio_segment.ts} {audio_segment.tag}: {transcription}\n")
    # normalized_transcription = normalizer(transcription)
    # print(normalized_transcription)


def is_silence(audio_data, threshold=0.01):
    """Return True if the clip is empty or its RMS level is below threshold."""
    if not np.any(audio_data):
        return True
    # Calculate the root mean square (RMS) of the audio data
    rms = np.sqrt(np.mean(audio_data ** 2))
    # Compare the RMS value to the threshold
    return rms < threshold
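
# Note (not in the original gist): the queued clips keep int16-scale
# amplitudes, so AudioTranscriber.start() below passes threshold=20 instead of
# relying on the 0.01 default, which suits [-1, 1]-normalized audio.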


class AudioDeviceRecorder(threading.Thread):
    """Record audio segments from a particular device, placing the recordings in a queue."""

    def __init__(self, device_index: int, result_queue: Queue, stop_event: threading.Event, tag: str):
        super().__init__()
        self.device_index = device_index
        self.result_queue = result_queue
        self.stop_event = stop_event
        self.tag = tag

    def run(self):
        noise_sample = get_noise(self.device_index)
        while not self.stop_event.is_set():
            recorder = SpeechRecorder(
                self.stop_event, self.device_index, noise_sample)
            recorder.record_sample()
            segment = AudioSegment(recorder.audio_clip.astype(
                np.float32), datetime.now(), self.tag)
            self.result_queue.put_nowait(segment)
            # reuse the last clip as the noise profile for the next recording
            noise_sample = recorder.audio_clip


class AudioTranscriber:
    def __init__(self, log_file: Path):
        self.log_file = log_file

    def start(self):
        """Start processing audio snippets."""
        # first, identify the stereo mix device
        p = pa.PyAudio()
        stereo_mix_device_index = -1
        for i in range(p.get_device_count()):
            dev = p.get_device_info_by_index(i)
            if dev['name'].startswith('Stereo Mix') and dev['hostApi'] == 0:
                stereo_mix_device_index = dev['index']
        print('stereo mix device index', stereo_mix_device_index)
        if stereo_mix_device_index == -1:
            print("Failed to find Stereo Mix device")
            sys.exit(1)
        # Create a queue to store results
        result_queue = Queue()
        stop_event = threading.Event()

        # Define a function to handle Ctrl+C
        def ctrl_c_handler(_):
            print("Ctrl+C pressed. Cleaning up and exiting...")
            stop_event.set()
            sys.exit(0)

        # Register the Ctrl+C handler using pywin32
        win32api.SetConsoleCtrlHandler(ctrl_c_handler, True)

        mic_recorder = AudioDeviceRecorder(
            None, result_queue, stop_event, "[microphone]")
        sys_recorder = AudioDeviceRecorder(
            stereo_mix_device_index, result_queue, stop_event, "[system]")
        mic_recorder.start()
        sys_recorder.start()
        # Main thread consumes clips from the queue as the recorders produce them
        try:
            with self.log_file.open("a+") as logfile:
                while not stop_event.is_set():
                    try:
                        audio_segment = result_queue.get(timeout=1)
                    except Empty:
                        continue
                    result_queue.task_done()
                    # discard clips that are silence end to end
                    if not is_silence(audio_segment.clip, 20):
                        process_audio(audio_segment, logfile)
        except KeyboardInterrupt:
            # Clean up when the main thread is interrupted (e.g., Ctrl+C)
            print("Stopping main thread")
            stop_event.set()
            sys.exit(0)


def main():
    start_time = datetime.now()
    log_file = Path(f"audio_log_{start_time.timestamp()}.txt")
    with log_file.open("w+") as logfile:
        logfile.write(f"==== Begin recording at {start_time} ====\n")
    transcriber = AudioTranscriber(log_file)
    transcriber.start()


if __name__ == "__main__":
    main()