Skip to content

Instantly share code, notes, and snippets.

@jkerhin
Created April 20, 2025 18:06
Show Gist options
  • Select an option

  • Save jkerhin/bc6532157ccfde14806d591cbfc5b6a5 to your computer and use it in GitHub Desktop.

Select an option

Save jkerhin/bc6532157ccfde14806d591cbfc5b6a5 to your computer and use it in GitHub Desktop.
Example Downloader
import time
import logging
from datetime import datetime, UTC
from pathlib import Path
import requests
from sqlalchemy import select
from sqlalchemy.orm import sessionmaker, Session
from my_models import Download, Status
# Default staging directory for DownloadWorker.temp_dir. It is relative, so it
# resolves against the process's working directory when used.
CWD: Path = Path(".")
# Cooperative stop flag. DownloadWorker.run() checks it before each loop
# iteration and download() checks it after every chunk written.
# TODO: replace with a threading/asyncio Event if/when using threads or async.
SHUTDOWN: bool = False
# Seconds to sleep before polling again when the backlog is empty.
SLEEP_SEC: float = 30.0
log: logging.Logger = logging.getLogger(__name__)
class DownloadWorker:
    """Poll the database for pending Downloads and fetch them one at a time.

    Each download is streamed into ``temp_dir``, resuming an existing partial
    local file when the server supports it. Its outcome is recorded on the
    ``Download`` row. A complete local copy is then handed to
    ``_move_to_remote``. The loop exits once the module-level ``SHUTDOWN``
    flag is true.
    """

    def __init__(
        self,
        req_session: requests.Session,
        db_sessionmaker: sessionmaker[Session],
        remote_dir: Path,
        temp_dir: Path = CWD,
        request_timeout: float | tuple[float, float] | None = None,
    ):
        """
        Args:
            req_session: HTTP session used for the HEAD and GET requests.
            db_sessionmaker: Factory for ORM sessions. A fresh, short-lived
                session is opened for each database step.
            remote_dir: Final destination for completed files (presumably
                used by ``_move_to_remote``, which is not implemented here).
            temp_dir: Local staging directory; created on demand.
            request_timeout: Passed as ``timeout=`` to every request.
                ``requests`` applies no timeout by default, so a stalled
                server can block forever, and ``requests.Session`` has no
                session-wide timeout. ``None`` keeps that no-timeout behaviour.
        """
        self.req_session = req_session
        self.db_sessionmaker = db_sessionmaker
        self.remote_dir = remote_dir
        self.temp_dir = temp_dir
        self.request_timeout = request_timeout
        self.n_sleeps = 0  # NOTE(review): not read or updated elsewhere in this class

    def run(self) -> None:
        """Main loop that orchestrates the downloads until SHUTDOWN is set.

        This is a plain (synchronous) method. The loop blocks in
        ``time.sleep`` and in ``requests``, so callers invoke it directly.
        As a coroutine, an un-awaited ``worker.run()`` would never execute.
        """
        log.debug("Download worker starting")
        while not SHUTDOWN:
            next_id = self._next_id()
            # Compare with None explicitly: a valid id of 0 is falsy.
            if next_id is None:
                log.debug("No downloads in backlog, sleeping %f seconds", SLEEP_SEC)
                time.sleep(SLEEP_SEC)
                continue

            filename, url = self._start_download(next_id)
            log.debug("Begin download of %s from %s", filename, url)
            dl_status, n_bytes = self.download(filename=filename, url=url)
            log.debug("End download of %s - %s", filename, str(dl_status))
            self._update_download(
                next_id,
                status=dl_status,
                n_bytes=n_bytes,
                download_end=datetime.now(tz=UTC),
            )
            if dl_status != Status.LOCAL_COMPLETE:
                # TODO: error handling / retry policy for failed or partial downloads.
                continue

            # Expect Status.COMPLETE on a successful move
            move_status = self._move_to_remote(filename=filename)
            self._update_download(next_id, status=move_status)
        log.debug("Download worker complete")

    def _start_download(self, download_id: int) -> tuple[str, str]:
        """Mark the Download as IN_PROGRESS, stamp ``download_begin``, and return ``(filename, url)``."""
        with self.db_sessionmaker() as db_ses:
            dl = db_ses.execute(
                select(Download).where(Download.id == download_id)
            ).scalar_one()
            # Copy the values out before commit(); the instance may be expired afterwards.
            filename, url = dl.filename, dl.url
            dl.status = Status.IN_PROGRESS
            dl.download_begin = datetime.now(tz=UTC)
            db_ses.commit()
        return filename, url

    def _update_download(self, download_id: int, **columns: object) -> None:
        """Assign ``columns`` (attribute name -> value) on one Download row and commit."""
        with self.db_sessionmaker() as db_ses:
            dl = db_ses.execute(
                select(Download).where(Download.id == download_id)
            ).scalar_one()
            for name, value in columns.items():
                setattr(dl, name, value)
            # Always commit: closing the session without it rolls the changes back.
            db_ses.commit()

    def _next_id(self) -> int | None:
        """Return the id of the Download to start next, or None if there is none.

        Interrupted (PARTIAL_DOWNLOAD) rows are resumed before NOT_STARTED
        rows. Within each status, rows are ordered by ``priority`` and then
        ``insert_time``, both ascending.
        """
        with self.db_sessionmaker() as db_ses:
            for status in (Status.PARTIAL_DOWNLOAD, Status.NOT_STARTED):
                # LIMIT 1 is required: scalar_one_or_none() raises if more than
                # one row comes back, and only the first in order is wanted.
                q_id = (
                    select(Download.id)
                    .where(Download.status == status)
                    .order_by(Download.priority, Download.insert_time)
                    .limit(1)
                )
                next_id = db_ses.execute(q_id).scalar_one_or_none()
                if next_id is not None:
                    return next_id
        return None

    def _move_to_remote(self, filename: str) -> Status:
        """Intended to move the finished local file to ``remote_dir``.

        ``run()`` expects ``Status.COMPLETE`` on success.
        TODO: not implemented. The stub body returns None, which ``run()``
        would store as the row's status.
        """
        ...

    def download(self, filename: str, url: str) -> tuple[Status, int | None]:
        """Stream ``url`` into ``temp_dir / filename``, resuming a partial local file if present.

        Returns:
            ``(status, n_bytes)`` where the status is one of:

            - ``Status.LOCAL_COMPLETE``, with the final local file size.
            - ``Status.PARTIAL_DOWNLOAD``, with the bytes on disk so far, if
              SHUTDOWN was set mid-transfer.
            - ``Status.IO_ERROR``, if the staging directory or local file
              could not be prepared or written.
            - ``Status.DOWNLOAD_ERROR``, if the server replied with an HTTP
              error status.
            - ``Status.OTHER_ERROR``, for any other ``requests`` failure
              (connection, timeout, ...).
        """
        # 1 MiB per read. The previous 5**20 (~95 TB) made each read effectively unbounded.
        chunk_size = 2**20
        # Bound before the try so the IO_ERROR path below can always return it.
        local_bytes: int | None = None
        local_pth = self.temp_dir / filename

        # Check errors for local file before moving to web requests
        try:
            self.temp_dir.mkdir(parents=True, exist_ok=True)
            if local_pth.is_file():
                local_bytes = local_pth.stat().st_size
        except OSError:
            log.exception("Cannot prepare local file %s", local_pth)
            return Status.IO_ERROR, local_bytes

        interrupted = False
        try:
            # Pre-flight check that the URL is reachable.
            # TODO: get filename, filesize, etc. out of the HEAD response headers.
            self.req_session.head(url, timeout=self.request_timeout).raise_for_status()

            headers = {"Range": f"bytes={local_bytes}-"} if local_bytes else {}
            with self.req_session.get(
                url=url, headers=headers, stream=True, timeout=self.request_timeout
            ) as resp:
                resp.raise_for_status()
                # Append only if the server honoured the Range request (206 Partial
                # Content). A 200 means it is sending the whole file again, so
                # start the local copy over instead of appending a second copy.
                mode = "ab" if resp.status_code == 206 else "wb"
                with local_pth.open(mode) as hdl:
                    for chunk in resp.iter_content(chunk_size=chunk_size):
                        # TODO: update TQDM progress here too
                        hdl.write(chunk)
                        if SHUTDOWN:
                            interrupted = True
                            break
            # Measure only after the file is closed so buffered writes are counted.
            local_bytes = local_pth.stat().st_size
        # HTTPError subclasses RequestException, which subclasses OSError,
        # so these clauses must stay in this order.
        except requests.exceptions.HTTPError:
            log.exception("HTTP error downloading %s from %s", filename, url)
            return Status.DOWNLOAD_ERROR, None
        except requests.exceptions.RequestException:
            log.exception("Request failed downloading %s from %s", filename, url)
            return Status.OTHER_ERROR, None
        except OSError:
            log.exception("Local I/O error writing %s", local_pth)
            return Status.IO_ERROR, None

        if interrupted:
            return Status.PARTIAL_DOWNLOAD, local_bytes
        # TODO: check file size against the expected length; wrap with own error handling if needed
        return Status.LOCAL_COMPLETE, local_bytes
def build_requests_session() -> requests.Session:
    """Create the HTTP session shared by all downloads.

    Returns a plain ``requests.Session`` rather than the implicit None the
    empty stub produced, which the worker could not call ``.head()`` on.

    TODO: configure PKI/client certificates, auth, retries, &c. here.
    Note that ``requests.Session`` has no session-wide timeout setting;
    timeouts must be passed per request.
    """
    session = requests.Session()
    return session
if __name__ == "__main__":
    import signal

    # Without a configured handler, this module's INFO records are never emitted.
    logging.basicConfig(level=logging.INFO)

    # Pull env vars &c
    # TODO: MySessionmaker and remote_dir are placeholders that are not defined in this file.
    db_sessionmaker = MySessionmaker()
    requests_session = build_requests_session()
    worker = DownloadWorker(
        req_session=requests_session,
        db_sessionmaker=db_sessionmaker,
        remote_dir=remote_dir,
    )

    def _request_shutdown(signum: int, _frame: object) -> None:
        """Signal handler: set SHUTDOWN so the worker stops at its next check.

        If a transfer is running, download() then returns PARTIAL_DOWNLOAD
        instead of being killed mid-chunk. It also restores the default SIGINT
        handler, so a second Ctrl-C still raises KeyboardInterrupt immediately.
        """
        global SHUTDOWN
        log.critical("Received signal %d, shutting down", signum)
        SHUTDOWN = True
        signal.signal(signal.SIGINT, signal.default_int_handler)

    signal.signal(signal.SIGINT, _request_shutdown)
    signal.signal(signal.SIGTERM, _request_shutdown)

    log.info("Starting downloader")
    try:
        worker.run()
    except KeyboardInterrupt:
        # Second Ctrl-C: the in-flight Download row keeps its current status.
        log.critical("Received second keyboard interrupt, exiting immediately")
        SHUTDOWN = True
    log.info("Complete")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment