/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.source.reader.fetcher;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Queue;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
import org.apache.flink.connector.base.source.reader.mocks.TestingRecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.core.testutils.OneShotLatch;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/**
 * Tests for {@link SplitFetcherManager} error handling: an exception thrown by the split reader,
 * either while fetching or while being closed, must be reported by
 * {@link SplitFetcherManager#checkErrors()}.
 */
public class SplitFetcherManagerTest {

    /** The reader has no records to hand out, so its very first {@code fetch()} call fails. */
    @Test
    public void testExceptionPropagationFirstFetch() throws Exception {
        testExceptionPropagation();
    }

    /** The reader hands out two batches of records successfully before a later fetch fails. */
    @Test
    public void testExceptionPropagationSuccessiveFetch() throws Exception {
        testExceptionPropagation(
                new TestingRecordsWithSplitIds<Integer>("testSplit", 1, 2, 3, 4),
                new TestingRecordsWithSplitIds<Integer>("testSplit", 5, 6, 7, 8));
    }

    /**
     * Closes a fetcher manager whose reader was configured via {@code setCloseWithException()}
     * and expects {@code checkErrors()} to throw afterwards with the reader's message as the
     * root cause message.
     */
    @Test
    public void testCloseFetcherWithException() throws Exception {
        final TestingSplitReader<Object, TestingSourceSplit> reader = new TestingSplitReader<>();
        reader.setCloseWithException();

        final SplitFetcherManager<Object, TestingSourceSplit> fetcherManager =
                createFetcher("test-split", reader, new Configuration());
        fetcherManager.close(1000L);

        Assertions.assertThatThrownBy(fetcherManager::checkErrors)
                .hasRootCauseMessage("Artificial exception on closing the split reader.");
    }

    // ------------------------------------------------------------------------
    //  test helpers
    // ------------------------------------------------------------------------

    /**
     * Runs the shared exception-propagation scenario.
     *
     * <p>The reader first returns {@code fetchesBeforeError}, then blocks inside {@code fetch()}
     * until the test releases it, and finally throws an {@link IOException}. The test asserts
     * that this exact exception instance is found two cause-levels below whatever
     * {@code checkErrors()} throws.
     *
     * @param fetchesBeforeError record batches the reader returns before it fails; may be empty
     * @throws Exception if waiting on the reader or on the queue's availability future fails
     */
    @SafeVarargs
    private final void testExceptionPropagation(RecordsWithSplitIds<Integer>... fetchesBeforeError)
            throws Exception {
        final IOException testingException = new IOException("test");
        final AwaitingReader<Integer, TestingSourceSplit> reader =
                new AwaitingReader<>(testingException, fetchesBeforeError);

        final Configuration configuration = new Configuration();
        configuration.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 10);

        final SplitFetcherManager<Integer, TestingSourceSplit> fetcher =
                createFetcher("testSplit", reader, configuration);

        // Wait until the reader has handed out all prepared batches and is parked in fetch().
        reader.awaitAllRecordsReturned();
        drainQueue(fetcher.getQueue());

        // With the queue drained and no error raised yet, nothing should be available.
        Assertions.assertThat(fetcher.getQueue().getAvailabilityFuture().isDone()).isFalse();
        reader.triggerThrowException();

        // Wait for the availability future to complete; the test uses this as the signal that
        // the reader's error has been reported.
        fetcher.getQueue().getAvailabilityFuture().get();

        try {
            fetcher.checkErrors();
            // fail() throws an AssertionError, which the catch (Exception) below does not swallow.
            Assertions.fail("expected exception");
        } catch (Exception e) {
            Assertions.assertThat(e.getCause().getCause()).isSameAs(testingException);
        } finally {
            fetcher.close(20000L);
        }
    }

    /**
     * Creates a {@link SingleThreadFetcherManager} that always supplies the given reader and
     * assigns it a single {@link TestingSourceSplit} with the given id.
     *
     * @param splitId id of the split to assign
     * @param reader reader instance returned by the manager's reader supplier
     * @param configuration configuration passed to the manager
     * @return the fetcher manager, with the split already added
     */
    private static <E> SplitFetcherManager<E, TestingSourceSplit> createFetcher(
            String splitId, SplitReader<E, TestingSourceSplit> reader, Configuration configuration) {
        final SingleThreadFetcherManager<E, TestingSourceSplit> fetcher =
                new SingleThreadFetcherManager<>(() -> reader, configuration);
        fetcher.addSplits(Collections.singletonList(new TestingSourceSplit(splitId)));
        return fetcher;
    }

    /** Polls the queue until {@code poll()} returns {@code null}, discarding every element. */
    private static void drainQueue(FutureCompletingBlockingQueue<?> queue) {
        while (queue.poll() != null) {
            // discard
        }
    }

    // ------------------------------------------------------------------------
    //  test mocks
    // ------------------------------------------------------------------------

    /**
     * A {@link SplitReader} that returns a fixed sequence of record batches and then, once they
     * are exhausted, blocks in {@link #fetch()} until {@link #triggerThrowException()} is called,
     * at which point it throws the configured error.
     */
    private static final class AwaitingReader<E, SplitT extends SourceSplit>
            implements SplitReader<E, SplitT> {

        /** Batches still to be returned by {@link #fetch()}, in order. */
        private final Queue<RecordsWithSplitIds<E>> fetches;

        /** Error thrown by {@link #fetch()} after the latch is released. */
        private final IOException testError;

        /** Triggered when {@link #fetch()} finds no more batches and is about to block. */
        private final OneShotLatch inBlocking = new OneShotLatch();

        /** Released by the test to let the blocked {@link #fetch()} throw. */
        private final OneShotLatch throwError = new OneShotLatch();

        @SafeVarargs
        AwaitingReader(IOException testError, RecordsWithSplitIds<E>... fetches) {
            this.testError = testError;
            this.fetches = new ArrayDeque<>(Arrays.asList(fetches));
        }

        /**
         * Returns the next prepared batch if one is left; otherwise signals that all batches were
         * returned, waits for the test's go-ahead, and throws the configured error.
         *
         * @throws IOException the configured test error, or an "interrupted" error if the wait
         *     was interrupted
         */
        @Override
        public RecordsWithSplitIds<E> fetch() throws IOException {
            if (!fetches.isEmpty()) {
                return fetches.poll();
            }

            inBlocking.trigger();
            try {
                throwError.await();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and keep the original exception as the cause.
                Thread.currentThread().interrupt();
                throw new IOException("interrupted", e);
            }

            throw testError;
        }

        /** Split changes are ignored by this mock. */
        @Override
        public void handleSplitsChanges(SplitsChange<SplitT> splitsChanges) {}

        /** No-op: this mock is only unblocked through {@link #triggerThrowException()}. */
        @Override
        public void wakeUp() {}

        /** No-op: nothing to release. */
        @Override
        public void close() throws Exception {}

        /** Blocks until {@link #fetch()} has run out of prepared batches. */
        public void awaitAllRecordsReturned() throws InterruptedException {
            inBlocking.await();
        }

        /** Lets the blocked {@link #fetch()} call proceed and throw the configured error. */
        public void triggerThrowException() {
            throwError.trigger();
        }
    }
}

