Created
September 21, 2017 11:05
-
-
Save sheradmin/db7af9ddfaf8db540a40d2f3b4670def to your computer and use it in GitHub Desktop.
Spring Boot messaging with RabbitMQ
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import org.springframework.amqp.rabbit.annotation.EnableRabbit; | |
| import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; | |
| import org.springframework.amqp.rabbit.connection.ConnectionFactory; | |
| import org.springframework.amqp.support.converter.DefaultClassMapper; | |
| import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; | |
| import org.springframework.context.annotation.Bean; | |
| import org.springframework.context.annotation.Configuration; | |
| import org.springframework.context.annotation.Lazy; | |
| import org.springframework.context.annotation.Primary; | |
| import org.springframework.messaging.converter.MappingJackson2MessageConverter; | |
| @Configuration | |
| @EnableRabbit | |
| public class RabbitMqConfiguration { | |
| public static final String EVENT_QUEUE_1 = "EVENT_QUEUE_1"; | |
| public final static String DELAY_EXCHANGE_NAME = "delayedExchange"; | |
| public final static String DEFAULT_EXCHANGE_NAME = "defaultExchange"; | |
| @Inject | |
| private ObjectMapper objectMapper; | |
| @Primary | |
| @Bean | |
| public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { | |
| SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); | |
| factory.setConnectionFactory(connectionFactory); | |
| factory.setMessageConverter(producerJackson2MessageConverter()); | |
| factory.setConcurrentConsumers(2); | |
| factory.setMaxConcurrentConsumers(8); | |
| return factory; | |
| } | |
| @Bean | |
| @Lazy | |
| public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactorySimple(ConnectionFactory connectionFactory) { | |
| SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); | |
| factory.setConnectionFactory(connectionFactory); | |
| factory.setMessageConverter(producerJackson2MessageConverter()); | |
| factory.setConcurrentConsumers(1); | |
| factory.setMaxConcurrentConsumers(1); | |
| return factory; | |
| } | |
| @Bean | |
| @Lazy | |
| public Jackson2JsonMessageConverter producerJackson2MessageConverter() { | |
| Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(objectMapper); | |
| jackson2JsonMessageConverter.setClassMapper(classMapper()); | |
| return jackson2JsonMessageConverter; | |
| } | |
| @Bean | |
| public MappingJackson2MessageConverter consumerJackson2MessageConverter() { | |
| MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter(); | |
| messageConverter.setObjectMapper(objectMapper); | |
| return messageConverter; | |
| } | |
| @Bean | |
| public DefaultClassMapper classMapper() { | |
| DefaultClassMapper classMapper = new DefaultClassMapper(); | |
| Map<String, Class<?>> idClassMapping = new HashMap<>(); | |
| idClassMapping.put("payload", Payload.class); | |
| classMapper.setIdClassMapping(idClassMapping); | |
| return classMapper; | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import org.springframework.amqp.rabbit.annotation.Exchange; | |
| import org.springframework.amqp.rabbit.annotation.Queue; | |
| import org.springframework.amqp.rabbit.annotation.QueueBinding; | |
| import org.springframework.amqp.rabbit.annotation.RabbitListener; | |
| import org.springframework.stereotype.Component; | |
| @Component | |
| public class RabbitMqConsumer { | |
| @RabbitListener(bindings = @QueueBinding(value = @Queue(value = EVENT_QUEUE_1), | |
| exchange = @Exchange(value = DELAY_EXCHANGE_NAME, delayed = "true"), key = EVENT_QUEUE_1)) | |
| public void eventQueue1(Payload payload) { | |
| //do ... | |
| } | |
| @RabbitListener(bindings = @QueueBinding(value = @Queue(value = EVENT_QUEUE_2), | |
| exchange = @Exchange(value = DELAY_EXCHANGE_NAME, delayed = "true"), key = EVENT_QUEUE_2)) | |
| public void eventQueue2(Payload payload) { | |
| //do ... | |
| } | |
| @RabbitListener(bindings = @QueueBinding(value = @Queue(value = EVENT_QUEUE_3), | |
| exchange = @Exchange(value = DELAY_EXCHANGE_NAME, delayed = "true"), key = EVENT_QUEUE_3)) | |
| public void eventQueue3(Payload payload) { | |
| //do ... | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.logging.Level;
import java.util.logging.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
| @Component | |
| public class RabbitMqProducer { | |
| private final RabbitTemplate rabbitTemplate; | |
| private final ObjectMapper objectMapper; | |
| @Inject | |
| public RabbitMqProducer(final RabbitTemplate rabbitTemplate, ObjectMapper objectMapper) { | |
| this.rabbitTemplate = rabbitTemplate; | |
| this.objectMapper = objectMapper; | |
| } | |
| public void sendEventQueue1(Payload payload) { | |
| send(payload, EVENT_QUEUE_1, 2000); | |
| } | |
| public void sendEventQueue2(Payload payload) { | |
| send(payload, EVENT_QUEUE_2, 2000); | |
| } | |
| public void sendEventQueue3(Payload payload) { | |
| send(payload, EVENT_QUEUE_3, 2000); | |
| } | |
| private void send(Payload payload, String queueName, long delay) { | |
| String payloadString; | |
| try { | |
| payloadString = objectMapper.writeValueAsString(payload); | |
| } catch (JsonProcessingException e) { | |
| e.printStackTrace(); | |
| return; | |
| } | |
| Message jsonMessage = MessageBuilder.withBody(payloadString.getBytes()) | |
| .andProperties(MessagePropertiesBuilder.newInstance().setContentType("application/json").setHeader("x-delay", delay)//milles. delay | |
| .setHeader("__TypeId__", "payload").build()).build(); | |
| rabbitTemplate.send( | |
| DELAY_EXCHANGE_NAME, | |
| queueName, | |
| jsonMessage | |
| ); | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment