package com.aliyun.drc.client.impl;

import com.aliyun.drc.client.BinlogPos;
import com.aliyun.drc.client.DRCClient;
import com.aliyun.drc.client.DRCClientException;
import com.aliyun.drc.client.DataFilter;
import com.aliyun.drc.client.DataFilterBase;
import com.aliyun.drc.client.Listener;
import com.aliyun.drc.client.enums.DBType;
import com.aliyun.drc.client.enums.LogLevel;
import com.aliyun.drc.client.enums.Status;
import com.aliyun.drc.client.message.DataMessage;
import com.aliyun.drc.client.message.Message;
import com.aliyun.drc.client.message.RedirectMessage;
import com.aliyun.drc.client.message.drcmessage.DataType;
import com.aliyun.drc.utils.DataFilterUtil;
import java.io.IOException;
import java.io.Reader;
import java.net.MalformedURLException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/* loaded from: input_file:com/aliyun/drc/client/impl/DRCClientImpl.class */
public class DRCClientImpl implements DRCClient, Runnable {
    private ServerProxy server;
    private final DRCConfig config;
    private long lastNotifiedTime;
    private String blackList;
    private int accumulatedHeartbeats;
    private int retriedTimes;
    private long accumulatedTime;
    private long count;
    private RecordsCache cache;
    private int checkpointPeriod;
    private int recordCheckpointTime;
    public static final String TOPIC_MATCH_REGEX = "^\\w+\\-[0-9]+\\-[0-9]+$";
    private DataFilterBase filter = null;
    private volatile boolean quit = false;
    private LocalityFile binlogfile = null;
    private int oneHbEverySeconds = 1;
    private Status status = Status.RAW;
    private Thread serviceThread = null;
    private long notifiedPeriod = 60;
    private String storeIpAndPort = null;
    private volatile boolean suspend = false;
    private boolean useDrcnet = true;
    private DBType dbType = null;
    private List<Listener> listeners = new ArrayList();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.aliyun.drc.client.impl.DRCClientImpl$1, reason: invalid class name */
    /* loaded from: input_file:com/aliyun/drc/client/impl/DRCClientImpl$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$aliyun$drc$client$message$DataMessage$Record$Type;
        static final /* synthetic */ int[] $SwitchMap$com$aliyun$drc$client$enums$LogLevel = new int[LogLevel.values().length];

        static {
            try {
                $SwitchMap$com$aliyun$drc$client$enums$LogLevel[LogLevel.ERROR.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$aliyun$drc$client$enums$LogLevel[LogLevel.WARN.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$aliyun$drc$client$enums$LogLevel[LogLevel.INFO.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            $SwitchMap$com$aliyun$drc$client$message$DataMessage$Record$Type = new int[DataMessage.Record.Type.values().length];
            try {
                $SwitchMap$com$aliyun$drc$client$message$DataMessage$Record$Type[DataMessage.Record.Type.HEARTBEAT.ordinal()] = 1;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$com$aliyun$drc$client$message$DataMessage$Record$Type[DataMessage.Record.Type.BEGIN.ordinal()] = 2;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$com$aliyun$drc$client$message$DataMessage$Record$Type[DataMessage.Record.Type.COMMIT.ordinal()] = 3;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    public DRCClientImpl(String str) throws IOException {
        this.config = new DRCConfig(str);
    }

    public DRCClientImpl(Reader reader) throws IOException {
        this.config = new DRCConfig(reader);
    }

    public DRCClientImpl(Properties properties) throws IOException {
        this.config = new DRCConfig(properties);
    }

    @Override // com.aliyun.drc.client.DRCClient
    public void addDRCConfigure(String str, String str2) {
        this.config.addConfigure(str, str2);
    }

    @Override // com.aliyun.drc.client.DRCClient
    public Map<String, String> getDRCConfigures() {
        return this.config.getConfigures();
    }

    @Override // com.aliyun.drc.client.DRCClient
    public String getDRCConfigure(String str) {
        return this.config.getConfigure(str);
    }

    @Override // com.aliyun.drc.client.DRCClient
    public void addUserParameter(String str, String str2) {
        this.config.addParam(str, str2);
    }

    @Override // com.aliyun.drc.client.DRCClient
    public Map<String, String> getUserParameters() {
        return this.config.getParams();
    }

    @Override // com.aliyun.drc.client.DRCClient
    public String getUserParameter(String str) {
        return this.config.getParam(str);
    }

    @Override // com.aliyun.drc.client.DRCClient
    public void initService(String str, String str2, String str3, String str4, String str5) throws Exception {
        initService(str, str2, str3, str4, Location.PRIM_META, "", "", str5);
    }

    @Override // com.aliyun.drc.client.DRCClient
    public void initService(String str, String str2, String str3, String str4) throws Exception {
        Checkpoint localInfo = getLocalInfo(str4);
        initService(str, str2, str3, localInfo.getPosition().equals(BinlogPos.AT) ? localInfo.getTimestamp() : localInfo.getPosition(), Location.PRIM_META, "", localInfo.getServerId(), str4);
    }

    public static boolean isValidTopicName(String str) {
        return Pattern.compile(TOPIC_MATCH_REGEX).matcher(str).matches();
    }

    @Override // com.aliyun.drc.client.DRCClient
    public synchronized void initService(String str, String str2, String str3, String str4, String str5, String str6, String str7, String str8) throws Exception {
        if (this.status != Status.RAW) {
            throw new Exception("server state is error,current state:" + this.status);
        }
        this.config.setGroupName(str);
        this.config.setDbname(str2);
        this.config.setIdentification(str3);
        this.config.setStartingPoint(str4);
        this.config.setInstance(str7);
        if (str2 != null && !isValidTopicName(str2)) {
            this.filter.setBranchDb(str2);
        }
        this.config.setBlackList(this.blackList);
        this.config.setDataFilter(this.filter);
        this.config.setRequiredTablesAndColumns(this.filter.toString());
        if (str8 != null) {
            this.config.setBinlogFilename(str8);
            this.binlogfile = new LocalityFile(str8, 0, 104857600L);
        } else {
            this.binlogfile = null;
        }
        this.status = Status.INIT;
        sendLog2Listeners(LogLevel.INFO, "Initialize the service with starting point: " + str4);
    }

    @Override // com.aliyun.drc.client.DRCClient
    public synchronized void initService(String str, String str2, String str3, Checkpoint checkpoint, String str4) throws Exception {
        if (this.status != Status.RAW) {
            throw new Exception("server state is error,current state:" + this.status);
        }
        this.config.setGroupName(str);
        this.config.setDbname(str2);
        this.config.setIdentification(str3);
        this.config.setCheckpoint(checkpoint);
        if (str2 != null && !isValidTopicName(str2)) {
            this.filter.setBranchDb(str2);
        }
        this.config.setBlackList(this.blackList);
        this.config.setDataFilter(this.filter);
        this.config.setRequiredTablesAndColumns(this.filter.toString());
        if (str4 != null) {
            this.config.setBinlogFilename(str4);
            this.binlogfile = new LocalityFile(str4, 0, 104857600L);
        } else {
            this.binlogfile = null;
        }
        this.status = Status.INIT;
        sendLog2Listeners(LogLevel.INFO, "Initialize the service with starting point: " + checkpoint.toString());
    }

    private void connectClusterManager(String str) throws Exception {
        this.server = new ClusterManagers(str).findStore(this.config, this);
        if (this.server != null) {
            Matcher matcher = Pattern.compile("(\\d+\\.\\d+\\.\\d+\\.\\d+):(\\d+)").matcher(this.server.getUrl());
            if (matcher.find()) {
                this.storeIpAndPort = matcher.group(0);
            }
            sendLog2Listeners(LogLevel.INFO, "Connect to " + this.server.getUrl() + " successfully");
        }
    }

    private void resetServer() throws Exception {
        if (this.status != Status.STOP) {
            throw new Exception("server state is error,current state:" + this.status);
        }
        if (this.server != null) {
            this.server.close();
            this.server = null;
        }
        sendLog2Listeners(LogLevel.INFO, "Reset Server");
        this.status = Status.INIT;
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:15:0x0099. Please report as an issue. */
    @Override // java.lang.Runnable
    public void run() {
        this.lastNotifiedTime = System.currentTimeMillis();
        MessageId messageId = new MessageId(this.config.getDbname());
        this.checkpointPeriod = this.config.getCheckpointPeriod();
        this.recordCheckpointTime = this.checkpointPeriod;
        if (this.config.isTxnRequiredCompleted()) {
            this.cache = new RecordsCache(this.config);
        }
        this.useDrcnet = this.config.getUseDrcNet();
        while (!isQuit()) {
            try {
                if (!this.suspend) {
                    Message response = !this.useDrcnet ? this.server.getResponse(this.config.isBinaryFormat()) : this.server.getDrcNetResponse(this.config.isBinaryFormat());
                    messageId.meet(response.getMid());
                    switch (response.getType()) {
                        case 100:
                            processDataMessage(response);
                            notifyDataMessage(response);
                            persistCheckpoint();
                            break;
                        case 300:
                            processRedirectMessage(response);
                            messageId.reset(this.config.getDbname());
                            break;
                        default:
                            throw new DRCClientException("Wrong DRCMessage type " + response.getType());
                            break;
                    }
                } else {
                    TimeUnit.SECONDS.sleep(1L);
                }
            } catch (Exception e) {
                if (isQuit()) {
                    sendLog2Listeners(LogLevel.WARN, "client interrupted for quitting");
                } else {
                    String message = e.getMessage();
                    if (message == null) {
                        message = "server closed unexpectedly";
                    }
                    sendLog2Listeners(LogLevel.WARN, "Connect to " + (this.server != null ? this.server.getUrl() : new String("unknown url")) + " failed because " + message);
                    for (StackTraceElement stackTraceElement : e.getStackTrace()) {
                        sendLog2Listeners(LogLevel.WARN, stackTraceElement.toString());
                    }
                    e.printStackTrace();
                    int i = this.retriedTimes;
                    this.retriedTimes = i + 1;
                    if (i >= this.config.getMaxRetriedTimes()) {
                        throw new RuntimeException(e);
                    }
                    messageId.reset(this.config.getDbname());
                    RedirectMessage redirectMessage = new RedirectMessage();
                    redirectMessage.setId(0L);
                    redirectMessage.setType(300);
                    redirectMessage.setDelayed(10);
                    sendLog2Listeners(LogLevel.WARN, "After broken, retry cluster manager " + this.retriedTimes + " out of " + this.config.getMaxRetriedTimes());
                    if (this.server == null) {
                        this.server = new ServerProxy("retry", this.config);
                    }
                    if (!this.server.setResponse(redirectMessage)) {
                        sendLog2Listeners(LogLevel.ERROR, "Fail to retry");
                        throw new RuntimeException(e);
                    }
                }
            }
        }
        this.status = Status.STOP;
        try {
            resetServer();
            sendLog2Listeners(LogLevel.INFO, "Out of the thread loop");
            if (this.binlogfile != null) {
                this.binlogfile.close();
                sendLog2Listeners(LogLevel.INFO, "Closed log file");
            }
        } catch (Exception e2) {
            System.err.print("Close local file failed");
        }
    }

    private void persistCheckpoint() throws IOException {
        if (this.recordCheckpointTime < this.checkpointPeriod) {
            this.recordCheckpointTime++;
            return;
        }
        if (this.binlogfile != null) {
            this.binlogfile.writeLine(DRCConfig.POSITION_INFO + this.config.getCheckpoint().toString());
        }
        this.recordCheckpointTime = 0;
    }

    private void notifyDataMessage(Message message) {
        DataMessage dataMessage = (DataMessage) message;
        for (Listener listener : this.listeners) {
            try {
                long currentTimeMillis = System.currentTimeMillis();
                if (this.cache == null) {
                    dataMessage.addAttribute(Message.SOURCEIPANDPORT, this.storeIpAndPort);
                    listener.notify(dataMessage);
                } else if (this.cache.isReady()) {
                    DataMessage readyRecords = this.cache.getReadyRecords();
                    readyRecords.addAttribute(Message.SOURCEIPANDPORT, this.storeIpAndPort);
                    listener.notify(readyRecords);
                }
                long currentTimeMillis2 = System.currentTimeMillis();
                if (currentTimeMillis2 - this.lastNotifiedTime > this.notifiedPeriod * 1000) {
                    listener.notifyRuntimeLog("Info", "notify consume: " + (this.accumulatedTime / this.count) + " ms");
                    this.lastNotifiedTime = currentTimeMillis2;
                    this.accumulatedTime = 0L;
                    this.count = 0L;
                } else {
                    this.accumulatedTime += currentTimeMillis2 - currentTimeMillis;
                    this.count++;
                }
            } catch (Exception e) {
                listener.handleException(e);
            }
        }
    }

    private void processDataMessage(Message message) throws IOException {
        Iterator<DataMessage.Record> it = ((DataMessage) message).getRecordList().iterator();
        while (it.hasNext()) {
            DataMessage.Record next = it.next();
            processColumnFilter(next);
            Checkpoint checkpoint = this.config.getCheckpoint();
            switch (AnonymousClass1.$SwitchMap$com$aliyun$drc$client$message$DataMessage$Record$Type[next.getOpt().ordinal()]) {
                case DataType.DT_INT8 /* 1 */:
                    int i = this.accumulatedHeartbeats + 1;
                    this.accumulatedHeartbeats = i;
                    if (i >= this.oneHbEverySeconds) {
                        String timestamp = next.getTimestamp();
                        if (checkpoint.getTimestamp() != null && !checkpoint.getTimestamp().equals(timestamp) && this.retriedTimes != 0) {
                            this.retriedTimes = 0;
                        }
                        checkpoint.setTimestamp(timestamp);
                        this.accumulatedHeartbeats = 0;
                        if (this.cache == null) {
                            break;
                        } else {
                            this.cache.addRecord(next);
                            break;
                        }
                    } else {
                        it.remove();
                        break;
                    }
                case DataType.DT_UINT8 /* 2 */:
                case DataType.DT_INT16 /* 3 */:
                    if (checkpoint.equals(next.getCheckpoint()) && this.retriedTimes != 0) {
                        this.retriedTimes = 0;
                    }
                    this.config.setInstance(next.getServerId());
                    if (this.cache == null) {
                        break;
                    } else {
                        this.cache.addRecord(next);
                        break;
                    }
                    break;
                default:
                    if (!checkpoint.equals(next.getCheckpoint()) && this.retriedTimes != 0) {
                        this.retriedTimes = 0;
                    }
                    this.config.setInstance(next.getServerId());
                    if (this.cache == null) {
                        break;
                    } else {
                        this.cache.addRecord(next);
                        break;
                    }
                    break;
            }
        }
    }

    private void processColumnFilter(DataMessage.Record record) {
        List<String> colNamesWithMapping;
        if (this.filter.getIsAllMatch() || (colNamesWithMapping = DataFilterUtil.getColNamesWithMapping(record.getDbname(), record.getTablename(), this.filter)) == null) {
            return;
        }
        Iterator<DataMessage.Record.Field> it = record.getFieldList().iterator();
        while (it.hasNext()) {
            if (!DataFilterUtil.isColInArray(it.next().getFieldname(), colNamesWithMapping)) {
                it.remove();
            }
        }
    }

    private void processRedirectMessage(Message message) throws Exception {
        sendLog2Listeners(LogLevel.INFO, "Redirect to taskName: " + this.config.getDbname());
        if (((RedirectMessage) message).getDelayed() > 0) {
            TimeUnit.SECONDS.sleep(r0.getDelayed());
        }
        connectClusterManager(this.config.getClusterManagerAddresses());
    }

    @Override // com.aliyun.drc.client.DRCClient
    public synchronized Thread startService() throws Exception {
        if (this.status != Status.INIT) {
            throw new Exception("server state is error,current state:" + this.status);
        }
        connectClusterManager(this.config.getClusterManagerAddresses());
        sendLog2Listeners(LogLevel.INFO, "start service successfully");
        clearParams();
        this.serviceThread = new Thread(this);
        this.serviceThread.start();
        sendLog2Listeners(LogLevel.INFO, "getting data from daemon server");
        this.status = Status.START;
        return this.serviceThread;
    }

    private void clearParams() {
        this.accumulatedHeartbeats = 0;
        this.retriedTimes = 0;
        this.accumulatedTime = 0L;
        this.count = 0L;
        this.cache = null;
        this.checkpointPeriod = 0;
        this.recordCheckpointTime = 0;
    }

    @Override // com.aliyun.drc.client.DRCClient
    public synchronized void resetService() throws Exception {
        if (this.status != Status.STOP) {
            throw new Exception("server state is error,current state:" + this.status);
        }
        setQuit(false);
        this.status = Status.INIT;
    }

    @Override // com.aliyun.drc.client.DRCClient
    public synchronized void stopService() throws Exception {
        if (this.status != Status.START) {
            throw new Exception("server state is error,current state:" + this.status);
        }
        sendLog2Listeners(LogLevel.INFO, "Set quit true");
        setQuit(true);
        sendLog2Listeners(LogLevel.INFO, "Judge if thread is null");
        if (this.serviceThread != null && this.serviceThread.isAlive() && this.serviceThread != Thread.currentThread()) {
            sendLog2Listeners(LogLevel.INFO, "Wait for the thread stopped");
            this.serviceThread.interrupt();
            this.serviceThread.join();
        }
        this.status = Status.STOP;
        this.storeIpAndPort = null;
        sendLog2Listeners(LogLevel.INFO, "Service stopped.");
    }

    @Override // com.aliyun.drc.client.DRCClient
    public final void addListener(Listener listener) {
        this.listeners.add(listener);
    }

    @Override // com.aliyun.drc.client.DRCClient
    public String getInstance() {
        return this.config.getInstance();
    }

    @Override // com.aliyun.drc.client.DRCClient
    public void trimLongType() {
        this.config.trimLongType();
    }

    private final Checkpoint getLocalInfo(String str) throws IOException, DRCClientException {
        LocalityFile localityFile = new LocalityFile(str, 0, 0L);
        String str2 = null;
        while (true) {
            String readLine = localityFile.readLine();
            if (readLine == null) {
                break;
            }
            if (isLocationInfo(readLine)) {
                str2 = extractLocationInfo(readLine);
            }
        }
        if (str2 == null || str2.isEmpty()) {
            throw new DRCClientException("Local file " + str + " not exists or is empty");
        }
        Checkpoint checkpoint = new Checkpoint();
        String[] split = str2.split(":");
        checkpoint.setServerId(split[1] + "-" + split[2]);
        checkpoint.setPosition(split[4] + BinlogPos.AT + split[3]);
        checkpoint.setTimestamp(split[5]);
        if (split.length > 6) {
            checkpoint.setRecordId(split[6]);
        }
        return checkpoint;
    }

    private static boolean isLocationInfo(String str) {
        return str.contains(DRCConfig.POSITION_INFO);
    }

    private static final String extractLocationInfo(String str) {
        if (str == null) {
            return null;
        }
        return str.split(" ")[2];
    }

    @Override // com.aliyun.drc.client.DRCClient
    public final String getDbName() {
        return this.config.getDbname();
    }

    @Override // com.aliyun.drc.client.DRCClient
    @Deprecated
    public void addDataFilter(DataFilter dataFilter) {
        this.filter = dataFilter;
    }

    @Override // com.aliyun.drc.client.DRCClient
    public final void addDataFilter(DataFilterBase dataFilterBase) {
        this.filter = dataFilterBase;
    }

    @Override // com.aliyun.drc.client.DRCClient
    @Deprecated
    public final void addWhereFilter(DataFilterBase dataFilterBase) {
        this.config.setWhereFilters(dataFilterBase.toString());
    }

    @Override // com.aliyun.drc.client.DRCClient
    @Deprecated
    public final void useStrictFilter() {
        this.config.setFilterUnchangedRecords();
    }

    @Override // com.aliyun.drc.client.DRCClient
    public String getTaskName() {
        return Location.PRIM_META;
    }

    @Override // com.aliyun.drc.client.DRCClient
    public void setHeartbeatFrequency(int i) {
        this.oneHbEverySeconds = i;
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:7:0x0026. Please report as an issue. */
    private void sendLog2Listeners(LogLevel logLevel, String str) {
        for (Listener listener : this.listeners) {
            try {
                switch (AnonymousClass1.$SwitchMap$com$aliyun$drc$client$enums$LogLevel[logLevel.ordinal()]) {
                    case DataType.DT_INT8 /* 1 */:
                        listener.notifyRuntimeLog("ERROR", str);
                        break;
                    case DataType.DT_UINT8 /* 2 */:
                        listener.notifyRuntimeLog("WARN", str);
                        break;
                    case DataType.DT_INT16 /* 3 */:
                        listener.notifyRuntimeLog("INFO", str);
                        break;
                }
            } catch (Exception e) {
                listener.handleException(e);
            }
        }
    }

    private void setQuit(boolean z) {
        this.quit = z;
    }

    private boolean isQuit() {
        return this.quit;
    }

    @Override // com.aliyun.drc.client.DRCClient
    public void requireTxnMark(boolean z) {
        this.config.requireTxnMark(z);
    }

    @Override // com.aliyun.drc.client.DRCClient
    public void setNotifyRuntimePeriodInSec(long j) {
        this.notifiedPeriod = j;
    }

    @Override // com.aliyun.drc.client.DRCClient
    public void setNumOfRecordsPerBatch(int i) {
        this.config.setNumOfRecordsPerBatch(i);
    }

    @Override // com.aliyun.drc.client.DRCClient
    public DBType getDatabaseType() throws DRCClientException {
        try {
            return new ClusterManagers(this.config.getClusterManagerAddresses()).getDatabaseType(this.config);
        } catch (MalformedURLException e) {
            throw new DRCClientException(e.getMessage());
        } catch (UnknownHostException e2) {
            throw new DRCClientException(e2.getMessage());
        }
    }

    @Override // com.aliyun.drc.client.DRCClient
    public void setGroup(String str) {
        this.config.setGroup(str);
    }

    @Override // com.aliyun.drc.client.DRCClient
    public void setSubGroup(String str) {
        this.config.setSubGroup(str);
    }

    @Override // com.aliyun.drc.client.DRCClient
    public void setDrcMark(String str) {
        this.config.setDRCMark(str);
    }

    @Override // com.aliyun.drc.client.DRCClient
    public void askSelfUnit() {
        this.config.useDrcMark();
    }

    @Override // com.aliyun.drc.client.DRCClient
    public void setBlackList(String str) {
        this.blackList = str;
    }

    @Override // com.aliyun.drc.client.DRCClient
    public void suspend() {
        this.suspend = true;
    }

    @Override // com.aliyun.drc.client.DRCClient
    public void resume() {
        this.suspend = false;
    }

    @Override // com.aliyun.drc.client.DRCClient
    public void usePublicIp() {
        this.config.usePublicIp();
    }

    @Override // com.aliyun.drc.client.DRCClient
    public void useCaseSensitive() {
        this.config.useCaseSensitive();
    }

    @Override // com.aliyun.drc.client.DRCClient
    public void useCRC32Check() {
        this.config.setUseCheckCRC(true);
    }

    @Override // com.aliyun.drc.client.DRCClient
    public DBType getDBType() {
        return this.dbType;
    }

    public void setDbType(DBType dBType) {
        this.dbType = dBType;
    }

    public boolean validateDataFilter() throws DRCClientException {
        return this.filter.validateFilter(this.dbType);
    }
}
