Skip to content

Instantly share code, notes, and snippets.

@egil
Last active November 23, 2025 22:23
Show Gist options
  • Select an option

  • Save egil/6e2bdc6d8d0255603d7a2e133e8d1e20 to your computer and use it in GitHub Desktop.

Select an option

Save egil/6e2bdc6d8d0255603d7a2e133e8d1e20 to your computer and use it in GitHub Desktop.
This document provides comprehensive guidance on using `IStreamFilter` in Orleans based on research of the Orleans GitHub repository and official documentation. Created by Claude Sonnet 4.5..

Using IStreamFilter in Orleans

This document provides comprehensive guidance on using IStreamFilter in Orleans based on research of the Orleans GitHub repository and official documentation.

Disclaimer: Created by Claude Sonnet 4.5 - AI makes mistakes, etc., not validated info. The prompt was as follows:

Investigate how to use IStreamFilter (need to check the dotnet/orleans codebase on github) to filter out specific events from certain stream providers. there is no documentation of the feature, so you need to go look at the source code in the orleans github repo and look at test cases that uses the feature to understand how it works. provide some sample code for me to look at

Overview

IStreamFilter is a simple stream filtering mechanism in Orleans that allows you to filter stream events on a per-provider basis. This feature was introduced in Orleans to provide a simpler alternative to the previous producer-side filtering mechanism.

How It Works

  1. Configuration: Filters are configured per stream provider (only one filter per provider is supported)
  2. Subscription: When consumers subscribe to a stream, they can attach a string filterData parameter
  3. Filtering: The ShouldDeliver method is called for each stream item to determine if it should be delivered to that specific consumer

Interface Definition

namespace Orleans.Streams.Filtering
{
    public interface IStreamFilter
    {
        bool ShouldDeliver(StreamId streamId, object item, string filterData);
    }
}

Parameters

  • streamId: The identifier of the stream
  • item: The stream event/item object being evaluated
  • filterData: String data provided by the consumer during subscription

Return Value

  • true: The item should be delivered to the consumer
  • false: The item should be filtered out (not delivered)

Sample Filter Implementations

Example 1: Filter Only Even Numbers

using Orleans.Runtime;
using Orleans.Streams.Filtering;

public class EvenNumberFilter : IStreamFilter
{
    public bool ShouldDeliver(StreamId streamId, object item, string filterData)
    {
        if (item is int number)
        {
            return number % 2 == 0;
        }
        return true; // Deliver non-integer items
    }
}

Example 2: Filter by Event Type

public class EventTypeFilter : IStreamFilter
{
    public bool ShouldDeliver(StreamId streamId, object item, string filterData)
    {
        // filterData could be "TypeA,TypeB" - comma-separated types to allow
        if (string.IsNullOrEmpty(filterData))
        {
            return true; // No filter data = deliver everything
        }

        var allowedTypes = filterData.Split(',', StringSplitOptions.RemoveEmptyEntries);
        var itemType = item.GetType().Name;

        return allowedTypes.Any(t => t.Trim().Equals(itemType, StringComparison.OrdinalIgnoreCase));
    }
}

Example 3: Filter by Property Value

public class ChargingSessionStatusFilter : IStreamFilter
{
    public bool ShouldDeliver(StreamId streamId, object item, string filterData)
    {
        // filterData could be "Active" or "Completed"
        if (item is ChargingSessionEvent sessionEvent && !string.IsNullOrEmpty(filterData))
        {
            return sessionEvent.Status.Equals(filterData, StringComparison.OrdinalIgnoreCase);
        }
        return true;
    }
}

Example 4: Location-Based Filter (Advanced)

public class LocationBasedFilter : IStreamFilter
{
    public bool ShouldDeliver(StreamId streamId, object item, string filterData)
    {
        // filterData could be a comma-separated list of location IDs: "LOC001,LOC002"
        if (string.IsNullOrEmpty(filterData))
        {
            return true;
        }

        var allowedLocationIds = filterData.Split(',', StringSplitOptions.RemoveEmptyEntries)
            .Select(id => id.Trim())
            .ToHashSet(StringComparer.OrdinalIgnoreCase);

        // Try to extract location ID from different event types
        var locationId = item switch
        {
            ILocationEvent locationEvent => locationEvent.LocationId,
            IChargingSessionEvent sessionEvent => sessionEvent.LocationId,
            _ => null
        };

        return locationId != null && allowedLocationIds.Contains(locationId);
    }
}

Example 5: No-Op Filter (Default Behavior)

// Built-in Orleans implementation - delivers everything
internal sealed class NoOpStreamFilter : IStreamFilter
{
    public bool ShouldDeliver(StreamId streamId, object item, string filterData)
    {
        return true; // Always deliver
    }
}

Configuration

Silo Configuration

using Orleans.Hosting;

var builder = Host.CreateDefaultBuilder(args)
    .UseOrleans((context, siloBuilder) =>
    {
        siloBuilder
            // Configure your stream provider (e.g., Memory, Azure Queue, Event Hub, etc.)
            .AddMemoryStreams("MyStreamProvider")

            // Add the stream filter for this provider
            .AddStreamFilter<EventTypeFilter>("MyStreamProvider");

        // For Azure Queue Streams
        siloBuilder
            .AddAzureQueueStreams("AzureQueueProvider", configurator =>
            {
                configurator.ConfigureAzureQueue(options =>
                {
                    options.Configure(queueOptions =>
                    {
                        queueOptions.ConfigureQueueServiceClient("your-connection-string");
                    });
                });
            })
            .AddStreamFilter<LocationBasedFilter>("AzureQueueProvider");
    });

Client Configuration

using Orleans.Hosting;

var clientBuilder = new ClientBuilder()
    .AddMemoryStreams("MyStreamProvider")
    .AddStreamFilter<EventTypeFilter>("MyStreamProvider");

var client = clientBuilder.Build();
await client.Connect();

Consumer Usage

Basic Subscription with Filter Data

using Orleans.Streams;

public class MyConsumerGrain : Grain, IMyConsumerGrain
{
    public async Task SubscribeToStream()
    {
        var streamProvider = this.GetStreamProvider("MyStreamProvider");
        var stream = streamProvider.GetStream<ChargingSessionEvent>(
            StreamId.Create("namespace", "key"));

        // Subscribe with filterData to only receive "Active" status events
        var subscriptionHandle = await stream.SubscribeAsync(
            onNext: async (data, token) =>
            {
                // This will only be called for events that pass the filter
                Console.WriteLine($"Received: {data}");
            },
            token: null,
            filterData: "Active" // This string is passed to IStreamFilter.ShouldDeliver
        );
    }
}

Multiple Event Types Subscription

public class EventMonitorGrain : Grain, IEventMonitorGrain
{
    public async Task SubscribeWithMultipleTypes()
    {
        var streamProvider = this.GetStreamProvider("MyStreamProvider");
        var stream = streamProvider.GetStream<object>(StreamId.Create("events", "all"));

        // Subscribe to only specific event types
        await stream.SubscribeAsync(
            onNext: async (data, token) =>
            {
                // Will only receive TypeA and TypeB events
                Console.WriteLine($"Received: {data.GetType().Name}");
            },
            token: null,
            filterData: "TypeA,TypeB"
        );
    }
}

Location-Based Subscription

public class LocationManagerGrain : Grain, ILocationManagerGrain
{
    public async Task MonitorSpecificLocations(params string[] locationIds)
    {
        var streamProvider = this.GetStreamProvider("MyStreamProvider");
        var stream = streamProvider.GetStream<object>(StreamId.Create("locations", "all"));

        // Only receive events from specific locations
        var filterData = string.Join(",", locationIds);

        await stream.SubscribeAsync(
            onNext: async (evt, token) =>
            {
                // Process events only from specified locations
                await ProcessLocationEvent(evt);
            },
            token: null,
            filterData: filterData
        );
    }

    private Task ProcessLocationEvent(object evt)
    {
        // Process the event
        return Task.CompletedTask;
    }
}

Subscription Without Filter (Receive All Events)

public async Task SubscribeToAllEvents()
{
    var streamProvider = this.GetStreamProvider("MyStreamProvider");
    var stream = streamProvider.GetStream<MyEvent>(StreamId.Create("namespace", "key"));

    // Subscribe without filterData - receives all events
    await stream.SubscribeAsync(
        onNext: async (data, token) =>
        {
            Console.WriteLine($"Received: {data}");
        }
    );
}

Testing Example

using Xunit;

public class StreamFilteringTests
{
    [Fact]
    public async Task TestStreamFiltering()
    {
        // Arrange
        var streamProvider = GetStreamProvider("MyStreamProvider");
        var stream = streamProvider.GetStream<int>(StreamId.Create("numbers", "test"));

        var receivedEvents = new List<int>();

        // Subscribe with filter to only receive even numbers
        await stream.SubscribeAsync(
            onNext: async (num, token) => receivedEvents.Add(num),
            token: null,
            filterData: "even" // Assuming filter uses this
        );

        // Act - Publish both even and odd numbers
        for (int i = 1; i <= 10; i++)
        {
            await stream.OnNextAsync(i);
        }

        await Task.Delay(1000); // Allow processing

        // Assert - Should only have received even numbers
        Assert.All(receivedEvents, num => Assert.True(num % 2 == 0));
        Assert.Equal(5, receivedEvents.Count); // Should have 2, 4, 6, 8, 10
    }

    [Fact]
    public async Task TestMultipleSubscriptionsWithDifferentFilters()
    {
        // Arrange
        var streamProvider = GetStreamProvider("MyStreamProvider");
        var stream = streamProvider.GetStream<int>(StreamId.Create("numbers", "test"));

        var evenNumbers = new List<int>();
        var oddNumbers = new List<int>();

        // Two subscriptions with different filter data
        await stream.SubscribeAsync(
            onNext: async (num, token) => evenNumbers.Add(num),
            token: null,
            filterData: "even"
        );

        await stream.SubscribeAsync(
            onNext: async (num, token) => oddNumbers.Add(num),
            token: null,
            filterData: "odd"
        );

        // Act
        for (int i = 1; i <= 10; i++)
        {
            await stream.OnNextAsync(i);
        }

        await Task.Delay(1000);

        // Assert
        Assert.All(evenNumbers, num => Assert.True(num % 2 == 0));
        Assert.All(oddNumbers, num => Assert.True(num % 2 != 0));
    }
}

Important Considerations

1. One Filter Per Provider

You can only configure one IStreamFilter implementation per stream provider. If you need different filtering logic, you should:

  • Use different stream providers, OR
  • Implement complex logic within a single filter that uses filterData to determine behavior

2. String FilterData

The filter data passed during subscription must be a string. For complex data:

  • Use CSV format: "value1,value2,value3"
  • Use JSON: "{\"type\":\"A\",\"priority\":1}"
  • Use custom encoding schemes

Example with JSON:

public class JsonFilterDataFilter : IStreamFilter
{
    public bool ShouldDeliver(StreamId streamId, object item, string filterData)
    {
        if (string.IsNullOrEmpty(filterData))
            return true;

        try
        {
            var filter = JsonSerializer.Deserialize<FilterCriteria>(filterData);
            // Use filter criteria to evaluate item
            return EvaluateItem(item, filter);
        }
        catch
        {
            return true; // On error, deliver the item
        }
    }
}

3. Performance Considerations

  • Filtering happens on the consumer side
  • All events still flow through the stream infrastructure
  • The filter is invoked for every event and every subscription
  • Keep filter logic lightweight and efficient
  • Consider caching parsed filterData if possible (though be aware of thread-safety)

4. Per-Subscription Filtering

Different consumers can subscribe to the same stream with different filterData values:

// Consumer 1: Only active sessions
await stream.SubscribeAsync(handler1, null, "Active");

// Consumer 2: Only completed sessions
await stream.SubscribeAsync(handler2, null, "Completed");

// Consumer 3: All sessions
await stream.SubscribeAsync(handler3, null, null);

5. Compatibility

  • Available in Orleans 7.0+
  • Fully compatible with Orleans 8.0+ and .NET 9
  • Works with all stream providers (Memory, Azure Queue, Event Hub, etc.)

API Reference

Extension Methods

// Silo configuration
public static ISiloBuilder AddStreamFilter<T>(
    this ISiloBuilder builder,
    string providerName)
    where T : IStreamFilter

// Client configuration
public static IClientBuilder AddStreamFilter<T>(
    this IClientBuilder builder,
    string providerName)
    where T : IStreamFilter

Subscription Methods

// Basic subscription (no filter)
Task<StreamSubscriptionHandle<T>> SubscribeAsync(
    IAsyncObserver<T> observer)

// Subscription with sequence token and filter data
Task<StreamSubscriptionHandle<T>> SubscribeAsync(
    IAsyncObserver<T> observer,
    StreamSequenceToken? token,
    string? filterData = null)

// Lambda-based subscription with filter data
Task<StreamSubscriptionHandle<T>> SubscribeAsync(
    Func<T, StreamSequenceToken, Task> onNextAsync,
    StreamSequenceToken? token = null,
    string? filterData = null)

Real-World Example: Charging Session Filtering

Here's a complete example for the Clever Pricing Engine use case:

// Filter implementation
public class ChargingSessionFilter : IStreamFilter
{
    public bool ShouldDeliver(StreamId streamId, object item, string filterData)
    {
        if (string.IsNullOrEmpty(filterData))
            return true;

        // Support multiple filter criteria
        // Format: "Status=Active;LocationId=LOC001;ConnectorId=CONN123"
        var criteria = ParseFilterData(filterData);

        if (item is ChargingSessionEvent sessionEvent)
        {
            if (criteria.TryGetValue("Status", out var status) &&
                !sessionEvent.Status.Equals(status, StringComparison.OrdinalIgnoreCase))
            {
                return false;
            }

            if (criteria.TryGetValue("LocationId", out var locationId) &&
                !sessionEvent.LocationId.Equals(locationId, StringComparison.OrdinalIgnoreCase))
            {
                return false;
            }

            if (criteria.TryGetValue("ConnectorId", out var connectorId) &&
                !sessionEvent.ConnectorId.Equals(connectorId, StringComparison.OrdinalIgnoreCase))
            {
                return false;
            }

            return true;
        }

        return true;
    }

    private Dictionary<string, string> ParseFilterData(string filterData)
    {
        return filterData
            .Split(';', StringSplitOptions.RemoveEmptyEntries)
            .Select(part => part.Split('=', 2))
            .Where(parts => parts.Length == 2)
            .ToDictionary(
                parts => parts[0].Trim(),
                parts => parts[1].Trim(),
                StringComparer.OrdinalIgnoreCase);
    }
}

// Configuration in AppHost or Silo
siloBuilder
    .AddMemoryStreams("ChargingSessionStream")
    .AddStreamFilter<ChargingSessionFilter>("ChargingSessionStream");

// Usage in grain
public class LocationMonitorGrain : Grain, ILocationMonitorGrain
{
    public async Task MonitorLocation(string locationId)
    {
        var streamProvider = this.GetStreamProvider("ChargingSessionStream");
        var stream = streamProvider.GetStream<ChargingSessionEvent>(
            StreamId.Create("charging-sessions", "all"));

        var filterData = $"Status=Active;LocationId={locationId}";

        await stream.SubscribeAsync(
            async (sessionEvent, token) =>
            {
                // Only receives active sessions for this location
                await HandleChargingSessionEvent(sessionEvent);
            },
            token: null,
            filterData: filterData
        );
    }
}

Troubleshooting

Filter Not Working

  1. Verify the filter is registered with the correct provider name
  2. Ensure filterData is being passed during subscription
  3. Check that the filter's ShouldDeliver method is returning the expected boolean value
  4. Add logging to the filter to debug what's being evaluated

Performance Issues

  1. Optimize the ShouldDeliver method - it's called for every event/subscription combination
  2. Avoid expensive operations (database calls, network requests)
  3. Consider caching parsed filter data if the same filterData is used repeatedly
  4. Use efficient string comparison methods

Multiple Filters Needed

Since only one filter per provider is supported:

  1. Use multiple stream providers if you need completely different filtering logic
  2. Implement a composite filter that uses filterData to determine behavior
  3. Consider using stream namespaces to logically separate streams

References

Summary

IStreamFilter provides a simple, effective way to filter stream events in Orleans on a per-consumer basis. Key takeaways:

  • ✅ Simple interface with one method: ShouldDeliver
  • ✅ Configured per stream provider
  • ✅ Consumers pass string filterData during subscription
  • ✅ Enables fine-grained control over which events each subscriber receives
  • ✅ Works with all Orleans stream providers
  • ✅ Compatible with .NET 9 and Orleans 8.0+
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment