Created April 20, 2025 18:06
-
-
Save jkerhin/bc6532157ccfde14806d591cbfc5b6a5 to your computer and use it in GitHub Desktop.
Example Downloader
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import time | |
| import logging | |
| from datetime import datetime, UTC | |
| from pathlib import Path | |
| import requests | |
| from sqlalchemy import select | |
| from sqlalchemy.orm import sessionmaker, Session | |
| from my_models import Download, Status | |
# Default directory for in-progress downloads (DownloadWorker's temp_dir).
CWD: Path = Path(".")
# Cooperative stop flag polled by DownloadWorker.run and DownloadWorker.download.
# TODO: replace with threading.Event / asyncio.Event if/when using threads/async.
SHUTDOWN: bool = False
# Seconds DownloadWorker.run sleeps when the backlog is empty.
SLEEP_SEC: float = 30.0
log: logging.Logger = logging.getLogger(__name__)
class DownloadWorker:
    """Serially process queued ``Download`` rows.

    For each row the worker:

    1. marks the row IN_PROGRESS;
    2. streams the URL into ``temp_dir``, resuming a partial file with an HTTP
       ``Range`` request;
    3. records the outcome;
    4. on local success, calls ``_move_to_remote``.

    Args:
        req_session: Session used for every HTTP request (PKI/auth are
            expected to be configured by the caller).
        db_sessionmaker: Factory for the short-lived SQLAlchemy sessions used
            for each read/update of a row.
        remote_dir: Destination directory handed to ``_move_to_remote``.
        temp_dir: Directory that receives downloads in progress.
        timeout: Forwarded as ``timeout=`` to every HTTP request. ``None``
            (default) waits indefinitely, which is the ``requests`` default.
        chunk_size: Bytes requested per ``iter_content`` read. This is also
            how often ``SHUTDOWN`` is checked during a transfer.
    """

    def __init__(
        self,
        req_session: requests.Session,
        db_sessionmaker: sessionmaker[Session],
        remote_dir: Path,
        temp_dir: Path = CWD,
        timeout: float | tuple[float, float] | None = None,
        # 5 MiB. The original value, 5**20 (~95 TB), looks like a typo and
        # asked the HTTP layer to buffer up to that much per read.
        chunk_size: int = 5 * 2**20,
    ):
        self.req_session = req_session
        self.db_sessionmaker = db_sessionmaker
        self.remote_dir = remote_dir
        self.temp_dir = temp_dir
        self.timeout = timeout
        self.chunk_size = chunk_size
        # Number of idle sleeps taken by run() because the backlog was empty.
        self.n_sleeps = 0

    def run(self) -> None:
        """Process the backlog until the module-level ``SHUTDOWN`` flag is set.

        This method is deliberately synchronous. Everything it does blocks
        (``time.sleep``, ``requests``), and the ``__main__`` entry point calls
        ``worker.run()`` without awaiting it. As an ``async def``, that call
        only created a coroutine object and no download ever ran.
        """
        log.debug("Download worker starting")
        while not SHUTDOWN:
            next_id = self._next_id()
            if next_id is None:
                log.debug("No downloads in backlog, sleeping %f seconds", SLEEP_SEC)
                self.n_sleeps += 1
                time.sleep(SLEEP_SEC)
                continue
            self._process(next_id)
        log.debug("Download worker complete")

    @staticmethod
    def _load(db_ses: Session, download_id: int) -> Download:
        """Fetch the ``Download`` row with ``download_id``.

        ``scalar_one`` raises if the row does not exist.
        """
        return db_ses.execute(
            select(Download).where(Download.id == download_id)
        ).scalar_one()

    def _process(self, download_id: int) -> None:
        """Download one row, record the result, and move the file on success."""
        with self.db_sessionmaker() as db_ses:
            dl = self._load(db_ses, download_id)
            # Copy plain values out so they stay usable after the session closes.
            filename = dl.filename
            url = dl.url
            dl.status = Status.IN_PROGRESS
            dl.download_begin = datetime.now(tz=UTC)
            db_ses.commit()

        log.debug("Begin download of %s from %s", filename, url)
        dl_status, n_bytes = self.download(filename=filename, url=url)
        log.debug("End download of %s - %s", filename, str(dl_status))

        with self.db_sessionmaker() as db_ses:
            dl = self._load(db_ses, download_id)
            dl.status = dl_status
            dl.n_bytes = n_bytes
            dl.download_end = datetime.now(tz=UTC)
            db_ses.commit()

        if dl_status != Status.LOCAL_COMPLETE:
            # TODO: error handling / retry policy for failed or partial downloads.
            return

        # Expect Status.COMPLETE on successful move
        move_status = self._move_to_remote(filename=filename)
        with self.db_sessionmaker() as db_ses:
            dl = self._load(db_ses, download_id)
            dl.status = move_status
            # Without this commit the move status was silently discarded.
            db_ses.commit()

    def _next_id(self) -> int | None:
        """Return the id of the Download to start next, or ``None`` if idle.

        PARTIAL_DOWNLOAD rows are resumed before NOT_STARTED rows are begun.
        Within each status, rows are ordered by ``priority``, then by
        ``insert_time``.

        NOTE: rows left IN_PROGRESS (e.g. by a crash mid-download) are never
        selected here.
        """
        q_id = select(Download.id).order_by(Download.priority, Download.insert_time)
        with self.db_sessionmaker() as db_ses:
            for status in (Status.PARTIAL_DOWNLOAD, Status.NOT_STARTED):
                # LIMIT 1: scalar_one_or_none() raises if several rows match.
                next_id = db_ses.execute(
                    q_id.where(Download.status == status).limit(1)
                ).scalar_one_or_none()
                if next_id is not None:
                    return next_id
        return None

    def _move_to_remote(self, filename: str) -> Status:
        """Intended to move ``temp_dir / filename`` into ``remote_dir``.

        Should return ``Status.COMPLETE`` on success. Not implemented yet: the
        stub body returns ``None``, which ``_process`` would store as the
        row's status.
        """
        ...

    def download(self, filename: str, url: str) -> tuple[Status, int | None]:
        """Stream ``url`` into ``temp_dir / filename``, resuming a partial file.

        Returns:
            ``(status, n_bytes)``. ``status`` is one of:

            - ``LOCAL_COMPLETE``
            - ``PARTIAL_DOWNLOAD`` (``SHUTDOWN`` was set mid-transfer)
            - ``IO_ERROR``
            - ``DOWNLOAD_ERROR`` (HTTP error status)
            - ``OTHER_ERROR`` (other request failure)

            ``n_bytes`` is the local file size when it is known, else ``None``.
        """
        local_pth = self.temp_dir / filename
        # Initialised before the try so the IO_ERROR return can always use it.
        local_bytes: int | None = None

        # Check errors for local file before moving to web requests
        try:
            self.temp_dir.mkdir(parents=True, exist_ok=True)
            if local_pth.is_file():
                local_bytes = local_pth.stat().st_size
        except OSError:
            log.exception("Could not prepare local file %s", local_pth)
            return Status.IO_ERROR, local_bytes

        interrupted = False
        try:
            r_head = self.req_session.head(url, timeout=self.timeout)
            r_head.raise_for_status()
            # TODO: get filename, filesize, etc. out of r_head.headers

            extra_headers: dict[str, str] = {}
            if local_bytes:
                extra_headers["Range"] = f"bytes={local_bytes}-"
            with self.req_session.get(
                url=url, headers=extra_headers, stream=True, timeout=self.timeout
            ) as req:
                # Otherwise an error page would be written into the file.
                req.raise_for_status()
                # Append only when the server honoured the Range header
                # (206 Partial Content). A 200 response carries the whole file,
                # and appending it to the partial file would corrupt it.
                mode = "ab" if req.status_code == 206 else "wb"
                with local_pth.open(mode) as hdl:
                    for chunk in req.iter_content(chunk_size=self.chunk_size):
                        # update TQDM here too
                        hdl.write(chunk)
                        if SHUTDOWN:
                            interrupted = True
                            break
        except requests.HTTPError:
            log.exception("HTTP error while downloading %s", url)
            return Status.DOWNLOAD_ERROR, None
        except requests.RequestException:
            # Connection errors, timeouts, broken chunked streams, ...
            log.exception("Request failed while downloading %s", url)
            return Status.OTHER_ERROR, None
        except OSError:
            # Must follow RequestException, which is itself an OSError subclass.
            log.exception("Could not write %s", local_pth)
            return Status.IO_ERROR, None

        # Measured after the file is closed, so buffered writes are counted.
        local_bytes = local_pth.stat().st_size
        if interrupted:
            return Status.PARTIAL_DOWNLOAD, local_bytes
        # TODO: compare local_bytes with the size advertised by the HEAD response.
        return Status.LOCAL_COMPLETE, local_bytes
def build_requests_session() -> requests.Session:
    """Return the HTTP session shared by every download.

    The previous stub body (``...``) returned ``None``, contradicting the
    return annotation and failing on the worker's first request. Configure
    PKI (``session.cert`` / ``session.verify``), auth (``session.auth``) and
    default headers on the returned session. ``requests.Session`` has no
    session-wide timeout, so timeouts have to be passed per request.
    """
    session = requests.Session()
    # TODO: PKI, auth, timeout settings, &c
    return session
if __name__ == "__main__":
    # With no handler configured, logging's last-resort handler only prints
    # WARNING and above, so the info messages below would never be shown.
    logging.basicConfig(level=logging.INFO)

    # Pull env vars &c
    # TODO: MySessionmaker and remote_dir are placeholders that are not
    # defined anywhere in the visible file; build them from configuration.
    db_sessionmaker = MySessionmaker()
    requests_session = build_requests_session()
    worker = DownloadWorker(
        req_session=requests_session,
        db_sessionmaker=db_sessionmaker,
        remote_dir=remote_dir,
    )
    log.info("Starting downloader")
    try:
        worker.run()
    except KeyboardInterrupt:
        # NOTE: an interrupt mid-download leaves that row marked IN_PROGRESS.
        log.critical("Received keyboard interrupt, shutting down")
        # While run() executes on this same thread, nothing reads SHUTDOWN
        # after this point. The flag only matters once run() is moved to
        # another thread or task.
        SHUTDOWN = True
    log.info("Complete")
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.