/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.filesystem.stream;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.functions.sink.filesystem.Bucket;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketLifeCycleListener;
import org.apache.flink.streaming.api.functions.sink.filesystem.Buckets;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.filesystem.stream.StreamingFileCommitter;

public class StreamingFileWriter
extends AbstractStreamOperator<StreamingFileCommitter.CommitMessage>
implements OneInputStreamOperator<RowData, StreamingFileCommitter.CommitMessage>,
BoundedOneInput {
    private static final long serialVersionUID = 1L;
    private final long bucketCheckInterval;
    private final StreamingFileSink.BucketsBuilder<RowData, String, ? extends StreamingFileSink.BucketsBuilder<RowData, String, ?>> bucketsBuilder;
    private transient Buckets<RowData, String> buckets;
    private transient StreamingFileSinkHelper<RowData> helper;
    private transient long currentWatermark;
    private transient Set<String> inactivePartitions;

    public StreamingFileWriter(long bucketCheckInterval, StreamingFileSink.BucketsBuilder<RowData, String, ? extends StreamingFileSink.BucketsBuilder<RowData, String, ?>> bucketsBuilder) {
        this.bucketCheckInterval = bucketCheckInterval;
        this.bucketsBuilder = bucketsBuilder;
        this.setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        this.buckets = this.bucketsBuilder.createBuckets(this.getRuntimeContext().getIndexOfThisSubtask());
        this.helper = new StreamingFileSinkHelper(this.buckets, context.isRestored(), context.getOperatorStateStore(), this.getRuntimeContext().getProcessingTimeService(), this.bucketCheckInterval);
        this.inactivePartitions = new HashSet<String>();
        this.currentWatermark = Long.MIN_VALUE;
        this.buckets.setBucketLifeCycleListener((BucketLifeCycleListener)new BucketLifeCycleListener<RowData, String>(){

            public void bucketCreated(Bucket<RowData, String> bucket) {
            }

            public void bucketInactive(Bucket<RowData, String> bucket) {
                StreamingFileWriter.this.inactivePartitions.add(bucket.getBucketId());
            }
        });
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        this.helper.snapshotState(context.getCheckpointId());
    }

    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        this.currentWatermark = mark.getTimestamp();
    }

    public void processElement(StreamRecord<RowData> element) throws Exception {
        this.helper.onElement(element.getValue(), this.getProcessingTimeService().getCurrentProcessingTime(), element.hasTimestamp() ? Long.valueOf(element.getTimestamp()) : null, this.currentWatermark);
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        this.commitUpToCheckpoint(checkpointId);
    }

    private void commitUpToCheckpoint(long checkpointId) throws Exception {
        this.helper.commitUpToCheckpoint(checkpointId);
        StreamingFileCommitter.CommitMessage message = new StreamingFileCommitter.CommitMessage(checkpointId, this.getRuntimeContext().getIndexOfThisSubtask(), this.getRuntimeContext().getNumberOfParallelSubtasks(), new ArrayList<String>(this.inactivePartitions));
        this.output.collect((Object)new StreamRecord((Object)message));
        this.inactivePartitions.clear();
    }

    public void endInput() throws Exception {
        this.buckets.onProcessingTime(Long.MAX_VALUE);
        this.helper.snapshotState(Long.MAX_VALUE);
        this.output.emitWatermark(new Watermark(Long.MAX_VALUE));
        this.commitUpToCheckpoint(Long.MAX_VALUE);
    }

    public void dispose() throws Exception {
        super.dispose();
        if (this.helper != null) {
            this.helper.close();
        }
    }
}

