This document provides comprehensive guidance on using IStreamFilter in Orleans based on research of the Orleans GitHub repository and official documentation.
Disclaimer: Created by Claude Sonnet 4.5 - AI makes mistakes, etc., not validated info. The prompt was as follows:
Investigate how to use IStreamFilter (need to check the dotnet/orleans codebase on github) to filter out specific events from certain stream providers. there is no documentation of the feature, so you need to go look at the source code in the orleans github repo and look at test cases that uses the feature to understand how it works. provide some sample code for me to look at
IStreamFilter is a simple stream filtering mechanism in Orleans that allows you to filter stream events on a per-provider basis. This feature was introduced in Orleans to provide a simpler alternative to the previous producer-side filtering mechanism.
- Configuration: Filters are configured per stream provider (only one filter per provider is supported)
- Subscription: When consumers subscribe to a stream, they can attach a
string filterDataparameter - Filtering: The
ShouldDelivermethod is called for each stream item to determine if it should be delivered to that specific consumer
namespace Orleans.Streams.Filtering
{
public interface IStreamFilter
{
bool ShouldDeliver(StreamId streamId, object item, string filterData);
}
}streamId: The identifier of the streamitem: The stream event/item object being evaluatedfilterData: String data provided by the consumer during subscription
true: The item should be delivered to the consumerfalse: The item should be filtered out (not delivered)
using Orleans.Runtime;
using Orleans.Streams.Filtering;
public class EvenNumberFilter : IStreamFilter
{
public bool ShouldDeliver(StreamId streamId, object item, string filterData)
{
if (item is int number)
{
return number % 2 == 0;
}
return true; // Deliver non-integer items
}
}public class EventTypeFilter : IStreamFilter
{
public bool ShouldDeliver(StreamId streamId, object item, string filterData)
{
// filterData could be "TypeA,TypeB" - comma-separated types to allow
if (string.IsNullOrEmpty(filterData))
{
return true; // No filter data = deliver everything
}
var allowedTypes = filterData.Split(',', StringSplitOptions.RemoveEmptyEntries);
var itemType = item.GetType().Name;
return allowedTypes.Any(t => t.Trim().Equals(itemType, StringComparison.OrdinalIgnoreCase));
}
}public class ChargingSessionStatusFilter : IStreamFilter
{
public bool ShouldDeliver(StreamId streamId, object item, string filterData)
{
// filterData could be "Active" or "Completed"
if (item is ChargingSessionEvent sessionEvent && !string.IsNullOrEmpty(filterData))
{
return sessionEvent.Status.Equals(filterData, StringComparison.OrdinalIgnoreCase);
}
return true;
}
}public class LocationBasedFilter : IStreamFilter
{
public bool ShouldDeliver(StreamId streamId, object item, string filterData)
{
// filterData could be a comma-separated list of location IDs: "LOC001,LOC002"
if (string.IsNullOrEmpty(filterData))
{
return true;
}
var allowedLocationIds = filterData.Split(',', StringSplitOptions.RemoveEmptyEntries)
.Select(id => id.Trim())
.ToHashSet(StringComparer.OrdinalIgnoreCase);
// Try to extract location ID from different event types
var locationId = item switch
{
ILocationEvent locationEvent => locationEvent.LocationId,
IChargingSessionEvent sessionEvent => sessionEvent.LocationId,
_ => null
};
return locationId != null && allowedLocationIds.Contains(locationId);
}
}// Built-in Orleans implementation - delivers everything
internal sealed class NoOpStreamFilter : IStreamFilter
{
public bool ShouldDeliver(StreamId streamId, object item, string filterData)
{
return true; // Always deliver
}
}using Orleans.Hosting;
var builder = Host.CreateDefaultBuilder(args)
.UseOrleans((context, siloBuilder) =>
{
siloBuilder
// Configure your stream provider (e.g., Memory, Azure Queue, Event Hub, etc.)
.AddMemoryStreams("MyStreamProvider")
// Add the stream filter for this provider
.AddStreamFilter<EventTypeFilter>("MyStreamProvider");
// For Azure Queue Streams
siloBuilder
.AddAzureQueueStreams("AzureQueueProvider", configurator =>
{
configurator.ConfigureAzureQueue(options =>
{
options.Configure(queueOptions =>
{
queueOptions.ConfigureQueueServiceClient("your-connection-string");
});
});
})
.AddStreamFilter<LocationBasedFilter>("AzureQueueProvider");
});using Orleans.Hosting;
var clientBuilder = new ClientBuilder()
.AddMemoryStreams("MyStreamProvider")
.AddStreamFilter<EventTypeFilter>("MyStreamProvider");
var client = clientBuilder.Build();
await client.Connect();using Orleans.Streams;
public class MyConsumerGrain : Grain, IMyConsumerGrain
{
public async Task SubscribeToStream()
{
var streamProvider = this.GetStreamProvider("MyStreamProvider");
var stream = streamProvider.GetStream<ChargingSessionEvent>(
StreamId.Create("namespace", "key"));
// Subscribe with filterData to only receive "Active" status events
var subscriptionHandle = await stream.SubscribeAsync(
onNext: async (data, token) =>
{
// This will only be called for events that pass the filter
Console.WriteLine($"Received: {data}");
},
token: null,
filterData: "Active" // This string is passed to IStreamFilter.ShouldDeliver
);
}
}public class EventMonitorGrain : Grain, IEventMonitorGrain
{
public async Task SubscribeWithMultipleTypes()
{
var streamProvider = this.GetStreamProvider("MyStreamProvider");
var stream = streamProvider.GetStream<object>(StreamId.Create("events", "all"));
// Subscribe to only specific event types
await stream.SubscribeAsync(
onNext: async (data, token) =>
{
// Will only receive TypeA and TypeB events
Console.WriteLine($"Received: {data.GetType().Name}");
},
token: null,
filterData: "TypeA,TypeB"
);
}
}public class LocationManagerGrain : Grain, ILocationManagerGrain
{
public async Task MonitorSpecificLocations(params string[] locationIds)
{
var streamProvider = this.GetStreamProvider("MyStreamProvider");
var stream = streamProvider.GetStream<object>(StreamId.Create("locations", "all"));
// Only receive events from specific locations
var filterData = string.Join(",", locationIds);
await stream.SubscribeAsync(
onNext: async (evt, token) =>
{
// Process events only from specified locations
await ProcessLocationEvent(evt);
},
token: null,
filterData: filterData
);
}
private Task ProcessLocationEvent(object evt)
{
// Process the event
return Task.CompletedTask;
}
}public async Task SubscribeToAllEvents()
{
var streamProvider = this.GetStreamProvider("MyStreamProvider");
var stream = streamProvider.GetStream<MyEvent>(StreamId.Create("namespace", "key"));
// Subscribe without filterData - receives all events
await stream.SubscribeAsync(
onNext: async (data, token) =>
{
Console.WriteLine($"Received: {data}");
}
);
}using Xunit;
public class StreamFilteringTests
{
[Fact]
public async Task TestStreamFiltering()
{
// Arrange
var streamProvider = GetStreamProvider("MyStreamProvider");
var stream = streamProvider.GetStream<int>(StreamId.Create("numbers", "test"));
var receivedEvents = new List<int>();
// Subscribe with filter to only receive even numbers
await stream.SubscribeAsync(
onNext: async (num, token) => receivedEvents.Add(num),
token: null,
filterData: "even" // Assuming filter uses this
);
// Act - Publish both even and odd numbers
for (int i = 1; i <= 10; i++)
{
await stream.OnNextAsync(i);
}
await Task.Delay(1000); // Allow processing
// Assert - Should only have received even numbers
Assert.All(receivedEvents, num => Assert.True(num % 2 == 0));
Assert.Equal(5, receivedEvents.Count); // Should have 2, 4, 6, 8, 10
}
[Fact]
public async Task TestMultipleSubscriptionsWithDifferentFilters()
{
// Arrange
var streamProvider = GetStreamProvider("MyStreamProvider");
var stream = streamProvider.GetStream<int>(StreamId.Create("numbers", "test"));
var evenNumbers = new List<int>();
var oddNumbers = new List<int>();
// Two subscriptions with different filter data
await stream.SubscribeAsync(
onNext: async (num, token) => evenNumbers.Add(num),
token: null,
filterData: "even"
);
await stream.SubscribeAsync(
onNext: async (num, token) => oddNumbers.Add(num),
token: null,
filterData: "odd"
);
// Act
for (int i = 1; i <= 10; i++)
{
await stream.OnNextAsync(i);
}
await Task.Delay(1000);
// Assert
Assert.All(evenNumbers, num => Assert.True(num % 2 == 0));
Assert.All(oddNumbers, num => Assert.True(num % 2 != 0));
}
}You can only configure one IStreamFilter implementation per stream provider. If you need different filtering logic, you should:
- Use different stream providers, OR
- Implement complex logic within a single filter that uses
filterDatato determine behavior
The filter data passed during subscription must be a string. For complex data:
- Use CSV format:
"value1,value2,value3" - Use JSON:
"{\"type\":\"A\",\"priority\":1}" - Use custom encoding schemes
Example with JSON:
public class JsonFilterDataFilter : IStreamFilter
{
public bool ShouldDeliver(StreamId streamId, object item, string filterData)
{
if (string.IsNullOrEmpty(filterData))
return true;
try
{
var filter = JsonSerializer.Deserialize<FilterCriteria>(filterData);
// Use filter criteria to evaluate item
return EvaluateItem(item, filter);
}
catch
{
return true; // On error, deliver the item
}
}
}- Filtering happens on the consumer side
- All events still flow through the stream infrastructure
- The filter is invoked for every event and every subscription
- Keep filter logic lightweight and efficient
- Consider caching parsed
filterDataif possible (though be aware of thread-safety)
Different consumers can subscribe to the same stream with different filterData values:
// Consumer 1: Only active sessions
await stream.SubscribeAsync(handler1, null, "Active");
// Consumer 2: Only completed sessions
await stream.SubscribeAsync(handler2, null, "Completed");
// Consumer 3: All sessions
await stream.SubscribeAsync(handler3, null, null);- Available in Orleans 7.0+
- Fully compatible with Orleans 8.0+ and .NET 9
- Works with all stream providers (Memory, Azure Queue, Event Hub, etc.)
// Silo configuration
public static ISiloBuilder AddStreamFilter<T>(
this ISiloBuilder builder,
string providerName)
where T : IStreamFilter
// Client configuration
public static IClientBuilder AddStreamFilter<T>(
this IClientBuilder builder,
string providerName)
where T : IStreamFilter// Basic subscription (no filter)
Task<StreamSubscriptionHandle<T>> SubscribeAsync(
IAsyncObserver<T> observer)
// Subscription with sequence token and filter data
Task<StreamSubscriptionHandle<T>> SubscribeAsync(
IAsyncObserver<T> observer,
StreamSequenceToken? token,
string? filterData = null)
// Lambda-based subscription with filter data
Task<StreamSubscriptionHandle<T>> SubscribeAsync(
Func<T, StreamSequenceToken, Task> onNextAsync,
StreamSequenceToken? token = null,
string? filterData = null)Here's a complete example for the Clever Pricing Engine use case:
// Filter implementation
public class ChargingSessionFilter : IStreamFilter
{
public bool ShouldDeliver(StreamId streamId, object item, string filterData)
{
if (string.IsNullOrEmpty(filterData))
return true;
// Support multiple filter criteria
// Format: "Status=Active;LocationId=LOC001;ConnectorId=CONN123"
var criteria = ParseFilterData(filterData);
if (item is ChargingSessionEvent sessionEvent)
{
if (criteria.TryGetValue("Status", out var status) &&
!sessionEvent.Status.Equals(status, StringComparison.OrdinalIgnoreCase))
{
return false;
}
if (criteria.TryGetValue("LocationId", out var locationId) &&
!sessionEvent.LocationId.Equals(locationId, StringComparison.OrdinalIgnoreCase))
{
return false;
}
if (criteria.TryGetValue("ConnectorId", out var connectorId) &&
!sessionEvent.ConnectorId.Equals(connectorId, StringComparison.OrdinalIgnoreCase))
{
return false;
}
return true;
}
return true;
}
private Dictionary<string, string> ParseFilterData(string filterData)
{
return filterData
.Split(';', StringSplitOptions.RemoveEmptyEntries)
.Select(part => part.Split('=', 2))
.Where(parts => parts.Length == 2)
.ToDictionary(
parts => parts[0].Trim(),
parts => parts[1].Trim(),
StringComparer.OrdinalIgnoreCase);
}
}
// Configuration in AppHost or Silo
siloBuilder
.AddMemoryStreams("ChargingSessionStream")
.AddStreamFilter<ChargingSessionFilter>("ChargingSessionStream");
// Usage in grain
public class LocationMonitorGrain : Grain, ILocationMonitorGrain
{
public async Task MonitorLocation(string locationId)
{
var streamProvider = this.GetStreamProvider("ChargingSessionStream");
var stream = streamProvider.GetStream<ChargingSessionEvent>(
StreamId.Create("charging-sessions", "all"));
var filterData = $"Status=Active;LocationId={locationId}";
await stream.SubscribeAsync(
async (sessionEvent, token) =>
{
// Only receives active sessions for this location
await HandleChargingSessionEvent(sessionEvent);
},
token: null,
filterData: filterData
);
}
}- Verify the filter is registered with the correct provider name
- Ensure
filterDatais being passed during subscription - Check that the filter's
ShouldDelivermethod is returning the expected boolean value - Add logging to the filter to debug what's being evaluated
- Optimize the
ShouldDelivermethod - it's called for every event/subscription combination - Avoid expensive operations (database calls, network requests)
- Consider caching parsed filter data if the same
filterDatais used repeatedly - Use efficient string comparison methods
Since only one filter per provider is supported:
- Use multiple stream providers if you need completely different filtering logic
- Implement a composite filter that uses
filterDatato determine behavior - Consider using stream namespaces to logically separate streams
- Orleans Streaming Documentation
- IStreamFilter API Reference
- Orleans GitHub PR #6739 - Stream filter implementation
- Orleans Streaming APIs
IStreamFilter provides a simple, effective way to filter stream events in Orleans on a per-consumer basis. Key takeaways:
- ✅ Simple interface with one method:
ShouldDeliver - ✅ Configured per stream provider
- ✅ Consumers pass string
filterDataduring subscription - ✅ Enables fine-grained control over which events each subscriber receives
- ✅ Works with all Orleans stream providers
- ✅ Compatible with .NET 9 and Orleans 8.0+