/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.minicluster;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.minicluster.SometimesExceptionSender;
import org.apache.flink.runtime.minicluster.SometimesInstantiationErrorSender;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testtasks.WaitingNoOpInvokable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.core.StringStartsWith;
import org.junit.Assert;
import org.junit.Test;

public class MiniClusterITCase
extends TestLogger {
    /**
     * Starts a mini cluster configured with {@link RpcServiceSharing#SHARED} and runs the simple
     * no-op job with a parallelism equal to the total number of configured slots.
     */
    @Test
    public void runJobWithSingleRpcService() throws Exception {
        final int numOfTMs = 3;
        final int slotsPerTM = 7;

        final MiniClusterConfiguration cfg =
                new MiniClusterConfiguration.Builder()
                        .setNumTaskManagers(numOfTMs)
                        .setNumSlotsPerTaskManager(slotsPerTM)
                        .setRpcServiceSharing(RpcServiceSharing.SHARED)
                        .setConfiguration(getDefaultConfiguration())
                        .build();

        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
            miniCluster.start();
            // One subtask per available slot: numOfTMs * slotsPerTM == 21.
            miniCluster.executeJobBlocking(getSimpleJob(numOfTMs * slotsPerTM));
        }
    }

    /**
     * Starts a mini cluster configured with {@link RpcServiceSharing#DEDICATED} and runs the simple
     * no-op job with a parallelism equal to the total number of configured slots.
     */
    @Test
    public void runJobWithMultipleRpcServices() throws Exception {
        final int numOfTMs = 3;
        final int slotsPerTM = 7;

        final MiniClusterConfiguration cfg =
                new MiniClusterConfiguration.Builder()
                        .setNumTaskManagers(numOfTMs)
                        .setNumSlotsPerTaskManager(slotsPerTM)
                        .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
                        .setConfiguration(getDefaultConfiguration())
                        .build();

        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
            miniCluster.start();
            // One subtask per available slot: numOfTMs * slotsPerTM == 21.
            miniCluster.executeJobBlocking(getSimpleJob(numOfTMs * slotsPerTM));
        }
    }

    /**
     * Builds a two-vertex pipelined streaming job out of {@link BlockingNoOpInvokable} tasks, runs
     * it on the single-slot cluster set up by {@code runHandleJobsWhenNotEnoughSlots}, and expects
     * a {@link JobExecutionException} that carries a {@link NoResourceAvailableException}.
     */
    @Test
    public void testHandleStreamingJobsWhenNotEnoughSlot() throws Exception {
        try {
            final JobVertex upstream = blockingSingleSubtaskVertex("Test Vertex1");
            final JobVertex downstream = blockingSingleSubtaskVertex("Test Vertex2");
            downstream.connectNewDataSetAsInput(
                    upstream, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

            runHandleJobsWhenNotEnoughSlots(
                    JobGraphTestUtils.streamingJobGraph(upstream, downstream));

            Assert.fail("Job should fail.");
        } catch (JobExecutionException e) {
            Assert.assertThat(e, FlinkMatchers.containsMessage("Job execution failed"));
            Assert.assertThat(e, FlinkMatchers.containsCause(NoResourceAvailableException.class));
        }
    }

    /** Creates a {@link BlockingNoOpInvokable} vertex with parallelism and max parallelism 1. */
    private static JobVertex blockingSingleSubtaskVertex(String name) {
        final JobVertex vertex = new JobVertex(name);
        vertex.setParallelism(1);
        vertex.setMaxParallelism(1);
        vertex.setInvokableClass(BlockingNoOpInvokable.class);
        return vertex;
    }

    /**
     * Executes the given job graph, blocking until it terminates, on a mini cluster with one task
     * manager offering one slot. The slot request timeout is set to {@code 100L} and the resource
     * wait timeout to 100 milliseconds.
     *
     * @param jobGraph the job to execute
     * @throws Exception whatever starting the cluster or executing the job throws
     */
    private void runHandleJobsWhenNotEnoughSlots(JobGraph jobGraph) throws Exception {
        final Configuration config = getDefaultConfiguration();
        config.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 100L);
        config.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(100L));

        final MiniClusterConfiguration clusterConfig =
                new MiniClusterConfiguration.Builder()
                        .setNumTaskManagers(1)
                        .setNumSlotsPerTaskManager(1)
                        .setConfiguration(config)
                        .build();

        try (MiniCluster cluster = new MiniCluster(clusterConfig)) {
            cluster.start();
            cluster.executeJobBlocking(jobGraph);
        }
    }

    /**
     * Runs a sender/receiver job connected point-wise with a pipelined result partition, on a
     * single task manager that offers one slot per subtask of both vertices.
     */
    @Test
    public void testForwardJob() throws Exception {
        final int parallelism = 31;

        final MiniClusterConfiguration cfg =
                new MiniClusterConfiguration.Builder()
                        .setNumTaskManagers(1)
                        .setNumSlotsPerTaskManager(2 * parallelism)
                        .setConfiguration(getDefaultConfiguration())
                        .build();

        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
            miniCluster.start();

            final JobVertex sender = new JobVertex("Sender");
            sender.setInvokableClass(TestingAbstractInvokables.Sender.class);
            sender.setParallelism(parallelism);

            final JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(TestingAbstractInvokables.Receiver.class);
            receiver.setParallelism(parallelism);

            receiver.connectNewDataSetAsInput(
                    sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

            final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, receiver);
            miniCluster.executeJobBlocking(jobGraph);
        }
    }

    /**
     * Runs a sender feeding a {@code Tasks.AgnosticReceiver} through a point-wise, pipelined
     * connection, on a single task manager that offers one slot per subtask of both vertices.
     */
    @Test
    public void testBipartiteJob() throws Exception {
        final int parallelism = 31;

        final MiniClusterConfiguration cfg =
                new MiniClusterConfiguration.Builder()
                        .setNumTaskManagers(1)
                        .setNumSlotsPerTaskManager(2 * parallelism)
                        .setConfiguration(getDefaultConfiguration())
                        .build();

        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
            miniCluster.start();

            final JobVertex sender = new JobVertex("Sender");
            sender.setInvokableClass(TestingAbstractInvokables.Sender.class);
            sender.setParallelism(parallelism);

            final JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(Tasks.AgnosticReceiver.class);
            receiver.setParallelism(parallelism);

            receiver.connectNewDataSetAsInput(
                    sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

            final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, receiver);
            miniCluster.executeJobBlocking(jobGraph);
        }
    }

    /**
     * Runs a two-input job whose receiver is a {@code Tasks.AgnosticTertiaryReceiver} and expects
     * the execution to fail with a {@link JobExecutionException} that contains an
     * {@link ArrayIndexOutOfBoundsException} and a throwable with message {@code "2"}.
     */
    @Test
    public void testTwoInputJobFailingEdgeMismatch() throws Exception {
        final int parallelism = 1;

        final MiniClusterConfiguration cfg =
                new MiniClusterConfiguration.Builder()
                        .setNumTaskManagers(1)
                        .setNumSlotsPerTaskManager(6 * parallelism)
                        .setConfiguration(getDefaultConfiguration())
                        .build();

        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
            miniCluster.start();

            final JobVertex sender1 = new JobVertex("Sender1");
            sender1.setInvokableClass(TestingAbstractInvokables.Sender.class);
            sender1.setParallelism(parallelism);

            final JobVertex sender2 = new JobVertex("Sender2");
            sender2.setInvokableClass(TestingAbstractInvokables.Sender.class);
            sender2.setParallelism(2 * parallelism);

            final JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(Tasks.AgnosticTertiaryReceiver.class);
            receiver.setParallelism(3 * parallelism);

            receiver.connectNewDataSetAsInput(
                    sender1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            receiver.connectNewDataSetAsInput(
                    sender2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

            final JobGraph jobGraph =
                    JobGraphTestUtils.streamingJobGraph(sender1, receiver, sender2);

            try {
                miniCluster.executeJobBlocking(jobGraph);
                Assert.fail("Job should fail.");
            } catch (JobExecutionException e) {
                Assert.assertTrue(
                        ExceptionUtils.findThrowable(e, ArrayIndexOutOfBoundsException.class)
                                .isPresent());
                Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "2").isPresent());
            }
        }
    }

    /**
     * Runs a job in which a {@code Tasks.AgnosticBinaryReceiver} consumes two senders, one
     * connected point-wise and one all-to-all, with enough slots for every subtask of all three
     * vertices.
     */
    @Test
    public void testTwoInputJob() throws Exception {
        final int parallelism = 11;

        final MiniClusterConfiguration cfg =
                new MiniClusterConfiguration.Builder()
                        .setNumTaskManagers(1)
                        // sender1 (1x) + sender2 (2x) + receiver (3x)
                        .setNumSlotsPerTaskManager(6 * parallelism)
                        .setConfiguration(getDefaultConfiguration())
                        .build();

        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
            miniCluster.start();

            final JobVertex sender1 = new JobVertex("Sender1");
            sender1.setInvokableClass(TestingAbstractInvokables.Sender.class);
            sender1.setParallelism(parallelism);

            final JobVertex sender2 = new JobVertex("Sender2");
            sender2.setInvokableClass(TestingAbstractInvokables.Sender.class);
            sender2.setParallelism(2 * parallelism);

            final JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(Tasks.AgnosticBinaryReceiver.class);
            receiver.setParallelism(3 * parallelism);

            receiver.connectNewDataSetAsInput(
                    sender1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            receiver.connectNewDataSetAsInput(
                    sender2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

            final JobGraph jobGraph =
                    JobGraphTestUtils.streamingJobGraph(sender1, receiver, sender2);
            miniCluster.executeJobBlocking(jobGraph);
        }
    }

    /**
     * Runs a sender -> forwarder -> receiver job in which all three vertices share one
     * {@link SlotSharingGroup}, on a cluster whose slot count equals the per-vertex parallelism.
     */
    @Test
    public void testSchedulingAllAtOnce() throws Exception {
        final int parallelism = 11;

        final MiniClusterConfiguration cfg =
                new MiniClusterConfiguration.Builder()
                        .setNumTaskManagers(1)
                        .setNumSlotsPerTaskManager(parallelism)
                        .setConfiguration(getDefaultConfiguration())
                        .build();

        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
            miniCluster.start();

            final JobVertex sender = new JobVertex("Sender");
            sender.setInvokableClass(TestingAbstractInvokables.Sender.class);
            sender.setParallelism(parallelism);

            final JobVertex forwarder = new JobVertex("Forwarder");
            forwarder.setInvokableClass(Tasks.Forwarder.class);
            forwarder.setParallelism(parallelism);

            final JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(Tasks.AgnosticReceiver.class);
            receiver.setParallelism(parallelism);

            final SlotSharingGroup sharingGroup = new SlotSharingGroup();
            sender.setSlotSharingGroup(sharingGroup);
            forwarder.setSlotSharingGroup(sharingGroup);
            receiver.setSlotSharingGroup(sharingGroup);

            forwarder.connectNewDataSetAsInput(
                    sender, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
            receiver.connectNewDataSetAsInput(
                    forwarder, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

            final JobGraph jobGraph =
                    JobGraphTestUtils.streamingJobGraph(sender, forwarder, receiver);
            miniCluster.executeJobBlocking(jobGraph);
        }
    }

    /**
     * Runs a job whose sender uses {@code Tasks.ExceptionSender} and expects the blocking
     * execution to fail with a {@link JobExecutionException} containing a throwable with message
     * {@code "Test exception"}.
     */
    @Test
    public void testJobWithAFailingSenderVertex() throws Exception {
        final int parallelism = 11;

        final MiniClusterConfiguration cfg =
                new MiniClusterConfiguration.Builder()
                        .setNumTaskManagers(1)
                        .setNumSlotsPerTaskManager(2 * parallelism)
                        .setConfiguration(getDefaultConfiguration())
                        .build();

        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
            miniCluster.start();

            final JobVertex sender = new JobVertex("Sender");
            sender.setInvokableClass(Tasks.ExceptionSender.class);
            sender.setParallelism(parallelism);

            final JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(TestingAbstractInvokables.Receiver.class);
            receiver.setParallelism(parallelism);

            receiver.connectNewDataSetAsInput(
                    sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

            final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, receiver);

            try {
                miniCluster.executeJobBlocking(jobGraph);
                Assert.fail("Job should fail.");
            } catch (JobExecutionException e) {
                Assert.assertTrue(ExceptionUtils.findThrowable(e, Exception.class).isPresent());
                Assert.assertTrue(
                        ExceptionUtils.findThrowableWithMessage(e, "Test exception").isPresent());
            }
        }
    }

    /**
     * Runs a job whose sender uses {@link SometimesExceptionSender} (configured via
     * {@code configFailingSenders} with the sender parallelism) and expects the blocking
     * execution to fail with a {@link JobExecutionException} containing a throwable with message
     * {@code "Test exception"}. Sender and receiver share one {@link SlotSharingGroup}.
     */
    @Test
    public void testJobWithAnOccasionallyFailingSenderVertex() throws Exception {
        final int parallelism = 11;

        final MiniClusterConfiguration cfg =
                new MiniClusterConfiguration.Builder()
                        .setNumTaskManagers(1)
                        .setNumSlotsPerTaskManager(parallelism)
                        .setConfiguration(getDefaultConfiguration())
                        .build();

        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
            miniCluster.start();

            final SlotSharingGroup group = new SlotSharingGroup();

            final JobVertex sender = new JobVertex("Sender");
            sender.setInvokableClass(SometimesExceptionSender.class);
            sender.setParallelism(parallelism);
            sender.setSlotSharingGroup(group);

            SometimesExceptionSender.configFailingSenders(parallelism);

            final JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(TestingAbstractInvokables.Receiver.class);
            receiver.setParallelism(parallelism);
            receiver.setSlotSharingGroup(group);

            receiver.connectNewDataSetAsInput(
                    sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

            final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, receiver);

            try {
                miniCluster.executeJobBlocking(jobGraph);
                Assert.fail("Job should fail.");
            } catch (JobExecutionException e) {
                Assert.assertTrue(ExceptionUtils.findThrowable(e, Exception.class).isPresent());
                Assert.assertTrue(
                        ExceptionUtils.findThrowableWithMessage(e, "Test exception").isPresent());
            }
        }
    }

    /**
     * Runs a job whose receiver uses {@code Tasks.ExceptionReceiver} and expects the blocking
     * execution to fail with a {@link JobExecutionException} containing a throwable with message
     * {@code "Test exception"}.
     */
    @Test
    public void testJobWithAFailingReceiverVertex() throws Exception {
        final int parallelism = 11;

        final MiniClusterConfiguration cfg =
                new MiniClusterConfiguration.Builder()
                        .setNumTaskManagers(1)
                        .setNumSlotsPerTaskManager(2 * parallelism)
                        .setConfiguration(getDefaultConfiguration())
                        .build();

        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
            miniCluster.start();

            final JobVertex sender = new JobVertex("Sender");
            sender.setInvokableClass(TestingAbstractInvokables.Sender.class);
            sender.setParallelism(parallelism);

            final JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(Tasks.ExceptionReceiver.class);
            receiver.setParallelism(parallelism);

            receiver.connectNewDataSetAsInput(
                    sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

            final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, receiver);

            try {
                miniCluster.executeJobBlocking(jobGraph);
                Assert.fail("Job should fail.");
            } catch (JobExecutionException e) {
                Assert.assertTrue(ExceptionUtils.findThrowable(e, Exception.class).isPresent());
                Assert.assertTrue(
                        ExceptionUtils.findThrowableWithMessage(e, "Test exception").isPresent());
            }
        }
    }

    /**
     * Runs a job whose sender uses {@code Tasks.InstantiationErrorSender} and expects the blocking
     * execution to fail with a {@link JobExecutionException} containing a throwable with message
     * {@code "Test exception in constructor"}.
     */
    @Test
    public void testJobWithAllVerticesFailingDuringInstantiation() throws Exception {
        final int parallelism = 11;

        final MiniClusterConfiguration cfg =
                new MiniClusterConfiguration.Builder()
                        .setNumTaskManagers(1)
                        .setNumSlotsPerTaskManager(2 * parallelism)
                        .setConfiguration(getDefaultConfiguration())
                        .build();

        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
            miniCluster.start();

            final JobVertex sender = new JobVertex("Sender");
            sender.setInvokableClass(Tasks.InstantiationErrorSender.class);
            sender.setParallelism(parallelism);

            final JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(TestingAbstractInvokables.Receiver.class);
            receiver.setParallelism(parallelism);

            receiver.connectNewDataSetAsInput(
                    sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

            final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, receiver);

            try {
                miniCluster.executeJobBlocking(jobGraph);
                Assert.fail("Job should fail.");
            } catch (JobExecutionException e) {
                Assert.assertTrue(ExceptionUtils.findThrowable(e, Exception.class).isPresent());
                Assert.assertTrue(
                        ExceptionUtils.findThrowableWithMessage(e, "Test exception in constructor")
                                .isPresent());
            }
        }
    }

    /**
     * Runs a job whose sender uses {@link SometimesInstantiationErrorSender} (configured via
     * {@code configFailingSenders} with the sender parallelism) and expects the blocking
     * execution to fail with a {@link JobExecutionException} containing a throwable with message
     * {@code "Test exception in constructor"}. Sender and receiver share one
     * {@link SlotSharingGroup}.
     */
    @Test
    public void testJobWithSomeVerticesFailingDuringInstantiation() throws Exception {
        final int parallelism = 11;

        final MiniClusterConfiguration cfg =
                new MiniClusterConfiguration.Builder()
                        .setNumTaskManagers(1)
                        .setNumSlotsPerTaskManager(parallelism)
                        .setConfiguration(getDefaultConfiguration())
                        .build();

        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
            miniCluster.start();

            final SlotSharingGroup group = new SlotSharingGroup();

            final JobVertex sender = new JobVertex("Sender");
            sender.setInvokableClass(SometimesInstantiationErrorSender.class);
            sender.setParallelism(parallelism);
            sender.setSlotSharingGroup(group);

            SometimesInstantiationErrorSender.configFailingSenders(parallelism);

            final JobVertex receiver = new JobVertex("Receiver");
            receiver.setInvokableClass(TestingAbstractInvokables.Receiver.class);
            receiver.setParallelism(parallelism);
            receiver.setSlotSharingGroup(group);

            receiver.connectNewDataSetAsInput(
                    sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

            final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, receiver);

            try {
                miniCluster.executeJobBlocking(jobGraph);
                Assert.fail("Job should fail.");
            } catch (JobExecutionException e) {
                Assert.assertTrue(ExceptionUtils.findThrowable(e, Exception.class).isPresent());
                Assert.assertTrue(
                        ExceptionUtils.findThrowableWithMessage(e, "Test exception in constructor")
                                .isPresent());
            }
        }
    }

    /**
     * Submits a job whose sink vertex is a {@code WaitOnFinalizeJobVertex} and asserts that, once
     * the job result has been obtained and converted to a job execution result, the vertex's
     * {@code finalizedOnMaster} flag has already been set.
     */
    @Test
    public void testCallFinalizeOnMasterBeforeJobCompletes() throws Exception {
        final int parallelism = 11;

        final MiniClusterConfiguration cfg =
                new MiniClusterConfiguration.Builder()
                        .setNumTaskManagers(1)
                        .setNumSlotsPerTaskManager(2 * parallelism)
                        .setConfiguration(getDefaultConfiguration())
                        .build();

        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
            miniCluster.start();

            final JobVertex source = new JobVertex("Source");
            source.setInvokableClass(WaitingNoOpInvokable.class);
            source.setParallelism(parallelism);

            // The flag is static, so clear any value left behind by an earlier run.
            WaitOnFinalizeJobVertex.resetFinalizedOnMaster();

            final WaitOnFinalizeJobVertex sink = new WaitOnFinalizeJobVertex("Sink", 20L);
            sink.setInvokableClass(NoOpInvokable.class);
            sink.setParallelism(parallelism);

            sink.connectNewDataSetAsInput(
                    source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

            final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(source, sink);

            // Request the job result only after the submission has been acknowledged.
            final CompletableFuture<JobResult> jobResultFuture =
                    miniCluster
                            .submitJob(jobGraph)
                            .thenCompose(
                                    ignored -> miniCluster.requestJobResult(jobGraph.getJobID()));

            jobResultFuture.get().toJobExecutionResult(getClass().getClassLoader());

            Assert.assertTrue(WaitOnFinalizeJobVertex.finalizedOnMaster.get());
        }
    }

    /**
     * Submits a job whose only vertex throws an {@link OutOfMemoryError} from
     * {@code finalizeOnMaster} and verifies that converting the job result raises a
     * {@link JobExecutionException} whose {@link OutOfMemoryError} cause has a message starting
     * with the expected enriched heap-space text.
     */
    @Test
    public void testOutOfMemoryErrorMessageEnrichmentInJobVertexFinalization() throws Exception {
        final int parallelism = 1;

        final MiniClusterConfiguration cfg =
                new MiniClusterConfiguration.Builder()
                        .setNumTaskManagers(1)
                        .setNumSlotsPerTaskManager(parallelism)
                        .setConfiguration(getDefaultConfiguration())
                        .build();

        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
            miniCluster.start();

            final JobVertex failingJobVertex = new OutOfMemoryJobVertex();
            failingJobVertex.setInvokableClass(NoOpInvokable.class);
            failingJobVertex.setParallelism(parallelism);

            final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(failingJobVertex);

            // Request the job result only after the submission has been acknowledged.
            final CompletableFuture<JobResult> jobResultFuture =
                    miniCluster
                            .submitJob(jobGraph)
                            .thenCompose(
                                    ignored -> miniCluster.requestJobResult(jobGraph.getJobID()));

            try {
                jobResultFuture.get().toJobExecutionResult(getClass().getClassLoader());
                // Without this the test would pass silently if no exception were raised.
                Assert.fail("Job should fail.");
            } catch (JobExecutionException e) {
                Assert.assertThat(e, FlinkMatchers.containsCause(OutOfMemoryError.class));
                Assert.assertThat(
                        ExceptionUtils.findThrowable(e, OutOfMemoryError.class)
                                .map(Throwable::getMessage)
                                .get(),
                        StringStartsWith.startsWith(
                                "Java heap space. A heap space-related out-of-memory error has occurred."));
            }
        }
    }

    /**
     * Submits a job whose only vertex throws an {@link OutOfMemoryError} from
     * {@code initializeOnMaster}. If waiting for the job result raises an
     * {@link ExecutionException}, verifies that it contains an {@link OutOfMemoryError} and the
     * expected enriched heap-space message.
     */
    @Test
    public void testOutOfMemoryErrorMessageEnrichmentInJobVertexInitialization() throws Exception {
        final int parallelism = 1;

        final MiniClusterConfiguration cfg =
                new MiniClusterConfiguration.Builder()
                        .setNumTaskManagers(1)
                        .setNumSlotsPerTaskManager(parallelism)
                        .setConfiguration(getDefaultConfiguration())
                        .build();

        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
            miniCluster.start();

            // The vertex name says "Finalization" although the failure is raised during
            // initialization; the literal is kept unchanged.
            final JobVertex failingJobVertex =
                    new JobVertex("FailingInFinalization") {

                        @Override
                        public void initializeOnMaster(ClassLoader loader) {
                            throw new OutOfMemoryError("Java heap space");
                        }
                    };
            failingJobVertex.setInvokableClass(NoOpInvokable.class);
            failingJobVertex.setParallelism(parallelism);

            final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(failingJobVertex);

            // Request the job result only after the submission has been acknowledged.
            final CompletableFuture<JobResult> jobResultFuture =
                    miniCluster
                            .submitJob(jobGraph)
                            .thenCompose(
                                    ignored -> miniCluster.requestJobResult(jobGraph.getJobID()));

            // NOTE(review): there is no fail() after get(), so the assertions below are skipped
            // when the future completes normally - confirm whether that is intended.
            try {
                jobResultFuture.get();
            } catch (ExecutionException e) {
                Assert.assertThat(e, FlinkMatchers.containsCause(OutOfMemoryError.class));
                Assert.assertThat(
                        e,
                        FlinkMatchers.containsMessage(
                                "Java heap space. A heap space-related out-of-memory error has occurred."));
            }
        }
    }

    /**
     * Creates the base configuration shared by the tests in this class.
     *
     * @return a new {@link Configuration} with {@link RestOptions#BIND_PORT} set to {@code "0"}
     */
    private Configuration getDefaultConfiguration() {
        final Configuration defaults = new Configuration();
        defaults.setString(RestOptions.BIND_PORT, "0");
        return defaults;
    }

    /**
     * Builds a streaming job graph with a single {@link NoOpInvokable} vertex.
     *
     * <p>The job's execution config uses a fixed-delay restart strategy created with
     * {@code Integer.MAX_VALUE} restart attempts and a delay argument of {@code 1000L}.
     *
     * @param parallelism used as both the parallelism and the max parallelism of the vertex
     * @return the job graph
     * @throws IOException declared by this method's signature
     */
    private static JobGraph getSimpleJob(int parallelism) throws IOException {
        final JobVertex noOpVertex = new JobVertex("Test task");
        noOpVertex.setParallelism(parallelism);
        noOpVertex.setMaxParallelism(parallelism);
        noOpVertex.setInvokableClass(NoOpInvokable.class);

        final JobGraph graph = JobGraphTestUtils.streamingJobGraph(noOpVertex);

        final ExecutionConfig restartingConfig = new ExecutionConfig();
        restartingConfig.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
        graph.setExecutionConfig(restartingConfig);

        return graph;
    }

    private static class OutOfMemoryJobVertex
    extends JobVertex {
        private OutOfMemoryJobVertex() {
            super("FailingInFinalization");
        }

        public void finalizeOnMaster(ClassLoader loader) {
            throw new OutOfMemoryError("Java heap space");
        }
    }

    private static class WaitOnFinalizeJobVertex
    extends JobVertex {
        private static final long serialVersionUID = -1179547322468530299L;
        private static final AtomicBoolean finalizedOnMaster = new AtomicBoolean(false);
        private final long waitingTime;

        WaitOnFinalizeJobVertex(String name, long waitingTime) {
            super(name);
            this.waitingTime = waitingTime;
        }

        public void finalizeOnMaster(ClassLoader loader) throws Exception {
            Thread.sleep(this.waitingTime);
            finalizedOnMaster.set(true);
        }

        static void resetFinalizedOnMaster() {
            finalizedOnMaster.set(false);
        }
    }
}

