@samisalkosuo
Created November 26, 2025 07:04
from streamsets.sdk import ControlHub
import os
import sys
import json
STREAMSETS_CREDENTIAL_ID = os.environ.get("STREAMSETS_CREDENTIAL_ID", "<default-credential-id>")
STREAMSETS_TOKEN = os.environ.get("STREAMSETS_TOKEN", "<default-token>")
STREAMSETS_ENGINE_ID = os.environ.get("STREAMSETS_ENGINE_ID", "<default-engine-id>")
STREAMSETS_PIPELINE_NAME = os.environ.get("STREAMSETS_PIPELINE_NAME", "Lab - Drone Ops - MQTT to Kafka")
ASTRADB_API_ENDPOINT = os.environ.get("ASTRADB_API_ENDPOINT", "<astradb-api-endpoint>")
ASTRADB_KEYSPACE_NAME = os.environ.get("ASTRADB_KEYSPACE_NAME", "default_keyspace")
ASTRADB_COLLECTION_NAME = os.environ.get("ASTRADB_COLLECTION_NAME", "drone_telemetry")
ASTRADB_APPLICATION_TOKEN = os.environ.get("ASTRADB_APPLICATION_TOKEN", "<AstraCS:token>")
MQTT_BROKER_URL = "tcp://mqtt.lab-mqtt.svc.cluster.local:1883"
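# mqtt.lab-mqtt.svc.cluster.local is a cluster-internal Kubernetes service name;
# the broker is assumed to be reachable from the Data Collector engine running in the same cluster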
# update commit message from argument if available
try:
    COMMIT_MESSAGE = sys.argv[1]
except IndexError:
    COMMIT_MESSAGE = "updated"
sch = ControlHub(credential_id=STREAMSETS_CREDENTIAL_ID, token=STREAMSETS_TOKEN)
#print(sch)
try:
    # get existing pipeline
    pipeline = sch.pipelines.get(name=STREAMSETS_PIPELINE_NAME)
    print(f"Creating new version of pipeline: {pipeline}")
except ValueError:
    # create pipeline
    engine = sch.engines.get(id=STREAMSETS_ENGINE_ID)
    pipeline_builder = sch.get_pipeline_builder(engine_type='data_collector', engine_id=engine.id)
    pipeline = pipeline_builder.build(STREAMSETS_PIPELINE_NAME)
    sch.publish_pipeline(pipeline, commit_message='created')
    # get newly created pipeline
    pipeline = sch.pipelines.get(name=STREAMSETS_PIPELINE_NAME)
    print(f"Pipeline created: {pipeline}")
    #print(pipeline)
#stages
stages = pipeline.stages
#print(stages)
#remove all stages
for stage in stages:
    pipeline.remove_stages(stage)
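# the pipeline canvas is now empty; the stages below rebuild it from scratch on every run,
# so this script effectively acts as the source of truth for the pipeline definition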
#create origin stage
origin_mqtt_stage = pipeline.add_stage(label = 'MQTT Subscriber', type='origin')
#origin_mqtt_stage.description("Drone telemetry raw data")
#print(origin_mqtt_stage.configuration)
origin_mqtt_stage.data_format = "JSON"
origin_mqtt_stage.broker_url = MQTT_BROKER_URL
origin_mqtt_stage.topic_filter = ["drone.telemetry"]
origin_mqtt_stage.clean_session = True
origin_mqtt_stage.label = "Drone telemetry from MQTT"
#help(origin_mqtt_stage)
#print(origin_mqtt_stage.client_id)
#field remover stage
field_remover_stage = pipeline.add_stage(label = 'Field Remover')
field_remover_stage.label = "Remove unnecessary fields"
#field_remover_stage.action = "Remove Listed Fields"
field_remover_stage.fields = [
    "/power",
    "/camera",
    "/imu",
    "/magnetometer_uT",
    "/gnss",
    "/env",
    "/motors",
    "/cpu_temp_c",
    "/link_quality_pct",
    "/barometer_hpa",
    "/airspeed_mps",
    "/battery_pct"
]
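# the remaining fields (e.g. /drone_id, /ts, /lat, /lon, /alt_m, /speed_mps, /heading_deg, /phase)
# are the ones the renamer and JavaScript evaluator below rely on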
#help(field_remover_stage)
#field renamer stage
field_renamer_stage = pipeline.add_stage(label = 'Field Renamer')
field_renamer_stage.label = "Rename fields"
#help(field_renamer_stage)
field_renamer_stage.fields_to_rename = [
    {"fromFieldExpression": "/ts", "toFieldExpression": "/event_time"},
    {"fromFieldExpression": "/alt_m", "toFieldExpression": "/altitude"},
    {"fromFieldExpression": "/lat", "toFieldExpression": "/latitude"},
    {"fromFieldExpression": "/lon", "toFieldExpression": "/longitude"},
    {"fromFieldExpression": "/speed_mps", "toFieldExpression": "/speed"},
    {"fromFieldExpression": "/heading_deg", "toFieldExpression": "/heading"}
]
#field type converter
field_type_converter_stage = pipeline.add_stage(label = 'Field Type Converter')
field_type_converter_stage.label = "Convert types"
field_type_converter_stage.conversions_by_field_name = [
    {
        "fields": ["/event_time"],
        "targetType": "DATETIME",
        "dateFormat": "OTHER",
        "otherDateFormat": "yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX",
    }
]
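# the date pattern assumes /event_time arrives as an ISO-8601 style string,
# e.g. 2025-01-01T12:00:00.123456+00:00; adjust otherDateFormat if the drone payload differs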
#create stream selector stage
match_patrol_predicate = "${str:matches(record:value('/insertOne/document/phase'), \"patrol\")}"
default_predicate = "default"
stream_selector_stage = pipeline.add_stage(label = 'Stream Selector')
stream_selector_stage.label = "Check drone phase, discard non-patrol"
stream_selector_stage.predicates = [
    match_patrol_predicate,
    default_predicate
]
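# note: the predicate reads /insertOne/document/phase because the Stream Selector runs after the
# JavaScript Evaluator below, which wraps each record in an insertOne/document structure;
# output lanes are created in the same order as the predicates list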
#discard destination
discard_stage = pipeline.add_stage(label = 'Trash', type='destination')
discard_stage.label = "Discard"
#trash
#trash_stage = pipeline.add_stage(label = 'Trash', type='destination')
#add Astra DB REST destination
destination_astradb_stage = pipeline.add_stage(label = 'HTTP Client', type='destination')
destination_astradb_stage.label = "Drone telemetry to Astra DB"
destination_astradb_stage.resource_url = f"{ASTRADB_API_ENDPOINT}/api/json/v1/{ASTRADB_KEYSPACE_NAME}/{ASTRADB_COLLECTION_NAME}"
destination_astradb_stage.headers = [
    {
        "key": "Token",
        "value": ASTRADB_APPLICATION_TOKEN
    },
    {
        "key": "Content-Type",
        "value": "application/json"
    }
]
#help(destination_astradb_stage)
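# records are sent to the Astra DB JSON (Data) API collection endpoint (default HTTP Client
# settings assumed); the JavaScript Evaluator below shapes each record as
# {"insertOne": {"document": {...}}}, the command format that endpoint expects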
# the Stream Selector above discards all records that are not in the "patrol" phase
# example of combining multiple conditions in a predicate:
# ${str:matches(record:value('/transaction_type'), "deposit") || str:matches(record:value('/transaction_type'), "withdrawal")}
#help(field_type_converter_stage)
#create dev identity stages for checking message in control plane GUI
dev_identity_1_stage = pipeline.add_stage(label = 'Dev Identity')
dev_identity_1_stage.label = "Dev Identity 1"
dev_identity_2_stage = pipeline.add_stage(label = 'Dev Identity')
dev_identity_2_stage.label = "Dev Identity 2"
# connect stream selector outputs
# find the output lane that corresponds to the "patrol" predicate
predicate_index = stream_selector_stage.predicates.index(
    next(p for p in stream_selector_stage.predicates if p['predicate'] == match_patrol_predicate)
)
# connect the patrol lane to Dev Identity 1 (and on to the Astra DB destination)
stream_selector_stage.connect_outputs(stages=[dev_identity_1_stage], output_lane_index=predicate_index)
# discard all but patrol messages
# find the output lane that corresponds to the default predicate
predicate_index = stream_selector_stage.predicates.index(
    next(p for p in stream_selector_stage.predicates if p['predicate'] == default_predicate)
)
# connect the default lane to Dev Identity 2 (and on to Trash)
stream_selector_stage.connect_outputs(stages=[dev_identity_2_stage], output_lane_index=predicate_index)
#help(stream_selector_stage)
#javascript stage
create_astradb_json_stage = pipeline.add_stage(label = 'JavaScript Evaluator')
create_astradb_json_stage.label = "Create JSON for Astra DB"
create_astradb_json_stage.script = """
var records = sdc.records;
for (var i = 0; i < records.length; i++) {
    try {
        // collect the telemetry fields into a document for Astra DB
        var documentMap = {};
        documentMap["drone_id"] = records[i].value['drone_id'];
        documentMap["event_time"] = records[i].value['event_time'];
        documentMap["phase"] = records[i].value['phase'];
        documentMap["altitude"] = records[i].value['altitude'];
        documentMap["latitude"] = records[i].value['latitude'];
        documentMap["longitude"] = records[i].value['longitude'];
        documentMap["speed"] = records[i].value['speed'];
        documentMap["heading"] = records[i].value['heading'];
        // wrap the document in an insertOne command
        var insertOneMap = {'insertOne': {'document': documentMap}};
        records[i].value['insertOne'] = insertOneMap;
        // emit a new record whose root is the insertOne command
        var newRecord = sdc.createRecord(records[i].sourceId + '-insertOne');
        newRecord.value = insertOneMap;
        sdc.output.write(newRecord);
    } catch (e) {
        // Send record to error
        sdc.error.write(records[i], e);
    }
}
"""
#help(create_astradb_json_stage)
#connect stages
origin_mqtt_stage >> field_remover_stage
field_remover_stage >> field_renamer_stage
field_renamer_stage >> field_type_converter_stage
field_type_converter_stage >> create_astradb_json_stage
create_astradb_json_stage >> stream_selector_stage
dev_identity_1_stage >> destination_astradb_stage
dev_identity_2_stage >> discard_stage
#stream_selector_stage >> trash_stage
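# resulting topology:
#   MQTT Subscriber -> Field Remover -> Field Renamer -> Field Type Converter
#     -> JavaScript Evaluator -> Stream Selector
#          patrol lane  -> Dev Identity 1 -> HTTP Client (Astra DB)
#          default lane -> Dev Identity 2 -> Trash (discard)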
sch.publish_pipeline(pipeline, commit_message = COMMIT_MESSAGE)
print("Pipeline published.")