Created
August 8, 2018 13:24
-
-
Save OutOfBrain/1c50ea01c68059c173dfaacb0c20ec55 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package flink; | |
| import org.apache.flink.streaming.api.datastream.DataStream; | |
| import org.apache.flink.streaming.api.datastream.DataStreamSource; | |
| import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
| import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; | |
| import org.apache.flink.streaming.api.functions.source.SourceFunction; | |
| import org.slf4j.Logger; | |
| import org.slf4j.LoggerFactory; | |
| import java.io.Serializable; | |
| public class BroadcastTest { | |
| private static final Logger logger = LoggerFactory.getLogger(BroadcastTest.class); | |
| public static void main(String[] args) throws Exception { | |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
| env.setParallelism(9); | |
| final DataStreamSource<ComboElement> stream = env.addSource(new SourceFunction<ComboElement>() { | |
| @Override | |
| public void run(final SourceContext<ComboElement> ctx) throws Exception { | |
| int i = 0; | |
| while (true) { | |
| String element = "This is kafka: " + i++; | |
| ctx.collect(new ComboElement(new KafkaElement(element), ReloadEventElement.dont_reload)); | |
| Thread.sleep(100); | |
| } | |
| } | |
| @Override | |
| public void cancel() { | |
| } | |
| }); | |
| final DataStreamSource<ComboElement> reloadSource = env.addSource(new SourceFunction<ComboElement>() { | |
| @Override | |
| public void run(final SourceContext<ComboElement> ctx) throws Exception { | |
| while (true) { | |
| // simulate reload from kafka | |
| Thread.sleep(4000); | |
| ctx.collect(new ComboElement(KafkaElement.EMPTY, ReloadEventElement.do_reload)); | |
| } | |
| } | |
| @Override | |
| public void cancel() { | |
| } | |
| }); | |
| final DataStream<ComboElement> mergedStream = stream.union(reloadSource.broadcast()); | |
| mergedStream.addSink(new RichSinkFunction<ComboElement>() { | |
| @Override | |
| public void invoke(final ComboElement value, final Context context) { | |
| // here we can check if reload is set or not | |
| logger.info(value.toString()); | |
| } | |
| }); | |
| env.execute(); | |
| } | |
| } | |
/**
 * Immutable, serializable wrapper around a single string payload.
 */
class KafkaElement implements Serializable {

    /**
     * Shared placeholder instance carrying an empty payload.
     * Declared {@code final} so the shared sentinel cannot be reassigned.
     */
    public static final KafkaElement EMPTY = new KafkaElement("");

    /** The wrapped payload; stored exactly as passed to the constructor. */
    public final String element;

    /**
     * @param element the payload to wrap
     */
    public KafkaElement(final String element) {
        this.element = element;
    }

    /**
     * @return a string of the form {@code KafkaElement{element='<payload>'}}
     */
    @Override
    public String toString() {
        return "KafkaElement{" +
                "element='" + element + '\'' +
                '}';
    }
}
/**
 * Flag attached to each {@code ComboElement} indicating whether it carries a reload signal.
 */
enum ReloadEventElement {
    /** Regular element; no reload requested. */
    dont_reload,

    /** Control element requesting a reload. */
    do_reload
}
| class ComboElement implements Serializable { | |
| public KafkaElement kafkaElement; | |
| public ReloadEventElement reloadEventElement; | |
| public ComboElement(final KafkaElement kafkaElement, final ReloadEventElement reloadEventElement) { | |
| this.kafkaElement = kafkaElement; | |
| this.reloadEventElement = reloadEventElement; | |
| } | |
| @Override | |
| public String toString() { | |
| return "ComboElement{" + | |
| "kafkaElement=" + kafkaElement + | |
| ", reloadEventElement=" + reloadEventElement + | |
| '}'; | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment