Skip to content

Instantly share code, notes, and snippets.

@stahnma
Created June 25, 2025 21:02
Show Gist options
  • Select an option

  • Save stahnma/b1949063c8dbc804d26269a32bc4a9ed to your computer and use it in GitHub Desktop.

Select an option

Save stahnma/b1949063c8dbc804d26269a32bc4a9ed to your computer and use it in GitHub Desktop.
# Local directory handed to the pipeline runtime via `data_dir`.
data_dir: ./data

sources:
  # Active source: events are read from standard input and decoded as JSON.
  stdin:
    type: stdin
    decoding:
      codec: json
      # NOTE(review): the original indentation was lost; `mode` is assumed to
      # belong to `decoding` -- confirm against the source's schema.
      mode: multiline

  # Disabled alternative: read the same events from JSON files on disk.
  # json_files:
  #   type: file
  #   include:
  #     - "/Users/stahnma/go/src/github.com/stahnma/catalogservice-event-parser/events/2025/02/**/*.json"
  #   read_from: beginning
  #   ignore_older: 9999999  # ensure all files are read
  #   fingerprinting:
  #     strategy: device_and_inode
  #     ignored_header_bytes: 0
  #   read_once: true
  #   encoding:
  #     charset: utf-8
transforms:
  # Reshapes each raw event from `stdin` into the flat record consumed
  # downstream. The VRL program replaces the whole event (`. = {...}`), so
  # scratch fields set along the way never reach the output and need no
  # explicit cleanup.
  remap:
    type: remap
    inputs: [stdin]
    source: |
      # Parse the event time. "Z" is appended before parsing with "%+";
      # presumably the incoming value carries no UTC offset -- confirm.
      .ts_str = to_string!(.timestamp) + "Z"
      .parsed_ts = parse_timestamp!(.ts_str, "%+")

      # The catalog operation is the last route segment that is neither
      # empty nor a "{placeholder}" segment.
      .route_string = to_string!(.route)
      .catalog_op = null
      for_each(split(.route_string, "/")) -> |_, segment| {
        if is_string(segment) && !is_empty(segment) && !starts_with(segment, "{") {
          .catalog_op = segment
        }
      }

      # Headers may be absent. Coalesce instead of aborting so that an event
      # without "user-agent" or without an IPv4 "for=" entry in "forwarded"
      # is still normalised (monitor=false, ip=null) rather than failing the
      # whole program.
      user_agent = string(.request_header_map."user-agent") ?? ""
      forwarded = string(.request_header_map."forwarded") ?? ""
      client = parse_regex(forwarded, r'for=(?P<ip>\d{1,3}(?:\.\d{1,3}){3})') ?? {}

      . = {
        "event": .event,
        "timestamp": .parsed_ts,
        "weekday": format_timestamp!(.parsed_ts, "%A", "UTC"),
        "date": upcase(format_timestamp!(.parsed_ts, "%Y-%b-%d")),
        "uuid": .uuid,
        "route": .route,
        "flox_ci": exists(.request_header_map."flox-ci"),
        "user_agent": .request_header_map."user-agent",
        "ip": client.ip,
        "method": .request_scope.method,
        "raw_path": .request_scope.raw_path,
        "query_string": .request_scope.query_string,
        "monitor": contains(user_agent, "synthetic-monitoring-agent"),
        "flox_device_uuid": .request_header_map."flox-device-uuid",
        "catalog_operation": .catalog_op
      }
sinks:
  # Disabled debugging sink: pretty-printed JSON on stdout.
  # console:
  #   type: console
  #   inputs: [remap]
  #   target: stdout
  #   encoding:
  #     codec: json
  #     pretty: true

  # Disabled debugging sink: pretty-printed JSON written to ./output.json.
  # file:
  #   type: file
  #   inputs: [remap]
  #   path: ./output.json
  #   encoding:
  #     codec: json
  #     pretty: true

  # Active sink: writes the output of `remap` to the `catalog_events` table.
  postgres_out:
    type: postgres
    inputs:
      - remap
    # NOTE(review): the user and password are embedded in this URL; consider
    # supplying the endpoint from the environment before committing.
    endpoint: "postgresql://pguser:pgpass@localhost:15432/pgdb"
    table: catalog_events
    # Flush after 500 events or 10 seconds, per the two limits below.
    batch:
      max_events: 500
      timeout_secs: 10
    acknowledgements:
      enabled: true
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment