Skip to content

Instantly share code, notes, and snippets.

@GiowGiow
Last active January 20, 2025 08:56
Show Gist options
  • Select an option

  • Save GiowGiow/b73a08694342a8217961594d6be8d41b to your computer and use it in GitHub Desktop.

Select an option

Save GiowGiow/b73a08694342a8217961594d6be8d41b to your computer and use it in GitHub Desktop.
Downloads the season pack in Sonarr for every season that is missing its last episode. This works around trackers that never release the final episode on its own and only publish the full season pack once the season is complete.
"""
python3 download_missing_episodes.py --sonarr-url http://host:port --api-key api_key --dry-run
-------------------------------------------------------------------------------------------------------------------------
usage: download_missing_episodes.py [-h] [--dry-run] --sonarr-url SONARR_URL --api-key API_KEY [--process-some-missing] [--process-all-missing]
[--log-level {DEBUG,INFO,WARNING,ERROR}]
Sonarr Season Pack Downloader
options:
-h, --help show this help message and exit
--dry-run Only check for missing episodes without downloading or changing show types.
--sonarr-url SONARR_URL
Your Sonarr URL (e.g., http://localhost:8989)
--api-key API_KEY Your Sonarr API key
--process-some-missing
Process seasons missing some episodes (not just all or last episode)
--process-all-missing
Process seasons missing all episodes (not just all or last episode)
--log-level {DEBUG,INFO,WARNING,ERROR}
Set the logging level (default: INFO)
"""
import requests
import datetime
import argparse
import logging
import sys
class SonarrClient:
    """Small client for the Sonarr v3 HTTP endpoints used by this script.

    Every request sends the API key in the ``X-Api-Key`` header and is bounded
    by ``self.timeout`` so that a stalled connection cannot hang the script.
    Network/HTTP failures are logged and reported through the return value
    (empty list/dict or ``False``) rather than raised.
    """

    # (connect, read) timeouts in seconds handed to ``requests``. The read
    # timeout is deliberately generous; NOTE(review): the release search is
    # presumably slow on instances with many indexers - tune if needed.
    DEFAULT_TIMEOUT = (10, 300)

    # A rejected release is still grabbed only when its rejections contain
    # these substrings (see ``_only_allowed_rejections``).
    _ALLOWED_REJECTIONS = (
        "Existing file meets cutoff",
        "Existing file on disk is of equal or higher preference",
    )

    # Accepted ``airDateUtc`` layouts: without and with fractional seconds.
    _AIR_DATE_FORMATS = ("%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S.%fZ")

    def __init__(self, sonarr_url, api_key, timeout=DEFAULT_TIMEOUT):
        """Store connection settings.

        Parameters:
            sonarr_url (str): Base URL of Sonarr; trailing slashes are removed.
            api_key (str): API key sent with every request.
            timeout: Value passed as ``timeout=`` to every ``requests`` call
                (seconds, or a ``(connect, read)`` tuple).
        """
        self.sonarr_url = sonarr_url.rstrip("/")
        self.api_key = api_key
        self.headers = {"Content-Type": "application/json", "X-Api-Key": self.api_key}
        self.timeout = timeout

    def get_monitored_series(self):
        """Fetch all series and return those whose ``monitored`` flag is truthy.

        Returns:
            list: Monitored series dicts, or ``[]`` if the request fails.
        """
        logging.info("Fetching all series from Sonarr...")
        url = f"{self.sonarr_url}/api/v3/series"
        try:
            response = requests.get(url, headers=self.headers, timeout=self.timeout)
            response.raise_for_status()
            all_series = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # JSON decode error is not a RequestException.
            logging.error(f"Error fetching series: {e}")
            return []

        monitored_series = [series for series in all_series if series.get("monitored")]
        logging.info(f"Found {len(monitored_series)} monitored series.")
        return monitored_series

    def is_anime(self, series):
        """Return True when the series' ``seriesType`` is ``"anime"``."""
        return series.get("seriesType") == "anime"

    def change_series_type_to_standard(self, series):
        """Set the series' ``seriesType`` to ``"standard"`` in Sonarr.

        The updated series is sent with PUT. The ``series`` dict passed in is
        only modified after Sonarr accepted the change, so a failed request
        leaves the caller's copy untouched.

        Returns:
            bool: True on success, False if the series has no ID or the
            request failed.
        """
        series_id = series.get("id")
        series_title = series.get("title", "Unknown Title")
        if not series_id:
            logging.warning(f"Series '{series_title}' has no ID. Skipping type change.")
            return False

        payload = {**series, "seriesType": "standard"}
        url = f"{self.sonarr_url}/api/v3/series/{series_id}"
        try:
            response = requests.put(
                url, json=payload, headers=self.headers, timeout=self.timeout
            )
            response.raise_for_status()
        except requests.RequestException as e:
            logging.error(
                f"Error changing seriesType for Series '{series_title}' (ID: {series_id}): {e}"
            )
            return False

        series["seriesType"] = "standard"
        logging.info(
            f"Changed seriesType to 'standard' for Series '{series_title}' (ID: {series_id})."
        )
        return True

    def get_monitored_seasons(self, series):
        """Return the set of ``seasonNumber`` values of the monitored seasons."""
        return {
            season.get("seasonNumber")
            for season in series.get("seasons", [])
            if season.get("monitored", False)
        }

    def _parse_air_date(self, air_date_str):
        """Convert an ``airDateUtc`` string to a ``datetime.date``.

        Returns:
            datetime.date | None: The date part, or None when the value is
            empty, not a string, or matches none of ``_AIR_DATE_FORMATS``.
        """
        if not air_date_str:
            return None
        for date_format in self._AIR_DATE_FORMATS:
            try:
                return datetime.datetime.strptime(air_date_str, date_format).date()
            except (ValueError, TypeError):
                continue
        return None

    def analyze_season_files(self, series_id, monitored_seasons):
        """Summarise file availability per monitored season of a series.

        Parameters:
            series_id (int): The ID of the series.
            monitored_seasons: Collection of season numbers to include.

        Returns:
            dict: Maps season number to a dict with the keys
            ``total_episodes``, ``downloaded_episodes``, ``missing_episodes``,
            ``last_episode_air_date``, ``last_episode_has_file`` and
            ``all_episodes_aired``. Empty if the episode request fails.
        """
        url = f"{self.sonarr_url}/api/v3/episode"
        try:
            response = requests.get(
                url,
                headers=self.headers,
                params={"seriesId": series_id},
                timeout=self.timeout,
            )
            response.raise_for_status()
            episodes = response.json()
        except (requests.RequestException, ValueError) as e:
            logging.error(f"Error fetching episodes for Series ID {series_id}: {e}")
            return {}

        seasons_status = {}
        today = datetime.datetime.now(datetime.timezone.utc).date()
        for episode in episodes:
            season = episode.get("seasonNumber")
            if season not in monitored_seasons:
                continue  # Skip unmonitored seasons

            status = seasons_status.get(season)
            if status is None:
                status = seasons_status[season] = {
                    "total_episodes": 0,
                    "downloaded_episodes": 0,
                    "missing_episodes": 0,
                    "last_episode_air_date": None,
                    "last_episode_has_file": False,
                    "all_episodes_aired": True,  # Until proven otherwise
                }
            status["total_episodes"] += 1

            # An episode without a parseable air date is treated as not aired.
            air_date = self._parse_air_date(episode.get("airDateUtc"))
            if air_date is None or air_date > today:
                status["all_episodes_aired"] = False

            has_file = episode.get("hasFile", False)
            if has_file:
                status["downloaded_episodes"] += 1
            else:
                status["missing_episodes"] += 1

            # Track the episode with the latest air date; on equal dates the
            # first one seen is kept.
            if air_date is not None:
                last_air_date = status["last_episode_air_date"]
                if last_air_date is None or air_date > last_air_date:
                    status["last_episode_air_date"] = air_date
                    status["last_episode_has_file"] = has_file
        return seasons_status

    def _only_allowed_rejections(self, rejections):
        """Return True when a rejected release may still be grabbed.

        That is the case when exactly two rejections are reported and each
        of them contains one of ``_ALLOWED_REJECTIONS``.
        NOTE(review): the exact count of two is kept from the original
        logic; confirm it is intended rather than "all rejections allowed".
        """
        if len(rejections) != 2:
            return False
        return all(
            isinstance(rejection, str)
            and any(allowed in rejection for allowed in self._ALLOWED_REJECTIONS)
            for rejection in rejections
        )

    def trigger_season_search_and_select(self, series_id, season_number):
        """
        Trigger a season search, iterate over results, and push the selected release to Sonarr.

        Releases are examined in the order returned. A release is skipped when
        downloading is not allowed, when it reports 0 seeders, or when it is
        rejected for reasons other than the allowed ones. The first remaining
        release is posted back to Sonarr and no further release is tried,
        whether or not that post succeeds.

        Parameters:
            series_id (int): The ID of the series.
            season_number (int): The season number to search for.

        Returns:
            bool: True if a release was posted successfully, False otherwise.
        """
        release_url = f"{self.sonarr_url}/api/v3/release"
        query_params = {"seriesId": series_id, "seasonNumber": season_number}

        logging.info(
            f"Triggering season search for Series ID {series_id}, Season {season_number}."
        )
        try:
            response = requests.get(
                release_url,
                headers=self.headers,
                params=query_params,
                timeout=self.timeout,
            )
            response.raise_for_status()
            search_results = response.json()
        except (requests.RequestException, ValueError) as e:
            logging.error(
                f"Error during Sonarr search for Series ID {series_id}, Season {season_number}: {e}"
            )
            return False

        if not search_results:
            logging.info(
                f"No releases found for Series ID {series_id}, Season {season_number}."
            )
            return False

        for release in search_results:
            title = release.get("title")
            seeders = release.get("seeders", 0)
            # A null "rejections" field is treated like an empty list.
            rejections = release.get("rejections") or []
            logging.info(f"Checking release: {title} | Seeders: {seeders}")

            if not release.get("downloadAllowed"):
                logging.info(f"Skipping {title}: Download not allowed.")
                continue
            if seeders == 0:
                logging.info(f"Skipping {title}: 0 seeder torrent")
                continue
            if release.get("rejected", False) and not self._only_allowed_rejections(
                rejections
            ):
                logging.info(
                    f"Skipping {title} due to rejection mismatch: {rejections}"
                )
                continue

            # Send the release back with the rejection flags cleared.
            release_payload = {
                **release,
                "approved": True,
                "temporarilyRejected": False,
                "rejected": False,
            }
            try:
                push_response = requests.post(
                    release_url,
                    json=release_payload,
                    headers=self.headers,
                    timeout=self.timeout,
                )
                push_response.raise_for_status()
            except requests.RequestException as push_error:
                logging.error(f"Failed to push release {title}: {push_error}")
                return False
            logging.info(f"Successfully pushed release to Sonarr: {title}")
            return True

        logging.info("No suitable release found after filtering.")
        return False

    def get_download_queue(self):
        """Retrieve the current download queue from Sonarr.

        Requests a single page of up to 2000 entries.

        Returns:
            list: The ``records`` of the queue response, or ``[]`` if the
            request fails.
        """
        url = f"{self.sonarr_url}/api/v3/queue"
        try:
            response = requests.get(
                url,
                headers=self.headers,
                params={"pageSize": "2000"},
                timeout=self.timeout,
            )
            response.raise_for_status()
            queue_response = response.json()
        except (requests.RequestException, ValueError) as e:
            logging.error(f"Error fetching download queue: {e}")
            return []
        return queue_response.get("records", [])
def _season_pack_reason(status, process_some_missing, process_all_missing):
    """Return the log phrase describing why a season needs a pack, or None if it does not."""
    missing = status["missing_episodes"]
    # Case 1: every episode is missing (opt-in).
    if missing == status["total_episodes"] and process_all_missing:
        return "is missing all episodes."
    # Case 2: exactly one episode is missing and it is the last aired one.
    if missing == 1 and not status["last_episode_has_file"]:
        return "is missing the last aired episode."
    # Case 3: several episodes are missing (opt-in).
    if missing > 1 and process_some_missing:
        return f"is missing {missing} episodes."
    return None


def _request_season_pack(
    client, series_id, series_title, season_number, series_ids_in_queue, dry_run
):
    """Trigger a season search unless the series is already queued or this is a dry run."""
    if series_id in series_ids_in_queue:
        logging.info(
            f"Series '{series_title}' (ID: {series_id}) has active downloads. Skipping."
        )
        return
    if dry_run:
        logging.debug(
            f"Dry run enabled. Skipping download for Series '{series_title}', Season {season_number}."
        )
        return
    if client.trigger_season_search_and_select(series_id, season_number):
        logging.info(
            f"Season pack search triggered for Series '{series_title}', Season {season_number}."
        )


def _process_anime_series(client, series, seasons_status, dry_run, process_all_missing):
    """Handle an anime series: switch it to 'standard' and search seasons missing every episode."""
    series_id = series.get("id")
    series_title = series.get("title", "Unknown Title")
    missing_all_seasons = [
        season_number
        for season_number, status in seasons_status.items()
        if status["all_episodes_aired"]
        and status["missing_episodes"] == status["total_episodes"]
    ]
    if not missing_all_seasons:
        logging.debug(
            f"No seasons missing all episodes for Series '{series_title}' (ID: {series_id})."
        )
        return
    if not process_all_missing:
        logging.debug(
            f"Anime '{series_title}' (ID: {series_id}) has {len(missing_all_seasons)} season(s) "
            "missing all episodes, but --process-all-missing is not set. Skipping."
        )
        return

    logging.info(
        f"Anime '{series_title}' (ID: {series_id}) has {len(missing_all_seasons)} season(s) missing all episodes."
    )
    if dry_run:
        logging.debug(
            f"Dry run enabled. Skipping type change and downloads for Series '{series_title}', Seasons: {missing_all_seasons}."
        )
        return
    # NOTE(review): unlike standard shows, this path does not check the
    # download queue before searching - confirm whether that is intended.
    # Searches only run once the type change succeeded.
    if client.change_series_type_to_standard(series):
        for season_number in missing_all_seasons:
            client.trigger_season_search_and_select(series_id, season_number)


def process_series(client, dry_run, process_some_missing, process_all_missing):
    """Process all monitored series and handle season pack downloads.

    Parameters:
        client: Object providing ``get_monitored_series``, ``get_download_queue``,
            ``get_monitored_seasons``, ``is_anime``, ``analyze_season_files``,
            ``change_series_type_to_standard`` and
            ``trigger_season_search_and_select``.
        dry_run (bool): When true, nothing is downloaded and no series type
            is changed; only log output is produced.
        process_some_missing (bool): Also handle fully aired seasons missing
            more than one episode.
        process_all_missing (bool): Also handle fully aired seasons missing
            every episode (the only case handled for anime series).

    Seasons that are missing exactly one episode, where that episode is the
    one with the latest air date, are always handled for non-anime series.
    """
    monitored_series = client.get_monitored_series()
    if not monitored_series:
        logging.info("No monitored series found. Exiting.")
        return

    # Fetch the download queue once to avoid multiple API calls. Queue
    # records without a series id are ignored instead of raising KeyError.
    series_ids_in_queue = {
        item.get("seriesId")
        for item in client.get_download_queue()
        if item.get("seriesId") is not None
    }

    logging.info("Checking for missing episodes...")
    for series in monitored_series:
        series_id = series.get("id")
        series_title = series.get("title", "Unknown Title")
        if not series_id:
            logging.warning(f"Series '{series_title}' has no ID. Skipping.")
            continue

        monitored_seasons = client.get_monitored_seasons(series)
        if not monitored_seasons:
            logging.debug(
                f"Series '{series_title}' has no monitored seasons. Skipping."
            )
            continue

        seasons_status = client.analyze_season_files(series_id, monitored_seasons)

        if client.is_anime(series):
            _process_anime_series(
                client, series, seasons_status, dry_run, process_all_missing
            )
            continue

        for season_number, status in seasons_status.items():
            # Only process seasons where all episodes have aired
            if not status["all_episodes_aired"]:
                logging.debug(
                    f"Series '{series_title}' (ID: {series_id}), Season {season_number} has not all episodes aired yet."
                )
                continue

            reason = _season_pack_reason(
                status, process_some_missing, process_all_missing
            )
            if reason is None:
                continue
            logging.info(
                f"Series '{series_title}' (ID: {series_id}), Season {season_number} {reason}"
            )
            _request_season_pack(
                client,
                series_id,
                series_title,
                season_number,
                series_ids_in_queue,
                dry_run,
            )
def main():
    """Command-line entry point.

    Parses the options, configures logging to stdout at the requested level,
    builds a ``SonarrClient`` from the URL and API key, and runs
    ``process_series`` with the selected flags.
    """
    # Declared in the order they should appear in the generated help text.
    argument_specs = [
        (
            "--dry-run",
            {
                "action": "store_true",
                "help": "Only check for missing episodes without downloading or changing show types.",
            },
        ),
        (
            "--sonarr-url",
            {
                "type": str,
                "required": True,
                "help": "Your Sonarr URL (e.g., http://localhost:8989)",
            },
        ),
        (
            "--api-key",
            {
                "type": str,
                "required": True,
                "help": "Your Sonarr API key",
            },
        ),
        (
            "--process-some-missing",
            {
                "action": "store_true",
                "help": "Process seasons missing some episodes (not just all or last episode)",
            },
        ),
        (
            "--process-all-missing",
            {
                "action": "store_true",
                "help": "Process seasons missing all episodes (not just all or last episode)",
            },
        ),
        (
            "--log-level",
            {
                "type": str,
                "default": "INFO",
                "choices": ["DEBUG", "INFO", "WARNING", "ERROR"],
                "help": "Set the logging level (default: INFO)",
            },
        ),
    ]

    parser = argparse.ArgumentParser(description="Sonarr Season Pack Downloader")
    for flag, options in argument_specs:
        parser.add_argument(flag, **options)
    args = parser.parse_args()

    # Log to stdout; fall back to INFO if the level name cannot be resolved.
    log_level = getattr(logging, args.log_level.upper(), logging.INFO)
    stdout_handler = logging.StreamHandler(sys.stdout)
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=[stdout_handler],
    )

    if args.dry_run:
        logging.info(
            "Running in dry-run mode. No downloads or type changes will be triggered."
        )

    sonarr = SonarrClient(args.sonarr_url, args.api_key)
    process_series(
        sonarr,
        args.dry_run,
        args.process_some_missing,
        args.process_all_missing,
    )


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment