/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators;

import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.TaskContext;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CoGroupRawDriver<IT1, IT2, OT>
implements Driver<CoGroupFunction<IT1, IT2, OT>, OT> {
    private static final Logger LOG = LoggerFactory.getLogger(CoGroupRawDriver.class);
    private TaskContext<CoGroupFunction<IT1, IT2, OT>, OT> taskContext;
    private SimpleIterable<IT1> coGroupIterator1;
    private SimpleIterable<IT2> coGroupIterator2;

    @Override
    public void setup(TaskContext<CoGroupFunction<IT1, IT2, OT>, OT> context) {
        this.taskContext = context;
    }

    @Override
    public int getNumberOfInputs() {
        return 2;
    }

    @Override
    public int getNumberOfDriverComparators() {
        return 0;
    }

    @Override
    public Class<CoGroupFunction<IT1, IT2, OT>> getStubType() {
        Class<CoGroupFunction> clazz = CoGroupFunction.class;
        return clazz;
    }

    @Override
    public void prepare() throws Exception {
        TaskConfig config = this.taskContext.getTaskConfig();
        if (config.getDriverStrategy() != DriverStrategy.CO_GROUP_RAW) {
            throw new Exception("Unrecognized driver strategy for CoGoup Python driver: " + config.getDriverStrategy().name());
        }
        MutableObjectIterator in1 = this.taskContext.getInput(0);
        MutableObjectIterator in2 = this.taskContext.getInput(1);
        Object reuse1 = this.taskContext.getInputSerializer(0).getSerializer().createInstance();
        Object reuse2 = this.taskContext.getInputSerializer(1).getSerializer().createInstance();
        this.coGroupIterator1 = new SimpleIterable<Object>(reuse1, in1);
        this.coGroupIterator2 = new SimpleIterable<Object>(reuse2, in2);
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.taskContext.formatLogString("CoGroup task iterator ready."));
        }
    }

    @Override
    public void run() throws Exception {
        CoGroupFunction<IT1, IT2, OT> coGroupStub = this.taskContext.getStub();
        Collector<OT> collector = this.taskContext.getOutputCollector();
        SimpleIterable<IT1> i1 = this.coGroupIterator1;
        SimpleIterable<IT2> i2 = this.coGroupIterator2;
        coGroupStub.coGroup(i1, i2, collector);
    }

    @Override
    public void cleanup() throws Exception {
    }

    @Override
    public void cancel() throws Exception {
        this.cleanup();
    }

    public static class SimpleIterable<IN>
    implements Iterable<IN> {
        private IN reuse;
        private final MutableObjectIterator<IN> iterator;

        public SimpleIterable(IN reuse, MutableObjectIterator<IN> iterator) throws IOException {
            this.iterator = iterator;
            this.reuse = reuse;
        }

        @Override
        public Iterator<IN> iterator() {
            return new SimpleIterator<IN>(this.reuse, this.iterator);
        }

        protected class SimpleIterator<IN>
        implements Iterator<IN> {
            private IN reuse;
            private final MutableObjectIterator<IN> iterator;
            private boolean consumed = true;

            public SimpleIterator(IN reuse, MutableObjectIterator<IN> iterator) {
                this.iterator = iterator;
                this.reuse = reuse;
            }

            @Override
            public boolean hasNext() {
                try {
                    if (!this.consumed) {
                        return true;
                    }
                    Object result = this.iterator.next(this.reuse);
                    this.consumed = result == null;
                    return !this.consumed;
                }
                catch (IOException ioex) {
                    throw new RuntimeException("An error occurred while reading the next record: " + ioex.getMessage(), ioex);
                }
            }

            @Override
            public IN next() {
                this.consumed = true;
                return this.reuse;
            }

            @Override
            public void remove() {
            }
        }
    }
}

