import pandas as pd
import gzip
from typing import List
from concurrent.futures import ThreadPoolExecutor
from google.cloud import bigquery
import gcsfs
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class ChunkedNDJSONBigQueryUploader:
    """
    A class to upload a DataFrame to BigQuery in chunks using Google Cloud Storage
    as a temporary staging area.

    Attributes
    ----------
    bucket_uri : str
        The URI of the GCS bucket where temporary files will be stored.
    chunk_size : int
        The number of rows per chunk to split the DataFrame.
    max_workers : int
        The number of threads to use for concurrent operations.

    Methods
    -------
    upload(dataframe: pd.DataFrame, schema: List[dict], table_id: str) -> None
        Orchestrates the entire upload process of a DataFrame to BigQuery.
    """

    def __init__(self, bucket_uri: str, chunk_size: int = 5000, max_workers: int = 32):
        """
        Initializes the ChunkedNDJSONBigQueryUploader with the specified bucket URI,
        chunk size, and number of workers.

        Parameters
        ----------
        bucket_uri : str
            The URI of the GCS bucket where temporary files will be stored.
        chunk_size : int, optional
            The number of rows per chunk to split the DataFrame (default is 5000).
        max_workers : int, optional
            The number of threads to use for concurrent operations (default is 32).
        """
        self.bucket_uri = bucket_uri
        self.chunk_size = chunk_size
        self.max_workers = max_workers
        self.bigquery_client = bigquery.Client()
        self.gcs_fs = gcsfs.GCSFileSystem()
    def upload(self, dataframe: pd.DataFrame, schema: List[dict], table_id: str) -> None:
        """
        Orchestrates the entire upload process of a DataFrame to BigQuery.

        Parameters
        ----------
        dataframe : pd.DataFrame
            The DataFrame to be uploaded.
        schema : List[dict]
            The schema of the BigQuery table.
        table_id : str
            The BigQuery table ID where data will be loaded.
        """
        # Step 1: Write DataFrame chunks directly to GCS as compressed NDJSON files
        file_paths = self._write_compressed_ndjson_chunks_to_gcs(dataframe)

        # Step 2: Load files into BigQuery
        self._load_files_to_bigquery(file_paths, schema, table_id)

        # Step 3: Clean up files from GCS
        self._cleanup_gcs(file_paths)
    def _write_compressed_ndjson_chunks_to_gcs(self, dataframe: pd.DataFrame) -> List[str]:
        """
        Splits the DataFrame into smaller chunks and writes them as compressed NDJSON
        files directly to GCS.

        Parameters
        ----------
        dataframe : pd.DataFrame
            The DataFrame to be split into chunks.

        Returns
        -------
        List[str]
            List of GCS file URIs of the created compressed NDJSON files.
        """
        chunks = [dataframe.iloc[i:i + self.chunk_size] for i in range(0, len(dataframe), self.chunk_size)]
        file_paths = [f"{self.bucket_uri}/chunk_{i}.ndjson.gz" for i in range(len(chunks))]

        logging.info("Writing %d chunks directly to GCS as compressed NDJSON files", len(chunks))
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Consume the iterator returned by executor.map so that any exception
            # raised while writing a chunk is re-raised here instead of being dropped.
            list(executor.map(self._write_compressed_ndjson_to_gcs, chunks, file_paths))

        logging.info("Finished writing compressed NDJSON files to GCS")
        return file_paths
    def _write_compressed_ndjson_to_gcs(self, chunk: pd.DataFrame, file_path: str) -> None:
        """
        Writes a DataFrame chunk as a compressed NDJSON file directly to GCS.

        Parameters
        ----------
        chunk : pd.DataFrame
            The DataFrame chunk to write.
        file_path : str
            The GCS URI where the compressed NDJSON file will be stored.
        """
        # Open the GCS file using gcsfs and write as compressed NDJSON
        with self.gcs_fs.open(file_path, 'wb') as f:
            with gzip.GzipFile(fileobj=f, mode='wb') as gzipped_file:
                gzipped_file.write(chunk.to_json(orient='records', lines=True).encode('utf-8'))
        logging.debug("Written compressed NDJSON file to GCS: %s", file_path)
    def _load_files_to_bigquery(self, file_paths: List[str], schema: List[dict], table_id: str) -> None:
        """
        Loads the compressed NDJSON files from GCS to a BigQuery table.

        Parameters
        ----------
        file_paths : List[str]
            The list of GCS file URIs to be loaded.
        schema : List[dict]
            The schema of the BigQuery table.
        table_id : str
            The BigQuery table ID where data will be loaded.
        """
        job_config = bigquery.LoadJobConfig(
            schema=schema,
            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        )

        logging.info("Starting BigQuery load job for table: %s", table_id)
        load_job = self.bigquery_client.load_table_from_uri(file_paths, table_id, job_config=job_config)
        load_job.result()  # Wait for the load job to complete
        logging.info("BigQuery load job completed successfully")
    def _cleanup_gcs(self, file_paths: List[str]) -> None:
        """
        Cleans up the temporary files in the GCS bucket.

        Parameters
        ----------
        file_paths : List[str]
            The list of GCS file URIs to be cleaned up.
        """
        for file_path in file_paths:
            self.gcs_fs.rm(file_path)
            logging.debug("Deleted GCS file: %s", file_path)
        logging.info("Cleanup completed")