Skip to content

Instantly share code, notes, and snippets.

@TheRealXaiL
Created September 22, 2025 11:30
Show Gist options
  • Select an option

  • Save TheRealXaiL/ddff83c55dd7347de1a8b837943ae687 to your computer and use it in GitHub Desktop.

Select an option

Save TheRealXaiL/ddff83c55dd7347de1a8b837943ae687 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
AudioSocket Server - Optimized Audio Quality Version
"""
import asyncio
import struct
import logging
import sys
import json
import httpx
import io
import wave
import time
import base64
import subprocess
import tempfile
import os
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# DeepInfra (LLM) configuration.
# Secrets are read from the environment when available so they are not
# hard-coded in source; the literal fallback preserves prior behavior
# when no environment variable is set.
DEEPINFRA_API_KEY = os.environ.get("DEEPINFRA_API_KEY", "EDIT")
DEEPINFRA_MODEL = "meta-llama/Llama-3.3-70B-Instruct"
DEEPINFRA_URL = "https://api.deepinfra.com/v1/openai/chat/completions"

# OpenAI Whisper configuration
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "EDIT")
WHISPER_URL = "https://api.openai.com/v1/audio/transcriptions"

# ElevenLabs configuration
ELEVENLABS_API_KEY = os.environ.get("ELEVENLABS_API_KEY", "EDIT")
ELEVENLABS_VOICE_ID = os.environ.get("ELEVENLABS_VOICE_ID", "EDIT")
class AudioSocketSession:
    """One Asterisk AudioSocket call session.

    Buffers inbound 8 kHz 16-bit mono PCM from the caller, periodically
    transcribes it with OpenAI Whisper, asks the DeepInfra LLM for a reply,
    and streams ElevenLabs TTS audio (converted via FFmpeg) back to the call.
    """

    def __init__(self, reader, writer):
        self.reader = reader                  # asyncio.StreamReader for inbound frames
        self.writer = writer                  # asyncio.StreamWriter for outbound frames
        self.audio_buffer = bytearray()       # caller PCM awaiting transcription
        self.conversation_history = []        # rolling chat context for the LLM
        self.last_process_time = time.time()  # when audio was last processed
        self.total_audio_packets = 0          # inbound audio frame counter
        self.processing = False               # guards overlapping process_audio() runs

    async def transcribe_audio_openai(self, audio_data):
        """Transcribe raw PCM ``audio_data`` via OpenAI Whisper.

        Returns the transcribed text, or "" on any error.
        """
        try:
            logger.info(f"Sending {len(audio_data)} bytes to OpenAI Whisper...")
            # Wrap the raw PCM in a WAV container (8 kHz, mono, 16-bit).
            wav_buffer = io.BytesIO()
            with wave.open(wav_buffer, 'wb') as wav:
                wav.setnchannels(1)
                wav.setsampwidth(2)
                wav.setframerate(8000)
                wav.writeframes(audio_data)
            wav_buffer.seek(0)
            headers = {
                "Authorization": f"Bearer {OPENAI_API_KEY}"
            }
            # Plain form fields belong in `data=`; only the upload goes in
            # `files=` (the original passed (None, value) tuples through
            # `files=`, which httpx does not document as a form-field idiom).
            files = {
                'file': ('audio.wav', wav_buffer, 'audio/wav')
            }
            data = {
                'model': 'whisper-1',
                'language': 'en',
                'response_format': 'json'
            }
            async with httpx.AsyncClient(timeout=30) as client:
                response = await client.post(
                    WHISPER_URL, headers=headers, data=data, files=files
                )
            if response.status_code == 200:
                result = response.json()
                text = result.get('text', '').strip()
                logger.info(f"βœ… TRANSCRIBED: '{text}'")
                return text
            logger.error(f"OpenAI Whisper error: {response.status_code}")
            return ""
        except Exception as e:
            logger.error(f"Transcription error: {e}")
            return ""

    async def text_to_speech_elevenlabs_ffmpeg(self, text):
        """Synthesize ``text`` with ElevenLabs, convert the MP3 to telephony
        PCM with FFmpeg, and stream it to the caller as paced 20 ms frames.

        Returns True on success, False otherwise.
        """
        try:
            logger.info(f"Sending to ElevenLabs TTS: '{text}'")
            # Use non-streaming for better quality
            url = f"https://api.elevenlabs.io/v1/text-to-speech/{ELEVENLABS_VOICE_ID}"
            headers = {
                "xi-api-key": ELEVENLABS_API_KEY,
                "Content-Type": "application/json"
            }
            data = {
                "text": text,
                "model_id": "eleven_monolingual_v1",
                "voice_settings": {
                    "stability": 0.5,
                    "similarity_boost": 0.75
                }
            }
            async with httpx.AsyncClient(timeout=60) as client:
                response = await client.post(url, headers=headers, json=data)
            if response.status_code != 200:
                logger.error(f"ElevenLabs error: {response.status_code}")
                return False
            mp3_data = response.content
            logger.info(f"βœ… Received {len(mp3_data)} bytes of MP3")
            # Use FFmpeg for high-quality conversion via temp files.
            with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as mp3_file:
                mp3_file.write(mp3_data)
                mp3_path = mp3_file.name
            with tempfile.NamedTemporaryFile(suffix='.raw', delete=False) as pcm_file:
                pcm_path = pcm_file.name
            try:
                # FFmpeg conversion: telephony band-pass + volume boost.
                cmd = [
                    'ffmpeg', '-i', mp3_path,
                    '-ar', '8000',    # 8kHz sample rate
                    '-ac', '1',       # Mono
                    '-f', 's16le',    # 16-bit signed little-endian PCM
                    '-acodec', 'pcm_s16le',
                    '-af', 'highpass=f=200,lowpass=f=3400,volume=1.5',  # Telephony filters + volume boost
                    '-y', pcm_path
                ]
                # Run FFmpeg asynchronously: the original blocking
                # subprocess.run() would stall the event loop (and every
                # other call session) for the whole conversion.
                proc = await asyncio.create_subprocess_exec(
                    *cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                )
                _, stderr = await proc.communicate()
                if proc.returncode != 0:
                    logger.error(f"FFmpeg error: {stderr.decode(errors='replace')}")
                    return False
                # Read the PCM data
                with open(pcm_path, 'rb') as f:
                    pcm_data = f.read()
                logger.info(f"βœ… Converted to {len(pcm_data)} bytes of PCM")
                # Stream in 320-byte frames (20 ms of 8 kHz 16-bit mono),
                # paced at real time so the far end plays smoothly.
                chunk_size = 320
                for i in range(0, len(pcm_data), chunk_size):
                    chunk = pcm_data[i:i + chunk_size]
                    if len(chunk) < chunk_size:
                        # Zero-pad the final partial frame.
                        chunk += b'\x00' * (chunk_size - len(chunk))
                    msg = struct.pack('!BH', 0x10, chunk_size) + chunk
                    self.writer.write(msg)
                    await self.writer.drain()
                    await asyncio.sleep(0.020)  # Exact 20ms timing
                logger.info(f"βœ… Sent {len(pcm_data)//chunk_size} packets")
                return True
            finally:
                # Clean up temp files
                os.unlink(mp3_path)
                os.unlink(pcm_path)
        except Exception as e:
            logger.error(f"TTS error: {e}")
            return False

    async def get_llm_response(self, user_text):
        """Get an assistant reply for ``user_text`` from the DeepInfra LLM.

        Appends both the user turn and the reply to conversation_history.
        Returns "" for empty input and a spoken fallback sentence on errors.
        """
        try:
            if not user_text:
                return ""
            logger.info(f"Sending to LLM: '{user_text}'")
            self.conversation_history.append({"role": "user", "content": user_text})
            # System prompt plus the last 6 messages (3 exchanges) of context.
            messages = [
                {"role": "system", "content": """You are the Director of Information Security at a major corporation. You ONLY discuss topics related to:
- Information security, cybersecurity, and data protection
- Security policies, procedures, and best practices
- Security incidents, threats, and vulnerabilities
- Compliance (SOC2, ISO 27001, GDPR, etc.)
- Security tools and technologies
If someone asks about ANYTHING else, politely redirect: "I only discuss information security matters. Do you have any security-related questions?"
Keep responses brief (1-2 sentences) and professional."""},
            ] + self.conversation_history[-6:]
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {DEEPINFRA_API_KEY}"
            }
            data = {
                "model": DEEPINFRA_MODEL,
                "messages": messages,
                "max_tokens": 50,      # keep spoken replies short
                "temperature": 0.7,
                "stream": False
            }
            async with httpx.AsyncClient(timeout=30) as client:
                response = await client.post(DEEPINFRA_URL, headers=headers, json=data)
            if response.status_code == 200:
                result = response.json()
                ai_text = result['choices'][0]['message']['content']
                logger.info(f"βœ… AI RESPONSE: '{ai_text}'")
                self.conversation_history.append({"role": "assistant", "content": ai_text})
                return ai_text
            logger.error(f"LLM error: {response.status_code}")
            return "I'm having trouble understanding. Could you repeat that?"
        except Exception as e:
            logger.error(f"LLM error: {e}")
            return "Sorry, I'm having technical difficulties."

    async def send_silence(self, duration_ms):
        """Send ``duration_ms`` of silence as paced 20 ms AudioSocket frames."""
        packets = duration_ms // 20
        silence = b'\x00' * 320  # 20 ms of 8 kHz 16-bit mono silence
        for _ in range(packets):
            msg = struct.pack('!BH', 0x10, len(silence)) + silence
            self.writer.write(msg)
            await self.writer.drain()
            await asyncio.sleep(0.02)

    async def process_audio(self):
        """Transcribe buffered caller audio and speak the LLM's reply."""
        # Skip while a previous run is active, or with < 8000 bytes
        # (~0.5 s at 8 kHz / 16-bit mono) of buffered audio.
        if self.processing or len(self.audio_buffer) < 8000:
            return
        self.processing = True
        try:
            audio_copy = bytes(self.audio_buffer)
            self.audio_buffer.clear()
            logger.info(f"🎀 PROCESSING AUDIO: {len(audio_copy)} bytes")
            text = await self.transcribe_audio_openai(audio_copy)
            # Ignore empty results and common short noise transcriptions.
            if text and text.lower() not in ['', '.', '...', 'you', 'the']:
                response = await self.get_llm_response(text)
                if response:
                    success = await self.text_to_speech_elevenlabs_ffmpeg(response)
                    if not success:
                        await self.send_silence(2000)
            else:
                logger.info("No meaningful text transcribed")
        finally:
            # Always release the guard — the original left `processing`
            # stuck True forever if any stage raised, silencing the call.
            self.processing = False
            self.last_process_time = time.time()

    async def handle(self):
        """Main loop for one call: read AudioSocket frames until hangup/EOF."""
        try:
            addr = self.writer.get_extra_info('peername')
            logger.info(f"πŸ“ž NEW CALL from {addr}")

            async def periodic_processor():
                # Flush buffered caller audio through STT/LLM/TTS every 3 s.
                while True:
                    await asyncio.sleep(3)
                    if len(self.audio_buffer) > 8000:
                        await self.process_audio()

            processor_task = asyncio.create_task(periodic_processor())
            try:
                greeting_sent = False
                while True:
                    # Frame = 1-byte type + 2-byte big-endian length + payload.
                    # readexactly() handles short TCP reads; the original
                    # read(3)/read(msg_len) could return fewer bytes and
                    # silently mis-frame the rest of the stream.
                    try:
                        header = await self.reader.readexactly(3)
                    except asyncio.IncompleteReadError:
                        break
                    msg_type = header[0]
                    msg_len = struct.unpack('>H', header[1:3])[0]
                    if msg_len > 0:
                        try:
                            payload = await self.reader.readexactly(msg_len)
                        except asyncio.IncompleteReadError:
                            break
                    else:
                        payload = b''
                    if msg_type == 0x00:
                        # Hangup frame.
                        logger.info("πŸ“ž CALL ENDED")
                        break
                    elif msg_type == 0x01:
                        # First frame of the call; the code reads byte 0 as a
                        # version. NOTE(review): AudioSocket sends the call
                        # UUID in this frame — confirm intended semantics.
                        version = payload[0] if payload else 0
                        logger.info(f"AudioSocket version: {version}")
                        if not greeting_sent:
                            greeting = "This is the Director of Information Security. How can I help you with security matters?"
                            success = await self.text_to_speech_elevenlabs_ffmpeg(greeting)
                            if not success:
                                await self.send_silence(2000)
                            greeting_sent = True
                    elif msg_type == 0x10:
                        # Caller audio frame: buffer it unless mid-processing.
                        self.total_audio_packets += 1
                        if not self.processing:
                            self.audio_buffer.extend(payload)
                        if self.total_audio_packets % 100 == 0:
                            logger.info(f"Received {self.total_audio_packets} packets")
            finally:
                processor_task.cancel()
                # Flush any remaining caller audio before teardown.
                if len(self.audio_buffer) > 8000:
                    await self.process_audio()
                logger.info(f"πŸ“ž CALL COMPLETE. Total packets: {self.total_audio_packets}")
        except Exception as e:
            logger.error(f"Session error: {e}")
        finally:
            try:
                self.writer.close()
                await self.writer.wait_closed()
            except Exception:
                # Best-effort close; peer may already be gone.
                pass
async def handle_connection(reader, writer):
    """asyncio server callback: run one AudioSocket session per connection."""
    await AudioSocketSession(reader, writer).handle()
async def main():
    """Start the AudioSocket TCP server on 127.0.0.1:8080 and serve forever."""
    logger.info("="*50)
    logger.info("FALSEFLAG - OPTIMIZED AUDIO QUALITY")
    logger.info("="*50)
    logger.info("Using FFmpeg for audio conversion")
    logger.info("="*50)
    # Check FFmpeg. subprocess.run() raises FileNotFoundError when the
    # binary is absent, so the original returncode check could never reach
    # its "not found" branch in exactly that case.
    try:
        result = subprocess.run(['ffmpeg', '-version'], capture_output=True)
        ffmpeg_ok = result.returncode == 0
    except OSError:
        ffmpeg_ok = False
    if ffmpeg_ok:
        logger.info("βœ… FFmpeg is available")
    else:
        logger.error("❌ FFmpeg not found!")
    server = await asyncio.start_server(
        handle_connection,
        '127.0.0.1',
        8080
    )
    logger.info("βœ… Server READY on port 8080")
    logger.info("πŸ“ž Dial edit to test")
    logger.info("="*50)
    async with server:
        await server.serve_forever()
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Graceful shutdown on Ctrl+C.
        logger.info("Server stopped")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment