Skip to content

Instantly share code, notes, and snippets.

@sheradmin
Created September 21, 2017 11:05
Show Gist options
  • Select an option

  • Save sheradmin/db7af9ddfaf8db540a40d2f3b4670def to your computer and use it in GitHub Desktop.

Select an option

Save sheradmin/db7af9ddfaf8db540a40d2f3b4670def to your computer and use it in GitHub Desktop.
Spring boot messaging with RabbitMQ
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
@Configuration
@EnableRabbit
public class RabbitMqConfiguration {
public static final String EVENT_QUEUE_1 = "EVENT_QUEUE_1";
public final static String DELAY_EXCHANGE_NAME = "delayedExchange";
public final static String DEFAULT_EXCHANGE_NAME = "defaultExchange";
@Inject
private ObjectMapper objectMapper;
@Primary
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(producerJackson2MessageConverter());
factory.setConcurrentConsumers(2);
factory.setMaxConcurrentConsumers(8);
return factory;
}
@Bean
@Lazy
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactorySimple(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(producerJackson2MessageConverter());
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
return factory;
}
@Bean
@Lazy
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(objectMapper);
jackson2JsonMessageConverter.setClassMapper(classMapper());
return jackson2JsonMessageConverter;
}
@Bean
public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
messageConverter.setObjectMapper(objectMapper);
return messageConverter;
}
@Bean
public DefaultClassMapper classMapper() {
DefaultClassMapper classMapper = new DefaultClassMapper();
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("payload", Payload.class);
classMapper.setIdClassMapping(idClassMapping);
return classMapper;
}
}
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class RabbitMqConsumer {
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = EVENT_QUEUE_1),
exchange = @Exchange(value = DELAY_EXCHANGE_NAME, delayed = "true"), key = EVENT_QUEUE_1))
public void eventQueue1(Payload payload) {
//do ...
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = EVENT_QUEUE_2),
exchange = @Exchange(value = DELAY_EXCHANGE_NAME, delayed = "true"), key = EVENT_QUEUE_2))
public void eventQueue2(Payload payload) {
//do ...
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = EVENT_QUEUE_3),
exchange = @Exchange(value = DELAY_EXCHANGE_NAME, delayed = "true"), key = EVENT_QUEUE_3))
public void eventQueue3(Payload payload) {
//do ...
}
}
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Component
public class RabbitMqProducer {
private final RabbitTemplate rabbitTemplate;
private final ObjectMapper objectMapper;
@Inject
public RabbitMqProducer(final RabbitTemplate rabbitTemplate, ObjectMapper objectMapper) {
this.rabbitTemplate = rabbitTemplate;
this.objectMapper = objectMapper;
}
public void sendEventQueue1(Payload payload) {
send(payload, EVENT_QUEUE_1, 2000);
}
public void sendEventQueue2(Payload payload) {
send(payload, EVENT_QUEUE_2, 2000);
}
public void sendEventQueue3(Payload payload) {
send(payload, EVENT_QUEUE_3, 2000);
}
private void send(Payload payload, String queueName, long delay) {
String payloadString;
try {
payloadString = objectMapper.writeValueAsString(payload);
} catch (JsonProcessingException e) {
e.printStackTrace();
return;
}
Message jsonMessage = MessageBuilder.withBody(payloadString.getBytes())
.andProperties(MessagePropertiesBuilder.newInstance().setContentType("application/json").setHeader("x-delay", delay)//milles. delay
.setHeader("__TypeId__", "payload").build()).build();
rabbitTemplate.send(
DELAY_EXCHANGE_NAME,
queueName,
jsonMessage
);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment