Skip to content

Instantly share code, notes, and snippets.

@egil
Last active September 24, 2025 23:24
Show Gist options
  • Select an option

  • Save egil/be9c2510d43ab1269c4dca5e136de024 to your computer and use it in GitHub Desktop.

Select an option

Save egil/be9c2510d43ab1269c4dca5e136de024 to your computer and use it in GitHub Desktop.
Orleans test setup for xUnit v3 using a class fixture, with stream support, observable storage writes, and service injection
/// <summary>
/// Example tests showing how to push a message onto a stream and then wait for the
/// receiving grain to persist its state before asserting, using <see cref="SiloFixture"/>.
/// </summary>
public class ExampleTest(SiloFixture fixture) : IClassFixture<SiloFixture>
{
    // NOTE(review): placeholder value. SiloFixture.GetStream requires a stream namespace; it
    // presumably has to match the namespace IMyGrain's implementation subscribes to - confirm.
    private const string StreamNamespace = "MyStreamNamespace";

    private const string GrainKey = "grainId1";

    [Fact]
    public async Task Sending_data_to_stream()
    {
        var message = "foo bar baz";
        var grain = fixture.GetGrain<IMyGrain>(GrainKey);
        var stream = fixture.GetStream<string>(StreamNamespace, GrainKey);

        await stream.OnNextAsync(message);

        // Blocks the test until the grain has written once to its storage provider.
        await fixture.WaitForWrites(grain.GetGrainId(), writeCount: 1);

        var latestGrainMessage = await grain.GetLastMessageAsync();
        Assert.Equal(message, latestGrainMessage);
    }

    [Fact]
    public async Task Sending_data_to_stream_no_explicit_writeCount()
    {
        var message = "foo bar baz";
        var stream = fixture.GetStream<string>(StreamNamespace, GrainKey);

        await stream.OnNextAsync(message);

        // Tries the assertion lambda after each write to the grain.
        await fixture.WaitForAssertion<IMyGrain>(
            GrainKey,
            async myGrain => Assert.Equal(message, await myGrain.GetLastMessageAsync()));
    }
}
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Egil.Orleans.Storage.Testing;
using Microsoft.Extensions.DependencyInjection;
using Orleans.Streams;
using Orleans.TestingHost;
using R3;
using TimeProviderExtensions;
using Xunit;
using Xunit.Sdk;
/// <summary>
/// xUnit fixture that builds and deploys a single-silo in-process Orleans test cluster with
/// observable memory grain storage and a memory stream provider. It also implements
/// <see cref="IGrainFactory"/> by delegating every call to the cluster client.
/// </summary>
public sealed class SiloFixture : IAsyncLifetime, IGrainFactory
{
    private const string TestStreamProviderName = "TestStreamProvider";
    private const string ClusterNotReadyMessage = "Cluster not ready";
    private static readonly TimeSpan DefaultWaitTimeout = TimeSpan.FromSeconds(5);

    private IStreamProvider? streamProvider;
    private InProcessTestCluster? cluster;
    private IGrainFactory? grainFactory;

    /// <summary>Aggregated feed of storage operations performed through the observable storage providers.</summary>
    public StorageObserverAggregator StorageObserver { get; }

    /// <summary>Manual time provider registered in the silo under the key <c>"DomainTimeProvider"</c>.</summary>
    public ManualTimeProvider TimeProvider { get; }

    /// <summary>The cluster client's grain factory; throws until <see cref="InitializeAsync"/> has completed.</summary>
    private IGrainFactory GrainFactory
        => grainFactory ?? throw new InvalidOperationException(ClusterNotReadyMessage);

    public SiloFixture()
    {
        TimeProvider = new ManualTimeProvider(DateTimeOffset.UtcNow);
        StorageObserver = new StorageObserverAggregator();
    }

    /// <summary>Builds and deploys the test cluster and captures the client's grain factory and stream provider.</summary>
    public async ValueTask InitializeAsync()
    {
        var builder = new InProcessTestClusterBuilder(initialSilosCount: 1);
        builder.ConfigureSilo((options, siloBuilder) =>
        {
            // add other service registrations here. to make the services accessible in tests,
            // provide instances of the service directly and keep a reference to them
            // in the SiloFixture, e.g., through properties, as done with TimeProvider below.
            siloBuilder.Services.AddKeyedSingleton<TimeProvider>("DomainTimeProvider", TimeProvider);

            // Set up default storage and make that observable in tests
            siloBuilder
                .AddMemoryGrainStorageAsDefault(
                    storageOptions => storageOptions.GrainStorageSerializer =
                        new SystemTextJsonGrainStorageSerializer(PricingEngineJsonSerializerOptions.Instance))
                .MakeDefaultGrainStorageObservable(StorageObserver);

            // Set up custom storage and make that observable in tests
            siloBuilder
                .AddMemoryGrainStorage(
                    "Custom",
                    storageOptions => storageOptions.GrainStorageSerializer =
                        new SystemTextJsonGrainStorageSerializer(PricingEngineJsonSerializerOptions.Instance))
                .MakeGrainStorageObservable("Custom", StorageObserver);

            siloBuilder.AddMemoryStreams(
                TestStreamProviderName,
                configurator =>
                {
                    configurator.ConfigureStreamPubSub(StreamPubSubType.ImplicitOnly);
                });
        });
        builder.ConfigureClient(clientBuilder =>
        {
            clientBuilder.AddMemoryStreams(TestStreamProviderName);
        });

        cluster = builder.Build();
        await cluster.DeployAsync();
        grainFactory = cluster.Client;
        streamProvider = cluster.Client.GetStreamProvider(TestStreamProviderName);
    }

    /// <summary>
    /// Stops and disposes the cluster, then disposes <see cref="StorageObserver"/>.
    /// The observer is disposed even if shutting down the cluster throws.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        try
        {
            if (cluster is not null)
            {
                await cluster.StopAllSilosAsync();
                await cluster.DisposeAsync();
            }
        }
        finally
        {
            // Reset state so later calls fail with "Cluster not ready" instead of touching a disposed client.
            cluster = null;
            grainFactory = null;
            streamProvider = null;
            StorageObserver.Dispose();
        }
    }

    /// <summary>Gets the stream identified by <paramref name="namespace"/> and <paramref name="key"/> from the test stream provider.</summary>
    /// <exception cref="InvalidOperationException">The cluster has not been initialized.</exception>
    public IAsyncStream<T> GetStream<T>(string @namespace, string key)
        => streamProvider?.GetStream<T>(StreamId.Create(@namespace, key))
            ?? throw new InvalidOperationException(ClusterNotReadyMessage);

    /// <summary>
    /// Completes once <paramref name="writeCount"/> write operations have been observed for
    /// <paramref name="grainId"/>, counting writes already recorded in the observer's history.
    /// </summary>
    /// <param name="grainId">Grain whose storage writes are counted.</param>
    /// <param name="writeCount">Number of writes to wait for; zero completes immediately.</param>
    /// <param name="timeout">Maximum wait; defaults to 5 seconds. Not applied while a debugger is attached.</param>
    /// <returns>A task that faults with an <see cref="XunitException"/> when the writes are not observed.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="writeCount"/> is negative.</exception>
    [StackTraceHidden]
    public Task WaitForWrites(GrainId grainId, int writeCount, TimeSpan? timeout = null)
    {
        ArgumentOutOfRangeException.ThrowIfNegative(writeCount);
        if (writeCount == 0)
        {
            // No index ever equals -1, so the pipeline below would otherwise only end by timing out.
            return Task.CompletedTask;
        }

        var actualCount = 0;
        var task = StorageObserver
            .GetCompleteOperationFeed(grainId)
            .Where(x => x.Kind is StorageOperationKind.Write)
            .Index()
            .Do(_ =>
            {
                Interlocked.Increment(ref actualCount);
            })
            .TakeUntil(x => x.Index == writeCount - 1)
            .WaitAsync(TestContext.Current.CancellationToken);

        if (!Debugger.IsAttached)
        {
            task = task.WaitAsync(timeout ?? DefaultWaitTimeout);
        }

        return task.ContinueWith(
            t =>
            {
                var observed = Volatile.Read(ref actualCount);
                if (t.IsFaulted)
                {
                    var innerException = t.Exception?.InnerException ?? t.Exception;
                    var outcome = innerException is TimeoutException ? "Timed out" : "Failed while";
                    return Task.FromException(CreateWriteWaitException(outcome, grainId, writeCount, observed, innerException));
                }

                if (t.IsCanceled)
                {
                    return Task.FromException(CreateWriteWaitException("Canceled while", grainId, writeCount, observed, null));
                }

                if (observed < writeCount)
                {
                    // The feed completed (e.g. the observer was disposed) before enough writes arrived.
                    return Task.FromException(CreateWriteWaitException("Operation feed completed while", grainId, writeCount, observed, null));
                }

                return Task.CompletedTask;
            },
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default).Unwrap();
    }

    /// <summary>
    /// Runs <paramref name="assertionAction"/> against the grain for the latest recorded operation
    /// (if it is a write) and for each later write, completing as soon as the action stops throwing.
    /// </summary>
    /// <param name="grainId">String key of the grain to observe.</param>
    /// <param name="assertionAction">Assertion to retry; any exception it throws counts as "not yet satisfied".</param>
    /// <param name="timeout">Maximum wait; defaults to 5 seconds. Not applied while a debugger is attached.</param>
    /// <returns>
    /// A task that faults with the last assertion exception when the wait ends unsuccessfully, or with
    /// an <see cref="XunitException"/> when it ends before the assertion could be attempted at all.
    /// </returns>
    [StackTraceHidden]
    public Task WaitForAssertion<TGrain>(string grainId, Func<TGrain, Task> assertionAction, TimeSpan? timeout = null) where TGrain : IGrainWithStringKey
    {
        ArgumentNullException.ThrowIfNull(assertionAction);

        var grain = GetGrain<TGrain>(grainId);
        var assertionException = default(Exception);
        var task = StorageObserver
            .GetLiveOperationFeed(grain.GetGrainId())
            .Where(x => x.Kind is StorageOperationKind.Write)
            .Index()
            .WhereAwait(async (x, ct) =>
            {
                try
                {
                    await assertionAction(grain);
                    return true;
                }
                catch (Exception exception)
                {
                    assertionException = exception;
                    return false;
                }
            })
            .Select(x => x.Index)
            .FirstAsync(TestContext.Current.CancellationToken);

        if (!Debugger.IsAttached)
        {
            task = task.WaitAsync(timeout ?? DefaultWaitTimeout);
        }

        return task.ContinueWith(
            t =>
            {
                if (t.IsCompletedSuccessfully)
                {
                    return Task.CompletedTask;
                }

                if (assertionException is not null)
                {
                    // Surface the most recent assertion failure; it is more useful than the timeout itself.
                    return Task.FromException(assertionException);
                }

                // The assertion never ran, so the wait failure itself must fail the test rather than pass silently.
                var innerException = t.IsFaulted ? t.Exception?.InnerException ?? t.Exception : null;
                var outcome = t.IsCanceled
                    ? "Canceled while"
                    : innerException is TimeoutException ? "Timed out" : "Failed while";
                var xunitException = new XunitException(
                    $"{outcome} waiting for a write operation for grain; the assertion was never attempted.{Environment.NewLine}" +
                    $"GrainId = {grain.GetGrainId()}{Environment.NewLine}",
                    innerException);
                return Task.FromException(xunitException);
            },
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default).Unwrap();
    }

    /// <summary>Builds the failure reported by <see cref="WaitForWrites"/>.</summary>
    private static XunitException CreateWriteWaitException(string outcome, GrainId grainId, int expected, int actual, Exception? innerException)
        => new(
            $"{outcome} waiting for write operations for grain.{Environment.NewLine}" +
            $"GrainId = {grainId}{Environment.NewLine}" +
            $"Expected: {expected}{Environment.NewLine}" +
            $"Actual: {actual}{Environment.NewLine}",
            innerException);

    // IGrainFactory members: each one forwards to the cluster client's grain factory and
    // throws InvalidOperationException("Cluster not ready") before InitializeAsync has completed.

    public TGrainInterface GetGrain<TGrainInterface>(Guid primaryKey, string? grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidKey
        => GrainFactory.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);

    public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string? grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerKey
        => GrainFactory.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);

    public TGrainInterface GetGrain<TGrainInterface>(string primaryKey, string? grainClassNamePrefix = null) where TGrainInterface : IGrainWithStringKey
        => GrainFactory.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);

    public TGrainInterface GetGrain<TGrainInterface>(Guid primaryKey, string keyExtension, string? grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidCompoundKey
        => GrainFactory.GetGrain<TGrainInterface>(primaryKey, keyExtension, grainClassNamePrefix);

    public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string keyExtension, string? grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerCompoundKey
        => GrainFactory.GetGrain<TGrainInterface>(primaryKey, keyExtension, grainClassNamePrefix);

    public TGrainObserverInterface CreateObjectReference<TGrainObserverInterface>(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver
        => GrainFactory.CreateObjectReference<TGrainObserverInterface>(obj);

    public void DeleteObjectReference<TGrainObserverInterface>(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver
        => GrainFactory.DeleteObjectReference<TGrainObserverInterface>(obj);

    public IGrain GetGrain(Type grainInterfaceType, Guid grainPrimaryKey)
        => GrainFactory.GetGrain(grainInterfaceType, grainPrimaryKey);

    public IGrain GetGrain(Type grainInterfaceType, long grainPrimaryKey)
        => GrainFactory.GetGrain(grainInterfaceType, grainPrimaryKey);

    public IGrain GetGrain(Type grainInterfaceType, string grainPrimaryKey)
        => GrainFactory.GetGrain(grainInterfaceType, grainPrimaryKey);

    public IGrain GetGrain(Type grainInterfaceType, Guid grainPrimaryKey, string keyExtension)
        => GrainFactory.GetGrain(grainInterfaceType, grainPrimaryKey, keyExtension);

    public IGrain GetGrain(Type grainInterfaceType, long grainPrimaryKey, string keyExtension)
        => GrainFactory.GetGrain(grainInterfaceType, grainPrimaryKey, keyExtension);

    public TGrainInterface GetGrain<TGrainInterface>(GrainId grainId) where TGrainInterface : IAddressable
        => GrainFactory.GetGrain<TGrainInterface>(grainId);

    public IAddressable GetGrain(GrainId grainId)
        => GrainFactory.GetGrain(grainId);

    public IAddressable GetGrain(GrainId grainId, GrainInterfaceType interfaceType)
        => GrainFactory.GetGrain(grainId, interfaceType);
}
using Orleans.Storage;
using R3;
namespace Egil.Orleans.Storage.Testing;
/// <summary>
/// <see cref="IGrainStorage"/> decorator that forwards every call to the wrapped storage and,
/// once the wrapped call has completed, publishes a <see cref="StorageOperation"/> describing it.
/// </summary>
public sealed class StorageObserver(string storageName, IGrainStorage observableTarget) : IGrainStorage
{
    private readonly Subject<StorageOperation> operationsFeed = new();

    /// <summary>Feed of the operations performed through this observer.</summary>
    public Observable<StorageOperation> StorageOperationFeed => operationsFeed.AsObservable();

    /// <summary>Name of the storage provider this observer wraps.</summary>
    public string StorageName { get; } = storageName;

    /// <summary>Clears the state via the wrapped storage, then publishes a <see cref="StorageOperationKind.Clear"/> operation.</summary>
    public async Task ClearStateAsync<T>(string stateName, GrainId grainId, IGrainState<T> grainState)
    {
        await observableTarget.ClearStateAsync(stateName, grainId, grainState);
        Publish(StorageOperationKind.Clear, stateName, grainId, grainState);
    }

    /// <summary>Reads the state via the wrapped storage, then publishes a <see cref="StorageOperationKind.Read"/> operation.</summary>
    public async Task ReadStateAsync<T>(string stateName, GrainId grainId, IGrainState<T> grainState)
    {
        await observableTarget.ReadStateAsync(stateName, grainId, grainState);
        Publish(StorageOperationKind.Read, stateName, grainId, grainState);
    }

    /// <summary>Writes the state via the wrapped storage, then publishes a <see cref="StorageOperationKind.Write"/> operation.</summary>
    public async Task WriteStateAsync<T>(string stateName, GrainId grainId, IGrainState<T> grainState)
    {
        await observableTarget.WriteStateAsync(stateName, grainId, grainState);
        Publish(StorageOperationKind.Write, stateName, grainId, grainState);
    }

    /// <summary>Pushes a record of a completed operation, with the grain state's current ETag and state, onto the feed.</summary>
    private void Publish<T>(StorageOperationKind kind, string stateName, GrainId grainId, IGrainState<T> grainState)
    {
        var operation = new StorageOperation(
            kind,
            grainId,
            StorageName,
            stateName,
            grainState.ETag,
            grainState.State);

        operationsFeed.OnNext(operation);
    }
}
/// <summary>The kind of grain storage call a <see cref="StorageOperation"/> describes.</summary>
public enum StorageOperationKind
{
    /// <summary>State was cleared.</summary>
    Clear = 0,

    /// <summary>State was read.</summary>
    Read = 1,

    /// <summary>State was written.</summary>
    Write = 2,
}
/// <summary>Describes a single storage call that went through an observable grain storage provider.</summary>
/// <param name="Kind">Whether the call was a clear, read, or write.</param>
/// <param name="GrainId">Identity of the grain whose state was accessed.</param>
/// <param name="StorageName">Name of the storage provider that handled the call.</param>
/// <param name="StateName">Name of the state that was accessed.</param>
/// <param name="Etag">ETag of the grain state, if any.</param>
/// <param name="State">The grain state value, if any.</param>
public record StorageOperation(
    StorageOperationKind Kind,
    GrainId GrainId,
    string StorageName,
    string StateName,
    string? Etag,
    object? State);
using R3;
namespace Egil.Orleans.Storage.Testing;
/// <summary>
/// Merges the operation feeds of all registered storage observers into a single feed and
/// records every operation so that subscribers can also replay what has already happened.
/// </summary>
public sealed class StorageObserverAggregator : IDisposable
{
    // Guards history, feeds, the child subscriptions and publishing to operationsFeed.
    private readonly Lock subLock = new();
    private readonly Dictionary<string, Observable<StorageOperation>> feeds = [];
    private readonly List<StorageOperation> history = [];
    private readonly Subject<StorageOperation> operationsFeed = new();
    private IDisposable childObserverSubscriptions = Disposable.Empty;
    private bool disposed;

    /// <summary>Live feed of every operation reported by the registered observers.</summary>
    public Observable<StorageOperation> StorageOperationFeed => operationsFeed.AsObservable();

    /// <summary>Unsubscribes from all registered observers, disposes the aggregated feed and clears the history.</summary>
    public void Dispose()
    {
        lock (subLock)
        {
            if (disposed)
            {
                return;
            }

            disposed = true;
            childObserverSubscriptions.Dispose();
            childObserverSubscriptions = Disposable.Empty;
            operationsFeed.Dispose();
            history.Clear();
        }
    }

    /// <summary>
    /// Returns a feed that, on subscription, replays every recorded operation for
    /// <paramref name="grainId"/> and then continues with live operations for that grain.
    /// </summary>
    /// <remarks>
    /// The replay and the live subscription happen under the same lock that publishing uses,
    /// so no operation is lost or duplicated between reading the history and subscribing.
    /// </remarks>
    public Observable<StorageOperation> GetCompleteOperationFeed(GrainId grainId)
        => Observable.Create<StorageOperation>(observer =>
        {
            lock (subLock)
            {
                // Snapshot first so that replaying never enumerates the list while it could be modified.
                var pastOperations = history.Where(x => x.GrainId == grainId).ToArray();
                foreach (var operation in pastOperations)
                {
                    observer.OnNext(operation);
                }

                return operationsFeed.Where(x => x.GrainId == grainId).Subscribe(observer);
            }
        });

    /// <summary>
    /// Returns a feed that, on subscription, emits the most recent recorded operation for
    /// <paramref name="grainId"/> (if there is one) and then continues with live operations for that grain.
    /// </summary>
    /// <remarks>
    /// The lookup and the live subscription happen under the same lock that publishing uses,
    /// so no operation is lost between reading the history and subscribing.
    /// </remarks>
    public Observable<StorageOperation> GetLiveOperationFeed(GrainId grainId)
        => Observable.Create<StorageOperation>(observer =>
        {
            lock (subLock)
            {
                var latestByGrainId = history.LastOrDefault(x => x.GrainId == grainId);
                if (latestByGrainId is not null)
                {
                    observer.OnNext(latestByGrainId);
                }

                return operationsFeed.Where(x => x.GrainId == grainId).Subscribe(observer);
            }
        });

    /// <summary>Registers an observer's feed; every operation it emits is recorded and republished on the aggregated feed.</summary>
    /// <param name="observerId">Unique id of the observer; registering the same id twice throws.</param>
    /// <param name="feed">The observer's operation feed.</param>
    internal void AddObserver(string observerId, Observable<StorageOperation> feed)
    {
        lock (subLock)
        {
            ObjectDisposedException.ThrowIf(disposed, this);

            feeds.Add(observerId, feed);
            var subscription = feed.Subscribe(item =>
            {
                lock (subLock)
                {
                    if (disposed)
                    {
                        // A callback that was already waiting on the lock when Dispose ran; drop the item.
                        return;
                    }

                    history.Add(item);
                    operationsFeed.OnNext(item);
                }
            });
            childObserverSubscriptions = Disposable.Combine(childObserverSubscriptions, subscription);
        }
    }
}
using Microsoft.Extensions.DependencyInjection;
using Orleans.Providers;
using Orleans.Storage;
namespace Egil.Orleans.Storage.Testing;
/// <summary>
/// Silo builder extensions that wrap an already registered grain storage provider in a
/// <see cref="StorageObserver"/> whose operations are reported to a <see cref="StorageObserverAggregator"/>.
/// </summary>
public static class StorageObserverSiloBuilderExtensions
{
    /// <summary>Makes the grain storage provider registered under the default provider name observable.</summary>
    public static ISiloBuilder MakeDefaultGrainStorageObservable(this ISiloBuilder siloBuilder, StorageObserverAggregator observerAggregator)
        => siloBuilder.MakeGrainStorageObservable(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, observerAggregator);

    /// <summary>
    /// Replaces the keyed <see cref="IGrainStorage"/> registration named <paramref name="name"/> with one
    /// that creates the original provider and wraps it in a <see cref="StorageObserver"/>.
    /// </summary>
    /// <param name="siloBuilder">Silo builder holding the service registrations.</param>
    /// <param name="name">Name (service key) of the storage provider; it must already be registered.</param>
    /// <param name="observerAggregator">Aggregator that receives the wrapped provider's operation feed.</param>
    /// <returns>The same <paramref name="siloBuilder"/>, for chaining.</returns>
    /// <exception cref="InvalidOperationException">No factory-based keyed storage registration with that name exists.</exception>
    /// <remarks>
    /// NOTE(review): the wrapper only exposes <see cref="IGrainStorage"/>; if other registrations resolve
    /// this keyed service and cast it to another interface of the original provider, they will fail - confirm
    /// for providers other than the memory storage.
    /// </remarks>
    public static ISiloBuilder MakeGrainStorageObservable(this ISiloBuilder siloBuilder, string name, StorageObserverAggregator observerAggregator)
    {
        ArgumentNullException.ThrowIfNull(siloBuilder);
        ArgumentNullException.ThrowIfNull(name);
        ArgumentNullException.ThrowIfNull(observerAggregator);

        // Match on the service type as well: other keyed services may share the same key, and picking
        // one of those would remove an unrelated registration and fail on the cast below.
        var target = siloBuilder.Services.LastOrDefault(x =>
                x.ServiceType == typeof(IGrainStorage)
                && x.IsKeyedService
                && x.ServiceKey?.Equals(name) == true
                && x.KeyedImplementationFactory is not null)
            ?? throw new InvalidOperationException($"No grain storage provider with name '{name}' was found.");

        siloBuilder.Services.Remove(target);
        siloBuilder.Services.AddKeyedSingleton<IGrainStorage>(
            name,
            (sp, key) =>
            {
                var inner = (IGrainStorage)target.KeyedImplementationFactory!(sp, key);
                var observer = new StorageObserver(name, inner);
                observerAggregator.AddObserver(observer.StorageName, observer.StorageOperationFeed);
                return observer;
            });

        return siloBuilder;
    }
}
@egil
Copy link
Author

egil commented Sep 24, 2025

NOTE: these packages are needed:

    <PackageReference Include="Microsoft.Orleans.TestingHost" />
    <PackageReference Include="R3" />
    <PackageReference Include="TimeProviderExtensions" />
    <PackageReference Include="xunit.v3" />

Code should be easily modifiable to run with other test frameworks. Not a lot here is xUnit specific.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment