Created
May 4, 2020 10:58
-
-
Save iundarigun/16cf0b7e120837945a3dc418504fefe4 to your computer and use it in GitHub Desktop.
MessageHandler
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| public class RenameTopicHandler implements MirrorMaker.MirrorMakerMessageHandler { | |
| private final HashMap<String, String> topicMap = new HashMap<String, String>(); | |
| private long NO_TIMESTAMP = -1L; | |
| public RenameTopicHandler(String topics) { | |
| if(topics != null && !topics.isEmpty()){ | |
| for (String topicAssignment : topics.split(";")) { | |
| String[] topicsArray = topicAssignment.split(","); | |
| if (topicsArray.length == 2) { | |
| topicMap.put(topicsArray[0], topicsArray[1]); | |
| } | |
| } | |
| } | |
| } | |
| public List<ProducerRecord<byte[], byte[]>> handle(BaseConsumerRecord record) { | |
| Long timestamp = record.timestamp() == NO_TIMESTAMP ? null : record.timestamp(); | |
| return Collections.singletonList(new ProducerRecord<byte[], byte[]>(getTopicName(record.topic()), null, timestamp, record.key(), record.value(), record.headers())); | |
| } | |
| private String getTopicName(String currentTopicName){ | |
| return topicMap.containsKey(currentTopicName) ? topicMap.get(currentTopicName) : currentTopicName; | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment