/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointPlan;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorInfo;
import org.apache.flink.runtime.operators.coordination.TestingOperatorInfo;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.TestingStreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.Preconditions;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.powermock.api.mockito.PowerMockito;

public class PendingCheckpointTest {
    // Executions handed to every CheckpointPlan built by this test; the static initializer
    // at the bottom of the class adds a single mocked Execution to it.
    private static final List<Execution> ACK_TASKS = new ArrayList<Execution>();
    // Vertices handed to every CheckpointPlan as "tasks to commit"; the static initializer
    // adds the single mocked ExecutionVertex to it.
    private static final List<ExecutionVertex> TASKS_TO_COMMIT = new ArrayList<ExecutionVertex>();
    // Attempt id returned by the mocked Execution; tests acknowledge tasks with this id.
    private static final ExecutionAttemptID ATTEMPT_ID = new ExecutionAttemptID();
    // Operator id exposed through the mocked job vertex's operator id pairs.
    public static final OperatorID OPERATOR_ID = new OperatorID();
    // Same value the static initializer stubs for getTotalNumberOfParallelSubtasks().
    public static final int PARALLELISM = 1;
    // Same value the static initializer stubs for getMaxParallelism().
    public static final int MAX_PARALLELISM = 128;
    // Source of the per-checkpoint directory created in createPendingCheckpoint(...).
    @Rule
    public final TemporaryFolder tmpFolder = new TemporaryFolder();

    /**
     * Savepoint-typed pending checkpoints must report {@code canBeSubsumed() == false}, whether the
     * first boolean flag of {@link CheckpointProperties} is {@code true} or {@code false}. For the
     * {@code true} variant, aborting with {@code CHECKPOINT_SUBSUMED} must be rejected with an
     * {@link IllegalStateException}.
     */
    @Test
    public void testCanBeSubsumed() throws Exception {
        final CheckpointProperties forcedProps =
                new CheckpointProperties(true, CheckpointType.SAVEPOINT, false, false, false, false, false);
        final PendingCheckpoint forcedSavepoint = this.createPendingCheckpoint(forcedProps);
        Assert.assertFalse(forcedSavepoint.canBeSubsumed());

        boolean rejected = false;
        try {
            this.abort(forcedSavepoint, CheckpointFailureReason.CHECKPOINT_SUBSUMED);
        }
        catch (IllegalStateException expected) {
            // The abort is expected to be refused.
            rejected = true;
        }
        if (!rejected) {
            Assert.fail("Did not throw expected Exception");
        }

        final CheckpointProperties nonForcedProps =
                new CheckpointProperties(false, CheckpointType.SAVEPOINT, false, false, false, false, false);
        final PendingCheckpoint nonForcedSavepoint = this.createPendingCheckpoint(nonForcedProps);
        Assert.assertFalse(nonForcedSavepoint.canBeSubsumed());
    }

    /**
     * A pending checkpoint created from {@code CheckpointProperties.forSyncSavepoint(true, false)}
     * must not be subsumable, and aborting it with {@code CHECKPOINT_SUBSUMED} must throw an
     * {@link IllegalStateException}.
     */
    @Test
    public void testSyncSavepointCannotBeSubsumed() throws Exception {
        final PendingCheckpoint syncSavepoint =
                this.createPendingCheckpoint(CheckpointProperties.forSyncSavepoint(true, false));
        Assert.assertFalse(syncSavepoint.canBeSubsumed());

        boolean rejected = false;
        try {
            this.abort(syncSavepoint, CheckpointFailureReason.CHECKPOINT_SUBSUMED);
        }
        catch (IllegalStateException expected) {
            // The abort is expected to be refused.
            rejected = true;
        }
        if (!rejected) {
            Assert.fail("Did not throw expected Exception");
        }
    }

    /**
     * Verifies the completion future of a pending checkpoint: it is not done right after creation,
     * becomes done after an abort, becomes done after a successful finalize, and finalizing a
     * checkpoint whose task was never acknowledged throws an {@link IllegalStateException}.
     */
    @Test
    public void testCompletionFuture() throws Exception {
        final CheckpointProperties props =
                new CheckpointProperties(false, CheckpointType.SAVEPOINT, false, false, false, false, false);

        // Three independent checkpoints, each aborted with CHECKPOINT_DECLINED.
        for (int round = 0; round < 3; ++round) {
            PendingCheckpoint aborted = this.createPendingCheckpoint(props);
            CompletableFuture<?> abortedFuture = aborted.getCompletionFuture();
            Assert.assertFalse(abortedFuture.isDone());
            this.abort(aborted, CheckpointFailureReason.CHECKPOINT_DECLINED);
            Assert.assertTrue(abortedFuture.isDone());
        }

        // Acknowledge the only task, then finalize.
        final PendingCheckpoint completed = this.createPendingCheckpoint(props);
        final CompletableFuture<?> completedFuture = completed.getCompletionFuture();
        Assert.assertFalse(completedFuture.isDone());
        completed.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
        Assert.assertTrue(completed.areTasksFullyAcknowledged());
        completed.finalizeCheckpoint(new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null);
        Assert.assertTrue(completedFuture.isDone());

        // Finalizing without any acknowledgement must be refused.
        final PendingCheckpoint unacknowledged = this.createPendingCheckpoint(props);
        final CompletableFuture<?> unacknowledgedFuture = unacknowledged.getCompletionFuture();
        Assert.assertFalse(unacknowledgedFuture.isDone());
        boolean rejected = false;
        try {
            unacknowledged.finalizeCheckpoint(new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null);
        }
        catch (IllegalStateException expected) {
            rejected = true;
        }
        if (!rejected) {
            Assert.fail("Did not throw expected Exception");
        }
    }

    /**
     * For each abort reason (declined, declined, expired, subsumed) a fresh pending checkpoint gets
     * a mocked {@link OperatorState} injected, is aborted, and the mock must have seen exactly one
     * {@code discardState()} call.
     *
     * <p>NOTE(review): the {@code QueueExecutor} created here never receives any command. The
     * four-argument {@code createPendingCheckpoint} does not use its {@code executor} parameter and
     * the {@code abort} helper always passes {@code Executors.directExecutor()}, so
     * {@code runQueuedCommands()} has nothing to run in this test.
     */
    @Test
    public void testAbortDiscardsState() throws Exception {
        final CheckpointProperties props =
                new CheckpointProperties(false, CheckpointType.CHECKPOINT, false, false, false, false, false);
        final QueueExecutor executor = new QueueExecutor();
        final OperatorState state = Mockito.mock(OperatorState.class);
        Mockito.doNothing().when(state).registerSharedStates(ArgumentMatchers.any(SharedStateRegistry.class));

        final CheckpointFailureReason[] reasons = {
            CheckpointFailureReason.CHECKPOINT_DECLINED,
            CheckpointFailureReason.CHECKPOINT_DECLINED,
            CheckpointFailureReason.CHECKPOINT_EXPIRED,
            CheckpointFailureReason.CHECKPOINT_SUBSUMED
        };
        for (int round = 0; round < reasons.length; ++round) {
            if (round > 0) {
                // Forget the interactions recorded by the previous round.
                Mockito.reset(state);
            }
            PendingCheckpoint pending = this.createPendingCheckpoint(props, executor);
            PendingCheckpointTest.setTaskState(pending, state);
            this.abort(pending, reasons[round]);
            executor.runQueuedCommands();
            Mockito.verify(state, Mockito.times(1)).discardState();
        }
    }

    /**
     * Checks that the {@link PendingCheckpointStats} callback is notified exactly once: about the
     * subtask stats on acknowledge, about completion on finalize, and about failure on each abort
     * (subsumed, declined, subsumed, expired).
     */
    @Test
    public void testPendingCheckpointStatsCallbacks() throws Exception {
        // Acknowledge + finalize path.
        final PendingCheckpointStats completionCallback = Mockito.mock(PendingCheckpointStats.class);
        final PendingCheckpoint completed = this.createPendingCheckpoint(
                CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
        completed.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), completionCallback);
        Mockito.verify(completionCallback, Mockito.times(1))
                .reportSubtaskStats(ArgumentMatchers.nullable(JobVertexID.class), ArgumentMatchers.any(SubtaskStateStats.class));
        completed.finalizeCheckpoint(new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), completionCallback);
        Mockito.verify(completionCallback, Mockito.times(1))
                .reportCompletedCheckpoint(ArgumentMatchers.any(String.class));

        // Abort path, one fresh callback and checkpoint per reason.
        final CheckpointFailureReason[] abortReasons = {
            CheckpointFailureReason.CHECKPOINT_SUBSUMED,
            CheckpointFailureReason.CHECKPOINT_DECLINED,
            CheckpointFailureReason.CHECKPOINT_SUBSUMED,
            CheckpointFailureReason.CHECKPOINT_EXPIRED
        };
        for (CheckpointFailureReason reason : abortReasons) {
            PendingCheckpointStats failureCallback = Mockito.mock(PendingCheckpointStats.class);
            PendingCheckpoint aborted = this.createPendingCheckpoint(
                    CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
            this.abort(aborted, reason, failureCallback);
            Mockito.verify(failureCallback, Mockito.times(1))
                    .reportFailedCheckpoint(ArgumentMatchers.anyLong(), (Throwable)ArgumentMatchers.any(Exception.class));
        }
    }

    /**
     * Acknowledging the task with a {@code null} subtask state must leave exactly one entry in the
     * operator states: an {@link OperatorState} for {@code OPERATOR_ID} equal to one freshly built
     * with parallelism 1 and max parallelism 128.
     */
    @Test
    public void testNullSubtaskStateLeadsToStatelessTask() throws Exception {
        final CheckpointProperties props =
                CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
        final PendingCheckpoint pending = this.createPendingCheckpoint(props);
        pending.acknowledgeTask(ATTEMPT_ID, null, Mockito.mock(CheckpointMetrics.class), null);

        final Map<OperatorID, OperatorState> expectedStates =
                Collections.singletonMap(OPERATOR_ID, new OperatorState(OPERATOR_ID, PARALLELISM, MAX_PARALLELISM));
        Assert.assertEquals(expectedStates, pending.getOperatorStates());
    }

    /**
     * Acknowledging the task with a (mocked) non-null {@link TaskStateSnapshot} must leave the
     * operator states of the pending checkpoint non-empty.
     */
    @Test
    public void testNonNullSubtaskStateLeadsToStatefulTask() throws Exception {
        final CheckpointProperties props =
                CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
        final PendingCheckpoint pending = this.createPendingCheckpoint(props);

        final TaskStateSnapshot snapshot = Mockito.mock(TaskStateSnapshot.class);
        final CheckpointMetrics metrics = Mockito.mock(CheckpointMetrics.class);
        pending.acknowledgeTask(ATTEMPT_ID, snapshot, metrics, null);

        Assert.assertFalse(pending.getOperatorStates().isEmpty());
    }

    /**
     * An already aborted (disposed) checkpoint must refuse a canceller handle. A live checkpoint
     * must accept one, and aborting it afterwards must call {@code cancel(false)} on that handle.
     */
    @Test
    public void testSetCanceller() throws Exception {
        final CheckpointProperties props =
                new CheckpointProperties(false, CheckpointType.CHECKPOINT, true, true, true, true, true);

        // Disposed checkpoint: the handle is rejected.
        final PendingCheckpoint disposed = this.createPendingCheckpoint(props);
        this.abort(disposed, CheckpointFailureReason.CHECKPOINT_DECLINED);
        Assert.assertTrue(disposed.isDisposed());
        final ScheduledFuture<?> rejectedHandle = Mockito.mock(ScheduledFuture.class);
        Assert.assertFalse(disposed.setCancellerHandle(rejectedHandle));

        // Live checkpoint: the handle is accepted and cancelled on abort.
        final PendingCheckpoint live = this.createPendingCheckpoint(props);
        final ScheduledFuture<?> acceptedHandle = Mockito.mock(ScheduledFuture.class);
        Assert.assertTrue(live.setCancellerHandle(acceptedHandle));
        this.abort(live, CheckpointFailureReason.CHECKPOINT_DECLINED);
        Mockito.verify(acceptedHandle).cancel(false);
    }

    /**
     * Registers one master hook holding the content {@code "state"}, acknowledges its triggered
     * state and then the task, and checks that the pending checkpoint exposes exactly one master
     * state whose bytes deserialize (serializer version 77) back to {@code "state"}.
     */
    @Test
    public void testMasterState() throws Exception {
        final TestingMasterTriggerRestoreHook hook = new TestingMasterTriggerRestoreHook("master hook");
        hook.addStateContent("state");

        final CheckpointProperties props =
                CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
        final List<String> hookIds = Collections.singletonList(hook.getIdentifier());
        final PendingCheckpoint pending = this.createPendingCheckpoint(props, hookIds);

        final MasterState triggered = (MasterState)MasterHooks
                .triggerHook(hook, 0L, System.currentTimeMillis(), Executors.directExecutor())
                .get();
        pending.acknowledgeMasterState(hook.getIdentifier(), triggered);
        Assert.assertTrue(pending.areMasterStatesFullyAcknowledged());

        Assert.assertFalse(pending.areTasksFullyAcknowledged());
        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
        Assert.assertTrue(pending.areTasksFullyAcknowledged());

        final List<MasterState> collected = pending.getMasterStates();
        Assert.assertEquals(1L, collected.size());
        final SimpleVersionedSerializer<String> serializer = hook.createCheckpointDataSerializer();
        final String restored = serializer.deserialize(77, collected.get(0).bytes());
        Assert.assertEquals("state", restored);
    }

    /**
     * Registers two master hooks, one with the content {@code "state"} and one with no content at
     * all. Master states count as fully acknowledged only after both hooks were acknowledged, and
     * the pending checkpoint must then hold exactly one master state, which deserializes
     * (serializer version 77) to {@code "state"}.
     */
    @Test
    public void testMasterStateWithNullState() throws Exception {
        final TestingMasterTriggerRestoreHook statefulHook = new TestingMasterTriggerRestoreHook("master hook");
        statefulHook.addStateContent("state");
        final TestingMasterTriggerRestoreHook emptyHook = new TestingMasterTriggerRestoreHook("nullable master hook");

        final List<String> hookIds = new ArrayList<>(2);
        hookIds.add(statefulHook.getIdentifier());
        hookIds.add(emptyHook.getIdentifier());
        final PendingCheckpoint pending = this.createPendingCheckpoint(
                CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), hookIds);

        // First hook only: still waiting for the second one.
        final MasterState statefulResult = (MasterState)MasterHooks
                .triggerHook(statefulHook, 0L, System.currentTimeMillis(), Executors.directExecutor())
                .get();
        pending.acknowledgeMasterState(statefulHook.getIdentifier(), statefulResult);
        Assert.assertFalse(pending.areMasterStatesFullyAcknowledged());

        // Second hook has no queued content to hand out.
        final MasterState emptyResult = (MasterState)MasterHooks
                .triggerHook(emptyHook, 0L, System.currentTimeMillis(), Executors.directExecutor())
                .get();
        pending.acknowledgeMasterState(emptyHook.getIdentifier(), emptyResult);
        Assert.assertTrue(pending.areMasterStatesFullyAcknowledged());

        Assert.assertFalse(pending.areTasksFullyAcknowledged());
        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
        Assert.assertTrue(pending.areTasksFullyAcknowledged());

        final List<MasterState> collected = pending.getMasterStates();
        Assert.assertEquals(1L, collected.size());
        final SimpleVersionedSerializer<String> serializer = statefulHook.createCheckpointDataSerializer();
        final String restored = serializer.deserialize(77, collected.get(0).bytes());
        Assert.assertEquals("state", restored);
    }

    /**
     * A checkpoint created with two coordinators, neither acknowledged yet, must report two
     * non-acknowledged coordinators and must not be fully acknowledged.
     */
    @Test
    public void testInitiallyUnacknowledgedCoordinatorStates() throws Exception {
        final TestingOperatorInfo first = new TestingOperatorInfo();
        final TestingOperatorInfo second = new TestingOperatorInfo();
        final PendingCheckpoint pending = this.createPendingCheckpointWithCoordinators(first, second);

        Assert.assertEquals(2L, pending.getNumberOfNonAcknowledgedOperatorCoordinators());
        Assert.assertFalse(pending.isFullyAcknowledged());
    }

    /**
     * Acknowledges two coordinators, one with a state handle and one with {@code null}. Both
     * acknowledgements must succeed, the checkpoint must be fully acknowledged, and the operator
     * states must be keyed by {@code OPERATOR_ID} and both coordinator ids.
     */
    @Test
    public void testAcknowledgedCoordinatorStates() throws Exception {
        final TestingOperatorInfo withState = new TestingOperatorInfo();
        final TestingOperatorInfo withoutState = new TestingOperatorInfo();
        final PendingCheckpoint pending = this.createPendingCheckpointWithCoordinators(withState, withoutState);

        final PendingCheckpoint.TaskAcknowledgeResult firstAck =
                pending.acknowledgeCoordinatorState(withState, new TestingStreamStateHandle());
        final PendingCheckpoint.TaskAcknowledgeResult secondAck =
                pending.acknowledgeCoordinatorState(withoutState, null);

        Assert.assertEquals(PendingCheckpoint.TaskAcknowledgeResult.SUCCESS, firstAck);
        Assert.assertEquals(PendingCheckpoint.TaskAcknowledgeResult.SUCCESS, secondAck);
        Assert.assertEquals(0L, pending.getNumberOfNonAcknowledgedOperatorCoordinators());
        Assert.assertTrue(pending.isFullyAcknowledged());
        Assert.assertThat(
                pending.getOperatorStates().keySet(),
                Matchers.containsInAnyOrder(OPERATOR_ID, withState.operatorId(), withoutState.operatorId()));
    }

    /** Acknowledging the same coordinator a second time must yield {@code DUPLICATE}. */
    @Test
    public void testDuplicateAcknowledgeCoordinator() throws Exception {
        final TestingOperatorInfo coordinator = new TestingOperatorInfo();
        final PendingCheckpoint pending = this.createPendingCheckpointWithCoordinators(coordinator);

        // First acknowledgement; its result is not inspected.
        pending.acknowledgeCoordinatorState(coordinator, new TestingStreamStateHandle());

        final PendingCheckpoint.TaskAcknowledgeResult repeated =
                pending.acknowledgeCoordinatorState(coordinator, null);
        Assert.assertEquals(PendingCheckpoint.TaskAcknowledgeResult.DUPLICATE, repeated);
    }

    /**
     * Acknowledging a coordinator that was not part of the checkpoint's coordinator set must yield
     * {@code UNKNOWN}.
     */
    @Test
    public void testAcknowledgeUnknownCoordinator() throws Exception {
        final TestingOperatorInfo registered = new TestingOperatorInfo();
        final PendingCheckpoint pending = this.createPendingCheckpointWithCoordinators(registered);

        final TestingOperatorInfo stranger = new TestingOperatorInfo();
        final PendingCheckpoint.TaskAcknowledgeResult result = pending.acknowledgeCoordinatorState(stranger, null);
        Assert.assertEquals(PendingCheckpoint.TaskAcknowledgeResult.UNKNOWN, result);
    }

    /**
     * Aborting a checkpoint whose two coordinators were acknowledged with state handles must leave
     * both handles disposed.
     */
    @Test
    public void testDisposeDisposesCoordinatorStates() throws Exception {
        final TestingStreamStateHandle firstHandle = new TestingStreamStateHandle();
        final TestingStreamStateHandle secondHandle = new TestingStreamStateHandle();
        final PendingCheckpoint pending =
                this.createPendingCheckpointWithAcknowledgedCoordinators(firstHandle, secondHandle);

        this.abort(pending, CheckpointFailureReason.CHECKPOINT_EXPIRED);

        Assert.assertTrue(firstHandle.isDisposed());
        Assert.assertTrue(secondHandle.isDisposed());
    }

    /**
     * Creates a pending checkpoint with the given properties, no operator coordinators, no master
     * state identifiers and {@code Executors.directExecutor()}.
     */
    private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props) throws IOException {
        final Collection<OperatorID> noCoordinators = Collections.emptyList();
        final Collection<String> noMasterStateIds = Collections.emptyList();
        return this.createPendingCheckpoint(props, noCoordinators, noMasterStateIds, Executors.directExecutor());
    }

    /**
     * Creates a pending checkpoint with the given properties, no operator coordinators and no
     * master state identifiers, forwarding {@code executor} to the four-argument overload.
     */
    private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, Executor executor) throws IOException {
        final Collection<OperatorID> noCoordinators = Collections.emptyList();
        final Collection<String> noMasterStateIds = Collections.emptyList();
        return this.createPendingCheckpoint(props, noCoordinators, noMasterStateIds, executor);
    }

    /**
     * Creates a pending checkpoint with the given properties and master state identifiers, no
     * operator coordinators and {@code Executors.directExecutor()}.
     */
    private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, Collection<String> masterStateIdentifiers) throws IOException {
        final Collection<OperatorID> noCoordinators = Collections.emptyList();
        return this.createPendingCheckpoint(props, noCoordinators, masterStateIdentifiers, Executors.directExecutor());
    }

    /**
     * Creates a pending checkpoint (never-retain checkpoint properties) that expects the given
     * coordinators, and immediately acknowledges the single task so that only the coordinators
     * remain outstanding.
     */
    private PendingCheckpoint createPendingCheckpointWithCoordinators(OperatorInfo ... coordinators) throws IOException {
        final CheckpointProperties props =
                CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
        final Collection<OperatorID> coordinatorIds = OperatorInfo.getIds(Arrays.asList(coordinators));
        final Collection<String> noMasterStateIds = Collections.emptyList();

        final PendingCheckpoint pending =
                this.createPendingCheckpoint(props, coordinatorIds, noMasterStateIds, Executors.directExecutor());
        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
        return pending;
    }

    /**
     * Creates a pending checkpoint with one fresh {@code TestingOperatorInfo} coordinator per
     * handle and acknowledges coordinator {@code i} with {@code handles[i]}.
     */
    private PendingCheckpoint createPendingCheckpointWithAcknowledgedCoordinators(ByteStreamStateHandle ... handles) throws IOException {
        final OperatorInfo[] coordinators = new OperatorInfo[handles.length];
        Arrays.setAll(coordinators, index -> new TestingOperatorInfo());

        final PendingCheckpoint pending = this.createPendingCheckpointWithCoordinators(coordinators);

        int index = 0;
        for (ByteStreamStateHandle handle : handles) {
            pending.acknowledgeCoordinatorState(coordinators[index], handle);
            ++index;
        }
        return pending;
    }

    /**
     * Builds a {@link PendingCheckpoint} with checkpoint id 0 and timestamp 1 for a new
     * {@link JobID}. Its storage location is a fresh temporary directory on the local file system,
     * and its plan uses copies of {@code ACK_TASKS} and {@code TASKS_TO_COMMIT}.
     *
     * <p>NOTE(review): {@code executor} is accepted but never used by this method; nothing here
     * passes it to the {@link PendingCheckpoint}.
     */
    private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, Collection<OperatorID> operatorCoordinators, Collection<String> masterStateIdentifiers, Executor executor) throws IOException {
        final Path dir = new Path(this.tmpFolder.newFolder().toURI());
        final CheckpointStorageLocation storage = new FsCheckpointStorageLocation(
                LocalFileSystem.getSharedInstance(),
                dir,
                dir,
                dir,
                CheckpointStorageLocationReference.getDefault(),
                1024,
                4096);

        // Copy the shared static lists so the plan gets its own list instances.
        final ArrayList<Execution> tasksToAck = new ArrayList<>(ACK_TASKS);
        final ArrayList<ExecutionVertex> commitTargets = new ArrayList<>(TASKS_TO_COMMIT);
        final CheckpointPlan plan = new CheckpointPlan(
                Collections.emptyList(), tasksToAck, commitTargets, Collections.emptyList(), Collections.emptyList());

        return new PendingCheckpoint(
                new JobID(), 0L, 1L, plan, operatorCoordinators, masterStateIdentifiers, props, storage, new CompletableFuture<>());
    }

    /**
     * Reflectively reads the private {@code operatorStates} map of the given pending checkpoint and
     * stores {@code state} in it under a newly generated {@link OperatorID}.
     *
     * @throws NoSuchFieldException if {@link PendingCheckpoint} declares no {@code operatorStates} field
     * @throws IllegalAccessException if the field cannot be read
     */
    static void setTaskState(PendingCheckpoint pending, OperatorState state) throws NoSuchFieldException, IllegalAccessException {
        final Field operatorStatesField = PendingCheckpoint.class.getDeclaredField("operatorStates");
        operatorStatesField.setAccessible(true);
        // The field is read reflectively, so the map's type arguments cannot be checked here.
        @SuppressWarnings("unchecked")
        final Map<OperatorID, OperatorState> operatorStates =
                (Map<OperatorID, OperatorState>)operatorStatesField.get(pending);
        operatorStates.put(new OperatorID(), state);
    }

    /** Aborts the checkpoint with the given reason and without a stats callback. */
    private void abort(PendingCheckpoint checkpoint, CheckpointFailureReason reason) {
        final PendingCheckpointStats noStatsCallback = null;
        this.abort(checkpoint, reason, noStatsCallback);
    }

    /**
     * Aborts the checkpoint with the given reason and a {@code null} cause, using a fresh
     * {@link CheckpointsCleaner}, a no-op post-cleanup action and
     * {@code Executors.directExecutor()}.
     *
     * @param statsCallback callback passed through to {@code PendingCheckpoint.abort}; may be {@code null}
     */
    private void abort(PendingCheckpoint checkpoint, CheckpointFailureReason reason, PendingCheckpointStats statsCallback) {
        final Runnable noPostCleanup = () -> {};
        checkpoint.abort(
                reason,
                null,
                new CheckpointsCleaner(),
                noPostCleanup,
                Executors.directExecutor(),
                statsCallback);
    }

    // Builds the mocked job vertex -> vertex -> execution chain once and registers the execution
    // and vertex in the shared ACK_TASKS / TASKS_TO_COMMIT lists.
    static {
        final ExecutionJobVertex mockedJobVertex = Mockito.mock(ExecutionJobVertex.class);
        PowerMockito.when(mockedJobVertex.getOperatorIDs())
                .thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(OPERATOR_ID)));

        final ExecutionVertex mockedVertex = Mockito.mock(ExecutionVertex.class);
        PowerMockito.when(mockedVertex.getMaxParallelism()).thenReturn(MAX_PARALLELISM);
        PowerMockito.when(mockedVertex.getTotalNumberOfParallelSubtasks()).thenReturn(PARALLELISM);
        PowerMockito.when(mockedVertex.getJobVertex()).thenReturn(mockedJobVertex);

        final Execution mockedExecution = Mockito.mock(Execution.class);
        PowerMockito.when(mockedExecution.getAttemptId()).thenReturn(ATTEMPT_ID);
        PowerMockito.when(mockedExecution.getVertex()).thenReturn(mockedVertex);

        ACK_TASKS.add(mockedExecution);
        TASKS_TO_COMMIT.add(mockedVertex);
    }

    /**
     * Master hook for tests. State contents are queued with {@link #addStateContent(String)}; each
     * {@code triggerCheckpoint} call hands out the oldest queued content, or {@code null} once the
     * queue is empty. Restoring is a no-op.
     */
    private static final class TestingMasterTriggerRestoreHook
    implements MasterTriggerRestoreHook<String> {
        private final String identifier;
        // FIFO of contents still to be handed out by triggerCheckpoint.
        private final ArrayDeque<String> stateContents = new ArrayDeque<>();

        public TestingMasterTriggerRestoreHook(String identifier) {
            this.identifier = Preconditions.checkNotNull(identifier);
        }

        /** Appends a content to the end of the queue. */
        public void addStateContent(String stateContent) {
            this.stateContents.addLast(stateContent);
        }

        public String getIdentifier() {
            return this.identifier;
        }

        /** Returns an already completed future holding the next queued content, or {@code null} if none is left. */
        @Nullable
        public CompletableFuture<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
            final String nextContent = this.stateContents.pollFirst();
            return CompletableFuture.completedFuture(nextContent);
        }

        /** Intentionally does nothing. */
        public void restoreCheckpoint(long checkpointId, @Nullable String checkpointData) throws Exception {
        }

        public SimpleVersionedSerializer<String> createCheckpointDataSerializer() {
            return new CheckpointCoordinatorTestingUtils.StringSerializer();
        }
    }

    /**
     * Executor that only records submitted commands. Nothing runs until
     * {@link #runQueuedCommands()} is called, and then the commands run on the calling thread.
     */
    private static final class QueueExecutor
    implements Executor {
        private final Queue<Runnable> queue = new ArrayDeque<Runnable>(4);

        private QueueExecutor() {
        }

        /** Queues the command without running it. */
        @Override
        public void execute(Runnable command) {
            this.queue.add(command);
        }

        /**
         * Runs the queued commands in submission order and removes each one before it runs.
         *
         * <p>Draining the queue means a later call never re-runs commands from an earlier call.
         * A command that submits further work while running is also handled, because that work is
         * picked up by the same drain loop; iterating the deque directly would instead fail on the
         * concurrent modification.
         */
        public void runQueuedCommands() {
            Runnable next;
            while ((next = this.queue.poll()) != null) {
                next.run();
            }
        }
    }
}

