/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.slotpool.SchedulingStrategy;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;

/**
 * {@link SchedulingStrategy} that selects, among the candidates accepted by the caller-supplied
 * filter, the one whose {@link TaskManagerLocation} best matches the preferred locations of a
 * {@link SlotProfile}.
 *
 * <p>Every preferred location contributes one count to its resource ID and one count to its
 * fully qualified host name. A candidate's score is
 * {@code 10 * (count for its resource ID) + (count for its host name)}, so a match on the
 * resource ID outweighs a match on the host name alone.
 */
public class LocationPreferenceSchedulingStrategy implements SchedulingStrategy {

    /** Shared instance handed out by {@link #getInstance()}. */
    private static final LocationPreferenceSchedulingStrategy INSTANCE = new LocationPreferenceSchedulingStrategy();

    /**
     * Folds the two match counts into a single comparable score; the resource-ID count is
     * weighted ten times as much as the host-name count.
     */
    private static final BiFunction<Integer, Integer, Integer> LOCALITY_EVALUATION_FUNCTION = (localWeigh, hostLocalWeigh) -> localWeigh * 10 + hostLocalWeigh;

    /** Package-private constructor; callers outside the package use {@link #getInstance()}. */
    LocationPreferenceSchedulingStrategy() {
    }

    /**
     * Obtains the candidate stream from the supplier and delegates to
     * {@link #doFindMatchWithLocality}.
     *
     * @param slotProfile profile whose preferred locations drive the selection
     * @param candidates supplier of the candidate stream; {@code get()} is called exactly once
     * @param contextExtractor maps a candidate to the {@link SlotInfo} describing its location
     * @param additionalRequirementsFilter candidates rejected by this predicate are ignored
     * @param resultProducer builds the result from the chosen candidate and its locality
     * @return the produced result, or {@code null} if no candidate passed the filter
     */
    @Override
    @Nullable
    public <IN, OUT> OUT findMatchWithLocality(@Nonnull SlotProfile slotProfile, @Nonnull Supplier<Stream<IN>> candidates, @Nonnull Function<IN, SlotInfo> contextExtractor, @Nonnull Predicate<IN> additionalRequirementsFilter, @Nonnull BiFunction<IN, Locality, OUT> resultProducer) {
        return this.doFindMatchWithLocality(slotProfile, candidates.get(), contextExtractor, additionalRequirementsFilter, resultProducer);
    }

    /**
     * Picks the best candidate from the given stream.
     *
     * <p>If the profile has no preferred locations, the first candidate accepted by the filter
     * is returned with {@link Locality#UNCONSTRAINED}. Otherwise every accepted candidate is
     * scored against the preferred locations and the highest-scoring one wins; on equal scores
     * the candidate encountered first is kept.
     *
     * @param slotProfile profile whose preferred locations drive the selection
     * @param candidates stream of candidates; it is consumed by this call
     * @param contextExtractor maps a candidate to the {@link SlotInfo} describing its location
     * @param additionalRequirementsFilter candidates rejected by this predicate are ignored
     * @param resultProducer builds the result from the chosen candidate and its locality
     * @return the produced result, or {@code null} if no candidate passed the filter
     */
    @Nullable
    protected <IN, OUT> OUT doFindMatchWithLocality(@Nonnull SlotProfile slotProfile, @Nonnull Stream<IN> candidates, @Nonnull Function<IN, SlotInfo> contextExtractor, @Nonnull Predicate<IN> additionalRequirementsFilter, @Nonnull BiFunction<IN, Locality, OUT> resultProducer) {
        final Collection<TaskManagerLocation> locationPreferences = slotProfile.getPreferredLocations();

        // Without preferences there is nothing to rank: take the first acceptable candidate.
        if (locationPreferences.isEmpty()) {
            return candidates
                .filter(additionalRequirementsFilter)
                .findFirst()
                .map(result -> resultProducer.apply(result, Locality.UNCONSTRAINED))
                .orElse(null);
        }

        // Count how often each resource ID and each host name occurs among the preferences.
        final HashMap<ResourceID, Integer> preferredResourceIDs = new HashMap<>(locationPreferences.size());
        final HashMap<String, Integer> preferredFQHostNames = new HashMap<>(locationPreferences.size());
        for (TaskManagerLocation locationPreference : locationPreferences) {
            preferredResourceIDs.merge(locationPreference.getResourceID(), 1, Integer::sum);
            preferredFQHostNames.merge(locationPreference.getFQDNHostname(), 1, Integer::sum);
        }

        final Iterator<IN> iterator = candidates.iterator();
        IN bestCandidate = null;
        // Scores are never negative, so the first accepted candidate always beats this sentinel.
        int bestCandidateScore = Integer.MIN_VALUE;
        Locality bestCandidateLocality = null;

        while (iterator.hasNext()) {
            final IN candidate = iterator.next();
            if (!additionalRequirementsFilter.test(candidate)) {
                continue;
            }

            final SlotInfo slotContext = contextExtractor.apply(candidate);
            final int localWeigh = preferredResourceIDs.getOrDefault(slotContext.getTaskManagerLocation().getResourceID(), 0);
            final int hostLocalWeigh = preferredFQHostNames.getOrDefault(slotContext.getTaskManagerLocation().getFQDNHostname(), 0);
            final int candidateScore = LOCALITY_EVALUATION_FUNCTION.apply(localWeigh, hostLocalWeigh);

            // Strict comparison: an equally good later candidate does not displace an earlier one.
            if (candidateScore > bestCandidateScore) {
                bestCandidateScore = candidateScore;
                bestCandidate = candidate;
                bestCandidateLocality = toLocality(localWeigh, hostLocalWeigh);
            }
        }

        if (bestCandidate != null) {
            return resultProducer.apply(bestCandidate, bestCandidateLocality);
        }
        return null;
    }

    /** Classifies a candidate by its match counts: resource-ID match first, then host-name match. */
    private static Locality toLocality(int localWeigh, int hostLocalWeigh) {
        if (localWeigh > 0) {
            return Locality.LOCAL;
        }
        if (hostLocalWeigh > 0) {
            return Locality.HOST_LOCAL;
        }
        return Locality.NON_LOCAL;
    }

    /**
     * Returns the shared instance of this strategy.
     *
     * @return the singleton held in {@code INSTANCE}
     */
    public static LocationPreferenceSchedulingStrategy getInstance() {
        return INSTANCE;
    }
}

