Sample Spark application running on IBM watsonx.data, decorated with the IBM Databand SDK.
# %% [markdown]
# Drone Ops raw data from PostgreSQL to watsonx.data
#
# Uses the Databand Python SDK to decorate the code with Databand tracking.
#
# See https://www.ibm.com/docs/en/dobd?topic=python-tracking-functions.
#
# See also Databand docs https://www.ibm.com/docs/en/dobd?topic=integrations-code-based-workflows.
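#
# A minimal sketch of the tracking pattern used in this script (the job and task
# names below are illustrative only, not part of the actual pipeline):
#
#     from dbnd import dbnd_tracking, task, log_metric
#
#     @task(task_family="Example step")
#     def example_step():
#         log_metric("rows-processed", 42)
#
#     with dbnd_tracking(job_name="Example job"):
#         example_step()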
# %%
#variables
APP_NAME = "drone-ops-bronze-to-silver-1min"
JOB_NAME = "Lab - Drone Ops - Bronze To Silver (1min)"
SCHEMA_NAME = "drone_swarm_ops"
CATALOG_NAME = "lab_catalog"
BUCKET_NAME = "lab-bucket"
JDBC_URL = "jdbc:postgresql://postgres.lab-postgres.svc.cluster.local:5432/drone_swarm_ops"
JDBC_PROPS = {"user":"demo", "password":"passw0rd", "driver":"org.postgresql.Driver"}
#imports
import os
import time
import base64, getpass
import warnings
from datetime import datetime, timedelta
from pyspark.sql import SparkSession, functions as F, types as T
from pyspark.sql.functions import col, to_utc_timestamp
from dbnd import dbnd_tracking, task, dataset_op_logger, log_metric, log_dataframe
warnings.filterwarnings('ignore')
# %%
#init Spark
#Spark config from watsonx.data Infrastructure Manager > Spark engines > Default Spark configuration
spark_config="""
ae.spark.databand.access_token=eyJtlbiJ9fQ.BjdOnb94
ae.spark.databand.url=https://ibm-sales-sandbox.databand.ai/
spark.wxd.api.endpoint=https://lhconsole-api-svc.cpd.svc.cluster.local:3333
"""
# Parse non-empty, non-comment lines into key/value pairs
spark_conf_lines = [
    line.strip().split("=", 1)
    for line in spark_config.strip().splitlines()
    if line.strip() and not line.strip().startswith("#")
]
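# Each (key, value) pair is applied to the SparkSession builder in init_spark() below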
# %%
LASTTIMESTAMP_TABLE_NAME="lasttimestamp"
#limit telemetry read, use empty string to disable
LIMIT=" limit 100000"
@task(task_family="Spark initialization")
def init_spark():
    builder = SparkSession.builder
    for k, v in spark_conf_lines:
        builder = builder.config(k.strip(), v.strip())
    spark = builder.appName(APP_NAME).enableHiveSupport().getOrCreate()
    return spark
#functions
@task(task_family=f"Create database {CATALOG_NAME}.{SCHEMA_NAME} if not exists")
def create_database(spark):
spark.sql(f"create database if not exists {CATALOG_NAME}.{SCHEMA_NAME} LOCATION 's3a://{BUCKET_NAME}/'")
@task(task_family=f"Create table {LASTTIMESTAMP_TABLE_NAME} if not exists")
def create_lasttimestamp_table(spark):
spark.sql(f"create table if not exists {CATALOG_NAME}.{SCHEMA_NAME}.{LASTTIMESTAMP_TABLE_NAME} (last_timestamp timestamp) using iceberg")
@task(task_family=f"Create table drone_telemetry_1min if not exists")
def create_dronetelemetry1min_table(spark):
# 1-minute aggregate table
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{SCHEMA_NAME}.drone_telemetry_1min (
bucket TIMESTAMP, -- window start (UTC minute)
drone_id STRING,
lat_last DOUBLE,
lon_last DOUBLE,
alt_last DOUBLE,
speed_avg DOUBLE,
speed_max DOUBLE,
airspeed_avg DOUBLE,
heading_avg DOUBLE,
battery_avg DOUBLE,
battery_min DOUBLE,
hdop_avg DOUBLE,
sats_avg DOUBLE,
motor_temp_avg DOUBLE,
motor_temp_max DOUBLE,
link_quality_avg DOUBLE,
cam_active_samples BIGINT,
cam_thermal_samples BIGINT,
samples BIGINT
)
USING iceberg
""")
# PARTITIONED BY (days(bucket)) -- after USING iceberg
#note: as of 10.10.2025 PARTITIONED BY (days(bucket)) is not supported in watsonx.data
@task(task_family=f"Get raw drone telemetry data from PostgresQL")
def get_drone_telemetry_from_postgres(spark):
df = (spark.read.format("jdbc")
.option("url", JDBC_URL)
.option("dbtable", "public.drone_telemetry")
.options(**JDBC_PROPS)
.load()
)
unique_file_name = "drone-telemetry-data"
with dataset_op_logger(unique_file_name, "read", with_schema=True, with_preview=True, with_stats=True) as logger:
logger.set(data=df)
return df
@task(task_family=f"Get latest timestamp")
def getLastTimeStamp(spark):
# Read watermark from a small Delta table
wm = spark.read.format("iceberg").load(f"{CATALOG_NAME}.{SCHEMA_NAME}.{LASTTIMESTAMP_TABLE_NAME}").first()
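    # if the watermark table is empty, fall back to the Unix epoch so the first run ingests everything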
    last_ts = wm["last_timestamp"] if wm else "1970-01-01T00:00:00Z"
    return last_ts
@task(task_family=f"Get drone telemetry since latest timestamp")
def getTelemetrySinceLastTimeStamp(spark,lastTimeStamp):
bronze = (spark.read.format("jdbc")
.option("url", JDBC_URL)
.option("dbtable", f"(select * from public.drone_telemetry where ts > '{lastTimeStamp}' order by ts asc {LIMIT}) as q")
.options(**JDBC_PROPS)
.load()
)
unique_file_name = "drone-telemetry-data-since-latest-timestamp"
with dataset_op_logger(unique_file_name, "read", with_schema=True, with_preview=True, with_stats=True) as logger:
logger.set(data=bronze)
return bronze
#add 1 minute bucket columns to telemetry data
@task(task_family="Add 1 minute bucket columns to telemetry data")
def create1MinuteBucketColumns(spark, droneTelemetryDF):
    droneTelemetryDF1Min = droneTelemetryDF.withColumn("bucket", F.window("ts", "1 minute").getField("start"))
    #droneTelemetryDF1Min.count()
    print("Added 1 minute bucket columns to telemetry data")
    # struct-max trick to pick the last lat/lon/alt within each bucket
    last_lat_struct = F.max(F.struct("ts", "lat")).alias("s_lat")
    last_lon_struct = F.max(F.struct("ts", "lon")).alias("s_lon")
    last_alt_struct = F.max(F.struct("ts", "alt_m")).alias("s_alt")
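    # F.max over a struct compares its fields left to right, so the struct with the greatest ts wins;
    # the lat/lon/alt value of that winning struct is extracted in the .select() below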
    agg = (droneTelemetryDF1Min.groupBy("bucket", "drone_id")
           .agg(
               last_lat_struct, last_lon_struct, last_alt_struct,
               F.avg("speed_mps").alias("speed_avg"),
               F.max("speed_mps").alias("speed_max"),
               F.avg("airspeed_mps").alias("airspeed_avg"),
               F.avg("heading_deg").alias("heading_avg"),
               F.avg("battery_pct").alias("battery_avg"),
               F.min("battery_pct").alias("battery_min"),
               F.avg("gnss_hdop").alias("hdop_avg"),
               F.avg("gnss_satellites").alias("sats_avg"),
               F.avg("motor_temp_c").alias("motor_temp_avg"),
               F.max("motor_temp_c").alias("motor_temp_max"),
               F.avg("link_quality_pct").alias("link_quality_avg"),
               F.sum(F.col("camera_thermal_active").cast("int")).alias("cam_thermal_samples"),
               F.sum(F.col("camera_active").cast("int")).alias("cam_active_samples"),
               F.count(F.lit(1)).alias("samples"),
           )
           .select(
               "bucket", "drone_id",
               F.col("s_lat").getField("lat").alias("lat_last"),
               F.col("s_lon").getField("lon").alias("lon_last"),
               F.col("s_alt").getField("alt_m").alias("alt_last"),
               "speed_avg", "speed_max", "airspeed_avg", "heading_avg",
               "battery_avg", "battery_min",
               "hdop_avg", "sats_avg",
               "motor_temp_avg", "motor_temp_max",
               "link_quality_avg",
               "cam_active_samples", "cam_thermal_samples", "samples"
           )
           )
    print(f"Created aggregated 1 min telemetry data. Aggregated rows: {agg.count()}")
    unique_file_name = "drone-telemetry-data-with-1-minute-buckets"
    with dataset_op_logger(unique_file_name, "write", with_schema=True, with_preview=True, with_stats=True) as logger:
        logger.set(data=agg)
    # for row in agg.take(1):
    #     print("----- row -----")
    #     for col, val in zip(agg.columns, row):
    #         print(f"{col}: {val}")
    #write/merge data to table so that there are no duplicates
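    # MERGE matches on (bucket, drone_id), so re-running over an overlapping window updates
    # existing rows in place instead of inserting duplicates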
    agg.createOrReplaceTempView("agg_tmp")
    spark.sql(f"""
    MERGE INTO {CATALOG_NAME}.{SCHEMA_NAME}.drone_telemetry_1min t
    USING agg_tmp s
    ON t.bucket = s.bucket AND t.drone_id = s.drone_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
    """)
    #agg.write.format("iceberg").mode("append").save(f"{CATALOG_NAME}.{SCHEMA_NAME}.drone_telemetry_1min")
    print(f"Wrote {agg.count()} rows of aggregated 1 min telemetry data to table '{CATALOG_NAME}.{SCHEMA_NAME}.drone_telemetry_1min'")
    # Log writing the Drone telemetry 1 minute buckets
    log_dataframe("drone-telemetry-data-with-1-minute-buckets", agg, with_schema=True, with_stats=True)
@task(task_family=f"Update latest telemetry timestamp")
def updateLatestTimeStamp(spark, droneTelemetryDF):
max_ts = droneTelemetryDF.agg(F.max("ts").alias("m")).first()["m"]
print(f"Latest timestamp of telemetry data: {max_ts}")
if max_ts:
df = spark.createDataFrame([(max_ts,)], "last_timestamp timestamp")
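        # mode("overwrite") keeps the watermark table at a single row holding the newest processed timestamp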
        df.write.format("iceberg").mode("overwrite").save(f"{CATALOG_NAME}.{SCHEMA_NAME}.{LASTTIMESTAMP_TABLE_NAME}")
        print(f"Stored latest timestamp {max_ts} to '{CATALOG_NAME}.{SCHEMA_NAME}.{LASTTIMESTAMP_TABLE_NAME}' table")
        unique_file_name = "drone-telemetry-data-since-latest-timestamp"
        with dataset_op_logger(unique_file_name, "write", with_schema=True, with_preview=True, with_stats=True) as logger:
            logger.set(data=df)
#main function
def aggregateRawDataTo1MinBuckets():
    # Current date and time
    #nowISOString = datetime.now().isoformat()
    #run_name=f"{nowISOString} - {JOB_NAME}",
    with dbnd_tracking(job_name=JOB_NAME,
                       conf={
                           "tracking": {
                               "track_source_code": True
                           },
                           "log": {
                               "preview_head_bytes": 2048,
                               "preview_tail_bytes": 2048
                           }
                       }
                       ):
        start_time_milliseconds = int(round(time.time() * 1000))
        spark = init_spark()
        print("Spark initialized")
        create_database(spark)
        create_lasttimestamp_table(spark)
        create_dronetelemetry1min_table(spark)
        print("Database and tables created if they didn't exist")
        #get telemetry since last timestamp
        lastTimeStamp = getLastTimeStamp(spark)
        print(f"Last timestamp: {lastTimeStamp}")
        droneTelemetryDF = getTelemetrySinceLastTimeStamp(spark, lastTimeStamp)
        telemetryCount = droneTelemetryDF.count()
        print(f"Fetched {telemetryCount} rows of telemetry data since {lastTimeStamp}")
        create1MinuteBucketColumns(spark, droneTelemetryDF)
        updateLatestTimeStamp(spark, droneTelemetryDF)
        end_time_milliseconds = int(round(time.time() * 1000))
        elapsed_time = end_time_milliseconds - start_time_milliseconds
        log_metric('elapsed-time', elapsed_time)
print("Functions declared")
# %%
print("Raw data aggregation start.")
aggregateRawDataTo1MinBuckets()
print("Raw data aggregation end.")