#!/usr/bin/env python3
###############################################################################################
#ezs_sync.py: syncs files from an EZShare WiFi SD card to the local directory
#
#Description:
#  Syncs the remote filesystem of the EZShare this machine is connected to (via WiFi) into the
#  current local directory, using the device's HTTP server. The general idea is to do what a
#  `wget -rN -cp` would do, except that the URIs the EZShare exports mangle the file names
#  (forcing us to take them from the anchor text instead) and omit directory names from those
#  same URLs (forcing us to handle directories ourselves), and timestamps need special
#  treatment for values in the future and for timezone conversion.
#
#Authors:
#  Initial code by Claude.AI: https://claude.ai/share/a98522d6-c5a6-41ac-8426-b4504a37cc25
#  Further debugging, fixing and enhancing: 2025/10/12 Durval Menezes
#
#License:
#  WTFPL: https://www.wtfpl.net/about/
###############################################################################################
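#Usage (assumed invocation): connect this machine to the EZShare card's WiFi network, cd into
# the directory that should receive the copy, and run the script; the starting URL is hardcoded
# in main() below, e.g.:
#   cd /path/to/local/copy && python3 ezs_sync.py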
| """ | |
| HTTP Directory Downloader | |
| Downloads files recursively from an HTTP directory listing, preserving timestamps | |
| and checking file integrity, plus trying to be as robust as possible re:errors | |
| """ | |
import os
import re
import shutil
import time
import logging
from datetime import datetime, timezone, timedelta
from urllib.parse import urljoin
import requests
#these are needed for requests_retry_session() below:
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
#We need the UTC offset to correct the EZShare HTML timestamps (which are in GMT) to localtime
# Note: not all timestamps are served in UTC by the EZShare; exceptions include /Journal.dat
#  and others. This is not a great problem, as these files are simply redownloaded due to their
#  different timestamps, so no copies are skipped and no data is lost.
LOCAL_TIME = datetime.now(timezone.utc).astimezone()
UTC_OFFSET = LOCAL_TIME.utcoffset().total_seconds()
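#Illustrative example (assumed host timezone): on a machine at UTC-03:00, UTC_OFFSET is -10800.0,
# so a '16:17:00' GMT timestamp from the listing becomes 13:17:00 local time once parse_timestamp()
# below adds UTC_OFFSET to it.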
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
LOGGER = logging.getLogger(__name__)
#Maximum HTTP retries
MAX_RETRIES = 5
#Implements HTTP GET with automatic retries
def requests_retry_session(
        retries=MAX_RETRIES,
        backoff_factor=1,
        status_forcelist=None,
        psession=None
):
    """Returns a session object set up for automatic retries on both network and HTTP errors."""
    if not status_forcelist:
        status_forcelist = list(range(400, 600))  #all possible HTTP status error codes
    mysession = psession or requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
        allowed_methods=frozenset(['GET', 'POST']),  # ensure POST is included ('method_whitelist' in urllib3 < 1.26)
    )
    adapter = HTTPAdapter(max_retries=retry)
    mysession.mount('http://', adapter)
    mysession.mount('https://', adapter)
    return mysession
SESSION = requests_retry_session()
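#Example (uses the same start URL as main() below): requests issued through SESSION are retried
# automatically, with exponential backoff, on connection errors and on 4xx/5xx statuses; the call
# only raises once the retries are exhausted.
#   response = SESSION.get('http://192.168.4.1/dir?dir=A:', timeout=5)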
class EZShareDirectoryDownloader:
    """EZShare recursive directory downloader."""
    def __init__(self, base_url):
        self.base_url = base_url
        self.stats = {
            'sdirs_checked': 0,
            'sdirs_created': 0,
            'files_checked': 0,
            'files_created': 0,
            'files_updated': 0,
            'bytes_checked': 0,
            'files_downloaded': 0,
            'bytes_downloaded': 0,
            'start_time': time.time()
        }
    def parse_size(self, size_str):
        """Convert a size string (e.g., '7750KB') to a (size_bytes, size_units) tuple."""
        size_str = size_str.strip().upper()
        if size_str.endswith('KB'):
            return int(size_str[:-2]) * 1024, 1024
        elif size_str.endswith('MB'):
            return int(float(size_str[:-2]) * 1024 * 1024), 1024 * 1024
        elif size_str.endswith('GB'):
            return int(float(size_str[:-2]) * 1024 * 1024 * 1024), 1024 * 1024 * 1024
        elif size_str.endswith('B'):
            return int(size_str[:-1]), 1
        else:
            return int(size_str), 1
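    #Illustrative values (computed by hand); the second element is the listing's granularity,
    # which the callers below use as the size tolerance:
    #   parse_size('7750KB') -> (7936000, 1024)
    #   parse_size('64KB')   -> (65536, 1024)
    #   parse_size('512')    -> (512, 1)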
    def parse_timestamp(self, timestamp_str):
        """Parse timestamp string to a datetime object."""
        timestamp_str = timestamp_str.strip()
        # Handle the specific format with spaces in the date, e.g.
        # '2021-11- 6 16:17: 0' -> '2021-11-6 16:17:0' (strptime accepts unpadded fields)
        normalized = re.sub(
            r'(\d{4})-\s*(\d{1,2})-\s*(\d{1,2})\s+(\d{1,2}):\s*(\d{1,2}):\s*(\d{1,2})',
            r'\1-\2-\3 \4:\5:\6', timestamp_str
        )
        timestamp = datetime.strptime(normalized.strip(), '%Y-%m-%d %H:%M:%S') + \
            timedelta(seconds=UTC_OFFSET)  #adjusts from the EZShare timezone (UTC) to localtime
        return timestamp
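    #Illustrative example (assuming a host at UTC-03:00, i.e. UTC_OFFSET == -10800.0):
    #   parse_timestamp('2030- 5-15  23:29:30') -> datetime(2030, 5, 15, 20, 29, 30)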
    def set_file_timestamp(self, filepath, timestamp):
        """Set the modification time of a file."""
        timestamp_seconds = timestamp.timestamp()
        os.utime(filepath, (timestamp_seconds, timestamp_seconds))
    def file_exists_with_same_attributes(self, filepath, size_bytes, size_tolerance, timestamp):
        """Check if the file exists locally with the same size and timestamp."""
        LOGGER.debug(f"checking file {filepath}")
        if not os.path.exists(filepath):
            LOGGER.debug("local file does not exist")
            return False
        file_stat = os.stat(filepath)
        file_size = file_stat.st_size
        file_mtime = datetime.fromtimestamp(file_stat.st_mtime)
        # Compare size
        if abs(file_size - size_bytes) > size_tolerance:
            LOGGER.debug(f"sizes differ by more than the {size_tolerance} tolerance")
            return False
        # Check whether the remote file time is in the future; if so, log it and report the file
        # as different (we can't trust the timestamp, so it's better to copy it regardless).
        # Note: this happens in cards written by the Resmed Airsense 10 CPAP machine,
        #  in the file 'Journal.dat' in the root of the card: its timestamp is set to
        #  2030-05-15 23:29:30 GMT, its size is 64KB and never changes, BUT its content does
        #  change. This way we don't skip copying it.
        if timestamp > datetime.now():
            LOGGER.debug("remote file timestamp is in the future: always consider it as different")
            return False
        # Compare timestamp (within 1 second tolerance)
        time_diff = abs((file_mtime - timestamp).total_seconds())
        if time_diff > 1:
            LOGGER.debug(f"timestamps differ by {time_diff}, which is more than 1 second")
            return False
        LOGGER.debug("local and remote files do not differ")
        return True
    def download_file(self, url, filepath, size_expected, size_tolerance):
        """Download a file from URL and verify its size."""
        LOGGER.info(f"Downloading: {url} -> {filepath}")
        for i in range(MAX_RETRIES):
            try:
                with requests.get(url, stream=True, timeout=5) as r:
                    r.raise_for_status()
                    with open(filepath, 'wb') as f:
                        size_downloaded = 0
                        for chunk in r.iter_content(chunk_size=8192):
                            if chunk:
                                f.write(chunk)
                                size_downloaded += len(chunk)
                break  #all chunks downloaded
            except (requests.exceptions.RequestException, ConnectionError, TimeoutError) as e:
                LOGGER.info(f"Exception during download: {e}")
                if i == MAX_RETRIES - 1:
                    LOGGER.info("No retries left, aborting")
                    raise
                LOGGER.info(f"Retrying in {2**i} seconds")
                time.sleep(2 ** i)  # Exponential backoff
        if abs(size_downloaded - size_expected) > size_tolerance:
            raise Exception(f"Size mismatch for {filepath}: "
                            f"expected {size_expected}, got {size_downloaded}")
        self.stats['files_downloaded'] += 1
        self.stats['bytes_downloaded'] += size_downloaded
        return size_downloaded
    def parse_directory_listing(self, html_content, current_url, local_dir):
        """Parse HTML directory listing and process files/directories."""
        self.stats['sdirs_checked'] += 1
        # Ensure local directory exists
        if os.path.isfile(local_dir):
            LOGGER.info(f"local subdirectory {local_dir} exists but as a file, removing it")
            os.remove(local_dir)
        if not os.path.exists(local_dir):
            LOGGER.info(f"creating local subdirectory {local_dir}")
            os.makedirs(local_dir)
            self.stats['sdirs_created'] += 1
        # Save current directory
        original_dir = os.getcwd()
        os.chdir(local_dir)
        try:
            # Pattern for files: timestamp, size, URL, filename, e.g.:
            #   2030- 5-15   23:29:30   64KB
            #   <a href="http://192.168.4.1/download?file=JOURNAL.DAT"> Journal.dat</a>
            file_pattern = (r'(\d{4}-\s*\d{1,2}-\s*\d{1,2}\s+\d{1,2}:\s*\d{1,2}:\s*\d{1,2})'
                            r'\s+(\d+(?:KB|MB|GB|B)?)\s*<a href="([^"]+)">\s*([^<]+)</a>')
            # Pattern for directories: timestamp, <DIR>, URI, sdirname, e.g.:
            #   2025- 4-23   3: 4:16   <DIR>
            #   <a href="dir?dir=A:%5CDATALOG"> DATALOG</a>
            dir_pattern = (r'(\d{4}-\s*\d{1,2}-\s*\d{1,2}\s+\d{1,2}:\s*\d{1,2}:\s*\d{1,2})'
                           r'\s+\<DIR\>\s*<a href="([^"]+)">\s*([^<]+)</a>')
            # Process files
            LOGGER.debug(f"html_content='{html_content}', file_pattern='{file_pattern}'")
            for match in re.finditer(file_pattern, html_content):
                timestamp_str, size_str, file_url, filename = match.groups()
                try:
                    timestamp = self.parse_timestamp(timestamp_str)
                    size_bytes, size_units = self.parse_size(size_str)
                    size_tolerance = size_units
                    self.stats['files_checked'] += 1
                    self.stats['bytes_checked'] += size_bytes
                    # Check if the file already exists with the same attributes
                    if self.file_exists_with_same_attributes(filename, size_bytes,
                                                             size_tolerance, timestamp):
                        LOGGER.debug(f"Skipping existing file with same attributes: {filename}")
                    else:
                        if os.path.isdir(filename):
                            LOGGER.info(f"local file {filename} exists but as a sdir, removing it")
                            shutil.rmtree(filename)
                        if not os.path.exists(filename):
                            LOGGER.debug(f"local file {filename} does not exist, creating it")
                            self.stats['files_created'] += 1
                        else:
                            self.stats['files_updated'] += 1
                        # Download the file
                        full_url = urljoin(current_url, file_url)
                        self.download_file(full_url, filename, size_bytes, size_tolerance)
                        # Set timestamp
                        self.set_file_timestamp(filename, timestamp)
                except Exception as e:
                    LOGGER.error(f"Error processing file {filename}: {e}")
                    raise
            # Process directories
            LOGGER.debug(f"html_content='{html_content}', dir_pattern='{dir_pattern}'")
            for match in re.finditer(dir_pattern, html_content):
                timestamp_str, dir_uri, sdirname = match.groups()
                if sdirname == "." or sdirname == "..":  #only process real directories
                    LOGGER.debug(f"skipping pseudo-directory '{sdirname}'")
                else:
                    LOGGER.debug(f"processing directory: '{sdirname}'")
                    try:
                        # Construct new URL
                        sub_url = urljoin(current_url, dir_uri)
                        LOGGER.debug(f"Entering directory: {sdirname}")
                        # Recursively process subdirectory
                        response = SESSION.get(sub_url, timeout=5)
                        response.raise_for_status()
                        self.parse_directory_listing(response.text, sub_url, sdirname)
                    except Exception as e:
                        LOGGER.error(f"Error processing directory {sdirname}: {e}")
                        raise
        finally:
            # Restore original directory
            os.chdir(original_dir)
    def run(self):
        """Main execution method."""
        try:
            LOGGER.info(f"Starting download from: {self.base_url}")
            # Get initial directory listing
            response = SESSION.get(self.base_url, timeout=5)
            response.raise_for_status()
            # Parse and process the listing
            self.parse_directory_listing(response.text, self.base_url, '.')
            # Calculate elapsed time
            elapsed_time = time.time() - self.stats['start_time']
            # Print statistics
            print("\n" + "="*60)
            print("DOWNLOAD COMPLETED")
            print("="*60)
            print(f"Time elapsed: {elapsed_time:.2f} seconds")
            print(f"Directories checked: {self.stats['sdirs_checked']}")
            print(f"Directories created: {self.stats['sdirs_created']}")
            print(f"Files checked: {self.stats['files_checked']}")
            print(f"Files created: {self.stats['files_created']}")
            print(f"Files updated: {self.stats['files_updated']}")
            print(f"Bytes checked: {self.stats['bytes_checked']:,} bytes")
            print(f"Files downloaded: {self.stats['files_downloaded']}")
            print(f"Bytes downloaded: {self.stats['bytes_downloaded']:,} bytes")
            print("="*60)
        except Exception as e:
            LOGGER.error(f"Fatal error: {e}")
            raise
def main():
    """Main entry point."""
    # URL to start downloading from
    url = 'http://192.168.4.1/dir?dir=A:'
    # Create downloader and run
    downloader = EZShareDirectoryDownloader(url)
    downloader.run()
if __name__ == "__main__":
    main()
#Eof ezs_sync.py