#!/usr/bin/env python3
"""
AudioSocket Server - Optimized Audio Quality Version
"""
import asyncio
import struct
import logging
import sys
import json
import httpx
import io
import wave
import time
import base64
import subprocess
import tempfile
import os

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration
DEEPINFRA_API_KEY = "EDIT"
DEEPINFRA_MODEL = "meta-llama/Llama-3.3-70B-Instruct"
DEEPINFRA_URL = "https://api.deepinfra.com/v1/openai/chat/completions"

# OpenAI Whisper Configuration
OPENAI_API_KEY = "EDIT"
WHISPER_URL = "https://api.openai.com/v1/audio/transcriptions"

# ElevenLabs Configuration
ELEVENLABS_API_KEY = "EDIT"
ELEVENLABS_VOICE_ID = "EDIT"
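
# AudioSocket wire format, as handled by AudioSocketSession below: every message
# is a 3-byte header (1-byte type, 2-byte big-endian payload length) followed by
# the payload. Type 0x00 ends the call, 0x01 is the initial handshake/ID message
# (its first payload byte is logged as a version here), and 0x10 carries audio as
# 320-byte frames of 16-bit signed little-endian mono PCM at 8 kHz (20 ms each).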

class AudioSocketSession:
    def __init__(self, reader, writer):
        self.reader = reader
        self.writer = writer
        self.audio_buffer = bytearray()
        self.conversation_history = []
        self.last_process_time = time.time()
        self.total_audio_packets = 0
        self.processing = False

    async def transcribe_audio_openai(self, audio_data):
        """Send audio to OpenAI Whisper for transcription"""
        try:
            logger.info(f"Sending {len(audio_data)} bytes to OpenAI Whisper...")
            # Wrap the raw 8 kHz, 16-bit mono PCM in a WAV container for the API
            wav_buffer = io.BytesIO()
            with wave.open(wav_buffer, 'wb') as wav:
                wav.setnchannels(1)
                wav.setsampwidth(2)
                wav.setframerate(8000)
                wav.writeframes(audio_data)
            wav_buffer.seek(0)
            headers = {
                "Authorization": f"Bearer {OPENAI_API_KEY}"
            }
            files = {
                'file': ('audio.wav', wav_buffer, 'audio/wav'),
                'model': (None, 'whisper-1'),
                'language': (None, 'en'),
                'response_format': (None, 'json')
            }
            async with httpx.AsyncClient(timeout=30) as client:
                response = await client.post(WHISPER_URL, headers=headers, files=files)
            if response.status_code == 200:
                result = response.json()
                text = result.get('text', '').strip()
                logger.info(f"TRANSCRIBED: '{text}'")
                return text
            else:
                logger.error(f"OpenAI Whisper error: {response.status_code}")
                return ""
        except Exception as e:
            logger.error(f"Transcription error: {e}")
            return ""

    async def text_to_speech_elevenlabs_ffmpeg(self, text):
        """Convert text to speech using ElevenLabs with FFmpeg conversion"""
        try:
            logger.info(f"Sending to ElevenLabs TTS: '{text}'")
            # Use non-streaming for better quality
            url = f"https://api.elevenlabs.io/v1/text-to-speech/{ELEVENLABS_VOICE_ID}"
            headers = {
                "xi-api-key": ELEVENLABS_API_KEY,
                "Content-Type": "application/json"
            }
            data = {
                "text": text,
                "model_id": "eleven_monolingual_v1",
                "voice_settings": {
                    "stability": 0.5,
                    "similarity_boost": 0.75
                }
            }
            async with httpx.AsyncClient(timeout=60) as client:
                response = await client.post(url, headers=headers, json=data)
            if response.status_code == 200:
                mp3_data = response.content
                logger.info(f"Received {len(mp3_data)} bytes of MP3")
                # Use FFmpeg for high-quality conversion
                with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as mp3_file:
                    mp3_file.write(mp3_data)
                    mp3_path = mp3_file.name
                with tempfile.NamedTemporaryFile(suffix='.raw', delete=False) as pcm_file:
                    pcm_path = pcm_file.name
                try:
                    # FFmpeg conversion with high quality settings
                    cmd = [
                        'ffmpeg', '-i', mp3_path,
                        '-ar', '8000',           # 8 kHz sample rate
                        '-ac', '1',              # Mono
                        '-f', 's16le',           # 16-bit signed little-endian PCM
                        '-acodec', 'pcm_s16le',
                        '-af', 'highpass=f=200,lowpass=f=3400,volume=1.5',  # Telephony filters + volume boost
                        '-y', pcm_path
                    ]
                    # Run FFmpeg in a worker thread so the blocking call does not
                    # stall the event loop (and the 20 ms pacing of other calls)
                    result = await asyncio.to_thread(
                        subprocess.run, cmd, capture_output=True, text=True
                    )
                    if result.returncode == 0:
                        # Read the PCM data
                        with open(pcm_path, 'rb') as f:
                            pcm_data = f.read()
                        logger.info(f"Converted to {len(pcm_data)} bytes of PCM")
                        # Send with proper pacing: 320 bytes = 160 samples = 20 ms at 8 kHz
                        chunk_size = 320
                        for i in range(0, len(pcm_data), chunk_size):
                            chunk = pcm_data[i:i + chunk_size]
                            if len(chunk) < chunk_size:
                                chunk += b'\x00' * (chunk_size - len(chunk))
                            msg = struct.pack('!BH', 0x10, chunk_size) + chunk
                            self.writer.write(msg)
                            await self.writer.drain()
                            await asyncio.sleep(0.020)  # Exact 20 ms timing
                        logger.info(f"Sent {len(pcm_data) // chunk_size} packets")
                        return True
                    else:
                        logger.error(f"FFmpeg error: {result.stderr}")
                        return False
                finally:
                    # Clean up temp files
                    os.unlink(mp3_path)
                    os.unlink(pcm_path)
            else:
                logger.error(f"ElevenLabs error: {response.status_code}")
                return False
        except Exception as e:
            logger.error(f"TTS error: {e}")
            return False
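
    # For debugging the conversion step by hand, the cmd list above corresponds
    # to the following shell invocation (sketch; file names are placeholders):
    #   ffmpeg -i reply.mp3 -ar 8000 -ac 1 -f s16le -acodec pcm_s16le \
    #          -af highpass=f=200,lowpass=f=3400,volume=1.5 -y reply.raw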

    async def get_llm_response(self, user_text):
        """Get response from DeepInfra LLM"""
        try:
            if not user_text:
                return ""
            logger.info(f"Sending to LLM: '{user_text}'")
            self.conversation_history.append({"role": "user", "content": user_text})
            messages = [
                {"role": "system", "content": """You are the Director of Information Security at a major corporation. You ONLY discuss topics related to:
- Information security, cybersecurity, and data protection
- Security policies, procedures, and best practices
- Security incidents, threats, and vulnerabilities
- Compliance (SOC2, ISO 27001, GDPR, etc.)
- Security tools and technologies
If someone asks about ANYTHING else, politely redirect: "I only discuss information security matters. Do you have any security-related questions?"
Keep responses brief (1-2 sentences) and professional."""},
            ] + self.conversation_history[-6:]
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {DEEPINFRA_API_KEY}"
            }
            data = {
                "model": DEEPINFRA_MODEL,
                "messages": messages,
                "max_tokens": 50,
                "temperature": 0.7,
                "stream": False
            }
            async with httpx.AsyncClient(timeout=30) as client:
                response = await client.post(DEEPINFRA_URL, headers=headers, json=data)
            if response.status_code == 200:
                result = response.json()
                ai_text = result['choices'][0]['message']['content']
                logger.info(f"AI RESPONSE: '{ai_text}'")
                self.conversation_history.append({"role": "assistant", "content": ai_text})
                return ai_text
            else:
                logger.error(f"LLM error: {response.status_code}")
                return "I'm having trouble understanding. Could you repeat that?"
        except Exception as e:
            logger.error(f"LLM error: {e}")
            return "Sorry, I'm having technical difficulties."

    async def send_silence(self, duration_ms):
        """Send silence for specified duration"""
        packets = duration_ms // 20
        silence = b'\x00' * 320  # one 20 ms frame of 16-bit silence at 8 kHz
        for _ in range(packets):
            msg = struct.pack('!BH', 0x10, len(silence)) + silence
            self.writer.write(msg)
            await self.writer.drain()
            await asyncio.sleep(0.02)

    async def process_audio(self):
        """Process accumulated audio"""
        if self.processing or len(self.audio_buffer) < 8000:  # wait for at least ~0.5 s of audio
            return
        self.processing = True
        audio_copy = bytes(self.audio_buffer)
        self.audio_buffer.clear()
        logger.info(f"PROCESSING AUDIO: {len(audio_copy)} bytes")
        text = await self.transcribe_audio_openai(audio_copy)
        if text and text.lower() not in ['', '.', '...', 'you', 'the']:
            response = await self.get_llm_response(text)
            if response:
                success = await self.text_to_speech_elevenlabs_ffmpeg(response)
                if not success:
                    await self.send_silence(2000)
        else:
            logger.info("No meaningful text transcribed")
        self.processing = False
        self.last_process_time = time.time()

    async def handle(self):
        """Handle the call session"""
        try:
            addr = self.writer.get_extra_info('peername')
            logger.info(f"NEW CALL from {addr}")

            async def periodic_processor():
                while True:
                    await asyncio.sleep(3)
                    if len(self.audio_buffer) > 8000:
                        await self.process_audio()

            processor_task = asyncio.create_task(periodic_processor())
            try:
                greeting_sent = False
                while True:
                    # Each AudioSocket message: 1-byte type, 2-byte big-endian length, payload.
                    # readexactly() avoids splitting frames on short TCP reads.
                    try:
                        header = await self.reader.readexactly(3)
                        msg_type = header[0]
                        msg_len = struct.unpack('>H', header[1:3])[0]
                        if msg_len > 0:
                            payload = await self.reader.readexactly(msg_len)
                        else:
                            payload = b''
                    except asyncio.IncompleteReadError:
                        # Peer closed the connection
                        break
                    if msg_type == 0x00:
                        logger.info("CALL ENDED")
                        break
                    elif msg_type == 0x01:
                        version = payload[0] if payload else 0
                        logger.info(f"AudioSocket version: {version}")
                        if not greeting_sent:
                            greeting = "This is the Director of Information Security. How can I help you with security matters?"
                            success = await self.text_to_speech_elevenlabs_ffmpeg(greeting)
                            if not success:
                                await self.send_silence(2000)
                            greeting_sent = True
                    elif msg_type == 0x10:
                        self.total_audio_packets += 1
                        if not self.processing:
                            self.audio_buffer.extend(payload)
                        if self.total_audio_packets % 100 == 0:
                            logger.info(f"Received {self.total_audio_packets} packets")
            finally:
                processor_task.cancel()
                if len(self.audio_buffer) > 8000:
                    await self.process_audio()
                logger.info(f"CALL COMPLETE. Total packets: {self.total_audio_packets}")
        except Exception as e:
            logger.error(f"Session error: {e}")
        finally:
            try:
                self.writer.close()
                await self.writer.wait_closed()
            except Exception:
                pass


async def handle_connection(reader, writer):
    session = AudioSocketSession(reader, writer)
    await session.handle()


async def main():
    logger.info("="*50)
    logger.info("FALSEFLAG - OPTIMIZED AUDIO QUALITY")
    logger.info("="*50)
    logger.info("Using FFmpeg for audio conversion")
    logger.info("="*50)
    # Check FFmpeg (a missing binary raises FileNotFoundError rather than
    # returning a non-zero exit code)
    try:
        result = subprocess.run(['ffmpeg', '-version'], capture_output=True)
        ffmpeg_ok = result.returncode == 0
    except FileNotFoundError:
        ffmpeg_ok = False
    if ffmpeg_ok:
        logger.info("FFmpeg is available")
    else:
        logger.error("FFmpeg not found!")
    server = await asyncio.start_server(
        handle_connection,
        '127.0.0.1',
        8080
    )
    logger.info("Server READY on port 8080")
    logger.info("Dial edit to test")
    logger.info("="*50)
    async with server:
        await server.serve_forever()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Server stopped")