Skip to content

Instantly share code, notes, and snippets.

@to11mtm
Created September 29, 2025 17:45
Show Gist options
  • Select an option

  • Save to11mtm/2232a818f8c0148b047cca6496c4da35 to your computer and use it in GitHub Desktop.

Select an option

Save to11mtm/2232a818f8c0148b047cca6496c4da35 to your computer and use it in GitHub Desktop.
Nats Alpakka DSL WIP
using System.Text;
using System.Threading.Tasks.Dataflow;
using Akka;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.Implementation.Fusing;
using Akka.Util;
using JetBrains.Annotations;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
namespace GlutenFree.Akka.Streams.nats_net_v2.JetStream;
public static class DSL
{
/// <summary>
/// Builds a source that, when materialized, creates an ordered JetStream consumer on
/// <paramref name="stream"/> and emits every message produced by its consume enumerator.
/// </summary>
/// <param name="context">JetStream context used to create the ordered consumer.</param>
/// <param name="stream">Name of the stream to consume from.</param>
/// <param name="opts">Optional ordered-consumer options, forwarded to <c>CreateOrderedConsumerAsync</c>.</param>
/// <param name="consumeOpts">Optional consume options, forwarded to <c>ConsumeAsync</c>.</param>
/// <param name="consumeSerializer">Optional deserializer, forwarded to <c>ConsumeAsync</c>.</param>
/// <param name="token">Token forwarded to the consumer creation call.</param>
/// <param name="consumeCancel">Token forwarded to <c>ConsumeAsync</c> and to the async enumerator.</param>
/// <typeparam name="T">Payload type of the consumed messages.</typeparam>
/// <returns>
/// A source that completes once the enumerator reports no further elements;
/// the enumerator is disposed when the source is closed.
/// </returns>
public static Source<NatsJSMsg<T>, NotUsed> NatsJsOrderedConsumerSource<T>(
    NatsJSContext context,
    string stream,
    NatsJSOrderedConsumerOpts? opts = default,
    NatsJSConsumeOpts? consumeOpts = default,
    INatsDeserialize<T>? consumeSerializer = default,
    CancellationToken token = default,
    CancellationToken consumeCancel = default)
{
    return Source.UnfoldResourceAsync(
        // Open: create the consumer, start consuming and hand back the enumerator.
        [MustDisposeResource] async () =>
        {
            var consumer = await context.CreateOrderedConsumerAsync(stream, opts, token);
            var messages = consumer.ConsumeAsync(consumeSerializer, consumeOpts, consumeCancel);
            return messages.GetAsyncEnumerator(consumeCancel);
        },
        // Read: one element per pull; None signals completion to the stage.
        async enumerator =>
        {
            if (!await enumerator.MoveNextAsync())
            {
                return Option<NatsJSMsg<T>>.None;
            }

            return enumerator.Current;
        },
        // Close: release the enumerator.
        async enumerator =>
        {
            await enumerator.DisposeAsync();
            return Done.Instance;
        });
}
/// <summary>
/// Builds a sink that publishes every incoming element to JetStream, one at a time.
/// </summary>
/// <param name="context">JetStream context used to publish.</param>
/// <param name="subject">Selects the subject for an element.</param>
/// <param name="pubTransform">Converts an element into the payload to publish.</param>
/// <param name="serializer">Optional serializer forwarded to <c>PublishAsync</c>.</param>
/// <param name="opts">Optional publish options forwarded to <c>PublishAsync</c>.</param>
/// <param name="headers">Optional headers forwarded to <c>PublishAsync</c> (the same instance for every element).</param>
/// <param name="cancellationToken">Token forwarded to every publish call.</param>
/// <typeparam name="T">Element type accepted by the sink.</typeparam>
/// <typeparam name="TPub">Payload type that is published.</typeparam>
/// <returns>A sink whose materialized task completes when the stream finishes.</returns>
/// <remarks>
/// The publish acknowledgement is inspected: if the server reports an error in the ack,
/// a <see cref="NatsJSApiException"/> is thrown so the rejection is not silently lost.
/// A duplicate acknowledgement is not treated as a failure.
/// </remarks>
public static Sink<T, Task<Done>> NatsJsPublishSink<T, TPub>(NatsJSContext context,
    Func<T, string> subject, Func<T, TPub> pubTransform,
    INatsSerializer<TPub>? serializer = default,
    NatsJSPubOpts? opts = default,
    NatsHeaders? headers = default,
    CancellationToken cancellationToken = default)
{
    return Sink.ForEachAsync<T>(1, async t =>
    {
        var ack = await context.PublishAsync(subject(t), pubTransform(t), serializer, opts,
            headers, cancellationToken);

        // Previously the ack was dropped, hiding server-side rejections from the stream.
        if (ack.Error is not null)
        {
            throw new NatsJSApiException(ack.Error);
        }
    });
}
/// <summary>
/// Callback set that drives the JetStream publish stages in this class
/// (<c>NatsJSPublishStage</c> and <c>NatsJSOutputSink</c>).
/// </summary>
/// <param name="Publish">
/// Decides, per element, whether to publish: a value is published, an empty option is not.
/// </param>
/// <param name="PublishOver">Produces the subject for the value being published.</param>
/// <param name="Success">Invoked with the element, the published value and the publish ack after a publish returns.</param>
/// <param name="Failure">
/// Invoked by <c>NatsJSPublishStage</c> with the element, the value and the exception when publishing throws.
/// <c>NatsJSOutputSink</c> does not invoke it; it lets the exception propagate.
/// </param>
/// <param name="Ignore">Invoked with the element when <paramref name="Publish"/> yields no value.</param>
/// <param name="PubSer">Optional per-value serializer selector; when null, no serializer is passed to the publish call.</param>
/// <param name="PubOpts">Optional per-value publish-options selector; when null, no options are passed.</param>
/// <param name="Headers">Optional per-value header selector; when null, no headers are passed.</param>
/// <typeparam name="T">Element type entering the stage.</typeparam>
/// <typeparam name="TPub">Type of the value that is published.</typeparam>
/// <typeparam name="TOut">Result type produced by the callbacks.</typeparam>
public record WithPublishOpts<T, TPub, TOut>(
Func<T, Option<TPub>> Publish,
Func<TPub, string> PublishOver,
Func<T, TPub, PubAckResponse, TOut> Success,
Func<T, TPub, Exception, TOut> Failure,
Func<T, TOut> Ignore,
Func<TPub, INatsSerializer<TPub>?>? PubSer = default,
Func<TPub, NatsJSPubOpts?>? PubOpts = default,
Func<TPub, NatsHeaders?>? Headers = default);
/// <summary>
/// Builds a flow that optionally publishes each element to JetStream and then emits a
/// result computed by the callbacks in <paramref name="opts"/>.
/// </summary>
/// <param name="context">JetStream context used to publish.</param>
/// <param name="opts">Callbacks selecting what to publish and how to map the outcome to an output.</param>
/// <param name="token">Token forwarded to every publish call.</param>
/// <typeparam name="T">Element type entering the flow.</typeparam>
/// <typeparam name="TPub">Type of the published value.</typeparam>
/// <typeparam name="TOut">Element type leaving the flow.</typeparam>
/// <returns>A flow that processes one element at a time.</returns>
public static Flow<T, TOut, NotUsed> NatsJSPublishStage<T, TPub, TOut>(
    NatsJSContext context, WithPublishOpts<T, TPub, TOut> opts,
    CancellationToken token = default)
{
    return Flow.FromGraph(new SelectAsync<T, TOut>(1, HandleElement));

    // Maps a single element to its output: Ignore, Success or Failure.
    async Task<TOut> HandleElement(T element)
    {
        var candidate = opts.Publish(element);
        if (!candidate.HasValue)
        {
            return opts.Ignore(element);
        }

        var payload = candidate.Value;
        try
        {
            // Selectors are evaluated in the same order as the publish arguments.
            var subject = opts.PublishOver(payload);
            var serializer = opts.PubSer?.Invoke(payload);
            var publishOptions = opts.PubOpts?.Invoke(payload);
            var headers = opts.Headers?.Invoke(payload);

            var ack = await context.PublishAsync(
                subject, payload, serializer, publishOptions, headers, token);
            return opts.Success(element, payload, ack);
        }
        catch (Exception error)
        {
            // Any failure (selector or publish) is translated into an output element.
            return opts.Failure(element, payload, error);
        }
    }
}
/// <summary>
/// Appends a JetStream publish stage (see <c>NatsJSPublishStage</c>) to <paramref name="flow"/>.
/// </summary>
/// <param name="flow">Upstream flow or source to extend.</param>
/// <param name="context">JetStream context used to publish.</param>
/// <param name="opts">Callbacks controlling publishing and output mapping.</param>
/// <param name="token">Token forwarded to every publish call.</param>
/// <returns>The upstream with the publish stage attached; its materialized value is kept.</returns>
public static IFlow<TOut, TMat> WithNatsJsPublish<T, TPub, TOut, TMat>(
    this IFlow<T, TMat> flow,
    NatsJSContext context,
    WithPublishOpts<T, TPub, TOut> opts,
    CancellationToken token = default)
{
    var publishStage = NatsJSPublishStage<T, TPub, TOut>(context, opts, token);
    return flow.Via(publishStage);
}
/// <summary>
/// Builds a sink that optionally publishes each element to JetStream, one at a time.
/// </summary>
/// <param name="context">JetStream context used to publish.</param>
/// <param name="opts">
/// Callbacks selecting what to publish. <c>Success</c> is invoked after a publish returns and
/// <c>Ignore</c> when nothing is published; their results are discarded.
/// <c>Failure</c> is not invoked by this sink.
/// </param>
/// <param name="cancellationToken">Token forwarded to every publish call.</param>
/// <typeparam name="T">Element type accepted by the sink.</typeparam>
/// <typeparam name="TPub">Type of the published value.</typeparam>
/// <returns>A sink whose materialized task completes when the stream finishes.</returns>
/// <remarks>
/// Exceptions thrown while publishing (or by the callbacks) are not handled here and
/// propagate out of the per-element action.
/// </remarks>
public static Sink<T, Task<Done>> NatsJSOutputSink<T, TPub>(NatsJSContext context,
    WithPublishOpts<T, TPub, NotUsed> opts,
    CancellationToken cancellationToken = default)
{
    return Sink.ForEachAsync<T>(1,
        async (t) =>
        {
            var p = opts.Publish(t);
            if (!p.HasValue)
            {
                opts.Ignore(t);
                return;
            }

            var v = p.Value;

            // No try/catch: the former catch block only rethrew, so letting the
            // exception propagate directly is equivalent.
            var resp = await context.PublishAsync(
                opts.PublishOver(v), v,
                opts.PubSer?.Invoke(v),
                opts.PubOpts?.Invoke(v), opts.Headers?.Invoke(v),
                cancellationToken);
            opts.Success(t, v, resp);
        });
}
}
/// <summary>
/// Pairs a JetStream message with an accompanying value of another type.
/// </summary>
/// <param name="NatsJsMsg">The JetStream message being carried.</param>
/// <param name="Other">The value travelling alongside the message.</param>
/// <typeparam name="TNats">Payload type of the JetStream message.</typeparam>
/// <typeparam name="TOther">Type of the accompanying value.</typeparam>
// NOTE(review): presumably this keeps the original message reachable further down a
// stream (e.g. for acknowledgement) — confirm intended usage.
public record NatsJSCarrier<TNats, TOther>(
NatsJSMsg<TNats> NatsJsMsg,
TOther Other);
/// <summary>
/// Extension helpers for creating <see cref="NatsJSCarrier{TNats,TOther}"/> instances.
/// </summary>
public static class NatsJSCarrier
{
    /// <summary>
    /// Wraps a JetStream message together with <paramref name="otherMsg"/>.
    /// </summary>
    public static NatsJSCarrier<TNats, TOther> ToCarrier<TNats, TOther>(
        this NatsJSMsg<TNats> natsJsMsg, TOther otherMsg)
        => new(natsJsMsg, otherMsg);

    /// <summary>
    /// Creates a new carrier that keeps the JetStream message of
    /// <paramref name="natsJsCarrier"/> but carries <paramref name="other"/> instead.
    /// </summary>
    public static NatsJSCarrier<TNats, TOther> ToCarrier<TNats, TOtherOld, TOther>(
        this NatsJSCarrier<TNats, TOtherOld> natsJsCarrier, TOther other)
        => new(natsJsCarrier.NatsJsMsg, other);
}
using System.Text;
using System.Threading.Channels;
using Akka;
using Akka.Streams;
using Akka.Streams.Actors;
using Akka.Streams.Dsl;
using Akka.Streams.Implementation.Fusing;
using Akka.Streams.Implementation.Stages;
using Akka.Streams.Stage;
using Akka.Streams.Supervision;
using Akka.Util;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using NATS.Client.KeyValueStore;
using NATSWrappers;
using ValueTaskSupplement;
namespace GlutenFree.Akka.Streams.nats_net_v2;
//TODO: When https://github.com/akkadotnet/akka.net/pull/7028/ is merged,
// revise and use ValueTask Unfolds/SelectAsyncs/etc.
public static partial class DSL
{
/// <summary>
/// Options describing the core NATS subscription opened by <c>NatsSubscribeSource</c>.
/// </summary>
/// <param name="Subject">Subject to subscribe to.</param>
/// <param name="QueueGroup">Optional queue group forwarded to the subscribe call.</param>
/// <param name="Deserialize">
/// Optional deserializer forwarded to the subscribe call; null (the default) passes none.
/// </param>
/// <param name="Opts">Optional subscription options forwarded to the subscribe call.</param>
/// <typeparam name="T">Payload type of the received messages.</typeparam>
public record NatsSubSourceStageOpts<T>(
    string Subject,
    string? QueueGroup = default,
    // Annotated nullable: the default is null, which the non-nullable annotation contradicted.
    INatsDeserialize<T>? Deserialize = default,
    NatsSubOpts? Opts = default);
/// <summary>
/// Callback set describing how an element is published over a core NATS connection
/// (consumed by <c>NatsPublishSink</c>).
/// </summary>
/// <param name="PublishProduce">
/// Decides, per element, whether to publish: a value is published, an empty option is not.
/// </param>
/// <param name="PublishOver">Produces the subject for the element.</param>
/// <param name="HeadersAdd">Optional per-element header selector; when null, no headers are passed.</param>
/// <param name="ReplyTo">Optional per-element reply-to selector; when null, no reply subject is passed.</param>
/// <param name="Serializer">Optional per-element serializer selector; when null, no serializer is passed.</param>
/// <param name="Opts">Optional per-element publish-options selector; when null, no options are passed.</param>
/// <typeparam name="T">Element type entering the stage.</typeparam>
/// <typeparam name="TPub">Type of the value that is published.</typeparam>
public record NatsPublishStageOpts<T, TPub>(
Func<T, Option<TPub>> PublishProduce,
Func<T, string> PublishOver,
Func<T, NatsHeaders>? HeadersAdd = null,
Func<T, string>? ReplyTo = null,
Func<T, INatsSerialize<TPub>>? Serializer = null,
Func<T, NatsPubOpts>? Opts = null);
/// <summary>
/// Convenience method that turns a plain publish option set into one that also
/// describes the element emitted downstream for each outcome.
/// </summary>
/// <param name="pub">Option set whose publish-related callbacks are copied.</param>
/// <param name="ignore">Output for elements that are not published.</param>
/// <param name="success">Output for elements that were published.</param>
/// <param name="failure">Output for elements whose publish threw.</param>
/// <returns>A new option set combining <paramref name="pub"/> with the output callbacks.</returns>
public static NatsPublishStageOpts<T, TPub, TOut> WithOutput<T, TPub, TOut>(
    this NatsPublishStageOpts<T, TPub> pub,
    Func<T, TOut> ignore,
    Func<T, TPub, TOut> success,
    Func<T, TPub, Exception, TOut> failure)
    => new(
        PublishProduce: pub.PublishProduce,
        PublishOver: pub.PublishOver,
        Ignore: ignore,
        Success: success,
        Failure: failure,
        HeadersAdd: pub.HeadersAdd,
        ReplyTo: pub.ReplyTo,
        Serializer: pub.Serializer,
        Opts: pub.Opts);
//Different name when it is a pass-through?
/// <summary>
/// Publish option set that additionally maps each outcome to a downstream element
/// (consumed by <c>NatsPublishStage</c>).
/// </summary>
/// <param name="PublishProduce">
/// Decides, per element, whether to publish: a value is published, an empty option is not.
/// </param>
/// <param name="PublishOver">Produces the subject for the element.</param>
/// <param name="Ignore">Produces the output when <paramref name="PublishProduce"/> yields no value.</param>
/// <param name="Success">Produces the output from the element and published value after the publish returns.</param>
/// <param name="Failure">Produces the output from the element, value and exception when publishing throws.</param>
/// <param name="HeadersAdd">Optional per-element header selector.</param>
/// <param name="ReplyTo">Optional per-element reply-to selector.</param>
/// <param name="Serializer">Optional per-element serializer selector.</param>
/// <param name="Opts">Optional per-element publish-options selector.</param>
/// <typeparam name="T">Element type entering the stage.</typeparam>
/// <typeparam name="TPub">Type of the value that is published.</typeparam>
/// <typeparam name="TOut">Element type emitted downstream.</typeparam>
public record NatsPublishStageOpts<T, TPub, TOut>(
Func<T, Option<TPub>> PublishProduce,
Func<T, string> PublishOver,
Func<T, TOut> Ignore,
Func<T, TPub, TOut> Success,
Func<T, TPub, Exception, TOut> Failure,
Func<T, NatsHeaders>? HeadersAdd = null,
Func<T, string>? ReplyTo = null,
Func<T, INatsSerialize<TPub>>? Serializer = null,
Func<T, NatsPubOpts>? Opts = null) : NatsPublishStageOpts<T, TPub>(
PublishProduce, PublishOver, HeadersAdd, ReplyTo, Serializer, Opts);
/// <summary>
/// Runs a subscription source (see <c>NatsSubscribeSource</c>) into a channel and
/// returns the reader side of that channel.
/// </summary>
/// <param name="connection">NATS connection to subscribe on.</param>
/// <param name="stageOpts">Subscription options.</param>
/// <param name="materializer">Materializer used to run the stream.</param>
/// <param name="bufferSize">Buffer size handed to the channel-reader sink.</param>
/// <param name="connectionCancellation">Token forwarded to the subscribe call.</param>
/// <param name="readCancellation">Token forwarded to the message read loop.</param>
/// <typeparam name="T">Payload type of the received messages.</typeparam>
/// <returns>A reader from which the subscription's messages can be consumed.</returns>
public static ChannelReader<NatsMsg<T>> Subscription<T>(this NatsConnection connection,
    NatsSubSourceStageOpts<T> stageOpts,
    IMaterializer materializer,
    int bufferSize = 16,
    CancellationToken connectionCancellation = default,
    CancellationToken readCancellation = default)
{
    var messages = connection.NatsSubscribeSource(
        stageOpts, connectionCancellation, readCancellation);

    // Placeholder processing step: every message is written to the console
    // and passed through unchanged ("imagine something interesting happening").
    var observed = messages.Select(message =>
    {
        Console.WriteLine(message.ToString());
        return message;
    });

    var readerSink = Sink.ChannelReader<NatsMsg<T>>(bufferSize, false);
    return observed.RunWith(readerSink, materializer);
}
/// <summary>
/// Creates a source that subscribes on the given NATS connection when materialized
/// and emits the messages received by that subscription.
/// </summary>
/// <param name="connection">NATS connection to subscribe on.</param>
/// <param name="stageOpts">Subject, queue group, deserializer and subscription options.</param>
/// <param name="connectionCancellation">Token forwarded to the subscribe call.</param>
/// <param name="readCancellation">
/// Token forwarded to the wait for new messages. When left at its default, a shared
/// cached reader without a token is used instead of allocating a new delegate.
/// </param>
/// <typeparam name="T">Payload type of the received messages.</typeparam>
/// <returns>
/// A source that completes when the subscription's message channel reports that no more
/// messages will arrive; the subscription is disposed when the source is closed.
/// </returns>
public static Source<NatsMsg<T>,NotUsed> NatsSubscribeSource<T>(
    this NatsConnection connection,
    NatsSubSourceStageOpts<T> stageOpts,
    CancellationToken connectionCancellation = default,
    CancellationToken readCancellation = default
)
{
    Func<INatsSub<T>, Task<Option<NatsMsg<T>>>> readNext =
        readCancellation == default ? funcHelpers<T>.func : ReadWithCancellation;

    return Source.UnfoldResourceAsync(
        async () => await connection.SubscribeCoreAsync<T>(
            stageOpts.Subject,
            stageOpts.QueueGroup,
            stageOpts.Deserialize,
            stageOpts.Opts,
            connectionCancellation),
        readNext,
        async subscription =>
        {
            await subscription.DisposeAsync();
            return Done.Instance;
        });

    // Takes a buffered message if one is available, otherwise waits for more;
    // yields None once the channel reports it will produce nothing further.
    async Task<Option<NatsMsg<T>>> ReadWithCancellation(INatsSub<T> subscription)
    {
        while (true)
        {
            if (subscription.Msgs.TryRead(out var message))
            {
                return message;
            }

            if (!await subscription.Msgs.WaitToReadAsync(readCancellation))
            {
                return Option<NatsMsg<T>>.None;
            }
        }
    }
}
/// <summary>
/// A publish stage that takes an input, decides whether to publish,
/// and after publishing (or not) produces a new message to downstream,
/// based on the provided options.
/// </summary>
/// <param name="connection">NATS connection used to publish.</param>
/// <param name="parallelism">
/// Max number of publishes to run in parallel.
/// Set to 1 for guaranteed ordering.
/// </param>
/// <param name="stageOpts">Callbacks selecting what to publish and how to map the outcome to an output.</param>
/// <param name="token">Token forwarded to every publish call.</param>
/// <typeparam name="TIn">Element type entering the flow.</typeparam>
/// <typeparam name="TPub">Type of the published value.</typeparam>
/// <typeparam name="TOut">Element type leaving the flow.</typeparam>
/// <returns>
/// A flow emitting <c>Ignore</c>, <c>Success</c> or <c>Failure</c> results for each input element.
/// </returns>
public static Flow<TIn, TOut, NotUsed> NatsPublishStage<TIn,TPub, TOut>(
    this NatsConnection connection,
    int parallelism,
    NatsPublishStageOpts<TIn,TPub,TOut> stageOpts,
    CancellationToken token)
{
    return Flow.FromGraph(
        // Honour the caller's parallelism; it was previously hard-coded to 1,
        // which silently ignored the parameter.
        new SelectAsync<TIn, TOut>(parallelism, async (t) =>
        {
            var p = stageOpts.PublishProduce(t);
            if (!p.HasValue)
            {
                return stageOpts.Ignore(t);
            }

            var v = p.Value;
            try
            {
                await connection.PublishAsync(
                    stageOpts.PublishOver(t), v,
                    stageOpts.HeadersAdd?.Invoke(t),
                    stageOpts.ReplyTo?.Invoke(t),
                    stageOpts.Serializer?.Invoke(t),
                    stageOpts.Opts?.Invoke(t),
                    token);
                return stageOpts.Success(t, v);
            }
            catch (Exception e)
            {
                // Failures (selector or publish) are mapped to an output element.
                return stageOpts.Failure(t, v, e);
            }
        }));
}
/// <summary>
/// Sends a series of elements to be published based on the option transform.
/// May optionally run in parallel, with optional 'finish ordering'.
/// </summary>
/// <param name="connection">NATS connection used to publish.</param>
/// <param name="parallelism">Number of publishes to run in parallel.</param>
/// <param name="stageOpts">Callbacks selecting what to publish.</param>
/// <param name="requireFinishOrder">
/// If true, publishing runs through <c>NatsPublishStage</c>, so when
/// <paramref name="parallelism"/> is greater than 1 the freeing of publish slots is based
/// on when the oldest publish completes. If false, elements are published with
/// <c>Sink.ForEachAsync</c>.
/// </param>
/// <param name="token">Token forwarded to every publish call in both modes.</param>
/// <typeparam name="T">Element type accepted by the sink.</typeparam>
/// <typeparam name="TPub">Type of the published value.</typeparam>
/// <returns>A sink that may be attached to a stream and run.</returns>
/// <remarks>
/// In both modes a publish failure is not swallowed. With
/// <paramref name="requireFinishOrder"/> set, the caught exception is rethrown with its
/// original stack trace preserved.
/// </remarks>
public static Sink<T, Task<Done>> NatsPublishSink<T,TPub>(
    this NatsConnection connection,
    int parallelism,
    NatsPublishStageOpts<T,TPub> stageOpts,
    bool requireFinishOrder,
    CancellationToken token = default)
{
    if (requireFinishOrder)
    {
        return NatsPublishStage(connection, parallelism,
                stageOpts.WithOutput(
                    _ => NotUsed.Instance,
                    (_, _) => NotUsed.Instance,
                    (_, _, error) =>
                    {
                        // Rethrow without resetting the stack trace ("throw error" would).
                        System.Runtime.ExceptionServices.ExceptionDispatchInfo
                            .Capture(error).Throw();
                        return NotUsed.Instance; // unreachable; satisfies the delegate signature
                    }), token)
            .ToMaterialized(Sink.Ignore<NotUsed>(), Keep.Right);
    }

    return Sink.ForEachAsync<T>(parallelism, async (a) =>
    {
        var r = stageOpts.PublishProduce(a);
        if (r.HasValue)
        {
            await connection.PublishAsync<TPub>(
                stageOpts.PublishOver(a),
                r.Value,
                stageOpts.HeadersAdd?.Invoke(a),
                stageOpts.ReplyTo?.Invoke(a),
                stageOpts.Serializer?.Invoke(a),
                stageOpts.Opts?.Invoke(a),
                // Previously omitted, so this branch could not be cancelled.
                token);
        }
    });
}
}
/// <summary>
/// Holds a cached, token-less message reader for subscriptions of <typeparamref name="T"/>.
/// </summary>
/// <remarks>
/// Used to minimize allocation for the default case (no read cancellation token).
/// </remarks>
internal static class funcHelpers<T>
{
    /// <summary>
    /// Reads the next message from a subscription, or yields None once its channel
    /// reports that no more messages will arrive.
    /// </summary>
    internal static readonly Func<INatsSub<T>, Task<Option<NatsMsg<T>>>> func = ReadNextAsync;

    private static async Task<Option<NatsMsg<T>>> ReadNextAsync(INatsSub<T> subscription)
    {
        while (true)
        {
            // Prefer an already-buffered message before waiting.
            if (subscription.Msgs.TryRead(out var message))
            {
                return message;
            }

            if (!await subscription.Msgs.WaitToReadAsync())
            {
                return Option<NatsMsg<T>>.None;
            }
        }
    }
}
using System.Threading.Channels;
using Akka;
using Akka.Streams.Dsl;
using NATS.Client.Core;
using NATS.Client.Services;
using static NATS.Client.Services.NatsSvcServer;
namespace GlutenFree.Akka.Streams.nats_net_v2.Services;
public static class DSL
{
/// <summary>
/// Creates a source that registers an endpoint on <paramref name="serviceContext"/> when
/// materialized and emits every incoming request wrapped in a <c>HeldEndpointValue</c>.
/// The endpoint handler for a request stays pending until that wrapper is marked
/// completed, faulted or cancelled.
/// </summary>
/// <param name="serviceContext">Service group the endpoint is added to.</param>
/// <param name="serializer">Optional deserializer forwarded to the endpoint registration.</param>
/// <param name="metadata">Optional endpoint metadata.</param>
/// <param name="name">Optional endpoint name.</param>
/// <param name="subject">Optional endpoint subject.</param>
/// <param name="queueGroup">Optional queue group.</param>
/// <param name="maxPending">
/// Capacity of the hand-off channel between the endpoint handler and the stream;
/// null uses an unbounded channel.
/// </param>
/// <param name="stopToken">Token linked into the endpoint's lifetime token.</param>
/// <param name="reqToken">Currently not used by this method.</param>
/// <typeparam name="T">Payload type of the endpoint requests.</typeparam>
/// <returns>A source of held endpoint requests.</returns>
public static Source<HeldEndpointValue<T>, NotUsed> NatsServiceEndpointSource<T>(
    Group serviceContext,
    INatsDeserialize<T>? serializer = null,
    IDictionary<string, string>? metadata = null,
    string? name = null,
    string? subject = null,
    string? queueGroup = null,
    int? maxPending = null, CancellationToken stopToken = default,
    CancellationToken reqToken = default)
{
    // The channel is created per materialization inside the open callback; the extra
    // channel that used to be allocated here was never used.
    return Source.UnfoldResourceAsync<HeldEndpointValue<T>,(Channel<HeldEndpointValue<T>> ch,CancellationTokenSource cts)>(async () =>
    {
        var cts =
            CancellationTokenSource.CreateLinkedTokenSource(stopToken);
        try
        {
            // Capture the token once so the handler never touches the source after disposal.
            var endpointToken = cts.Token;
            var ch = maxPending == null
                ? Channel.CreateUnbounded<HeldEndpointValue<T>>()
                : Channel.CreateBounded<HeldEndpointValue<T>>(maxPending.Value);
            await serviceContext.AddEndpointAsync<T>(async msg =>
            {
                var wrapped = new HeldEndpointValue<T>(msg);
                await ch.Writer.WriteAsync(wrapped, endpointToken);
                // Hold the handler open until the stream signals completion.
                await wrapped.GetCompletion;
            }, name, subject, queueGroup, metadata, serializer, endpointToken);
            return (ch, cts);
        }
        catch
        {
            // Registration failed: release the linked registration on stopToken.
            cts.Dispose();
            throw;
        }
    }, async uf =>
    {
        var r = await (uf.ch.Reader.ReadAsync(uf.cts.Token));
        return r;
    }, async uf =>
    {
        uf.ch.Writer.TryComplete();
        try
        {
            uf.cts.Cancel();
        }
        finally
        {
            // A linked source stays registered on stopToken until it is disposed.
            uf.cts.Dispose();
        }
        // NOTE(review): requests still buffered in the channel are not completed here,
        // so their handlers keep awaiting — confirm whether they should be cancelled.
        return Done.Instance;
    });
}
/// <summary>
/// Signifies an endpoint request held for processing. The endpoint handler that
/// produced it awaits <c>GetCompletion</c>, so one of <see cref="MarkCompleted"/>,
/// <see cref="MarkFaulted"/> or <see cref="TrySetCancelled"/> must be called to release it.
/// </summary>
/// <typeparam name="T">Payload type of the request.</typeparam>
/// <remarks>
/// Copies of this struct share the same completion source. A default-initialized
/// instance has no completion source and must not be used.
/// </remarks>
public readonly struct HeldEndpointValue<T>
{
    //TODO: IValueTaskCompletionSource
    private readonly TaskCompletionSource _completionSource;

    /// <summary>Completes when the request has been marked completed, faulted or cancelled.</summary>
    internal ValueTask GetCompletion =>
        new ValueTask(_completionSource.Task);

    /// <summary>Wraps <paramref name="msg"/> with a fresh, not-yet-completed completion source.</summary>
    public HeldEndpointValue(NatsSvcMsg<T> msg)
    {
        Message = msg;
        _completionSource = new TaskCompletionSource(TaskCreationOptions
            .RunContinuationsAsynchronously);
    }

    /// <summary>The underlying service request message.</summary>
    public NatsSvcMsg<T> Message { get; }

    /// <summary>
    /// Sends a reply for this request and returns the pending operation so the caller
    /// can await it and observe failures.
    /// </summary>
    public ValueTask ReplyAsync(NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken token = default)
    {
        return Message.ReplyAsync(headers, replyTo, opts, token);
    }

    /// <summary>
    /// Starts sending a reply without waiting for it (fire-and-forget). Failures of the
    /// send are not observed; prefer <see cref="ReplyAsync"/> when the outcome matters.
    /// </summary>
    public void Reply(NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken token = default)
    {
        // Explicit discard: the operation is intentionally not awaited here.
        _ = ReplyAsync(headers, replyTo, opts, token);
    }

    /// <summary>Releases the endpoint handler successfully (no effect if already completed).</summary>
    public void MarkCompleted()
    {
        _completionSource.TrySetResult();
    }

    /// <summary>Releases the endpoint handler with <paramref name="error"/> (no effect if already completed).</summary>
    public void MarkFaulted(Exception error)
    {
        _completionSource.TrySetException(error);
    }

    /// <summary>Releases the endpoint handler as cancelled (no effect if already completed).</summary>
    public void TrySetCancelled(CancellationToken token = default)
    {
        _completionSource.TrySetCanceled(token);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment