Skip to content

Instantly share code, notes, and snippets.

@Thesharing
Created December 3, 2020 07:43
Show Gist options
  • Select an option

  • Save Thesharing/5bd732390525552b59a6e893d03cbf14 to your computer and use it in GitHub Desktop.

Select an option

Save Thesharing/5bd732390525552b59a6e893d03cbf14 to your computer and use it in GitHub Desktop.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.runtime;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.DummyJobInformation;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.NoOpExecutionDeploymentListener;
import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.RegionPartitionReleaseStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.scheduler.DefaultExecutionVertexOperations;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestExecutionVertexOperationsDecorator;
import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.TestLogger;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
 * Performance tests for the scheduler on a two-vertex job (source -> sink, ALL_TO_ALL) with
 * {@link #PARALLELISM} subtasks per vertex.
 *
 * <p>The tests measure how long it takes to build the {@link ExecutionGraph}, to deploy tasks in a
 * streaming and in a batch job, and to handle a task failure. They only log the measured durations;
 * nothing is asserted about them.
 */
public class PerformanceTest extends TestLogger {

	private static final Logger LOG = LoggerFactory.getLogger(PerformanceTest.class);

	private static final Time TIMEOUT = Time.seconds(10L);

	/** Maximum time, in milliseconds, to wait for task submissions (5 minutes). */
	private static final long SUBMIT_TIMEOUT = 300_000L;

	/** Parallelism of both the source and the sink vertex. */
	private static final int PARALLELISM = 8000;

	private ExecutorService executor;

	private ScheduledExecutorService scheduledExecutorService;

	private Configuration configuration;

	private TestRestartBackoffTimeStrategy testRestartBackoffTimeStrategy;

	private TestExecutionVertexOperationsDecorator testExecutionVertexOperations;

	private ExecutionVertexVersioner executionVertexVersioner;

	private final ManuallyTriggeredScheduledExecutor taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();

	@Before
	public void setup() {
		executor = Executors.newSingleThreadExecutor();
		scheduledExecutorService = new DirectScheduledExecutorService();
		configuration = new Configuration();
		testRestartBackoffTimeStrategy = new TestRestartBackoffTimeStrategy(true, 0);
		testExecutionVertexOperations = new TestExecutionVertexOperationsDecorator(new DefaultExecutionVertexOperations());
		executionVertexVersioner = new ExecutionVertexVersioner();
	}

	@After
	public void tearDown() {
		// Executors.newSingleThreadExecutor() starts a non-daemon thread; without shutting it down
		// every test method would leak one.
		if (executor != null) {
			executor.shutdownNow();
		}
	}

	@Test
	public void testBuildExecutionGraphWithPipelinedRegionPerformance() throws Exception {
		final List<JobVertex> jobVertices = createDefaultJobVertices(
			PARALLELISM,
			DistributionPattern.ALL_TO_ALL,
			ResultPartitionType.PIPELINED);
		final JobGraph jobGraph = createJobGraph(
			jobVertices,
			ScheduleMode.EAGER,
			ExecutionMode.PIPELINED);

		final SlotProvider slotProvider = new SimpleSlotProvider(2 * PARALLELISM);
		final JobInformation jobInformation = new DummyJobInformation(
			jobGraph.getJobID(),
			jobGraph.getName());
		final ClassLoader classLoader = ExecutionGraph.class.getClassLoader();

		final ExecutionGraph eg = new ExecutionGraph(
			jobInformation,
			TestingUtils.defaultExecutor(),
			TestingUtils.defaultExecutor(),
			AkkaUtils.getDefaultTimeout(),
			new NoRestartStrategy(),
			JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue(),
			new RestartAllStrategy.Factory(),
			slotProvider,
			classLoader,
			VoidBlobWriter.getInstance(),
			TIMEOUT,
			new RegionPartitionReleaseStrategy.Factory(),
			NettyShuffleMaster.INSTANCE,
			NoOpJobMasterPartitionTracker.INSTANCE,
			jobGraph.getScheduleMode(),
			NoOpExecutionDeploymentListener.INSTANCE,
			(execution, newState) -> {
			},
			System.currentTimeMillis());

		// Only attaching the job graph is timed, not the construction above.
		final long startTime = System.nanoTime();
		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
		final long duration = elapsedMillisSince(startTime);

		LOG.info("Duration of building execution graph is : {} ms", duration);
	}

	@Test
	public void testDeployTaskInStreamingJobPerformance() throws Exception {
		final List<JobVertex> jobVertices = createDefaultJobVertices(
			PARALLELISM,
			DistributionPattern.ALL_TO_ALL,
			ResultPartitionType.PIPELINED);
		final JobGraph jobGraph = createJobGraph(
			jobVertices,
			ScheduleMode.EAGER,
			ExecutionMode.PIPELINED);

		final BlockingQueue<TaskDeploymentDescriptor> taskDeploymentDescriptors =
			new ArrayBlockingQueue<>(PARALLELISM * 2);
		final DefaultScheduler scheduler = createSchedulerRecordingSubmissions(jobGraph, taskDeploymentDescriptors);

		// Measured from starting the scheduler until every source and sink subtask has been submitted.
		final long startTime = System.nanoTime();
		startScheduling(scheduler);
		waitForAllTaskSubmitted(taskDeploymentDescriptors, PARALLELISM * 2, SUBMIT_TIMEOUT);
		final long duration = elapsedMillisSince(startTime);

		LOG.info("Duration of deploying tasks is : {} ms", duration);
	}

	@Test
	public void testDeployTaskInBatchJobPerformance() throws Exception {
		final List<JobVertex> jobVertices = createDefaultJobVertices(
			PARALLELISM,
			DistributionPattern.ALL_TO_ALL,
			ResultPartitionType.BLOCKING);
		final JobGraph jobGraph = createJobGraph(
			jobVertices,
			ScheduleMode.LAZY_FROM_SOURCES,
			ExecutionMode.BATCH);

		final BlockingQueue<TaskDeploymentDescriptor> taskDeploymentDescriptors =
			new ArrayBlockingQueue<>(PARALLELISM * 2);
		final DefaultScheduler scheduler = createSchedulerRecordingSubmissions(jobGraph, taskDeploymentDescriptors);

		// Phase 1: with LAZY_FROM_SOURCES the first PARALLELISM submissions are expected to be the sources.
		final long startTime = System.nanoTime();
		startScheduling(scheduler);
		waitForAllTaskSubmitted(taskDeploymentDescriptors, PARALLELISM, SUBMIT_TIMEOUT);
		final long sourceDuration = elapsedMillisSince(startTime);

		final JobVertex source = jobVertices.get(0);
		final AccessExecutionJobVertex sourceExecutionJobVertex = scheduler
			.requestJob()
			.getAllVertices()
			.get(source.getID());

		// Finish all but the last source outside of the measurement.
		for (int subtask = 0; subtask < PARALLELISM - 1; subtask++) {
			transitionTaskStatus(scheduler, sourceExecutionJobVertex, subtask, ExecutionState.FINISHED);
		}

		// Phase 2: the sinks are expected to be deployed once the last source has finished.
		final long sinkStartTime = System.nanoTime();
		transitionTaskStatus(scheduler, sourceExecutionJobVertex, PARALLELISM - 1, ExecutionState.FINISHED);
		waitForAllTaskSubmitted(taskDeploymentDescriptors, PARALLELISM * 2, SUBMIT_TIMEOUT);
		final long sinkDuration = elapsedMillisSince(sinkStartTime);

		LOG.info("Duration of deploying sources is : {} ms", sourceDuration);
		LOG.info("Duration of deploying sinks is : {} ms", sinkDuration);
	}

	@Test
	public void testRegionFailoverInStreamingJobPerformance() throws Exception {
		final List<JobVertex> jobVertices = createDefaultJobVertices(
			PARALLELISM,
			DistributionPattern.ALL_TO_ALL,
			ResultPartitionType.PIPELINED);
		final JobGraph jobGraph = createJobGraph(
			jobVertices,
			ScheduleMode.EAGER,
			ExecutionMode.PIPELINED);

		final BlockingQueue<TaskDeploymentDescriptor> taskDeploymentDescriptors =
			new ArrayBlockingQueue<>(PARALLELISM * 2);
		final DefaultScheduler scheduler = createSchedulerRecordingSubmissions(jobGraph, taskDeploymentDescriptors);

		startScheduling(scheduler);
		waitForAllTaskSubmitted(taskDeploymentDescriptors, PARALLELISM * 2, SUBMIT_TIMEOUT);

		final Predicate<AccessExecution> isDeploying =
			ExecutionGraphTestUtils.isInExecutionState(ExecutionState.DEPLOYING);
		ExecutionGraphTestUtils.waitForAllExecutionsPredicate(
			scheduler.getExecutionGraph(),
			isDeploying,
			TIMEOUT.toMilliseconds());

		// Bring every source and sink subtask to RUNNING before injecting the failure.
		final JobVertex source = jobVertices.get(0);
		final JobVertex sink = jobVertices.get(1);
		final AccessExecutionJobVertex ejvSource = scheduler
			.requestJob()
			.getAllVertices()
			.get(source.getID());
		transitionAllTaskStatus(scheduler, ejvSource, ExecutionState.RUNNING);
		final AccessExecutionJobVertex ejvSink = scheduler
			.requestJob()
			.getAllVertices()
			.get(sink.getID());
		transitionAllTaskStatus(scheduler, ejvSink, ExecutionState.RUNNING);

		final ExecutionVertex firstSourceVertex =
			scheduler.getExecutionJobVertex(source.getID()).getTaskVertices()[0];

		// NOTE(review): restarts presumably go through taskRestartExecutor (the scheduler's delay
		// executor), which is never triggered here, so this likely measures failure handling only,
		// not redeployment — confirm.
		final long startTime = System.nanoTime();
		firstSourceVertex.getCurrentExecutionAttempt().fail(new Exception("new fail"));
		final long duration = elapsedMillisSince(startTime);

		LOG.info("Duration of failover in the streaming task is : {} ms", duration);
	}

	/**
	 * Creates a "source" and a "sink" vertex, both running {@link NoOpInvokable} with the given
	 * parallelism, where the sink consumes a new data set produced by the source.
	 *
	 * @param parallelism parallelism of both vertices
	 * @param distributionPattern how source subtasks are connected to sink subtasks
	 * @param resultPartitionType type of the intermediate result between source and sink
	 * @return a mutable list containing the source at index 0 and the sink at index 1
	 */
	public List<JobVertex> createDefaultJobVertices(
			int parallelism,
			DistributionPattern distributionPattern,
			ResultPartitionType resultPartitionType) {

		final List<JobVertex> jobVertices = new ArrayList<>();

		final JobVertex source = new JobVertex("source");
		source.setInvokableClass(NoOpInvokable.class);
		source.setParallelism(parallelism);
		jobVertices.add(source);

		final JobVertex sink = new JobVertex("sink");
		sink.setInvokableClass(NoOpInvokable.class);
		sink.setParallelism(parallelism);
		jobVertices.add(sink);

		sink.connectNewDataSetAsInput(source, distributionPattern, resultPartitionType);

		return jobVertices;
	}

	/**
	 * Creates a {@link JobGraph} from the given vertices with the given schedule and execution mode.
	 *
	 * @throws IOException propagated from {@link JobGraph#setExecutionConfig}
	 */
	public JobGraph createJobGraph(
			List<JobVertex> jobVertices,
			ScheduleMode scheduleMode,
			ExecutionMode executionMode) throws IOException {

		final JobGraph jobGraph = new JobGraph(jobVertices.toArray(new JobVertex[0]));
		jobGraph.setScheduleMode(scheduleMode);

		final ExecutionConfig executionConfig = new ExecutionConfig();
		executionConfig.setExecutionMode(executionMode);
		jobGraph.setExecutionConfig(executionConfig);

		return jobGraph;
	}

	/**
	 * Creates {@code slotNum} slot offers, each with a fresh {@link AllocationID}, slot index
	 * {@code 0..slotNum-1} and {@link ResourceProfile#UNKNOWN}.
	 *
	 * <p>Previously the result was silently capped at 20 offers regardless of {@code slotNum}.
	 */
	public List<SlotOffer> createSlotOffers(int slotNum) {
		return IntStream
			.range(0, slotNum)
			.mapToObj(i -> new SlotOffer(new AllocationID(), i, ResourceProfile.UNKNOWN))
			.collect(Collectors.toList());
	}

	/**
	 * Creates a scheduler for {@code jobGraph} whose task manager gateway offers every submitted
	 * {@link TaskDeploymentDescriptor} to {@code submittedTasks}. The slot provider offers
	 * {@code 2 * PARALLELISM} slots.
	 */
	private DefaultScheduler createSchedulerRecordingSubmissions(
			final JobGraph jobGraph,
			final BlockingQueue<TaskDeploymentDescriptor> submittedTasks) throws Exception {

		final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
		taskManagerGateway.setSubmitConsumer(submittedTasks::offer);
		final SlotProvider slotProvider = new SimpleSlotProvider(PARALLELISM * 2, taskManagerGateway);
		return createScheduler(jobGraph, slotProvider);
	}

	/**
	 * Builds a {@link DefaultScheduler} wired to this test's executors and test doubles.
	 *
	 * <p>Uses {@link LazyFromSourcesSchedulingStrategy} for {@link ScheduleMode#LAZY_FROM_SOURCES}
	 * jobs and {@link EagerSchedulingStrategy} otherwise.
	 */
	private DefaultScheduler createScheduler(
			final JobGraph jobGraph,
			final SlotProvider slotProvider) throws Exception {

		final SchedulingStrategyFactory schedulingStrategyFactory =
			jobGraph.getScheduleMode() == ScheduleMode.LAZY_FROM_SOURCES ?
				new LazyFromSourcesSchedulingStrategy.Factory() :
				new EagerSchedulingStrategy.Factory();

		return SchedulerTestingUtils.newSchedulerBuilderWithDefaultSlotAllocator(
				jobGraph,
				slotProvider,
				TIMEOUT)
			.setLogger(log)
			.setIoExecutor(executor)
			.setJobMasterConfiguration(configuration)
			.setFutureExecutor(scheduledExecutorService)
			.setDelayExecutor(taskRestartExecutor)
			.setSchedulingStrategyFactory(schedulingStrategyFactory)
			.setRestartBackoffTimeStrategy(testRestartBackoffTimeStrategy)
			.setExecutionVertexOperations(testExecutionVertexOperations)
			.setExecutionVertexVersioner(executionVertexVersioner)
			.build();
	}

	/**
	 * Installs {@link ComponentMainThreadExecutorServiceAdapter#forMainThread()} as the scheduler's
	 * main-thread executor and starts scheduling.
	 */
	private void startScheduling(final SchedulerNG scheduler) {
		scheduler.setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forMainThread());
		scheduler.startScheduling();
	}

	/**
	 * Waits until {@code taskDeploymentDescriptors} holds at least {@code desiredCount} elements.
	 *
	 * <p>This is a simple polling loop with 2 ms sleeps. It is good enough for these benchmarks, but it
	 * can add a few milliseconds to the measured durations.
	 *
	 * @param taskDeploymentDescriptors queue that submitted deployment descriptors are offered to
	 * @param desiredCount number of descriptors to wait for
	 * @param maxWaitMillis maximum time to wait in milliseconds; {@code 0} waits indefinitely
	 * @throws TimeoutException if fewer than {@code desiredCount} descriptors arrived in time
	 * @throws IllegalStateException if the calling thread is interrupted while waiting (the interrupt
	 *     flag is restored)
	 */
	public static void waitForAllTaskSubmitted(
			BlockingQueue<TaskDeploymentDescriptor> taskDeploymentDescriptors,
			int desiredCount,
			long maxWaitMillis) throws TimeoutException {

		final boolean waitIndefinitely = maxWaitMillis == 0;
		// TimeUnit conversion saturates instead of overflowing for very large timeouts.
		final long maxWaitNanos = TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
		final long startNanos = System.nanoTime();

		while (taskDeploymentDescriptors.size() < desiredCount) {
			// Compare elapsed time rather than absolute nanoTime values, which may wrap around.
			if (!waitIndefinitely && System.nanoTime() - startNanos >= maxWaitNanos) {
				throw new TimeoutException(String.format(
					"Only %d of %d tasks were submitted within %d ms.",
					taskDeploymentDescriptors.size(),
					desiredCount,
					maxWaitMillis));
			}
			try {
				Thread.sleep(2);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				throw new IllegalStateException("Interrupted while waiting for task submission.", e);
			}
		}
	}

	/** Returns the whole milliseconds elapsed since {@code startNanos}, a {@link System#nanoTime()} reading. */
	private static long elapsedMillisSince(long startNanos) {
		return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
	}

	/**
	 * Reports {@code executionState} to the scheduler for the current execution attempt of the given
	 * subtask of {@code vertex}.
	 */
	private static void transitionTaskStatus(
			DefaultScheduler scheduler,
			AccessExecutionJobVertex vertex,
			int subtask,
			ExecutionState executionState) {

		final ExecutionAttemptID attemptId = vertex.getTaskVertices()[subtask]
			.getCurrentExecutionAttempt()
			.getAttemptId();
		scheduler.updateTaskExecutionState(
			new TaskExecutionState(
				scheduler.getJobGraph().getJobID(),
				attemptId,
				executionState));
	}

	/**
	 * Reports {@code executionState} to the scheduler for the current execution attempt of every
	 * subtask of {@code vertex}.
	 */
	private static void transitionAllTaskStatus(
			DefaultScheduler scheduler,
			AccessExecutionJobVertex vertex,
			ExecutionState executionState) {

		for (AccessExecutionVertex ev : vertex.getTaskVertices()) {
			final ExecutionAttemptID attemptId = ev.getCurrentExecutionAttempt().getAttemptId();
			scheduler.updateTaskExecutionState(
				new TaskExecutionState(
					scheduler.getJobGraph().getJobID(),
					attemptId,
					executionState));
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment