/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.drc.clusterclient.impl;

import com.aliyun.drc.client.Listener;
import com.aliyun.drc.client.message.DataMessage;
import com.aliyun.drc.clusterclient.impl.DataTypeEnum;
import com.aliyun.drc.clusterclient.message.ClusterMessage;
import com.aliyun.drc.clusterclient.partition.CloudPartitionImpl;
import com.taobao.drc.clusterclient.MessageNotifier;
import com.taobao.drc.clusterclient.NotifyController;
import java.util.ArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CloudClientListenerAdapter
implements Listener {
    private static Logger logger = LoggerFactory.getLogger(CloudClientListenerAdapter.class);
    private final CloudPartitionImpl partition;
    private final NotifyController<ClusterMessage> notifyController;
    private final MessageNotifier<ClusterMessage> notifier;
    private final DataTypeEnum dataTypeEnum;

    public CloudClientListenerAdapter(CloudPartitionImpl partition, NotifyController<ClusterMessage> notifyController, MessageNotifier<ClusterMessage> notifier, String dataType) {
        this.partition = partition;
        this.notifyController = notifyController;
        this.notifier = notifier;
        this.dataTypeEnum = dataType.equalsIgnoreCase("dml") ? DataTypeEnum.DML : DataTypeEnum.ALL;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void notify(DataMessage message) throws Exception {
        if (this.notifyController.isClosed()) {
            return;
        }
        this.notifyController.setNotifying(true);
        try {
            while (!this.notifyController.isValid()) {
                if (this.notifyController.isClosed()) {
                    logger.warn("Client for partition [{}] was stopped", (Object)this.partition.getName());
                    return;
                }
                Thread.sleep(1L);
            }
            ArrayList<ClusterMessage> messages = new ArrayList<ClusterMessage>();
            for (DataMessage.Record record : message.getRecordList()) {
                ClusterMessage msg = new ClusterMessage(record, this.partition);
                switch (record.getOpt()) {
                    case HEARTBEAT: {
                        this.partition.bookCheckpoint(msg.getCheckpoint());
                        this.partition.ackAsConsumed(msg.getCheckpoint());
                        messages.add(msg);
                        break;
                    }
                    case DDL: {
                        if (this.dataTypeEnum == DataTypeEnum.DML) break;
                    }
                    default: {
                        messages.add(msg);
                        this.partition.bookCheckpoint(msg.getCheckpoint());
                        this.notifyController.onNotified((Object)msg);
                    }
                }
            }
            if (messages.size() == 0) {
                return;
            }
            this.notifier.onMessages(messages);
        }
        finally {
            this.notifyController.setNotifying(false);
        }
    }

    public void notifyRuntimeLog(String level, String log) throws Exception {
        if (level.equalsIgnoreCase("INFO")) {
            logger.info(log);
        } else if (level.equalsIgnoreCase("ERROR")) {
            logger.error(log);
        } else if (level.equalsIgnoreCase("WARN")) {
            logger.warn(log);
        } else {
            logger.warn(log);
        }
    }

    public void handleException(Exception e) {
        logger.warn("Caught exception in listener adapter for partition [{}], stop partition...", (Object)this.partition.getName(), (Object)e);
        this.notifyController.close();
    }
}

