Skip to content

Instantly share code, notes, and snippets.

@OutOfBrain
Created August 8, 2018 13:24
Show Gist options
  • Select an option

  • Save OutOfBrain/1c50ea01c68059c173dfaacb0c20ec55 to your computer and use it in GitHub Desktop.

Select an option

Save OutOfBrain/1c50ea01c68059c173dfaacb0c20ec55 to your computer and use it in GitHub Desktop.
package flink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
/**
 * Demo Flink job that unions a regular element stream with a broadcast "reload" stream.
 *
 * <p>One source emits a numbered {@link ComboElement} roughly every 100 ms, tagged
 * {@link ReloadEventElement#dont_reload}. A second source emits an empty element tagged
 * {@link ReloadEventElement#do_reload} roughly every 4 seconds; that stream is marked
 * {@code broadcast()} before the union. The sink only logs every element it receives.
 */
public class BroadcastTest {
    private static final Logger logger = LoggerFactory.getLogger(BroadcastTest.class);

    /**
     * Builds the topology and runs it until the job is cancelled or fails.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be executed
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(9);

        // Regular data source: stands in for a Kafka consumer.
        final DataStreamSource<ComboElement> stream = env.addSource(new SourceFunction<ComboElement>() {
            // Cleared by cancel(), which may be called from another thread, hence volatile.
            private volatile boolean running = true;

            @Override
            public void run(final SourceContext<ComboElement> ctx) throws Exception {
                int i = 0;
                while (running) {
                    String element = "This is kafka: " + i++;
                    ctx.collect(new ComboElement(new KafkaElement(element), ReloadEventElement.dont_reload));
                    Thread.sleep(100);
                }
            }

            @Override
            public void cancel() {
                // Lets the loop in run() terminate instead of spinning forever.
                running = false;
            }
        });

        // Control source: periodically emits a reload marker.
        final DataStreamSource<ComboElement> reloadSource = env.addSource(new SourceFunction<ComboElement>() {
            // Cleared by cancel(), which may be called from another thread, hence volatile.
            private volatile boolean running = true;

            @Override
            public void run(final SourceContext<ComboElement> ctx) throws Exception {
                while (running) {
                    // simulate reload from kafka
                    Thread.sleep(4000);
                    if (!running) {
                        // Cancelled while sleeping: do not emit a stale reload event.
                        break;
                    }
                    ctx.collect(new ComboElement(KafkaElement.EMPTY, ReloadEventElement.do_reload));
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        // The reload stream is broadcast before the union so the marker is not
        // confined to a single downstream partition.
        final DataStream<ComboElement> mergedStream = stream.union(reloadSource.broadcast());

        mergedStream.addSink(new RichSinkFunction<ComboElement>() {
            @Override
            public void invoke(final ComboElement value, final Context context) {
                // here we can check if reload is set or not
                logger.info("{}", value);
            }
        });

        env.execute();
    }
}
/**
 * Immutable wrapper around a single string payload.
 */
class KafkaElement implements Serializable {
    /**
     * Shared placeholder carrying an empty payload. Declared {@code final} so the
     * sentinel cannot be reassigned by other code.
     */
    public static final KafkaElement EMPTY = new KafkaElement("");

    /** The wrapped payload, exactly as passed to the constructor. */
    public final String element;

    /**
     * @param element the payload to wrap
     */
    public KafkaElement(final String element) {
        this.element = element;
    }

    /**
     * @return a string of the form {@code KafkaElement{element='<payload>'}}
     */
    @Override
    public String toString() {
        return "KafkaElement{" +
                "element='" + element + '\'' +
                '}';
    }
}
/**
 * Flag carried by each {@code ComboElement} indicating whether it is a reload marker.
 */
enum ReloadEventElement {
    /** Ordinary element; no reload requested. */
    dont_reload,

    /** Reload marker. */
    do_reload
}
/**
 * Pairs a {@link KafkaElement} payload with a {@link ReloadEventElement} flag so both
 * can travel through one stream type.
 */
class ComboElement implements Serializable {
    /** The payload part of the pair. */
    public KafkaElement kafkaElement;

    /** The reload flag part of the pair. */
    public ReloadEventElement reloadEventElement;

    /**
     * @param kafkaElement       the payload
     * @param reloadEventElement the reload flag accompanying the payload
     */
    public ComboElement(final KafkaElement kafkaElement, final ReloadEventElement reloadEventElement) {
        this.reloadEventElement = reloadEventElement;
        this.kafkaElement = kafkaElement;
    }

    /**
     * @return a string of the form
     *         {@code ComboElement{kafkaElement=<payload>, reloadEventElement=<flag>}}
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("ComboElement{");
        text.append("kafkaElement=").append(kafkaElement);
        text.append(", reloadEventElement=").append(reloadEventElement);
        text.append('}');
        return text.toString();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment