/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.services.kinesis.multilang;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.multilang.MessageReader;
import com.amazonaws.services.kinesis.multilang.MessageWriter;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import com.amazonaws.services.kinesis.multilang.messages.Message;
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Drives the request/response exchange with the subprocess: each operation writes one
 * message through the {@link MessageWriter} and then reads messages from the
 * {@link MessageReader} until a {@link StatusMessage} arrives, servicing any
 * {@link CheckpointMessage} received in between.
 */
class MultiLangProtocol {
    private static final Log log = LogFactory.getLog(MultiLangProtocol.class);

    private final MessageReader messageReader;
    private final MessageWriter messageWriter;
    private final InitializationInput initializationInput;
    private final KinesisClientLibConfiguration configuration;

    /**
     * @param messageReader source of messages coming back from the subprocess
     * @param messageWriter sink for messages sent to the subprocess
     * @param initializationInput initialization data; its shard id is used in log messages
     * @param configuration supplies the optional timeout (in seconds) applied while waiting
     *        for a message
     */
    MultiLangProtocol(MessageReader messageReader, MessageWriter messageWriter,
            InitializationInput initializationInput, KinesisClientLibConfiguration configuration) {
        this.messageReader = messageReader;
        this.messageWriter = messageWriter;
        this.initializationInput = initializationInput;
        this.configuration = configuration;
    }

    /**
     * Writes the initialize message and waits for the matching status response.
     *
     * @return {@code true} only if the write reported success and a status message for
     *         "initialize" was received
     */
    boolean initialize() {
        Future<Boolean> writeFuture = messageWriter.writeInitializeMessage(initializationInput);
        return waitForStatusMessage("initialize", null, writeFuture);
    }

    /**
     * Writes the process-records message and waits for the matching status response.
     * Checkpoint requests received while waiting are applied with the input's checkpointer.
     *
     * @param processRecordsInput the input to forward; also supplies the checkpointer
     * @return {@code true} only if the write reported success and a status message for
     *         "processRecords" was received
     */
    boolean processRecords(ProcessRecordsInput processRecordsInput) {
        Future<Boolean> writeFuture = messageWriter.writeProcessRecordsMessage(processRecordsInput);
        return waitForStatusMessage("processRecords", processRecordsInput.getCheckpointer(), writeFuture);
    }

    /**
     * Writes the shutdown message and waits for the matching status response.
     *
     * @param checkpointer used for checkpoint requests received while waiting
     * @param reason the shutdown reason to forward
     * @return {@code true} only if the write reported success and a status message for
     *         "shutdown" was received
     */
    boolean shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        Future<Boolean> writeFuture = messageWriter.writeShutdownMessage(reason);
        return waitForStatusMessage("shutdown", checkpointer, writeFuture);
    }

    /**
     * Writes the shutdown-requested message and waits for the matching status response.
     *
     * @param checkpointer used for checkpoint requests received while waiting
     * @return {@code true} only if the write reported success and a status message for
     *         "shutdownRequested" was received
     */
    boolean shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        Future<Boolean> writeFuture = messageWriter.writeShutdownRequestedMessage();
        return waitForStatusMessage("shutdownRequested", checkpointer, writeFuture);
    }

    /**
     * Waits for the status response first, then collects the result of the pending write.
     *
     * @return {@code true} only if both the status response and the write result were good;
     *         {@code false} if either was not, or if waiting on the write was interrupted
     *         or failed
     */
    private boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer,
            Future<Boolean> writeFuture) {
        boolean statusWasCorrect = waitForStatusMessage(action, checkpointer);
        try {
            boolean writerIsStillOpen = writeFuture.get();
            return statusWasCorrect && writerIsStillOpen;
        } catch (InterruptedException e) {
            log.error(String.format("Interrupted while writing %s message for shard %s", action,
                    initializationInput.getShardId()), e);
            // Restore the interrupt status so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            log.error(String.format("Failed to write %s message for shard %s", action,
                    initializationInput.getShardId()), e);
            return false;
        }
    }

    /**
     * Reads messages until a {@link StatusMessage} arrives. Every {@link CheckpointMessage}
     * seen before that is handed to {@code checkpointer} and acknowledged through the writer.
     *
     * @param action the action the status message is expected to respond to
     * @param checkpointer checkpointer for checkpoint requests; may be {@code null}, in which
     *        case a checkpoint request is answered with an error
     * @return {@code true} if a status message for {@code action} was received;
     *         {@code false} if no message could be read, if a checkpoint acknowledgement
     *         reported failure, or if the status message was for a different action
     */
    boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer) {
        Optional<StatusMessage> statusMessage = Optional.empty();
        while (!statusMessage.isPresent()) {
            Optional<Message> message = readNextMessage(action);
            if (!message.isPresent()) {
                return false;
            }

            Optional<Boolean> checkpointFailed = message
                    .filter(m -> m instanceof CheckpointMessage)
                    .map(m -> (CheckpointMessage) m)
                    .flatMap(m -> this.<Boolean>futureMethod(
                            () -> checkpoint(m, checkpointer).get(), "Checkpoint"))
                    .map(checkpointSuccess -> !checkpointSuccess);
            if (checkpointFailed.orElse(false)) {
                return false;
            }

            statusMessage = message
                    .filter(m -> m instanceof StatusMessage)
                    .map(m -> (StatusMessage) m);
        }
        return validateStatusMessage(statusMessage.get(), action);
    }

    /**
     * Waits for the next message from the reader, bounded by the configured timeout when one
     * is present.
     *
     * <p>The untimed wait is supplied lazily via {@code orElseGet}: with a timeout configured,
     * an interrupted or failed timed wait must not be followed by a second, unbounded wait on
     * the same future.
     *
     * @return the message, or empty if waiting was interrupted, failed, or timed out
     */
    private Optional<Message> readNextMessage(String action) {
        Future<Message> future = messageReader.getNextMessageFromSTDOUT();
        return configuration.getTimeoutInSeconds()
                .map(second -> this.<Message>futureMethod(
                        () -> future.get(second.intValue(), TimeUnit.SECONDS), action))
                .orElseGet(() -> this.<Message>futureMethod(future::get, action));
    }

    /**
     * Runs a blocking future call and converts its failure modes into an empty result.
     * Interruption and execution failures are logged; a timeout is logged and then
     * {@link #haltJvm(int)} is invoked with status 1.
     *
     * @return the value produced by {@code fm}, or empty on interruption, failure, or timeout
     */
    private <T> Optional<T> futureMethod(FutureMethod<T> fm, String action) {
        try {
            return Optional.of(fm.get());
        } catch (InterruptedException e) {
            log.error(String.format("Interrupted while waiting for %s message for shard %s", action,
                    initializationInput.getShardId()), e);
            // Restore the interrupt status instead of silently swallowing it.
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            log.error(String.format("Failed to get status message for %s action for shard %s", action,
                    initializationInput.getShardId()), e);
        } catch (TimeoutException e) {
            log.error(String.format("Timedout to get status message for %s action for shard %s. Terminating...",
                    action, initializationInput.getShardId()), e);
            haltJvm(1);
        }
        return Optional.empty();
    }

    /**
     * Halts the JVM via {@link Runtime#halt(int)}. Protected so that subclasses can override it.
     *
     * @param exitStatus the status passed to {@code Runtime.halt}
     */
    protected void haltJvm(int exitStatus) {
        Runtime.getRuntime().halt(exitStatus);
    }

    /**
     * Logs the received status message and checks that it responds to {@code action}.
     *
     * @return {@code true} if the message is non-null and its {@code responseFor} equals
     *         {@code action}
     */
    private boolean validateStatusMessage(StatusMessage statusMessage, String action) {
        log.info("Received response " + statusMessage + " from subprocess while waiting for " + action
                + " while processing shard " + initializationInput.getShardId());
        return statusMessage != null
                && statusMessage.getResponseFor() != null
                && statusMessage.getResponseFor().equals(action);
    }

    /**
     * Performs the checkpoint requested by {@code checkpointMessage} and writes the outcome
     * back through the message writer. The checkpointer overload is chosen by which of the
     * sequence number and sub-sequence number are present. Any throwable raised while
     * checkpointing is reported in the written message rather than propagated.
     *
     * @param checkpointMessage the checkpoint request
     * @param checkpointer the checkpointer to use; if {@code null}, an
     *        {@link InvalidStateException} is reported in the written message
     * @return the future returned by the writer for the checkpoint response message
     */
    private Future<Boolean> checkpoint(CheckpointMessage checkpointMessage,
            IRecordProcessorCheckpointer checkpointer) {
        String sequenceNumber = checkpointMessage.getSequenceNumber();
        Long subSequenceNumber = checkpointMessage.getSubSequenceNumber();
        try {
            if (checkpointer != null) {
                log.debug(logCheckpointMessage(sequenceNumber, subSequenceNumber));
                if (sequenceNumber != null) {
                    if (subSequenceNumber != null) {
                        checkpointer.checkpoint(sequenceNumber, subSequenceNumber);
                    } else {
                        checkpointer.checkpoint(sequenceNumber);
                    }
                } else {
                    checkpointer.checkpoint();
                }
                return messageWriter.writeCheckpointMessageWithError(sequenceNumber, subSequenceNumber, null);
            }
            String message = String.format(
                    "Was asked to checkpoint at %s but no checkpointer was provided for shard %s",
                    sequenceNumber, initializationInput.getShardId());
            log.error(message);
            return messageWriter.writeCheckpointMessageWithError(sequenceNumber, subSequenceNumber,
                    new InvalidStateException(message));
        } catch (Throwable t) {
            // Deliberately broad: the failure is passed to the writer for the response message.
            return messageWriter.writeCheckpointMessageWithError(sequenceNumber, subSequenceNumber, t);
        }
    }

    /** Builds the debug line logged before a checkpoint attempt. */
    private String logCheckpointMessage(String sequenceNumber, Long subSequenceNumber) {
        return String.format(
                "Attempting to checkpoint shard %s @ sequence number %s, and sub sequence number %s",
                initializationInput.getShardId(), sequenceNumber, subSequenceNumber);
    }

    /**
     * A blocking call on a future, allowed to throw the checked exceptions of
     * {@code Future.get}.
     */
    @FunctionalInterface
    private interface FutureMethod<T> {
        T get() throws InterruptedException, TimeoutException, ExecutionException;
    }
}

