package org.apache.cassandra.db;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.ObjectName;
import org.apache.cassandra.$internal.com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.$internal.com.google.common.collect.ArrayListMultimap;
import org.apache.cassandra.$internal.com.google.common.collect.Iterables;
import org.apache.cassandra.$internal.com.google.common.collect.Lists;
import org.apache.cassandra.$internal.com.google.common.collect.Multimap;
import org.apache.cassandra.$internal.com.google.common.util.concurrent.RateLimiter;
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.statements.CFPropDefs;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.WriteResponseHandler;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/db/BatchlogManager.class */
public class BatchlogManager implements BatchlogManagerMBean {
    /** JMX object name under which {@link #start()} registers this manager. */
    private static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
    /** Delay in milliseconds between two scheduled replay runs (see {@link #start()}). */
    private static final long REPLAY_INTERVAL = 60000;
    /** Number of batchlog rows per page; matches the LIMIT of 128 used by the replay queries. */
    private static final int PAGE_SIZE = 128;
    /** Running count of batches handled by replay, exposed through {@link #getTotalBatchesReplayed()}. */
    private final AtomicLong totalBatchesReplayed = new AtomicLong();
    private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
    /** Shared instance of the manager. */
    public static final BatchlogManager instance = new BatchlogManager();
    /** Executor on which both the periodic and the on-demand replay tasks run. */
    private static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/cassandra/db/BatchlogManager$Batch.class */
    public static class Batch {
        /** Batchlog row id of this batch. */
        private final UUID id;
        /** Time the batch was written, in milliseconds (compared against System.currentTimeMillis()). */
        private final long writtenAt;
        /** Serialized payload: an int mutation count followed by that many serialized mutations. */
        private final ByteBuffer data;
        /** Serialization version handed to the mutation deserializer. */
        private final int version;
        /** Handlers for the replays sent by {@link #replay}; unset until replay() has sent something. */
        private List<ReplayWriteResponseHandler> replayHandlers;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/cassandra/db/BatchlogManager$Batch$ReplayWriteResponseHandler.class */
        /**
         * Write response handler for one replayed mutation that additionally remembers which
         * of the targeted endpoints have not acknowledged the write yet, so that hints can be
         * written for exactly those endpoints if the replay times out.
         */
        public static class ReplayWriteResponseHandler extends WriteResponseHandler {
            /** Endpoints the mutation was sent to that have not responded so far. */
            private final Set<InetAddress> undelivered;

            /**
             * @param collection endpoints the mutation is being sent to; all of them start
             *                   out as undelivered
             */
            public ReplayWriteResponseHandler(Collection<InetAddress> collection) {
                super(collection, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH);
                this.undelivered = Collections.newSetFromMap(new ConcurrentHashMap<InetAddress, Boolean>());
                this.undelivered.addAll(collection);
            }

            /**
             * Requires a response from every endpoint the mutation was sent to.
             *
             * @return the number of natural endpoints of this handler
             */
            @Override
            public int totalBlockFor() {
                return this.naturalEndpoints.size();
            }

            /**
             * Marks the responding endpoint as delivered and then delegates to the superclass.
             *
             * @param messageIn the response; its sender is removed from the undelivered set
             */
            @Override
            public void response(MessageIn messageIn) {
                // The removal has to happen whether or not assertions are enabled;
                // only the sanity check on its result is assertion-gated.
                boolean removed = this.undelivered.remove(messageIn.from);
                assert removed;
                super.response(messageIn);
            }
        }

        /**
         * Creates a batch from one batchlog row.
         *
         * @param uuid       row id of the batch
         * @param j          time the batch was written, in milliseconds
         * @param byteBuffer serialized mutations of the batch
         * @param i          serialization version of {@code byteBuffer}
         */
        public Batch(UUID uuid, long j, ByteBuffer byteBuffer, int i) {
            this.version = i;
            this.data = byteBuffer;
            this.writtenAt = j;
            this.id = uuid;
        }

        /**
         * Deserializes the batch and sends its mutations to their replicas.
         *
         * @param rateLimiter limiter charged with the size of the batch payload once the
         *                    replays have been sent
         * @return the number of response handlers awaiting replies; 0 when there was nothing
         *         to replay or the hint TTL has already run out
         * @throws IOException if the batch payload cannot be deserialized
         */
        public int replay(RateLimiter rateLimiter) throws IOException {
            BatchlogManager.logger.debug("Replaying batch {}", this.id);

            List<Mutation> mutations = replayingMutations();
            if (mutations.isEmpty()) {
                return 0;
            }

            int ttl = calculateHintTTL(mutations);
            if (ttl <= 0) {
                return 0;
            }

            this.replayHandlers = sendReplays(mutations, this.writtenAt, ttl);
            // The limiter is charged only after the sends, i.e. after the TTL was computed.
            rateLimiter.acquire(this.data.remaining());
            return this.replayHandlers.size();
        }

        /**
         * Waits for every replay handler in order. On the first write timeout, hints are
         * written for that handler and all following ones, and waiting stops.
         */
        public void finish() {
            int position = 0;
            for (ReplayWriteResponseHandler handler : this.replayHandlers) {
                try {
                    handler.get();
                } catch (WriteTimeoutException e) {
                    BatchlogManager.logger.debug("Timed out replaying a batched mutation to a node, will write a hint");
                    writeHintsForUndeliveredEndpoints(position);
                    return;
                }
                position++;
            }
        }

        /**
         * Deserializes the mutations of this batch, dropping the parts that target column
         * families truncated at or after the time the batch was written.
         *
         * @return the mutations that still have something to apply, in serialized order
         * @throws IOException if the payload cannot be read
         */
        private List<Mutation> replayingMutations() throws IOException {
            DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(this.data));
            int count = in.readInt();
            List<Mutation> result = new ArrayList<>(count);
            for (int n = 0; n < count; n++) {
                Mutation original = Mutation.serializer.deserialize((DataInput) in, this.version);
                Mutation remaining = original;
                // Iterate over the ids of the mutation as read; 'remaining' shrinks as we go.
                for (UUID cfId : original.getColumnFamilyIds()) {
                    if (this.writtenAt <= SystemKeyspace.getTruncatedAt(cfId)) {
                        remaining = remaining.without(cfId);
                    }
                }
                if (remaining.isEmpty()) {
                    continue;
                }
                result.add(remaining);
            }
            return result;
        }

        /**
         * Writes hints for every endpoint that has not acknowledged its replayed mutation,
         * for the handlers from index {@code i} onwards. The mutations are deserialized
         * again from the batch payload rather than kept in memory.
         *
         * @param i index of the first replay handler to write hints for
         */
        private void writeHintsForUndeliveredEndpoints(int i) {
            try {
                List<Mutation> mutations = replayingMutations();
                // The TTL depends on the whole batch, not on the individual handler, so it
                // is computed once; with an expired TTL no hint would be written at all.
                int ttl = calculateHintTTL(mutations);
                if (ttl <= 0) {
                    return;
                }
                // NOTE(review): sendReplays() omits mutations that produced no handler, so
                // index 'index' in replayHandlers is not guaranteed to denote the same
                // mutation in the freshly deserialized list - verify the pairing.
                for (int index = i; index < this.replayHandlers.size(); index++) {
                    Mutation undeliveredMutation = mutations.get(index);
                    ReplayWriteResponseHandler handler = this.replayHandlers.get(index);
                    if (handler == null) {
                        continue;
                    }
                    for (InetAddress endpoint : handler.undelivered) {
                        StorageProxy.writeHintForMutation(undeliveredMutation, this.writtenAt, ttl, endpoint);
                    }
                }
            } catch (IOException e) {
                BatchlogManager.logger.error("Cannot schedule hints for undelivered batch", e);
            }
        }

        /**
         * Sends each mutation to its replicas and collects the resulting handlers.
         *
         * @param list mutations to replay
         * @param j    write time of the batch, forwarded to hint writing
         * @param i    hint TTL, forwarded to hint writing
         * @return the handlers of the mutations that were sent to at least one remote
         *         endpoint; mutations without such a handler are left out
         */
        private List<ReplayWriteResponseHandler> sendReplays(List<Mutation> list, long j, int i) {
            List<ReplayWriteResponseHandler> handlers = new ArrayList<>(list.size());
            for (Mutation mutation : list) {
                ReplayWriteResponseHandler handler = sendSingleReplayMutation(mutation, j, i);
                if (handler == null) {
                    continue;
                }
                handlers.add(handler);
            }
            return handlers;
        }

        /**
         * Delivers one mutation to all natural and pending endpoints of its key: applied
         * locally for this node, sent over the network to live remote endpoints, and
         * stored as a hint for endpoints the failure detector reports as down.
         *
         * @param mutation the mutation to replay
         * @param j        write time of the batch, used for hints
         * @param i        hint TTL, used for hints
         * @return the handler tracking the remote sends, or {@code null} if no live remote
         *         endpoint was targeted
         */
        private ReplayWriteResponseHandler sendSingleReplayMutation(Mutation mutation, long j, int i) {
            Set<InetAddress> liveRemoteEndpoints = new HashSet<>();
            String ksName = mutation.getKeyspaceName();
            Token tk = StorageService.getPartitioner().getToken(mutation.key());

            for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ksName, tk), StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ksName))) {
                if (endpoint.equals(FBUtilities.getBroadcastAddress())) {
                    mutation.apply();
                    continue;
                }
                if (FailureDetector.instance.isAlive(endpoint)) {
                    liveRemoteEndpoints.add(endpoint);
                } else {
                    StorageProxy.writeHintForMutation(mutation, j, i, endpoint);
                }
            }

            if (liveRemoteEndpoints.isEmpty()) {
                return null;
            }

            ReplayWriteResponseHandler handler = new ReplayWriteResponseHandler(liveRemoteEndpoints);
            MessageOut<Mutation> message = mutation.createMessage();
            for (InetAddress endpoint : liveRemoteEndpoints) {
                MessagingService.instance().sendRR(message, endpoint, handler, false);
            }
            return handler;
        }

        /**
         * Computes the TTL to use for hints of this batch: the smallest hint TTL of the
         * given mutations, reduced by the age of the batch.
         *
         * @param collection mutations of the batch
         * @return remaining TTL in seconds; zero or negative once the batch is too old
         */
        private int calculateHintTTL(Collection<Mutation> collection) {
            int smallest = Integer.MAX_VALUE;
            for (Mutation mutation : collection) {
                int candidate = HintedHandOffManager.calculateHintTTL(mutation);
                if (candidate < smallest) {
                    smallest = candidate;
                }
            }
            long ageMillis = System.currentTimeMillis() - this.writtenAt;
            int ageSeconds = (int) TimeUnit.MILLISECONDS.toSeconds(ageMillis);
            return smallest - ageSeconds;
        }
    }

    /* loaded from: input_file:org/apache/cassandra/db/BatchlogManager$EndpointFilter.class */
    /**
     * Picks at most two endpoints out of a rack-to-endpoint mapping, skipping this node and
     * endpoints reported as down, and preferring endpoints outside the local rack and on
     * distinct racks.
     */
    public static class EndpointFilter {
        private final String localRack;
        private final Multimap<String, InetAddress> endpoints;

        /**
         * @param str      name of the local rack
         * @param multimap candidate endpoints keyed by rack name
         */
        public EndpointFilter(String str, Multimap<String, InetAddress> multimap) {
            this.localRack = str;
            this.endpoints = multimap;
        }

        /**
         * Selects the endpoints.
         *
         * @return the chosen endpoints: the single candidate when there is only one (without
         *         validating it), otherwise at most two valid endpoints
         */
        public Collection<InetAddress> filter() {
            // A lone candidate is returned as is, without the validity check.
            if (this.endpoints.values().size() == 1) {
                return this.endpoints.values();
            }

            // Keep only the endpoints accepted by isValid().
            ArrayListMultimap<String, InetAddress> validated = ArrayListMultimap.create();
            for (Map.Entry<String, InetAddress> entry : this.endpoints.entries()) {
                if (isValid(entry.getValue())) {
                    validated.put(entry.getKey(), entry.getValue());
                }
            }

            if (validated.size() <= 2) {
                return validated.values();
            }

            // Drop the local rack when the other racks alone provide two or more endpoints.
            if (validated.size() - validated.get(this.localRack).size() >= 2) {
                validated.removeAll(this.localRack);
            }

            // Only one rack left: take up to two of its members in random order.
            if (validated.keySet().size() == 1) {
                List<InetAddress> onlyRack = validated.get(validated.keySet().iterator().next());
                Collections.shuffle(onlyRack);
                return Lists.newArrayList(Iterables.limit(onlyRack, 2));
            }

            // With more than two racks, randomize which two are used.
            Collection<String> racks;
            if (validated.keySet().size() == 2) {
                racks = validated.keySet();
            } else {
                List<String> shuffledRacks = Lists.newArrayList(validated.keySet());
                Collections.shuffle(shuffledRacks);
                racks = shuffledRacks;
            }

            // One random member from each of the first two racks.
            List<InetAddress> result = new ArrayList<>(2);
            for (String rack : Iterables.limit(racks, 2)) {
                List<InetAddress> rackMembers = validated.get(rack);
                result.add(rackMembers.get(getRandomInt(rackMembers.size())));
            }
            return result;
        }

        /**
         * @param inetAddress candidate endpoint
         * @return {@code true} if the endpoint is not this node's broadcast address and the
         *         failure detector considers it alive
         */
        @VisibleForTesting
        protected boolean isValid(InetAddress inetAddress) {
            return !inetAddress.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(inetAddress);
        }

        /**
         * @param i exclusive upper bound
         * @return a random int in {@code [0, i)}
         */
        @VisibleForTesting
        protected int getRandomInt(int i) {
            return ThreadLocalRandom.current().nextInt(i);
        }
    }

    /**
     * Registers this manager as an MBean and schedules the periodic replay of failed
     * batches: first run after {@code StorageService.RING_DELAY}, then every
     * {@code REPLAY_INTERVAL} milliseconds after the previous run has finished.
     *
     * @throws RuntimeException wrapping any exception raised during registration or scheduling
     */
    public void start() {
        try {
            ObjectName mbeanName = new ObjectName(MBEAN_NAME);
            ManagementFactory.getPlatformMBeanServer().registerMBean(this, mbeanName);

            Runnable replayTask = new WrappedRunnable() {
                @Override
                public void runMayThrow() throws ExecutionException, InterruptedException {
                    BatchlogManager.this.replayAllFailedBatches();
                }
            };
            batchlogTasks.scheduleWithFixedDelay(replayTask, StorageService.RING_DELAY, REPLAY_INTERVAL, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Stops accepting new batchlog tasks and waits up to 60 seconds for running ones.
     *
     * @throws InterruptedException if interrupted while waiting for termination
     */
    public static void shutdown() throws InterruptedException {
        final long timeoutSeconds = 60L;
        batchlogTasks.shutdown();
        batchlogTasks.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
    }

    /**
     * Counts the rows currently stored in the batchlog table.
     *
     * @return the row count, narrowed to an int
     */
    @Override
    public int countAllBatches() {
        String query = String.format("SELECT count(*) FROM %s.%s", Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF);
        UntypedResultSet.Row row = QueryProcessor.executeInternal(query, new Object[0]).one();
        long count = row.getLong("count");
        return (int) count;
    }

    /**
     * @return the number of batches handled by replay since this instance was created
     */
    @Override
    public long getTotalBatchesReplayed() {
        return this.totalBatchesReplayed.get();
    }

    /**
     * Submits a batchlog replay via {@link #startBatchlogReplay()} and returns immediately;
     * the returned future is not awaited.
     */
    @Override // org.apache.cassandra.db.BatchlogManagerMBean
    public void forceBatchlogReplay() {
        startBatchlogReplay();
    }

    /**
     * Submits a one-off replay of all failed batches to the batchlog executor.
     *
     * @return a future representing the submitted replay task
     */
    public Future<?> startBatchlogReplay() {
        Runnable replayTask = new WrappedRunnable() {
            @Override
            public void runMayThrow() throws ExecutionException, InterruptedException {
                BatchlogManager.this.replayAllFailedBatches();
            }
        };
        return batchlogTasks.submit(replayTask);
    }

    /**
     * Builds the batchlog-table mutation that stores the given mutations, stamped with the
     * current time in microseconds.
     *
     * @param collection mutations making up the batch
     * @param uuid       id of the batch
     * @param i          serialization version to use
     * @return the mutation to apply to the batchlog table
     */
    public static Mutation getBatchlogMutationFor(Collection<Mutation> collection, UUID uuid, int i) {
        long nowMicros = FBUtilities.timestampMicros();
        return getBatchlogMutationFor(collection, uuid, i, nowMicros);
    }

    /**
     * Builds the batchlog-table mutation for a batch with an explicit timestamp.
     *
     * @param collection mutations making up the batch
     * @param uuid       id of the batch, used as the row key
     * @param i          serialization version, also stored in the {@code version} column
     * @param j          timestamp in microseconds; {@code written_at} is stored as
     *                   {@code j / 1000}, i.e. in milliseconds
     * @return the mutation to apply to the batchlog table
     */
    @VisibleForTesting
    static Mutation getBatchlogMutationFor(Collection<Mutation> collection, UUID uuid, int i, long j) {
        ArrayBackedSortedColumns cells = ArrayBackedSortedColumns.factory.create(CFMetaData.BatchlogCf);
        CFRowAdder adder = new CFRowAdder(cells, CFMetaData.BatchlogCf.comparator.builder().build(), j);
        adder.add("data", serializeMutations(collection, i))
             .add("written_at", new Date(j / 1000))
             .add("version", Integer.valueOf(i));

        ByteBuffer rowKey = UUIDType.instance.decompose(uuid);
        return new Mutation(Keyspace.SYSTEM_KS, rowKey, cells);
    }

    /**
     * Serializes the mutations into the batchlog payload format: an int holding the
     * number of mutations, followed by each mutation in iteration order.
     *
     * @param collection mutations to serialize
     * @param i          serialization version to use
     * @return buffer containing the serialized batch
     * @throws AssertionError if writing to the in-memory buffer fails; the underlying
     *                        {@link IOException} is attached as the cause
     */
    private static ByteBuffer serializeMutations(Collection<Mutation> collection, int i) {
        DataOutputBuffer out = new DataOutputBuffer();
        try {
            out.writeInt(collection.size());
            for (Mutation mutation : collection) {
                Mutation.serializer.serialize(mutation, (DataOutputPlus) out, i);
            }
            return out.asByteBuffer();
        } catch (IOException e) {
            // Not expected for an in-memory buffer; keep the cause so a failure is diagnosable.
            throw new AssertionError(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Pages through the batchlog table, replays every batch that is old enough, and then
     * flushes and compacts the table.
     *
     * <p>The configured replay throttle is divided by the number of known endpoints; a
     * resulting per-node throttle of 0 KB disables rate limiting.
     *
     * @throws ExecutionException   if the cleanup compaction fails
     * @throws InterruptedException if interrupted while waiting for the cleanup compaction
     */
    public void replayAllFailedBatches() throws ExecutionException, InterruptedException {
        logger.debug("Started replayAllFailedBatches");

        // Treat an empty ring as a single node so the division below cannot fail.
        int endpointCount = Math.max(1, StorageService.instance.getTokenMetadata().getAllEndpoints().size());
        int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / endpointCount;
        // Multiply in floating point so a large configured throttle cannot overflow an int.
        RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024.0);

        UntypedResultSet page = QueryProcessor.executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s LIMIT %d", Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF, PAGE_SIZE), new Object[0]);
        while (!page.isEmpty()) {
            UUID lastId = processBatchlogPage(page, rateLimiter);
            if (page.size() < PAGE_SIZE) {
                // A short page means the table is exhausted; the next query would be empty.
                break;
            }
            page = QueryProcessor.executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(?) LIMIT %d", Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF, PAGE_SIZE), lastId);
        }

        cleanup();
        logger.debug("Finished replayAllFailedBatches");
    }

    /**
     * Deletes the batchlog row of the given batch, using the current time in microseconds
     * as the deletion timestamp.
     *
     * @param uuid id of the batch to delete
     */
    private void deleteBatch(UUID uuid) {
        ByteBuffer rowKey = UUIDType.instance.decompose(uuid);
        Mutation deletion = new Mutation(Keyspace.SYSTEM_KS, rowKey);
        deletion.delete(SystemKeyspace.BATCHLOG_CF, FBUtilities.timestampMicros());
        deletion.apply();
    }

    /**
     * Replays the batches of one result page. Batches younger than
     * {@link #getBatchlogTimeout()} are skipped. Batches with nothing to send, or whose
     * payload cannot be read, are deleted straight away; the others are deleted once
     * their replay has been waited for.
     *
     * @param untypedResultSet page of batchlog rows
     * @param rateLimiter      limiter passed on to each batch replay
     * @return the id of the last row of the page, or {@code null} for an empty page
     */
    private UUID processBatchlogPage(UntypedResultSet untypedResultSet, RateLimiter rateLimiter) {
        UUID lastId = null;
        List<Batch> pending = new ArrayList<>(untypedResultSet.size());

        for (UntypedResultSet.Row row : untypedResultSet) {
            lastId = row.getUUID(CFPropDefs.KW_ID);
            long writtenAt = row.getLong("written_at");
            if (System.currentTimeMillis() < writtenAt + getBatchlogTimeout()) {
                // Too recent to be considered failed; leave it for a later run.
                continue;
            }

            ByteBuffer payload = row.getBytes("data");
            // NOTE(review): 6 is the fallback for rows without a version column - presumably
            // an inlined messaging-version constant; confirm.
            int version = row.has("version") ? row.getInt("version") : 6;
            Batch batch = new Batch(lastId, writtenAt, payload, version);
            try {
                if (batch.replay(rateLimiter) > 0) {
                    pending.add(batch);
                    continue;
                }
                deleteBatch(lastId);
                this.totalBatchesReplayed.incrementAndGet();
            } catch (IOException e) {
                logger.warn("Skipped batch replay of {} due to {}", lastId, e);
                deleteBatch(lastId);
            }
        }

        // Wait for the outstanding replays only after the whole page has been sent.
        for (Batch batch : pending) {
            batch.finish();
            deleteBatch(batch.id);
        }
        this.totalBatchesReplayed.addAndGet(pending.size());
        return lastId;
    }

    /**
     * @return the minimum age a batch must have before it is replayed: twice the
     *         configured write RPC timeout
     */
    public long getBatchlogTimeout() {
        long writeRpcTimeout = DatabaseDescriptor.getWriteRpcTimeout();
        return writeRpcTimeout * 2;
    }

    /**
     * Flushes the batchlog table and, if it has any sstables, runs a user-defined
     * compaction over all of them with {@code CompactionManager.GC_ALL}, waiting for it
     * to finish.
     *
     * @throws ExecutionException   if the compaction fails
     * @throws InterruptedException if interrupted while waiting for the compaction
     */
    private void cleanup() throws ExecutionException, InterruptedException {
        ColumnFamilyStore batchlog = Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(SystemKeyspace.BATCHLOG_CF);
        batchlog.forceBlockingFlush();

        // Left untyped: the descriptor type is not imported by this file.
        ArrayList descriptors = new ArrayList();
        for (SSTableReader sstable : batchlog.getSSTables()) {
            descriptors.add(sstable.descriptor);
        }

        if (!descriptors.isEmpty()) {
            CompactionManager.instance.submitUserDefined(batchlog, descriptors, CompactionManager.GC_ALL).get();
        }
    }
}
