/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.tddl.common.sync;

import com.taobao.tddl.common.Monitor;
import com.taobao.tddl.common.sync.BucketSwitcher;
import com.taobao.tddl.common.sync.NoStrictBucketSwitcher;
import com.taobao.tddl.common.sync.ReplicationTaskListener;
import com.taobao.tddl.common.sync.RowBasedReplicaterMBean;
import com.taobao.tddl.common.sync.RowBasedReplicationContext;
import com.taobao.tddl.common.sync.RowBasedReplicationExecutor;
import com.taobao.tddl.common.sync.RowBasedReplicationTask;
import com.taobao.tddl.common.util.TDDLMBeanServer;
import com.taobao.tddl.interact.rule.bean.SqlType;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Dispatches row-based replication work and batches the follow-up sync-log maintenance.
 *
 * <p>Incoming {@link RowBasedReplicationContext}s are de-duplicated by
 * {@link #mergeAndReduce(Collection)} and then each one is submitted as a
 * {@link RowBasedReplicationTask} to a fixed-size pool. When a task reports back through
 * {@link #onTaskCompleted(RowBasedReplicationContext, boolean)}, the context is poured into
 * either the delete bucket (success) or the update bucket (failure); each bucket hands its
 * contents to a dedicated small executor as a {@link DeleteSyncLogTask} /
 * {@link UpdateSyncLogTask}.
 *
 * <p>Lifecycle: configure via {@link #setThreadPoolSize(int)} / {@link #setWorkQueueSize(int)},
 * call {@link #init()} once before use, and {@link #destroy()} when done. The MBean getters and
 * setters dereference objects created in {@code init()} and therefore must not be called
 * before it.
 */
public class RowBasedReplicater
implements ReplicationTaskListener,
RowBasedReplicaterMBean {
    private static final Log logger = LogFactory.getLog(RowBasedReplicater.class);
    private static final int DEFAULT_THREAD_POOL_SIZE = 16;
    private static final int DEFAULT_WORK_QUEUE_SIZE = 4096;
    public static final int DEFAULT_BATCH_DELETE_SIZE = 1280;
    public static final int DEFAULT_BATCH_UPDATE_SIZE = 512;
    /** Core and maximum size of the replication pool; only read by {@link #init()}. */
    private int threadPoolSize = DEFAULT_THREAD_POOL_SIZE;
    /** Capacity of the replication pool's bounded work queue; only read by {@link #init()}. */
    private int workQueueSize = DEFAULT_WORK_QUEUE_SIZE;
    private ThreadPoolExecutor replicationExecutor;
    protected ThreadPoolExecutor deleteSyncLogExecutor;
    protected ThreadPoolExecutor updateSyncLogExecutor;
    private NoStrictBucketSwitcher<RowBasedReplicationContext> deleteBucketSwitcher;
    private NoStrictBucketSwitcher<RowBasedReplicationContext> updateBucketSwitcher;

    /**
     * Creates the three executors and the two bucket switchers, then registers this instance
     * as an MBean under the name {@code "Replicater"}.
     *
     * <p>The replication pool uses {@link ThreadPoolExecutor.CallerRunsPolicy}, so when its
     * queue is full the submitting thread runs the task itself instead of losing it. The two
     * sync-log pools instead log a warning and discard the rejected task.
     */
    public void init() {
        this.replicationExecutor = new ThreadPoolExecutor(
                this.threadPoolSize,
                this.threadPoolSize,
                0L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(this.workQueueSize),
                new ThreadPoolExecutor.CallerRunsPolicy());
        this.deleteSyncLogExecutor = newSyncLogExecutor("A DeleteSyncLogTask discarded");
        this.updateSyncLogExecutor = newSyncLogExecutor("A UpdateSyncLogTask discarded");

        BucketSwitcher.BucketTaker<RowBasedReplicationContext> deleteBucketTaker = new BucketSwitcher.BucketTaker<RowBasedReplicationContext>((ExecutorService)this.deleteSyncLogExecutor){

            @Override
            public Runnable createTakeAwayTask(Collection<RowBasedReplicationContext> list) {
                return new DeleteSyncLogTask(list);
            }
        };
        BucketSwitcher.BucketTaker<RowBasedReplicationContext> updateBucketTaker = new BucketSwitcher.BucketTaker<RowBasedReplicationContext>((ExecutorService)this.updateSyncLogExecutor){

            @Override
            public Runnable createTakeAwayTask(Collection<RowBasedReplicationContext> list) {
                return new UpdateSyncLogTask(list);
            }
        };
        this.deleteBucketSwitcher = new NoStrictBucketSwitcher<RowBasedReplicationContext>(deleteBucketTaker, DEFAULT_BATCH_DELETE_SIZE);
        this.updateBucketSwitcher = new NoStrictBucketSwitcher<RowBasedReplicationContext>(updateBucketTaker, DEFAULT_BATCH_UPDATE_SIZE);
        TDDLMBeanServer.registerMBean(this, "Replicater");
    }

    /**
     * Builds one of the small sync-log executors: 1 core thread, at most 2 threads, a bounded
     * queue of 10, and a rejection handler that only logs {@code discardMessage} at WARN level
     * (the rejected task is dropped, not retried).
     *
     * @param discardMessage text logged whenever a task is rejected
     * @return a new, running executor
     */
    private static ThreadPoolExecutor newSyncLogExecutor(final String discardMessage) {
        return new ThreadPoolExecutor(1, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), new RejectedExecutionHandler(){

            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                logger.warn(discardMessage);
            }
        });
    }

    /**
     * Merges the given contexts and submits one {@link RowBasedReplicationTask} per surviving
     * context to the replication pool.
     *
     * <p>A failure to submit one context is logged and does not stop the remaining
     * submissions. After the loop, the number of contexts and the elapsed submission time are
     * logged and reported to {@link Monitor}.
     *
     * @param contexts replication log entries to process; must not be {@code null}
     */
    public void replicate(Collection<RowBasedReplicationContext> contexts) {
        contexts = this.mergeAndReduce(contexts);
        long startMillis = System.currentTimeMillis();
        for (RowBasedReplicationContext context : contexts) {
            try {
                this.replicationExecutor.execute(new RowBasedReplicationTask(context, this));
            }
            catch (Throwable t) {
                // Deliberately best-effort: one bad submission must not abort the batch.
                logger.warn("[SyncServer]replicate failed", t);
            }
        }
        long timeused = System.currentTimeMillis() - startMillis;
        logger.warn(contexts.size() + " replication logs processe tasks accepted, time used:" + timeused);
        Monitor.add("TDDL", "Sync", "ReplicationTasksAccepted", contexts.size(), timeused);
    }

    /**
     * Collapses multiple non-INSERT contexts that target the same row into the newest one.
     *
     * <p>Rules, applied in iteration order:
     * <ul>
     *   <li>INSERT contexts are never merged and are always kept.</li>
     *   <li>Other contexts are keyed by
     *       {@code masterLogicTableName#primaryKeyValue#primaryKeyColumn}. The first context
     *       seen for a key becomes the current candidate.</li>
     *   <li>A later context with the same create time as the candidate is kept in addition to
     *       the candidate.</li>
     *   <li>A later context with a newer create time replaces the candidate; the replaced
     *       candidate is simply no longer returned.</li>
     *   <li>A later context with an older create time is logged as dropped and poured into the
     *       delete bucket.</li>
     * </ul>
     *
     * @param contexts contexts to reduce; must not be {@code null}
     * @return the kept contexts: all unmerged ones first, followed by the per-key candidates in
     *         {@link HashMap} value order
     */
    private Collection<RowBasedReplicationContext> mergeAndReduce(Collection<RowBasedReplicationContext> contexts) {
        HashMap<String, RowBasedReplicationContext> latestByKey = new HashMap<String, RowBasedReplicationContext>(contexts.size());
        ArrayList<RowBasedReplicationContext> kept = new ArrayList<RowBasedReplicationContext>(contexts.size());
        for (RowBasedReplicationContext context : contexts) {
            if (SqlType.INSERT.equals(context.getSqlType())) {
                kept.add(context);
                continue;
            }
            String key = context.getMasterLogicTableName() + "#" + context.getPrimaryKeyValue() + "#" + context.getPrimaryKeyColumn();
            RowBasedReplicationContext last = latestByKey.get(key);
            if (last == null) {
                latestByKey.put(key, context);
            } else if (context.getCreateTime().equals(last.getCreateTime())) {
                // Same timestamp: ordering cannot be decided, so keep both.
                kept.add(context);
            } else if (context.getCreateTime().after(last.getCreateTime())) {
                latestByKey.put(key, context);
            } else {
                logger.warn(new StringBuilder("Dropping a log:id=").append(context.getSyncLogId()).append(",LogicTableName=").append(context.getMasterLogicTableName()).append(",").append(context.getPrimaryKeyColumn()).append("=").append(context.getPrimaryKeyValue()));
                this.deleteBucketSwitcher.pourin(context);
            }
        }
        kept.addAll(latestByKey.values());
        return kept;
    }

    /**
     * Routes a finished replication context to the matching bucket: the delete bucket when
     * {@code success} is {@code true}, the update bucket otherwise.
     *
     * @param context the context whose replication task finished
     * @param success whether the task reported success
     */
    @Override
    public void onTaskCompleted(RowBasedReplicationContext context, boolean success) {
        NoStrictBucketSwitcher<RowBasedReplicationContext> target = success ? this.deleteBucketSwitcher : this.updateBucketSwitcher;
        target.pourin(context);
    }

    /**
     * Initiates an orderly shutdown of the three executors created by {@link #init()} so their
     * worker threads can terminate. Tasks that were already submitted still run; this method
     * does not wait for them. Safe to call when {@code init()} was never invoked.
     *
     * <p>NOTE(review): contexts that may still be buffered inside the bucket switchers are not
     * flushed here, and the MBean is not unregistered — confirm whether either is required.
     */
    public void destroy() {
        shutdownQuietly(this.replicationExecutor);
        shutdownQuietly(this.deleteSyncLogExecutor);
        shutdownQuietly(this.updateSyncLogExecutor);
    }

    /**
     * Calls {@link ThreadPoolExecutor#shutdown()} on the executor if it exists.
     *
     * @param executor the executor to shut down; may be {@code null}
     */
    private static void shutdownQuietly(ThreadPoolExecutor executor) {
        if (executor != null) {
            executor.shutdown();
        }
    }

    /** @return number of tasks currently waiting in the replication pool's queue */
    @Override
    public int getReplicationQueueSize() {
        return this.replicationExecutor.getQueue().size();
    }

    /** @return number of tasks currently waiting in the delete-sync-log pool's queue */
    @Override
    public int getDeleteSyncLogQueueSize() {
        return this.deleteSyncLogExecutor.getQueue().size();
    }

    /** @return number of tasks currently waiting in the update-sync-log pool's queue */
    @Override
    public int getUpdateSyncLogQueueSize() {
        return this.updateSyncLogExecutor.getQueue().size();
    }

    /** @return the replication pool's completed-task count */
    @Override
    public long getCompletedReplicationCount() {
        return this.replicationExecutor.getCompletedTaskCount();
    }

    /** @return the delete-sync-log pool's completed-task count */
    @Override
    public long getCompletedDeleteSyncLogCount() {
        return this.deleteSyncLogExecutor.getCompletedTaskCount();
    }

    /** @return the update-sync-log pool's completed-task count */
    @Override
    public long getCompletedUpdateSyncLogCount() {
        return this.updateSyncLogExecutor.getCompletedTaskCount();
    }

    /** @return the current bucket size of the delete bucket switcher */
    @Override
    public int getDeleteBatchSize() {
        return this.deleteBucketSwitcher.getBucketSize();
    }

    /** @param bucketSize new bucket size for the delete bucket switcher */
    @Override
    public void setDeleteBatchSize(int bucketSize) {
        this.deleteBucketSwitcher.setBucketSize(bucketSize);
    }

    /** @return the current bucket size of the update bucket switcher */
    @Override
    public int getUpdateBatchSize() {
        return this.updateBucketSwitcher.getBucketSize();
    }

    /** @param bucketSize new bucket size for the update bucket switcher */
    @Override
    public void setUpdateBatchSize(int bucketSize) {
        this.updateBucketSwitcher.setBucketSize(bucketSize);
    }

    /**
     * Sets the replication pool size. Only takes effect if called before {@link #init()}.
     *
     * @param threadPoolSize core and maximum thread count for the replication pool
     */
    public void setThreadPoolSize(int threadPoolSize) {
        this.threadPoolSize = threadPoolSize;
    }

    /**
     * Sets the replication queue capacity. Only takes effect if called before {@link #init()}.
     *
     * @param workQueueSize capacity of the replication pool's work queue
     */
    public void setWorkQueueSize(int workQueueSize) {
        this.workQueueSize = workQueueSize;
    }

    /**
     * Runnable that forwards its contexts to
     * {@code RowBasedReplicationExecutor.inUpdateSyncLog(contexts, 0L, onceSize)}.
     */
    static class InUpdateSyncLogTask
    implements Runnable {
        private final Collection<RowBasedReplicationContext> contexts;
        private final int onceSize;

        /**
         * @param contexts contexts to pass through
         * @param size value passed as the last argument of {@code inUpdateSyncLog}
         */
        public InUpdateSyncLogTask(Collection<RowBasedReplicationContext> contexts, int size) {
            this.contexts = contexts;
            this.onceSize = size;
        }

        @Override
        public void run() {
            RowBasedReplicationExecutor.inUpdateSyncLog(this.contexts, 0L, this.onceSize);
        }
    }

    /**
     * Runnable that forwards its contexts to
     * {@code RowBasedReplicationExecutor.inDeleteSyncLog(contexts, onceSize)}.
     */
    public static class InDeleteSyncLogTask
    implements Runnable {
        private final Collection<RowBasedReplicationContext> contexts;
        private final int onceSize;

        /**
         * @param contexts contexts to pass through
         * @param size value passed as the last argument of {@code inDeleteSyncLog}
         */
        public InDeleteSyncLogTask(Collection<RowBasedReplicationContext> contexts, int size) {
            this.contexts = contexts;
            this.onceSize = size;
        }

        @Override
        public void run() {
            RowBasedReplicationExecutor.inDeleteSyncLog(this.contexts, this.onceSize);
        }
    }

    /**
     * Runnable that forwards its contexts to
     * {@code RowBasedReplicationExecutor.batchUpdateSyncLog(contexts, 0L)}. Created by the
     * update bucket taker in {@link RowBasedReplicater#init()}.
     */
    static class UpdateSyncLogTask
    implements Runnable {
        private final Collection<RowBasedReplicationContext> contexts;

        /** @param contexts contexts to pass through */
        public UpdateSyncLogTask(Collection<RowBasedReplicationContext> contexts) {
            this.contexts = contexts;
        }

        @Override
        public void run() {
            RowBasedReplicationExecutor.batchUpdateSyncLog(this.contexts, 0L);
        }
    }

    /**
     * Runnable that forwards its contexts to
     * {@code RowBasedReplicationExecutor.batchDeleteSyncLog(contexts)}. Created by the delete
     * bucket taker in {@link RowBasedReplicater#init()}.
     */
    public static class DeleteSyncLogTask
    implements Runnable {
        private final Collection<RowBasedReplicationContext> contexts;

        /** @param contexts contexts to pass through */
        public DeleteSyncLogTask(Collection<RowBasedReplicationContext> contexts) {
            this.contexts = contexts;
        }

        @Override
        public void run() {
            RowBasedReplicationExecutor.batchDeleteSyncLog(this.contexts);
        }
    }
}

