/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PipelinedRegionExecutionView;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.topology.Result;
import org.apache.flink.runtime.topology.Vertex;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;

public class RegionPartitionReleaseStrategy
implements PartitionReleaseStrategy {
    private final SchedulingTopology schedulingTopology;
    private final Map<ExecutionVertexID, PipelinedRegionExecutionView> regionExecutionViewByVertex = new HashMap<ExecutionVertexID, PipelinedRegionExecutionView>();

    public RegionPartitionReleaseStrategy(SchedulingTopology schedulingTopology) {
        this.schedulingTopology = (SchedulingTopology)Preconditions.checkNotNull((Object)schedulingTopology);
        this.initRegionExecutionViewByVertex();
    }

    private void initRegionExecutionViewByVertex() {
        for (SchedulingPipelinedRegion pipelinedRegion : this.schedulingTopology.getAllPipelinedRegions()) {
            PipelinedRegionExecutionView regionExecutionView = new PipelinedRegionExecutionView(pipelinedRegion);
            for (SchedulingExecutionVertex executionVertexId : pipelinedRegion.getVertices()) {
                this.regionExecutionViewByVertex.put((ExecutionVertexID)executionVertexId.getId(), regionExecutionView);
            }
        }
    }

    @Override
    public List<IntermediateResultPartitionID> vertexFinished(ExecutionVertexID finishedVertex) {
        PipelinedRegionExecutionView regionExecutionView = this.getPipelinedRegionExecutionViewForVertex(finishedVertex);
        regionExecutionView.vertexFinished(finishedVertex);
        if (regionExecutionView.isFinished()) {
            SchedulingPipelinedRegion pipelinedRegion = (SchedulingPipelinedRegion)this.schedulingTopology.getPipelinedRegionOfVertex(finishedVertex);
            return this.filterReleasablePartitions(pipelinedRegion.getConsumedResults());
        }
        return Collections.emptyList();
    }

    @Override
    public void vertexUnfinished(ExecutionVertexID executionVertexId) {
        PipelinedRegionExecutionView regionExecutionView = this.getPipelinedRegionExecutionViewForVertex(executionVertexId);
        regionExecutionView.vertexUnfinished(executionVertexId);
    }

    private PipelinedRegionExecutionView getPipelinedRegionExecutionViewForVertex(ExecutionVertexID executionVertexId) {
        PipelinedRegionExecutionView pipelinedRegionExecutionView = this.regionExecutionViewByVertex.get(executionVertexId);
        Preconditions.checkState((pipelinedRegionExecutionView != null ? 1 : 0) != 0, (String)"PipelinedRegionExecutionView not found for execution vertex %s", (Object[])new Object[]{executionVertexId});
        return pipelinedRegionExecutionView;
    }

    private List<IntermediateResultPartitionID> filterReleasablePartitions(Iterable<? extends SchedulingResultPartition> schedulingResultPartitions) {
        return IterableUtils.toStream(schedulingResultPartitions).map(Result::getId).filter(this::areConsumerRegionsFinished).collect(Collectors.toList());
    }

    private boolean areConsumerRegionsFinished(IntermediateResultPartitionID resultPartitionId) {
        SchedulingResultPartition resultPartition = this.schedulingTopology.getResultPartition(resultPartitionId);
        return IterableUtils.toStream(resultPartition.getConsumers()).map(Vertex::getId).allMatch(this::isRegionOfVertexFinished);
    }

    private boolean isRegionOfVertexFinished(ExecutionVertexID executionVertexId) {
        PipelinedRegionExecutionView regionExecutionView = this.getPipelinedRegionExecutionViewForVertex(executionVertexId);
        return regionExecutionView.isFinished();
    }

    public static class Factory
    implements PartitionReleaseStrategy.Factory {
        @Override
        public PartitionReleaseStrategy createInstance(SchedulingTopology schedulingStrategy) {
            return new RegionPartitionReleaseStrategy(schedulingStrategy);
        }
    }
}

