/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.datastream;

import java.util.Iterator;
import java.util.UUID;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;

/**
 * Static helper methods that operate on {@link DataStream}s: collecting a stream's elements
 * through an {@link Iterator}, and wrapping an existing stream as a {@link KeyedStream}.
 *
 * <p>This class is not instantiable.
 */
@Experimental
public final class DataStreamUtils {
    /**
     * Attaches a collect sink to {@code stream}, submits the job through
     * {@link StreamExecutionEnvironment#executeAsync(String)}, and returns an iterator that is
     * bound to the resulting {@link JobClient}.
     *
     * <p>The sink's transformation is registered on the stream's execution environment before
     * the job is submitted, so calling this method mutates that environment.
     *
     * @param stream the stream whose elements should be collected
     * @param <OUT> the element type of the stream
     * @return an iterator over the elements of {@code stream}
     * @throws RuntimeException if submitting the job fails; the original exception is the cause
     */
    public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) {
        StreamExecutionEnvironment env = stream.getExecutionEnvironment();
        TypeSerializer<OUT> serializer = stream.getType().createSerializer(env.getConfig());
        // A random suffix gives every call its own accumulator name.
        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();

        CollectSinkOperatorFactory<OUT> factory =
                new CollectSinkOperatorFactory<>(serializer, accumulatorName);
        // The factory was built just above from the OUT serializer; only the element type
        // argument of this cast is unchecked.
        @SuppressWarnings("unchecked")
        CollectSinkOperator<OUT> operator = (CollectSinkOperator<OUT>) factory.getOperator();
        CollectResultIterator<OUT> iterator =
                new CollectResultIterator<>(operator.getOperatorIdFuture(), serializer, accumulatorName);

        CollectStreamSink<OUT> sink = new CollectStreamSink<>(stream, factory);
        sink.name("Data stream collect sink");
        env.addOperator(sink.getTransformation());

        try {
            JobClient jobClient = env.executeAsync("Data Stream Collect");
            // The iterator needs the job client before it is handed to the caller.
            iterator.setJobClient(jobClient);
        } catch (Exception e) {
            throw new RuntimeException("Failed to execute data stream", e);
        }
        return iterator;
    }

    /**
     * Wraps {@code stream} as a {@link KeyedStream} keyed by {@code keySelector}, deriving the
     * key's type information with {@link TypeExtractor#getKeySelectorTypes}.
     *
     * <p>Delegates to {@link #reinterpretAsKeyedStream(DataStream, KeySelector, TypeInformation)}.
     *
     * @param stream the stream to wrap
     * @param keySelector extracts the key from each element
     * @param <T> the element type of the stream
     * @param <K> the key type
     * @return a keyed view of {@code stream}
     */
    public static <T, K> KeyedStream<T, K> reinterpretAsKeyedStream(DataStream<T> stream, KeySelector<T, K> keySelector) {
        return DataStreamUtils.reinterpretAsKeyedStream(stream, keySelector, TypeExtractor.getKeySelectorTypes(keySelector, stream.getType()));
    }

    /**
     * Wraps {@code stream} as a {@link KeyedStream} keyed by {@code keySelector}, using the
     * supplied key type information.
     *
     * <p>The stream's transformation is wrapped in a {@link PartitionTransformation} that uses a
     * {@link ForwardPartitioner}.
     *
     * <p>NOTE(review): since a forward partitioner is used, callers are presumably expected to
     * pass a stream that is already partitioned by this key — confirm against the Flink
     * documentation.
     *
     * @param stream the stream to wrap
     * @param keySelector extracts the key from each element
     * @param typeInfo type information for the key
     * @param <T> the element type of the stream
     * @param <K> the key type
     * @return a keyed view of {@code stream}
     */
    public static <T, K> KeyedStream<T, K> reinterpretAsKeyedStream(DataStream<T> stream, KeySelector<T, K> keySelector, TypeInformation<K> typeInfo) {
        PartitionTransformation<T> partitionTransformation =
                new PartitionTransformation<>(stream.getTransformation(), new ForwardPartitioner<>());
        return new KeyedStream<>(stream, partitionTransformation, keySelector, typeInfo);
    }

    /** Not instantiable: this class only holds static helpers. */
    private DataStreamUtils() {
    }
}

