Created May 2, 2025 11:33
Example: Spring Boot application with Kafka listeners for multiple brokers
// Project structure:
// - pom.xml
// - src/main/java/com/example/dualkafka/
//   - DualKafkaApplication.java      (Main application class)
//   - config/
//     - FirstKafkaConfig.java        (First Kafka broker config)
//     - SecondKafkaConfig.java       (Second Kafka broker config)
//   - listener/
//     - FirstKafkaListener.java      (Listener for first broker)
//     - SecondKafkaListener.java     (Listener for second broker)
//   - model/
//     - Message.java                 (Message model)
// - src/main/resources/application.properties

// pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.1</version>
        <relativePath/>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>dual-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>dual-kafka</name>
    <description>Spring Boot app with dual Kafka broker listeners</description>
    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

// application.properties
# First Kafka Broker Configuration
kafka.first.bootstrap-servers=first-kafka-host:9092
kafka.first.group-id=group-first
kafka.first.topic=topic-first
kafka.first.auto-offset-reset=earliest
kafka.first.enable-auto-commit=true

# Second Kafka Broker Configuration
kafka.second.bootstrap-servers=second-kafka-host:9092
kafka.second.group-id=group-second
kafka.second.topic=topic-second
kafka.second.auto-offset-reset=earliest
kafka.second.enable-auto-commit=true

# Main Spring Boot Configuration
server.port=8080
spring.application.name=dual-kafka-app

// DualKafkaApplication.java
package com.example.dualkafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

@SpringBootApplication
@EnableKafka
public class DualKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(DualKafkaApplication.class, args);
    }
}

// Message.java
package com.example.dualkafka.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Message {
    private String id;
    private String content;
    private long timestamp;
}

// FirstKafkaConfig.java
package com.example.dualkafka.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
import com.example.dualkafka.model.Message;

@Configuration
public class FirstKafkaConfig {

    @Value("${kafka.first.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.first.group-id}")
    private String groupId;

    @Value("${kafka.first.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${kafka.first.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Bean
    public ConsumerFactory<String, Message> firstConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);

        // Deserialize JSON payloads into Message objects
        JsonDeserializer<Message> jsonDeserializer = new JsonDeserializer<>(Message.class);
        jsonDeserializer.addTrustedPackages("*");

        return new DefaultKafkaConsumerFactory<>(
                props,
                new StringDeserializer(),
                jsonDeserializer
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Message> firstKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Message> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(firstConsumerFactory());
        return factory;
    }
}

// SecondKafkaConfig.java
package com.example.dualkafka.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
import com.example.dualkafka.model.Message;

@Configuration
public class SecondKafkaConfig {

    @Value("${kafka.second.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.second.group-id}")
    private String groupId;

    @Value("${kafka.second.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${kafka.second.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Bean
    public ConsumerFactory<String, Message> secondConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);

        // Deserialize JSON payloads into Message objects
        JsonDeserializer<Message> jsonDeserializer = new JsonDeserializer<>(Message.class);
        jsonDeserializer.addTrustedPackages("*");

        return new DefaultKafkaConsumerFactory<>(
                props,
                new StringDeserializer(),
                jsonDeserializer
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Message> secondKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Message> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(secondConsumerFactory());
        return factory;
    }
}
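
// FirstKafkaProducerConfig.java
// NOTE: not part of the original gist. This is a minimal sketch of how a producer for the
// first broker could be wired up alongside the consumer config above, reusing the same
// "kafka.first.*" properties. The class and bean names are assumptions; JsonSerializer is
// chosen to mirror the JsonDeserializer used on the consumer side.
package com.example.dualkafka.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
import com.example.dualkafka.model.Message;

@Configuration
public class FirstKafkaProducerConfig {

    @Value("${kafka.first.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, Message> firstProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, Message> firstKafkaTemplate() {
        // Sends Message values as JSON to the first broker,
        // e.g. firstKafkaTemplate.send("topic-first", message)
        return new KafkaTemplate<>(firstProducerFactory());
    }
}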

// FirstKafkaListener.java
package com.example.dualkafka.listener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import com.example.dualkafka.model.Message;

@Service
public class FirstKafkaListener {

    private static final Logger logger = LoggerFactory.getLogger(FirstKafkaListener.class);

    @KafkaListener(
            topics = "${kafka.first.topic}",
            groupId = "${kafka.first.group-id}",
            containerFactory = "firstKafkaListenerContainerFactory"
    )
    public void listen(Message message) {
        logger.info("Received message from first Kafka broker: {}", message);
        // Process the message from the first broker
        processFirstMessage(message);
    }

    private void processFirstMessage(Message message) {
        // Business logic for processing messages from the first broker
        logger.info("Processing message from first broker - ID: {}, Content: {}",
                message.getId(), message.getContent());
    }
}

// SecondKafkaListener.java
package com.example.dualkafka.listener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import com.example.dualkafka.model.Message;

@Service
public class SecondKafkaListener {

    private static final Logger logger = LoggerFactory.getLogger(SecondKafkaListener.class);

    @KafkaListener(
            topics = "${kafka.second.topic}",
            groupId = "${kafka.second.group-id}",
            containerFactory = "secondKafkaListenerContainerFactory"
    )
    public void listen(Message message) {
        logger.info("Received message from second Kafka broker: {}", message);
        // Process the message from the second broker
        processSecondMessage(message);
    }

    private void processSecondMessage(Message message) {
        // Business logic for processing messages from the second broker
        logger.info("Processing message from second broker - ID: {}, Content: {}",
                message.getId(), message.getContent());
    }
}
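
// DualKafkaApplicationTests.java
// NOTE: not part of the original gist. A hedged sketch of an integration test using the
// spring-kafka-test dependency already declared in the pom. For simplicity it points both
// "kafka.first" and "kafka.second" bootstrap-servers at a single embedded broker; the class
// name, test name, and topic setup are assumptions.
package com.example.dualkafka;

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;

import com.example.dualkafka.model.Message;

@SpringBootTest(properties = {
        // Route both "brokers" to the embedded instance for the test
        "kafka.first.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "kafka.second.bootstrap-servers=${spring.embedded.kafka.brokers}"
})
@EmbeddedKafka(partitions = 1, topics = {"topic-first", "topic-second"})
class DualKafkaApplicationTests {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    void sendsMessageToFirstTopic() {
        // Build a throwaway producer against the embedded broker
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        KafkaTemplate<String, Message> template =
                new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps));
        template.send("topic-first", new Message("1", "hello", System.currentTimeMillis()));
        template.flush();
        // FirstKafkaListener should log the received message; asserting on the listener
        // would need a latch or polling, which this sketch leaves out.
    }
}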