Last active
September 24, 2025 23:24
-
-
Save egil/be9c2510d43ab1269c4dca5e136de024 to your computer and use it in GitHub Desktop.
Orleans test setup for xUnit v3 using a class fixture, with stream support, observable storage writes, and service injection.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Example tests showing how to push a message onto a stream and then wait for the
/// receiving grain to persist its state, using the helpers on <see cref="SiloFixture"/>.
/// </summary>
public class ExampleTest(SiloFixture fixture) : IClassFixture<SiloFixture>
{
    // Placeholder: replace with the stream namespace the grain under test subscribes to.
    // The fixture configures the stream provider with implicit-only pub/sub.
    private const string StreamNamespace = "my-stream-namespace";

    private const string GrainKey = "grainId1";

    [Fact]
    public async Task Sending_data_to_stream()
    {
        var message = "foo bar baz";
        var grain = fixture.GetGrain<IMyGrain>(GrainKey);
        var stream = fixture.GetStream<string>(StreamNamespace, GrainKey);

        await stream.OnNextAsync(message);

        // Blocks the test until the grain has written once to its storage provider.
        await fixture.WaitForWrites(grain.GetGrainId(), writeCount: 1);

        var latestGrainMessage = await grain.GetLastMessageAsync();
        Assert.Equal(message, latestGrainMessage);
    }

    [Fact]
    public async Task Sending_data_to_stream_no_explicit_writeCount()
    {
        var message = "foo bar baz";
        var stream = fixture.GetStream<string>(StreamNamespace, GrainKey);

        await stream.OnNextAsync(message);

        // Tries the assertion lambda after each write to the grain.
        await fixture.WaitForAssertion<IMyGrain>(
            GrainKey,
            async myGrain => Assert.Equal(message, await myGrain.GetLastMessageAsync()));
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using Egil.Orleans.Storage.Testing; | |
| using Microsoft.Extensions.DependencyInjection; | |
| using Orleans.Streams; | |
| using Orleans.TestingHost; | |
| using TimeProviderExtensions; | |
| using R3; | |
/// <summary>
/// xUnit class fixture that deploys a single-silo in-process Orleans test cluster with
/// observable in-memory grain storage and an in-memory stream provider, and exposes
/// helpers for waiting on grain storage writes. It also forwards every
/// <see cref="IGrainFactory"/> call to the cluster client.
/// </summary>
public sealed class SiloFixture : IAsyncLifetime, IGrainFactory
{
    private const string TestStreamProviderName = "TestStreamProvider";
    private const string ClusterNotReadyMessage = "Cluster not ready";
    private static readonly TimeSpan DefaultWaitTimeout = TimeSpan.FromSeconds(5);

    private IStreamProvider? streamProvider;
    private InProcessTestCluster? cluster;
    private IGrainFactory? grainFactory;

    /// <summary>Aggregated feed of storage operations performed by the observed storage providers.</summary>
    public StorageObserverAggregator StorageObserver { get; }

    /// <summary>Manually controlled time provider, registered in the silo under the key "DomainTimeProvider".</summary>
    public ManualTimeProvider TimeProvider { get; }

    public SiloFixture()
    {
        TimeProvider = new ManualTimeProvider(DateTimeOffset.UtcNow);
        StorageObserver = new StorageObserverAggregator();
    }

    /// <summary>The cluster client's grain factory; throws until <see cref="InitializeAsync"/> has completed.</summary>
    private IGrainFactory Factory
        => grainFactory ?? throw new InvalidOperationException(ClusterNotReadyMessage);

    /// <summary>Builds and deploys the test cluster and captures the client's grain factory and stream provider.</summary>
    public async ValueTask InitializeAsync()
    {
        var builder = new InProcessTestClusterBuilder(initialSilosCount: 1);
        builder.ConfigureSilo((siloOptions, siloBuilder) =>
        {
            // add other service registrations here. to make the services accessible in tests,
            // provide instances of the service directly and keep a reference to them
            // in the SiloFixture, e.g., through properties, as done with TimeProvider below.
            siloBuilder.Services.AddKeyedSingleton<TimeProvider>("DomainTimeProvider", TimeProvider);

            // Set up default storage and make that observable in tests
            siloBuilder
                .AddMemoryGrainStorageAsDefault(options => options.GrainStorageSerializer = new SystemTextJsonGrainStorageSerializer(PricingEngineJsonSerializerOptions.Instance))
                .MakeDefaultGrainStorageObservable(StorageObserver);

            // Set up custom storage and make that observable in tests
            siloBuilder
                .AddMemoryGrainStorage(
                    "Custom",
                    options => options.GrainStorageSerializer = new SystemTextJsonGrainStorageSerializer(PricingEngineJsonSerializerOptions.Instance))
                .MakeGrainStorageObservable("Custom", StorageObserver);

            siloBuilder.AddMemoryStreams(
                TestStreamProviderName,
                configurator => configurator.ConfigureStreamPubSub(StreamPubSubType.ImplicitOnly));
        });

        builder.ConfigureClient(clientBuilder => clientBuilder.AddMemoryStreams(TestStreamProviderName));

        cluster = builder.Build();
        await cluster.DeployAsync();
        grainFactory = cluster.Client;
        streamProvider = cluster.Client.GetStreamProvider(TestStreamProviderName);
    }

    /// <summary>Stops and disposes the cluster (if it was created) and disposes the storage observer.</summary>
    public async ValueTask DisposeAsync()
    {
        try
        {
            if (cluster is not null)
            {
                await cluster.StopAllSilosAsync();

                // Stopping the silos does not release the cluster's own resources; dispose it as well.
                await cluster.DisposeAsync();

                cluster = null;
                grainFactory = null;
                streamProvider = null;
            }
        }
        finally
        {
            // Always release the observer, even when shutting the cluster down failed.
            StorageObserver.Dispose();
        }
    }

    /// <summary>Gets the stream identified by <paramref name="namespace"/> and <paramref name="key"/> from the test stream provider.</summary>
    /// <exception cref="InvalidOperationException">The cluster has not been initialized.</exception>
    public IAsyncStream<T> GetStream<T>(string @namespace, string key)
        => streamProvider?.GetStream<T>(StreamId.Create(@namespace, key))
            ?? throw new InvalidOperationException(ClusterNotReadyMessage);

    /// <summary>
    /// Waits until the grain has performed at least <paramref name="writeCount"/> storage writes,
    /// counting writes already recorded in the observer's history.
    /// </summary>
    /// <param name="grainId">The grain whose storage writes are awaited.</param>
    /// <param name="writeCount">Number of writes to wait for; must be at least 1.</param>
    /// <param name="timeout">Maximum wait time; defaults to 5 seconds. No timeout is applied while a debugger is attached.</param>
    /// <returns>A task that completes when the writes were observed, or faults with an <see cref="XunitException"/> otherwise.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="writeCount"/> is zero or negative.</exception>
    [StackTraceHidden]
    public Task WaitForWrites(GrainId grainId, int writeCount, TimeSpan? timeout = null)
    {
        // A non-positive count can never be matched by the index check below and would
        // only surface as a misleading timeout, so reject it up front.
        ArgumentOutOfRangeException.ThrowIfNegativeOrZero(writeCount);

        var actualCount = 0;
        var task = StorageObserver
            .GetCompleteOperationFeed(grainId)
            .Where(x => x.Kind is StorageOperationKind.Write)
            .Index()
            .Do(x =>
            {
                Interlocked.Increment(ref actualCount);
            })
            .TakeUntil(x => x.Index == writeCount - 1)
            .WaitAsync(TestContext.Current.CancellationToken);

        if (!Debugger.IsAttached)
        {
            task = task.WaitAsync(timeout ?? DefaultWaitTimeout);
        }

        // ContinueWith (rather than async/await) keeps this method a single frame that
        // [StackTraceHidden] can hide. The scheduler is passed explicitly so the
        // continuation does not depend on the ambient TaskScheduler.Current.
        return task.ContinueWith(
            t =>
            {
                if (t.IsCompletedSuccessfully)
                {
                    return Task.CompletedTask;
                }

                var cause = t.Exception?.InnerException ?? t.Exception;
                var headline = t.IsCanceled
                    ? "Canceled while waiting for write operations for grain."
                    : cause is TimeoutException
                        ? "Timed out waiting for write operations for grain."
                        : "Failed while waiting for write operations for grain.";

                return Task.FromException(
                    CreateWaitForWritesException(headline, grainId, writeCount, Volatile.Read(ref actualCount), cause));
            },
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default).Unwrap();
    }

    /// <summary>
    /// Runs <paramref name="assertionAction"/> against the grain after each observed storage write
    /// (starting with the latest recorded write, if any) until it stops throwing.
    /// </summary>
    /// <param name="grainId">String key of the grain to assert against.</param>
    /// <param name="assertionAction">Assertion to evaluate; any exception it throws counts as "not yet satisfied".</param>
    /// <param name="timeout">Maximum wait time; defaults to 5 seconds. No timeout is applied while a debugger is attached.</param>
    /// <returns>
    /// A task that completes once the assertion passes. If the wait ends first, the task faults with the
    /// last assertion failure, or with an <see cref="XunitException"/> when the assertion never ran.
    /// </returns>
    [StackTraceHidden]
    public Task WaitForAssertion<TGrain>(string grainId, Func<TGrain, Task> assertionAction, TimeSpan? timeout = null) where TGrain : IGrainWithStringKey
    {
        ArgumentNullException.ThrowIfNull(assertionAction);

        var grain = GetGrain<TGrain>(grainId);
        var assertionException = default(Exception);
        var task = StorageObserver
            .GetLiveOperationFeed(grain.GetGrainId())
            .Where(x => x.Kind is StorageOperationKind.Write)
            .Index()
            .WhereAwait(async (x, ct) =>
            {
                try
                {
                    await assertionAction(grain);
                    return true;
                }
                catch (Exception exception)
                {
                    // Remember the most recent failure so it can be reported if the wait ends.
                    assertionException = exception;
                    return false;
                }
            })
            .Select(x => x.Index)
            .FirstAsync(TestContext.Current.CancellationToken);

        if (!Debugger.IsAttached)
        {
            task = task.WaitAsync(timeout ?? DefaultWaitTimeout);
        }

        return task.ContinueWith(
            t =>
            {
                if (t.IsCompletedSuccessfully)
                {
                    return Task.CompletedTask;
                }

                if (assertionException is not null)
                {
                    return Task.FromException(assertionException);
                }

                // The wait ended before any write was observed, so the assertion never ran.
                // Report that as a failure instead of letting the test pass vacuously.
                var cause = t.Exception?.InnerException ?? t.Exception;
                var failure = new XunitException(
                    $"No write operation was observed for grain before the wait ended, so the assertion was never evaluated.{Environment.NewLine}" +
                    $"GrainId = {grain.GetGrainId()}{Environment.NewLine}",
                    cause);
                return Task.FromException(failure);
            },
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default).Unwrap();
    }

    /// <summary>Builds the failure reported by <see cref="WaitForWrites"/>.</summary>
    private static XunitException CreateWaitForWritesException(string headline, GrainId grainId, int expected, int actual, Exception? cause)
        => new(
            $"{headline}{Environment.NewLine}" +
            $"GrainId = {grainId}{Environment.NewLine}" +
            $"Expected: {expected}{Environment.NewLine}" +
            $"Actual: {actual}{Environment.NewLine}",
            cause);

    // IGrainFactory implementation: every member forwards to the cluster client's factory.

    public TGrainInterface GetGrain<TGrainInterface>(Guid primaryKey, string? grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidKey
        => Factory.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);

    public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string? grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerKey
        => Factory.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);

    public TGrainInterface GetGrain<TGrainInterface>(string primaryKey, string? grainClassNamePrefix = null) where TGrainInterface : IGrainWithStringKey
        => Factory.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);

    public TGrainInterface GetGrain<TGrainInterface>(Guid primaryKey, string keyExtension, string? grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidCompoundKey
        => Factory.GetGrain<TGrainInterface>(primaryKey, keyExtension, grainClassNamePrefix);

    public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string keyExtension, string? grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerCompoundKey
        => Factory.GetGrain<TGrainInterface>(primaryKey, keyExtension, grainClassNamePrefix);

    public TGrainObserverInterface CreateObjectReference<TGrainObserverInterface>(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver
        => Factory.CreateObjectReference<TGrainObserverInterface>(obj);

    public void DeleteObjectReference<TGrainObserverInterface>(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver
        => Factory.DeleteObjectReference<TGrainObserverInterface>(obj);

    public IGrain GetGrain(Type grainInterfaceType, Guid grainPrimaryKey)
        => Factory.GetGrain(grainInterfaceType, grainPrimaryKey);

    public IGrain GetGrain(Type grainInterfaceType, long grainPrimaryKey)
        => Factory.GetGrain(grainInterfaceType, grainPrimaryKey);

    public IGrain GetGrain(Type grainInterfaceType, string grainPrimaryKey)
        => Factory.GetGrain(grainInterfaceType, grainPrimaryKey);

    public IGrain GetGrain(Type grainInterfaceType, Guid grainPrimaryKey, string keyExtension)
        => Factory.GetGrain(grainInterfaceType, grainPrimaryKey, keyExtension);

    public IGrain GetGrain(Type grainInterfaceType, long grainPrimaryKey, string keyExtension)
        => Factory.GetGrain(grainInterfaceType, grainPrimaryKey, keyExtension);

    public TGrainInterface GetGrain<TGrainInterface>(GrainId grainId) where TGrainInterface : IAddressable
        => Factory.GetGrain<TGrainInterface>(grainId);

    public IAddressable GetGrain(GrainId grainId)
        => Factory.GetGrain(grainId);

    public IAddressable GetGrain(GrainId grainId, GrainInterfaceType interfaceType)
        => Factory.GetGrain(grainId, interfaceType);
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using Orleans.Storage; | |
| using R3; | |
| namespace Egil.Orleans.Storage.Testing; | |
/// <summary>
/// <see cref="IGrainStorage"/> wrapper that forwards every call to the wrapped storage and,
/// once the call has completed, publishes a <see cref="StorageOperation"/> describing it.
/// </summary>
/// <param name="storageName">Name reported in every published operation.</param>
/// <param name="observableTarget">The storage provider that actually performs the operations.</param>
public sealed class StorageObserver(string storageName, IGrainStorage observableTarget) : IGrainStorage
{
    private readonly Subject<StorageOperation> operationsFeed = new();

    /// <summary>Name of the observed storage provider.</summary>
    public string StorageName { get; } = storageName;

    /// <summary>Feed of operations that completed against the wrapped storage.</summary>
    public Observable<StorageOperation> StorageOperationFeed => operationsFeed.AsObservable();

    /// <inheritdoc />
    public async Task ReadStateAsync<T>(string stateName, GrainId grainId, IGrainState<T> grainState)
    {
        await observableTarget.ReadStateAsync(stateName, grainId, grainState);
        Publish(StorageOperationKind.Read, stateName, grainId, grainState);
    }

    /// <inheritdoc />
    public async Task WriteStateAsync<T>(string stateName, GrainId grainId, IGrainState<T> grainState)
    {
        await observableTarget.WriteStateAsync(stateName, grainId, grainState);
        Publish(StorageOperationKind.Write, stateName, grainId, grainState);
    }

    /// <inheritdoc />
    public async Task ClearStateAsync<T>(string stateName, GrainId grainId, IGrainState<T> grainState)
    {
        await observableTarget.ClearStateAsync(stateName, grainId, grainState);
        Publish(StorageOperationKind.Clear, stateName, grainId, grainState);
    }

    /// <summary>
    /// Emits an operation of the given kind, capturing the ETag and state as they are
    /// after the wrapped storage call has returned.
    /// </summary>
    private void Publish<T>(StorageOperationKind kind, string stateName, GrainId grainId, IGrainState<T> grainState)
    {
        var operation = new StorageOperation(
            kind,
            grainId,
            StorageName,
            stateName,
            grainState.ETag,
            grainState.State);

        operationsFeed.OnNext(operation);
    }
}
/// <summary>The kind of storage call a <see cref="StorageOperation"/> describes.</summary>
public enum StorageOperationKind
{
    /// <summary>A <c>ClearStateAsync</c> call.</summary>
    Clear = 0,

    /// <summary>A <c>ReadStateAsync</c> call.</summary>
    Read = 1,

    /// <summary>A <c>WriteStateAsync</c> call.</summary>
    Write = 2,
}
/// <summary>Describes a single completed call against an observed grain storage provider.</summary>
/// <param name="Kind">Whether the call was a clear, read, or write.</param>
/// <param name="GrainId">The grain the call was made for.</param>
/// <param name="StorageName">Name of the storage provider that handled the call.</param>
/// <param name="StateName">Name of the grain state passed to the call.</param>
/// <param name="Etag">ETag of the grain state after the call completed, if any.</param>
/// <param name="State">State value held by the grain state after the call completed.</param>
public record StorageOperation(
    StorageOperationKind Kind,
    GrainId GrainId,
    string StorageName,
    string StateName,
    string? Etag,
    object? State);
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using R3; | |
| namespace Egil.Orleans.Storage.Testing; | |
/// <summary>
/// Merges the operation feeds of any number of storage observers into one feed and keeps a
/// history of everything seen, so callers can replay past operations for a grain and then
/// continue with live ones.
/// </summary>
public sealed class StorageObserverAggregator : IDisposable
{
    // Guards history, feeds, childObserverSubscriptions, and publication to operationsFeed.
    private readonly Lock subLock = new();
    private readonly Dictionary<string, Observable<StorageOperation>> feeds = [];
    private readonly List<StorageOperation> history = [];
    private readonly Subject<StorageOperation> operationsFeed = new();
    private IDisposable childObserverSubscriptions = Disposable.Empty;

    /// <summary>Live feed of operations from all registered observers; past operations are not replayed.</summary>
    public Observable<StorageOperation> StorageOperationFeed => operationsFeed.AsObservable();

    /// <summary>Unsubscribes from all registered observers, disposes the merged feed, and clears the history.</summary>
    public void Dispose()
    {
        // Take the lock so disposal cannot interleave with an observer callback that is
        // appending to the history or publishing to the feed.
        lock (subLock)
        {
            childObserverSubscriptions.Dispose();
            childObserverSubscriptions = Disposable.Empty;
            operationsFeed.Dispose();
            history.Clear();
        }
    }

    /// <summary>
    /// Gets a feed that, on subscription, replays every recorded operation for
    /// <paramref name="grainId"/> and then continues with live operations for that grain.
    /// </summary>
    public Observable<StorageOperation> GetCompleteOperationFeed(GrainId grainId)
        => Observable.Create<StorageOperation>(observer =>
        {
            // Replay and live subscription happen under the same lock the producers take,
            // so no operation can slip in between the history snapshot and the subscription.
            lock (subLock)
            {
                var pastOperations = history.Where(x => x.GrainId == grainId).ToArray();
                foreach (var operation in pastOperations)
                {
                    observer.OnNext(operation);
                }

                return operationsFeed.Where(x => x.GrainId == grainId).Subscribe(observer);
            }
        });

    /// <summary>
    /// Gets a feed that, on subscription, emits the most recent recorded operation for
    /// <paramref name="grainId"/> (if there is one) and then continues with live operations for that grain.
    /// </summary>
    public Observable<StorageOperation> GetLiveOperationFeed(GrainId grainId)
        => Observable.Create<StorageOperation>(observer =>
        {
            // Same locking rationale as GetCompleteOperationFeed.
            lock (subLock)
            {
                var latestByGrainId = history.LastOrDefault(x => x.GrainId == grainId);
                if (latestByGrainId is not null)
                {
                    observer.OnNext(latestByGrainId);
                }

                return operationsFeed.Where(x => x.GrainId == grainId).Subscribe(observer);
            }
        });

    /// <summary>Registers an observer feed; its operations are recorded in the history and republished on the merged feed.</summary>
    /// <param name="observerId">Identifier of the observer, e.g. the storage provider name.</param>
    /// <param name="feed">The observer's operation feed.</param>
    internal void AddObserver(string observerId, Observable<StorageOperation> feed)
    {
        ArgumentNullException.ThrowIfNull(observerId);
        ArgumentNullException.ThrowIfNull(feed);

        lock (subLock)
        {
            // The same id can legitimately be registered more than once (for example when
            // several silos each create an observer for the same storage name), so do not
            // throw on a duplicate key; every feed is still subscribed below.
            feeds[observerId] = feed;

            var subscription = feed.Subscribe(item =>
            {
                lock (subLock)
                {
                    history.Add(item);
                    operationsFeed.OnNext(item);
                }
            });

            childObserverSubscriptions = Disposable.Combine(childObserverSubscriptions, subscription);
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using Microsoft.Extensions.DependencyInjection; | |
| using Orleans.Providers; | |
| using Orleans.Storage; | |
| namespace Egil.Orleans.Storage.Testing; | |
/// <summary>
/// Silo builder extensions that replace an already registered grain storage provider with a
/// <see cref="StorageObserver"/> wrapping it, so tests can observe its operations.
/// </summary>
public static class StorageObserverSiloBuilderExtensions
{
    /// <summary>Makes the default grain storage provider observable through <paramref name="observerAggregator"/>.</summary>
    /// <exception cref="InvalidOperationException">No default grain storage provider has been registered.</exception>
    public static ISiloBuilder MakeDefaultGrainStorageObservable(this ISiloBuilder siloBuilder, StorageObserverAggregator observerAggregator)
        => siloBuilder.MakeGrainStorageObservable(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, observerAggregator);

    /// <summary>
    /// Makes the grain storage provider registered under <paramref name="name"/> observable through
    /// <paramref name="observerAggregator"/>. Must be called after the provider has been registered.
    /// </summary>
    /// <param name="siloBuilder">The silo builder holding the storage registration.</param>
    /// <param name="name">The key the storage provider was registered under.</param>
    /// <param name="observerAggregator">Aggregator that receives the provider's operations.</param>
    /// <returns>The same <paramref name="siloBuilder"/>, for chaining.</returns>
    /// <exception cref="InvalidOperationException">No grain storage provider is registered under <paramref name="name"/>.</exception>
    public static ISiloBuilder MakeGrainStorageObservable(this ISiloBuilder siloBuilder, string name, StorageObserverAggregator observerAggregator)
    {
        ArgumentNullException.ThrowIfNull(siloBuilder);
        ArgumentNullException.ThrowIfNull(name);
        ArgumentNullException.ThrowIfNull(observerAggregator);

        // Match on the service type as well as the key: other keyed services may share the
        // same key, and picking one of those would remove an unrelated registration and
        // fail the IGrainStorage cast in the factory below.
        var target = siloBuilder.Services.LastOrDefault(x =>
                x.ServiceType == typeof(IGrainStorage)
                && x.IsKeyedService
                && x.ServiceKey is string key
                && string.Equals(key, name, StringComparison.Ordinal)
                && x.KeyedImplementationFactory is not null)
            ?? throw new InvalidOperationException($"No grain storage provider with name '{name}' was found.");

        var innerFactory = target.KeyedImplementationFactory!;

        siloBuilder.Services.Remove(target);
        siloBuilder.Services.AddKeyedSingleton<IGrainStorage>(
            name,
            (sp, key) =>
            {
                // Resolve the original provider and wrap it so its operations are published.
                var inner = (IGrainStorage)innerFactory(sp, key);
                var observer = new StorageObserver(name, inner);
                observerAggregator.AddObserver(observer.StorageName, observer.StorageOperationFeed);
                return observer;
            });

        return siloBuilder;
    }
}
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
NOTE: the following packages are needed (the package list was not captured in this copy; see the `using` directives in the files above):
The code should be easy to adapt to other test frameworks; very little of it is xUnit-specific.