Skip to content

Instantly share code, notes, and snippets.

@laurencee
Last active December 29, 2015 09:09
Show Gist options
  • Select an option

  • Save laurencee/7648413 to your computer and use it in GitHub Desktop.

Select an option

Save laurencee/7648413 to your computer and use it in GitHub Desktop.
Redis subscription handling: sending JSON-serialized types through Redis pub/sub using ServiceStack.Redis, ServiceStack.Common and ServiceStack.Text
using System;
using System.Data;
using System.Runtime.Remoting.Messaging;
using System.Threading.Tasks;
namespace RedisClientPrototype
{
/// <summary>
/// Console demo: registers two subscriptions, starts the listener in the background,
/// publishes a few typed messages plus a timed stream of string messages, and shuts
/// everything down once the user presses Enter.
/// </summary>
class Program
{
    // Channel carrying JSON-serialized TestClass payloads.
    private const string TypedChannel = "CHANNEL";

    // Channel carrying the plain string messages produced by the timed publisher.
    private const string TextChannel = "CHANNEL2";

    static void Main(string[] args)
    {
        var redisPublisher = new RedisPublisher();
        var subscriptionHandler = new RedisSubscriptionHandler();

        // Regular string action: every message on the text channel is echoed to the console.
        var consoleWriterRegistration = new RedisRegistration(TextChannel, line => Console.WriteLine(line));

        // Generic type to be deserialized before the callback runs.
        subscriptionHandler.RegisterSubscription<TestClass>(TestClassConsoleWrite, TypedChannel);
        subscriptionHandler.RegisterSubscription(consoleWriterRegistration);

        // StartListener blocks, so it is pushed onto a background task.
        // NOTE(review): the publishes below are not synchronized with the listener
        // start-up; presumably an early message can be missed if it is published
        // before the subscription is established - confirm.
        Task.Run(new Action(subscriptionHandler.StartListener));

        // Publish some messages of the generic type.
        var samples = new[]
        {
            new TestClass { FirstName = "test", Age = 21 },
            new TestClass { FirstName = "test2", Age = 31 },
            new TestClass { FirstName = "test3", Age = 41 }
        };
        foreach (var sample in samples)
        {
            redisPublisher.PublishMessage(sample, TypedChannel);
        }

        // Start publishing basic string messages on the other channel, once per second.
        redisPublisher.StartPublisher(1000, TextChannel);

        // Keep running until the user presses Enter, then tear both sides down.
        Console.ReadLine();
        subscriptionHandler.StopListener();
        redisPublisher.StopPublisher();

        Console.Write("Press any key to exit");
        Console.ReadLine();
    }

    /// <summary>Writes the fields of a received <see cref="TestClass"/> to the console.</summary>
    /// <param name="testClass">The deserialized message payload.</param>
    private static void TestClassConsoleWrite(TestClass testClass)
    {
        var firstName = testClass.FirstName;
        var age = testClass.Age;
        Console.WriteLine("TestClass - FirstName:{0}, Age:{1}", firstName, age);
    }
}
/// <summary>
/// Sample message payload. Program publishes instances of this type on "CHANNEL"
/// and receives them back through a typed subscription.
/// </summary>
[Serializable]
public class TestClass
{
/// <summary> First name carried by the sample message. </summary>
public string FirstName { get; set; }
/// <summary> Age carried by the sample message. </summary>
public int Age { get; set; }
}
}
using System;
using System.Timers;
using ServiceStack.Redis;
using ServiceStack.Text;
namespace RedisClientPrototype
{
/// <summary>
/// Publishes messages to Redis channels: either a single JSON-serialized object on
/// demand, or a numbered string message ("MESSAGE 1", "MESSAGE 2", ...) on a timer.
/// </summary>
internal class RedisPublisher : IDisposable
{
    private const string MessagePrefix = "MESSAGE ";
    private readonly Timer timer = new Timer();
    private string channelName;
    private int counter;

    /// <summary>Creates a publisher whose timer is wired up but not yet running.</summary>
    public RedisPublisher()
    {
        // Attach the handler exactly once. Attaching inside StartPublisher would add
        // another copy on every stop/start cycle and publish duplicates per tick.
        timer.Elapsed += PublishMessage;
    }

    /// <summary>
    /// Starts publishing a numbered string message to <paramref name="channel"/> every
    /// <paramref name="interval"/> milliseconds. Does nothing if already running.
    /// </summary>
    /// <param name="interval">Timer interval in milliseconds.</param>
    /// <param name="channel">Channel the timed messages are published to.</param>
    /// <exception cref="ArgumentNullException"><paramref name="channel"/> is null.</exception>
    public void StartPublisher(int interval, string channel)
    {
        if (channel == null)
        {
            throw new ArgumentNullException("channel");
        }

        if (timer.Enabled)
        {
            return;
        }

        Console.WriteLine("Starting publisher");
        channelName = channel;
        timer.Interval = interval;
        timer.Start();
    }

    /// <summary>Stops the timed publishing; the message counter is kept.</summary>
    public void StopPublisher()
    {
        Console.WriteLine("Stopping publisher");
        timer.Stop();
    }

    /// <summary>
    /// Serializes <paramref name="serializableMessage"/> to JSON and publishes it once.
    /// </summary>
    /// <typeparam name="T">Type of the message being serialized.</typeparam>
    /// <param name="serializableMessage">The object to serialize and publish.</param>
    /// <param name="channel">Channel the message is published to.</param>
    /// <exception cref="ArgumentNullException"><paramref name="channel"/> is null.</exception>
    public void PublishMessage<T>(T serializableMessage, string channel)
    {
        if (channel == null)
        {
            throw new ArgumentNullException("channel");
        }

        var serializer = new JsonSerializer<T>();
        using (var redisClient = new RedisClient(TestConfig.LocalHost))
        {
            var message = serializer.SerializeToString(serializableMessage);
            Console.WriteLine("Publishing '{0}' to '{1}'", message, channel);
            redisClient.PublishMessage(channel, message);
        }
    }

    /// <summary>Stops the timer and releases it.</summary>
    public void Dispose()
    {
        timer.Stop();
        timer.Dispose();
    }

    /// <summary>Timer callback: publishes the next numbered message.</summary>
    private void PublishMessage(object sender, ElapsedEventArgs e)
    {
        // Elapsed callbacks can overlap when a publish takes longer than the interval,
        // so the counter is advanced atomically instead of with ++.
        var sequence = System.Threading.Interlocked.Increment(ref counter);

        using (var redisClient = new RedisClient(TestConfig.LocalHost))
        {
            var message = MessagePrefix + sequence;
            Console.WriteLine("Publishing '{0}' to '{1}'", message, channelName);
            redisClient.PublishMessage(channelName, message);
        }
    }
}
/// <summary>
/// Connection settings shared by the publisher and the subscription handler.
/// Static because it only holds constants and is never meant to be instantiated.
/// </summary>
internal static class TestConfig
{
    /// <summary>Host name passed to every RedisClient created in this prototype.</summary>
    public const string LocalHost = "localhost";
}
}
using System;
using ServiceStack.Text;
namespace RedisClientPrototype
{
/// <summary>
/// Convenience extensions for registering typed callbacks on an
/// <see cref="IRedisSubscriptionHandler"/>.
/// </summary>
public static class RedisSubscriptionExtensions
{
    /// <summary>
    /// Registers <paramref name="action"/> for <paramref name="channel"/>; each incoming
    /// message is deserialized from JSON into <typeparamref name="T"/> before the
    /// callback is invoked.
    /// </summary>
    /// <typeparam name="T">Type each message is deserialized into.</typeparam>
    /// <param name="handler">Handler that receives the registration.</param>
    /// <param name="action">Callback invoked with the deserialized message.</param>
    /// <param name="channel">Channel to listen on.</param>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public static void RegisterSubscription<T>(this IRedisSubscriptionHandler handler, Action<T> action, string channel)
    {
        // Fail at registration time. Without these checks a null callback would only
        // surface when the first message arrives, on whatever thread runs the listener.
        if (handler == null)
        {
            throw new ArgumentNullException("handler");
        }

        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        if (channel == null)
        {
            throw new ArgumentNullException("channel");
        }

        var serializer = new JsonSerializer<T>();
        var registration = new RedisRegistration(channel, s => action(serializer.DeserializeFromString(s)));
        handler.RegisterSubscription(registration);
    }
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using ServiceStack.Common.Extensions;
using ServiceStack.Redis;
namespace RedisClientPrototype
{
/// <summary>
/// Immutable pairing of a channel name with the callback to run for each raw
/// string message received on that channel.
/// </summary>
public class RedisRegistration
{
    private readonly string channelName;
    private readonly Action<string> messageAction;

    /// <summary>Creates a registration for <paramref name="channel"/>.</summary>
    /// <param name="channel">Name of the channel to listen on.</param>
    /// <param name="messageAction">Callback that receives each message body.</param>
    public RedisRegistration(string channel, Action<string> messageAction)
    {
        this.channelName = channel;
        this.messageAction = messageAction;
    }

    /// <summary>Name of the channel this registration listens on.</summary>
    public string ChannelName
    {
        get { return channelName; }
    }

    /// <summary>Callback invoked with the body of each message on the channel.</summary>
    public Action<string> MessageAction
    {
        get { return messageAction; }
    }
}
/// <summary>
/// Contract for a component that collects channel registrations and runs a
/// listener that can be started and stopped.
/// </summary>
public interface IRedisSubscriptionHandler
{
/// <summary> Adds a channel/callback registration to this handler. </summary>
/// <param name="redisRegistration"> The channel name and callback to register. </param>
void RegisterSubscription(RedisRegistration redisRegistration);
/// <summary> Starts listening for messages. Blocking call: run it on its own thread or task. </summary>
void StartListener();
/// <summary> Asks the listener to stop. </summary>
void StopListener();
}
/// <summary>
/// Subscribes to the channels of all registered <see cref="RedisRegistration"/>s and
/// dispatches each incoming message to the callbacks registered for its channel.
/// </summary>
/// <remarks>
/// <see cref="StartListener"/> blocks, so it is expected to run on its own thread
/// while <see cref="RegisterSubscription"/> and <see cref="StopListener"/> are called
/// from others; access to the registration list is therefore synchronized.
/// </remarks>
public class RedisSubscriptionHandler : IRedisSubscriptionHandler
{
    private readonly object registrationsLock = new object();
    private readonly List<RedisRegistration> registrations = new List<RedisRegistration>();

    // Control channel used to tell the blocking listener to unsubscribe. The GUID
    // suffix makes it unique per instance: with a shared name, StopListener on one
    // handler would also stop every other handler connected to the same server.
    private readonly string unsubscribeChannel =
        typeof (RedisSubscriptionHandler).FullName + ".Unsubscribe." + Guid.NewGuid().ToString("N");

    // Only set while StartListener is inside its blocking subscribe call.
    private IRedisSubscription redisSubscription;

    /// <summary>Adds a channel/callback registration.</summary>
    /// <param name="redisRegistration">The registration to add.</param>
    /// <exception cref="ArgumentNullException"><paramref name="redisRegistration"/> is null.</exception>
    /// <remarks>
    /// The channel list is captured when <see cref="StartListener"/> is called, so a
    /// registration for a new channel added afterwards is not subscribed until the
    /// listener is restarted.
    /// </remarks>
    public void RegisterSubscription(RedisRegistration redisRegistration)
    {
        if (redisRegistration == null)
        {
            throw new ArgumentNullException("redisRegistration");
        }

        lock (registrationsLock)
        {
            registrations.Add(redisRegistration);
        }
    }

    /// <summary> Blocking call: subscribes to all registered channels and dispatches messages until stopped. </summary>
    public void StartListener()
    {
        string[] channels;
        lock (registrationsLock)
        {
            // The control channel is added here instead of being inserted into the
            // registration list, which would grow by one entry on every restart.
            channels = new[] { unsubscribeChannel }
                .Concat(registrations.Select(x => x.ChannelName))
                .Distinct()
                .ToArray();
        }

        using (var redisClient = new RedisClient(TestConfig.LocalHost))
        using (var subscription = redisClient.CreateSubscription())
        {
            redisSubscription = subscription;
            try
            {
                subscription.OnMessage = SendMessage;
                subscription.SubscribeToChannels(channels); // blocking
            }
            finally
            {
                // Never leave the field pointing at a subscription that is about to be disposed.
                redisSubscription = null;
            }
        }
    }

    /// <summary>
    /// Signals the listener to stop by publishing an empty message on this instance's
    /// control channel.
    /// </summary>
    /// <remarks>
    /// NOTE(review): if this runs before the listener has finished subscribing, the
    /// signal is presumably lost and the listener keeps running - confirm.
    /// </remarks>
    public void StopListener()
    {
        using (var redisClient = new RedisClient(TestConfig.LocalHost))
        {
            Console.WriteLine("Unsubscribing channels");
            redisClient.PublishMessage(unsubscribeChannel, "");
        }
    }

    /// <summary>Subscription callback: routes a message to every callback registered for its channel.</summary>
    private void SendMessage(string channel, string message)
    {
        if (channel == unsubscribeChannel)
        {
            OnUnsubscribeChannels(message);
            return;
        }

        // Snapshot under the lock so callbacks run outside it and a concurrent
        // RegisterSubscription cannot invalidate the enumeration.
        Action<string>[] actions;
        lock (registrationsLock)
        {
            actions = registrations
                .Where(registration => registration.ChannelName == channel)
                .Select(registration => registration.MessageAction)
                .ToArray();
        }

        foreach (var action in actions)
        {
            action(message);
        }
    }

    /// <summary>Handles the control message by unsubscribing from every channel.</summary>
    private void OnUnsubscribeChannels(string message)
    {
        var subscription = redisSubscription;
        if (subscription == null)
        {
            return;
        }

        subscription.UnSubscribeFromAllChannels();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment