Skip to content

Instantly share code, notes, and snippets.

@to11mtm
Last active November 16, 2025 16:53
Show Gist options
  • Select an option

  • Save to11mtm/45724cadbfafcbfaa9d7801619c35425 to your computer and use it in GitHub Desktop.

Select an option

Save to11mtm/45724cadbfafcbfaa9d7801619c35425 to your computer and use it in GitHub Desktop.
FlowWithResourceAsync example
using Akka.Annotations;
using Akka.Streams.Dsl;
using Akka.Streams.Implementation.Stages;
using Akka.Streams.Stage;
using Akka.Streams.Supervision;
using Akka.Util;
using Akka.Util.Internal;
namespace Akka.Streams.NATS.KeyValueStore.AkkaStreamsExtensions;
/// <summary>
/// Factory methods for building flows whose elements are processed with an
/// asynchronously created, stream-managed resource.
/// </summary>
/// <remarks>
/// The resource is opened when the stream stage starts and closed when the stage stops,
/// so callers do not manage its lifetime themselves.
/// </remarks>
public class FlowWithResourceAsyncDsl
{
    /// <summary>
    /// Builds a <see cref="Flow{TIn,TOut,TMat}"/> backed by a
    /// <see cref="FlowWithResourceAsyncStage{TIn,TOut,TResource}"/>.
    /// </summary>
    /// <typeparam name="TIn">Type of the elements entering the flow.</typeparam>
    /// <typeparam name="TOut">Type of the elements leaving the flow.</typeparam>
    /// <typeparam name="TResource">Type of the resource owned by the flow.</typeparam>
    /// <param name="create">Asynchronous factory that opens the resource.</param>
    /// <param name="process">Asynchronous transformation applied to each element, given the open resource.</param>
    /// <param name="close">Asynchronous function that releases the resource.</param>
    /// <returns>A flow that maps every input element through <paramref name="process"/>.</returns>
    /// <exception cref="ArgumentNullException">
    /// Thrown by the stage constructor when any of the three delegates is <c>null</c>.
    /// </exception>
    public static Flow<TIn, TOut, NotUsed> FlowWithResourceAsync<TIn, TOut, TResource>(
        Func<Task<TResource>> create,
        Func<TResource, TIn, Task<TOut>> process,
        Func<TResource, Task> close)
        => Flow.FromGraph(new FlowWithResourceAsyncStage<TIn, TOut, TResource>(create, process, close));
}
/// <summary>
/// A flow stage that owns an asynchronously created resource and uses it to transform
/// each incoming element.
/// </summary>
/// <typeparam name="TIn">Type of elements flowing in.</typeparam>
/// <typeparam name="TOut">Type of elements flowing out.</typeparam>
/// <typeparam name="TResource">Type of the managed resource.</typeparam>
/// <remarks>
/// <para>
/// The resource is requested in <c>PreStart</c> and closed (fire-and-forget) in <c>PostStop</c>.
/// Upstream is only pulled once the resource exists and downstream has signalled demand, so at
/// most one element is being processed at a time.
/// </para>
/// <para>
/// A failure while creating the resource always fails the stage. A failure while processing an
/// element is passed to the supervision decider taken from the stage attributes
/// (<see cref="Deciders.StoppingDecider"/> when none is configured): <c>Stop</c> fails the stage,
/// <c>Resume</c> drops the element, and <c>Restart</c> drops the element, closes the resource and
/// creates a new one.
/// </para>
/// </remarks>
[InternalApi]
public sealed class FlowWithResourceAsyncStage<TIn, TOut, TResource> : GraphStage<FlowShape<TIn, TOut>>
{
    #region Logic

    private sealed class Logic : GraphStageLogic
    {
        private readonly FlowWithResourceAsyncStage<TIn, TOut, TResource> _stage;
        private readonly Lazy<Decider> _decider;
        private readonly Lazy<IAsyncCallback<Try<TResource>>> _createdCallback;

        // Empty while the resource is being created or restarted.
        private Option<TResource> _resource = Option<TResource>.None;

        // Number of process() tasks whose completion callback has not run yet.
        private int _inFlightProcessing;

        // Set once upstream completed; from then on the inlet must not be pulled again.
        private bool _upstreamFinished;

        public Logic(FlowWithResourceAsyncStage<TIn, TOut, TResource> stage, Attributes inheritedAttributes)
            : base(stage.Shape)
        {
            _stage = stage;

            // Resolve the supervision decider on first use, defaulting to "stop on any error".
            _decider = new Lazy<Decider>(() =>
            {
                var strategy = inheritedAttributes.GetAttribute<ActorAttributes.SupervisionStrategy>(null);
                return strategy != null ? strategy.Decider : Deciders.StoppingDecider;
            });

            // Runs on the stage when a resource creation attempt has finished.
            _createdCallback = new Lazy<IAsyncCallback<Try<TResource>>>(() =>
                GetTypedAsyncCallback<Try<TResource>>(result =>
                {
                    if (result.IsSuccess)
                    {
                        _resource = result.Success.Value;
                        // Demand may have arrived while the resource was still being created.
                        TryPullUpstream();
                    }
                    else
                    {
                        FailStage(result.Failure.Value);
                    }
                }));

            SetHandler(_stage.In,
                onPush: () =>
                {
                    if (_resource.HasValue)
                    {
                        ProcessElement(_resource.Value, Grab(_stage.In));
                    }
                    else
                    {
                        // Upstream is only pulled while a resource exists, so this indicates a logic error.
                        FailStage(new InvalidOperationException("Received element before resource was created"));
                    }
                },
                onUpstreamFinish: () =>
                {
                    _upstreamFinished = true;
                    // With work still in flight, the processing callback completes the stage instead.
                    CompleteIfDrained();
                },
                onUpstreamFailure: ex => FailStage(ex));

            SetHandler(_stage.Out,
                onPull: () =>
                {
                    // When the resource is not ready yet, the created-callback issues the pull.
                    TryPullUpstream();
                });
        }

        private IAsyncCallback<Try<TResource>> CreatedCallback => _createdCallback.Value;

        /// <summary>
        /// Pulls upstream when a resource is available, upstream is still open, no pull is
        /// already pending and downstream has outstanding demand.
        /// </summary>
        private void TryPullUpstream()
        {
            if (_resource.HasValue
                && !_upstreamFinished
                && !HasBeenPulled(_stage.In)
                && IsAvailable(_stage.Out))
            {
                Pull(_stage.In);
            }
        }

        /// <summary>
        /// Completes the stage when upstream has finished and no processing is in flight.
        /// </summary>
        /// <returns><c>true</c> when the stage was completed by this call.</returns>
        private bool CompleteIfDrained()
        {
            if (_upstreamFinished && _inFlightProcessing == 0)
            {
                CompleteStage();
                return true;
            }

            return false;
        }

        /// <summary>
        /// Starts processing <paramref name="input"/> with <paramref name="resource"/> and pushes
        /// the result downstream once the returned task completes.
        /// </summary>
        private void ProcessElement(TResource resource, TIn input)
        {
            _inFlightProcessing++;
            try
            {
                _stage._process(resource, input).OnComplete(GetAsyncCallback<Try<TOut>>(result =>
                {
                    _inFlightProcessing--;
                    if (result.IsSuccess)
                    {
                        Push(_stage.Out, result.Success.Value);
                        CompleteIfDrained();
                    }
                    else
                    {
                        HandleError(result.Failure.Value);
                    }
                }));
            }
            catch (Exception ex)
            {
                // The delegate threw before a task was handed over; nothing is in flight anymore.
                _inFlightProcessing--;
                HandleError(ex);
            }
        }

        /// <summary>
        /// Applies the supervision directive chosen for a processing failure.
        /// </summary>
        /// <remarks>
        /// For <c>Resume</c> and <c>Restart</c> the stage is completed instead when upstream has
        /// already finished and nothing is in flight: there is no further element to pull, and
        /// pulling the closed inlet would fail the stage.
        /// </remarks>
        private void HandleError(Exception ex)
        {
            switch (_decider.Value(ex))
            {
                case Directive.Stop:
                    FailStage(ex);
                    break;

                case Directive.Restart:
                    if (CompleteIfDrained())
                    {
                        // PostStop closes the current resource; recreating it would be wasted work.
                        break;
                    }

                    try
                    {
                        RestartResource();
                    }
                    catch (Exception restartEx)
                    {
                        FailStage(restartEx);
                    }
                    break;

                case Directive.Resume:
                    // Drop the failed element and ask for the next one, if there can be one.
                    if (!CompleteIfDrained())
                    {
                        TryPullUpstream();
                    }
                    break;

                default:
                    throw new ArgumentOutOfRangeException(nameof(ex), "Unknown supervision directive");
            }
        }

        /// <summary>
        /// Starts asynchronous creation of the resource and routes the outcome to
        /// <see cref="CreatedCallback"/>.
        /// </summary>
        private void CreateResource()
        {
            // Materialize the typed callback here, on the stage, so the task continuation below
            // only ever invokes it.
            var createdCallback = CreatedCallback;

            // The continuation is deliberately NOT wrapped in GetAsyncCallback: such a handler is
            // not run once the stage has stopped, which would skip the cleanup below and leak a
            // resource that finished creating after the stream was torn down.
            Action<Try<TResource>> onCreated = resource =>
            {
                async Task InvokeCallback()
                {
                    try
                    {
                        await createdCallback.InvokeWithFeedback(resource);
                    }
                    catch (StreamDetachedException)
                    {
                        // Stage stopped before the callback could be delivered: close the
                        // freshly opened resource ourselves, since PostStop never saw it.
                        if (resource.IsSuccess)
                        {
#pragma warning disable CS4014 // Fire-and-forget cleanup is intentional
                            _stage._close(resource.Success.Value);
#pragma warning restore CS4014
                        }
                    }
                }

                _ = InvokeCallback();
            };

            _stage._create().OnComplete(onCreated);
        }

        /// <summary>
        /// Closes the current resource, if any, and creates a new one once closing succeeded.
        /// A failed close fails the stage.
        /// </summary>
        private void RestartResource()
        {
            if (_resource.HasValue)
            {
                var oldResource = _resource.Value;
                // Clear first so no element is processed with a resource that is being closed.
                _resource = Option<TResource>.None;

                _stage._close(oldResource).OnComplete(GetAsyncCallback<Try<Done>>(done =>
                {
                    if (done.IsSuccess) CreateResource();
                    else FailStage(done.Failure.Value);
                }));
            }
            else
            {
                CreateResource();
            }
        }

        public override void PreStart()
        {
            // Keep the stage alive after upstream finishes until it is completed explicitly,
            // so in-flight processing can still push its result.
            SetKeepGoing(true);
            CreateResource();
        }

        public override void PostStop()
        {
            if (_resource.HasValue)
            {
#pragma warning disable CS4014 // Fire-and-forget cleanup is intentional for PostStop
                _stage._close(_resource.Value);
#pragma warning restore CS4014
            }
        }
    }

    #endregion

    private readonly Func<Task<TResource>> _create;
    private readonly Func<TResource, TIn, Task<TOut>> _process;
    private readonly Func<TResource, Task> _close;

    /// <summary>
    /// Creates a new <see cref="FlowWithResourceAsyncStage{TIn,TOut,TResource}"/>.
    /// </summary>
    /// <param name="create">Factory that creates the resource asynchronously.</param>
    /// <param name="process">Function that processes one element with the resource.</param>
    /// <param name="close">Function that closes the resource.</param>
    /// <exception cref="ArgumentNullException">Any of the delegates is <c>null</c>.</exception>
    public FlowWithResourceAsyncStage(
        Func<Task<TResource>> create,
        Func<TResource, TIn, Task<TOut>> process,
        Func<TResource, Task> close)
    {
        _create = create ?? throw new ArgumentNullException(nameof(create));
        _process = process ?? throw new ArgumentNullException(nameof(process));
        _close = close ?? throw new ArgumentNullException(nameof(close));

        In = new Inlet<TIn>("FlowWithResourceAsync.in");
        Out = new Outlet<TOut>("FlowWithResourceAsync.out");
        Shape = new FlowShape<TIn, TOut>(In, Out);
    }

    /// <summary>
    /// The inlet for incoming elements.
    /// </summary>
    public Inlet<TIn> In { get; }

    /// <summary>
    /// The outlet for processed elements.
    /// </summary>
    public Outlet<TOut> Out { get; }

    /// <summary>
    /// The flow shape of this stage.
    /// </summary>
    public override FlowShape<TIn, TOut> Shape { get; }

    /// <summary>
    /// Default attributes for this stage; reuses the ones defined for the
    /// unfold-resource-async source.
    /// </summary>
    protected override Attributes InitialAttributes => DefaultAttributes.UnfoldResourceSourceAsync;

    /// <summary>
    /// Creates the logic instance that runs this stage for one materialization.
    /// </summary>
    protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
        => new Logic(this, inheritedAttributes);

    /// <summary>
    /// Returns the display name of the stage.
    /// </summary>
    public override string ToString() => "FlowWithResourceAsync";
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment