πŸ“˜ PySpark Structured Streaming: The Ultimate Cheat Sheet

This comprehensive guide covers the core concepts, common transformations, and advanced operations for building robust and scalable streaming applications with PySpark.

1. Fundamentals: Query Life Cycle

The life of a streaming query follows a simple pattern:

  1. Define a Source: Use spark.readStream to create a streaming DataFrame.

  2. Apply Transformations: Manipulate the data using various DataFrame APIs.

  3. Start a Sink: Use df.writeStream to start the query and write to an output sink.

Note on Modern Formats: For production, Iceberg and Delta Lake are highly recommended as they provide ACID transactions, schema evolution, and other table-management features on top of Parquet files.

πŸ“ Code Template

    # 1. Read from a source (e.g., file-based JSON)
    input_df = (spark.readStream
        .format("json")
        .option("path", "your/input/data")
        .schema(your_schema)
        .load())

    # 2. Apply transformations (e.g., filter and select)
    transformed_df = (input_df
        .filter("value.status = 'active'")
        .select("value.id", "value.timestamp"))

    # 3. Write to a sink (e.g., console for debugging)
    query = (transformed_df.writeStream
        .format("console")
        .outputMode("append")
        .option("checkpointLocation", "your/checkpoint/path")
        .start())

    # Wait for the query to terminate
    query.awaitTermination()

2. Basic Transformations (Stateless)

These operations are straightforward and work on each micro-batch independently. They are the same as those for static DataFrames.

| Operation | Code Example | Description |
| --- | --- | --- |
| `select()` | `df.select("col1", "col2")` | Selects a subset of columns. |
| `filter()` / `where()` | `df.filter(df.age > 25)` | Filters rows based on a condition. |
| `withColumn()` | `df.withColumn("new_col", F.col("old_col") + 1)` | Adds a new column or replaces an existing one. |
| `drop()` | `df.drop("unwanted_col")` | Removes a column. |
| `union()` | `df1.union(df2)` | Merges two streaming DataFrames with a compatible schema (`unionAll()` is a deprecated alias). |
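
These operations chain exactly as they do on static DataFrames. A minimal sketch, assuming a hypothetical streaming DataFrame events_df containing the columns used in the table above:

    from pyspark.sql import functions as F

    # Chain stateless transformations; each micro-batch is processed independently
    cleaned_df = (events_df
        .filter(F.col("age") > 25)                     # keep rows matching a condition
        .withColumn("new_col", F.col("old_col") + 1)   # derive a new column
        .drop("unwanted_col")                          # remove an unneeded column
        .select("col1", "col2", "new_col"))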

3. Stateful Operations

These operations require maintaining state across micro-batches. Checkpointing and watermarking are crucial for managing this state.

πŸ“Œ Aggregations

  • Concept: Grouping rows and applying an aggregation function (e.g., count, sum, avg). Streaming aggregations run with outputMode("complete") or outputMode("update"); outputMode("append") is only possible for event-time window aggregations with a watermark.

  • Code Example:

    from pyspark.sql.functions import count

    # Count occurrences of each category
    counts_df = df.groupBy("category").agg(count("*").alias("count"))

    query = (counts_df.writeStream
        .format("console")
        .outputMode("complete")
        .option("checkpointLocation", "/path/to/counts_checkpoint")
        .start())

🌊 Windowed Aggregations & Watermarking

  • Concept: Grouping data based on a time window, often used for event-time analysis. Watermarking is used to manage late data and prevent state explosion.

  • Code Example:

    from pyspark.sql.functions import window, col

    # Define a 5-minute watermark on the 'timestamp' column with 10-minute tumbling windows
    windowed_df = (df
        .withWatermark("timestamp", "5 minutes")
        .groupBy(window(col("timestamp"), "10 minutes"))
        .count())

    query = (windowed_df.writeStream
        .format("console")
        .outputMode("append")
        .option("checkpointLocation", "/path/to/window_checkpoint")
        .start())

4. Joins

Structured Streaming supports joins between a streaming DataFrame and a static DataFrame, as well as between two streaming DataFrames.

🀝 Stream-Static Joins

  • Concept: Joining a continuous stream of data with a static, finite DataFrame (e.g., a lookup table). If the static DataFrame is small, Spark can broadcast it to all workers for efficiency, either automatically below the broadcast threshold or via an explicit broadcast hint.

  • Code Example:

    # Static lookup table (e.g., user info)
    users_df = spark.read.json("/path/to/users_info.json")

    # Join the stream with the static DataFrame
    joined_df = stream_df.join(users_df, "user_id")

    # Stream-static joins are stateless, so no watermark is required
    query = (joined_df.writeStream
        .format("parquet")
        .option("path", "/path/to/output")
        .outputMode("append")
        .option("checkpointLocation", "/path/to/join_checkpoint")
        .start())

πŸ”— Stream-Stream Joins

  • Concept: Joining two streams. This is a complex stateful operation that requires a time-based condition and a watermark to manage the state of unmatched data from both sides.

  • Code Example (Inner Join with a condition and watermark):

    from pyspark.sql.functions import expr, from_json, col
    from pyspark.sql.types import StructType, StructField, StringType, TimestampType

    # Define schemas for both streams
    schema1 = StructType([
        StructField("order_id", StringType()),
        StructField("timestamp", TimestampType()),
        StructField("product_id", StringType())
    ])
    schema2 = StructType([
        StructField("payment_id", StringType()),
        StructField("timestamp", TimestampType()),
        StructField("order_id", StringType())
    ])

    # Read both streams, parse the event time, and define watermarks
    orders_stream = (spark.readStream.format("kafka").option(...).load()
        .select(from_json(col("value").cast("string"), schema1).alias("data"))
        .select("data.*")
        .withWatermark("timestamp", "10 minutes")
        .alias("orders"))

    payments_stream = (spark.readStream.format("kafka").option(...).load()
        .select(from_json(col("value").cast("string"), schema2).alias("data"))
        .select("data.*")
        .withWatermark("timestamp", "15 minutes")
        .alias("payments"))

    # Join both streams on 'order_id' and a time-based condition
    # (the aliases above let the SQL expression disambiguate the two sides)
    joined_stream = orders_stream.join(
        payments_stream,
        expr("""
            orders.order_id = payments.order_id AND
            payments.timestamp >= orders.timestamp AND
            payments.timestamp <= orders.timestamp + interval 1 hour
        """)
    )

    query = (joined_stream.writeStream
        .format("console")
        .outputMode("append")
        .option("checkpointLocation", "/path/to/stream_stream_join_checkpoint")
        .start())

5. Advanced Spark SQL Functions

Structured Streaming supports a wide range of built-in functions, including aggregate, percentile, and analytical functions.

lit() - Literal Value

  • Concept: Creates a column with a constant, literal value. Useful for adding static metadata to a stream.

  • Code Example:

    from pyspark.sql.functions import lit

    # Add a new column with a literal string value
    df_with_status = df.withColumn("processing_status", lit("processed"))

percentile_approx() - Approximate Percentile

  • Concept: Computes the approximate percentile of a numeric column. This is a powerful aggregate function for analyzing data distributions in a streaming context.

  • Code Example:

    from pyspark.sql.functions import percentile_approx, lit

    # Group by a category and calculate the 95th percentile of 'latency_ms'
    percentile_df = (df.groupBy("category")
        .agg(percentile_approx("latency_ms", lit(0.95)).alias("p95_latency")))

    query = (percentile_df.writeStream
        .format("console")
        .outputMode("complete")
        .option("checkpointLocation", "/path/to/percentile_checkpoint")
        .start())

Analytical Functions (Window Functions)

  • Concept: Analytical functions perform a calculation across a set of table rows that are related to the current row. Unlike regular aggregates, they don't collapse rows. They require a Window specification.

  • Important: Spark does not support these non-time-based window functions directly on a streaming DataFrame; they require sorting the unbounded stream and raise an AnalysisException. Apply them to static DataFrames, or run them per micro-batch with foreachBatch, as shown in the sketch below.
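
A minimal sketch of the foreachBatch pattern, assuming an illustrative output path and the same category / metric_value columns used in the examples below:

    from pyspark.sql import DataFrame
    from pyspark.sql.window import Window
    from pyspark.sql.functions import rank, desc

    def rank_each_batch(batch_df: DataFrame, batch_id: int) -> None:
        # Inside foreachBatch the micro-batch is a static DataFrame,
        # so analytical window functions are allowed
        window_spec = Window.partitionBy("category").orderBy(desc("metric_value"))
        ranked = batch_df.withColumn("rank", rank().over(window_spec))
        ranked.write.mode("append").parquet("/path/to/ranked_output")

    query = (df.writeStream
        .foreachBatch(rank_each_batch)
        .option("checkpointLocation", "/path/to/foreach_batch_checkpoint")
        .start())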

Ranking Functions
  • Concept: Assign a rank to each row within a window. rank() provides a rank with gaps, while dense_rank() provides a rank without gaps. row_number() assigns a unique, sequential number. ntile() distributes rows into a specified number of groups.

  • Code Example:

    from pyspark.sql.window import Window
    from pyspark.sql.functions import rank, dense_rank, row_number, desc

    # A window specification for ranking
    window_spec = Window.partitionBy("category").orderBy(desc("metric_value"))

    # Assign a rank with gaps
    ranked_df = df.withColumn("rank", rank().over(window_spec))

    # Assign a dense rank (no gaps)
    dense_ranked_df = df.withColumn("dense_rank", dense_rank().over(window_spec))

    # Assign a unique row number
    row_numbered_df = df.withColumn("row_number", row_number().over(window_spec))

Window-Aggregation Functions
  • Concept: Perform an aggregate calculation over a defined window of rows. This is different from a standard groupBy as it returns a value for each row rather than a single aggregated value for the group.

  • Code Example (Running Total and Average):

    from pyspark.sql.window import Window
    from pyspark.sql.functions import sum, avg, col

    # A window specification for a running total
    window_spec_sum = (Window.partitionBy("category")
        .orderBy("timestamp")
        .rowsBetween(Window.unboundedPreceding, Window.currentRow))

    # Calculate a running total of 'sales' for each category
    running_total_df = df.withColumn(
        "running_total_sales", sum(col("sales")).over(window_spec_sum)
    )

    # A window for a rolling 5-minute average; rangeBetween needs a numeric
    # ordering column, so order by the timestamp cast to epoch seconds
    window_spec_avg = (Window.partitionBy("category")
        .orderBy(col("timestamp").cast("long"))
        .rangeBetween(-300, 0))  # 300 seconds (5 minutes) preceding the current row

    # Calculate the rolling average of 'sales'
    rolling_avg_df = df.withColumn(
        "rolling_avg_sales", avg(col("sales")).over(window_spec_avg)
    )

Statistical Functions
  • Concept: Functions that calculate statistical values over a window, such as standard deviation or variance.

  • Code Example (Standard Deviation):

    from pyspark.sql.window import Window
    from pyspark.sql.functions import stddev, col

    # A window specification for calculating a running standard deviation
    window_spec_stddev = (Window.partitionBy("category")
        .orderBy("timestamp")
        .rowsBetween(Window.unboundedPreceding, Window.currentRow))

    # Calculate the running standard deviation of a metric
    stddev_df = df.withColumn(
        "running_stddev", stddev(col("metric_value")).over(window_spec_stddev)
    )

Offset Functions
  • Concept: Functions that retrieve a value from a row that is a certain offset away from the current row. lag() retrieves a value from a previous row, while lead() retrieves a value from a subsequent row.

  • Code Example (Lag):

    from pyspark.sql.window import Window
    from pyspark.sql.functions import lag, col

    # A window specification for accessing previous rows
    window_spec_lag = Window.partitionBy("user_id").orderBy("event_timestamp")

    # Get the value of 'last_login' from the previous row
    lag_df = df.withColumn(
        "previous_value", lag(col("last_login"), 1).over(window_spec_lag)
    )

    # Compare the current and previous values
    comparison_df = lag_df.withColumn(
        "is_new_session", (col("last_login") != col("previous_value"))
    )

6. Working with Iceberg Tables

Iceberg is a high-performance, open table format for huge analytic datasets. Structured Streaming can read from and write to Iceberg tables directly.

Read from an Iceberg Table

  • Concept: Spark can read an Iceberg table as a streaming source, where each new snapshot is treated as a new micro-batch.

  • Code Example:

    (spark.readStream
        .format("iceberg")
        .load("catalog_name.database_name.table_name"))

Write to an Iceberg Table

  • Concept: Stream output is written to the Iceberg table in an append-only fashion.

  • Code Example:

    query = (df.writeStream
        .format("iceberg")
        .outputMode("append")
        .option("path", "your/warehouse/path/table_name")
        .option("checkpointLocation", "your/checkpoint/path")
        .start())

7. Advanced Optimizations: Shuffle and Skew

Optimizing data distribution is critical for performance. These strategies help manage data movement and skewed partitions.

Shuffle

  • Concept: The process of redistributing data across executors, which occurs during wide transformations like groupBy, join, or repartition. Shuffles are expensive because they involve network I/O, disk I/O, and serialization/deserialization.

  • Strategies:

  • Adjust spark.sql.shuffle.partitions: This is the most common configuration option for shuffle. It controls the number of partitions that are created in a shuffle stage. The default is 200. A good practice is to set it to a value that results in partition sizes of 128MB or less, which is a balanced size for HDFS blocks and efficient processing. A common starting point is to set it to num_executors * num_cores_per_executor * 2.

  • Example:

    # Set shuffle partitions to a value that aligns with your cluster size
    spark.conf.set("spark.sql.shuffle.partitions", 500)
    
    

Data Skew

  • Concept: Occurs when data is unevenly distributed across partitions, so a few tasks handle a disproportionately large share of the data and become a bottleneck. A "hot key" (one key value carrying far more rows than the rest) is the most common cause.

  • Strategies:

  • Salt the Join Key: A powerful technique to handle skew. You add a random integer to the skewed key to distribute the data more evenly across partitions.

  • Example (Salting a join):

    from pyspark.sql.functions import concat, lit, floor, rand, col, explode, array

    # Assuming 'user_id' is the skewed key in a stream:
    # append a random salt (0-9) to spread the hot key across partitions
    salted_stream = stream_df.withColumn(
      "salted_id",
      concat(col("user_id"), lit("_"), floor(rand() * 10).cast("string"))
    )

    # Replicate each row of the static DataFrame once per salt value,
    # so every salted stream key can find its match
    salted_static = static_df.withColumn(
      "salt", explode(array(*[lit(i) for i in range(10)]))
    ).withColumn(
      "salted_id", concat(col("user_id"), lit("_"), col("salt").cast("string"))
    )

    # Perform the join on the new salted key
    joined_df = salted_stream.join(salted_static, "salted_id")
    # Drop the salting columns after the join
    final_df = joined_df.drop("salted_id", "salt")
    
    
  • Broadcast Joins: If one side of a join is small enough to fit in memory on every executor, you can broadcast it, which eliminates the shuffle of the larger DataFrame entirely. Spark broadcasts automatically when a table is below spark.sql.autoBroadcastJoinThreshold (10MB by default), and you can force it with a hint.

  • Example (Forcing a broadcast join):

    from pyspark.sql.functions import broadcast
    
    # The 'lookup_df' must be small enough to fit in memory
    joined_df = streaming_df.join(broadcast(lookup_df), "key")
    
    
  • Split and Union: An advanced strategy for severe skew. You split the skewed keys from the main DataFrame, process the small portion separately (e.g., with a broadcast join), and then union the results back. This avoids shuffling the majority of the data.

  • Example (Conceptual):

    # Step 1: Filter out the skewed keys and process them separately
    skewed_keys_df = stream_df.filter("key = 'highly_skewed_value'")
    non_skewed_df = stream_df.filter("key != 'highly_skewed_value'")
    
    # Step 2: Join the non-skewed data normally
    non_skewed_result = non_skewed_df.join(other_df, "key")
    
    # Step 3: Join the skewed data using a more efficient method (e.g., a broadcast)
    skewed_result = skewed_keys_df.join(broadcast(other_df), "key")
    
    # Step 4: Union the results
    final_result = non_skewed_result.union(skewed_result)
    
    

repartition() vs. coalesce()

This is a key decision for managing partitions, especially before writing to a sink to control the number of files and avoid the small file problem.

| Feature | repartition() | coalesce() |
| --- | --- | --- |
| Shuffle | Yes (full shuffle) | No |
| Partitions | Can increase or decrease the number of partitions. | Can only decrease the number of partitions. |
| Data Movement | Distributes data evenly across a new number of partitions, optionally by a new key. | Moves data minimally by combining existing partitions onto fewer executors. |
| Use Case | Increase parallelism (e.g., before a computationally heavy operation) or re-distribute skewed data on a new key. | Reduce the number of output files to avoid the small file problem, especially as a final step before a write. |
| Performance | Slower due to the full shuffle. | Faster, as it avoids the shuffle and network I/O. |
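
A minimal sketch of the typical pattern, assuming an illustrative transformed_df and output path: coalesce just before the write to limit output files, and repartition before a heavy stage to raise parallelism.

    # Reduce the number of output files before writing (avoids the small file problem)
    query = (transformed_df
        .coalesce(8)                      # illustrative partition count
        .writeStream
        .format("parquet")
        .option("path", "/path/to/output")
        .option("checkpointLocation", "/path/to/coalesce_checkpoint")
        .outputMode("append")
        .start())

    # Increase parallelism before a computationally heavy stage
    repartitioned_df = transformed_df.repartition(200, "user_id")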

8. Best Practices

  1. Always use a checkpointLocation: This is non-negotiable for fault-tolerant and recoverable jobs.

  2. Define a Schema: Avoid schema inference, as schema changes can break a running stream (see the schema sketch after this list).

  3. Monitor Your Query: Use the Spark UI, together with query.status and query.lastProgress, to monitor metrics like input rate, processing time, and state store memory.

  4. Use outputMode Correctly: Choose the appropriate output mode to avoid unexpected behavior, especially with aggregations.

  5. Leverage Iceberg or Delta Lake: These table formats are highly optimized for Structured Streaming, offering ACID transactions and efficient handling of small files, which are common in streaming pipelines.
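
As a minimal sketch of practice 2, an explicit schema for a hypothetical JSON event source (field names are illustrative):

    from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType

    # Explicit schema: a change in the source data cannot silently alter the stream's schema
    event_schema = StructType([
        StructField("event_id", StringType(), nullable=False),
        StructField("timestamp", TimestampType()),
        StructField("amount", DoubleType()),
    ])

    events_df = (spark.readStream
        .format("json")
        .schema(event_schema)   # never rely on inference for streaming sources
        .load("your/input/data"))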
