Created
September 29, 2025 17:45
-
-
Save to11mtm/2232a818f8c0148b047cca6496c4da35 to your computer and use it in GitHub Desktop.
Nats Alpakka DSL WIP
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System.Text; | |
| using System.Threading.Tasks.Dataflow; | |
| using Akka; | |
| using Akka.Streams; | |
| using Akka.Streams.Dsl; | |
| using Akka.Streams.Implementation.Fusing; | |
| using Akka.Util; | |
| using JetBrains.Annotations; | |
| using NATS.Client.Core; | |
| using NATS.Client.JetStream; | |
| using NATS.Client.JetStream.Models; | |
| namespace GlutenFree.Akka.Streams.nats_net_v2.JetStream; | |
| public static class DSL | |
| { | |
| public static Source<NatsJSMsg<T>, NotUsed> NatsJsOrderedConsumerSource<T>(NatsJSContext context, | |
| string stream, | |
| NatsJSOrderedConsumerOpts? opts = default, | |
| NatsJSConsumeOpts? consumeOpts = default, | |
| INatsDeserialize<T>? consumeSerializer = default, | |
| CancellationToken token = default, | |
| CancellationToken consumeCancel = default) | |
| { | |
| return Source.UnfoldResourceAsync([MustDisposeResource]async () => | |
| { | |
| return (await context.CreateOrderedConsumerAsync(stream, opts, | |
| token)).ConsumeAsync(consumeSerializer, consumeOpts, | |
| consumeCancel).GetAsyncEnumerator(consumeCancel); | |
| }, | |
| async (jsc) => | |
| { | |
| if (await jsc.MoveNextAsync()) | |
| { | |
| return jsc.Current; | |
| } | |
| else | |
| { | |
| return Option<NatsJSMsg<T>>.None; | |
| } | |
| }, async (jsc) => | |
| { | |
| await jsc.DisposeAsync(); | |
| return Done.Instance; | |
| }); | |
| } | |
| public static Sink<T, Task<Done>> NatsJsPublishSink<T, TPub>(NatsJSContext context, | |
| Func<T, string> subject, Func<T, TPub> pubTransform, | |
| INatsSerializer<TPub>? serializer = default, | |
| NatsJSPubOpts? opts = default, | |
| NatsHeaders? headers = default, | |
| CancellationToken cancellationToken = default) | |
| { | |
| return Sink.ForEachAsync<T>(1, async t => | |
| { | |
| await context.PublishAsync(subject(t), pubTransform(t), serializer, opts, | |
| headers, cancellationToken); | |
| }); | |
| } | |
| public record WithPublishOpts<T, TPub, TOut>( | |
| Func<T, Option<TPub>> Publish, | |
| Func<TPub, string> PublishOver, | |
| Func<T, TPub, PubAckResponse, TOut> Success, | |
| Func<T, TPub, Exception, TOut> Failure, | |
| Func<T, TOut> Ignore, | |
| Func<TPub, INatsSerializer<TPub>?>? PubSer = default, | |
| Func<TPub, NatsJSPubOpts?>? PubOpts = default, | |
| Func<TPub, NatsHeaders?>? Headers = default); | |
| public static Flow<T, TOut, NotUsed> NatsJSPublishStage<T, TPub, TOut>( | |
| NatsJSContext context, WithPublishOpts<T, TPub, TOut> opts, | |
| CancellationToken token = default) | |
| { | |
| return Flow.FromGraph( | |
| new SelectAsync<T, TOut>(1, async (t) => | |
| { | |
| var p = opts.Publish(t); | |
| if (p.HasValue) | |
| { | |
| var v = p.Value; | |
| try | |
| { | |
| var resp = await context.PublishAsync( | |
| opts.PublishOver(v), v, | |
| opts.PubSer?.Invoke(v), | |
| opts.PubOpts?.Invoke(v), opts.Headers?.Invoke(v), | |
| token); | |
| return opts.Success(t, v, resp); | |
| } | |
| catch (Exception e) | |
| { | |
| return opts.Failure(t, v, e); | |
| } | |
| } | |
| else | |
| { | |
| return opts.Ignore(t); | |
| } | |
| })); | |
| } | |
| public static IFlow<TOut, TMat> WithNatsJsPublish<T, TPub, TOut, TMat>( | |
| this IFlow<T, TMat> flow, | |
| NatsJSContext context, | |
| WithPublishOpts<T,TPub,TOut> opts, | |
| CancellationToken token = default) | |
| { | |
| return flow.Via(NatsJSPublishStage<T,TPub,TOut>(context,opts,token)); | |
| } | |
| public static Sink<T, Task<Done>> NatsJSOutputSink<T, TPub>(NatsJSContext context, | |
| WithPublishOpts<T,TPub,NotUsed> opts, | |
| CancellationToken cancellationToken = default) | |
| { | |
| return Sink.ForEachAsync<T>(1, | |
| async (t) => | |
| { | |
| var p = opts.Publish(t); | |
| if (p.HasValue) | |
| { | |
| var v = p.Value; | |
| try | |
| { | |
| var resp = await context.PublishAsync( | |
| opts.PublishOver(v), v, | |
| opts.PubSer?.Invoke(v), | |
| opts.PubOpts?.Invoke(v), opts.Headers?.Invoke(v), | |
| cancellationToken); | |
| opts.Success(t, v, resp); | |
| } | |
| catch (Exception e) | |
| { | |
| throw; | |
| } | |
| } | |
| else | |
| { | |
| opts.Ignore(t); | |
| } | |
| }); | |
| } | |
| } | |
| public record NatsJSCarrier<TNats, TOther>( | |
| NatsJSMsg<TNats> NatsJsMsg, | |
| TOther Other); | |
| public static class NatsJSCarrier | |
| { | |
| public static NatsJSCarrier<TNats, TOther> ToCarrier<TNats, TOther>( | |
| this NatsJSMsg<TNats> natsJsMsg, TOther otherMsg) | |
| { | |
| return new NatsJSCarrier<TNats, TOther>(natsJsMsg, otherMsg); | |
| } | |
| public static NatsJSCarrier<TNats, TOther> | |
| ToCarrier<TNats, TOtherOld, TOther>( | |
| this NatsJSCarrier<TNats, TOtherOld> natsJsCarrier, TOther other) | |
| { | |
| return new NatsJSCarrier<TNats, TOther>(natsJsCarrier.NatsJsMsg, other); | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System.Text; | |
| using System.Threading.Channels; | |
| using Akka; | |
| using Akka.Streams; | |
| using Akka.Streams.Actors; | |
| using Akka.Streams.Dsl; | |
| using Akka.Streams.Implementation.Fusing; | |
| using Akka.Streams.Implementation.Stages; | |
| using Akka.Streams.Stage; | |
| using Akka.Streams.Supervision; | |
| using Akka.Util; | |
| using NATS.Client.Core; | |
| using NATS.Client.JetStream; | |
| using NATS.Client.JetStream.Models; | |
| using NATS.Client.KeyValueStore; | |
| using NATSWrappers; | |
| using ValueTaskSupplement; | |
| namespace GlutenFree.Akka.Streams.nats_net_v2; | |
| //TODO: When https://github.com/akkadotnet/akka.net/pull/7028/ merged, | |
| // revise and use ValueTask Unfolds/SelectAsyncs/etc. | |
| public static partial class DSL | |
| { | |
| public record NatsSubSourceStageOpts<T>( | |
| string Subject, | |
| string? QueueGroup = default, | |
| INatsDeserialize<T> Deserialize = default, | |
| NatsSubOpts? Opts = default); | |
| public record NatsPublishStageOpts<T, TPub>( | |
| Func<T, Option<TPub>> PublishProduce, | |
| Func<T, string> PublishOver, | |
| Func<T, NatsHeaders>? HeadersAdd = null, | |
| Func<T, string>? ReplyTo = null, | |
| Func<T, INatsSerialize<TPub>>? Serializer = null, | |
| Func<T, NatsPubOpts>? Opts = null); | |
| /// <summary> | |
| /// Convenience Method to convert a general publish stage option set into one with output. | |
| /// </summary> | |
| public static NatsPublishStageOpts<T, TPub, TOut> WithOutput<T, TPub, TOut>( | |
| this NatsPublishStageOpts<T, TPub> pub, | |
| Func<T, TOut> ignore, | |
| Func<T, TPub, TOut> success, | |
| Func<T, TPub, Exception, TOut> failure) | |
| { | |
| return new NatsPublishStageOpts<T, TPub, TOut>(pub.PublishProduce, | |
| pub.PublishOver, ignore, success, failure, pub.HeadersAdd, | |
| pub.ReplyTo, pub.Serializer, pub.Opts); | |
| } | |
| //Different name when it is a pass-through? | |
| public record NatsPublishStageOpts<T, TPub, TOut>( | |
| Func<T, Option<TPub>> PublishProduce, | |
| Func<T, string> PublishOver, | |
| Func<T, TOut> Ignore, | |
| Func<T, TPub, TOut> Success, | |
| Func<T, TPub, Exception, TOut> Failure, | |
| Func<T, NatsHeaders>? HeadersAdd = null, | |
| Func<T, string>? ReplyTo = null, | |
| Func<T, INatsSerialize<TPub>>? Serializer = null, | |
| Func<T, NatsPubOpts>? Opts = null) : NatsPublishStageOpts<T, TPub>( | |
| PublishProduce, PublishOver, HeadersAdd, ReplyTo, Serializer, Opts); | |
| public static ChannelReader<NatsMsg<T>> Subscription<T>(this NatsConnection connection, | |
| NatsSubSourceStageOpts<T> stageOpts, | |
| IMaterializer materializer, | |
| int bufferSize = 16, | |
| CancellationToken connectionCancellation = default, | |
| CancellationToken readCancellation = default) | |
| { | |
| return connection.NatsSubscribeSource(stageOpts, connectionCancellation, readCancellation) | |
| .Select(a => | |
| { | |
| Console.WriteLine(a.ToString()); // Imagine something interesting happening | |
| return a; | |
| }) | |
| .RunWith(Sink.ChannelReader<NatsMsg<T>>(bufferSize, false), materializer); | |
| } | |
| /// <summary> | |
| /// Creates a Subscription source based on a Nats connection and provided options. | |
| /// </summary> | |
| /// <param name="connection"></param> | |
| /// <param name="stageOpts"></param> | |
| /// <param name="connectionCancellation"></param> | |
| /// <param name="readCancellation"></param> | |
| /// <typeparam name="T"></typeparam> | |
| /// <returns></returns> | |
| public static Source<NatsMsg<T>,NotUsed> NatsSubscribeSource<T>( | |
| this NatsConnection connection, | |
| NatsSubSourceStageOpts<T> stageOpts, | |
| CancellationToken connectionCancellation = default, | |
| CancellationToken readCancellation = default | |
| ) | |
| { | |
| return Source.UnfoldResourceAsync(async () => | |
| { | |
| return await connection.SubscribeCoreAsync<T>(stageOpts.Subject, | |
| stageOpts.QueueGroup, stageOpts.Deserialize, stageOpts.Opts, connectionCancellation); | |
| }, | |
| readCancellation == default? funcHelpers<T>.func : Read, async (sub) => | |
| { | |
| await sub.DisposeAsync(); | |
| return Done.Instance; | |
| }); | |
| async Task<Option<NatsMsg<T>>> Read(INatsSub<T> a) | |
| { | |
| do | |
| { | |
| if (a.Msgs.TryRead(out var item)) | |
| { | |
| return item; | |
| } | |
| } while (await a.Msgs.WaitToReadAsync(readCancellation)); | |
| return Option<NatsMsg<T>>.None; | |
| } | |
| } | |
| /// <summary> | |
| /// A publish stage that takes an input, decides whether to publish, | |
| /// And After publishing (or not) produces a new message to downstream, | |
| /// Based on provided options. | |
| /// </summary> | |
| /// <param name="connection"></param> | |
| /// <param name="parallelism"> | |
| /// Max number of publishes to run in parallel. | |
| /// Set to 1 for guaranteed ordering. | |
| /// </param> | |
| /// <param name="stageOpts"></param> | |
| /// <param name="token"></param> | |
| /// <typeparam name="TIn"></typeparam> | |
| /// <typeparam name="TPub"></typeparam> | |
| /// <typeparam name="TOut"></typeparam> | |
| /// <returns></returns> | |
| public static Flow<TIn, TOut, NotUsed> NatsPublishStage<TIn,TPub, TOut>( | |
| this NatsConnection connection, | |
| int parallelism, | |
| NatsPublishStageOpts<TIn,TPub,TOut> stageOpts, | |
| CancellationToken token) | |
| { | |
| return Flow.FromGraph( | |
| new SelectAsync<TIn, TOut>(1, async (t) => | |
| { | |
| var p = stageOpts.PublishProduce(t); | |
| if (p.HasValue) | |
| { | |
| var v = p.Value; | |
| try | |
| { | |
| await connection.PublishAsync( | |
| stageOpts.PublishOver(t), v, | |
| stageOpts.HeadersAdd?.Invoke(t), | |
| stageOpts.ReplyTo?.Invoke(t), | |
| stageOpts.Serializer?.Invoke(t), | |
| stageOpts.Opts?.Invoke(t), | |
| token); | |
| return stageOpts.Success(t, v); | |
| } | |
| catch (Exception e) | |
| { | |
| return stageOpts.Failure(t, v, e); | |
| } | |
| } | |
| else | |
| { | |
| return stageOpts.Ignore(t); | |
| } | |
| })); | |
| } | |
| /// <summary> | |
| /// Sends a series of Elements to be published based on the Option Transform. | |
| /// May optionally run in parallelism with optional 'finish ordering'. | |
| /// </summary> | |
| /// <param name="connection"></param> | |
| /// <param name="parallelism">Number of publishes to run in parallel. </param> | |
| /// <param name="stageOpts"></param> | |
| /// <param name="requireFinishOrder">If true and <see cref="parallelism"/> is greater than 1, freeing of publishers is based on the when the oldest publish is completed</param> | |
| /// <param name="token"></param> | |
| /// <typeparam name="T"></typeparam> | |
| /// <typeparam name="TPub"></typeparam> | |
| /// <returns>A sink that may be attached to a stream and run.</returns> | |
| /// <exception cref="Exception"></exception> | |
| public static Sink<T, Task<Done>> NatsPublishSink<T,TPub>( | |
| this NatsConnection connection, | |
| int parallelism, | |
| NatsPublishStageOpts<T,TPub> stageOpts, | |
| bool requireFinishOrder, | |
| CancellationToken token = default) | |
| { | |
| if (requireFinishOrder) | |
| { | |
| return NatsPublishStage(connection, parallelism, | |
| stageOpts.WithOutput(a => NotUsed.Instance, | |
| (a, b) => NotUsed.Instance, (a, b, c) => throw c), token) | |
| .ToMaterialized(Sink.Ignore<NotUsed>(),Keep.Right); | |
| } | |
| else | |
| { | |
| return Sink.ForEachAsync<T>(parallelism, async (a) => | |
| { | |
| var r = stageOpts.PublishProduce(a); | |
| if (r.HasValue) | |
| { | |
| await connection.PublishAsync<TPub>( | |
| stageOpts.PublishOver(a), | |
| r.Value, | |
| stageOpts.HeadersAdd?.Invoke(a), | |
| stageOpts.ReplyTo?.Invoke(a), | |
| stageOpts.Serializer?.Invoke(a), | |
| stageOpts.Opts?.Invoke(a)); | |
| } | |
| }); | |
| } | |
| } | |
| } | |
| /// <remarks> | |
| /// Used to minimize allocation for default case | |
| /// </remarks> | |
| internal static class funcHelpers<T> | |
| { | |
| internal static readonly Func<INatsSub<T>, Task<Option<NatsMsg<T>>>> func = async a=>{ | |
| do | |
| { | |
| if (a.Msgs.TryRead(out var item)) | |
| { | |
| return item; | |
| } | |
| } while (await a.Msgs.WaitToReadAsync()); | |
| return Option<NatsMsg<T>>.None; | |
| }; | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System.Threading.Channels; | |
| using Akka; | |
| using Akka.Streams.Dsl; | |
| using NATS.Client.Core; | |
| using NATS.Client.Services; | |
| using static NATS.Client.Services.NatsSvcServer; | |
| namespace GlutenFree.Akka.Streams.nats_net_v2.Services; | |
| public static class DSL | |
| { | |
| public static Source<HeldEndpointValue<T>, NotUsed> NatsServiceEndpointSource<T>( | |
| Group serviceContext, | |
| INatsDeserialize<T>? serializer = null, | |
| IDictionary<string, string>? metadata = null, | |
| string? name = null, | |
| string? subject = null, | |
| string? queueGroup = null, | |
| int? maxPending = null, CancellationToken stopToken = default, | |
| CancellationToken reqToken = default) | |
| { | |
| var ch = maxPending == null | |
| ? Channel.CreateUnbounded<HeldEndpointValue<T>>() | |
| : Channel.CreateBounded<HeldEndpointValue<T>>(maxPending.Value); | |
| return Source.UnfoldResourceAsync<HeldEndpointValue<T>,(Channel<HeldEndpointValue<T>> ch,CancellationTokenSource cts)>(async () => | |
| { | |
| var cts = | |
| CancellationTokenSource.CreateLinkedTokenSource(stopToken); | |
| var ch = maxPending == null | |
| ? Channel.CreateUnbounded<HeldEndpointValue<T>>() | |
| : Channel.CreateBounded<HeldEndpointValue<T>>(maxPending.Value); | |
| await serviceContext.AddEndpointAsync<T>(async msg => | |
| { | |
| var wrapped = new HeldEndpointValue<T>(msg); | |
| await ch.Writer.WriteAsync(wrapped,cts.Token); | |
| await wrapped.GetCompletion; | |
| }, name, subject, queueGroup, metadata, serializer, cts.Token); | |
| return (ch, cts); | |
| }, async uf => | |
| { | |
| var r = await (uf.ch.Reader.ReadAsync(uf.cts.Token)); | |
| return r; | |
| }, async uf => | |
| { | |
| uf.ch.Writer.TryComplete(); | |
| uf.cts.Cancel(); | |
| return Done.Instance; | |
| }); | |
| } | |
| /// <summary> | |
| /// Signifies an Endpoint Request for Processing. | |
| /// </summary> | |
| /// <typeparam name="T"></typeparam> | |
| public struct HeldEndpointValue<T> | |
| { | |
| //TODO: IValueTaskCompletionSource | |
| private readonly TaskCompletionSource _completionSource; | |
| internal ValueTask GetCompletion => | |
| new ValueTask(_completionSource.Task); | |
| public HeldEndpointValue(NatsSvcMsg<T> msg) | |
| { | |
| Message = msg; | |
| _completionSource = new TaskCompletionSource(TaskCreationOptions | |
| .RunContinuationsAsynchronously); | |
| } | |
| public NatsSvcMsg<T> Message { get; } | |
| public void Reply(NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken token = default) | |
| { | |
| Message.ReplyAsync(headers, replyTo, opts, token); | |
| } | |
| public void MarkCompleted() | |
| { | |
| _completionSource.TrySetResult(); | |
| } | |
| public void MarkFaulted(Exception error) | |
| { | |
| _completionSource.TrySetException(error); | |
| } | |
| public void TrySetCancelled(CancellationToken token = default) | |
| { | |
| _completionSource.TrySetCanceled(token); | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment