/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.channel;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializer;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcher;
import org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest;
import org.apache.flink.runtime.checkpoint.channel.CheckpointStartRequest;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Routes {@link ChannelStateWriteRequest}s to the {@link ChannelStateCheckpointWriter}
 * registered for the request's checkpoint id.
 *
 * <p>A {@link CheckpointStartRequest} creates and registers a writer; a
 * {@link CheckpointInProgressRequest} is executed against the already registered writer
 * (or notified that the writer is missing). Each writer is handed a callback that removes
 * it from the registry again.
 *
 * <p>NOTE(review): the registry is a plain {@link HashMap} and this class performs no
 * synchronization, so it is presumably meant to be used from a single thread — confirm
 * against the caller.
 */
final class ChannelStateWriteRequestDispatcherImpl implements ChannelStateWriteRequestDispatcher {
    private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriteRequestDispatcherImpl.class);

    /** Writers of the checkpoints currently known to this dispatcher, keyed by checkpoint id. */
    private final Map<Long, ChannelStateCheckpointWriter> writers;
    private final CheckpointStorageWorkerView streamFactoryResolver;
    private final ChannelStateSerializer serializer;
    private final int subtaskIndex;

    /**
     * Creates a dispatcher with an empty writer registry.
     *
     * @param subtaskIndex index passed on to every writer this dispatcher creates
     * @param streamFactoryResolver resolves the checkpoint storage location for new writers;
     *     must not be {@code null}
     * @param serializer serializer passed on to every writer this dispatcher creates;
     *     must not be {@code null}
     * @throws NullPointerException if {@code streamFactoryResolver} or {@code serializer} is
     *     {@code null}
     */
    ChannelStateWriteRequestDispatcherImpl(int subtaskIndex, CheckpointStorageWorkerView streamFactoryResolver, ChannelStateSerializer serializer) {
        this.subtaskIndex = subtaskIndex;
        this.writers = new HashMap<>();
        this.streamFactoryResolver = Preconditions.checkNotNull(streamFactoryResolver);
        this.serializer = Preconditions.checkNotNull(serializer);
    }

    /**
     * Processes a single request. If processing fails, the request is cancelled with the
     * failure and the failure is rethrown; an exception thrown by the cancellation itself is
     * attached to the original failure as a suppressed exception.
     *
     * @param request the request to process
     * @throws Exception the failure raised while processing the request
     */
    @Override
    public void dispatch(ChannelStateWriteRequest request) throws Exception {
        LOG.debug("process {}", request);
        try {
            this.dispatchInternal(request);
        } catch (Exception e) {
            try {
                request.cancel(e);
            } catch (Exception ex) {
                // Keep the original failure as the primary one; do not lose the cancel failure.
                e.addSuppressed(ex);
            }
            throw e;
        }
    }

    /**
     * Handles the request according to its concrete type.
     *
     * @throws IllegalStateException if a start request arrives for a checkpoint id that
     *     already has a registered writer
     * @throws IllegalArgumentException if the request is of an unsupported type
     */
    private void dispatchInternal(ChannelStateWriteRequest request) throws Exception {
        if (request instanceof CheckpointStartRequest) {
            // A checkpoint may only be started once: there must be no writer for this id yet.
            Preconditions.checkState(
                    !this.writers.containsKey(request.getCheckpointId()),
                    "writer already exists for request " + request);
            this.writers.put(request.getCheckpointId(), this.buildWriter((CheckpointStartRequest) request));
        } else if (request instanceof CheckpointInProgressRequest) {
            ChannelStateCheckpointWriter writer = this.writers.get(request.getCheckpointId());
            CheckpointInProgressRequest req = (CheckpointInProgressRequest) request;
            if (writer == null) {
                req.onWriterMissing();
            } else {
                req.execute(writer);
            }
        } else {
            throw new IllegalArgumentException("unknown request type: " + request);
        }
    }

    /**
     * Creates the writer for a newly started checkpoint. The callback passed as the last
     * constructor argument removes the writer from the registry when the writer invokes it.
     */
    private ChannelStateCheckpointWriter buildWriter(CheckpointStartRequest request) throws Exception {
        return new ChannelStateCheckpointWriter(
                this.subtaskIndex,
                request,
                this.streamFactoryResolver.resolveCheckpointStorageLocation(
                        request.getCheckpointId(), request.getLocationReference()),
                this.serializer,
                () -> this.writers.remove(request.getCheckpointId()));
    }

    /**
     * Fails every registered writer with the given cause and empties the registry. This is
     * best-effort: a writer that throws while being failed is logged and does not prevent
     * the remaining writers from being failed.
     *
     * @param cause the reason the writers are being failed
     */
    @Override
    public void fail(Throwable cause) {
        // Iterate over a snapshot: each writer holds a callback that removes it from
        // 'writers'; if fail() triggers that callback (TODO confirm), iterating the live
        // view would throw ConcurrentModificationException.
        ChannelStateCheckpointWriter[] snapshot =
                this.writers.values().toArray(new ChannelStateCheckpointWriter[0]);
        for (ChannelStateCheckpointWriter writer : snapshot) {
            try {
                writer.fail(cause);
            } catch (Exception ex) {
                // Log the exception raised while failing the writer, not the original cause.
                LOG.warn("unable to fail write channel state writer", ex);
            }
        }
        this.writers.clear();
    }
}

