/*
 * Decompiled with CFR 0.152.
 */
package com.kingbase8.core.v3.replication;

import com.kingbase8.copy.CopyDual;
import com.kingbase8.core.KBStream;
import com.kingbase8.core.QueryExecutor;
import com.kingbase8.core.ReplicationProtocol;
import com.kingbase8.core.v3.replication.V3KBReplicationStream;
import com.kingbase8.replication.KBReplicationStream;
import com.kingbase8.replication.ReplicationType;
import com.kingbase8.replication.fluent.CommonOptions;
import com.kingbase8.replication.fluent.logical.LogicalReplicationOptions;
import com.kingbase8.replication.fluent.physical.PhysicalReplicationOptions;
import com.kingbase8.util.GT;
import com.kingbase8.util.KBLOGGER;
import com.kingbase8.util.KSQLException;
import com.kingbase8.util.KSQLState;
import com.kingbase8.util.TraceLogger;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Properties;
import java.util.logging.Level;

public class V3ReplicationProtocol
implements ReplicationProtocol {
    private final QueryExecutor queryExecutorImpl;
    private final KBStream kbStream;

    public V3ReplicationProtocol(QueryExecutor queryExecutorImpl, KBStream kbStream) {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        this.queryExecutorImpl = queryExecutorImpl;
        this.kbStream = kbStream;
    }

    @Override
    public KBReplicationStream startLogical(LogicalReplicationOptions options) throws SQLException {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        String query = this.createStartLogicalQuery(options);
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        return this.initReplication(query, options, ReplicationType.LOGICAL);
    }

    @Override
    public KBReplicationStream startPhysical(PhysicalReplicationOptions options) throws SQLException {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        String query = this.createStartPhysicalQuery(options);
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        return this.initReplication(query, options, ReplicationType.PHYSICAL);
    }

    private KBReplicationStream initReplication(String query, CommonOptions options, ReplicationType replicType) throws SQLException {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        KBLOGGER.log(Level.FINEST, " FE=> StartReplication(query: {0})", query);
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        this.configureSocketTimeout(options);
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        CopyDual copyDual = (CopyDual)this.queryExecutorImpl.startCopy(query, true);
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        return new V3KBReplicationStream(copyDual, options.getStartLSNPosition(), options.getStatusInterval(), replicType);
    }

    private String createStartPhysicalQuery(PhysicalReplicationOptions options) {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        StringBuilder _builder = new StringBuilder();
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        _builder.append("START_REPLICATION");
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        if (options.getSlotName() != null) {
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            _builder.append(" SLOT ").append(options.getSlotName());
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        }
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        _builder.append(" PHYSICAL ").append(options.getStartLSNPosition().asString());
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        return _builder.toString();
    }

    private String createStartLogicalQuery(LogicalReplicationOptions options) {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        StringBuilder _builder = new StringBuilder();
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        _builder.append("START_REPLICATION SLOT ").append(options.getSlotName()).append(" LOGICAL ").append(options.getStartLSNPosition().asString());
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        Properties slotOpts = options.getSlotOptions();
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        if (slotOpts.isEmpty()) {
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            return _builder.toString();
        }
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        _builder.append(" (");
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        boolean isFirst = true;
        for (String name : slotOpts.stringPropertyNames()) {
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            if (isFirst) {
                TraceLogger.logLineInfo(Level.ALL, "lineInfo");
                isFirst = false;
                TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            } else {
                TraceLogger.logLineInfo(Level.ALL, "lineInfo");
                _builder.append(", ");
                TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            }
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            _builder.append('\"');
            _builder.append(name).append('\"');
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            _builder.append(" ").append('\'');
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            _builder.append(slotOpts.getProperty(name)).append('\'');
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        }
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        _builder.append(")");
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        return _builder.toString();
    }

    private void configureSocketTimeout(CommonOptions options) throws KSQLException {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        if (options.getStatusInterval() == 0) {
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            return;
        }
        try {
            int minimalTimeOut;
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            int previousTimeOut = this.kbStream.getSocket().getSoTimeout();
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            if (previousTimeOut > 0) {
                TraceLogger.logLineInfo(Level.ALL, "lineInfo");
                minimalTimeOut = Math.min(previousTimeOut, options.getStatusInterval());
            } else {
                TraceLogger.logLineInfo(Level.ALL, "lineInfo");
                minimalTimeOut = options.getStatusInterval();
            }
            if (this.kbStream.isUseDispatch()) {
                TraceLogger.logLineInfo(Level.ALL, "lineInfo");
                this.kbStream.getSocket().setSoTimeout(minimalTimeOut);
                this.kbStream.setSocketTimeout(1);
                KBLOGGER.log(Level.INFO, "socketTimeout is " + minimalTimeOut, new Object[0]);
            } else {
                TraceLogger.logLineInfo(Level.ALL, "lineInfo");
                this.kbStream.getSocket().setSoTimeout(minimalTimeOut);
            }
            this.kbStream.setMinStreamAvailableCheckDelay(0);
        }
        catch (IOException ioe) {
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            throw new KSQLException(GT.tr("The _connection attempt failed.", new Object[0]), KSQLState.CONNECTION_UNABLE_TO_CONNECT, (Throwable)ioe);
        }
    }
}

