@cbpygit
Created September 6, 2024 16:13
import pandas as pd
import gzip
from typing import List
from concurrent.futures import ThreadPoolExecutor
from google.cloud import bigquery
import gcsfs
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


class ChunkedNDJSONBigQueryUploader:
    """
    Uploads a DataFrame to BigQuery in chunks, using Google Cloud Storage as a temporary staging area.

    Attributes
    ----------
    bucket_uri : str
        The URI of the GCS bucket where temporary files will be stored.
    chunk_size : int
        The number of rows per chunk when splitting the DataFrame.
    max_workers : int
        The number of threads to use for concurrent operations.

    Methods
    -------
    upload(dataframe: pd.DataFrame, schema: List[dict], table_id: str) -> None
        Orchestrates the entire upload process of a DataFrame to BigQuery.
    """

    def __init__(self, bucket_uri: str, chunk_size: int = 5000, max_workers: int = 32):
        """
        Initializes the ChunkedNDJSONBigQueryUploader with the specified bucket URI, chunk size, and number of workers.

        Parameters
        ----------
        bucket_uri : str
            The URI of the GCS bucket where temporary files will be stored.
        chunk_size : int, optional
            The number of rows per chunk when splitting the DataFrame (default is 5000).
        max_workers : int, optional
            The number of threads to use for concurrent operations (default is 32).
        """
        self.bucket_uri = bucket_uri
        self.chunk_size = chunk_size
        self.max_workers = max_workers
        self.bigquery_client = bigquery.Client()
        self.gcs_fs = gcsfs.GCSFileSystem()

    def upload(self, dataframe: pd.DataFrame, schema: List[dict], table_id: str) -> None:
        """
        Orchestrates the entire upload process of a DataFrame to BigQuery.

        Parameters
        ----------
        dataframe : pd.DataFrame
            The DataFrame to be uploaded.
        schema : List[dict]
            The schema of the BigQuery table.
        table_id : str
            The BigQuery table ID where data will be loaded.
        """
        # Step 1: Write DataFrame chunks directly to GCS as compressed NDJSON files
        file_paths = self._write_compressed_ndjson_chunks_to_gcs(dataframe)
        # Step 2: Load the files into BigQuery
        self._load_files_to_bigquery(file_paths, schema, table_id)
        # Step 3: Clean up the temporary files in GCS
        self._cleanup_gcs(file_paths)

    def _write_compressed_ndjson_chunks_to_gcs(self, dataframe: pd.DataFrame) -> List[str]:
        """
        Splits the DataFrame into smaller chunks and writes them as compressed NDJSON files directly to GCS.

        Parameters
        ----------
        dataframe : pd.DataFrame
            The DataFrame to be split into chunks.

        Returns
        -------
        List[str]
            List of GCS file URIs of the created compressed NDJSON files.
        """
        chunks = [dataframe.iloc[i:i + self.chunk_size] for i in range(0, len(dataframe), self.chunk_size)]
        file_paths = [f"{self.bucket_uri}/chunk_{i}.ndjson.gz" for i in range(len(chunks))]
        logging.info("Writing %d chunks directly to GCS as compressed NDJSON files", len(chunks))
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Consume the lazy map iterator so exceptions raised in worker threads propagate here
            list(executor.map(self._write_compressed_ndjson_to_gcs, chunks, file_paths))
        logging.info("Finished writing compressed NDJSON files to GCS")
        return file_paths

    def _write_compressed_ndjson_to_gcs(self, chunk: pd.DataFrame, file_path: str) -> None:
        """
        Writes a DataFrame chunk as a compressed NDJSON file directly to GCS.

        Parameters
        ----------
        chunk : pd.DataFrame
            The DataFrame chunk to write.
        file_path : str
            The GCS URI where the compressed NDJSON file will be stored.
        """
        # Open the GCS file using gcsfs and write the chunk as gzip-compressed NDJSON
        with self.gcs_fs.open(file_path, 'wb') as f:
            with gzip.GzipFile(fileobj=f, mode='wb') as gzipped_file:
                gzipped_file.write(chunk.to_json(orient='records', lines=True).encode('utf-8'))
        logging.debug("Written compressed NDJSON file to GCS: %s", file_path)

    def _load_files_to_bigquery(self, file_paths: List[str], schema: List[dict], table_id: str) -> None:
        """
        Loads the compressed NDJSON files from GCS into a BigQuery table.

        Parameters
        ----------
        file_paths : List[str]
            The list of GCS file URIs to be loaded.
        schema : List[dict]
            The schema of the BigQuery table.
        table_id : str
            The BigQuery table ID where data will be loaded.
        """
        job_config = bigquery.LoadJobConfig(
            schema=schema,
            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        )
        logging.info("Starting BigQuery load job for table: %s", table_id)
        load_job = self.bigquery_client.load_table_from_uri(file_paths, table_id, job_config=job_config)
        load_job.result()  # Wait for the load job to complete
        logging.info("BigQuery load job completed successfully")

    def _cleanup_gcs(self, file_paths: List[str]) -> None:
        """
        Cleans up the temporary files in the GCS bucket.

        Parameters
        ----------
        file_paths : List[str]
            The list of GCS file URIs to be cleaned up.
        """
        for file_path in file_paths:
            self.gcs_fs.rm(file_path)
            logging.debug("Deleted GCS file: %s", file_path)
        logging.info("Cleanup completed")