This comprehensive guide covers the core concepts, common transformations, and advanced operations for building robust and scalable streaming applications with PySpark.
The life of a streaming query follows a simple pattern:
1. Define a Source: Use `spark.readStream` to create a streaming `DataFrame`.
2. Apply Transformations: Manipulate the data using the usual `DataFrame` APIs.
3. Start a Sink: Use `df.writeStream` to start the query and write to an output sink.
Note on Modern Formats: For production, Iceberg and Delta Lake are highly recommended as they provide ACID transactions, schema evolution, and other table-management features on top of Parquet files.
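For instance, switching the sink to Delta Lake is mostly a change of format string. The snippet below is a minimal sketch, assuming a streaming `DataFrame` named `df` and placeholder paths; it also assumes the Delta Lake package is available on the cluster.

```python
# Minimal sketch: append a stream to a Delta Lake table (paths are placeholders)
query = (df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/path/to/delta_checkpoint")
    .start("/path/to/delta_table"))
```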
```python
# Read from a source (e.g., file-based JSON)
input_df = (spark.readStream
    .format("json")
    .option("path", "your/input/data")
    .schema(your_schema)
    .load())

# Apply transformations (e.g., filter and select)
transformed_df = (input_df
    .filter("value.status = 'active'")
    .select("value.id", "value.timestamp"))

# Write to a sink (e.g., console for debugging)
query = (transformed_df.writeStream
    .format("console")
    .outputMode("append")
    .option("checkpointLocation", "your/checkpoint/path")
    .start())

# Wait for the query to terminate
query.awaitTermination()
```
These operations are straightforward, work on each micro-batch independently, and behave exactly as they do on static DataFrames. A combined example follows the table below.
| Operation | Code Example | Description |
|---|---|---|
| `select()` | `df.select("col1", "col2")` | Selects a subset of columns. |
| `filter()` / `where()` | `df.filter(df.age > 25)` | Filters rows based on a condition. |
| `withColumn()` | `df.withColumn("new_col", F.col("old_col") + 1)` | Adds a new column or replaces an existing one. |
| `drop()` | `df.drop("unwanted_col")` | Removes a column. |
| `unionAll()` | `df1.unionAll(df2)` | Merges two streaming DataFrames with a compatible schema. |
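Taken together, these calls chain naturally on a single stream. The sketch below is illustrative only; the column names are the placeholders from the table above.

```python
from pyspark.sql import functions as F

# Chain several stateless transformations on a streaming DataFrame
# (column names are the placeholders used in the table above)
cleaned_df = (df
    .filter(F.col("age") > 25)
    .withColumn("new_col", F.col("old_col") + 1)
    .drop("unwanted_col")
    .select("col1", "col2", "new_col"))
```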
These operations require maintaining state across micro-batches. Checkpointing and watermarking are crucial for managing this state.
- Concept: Grouping rows and applying an aggregation function (e.g., `count`, `sum`, `avg`). Without a watermark, aggregates must be emitted with `outputMode("complete")` or `outputMode("update")`; `append` is only allowed when a watermark is defined.
- Code Example:

```python
from pyspark.sql.functions import count

# Count occurrences of each category
counts_df = df.groupBy("category").agg(count("*").alias("count"))

query = (counts_df.writeStream
    .format("console")
    .outputMode("complete")
    .option("checkpointLocation", "/path/to/counts_checkpoint")
    .start())
```
- Concept: Grouping data based on a time window, often used for event-time analysis. Watermarking is used to manage late data and prevent state explosion.
- Code Example:

```python
from pyspark.sql.functions import window, col

# Define a 5-minute watermark on the 'timestamp' column with 10-minute tumbling windows
windowed_df = (df
    .withWatermark("timestamp", "5 minutes")
    .groupBy(window(col("timestamp"), "10 minutes"))
    .count())

query = (windowed_df.writeStream
    .format("console")
    .outputMode("append")
    .option("checkpointLocation", "/path/to/window_checkpoint")
    .start())
```
Structured Streaming supports joins between a streaming DataFrame and a static DataFrame, as well as between two streaming DataFrames.
- Concept: Joining a continuous stream of data with a static, finite `DataFrame` (e.g., a lookup table). If the static `DataFrame` is small, Spark can broadcast it to all executors to avoid a shuffle.
- Code Example:

```python
# Static lookup table (e.g., user info)
users_df = spark.read.json("/path/to/users_info.json")

# Join the stream with the static DataFrame
joined_df = stream_df.join(users_df, "user_id")

# A stream-static inner join is stateless and is written with the append output mode
query = (joined_df.writeStream
    .format("parquet")
    .option("path", "/path/to/output")
    .outputMode("append")
    .option("checkpointLocation", "/path/to/join_checkpoint")
    .start())
```
- Concept: Joining two streams. This is a complex stateful operation that requires a time-based join condition and watermarks on both sides so Spark can eventually discard state for unmatched rows.
- Code Example (Inner Join with a time condition and watermarks):

```python
from pyspark.sql.functions import col, expr, from_json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Define schemas for both streams
schema1 = StructType([
    StructField("order_id", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("product_id", StringType()),
])
schema2 = StructType([
    StructField("payment_id", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("order_id", StringType()),
])

# Read both streams, parse the payload, and define event-time watermarks;
# alias each side so the join condition can reference it unambiguously
orders_stream = (spark.readStream.format("kafka").option(...).load()
    .select(from_json(col("value").cast("string"), schema1).alias("data"))
    .select("data.*")
    .withWatermark("timestamp", "10 minutes")
    .alias("orders"))

payments_stream = (spark.readStream.format("kafka").option(...).load()
    .select(from_json(col("value").cast("string"), schema2).alias("data"))
    .select("data.*")
    .withWatermark("timestamp", "15 minutes")
    .alias("payments"))

# Join both streams on 'order_id' plus a time-based condition
joined_stream = orders_stream.join(
    payments_stream,
    expr("""
        orders.order_id = payments.order_id AND
        payments.timestamp >= orders.timestamp AND
        payments.timestamp <= orders.timestamp + interval 1 hour
    """)
)

query = (joined_stream.writeStream
    .format("console")
    .outputMode("append")
    .option("checkpointLocation", "/path/to/stream_stream_join_checkpoint")
    .start())
```
Structured Streaming supports a wide range of built-in functions, including aggregate, percentile, and analytical functions.
- Concept: Creates a column with a constant, literal value. Useful for adding static metadata to a stream.
- Code Example:

```python
from pyspark.sql.functions import lit

# Add a new column with a literal string value
df_with_status = df.withColumn("processing_status", lit("processed"))
```
- Concept: Computes the approximate percentile of a numeric column. This is a powerful aggregate function for analyzing data distributions in a streaming context.
- Code Example:

```python
from pyspark.sql.functions import lit, percentile_approx

# Group by a category and calculate the 95th percentile of 'latency_ms'
percentile_df = (df.groupBy("category")
    .agg(percentile_approx("latency_ms", lit(0.95)).alias("p95_latency")))

query = (percentile_df.writeStream
    .format("console")
    .outputMode("complete")
    .option("checkpointLocation", "/path/to/percentile_checkpoint")
    .start())
```
- Concept: Analytical (window) functions perform a calculation across a set of rows related to the current row. Unlike regular aggregates, they don't collapse rows. They require a `Window` specification.
- Important: Non-time-based window specifications (`Window.partitionBy(...).orderBy(...)`) are not supported directly on a streaming `DataFrame`. The usual pattern is to apply them inside `foreachBatch`, where each micro-batch is processed as a static `DataFrame`; the examples below show the window-function syntax you would use there.
- Concept: Assign a rank to each row within a window. `rank()` provides a rank with gaps, while `dense_rank()` provides a rank without gaps. `row_number()` assigns a unique, sequential number. `ntile()` distributes rows into a specified number of groups.
- Code Example:

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank, row_number, desc

# A window specification for ranking
window_spec = Window.partitionBy("category").orderBy(desc("metric_value"))

# Assign a rank with gaps
ranked_df = df.withColumn("rank", rank().over(window_spec))

# Assign a dense rank (no gaps)
dense_ranked_df = df.withColumn("dense_rank", dense_rank().over(window_spec))

# Assign a unique row number
row_numbered_df = df.withColumn("row_number", row_number().over(window_spec))
```
- Concept: Perform an aggregate calculation over a defined window of rows. Unlike a standard `groupBy`, this returns a value for each row rather than a single aggregated value for the group.
- Code Example (Running Total and Rolling Average):

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import sum, avg, col

# A window specification for a running total
window_spec_sum = (Window.partitionBy("category")
    .orderBy("timestamp")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow))

# Calculate a running total of 'sales' for each category
running_total_df = df.withColumn(
    "running_total_sales", sum(col("sales")).over(window_spec_sum)
)

# A window for a rolling 5-minute average; rangeBetween requires a numeric
# ordering column, so order by the timestamp cast to epoch seconds
window_spec_avg = (Window.partitionBy("category")
    .orderBy(col("timestamp").cast("long"))
    .rangeBetween(-300, 0))  # 300 seconds (5 minutes) preceding through the current row

# Calculate the rolling average of 'sales'
rolling_avg_df = df.withColumn(
    "rolling_avg_sales", avg(col("sales")).over(window_spec_avg)
)
```
- Concept: Functions that calculate statistical values over a window, such as standard deviation or variance.
- Code Example (Standard Deviation):

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import stddev, col

# A window specification for a running standard deviation
window_spec_stddev = (Window.partitionBy("category")
    .orderBy("timestamp")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow))

# Calculate the running standard deviation of a metric
stddev_df = df.withColumn(
    "running_stddev", stddev(col("metric_value")).over(window_spec_stddev)
)
```
- Concept: Functions that retrieve a value from a row that is a certain offset away from the current row. `lag()` retrieves a value from a previous row, while `lead()` retrieves a value from a subsequent row.
- Code Example (Lag):

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col

# A window specification for accessing previous rows
window_spec_lag = Window.partitionBy("user_id").orderBy("event_timestamp")

# Get the value of 'last_login' from the previous row
lag_df = df.withColumn(
    "previous_value", lag(col("last_login"), 1).over(window_spec_lag)
)

# Compare the current and previous values
comparison_df = lag_df.withColumn(
    "is_new_session", (col("last_login") != col("previous_value"))
)
```
Iceberg is a high-performance, open table format for huge analytic datasets. Structured Streaming can read from and write to Iceberg tables directly.
- Concept: Spark can read an Iceberg table as a streaming source, where each new snapshot is treated as a new micro-batch.
- Code Example:

```python
stream_df = (spark.readStream
    .format("iceberg")
    .load("catalog_name.database_name.table_name"))
```
- Concept: Stream output is written to the Iceberg table in an append-only fashion.
- Code Example:

```python
query = (df.writeStream
    .format("iceberg")
    .outputMode("append")
    .option("path", "your/warehouse/path/table_name")
    .option("checkpointLocation", "your/checkpoint/path")
    .start())
```
Optimizing data distribution is critical for performance. These strategies help manage data movement and skewed partitions.
- Concept: The process of redistributing data across executors, which occurs during wide transformations like `groupBy`, `join`, or `repartition`. Shuffles are expensive because they involve network I/O, disk I/O, and serialization/deserialization.
- Strategies:
  - Adjust `spark.sql.shuffle.partitions`: This is the most common shuffle configuration option. It controls the number of partitions created in a shuffle stage (default: 200). A good practice is to target partition sizes of roughly 128 MB or less, which aligns with HDFS block sizes and keeps tasks efficiently sized. A common starting point is `num_executors * num_cores_per_executor * 2` (a worked example of this heuristic follows the snippet below).
  - Example:

```python
# Set shuffle partitions to a value that aligns with your cluster size
spark.conf.set("spark.sql.shuffle.partitions", 500)
```
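As a worked example of that heuristic (the cluster dimensions here are assumptions, not a recommendation):

```python
# Hypothetical cluster: 50 executors with 5 cores each
num_executors = 50
num_cores_per_executor = 5

# Heuristic starting point: executors * cores * 2 = 500 shuffle partitions
shuffle_partitions = num_executors * num_cores_per_executor * 2
spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions)
```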
- Concept: Occurs when data is unevenly distributed across partitions, so a few tasks handle a disproportionately large amount of data and become a bottleneck. A "hot key" is the most common cause.
- Strategies:
  - Salt the Join Key: A powerful technique for handling skew. Append a random integer to the skewed key on the streaming side and replicate the other side across all salt values, so matching rows are spread evenly across partitions.
  - Example (Salting a join):

```python
from pyspark.sql.functions import array, col, concat, explode, floor, lit, rand

# Add a random salt (0-9) to the skewed key on the streaming side
salted_stream = stream_df.withColumn(
    "salted_id",
    concat(col("user_id"), lit("_"), floor(rand() * 10).cast("string"))
)

# Replicate each static row across all 10 salt values so every salted key can find its match
salted_static = (static_df
    .withColumn("salt", explode(array(*[lit(i) for i in range(10)])))
    .withColumn("salted_id", concat(col("user_id"), lit("_"), col("salt").cast("string")))
    .drop("salt"))

# Perform the join on the new salted key
joined_df = salted_stream.join(salted_static, "salted_id")

# Drop the salting column after the join
final_df = joined_df.drop("salted_id")
```

  - Broadcast Joins: If one side of a join is small enough to fit into memory on every executor, broadcast it. This eliminates the shuffle for the larger `DataFrame` entirely. Spark does this automatically for tables below `spark.sql.autoBroadcastJoinThreshold` (10 MB by default), but you can force it.
  - Example (Forcing a broadcast join):

```python
from pyspark.sql.functions import broadcast

# The 'lookup_df' must be small enough to fit in memory on each executor
joined_df = streaming_df.join(broadcast(lookup_df), "key")
```

  - Split and Union: An advanced strategy for severe skew. Split the skewed keys out of the main `DataFrame`, process that small portion separately (e.g., with a broadcast join), and then `union` the results back. This avoids shuffling the majority of the data.
  - Example (Conceptual):

```python
from pyspark.sql.functions import broadcast

# Step 1: Filter out the skewed keys and process them separately
skewed_keys_df = stream_df.filter("key = 'highly_skewed_value'")
non_skewed_df = stream_df.filter("key != 'highly_skewed_value'")

# Step 2: Join the non-skewed data normally
non_skewed_result = non_skewed_df.join(other_df, "key")

# Step 3: Join the skewed data using a more efficient method (e.g., a broadcast)
skewed_result = skewed_keys_df.join(broadcast(other_df), "key")

# Step 4: Union the results
final_result = non_skewed_result.union(skewed_result)
```
This is a key decision for managing partitions, especially before writing to a sink to control the number of files and avoid the small file problem.
| Feature | `repartition()` | `coalesce()` |
|---|---|---|
| Shuffle | Yes (Full Shuffle) | No |
| Partitions | Can increase or decrease the number of partitions. | Can only decrease the number of partitions. |
| Data Movement | Performs a full shuffle, distributing data evenly across the new number of partitions (optionally hash-partitioned by a key). | Moves data minimally by merging existing partitions, without a full shuffle. |
| Use Case | Use when you need to increase parallelism (e.g., before a computationally heavy operation) or to re-distribute skewed data on a new key. | Use when you want to reduce the number of output files to avoid the small file problem, especially as a final step before a write operation. |
| Performance | Slower due to a full shuffle. | Faster as it avoids the shuffle and network I/O. |
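Applied to the earlier `transformed_df`, a minimal sketch of both calls (the partition counts and key column are placeholders, not recommendations):

```python
# Reduce the number of output partitions (and therefore files) without a full shuffle
compacted_df = transformed_df.coalesce(8)

# Fully reshuffle into more partitions, hash-partitioned by a key, before a heavy stage
rebalanced_df = transformed_df.repartition(200, "user_id")
```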
- Always use a `checkpointLocation`: This is non-negotiable for fault-tolerant and recoverable jobs.
- Define a Schema: Avoid schema inference, as schema changes can break a running stream.
- Monitor Your Query: Use the Spark UI to monitor metrics like input rate, processing time, and state store memory (a programmatic sketch follows this list).
- Use `outputMode` Correctly: Choose the appropriate output mode to avoid unexpected behavior, especially with aggregations.
- Leverage Iceberg or Delta Lake: These table formats are highly optimized for Structured Streaming, offering ACID transactions and efficient handling of small files, which are common in streaming pipelines.
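To complement the Spark UI, running queries also expose progress programmatically through the `StreamingQuery` API. A minimal sketch (where and how you log the metrics is up to you):

```python
import time

# Poll the running query for progress; isActive, status, and lastProgress
# are all part of the PySpark StreamingQuery API
while query.isActive:
    if query.lastProgress:
        # lastProgress is a dict with metrics such as numInputRows,
        # inputRowsPerSecond, durationMs, and per-operator state information
        print(query.lastProgress)
    time.sleep(30)
```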