Created
November 10, 2024 18:10
-
-
Save abdalrohman/51ad66bf92954471471ee2dc1ce68686 to your computer and use it in GitHub Desktop.
HisnMuslim Content Downloader and Processor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| HisnMuslim Content Downloader and Processor | |
| This script downloads and processes Islamic Athkar from HisnMuslim API. | |
| It supports multiple output formats, includes progress tracking, and writes its output into an organized file structure. | |
| Supports downloading specific Athkar by ID (valid range: 1-132). | |
| Author: M.Abdulrahman Alnaseer's | |
| Original API: https://www.hisnmuslim.com/api/ar/husn_ar.json | |
| Website: https://hisnmuslim.com | |
| Requirements: | |
| - Python 3.12+ | |
| - requests | |
| - rich (for progress tracking) | |
| - pydantic (for data validation) | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| import re | |
| from collections.abc import Iterable | |
| from dataclasses import dataclass | |
| from enum import StrEnum, auto | |
| from pathlib import Path | |
| from typing import Literal, Sequence, TypeAlias | |
| import requests | |
| from pydantic import BaseModel | |
| from rich.logging import RichHandler | |
| from rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn, TimeElapsedColumn | |
# Logging: emit bare messages and let rich render them (with rich tracebacks).
logging.basicConfig(
    level=logging.INFO,
    format="%(message)s",
    handlers=[RichHandler(rich_tracebacks=True)],
)
logger = logging.getLogger("hisnmuslim_downloader")

# Language codes accepted by the downloader.
SupportedLanguages: TypeAlias = Literal["en", "ar"]

# Inclusive bounds of the Thikr IDs this script accepts.
MIN_THIKR_ID = 1
MAX_THIKR_ID = 132
class ThikrIdError(ValueError):
    """Raised when a Thikr ID falls outside the accepted ID range."""
| class OutputFormat(StrEnum): | |
| """Supported output formats for content.""" | |
| MARKDOWN = auto() | |
| JSON = auto() | |
| TEXT = auto() | |
| HTML = auto() | |
@dataclass(frozen=True, slots=True)
class AthkarEntry:
    """Immutable index record describing one thikr.

    Construction fails with ``ThikrIdError`` when ``id`` is outside the
    inclusive range ``MIN_THIKR_ID``..``MAX_THIKR_ID``.
    """

    id: int  # numeric identifier of the thikr
    title: str  # display title; also used to build the output directory name
    audio_url: str  # URL of the audio covering the whole entry
    text_url: str  # URL of the JSON document holding the entry's detailed content

    def __post_init__(self) -> None:
        """Validate the ID once the generated ``__init__`` has set the fields."""
        if MIN_THIKR_ID <= self.id <= MAX_THIKR_ID:
            return
        raise ThikrIdError(f"Invalid Thikr ID: {self.id}. Must be between {MIN_THIKR_ID} and {MAX_THIKR_ID}")
class LanguageConfig(BaseModel):
    """Validated description of one language offered by the API.

    NOTE(review): nothing in the visible part of this file instantiates this
    model - confirm whether it is still needed.
    """

    id: int  # numeric identifier of the language
    language: str  # language name
    base_url: str  # URL associated with the language
| @dataclass(frozen=True, slots=True) | |
| class AthkarContent: | |
| """Content details for a single thikr.""" | |
| id: int | |
| arabic_text: str | |
| transliteration: str | |
| translation: str | |
| repeat_count: int | |
| audio_url: str | |
class HisnMuslimAPI:
    """Interface for interacting with the HisnMuslim API.

    Creating an instance performs one HTTP request to discover the index URL
    for the requested language. The Athkar index is fetched lazily and cached
    on the instance after the first successful download.
    """

    BASE_API_URL = "https://hisnmuslim.com/api"
    # Seconds to wait for the server before a request is abandoned. Without a
    # timeout ``requests`` waits indefinitely on an unresponsive host.
    REQUEST_TIMEOUT = 30

    def __init__(self, language: SupportedLanguages = "ar") -> None:
        """Create a client for *language* and resolve its index URL.

        :param language: language code, ``"ar"`` or ``"en"``
        :raises requests.RequestException: if the API root cannot be fetched
        :raises ValueError: if the API does not list the requested language
        """
        self.language = language
        self.session = requests.Session()
        # Set the cache before any network call so the instance is fully
        # formed by the time helper methods run.
        self._athkar_cache: dict[int, AthkarEntry] | None = None
        self._initialize_api()

    @property
    def _language_key(self) -> str:
        """Name under which the API files this client's language."""
        return "العربية" if self.language == "ar" else "English"

    def _get_json(self, url: str) -> dict:
        """GET *url* and return the decoded JSON body.

        The body is decoded as ``utf-8-sig`` so a leading byte-order mark does
        not break JSON parsing.

        :raises requests.RequestException: on connection errors, timeouts,
            HTTP error statuses, or an undecodable body
        """
        response = self.session.get(url, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        response.encoding = "utf-8-sig"
        return response.json()

    def _initialize_api(self) -> None:
        """Initialize API configuration and validate connection.

        Looks up the entry of the ``MAIN`` list whose ``LANGUAGE`` matches the
        configured language and stores its ``LANGUAGE_URL`` as ``api_url``.
        """
        try:
            data = self._get_json(f"{self.BASE_API_URL}/husn.json")
        except requests.RequestException as e:
            logger.error(f"Failed to initialize API: {e}")
            raise

        lang_name = self._language_key
        matching_config = next((cfg for cfg in data["MAIN"] if cfg["LANGUAGE"] == lang_name), None)
        if not matching_config:
            raise ValueError(f"Unsupported language: {self.language}")

        self.api_url = matching_config["LANGUAGE_URL"]
        logger.info(f"Successfully initialized API for language: {self.language}")

    def validate_thikr_ids(self, ids: Iterable[int]) -> None:
        """Validate a sequence of Thikr IDs.

        :param ids: IDs to check
        :raises ThikrIdError: if any ID lies outside
            ``MIN_THIKR_ID``..``MAX_THIKR_ID`` (inclusive); the message lists
            every offending ID
        """
        invalid_ids = [id_ for id_ in ids if not MIN_THIKR_ID <= id_ <= MAX_THIKR_ID]
        if invalid_ids:
            raise ThikrIdError(
                f"Invalid Thikr IDs: {invalid_ids}. IDs must be between {MIN_THIKR_ID} and {MAX_THIKR_ID}"
            )

    def fetch_athkar_list(self) -> list[AthkarEntry]:
        """Fetch the list of all available Athkar.

        The first successful call downloads the index and caches it by ID;
        later calls are served from the cache.

        :return: every entry of the index
        :raises requests.RequestException: if the index cannot be downloaded
        """
        if self._athkar_cache is not None:
            return list(self._athkar_cache.values())

        try:
            data = self._get_json(self.api_url)
        except requests.RequestException as e:
            logger.error(f"Failed to fetch Athkar list: {e}")
            raise

        entries = [
            AthkarEntry(id=entry["ID"], title=entry["TITLE"], audio_url=entry["AUDIO_URL"], text_url=entry["TEXT"])
            for entry in data[self._language_key]
        ]
        self._athkar_cache = {entry.id: entry for entry in entries}
        return entries

    def get_athkar_by_ids(self, ids: Sequence[int]) -> list[AthkarEntry]:
        """Fetch specific Athkar by their IDs.

        IDs that pass validation but are missing from the index are skipped
        with a warning instead of failing the whole request.

        :param ids: IDs to look up
        :return: the matching entries, in the order the IDs were given
        :raises ThikrIdError: if any ID is outside the valid range
        """
        self.validate_thikr_ids(ids)

        if self._athkar_cache is None:
            self.fetch_athkar_list()
        cache = self._athkar_cache or {}

        entries = []
        for id_ in ids:
            entry = cache.get(id_)
            if entry is None:
                logger.warning(f"Thikr ID {id_} not found in cache")
                continue
            entries.append(entry)
        return entries

    def fetch_athkar_content(self, url: str) -> list[AthkarContent]:
        """Fetch detailed content for a specific Athkar.

        The document is a JSON object whose first key holds the list of items.

        :param url: URL of the entry's content document
        :return: one ``AthkarContent`` per item in the document
        :raises requests.RequestException: if the document cannot be downloaded
        """
        try:
            data = self._get_json(url)
        except requests.RequestException as e:
            logger.error(f"Failed to fetch Athkar content: {e}")
            raise

        category_name = next(iter(data))
        return [
            AthkarContent(
                id=item["ID"],
                # Some documents store the Arabic text under "Text" instead of
                # "ARABIC_TEXT" (the original author observed this for entry
                # 132 of the English feed). Decide from the payload itself:
                # matching "132" anywhere in the URL is fragile.
                arabic_text=item["ARABIC_TEXT"] if "ARABIC_TEXT" in item else item["Text"],
                transliteration=item.get("LANGUAGE_ARABIC_TRANSLATED_TEXT", None),
                translation=item.get("TRANSLATED_TEXT", None),
                repeat_count=item["REPEAT"],
                audio_url=item["AUDIO"],
            )
            for item in data[category_name]
        ]
class ContentProcessor:
    """Process and save Athkar content in various formats.

    Each entry is written into its own directory below ``base_path``, named
    ``<id>_<sanitized title>``. Markdown and JSON are implemented; any other
    requested format is reported once with a warning and skipped.
    """

    def __init__(self, base_path: Path) -> None:
        """Remember the output root and create it if it does not exist."""
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)
        # Formats already reported as unsupported, so the warning is logged
        # once per processor rather than once per entry.
        self._warned_formats: set[OutputFormat] = set()

    def _create_markdown(self, entry: AthkarEntry, content: list[AthkarContent]) -> str:
        """Generate markdown formatted content.

        The transliteration and translation sections are emitted only when
        the item actually has that text; a missing value (``None``) would
        otherwise be rendered as the literal word "None".
        """
        markdown = [f"# {entry.title}\n"]
        for item in content:
            markdown.append(f"## {item.arabic_text}")
            if item.transliteration is not None:
                markdown.append(f"### Transliteration\n{item.transliteration}")
            if item.translation is not None:
                markdown.append(f"### Translation\n{item.translation}")
            markdown.append(f"### Repeat {item.repeat_count} times")
            markdown.append(f"### [Audio]({item.audio_url})\n")
        return "\n".join(markdown)

    def _create_json(self, entry: AthkarEntry, content: list[AthkarContent]) -> dict:
        """Generate JSON formatted content.

        :return: a JSON-serialisable dict with the entry's title and id and a
            ``content`` list holding one dict per item
        """
        return {
            "title": entry.title,
            "id": entry.id,
            "content": [
                {
                    "id": item.id,
                    "arabic": item.arabic_text,
                    "transliteration": item.transliteration,
                    "translation": item.translation,
                    "repeat_count": item.repeat_count,
                    "audio_url": item.audio_url,
                }
                for item in content
            ],
        }

    async def save_content(self, entry: AthkarEntry, content: list[AthkarContent], formats: set[OutputFormat]) -> None:
        """Save content in specified formats.

        :param entry: index record of the thikr being saved
        :param content: detailed items belonging to ``entry``
        :param formats: formats to write; ``content.md`` is written for
            MARKDOWN and ``content.json`` for JSON
        """
        entry_dir = self.base_path / f"{entry.id}_{sanitize_name(entry.title)}"
        entry_dir.mkdir(parents=True, exist_ok=True)

        for fmt in formats:
            if fmt == OutputFormat.MARKDOWN:
                content_str = self._create_markdown(entry, content)
                (entry_dir / "content.md").write_text(content_str, encoding="utf-8")
            elif fmt == OutputFormat.JSON:
                content_dict = self._create_json(entry, content)
                (entry_dir / "content.json").write_text(
                    json.dumps(content_dict, ensure_ascii=False, indent=2), encoding="utf-8"
                )
            elif fmt not in self._warned_formats:
                # Previously such formats were dropped without any feedback.
                self._warned_formats.add(fmt)
                logger.warning(f"Output format '{fmt}' is not implemented yet; skipping it")
class AudioDownloader:
    """Handle downloading of audio content."""

    # Seconds to wait for the connection and for each chunk of data. This is
    # not a limit on the total duration of a download.
    REQUEST_TIMEOUT = 30

    def __init__(self, base_path: Path) -> None:
        """Remember the output root and open a reusable HTTP session."""
        self.base_path = Path(base_path)
        self.session = requests.Session()

    def download_audio(self, url: str, output_path: Path) -> None:
        """Download audio file from URL.

        The body is streamed into a temporary ``<name>.part`` file next to the
        target and moved into place only after the download finished, so a
        failed transfer never leaves a truncated file at ``output_path`` and
        never destroys a file that was already there.

        :param url: address of the audio file
        :param output_path: destination file; parent directories are created
        :raises requests.RequestException: on connection errors, timeouts or
            HTTP error statuses
        """
        partial_path = output_path.with_name(f"{output_path.name}.part")
        try:
            # The context manager releases the streamed connection back to
            # the pool even when the body is not read to the end.
            with self.session.get(url, stream=True, timeout=self.REQUEST_TIMEOUT) as response:
                response.raise_for_status()
                output_path.parent.mkdir(parents=True, exist_ok=True)
                with partial_path.open("wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
            partial_path.replace(output_path)
        except requests.RequestException as e:
            logger.error(f"Failed to download audio from {url}: {e}")
            raise
        finally:
            # No-op after a successful move; removes the leftover otherwise.
            partial_path.unlink(missing_ok=True)
class HisnMuslimDownloader:
    """Main coordinator for downloading and processing HisnMuslim content."""

    def __init__(
        self,
        language: SupportedLanguages = "ar",
        output_dir: str | Path = "./HisnMuslim_Content",
        formats: set[OutputFormat] | None = None,
    ) -> None:
        """Wire up the API client, the content processor and the audio downloader.

        :param language: language code passed to the API client
        :param output_dir: root directory for everything that is written
        :param formats: output formats to write; ``None`` selects Markdown and
            JSON. A fresh set is built per instance, so instances never share
            (or mutate) a common default.
        """
        self.api = HisnMuslimAPI(language)
        self.base_path = Path(output_dir)
        self.processor = ContentProcessor(self.base_path)
        self.audio_downloader = AudioDownloader(self.base_path)
        self.formats = {OutputFormat.MARKDOWN, OutputFormat.JSON} if formats is None else set(formats)

    async def _process_entry(self, entry: AthkarEntry) -> None:
        """Save the text content and every audio file of a single entry."""
        content = self.api.fetch_athkar_content(entry.text_url)
        await self.processor.save_content(entry, content, self.formats)

        audio_dir = self.base_path / f"{entry.id}_{sanitize_name(entry.title)}" / "audio"
        audio_dir.mkdir(parents=True, exist_ok=True)

        # Audio covering the whole entry, followed by one file per item.
        self.audio_downloader.download_audio(entry.audio_url, audio_dir / "full_audio.mp3")
        for idx, item in enumerate(content, 1):
            self.audio_downloader.download_audio(item.audio_url, audio_dir / f"part_{idx}.mp3")

    async def _process_entries(self, entries: Sequence[AthkarEntry], description: str) -> int:
        """Process *entries* under a progress bar and return how many failed.

        A failing entry is logged and skipped so that one bad entry does not
        abort the run. The bar advances for failed entries as well, so it
        always reaches 100% once every entry has been attempted.
        """
        failures = 0
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
            TimeElapsedColumn(),
        ) as progress:
            task = progress.add_task(description, total=len(entries))
            for entry in entries:
                try:
                    await self._process_entry(entry)
                except Exception as e:
                    failures += 1
                    logger.error(f"Error processing entry {entry.id}: {e}")
                finally:
                    progress.update(task, advance=1)
        return failures

    async def download_all(self) -> None:
        """Download and process all available content.

        :raises requests.RequestException: if the index cannot be fetched
        """
        logger.info("Starting content download...")
        athkar_list = self.api.fetch_athkar_list()

        failures = await self._process_entries(athkar_list, "Downloading content...")
        if failures:
            logger.warning(f"Content download completed with {failures} failed entries")
        else:
            logger.info("Content download completed successfully!")

    async def download_specific(self, thikr_ids: Sequence[int]) -> None:
        """Download specific Athkar by their IDs.

        Invalid IDs and unexpected errors are logged rather than raised.

        :param thikr_ids: IDs of the Athkar to download
        """
        logger.info(f"Starting download for Athkar IDs: {thikr_ids}")
        try:
            entries = self.api.get_athkar_by_ids(thikr_ids)
            if not entries:
                logger.warning("No valid Athkar entries found for the specified IDs")
                return

            failures = await self._process_entries(entries, "Downloading selected Athkar...")
            if failures:
                logger.warning(f"Selected Athkar download completed with {failures} failed entries")
            else:
                logger.info("Selected Athkar download completed successfully!")
        except ThikrIdError as e:
            logger.error(f"Invalid Thikr IDs provided: {e}")
        except Exception as e:
            logger.error(f"Unexpected error during download: {e}")
# helpers
def sanitize_name(name: str, max_bytes: int = 255) -> str:
    """Sanitize the filename by removing/replacing invalid characters and making it filesystem safe.

    Parenthesised text is dropped, every character that is not a word
    character, whitespace or a hyphen becomes an underscore (this covers
    ``: < > " / \\ | ? * .``), runs of whitespace/underscores collapse into a
    single underscore, and the result is stripped of leading/trailing
    underscores and lower-cased.

    :param name: Original filename or text to be sanitized
    :param max_bytes: Maximum size of the result in UTF-8 bytes. Filesystems
        commonly cap a name at 255 *bytes*, which a non-ASCII name reaches
        well before it is 255 characters long.
    :return str: Sanitized filename safe for use in most filesystems;
        ``"unnamed_file"`` when nothing usable remains
    """
    # Remove everything in parentheses, plus the whitespace in front of it.
    name = re.sub(r"\s*\([^)]*\)", "", name)
    # Replace anything that is not a word character, whitespace or hyphen.
    name = re.sub(r"[^\w\s-]", "_", name)
    # Collapse runs of whitespace/underscores into one underscore.
    name = re.sub(r"[\s_]+", "_", name)
    name = name.strip("_").lower()

    if not name:
        name = "unnamed_file"

    # Periods were replaced above, so the name can neither start with a dot
    # nor carry an extension; only the length remains to be enforced.
    encoded = name.encode("utf-8")
    if len(encoded) > max_bytes:
        # errors="ignore" drops a multi-byte character cut in half by the slice.
        name = encoded[:max_bytes].decode("utf-8", errors="ignore")
    return name
# Example usage
if __name__ == "__main__":
    downloader = HisnMuslimDownloader(language="en", formats={OutputFormat.MARKDOWN, OutputFormat.JSON})

    # Example: download specific Athkar.
    # Entry 126 is left out because https://www.hisnmuslim.com/api/en/126.json
    # is malformed: one of the keys of its JSON object is missing its quotes.
    specific_ids = [thikr_id for thikr_id in range(1, 133) if thikr_id != 126]
    asyncio.run(downloader.download_specific(specific_ids))

    # Example: download all Athkar
    # asyncio.run(downloader.download_all())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment