Last active
January 20, 2025 08:56
-
-
Save GiowGiow/b73a08694342a8217961594d6be8d41b to your computer and use it in GitHub Desktop.
Downloads the season pack in Sonarr for each series whose season is missing its last episode. This works around trackers that never release the final episode on its own and only publish the full season pack once the season is complete.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
python3 download_missing_episodes.py --sonarr-url http://host:port --api-key api_key --dry-run
-------------------------------------------------------------------------------------------------------------------------
usage: download_missing_episodes.py [-h] [--dry-run] --sonarr-url SONARR_URL --api-key API_KEY [--process-some-missing] [--process-all-missing]
                                    [--log-level {DEBUG,INFO,WARNING,ERROR}]

Sonarr Season Pack Downloader

options:
  -h, --help            show this help message and exit
  --dry-run             Only check for missing episodes without downloading or changing show types.
  --sonarr-url SONARR_URL
                        Your Sonarr URL (e.g., http://localhost:8989)
  --api-key API_KEY     Your Sonarr API key
  --process-some-missing
                        Process seasons missing some episodes (not just all or last episode)
  --process-all-missing
                        Process seasons missing all episodes (not just all or last episode)
  --log-level {DEBUG,INFO,WARNING,ERROR}
                        Set the logging level (default: INFO)
"""
import argparse
import datetime
import logging
import sys

import requests
class SonarrClient:
    """Thin wrapper around the Sonarr v3 HTTP endpoints used by this script.

    Every request sends the API key in the ``X-Api-Key`` header. Methods that
    talk to Sonarr catch ``requests.RequestException``, log the failure and
    return an empty container or ``False`` instead of raising.
    """

    # Rejection texts that are tolerated when picking a release: they only say
    # that the files already on disk are "good enough", which is expected when
    # a season pack is grabbed to fill in a missing episode.
    _ALLOWED_REJECTIONS = (
        "Existing file meets cutoff",
        "Existing file on disk is of equal or higher preference",
    )

    # Accepted layouts of the ``airDateUtc`` field (with and without
    # fractional seconds).
    _AIR_DATE_FORMATS = ("%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S.%fZ")

    def __init__(self, sonarr_url, api_key):
        """Store the base URL (without trailing slash) and the request headers.

        Parameters:
            sonarr_url (str): Base URL of the Sonarr instance.
            api_key (str): Sonarr API key.
        """
        self.sonarr_url = sonarr_url.rstrip("/")
        self.api_key = api_key
        self.headers = {"Content-Type": "application/json", "X-Api-Key": self.api_key}

    def get_monitored_series(self):
        """Fetch all series and return the ones flagged as monitored.

        Returns:
            list: Series dicts whose ``monitored`` value is truthy; an empty
            list when the request fails.
        """
        logging.info("Fetching all series from Sonarr...")
        url = f"{self.sonarr_url}/api/v3/series"
        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            all_series = response.json()
        except requests.RequestException as e:
            logging.error(f"Error fetching series: {e}")
            return []
        monitored_series = [
            series for series in all_series if series.get("monitored")
        ]
        logging.info(f"Found {len(monitored_series)} monitored series.")
        return monitored_series

    def is_anime(self, series):
        """Return True when the series' ``seriesType`` is ``"anime"``."""
        return series.get("seriesType") == "anime"

    def change_series_type_to_standard(self, series):
        """Change the seriesType of a series from 'anime' to 'standard'.

        The update is sent as a modified copy of ``series``; the caller's dict
        is only updated once Sonarr has accepted the change, so a failed
        request does not leave the local data out of sync with the server.

        Parameters:
            series (dict): Series resource as returned by Sonarr.

        Returns:
            bool: True when the PUT succeeded, False otherwise (including
            when the series has no ID).
        """
        series_id = series.get("id")
        series_title = series.get("title", "Unknown Title")
        if not series_id:
            logging.warning(f"Series '{series_title}' has no ID. Skipping type change.")
            return False

        payload = dict(series)
        payload["seriesType"] = "standard"
        url = f"{self.sonarr_url}/api/v3/series/{series_id}"
        try:
            response = requests.put(url, json=payload, headers=self.headers)
            response.raise_for_status()
        except requests.RequestException as e:
            logging.error(
                f"Error changing seriesType for Series '{series_title}' (ID: {series_id}): {e}"
            )
            return False

        series["seriesType"] = "standard"
        logging.info(
            f"Changed seriesType to 'standard' for Series '{series_title}' (ID: {series_id})."
        )
        return True

    def get_monitored_seasons(self, series):
        """Return the set of season numbers marked as monitored in ``series``."""
        return {
            season.get("seasonNumber")
            for season in series.get("seasons", [])
            if season.get("monitored", False)
        }

    def _parse_air_date(self, air_date_str):
        """Convert an ``airDateUtc`` string into a ``datetime.date``.

        Returns:
            datetime.date | None: The calendar date of the timestamp, or None
            when the value is empty, not a string, or in an unknown format.
        """
        if not air_date_str or not isinstance(air_date_str, str):
            return None
        for fmt in self._AIR_DATE_FORMATS:
            try:
                return datetime.datetime.strptime(air_date_str, fmt).date()
            except ValueError:
                continue
        return None

    def _summarize_episodes(self, episodes, monitored_seasons, today):
        """Aggregate per-season download statistics from a list of episodes.

        Parameters:
            episodes (iterable[dict]): Episode resources.
            monitored_seasons (set): Season numbers to include.
            today (datetime.date): Reference date for "has aired".

        Returns:
            dict: season number -> status dict with the keys
            ``total_episodes``, ``downloaded_episodes``, ``missing_episodes``,
            ``last_episode_air_date``, ``last_episode_has_file`` and
            ``all_episodes_aired``.
        """
        seasons_status = {}
        # (air_date, episode_number) of the latest episode seen per season.
        # The episode number breaks ties between episodes airing the same day,
        # so the result does not depend on the order of ``episodes``.
        latest_key = {}

        for episode in episodes:
            season = episode.get("seasonNumber")
            if season not in monitored_seasons:
                continue  # Skip unmonitored seasons

            if season not in seasons_status:
                seasons_status[season] = {
                    "total_episodes": 0,
                    "downloaded_episodes": 0,
                    "missing_episodes": 0,
                    "last_episode_air_date": None,
                    "last_episode_has_file": False,
                    "all_episodes_aired": True,  # Until proven otherwise
                }
            status = seasons_status[season]
            status["total_episodes"] += 1

            # An episode without a valid air date is treated as not aired yet.
            air_date = self._parse_air_date(episode.get("airDateUtc"))
            if air_date is None or air_date > today:
                status["all_episodes_aired"] = False

            has_file = episode.get("hasFile", False)
            if has_file:
                status["downloaded_episodes"] += 1
            else:
                status["missing_episodes"] += 1

            if air_date is not None:
                key = (air_date, episode.get("episodeNumber") or 0)
                if season not in latest_key or key > latest_key[season]:
                    latest_key[season] = key
                    status["last_episode_air_date"] = air_date
                    status["last_episode_has_file"] = has_file

        return seasons_status

    def analyze_season_files(self, series_id, monitored_seasons):
        """Analyze monitored seasons of a series for missing files.

        Parameters:
            series_id (int): The ID of the series.
            monitored_seasons (set): Season numbers to analyze.

        Returns:
            dict: season number -> status dict (see ``_summarize_episodes``);
            an empty dict when the episode list cannot be fetched.
        """
        url = f"{self.sonarr_url}/api/v3/episode?seriesId={series_id}"
        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            episodes = response.json()
        except requests.RequestException as e:
            logging.error(f"Error fetching episodes for Series ID {series_id}: {e}")
            return {}

        today = datetime.datetime.now(datetime.timezone.utc).date()
        return self._summarize_episodes(episodes, monitored_seasons, today)

    @classmethod
    def _skip_reason(cls, release):
        """Return the log message explaining why ``release`` is skipped.

        Returns:
            str | None: The message to log, or None when the release is an
            acceptable candidate.
        """
        title = release.get("title")
        if not release.get("downloadAllowed"):
            return f"Skipping {title}: Download not allowed."

        # Seeders only exist for torrents. Usenet releases are exempt so that
        # an absent ``seeders`` field is not mistaken for a dead torrent.
        is_usenet = str(release.get("protocol", "")).lower() == "usenet"
        if not is_usenet and release.get("seeders", 0) == 0:
            return f"Skipping {title}: 0 seeder torrent"

        if release.get("rejected", False):
            rejections = release.get("rejections", [])
            # NOTE(review): a rejected release is accepted only when it has
            # exactly two rejections and both are in the tolerated list;
            # kept as-is, confirm that this strictness is intended.
            only_allowed = all(
                any(allowed in rejection for allowed in cls._ALLOWED_REJECTIONS)
                for rejection in rejections
            )
            if len(rejections) != 2 or not only_allowed:
                return f"Skipping {title} due to rejection mismatch: {rejections}"
        return None

    def trigger_season_search_and_select(self, series_id, season_number):
        """
        Trigger a season search, iterate over results, and push the selected release to Sonarr.

        The first release that passes the filters in ``_skip_reason`` is
        pushed; if that push fails, no further release is tried.

        Parameters:
            series_id (int): The ID of the series.
            season_number (int): The season number to search for.

        Returns:
            bool: True when a release was pushed successfully, False when the
            search failed, found nothing suitable, or the push failed.
        """
        # The same endpoint serves the search (GET) and the grab (POST).
        release_url = f"{self.sonarr_url}/api/v3/release"
        query_params = {"seriesId": series_id, "seasonNumber": season_number}

        try:
            logging.info(
                f"Triggering season search for Series ID {series_id}, Season {season_number}."
            )
            response = requests.get(
                release_url, headers=self.headers, params=query_params
            )
            response.raise_for_status()
            search_results = response.json()
        except requests.RequestException as e:
            logging.error(
                f"Error during Sonarr search for Series ID {series_id}, Season {season_number}: {e}"
            )
            return False

        if not search_results:
            logging.info(
                f"No releases found for Series ID {series_id}, Season {season_number}."
            )
            return False

        for release in search_results:
            title = release.get("title")
            seeders = release.get("seeders", 0)
            logging.info(f"Checking release: {title} | Seeders: {seeders}")

            skip_message = self._skip_reason(release)
            if skip_message:
                logging.info(skip_message)
                continue

            # Mark the release as approved so Sonarr accepts the grab.
            release_payload = dict(release)
            release_payload.update(
                {
                    "approved": True,
                    "temporarilyRejected": False,
                    "rejected": False,
                }
            )
            try:
                push_response = requests.post(
                    release_url, json=release_payload, headers=self.headers
                )
                push_response.raise_for_status()
            except requests.RequestException as push_error:
                logging.error(f"Failed to push release {title}: {push_error}")
                return False
            logging.info(f"Successfully pushed release to Sonarr: {title}")
            return True

        logging.info("No suitable release found after filtering.")
        return False

    def get_download_queue(self):
        """Retrieve the current download queue from Sonarr.

        Returns:
            list: The ``records`` of the queue response (up to the requested
            page size of 2000); an empty list when the request fails.
        """
        url = f"{self.sonarr_url}/api/v3/queue"
        try:
            response = requests.get(
                url, headers=self.headers, params={"pageSize": "2000"}
            )
            response.raise_for_status()
            queue_response = response.json()
        except requests.RequestException as e:
            logging.error(f"Error fetching download queue: {e}")
            return []
        # The queue items live in the 'records' field of the paged response.
        return queue_response.get("records", [])
def _describe_season_problem(status, process_some_missing, process_all_missing):
    """Return the log phrase for the first matching "missing" case, or None."""
    missing = status["missing_episodes"]
    # Case 1: every episode of the season is missing.
    if missing == status["total_episodes"] and process_all_missing:
        return "is missing all episodes."
    # Case 2: only the last aired episode is missing (always processed).
    if missing == 1 and not status["last_episode_has_file"]:
        return "is missing the last aired episode."
    # Case 3: several episodes are missing.
    if missing > 1 and process_some_missing:
        return f"is missing {missing} episodes."
    return None


def _search_season_pack(
    client, dry_run, series_id, series_title, season_number, series_ids_in_queue
):
    """Run the season search unless the series is downloading or dry-run is on."""
    if series_id in series_ids_in_queue:
        logging.info(
            f"Series '{series_title}' (ID: {series_id}) has active downloads. Skipping."
        )
        return
    if dry_run:
        logging.debug(
            f"Dry run enabled. Skipping download for Series '{series_title}', Season {season_number}."
        )
        return
    if client.trigger_season_search_and_select(series_id, season_number):
        logging.info(
            f"Season pack search triggered for Series '{series_title}', Season {season_number}."
        )


def process_series(client, dry_run, process_some_missing, process_all_missing):
    """Process all monitored series and handle season pack downloads.

    Parameters:
        client: Object providing ``get_monitored_series``,
            ``get_download_queue``, ``get_monitored_seasons``, ``is_anime``,
            ``analyze_season_files``, ``change_series_type_to_standard`` and
            ``trigger_season_search_and_select``.
        dry_run (bool): When True, nothing is changed or downloaded.
        process_some_missing (bool): Also handle seasons missing several
            episodes.
        process_all_missing (bool): Also handle seasons missing every episode.

    Only seasons whose episodes have all aired are considered. Anime series
    are handled solely for fully missing seasons: their type is switched to
    'standard' before the season search is triggered.
    """
    monitored_series = client.get_monitored_series()
    if not monitored_series:
        logging.info("No monitored series found. Exiting.")
        return

    # Fetch the download queue once to avoid multiple API calls. Records
    # without a series ID are ignored instead of raising KeyError.
    series_ids_in_queue = {
        item.get("seriesId")
        for item in client.get_download_queue()
        if item.get("seriesId") is not None
    }

    logging.info("Checking for missing episodes...")
    for series in monitored_series:
        series_id = series.get("id")
        series_title = series.get("title", "Unknown Title")
        if not series_id:
            logging.warning(f"Series '{series_title}' has no ID. Skipping.")
            continue

        monitored_seasons = client.get_monitored_seasons(series)
        if not monitored_seasons:
            logging.debug(
                f"Series '{series_title}' has no monitored seasons. Skipping."
            )
            continue

        seasons_status = client.analyze_season_files(series_id, monitored_seasons)

        if client.is_anime(series):
            missing_all_seasons = [
                season_number
                for season_number, status in seasons_status.items()
                if status["all_episodes_aired"]
                and status["missing_episodes"] == status["total_episodes"]
            ]
            if not (missing_all_seasons and process_all_missing):
                logging.debug(
                    f"No seasons missing all episodes for Series '{series_title}' (ID: {series_id})."
                )
                continue
            logging.info(
                f"Anime '{series_title}' (ID: {series_id}) has {len(missing_all_seasons)} season(s) missing all episodes."
            )
            if dry_run:
                logging.debug(
                    f"Dry run enabled. Skipping type change and downloads for Series '{series_title}', Seasons: {missing_all_seasons}."
                )
                continue
            # The search is only triggered once the series type was changed.
            # NOTE(review): unlike standard shows, the active-download check
            # is not applied here; kept as in the original.
            if client.change_series_type_to_standard(series):
                for season_number in missing_all_seasons:
                    client.trigger_season_search_and_select(series_id, season_number)
            continue

        # Standard shows: evaluate every fully aired season.
        for season_number, status in seasons_status.items():
            if not status["all_episodes_aired"]:
                logging.debug(
                    f"Series '{series_title}' (ID: {series_id}), Season {season_number} has not all episodes aired yet."
                )
                continue

            problem = _describe_season_problem(
                status, process_some_missing, process_all_missing
            )
            if problem is None:
                continue
            logging.info(
                f"Series '{series_title}' (ID: {series_id}), Season {season_number} {problem}"
            )
            _search_season_pack(
                client,
                dry_run,
                series_id,
                series_title,
                season_number,
                series_ids_in_queue,
            )
def main():
    """Parse the command line, configure logging and run the processing.

    The options are declared in a table and registered in order, so the
    generated help output keeps the same sequence of arguments.
    """
    arg_parser = argparse.ArgumentParser(description="Sonarr Season Pack Downloader")
    option_table = (
        (
            "--dry-run",
            {
                "action": "store_true",
                "help": "Only check for missing episodes without downloading or changing show types.",
            },
        ),
        (
            "--sonarr-url",
            {
                "type": str,
                "required": True,
                "help": "Your Sonarr URL (e.g., http://localhost:8989)",
            },
        ),
        (
            "--api-key",
            {
                "type": str,
                "required": True,
                "help": "Your Sonarr API key",
            },
        ),
        (
            "--process-some-missing",
            {
                "action": "store_true",
                "help": "Process seasons missing some episodes (not just all or last episode)",
            },
        ),
        (
            "--process-all-missing",
            {
                "action": "store_true",
                "help": "Process seasons missing all episodes (not just all or last episode)",
            },
        ),
        (
            "--log-level",
            {
                "type": str,
                "default": "INFO",
                "choices": ["DEBUG", "INFO", "WARNING", "ERROR"],
                "help": "Set the logging level (default: INFO)",
            },
        ),
    )
    for flag, settings in option_table:
        arg_parser.add_argument(flag, **settings)
    options = arg_parser.parse_args()

    # Log to stdout at the requested level (INFO if the name is unknown).
    chosen_level = getattr(logging, options.log_level.upper(), logging.INFO)
    logging.basicConfig(
        level=chosen_level,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=[logging.StreamHandler(sys.stdout)],
    )

    if options.dry_run:
        logging.info(
            "Running in dry-run mode. No downloads or type changes will be triggered."
        )

    sonarr = SonarrClient(options.sonarr_url, options.api_key)
    process_series(
        sonarr,
        options.dry_run,
        options.process_some_missing,
        options.process_all_missing,
    )


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment