Skip to content

Instantly share code, notes, and snippets.

@gbaeke
Created October 2, 2024 16:32
Show Gist options
  • Select an option

  • Save gbaeke/6dcb823f01b8a111b93538962784cda5 to your computer and use it in GitHub Desktop.

Select an option

Save gbaeke/6dcb823f01b8a111b93538962784cda5 to your computer and use it in GitHub Desktop.
OpenAI realtime API example
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import asyncio
import base64
import os
import pyaudio
import wave
import numpy as np
import soundfile as sf
from azure.core.credentials import AzureKeyCredential
from dotenv import load_dotenv
from scipy.signal import resample
import threading
from rtclient import InputAudioTranscription, RTClient, RTInputItem, RTOutputItem, RTResponse, ServerVAD
# Audio recording configuration
# Number of frames read from / written to an audio stream per call.
CHUNK = 1024
# PyAudio sample-format constant used when opening the recording stream.
FORMAT = pyaudio.paInt16
# Channel count used for the recording stream and the saved WAV file.
CHANNELS = 1
# Recording sample rate in Hz; send_audio() also targets 24000 Hz.
RATE = 24000
# WAV file written by record_audio() and removed again by main().
TEMP_FILENAME = "temp_recording.wav"
def record_audio():
    """Record from the audio input until Enter is pressed and save a WAV file.

    Opens a PyAudio input stream configured by the module-level ``FORMAT``,
    ``CHANNELS``, ``RATE`` and ``CHUNK`` constants, keeps reading chunks until
    the user presses Enter, and writes the captured frames to
    ``TEMP_FILENAME``.

    The stream and the PyAudio instance are released even when reading fails.
    """
    frames = []
    stop_requested = threading.Event()

    def stop_recording():
        try:
            input()  # Wait for Enter key
        except EOFError:
            # stdin is closed: nobody can press Enter, so stop instead of
            # recording forever.
            pass
        finally:
            stop_requested.set()

    p = pyaudio.PyAudio()
    try:
        # Query the sample width while the PyAudio instance is still alive.
        sample_width = p.get_sample_size(FORMAT)
        stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)
        try:
            print("Recording... Press Enter to stop.")
            # Start a separate thread to wait for Enter key. It is a daemon so
            # a failure in the read loop cannot leave the process blocked on
            # input() at exit.
            stop_thread = threading.Thread(target=stop_recording, daemon=True)
            stop_thread.start()
            while not stop_requested.is_set():
                frames.append(stream.read(CHUNK))
            print("Recording stopped.")
            stream.stop_stream()
        finally:
            stream.close()
    finally:
        p.terminate()

    # Save the recorded audio to a temporary file
    with wave.open(TEMP_FILENAME, 'wb') as wf:
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(sample_width)
        wf.setframerate(RATE)
        wf.writeframes(b''.join(frames))
def play_audio(filename):
    """Play a WAV file through a PyAudio output stream.

    The stream format, channel count and sample rate are taken from the WAV
    header. The file is played in blocks of ``CHUNK`` frames.

    Args:
        filename: Path of the WAV file to play.

    The WAV reader, the stream and the PyAudio instance are always released,
    including when playback raises.
    """
    with wave.open(filename, 'rb') as wf:
        p = pyaudio.PyAudio()
        try:
            stream = p.open(
                format=p.get_format_from_width(wf.getsampwidth()),
                channels=wf.getnchannels(),
                rate=wf.getframerate(),
                output=True,
            )
            try:
                data = wf.readframes(CHUNK)
                while data:
                    stream.write(data)
                    data = wf.readframes(CHUNK)
                stream.stop_stream()
            finally:
                stream.close()
        finally:
            p.terminate()
def resample_audio(audio_data, original_sample_rate, target_sample_rate):
    """Resample PCM samples to a new sample rate and return them as int16.

    Args:
        audio_data: Array of samples; resampling runs along the first axis.
        original_sample_rate: Sample rate of ``audio_data`` in Hz.
        target_sample_rate: Desired sample rate in Hz.

    Returns:
        A ``numpy.int16`` array whose length is ``len(audio_data)`` scaled by
        ``target_sample_rate / original_sample_rate`` (rounded).
    """
    # Nothing to resample; also avoids handing an empty array to scipy.
    if len(audio_data) == 0:
        return np.asarray(audio_data, dtype=np.int16)

    number_of_samples = round(len(audio_data) * float(target_sample_rate) / original_sample_rate)
    resampled_audio = resample(audio_data, number_of_samples)

    # The resampled signal is floating point and can overshoot the int16 range
    # near sharp transitions. Round to the nearest integer and clip before the
    # cast so loud peaks saturate instead of wrapping around to the opposite
    # sign.
    int16_range = np.iinfo(np.int16)
    clipped = np.clip(np.rint(resampled_audio), int16_range.min, int16_range.max)
    return clipped.astype(np.int16)
async def send_audio(client: RTClient, audio_file_path: str):
    """Send an audio file to the realtime client as 100 ms chunks of 16-bit PCM.

    The file is read as int16 samples, mixed down to a single channel if it
    has several, resampled to 24000 Hz when its rate differs, and then passed
    to ``client.send_audio`` chunk by chunk.

    Args:
        client: Realtime client that receives the audio chunks.
        audio_file_path: Path of the audio file. Files ending in ``.raw`` are
            read as headerless mono 16-bit PCM at 24000 Hz.
    """
    sample_rate = 24000
    duration_ms = 100
    bytes_per_sample = 2
    # Integer arithmetic keeps the chunk size exact.
    samples_per_chunk = sample_rate * duration_ms // 1000
    bytes_per_chunk = samples_per_chunk * bytes_per_sample

    # Headerless files carry no format information, so it must be supplied.
    extra_params = (
        {
            "samplerate": sample_rate,
            "channels": 1,
            "subtype": "PCM_16",
        }
        if audio_file_path.endswith(".raw")
        else {}
    )

    audio_data, original_sample_rate = sf.read(audio_file_path, dtype="int16", **extra_params)

    # soundfile returns a (frames, channels) array for multi-channel files.
    # Serialising that directly would interleave the channels in what is sent
    # as a mono stream, so average the channels first.
    if audio_data.ndim > 1:
        audio_data = np.rint(audio_data.mean(axis=1)).astype(np.int16)

    if original_sample_rate != sample_rate:
        audio_data = resample_audio(audio_data, original_sample_rate, sample_rate)

    audio_bytes = audio_data.tobytes()
    for i in range(0, len(audio_bytes), bytes_per_chunk):
        chunk = audio_bytes[i : i + bytes_per_chunk]
        await client.send_audio(chunk)
async def receive_control(client: RTClient):
    """Print the type of every control message the client yields.

    Stops at the first ``None`` message or when the message stream ends.
    """
    async for message in client.control_messages():
        if message is None:
            break
        print(f"Received a control message: {message.type}")
async def receive_item(item: RTOutputItem, out_dir: str):
    """Collect all chunks of one output item, then print and save the results.

    Chunks are grouped by ``chunk.type`` ("audio_transcript", "audio",
    "tool_call_arguments", "text"). Once the item is exhausted, each kind that
    produced at least one chunk is printed and written to ``out_dir``:

    * text                -> ``<item.id>.text.txt``
    * audio               -> ``response.wav`` (int16 samples, 24000 Hz)
    * audio transcript    -> ``<item.id>.audio_transcript.txt``
    * tool call arguments -> ``<item.id>.tool.streamed.json``

    Args:
        item: Output item to iterate asynchronously for chunks.
        out_dir: Existing directory that receives the output files.
    """
    prefix = f"[response={item.response_id}][item={item.id}]"
    audio_data = None
    # Pieces are gathered in lists and joined once at the end rather than
    # growing a string on every chunk.
    transcript_parts = []
    text_parts = []
    argument_parts = []

    async for chunk in item:
        if chunk.type == "audio_transcript":
            transcript_parts.append(chunk.data)
        elif chunk.type == "audio":
            if audio_data is None:
                audio_data = bytearray()
            audio_data.extend(base64.b64decode(chunk.data))
        elif chunk.type == "tool_call_arguments":
            argument_parts.append(chunk.data)
        elif chunk.type == "text":
            text_parts.append(chunk.data)

    # Text files are written as UTF-8 explicitly: the platform default
    # encoding may be unable to represent non-ASCII model output.
    if text_parts:
        text_data = "".join(text_parts)
        print(prefix, f"Text: {text_data}")
        with open(os.path.join(out_dir, f"{item.id}.text.txt"), "w", encoding="utf-8") as out:
            out.write(text_data)

    if audio_data is not None:
        print(prefix, f"Audio received with length: {len(audio_data)}")
        with open(os.path.join(out_dir, "response.wav"), "wb") as out:
            audio_array = np.frombuffer(audio_data, dtype=np.int16)
            sf.write(out, audio_array, samplerate=24000)

    if transcript_parts:
        audio_transcript = "".join(transcript_parts)
        print(prefix, f"Audio Transcript: {audio_transcript}")
        with open(os.path.join(out_dir, f"{item.id}.audio_transcript.txt"), "w", encoding="utf-8") as out:
            out.write(audio_transcript)

    if argument_parts:
        arguments = "".join(argument_parts)
        print(prefix, f"Tool Call Arguments: {arguments}")
        with open(os.path.join(out_dir, f"{item.id}.tool.streamed.json"), "w", encoding="utf-8") as out:
            out.write(arguments)
async def receive_response(client: RTClient, response: RTResponse, out_dir: str):
    """Process every item of a response, then close the client.

    Each item is handled concurrently by ``receive_item``. The client is
    closed only after all item handlers have finished, so their output files
    are fully written before anything waiting on the client proceeds.

    Args:
        client: Realtime client to close once the response has been handled.
        response: Response to iterate asynchronously for output items.
        out_dir: Directory passed through to ``receive_item``.
    """
    prefix = f"[response={response.id}]"
    # Keep references to the tasks: the event loop only holds weak references,
    # and they must be awaited before the client is closed.
    item_tasks = []
    try:
        async for item in response:
            print(prefix, f"Received item {item.id}")
            item_tasks.append(asyncio.create_task(receive_item(item, out_dir)))
        print(prefix, "Response completed")
        results = await asyncio.gather(*item_tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, Exception):
                print(prefix, f"Item processing failed: {result!r}")
    finally:
        # Close even on failure so loops waiting on the client can terminate.
        await client.close()
async def receive_input_item(item: RTInputItem):
    """Await an input item and print its previous id, transcript and audio span."""
    prefix = f"[input_item={item.id}]"
    await item
    # (label, attribute) pairs, printed in this order.
    report_fields = (
        ("Previous Id", "previous_id"),
        ("Transcript", "transcript"),
        ("Audio Start [ms]", "audio_start_ms"),
        ("Audio End [ms]", "audio_end_ms"),
    )
    for label, attribute in report_fields:
        print(prefix, f"{label}: {getattr(item, attribute)}")
async def receive_items(client: RTClient, out_dir: str):
    """Dispatch each item yielded by the client to its handler as a new task.

    ``RTResponse`` items go to ``receive_response``; everything else is
    treated as an input item and goes to ``receive_input_item``.
    """
    async for incoming in client.items():
        handler = (
            receive_response(client, incoming, out_dir)
            if isinstance(incoming, RTResponse)
            else receive_input_item(incoming)
        )
        asyncio.create_task(handler)
async def receive_messages(client: RTClient, out_dir: str):
    """Run the item loop and the control-message loop concurrently until both end."""
    item_loop = receive_items(client, out_dir)
    control_loop = receive_control(client)
    await asyncio.gather(item_loop, control_loop)
async def run(client: RTClient, audio_file_path: str, out_dir: str):
    """Configure the session, then send the audio while receiving messages.

    The session is configured with ``ServerVAD`` turn detection and
    "whisper-1" input audio transcription before any audio is sent.
    """
    print("Configuring Session...", end="", flush=True)
    turn_detection = ServerVAD()
    transcription = InputAudioTranscription(model="whisper-1")
    await client.configure(
        turn_detection=turn_detection,
        input_audio_transcription=transcription,
    )
    print("Done")

    uploader = send_audio(client, audio_file_path)
    receiver = receive_messages(client, out_dir)
    await asyncio.gather(uploader, receiver)
def get_env_var(var_name: str) -> str:
    """Return the value of a required environment variable.

    Args:
        var_name: Name of the environment variable to look up.

    Returns:
        The variable's non-empty value.

    Raises:
        OSError: If the variable is unset or set to an empty string.
    """
    value = os.environ.get(var_name, "")
    if value:
        return value
    raise OSError(f"Environment variable '{var_name}' is not set or is empty.")
async def with_azure_openai(audio_file_path: str, out_dir: str):
    """Open a realtime client from environment settings and run one exchange.

    Reads ``AZURE_OPENAI_ENDPOINT``, ``AZURE_OPENAI_API_KEY`` and
    ``AZURE_OPENAI_DEPLOYMENT`` from the environment; ``get_env_var`` raises
    ``OSError`` if any of them is missing or empty.
    """
    endpoint, key, deployment = (
        get_env_var(name)
        for name in ("AZURE_OPENAI_ENDPOINT", "AZURE_OPENAI_API_KEY", "AZURE_OPENAI_DEPLOYMENT")
    )
    credential = AzureKeyCredential(key)
    async with RTClient(url=endpoint, key_credential=credential, azure_deployment=deployment) as client:
        await run(client, audio_file_path, out_dir)
async def main():
    """Run the interactive loop: record, send, play the reply, clean up.

    Loads environment variables from a ``.env`` file, ensures the ``output``
    directory exists, and then repeats forever: wait for Enter, record audio,
    send it via ``with_azure_openai``, play ``output/response.wav`` if one was
    produced, and delete the temporary files. The loop has no exit condition
    of its own.
    """
    load_dotenv()
    out_dir = "output"
    os.makedirs(out_dir, exist_ok=True)
    response_file = os.path.join(out_dir, "response.wav")

    while True:
        input("Press Enter to start recording...")
        record_audio()

        # Remove the previous turn's reply first; otherwise a turn that yields
        # no audio would replay the stale file instead of reporting that
        # nothing was received.
        if os.path.exists(response_file):
            os.remove(response_file)

        try:
            print("Sending audio to Azure...")
            await with_azure_openai(TEMP_FILENAME, out_dir)
            print("AI response:")
            if os.path.exists(response_file):
                print("Playing AI response...")
                play_audio(response_file)
            else:
                print("No audio response received.")
        finally:
            # Clean up temporary files, also when the turn failed. Only the
            # latest response.wav is kept.
            if os.path.exists(TEMP_FILENAME):
                os.remove(TEMP_FILENAME)
            for name in os.listdir(out_dir):
                file_path = os.path.join(out_dir, name)
                # os.remove cannot delete directories, so skip non-files.
                if name != "response.wav" and os.path.isfile(file_path):
                    os.remove(file_path)
# Start the interactive loop only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment