Skip to content

Instantly share code, notes, and snippets.

@dumkydewilde
Last active August 27, 2025 08:47
Show Gist options
  • Select an option

  • Save dumkydewilde/59ae86e933fe5825730b6918264eb07d to your computer and use it in GitHub Desktop.

Select an option

Save dumkydewilde/59ae86e933fe5825730b6918264eb07d to your computer and use it in GitHub Desktop.
Custom dbt materialization for Snowflake Streamlit Apps
{% materialization streamlit, adapter='snowflake', supported_languages = ['python'] %}
  {#
    Deploys a dbt Python model as a Snowflake Streamlit app.

    Steps performed below:
      1. On a full refresh, drop the existing app and its stage.
      2. Unless create_stage is false, CREATE OR ALTER the stage for the app files.
      3. PUT the model file, every entry of include_files and every entry of pages onto the stage.
      4. CREATE OR REPLACE STREAMLIT from the staged app folder.

    Model config read here:
      app_name          folder on the stage for this app (default: the model alias)
      stage             stage name inside the model's database and schema (default: 'streamlit_apps')
      create_stage      run CREATE OR ALTER STAGE first (default: true)
      include_files     extra local files to upload; their directory part is kept under the app folder
      include_manifest  also upload target/manifest.json (default: true)
      pages             files uploaded into the app's pages/ folder
      warehouse         QUERY_WAREHOUSE of the app (default: target.warehouse)
  #}
  {% set original_query_tag = set_query_tag() %}
  {% set identifier = model['alias'] %}
  {% set app_name = config.get('app_name', identifier) %}
  {% set stage_name = config.get('stage', 'streamlit_apps') %}
  {% set create_stage = config.get('create_stage', true) %}
  {% set stage_id = [database, schema, stage_name] | join('.') %}

  {# Work on a copy so the list stored in the model config is never mutated. #}
  {% set include_files = config.get('include_files', []) | list %}
  {% if config.get('include_manifest', true) %}
    {# 'do' evaluates the call without rendering its None result into the output. #}
    {% do include_files.append('target/manifest.json') %}
  {% endif %}

  {%- set existing_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
  {%- set target_relation = api.Relation.create(
        identifier=identifier,
        schema=schema,
        database=database,
        type='table'
  ) -%}

  {# NOTE(review): main_file is read here but MAIN_FILE below is derived from the model's file name instead. #}
  {% set main_file = config.get('main_file', model.path) %}

  {{ run_hooks(pre_hooks) }}

  {% if existing_relation is not none and flags.FULL_REFRESH %}
    {% call statement('drop_statement') %}
      {{ log("Dropping app and stage", info=True)}}
      DROP STREAMLIT IF EXISTS {{ target_relation }};
      DROP STAGE IF EXISTS {{ stage_id }};
    {% endcall %}
  {% endif %}

  {% if create_stage %}
    {% call statement('create_stage') %}
      {{ log('Creating stage: ' ~ stage_id, info=True) }}
      CREATE OR ALTER STAGE {{ stage_id }}
        DIRECTORY = (ENABLE = TRUE)
        COMMENT = 'Stage for Streamlit application: {{ app_name }}';
    {% endcall %}
  {% endif %}

  {% call statement('upload_files') %}
    {# The unqualified stage paths used for include_files and pages rely on this USE. #}
    USE {{ database }}.{{ schema }};

    PUT 'file://{{ model.original_file_path }}' @{{ stage_id }}/{{ app_name }}
      OVERWRITE = TRUE
      AUTO_COMPRESS = FALSE;

    {# Iterate the local list built above so the manifest is uploaded even when include_files is not configured. #}
    {% for file in include_files %}
      {% set dest = ([stage_name, app_name] + file.split('/')[:-1]) | join('/') %}
      {% if 'environment.yml' in file %}
        {# NOTE(review): this destination ends in the file name itself; confirm PUT places the file where the app expects it. #}
        {% set dest = [stage_name, app_name, 'environment.yml'] | join('/') %}
      {% endif %}
      {{ log('Uploading file: ' ~ file ~ ' to: ' ~ database ~ '.' ~ schema ~'.' ~ dest, info=True) }}
      PUT 'file://{{ file }}' @{{ dest }}
        OVERWRITE = TRUE
        AUTO_COMPRESS = FALSE;
    {% endfor %}

    {% for file in config.get('pages', []) %}
      {% set dest = [stage_name, app_name, 'pages'] | join('/') %}
      {{ log('Uploading page: ' ~ file ~ ' to: ' ~ database ~ '.' ~ schema ~'.' ~ dest, info=True) }}
      PUT 'file://{{ file }}' @{{ dest }}
        OVERWRITE = TRUE
        AUTO_COMPRESS = FALSE;
    {% endfor %}
  {% endcall %}

  {% call statement('main') %}
    CREATE OR REPLACE STREAMLIT {{ target_relation }}
      FROM @{{ stage_id }}/{{ app_name }}
      MAIN_FILE = '{{ model.original_file_path.split("/") | last | replace("/", "") }}'
      QUERY_WAREHOUSE = '{{ config.get("warehouse", target.warehouse) }}';
  {% endcall %}

  {{ run_hooks(post_hooks) }}

  {% do unset_query_tag(original_query_tag) %}

  {{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
# Example model properties for a Streamlit app deployed by the custom
# `streamlit` materialization. Every key under `config` is read by the
# materialization through config.get(...), so it must be nested under the
# model's `config` block rather than sit at the top level.
models:
  - name: source_freshness_dashboard
    config:
      materialized: streamlit
      # Extra local files uploaded next to the app; the directory part of
      # each path is kept under the app folder on the stage.
      include_files:
        - app_data/utils.py
      # Files uploaded into the app's pages/ folder on the stage.
      pages:
        - app_data/pages/other_documentation_page.py
      # Stage (inside the model's database and schema) that holds the app files.
      stage: my_streamlit_apps
import streamlit as st
from app_data.utils import get_relation_name_from_manifest, get_or_create_session
# Snowpark session used for every query on this page.
session = get_or_create_session()

# Page-level settings, applied before anything is drawn.
page_settings = {
    "page_title": "Customer Example Page",
    "page_icon": "🔭",
    "layout": "wide",
}
st.set_page_config(**page_settings)

# Look up the relation name of dim_customer in the dbt manifest, then count its rows.
dim_customer_relation = get_relation_name_from_manifest('dim_customer')
customer_count = session.sql(
    f"select count(*) as ct from {dim_customer_relation}"
).collect()

# The query returns a single row; its count is read from the 'CT' column.
total_customers = customer_count[0]['CT']
st.metric(label="Total Customers", value=total_customers)
def model(dbt, session):
    """Placeholder dbt Python-model entry point that returns an empty DataFrame.

    Neither argument is used by this function.

    Args:
        dbt: The dbt context object passed to Python models (unused).
        session: The session passed to Python models (unused).

    Returns:
        An empty ``pandas.DataFrame``.
    """
    # Imported locally because this file does not import pandas at module level.
    import pandas as pd

    return pd.DataFrame()
import json, os
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark import Session
from snowflake.snowpark.exceptions import SnowparkSessionException
def snowpark_sso_connection() -> Session:
    """Create a Snowpark session using the ``externalbrowser`` authenticator.

    The account and user are read from the ``SNOWFLAKE_ACCOUNT`` and
    ``SNOWFLAKE_USER`` environment variables.

    Returns:
        The newly created Snowpark ``Session``.
    """
    env = os.environ
    builder = Session.builder.configs(
        {
            "ACCOUNT": env.get('SNOWFLAKE_ACCOUNT'),
            "USER": env.get('SNOWFLAKE_USER'),
            "AUTHENTICATOR": "externalbrowser",
        }
    )
    return builder.create()
def get_or_create_session() -> Session:
    """Return the active Snowpark session, or open an SSO session if none exists.

    When no active session is found, the ``LOCAL_SNOWPARK`` environment
    variable is set to ``'1'`` before the SSO connection is created.

    Returns:
        A Snowpark ``Session``.
    """
    try:
        # Should be available in the remote Streamlit environment.
        current = get_active_session()
    except SnowparkSessionException:
        # No active session: fall back to a browser-based login for local development.
        os.environ['LOCAL_SNOWPARK'] = '1'
        current = snowpark_sso_connection()
    return current
def get_manifest(manifest_path: str = "target/manifest.json") -> dict:
    """Load a dbt ``manifest.json`` file and return its parsed contents.

    Args:
        manifest_path: Path to the manifest file.

    Returns:
        The parsed JSON document.

    Raises:
        FileNotFoundError: If no file exists at ``manifest_path``.
    """
    try:
        # JSON is UTF-8 encoded; do not depend on the platform's default
        # locale encoding, which may differ (e.g. on Windows).
        with open(manifest_path, encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError as err:
        # Re-raise with a hint about the likely cause, keeping the original
        # error attached as the explicit cause.
        raise FileNotFoundError(
            f"manifest.json not found at {manifest_path}. Has dbt been run?"
        ) from err
def get_relation_name_from_manifest(model_name):
    """
    Look up a model in manifest.json and return its relation_name.

    Args:
        model_name (str): The name of the model (without project prefix)

    Returns:
        The ``relation_name`` entry of the first matching model node, as
        returned by ``node.get('relation_name')``.

    Raises:
        ValueError: If no model node with that name is in the manifest.
    """
    nodes = get_manifest()['nodes']

    # Only node ids starting with "model." are considered; the first one
    # whose name matches wins.
    candidates = (
        entry
        for key, entry in nodes.items()
        if key.startswith('model.') and entry['name'] == model_name
    )
    found = next(candidates, None)

    if found is None:
        raise ValueError(f"Model '{model_name}' not found in the manifest")
    return found.get('relation_name')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment