Skip to content

Instantly share code, notes, and snippets.

@lkd70
Created January 12, 2025 16:00
Show Gist options
  • Select an option

  • Save lkd70/3e865bd80c10b396e710db3fc6249916 to your computer and use it in GitHub Desktop.

Select an option

Save lkd70/3e865bd80c10b396e710db3fc6249916 to your computer and use it in GitHub Desktop.
Upload to Telegram from Radarr/Sonarr/Lidarr with Telegram Topic support.
#!/bin/bash
# Configurable working directory
WORKING_DIR="/torrent/utils/message-telegram"
# Change to the working directory
cd "$WORKING_DIR" || { echo "Failed to change to working directory: $WORKING_DIR"; exit 1; }
# Function to send telegram message with error handling
send_telegram_message() {
local caption="$1"
local file="$2"
local topic="$3"
local max_retries=3
local retry_count=0
local success=false
while [ $retry_count -lt $max_retries ] && [ "$success" = false ]; do
if [ -n "$file" ]; then
# Send message with file
if python3 "$WORKING_DIR/sendMessage.py" --caption="$caption" --file="$file" --topic "$topic" 2>&1; then
success=true
else
retry_count=$((retry_count + 1))
echo "Failed to send message with file (attempt $retry_count/$max_retries)"
[ $retry_count -lt $max_retries ] && sleep 5
fi
else
# Send message without file
if python3 "$WORKING_DIR/sendMessage.py" --caption="$caption" --topic "$topic" 2>&1; then
success=true
else
retry_count=$((retry_count + 1))
echo "Failed to send message (attempt $retry_count/$max_retries)"
[ $retry_count -lt $max_retries ] && sleep 5
fi
fi
done
if [ "$success" = false ]; then
echo "ERROR: Failed to send Telegram message after $max_retries attempts"
# Optionally, you could log to a file or send to a backup channel
echo "[$(date '+%Y-%m-%d %H:%M:%S')] Failed to send message: $caption" >> "$WORKING_DIR/error.log"
return 1
fi
return 0
}
# ============= TEST MODE =============
# Leave test_mode empty to disable testing
TEST_MODE="" # Options: radarr, sonarr, lidarr
if [ "$TEST_MODE" = "radarr" ]; then
# Test data for Radarr
radarr_moviefile_path="/etc/hosts" # Using a file that exists on most systems
radarr_movie_title="Test Movie"
radarr_movie_year="2025"
radarr_movie_imdbid="tt0468569"
radarr_moviefile_quality="1080p Bluray"
elif [ "$TEST_MODE" = "sonarr" ]; then
# Test data for Sonarr
sonarr_episodefile_path="/etc/hosts"
sonarr_series_title="Test Series"
sonarr_series_year="2025"
sonarr_series_imdbid="tt0944947"
sonarr_episodefile_quality="1080p WEB-DL"
sonarr_episode_seasonnumber="1"
sonarr_episode_episodenumber="1"
sonarr_episode_title="Pilot Episode"
elif [ "$TEST_MODE" = "lidarr" ]; then
# Test data for Lidarr
lidarr_trackfile_path="/etc/hosts"
lidarr_artist_name="Test Artist"
lidarr_album_title="Test Album"
lidarr_album_releasedate="2025-01-01"
lidarr_trackfile_quality="FLAC"
lidarr_album_type="Album"
lidarr_album_genre="Rock"
fi
# ====================================
# Function to handle test events
handle_test_event() {
local service=$1
echo "Handling test event for $service"
caption="\n**🧪 Test Event**\nService: **$service**\nStatus: **Success**\nTimestamp: **$(date '+%Y-%m-%d %H:%M:%S')**"
send_telegram_message "$caption" "" "Log"
send_telegram_message "#####################################" "" "Log"
echo "Test message sent for $service"
exit 0
}
# Check for test events first
if [ "$radarr_eventtype" = "Test" ]; then
handle_test_event "Radarr"
elif [ "$sonarr_eventtype" = "Test" ]; then
handle_test_event "Sonarr"
elif [ "$lidarr_eventtype" = "Test" ]; then
handle_test_event "Lidarr"
fi
# Regular event handling
if [ -n "$radarr_moviefile_path" ]; then
# Radarr variables
src=$radarr_moviefile_path
title=$radarr_movie_title
year=$radarr_movie_year
imdbid=$radarr_movie_imdbid
quality=$radarr_moviefile_quality
media_type="Movie"
topic="Films"
elif [ -n "$sonarr_episodefile_path" ]; then
# Sonarr variables
src=$sonarr_episodefile_path
title="$sonarr_series_title"
year=$sonarr_series_year
imdbid=$sonarr_series_imdbid
quality=$sonarr_episodefile_quality
# Add episode-specific information
episode_info="S${sonarr_episode_seasonnumber}E${sonarr_episode_episodenumber}"
[ -n "$sonarr_episode_title" ] && episode_info="$episode_info - $sonarr_episode_title"
title="$title $episode_info"
media_type="TV Show"
topic="Shows"
elif [ -n "$lidarr_trackfile_path" ]; then
# Lidarr variables
src=$lidarr_trackfile_path
artist=$lidarr_artist_name
album=$lidarr_album_title
year=$lidarr_album_releasedate
quality=$lidarr_trackfile_quality
media_type="Music"
topic="Music"
# Format year from release date (assumes YYYY-MM-DD format)
if [ -n "$year" ]; then
year=$(echo $year | cut -d'-' -f1)
fi
# Combine artist and album for title
title="$artist - $album"
# Add additional music info if available
[ -n "$lidarr_album_type" ] && extra_info="$lidarr_album_type"
[ -n "$lidarr_album_genre" ] && extra_info="${extra_info:+$extra_info, }$lidarr_album_genre"
else
echo "Error: No Radarr, Sonarr, or Lidarr environment variables detected"
exit 1
fi
# Build the caption
caption="\nName: **$title"
[ -n "$year" ] && caption="$caption ($year)"
caption="$caption**\nQuality: **$quality**\nType: **$media_type**"
# Add extra info for music
if [ "$media_type" = "Music" ] && [ -n "$extra_info" ]; then
caption="$caption\nInfo: **$extra_info**"
fi
# Add IMDB link if available (for movies and TV shows)
if [ -n "$imdbid" ]; then
caption="${caption}\n**[IMDB](https://www.imdb.com/title/${imdbid})**"
fi
echo "Uploading file: '$src' with caption: '$caption'"
if ! send_telegram_message "$caption" "$src" "$topic"; then
echo "Failed to send file upload message"
exit 1
fi
if ! send_telegram_message "#####################################" "" "$topic"; then
echo "Failed to send separator message"
exit 1
fi
echo "Uploaded $caption"
exit 0
# you'll need to pip install argparse and telethon
import argparse
import os
import logging
from telethon import TelegramClient
from telethon.tl.functions.channels import GetForumTopicsRequest
from telethon.tl.types import PeerChannel, ForumTopicDeleted
from telethon.errors import RPCError
api_id = 'YOUR_APP_ID'
api_hash = 'YOUR_APP_HASH'
phone_number = 'YOUR_PHONE_NUMBER'
supergroup_id = -100000000000000 # Replace with the supergroup ID.
# Initialize the Telegram client
client = TelegramClient('session_name', api_id, api_hash)
# Maximum file size per chunk (2GB in bytes)
MAX_FILE_SIZE = 2 * 1024 * 1024 * 1024 # 2GB
# Set up logging
logging.basicConfig(
filename="log.txt",
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger()
def progress_callback(current, total):
percent = (current / total) * 100
print(f"\rUploading... {percent:.2f}% ({current} of {total} bytes)", end="")
if current == total:
print()
async def find_topic_id(supergroup, topic_name):
"""
Find topic ID with error handling for deleted topics
"""
try:
forum_topics = await client(GetForumTopicsRequest(
channel=supergroup,
offset_date=None,
offset_id=0,
offset_topic=0,
limit=100
))
for topic in forum_topics.topics:
# Skip deleted topics
if isinstance(topic, ForumTopicDeleted):
continue
try:
if topic.title == topic_name:
return topic.top_message
except AttributeError:
# Skip topics that don't have expected attributes
continue
logger.error(f"Topic with name '{topic_name}' not found in the supergroup.")
print(f"Topic with name '{topic_name}' not found in the supergroup.")
return None
except RPCError as e:
logger.error(f"RPC Error while fetching topics: {e}")
print(f"Error fetching topics: {e}")
return None
async def main(caption, file_path=None, topic_name=None, format_mode="markdown"):
# Connect to Telegram
await client.start(phone=phone_number)
try:
# Resolve the supergroup entity
supergroup = await client.get_entity(PeerChannel(supergroup_id))
logger.info(f"Resolved supergroup: {supergroup.title}")
print(f"Resolved supergroup: {supergroup.title}")
except ValueError as e:
logger.error(f"Error resolving supergroup: {e}")
print(f"Error resolving supergroup: {e}")
return 1
# If a topic name is provided, find the topic
topic_starter_message_id = None
if topic_name:
topic_starter_message_id = await find_topic_id(supergroup, topic_name)
if topic_starter_message_id is None:
return 1
# Determine the parse mode
parse_mode = None if format_mode == "off" else format_mode
try:
# Handle file upload if provided
if file_path:
if not os.path.exists(file_path):
logger.error(f"Error: File '{file_path}' does not exist.")
print(f"Error: File '{file_path}' does not exist.")
return 1
file_size = os.path.getsize(file_path)
if file_size > MAX_FILE_SIZE:
logger.info(f"File '{file_path}' is larger than 2GB. Splitting into chunks...")
print(f"File '{file_path}' is larger than 2GB. Splitting into chunks...")
await split_and_upload_file(file_path, caption, supergroup, topic_starter_message_id, parse_mode)
else:
logger.info(f"Uploading file '{file_path}'...")
print(f"Uploading file '{file_path}'...")
file = await client.upload_file(file_path, progress_callback=progress_callback)
await client.send_file(
entity=supergroup,
file=file,
caption=caption,
reply_to=topic_starter_message_id,
parse_mode=parse_mode
)
logger.info(f"File '{file_path}' uploaded successfully!")
print(f"File '{file_path}' uploaded successfully!")
else:
# Send message only
logger.info(f"Sending message: {caption}")
print(f"Sending message: {caption}")
await client.send_message(
entity=supergroup,
message=caption,
reply_to=topic_starter_message_id,
parse_mode=parse_mode
)
logger.info("Message sent successfully!")
print("Message sent successfully!")
return 0
except Exception as e:
logger.error(f"Error during operation: {e}")
print(f"Error: {e}")
return 1
async def split_and_upload_file(file_path, caption, supergroup, topic_starter_message_id, parse_mode):
"""
Splits a large file into chunks and uploads them sequentially.
"""
try:
file_size = os.path.getsize(file_path)
file_name = os.path.basename(file_path)
part_number = 1
with open(file_path, "rb") as f:
while True:
chunk = f.read(MAX_FILE_SIZE)
if not chunk:
break
chunk_file_name = f"{file_name}.part{part_number}"
with open(chunk_file_name, "wb") as chunk_file:
chunk_file.write(chunk)
logger.info(f"Uploading part {part_number} of '{file_name}'...")
print(f"Uploading part {part_number} of '{file_name}'...")
file = await client.upload_file(chunk_file_name, progress_callback=progress_callback)
await client.send_file(
entity=supergroup,
file=file,
caption=f"{caption} (Part {part_number})",
reply_to=topic_starter_message_id,
parse_mode=parse_mode
)
logger.info(f"Part {part_number} of '{file_name}' uploaded successfully!")
print(f"Part {part_number} of '{file_name}' uploaded successfully!")
os.remove(chunk_file_name)
part_number += 1
logger.info(f"All parts of '{file_name}' uploaded successfully!")
print(f"All parts of '{file_name}' uploaded successfully!")
return 0
except Exception as e:
logger.error(f"Error during file split and upload: {e}")
print(f"Error: {e}")
return 1
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Send a message or file to a Telegram supergroup or topic.")
parser.add_argument("--caption", type=str, required=True, help="Caption or message to send.")
parser.add_argument("--file", type=str, help="Path to the file to upload (optional).")
parser.add_argument("--topic", type=str, help="Name of the topic to send the message or file to (optional).")
parser.add_argument(
"--format",
type=str,
choices=["off", "markdown", "html"],
default="markdown",
help="Message formatting mode: 'off' (plain text), 'markdown' (default), or 'html'."
)
args = parser.parse_args()
# Run the script and get the exit code
with client:
exit_code = client.loop.run_until_complete(
main(args.caption.replace("\\n", "\n"), args.file, args.topic, args.format)
)
exit(exit_code)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment