Skip to content

Instantly share code, notes, and snippets.

@mt89vein
Last active June 20, 2024 19:23
Show Gist options
  • Select an option

  • Save mt89vein/039998c3c2730b4e03ffa8d784f91869 to your computer and use it in GitHub Desktop.

Select an option

Save mt89vein/039998c3c2730b4e03ffa8d784f91869 to your computer and use it in GitHub Desktop.
Using System.Threading.Channels with prioritized sending notifications
using System.Threading.Channels;
var notificator = new SlackNotificator(); // should be singleton
await Task.Delay(10); // give some time to init reader
// send some notifications with priority (higher number - higher priority)
var items = new[]
{
notificator.SendAsync(new Notification(1)),
notificator.SendAsync(new Notification(4)),
notificator.SendAsync(new Notification(7)),
notificator.SendAsync(new Notification(2)),
};
await Task.WhenAll(items); // wait for all complete
public sealed record Notification(int Priority);
public interface INotificator
{
Task SendAsync(Notification notification, CancellationToken ct = default);
}
internal sealed class SlackNotificator : INotificator
{
private readonly Channel<NotificationPromise> _channel = Channel.CreateUnboundedPrioritized(
new UnboundedPrioritizedChannelOptions<NotificationPromise>
{
Comparer = Comparer<NotificationPromise>.Create((n1, n2) =>
n2.Notification.Priority.CompareTo(n1.Notification.Priority))
}
);
public SlackNotificator()
{
StartConsuming();
}
public async Task SendAsync(Notification notification, CancellationToken ct = default)
{
// here we create tcs to wait until a notification is sent or a timeout.
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
ct.Register(() => tcs.TrySetCanceled(ct));
if (await _channel.Writer.WaitToWriteAsync(ct))
{
await _channel.Writer.WriteAsync(new NotificationPromise(notification, tcs, ct), ct);
}
await tcs.Task; // waiting while the notification is sent.
}
private async void StartConsuming()
{
while (await _channel.Reader.WaitToReadAsync())
{
while (_channel.Reader.TryRead(out var n))
{
if (n.Tcs.Task.IsCompleted)
{
continue;
}
try
{
await SendNotificationAsync(n.Notification, n.Ct);
n.Tcs.TrySetResult();
}
catch (Exception e)
{
n.Tcs.TrySetException(e);
}
}
}
}
private static Task SendNotificationAsync(Notification notification, CancellationToken ct)
{
Console.WriteLine(notification);
return Task.CompletedTask;
}
private readonly record struct NotificationPromise(Notification Notification, TaskCompletionSource Tcs, CancellationToken Ct);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment