Skip to content

Instantly share code, notes, and snippets.

@abdalrohman
Created November 10, 2024 18:10
Show Gist options
  • Select an option

  • Save abdalrohman/51ad66bf92954471471ee2dc1ce68686 to your computer and use it in GitHub Desktop.

Select an option

Save abdalrohman/51ad66bf92954471471ee2dc1ce68686 to your computer and use it in GitHub Desktop.
HisnMuslim Content Downloader and Processor
"""
HisnMuslim Content Downloader and Processor
This script downloads and processes Islamic Athkar from HisnMuslim API.
It supports multiple output formats, includes progress tracking, and organized file structure.
Supports downloading specific Athkar by ID (valid range: 1-132).
Author: M.Abdulrahman Alnaseer's
Original API: https://www.hisnmuslim.com/api/ar/husn_ar.json
Website: https://hisnmuslim.com
Requirements:
- Python 3.12+
- requests
- rich (for progress tracking)
- pydantic (for data validation)
"""
import asyncio
import json
import logging
import re
from collections.abc import Iterable
from dataclasses import dataclass
from enum import StrEnum, auto
from pathlib import Path
from typing import Literal, Sequence, TypeAlias
import requests
from pydantic import BaseModel
from rich.logging import RichHandler
from rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn, TimeElapsedColumn
# Configure logging: root logger at INFO, bare-message format, output routed
# through rich's RichHandler with rich-formatted tracebacks enabled.
logging.basicConfig(level=logging.INFO, format="%(message)s", handlers=[RichHandler(rich_tracebacks=True)])
# Named logger used throughout this script for status and error messages.
logger = logging.getLogger("hisnmuslim_downloader")
# Type definitions and constants
# Language codes accepted by the API wrapper and the downloader.
SupportedLanguages: TypeAlias = Literal["en", "ar"]
# Inclusive bounds used when validating Thikr IDs (the module docstring
# states the valid range as 1-132).
MIN_THIKR_ID = 1
MAX_THIKR_ID = 132
class ThikrIdError(ValueError):
    """Signal that one or more Thikr IDs are not valid.

    Subclasses ``ValueError`` so callers that already handle ``ValueError``
    keep working, while callers that care can catch this type specifically.
    """
class OutputFormat(StrEnum):
"""Supported output formats for content."""
MARKDOWN = auto()
JSON = auto()
TEXT = auto()
HTML = auto()
@dataclass(frozen=True, slots=True)
class AthkarEntry:
    """Immutable index record describing one thikr.

    Holds the entry's numeric ID, its title, and the two URLs (audio and
    text) associated with it.

    Raises:
        ThikrIdError: at construction time, when ``id`` is not within
            ``MIN_THIKR_ID``..``MAX_THIKR_ID`` (both inclusive).
    """

    id: int
    title: str
    audio_url: str
    text_url: str

    def __post_init__(self) -> None:
        # Guard clause: an in-range ID needs no further work.
        id_in_range = MIN_THIKR_ID <= self.id <= MAX_THIKR_ID
        if id_in_range:
            return
        raise ThikrIdError(f"Invalid Thikr ID: {self.id}. Must be between {MIN_THIKR_ID} and {MAX_THIKR_ID}")
class LanguageConfig(BaseModel):
    """Configuration for a supported language.

    Pydantic model with three required fields. It is not referenced by any
    other code in the visible source.
    """

    # NOTE(review): presumably mirrors one language entry of the API index
    # (numeric id, display name, base URL) - confirm against the API payload.
    id: int
    language: str
    base_url: str
@dataclass(frozen=True, slots=True)
class AthkarContent:
    """Content details for a single thikr.

    Frozen record built by ``HisnMuslimAPI.fetch_athkar_content`` from one
    item of a category JSON document.
    """

    # Value of the item's "ID" key.
    id: int
    # Arabic text of the thikr ("ARABIC_TEXT", or "Text" for the one
    # document that uses that key instead).
    arabic_text: str
    # Read with dict.get(), so these two are None when the source item has
    # no "LANGUAGE_ARABIC_TRANSLATED_TEXT" / "TRANSLATED_TEXT" key.
    transliteration: str | None
    translation: str | None
    # Value of the item's "REPEAT" key.
    # NOTE(review): assumed to be an integer in the API payload - confirm.
    repeat_count: int
    # Value of the item's "AUDIO" key.
    audio_url: str
class HisnMuslimAPI:
    """Interface for interacting with the HisnMuslim API.

    On construction the API index (``husn.json``) is fetched and the URL of
    the Athkar list for the requested language is stored in ``api_url``.
    The Athkar list itself is fetched lazily and cached per instance.
    """

    BASE_API_URL = "https://hisnmuslim.com/api"
    # Seconds allowed for connecting and for each read. requests applies no
    # timeout by default, so a stalled server would block the script forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, language: SupportedLanguages = "ar") -> None:
        """Create a client for *language* and resolve its list URL.

        Raises:
            requests.RequestException: if the API index cannot be fetched.
            ValueError: if the index has no entry for *language*.
        """
        self.language = language
        self.session = requests.Session()
        # Set up the cache before any network call so the instance is fully
        # formed no matter where initialization stops.
        self._athkar_cache: dict[int, AthkarEntry] | None = None
        self._initialize_api()

    @property
    def _language_name(self) -> str:
        """Return the label the API uses for ``self.language``.

        The same label identifies the language in the index and is the
        top-level key of the language's Athkar list document.
        """
        return "العربية" if self.language == "ar" else "English"

    def _get_json(self, url: str) -> dict:
        """GET *url* and return the decoded JSON document.

        The body is decoded as ``utf-8-sig`` so a leading byte-order mark
        does not break JSON parsing.

        Raises:
            requests.RequestException: on connection errors, timeouts or a
                non-success HTTP status.
        """
        response = self.session.get(url, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        response.encoding = "utf-8-sig"
        return response.json()

    @staticmethod
    def _arabic_text(item: dict) -> str:
        """Return the Arabic text of one content item.

        Items normally carry it under ``ARABIC_TEXT``; at least one document
        (English category 132) uses ``Text`` instead, so fall back to that
        key rather than guessing from the URL.

        Raises:
            KeyError: if the item has neither key.
        """
        if "ARABIC_TEXT" in item:
            return item["ARABIC_TEXT"]
        return item["Text"]

    def _initialize_api(self) -> None:
        """Initialize API configuration and validate connection.

        Raises:
            requests.RequestException: if the index request fails.
            ValueError: if no index entry matches the configured language.
        """
        try:
            data = self._get_json(f"{self.BASE_API_URL}/husn.json")
        except requests.RequestException as e:
            logger.error(f"Failed to initialize API: {e}")
            raise

        # Find matching language configuration
        lang_name = self._language_name
        matching_config = next((cfg for cfg in data["MAIN"] if cfg["LANGUAGE"] == lang_name), None)
        if not matching_config:
            raise ValueError(f"Unsupported language: {self.language}")

        self.api_url = matching_config["LANGUAGE_URL"]
        logger.info(f"Successfully initialized API for language: {self.language}")

    def validate_thikr_ids(self, ids: Iterable[int]) -> None:
        """Validate a sequence of Thikr IDs.

        Raises:
            ThikrIdError: listing every ID outside
                ``MIN_THIKR_ID``..``MAX_THIKR_ID`` (inclusive).
        """
        invalid_ids = [id_ for id_ in ids if not MIN_THIKR_ID <= id_ <= MAX_THIKR_ID]
        if invalid_ids:
            raise ThikrIdError(
                f"Invalid Thikr IDs: {invalid_ids}. " f"IDs must be between {MIN_THIKR_ID} and {MAX_THIKR_ID}"
            )

    def fetch_athkar_list(self) -> list[AthkarEntry]:
        """Fetch the list of all available Athkar.

        The first successful call populates the per-instance cache; later
        calls are served from it without touching the network.

        Raises:
            requests.RequestException: if the list request fails.
        """
        if self._athkar_cache is not None:
            return list(self._athkar_cache.values())

        try:
            data = self._get_json(self.api_url)
        except requests.RequestException as e:
            logger.error(f"Failed to fetch Athkar list: {e}")
            raise

        entries = [
            AthkarEntry(id=entry["ID"], title=entry["TITLE"], audio_url=entry["AUDIO_URL"], text_url=entry["TEXT"])
            for entry in data[self._language_name]
        ]
        # Cache the entries
        self._athkar_cache = {entry.id: entry for entry in entries}
        return entries

    def get_athkar_by_ids(self, ids: Sequence[int]) -> list[AthkarEntry]:
        """Fetch specific Athkar by their IDs.

        IDs that pass range validation but are missing from the fetched list
        are skipped with a warning. The result keeps the order of *ids*.

        Raises:
            ThikrIdError: if any ID is out of range.
            requests.RequestException: if the list has to be fetched and the
                request fails.
        """
        # Validate IDs first
        self.validate_thikr_ids(ids)

        # Ensure we have the cache
        if self._athkar_cache is None:
            self.fetch_athkar_list()
        cache = self._athkar_cache or {}

        # Get requested entries
        entries: list[AthkarEntry] = []
        for id_ in ids:
            entry = cache.get(id_)
            if entry is None:
                logger.warning(f"Thikr ID {id_} not found in cache")
                continue
            entries.append(entry)
        return entries

    def fetch_athkar_content(self, url: str) -> list[AthkarContent]:
        """Fetch detailed content for a specific Athkar.

        The document is expected to have a single top-level key (the
        category name) whose value is the list of items.

        Raises:
            requests.RequestException: if the request fails.
            KeyError: if an item lacks a required key.
        """
        try:
            data = self._get_json(url)
        except requests.RequestException as e:
            logger.error(f"Failed to fetch Athkar content: {e}")
            raise

        category_name = next(iter(data.keys()))
        return [
            AthkarContent(
                id=item["ID"],
                arabic_text=self._arabic_text(item),
                transliteration=item.get("LANGUAGE_ARABIC_TRANSLATED_TEXT", None),
                translation=item.get("TRANSLATED_TEXT", None),
                repeat_count=item["REPEAT"],
                audio_url=item["AUDIO"],
            )
            for item in data[category_name]
        ]
class ContentProcessor:
    """Process and save Athkar content in various formats.

    Each entry gets its own directory under ``base_path`` named
    ``<id>_<sanitized title>``. Markdown and JSON are implemented; any other
    requested ``OutputFormat`` is reported with a warning and skipped.
    """

    def __init__(self, base_path: Path) -> None:
        """Remember *base_path* and create it (with parents) if missing."""
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)

    def _create_markdown(self, entry: AthkarEntry, content: list[AthkarContent]) -> str:
        """Generate markdown formatted content.

        The transliteration and translation sections are emitted only when
        the item actually has a value, so the literal text ``None`` is never
        written into the document.
        """
        markdown = [f"# {entry.title}\n"]
        for item in content:
            markdown.append(f"## {item.arabic_text}")
            if item.transliteration is not None:
                markdown.append(f"### Transliteration\n{item.transliteration}")
            if item.translation is not None:
                markdown.append(f"### Translation\n{item.translation}")
            markdown.append(f"### Repeat {item.repeat_count} times")
            markdown.append(f"### [Audio]({item.audio_url})\n")
        return "\n".join(markdown)

    def _create_json(self, entry: AthkarEntry, content: list[AthkarContent]) -> dict:
        """Generate JSON formatted content.

        Returns a plain dict (title, id, and one dict per content item)
        ready for ``json.dumps``; missing transliteration/translation stay
        ``None`` and serialize as ``null``.
        """
        return {
            "title": entry.title,
            "id": entry.id,
            "content": [
                {
                    "id": item.id,
                    "arabic": item.arabic_text,
                    "transliteration": item.transliteration,
                    "translation": item.translation,
                    "repeat_count": item.repeat_count,
                    "audio_url": item.audio_url,
                }
                for item in content
            ],
        }

    async def save_content(self, entry: AthkarEntry, content: list[AthkarContent], formats: set[OutputFormat]) -> None:
        """Save content in specified formats.

        Writes ``content.md`` and/or ``content.json`` (UTF-8) into the
        entry's directory. Formats without an implementation are logged and
        skipped instead of being ignored silently.
        """
        # Create directory for this entry; parents=True keeps this working
        # even if the base directory was removed after construction.
        entry_dir = self.base_path / f"{entry.id}_{sanitize_name(entry.title)}"
        entry_dir.mkdir(parents=True, exist_ok=True)

        for fmt in formats:
            if fmt == OutputFormat.MARKDOWN:
                content_str = self._create_markdown(entry, content)
                (entry_dir / "content.md").write_text(content_str, encoding="utf-8")
            elif fmt == OutputFormat.JSON:
                content_dict = self._create_json(entry, content)
                (entry_dir / "content.json").write_text(
                    json.dumps(content_dict, ensure_ascii=False, indent=2), encoding="utf-8"
                )
            else:
                logger.warning(f"Output format '{fmt}' is not implemented; skipping it for entry {entry.id}")
class AudioDownloader:
    """Handle downloading of audio content."""

    # Seconds allowed for connecting and for each read of the stream.
    # requests applies no timeout by default, so a stalled server would
    # otherwise block the download forever.
    REQUEST_TIMEOUT = 30
    # Size in bytes of each chunk pulled from the response stream.
    CHUNK_SIZE = 8192

    def __init__(self, base_path: Path) -> None:
        """Remember *base_path* and open a reusable HTTP session."""
        self.base_path = Path(base_path)
        self.session = requests.Session()

    def download_audio(self, url: str, output_path: Path) -> None:
        """Download audio file from URL.

        The body is streamed into ``<output_path>.part`` and moved over
        *output_path* only once the download has finished, so a failed
        transfer never leaves a truncated file under the final name. The
        parent directory of *output_path* is created when missing.

        Raises:
            requests.RequestException: on connection errors, timeouts or a
                non-success HTTP status (logged before being re-raised).
        """
        partial_path = output_path.with_name(f"{output_path.name}.part")
        try:
            # The context manager closes the streamed response, releasing
            # its connection, on both success and failure.
            with self.session.get(url, stream=True, timeout=self.REQUEST_TIMEOUT) as response:
                response.raise_for_status()
                output_path.parent.mkdir(parents=True, exist_ok=True)
                with partial_path.open("wb") as f:
                    for chunk in response.iter_content(chunk_size=self.CHUNK_SIZE):
                        if chunk:  # skip empty chunks
                            f.write(chunk)
            partial_path.replace(output_path)
        except requests.RequestException as e:
            logger.error(f"Failed to download audio from {url}: {e}")
            raise
        finally:
            # No-op after a successful replace; removes leftovers otherwise.
            partial_path.unlink(missing_ok=True)
class HisnMuslimDownloader:
    """Main coordinator for downloading and processing HisnMuslim content.

    Wires together the API client, the content processor and the audio
    downloader, and drives them over either every entry or a chosen subset.
    """

    def __init__(
        self,
        language: SupportedLanguages = "ar",
        output_dir: str | Path = "./HisnMuslim_Content",
        formats: set[OutputFormat] | None = None,
    ) -> None:
        """Build the downloader.

        Args:
            language: Language code passed to the API client.
            output_dir: Directory under which all content is written.
            formats: Output formats to write. ``None`` (the default) means
                Markdown and JSON; a fresh set is built per instance so the
                default is never shared between downloaders.

        Raises:
            requests.RequestException: if the API client cannot initialize.
            ValueError: if the API has no entry for *language*.
        """
        self.api = HisnMuslimAPI(language)
        self.base_path = Path(output_dir)
        self.processor = ContentProcessor(self.base_path)
        self.audio_downloader = AudioDownloader(self.base_path)
        self.formats = formats if formats is not None else {OutputFormat.MARKDOWN, OutputFormat.JSON}

    async def _process_entry(self, entry: AthkarEntry) -> None:
        """Fetch, save and download everything belonging to one entry."""
        # Fetch and save content
        content = self.api.fetch_athkar_content(entry.text_url)
        await self.processor.save_content(entry, content, self.formats)

        # Download audio files
        audio_dir = self.base_path / f"{entry.id}_{sanitize_name(entry.title)}" / "audio"
        audio_dir.mkdir(parents=True, exist_ok=True)

        # Download main audio
        self.audio_downloader.download_audio(entry.audio_url, audio_dir / "full_audio.mp3")

        # Download individual athkar audio
        for idx, item in enumerate(content, 1):
            self.audio_downloader.download_audio(item.audio_url, audio_dir / f"part_{idx}.mp3")

    async def _process_entries(self, entries: Sequence[AthkarEntry], description: str) -> int:
        """Process *entries* under a progress bar and return the failure count.

        Processing is best-effort: an error on one entry is logged and the
        loop moves on. The bar advances for failed entries too, so it always
        reaches 100%.
        """
        failures = 0
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
            TimeElapsedColumn(),
        ) as progress:
            content_task = progress.add_task(description, total=len(entries))
            for entry in entries:
                try:
                    await self._process_entry(entry)
                except Exception as e:
                    failures += 1
                    logger.error(f"Error processing entry {entry.id}: {e}")
                finally:
                    progress.update(content_task, advance=1)
        return failures

    @staticmethod
    def _log_outcome(label: str, failures: int, total: int) -> None:
        """Log the final status of a run, distinguishing clean from partial."""
        if failures:
            logger.warning(f"{label} completed with {failures} of {total} entries failed")
        else:
            logger.info(f"{label} completed successfully!")

    async def download_all(self) -> None:
        """Download and process all available content.

        Raises:
            requests.RequestException: if the Athkar list cannot be fetched.
        """
        logger.info("Starting content download...")
        athkar_list = self.api.fetch_athkar_list()
        failures = await self._process_entries(athkar_list, "Downloading content...")
        self._log_outcome("Content download", failures, len(athkar_list))

    async def download_specific(self, thikr_ids: Sequence[int]) -> None:
        """Download specific Athkar by their IDs.

        Invalid IDs and unexpected errors are logged rather than raised, so
        this coroutine always returns normally.
        """
        logger.info(f"Starting download for Athkar IDs: {thikr_ids}")
        try:
            # Get specific entries
            entries = self.api.get_athkar_by_ids(thikr_ids)
            if not entries:
                logger.warning("No valid Athkar entries found for the specified IDs")
                return
            failures = await self._process_entries(entries, "Downloading selected Athkar...")
            self._log_outcome("Selected Athkar download", failures, len(entries))
        except ThikrIdError as e:
            logger.error(f"Invalid Thikr IDs provided: {e}")
        except Exception as e:
            logger.error(f"Unexpected error during download: {e}")
# helpers
def sanitize_name(name: str) -> str:
    """Turn arbitrary text into a lowercase, filesystem-safe name.

    Steps, in order:

    1. Drop parenthesised fragments together with the whitespace before them.
    2. Replace every character that is not a word character, whitespace or a
       hyphen with an underscore (this covers ``: / < > " | \\ ? *`` and
       ``.``, so the result never starts with a dot and has no extension).
    3. Collapse runs of whitespace/underscores into a single underscore.
    4. Strip leading/trailing underscores and lowercase.
    5. Fall back to ``"unnamed_file"`` when nothing is left.
    6. Truncate to 255 UTF-8 bytes without splitting a character.

    :param name: Original filename or text to be sanitized
    :return str: Sanitized filename safe for use in most filesystems
    """
    # Common per-component limit. It is enforced in UTF-8 bytes rather than
    # characters because non-ASCII text (e.g. Arabic titles) takes several
    # bytes per character.
    max_bytes = 255

    # Remove everything in parentheses and their contents
    name = re.sub(r"\s*\([^)]*\)", "", name)
    # Replace non-word characters (except whitespace and hyphen) with underscore
    name = re.sub(r"[^\w\s-]", "_", name)
    # Replace multiple spaces or underscores with single underscore
    name = re.sub(r"[\s_]+", "_", name)
    # Remove leading/trailing underscores and convert to lowercase
    name = name.strip("_").lower()

    # Handle empty string case
    if not name:
        return "unnamed_file"

    encoded = name.encode("utf-8")
    if len(encoded) > max_bytes:
        # errors="ignore" drops a character cut in half by the byte slice;
        # rstrip keeps the result from ending in a dangling separator.
        name = encoded[:max_bytes].decode("utf-8", errors="ignore").rstrip("_")
    return name
# Example usage
if __name__ == "__main__":
    downloader = HisnMuslimDownloader(language="en", formats={OutputFormat.MARKDOWN, OutputFormat.JSON})

    # Example: download specific Athkar.
    # Entry 126 is left out because the JSON at
    # https://www.hisnmuslim.com/api/en/126.json has a key with missing quotes.
    specific_ids = [thikr_id for thikr_id in range(1, 133) if thikr_id != 126]
    asyncio.run(downloader.download_specific(specific_ids))

    # Example: Download all Athkar
    # asyncio.run(downloader.download_all())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment