/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.drc.client.impl;

import com.aliyun.drc.client.DRCClient;
import com.aliyun.drc.client.DRCClientException;
import com.aliyun.drc.client.DataFilter;
import com.aliyun.drc.client.DataFilterBase;
import com.aliyun.drc.client.Listener;
import com.aliyun.drc.client.enums.DBType;
import com.aliyun.drc.client.enums.LogLevel;
import com.aliyun.drc.client.enums.Status;
import com.aliyun.drc.client.impl.Checkpoint;
import com.aliyun.drc.client.impl.ClusterManagers;
import com.aliyun.drc.client.impl.DRCConfig;
import com.aliyun.drc.client.impl.LocalityFile;
import com.aliyun.drc.client.impl.MessageId;
import com.aliyun.drc.client.impl.RecordsCache;
import com.aliyun.drc.client.impl.ServerProxy;
import com.aliyun.drc.client.message.DataMessage;
import com.aliyun.drc.client.message.Message;
import com.aliyun.drc.client.message.RedirectMessage;
import com.aliyun.drc.utils.DataFilterUtil;
import java.io.IOException;
import java.io.Reader;
import java.net.MalformedURLException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DRCClientImpl
implements DRCClient,
Runnable {
    private DataFilterBase filter = null;
    private volatile boolean quit = false;
    private LocalityFile binlogfile = null;
    private int oneHbEverySeconds = 1;
    private Status status = Status.RAW;
    private ServerProxy server;
    private final DRCConfig config;
    private Thread serviceThread = null;
    private List<Listener> listeners;
    private long lastNotifiedTime;
    private long notifiedPeriod = 60L;
    private String blackList;
    private String storeIpAndPort = null;
    private int accumulatedHeartbeats;
    private int retriedTimes;
    private long accumulatedTime;
    private long count;
    private RecordsCache cache;
    private int checkpointPeriod;
    private int recordCheckpointTime;
    private volatile boolean suspend = false;
    private boolean useDrcnet = true;
    private DBType dbType = null;
    public static final String TOPIC_MATCH_REGEX = "^\\w+\\-[0-9]+\\-[0-9]+$";

    public DRCClientImpl(String propertiesFilename) throws IOException {
        this.config = new DRCConfig(propertiesFilename);
        this.listeners = new ArrayList<Listener>();
    }

    public DRCClientImpl(Reader reader) throws IOException {
        this.config = new DRCConfig(reader);
        this.listeners = new ArrayList<Listener>();
    }

    public DRCClientImpl(Properties properties) throws IOException {
        this.config = new DRCConfig(properties);
        this.listeners = new ArrayList<Listener>();
    }

    @Override
    public void addDRCConfigure(String name, String value) {
        this.config.addConfigure(name, value);
    }

    @Override
    public Map<String, String> getDRCConfigures() {
        return this.config.getConfigures();
    }

    @Override
    public String getDRCConfigure(String name) {
        return this.config.getConfigure(name);
    }

    @Override
    public void addUserParameter(String name, String value) {
        this.config.addParam(name, value);
    }

    @Override
    public Map<String, String> getUserParameters() {
        return this.config.getParams();
    }

    @Override
    public String getUserParameter(String name) {
        return this.config.getParam(name);
    }

    @Override
    public void initService(String groupName, String dbName, String identification, String startingPoint, String localFilename) throws Exception {
        this.initService(groupName, dbName, identification, startingPoint, "0", "", "", localFilename);
    }

    @Override
    public void initService(String groupName, String dbName, String identification, String localFilename) throws Exception {
        Checkpoint location = this.getLocalInfo(localFilename);
        this.initService(groupName, dbName, identification, location.getPosition().equals("@") ? location.getTimestamp() : location.getPosition(), "0", "", location.getServerId(), localFilename);
    }

    public static boolean isValidTopicName(String topicName) {
        Pattern p = Pattern.compile(TOPIC_MATCH_REGEX);
        Matcher m = p.matcher(topicName);
        return m.matches();
    }

    @Override
    public synchronized void initService(String groupName, String dbName, String identification, String startingPoint, String metaVersion, String taskName, String instance, String localFilename) throws Exception {
        if (this.status != Status.RAW) {
            throw new Exception("server state is error,current state:" + (Object)((Object)this.status));
        }
        this.config.setGroupName(groupName);
        this.config.setDbname(dbName);
        this.config.setIdentification(identification);
        this.config.setStartingPoint(startingPoint);
        this.config.setInstance(instance);
        if (dbName != null && !DRCClientImpl.isValidTopicName(dbName)) {
            this.filter.setBranchDb(dbName);
        }
        this.config.setBlackList(this.blackList);
        this.config.setDataFilter(this.filter);
        this.config.setRequiredTablesAndColumns(this.filter.toString());
        if (localFilename != null) {
            this.config.setBinlogFilename(localFilename);
            this.binlogfile = new LocalityFile(localFilename, 0, 0x6400000L);
        } else {
            this.binlogfile = null;
        }
        this.status = Status.INIT;
        this.sendLog2Listeners(LogLevel.INFO, "Initialize the service with starting point: " + startingPoint);
    }

    @Override
    public synchronized void initService(String groupName, String dbName, String identification, Checkpoint checkpoint, String localFilename) throws Exception {
        if (this.status != Status.RAW) {
            throw new Exception("server state is error,current state:" + (Object)((Object)this.status));
        }
        this.config.setGroupName(groupName);
        this.config.setDbname(dbName);
        this.config.setIdentification(identification);
        this.config.setCheckpoint(checkpoint);
        if (dbName != null && !DRCClientImpl.isValidTopicName(dbName)) {
            this.filter.setBranchDb(dbName);
        }
        this.config.setBlackList(this.blackList);
        this.config.setDataFilter(this.filter);
        this.config.setRequiredTablesAndColumns(this.filter.toString());
        if (localFilename != null) {
            this.config.setBinlogFilename(localFilename);
            this.binlogfile = new LocalityFile(localFilename, 0, 0x6400000L);
        } else {
            this.binlogfile = null;
        }
        this.status = Status.INIT;
        this.sendLog2Listeners(LogLevel.INFO, "Initialize the service with starting point: " + checkpoint.toString());
    }

    private void connectClusterManager(String urls) throws Exception {
        ClusterManagers managers = new ClusterManagers(urls);
        this.server = managers.findStore(this.config, this);
        if (this.server != null) {
            String storeUrl = this.server.getUrl();
            Pattern pattern = Pattern.compile("(\\d+\\.\\d+\\.\\d+\\.\\d+):(\\d+)");
            Matcher matcher = pattern.matcher(storeUrl);
            if (matcher.find()) {
                this.storeIpAndPort = matcher.group(0);
            }
            this.sendLog2Listeners(LogLevel.INFO, "Connect to " + this.server.getUrl() + " successfully");
        }
    }

    private void resetServer() throws Exception {
        if (this.status != Status.STOP) {
            throw new Exception("server state is error,current state:" + (Object)((Object)this.status));
        }
        if (this.server != null) {
            this.server.close();
            this.server = null;
        }
        this.sendLog2Listeners(LogLevel.INFO, "Reset Server");
        this.status = Status.INIT;
    }

    @Override
    public void run() {
        this.lastNotifiedTime = System.currentTimeMillis();
        MessageId messageId = new MessageId(this.config.getDbname());
        this.recordCheckpointTime = this.checkpointPeriod = this.config.getCheckpointPeriod();
        if (this.config.isTxnRequiredCompleted()) {
            this.cache = new RecordsCache(this.config);
        }
        this.useDrcnet = this.config.getUseDrcNet();
        block8: while (!this.isQuit()) {
            try {
                if (this.suspend) {
                    TimeUnit.SECONDS.sleep(1L);
                    continue;
                }
                Message message = !this.useDrcnet ? this.server.getResponse(this.config.isBinaryFormat()) : this.server.getDrcNetResponse(this.config.isBinaryFormat());
                messageId.meet(message.getMid());
                switch (message.getType()) {
                    case 300: {
                        this.processRedirectMessage(message);
                        messageId.reset(this.config.getDbname());
                        continue block8;
                    }
                    case 100: {
                        this.processDataMessage(message);
                        this.notifyDataMessage(message);
                        this.persistCheckpoint();
                        continue block8;
                    }
                }
                throw new DRCClientException("Wrong DRCMessage type " + message.getType());
            }
            catch (Exception e) {
                if (!this.isQuit()) {
                    StackTraceElement[] stack;
                    String errMessage = e.getMessage();
                    if (errMessage == null) {
                        errMessage = "server closed unexpectedly";
                    }
                    String urlMessage = this.server != null ? this.server.getUrl() : new String("unknown url");
                    this.sendLog2Listeners(LogLevel.WARN, "Connect to " + urlMessage + " failed because " + errMessage);
                    for (StackTraceElement element : stack = e.getStackTrace()) {
                        this.sendLog2Listeners(LogLevel.WARN, element.toString());
                    }
                    e.printStackTrace();
                    if (this.retriedTimes++ < this.config.getMaxRetriedTimes()) {
                        messageId.reset(this.config.getDbname());
                        RedirectMessage redirectMessage = new RedirectMessage();
                        redirectMessage.setId(0L);
                        redirectMessage.setType(300);
                        redirectMessage.setDelayed(10);
                        this.sendLog2Listeners(LogLevel.WARN, "After broken, retry cluster manager " + this.retriedTimes + " out of " + this.retriedTimes);
                        if (this.server == null) {
                            this.server = new ServerProxy("retry", this.config);
                        }
                        if (this.server.setResponse(redirectMessage)) continue;
                        this.sendLog2Listeners(LogLevel.ERROR, "Fail to retry");
                        throw new RuntimeException(e);
                    }
                    throw new RuntimeException(e);
                }
                this.sendLog2Listeners(LogLevel.WARN, "client interrupted for quitting");
            }
        }
        this.status = Status.STOP;
        try {
            this.resetServer();
            this.sendLog2Listeners(LogLevel.INFO, "Out of the thread loop");
            if (this.binlogfile != null) {
                this.binlogfile.close();
                this.sendLog2Listeners(LogLevel.INFO, "Closed log file");
            }
        }
        catch (Exception e) {
            System.err.print("Close local file failed");
        }
    }

    private void persistCheckpoint() throws IOException {
        if (this.recordCheckpointTime >= this.checkpointPeriod) {
            StringBuilder pb = new StringBuilder();
            pb.append("Global_position_info:");
            pb.append(this.config.getCheckpoint().toString());
            if (this.binlogfile != null) {
                this.binlogfile.writeLine(pb.toString());
            }
            this.recordCheckpointTime = 0;
        } else {
            ++this.recordCheckpointTime;
        }
    }

    private void notifyDataMessage(Message message) {
        DataMessage dataMessage = (DataMessage)message;
        for (Listener listener : this.listeners) {
            try {
                long t_prev = System.currentTimeMillis();
                if (this.cache == null) {
                    dataMessage.addAttribute("sourceIPAndPort", this.storeIpAndPort);
                    listener.notify(dataMessage);
                } else if (this.cache.isReady()) {
                    DataMessage cachedMessage = this.cache.getReadyRecords();
                    cachedMessage.addAttribute("sourceIPAndPort", this.storeIpAndPort);
                    listener.notify(cachedMessage);
                }
                long t_after = System.currentTimeMillis();
                if (t_after - this.lastNotifiedTime > this.notifiedPeriod * 1000L) {
                    listener.notifyRuntimeLog("Info", "notify consume: " + (double)this.accumulatedTime / (double)this.count + " ms");
                    this.lastNotifiedTime = t_after;
                    this.accumulatedTime = 0L;
                    this.count = 0L;
                    continue;
                }
                this.accumulatedTime += t_after - t_prev;
                ++this.count;
            }
            catch (Exception e) {
                listener.handleException(e);
            }
        }
    }

    private void processDataMessage(Message message) throws IOException {
        DataMessage dataMessage = (DataMessage)message;
        Iterator<DataMessage.Record> rit = dataMessage.getRecordList().iterator();
        block4: while (rit.hasNext()) {
            DataMessage.Record r = rit.next();
            this.processColumnFilter(r);
            Checkpoint checkpoint = this.config.getCheckpoint();
            switch (r.getOpt()) {
                case HEARTBEAT: {
                    if (++this.accumulatedHeartbeats < this.oneHbEverySeconds) {
                        rit.remove();
                        break;
                    }
                    String tm = r.getTimestamp();
                    if (checkpoint.getTimestamp() != null && !checkpoint.getTimestamp().equals(tm) && this.retriedTimes != 0) {
                        this.retriedTimes = 0;
                    }
                    checkpoint.setTimestamp(tm);
                    this.accumulatedHeartbeats = 0;
                    if (this.cache == null) continue block4;
                    this.cache.addRecord(r);
                    break;
                }
                case BEGIN: 
                case COMMIT: {
                    String pos = r.getCheckpoint();
                    if (checkpoint.equals(pos) && this.retriedTimes != 0) {
                        this.retriedTimes = 0;
                    }
                    this.config.setInstance(r.getServerId());
                    if (this.cache == null) continue block4;
                    this.cache.addRecord(r);
                    break;
                }
                default: {
                    String pos1 = r.getCheckpoint();
                    if (!checkpoint.equals(pos1) && this.retriedTimes != 0) {
                        this.retriedTimes = 0;
                    }
                    this.config.setInstance(r.getServerId());
                    if (this.cache == null) continue block4;
                    this.cache.addRecord(r);
                }
            }
        }
    }

    private void processColumnFilter(DataMessage.Record record) {
        List<String> s;
        if (!this.filter.getIsAllMatch() && (s = DataFilterUtil.getColNamesWithMapping(record.getDbname(), record.getTablename(), this.filter)) != null) {
            List<DataMessage.Record.Field> l = record.getFieldList();
            Iterator<DataMessage.Record.Field> it = l.iterator();
            while (it.hasNext()) {
                DataMessage.Record.Field f = it.next();
                if (DataFilterUtil.isColInArray(f.getFieldname(), s)) continue;
                it.remove();
            }
        }
    }

    private void processRedirectMessage(Message message) throws Exception {
        this.sendLog2Listeners(LogLevel.INFO, "Redirect to taskName: " + this.config.getDbname());
        RedirectMessage redirectMessage = (RedirectMessage)message;
        if (redirectMessage.getDelayed() > 0) {
            TimeUnit.SECONDS.sleep(redirectMessage.getDelayed());
        }
        this.connectClusterManager(this.config.getClusterManagerAddresses());
    }

    @Override
    public synchronized Thread startService() throws Exception {
        if (this.status != Status.INIT) {
            throw new Exception("server state is error,current state:" + (Object)((Object)this.status));
        }
        this.connectClusterManager(this.config.getClusterManagerAddresses());
        this.sendLog2Listeners(LogLevel.INFO, "start service successfully");
        this.clearParams();
        this.serviceThread = new Thread(this);
        this.serviceThread.start();
        this.sendLog2Listeners(LogLevel.INFO, "getting data from daemon server");
        this.status = Status.START;
        return this.serviceThread;
    }

    private void clearParams() {
        this.accumulatedHeartbeats = 0;
        this.retriedTimes = 0;
        this.accumulatedTime = 0L;
        this.count = 0L;
        this.cache = null;
        this.checkpointPeriod = 0;
        this.recordCheckpointTime = 0;
    }

    @Override
    public synchronized void resetService() throws Exception {
        if (this.status != Status.STOP) {
            throw new Exception("server state is error,current state:" + (Object)((Object)this.status));
        }
        this.setQuit(false);
        this.status = Status.INIT;
    }

    @Override
    public synchronized void stopService() throws Exception {
        if (this.status != Status.START) {
            throw new Exception("server state is error,current state:" + (Object)((Object)this.status));
        }
        this.sendLog2Listeners(LogLevel.INFO, "Set quit true");
        this.setQuit(true);
        this.sendLog2Listeners(LogLevel.INFO, "Judge if thread is null");
        if (this.serviceThread != null && this.serviceThread.isAlive() && this.serviceThread != Thread.currentThread()) {
            this.sendLog2Listeners(LogLevel.INFO, "Wait for the thread stopped");
            this.serviceThread.interrupt();
            this.serviceThread.join();
        }
        this.status = Status.STOP;
        this.storeIpAndPort = null;
        this.sendLog2Listeners(LogLevel.INFO, "Service stopped.");
    }

    @Override
    public final void addListener(Listener listener) {
        this.listeners.add(listener);
    }

    @Override
    public String getInstance() {
        return this.config.getInstance();
    }

    @Override
    public void trimLongType() {
        this.config.trimLongType();
    }

    private final Checkpoint getLocalInfo(String filename) throws IOException, DRCClientException {
        Checkpoint checkpoint = null;
        LocalityFile localfile = new LocalityFile(filename, 0, 0L);
        String line = null;
        String location = null;
        while ((line = localfile.readLine()) != null) {
            if (!DRCClientImpl.isLocationInfo(line)) continue;
            location = DRCClientImpl.extractLocationInfo(line);
        }
        if (location == null || location.isEmpty()) {
            throw new DRCClientException("Local file " + filename + " not exists or is empty");
        }
        checkpoint = new Checkpoint();
        String[] items = location.split(":");
        checkpoint.setServerId(items[1] + "-" + items[2]);
        checkpoint.setPosition(items[4] + "@" + items[3]);
        checkpoint.setTimestamp(items[5]);
        if (items.length > 6) {
            checkpoint.setRecordId(items[6]);
        }
        return checkpoint;
    }

    private static boolean isLocationInfo(String line) {
        return line.contains("Global_position_info:");
    }

    private static final String extractLocationInfo(String string) {
        if (string == null) {
            return null;
        }
        String[] items = string.split(" ");
        return items[2];
    }

    @Override
    public final String getDbName() {
        return this.config.getDbname();
    }

    @Override
    @Deprecated
    public void addDataFilter(DataFilter filter) {
        this.filter = filter;
    }

    @Override
    public final void addDataFilter(DataFilterBase filter) {
        this.filter = filter;
    }

    @Override
    @Deprecated
    public final void addWhereFilter(DataFilterBase filter) {
        this.config.setWhereFilters(filter.toString());
    }

    @Override
    @Deprecated
    public final void useStrictFilter() {
        this.config.setFilterUnchangedRecords();
    }

    @Override
    public String getTaskName() {
        return "0";
    }

    @Override
    public void setHeartbeatFrequency(int everySeconds) {
        this.oneHbEverySeconds = everySeconds;
    }

    private void sendLog2Listeners(LogLevel level, String msg) {
        for (Listener listener : this.listeners) {
            try {
                switch (level) {
                    case ERROR: {
                        listener.notifyRuntimeLog("ERROR", msg);
                        break;
                    }
                    case WARN: {
                        listener.notifyRuntimeLog("WARN", msg);
                        break;
                    }
                    case INFO: {
                        listener.notifyRuntimeLog("INFO", msg);
                        break;
                    }
                }
            }
            catch (Exception e) {
                listener.handleException(e);
            }
        }
    }

    private void setQuit(boolean isQuit) {
        this.quit = isQuit;
    }

    private boolean isQuit() {
        return this.quit;
    }

    @Override
    public void requireTxnMark(boolean required) {
        this.config.requireTxnMark(required);
    }

    @Override
    public void setNotifyRuntimePeriodInSec(long sec) {
        this.notifiedPeriod = sec;
    }

    @Override
    public void setNumOfRecordsPerBatch(int threshold) {
        this.config.setNumOfRecordsPerBatch(threshold);
    }

    @Override
    public DBType getDatabaseType() throws DRCClientException {
        DBType type;
        ClusterManagers managers = new ClusterManagers(this.config.getClusterManagerAddresses());
        try {
            type = managers.getDatabaseType(this.config);
        }
        catch (UnknownHostException ue) {
            throw new DRCClientException(ue.getMessage());
        }
        catch (MalformedURLException me) {
            throw new DRCClientException(me.getMessage());
        }
        return type;
    }

    @Override
    public void setGroup(String group) {
        this.config.setGroup(group);
    }

    @Override
    public void setSubGroup(String subGroup) {
        this.config.setSubGroup(subGroup);
    }

    @Override
    public void setDrcMark(String mark) {
        this.config.setDRCMark(mark);
    }

    @Override
    public void askSelfUnit() {
        this.config.useDrcMark();
    }

    @Override
    public void setBlackList(String blackList) {
        this.blackList = blackList;
    }

    @Override
    public void suspend() {
        this.suspend = true;
    }

    @Override
    public void resume() {
        this.suspend = false;
    }

    @Override
    public void usePublicIp() {
        this.config.usePublicIp();
    }

    @Override
    public void useCaseSensitive() {
        this.config.useCaseSensitive();
    }

    @Override
    public void useCRC32Check() {
        this.config.setUseCheckCRC(true);
    }

    @Override
    public DBType getDBType() {
        return this.dbType;
    }

    public void setDbType(DBType dbType) {
        this.dbType = dbType;
    }

    public boolean validateDataFilter() throws DRCClientException {
        return this.filter.validateFilter(this.dbType);
    }
}

