/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.controller;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.UnknownTopicIdException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.AlterIsrRequestData;
import org.apache.kafka.common.message.AlterIsrResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.controller.BrokerControlStates;
import org.apache.kafka.controller.BrokerHeartbeatManager;
import org.apache.kafka.controller.BrokersToIsrs;
import org.apache.kafka.controller.ClusterControlManager;
import org.apache.kafka.controller.ConfigurationControlManager;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.controller.Replicas;
import org.apache.kafka.controller.ResultOrError;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.slf4j.Logger;

public class ReplicationControlManager {
    public static final int NO_LEADER = -1;
    public static final int NO_LEADER_CHANGE = -2;
    private final SnapshotRegistry snapshotRegistry;
    private final Logger log;
    private final short defaultReplicationFactor;
    private final int defaultNumPartitions;
    private final ConfigurationControlManager configurationControl;
    private final ClusterControlManager clusterControl;
    private final TimelineHashMap<String, Uuid> topicsByName;
    private final TimelineHashMap<Uuid, TopicControlInfo> topics;
    private final BrokersToIsrs brokersToIsrs;

    ReplicationControlManager(SnapshotRegistry snapshotRegistry, LogContext logContext, short defaultReplicationFactor, int defaultNumPartitions, ConfigurationControlManager configurationControl, ClusterControlManager clusterControl) {
        this.snapshotRegistry = snapshotRegistry;
        this.log = logContext.logger(ReplicationControlManager.class);
        this.defaultReplicationFactor = defaultReplicationFactor;
        this.defaultNumPartitions = defaultNumPartitions;
        this.configurationControl = configurationControl;
        this.clusterControl = clusterControl;
        this.topicsByName = new TimelineHashMap(snapshotRegistry, 0);
        this.topics = new TimelineHashMap(snapshotRegistry, 0);
        this.brokersToIsrs = new BrokersToIsrs(snapshotRegistry);
    }

    public void replay(TopicRecord record) {
        this.topicsByName.put(record.name(), record.topicId());
        this.topics.put(record.topicId(), new TopicControlInfo(record.name(), this.snapshotRegistry, record.topicId()));
        this.log.info("Created topic {} with ID {}.", (Object)record.name(), (Object)record.topicId());
    }

    public void replay(PartitionRecord record) {
        TopicControlInfo topicInfo = this.topics.get(record.topicId());
        if (topicInfo == null) {
            throw new RuntimeException("Tried to create partition " + record.topicId() + ":" + record.partitionId() + ", but no topic with that ID was found.");
        }
        PartitionControlInfo newPartInfo = new PartitionControlInfo(record);
        PartitionControlInfo prevPartInfo = (PartitionControlInfo)topicInfo.parts.get(record.partitionId());
        if (prevPartInfo == null) {
            this.log.info("Created partition {}:{} with {}.", new Object[]{record.topicId(), record.partitionId(), newPartInfo.toString()});
            topicInfo.parts.put(record.partitionId(), newPartInfo);
            this.brokersToIsrs.update(record.topicId(), record.partitionId(), null, newPartInfo.isr, -1, newPartInfo.leader);
        } else {
            String diff = newPartInfo.diff(prevPartInfo);
            if (!diff.isEmpty()) {
                this.log.info("Modified partition {}:{}: {}.", new Object[]{record.topicId(), record.partitionId(), diff});
                topicInfo.parts.put(record.partitionId(), newPartInfo);
                this.brokersToIsrs.update(record.topicId(), record.partitionId(), prevPartInfo.isr, newPartInfo.isr, prevPartInfo.leader, newPartInfo.leader);
            }
        }
    }

    public void replay(PartitionChangeRecord record) {
        TopicControlInfo topicInfo = this.topics.get(record.topicId());
        if (topicInfo == null) {
            throw new RuntimeException("Tried to create partition " + record.topicId() + ":" + record.partitionId() + ", but no topic with that ID was found.");
        }
        PartitionControlInfo prevPartitionInfo = (PartitionControlInfo)topicInfo.parts.get(record.partitionId());
        if (prevPartitionInfo == null) {
            throw new RuntimeException("Tried to create partition " + record.topicId() + ":" + record.partitionId() + ", but no partition with that id was found.");
        }
        PartitionControlInfo newPartitionInfo = prevPartitionInfo.merge(record);
        topicInfo.parts.put(record.partitionId(), newPartitionInfo);
        this.brokersToIsrs.update(record.topicId(), record.partitionId(), prevPartitionInfo.isr, newPartitionInfo.isr, prevPartitionInfo.leader, newPartitionInfo.leader);
        this.log.debug("Applied ISR change record: {}", (Object)record.toString());
    }

    public void replay(RemoveTopicRecord record) {
        TopicControlInfo topic = this.topics.remove(record.topicId());
        if (topic == null) {
            throw new UnknownTopicIdException("Can't find topic with ID " + record.topicId() + " to remove.");
        }
        this.topicsByName.remove(topic.name);
        this.configurationControl.deleteTopicConfigs(topic.name);
        for (PartitionControlInfo partition : topic.parts.values()) {
            for (int i = 0; i < partition.isr.length; ++i) {
                this.brokersToIsrs.removeTopicEntryForBroker(topic.id, partition.isr[i]);
            }
        }
        this.brokersToIsrs.removeTopicEntryForBroker(topic.id, -1);
        this.log.info("Removed topic {} with ID {}.", (Object)topic.name, (Object)record.topicId());
    }

    ControllerResult<CreateTopicsResponseData> createTopics(CreateTopicsRequestData request) {
        HashMap<String, ApiError> topicErrors = new HashMap<String, ApiError>();
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        ReplicationControlManager.validateNewTopicNames(topicErrors, request.topics());
        request.topics().stream().filter(creatableTopic -> this.topicsByName.containsKey(creatableTopic.name())).forEach(t -> topicErrors.put(t.name(), new ApiError(Errors.TOPIC_ALREADY_EXISTS)));
        Map<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, String>>> configChanges = ReplicationControlManager.computeConfigChanges(topicErrors, request.topics());
        ControllerResult<Map<ConfigResource, ApiError>> configResult = this.configurationControl.incrementalAlterConfigs(configChanges);
        for (Map.Entry<ConfigResource, ApiError> entry : configResult.response().entrySet()) {
            if (!entry.getValue().isFailure()) continue;
            topicErrors.put(entry.getKey().name(), (ApiError)entry.getValue());
        }
        records.addAll(configResult.records());
        HashMap<String, CreateTopicsResponseData.CreatableTopicResult> successes = new HashMap<String, CreateTopicsResponseData.CreatableTopicResult>();
        for (CreateTopicsRequestData.CreatableTopic topic : request.topics()) {
            ApiError error;
            if (topicErrors.containsKey(topic.name()) || !(error = this.createTopic(topic, records, successes)).isFailure()) continue;
            topicErrors.put(topic.name(), error);
        }
        CreateTopicsResponseData data = new CreateTopicsResponseData();
        StringBuilder resultsBuilder = new StringBuilder();
        String resultsPrefix = "";
        for (CreateTopicsRequestData.CreatableTopic topic : request.topics()) {
            ApiError error = (ApiError)topicErrors.get(topic.name());
            if (error != null) {
                data.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsResponseData.CreatableTopicResult().setName(topic.name()).setErrorCode(error.error().code()).setErrorMessage(error.message()));
                resultsBuilder.append(resultsPrefix).append(topic).append(": ").append(error.error()).append(" (").append(error.message()).append(")");
                resultsPrefix = ", ";
                continue;
            }
            CreateTopicsResponseData.CreatableTopicResult result = (CreateTopicsResponseData.CreatableTopicResult)successes.get(topic.name());
            data.topics().add((ImplicitLinkedHashCollection.Element)result);
            resultsBuilder.append(resultsPrefix).append(topic).append(": ").append("SUCCESS");
            resultsPrefix = ", ";
        }
        this.log.info("createTopics result(s): {}", (Object)resultsBuilder.toString());
        return ControllerResult.atomicOf(records, data);
    }

    private ApiError createTopic(CreateTopicsRequestData.CreatableTopic topic, List<ApiMessageAndVersion> records, Map<String, CreateTopicsResponseData.CreatableTopicResult> successes) {
        HashMap<Integer, PartitionControlInfo> newParts = new HashMap<Integer, PartitionControlInfo>();
        if (!topic.assignments().isEmpty()) {
            if (topic.replicationFactor() != -1) {
                return new ApiError(Errors.INVALID_REQUEST, "A manual partition assignment was specified, but replication factor was not set to -1.");
            }
            if (topic.numPartitions() != -1) {
                return new ApiError(Errors.INVALID_REQUEST, "A manual partition assignment was specified, but numPartitions was not set to -1.");
            }
            for (CreateTopicsRequestData.CreatableReplicaAssignment assignment : topic.assignments()) {
                if (newParts.containsKey(assignment.partitionIndex())) {
                    return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT, "Found multiple manual partition assignments for partition " + assignment.partitionIndex());
                }
                HashSet<Integer> brokerIds = new HashSet<Integer>();
                Iterator iterator = assignment.brokerIds().iterator();
                while (iterator.hasNext()) {
                    int brokerId = (Integer)iterator.next();
                    if (!brokerIds.add(brokerId)) {
                        return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT, "The manual partition assignment specifies the same node id more than once.");
                    }
                    if (this.clusterControl.unfenced(brokerId)) continue;
                    return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT, "The manual partition assignment contains node " + brokerId + ", but that node is not usable.");
                }
                int[] replicas = new int[assignment.brokerIds().size()];
                for (int i = 0; i < replicas.length; ++i) {
                    replicas[i] = (Integer)assignment.brokerIds().get(i);
                }
                int[] isr = new int[assignment.brokerIds().size()];
                for (int i = 0; i < replicas.length; ++i) {
                    isr[i] = (Integer)assignment.brokerIds().get(i);
                }
                newParts.put(assignment.partitionIndex(), new PartitionControlInfo(replicas, isr, null, null, isr[0], 0, 0));
            }
        } else {
            if (topic.replicationFactor() < -1 || topic.replicationFactor() == 0) {
                return new ApiError(Errors.INVALID_REPLICATION_FACTOR, "Replication factor was set to an invalid non-positive value.");
            }
            if (!topic.assignments().isEmpty()) {
                return new ApiError(Errors.INVALID_REQUEST, "Replication factor was not set to -1 but a manual partition assignment was specified.");
            }
            if (topic.numPartitions() < -1 || topic.numPartitions() == 0) {
                return new ApiError(Errors.INVALID_PARTITIONS, "Number of partitions was set to an invalid non-positive value.");
            }
            int numPartitions = topic.numPartitions() == -1 ? this.defaultNumPartitions : topic.numPartitions();
            short replicationFactor = topic.replicationFactor() == -1 ? this.defaultReplicationFactor : topic.replicationFactor();
            try {
                List<List<Integer>> replicas = this.clusterControl.placeReplicas(numPartitions, replicationFactor);
                for (int partitionId = 0; partitionId < replicas.size(); ++partitionId) {
                    int[] r = Replicas.toArray(replicas.get(partitionId));
                    newParts.put(partitionId, new PartitionControlInfo(r, r, null, null, r[0], 0, 0));
                }
            }
            catch (InvalidReplicationFactorException e) {
                return new ApiError(Errors.INVALID_REPLICATION_FACTOR, "Unable to replicate the partition " + replicationFactor + " times: " + e.getMessage());
            }
        }
        Uuid topicId = Uuid.randomUuid();
        successes.put(topic.name(), new CreateTopicsResponseData.CreatableTopicResult().setName(topic.name()).setTopicId(topicId).setErrorCode((short)0).setErrorMessage(null).setNumPartitions(newParts.size()).setReplicationFactor((short)((PartitionControlInfo)newParts.get((Object)Integer.valueOf((int)0))).replicas.length));
        records.add(new ApiMessageAndVersion(new TopicRecord().setName(topic.name()).setTopicId(topicId), 0));
        for (Map.Entry partEntry : newParts.entrySet()) {
            int partitionIndex = (Integer)partEntry.getKey();
            PartitionControlInfo info = (PartitionControlInfo)partEntry.getValue();
            records.add(new ApiMessageAndVersion(new PartitionRecord().setPartitionId(partitionIndex).setTopicId(topicId).setReplicas(Replicas.toList(info.replicas)).setIsr(Replicas.toList(info.isr)).setRemovingReplicas(null).setAddingReplicas(null).setLeader(info.leader).setLeaderEpoch(info.leaderEpoch).setPartitionEpoch(0), 0));
        }
        return ApiError.NONE;
    }

    static void validateNewTopicNames(Map<String, ApiError> topicErrors, CreateTopicsRequestData.CreatableTopicCollection topics) {
        for (CreateTopicsRequestData.CreatableTopic topic : topics) {
            if (topicErrors.containsKey(topic.name())) continue;
            try {
                Topic.validate((String)topic.name());
            }
            catch (InvalidTopicException e) {
                topicErrors.put(topic.name(), new ApiError(Errors.INVALID_TOPIC_EXCEPTION, e.getMessage()));
            }
        }
    }

    static Map<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, String>>> computeConfigChanges(Map<String, ApiError> topicErrors, CreateTopicsRequestData.CreatableTopicCollection topics) {
        HashMap<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, String>>> configChanges = new HashMap<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, String>>>();
        for (CreateTopicsRequestData.CreatableTopic topic : topics) {
            if (topicErrors.containsKey(topic.name())) continue;
            HashMap<String, AbstractMap.SimpleImmutableEntry<AlterConfigOp.OpType, String>> topicConfigs = new HashMap<String, AbstractMap.SimpleImmutableEntry<AlterConfigOp.OpType, String>>();
            for (CreateTopicsRequestData.CreateableTopicConfig config : topic.configs()) {
                topicConfigs.put(config.name(), new AbstractMap.SimpleImmutableEntry<AlterConfigOp.OpType, String>(AlterConfigOp.OpType.SET, config.value()));
            }
            if (topicConfigs.isEmpty()) continue;
            configChanges.put(new ConfigResource(ConfigResource.Type.TOPIC, topic.name()), topicConfigs);
        }
        return configChanges;
    }

    Map<String, ResultOrError<Uuid>> findTopicIds(long offset, Collection<String> names) {
        HashMap<String, ResultOrError<Uuid>> results = new HashMap<String, ResultOrError<Uuid>>(names.size());
        for (String name : names) {
            if (name == null) {
                results.put(null, new ResultOrError(Errors.INVALID_REQUEST, "Invalid null topic name."));
                continue;
            }
            Uuid id = this.topicsByName.get(name, offset);
            if (id == null) {
                results.put(name, new ResultOrError(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION)));
                continue;
            }
            results.put(name, new ResultOrError<Uuid>(id));
        }
        return results;
    }

    Map<Uuid, ResultOrError<String>> findTopicNames(long offset, Collection<Uuid> ids) {
        HashMap<Uuid, ResultOrError<String>> results = new HashMap<Uuid, ResultOrError<String>>(ids.size());
        for (Uuid id : ids) {
            if (id == null || id.equals((Object)Uuid.ZERO_UUID)) {
                results.put(id, new ResultOrError(new ApiError(Errors.INVALID_REQUEST, "Attempt to find topic with invalid topicId " + id)));
                continue;
            }
            TopicControlInfo topic = this.topics.get(id, offset);
            if (topic == null) {
                results.put(id, new ResultOrError(new ApiError(Errors.UNKNOWN_TOPIC_ID)));
                continue;
            }
            results.put(id, new ResultOrError<String>(topic.name));
        }
        return results;
    }

    ControllerResult<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> ids) {
        HashMap<Uuid, ApiError> results = new HashMap<Uuid, ApiError>(ids.size());
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>(ids.size());
        for (Uuid id : ids) {
            try {
                this.deleteTopic(id, records);
                results.put(id, ApiError.NONE);
            }
            catch (ApiException e) {
                results.put(id, ApiError.fromThrowable((Throwable)e));
            }
            catch (Exception e) {
                this.log.error("Unexpected deleteTopics error for {}", (Object)id, (Object)e);
                results.put(id, ApiError.fromThrowable((Throwable)e));
            }
        }
        return ControllerResult.atomicOf(records, results);
    }

    void deleteTopic(Uuid id, List<ApiMessageAndVersion> records) {
        TopicControlInfo topic = this.topics.get(id);
        if (topic == null) {
            throw new UnknownTopicIdException(Errors.UNKNOWN_TOPIC_ID.message());
        }
        records.add(new ApiMessageAndVersion(new RemoveTopicRecord().setTopicId(id), 0));
    }

    PartitionControlInfo getPartition(Uuid topicId, int partitionId) {
        TopicControlInfo topic = this.topics.get(topicId);
        if (topic == null) {
            return null;
        }
        return (PartitionControlInfo)topic.parts.get(partitionId);
    }

    BrokersToIsrs brokersToIsrs() {
        return this.brokersToIsrs;
    }

    ControllerResult<AlterIsrResponseData> alterIsr(AlterIsrRequestData request) {
        this.clusterControl.checkBrokerEpoch(request.brokerId(), request.brokerEpoch());
        AlterIsrResponseData response = new AlterIsrResponseData();
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        for (AlterIsrRequestData.TopicData topicData : request.topics()) {
            AlterIsrResponseData.TopicData responseTopicData = new AlterIsrResponseData.TopicData().setName(topicData.name());
            response.topics().add(responseTopicData);
            Uuid topicId = this.topicsByName.get(topicData.name());
            if (topicId == null || !this.topics.containsKey(topicId)) {
                for (AlterIsrRequestData.PartitionData partitionData : topicData.partitions()) {
                    responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().setPartitionIndex(partitionData.partitionIndex()).setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()));
                }
                continue;
            }
            TopicControlInfo topic = this.topics.get(topicId);
            for (AlterIsrRequestData.PartitionData partitionData : topicData.partitions()) {
                PartitionControlInfo partition = (PartitionControlInfo)topic.parts.get(partitionData.partitionIndex());
                if (partition == null) {
                    responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().setPartitionIndex(partitionData.partitionIndex()).setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()));
                    continue;
                }
                if (request.brokerId() != partition.leader) {
                    responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().setPartitionIndex(partitionData.partitionIndex()).setErrorCode(Errors.INVALID_REQUEST.code()));
                    continue;
                }
                if (partitionData.leaderEpoch() != partition.leaderEpoch) {
                    responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().setPartitionIndex(partitionData.partitionIndex()).setErrorCode(Errors.FENCED_LEADER_EPOCH.code()));
                    continue;
                }
                if (partitionData.currentIsrVersion() != partition.partitionEpoch) {
                    responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().setPartitionIndex(partitionData.partitionIndex()).setErrorCode(Errors.INVALID_UPDATE_VERSION.code()));
                    continue;
                }
                int[] newIsr = Replicas.toArray(partitionData.newIsr());
                if (!Replicas.validateIsr(partition.replicas, newIsr)) {
                    responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().setPartitionIndex(partitionData.partitionIndex()).setErrorCode(Errors.INVALID_REQUEST.code()));
                    continue;
                }
                if (!Replicas.contains(newIsr, partition.leader)) {
                    responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().setPartitionIndex(partitionData.partitionIndex()).setErrorCode(Errors.INVALID_REQUEST.code()));
                    continue;
                }
                records.add(new ApiMessageAndVersion(new PartitionChangeRecord().setPartitionId(partitionData.partitionIndex()).setTopicId(topic.id).setIsr(partitionData.newIsr()), 0));
                responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().setPartitionIndex(partitionData.partitionIndex()).setErrorCode(Errors.NONE.code()).setLeaderId(partition.leader).setLeaderEpoch(partition.leaderEpoch).setCurrentIsrVersion(partition.partitionEpoch + 1).setIsr(partitionData.newIsr()));
            }
        }
        return ControllerResult.of(records, response);
    }

    void handleBrokerFenced(int brokerId, List<ApiMessageAndVersion> records) {
        BrokerRegistration brokerRegistration = this.clusterControl.brokerRegistrations().get(brokerId);
        if (brokerRegistration == null) {
            throw new RuntimeException("Can't find broker registration for broker " + brokerId);
        }
        this.handleNodeDeactivated(brokerId, records);
        records.add(new ApiMessageAndVersion(new FenceBrokerRecord().setId(brokerId).setEpoch(brokerRegistration.epoch()), 0));
    }

    void handleBrokerUnregistered(int brokerId, long brokerEpoch, List<ApiMessageAndVersion> records) {
        this.handleNodeDeactivated(brokerId, records);
        records.add(new ApiMessageAndVersion(new UnregisterBrokerRecord().setBrokerId(brokerId).setBrokerEpoch(brokerEpoch), 0));
    }

    void handleNodeDeactivated(int brokerId, List<ApiMessageAndVersion> records) {
        BrokersToIsrs.PartitionsOnReplicaIterator iterator = this.brokersToIsrs.iterator(brokerId, false);
        while (iterator.hasNext()) {
            BrokersToIsrs.TopicIdPartition topicIdPartition = (BrokersToIsrs.TopicIdPartition)iterator.next();
            TopicControlInfo topic = this.topics.get(topicIdPartition.topicId());
            if (topic == null) {
                throw new RuntimeException("Topic ID " + topicIdPartition.topicId() + " existed in isrMembers, but not in the topics map.");
            }
            PartitionControlInfo partition = (PartitionControlInfo)topic.parts.get(topicIdPartition.partitionId());
            if (partition == null) {
                throw new RuntimeException("Partition " + topicIdPartition + " existed in isrMembers, but not in the partitions map.");
            }
            PartitionChangeRecord record = new PartitionChangeRecord().setPartitionId(topicIdPartition.partitionId()).setTopicId(topic.id);
            int[] newIsr = Replicas.copyWithout(partition.isr, brokerId);
            if (newIsr.length == 0) {
                if (record.leader() == -1) continue;
                record.setLeader(-1);
                records.add(new ApiMessageAndVersion(record, 0));
                continue;
            }
            record.setIsr(Replicas.toList(newIsr));
            if (partition.leader == brokerId) {
                int newLeader = this.bestLeader(partition.replicas, newIsr, false);
                record.setLeader(newLeader);
            } else {
                record.setLeader(partition.leader);
            }
            records.add(new ApiMessageAndVersion(record, 0));
        }
    }

    void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVersion> records) {
        records.add(new ApiMessageAndVersion(new UnfenceBrokerRecord().setId(brokerId).setEpoch(brokerEpoch), 0));
        this.handleNodeActivated(brokerId, records);
    }

    void handleNodeActivated(int brokerId, List<ApiMessageAndVersion> records) {
        BrokersToIsrs.PartitionsOnReplicaIterator iterator = this.brokersToIsrs.noLeaderIterator();
        while (iterator.hasNext()) {
            BrokersToIsrs.TopicIdPartition topicIdPartition = (BrokersToIsrs.TopicIdPartition)iterator.next();
            TopicControlInfo topic = this.topics.get(topicIdPartition.topicId());
            if (topic == null) {
                throw new RuntimeException("Topic ID " + topicIdPartition.topicId() + " existed in isrMembers, but not in the topics map.");
            }
            PartitionControlInfo partition = (PartitionControlInfo)topic.parts.get(topicIdPartition.partitionId());
            if (partition == null) {
                throw new RuntimeException("Partition " + topicIdPartition + " existed in isrMembers, but not in the partitions map.");
            }
            if (!Replicas.contains(partition.isr, brokerId)) continue;
            records.add(new ApiMessageAndVersion(new PartitionChangeRecord().setPartitionId(topicIdPartition.partitionId()).setTopicId(topic.id).setLeader(brokerId), 0));
        }
    }

    ControllerResult<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request) {
        boolean unclean = ReplicationControlManager.electionIsUnclean(request.electionType());
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        ElectLeadersResponseData response = new ElectLeadersResponseData();
        for (ElectLeadersRequestData.TopicPartitions topic : request.topicPartitions()) {
            ElectLeadersResponseData.ReplicaElectionResult topicResults = new ElectLeadersResponseData.ReplicaElectionResult().setTopic(topic.topic());
            response.replicaElectionResults().add(topicResults);
            Iterator iterator = topic.partitionId().iterator();
            while (iterator.hasNext()) {
                int partitionId = (Integer)iterator.next();
                ApiError error = this.electLeader(topic.topic(), partitionId, unclean, records);
                topicResults.partitionResult().add(new ElectLeadersResponseData.PartitionResult().setPartitionId(partitionId).setErrorCode(error.error().code()).setErrorMessage(error.message()));
            }
        }
        return ControllerResult.of(records, response);
    }

    static boolean electionIsUnclean(byte electionType) {
        ElectionType type;
        try {
            type = ElectionType.valueOf((byte)electionType);
        }
        catch (IllegalArgumentException e) {
            throw new InvalidRequestException("Unknown election type " + electionType);
        }
        return type == ElectionType.UNCLEAN;
    }

    ApiError electLeader(String topic, int partitionId, boolean unclean, List<ApiMessageAndVersion> records) {
        Uuid topicId = this.topicsByName.get(topic);
        if (topicId == null) {
            return new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "No such topic as " + topic);
        }
        TopicControlInfo topicInfo = this.topics.get(topicId);
        if (topicInfo == null) {
            return new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "No such topic id as " + topicId);
        }
        PartitionControlInfo partitionInfo = (PartitionControlInfo)topicInfo.parts.get(partitionId);
        if (partitionInfo == null) {
            return new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "No such partition as " + topic + "-" + partitionId);
        }
        int newLeader = this.bestLeader(partitionInfo.replicas, partitionInfo.isr, unclean);
        if (newLeader == -1) {
            return new ApiError(Errors.LEADER_NOT_AVAILABLE, "Unable to find any leader for the partition.");
        }
        if (newLeader == partitionInfo.leader) {
            return ApiError.NONE;
        }
        if (partitionInfo.hasLeader() && newLeader != partitionInfo.preferredReplica()) {
            return ApiError.NONE;
        }
        PartitionChangeRecord record = new PartitionChangeRecord().setPartitionId(partitionId).setTopicId(topicId);
        if (unclean && !Replicas.contains(partitionInfo.isr, newLeader)) {
            record.setIsr(Collections.singletonList(newLeader));
        }
        record.setLeader(newLeader);
        records.add(new ApiMessageAndVersion(record, 0));
        return ApiError.NONE;
    }

    ControllerResult<BrokerHeartbeatReply> processBrokerHeartbeat(BrokerHeartbeatRequestData request, long lastCommittedOffset) {
        int brokerId = request.brokerId();
        long brokerEpoch = request.brokerEpoch();
        this.clusterControl.checkBrokerEpoch(brokerId, brokerEpoch);
        BrokerHeartbeatManager heartbeatManager = this.clusterControl.heartbeatManager();
        BrokerControlStates states = heartbeatManager.calculateNextBrokerState(brokerId, request, lastCommittedOffset, () -> this.brokersToIsrs.hasLeaderships(brokerId));
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        if (states.current() != states.next()) {
            switch (states.next()) {
                case FENCED: {
                    this.handleBrokerFenced(brokerId, records);
                    break;
                }
                case UNFENCED: {
                    this.handleBrokerUnfenced(brokerId, brokerEpoch, records);
                    break;
                }
                case CONTROLLED_SHUTDOWN: {
                    this.handleNodeDeactivated(brokerId, records);
                    break;
                }
                case SHUTDOWN_NOW: {
                    this.handleBrokerFenced(brokerId, records);
                }
            }
        }
        heartbeatManager.touch(brokerId, states.next().fenced(), request.currentMetadataOffset());
        boolean isCaughtUp = request.currentMetadataOffset() >= lastCommittedOffset;
        BrokerHeartbeatReply reply = new BrokerHeartbeatReply(isCaughtUp, states.next().fenced(), states.next().inControlledShutdown(), states.next().shouldShutDown());
        return ControllerResult.of(records, reply);
    }

    int bestLeader(int[] replicas, int[] isr, boolean unclean) {
        int replica;
        int i;
        for (i = 0; i < replicas.length; ++i) {
            replica = replicas[i];
            if (!Replicas.contains(isr, replica)) continue;
            return replica;
        }
        if (unclean) {
            for (i = 0; i < replicas.length; ++i) {
                replica = replicas[i];
                if (!this.clusterControl.unfenced(replica)) continue;
                return replica;
            }
        }
        return -1;
    }

    public ControllerResult<Void> unregisterBroker(int brokerId) {
        BrokerRegistration registration = this.clusterControl.brokerRegistrations().get(brokerId);
        if (registration == null) {
            throw new BrokerIdNotRegisteredException("Broker ID " + brokerId + " is not currently registered");
        }
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        this.handleBrokerUnregistered(brokerId, registration.epoch(), records);
        return ControllerResult.of(records, null);
    }

    ControllerResult<Void> maybeFenceStaleBrokers() {
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        BrokerHeartbeatManager heartbeatManager = this.clusterControl.heartbeatManager();
        List<Integer> staleBrokers = heartbeatManager.findStaleBrokers();
        for (int brokerId : staleBrokers) {
            this.log.info("Fencing broker {} because its session has timed out.", (Object)brokerId);
            this.handleBrokerFenced(brokerId, records);
            heartbeatManager.fence(brokerId);
        }
        return ControllerResult.of(records, null);
    }

    static class PartitionControlInfo {
        public final int[] replicas;
        public final int[] isr;
        public final int[] removingReplicas;
        public final int[] addingReplicas;
        public final int leader;
        public final int leaderEpoch;
        public final int partitionEpoch;

        PartitionControlInfo(PartitionRecord record) {
            this(Replicas.toArray(record.replicas()), Replicas.toArray(record.isr()), Replicas.toArray(record.removingReplicas()), Replicas.toArray(record.addingReplicas()), record.leader(), record.leaderEpoch(), record.partitionEpoch());
        }

        PartitionControlInfo(int[] replicas, int[] isr, int[] removingReplicas, int[] addingReplicas, int leader, int leaderEpoch, int partitionEpoch) {
            this.replicas = replicas;
            this.isr = isr;
            this.removingReplicas = removingReplicas;
            this.addingReplicas = addingReplicas;
            this.leader = leader;
            this.leaderEpoch = leaderEpoch;
            this.partitionEpoch = partitionEpoch;
        }

        PartitionControlInfo merge(PartitionChangeRecord record) {
            int newLeaderEpoch;
            int newLeader;
            int[] newIsr;
            int[] nArray = newIsr = record.isr() == null ? this.isr : Replicas.toArray(record.isr());
            if (record.leader() == -2) {
                newLeader = this.leader;
                newLeaderEpoch = this.leaderEpoch;
            } else {
                newLeader = record.leader();
                newLeaderEpoch = this.leaderEpoch + 1;
            }
            return new PartitionControlInfo(this.replicas, newIsr, this.removingReplicas, this.addingReplicas, newLeader, newLeaderEpoch, this.partitionEpoch + 1);
        }

        String diff(PartitionControlInfo prev) {
            StringBuilder builder = new StringBuilder();
            String prefix = "";
            if (!Arrays.equals(this.replicas, prev.replicas)) {
                builder.append(prefix).append("oldReplicas=").append(Arrays.toString(prev.replicas));
                prefix = ", ";
                builder.append(prefix).append("newReplicas=").append(Arrays.toString(this.replicas));
            }
            if (!Arrays.equals(this.isr, prev.isr)) {
                builder.append(prefix).append("oldIsr=").append(Arrays.toString(prev.isr));
                prefix = ", ";
                builder.append(prefix).append("newIsr=").append(Arrays.toString(this.isr));
            }
            if (!Arrays.equals(this.removingReplicas, prev.removingReplicas)) {
                builder.append(prefix).append("oldRemovingReplicas=").append(Arrays.toString(prev.removingReplicas));
                prefix = ", ";
                builder.append(prefix).append("newRemovingReplicas=").append(Arrays.toString(this.removingReplicas));
            }
            if (!Arrays.equals(this.addingReplicas, prev.addingReplicas)) {
                builder.append(prefix).append("oldAddingReplicas=").append(Arrays.toString(prev.addingReplicas));
                prefix = ", ";
                builder.append(prefix).append("newAddingReplicas=").append(Arrays.toString(this.addingReplicas));
            }
            if (this.leader != prev.leader) {
                builder.append(prefix).append("oldLeader=").append(prev.leader);
                prefix = ", ";
                builder.append(prefix).append("newLeader=").append(this.leader);
            }
            if (this.leaderEpoch != prev.leaderEpoch) {
                builder.append(prefix).append("oldLeaderEpoch=").append(prev.leaderEpoch);
                prefix = ", ";
                builder.append(prefix).append("newLeaderEpoch=").append(this.leaderEpoch);
            }
            if (this.partitionEpoch != prev.partitionEpoch) {
                builder.append(prefix).append("oldPartitionEpoch=").append(prev.partitionEpoch);
                prefix = ", ";
                builder.append(prefix).append("newPartitionEpoch=").append(this.partitionEpoch);
            }
            return builder.toString();
        }

        boolean hasLeader() {
            return this.leader != -1;
        }

        int preferredReplica() {
            return this.replicas.length == 0 ? -1 : this.replicas[0];
        }

        public int hashCode() {
            return Objects.hash(this.replicas, this.isr, this.removingReplicas, this.addingReplicas, this.leader, this.leaderEpoch, this.partitionEpoch);
        }

        public boolean equals(Object o) {
            if (!(o instanceof PartitionControlInfo)) {
                return false;
            }
            PartitionControlInfo other = (PartitionControlInfo)o;
            return this.diff(other).isEmpty();
        }

        public String toString() {
            StringBuilder builder = new StringBuilder("PartitionControlInfo(");
            builder.append("replicas=").append(Arrays.toString(this.replicas));
            builder.append(", isr=").append(Arrays.toString(this.isr));
            builder.append(", removingReplicas=").append(Arrays.toString(this.removingReplicas));
            builder.append(", addingReplicas=").append(Arrays.toString(this.addingReplicas));
            builder.append(", leader=").append(this.leader);
            builder.append(", leaderEpoch=").append(this.leaderEpoch);
            builder.append(", partitionEpoch=").append(this.partitionEpoch);
            builder.append(")");
            return builder.toString();
        }
    }

    static class TopicControlInfo {
        private final String name;
        private final Uuid id;
        private final TimelineHashMap<Integer, PartitionControlInfo> parts;

        TopicControlInfo(String name, SnapshotRegistry snapshotRegistry, Uuid id) {
            this.name = name;
            this.id = id;
            this.parts = new TimelineHashMap(snapshotRegistry, 0);
        }
    }
}

