Created
July 24, 2025 21:05
-
-
Save mlivingston40/098e73c49ee455e28a37e78b54f9ac68 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json | |
| from typing import List, Any, Optional | |
| import logging | |
| import tempfile | |
| import boto3 | |
| from botocore.exceptions import ClientError | |
| import simplejson | |
| S3_CLIENT = boto3.client('s3') | |
| def write_data_to_path(data: Any, bucket_name: str, file_name: str, list_dicts_convert=False) -> None: | |
| tmp = tempfile.NamedTemporaryFile() | |
| with open(tmp.name, 'w') as f: | |
| if list_dicts_convert: | |
| for d in data: | |
| simplejson.dump(d, f, ignore_nan=True) | |
| else: | |
| simplejson.dump(data, f, ignore_nan=True) | |
| try: | |
| response = S3_CLIENT.upload_file(tmp.name, bucket_name, file_name) | |
| except ClientError as e: | |
| logging.error(e) | |
| tmp.close() | |
| def read_json_from_path(bucket: str, file_name: str) -> dict: | |
| tmp = tempfile.NamedTemporaryFile() | |
| try: | |
| S3_CLIENT.download_file(bucket, file_name, tmp.name) | |
| blob = json.load(open(tmp.name, 'r')) | |
| tmp.close() | |
| return blob | |
| except: | |
| tmp.close() | |
| return {} | |
| def get_all_files_in_dir(bucket: str, path: str) -> List[str]: | |
| paginator = S3_CLIENT.get_paginator('list_objects_v2') | |
| pages = paginator.paginate(Bucket=bucket, Prefix=path) | |
| full_list = [] | |
| for page in pages: | |
| if 'Contents' in page.keys(): | |
| full_list.extend([obj['Key'] for obj in page['Contents']]) | |
| return full_list | |
| def read_latest_from_dir(bucket: str, path: str) -> dict: | |
| savefiles = get_all_files_in_dir(bucket, path) | |
| if not savefiles: | |
| return {} | |
| latest_filename = max(savefiles) | |
| return read_json_from_path(bucket, latest_filename) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment