large scale parallel processing with Snowpark
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Indexing LiDar (.Laz) files with Snowpark adopting Multi-Thread and AsychJobs\n",
"\n",
"This notebook demonstrates a pattern and solution to process large amount (approx. 850,000) LiDar files\n",
"using Snowpark.\n",
"\n",
"#### Steps:\n",
"- Identify the files, in the stage, to be proccessed into a table 'usgs_data_files'\n",
"- Group the files into seperate \"partitions\" or \"buckets\".\n",
"- Define a stored procedure \"batch_parse_and_extract_lazmetadata_sproc\" that takes a bucket id as a parameter and then :\n",
" - define a function to parse the metadata information from the LiDar files\n",
" - creates a mini-batch, 10 files at a time.\n",
" - uses [Joblib.Parallel](https://docs.snowflake.com/en/developer-guide/stored-procedure/stored-procedures-python#running-concurrent-tasks-with-worker-processes) functionality to parse these minibatches\n",
" - captures the result and stores them in a parquet file\n",
" - uploads the parque file to a stage\n",
"- Create XS warehouses (6 in total), to further allowing for parallel processing.\n",
"- Create a table that allocates equal quantities of bucket to each warehouse.\n",
"- Invokes the stored procedures using [asynch jobs](https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/async_job).\n",
"- Create a table to store the parquet file, which got uploaded in the above stored procedure \"batch_parse_and_extract_lazmetadata_sproc\"\n",
"\n",
"#### Observations\n",
" - approx 850,000 files\n",
" - average 30 min to process 1 bucket, containing 9,000 files\n",
" - processing all the 850k files approx. 35 minutes\n",
" \n",
"#### Dataset\n",
" - open dataset : https://registry.opendata.aws/usgs-lidar/\n",
"\n",
"## Initialization"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Requirement already satisfied: laspy in /Users/vsekar/opt/anaconda3/envs/venkat_env/lib/python3.9/site-packages (2.5.3)\n",
"Requirement already satisfied: shapely in /Users/vsekar/opt/anaconda3/envs/venkat_env/lib/python3.9/site-packages (2.0.1)\n",
"Requirement already satisfied: pyproj in /Users/vsekar/opt/anaconda3/envs/venkat_env/lib/python3.9/site-packages (3.6.1)\n",
"Requirement already satisfied: numpy in /Users/vsekar/opt/anaconda3/envs/venkat_env/lib/python3.9/site-packages (from laspy) (1.26.2)\n",
"Requirement already satisfied: certifi in /Users/vsekar/opt/anaconda3/envs/venkat_env/lib/python3.9/site-packages (from pyproj) (2023.11.17)\n"
]
}
],
"source": [
"!pip install laspy shapely pyproj"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Establish a Snowpark session, which will be used by Tool\n",
"\n",
"from snowflake.snowpark.session import Session\n",
"\n",
"# Setup the snowflake connection information\n",
"snowflake_connection_info = {\n",
" \"url\": \"https://<account locator>.snowflakecomputing.com\"\n",
" ,\"account\": \"<account locator>\"\n",
" ,\"account_name\": \"<account identifier>, do not include the organization name\"\n",
" ,\"organization\": \"<account org name>\"\n",
" ,\"user\": \"XXXX\"\n",
" ,\"password\": \"XXXX\"\n",
"}\n",
"\n",
"# I am establishing 2 snowpark sessions. \n",
"# One for DML processing with Snowflake and Another for interacting with the API.\n",
"sp_session = Session.builder.configs(snowflake_connection_info).create()\n",
"\n",
"sp_session.use_role(f'''venkat_app_dev''')\n",
"sp_session.use_schema(f'''venkat_db.public''')\n",
"sp_session.use_warehouse(f'''venkat_compute_wh''')"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[Row(status='Stage area CODE_STG successfully created.')]"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Stage for storing python code\n",
"\n",
"sql_stmt = f'''\n",
" create or replace stage code_stg\n",
" comment = 'used for storing python code'\n",
" '''\n",
"sp_session.sql(sql_stmt).collect()"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[Row(status='USGS_LIDAR_PUBLIC_STG already exists, statement succeeded.')]"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Map external stage, containing LiDar data\n",
"\n",
"# Ref : https://registry.opendata.aws/usgs-lidar/\n",
"\n",
"# The above is in us-west-2; same as my current Snowflake account\n",
"\n",
"\n",
"s3_dir = 's3://usgs-lidar-public/'\n",
"\n",
"sql_stmt = f'''\n",
" create stage if not exists usgs_lidar_public_stg\n",
" url = '{s3_dir}'\n",
" comment = 'USGS LiDar public dataset : https://registry.opendata.aws/usgs-lidar/'\n",
" '''\n",
"sp_session.sql(sql_stmt).collect()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"----\n",
"## Partitioning or Bucketing \n",
"\n",
"We want to parallelize the parsing functionality, to achieve this we form groups / partitions of files. This does not mean we are moving or copying files around; but rather setup a grouping marker and assign the files to it. Since there are no relationship between these files, they can be grouped in any order. Instead of doing this in-memory using pandas/numpy and other mechanism, i am using Snowflake. This will allow for easier interrogation and also enables stored procs to query the table and retrieve the list of files easily.\n",
"\n",
"- First we do a list stage operation and store the results in a table 'usgs_data_files'\n",
"- Then using [NTILE](https://docs.snowflake.com/en/sql-reference/functions/ntile), we do the grouping and store the group id/partition in the column 'processing_bucket'.\n",
"\n",
"I have set the groups to be 100; this is an arbitrary number."
]
},
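{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick illustration of how NTILE distributes rows (a standalone sketch, not part of the pipeline), the query below spreads 10 generated rows across 4 buckets of sizes 3, 3, 2 and 2:\n",
"\n",
"```sql\n",
"with files as (\n",
"    select seq4() as file_seq\n",
"    from table(generator(rowcount => 10))\n",
")\n",
"select file_seq\n",
"      ,ntile(4) over (order by file_seq) as processing_bucket\n",
"from files;\n",
"```"
]
},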
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" create or replace transient table usgs_data_files (\n",
" file_id varchar\n",
" ,file_path varchar\n",
" ,file_name varchar\n",
" ,file_size number\n",
" ,stage_path varchar\n",
" ,processing_bucket int\n",
" )\n",
" comment = 'used for storing all the laz file used.';\n",
" list @usgs_lidar_public_stg/USGS_LPC_TX_Panhandle_B10_2017_LAS_2019/ept-data/; \n",
"insert into usgs_data_files (file_id ,file_path ,file_name ,file_size ,stage_path) \n",
" select\n",
" replace(\"name\" ,'s3://usgs-lidar-public/','') as file_id\n",
" ,\"name\" as file_path\n",
" ,split_part(file_path ,'/' ,-1) as file_name\n",
" ,\"size\" as file_size\n",
" ,replace(file_path ,'s3://usgs-lidar-public/', '@usgs_lidar_public_stg/') as stage_path\n",
" from table(result_scan(last_query_id()));\n"
]
}
],
"source": [
"# Store the list of lidar files found in the directory\n",
"\n",
"s3_url = 's3://usgs-lidar-public/'\n",
"sub_dir = 'USGS_LPC_TX_Panhandle_B10_2017_LAS_2019/ept-data'\n",
"sql_stmts = [\n",
" ''' create or replace transient table usgs_data_files (\n",
" file_id varchar\n",
" ,file_path varchar\n",
" ,file_name varchar\n",
" ,file_size number\n",
" ,stage_path varchar\n",
" ,processing_bucket int\n",
" )\n",
" comment = 'used for storing all the laz file used.';'''\n",
" \n",
" # Ensure to capture all the files into a table. We are not using DIRECTORY as \n",
" # the No of files are large and will not work(will error out, if stage is directory enabled).\n",
" ,f''' list @usgs_lidar_public_stg/{sub_dir}/; '''\n",
"\n",
" # Store the results into a table for later processing\n",
" ,f'''insert into usgs_data_files (file_id ,file_path ,file_name ,file_size ,stage_path) \n",
" select\n",
" replace(\"name\" ,'{s3_url}','') as file_id\n",
" ,\"name\" as file_path\n",
" ,split_part(file_path ,'/' ,-1) as file_name\n",
" ,\"size\" as file_size\n",
" ,replace(file_path ,'{s3_url}', '@usgs_lidar_public_stg/') as stage_path\n",
" from table(result_scan(last_query_id()));'''\n",
"]\n",
"\n",
"for stmt in sql_stmts:\n",
" print(stmt)\n",
" sp_session.sql(stmt).collect()"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"--------------------------------------\n",
"|\"PROCESSING_BUCKET\" |\"FILE_COUNT\" |\n",
"--------------------------------------\n",
"|10 |8939 |\n",
"|21 |8939 |\n",
"|37 |8939 |\n",
"|49 |8939 |\n",
"|31 |8939 |\n",
"|90 |8939 |\n",
"|30 |8939 |\n",
"|40 |8939 |\n",
"|14 |8939 |\n",
"|35 |8939 |\n",
"--------------------------------------\n",
"\n"
]
}
],
"source": [
"# Bucket the ids\n",
"\n",
"# The number of buckets is used to control the level of parallel processes\n",
"total_bucket_count = 100\n",
"\n",
"sql_stmt = f'''\n",
" merge into usgs_data_files as tgt \n",
" using (\n",
" select file_id \n",
" ,ntile({total_bucket_count}) over ( order by file_id ) as processing_bucket\n",
" from usgs_data_files\n",
" ) as src\n",
" on tgt.file_id = src.file_id\n",
" when matched then \n",
" update set tgt.processing_bucket = src.processing_bucket'''\n",
"sp_session.sql(sql_stmt).collect()\n",
"\n",
"# Get the filecount and display the distribution\n",
"sql_stmt = '''\n",
"select processing_bucket ,count(file_id) as file_count\n",
"from usgs_data_files\n",
"group by all\n",
"order by file_count desc '''\n",
"spdf = sp_session.sql(sql_stmt)\n",
"spdf.show(10)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Indexing - Parallel processing with JobLib\n",
"\n",
"We now define the stored procedure 'batch_parse_and_extract_lazmetadata_sproc', which essentially for a given group_id / bucket\n",
"retreives the list of files and parse the metadata from the files. Key points to note:\n",
"\n",
"- I am using the [Joblib.parallel](https://joblib.readthedocs.io/en/latest/parallel.html), to help with the multi-threading. Refer to a sample in [Doc: Running Concurrent Tasks with Worker Processes](https://docs.snowflake.com/en/developer-guide/stored-procedure/stored-procedures-python#running-concurrent-tasks-with-worker-processes)\n",
"- The data once parsed could technically be written to a table, but instead i am saving the data as a parquet and uploading to a stage. I had taken this approach, as each write_pandas like operation results in creating a temporary stage & table step. Staging the parquet file and then doing a copy operation, later in the pipeline, saves some time."
]
},
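{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before the full stored procedure, here is a minimal standalone sketch of the mini-batch pattern with joblib.Parallel (the file names and the parse_one helper are purely illustrative): build a batch of delayed calls, run them on the worker pool, collect the results, and repeat until the list is exhausted.\n",
"\n",
"```python\n",
"from joblib import Parallel, delayed\n",
"\n",
"def parse_one(path):\n",
"    # stand-in for the real metadata extraction\n",
"    return {'file': path}\n",
"\n",
"paths = [f'file_{i}.laz' for i in range(25)]   # illustrative file list\n",
"batch_size = 10\n",
"results = []\n",
"\n",
"# Reuse the same worker pool across mini-batches\n",
"with Parallel(n_jobs=-1) as parallel:\n",
"    for start in range(0, len(paths), batch_size):\n",
"        batch = paths[start:start + batch_size]\n",
"        results.extend(parallel(delayed(parse_one)(p) for p in batch))\n",
"\n",
"print(len(results))  # 25\n",
"```"
]
},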
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"The version of package 'laspy' in the local environment is 2.5.3, which does not fit the criteria for the requirement 'laspy'. Your UDF might not work when the package version is different between the server and your local environment.\n",
"Package 'lazrs-python' is not installed in the local environment. Your UDF might not work when the package is installed on the server but not on your local environment.\n",
"The version of package 'snowflake-snowpark-python' in the local environment is 1.12.0, which does not fit the criteria for the requirement 'snowflake-snowpark-python'. Your UDF might not work when the package version is different between the server and your local environment.\n",
"The version of package 'joblib' in the local environment is 1.3.2, which does not fit the criteria for the requirement 'joblib'. Your UDF might not work when the package version is different between the server and your local environment.\n"
]
}
],
"source": [
"# Define store procedure for indexing\n",
"\n",
"from datetime import datetime ,timedelta\n",
"import snowflake.snowpark.types as T\n",
"import snowflake.snowpark.functions as F\n",
"from snowflake.snowpark.files import SnowflakeFile\n",
"import os ,json\n",
"import pandas as pd\n",
"\n",
"def _get_metadata_of_file(p_file_id ,p_file_url ,p_iteration ,p_iteration_row_idx ,p_iteration_loop_start_time ,p_process_start_time):\n",
"\tret = {'file_id': p_file_id ,'iteration' : p_iteration \n",
"\t\t\t\t,'iteration_row_idx': p_iteration_row_idx ,'error': ''}\n",
"\timport laspy\n",
"\t\t\n",
"\tstart_timer = datetime.now()\n",
"\ttry:\n",
"\t\twith SnowflakeFile.open(p_file_url ,'rb' ,require_scoped_url=False) as f:\n",
"\t\t\t# Ref: https://laspy.readthedocs.io/en/latest/basic.html#\n",
"\t\t\tlas = laspy.read(f)\n",
"\t\t\tlas_header = las.header\n",
"\n",
"\t\t\tret['date'] = las_header.date\n",
"\t\t\tret['point_format_id'] = las_header.point_format.id\n",
"\t\t\tret['point_count'] = las_header.point_count\n",
"\t\t\tret['dimensions'] = list(\n",
"\t\t\t\t\tlas_header.point_format.standard_dimension_names)\n",
"\n",
"\t\t\tcrs = las_header.parse_crs()\n",
"\t\t\tif crs is not None:\n",
"\t\t\t\t\tret['crs'] = las_header.parse_crs().to_epsg()\n",
"\n",
"\t\t\t# ret['status'] = True\n",
"\texcept Exception as e:\n",
"\t\t\t# Errors could happen, if file is corrupted\n",
"\t\t\tret['error'] = str(e)\n",
"\t\t\t\n",
"\tend_timer = datetime.now()\n",
"\tret['fileread_elapsed_secs'] = int( (end_timer - start_timer).total_seconds() )\n",
"\tret['iteration_elapsed_secs'] = int( (end_timer - p_iteration_loop_start_time).total_seconds() )\n",
"\tret['process_elapsed_mins'] = int( (end_timer - p_process_start_time).total_seconds()/60 )\n",
"\n",
"\treturn ret\n",
"\n",
"def _get_total_file_count_for_bucket(p_processing_bucket ,p_session):\n",
"\tspdf = (p_session.table('usgs_data_files')\n",
"\t\t\t\t\t.select('file_id' ,'stage_path')\n",
"\t\t\t\t\t.filter(f'(processing_bucket = {p_processing_bucket})')\n",
"\t\t\t\t\t)\n",
"\treturn spdf.count()\n",
"\n",
"def _get_iterator_for_listof_files_to_process(p_processing_bucket ,p_session):\n",
"\tspdf = (p_session.table('usgs_data_files')\n",
"\t\t\t\t.select('file_id' ,'stage_path')\n",
"\t\t\t\t.filter(f'(processing_bucket = {p_processing_bucket})')\n",
"\t\t\t\t)\n",
"\treturn spdf\n",
"\n",
"def _extract_metadata_of_files(p_processing_bucket ,p_batch_count ,p_session):\n",
"\timport joblib\n",
"\tfrom joblib import Parallel, delayed\n",
"\t\n",
"\tbatch_count = 10 if p_batch_count <= 0 else p_batch_count\n",
"\tbatch_count = p_batch_count\n",
"\tmetadata_results = []\n",
"\tprocess_start_time = datetime.now()\n",
"\n",
"\ttotal_file_count = _get_total_file_count_for_bucket(p_processing_bucket ,p_session)\n",
"\n",
"\t# Ensure that there are files to be processed\n",
"\tif total_file_count < 1:\n",
"\t\treturn total_file_count ,0 ,metadata_results\n",
"\t\n",
"\tlistof_files_spdf = _get_iterator_for_listof_files_to_process(p_processing_bucket ,p_session)\n",
"\tlistof_files_iter = listof_files_spdf.to_local_iterator()\n",
"\tfiles_processed_count = 0\n",
"\tsub_threads = []\n",
"\n",
"\t# Ref: https://docs.snowflake.com/en/developer-guide/stored-procedure/stored-procedures-python#running-concurrent-tasks-with-worker-processes\n",
"\t# Ref : https://joblib.readthedocs.io/en/latest/parallel.html\n",
"\t\n",
"\t# (optional for faster processing.) we set backend to loky a.\n",
"\t# joblib.parallel_backend('loky')\n",
"\n",
"\twith Parallel(n_jobs=-1) as parallel:\n",
"\t\titeration = 0\n",
"\n",
"\t\twhile files_processed_count < total_file_count:\n",
"\n",
"\t\t\titeration_loop_start_time = datetime.now()\n",
"\n",
"\t\t\t# Retreive the next batch of files to process\n",
"\t\t\tsub_threads.clear()\n",
"\t\t\tfor i in range(1, batch_count+1):\n",
"\t\t\t\trow = next(listof_files_iter ,None)\n",
"\t\t\t\tif row is None:\n",
"\t\t\t\t\tbreak\n",
"\t\t\t\tt = delayed(_get_metadata_of_file)(row['FILE_ID'] ,row['STAGE_PATH'] \n",
"\t\t\t\t\t\t\t,iteration ,i ,iteration_loop_start_time ,process_start_time)\n",
"\t\t\t\tsub_threads.extend([t])\n",
"\n",
"\t\t\t# invoke parallel to do the processing\n",
"\t\t\tresults = parallel(sub_threads)\n",
"\n",
"\t\t\t# store the response into the array\n",
"\t\t\tmetadata_results.extend(results)\n",
"\t\t\t\n",
"\t\t\titeration += 1\n",
"\t\t\tfiles_processed_count = len(metadata_results)\n",
"\n",
"\t\t\t# if iteration >= 5:\n",
"\t\t\t# \tbreak\n",
"\t\t\t\n",
"\treturn total_file_count ,files_processed_count ,metadata_results\n",
"\n",
"def _save_to_parquet(p_processing_bucket ,p_metadata_results ,p_batch_chunk ,p_save_folder ):\n",
"\tsave_folder = p_save_folder\n",
"\tif not os.path.exists(save_folder):\n",
"\t\tos.makedirs(save_folder)\n",
"\n",
"\tsave_basefilename = f'lidar_metadata_{p_processing_bucket}_{p_batch_chunk}.parquet.gz'\n",
"\t# We will be doing a merge operation, and the metadata will be stored in\n",
"\t# a variant column, so we are creating a dataframe that would help us to\n",
"\t# achieve this.\n",
"\td = [ { 'file_id': r['file_id'] ,'parsed_metadata' : r} \n",
"\t\t\tfor r in p_metadata_results ]\n",
"\tdf = pd.DataFrame(d)\n",
"\tdf.to_parquet(f'{save_folder}/{save_basefilename}',compression='gzip')\n",
"\treturn save_folder ,save_basefilename\n",
"\n",
"def _upload_file_to_stage(p_local_dir: str ,p_target_stage: str ,p_stage_dir: str,p_session: Session):\n",
"\tfiles_count = 0\n",
"\tfor path, subdirs, files in os.walk(p_local_dir):\n",
"\t\t\tfl_names = [ name for name in files if '.parquet.gz' in name ]\n",
"\t\t\tfiles_count += len(fl_names)\n",
"\t\t\t\n",
"\t# get the list of folders where parquet files are present\n",
"\tdata_dirs = { path for path, subdirs, files in os.walk(p_local_dir) for name in files if '.parquet.gz' in name }\n",
" \n",
"\tfor idx, parquet_dir in enumerate(data_dirs):\n",
"\t\t\t\n",
"\t\t\t# build the path to where the file will be staged\n",
"\t\t\tstage_dir = parquet_dir.replace(p_local_dir , p_stage_dir)\n",
"\n",
"\t\t\t# print(f' {p_local_dir} => @{p_target_stage}{stage_dir}')\n",
"\t\t\tp_session.file.put(\n",
"\t\t\t\t\tlocal_file_name = f'{parquet_dir}/*.parquet.gz'\n",
"\t\t\t\t\t,stage_location = f'{p_target_stage}/{stage_dir}/'\n",
"\t\t\t\t\t,auto_compress=False ,overwrite=True ,parallel=20 )\n",
"\t\n",
"\treturn True\n",
"\n",
"@F.sproc(name='batch_parse_and_extract_lazmetadata_sproc', \n",
" packages=[\"pandas\", 'laspy' ,\"lazrs-python\" ,'snowflake-snowpark-python'\n",
" ,'pyproj' ,'shapely', 'joblib']\n",
" ,is_permanent=True, \n",
" replace=True,\n",
" stage_location='@code_stg', \n",
" session=sp_session\n",
" )\n",
"def parse_laz_extract_metadata_sproc(p_session: Session ,p_processing_bucket: int ,p_batch_count: int) -> dict:\n",
" \n",
"\tret = {'total_processed_count' : 0}\n",
"\ttotal_file_count ,total_files_processed_count ,metadata_results = \\\n",
"\t\t_extract_metadata_of_files(p_processing_bucket ,p_batch_count ,p_session)\n",
"\tret['step'] = 1\n",
"\tret['total_file_count'] = total_file_count\n",
"\tret['total_processed_count'] = total_files_processed_count\n",
"\n",
"\tif total_file_count >= 1:\n",
"\t\tsave_folder = f'/tmp/lidar_metadata_{p_processing_bucket}'\n",
"\t\t_save_to_parquet(p_processing_bucket ,metadata_results ,'' ,save_folder)\n",
"\t\t_upload_file_to_stage(save_folder ,'@code_stg' \n",
"\t\t\t\t,f'data/lidar_metadata_{p_processing_bucket}'\n",
"\t\t\t\t,p_session)\n",
"\t\tret['step'] = 2\n",
"\t\n",
"\treturn ret"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"# Demo run the calculation for a specific processing bucket\n",
"# This code is for defined here for quick testing for a specific bucket\n",
"\n",
"# sp_session.sql('''alter warehouse venkat_compute_wh set warehouse_size = XSMALL ''').collect()\n",
"\n",
"# start_timer = datetime.now()\n",
"# print(f'Started at: {start_timer}')\n",
"# sql_stmt = f'''\n",
"# call batch_parse_and_extract_lazmetadata_sproc(1 ,10) \n",
"# '''\n",
"# spdf = sp_session.sql(sql_stmt)\n",
"# r = spdf.collect()\n",
"# end_timer = datetime.now()\n",
"# elapsed = int( (end_timer - start_timer).total_seconds() )\n",
"# print(f'Elapsed : {elapsed} seconds')\n",
"\t\n",
"# sp_session.sql('''alter warehouse venkat_compute_wh set warehouse_size = XSMALL ''').collect()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Asych job\n",
"\n",
"In this section, we formulate the steps to parallelize the workload. We will be invoking the stored procedure 'batch_parse_and_extract_lazmetadata_sproc' using [Asynch Job](https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/async_job). As of today, even if we call 1 or more instance of the stored procedure will end up using only 1 node in a multi-cluster warehouse. Hence setting all the stored procedure in a single node will not be effective and will result in constraining the WH.\n",
"\n",
"So instead we create multiple warehouses and distribute the workload against each of these. For this we also take a steps to map / allocate groups of buckets to warehouse.In this demo we will running 17 instance on one warehouse (WH_1) and another 17 on warehouse (WH_2) and so on so forth.\n",
"\n",
"Once this map has been defined, we invoke the jobs."
]
},
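{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a minimal sketch of the async API (assuming the session and stored procedure defined above), a single call can be submitted with collect_nowait() and polled with is_done():\n",
"\n",
"```python\n",
"import time\n",
"\n",
"# Sketch only: submit one stored procedure call asynchronously and poll until it finishes\n",
"job = sp_session.sql('call batch_parse_and_extract_lazmetadata_sproc(1 ,10)').collect_nowait()\n",
"while not job.is_done():\n",
"    time.sleep(30)\n",
"print(job.result())\n",
"```"
]
},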
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [],
"source": [
"# Define a function that will iterate a given list of bucketid and invoke them\n",
"\n",
"import time\n",
"from typing import List\n",
"\n",
"def _invoke_asynch_jobs(p_session: Session ,p_warehouse: str ,p_processing_buckets) -> dict:\n",
" query_ids = []\n",
" jobs = []\n",
"\n",
" # ensure to switch the warehouse of choice\n",
" # p_session.use_warehouse(p_warehouse)\n",
"\n",
" for bucked_id in p_processing_buckets:\n",
" sql_stmt = f'call batch_parse_and_extract_lazmetadata_sproc({bucked_id} ,10)'\n",
" q_id = p_session.sql(sql_stmt).collect_nowait().query_id\n",
" async_job = p_session.create_async_job(q_id)\n",
" \n",
" # Keep a pointer of queryids for potential later introspection\n",
" query_ids.extend([q_id])\n",
"\n",
" # Keeping track of job ids is crucial, as we need to wait for completion\n",
" jobs.extend([async_job])\n",
"\n",
" # Interleave the jobs with some time between them\n",
" time.sleep(5)\n",
"\n",
" return query_ids ,jobs\n",
"\t\n",
"def _wait_for_job_completion(p_session ,p_jobs):\n",
" sleep_time_multiplier_minutes = 10 #arbitrary time in minutes (10 times 60 seconds) to wait for and check for job completions\n",
" multipliers = [sleep_time_multiplier_minutes]\n",
" total_job_count = len(p_jobs)\n",
" completed_jobs_count = 0\n",
" while (completed_jobs_count < total_job_count):\n",
" time.sleep(sleep_time_multiplier_minutes * 60)\n",
"\n",
" # Count the number of jobs that has completed\n",
" completed_jobs_count = len( [j for j in p_jobs \n",
" if j.is_done() == True] )\n",
" \n",
" # reduce the sleeping cycle count, for faster verification\n",
" if (completed_jobs_count > 0):\n",
" # need to ensure that we do this verification not below 30 seconds\n",
" seconds_multiplier = max((10/completed_jobs_count) ,0.5)\n",
" multipliers.extend([seconds_multiplier])\n",
" \n",
" return multipliers\n",
"\n",
"# This step of wrapping the call into a stored procedure is not available today, hopefully an \n",
"# approach like this can be possible in the future. \n",
"# @F.sproc(name='invoke_metadataextractorjobs_for_warehouse_sproc', \n",
"# packages=['snowflake-snowpark-python']\n",
"# ,is_permanent=True, \n",
"# replace=True,\n",
"# stage_location='@code_stg', \n",
"# session=sp_session\n",
"# )\n",
"# def invoke_metadataextractorjobs_for_warehouse_sproc(p_session: Session ,p_warehouse: str ,p_processing_buckets: str) -> dict:\n",
"# ret = {'total_processed_count' : 0}\n",
"\n",
"# processing_buckets = [int(x) for x in p_processing_buckets.split(',')]\n",
"# query_ids ,jobs = _invoke_asynch_jobs(p_session ,p_warehouse ,processing_buckets)\n",
"# ret['query_ids'] = query_ids\n",
"# ret['sleep_time_multipliers'] = _wait_for_job_completion(p_session ,jobs)\n",
"\n",
"# return ret"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Will be creating warhouse : venkat_compute_wh_1 \n",
"Will be creating warhouse : venkat_compute_wh_2 \n",
"Will be creating warhouse : venkat_compute_wh_3 \n",
"Will be creating warhouse : venkat_compute_wh_4 \n",
"Will be creating warhouse : venkat_compute_wh_5 \n"
]
}
],
"source": [
"# Create warehouses for parallel processing\n",
"\n",
"processing_warehouses = ['venkat_compute_wh']\n",
"warehouse_count = 5\n",
"sql_stmts = [\n",
" 'use role sysadmin'\n",
"]\n",
"\n",
"dev_role = 'venkat_app_dev'\n",
"for i in range(1,warehouse_count + 1):\n",
" wh_name = f'venkat_compute_wh_{i}'\n",
" print(f'Will be creating warhouse : {wh_name} ')\n",
" s = f'''\n",
" create or replace warehouse {wh_name}\n",
" with warehouse_size='x-small'\n",
" max_cluster_count = 1\n",
" min_cluster_count = 1\n",
" scaling_policy = economy\n",
" auto_suspend = 600\n",
" auto_resume = true\n",
" initially_suspended = false'''\n",
" \n",
" g1 = f'''grant all privileges on warehouse {wh_name}\n",
" to role {dev_role}'''\n",
" \n",
" g2 = f'''grant usage on warehouse {wh_name}\n",
" to role {dev_role}'''\n",
" \n",
" sql_stmts.extend([s ,g1 ,g2])\n",
" processing_warehouses.extend([wh_name])\n",
"\n",
"sql_stmts.extend(['use role venkat_app_dev '])\n",
"\n",
"for sql_stmt in sql_stmts:\n",
" sp_session.sql(sql_stmt).collect()"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" The warehouse VENKAT_COMPUTE_WH will be processing the following buckets: \n",
"\t 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17 \n",
" The warehouse VENKAT_COMPUTE_WH_1 will be processing the following buckets: \n",
"\t 18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34 \n",
" The warehouse VENKAT_COMPUTE_WH_2 will be processing the following buckets: \n",
"\t 35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51 \n",
" The warehouse VENKAT_COMPUTE_WH_3 will be processing the following buckets: \n",
"\t 52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68 \n",
" The warehouse VENKAT_COMPUTE_WH_4 will be processing the following buckets: \n",
"\t 69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84 \n",
" The warehouse VENKAT_COMPUTE_WH_5 will be processing the following buckets: \n",
"\t 85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100 \n"
]
}
],
"source": [
"# calculate Bucket to warehouse allocation\n",
"\n",
"import numpy as np \n",
"\n",
"total_bucket_count = 100\n",
"buckets = range(1, total_bucket_count + 1)\n",
"warehouse_count = len(processing_warehouses)\n",
"\n",
"# Split the buckets into equal count / warehouse and allocate\n",
"bucket_splits = np.array_split(buckets, warehouse_count)\n",
"\n",
"warehouse_to_bucket_allocation = [\n",
" (processing_warehouses[idx] ,s)\n",
" # (processing_warehouses[idx] ,','.join(map(str, s)))\n",
" for idx ,s in enumerate(bucket_splits)\n",
"]\n",
"\n",
"for wh ,processing_buckets in warehouse_to_bucket_allocation:\n",
" s = ','.join(map(str,processing_buckets))\n",
" print(f' The warehouse {wh.upper()} will be processing the following buckets: \\n\\t {s} ')\n"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" Starting (asynch) jobs on venkat_compute_wh for buckets: \n",
"\t [ 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17] \n",
" Starting (asynch) jobs on venkat_compute_wh_1 for buckets: \n",
"\t [18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34] \n",
" Starting (asynch) jobs on venkat_compute_wh_2 for buckets: \n",
"\t [35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51] \n",
" Starting (asynch) jobs on venkat_compute_wh_3 for buckets: \n",
"\t [52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68] \n",
" Starting (asynch) jobs on venkat_compute_wh_4 for buckets: \n",
"\t [69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84] \n",
" Starting (asynch) jobs on venkat_compute_wh_5 for buckets: \n",
"\t [ 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100] \n"
]
}
],
"source": [
"# Invoke proc that does the asynch process\n",
"\n",
"query_ids = []\n",
"jobs = []\n",
"for wh ,processing_buckets in warehouse_to_bucket_allocation:\n",
" print(f' Starting (asynch) jobs on {wh} for buckets: \\n\\t {processing_buckets} ')\n",
" sp_session.use_warehouse(wh)\n",
" l_query_ids ,l_jobs = _invoke_asynch_jobs(sp_session ,wh ,processing_buckets)\n",
" query_ids.extend([l_query_ids])\n",
" jobs.extend(l_jobs)"
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[10, 0.5]"
]
},
"execution_count": 31,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Wait for job completions\n",
"\n",
"_wait_for_job_completion(sp_session ,jobs)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Querying parsed metadata\n",
"\n",
"We now load the parquet files, which contains the metadata into a table; which then can be used for\n",
"downstream data processing needs."
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [],
"source": [
"# define table and copy data into table\n",
"\n",
"sql_stmts = [\n",
" # create a table where the data will be copied into\n",
" '''create or replace transient table parsed_lidar_metadata (\n",
"\t\tfile_id varchar\n",
"\t\t,parsed_metadata variant\n",
"\t)'''\n",
" \n",
"\t# define a file format\n",
"\t,'''create or replace file format ff_parquet\n",
" \t\t\ttype = parquet'''\n",
" \n",
"\t# copy the data into the table\n",
" ,'''copy into parsed_lidar_metadata\n",
"\t\t\t\tfrom @code_stg/data\n",
"\t\t\t\tpattern = '.*parquet.gz' \n",
"\t\t\t\tfile_format = ( format_name = ff_parquet ) \n",
"\t\t\t\tmatch_by_column_name = case_insensitive'''\n",
"]\n",
"for sql_stmt in sql_stmts:\n",
" sp_session.sql(sql_stmt).collect()"
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"-----------------------------------------------------------------------------------------------------------\n",
"|\"FILE_ID\" |\"PARSED_METADATA\" |\n",
"-----------------------------------------------------------------------------------------------------------\n",
"|USGS_LPC_TX_Panhandle_B10_2017_LAS_2019/ept-dat... |{ |\n",
"| | \"crs\": 3857, |\n",
"| | \"date\": \"2019-09-25\", |\n",
"| | \"dimensions\": [ |\n",
"| | \"X\", |\n",
"| | \"Y\", |\n",
"| | \"Z\", |\n",
"| | \"intensity\", |\n",
"| | \"return_number\", |\n",
"| | \"number_of_returns\", |\n",
"| | \"scan_direction_flag\", |\n",
"| | \"edge_of_flight_line\", |\n",
"| | \"classification\", |\n",
"| | \"synthetic\", |\n",
"| | \"key_point\", |\n",
"| | \"withheld\", |\n",
"| | \"scan_angle_rank\", |\n",
"| | \"user_data\", |\n",
"| | \"point_source_id\", |\n",
"| | \"gps_time\" |\n",
"| | ], |\n",
"| | \"error\": \"\", |\n",
"| | \"file_id\": \"USGS_LPC_TX_Panhandle_B10_2017_LA... |\n",
"| | \"fileread_elapsed_secs\": 0, |\n",
"| | \"iteration\": 0, |\n",
"| | \"iteration_elapsed_secs\": 1, |\n",
"| | \"iteration_row_idx\": 1, |\n",
"| | \"point_count\": 46504, |\n",
"| | \"point_format_id\": 1, |\n",
"| | \"process_elapsed_mins\": 0 |\n",
"| |} |\n",
"|USGS_LPC_TX_Panhandle_B10_2017_LAS_2019/ept-dat... |{ |\n",
"| | \"crs\": 3857, |\n",
"| | \"date\": \"2019-09-25\", |\n",
"| | \"dimensions\": [ |\n",
"| | \"X\", |\n",
"| | \"Y\", |\n",
"| | \"Z\", |\n",
"| | \"intensity\", |\n",
"| | \"return_number\", |\n",
"| | \"number_of_returns\", |\n",
"| | \"scan_direction_flag\", |\n",
"| | \"edge_of_flight_line\", |\n",
"| | \"classification\", |\n",
"| | \"synthetic\", |\n",
"| | \"key_point\", |\n",
"| | \"withheld\", |\n",
"| | \"scan_angle_rank\", |\n",
"| | \"user_data\", |\n",
"| | \"point_source_id\", |\n",
"| | \"gps_time\" |\n",
"| | ], |\n",
"| | \"error\": \"\", |\n",
"| | \"file_id\": \"USGS_LPC_TX_Panhandle_B10_2017_LA... |\n",
"| | \"fileread_elapsed_secs\": 0, |\n",
"| | \"iteration\": 0, |\n",
"| | \"iteration_elapsed_secs\": 1, |\n",
"| | \"iteration_row_idx\": 2, |\n",
"| | \"point_count\": 17916, |\n",
"| | \"point_format_id\": 1, |\n",
"| | \"process_elapsed_mins\": 0 |\n",
"| |} |\n",
"-----------------------------------------------------------------------------------------------------------\n",
"\n"
]
}
],
"source": [
"# show some sample records\n",
"\n",
"spdf = sp_session.table('parsed_lidar_metadata')\n",
"spdf.show(2)"
]
},
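{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a follow-up, the VARIANT column can be queried directly with Snowflake's path notation; a hedged sketch using the keys produced by the parsing function above:\n",
"\n",
"```sql\n",
"select file_id\n",
"      ,parsed_metadata:crs::int          as crs_epsg\n",
"      ,parsed_metadata:point_count::int  as point_count\n",
"      ,parsed_metadata:error::varchar    as parse_error\n",
"from parsed_lidar_metadata\n",
"where parsed_metadata:error::varchar = ''\n",
"limit 10;\n",
"```"
]
},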
{
"cell_type": "markdown",
"metadata": {},
"source": [
"---\n",
"## Finished"
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Finished!!\n"
]
}
],
"source": [
"print('Finished!!')"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "venkat_env",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.18"
}
},
"nbformat": 4,
"nbformat_minor": 2
}