Last active
December 29, 2015 09:09
-
-
Save laurencee/7648413 to your computer and use it in GitHub Desktop.
Redis subscription handling sending json serialized types through redis pub/sub using servicestack redis/common/text
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using System.Data; | |
| using System.Runtime.Remoting.Messaging; | |
| using System.Threading.Tasks; | |
| namespace RedisClientPrototype | |
| { | |
| class Program | |
| { | |
| static void Main(string[] args) | |
| { | |
| var redisPublisher = new RedisPublisher(); | |
| var subscriptionHandler = new RedisSubscriptionHandler(); | |
| var consoleWriterRegistration = new RedisRegistration("CHANNEL2", Console.WriteLine); | |
| // Generic type to be deserialized | |
| subscriptionHandler.RegisterSubscription<TestClass>(TestClassConsoleWrite, "CHANNEL"); | |
| // Regular string action | |
| subscriptionHandler.RegisterSubscription(consoleWriterRegistration); | |
| Task.Run(() => subscriptionHandler.StartListener()); | |
| // Publish some messages of the generic type | |
| redisPublisher.PublishMessage(new TestClass() | |
| { | |
| FirstName = "test", | |
| Age = 21 | |
| }, "CHANNEL"); | |
| redisPublisher.PublishMessage(new TestClass() | |
| { | |
| FirstName = "test2", | |
| Age = 31 | |
| }, "CHANNEL"); | |
| redisPublisher.PublishMessage(new TestClass() | |
| { | |
| FirstName = "test3", | |
| Age = 41 | |
| }, "CHANNEL"); | |
| // Start publishing basic string messages on another channel | |
| redisPublisher.StartPublisher(1000, "CHANNEL2"); | |
| Console.ReadLine(); | |
| subscriptionHandler.StopListener(); | |
| redisPublisher.StopPublisher(); | |
| Console.Write("Press any key to exit"); | |
| Console.ReadLine(); | |
| } | |
| private static void TestClassConsoleWrite(TestClass testClass) | |
| { | |
| Console.WriteLine("TestClass - FirstName:{0}, Age:{1}", testClass.FirstName, testClass.Age); | |
| } | |
| } | |
| [Serializable] | |
| public class TestClass | |
| { | |
| public string FirstName { get; set; } | |
| public int Age { get; set; } | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using System.Timers; | |
| using ServiceStack.Redis; | |
| using ServiceStack.Text; | |
| namespace RedisClientPrototype | |
| { | |
| internal class RedisPublisher | |
| { | |
| private string channelName; | |
| private const string MessagePrefix = "MESSAGE "; | |
| private readonly Timer timer = new Timer(); | |
| private int counter; | |
| public void StartPublisher(int interval, string channel) | |
| { | |
| if (timer.Enabled) return; | |
| Console.WriteLine("Starting publisher"); | |
| channelName = channel; | |
| timer.Elapsed += PublishMessage; | |
| timer.Interval = interval; | |
| timer.Start(); | |
| } | |
| public void StopPublisher() | |
| { | |
| Console.WriteLine("Stopping publisher"); | |
| timer.Stop(); | |
| } | |
| public void PublishMessage<T>(T serializableMessage, string channel) | |
| { | |
| var serializer = new JsonSerializer<T>(); | |
| using (var redisClient = new RedisClient(TestConfig.LocalHost)) | |
| { | |
| var message = serializer.SerializeToString(serializableMessage); | |
| Console.WriteLine("Publishing '{0}' to '{1}'", message, channel); | |
| redisClient.PublishMessage(channel, message); | |
| } | |
| } | |
| private void PublishMessage(object sender, ElapsedEventArgs e) | |
| { | |
| using (var redisClient = new RedisClient(TestConfig.LocalHost)) | |
| { | |
| var message = MessagePrefix + ++counter; | |
| Console.WriteLine("Publishing '{0}' to '{1}'", message, channelName); | |
| redisClient.PublishMessage(channelName, message); | |
| } | |
| } | |
| } | |
| internal class TestConfig | |
| { | |
| public const string LocalHost = "localhost"; | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using ServiceStack.Text; | |
| namespace RedisClientPrototype | |
| { | |
| public static class RedisSubscriptionExtensions | |
| { | |
| public static void RegisterSubscription<T>(this IRedisSubscriptionHandler handler, Action<T> action, string channel) | |
| { | |
| var serializer = new JsonSerializer<T>(); | |
| var registration = new RedisRegistration(channel, s => action(serializer.DeserializeFromString(s))); | |
| handler.RegisterSubscription(registration); | |
| } | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using System.Collections.Generic; | |
| using System.Linq; | |
| using ServiceStack.Common.Extensions; | |
| using ServiceStack.Redis; | |
| namespace RedisClientPrototype | |
| { | |
| public class RedisRegistration | |
| { | |
| public RedisRegistration(string channel, Action<string> messageAction) | |
| { | |
| ChannelName = channel; | |
| MessageAction = messageAction; | |
| } | |
| public string ChannelName { get; private set; } | |
| public Action<string> MessageAction { get; private set; } | |
| } | |
| public interface IRedisSubscriptionHandler | |
| { | |
| void RegisterSubscription(RedisRegistration redisRegistration); | |
| /// <summary> Blocking call </summary> | |
| void StartListener(); | |
| void StopListener(); | |
| } | |
| public class RedisSubscriptionHandler : IRedisSubscriptionHandler | |
| { | |
| private readonly List<RedisRegistration> registrations = new List<RedisRegistration>(); | |
| private IRedisSubscription redisSubscription; | |
| private readonly string unsubscribeChannel = typeof (RedisSubscriptionHandler).FullName + ".Unsubscribe"; | |
| public void RegisterSubscription(RedisRegistration redisRegistration) | |
| { | |
| registrations.Add(redisRegistration); | |
| } | |
| /// <summary> Blocking call </summary> | |
| public void StartListener() | |
| { | |
| var unsubscriptionChannels = new RedisRegistration(unsubscribeChannel, OnUnsubscribeChannels); | |
| registrations.Insert(0, unsubscriptionChannels); | |
| using (var redisClient = new RedisClient(TestConfig.LocalHost)) | |
| using (var subscription = redisClient.CreateSubscription()) | |
| { | |
| redisSubscription = subscription; | |
| subscription.OnMessage = SendMessage; | |
| subscription.SubscribeToChannels(registrations.Select(x => x.ChannelName).Distinct().ToArray()); // blocking | |
| } | |
| } | |
| public void StopListener() | |
| { | |
| using (var redisClient = new RedisClient(TestConfig.LocalHost)) | |
| { | |
| Console.WriteLine("Unsubscribing channels"); | |
| redisClient.PublishMessage(unsubscribeChannel, ""); | |
| } | |
| } | |
| private void SendMessage(string channel, string message) | |
| { | |
| var registeredActions = from registration in registrations | |
| where registration.ChannelName == channel | |
| select registration.MessageAction; | |
| registeredActions.ForEach(action => action(message)); | |
| } | |
| private void OnUnsubscribeChannels(string message) | |
| { | |
| if (redisSubscription == null) return; | |
| redisSubscription.UnSubscribeFromAllChannels(); | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment