Skip to content

Instantly share code, notes, and snippets.

@pgdad
Created May 2, 2025 11:33
Show Gist options
  • Select an option

  • Save pgdad/e7f43a57cf20c3d555694b9da4cb3d47 to your computer and use it in GitHub Desktop.

Select an option

Save pgdad/e7f43a57cf20c3d555694b9da4cb3d47 to your computer and use it in GitHub Desktop.
Example: Spring Boot application with Kafka listeners for multiple brokers
// Project structure:
// - pom.xml
// - src/main/java/com/example/dualkafka/
//   - DualKafkaApplication.java (Main application class)
//   - config/
//     - FirstKafkaConfig.java (First Kafka broker config)
//     - SecondKafkaConfig.java (Second Kafka broker config)
//   - listener/
//     - FirstKafkaListener.java (Listener for first broker)
//     - SecondKafkaListener.java (Listener for second broker)
//   - model/
//     - Message.java (Message model)
// - src/main/resources/application.properties
// pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the dual-kafka Spring Boot application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Inherit from the Spring Boot parent POM; none of the dependencies below declares its own version. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.1</version>
<!-- Empty relativePath: resolve the parent from the repositories rather than from the parent directory. -->
<relativePath/>
</parent>
<!-- Coordinates of this artifact. -->
<groupId>com.example</groupId>
<artifactId>dual-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>dual-kafka</name>
<description>Spring Boot app with dual Kafka broker listeners</description>
<properties>
<!-- Java language level used for compilation. -->
<java.version>11</java.version>
</properties>
<dependencies>
<!-- Spring Boot core and web starters. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring for Apache Kafka: provides the listener annotations, container factories and JSON deserializer used by the config and listener classes. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Jackson data binding, declared explicitly for JSON payload handling. -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Lombok generates the boilerplate of the Message model; optional, so it is not passed on to projects depending on this artifact. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test-only dependencies. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven plugin: builds the executable application archive. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<!-- Lombok is only needed at compile time, so it is excluded from the packaged archive. -->
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
// application.properties
# First Kafka Broker Configuration
# Consumer settings are injected into FirstKafkaConfig; topic and group-id are also
# referenced by the @KafkaListener declaration in FirstKafkaListener.
kafka.first.bootstrap-servers=first-kafka-host:9092
kafka.first.group-id=group-first
kafka.first.topic=topic-first
# Forwarded to the consumer as ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.
kafka.first.auto-offset-reset=earliest
# Forwarded to the consumer as ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG.
# NOTE(review): Spring for Apache Kafka listener containers can manage offset commits
# themselves; confirm that client-side auto-commit is really intended here.
kafka.first.enable-auto-commit=true
# Second Kafka Broker Configuration
# Consumer settings are injected into SecondKafkaConfig; topic and group-id are also
# referenced by the @KafkaListener declaration in SecondKafkaListener.
kafka.second.bootstrap-servers=second-kafka-host:9092
kafka.second.group-id=group-second
kafka.second.topic=topic-second
# Forwarded to the consumer as ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.
kafka.second.auto-offset-reset=earliest
# Forwarded to the consumer as ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG.
# NOTE(review): same consideration as for the first broker; confirm auto-commit is intended.
kafka.second.enable-auto-commit=true
# Main Spring Boot Configuration
# HTTP port of the embedded web server and the application's logical name.
server.port=8080
spring.application.name=dual-kafka-app
// DualKafkaApplication.java
package com.example.dualkafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
/**
 * Entry point of the dual-Kafka Spring Boot application.
 *
 * <p>{@link EnableKafka} is declared explicitly so that {@code @KafkaListener}
 * methods in this application are registered with their listener containers.
 */
@EnableKafka
@SpringBootApplication
public class DualKafkaApplication {

    /**
     * Starts the Spring application context.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(DualKafkaApplication.class, args);
    }
}
// Message.java
package com.example.dualkafka.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Payload type consumed from both Kafka brokers.
 *
 * <p>Lombok generates the accessors, {@code equals}/{@code hashCode}/{@code toString}
 * and both the no-argument and the all-arguments constructor for this class.
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class Message {

    /** Identifier of the message. */
    private String id;

    /** Body of the message. */
    private String content;

    /** Timestamp of the message; the unit is not defined in this class (TODO confirm with producers). */
    private long timestamp;
}
// FirstKafkaConfig.java
package com.example.dualkafka.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
import com.example.dualkafka.model.Message;
/**
 * Consumer-side configuration for the first Kafka broker.
 *
 * <p>Exposes a dedicated {@link ConsumerFactory} and listener container factory so that
 * listeners can target this broker by naming {@code firstKafkaListenerContainerFactory}
 * as their container factory.
 */
@Configuration
public class FirstKafkaConfig {

    /** Bootstrap servers of the first broker. */
    @Value("${kafka.first.bootstrap-servers}")
    private String bootstrapServers;

    /** Consumer group id used against the first broker. */
    @Value("${kafka.first.group-id}")
    private String groupId;

    /** Value passed as the consumer's {@code auto.offset.reset} setting. */
    @Value("${kafka.first.auto-offset-reset}")
    private String autoOffsetReset;

    /** Value passed as the consumer's {@code enable.auto.commit} setting. */
    @Value("${kafka.first.enable-auto-commit}")
    private boolean enableAutoCommit;

    /**
     * Builds the consumer factory for the first broker.
     *
     * <p>Keys are read as strings; values are read as JSON and always mapped to
     * {@link Message}.
     *
     * @return consumer factory bound to the first broker's connection settings
     */
    @Bean
    public ConsumerFactory<String, Message> firstConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);

        // Security: the target type is fixed to Message and type headers carried by the
        // record are ignored (second argument = false). The previous wildcard trust
        // ("*") allowed a record header to select any class on the classpath as the
        // deserialization target; no wildcard trust is needed when headers are not used.
        JsonDeserializer<Message> jsonDeserializer = new JsonDeserializer<>(Message.class, false);

        return new DefaultKafkaConsumerFactory<>(
                props,
                new StringDeserializer(),
                jsonDeserializer
        );
    }

    /**
     * Builds the listener container factory backed by {@link #firstConsumerFactory()}.
     *
     * @return container factory to reference from {@code @KafkaListener(containerFactory = ...)}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Message> firstKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Message> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(firstConsumerFactory());
        return factory;
    }
}
// SecondKafkaConfig.java
package com.example.dualkafka.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
import com.example.dualkafka.model.Message;
/**
 * Consumer-side configuration for the second Kafka broker.
 *
 * <p>Exposes a dedicated {@link ConsumerFactory} and listener container factory so that
 * listeners can target this broker by naming {@code secondKafkaListenerContainerFactory}
 * as their container factory.
 */
@Configuration
public class SecondKafkaConfig {

    /** Bootstrap servers of the second broker. */
    @Value("${kafka.second.bootstrap-servers}")
    private String bootstrapServers;

    /** Consumer group id used against the second broker. */
    @Value("${kafka.second.group-id}")
    private String groupId;

    /** Value passed as the consumer's {@code auto.offset.reset} setting. */
    @Value("${kafka.second.auto-offset-reset}")
    private String autoOffsetReset;

    /** Value passed as the consumer's {@code enable.auto.commit} setting. */
    @Value("${kafka.second.enable-auto-commit}")
    private boolean enableAutoCommit;

    /**
     * Builds the consumer factory for the second broker.
     *
     * <p>Keys are read as strings; values are read as JSON and always mapped to
     * {@link Message}.
     *
     * @return consumer factory bound to the second broker's connection settings
     */
    @Bean
    public ConsumerFactory<String, Message> secondConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);

        // Security: the target type is fixed to Message and type headers carried by the
        // record are ignored (second argument = false). The previous wildcard trust
        // ("*") allowed a record header to select any class on the classpath as the
        // deserialization target; no wildcard trust is needed when headers are not used.
        JsonDeserializer<Message> jsonDeserializer = new JsonDeserializer<>(Message.class, false);

        return new DefaultKafkaConsumerFactory<>(
                props,
                new StringDeserializer(),
                jsonDeserializer
        );
    }

    /**
     * Builds the listener container factory backed by {@link #secondConsumerFactory()}.
     *
     * @return container factory to reference from {@code @KafkaListener(containerFactory = ...)}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Message> secondKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Message> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(secondConsumerFactory());
        return factory;
    }
}
// FirstKafkaListener.java
package com.example.dualkafka.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import com.example.dualkafka.model.Message;
/**
 * Receives {@link Message} records from the topic configured under
 * {@code kafka.first.topic}, using the container factory named
 * {@code firstKafkaListenerContainerFactory}.
 */
@Service
public class FirstKafkaListener {

    private static final Logger log = LoggerFactory.getLogger(FirstKafkaListener.class);

    /**
     * Listener callback: logs the received message and hands it to the processing step.
     *
     * @param message payload received from the first broker
     */
    @KafkaListener(
            containerFactory = "firstKafkaListenerContainerFactory",
            groupId = "${kafka.first.group-id}",
            topics = "${kafka.first.topic}"
    )
    public void listen(Message message) {
        log.info("Received message from first Kafka broker: {}", message);
        handle(message);
    }

    /**
     * Processing step for messages of the first broker; currently only logs id and content.
     *
     * @param payload message to process
     */
    private void handle(Message payload) {
        final String id = payload.getId();
        final String content = payload.getContent();
        log.info("Processing message from first broker - ID: {}, Content: {}", id, content);
    }
}
// SecondKafkaListener.java
package com.example.dualkafka.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import com.example.dualkafka.model.Message;
/**
 * Receives {@link Message} records from the topic configured under
 * {@code kafka.second.topic}, using the container factory named
 * {@code secondKafkaListenerContainerFactory}.
 */
@Service
public class SecondKafkaListener {

    private static final Logger log = LoggerFactory.getLogger(SecondKafkaListener.class);

    /**
     * Listener callback: logs the received message and hands it to the processing step.
     *
     * @param message payload received from the second broker
     */
    @KafkaListener(
            containerFactory = "secondKafkaListenerContainerFactory",
            groupId = "${kafka.second.group-id}",
            topics = "${kafka.second.topic}"
    )
    public void listen(Message message) {
        log.info("Received message from second Kafka broker: {}", message);
        handle(message);
    }

    /**
     * Processing step for messages of the second broker; currently only logs id and content.
     *
     * @param payload message to process
     */
    private void handle(Message payload) {
        final String id = payload.getId();
        final String content = payload.getContent();
        log.info("Processing message from second broker - ID: {}, Content: {}", id, content);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment