/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.table;

import java.io.IOException;
import java.io.OutputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.table.AbstractFileSystemTable;
import org.apache.flink.connector.file.table.DeserializationSchemaAdapter;
import org.apache.flink.connector.file.table.EmptyMetaStoreFactory;
import org.apache.flink.connector.file.table.FileInfoExtractorBulkFormat;
import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
import org.apache.flink.connector.file.table.FileSystemFactory;
import org.apache.flink.connector.file.table.FileSystemOutputFormat;
import org.apache.flink.connector.file.table.OutputFormatFactory;
import org.apache.flink.connector.file.table.PartitionCommitPolicyFactory;
import org.apache.flink.connector.file.table.PartitionComputer;
import org.apache.flink.connector.file.table.RowDataPartitionComputer;
import org.apache.flink.connector.file.table.SerializationSchemaAdapter;
import org.apache.flink.connector.file.table.stream.PartitionCommitInfo;
import org.apache.flink.connector.file.table.stream.StreamingSink;
import org.apache.flink.connector.file.table.stream.compact.CompactBulkReader;
import org.apache.flink.connector.file.table.stream.compact.CompactOperator;
import org.apache.flink.connector.file.table.stream.compact.CompactReader;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.utils.PartitionPathUtils;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;

@Internal
public class FileSystemTableSink
extends AbstractFileSystemTable
implements DynamicTableSink,
SupportsPartitioning,
SupportsOverwrite {
    // Reader-side bulk format; consulted first by createCompactReaderFactory. May be null.
    @Nullable
    private final DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat;
    // Reader-side row format; used by createCompactReaderFactory when no bulk reader format is set. May be null.
    @Nullable
    private final DecodingFormat<DeserializationSchema<RowData>> deserializationFormat;
    // Writer-side bulk format; createWriter prefers it over serializationFormat. May be null.
    @Nullable
    private final EncodingFormat<BulkWriter.Factory<RowData>> bulkWriterFormat;
    // Writer-side row format; the constructor rejects the case where this and bulkWriterFormat are both null.
    @Nullable
    private final EncodingFormat<SerializationSchema<RowData>> serializationFormat;
    // Set through applyOverwrite; consume() rejects it for unbounded input.
    private boolean overwrite = false;
    // Set through requiresPartitionGrouping and forwarded to the batch output format builder.
    private boolean dynamicGrouping = false;
    // Static partition spec, replaced wholesale by applyStaticPartition.
    private LinkedHashMap<String, String> staticPartitions = new LinkedHashMap();
    // Value of FileSystemConnectorOptions.SINK_PARALLELISM as read in the constructor; null when absent.
    @Nullable
    private Integer configuredParallelism;

    FileSystemTableSink(ObjectIdentifier tableIdentifier, DataType physicalRowDataType, List<String> partitionKeys, ReadableConfig tableOptions, @Nullable DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat, @Nullable DecodingFormat<DeserializationSchema<RowData>> deserializationFormat, @Nullable EncodingFormat<BulkWriter.Factory<RowData>> bulkWriterFormat, @Nullable EncodingFormat<SerializationSchema<RowData>> serializationFormat) {
        super(tableIdentifier, physicalRowDataType, partitionKeys, tableOptions);
        this.bulkReaderFormat = bulkReaderFormat;
        this.deserializationFormat = deserializationFormat;
        if (Stream.of(bulkWriterFormat, serializationFormat).allMatch(Objects::isNull)) {
            String identifier = (String)tableOptions.get(FactoryUtil.FORMAT);
            throw new ValidationException(String.format("Could not find any format factory for identifier '%s' in the classpath.", identifier));
        }
        this.bulkWriterFormat = bulkWriterFormat;
        this.serializationFormat = serializationFormat;
        this.configuredParallelism = (Integer)this.tableOptions.get(FileSystemConnectorOptions.SINK_PARALLELISM);
    }

    /**
     * Returns a {@link DataStreamSinkProvider} that forwards the incoming stream to
     * {@code consume} together with the given sink context.
     *
     * @param sinkContext planner-supplied sink context, captured by the provider
     * @return the runtime provider for this sink
     */
    public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(final DynamicTableSink.Context sinkContext) {
        final DataStreamSinkProvider provider = new DataStreamSinkProvider() {

            public DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
                // Delegate to the enclosing sink; the context is captured from the outer call.
                return FileSystemTableSink.this.consume(providerContext, dataStream, sinkContext);
            }
        };
        return provider;
    }

    /**
     * Picks the batch or streaming sink depending on whether the input is bounded.
     *
     * <p>The sink parallelism is the configured value when present, otherwise the
     * parallelism of the input stream.
     *
     * @throws IllegalStateException if overwrite was requested for unbounded input
     */
    private DataStreamSink<?> consume(ProviderContext providerContext, DataStream<RowData> dataStream, DynamicTableSink.Context sinkContext) {
        final int upstreamParallelism = dataStream.getParallelism();
        final boolean explicitParallelism = this.configuredParallelism != null;
        final int parallelism = explicitParallelism ? this.configuredParallelism : upstreamParallelism;

        if (!sinkContext.isBounded()) {
            if (this.overwrite) {
                throw new IllegalStateException("Streaming mode not support overwrite.");
            }
            return this.createStreamingSink(providerContext, dataStream, sinkContext, parallelism, explicitParallelism);
        }
        return this.createBatchSink(dataStream, sinkContext, parallelism, explicitParallelism);
    }

    /**
     * Builds a {@link RowDataPartitionComputer} from the default partition name, the
     * field names and field types of the physical row type, and the partition keys.
     */
    private RowDataPartitionComputer partitionComputer() {
        final String[] fieldNames = DataType.getFieldNames(this.physicalRowDataType).toArray(new String[0]);
        final DataType[] fieldTypes = DataType.getFieldDataTypes(this.physicalRowDataType).toArray(new DataType[0]);
        final String[] partitionColumns = this.partitionKeys.toArray(new String[0]);
        return new RowDataPartitionComputer(this.defaultPartName, fieldNames, fieldTypes, partitionColumns);
    }

    /**
     * Creates the bounded-input sink by writing the stream through a
     * {@link FileSystemOutputFormat}.
     *
     * <p>The builder is configured one setter at a time, in the same order as a single
     * fluent chain would evaluate them, so the staging directory is only created after
     * the output format factory has been built.
     *
     * @param inputStream rows to write
     * @param sinkContext planner-supplied sink context
     * @param parallelism parallelism to apply to the sink transformation
     * @param parallelismConfigured whether {@code parallelism} was set explicitly
     * @return the sink, named "Filesystem"
     */
    private DataStreamSink<RowData> createBatchSink(DataStream<RowData> inputStream, DynamicTableSink.Context sinkContext, int parallelism, boolean parallelismConfigured) {
        final FileSystemOutputFormat.Builder<RowData> formatBuilder = new FileSystemOutputFormat.Builder<RowData>();
        formatBuilder.setPartitionComputer(this.partitionComputer());
        formatBuilder.setDynamicGrouped(this.dynamicGrouping);
        formatBuilder.setPartitionColumns(this.partitionKeys.toArray(new String[0]));
        formatBuilder.setFormatFactory(this.createOutputFormatFactory(sinkContext));
        formatBuilder.setMetaStoreFactory(new EmptyMetaStoreFactory(this.path));
        formatBuilder.setOverwrite(this.overwrite);
        formatBuilder.setStaticPartitions(this.staticPartitions);
        formatBuilder.setTempPath(this.toStagingPath());
        // Part files get a random prefix per sink creation.
        formatBuilder.setOutputFileConfig(OutputFileConfig.builder().withPartPrefix("part-" + UUID.randomUUID()).build());
        formatBuilder.setPartitionCommitPolicyFactory(
                new PartitionCommitPolicyFactory(
                        (String) this.tableOptions.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_KIND),
                        (String) this.tableOptions.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS),
                        (String) this.tableOptions.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME),
                        (List) this.tableOptions.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS)));

        final DataStreamSink<RowData> batchSink = inputStream.writeUsingOutputFormat(formatBuilder.build());
        batchSink.getTransformation().setParallelism(parallelism, parallelismConfigured);
        return batchSink.name("Filesystem");
    }

    /**
     * Creates the unbounded-input sink: a bucketing file writer (optionally with
     * compaction) followed by the partition-commit sink from {@code StreamingSink.sink}.
     *
     * @param providerContext provider context passed through to the StreamingSink helpers
     * @param dataStream rows to write
     * @param sinkContext planner-supplied sink context, used to create the writer and compaction reader
     * @param parallelism writer parallelism
     * @param parallelismConfigured whether {@code parallelism} was set explicitly
     * @return the sink returned by {@code StreamingSink.sink}
     * @throws TableException if auto compaction is enabled but no compaction reader can be created
     */
    private DataStreamSink<?> createStreamingSink(ProviderContext providerContext, DataStream<RowData> dataStream, DynamicTableSink.Context sinkContext, int parallelism, boolean parallelismConfigured) {
        DataStream<PartitionCommitInfo> writerStream;
        FileSystemFactory fsFactory = FileSystem::get;
        RowDataPartitionComputer computer = this.partitionComputer();
        boolean autoCompaction = (Boolean)this.tableOptions.get(FileSystemConnectorOptions.AUTO_COMPACTION);
        // The writer is either an Encoder (row format) or a BulkWriter.Factory (bulk format); see createWriter.
        Object writer = this.createWriter(sinkContext);
        boolean isEncoder = writer instanceof Encoder;
        TableBucketAssigner assigner = new TableBucketAssigner(computer);
        // First argument is rollOnCheckpoint: always roll on checkpoint for bulk writers or when auto compaction is on.
        TableRollingPolicy rollingPolicy = new TableRollingPolicy(!isEncoder || autoCompaction, ((MemorySize)this.tableOptions.get(FileSystemConnectorOptions.SINK_ROLLING_POLICY_FILE_SIZE)).getBytes(), ((Duration)this.tableOptions.get(FileSystemConnectorOptions.SINK_ROLLING_POLICY_ROLLOVER_INTERVAL)).toMillis(), ((Duration)this.tableOptions.get(FileSystemConnectorOptions.SINK_ROLLING_POLICY_INACTIVITY_INTERVAL)).toMillis());
        String randomPrefix = "part-" + UUID.randomUUID().toString();
        OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder = OutputFileConfig.builder();
        // With auto compaction the prefix is first passed through CompactOperator.convertToUncompacted.
        fileNamingBuilder = autoCompaction ? fileNamingBuilder.withPartPrefix(CompactOperator.convertToUncompacted(randomPrefix)) : fileNamingBuilder.withPartPrefix(randomPrefix);
        OutputFileConfig fileNamingConfig = fileNamingBuilder.build();
        // Row-format builder for Encoder writers, bulk-format builder otherwise; both wrap the writer so that
        // rows go through computer.projectColumnsToWrite before being written.
        Object bucketsBuilder = isEncoder ? ((StreamingFileSink.DefaultRowFormatBuilder)((StreamingFileSink.DefaultRowFormatBuilder)StreamingFileSink.forRowFormat((Path)this.path, (Encoder)new ProjectionEncoder((Encoder)writer, computer)).withBucketAssigner((BucketAssigner)assigner)).withOutputFileConfig(fileNamingConfig)).withRollingPolicy((RollingPolicy)rollingPolicy) : ((StreamingFileSink.DefaultBulkFormatBuilder)((StreamingFileSink.DefaultBulkFormatBuilder)StreamingFileSink.forBulkFormat((Path)this.path, (BulkWriter.Factory)new ProjectionBulkFactory((BulkWriter.Factory<RowData>)((BulkWriter.Factory)writer), computer)).withBucketAssigner((BucketAssigner)assigner)).withOutputFileConfig(fileNamingConfig)).withRollingPolicy((CheckpointRollingPolicy)rollingPolicy);
        long bucketCheckInterval = ((Duration)this.tableOptions.get(FileSystemConnectorOptions.SINK_ROLLING_POLICY_CHECK_INTERVAL)).toMillis();
        if (autoCompaction) {
            // Compaction target size falls back to the rolling file size when COMPACTION_FILE_SIZE is not set.
            long compactionSize = ((MemorySize)this.tableOptions.getOptional(FileSystemConnectorOptions.COMPACTION_FILE_SIZE).orElse(this.tableOptions.get(FileSystemConnectorOptions.SINK_ROLLING_POLICY_FILE_SIZE))).getBytes();
            CompactReader.Factory<RowData> reader = this.createCompactReaderFactory(sinkContext).orElseThrow(() -> new TableException("Please implement available reader for compaction: BulkFormat, FileInputFormat."));
            writerStream = StreamingSink.compactionWriter(providerContext, dataStream, bucketCheckInterval, bucketsBuilder, fsFactory, this.path, reader, compactionSize, parallelism, parallelismConfigured);
        } else {
            writerStream = StreamingSink.writer(providerContext, dataStream, bucketCheckInterval, bucketsBuilder, parallelism, this.partitionKeys, this.tableOptions, parallelismConfigured);
        }
        return StreamingSink.sink(providerContext, writerStream, this.path, this.tableIdentifier, this.partitionKeys, new EmptyMetaStoreFactory(this.path), fsFactory, this.tableOptions);
    }

    /**
     * Creates the reader factory used for compaction, if a reader-side format exists.
     *
     * <p>The bulk reader format is preferred; the deserialization format (wrapped in a
     * {@link DeserializationSchemaAdapter}) is the fallback. The chosen format decodes
     * the row type without partition columns and is wrapped in a
     * {@link FileInfoExtractorBulkFormat} that produces the full physical row type.
     *
     * @param context planner-supplied sink context
     * @return the reader factory, or empty when neither reader format is set
     */
    private Optional<CompactReader.Factory<RowData>> createCompactReaderFactory(DynamicTableSink.Context context) {
        final DataType fullRowType = this.physicalRowDataType;
        final DataType dataFileRowType = DataType.getFields(fullRowType).stream()
                .filter(field -> !this.partitionKeys.contains(field.getName()))
                .collect(Collectors.collectingAndThen(Collectors.toList(), DataTypes::ROW));

        final BulkFormat<RowData, FileSourceSplit> fileFormat;
        if (this.bulkReaderFormat != null) {
            fileFormat = this.bulkReaderFormat.createRuntimeDecoder(this.createSourceContext(context), dataFileRowType);
        } else if (this.deserializationFormat != null) {
            final DeserializationSchema<RowData> rowDecoder = this.deserializationFormat.createRuntimeDecoder(this.createSourceContext(context), dataFileRowType);
            fileFormat = new DeserializationSchemaAdapter(rowDecoder);
        } else {
            return Optional.empty();
        }

        final FileInfoExtractorBulkFormat fullRowFormat = new FileInfoExtractorBulkFormat(
                fileFormat,
                fullRowType,
                context.createTypeInformation(fullRowType),
                Collections.emptyMap(),
                this.partitionKeys,
                this.defaultPartName);
        return Optional.of(CompactBulkReader.factory(fullRowFormat));
    }

    /**
     * Adapts a sink context to the source-context interface expected by decoding formats.
     *
     * <p>Type information requests are forwarded to the sink context; data structure
     * converters are not supported.
     *
     * @param context sink context to delegate to
     * @return a source context view over {@code context}
     */
    private DynamicTableSource.Context createSourceContext(final DynamicTableSink.Context context) {
        final DynamicTableSource.Context adapter = new DynamicTableSource.Context() {

            public <T> TypeInformation<T> createTypeInformation(DataType dataType) {
                return context.createTypeInformation(dataType);
            }

            public <T> TypeInformation<T> createTypeInformation(LogicalType logicalType) {
                return context.createTypeInformation(logicalType);
            }

            public DynamicTableSource.DataStructureConverter createDataStructureConverter(DataType dataType) {
                throw new UnsupportedOperationException("Compaction reader not support DataStructure converter.");
            }
        };
        return adapter;
    }

    /**
     * Creates a staging directory below the table path and returns it.
     *
     * <p>The directory name keeps the {@code .staging_<millis>} prefix and appends a
     * random UUID. A timestamp alone is not unique: two sinks created within the same
     * millisecond would get the same name, and because an already existing directory
     * is accepted below, they would silently share one staging directory.
     *
     * @return the staging directory
     * @throws IllegalStateException if the directory neither exists nor can be created
     *     (raised by {@code Preconditions.checkState})
     * @throws RuntimeException wrapping any {@link IOException} from the file system
     */
    private Path toStagingPath() {
        final String dirName = ".staging_" + System.currentTimeMillis() + "_" + UUID.randomUUID();
        final Path stagingDir = new Path(this.path, dirName);
        try {
            final FileSystem fs = stagingDir.getFileSystem();
            Preconditions.checkState(fs.exists(stagingDir) || fs.mkdirs(stagingDir), "Failed to create staging dir " + stagingDir);
            return stagingDir;
        }
        catch (IOException e) {
            // Keep the cause and say which directory was involved.
            throw new RuntimeException("Failed to create staging dir " + stagingDir, e);
        }
    }

    /**
     * Creates the per-path output format factory for the batch sink.
     *
     * <p>An {@link Encoder} writer yields encoder-based output formats; any other
     * writer is treated as a {@link BulkWriter.Factory}. The casts stay inside the
     * lambdas so they run when the factory is invoked.
     *
     * @param sinkContext planner-supplied sink context used to create the writer
     * @return factory mapping a path to an output format
     */
    private OutputFormatFactory<RowData> createOutputFormatFactory(DynamicTableSink.Context sinkContext) {
        final Object writer = this.createWriter(sinkContext);
        if (writer instanceof Encoder) {
            return path -> FileSystemTableSink.createEncoderOutputFormat((Encoder<RowData>) writer, path);
        }
        return path -> FileSystemTableSink.createBulkWriterOutputFormat((BulkWriter.Factory<RowData>) writer, path);
    }

    /**
     * Creates the runtime writer for the row type without partition columns.
     *
     * @param sinkContext planner-supplied sink context
     * @return the bulk writer factory when a bulk writer format is set, otherwise a
     *     {@link SerializationSchemaAdapter} around the serialization schema
     * @throws TableException if neither writer format is set
     */
    private Object createWriter(DynamicTableSink.Context sinkContext) {
        final DataType writtenRowType = DataType.getFields(this.physicalRowDataType).stream()
                .filter(field -> !this.partitionKeys.contains(field.getName()))
                .collect(Collectors.collectingAndThen(Collectors.toList(), DataTypes::ROW));

        if (this.bulkWriterFormat != null) {
            return this.bulkWriterFormat.createRuntimeEncoder(sinkContext, writtenRowType);
        }
        if (this.serializationFormat == null) {
            throw new TableException("Can not find format factory.");
        }
        final SerializationSchema<RowData> rowEncoder = this.serializationFormat.createRuntimeEncoder(sinkContext, writtenRowType);
        return new SerializationSchemaAdapter(rowEncoder);
    }

    /**
     * Rejects a configured sink parallelism when the requested changelog is not insert-only.
     *
     * @param requestChangelogMode changelog mode requested for the sink
     * @throws ValidationException if a parallelism is configured and the mode contains
     *     row kinds other than {@code INSERT}
     */
    private void checkConfiguredParallelismAllowed(ChangelogMode requestChangelogMode) {
        final Integer parallelism = this.configuredParallelism;
        if (parallelism == null || requestChangelogMode.containsOnly(RowKind.INSERT)) {
            return;
        }
        final String optionKey = FileSystemConnectorOptions.SINK_PARALLELISM.key();
        final String rowKinds = requestChangelogMode.getContainedKinds().stream()
                .map(RowKind::shortString)
                .collect(Collectors.joining(","));
        throw new ValidationException(String.format("Currently, filesystem sink doesn't support setting parallelism (%d) by '%s' when the input stream is not INSERT only. The row kinds of input stream are [%s]", parallelism, optionKey, rowKinds));
    }

    /**
     * Creates an {@link OutputFormat} that writes rows to {@code path} through a bulk writer.
     *
     * <p>The stream is released on every path: {@code open} closes it again if the
     * writer cannot be created, and {@code close} closes it even when flushing or
     * finishing the writer fails.
     *
     * @param factory factory for the bulk writer
     * @param path file to create (overwriting an existing file)
     * @return the output format
     */
    private static OutputFormat<RowData> createBulkWriterOutputFormat(final BulkWriter.Factory<RowData> factory, final Path path) {
        return new OutputFormat<RowData>(){
            private static final long serialVersionUID = 1L;
            private transient BulkWriter<RowData> writer;
            private transient FSDataOutputStream stream;

            public void configure(Configuration parameters) {
            }

            public void open(int taskNumber, int numTasks) throws IOException {
                final FSDataOutputStream out = path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE);
                try {
                    this.writer = factory.create(out);
                }
                catch (IOException | RuntimeException e) {
                    // The writer never took ownership of the stream, so release it here.
                    try {
                        out.close();
                    }
                    catch (IOException closeFailure) {
                        e.addSuppressed(closeFailure);
                    }
                    throw e;
                }
                this.stream = out;
            }

            public void writeRecord(RowData record) throws IOException {
                this.writer.addElement(record);
            }

            public void close() throws IOException {
                // try-with-resources closes the stream even if flush/finish throws, skips
                // a null stream, and attaches a close failure as a suppressed exception.
                try (FSDataOutputStream toClose = this.stream) {
                    if (this.writer != null) {
                        this.writer.flush();
                        this.writer.finish();
                    }
                }
            }
        };
    }

    /**
     * Creates an {@link OutputFormat} that writes rows to {@code path} through an encoder.
     *
     * <p>{@code close} closes the stream even when the final flush fails.
     *
     * @param encoder encoder applied to every record
     * @param path file to create (overwriting an existing file)
     * @return the output format
     */
    private static OutputFormat<RowData> createEncoderOutputFormat(final Encoder<RowData> encoder, final Path path) {
        return new OutputFormat<RowData>(){
            private static final long serialVersionUID = 1L;
            private transient FSDataOutputStream output;

            public void configure(Configuration parameters) {
            }

            public void open(int taskNumber, int numTasks) throws IOException {
                this.output = path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE);
            }

            public void writeRecord(RowData record) throws IOException {
                encoder.encode(record, (OutputStream)this.output);
            }

            public void close() throws IOException {
                // try-with-resources closes the stream even if flush throws, skips a null
                // stream, and attaches a close failure as a suppressed exception.
                try (FSDataOutputStream toClose = this.output) {
                    if (toClose != null) {
                        toClose.flush();
                    }
                }
            }
        };
    }

    /**
     * Restricts {@code part} to the table's partition keys, in partition-key order.
     *
     * @param part partition spec keyed by column name
     * @return a new insertion-ordered map holding the entries of {@code part} whose key
     *     is a partition key
     */
    private LinkedHashMap<String, String> toPartialLinkedPartSpec(Map<String, String> part) {
        final LinkedHashMap<String, String> ordered = new LinkedHashMap<String, String>();
        for (final String key : this.partitionKeys) {
            if (part.containsKey(key)) {
                ordered.put(key, part.get(key));
            }
        }
        return ordered;
    }

    /**
     * Records whether grouping is supported and reports that same value back.
     *
     * @param supportsGrouping whether the caller supports grouping by partition
     * @return {@code supportsGrouping}
     */
    public boolean requiresPartitionGrouping(boolean supportsGrouping) {
        this.dynamicGrouping = supportsGrouping;
        return supportsGrouping;
    }

    /**
     * Validates the configured parallelism against the requested mode, then returns the
     * changelog mode of the writer format (bulk writer format first, serialization
     * format second).
     *
     * @param requestedMode changelog mode requested for the sink
     * @return the changelog mode reported by the writer format
     * @throws TableException if neither writer format is set
     */
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        this.checkConfiguredParallelismAllowed(requestedMode);
        final EncodingFormat<?> writerFormat = this.bulkWriterFormat != null ? this.bulkWriterFormat : this.serializationFormat;
        if (writerFormat == null) {
            throw new TableException("Can not find format factory.");
        }
        return writerFormat.getChangelogMode();
    }

    /**
     * Creates a new sink with the same constructor arguments and carries over the
     * overwrite flag, the grouping flag and the static partition map (same map
     * instance, not a copy).
     *
     * @return the copied sink
     */
    public DynamicTableSink copy() {
        final FileSystemTableSink duplicate = new FileSystemTableSink(
                this.tableIdentifier,
                this.physicalRowDataType,
                this.partitionKeys,
                this.tableOptions,
                this.bulkReaderFormat,
                this.deserializationFormat,
                this.bulkWriterFormat,
                this.serializationFormat);
        duplicate.staticPartitions = this.staticPartitions;
        duplicate.dynamicGrouping = this.dynamicGrouping;
        duplicate.overwrite = this.overwrite;
        return duplicate;
    }

    /**
     * Returns the fixed summary string for this sink.
     *
     * @return the literal {@code "Filesystem"}
     */
    public String asSummaryString() {
        return "Filesystem";
    }

    /**
     * Stores the overwrite flag. The flag is passed to the batch output format builder
     * and causes {@code consume} to fail for unbounded input.
     *
     * @param overwrite whether existing data should be overwritten
     */
    public void applyOverwrite(boolean overwrite) {
        this.overwrite = overwrite;
    }

    /**
     * Replaces the static partition spec with the entries of {@code partition} that
     * belong to partition keys, ordered by partition key.
     *
     * @param partition static partition values keyed by column name
     */
    public void applyStaticPartition(Map<String, String> partition) {
        final LinkedHashMap<String, String> orderedSpec = this.toPartialLinkedPartSpec(partition);
        this.staticPartitions = orderedSpec;
    }

    /**
     * {@link BulkWriter.Factory} decorator whose writers pass every element through
     * {@code RowDataPartitionComputer.projectColumnsToWrite} before handing it to the
     * wrapped writer.
     */
    public static class ProjectionBulkFactory
    implements BulkWriter.Factory<RowData> {
        private final BulkWriter.Factory<RowData> factory;
        private final RowDataPartitionComputer computer;

        /**
         * @param factory factory creating the underlying writers
         * @param computer computer used to project each row before writing
         */
        public ProjectionBulkFactory(BulkWriter.Factory<RowData> factory, RowDataPartitionComputer computer) {
            this.factory = factory;
            this.computer = computer;
        }

        /**
         * Creates the underlying writer for {@code out} and wraps it with the projection.
         */
        public BulkWriter<RowData> create(FSDataOutputStream out) throws IOException {
            final BulkWriter<RowData> delegate = this.factory.create(out);
            final RowDataPartitionComputer projector = this.computer;
            return new BulkWriter<RowData>() {

                public void addElement(RowData element) throws IOException {
                    delegate.addElement(projector.projectColumnsToWrite(element));
                }

                public void flush() throws IOException {
                    delegate.flush();
                }

                public void finish() throws IOException {
                    delegate.finish();
                }
            };
        }
    }

    /**
     * {@link Encoder} decorator that passes every element through
     * {@code RowDataPartitionComputer.projectColumnsToWrite} before encoding it with
     * the wrapped encoder.
     */
    private static class ProjectionEncoder
    implements Encoder<RowData> {
        private final Encoder<RowData> encoder;
        private final RowDataPartitionComputer computer;

        private ProjectionEncoder(Encoder<RowData> encoder, RowDataPartitionComputer computer) {
            this.encoder = encoder;
            this.computer = computer;
        }

        /**
         * Projects {@code element} and encodes the projected row to {@code stream}.
         */
        public void encode(RowData element, OutputStream stream) throws IOException {
            final RowData projected = this.computer.projectColumnsToWrite(element);
            this.encoder.encode(projected, stream);
        }
    }

    /**
     * Rolling policy driven by file size, file age and inactivity, with an optional
     * unconditional roll on every checkpoint.
     */
    public static class TableRollingPolicy
    extends CheckpointRollingPolicy<RowData, String> {
        private final boolean rollOnCheckpoint;
        private final long rollingFileSize;
        private final long rollingTimeInterval;
        private final long inactivityInterval;

        /**
         * @param rollOnCheckpoint whether to roll on every checkpoint regardless of size
         * @param rollingFileSize size threshold; must be positive
         * @param rollingTimeInterval maximum age since creation, compared against the
         *     creation time; must be positive
         * @param inactivityInterval maximum time since last update; must be positive
         * @throws IllegalArgumentException (from {@code Preconditions.checkArgument}) if
         *     a threshold is not positive
         */
        public TableRollingPolicy(boolean rollOnCheckpoint, long rollingFileSize, long rollingTimeInterval, long inactivityInterval) {
            Preconditions.checkArgument(rollingFileSize > 0L);
            Preconditions.checkArgument(rollingTimeInterval > 0L);
            Preconditions.checkArgument(inactivityInterval > 0L);
            this.rollOnCheckpoint = rollOnCheckpoint;
            this.rollingFileSize = rollingFileSize;
            this.rollingTimeInterval = rollingTimeInterval;
            this.inactivityInterval = inactivityInterval;
        }

        /**
         * Rolls when configured to roll on checkpoint, or when the part file exceeds the
         * size threshold. The size is only queried in the second case.
         */
        public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
            if (this.rollOnCheckpoint) {
                return true;
            }
            try {
                return partFileState.getSize() > this.rollingFileSize;
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /** Rolls when the part file exceeds the size threshold. */
        public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, RowData element) throws IOException {
            final long currentSize = partFileState.getSize();
            return currentSize > this.rollingFileSize;
        }

        /**
         * Rolls when the part file is at least as old as the rollover interval, or has
         * not been updated for at least the inactivity interval.
         */
        public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) {
            final long age = currentTime - partFileState.getCreationTime();
            if (age >= this.rollingTimeInterval) {
                return true;
            }
            final long idleTime = currentTime - partFileState.getLastUpdateTime();
            return idleTime >= this.inactivityInterval;
        }
    }

    /**
     * {@link BucketAssigner} whose bucket id is the partition path generated from the
     * partition values that the given computer extracts from each row.
     */
    public static class TableBucketAssigner
    implements BucketAssigner<RowData, String> {
        private final PartitionComputer<RowData> computer;

        /**
         * @param computer computer used to extract partition values from rows
         */
        public TableBucketAssigner(PartitionComputer<RowData> computer) {
            this.computer = computer;
        }

        /**
         * Returns the partition path of {@code element}.
         *
         * @throws RuntimeException wrapping any exception raised while computing the path
         */
        public String getBucketId(RowData element, BucketAssigner.Context context) {
            final String bucketId;
            try {
                bucketId = PartitionPathUtils.generatePartitionPath(this.computer.generatePartValues(element));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            return bucketId;
        }

        /** Returns the shared string serializer instance for bucket ids. */
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }
}

