package com.aliyun.drc.clusterclient.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.aliyun.drc.client.DRCClient;
import com.aliyun.drc.client.DRCClientFactory;
import com.aliyun.drc.client.DataFilter;
import com.aliyun.drc.client.DataFilterBase;
import com.aliyun.drc.clusterclient.ClusterContext;
import com.aliyun.drc.clusterclient.message.ClusterMessage;
import com.aliyun.drc.clusterclient.partition.Checkpoint;
import com.aliyun.drc.clusterclient.partition.CloudPartitionImpl;
import com.taobao.drc.clusterclient.MessageNotifier;
import com.taobao.drc.clusterclient.NotifyController;
import com.taobao.drc.clusterclient.PartitionClient;
import com.taobao.drc.clusterclient.clustermanager.PartitionInfo;
import com.taobao.drc.clusterclient.partition.IPartition;
import com.taobao.drc.clusterclient.util.Time;
import java.io.IOException;
import java.lang.Thread;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/drc/clusterclient/impl/CloudPartitionClient.class */
public class CloudPartitionClient implements PartitionClient<Checkpoint> {
    private static final Logger logger = LoggerFactory.getLogger(CloudPartitionClient.class);
    private final ClusterContext context;
    private final PartitionInfo partitionInfo;
    private final MessageNotifier<ClusterMessage> notifier;
    private final String initOffset;
    private final NotifyController<ClusterMessage> notifyController = new NotifyController<>(Time.SYSTEM);
    private final CloudPartitionImpl partition;
    private final DRCClient client;
    private Thread clientThread;
    private String filterDbTable;

    public CloudPartitionClient(ClusterContext clusterContext, PartitionInfo partitionInfo, MessageNotifier<ClusterMessage> messageNotifier, String str, String str2) throws Exception {
        this.context = clusterContext;
        this.partitionInfo = partitionInfo;
        this.notifier = messageNotifier;
        this.initOffset = str;
        this.partition = new CloudPartitionImpl(clusterContext, partitionInfo);
        Properties properties = new Properties();
        properties.putAll(clusterContext.getProperties());
        if (partitionInfo.getTables() != null && !partitionInfo.getTables().isEmpty()) {
            properties.setProperty(ClusterContext.TAG_META_MAPPING, JSON.toJSONString(partitionInfo.getTables(), new SerializerFeature[]{SerializerFeature.SortField}));
        }
        this.client = DRCClientFactory.create(DRCClientFactory.Type.MYSQL, properties);
        this.filterDbTable = str2;
        initClient();
    }

    private void initClient() throws Exception {
        if (this.context.isUsePublicIp()) {
            this.client.usePublicIp();
        }
        this.client.addDataFilter(parseDataFilter());
        this.client.addListener(new CloudClientListenerAdapter(this.partition, this.notifyController, this.notifier, this.context.getDataType()));
        this.client.initService(this.context.getAppGroupUserName(), this.partitionInfo.getTopic(), this.context.getAppGroupPassword(), parseCheckpoint(this.initOffset), (String) null);
    }

    private com.aliyun.drc.client.impl.Checkpoint parseCheckpoint(String str) {
        Checkpoint checkpoint = new Checkpoint(str, this.partition);
        com.aliyun.drc.client.impl.Checkpoint checkpoint2 = new com.aliyun.drc.client.impl.Checkpoint();
        if (checkpoint.getInstance() != null) {
            checkpoint2.setServerId(checkpoint.getInstance());
        }
        if (checkpoint.getFilePosition() != null) {
            checkpoint2.setPosition(checkpoint.getFilePosition());
        }
        if (checkpoint.getTimestamp() != null) {
            checkpoint2.setTimestamp(checkpoint.getTimestamp());
        }
        if (checkpoint.getId() != null) {
            checkpoint2.setRecordId(checkpoint.getId());
        }
        return checkpoint2;
    }

    private DataFilterBase parseDataFilter() {
        if (StringUtils.isNotEmpty(this.filterDbTable)) {
            return new DataFilter(this.filterDbTable);
        }
        String makeFilterString = this.partitionInfo.makeFilterString();
        logger.info("CloudPartitionClient Filter for partition [{}][{}][{}]: [{}]", new Object[]{this.context.getAppGuid(), this.context.getAppGroup(), this.partitionInfo.getPartition(), makeFilterString});
        return new DataFilter(makeFilterString);
    }

    public void start() throws IOException {
        if (this.clientThread != null) {
            throw new IllegalStateException("Client for partition [" + this.partitionInfo + "] has already started");
        }
        try {
            this.clientThread = this.client.startService();
            this.clientThread.setName("DTS-DRCClient-" + this.partition.getName() + "-Thread");
            this.clientThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: com.aliyun.drc.clusterclient.impl.CloudPartitionClient.1
                @Override // java.lang.Thread.UncaughtExceptionHandler
                public void uncaughtException(Thread thread, Throwable th) {
                    CloudPartitionClient.logger.error(thread.getName() + ", " + th.toString());
                }
            });
        } catch (IOException e) {
            throw e;
        } catch (Exception e2) {
            throw new IllegalStateException(e2);
        }
    }

    public boolean isActive() {
        return this.clientThread != null && this.clientThread.isAlive();
    }

    public String offset() {
        Checkpoint checkpoint;
        if (this.client == null || (checkpoint = (Checkpoint) this.partition.getCheckpoint()) == null) {
            return null;
        }
        return checkpoint.toString();
    }

    public NotifyController getNotifyController() {
        return this.notifyController;
    }

    public IPartition<Checkpoint> getPartition() {
        return this.partition;
    }

    public void close() throws IOException {
        if (this.clientThread == null) {
            return;
        }
        try {
            this.client.stopService();
            this.clientThread.join();
        } catch (IOException e) {
            logger.error("Failed to stop client", e);
            throw e;
        } catch (Exception e2) {
            logger.error("Failed to stop client", e2);
            throw new IllegalStateException(e2);
        }
    }

    public Map<String, Object> getMetrics() {
        TreeMap treeMap = new TreeMap();
        treeMap.putAll(this.partition.getMetrics());
        treeMap.putAll(this.notifyController.getMetrics());
        return treeMap;
    }
}
