/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.drc.clusterclient.impl;

import com.alibaba.fastjson.JSON;
import com.taobao.drc.clusterclient.BaseClusterContext;
import com.taobao.drc.clusterclient.ConsumerState;
import com.taobao.drc.clusterclient.MessageNotifier;
import com.taobao.drc.clusterclient.NotifyController;
import com.taobao.drc.clusterclient.PartitionClient;
import com.taobao.drc.clusterclient.PartitionClientFactory;
import com.taobao.drc.clusterclient.PartitionManager;
import com.taobao.drc.clusterclient.clustermanager.ExpectedConsumerStatus;
import com.taobao.drc.clusterclient.clustermanager.ExpectedPartitionStatus;
import com.taobao.drc.clusterclient.clustermanager.LocalConsumerStatus;
import com.taobao.drc.clusterclient.clustermanager.LocalPartitionStatus;
import com.taobao.drc.clusterclient.clustermanager.PartitionInfo;
import com.taobao.drc.clusterclient.coordinator.Coordinator;
import com.taobao.drc.clusterclient.coordinator.CoordinatorManager;
import com.taobao.drc.clusterclient.partition.IPartition;
import com.taobao.drc.clusterclient.partition.PartitionRef;
import com.taobao.drc.clusterclient.partition.PartitionState;
import com.taobao.drc.clusterclient.partition.PartitionStateChangeListener;
import com.taobao.drc.clusterclient.util.Futures;
import com.taobao.drc.clusterclient.util.Time;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultPartitionManager<C extends BaseClusterContext, T extends MessageNotifier>
implements PartitionManager<C, T> {
    /** Error code value 0; the commit handler treats any other consumer-level code as a failed commit. */
    static final int CODE_NO_ERROR = 0;
    /** Per-partition error code 440; logged as the cluster manager failing to write a checkpoint to the database. */
    static final int CODE_WRITE_DB_FAILURE = 440;
    /** Per-partition error code 511; triggers the generation-changed handling for an existing partition. */
    static final int CODE_INVALID_GENERATION = 511;
    /** Action id value (-1) stored on a partition when no start/stop action is being tracked. */
    static final long ACTION_ID_NOOP = -1L;
    /** Interval in milliseconds (55000) after which metrics are attached to the local status again while running. */
    static final long METRICS_SUBMISSION_PERIOD = 55000L;
    private static final Logger logger = LoggerFactory.getLogger(DefaultPartitionManager.class);
    /** Source of the coordinator; also used to release it again in stop(). */
    private final CoordinatorManager coordinatorManager;
    /** Coordinator acquired for the context's cluster URL; provides the event thread and the IO pool used below. */
    private final Coordinator coordinator;
    /** Creates the client for a partition once its partition info has been received. */
    private final PartitionClientFactory<C, T> partitionClientFactory;
    private final C context;
    /** Version string reported in the local status and compared against the version in commit responses. */
    private final String version;
    /** Reported ip; not final because a commit response may replace it. */
    private String ip;
    /** Partitions currently tracked by this manager, keyed by partition name. */
    final Map<String, ActivePartition> activePartitionMap = new HashMap<String, ActivePartition>();
    /** Notifiers handed out in rotation by getNextListener(). */
    private final List<T> notifiers;
    /** Listeners passed to every ActivePartition created by this manager. */
    private final List<PartitionStateChangeListener> partitionStateChangeListeners;
    /** Clock used for all timestamps and timeout checks. */
    private final Time time;
    /** Index of the notifier that getNextListener() returns next (before wrap-around). */
    private int nextListenerIdx = 0;
    /** Next value returned by getNextActionId(). */
    private long actionId = 0L;
    /** Whether the most recently built local status carried metrics. */
    private boolean submittingMetrics = false;
    /** Time (ms) recorded by the commit handler after a successful commit that carried metrics. */
    private long lastMetricsSubmitMs = 0L;
    private ConsumerState state = ConsumerState.STARTING;
    /** Sequence allocated by a commit response while in STARTING state; only used in log messages here. */
    private volatile String seq;
    /** Futures of the shutdown work queued by stop(); drained by waitForStop(). */
    private final BlockingQueue<Future> futuresOnStopped = new LinkedBlockingDeque<Future>();

    /**
     * Creates a partition manager that uses {@code Time.SYSTEM} as its clock.
     * All other arguments are forwarded unchanged to the eight-argument constructor.
     */
    public DefaultPartitionManager(CoordinatorManager coordinatorManager,
                                   PartitionClientFactory<C, T> partitionClientFactory,
                                   C context,
                                   String version,
                                   String ip,
                                   List<T> notifiers,
                                   List<PartitionStateChangeListener> partitionStateChangeListeners) {
        this(coordinatorManager, partitionClientFactory, context, version, ip, notifiers,
                partitionStateChangeListeners, Time.SYSTEM);
    }

    /**
     * Creates a partition manager with an explicit clock.
     * The coordinator is acquired from {@code coordinatorManager} using the cluster URL of
     * {@code context}; every other argument is stored as-is.
     *
     * @param coordinatorManager            source of the coordinator
     * @param partitionClientFactory        factory used to create one client per partition
     * @param context                       cluster context of this consumer
     * @param version                       client version string
     * @param ip                            initial ip reported for this consumer
     * @param notifiers                     notifiers handed out by {@code getNextListener()}
     * @param partitionStateChangeListeners listeners passed to each active partition
     * @param time                          clock used for timestamps and timeouts
     */
    public DefaultPartitionManager(CoordinatorManager coordinatorManager,
                                   PartitionClientFactory<C, T> partitionClientFactory,
                                   C context,
                                   String version,
                                   String ip,
                                   List<T> notifiers,
                                   List<PartitionStateChangeListener> partitionStateChangeListeners,
                                   Time time) {
        // Plain collaborators and configuration.
        this.partitionClientFactory = partitionClientFactory;
        this.context = context;
        this.version = version;
        this.ip = ip;
        this.notifiers = notifiers;
        this.partitionStateChangeListeners = partitionStateChangeListeners;
        this.time = time;
        // Coordinator wiring.
        this.coordinatorManager = coordinatorManager;
        this.coordinator = coordinatorManager.acquireCoordinator(context.getClusterUrl());
    }

    /** Returns the current consumer state of this manager. */
    @Override
    public ConsumerState getState() {
        return state;
    }

    /**
     * Registers this manager with the coordinator and blocks until the registration completes.
     *
     * @throws ExecutionException   if the registration fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public void start() throws ExecutionException, InterruptedException {
        coordinator.register(this).get();
    }

    /**
     * Initiates shutdown without waiting for it to finish.
     * Three pieces of work are queued, and their futures recorded in {@code futuresOnStopped}:
     * an event-thread task that switches the manager to STOPPING and stops the clients of
     * running partitions, the deregistration from the coordinator, and the release of the
     * coordinator.
     */
    @Override
    public void stop() {
        final Runnable stopPartitions = new Runnable(){

            @Override
            public void run() {
                logger.info("To stop partition manager [{}][{}]", (Object)DefaultPartitionManager.this.context.getAppGuid(), (Object)DefaultPartitionManager.this.context.getAppGroup());
                DefaultPartitionManager.this.state = ConsumerState.STOPPING;
                for (ActivePartition partition : DefaultPartitionManager.this.activePartitionMap.values()) {
                    // Only running partitions have a client that needs to be closed.
                    if (partition.getState() == PartitionState.RUNNING) {
                        Future pendingStop = DefaultPartitionManager.this.maybeStopPartitionClient(partition);
                        if (pendingStop != null) {
                            DefaultPartitionManager.this.futuresOnStopped.add(pendingStop);
                        }
                    }
                    // Every partition, whatever its previous state, ends up in STOPPING.
                    partition.setState(PartitionState.STOPPING);
                }
            }
        };
        this.futuresOnStopped.add(this.coordinator.runOnEventThread(stopPartitions));
        this.futuresOnStopped.add(this.coordinator.deregister(this));
        this.futuresOnStopped.add(this.coordinatorManager.releaseCoordinator(this.coordinator));
    }

    /** Tells whether this manager has been switched to the STOPPING state. */
    private boolean isStopping() {
        return ConsumerState.STOPPING == state;
    }

    /**
     * Waits for the shutdown work recorded in {@code futuresOnStopped} to finish.
     * Futures are awaited one after another against a single deadline; each future that
     * completes (normally or with a failure) is removed from the queue. If the deadline is
     * reached first, the remaining futures stay queued and a warning is logged.
     *
     * @param timeLimitInSec overall time budget in seconds
     * @throws InterruptedException if the calling thread is interrupted while waiting; the
     *                              future being awaited is left in the queue so that a later
     *                              call can wait for it again
     */
    @Override
    public void waitForStop(long timeLimitInSec) throws InterruptedException {
        long now = this.time.millis();
        final long due = now + TimeUnit.SECONDS.toMillis(timeLimitInSec);
        final Iterator<Future> pending = this.futuresOnStopped.iterator();
        boolean fullyStopped = true;
        while (pending.hasNext()) {
            if (due <= now) {
                fullyStopped = false;
                break;
            }
            try {
                pending.next().get(due - now, TimeUnit.MILLISECONDS);
                pending.remove();
            }
            catch (TimeoutException e) {
                fullyStopped = false;
                break;
            }
            catch (InterruptedException e) {
                // Must not be absorbed by the generic handler below: that would drop the
                // unfinished future and could report the manager as completely stopped.
                throw e;
            }
            catch (Exception e) {
                // The future finished, albeit with a failure; log it and move on.
                logger.error("Caught exception on stopping partition manager [{}][{}]", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), e});
                pending.remove();
            }
            now = this.time.millis();
        }
        if (!fullyStopped) {
            logger.warn("The partition manager [{}][{}][{}] has not been stopped completely within [{}] seconds", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), this.seq, timeLimitInSec});
        } else {
            logger.info("The partition manager [{}][{}][{}] has been stopped completely", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), this.seq});
        }
    }

    /** Returns the cluster context this manager was created with. */
    @Override
    public C getContext() {
        return context;
    }

    /**
     * Returns the next notifier in rotation.
     * The stored index is wrapped to the size of the notifier list, the notifier at that
     * position is returned, and the index is advanced by one. The whole step runs while
     * holding this object's monitor.
     */
    @Override
    public T getNextListener() {
        synchronized (this) {
            final int current = this.nextListenerIdx % this.notifiers.size();
            this.nextListenerIdx = current + 1;
            return this.notifiers.get(current);
        }
    }

    /** Returns the session timeout, in milliseconds, as reported by the coordinator. */
    private long getSessionTimeoutMs() {
        return coordinator.getSessionTimeoutMs();
    }

    /** Hands out the current action id and advances the counter by one. */
    private long getNextActionId() {
        final long allocated = this.actionId;
        this.actionId = allocated + 1L;
        return allocated;
    }

    /** Looks up the active partition with the given name; {@code null} when none is tracked. */
    private ActivePartition getActivePartition(String partition) {
        return activePartitionMap.get(partition);
    }

    /**
     * Starts {@code client} on the coordinator's IO pool and reports the outcome on the
     * coordinator's event thread through {@code onClientStarted}: with a {@code null}
     * throwable when nothing was thrown, otherwise with the caught exception.
     *
     * @param partition name of the partition the client belongs to
     * @param client    client to start
     * @param actionId  action id passed through to the callback
     */
    private void startClient(final String partition, final PartitionClient client, final long actionId) {
        final Runnable startTask = new Runnable(){

            @Override
            public void run() {
                try {
                    client.start();
                    final Runnable reportSuccess = new Runnable(){

                        @Override
                        public void run() {
                            DefaultPartitionManager.this.onClientStarted(partition, actionId, client, null);
                        }
                    };
                    DefaultPartitionManager.this.coordinator.runOnEventThread(reportSuccess);
                }
                catch (final Exception failure) {
                    final Runnable reportFailure = new Runnable(){

                        @Override
                        public void run() {
                            DefaultPartitionManager.this.onClientStarted(partition, actionId, client, failure);
                        }
                    };
                    DefaultPartitionManager.this.coordinator.runOnEventThread(reportFailure);
                }
            }
        };
        this.coordinator.runOnIOPool(startTask);
    }

    /**
     * Closes {@code client} on the coordinator's IO pool and reports the outcome on the
     * coordinator's event thread through {@code onClientStopped}: with a {@code null}
     * throwable when nothing was thrown, otherwise with the caught exception.
     *
     * @param partition name of the partition the client belongs to
     * @param client    client to close
     * @param actionId  action id passed through to the callback
     * @return the future returned by the IO pool for the close task
     */
    private Future stopClient(final String partition, final PartitionClient client, final long actionId) {
        final Runnable closeTask = new Runnable(){

            @Override
            public void run() {
                try {
                    client.close();
                    final Runnable reportSuccess = new Runnable(){

                        @Override
                        public void run() {
                            DefaultPartitionManager.this.onClientStopped(partition, actionId, null);
                        }
                    };
                    DefaultPartitionManager.this.coordinator.runOnEventThread(reportSuccess);
                }
                catch (final Exception failure) {
                    final Runnable reportFailure = new Runnable(){

                        @Override
                        public void run() {
                            DefaultPartitionManager.this.onClientStopped(partition, actionId, failure);
                        }
                    };
                    DefaultPartitionManager.this.coordinator.runOnEventThread(reportFailure);
                }
            }
        };
        return this.coordinator.runOnIOPool(closeTask);
    }

    /**
     * Moves a partition towards the stopped state, depending on where it currently is.
     * <ul>
     *   <li>RUNNING: closes the notify controller, marks the partition STOPPING under a new
     *       action id, and closes the client asynchronously; the close future is returned.</li>
     *   <li>STARTING: resets the tracked action id to -1 and marks the partition STOPPED.</li>
     *   <li>INIT: marks the partition STOPPED.</li>
     *   <li>STOPPING / STOPPED, or a {@code null} partition: nothing to do.</li>
     * </ul>
     *
     * @param activePartition partition to stop, may be {@code null}
     * @return the close future for a running partition, otherwise an already-successful future
     */
    Future maybeStopPartitionClient(ActivePartition activePartition) {
        if (activePartition != null) {
            switch (activePartition.getState()) {
                case RUNNING: {
                    activePartition.getClient().getNotifyController().close();
                    activePartition.setState(PartitionState.STOPPING);
                    activePartition.setCurrentActionId(this.getNextActionId());
                    return this.stopClient(activePartition.getPartition(), activePartition.getClient(), activePartition.getCurrentActionId());
                }
                case STARTING: {
                    // Forget the pending start action, then share the INIT handling below.
                    activePartition.setCurrentActionId(-1L);
                    // intentional fall through
                }
                case INIT: {
                    activePartition.setState(PartitionState.STOPPED);
                    break;
                }
                default: {
                    // STOPPING and STOPPED: already on the way down or done.
                    break;
                }
            }
        }
        return Futures.success(null);
    }

    /** Tells whether the partition has been in its current state for longer than the session timeout. */
    private boolean isStateTimeout(ActivePartition activePartition) {
        final long elapsedMs = this.time.millis() - activePartition.getStateStartTime();
        return elapsedMs > this.getSessionTimeoutMs();
    }

    /**
     * Builds the JSON message attached to the local consumer status.
     * The metrics of all notifiers are always collected and logged; they are only put into
     * the message (under the key "metrics") when {@code withMetrics} is set.
     *
     * @param withMetrics whether the collected metrics go into the message
     * @return the JSON rendering of the message map
     */
    private String getMessage(boolean withMetrics) {
        final ArrayList<Map<String, Object>> notifierMetrics = new ArrayList<Map<String, Object>>();
        for (MessageNotifier notifier : this.notifiers) {
            notifierMetrics.add(notifier.getMetrics());
        }
        logger.info("Notifier metrics for [{}][{}]: {}", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), notifierMetrics});
        final TreeMap<String, Object> payload = new TreeMap<String, Object>();
        if (withMetrics) {
            payload.put("metrics", notifierMetrics);
        }
        return JSON.toJSONString(payload);
    }

    /**
     * Builds the status snapshot of this consumer and all of its active partitions.
     * <p>
     * Besides building the snapshot this method maintains the active partitions:
     * <ul>
     *   <li>a RUNNING partition whose client is no longer active, or whose notify controller
     *       is closed, gets its client stopped;</li>
     *   <li>a partition in any other non-STOPPED state that exceeded the session timeout is
     *       marked STOPPED;</li>
     *   <li>a partition that was already STOPPED on entry is still reported once and then
     *       removed from the active map.</li>
     * </ul>
     * Metrics are attached while RUNNING only if more than 55000 ms passed since the last
     * recorded metrics submission, and always while STOPPING.
     *
     * @return the freshly built local consumer status
     */
    @Override
    public LocalConsumerStatus getLocalConsumerStatus() {
        if (this.state == ConsumerState.RUNNING) {
            this.submittingMetrics = this.time.millis() - this.lastMetricsSubmitMs > 55000L;
        } else if (this.state == ConsumerState.STOPPING) {
            this.submittingMetrics = true;
        }
        LocalConsumerStatus consumerStatus = new LocalConsumerStatus(this.context.getAppGuid(), this.context.getAppGroup(), this.context.getAppGroupUserName(), this.context.getAppGroupPassword(), this.version, this.ip, this.context.getMaxConns(), this.state.toString(), this.getMessage(this.submittingMetrics));
        ArrayList<ActivePartition> toRemove = new ArrayList<ActivePartition>();
        for (ActivePartition activePartition : this.activePartitionMap.values()) {
            switch (activePartition.getState()) {
                case STOPPED: {
                    // Removal is deferred so the map is not modified while iterating.
                    toRemove.add(activePartition);
                    break;
                }
                case RUNNING: {
                    if (!activePartition.getClient().isActive()) {
                        logger.error("The client for {} is not active", (Object)activePartition);
                        this.maybeStopPartitionClient(activePartition);
                    } else if (activePartition.getClient().getNotifyController().isClosed()) {
                        // The partition is passed as the argument for the "{}" placeholder.
                        logger.error("The notify controller for {} is closed", (Object)activePartition);
                        this.maybeStopPartitionClient(activePartition);
                    }
                    break;
                }
                default: {
                    if (!this.isStateTimeout(activePartition)) break;
                    logger.error("State timeout for partition {}: action started at [{}]", (Object)activePartition, (Object)activePartition.getStateStartTime());
                    activePartition.setState(PartitionState.STOPPED);
                }
            }
            // Reported with the state it has after the checks above.
            consumerStatus.addPartition(new LocalPartitionStatus(activePartition.getPartition(), activePartition.getGeneration(), activePartition.offset(), activePartition.getState().name(), activePartition.getMessage(this.submittingMetrics)));
        }
        for (ActivePartition activePartition : toRemove) {
            logger.info("Partition {} has been stopped completely, remove it from active list", (Object)activePartition);
            this.activePartitionMap.remove(activePartition.getPartition());
        }
        return consumerStatus;
    }

    /**
     * Tells whether the expected version is strictly newer than the current one.
     * A missing version on either side, or a version string that cannot be parsed or
     * compared, yields {@code false}; the latter case is logged as a warning.
     */
    private boolean isUpgradeRequired(String expectedVersionStr, String currentVersionStr) {
        if (expectedVersionStr != null && currentVersionStr != null) {
            try {
                final Version wanted = new Version(expectedVersionStr);
                final Version running = new Version(currentVersionStr);
                return wanted.compareTo(running) > 0;
            }
            catch (Exception invalid) {
                logger.warn("Invalid version, current [{}], expected [{}]", currentVersionStr, expectedVersionStr);
            }
        }
        return false;
    }

    /** Returns how many partitions are currently tracked in the active map. */
    private int getActivePartitionNum() {
        return activePartitionMap.size();
    }

    /**
     * Handles the outcome of a commit.
     * <p>
     * Nothing happens while the manager is stopping. On a successful call ({@code throwable}
     * is {@code null}) the consumer-level response is applied first (state, seq, metrics
     * timestamp, version hint, ip) and then every expected partition status is processed:
     * unknown partitions are created (up to the context's max connections), partitions with a
     * changed generation are reset, and partitions with any other error are stopped. On a
     * failed call each running partition whose notify controller is no longer valid either
     * gets its lease extended (if it is still notifying) or is stopped.
     *
     * @param expectedConsumerStatus response of the commit; only read when {@code throwable} is {@code null}
     * @param throwable              failure of the commit call, or {@code null} on success
     * @return partition ref to generation for every partition created or reset by this call
     *         (presumably the partitions whose info must be fetched next; confirm against the
     *         caller); empty when stopping, on a consumer-level error code, or on failure
     */
    @Override
    public Map<PartitionRef, Long> onCommitComplete(ExpectedConsumerStatus expectedConsumerStatus, Throwable throwable) {
        if (this.isStopping()) {
            return Collections.emptyMap();
        }
        long now = this.time.millis();
        if (throwable == null) {
            // The first response that allocates a seq moves the consumer from STARTING to RUNNING.
            if (this.state == ConsumerState.STARTING && expectedConsumerStatus.isNewSeqAllocated()) {
                this.state = ConsumerState.RUNNING;
                this.seq = expectedConsumerStatus.getAllocatedSeq();
            }
            // Any consumer-level code other than 0 (CODE_NO_ERROR) aborts processing of this response.
            if (expectedConsumerStatus.getErrCode() != 0) {
                logger.error("Failed to commit for [{}][{}], error code [{}], msg: [{}]", new Object[]{((BaseClusterContext)this.context).getAppGuid(), ((BaseClusterContext)this.context).getAppGroup(), expectedConsumerStatus.getErrCode(), expectedConsumerStatus.getErrMsg()});
                return Collections.emptyMap();
            }
            // Remember when metrics were last delivered; getLocalConsumerStatus() uses this timestamp.
            if (this.submittingMetrics) {
                this.lastMetricsSubmitMs = this.time.millis();
            }
            if (this.isUpgradeRequired(expectedConsumerStatus.getVersion(), this.version)) {
                logger.warn("Please update to the version [{}], current version is [{}]", (Object)expectedConsumerStatus.getVersion(), (Object)this.version);
            }
            // Adopt the ip from the response when it differs from the locally known one.
            if (expectedConsumerStatus.getIp() != null && !StringUtils.equals((CharSequence)expectedConsumerStatus.getIp(), (CharSequence)this.ip)) {
                logger.info("Changed ip of [{}] from [{}] to [{}] according to response", new Object[]{expectedConsumerStatus.getSeq(), this.ip, expectedConsumerStatus.getIp()});
                this.ip = expectedConsumerStatus.getIp();
            }
            HashMap<PartitionRef, Long> newPartitions = new HashMap<PartitionRef, Long>();
            block8: for (ExpectedPartitionStatus expectedPartitionStatus : expectedConsumerStatus.getExpectedPartitionStatusList()) {
                ActivePartition activePartition = this.getActivePartition(expectedPartitionStatus.getPartition());
                switch (expectedPartitionStatus.getErrCode()) {
                    // 0 == CODE_NO_ERROR
                    case 0: {
                        break;
                    }
                    // 440 == CODE_WRITE_DB_FAILURE: only logged, the partition is then handled like the no-error case.
                    case 440: {
                        logger.warn("Cluster manager failed to commit checkpoint of [{}][{}][{}] on [{}] to database", new Object[]{((BaseClusterContext)this.context).getAppGuid(), ((BaseClusterContext)this.context).getAppGroup(), expectedPartitionStatus.getPartition(), expectedConsumerStatus.getSeq()});
                        break;
                    }
                    // 511 == CODE_INVALID_GENERATION: reset a known partition to the new generation;
                    // an unknown partition falls through to the creation path below.
                    case 511: {
                        if (activePartition == null) break;
                        this.handleInvalidGeneration(activePartition, expectedPartitionStatus);
                        newPartitions.put(activePartition.getRef(), expectedPartitionStatus.getGeneration());
                        break;
                    }
                    // Any other error: stop the partition (a null partition is tolerated) and skip the rest.
                    default: {
                        logger.warn("Got error for partition [{}][{}][{}][{}]: [{}][{}]", new Object[]{((BaseClusterContext)this.context).getAppGuid(), ((BaseClusterContext)this.context).getAppGroup(), expectedPartitionStatus.getPartition(), expectedPartitionStatus.getGeneration(), expectedPartitionStatus.getErrCode(), expectedPartitionStatus.getErrMsg()});
                        this.maybeStopPartitionClient(activePartition);
                        continue block8;
                    }
                }
                if (activePartition == null) {
                    // Do not exceed the configured number of connections.
                    if (this.getActivePartitionNum() >= ((BaseClusterContext)this.context).getMaxConns()) {
                        logger.warn("Active partition number has reached [{}], do not start partition [{}][{}][{}][{}]", new Object[]{((BaseClusterContext)this.context).getMaxConns(), ((BaseClusterContext)this.context).getAppGuid(), ((BaseClusterContext)this.context).getAppGroup(), expectedPartitionStatus.getPartition(), expectedPartitionStatus.getGeneration()});
                        continue;
                    }
                    logger.warn("Got new partition [{}][{}][{}] on [{}]: generation [{}], offset: [{}]", new Object[]{((BaseClusterContext)this.context).getAppGuid(), ((BaseClusterContext)this.context).getAppGroup(), expectedPartitionStatus.getPartition(), this.seq, expectedPartitionStatus.getGeneration(), expectedPartitionStatus.getOffset()});
                    activePartition = new ActivePartition(this, expectedPartitionStatus.getPartition(), this.time, this.partitionStateChangeListeners);
                    activePartition.setGeneration(expectedPartitionStatus.getGeneration());
                    activePartition.setInitOffset(expectedPartitionStatus.getOffset());
                    this.activePartitionMap.put(expectedPartitionStatus.getPartition(), activePartition);
                    newPartitions.put(activePartition.getRef(), expectedPartitionStatus.getGeneration());
                } else {
                    // Only a running partition has a lease to extend.
                    switch (activePartition.getState()) {
                        case RUNNING: {
                            activePartition.extendLease(this.getSessionTimeoutMs());
                            break;
                        }
                    }
                }
                activePartition.setLastSuccessCommitAt(now);
            }
            return newPartitions;
        }
        // The commit call itself failed: decide per running partition whether it may keep consuming.
        for (ActivePartition activePartition : this.activePartitionMap.values()) {
            if (activePartition.getState() != PartitionState.RUNNING) {
                logger.info("Failed to commit for partition {}, while the state of the partition is [{}]", (Object)activePartition, (Object)activePartition.getState());
                continue;
            }
            // NOTE(review): judging by the log message, an invalid notify controller means the
            // lease ran out after the session timeout; confirm against NotifyController.
            if (!activePartition.getClient().getNotifyController().isValid()) {
                logger.warn("The running partition {} has not committed successfully since [{}], exceeded session timeout [{}] for [{}] ms", new Object[]{activePartition, activePartition.getLastSuccessCommitAt(), this.getSessionTimeoutMs(), now - activePartition.getLastSuccessCommitAt() - this.getSessionTimeoutMs()});
                NotifyController notifyController = activePartition.getClient().getNotifyController();
                // Still notifying, or notified within the last session timeout: extend the lease instead of stopping.
                if (!notifyController.isClosed() && (notifyController.isNotifying() || now - notifyController.getLastNotifiedMs() < this.getSessionTimeoutMs())) {
                    long targetMs = now + this.getSessionTimeoutMs();
                    logger.info("The client for partition {} is still notifying messages, last notified at [{}], extends the lease to [{}]", new Object[]{activePartition, notifyController.getLastNotifiedMs(), targetMs});
                    notifyController.extendLeaseTo(targetMs);
                    continue;
                }
                logger.warn("The partition {} is not notifying message, and no message has arrived since [{}], stop the client", (Object)activePartition, (Object)notifyController.getLastNotifiedMs());
                this.maybeStopPartitionClient(activePartition);
                continue;
            }
            logger.info("Failed to commit for partition {}, while the partition is still valid for consuming", (Object)activePartition);
        }
        return Collections.emptyMap();
    }

    /**
     * Resets an active partition after a generation change.
     * A running partition gets its client stopped and its partition info and client reference
     * cleared; in any other state only the tracked action id is reset to -1. In both cases the
     * partition then takes over the expected generation and offset and returns to INIT.
     *
     * @param activePartition         partition whose generation is outdated
     * @param expectedPartitionStatus status carrying the new generation and offset
     */
    private void handleInvalidGeneration(ActivePartition activePartition, ExpectedPartitionStatus expectedPartitionStatus) {
        logger.warn("Generation of [{}][{}][{}] changed: [{}] -> [{}], offset: [{}]", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), expectedPartitionStatus.getPartition(), activePartition.getGeneration(), expectedPartitionStatus.getGeneration(), expectedPartitionStatus.getOffset()});
        if (activePartition.getState() == PartitionState.RUNNING) {
            this.maybeStopPartitionClient(activePartition);
            activePartition.setPartitionInfo(null);
            activePartition.setClient(null);
        } else {
            // Whatever start/stop action was pending no longer applies to the new generation.
            activePartition.setCurrentActionId(-1L);
        }
        activePartition.setGeneration(expectedPartitionStatus.getGeneration());
        activePartition.setInitOffset(expectedPartitionStatus.getOffset());
        activePartition.setState(PartitionState.INIT);
    }

    /**
     * Handles the outcome of a partition-info request.
     * <p>
     * Nothing happens while the manager is stopping. On success, every returned info that
     * matches a tracked partition of the same generation and has no info yet is validated
     * (error code, guid); a valid info leads to client creation and an asynchronous start,
     * an invalid one removes the partition. On failure, every requested partition that is
     * still tracked, still in INIT, and still of the requested generation is removed.
     *
     * @param partitionInfoMap fetched partition infos by partition ref; only read on success
     * @param generationMap    generation each partition info was requested for
     * @param throwable        failure of the request, or {@code null} on success
     */
    @Override
    public void onGetPartitionInfoComplete(Map<PartitionRef, PartitionInfo> partitionInfoMap, Map<PartitionRef, Long> generationMap, Throwable throwable) {
        if (this.isStopping()) {
            return;
        }
        if (throwable == null) {
            for (Map.Entry<PartitionRef, PartitionInfo> entry : partitionInfoMap.entrySet()) {
                Long generation = generationMap.get(entry.getKey());
                PartitionInfo partitionInfo = entry.getValue();
                ActivePartition activePartition = this.getActivePartition(partitionInfo.getPartition());
                if (activePartition == null) {
                    logger.warn("Got info of partition [{}][{}] while active partition not exists", (Object)partitionInfo.getPartition(), (Object)generation);
                    continue;
                }
                // Ignore answers for a generation that has since been replaced.
                if (generation == null || activePartition.getGeneration() != generation.longValue()) {
                    logger.warn("Got info of partition [{}] of generation [{}], while current generation of the partition is [{}]", new Object[]{partitionInfo.getPartition(), generation, activePartition.getGeneration()});
                    continue;
                }
                if (activePartition.getPartitionInfo() == null) {
                    if (partitionInfo.getErrCode() != 0) {
                        logger.error("Got invalid partition info [{}][{}] for [{}][{}]", new Object[]{partitionInfo.getErrCode(), partitionInfo.getErrMsg(), partitionInfo.getPartition(), generation});
                        this.activePartitionMap.remove(activePartition.getPartition());
                        continue;
                    }
                    if (!StringUtils.equals((CharSequence)partitionInfo.getGuid(), (CharSequence)this.context.getAppGuid())) {
                        logger.error("Got invalid guid from the partition info [{}], expected [{}], group [{}]", new Object[]{partitionInfo, this.context.getAppGuid(), this.context.getAppGroup()});
                        this.activePartitionMap.remove(activePartition.getPartition());
                        continue;
                    }
                    activePartition.setPartitionInfo(partitionInfo);
                    try {
                        activePartition.setClient(this.partitionClientFactory.create(this.context, partitionInfo, this.getNextListener(), activePartition.getInitOffset()));
                    }
                    catch (Exception e) {
                        logger.error("Failed to create client for partition [{}], mark the partition as stopped", (Object)activePartition, (Object)e);
                        activePartition.setState(PartitionState.STOPPED);
                        continue;
                    }
                    // The start runs asynchronously; the action id ties the later callback to this attempt.
                    activePartition.setState(PartitionState.STARTING);
                    activePartition.setCurrentActionId(this.getNextActionId());
                    this.startClient(activePartition.getPartition(), activePartition.getClient(), activePartition.getCurrentActionId());
                    continue;
                }
                logger.warn("Partition info has already been set for {}", (Object)activePartition);
            }
        } else {
            logger.error("Failed to get partition info for {}", generationMap, (Object)throwable);
            for (Map.Entry<PartitionRef, Long> entry : generationMap.entrySet()) {
                ActivePartition activePartition = this.getActivePartition(entry.getKey().getPartition());
                // The partition may have been removed since the request was issued (for example
                // after being marked STOPPED); there is nothing left to clean up then.
                if (activePartition == null) {
                    logger.warn("Failed to get info of partition [{}][{}] while active partition not exists", (Object)entry.getKey().getPartition(), (Object)entry.getValue());
                    continue;
                }
                Long requestedGeneration = entry.getValue();
                if (requestedGeneration == null || activePartition.getGeneration() != requestedGeneration.longValue() || activePartition.getState() != PartitionState.INIT) continue;
                this.activePartitionMap.remove(activePartition.getPartition());
                logger.error("Removed partition [{}][{}] due to getting partition info failure", new Object[]{activePartition.getPartition(), activePartition.getGeneration(), throwable});
            }
        }
    }

    /**
     * Callback for the outcome of a client start attempt; startClient() posts it to the
     * coordinator's event thread.
     * <p>
     * A successfully started client is only adopted when the partition is still tracked, is in
     * STARTING state, and still carries the action id of this attempt; in every other case the
     * client is closed again. A failed start marks the partition STOPPED, but only if the
     * action id still matches.
     *
     * @param partition name of the partition the client was started for
     * @param actionId  action id assigned when the start was requested
     * @param client    the client that was started (or failed to start)
     * @param throwable failure of the start, or {@code null} on success
     */
    void onClientStarted(String partition, long actionId, PartitionClient client, Throwable throwable) {
        if (throwable == null) {
            logger.info("Client for partition [{}][{}][{}] started, action id [{}]", new Object[]{((BaseClusterContext)this.context).getAppGuid(), ((BaseClusterContext)this.context).getAppGroup(), partition, actionId});
        } else {
            logger.warn("Failed to start client for partition [{}][{}][{}], action id [{}]", new Object[]{((BaseClusterContext)this.context).getAppGuid(), ((BaseClusterContext)this.context).getAppGroup(), partition, actionId, throwable});
        }
        // Manager is shutting down: a client that came up anyway is closed right away.
        if (this.isStopping()) {
            if (throwable == null) {
                logger.warn("Client for partition [{}][{}][{}], action id [{}] started while the partition manager is not running", new Object[]{((BaseClusterContext)this.context).getAppGuid(), ((BaseClusterContext)this.context).getAppGroup(), partition, actionId});
                // -1 == ACTION_ID_NOOP: the resulting stop callback is not tied to a tracked action.
                this.stopClient(partition, client, -1L);
                return;
            }
            return;
        }
        ActivePartition activePartition = this.getActivePartition(partition);
        // The partition is no longer tracked: close an orphaned client, otherwise just log.
        if (activePartition == null) {
            if (throwable == null) {
                logger.warn("Client started while no active partition [{}][{}][{}] exists, action id [{}]", new Object[]{((BaseClusterContext)this.context).getAppGuid(), ((BaseClusterContext)this.context).getAppGroup(), partition, actionId});
                this.stopClient(partition, client, -1L);
                return;
            }
            logger.warn("Client started failed while no active partition [{}][{}][{}] exists, action id [{}]", new Object[]{((BaseClusterContext)this.context).getAppGuid(), ((BaseClusterContext)this.context).getAppGroup(), partition, actionId, throwable});
            return;
        }
        if (throwable == null) {
            switch (activePartition.getState()) {
                case STARTING: {
                    // Expected path: adopt the client, clear the pending action and start the lease.
                    if (activePartition.getCurrentActionId() == actionId) {
                        activePartition.setState(PartitionState.RUNNING);
                        activePartition.setClient(client);
                        activePartition.setCurrentActionId(-1L);
                        activePartition.extendLease(this.getSessionTimeoutMs());
                        break;
                    }
                    // The client belongs to a different (superseded) start attempt: close it, keep the partition as is.
                    logger.error("Client started for partition {}, but the action id does not match [{}]", (Object)activePartition, (Object)actionId);
                    this.stopClient(partition, client, -1L);
                    break;
                }
                default: {
                    // Partition is not waiting for a client: close the client and mark the partition STOPPING.
                    // NOTE(review): the partition's current action id is not updated here, so the stop
                    // callback for this action id may not match it; confirm that this is intended.
                    logger.error("Client started for partition {}, but the partition is in an invalid state [{}]", (Object)activePartition, (Object)activePartition.getState());
                    this.stopClient(partition, client, actionId);
                    activePartition.setState(PartitionState.STOPPING);
                    break;
                }
            }
        } else if (activePartition.getCurrentActionId() == actionId) {
            // The tracked start attempt failed: clear the pending action and mark the partition STOPPED.
            logger.error("Failed to start client for partition {}", (Object)activePartition, (Object)throwable);
            activePartition.setCurrentActionId(-1L);
            activePartition.setState(PartitionState.STOPPED);
        } else {
            // Failure of an attempt that is no longer tracked: nothing to change.
            logger.error("Failed to start client for [{}][{}][{}], action id [{}], while the current action id is [{}], ignore", new Object[]{((BaseClusterContext)this.context).getAppGuid(), ((BaseClusterContext)this.context).getAppGroup(), partition, actionId, activePartition.getCurrentActionId()});
        }
    }

    /**
     * Callback for the outcome of a client close attempt; stopClient() posts it to the
     * coordinator's event thread.
     * <p>
     * The outcome is always logged. Unless the manager is stopping, a tracked partition that
     * is in STOPPING state with a matching action id is marked STOPPED, and its pending action
     * and client reference are cleared. Any other combination is only logged.
     *
     * @param partition name of the partition whose client was closed
     * @param actionId  action id assigned when the stop was requested
     * @param throwable failure of the close, or {@code null} on success
     */
    void onClientStopped(String partition, long actionId, Throwable throwable) {
        if (throwable != null) {
            logger.warn("Failed to stop client for partition [{}][{}][{}], action id [{}]", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), partition, actionId, throwable});
        } else {
            logger.info("Client for partition [{}][{}][{}] stopped, action id [{}]", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), partition, actionId});
        }
        if (this.isStopping()) {
            return;
        }
        final ActivePartition target = this.getActivePartition(partition);
        if (target == null) {
            logger.warn("No active partition for [{}] exists, action id [{}]", partition, actionId);
            return;
        }
        final boolean stopWasExpected = target.getState() == PartitionState.STOPPING && target.getCurrentActionId() == actionId;
        if (!stopWasExpected) {
            logger.error("Partition [{}] is not in expected state, action id [{}]", target, actionId);
            return;
        }
        target.setState(PartitionState.STOPPED);
        target.setCurrentActionId(-1L);
        target.setClient(null);
    }

    /**
     * Mutable bookkeeping record for one partition handled by a {@link PartitionManager}.
     *
     * <p>It holds the partition's current {@link PartitionState}, the time that state was
     * entered, the attached {@link PartitionClient} (if any), the id of the action in
     * flight ({@code -1} when none) and a free-form status message. Every call to
     * {@link #setState(PartitionState)} is reported to the registered
     * {@link PartitionStateChangeListener}s.
     *
     * <p>NOTE(review): the class performs no synchronization of its own; callers are
     * presumably expected to confine it to a single thread - confirm against the owner.
     */
    static class ActivePartition {
        private final PartitionManager partitionManager;
        private final String partition;
        private final List<PartitionStateChangeListener> stateChangeListeners;
        private final PartitionRef ref;
        private final Time time;
        private PartitionInfo partitionInfo;
        private long generation;
        private String initOffset;
        private PartitionState state = PartitionState.INIT;
        /** Value of {@code time.millis()} at the most recent {@link #setState(PartitionState)} call. */
        private long stateStartTime;
        private PartitionClient client;
        /** Base instant used by {@link #extendLease(long)}; maintained by callers. */
        private long lastSuccessCommitAt;
        /** Id of the action currently in flight, {@code -1} when there is none. */
        private long currentActionId = -1L;
        private String message;

        /**
         * Creates a partition record without any state change listeners.
         *
         * @param partitionManager owning manager, used to resolve the app guid and group
         * @param partition        partition name
         * @param time             clock used to timestamp state changes
         */
        public ActivePartition(PartitionManager partitionManager, String partition, Time time) {
            this(partitionManager, partition, time, Collections.emptyList());
        }

        /**
         * Creates a partition record and immediately enters {@link PartitionState#INIT},
         * which timestamps the state and notifies {@code stateChangeListeners}.
         *
         * @param partitionManager     owning manager, used to resolve the app guid and group
         * @param partition            partition name
         * @param time                 clock used to timestamp state changes
         * @param stateChangeListeners listeners notified on every state change; must not be {@code null}
         */
        public ActivePartition(PartitionManager partitionManager, String partition, Time time, List<PartitionStateChangeListener> stateChangeListeners) {
            this.partitionManager = partitionManager;
            this.partition = partition;
            this.time = time;
            this.stateChangeListeners = stateChangeListeners;
            this.ref = new PartitionRef(((BaseClusterContext)partitionManager.getContext()).getAppGuid(), ((BaseClusterContext)partitionManager.getContext()).getAppGroup(), partition);
            // Goes through setState so that stateStartTime is set and listeners see the initial state.
            this.setState(PartitionState.INIT);
        }

        /** @return the partition name given at construction */
        public String getPartition() {
            return this.partition;
        }

        /** @return the reference built from app guid, app group and partition name */
        public PartitionRef getRef() {
            return this.ref;
        }

        /** @return the last partition info stored, or {@code null} if none was set */
        public PartitionInfo getPartitionInfo() {
            return this.partitionInfo;
        }

        /** @param partitionInfo partition info to store */
        public void setPartitionInfo(PartitionInfo partitionInfo) {
            this.partitionInfo = partitionInfo;
        }

        /** @return the stored generation number */
        public long getGeneration() {
            return this.generation;
        }

        /** @param generation generation number to store */
        public void setGeneration(long generation) {
            this.generation = generation;
        }

        /** @return the stored initial offset, or {@code null} if none was set */
        public String getInitOffset() {
            return this.initOffset;
        }

        /** @param initOffset initial offset to store */
        public void setInitOffset(String initOffset) {
            this.initOffset = initOffset;
        }

        /**
         * Returns the offset of this partition.
         *
         * @return the attached client's offset when a client is set, otherwise the
         *         stored initial offset (possibly {@code null})
         */
        public String offset() {
            if (this.client != null) {
                return this.client.offset();
            }
            return this.initOffset;
        }

        /** @return the current state */
        public PartitionState getState() {
            return this.state;
        }

        /**
         * Switches to {@code state}, records the switch time and notifies all listeners.
         *
         * <p>Each listener receives this partition's reference, the new state and the
         * attached client's {@link IPartition}, or {@code null} when no client is set.
         *
         * @param state the state to enter
         */
        public void setState(PartitionState state) {
            this.state = state;
            this.stateStartTime = this.time.millis();
            for (PartitionStateChangeListener listener : this.stateChangeListeners) {
                // Resolved per listener, exactly as before, so each one sees the client as it is right then.
                IPartition clientPartition = this.client != null ? this.client.getPartition() : null;
                listener.onStateChanged(this.getRef(), state, clientPartition);
            }
        }

        /** @return the clock reading taken when the current state was entered */
        public long getStateStartTime() {
            return this.stateStartTime;
        }

        /** @return the stored last-successful-commit instant */
        public long getLastSuccessCommitAt() {
            return this.lastSuccessCommitAt;
        }

        /** @param lastSuccessCommitAt last-successful-commit instant to store */
        public void setLastSuccessCommitAt(long lastSuccessCommitAt) {
            this.lastSuccessCommitAt = lastSuccessCommitAt;
        }

        /** @param currentActionId id of the action now in flight, {@code -1} for none */
        public void setCurrentActionId(long currentActionId) {
            this.currentActionId = currentActionId;
        }

        /** @return id of the action in flight, {@code -1} when there is none */
        public long getCurrentActionId() {
            return this.currentActionId;
        }

        /** @return the attached client, or {@code null} when none is attached */
        public PartitionClient getClient() {
            return this.client;
        }

        /**
         * Asks the attached client's notify controller to extend the lease up to
         * {@code lastSuccessCommitAt + sessionTimeoutMs}.
         *
         * <p>When no client is attached, an error is logged and nothing is extended.
         *
         * @param sessionTimeoutMs amount added to the last successful commit instant
         * @return the computed lease deadline, whether or not a client was available
         */
        public long extendLease(long sessionTimeoutMs) {
            long toMs = this.getLastSuccessCommitAt() + sessionTimeoutMs;
            if (this.getClient() != null) {
                logger.debug("To extend lease of partition [{}] to [{}]", (Object)this.getPartition(), (Object)toMs);
                this.getClient().getNotifyController().extendLeaseTo(toMs);
            } else {
                logger.error("No client for partition {} to extend the lease", (Object)this);
            }
            return toMs;
        }

        /** @param client client to attach, or {@code null} to detach */
        public void setClient(PartitionClient client) {
            this.client = client;
        }

        /**
         * Builds a JSON object describing this partition.
         *
         * <p>When a client is attached, its metrics are fetched once and logged at info
         * level; the same snapshot is put under {@code "metrics"} if {@code withMetrics}
         * is set, so the logged and the reported values always agree. The stored message,
         * when present, is put under {@code "message"}.
         *
         * @param withMetrics whether to include the client metrics in the result
         * @return the JSON text; {@code "{}"} when there is nothing to report
         */
        public String getMessage(boolean withMetrics) {
            Map<String, Object> map = new HashMap<String, Object>();
            if (this.client != null) {
                Map<String, Object> metrics = this.client.getMetrics();
                logger.info("Partition {} metrics: {}", (Object)this.ref, metrics);
                if (withMetrics) {
                    // Reuse the snapshot instead of querying the client a second time.
                    map.put("metrics", metrics);
                }
            }
            if (this.message != null) {
                map.put("message", this.message);
            }
            return JSON.toJSONString(map);
        }

        /** @param message free-form status message, or {@code null} to clear it */
        public void setMessage(String message) {
            this.message = message;
        }

        /**
         * @return {@code [appGuid][appGroup][partition][generation][state]}
         */
        @Override
        public String toString() {
            return "[" + ((BaseClusterContext)this.partitionManager.getContext()).getAppGuid() + "][" + ((BaseClusterContext)this.partitionManager.getContext()).getAppGroup() + "][" + this.getPartition() + "][" + this.getGeneration() + "][" + this.getState() + "]";
        }
    }

    /**
     * A {@code major.minor.revision} version number parsed from the start of a string.
     *
     * <p>Anything following the third numeric component on the same line (for example
     * {@code -SNAPSHOT}) is ignored. Input that cannot be parsed yields version
     * {@code 0.0.0}: that covers {@code null}, text not starting with three dot-separated
     * numbers, and components too large for an {@code int}.
     */
    static class Version
    implements Comparable<Version> {
        private static final Pattern VERSION_PATTERN = Pattern.compile("^(\\d+)\\.(\\d+)\\.(\\d+).*");
        private final int majorVersion;
        private final int minorVersion;
        private final int revisionNumber;

        /**
         * Parses {@code versionStr}; never throws.
         *
         * @param versionStr text such as {@code "1.4.2"} or {@code "1.4.2-beta"}; may be {@code null}
         */
        public Version(String versionStr) {
            int major = 0;
            int minor = 0;
            int revision = 0;
            Matcher matcher = versionStr == null ? null : VERSION_PATTERN.matcher(versionStr);
            if (matcher != null && matcher.matches()) {
                try {
                    int parsedMajor = Integer.parseInt(matcher.group(1));
                    int parsedMinor = Integer.parseInt(matcher.group(2));
                    int parsedRevision = Integer.parseInt(matcher.group(3));
                    // Commit only once all three parsed, so a bad component never leaves a half-filled version.
                    major = parsedMajor;
                    minor = parsedMinor;
                    revision = parsedRevision;
                } catch (NumberFormatException ignored) {
                    // \d+ accepts digit runs beyond the int range; treat those like any other
                    // unparseable input and keep 0.0.0.
                }
            }
            this.majorVersion = major;
            this.minorVersion = minor;
            this.revisionNumber = revision;
        }

        /** @return the first numeric component */
        public int getMajorVersion() {
            return this.majorVersion;
        }

        /** @return the second numeric component */
        public int getMinorVersion() {
            return this.minorVersion;
        }

        /** @return the third numeric component */
        public int getRevisionNumber() {
            return this.revisionNumber;
        }

        /**
         * Orders versions by major, then minor, then revision number.
         *
         * @param that the version to compare against
         * @return {@code -1}, {@code 0} or {@code 1} when this version is lower than,
         *         equal to or higher than {@code that}
         */
        @Override
        public int compareTo(Version that) {
            if (this.majorVersion != that.majorVersion) {
                return this.majorVersion < that.majorVersion ? -1 : 1;
            }
            if (this.minorVersion != that.minorVersion) {
                return this.minorVersion < that.minorVersion ? -1 : 1;
            }
            if (this.revisionNumber != that.revisionNumber) {
                return this.revisionNumber < that.revisionNumber ? -1 : 1;
            }
            return 0;
        }
    }
}

