Last active
June 20, 2024 19:23
-
-
Save mt89vein/039998c3c2730b4e03ffa8d784f91869 to your computer and use it in GitHub Desktop.
Using System.Threading.Channels to send notifications in priority order
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System.Threading.Channels; | |
// A single notificator instance is shared by every send below
// (in a real application it should be registered as a singleton).
SlackNotificator slack = new();

// Short pause before sending, so the reader loop started by the constructor is up.
await Task.Delay(TimeSpan.FromMilliseconds(10));

// Priorities to send, in submission order (higher number - higher priority).
var priorities = new[] { 1, 4, 7, 2 };

// Array.ConvertAll invokes the lambda eagerly and in order, so every send is
// already in flight before the first await below.
Task[] pending = Array.ConvertAll(priorities, p => slack.SendAsync(new Notification(p)));

// Wait until every notification has been processed.
await Task.WhenAll(pending);
/// <summary>
/// A notification to deliver.
/// </summary>
/// <param name="Priority">
/// Relative importance of the notification; a higher number means a higher priority.
/// </param>
public sealed record Notification(int Priority);
/// <summary>
/// Abstraction over a component that delivers <see cref="Notification"/> instances.
/// </summary>
public interface INotificator
{
    /// <summary>
    /// Sends the given notification.
    /// </summary>
    /// <param name="notification">The notification to send.</param>
    /// <param name="ct">Token used to cancel the operation; defaults to <see cref="CancellationToken.None"/>.</param>
    /// <returns>A task representing the send operation.</returns>
    Task SendAsync(Notification notification, CancellationToken ct = default);
}
/// <summary>
/// <see cref="INotificator"/> implementation that queues notifications in a prioritized
/// channel and delivers them one at a time from a single background loop.
/// </summary>
/// <remarks>
/// The instance starts its consumer loop in the constructor and is meant to be shared
/// (one instance per application).
/// </remarks>
internal sealed class SlackNotificator : INotificator
{
    // The comparison is reversed (n2 vs n1) so that notifications with a higher
    // Priority value sort first and are therefore read before lower ones.
    private readonly Channel<NotificationPromise> _channel = Channel.CreateUnboundedPrioritized(
        new UnboundedPrioritizedChannelOptions<NotificationPromise>
        {
            Comparer = Comparer<NotificationPromise>.Create((n1, n2) =>
                n2.Notification.Priority.CompareTo(n1.Notification.Priority))
        }
    );

    /// <summary>
    /// Creates the notificator and starts the background consumer loop.
    /// </summary>
    public SlackNotificator()
    {
        // Fire-and-forget on purpose: the loop lives as long as the channel is open.
        // It returns a Task (not async void) so that an unexpected failure cannot
        // tear down the process.
        _ = ConsumeAsync();
    }

    /// <summary>
    /// Queues <paramref name="notification"/> and waits until the consumer loop has sent it.
    /// </summary>
    /// <param name="notification">The notification to send; its priority decides the queue position.</param>
    /// <param name="ct">Cancels both the wait and, if it has not started yet, the delivery.</param>
    /// <returns>A task that completes once the notification has been sent.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="notification"/> is <c>null</c>.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
    /// <exception cref="ChannelClosedException">The underlying channel has been completed.</exception>
    public async Task SendAsync(Notification notification, CancellationToken ct = default)
    {
        // The priority comparer dereferences the notification, so reject null up front.
        ArgumentNullException.ThrowIfNull(notification);

        // The TCS is completed by the consumer loop once the notification is sent
        // (or by the cancellation callback below).
        var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

        // Dispose the registration when leaving the method; otherwise every call would
        // leave a callback attached to a long-lived token.
        using var registration = ct.Register(() => tcs.TrySetCanceled(ct));

        // WriteAsync throws ChannelClosedException when the channel is completed, so the
        // caller is never left waiting on a promise that was not enqueued.
        await _channel.Writer.WriteAsync(new NotificationPromise(notification, tcs, ct), ct);

        await tcs.Task; // waiting while the notification is sent.
    }

    /// <summary>
    /// Reads queued promises in priority order and sends them one by one until the
    /// channel is completed.
    /// </summary>
    private async Task ConsumeAsync()
    {
        // ConfigureAwait(false): the loop has no reason to return to the context of
        // whoever constructed the notificator.
        while (await _channel.Reader.WaitToReadAsync().ConfigureAwait(false))
        {
            while (_channel.Reader.TryRead(out var promise))
            {
                // The caller already gave up (cancelled) while the item was queued.
                if (promise.Tcs.Task.IsCompleted)
                {
                    continue;
                }

                try
                {
                    await SendNotificationAsync(promise.Notification, promise.Ct).ConfigureAwait(false);
                    promise.Tcs.TrySetResult();
                }
                catch (OperationCanceledException) when (promise.Ct.IsCancellationRequested)
                {
                    // Report cancellation as cancellation rather than as a fault.
                    promise.Tcs.TrySetCanceled(promise.Ct);
                }
                catch (Exception e)
                {
                    // Surface the failure to the caller awaiting this notification and
                    // keep the loop alive for the remaining ones.
                    promise.Tcs.TrySetException(e);
                }
            }
        }
    }

    /// <summary>
    /// Performs the actual delivery. This implementation only writes the notification
    /// to the console and completes synchronously; <paramref name="ct"/> is not used.
    /// </summary>
    private static Task SendNotificationAsync(Notification notification, CancellationToken ct)
    {
        Console.WriteLine(notification);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Queue item: the notification, the completion source its sender awaits, and the
    /// sender's cancellation token.
    /// </summary>
    private readonly record struct NotificationPromise(Notification Notification, TaskCompletionSource Tcs, CancellationToken Ct);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment