Last active
November 16, 2025 16:53
-
-
Save to11mtm/45724cadbfafcbfaa9d7801619c35425 to your computer and use it in GitHub Desktop.
FlowWithResourceAsync example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using Akka.Annotations; | |
| using Akka.Streams.Dsl; | |
| using Akka.Streams.Implementation.Stages; | |
| using Akka.Streams.Stage; | |
| using Akka.Streams.Supervision; | |
| using Akka.Util; | |
| using Akka.Util.Internal; | |
| namespace Akka.Streams.NATS.KeyValueStore.AkkaStreamsExtensions; | |
| /// <summary> | |
| /// DSL extensions for creating flows and sources with managed async resources ✨ | |
| /// </summary> | |
| /// <remarks> | |
| /// CopilotNotes: These extensions provide resource-safe stream operations where resources | |
| /// are automatically created, managed, and cleaned up during stream lifecycle events. | |
| /// Perfect for working with connections, file handles, or any IDisposable-like resources! UwU | |
| /// </remarks> | |
| public class FlowWithResourceAsyncDsl | |
| { | |
| /// <summary> | |
| /// Creates a Flow that processes elements using a managed async resource! (◕‿◕)✨ | |
| /// </summary> | |
| /// <typeparam name="TIn">Input element type</typeparam> | |
| /// <typeparam name="TOut">Output element type</typeparam> | |
| /// <typeparam name="TResource">Resource type that will be created and managed</typeparam> | |
| /// <param name="create">Factory function to create the resource asynchronously</param> | |
| /// <param name="process">Function to process each input element using the resource</param> | |
| /// <param name="close">Function to close/cleanup the resource when done</param> | |
| /// <returns>A Flow that processes elements with the managed resource nya~</returns> | |
| /// <remarks> | |
| /// CopilotNotes: The resource is created on PreStart, used for all elements flowing through, | |
| /// and properly cleaned up on PostStop. Supervision strategies are respected for error handling! UwU | |
| /// </remarks> | |
| public static Flow<TIn, TOut, NotUsed> FlowWithResourceAsync<TIn, TOut, TResource>( | |
| Func<Task<TResource>> create, | |
| Func<TResource, TIn, Task<TOut>> process, | |
| Func<TResource, Task> close) | |
| { | |
| return Flow.FromGraph(new FlowWithResourceAsyncStage<TIn, TOut, TResource>(create, process, close)); | |
| } | |
| } | |
| /// <summary> | |
| /// A Flow stage that manages an async resource lifecycle and uses it to process incoming elements! (ノ◕ヮ◕)ノ*:・゚✧ | |
| /// </summary> | |
| /// <typeparam name="TIn">Type of elements flowing in</typeparam> | |
| /// <typeparam name="TOut">Type of elements flowing out</typeparam> | |
| /// <typeparam name="TResource">Type of the managed resource</typeparam> | |
| /// <remarks> | |
| /// CopilotNotes: This stage creates a resource on PreStart, processes each element with it, | |
| /// and ensures cleanup on PostStop. Supervision strategies control error handling behavior. | |
| /// Resource creation/usage errors are handled according to the supervision decider! uwu | |
| /// </remarks> | |
| [InternalApi] | |
| public sealed class FlowWithResourceAsyncStage<TIn, TOut, TResource> : GraphStage<FlowShape<TIn, TOut>> | |
| { | |
| #region Logic | |
| private sealed class Logic : GraphStageLogic | |
| { | |
| private readonly FlowWithResourceAsyncStage<TIn, TOut, TResource> _stage; | |
| private readonly Lazy<Decider> _decider; | |
| private readonly Lazy<IAsyncCallback<Try<TResource>>> _createdCallback; | |
| private Option<TResource> _resource = Option<TResource>.None; | |
| private int _inFlightProcessing; // Track pending async processing operations! ✨ | |
| private bool _upstreamFinished; | |
| public Logic(FlowWithResourceAsyncStage<TIn, TOut, TResource> stage, Attributes inheritedAttributes) | |
| : base(stage.Shape) | |
| { | |
| _stage = stage; | |
| _inFlightProcessing = 0; | |
| _upstreamFinished = false; | |
| // Initialize supervision decider lazily ✨ | |
| _decider = new Lazy<Decider>(() => | |
| { | |
| var strategy = inheritedAttributes.GetAttribute<ActorAttributes.SupervisionStrategy>(null); | |
| return strategy != null ? strategy.Decider : Deciders.StoppingDecider; | |
| }); | |
| // Callback for when resource creation completes (◕‿◕) | |
| _createdCallback = new Lazy<IAsyncCallback<Try<TResource>>>(() => | |
| GetTypedAsyncCallback<Try<TResource>>(result => | |
| { | |
| if (result.IsSuccess) | |
| { | |
| _resource = result.Success.Value; | |
| // If we have demand and input is available, pull from upstream nya~ | |
| if (!HasBeenPulled(_stage.In) && IsAvailable(_stage.Out)) | |
| { | |
| Pull(_stage.In); | |
| } | |
| } | |
| else | |
| { | |
| FailStage(result.Failure.Value); | |
| } | |
| })); | |
| SetHandler(_stage.In, | |
| onPush: () => | |
| { | |
| // Got an element from upstream! (੭ु。╹▿╹。)੭ु⁾⁾ | |
| if (_resource.HasValue) | |
| { | |
| var input = Grab(_stage.In); | |
| ProcessElement(input); | |
| } | |
| else | |
| { | |
| // Resource not ready yet - this shouldn't happen because we only pull when ready | |
| // But handle it gracefully by failing the stage uwu | |
| FailStage(new InvalidOperationException("Received element before resource was created")); | |
| } | |
| }, | |
| onUpstreamFinish: () => | |
| { | |
| // Upstream completed! Mark it and complete only if no pending operations uwu | |
| _upstreamFinished = true; | |
| // Only complete if all async processing has finished! (◕‿◕)♡ | |
| if (_inFlightProcessing == 0) | |
| { | |
| CompleteStage(); | |
| } | |
| // Otherwise, the last processing callback will complete us! ✨ | |
| }, | |
| onUpstreamFailure: ex => | |
| { | |
| // Upstream failed! Propagate it (╥﹏╥) | |
| FailStage(ex); | |
| }); | |
| SetHandler(_stage.Out, | |
| onPull: () => | |
| { | |
| // Downstream wants data! Pull from upstream if resource is ready ✨ | |
| if (_resource.HasValue && !HasBeenPulled(_stage.In)) | |
| { | |
| Pull(_stage.In); | |
| } | |
| // If resource isn't ready yet, we'll pull once it's created | |
| }); | |
| } | |
| private IAsyncCallback<Try<TResource>> CreatedCallback => _createdCallback.Value; | |
| /// <summary> | |
| /// Processes an element using the managed resource (。♥‿♥。) | |
| /// </summary> | |
| private void ProcessElement(TIn input) | |
| { | |
| if (!_resource.HasValue) | |
| { | |
| FailStage(new InvalidOperationException("Cannot process element without resource")); | |
| return; | |
| } | |
| var resource = _resource.Value; | |
| // Track that we have processing in flight! ✨ | |
| _inFlightProcessing++; | |
| try | |
| { | |
| // Process the element with the resource asynchronously! ✧*。٩(ˊᗜˋ*)و✧*。 | |
| _stage._process(resource, input).OnComplete(GetAsyncCallback<Try<TOut>>(result => | |
| { | |
| // Decrement in-flight counter! (◕‿◕)♡ | |
| _inFlightProcessing--; | |
| if (result.IsSuccess) | |
| { | |
| // Push the processed result downstream! (ノ◕ヮ◕)ノ*:・゚✧ | |
| Push(_stage.Out, result.Success.Value); | |
| // Check if we should complete now! uwu | |
| if (_upstreamFinished && _inFlightProcessing == 0) | |
| { | |
| CompleteStage(); | |
| } | |
| } | |
| else | |
| { | |
| // Error during processing! Handle according to supervision strategy (。•́︿•̀。) | |
| HandleError(result.Failure.Value); | |
| } | |
| })); | |
| } | |
| catch (Exception ex) | |
| { | |
| // Synchronous error during process invocation - decrement counter! (。•́︿•̀。) | |
| _inFlightProcessing--; | |
| HandleError(ex); | |
| } | |
| } | |
| /// <summary> | |
| /// Handles errors according to supervision strategy (๑•̀ㅂ•́)و✧ | |
| /// </summary> | |
| private void HandleError(Exception ex) | |
| { | |
| switch (_decider.Value(ex)) | |
| { | |
| case Directive.Stop: | |
| // Stop the stream on error ⊂(◉‿◉)つ | |
| FailStage(ex); | |
| break; | |
| case Directive.Restart: | |
| // Restart the resource! (ง •̀_•́)ง | |
| try | |
| { | |
| RestartResource(); | |
| } | |
| catch (Exception restartEx) | |
| { | |
| FailStage(restartEx); | |
| } | |
| break; | |
| case Directive.Resume: | |
| // Skip this element and continue! (◕‿◕) | |
| if (!HasBeenPulled(_stage.In) && IsAvailable(_stage.Out)) | |
| { | |
| Pull(_stage.In); | |
| } | |
| break; | |
| default: | |
| throw new ArgumentOutOfRangeException(nameof(ex), "Unknown supervision directive"); | |
| } | |
| } | |
| /// <summary> | |
| /// Creates the resource asynchronously! ✨ | |
| /// </summary> | |
| private void CreateResource() | |
| { | |
| _stage._create().OnComplete(GetAsyncCallback<Try<TResource>>(resource => | |
| { | |
| async Task InvokeCallback() | |
| { | |
| try | |
| { | |
| await CreatedCallback.InvokeWithFeedback(resource); | |
| } | |
| catch (StreamDetachedException) | |
| { | |
| // Stream stopped before resource callback - cleanup if needed (。•́︿•̀。) | |
| if (resource.IsSuccess) | |
| { | |
| #pragma warning disable CS4014 // Fire-and-forget cleanup is intentional | |
| _stage._close(resource.Success.Value); | |
| #pragma warning restore CS4014 | |
| } | |
| } | |
| } | |
| _ = InvokeCallback(); | |
| })); | |
| } | |
| /// <summary> | |
| /// Restarts the resource by closing and recreating it! (ง'̀-'́)ง | |
| /// </summary> | |
| private void RestartResource() | |
| { | |
| if (_resource.HasValue) | |
| { | |
| var oldResource = _resource.Value; | |
| _resource = Option<TResource>.None; | |
| // Close the old resource before creating a new one (。•̀ᴗ-)✧ | |
| _stage._close(oldResource).OnComplete(GetAsyncCallback<Try<Done>>(done => | |
| { | |
| if (done.IsSuccess) CreateResource(); | |
| else FailStage(done.Failure.Value); | |
| })); | |
| } | |
| else | |
| { | |
| CreateResource(); | |
| } | |
| } | |
| public override void PreStart() | |
| { | |
| // Keep the stage alive even after upstream finishes until we explicitly complete! (◕‿◕)♡ | |
| // This prevents PostStop from being called while async processing is in flight~! ✨ | |
| SetKeepGoing(true); | |
| // Create the resource when the stage starts! (ノ◕ヮ◕)ノ*:・゚✧ | |
| CreateResource(); | |
| } | |
| public override void PostStop() | |
| { | |
| // Clean up the resource when the stage stops! (◕‿◕✿) | |
| if (_resource.HasValue) | |
| { | |
| #pragma warning disable CS4014 // Fire-and-forget cleanup is intentional for PostStop | |
| _stage._close(_resource.Value); | |
| #pragma warning restore CS4014 | |
| } | |
| } | |
| } | |
| #endregion | |
| private readonly Func<Task<TResource>> _create; | |
| private readonly Func<TResource, TIn, Task<TOut>> _process; | |
| private readonly Func<TResource, Task> _close; | |
| /// <summary> | |
| /// Creates a new FlowWithResourceAsync stage! ✨(ノ◕ヮ◕)ノ*:・゚✧ | |
| /// </summary> | |
| /// <param name="create">Factory to create the resource asynchronously</param> | |
| /// <param name="process">Function to process each element with the resource</param> | |
| /// <param name="close">Function to close/cleanup the resource</param> | |
| /// <remarks> | |
| /// CopilotNotes: The resource lifecycle is managed automatically - created on PreStart, | |
| /// used for all elements, and cleaned up on PostStop. Perfect for database connections, | |
| /// HTTP clients, or any stateful resource that needs proper cleanup! uwu | |
| /// </remarks> | |
| public FlowWithResourceAsyncStage( | |
| Func<Task<TResource>> create, | |
| Func<TResource, TIn, Task<TOut>> process, | |
| Func<TResource, Task> close) | |
| { | |
| _create = create ?? throw new ArgumentNullException(nameof(create)); | |
| _process = process ?? throw new ArgumentNullException(nameof(process)); | |
| _close = close ?? throw new ArgumentNullException(nameof(close)); | |
| In = new Inlet<TIn>("FlowWithResourceAsync.in"); | |
| Out = new Outlet<TOut>("FlowWithResourceAsync.out"); | |
| Shape = new FlowShape<TIn, TOut>(In, Out); | |
| } | |
| /// <summary> | |
| /// The inlet for incoming elements (◕‿◕) | |
| /// </summary> | |
| public Inlet<TIn> In { get; } | |
| /// <summary> | |
| /// The outlet for processed elements! ✨ | |
| /// </summary> | |
| public Outlet<TOut> Out { get; } | |
| /// <summary> | |
| /// The flow shape of this stage nya~ | |
| /// </summary> | |
| public override FlowShape<TIn, TOut> Shape { get; } | |
| /// <summary> | |
| /// Default attributes for this stage (◕‿◕✿) | |
| /// </summary> | |
| protected override Attributes InitialAttributes => DefaultAttributes.UnfoldResourceSourceAsync; | |
| /// <summary> | |
| /// Creates the graph stage logic! (ノ◕ヮ◕)ノ*:・゚✧ | |
| /// </summary> | |
| protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) | |
| => new Logic(this, inheritedAttributes); | |
| /// <summary> | |
| /// String representation uwu | |
| /// </summary> | |
| public override string ToString() => "FlowWithResourceAsync"; | |
| } | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment