#!/usr/bin/env python3
###############################################################################################
#ezs_sync.py: syncs files from an EZShare WiFi SD card to the local directory
#
#Description:
# syncs the remote filesystem of the EZShare this machine is connected to (via WiFi) to the
# current local directory, using the device's HTTP server. The general idea is to do what
# `wget -rN -cp` would do, except that the URIs the EZShare exports mangle the file names
# (so we take them from the anchor text instead) and omit directory names from those same
# URLs (so we handle directories ourselves); we also treat timestamps in the future
# specially and convert timezones.
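#
#Usage (a sketch, inferred from main() below, which hardcodes the card's usual AP address):
# 1. connect this machine to the EZShare card's WiFi access point
#    (its HTTP server is expected at http://192.168.4.1/);
# 2. cd into the local directory that should receive the copies;
# 3. run: python3 ezs_sync.py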
#
#Authors:
# Initial code by Claude.AI: https://claude.ai/share/a98522d6-c5a6-41ac-8426-b4504a37cc25
# Further debugging, fixing and enhancing: 2025/10/12 Durval Menezes
#
#License:
# WTFPL: https://www.wtfpl.net/about/
###############################################################################################
"""
HTTP Directory Downloader
Downloads files recursively from an HTTP directory listing, preserving timestamps
and checking file integrity, plus trying to be as robust as possible re:errors
"""
import os
import re
import shutil #needed to replace a local directory with a file of the same name
import time
import logging
from datetime import datetime, timezone, timedelta
from urllib.parse import urljoin
import requests
#these are needed for requests_retry_session() below:
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
#We need the UTC offset to correct the EZShare HTML timestamps (which are in GMT) to localtime
# Note: not all timestamps are served in UTC by EZShare, exceptions include /Journal.dat
# and others; not a great problem as these files are simply redownloaded due to different
# timestamps, so no copies are skipped and no data is lost.
LOCAL_TIME = datetime.now(timezone.utc).astimezone()
UTC_OFFSET = LOCAL_TIME.utcoffset().total_seconds()
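# Worked example (illustrative, assuming a UTC-3 local timezone): UTC_OFFSET would be
# -10800.0, so parse_timestamp() below shifts a remote '2025-10-12 18:52:00' (UTC) by
# timedelta(seconds=UTC_OFFSET) to 15:52:00 local time before comparing with st_mtime.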
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
LOGGER = logging.getLogger(__name__)
#Maximum HTTP retries
MAX_RETRIES = 5
#Implements HTTP GET with automatic retries
def requests_retry_session(
        retries=MAX_RETRIES,
        backoff_factor=1,
        status_forcelist=None,
        psession=None
):
    """returns a session object set for automatic retries on both network and HTTP errors."""
    if status_forcelist is None:
        status_forcelist = list(range(400, 600)) #all HTTP status error codes (400-599)
    mysession = psession or requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
        allowed_methods=frozenset(['GET', 'POST']), # named 'method_whitelist' before urllib3 1.26
    )
    adapter = HTTPAdapter(max_retries=retry)
    mysession.mount('http://', adapter)
    mysession.mount('https://', adapter)
    return mysession
SESSION = requests_retry_session()
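#Illustrative sketch of what the retry session buys us (exact sleep times depend on the
# installed urllib3 version): a GET that keeps failing, e.g. with HTTP 503, is retried up
# to MAX_RETRIES times with exponentially growing pauses (about backoff_factor * 2**n
# seconds) before raising, so transient EZShare hiccups don't abort the whole sync:
#   SESSION.get('http://192.168.4.1/dir?dir=A:', timeout=5)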
class EZShareDirectoryDownloader:
    """EZShare recursive directory downloader."""
    def __init__(self, base_url):
        self.base_url = base_url
        self.stats = {
            'sdirs_checked': 0,
            'sdirs_created': 0,
            'files_checked': 0,
            'files_created': 0,
            'files_updated': 0,
            'bytes_checked': 0,
            'files_downloaded': 0,
            'bytes_downloaded': 0,
            'start_time': time.time()
        }
    def parse_size(self, size_str):
        """Convert a size string (e.g., '7750KB') to a (bytes, unit) tuple."""
        size_str = size_str.strip().upper()
        if size_str.endswith('KB'):
            return int(float(size_str[:-2]) * 1024), 1024
        elif size_str.endswith('MB'):
            return int(float(size_str[:-2]) * 1024 * 1024), 1024 * 1024
        elif size_str.endswith('GB'):
            return int(float(size_str[:-2]) * 1024 * 1024 * 1024), 1024 * 1024 * 1024
        elif size_str.endswith('B'):
            return int(size_str[:-1]), 1
        else:
            return int(size_str), 1
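    # parse_size() examples (illustrative): '7750KB' -> (7936000, 1024) and '64B' -> (64, 1).
    # The unit is returned alongside the byte count because the listing presumably rounds
    # sizes to the displayed unit, so callers use it as the comparison tolerance below.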
    def parse_timestamp(self, timestamp_str):
        """Parse timestamp string to datetime object."""
        timestamp_str = timestamp_str.strip()
        # Normalize the space-padded format the EZShare uses:
        # '2021-11- 6 16:17: 0' -> '2021-11-6 16:17:0' (strptime accepts unpadded fields)
        normalized = re.sub(
            r'(\d{4})-\s*(\d{1,2})-\s*(\d{1,2})\s+(\d{1,2}):\s*(\d{1,2}):\s*(\d{1,2})',
            r'\1-\2-\3 \4:\5:\6', timestamp_str
        )
        timestamp = datetime.strptime(normalized.strip(), '%Y-%m-%d %H:%M:%S') + \
            timedelta(seconds=UTC_OFFSET) #adjusts from EZShare timezone (UTC) to localtime
        return timestamp
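    # parse_timestamp() example (illustrative, assuming UTC_OFFSET == -10800.0):
    # '2030- 5-15 23:29:30' normalizes to '2030-5-15 23:29:30', parses via strptime(),
    # and shifts to datetime(2030, 5, 15, 20, 29, 30) local time.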
    def set_file_timestamp(self, filepath, timestamp):
        """Set the modification time of a file."""
        timestamp_seconds = timestamp.timestamp()
        os.utime(filepath, (timestamp_seconds, timestamp_seconds))
    def file_exists_with_same_attributes(self, filepath, size_bytes, size_tolerance, timestamp):
        """Check if file exists with same size and timestamp."""
        LOGGER.debug(f"checking file {filepath}")
        if not os.path.exists(filepath):
            LOGGER.debug("local file does not exist")
            return False
        file_stat = os.stat(filepath)
        file_size = file_stat.st_size
        file_mtime = datetime.fromtimestamp(file_stat.st_mtime)
        # Compare size
        if abs(file_size - size_bytes) > size_tolerance:
            LOGGER.debug(f"sizes differ by more than {size_tolerance} tolerance")
            return False
        # Check if the remote file time is in the future; if yes, log it and report the file
        # as different (we can't trust the timestamp, so it's better to copy it regardless).
        # Note: this happens on cards written by the Resmed Airsense 10 CPAP machine,
        # in the file 'Journal.dat' in the root of the card: its timestamp is set to
        # 2030-05-15 23:29:30 GMT, its size is 64KB and never changes, BUT its content does
        # change. This way we don't skip copying it.
        if timestamp > datetime.now():
            LOGGER.debug("remote file timestamp is in the future: always consider it as different")
            return False
        # Compare timestamp (within 1 second tolerance)
        time_diff = abs((file_mtime - timestamp).total_seconds())
        if time_diff > 1:
            LOGGER.debug(f"timestamps differ by {time_diff}, which is more than 1 second")
            return False
        LOGGER.debug("local and remote files do not differ")
        return True
    def download_file(self, url, filepath, size_expected, size_tolerance):
        """Download a file from URL and verify its size."""
        LOGGER.info(f"Downloading: {url} -> {filepath}")
        for i in range(MAX_RETRIES):
            try:
                with requests.get(url, stream=True, timeout=5) as r:
                    r.raise_for_status()
                    with open(filepath, 'wb') as f:
                        size_downloaded = 0
                        for chunk in r.iter_content(chunk_size=8192):
                            if chunk:
                                f.write(chunk)
                                size_downloaded += len(chunk)
                break #all chunks downloaded
            except (requests.exceptions.RequestException, ConnectionError, TimeoutError) as e:
                LOGGER.info(f"Exception during download: {e}")
                if i == MAX_RETRIES - 1:
                    LOGGER.info("No retries left, aborting")
                    raise
                LOGGER.info(f"Retrying in {2**i} seconds")
                time.sleep(2 ** i) # Exponential backoff
        if abs(size_downloaded - size_expected) > size_tolerance:
            raise Exception(f"Size mismatch for {filepath}: "
                            f"expected {size_expected}, got {size_downloaded}")
        self.stats['files_downloaded'] += 1
        self.stats['bytes_downloaded'] += size_downloaded
        return size_downloaded
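    # download_file() retry schedule (derived from the loop above): with MAX_RETRIES = 5,
    # a persistently failing download sleeps 1, 2, 4 and 8 seconds between its five
    # attempts, then re-raises the last exception to the caller.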
    def parse_directory_listing(self, html_content, current_url, local_dir):
        """Parse HTML directory listing and process files/directories."""
        self.stats['sdirs_checked'] += 1
        # Ensure local directory exists
        if os.path.isfile(local_dir):
            LOGGER.info(f"local subdirectory {local_dir} exists but as a file, removing it")
            os.remove(local_dir)
        if not os.path.exists(local_dir):
            LOGGER.info(f"creating local subdirectory {local_dir}")
            os.makedirs(local_dir)
            self.stats['sdirs_created'] += 1
        # Save current directory
        original_dir = os.getcwd()
        os.chdir(local_dir)
        try:
            # Pattern for files: timestamp, size, URL, filename
            # 2030- 5-15 23:29:30 64KB
            # <a href="http://192.168.4.1/download?file=JOURNAL.DAT"> Journal.dat</a>
            file_pattern = (r'(\d{4}-\s*\d{1,2}-\s*\d{1,2}\s+\d{1,2}:\s*\d{1,2}:\s*\d{1,2})'
                            r'\s+(\d+(?:KB|MB|GB|B)?)\s*<a href="([^"]+)">\s*([^<]+)</a>')
            # Pattern for directories: timestamp, <DIR>, URI, sdirname
            # 2025- 4-23 3: 4:16 &lt;DIR&gt;
            # <a href="dir?dir=A:%5CDATALOG"> DATALOG</a>
            dir_pattern = (r'(\d{4}-\s*\d{1,2}-\s*\d{1,2}\s+\d{1,2}:\s*\d{1,2}:\s*\d{1,2})'
                           r'\s+\&lt;DIR\&gt;\s*<a href="([^"]+)">\s*([^<]+)</a>')
            # Process files
            LOGGER.debug(f"html_content='{html_content}', file_pattern='{file_pattern}'")
            for match in re.finditer(file_pattern, html_content):
                timestamp_str, size_str, file_url, filename = match.groups()
                try:
                    timestamp = self.parse_timestamp(timestamp_str)
                    size_bytes, size_units = self.parse_size(size_str)
                    size_tolerance = size_units
                    self.stats['files_checked'] += 1
                    self.stats['bytes_checked'] += size_bytes
                    # Check if file exists with same attributes
                    if self.file_exists_with_same_attributes(filename, size_bytes,
                                                             size_tolerance, timestamp):
                        LOGGER.debug(f"Skipping existing file with same attributes: {filename}")
                    else:
                        if os.path.isdir(filename):
                            LOGGER.info(f"local file {filename} exists but as a sdir, removing it")
                            shutil.rmtree(filename) #os.remove() cannot delete a directory
                        if not os.path.exists(filename):
                            LOGGER.debug(f"local file {filename} does not exist, creating it")
                            self.stats['files_created'] += 1
                        else:
                            self.stats['files_updated'] += 1
                        # Download the file
                        full_url = urljoin(current_url, file_url)
                        self.download_file(full_url, filename, size_bytes, size_tolerance)
                        # Set timestamp
                        self.set_file_timestamp(filename, timestamp)
                except Exception as e:
                    LOGGER.error(f"Error processing file {filename}: {e}")
                    raise
            # Process directories
            LOGGER.debug(f"html_content='{html_content}', dir_pattern='{dir_pattern}'")
            for match in re.finditer(dir_pattern, html_content):
                timestamp_str, dir_uri, sdirname = match.groups()
                if sdirname == "." or sdirname == "..": #only process real directories
                    LOGGER.debug(f"skipping pseudo-directory '{sdirname}'")
                else:
                    LOGGER.debug(f"processing directory: '{sdirname}'")
                    try:
                        # Construct new URL
                        sub_url = urljoin(current_url, dir_uri)
                        LOGGER.debug(f"Entering directory: {sdirname}")
                        # Recursively process subdirectory
                        response = SESSION.get(sub_url, timeout=5)
                        response.raise_for_status()
                        self.parse_directory_listing(response.text, sub_url, sdirname)
                    except Exception as e:
                        LOGGER.error(f"Error processing directory {sdirname}: {e}")
                        raise
        finally:
            # Restore original directory
            os.chdir(original_dir)
    def run(self):
        """Main execution method."""
        try:
            LOGGER.info(f"Starting download from: {self.base_url}")
            # Get initial directory listing
            response = SESSION.get(self.base_url, timeout=5)
            response.raise_for_status()
            # Parse and process the listing
            self.parse_directory_listing(response.text, self.base_url, '.')
            # Calculate elapsed time
            elapsed_time = time.time() - self.stats['start_time']
            # Print statistics
            print("\n" + "="*60)
            print("DOWNLOAD COMPLETED")
            print("="*60)
            print(f"Time elapsed: {elapsed_time:.2f} seconds")
            print(f"Directories checked: {self.stats['sdirs_checked']}")
            print(f"Directories created: {self.stats['sdirs_created']}")
            print(f"Files checked: {self.stats['files_checked']}")
            print(f"Files created: {self.stats['files_created']}")
            print(f"Files updated: {self.stats['files_updated']}")
            print(f"Bytes checked: {self.stats['bytes_checked']:,} bytes")
            print(f"Files downloaded: {self.stats['files_downloaded']}")
            print(f"Bytes downloaded: {self.stats['bytes_downloaded']:,} bytes")
            print("="*60)
        except Exception as e:
            LOGGER.error(f"Fatal error: {e}")
            raise
def main():
    """Main entry point."""
    # URL to start downloading from
    url = 'http://192.168.4.1/dir?dir=A:'
    # Create downloader and run
    downloader = EZShareDirectoryDownloader(url)
    downloader.run()
if __name__ == "__main__":
    main()
#Eof ezs_sync.py