|
import requests
|
|
import xml.etree.ElementTree as ET
|
|
import re
|
|
import time
|
|
import logging
|
|
from io import BytesIO
|
|
from faster_whisper import WhisperModel
|
|
import torch
|
|
import warnings
|
|
from itertools import permutations
|
|
import colorama
|
|
from colorama import Fore, Style
|
|
|
|
# Initialize colorama
colorama.init(autoreset=True)

# Suppress warnings of category UserWarning
warnings.filterwarnings("ignore", category=UserWarning)

# Set up logging: INFO and above goes both to 'transcription.log' and to the
# console, using the same line layout for each.
# The console handler is prepared first but attached only AFTER basicConfig(),
# because basicConfig() does nothing when the root logger already has a handler.
formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')
console = logging.StreamHandler()
console.setFormatter(formatter)
console.setLevel(logging.INFO)

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level=logging.INFO,
    filename='transcription.log',
)
logging.getLogger().addHandler(console)
|
|
|
|
# Function to log system and GPU information
|
|
def log_system_info(device, compute_type, model_size):
    """Write a start-up banner describing the GPU(s), PyTorch and the run settings.

    Args:
        device: Device string chosen by the caller; only logged here.
        compute_type: Compute type chosen by the caller; only logged here.
        model_size: Whisper model size chosen by the caller; only logged here.
    """
    logging.info("Starting transcription script.")

    # GPU section: one name line and one properties line per visible device,
    # or a single error line when CUDA cannot be used.
    if not torch.cuda.is_available():
        logging.error("CUDA is not available. Ensure that CUDA and cuDNN are properly installed.")
    else:
        total_gpus = torch.cuda.device_count()
        logging.info(f"Number of GPUs: {total_gpus}")
        for index in range(total_gpus):
            logging.info(f"GPU {index}: {torch.cuda.get_device_name(index)}")
            logging.info(f"GPU {index} properties: {torch.cuda.get_device_properties(index)}")

    # Library version
    logging.info(f"PyTorch version: {torch.__version__}")

    # Run settings
    logging.info(f"Model size: '{model_size}'")
    logging.info(f"Compute type: '{compute_type}'")
    logging.info(f"Device: '{device}'")
|
|
|
|
# Ensure CUDA is available
|
|
def check_cuda():
    """Abort the program with exit status 1 when CUDA is not available.

    Returns normally (with no value) when ``torch.cuda.is_available()`` is true.

    Raises:
        SystemExit: With code 1 when CUDA is unavailable, after logging an error.
    """
    if not torch.cuda.is_available():
        logging.error("CUDA is not available. Please install CUDA and cuDNN, and ensure they are properly configured.")
        # Raise SystemExit directly instead of calling the `exit()` helper:
        # that helper is injected by the `site` module for interactive use and
        # is missing under `python -S` or in frozen executables.
        raise SystemExit(1)
|
|
|
|
# Set the RSS feed URL directly
|
|
# Podcast feed that main() downloads; the first <item> found in it is the
# episode that gets transcribed.
RSS_FEED_URL = 'https://feeds.megaphone.fm/ASEMS8895807375' # EBHAS
|
|
|
|
# Function to fetch and parse the RSS feed
|
|
def fetch_rss_feed(rss_url):
    """Download the RSS document at *rss_url*.

    Args:
        rss_url: URL of the feed to request (10 second timeout).

    Returns:
        The raw response body as bytes, or None when the request fails or the
        server answers with an HTTP error status (the error is logged).
    """
    try:
        reply = requests.get(rss_url, timeout=10)
        reply.raise_for_status()  # turn 4xx/5xx answers into RequestException
        payload = reply.content
    except requests.exceptions.RequestException as err:
        logging.error(f"Error fetching RSS feed: {err}")
        return None
    return payload
|
|
|
|
# Function to extract the first episode URL from RSS feed
|
|
def get_first_episode(rss_feed_content):
    """Return the title and audio URL of the first ``<item>`` in an RSS feed.

    Only the first item in document order is considered.

    Args:
        rss_feed_content: The RSS document as bytes or str.

    Returns:
        A ``(title, audio_url)`` tuple. ``title`` is None when the item has no
        ``<title>`` element. ``(None, None)`` is returned, and an error logged,
        when the XML cannot be parsed, when the feed has no items, or when the
        first item has no ``<enclosure>`` carrying a non-empty ``url``.
    """
    try:
        root = ET.fromstring(rss_feed_content)
    except ET.ParseError as e:
        logging.error(f"Error parsing RSS feed: {e}")
        return None, None

    item = root.find('.//item')  # first item in document order
    # Elements without children are falsy, so compare against None explicitly.
    if item is not None:
        # A missing <title> must not abort the lookup; the audio URL is what
        # the caller actually needs.
        title_node = item.find('title')
        title = title_node.text if title_node is not None else None

        # .get() instead of attrib['url'] so an enclosure without a url is
        # reported as "no episode" rather than raising KeyError.
        enclosure = item.find('enclosure')
        audio_url = enclosure.get('url') if enclosure is not None else None
        if audio_url:
            return title, audio_url

    logging.error("No episodes found in the RSS feed.")
    return None, None
|
|
|
|
# Function to download the episode into memory
|
|
def download_episode_to_memory(audio_url):
    """Download the audio file at *audio_url* into an in-memory buffer.

    Args:
        audio_url: URL of the episode's audio file.

    Returns:
        A ``BytesIO`` positioned at offset 0 holding the whole file, or None
        when the request or the transfer fails (the error is logged).
    """
    try:
        logging.info(f"Downloading audio from URL: {audio_url}")
        # With stream=True the connection is only released once the body is
        # fully read or the response is closed; the `with` block guarantees
        # the release even if the transfer fails half-way through.
        with requests.get(audio_url, stream=True, timeout=10) as response:
            response.raise_for_status()

            audio_bytes = BytesIO()
            for chunk in response.iter_content(chunk_size=1024 * 1024):  # 1 MB chunks
                if chunk:  # skip empty chunks
                    audio_bytes.write(chunk)
    except requests.exceptions.RequestException as e:
        logging.error(f"Error downloading audio: {e}")
        return None

    audio_bytes.seek(0)  # rewind so the consumer reads from the beginning
    logging.info("Audio downloaded into memory.")
    return audio_bytes
|
|
|
|
# Function to process audio and search for the secret word
|
|
def process_audio(audio_bytes, phrases, device, compute_type, model_size, start_time):
    """Transcribe the audio with Whisper and search the transcript for the secret word.

    Side effects: writes the full transcript to ``transcription.txt``, prints
    every segment mentioning 'secret' or 'code', and, when a word is found,
    prints the word, its timestamp, its line and a generated sentence.

    Args:
        audio_bytes: Audio input handed to ``WhisperModel.transcribe``
            (``main`` passes the in-memory download).
        phrases: Regex patterns, each with one capture group around the word,
            tried in order by ``find_secret_word_in_text``.
        device: Device string passed to ``WhisperModel``.
        compute_type: Compute type passed to ``WhisperModel``.
        model_size: Model size passed to ``WhisperModel``.
        start_time: ``time.time()`` value taken when processing began; used
            only to report how long the search took.

    Returns:
        The secret word as a string, or None when no pattern matches or when
        transcription/processing raises (the error is logged).
    """
    # Initialize model (outside the try block: a failure to load the model
    # propagates to the caller instead of being reported as a transcription error)
    logging.info("Initializing Whisper model...")
    model = WhisperModel(
        model_size,
        device=device,
        compute_type=compute_type,
    )

    try:
        # Transcribe audio using model, without VAD filtering
        logging.info("Transcribing audio without VAD filtering...")
        segment_stream, _info = model.transcribe(
            audio_bytes,
            language='en',
            beam_size=1,
            vad_filter=False,  # VAD filtering is disabled
            word_timestamps=True,
        )

        # transcribe() hands back a lazy, one-shot generator. The segments are
        # needed three times below (transcript, debug listing, timestamp
        # lookup), so materialise them once; iterating the generator directly
        # would leave the second and third pass with nothing to read.
        segments = list(segment_stream)

        # Save transcription to a txt file
        transcription_text = ''.join(segment.text for segment in segments)
        with open('transcription.txt', 'w', encoding='utf-8') as f:
            f.write(transcription_text)
        logging.info("Transcription saved to 'transcription.txt'.")

        # Debugging: Print lines containing 'secret' or 'code'
        print("Lines containing 'secret' or 'code':")
        for segment in segments:
            lowered = segment.text.lower()
            if 'secret' in lowered or 'code' in lowered:
                start_time_seg = segment.start if segment.start is not None else 0.0
                end_time_seg = segment.end if segment.end is not None else 0.0
                try:
                    print(f"[{start_time_seg:.2f}s - {end_time_seg:.2f}s]: {segment.text.strip()}")
                except TypeError:
                    # If formatting fails, print times without formatting
                    print(f"[{start_time_seg}s - {end_time_seg}s]: {segment.text.strip()}")

        # Search for the secret word in the transcription text
        secret_word = find_secret_word_in_text(transcription_text, phrases)
        if not secret_word:
            logging.info("Secret word not found.")
            return None

        # Find the timestamp and line where the secret word appears
        timestamp, line = find_secret_word_timestamp_and_line(segments, secret_word)
        duration = time.time() - start_time
        logging.info(f"Time taken to find secret word: {duration:.2f} seconds")

        # Print the secret word in red
        print(f"Secret word: {Fore.RED}{secret_word}{Style.RESET_ALL}")

        # Print the timestamp and line
        if timestamp is not None:
            print(f"Timestamp: {timestamp:.2f} seconds")
        else:
            print("Timestamp: Unknown")
        print(f"Line: {line}")

        # Logging
        timestamp_str = f"{timestamp:.2f}" if timestamp is not None else "Unknown"
        logging.info(f"Found secret word: {secret_word} at {timestamp_str} seconds in line: {line}")

        # Generate and print the most grammatical 5-word combination, highlighted in green
        best_sentence = generate_best_sentence(secret_word)
        print(f"{Fore.GREEN}Most grammatical sentence: {best_sentence}{Style.RESET_ALL}")

        return secret_word

    except Exception as e:
        # Top-level boundary for the transcription step: report and signal
        # "nothing found" rather than crash the script.
        logging.error(f"Error transcribing audio: {e}")
        return None
|
|
|
|
# Function to find the secret word in the transcription text
|
|
def find_secret_word_in_text(text, phrases):
    """Return the word captured by the first pattern in *phrases* that matches *text*.

    Args:
        text: The text to search.
        phrases: Regex patterns tried in order, case-insensitively; each must
            define capture group 1 around the word.

    Returns:
        Group 1 of the first matching pattern with leading/trailing '.'
        removed, or None when no pattern matches.
    """
    # Lazy generator: later patterns are only searched if earlier ones miss.
    attempts = (re.search(pattern, text, re.IGNORECASE) for pattern in phrases)
    first_hit = next((hit for hit in attempts if hit), None)
    if first_hit is None:
        return None
    return first_hit.group(1).strip(".")
|
|
|
|
# Function to find the timestamp and line where the secret word appears
|
|
def find_secret_word_timestamp_and_line(segments, secret_word):
    """Locate the first segment whose text contains *secret_word* (case-insensitive).

    Args:
        segments: Iterable of objects exposing ``start`` and ``text``.
        secret_word: The word to look for.

    Returns:
        ``(start, stripped_text)`` of the first matching segment (``start`` is
        passed through as-is and may be None), or ``(None, None)`` when no
        segment contains the word.
    """
    hits = (
        (segment.start, segment.text.strip())
        for segment in segments
        if secret_word.lower() in segment.text.lower()
    )
    return next(hits, (None, None))
|
|
|
|
# Function to generate the most grammatical 5-word sentence
|
|
def generate_best_sentence(secret_word):
    """Return the best-scoring ordering of the four fixed words plus *secret_word*.

    Every permutation of ``["listening", "to", "shameless", "media",
    secret_word]`` is scored with a small hand-written heuristic; the first
    permutation reaching the top score wins.

    Args:
        secret_word: The word to combine with the fixed words.

    Returns:
        The winning permutation joined with single spaces.
    """
    words = ["listening", "to", "shameless", "media", secret_word]

    def rate(order):
        # Simple scoring function
        sentence = ' '.join(order)
        points = 0
        if order[:2] == ("listening", "to"):
            points += 2
        if order[2:4] == ("shameless", "media"):
            points += 2
        if order[-1] == secret_word:
            points += 1
        # Additional scoring for common phrases
        if "listening to shameless media" in sentence:
            points += 3
        return points

    # max() keeps the first of several equally rated candidates, which is the
    # same tie-breaking as a manual "strictly greater than best so far" loop.
    winner = max(permutations(words), key=rate)
    return ' '.join(winner)
|
|
|
|
# Main function
|
|
def main():
    """Run the pipeline: fetch the feed, download the first episode, find the secret word.

    Waits for the user to press Enter before starting. Returns early, with no
    value, when the feed cannot be fetched, contains no usable episode, or the
    audio download fails; each of those steps logs its own error.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    compute_type = "int8"  # Using int8 for maximum speed
    model_size = "tiny"

    # NOTE(review): check_cuda() exits the process when CUDA is unavailable,
    # so the "cpu" fallback chosen above is never actually used. Confirm
    # whether CPU runs should be allowed before relaxing this.
    check_cuda()
    log_system_info(device, compute_type, model_size)
    input("Press Enter to start processing...")  # Wait for keypress
    start_time = time.time()

    rss_feed_content = fetch_rss_feed(RSS_FEED_URL)
    if not rss_feed_content:
        return

    # Get the first episode
    title, audio_url = get_first_episode(rss_feed_content)
    if not audio_url:
        logging.error("No episode found in the RSS feed.")
        return

    logging.info(f"Processing episode: {title}")

    # Step 1: Download the episode into memory
    audio_bytes = download_episode_to_memory(audio_url)
    if not audio_bytes:
        return

    # Step 2: Define the phrases to search for.
    # Patterns are tried in order, most specific wording first. Variants that
    # only add a required prefix ("psst...") or suffix (punctuation, "'s") to
    # one of these were dropped: whenever such a variant matches, the plain
    # pattern listed here matches too, is tried earlier, and captures the
    # same word, so the variant could never be the one that fires.
    phrases = [
        r"your secret word is (\w+)",
        r"your code word is (\w+)",
        r"the secret word is (\w+)",
        r"the code word is (\w+)",
        r"secret word is (\w+)",
        r"code word is (\w+)",
    ]

    # Step 3: Process audio and search for the secret word
    secret_word = process_audio(audio_bytes, phrases, device, compute_type, model_size, start_time)
    if secret_word:
        logging.info(f"Found secret word in '{title}': {secret_word}")
    else:
        logging.info("Secret word not found.")
|
|
|
|
# Run the script
|
|
if __name__ == "__main__":
    # Wall-clock bookkeeping around the whole run: log when it started, when
    # it ended, and how long it took in between.
    total_start_time = time.time()
    started_at = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(total_start_time))
    logging.info(f"Start time: {started_at}")

    main()

    total_end_time = time.time()
    ended_at = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(total_end_time))
    logging.info(f"End time: {ended_at}")

    total_duration = total_end_time - total_start_time
    logging.info(f"Total duration: {total_duration:.2f} seconds")
|