/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.source.reader.fetcher;

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask;
import org.apache.flink.connector.base.source.reader.fetcher.FetchTask;
import org.apache.flink.connector.base.source.reader.fetcher.PauseOrResumeSplitsTask;
import org.apache.flink.connector.base.source.reader.fetcher.RemoveSplitsTask;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runnable that drives one {@link SplitReader}: it repeatedly takes the next {@link
 * SplitFetcherTask} from an internal queue (or falls back to the fetch task while splits are
 * assigned), runs it outside the lock, and post-processes the result.
 *
 * <p>Control methods ({@link #addSplits}, {@link #removeSplits}, {@link #pauseOrResumeSplits},
 * {@link #enqueueTask}, {@link #pause}, {@link #resume}, {@link #shutdown}) all acquire {@code
 * lock} before touching the queue or the state flags.
 *
 * @param <E> type of the elements handed to the elements queue
 * @param <SplitT> type of the source splits handled by the split reader
 */
@PublicEvolving
public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(SplitFetcher.class);

    private final int id;

    /** Pending control tasks; these take precedence over the fetch task. */
    @GuardedBy("lock")
    private final Deque<SplitFetcherTask> taskQueue = new ArrayDeque<>();

    // NOTE(review): this map is read under the lock in isIdle()/processTaskResultUnsafe(), but it
    // is also handed to the add/remove tasks and mutated by the finished-splits callback below,
    // which run wherever those tasks execute - confirm the intended thread confinement.
    private final Map<String, SplitT> assignedSplits = new HashMap<>();

    private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
    private final SplitReader<E, SplitT> splitReader;
    private final Consumer<Throwable> errorHandler;
    private final Runnable shutdownHook;

    @GuardedBy("lock")
    private boolean closed;

    @GuardedBy("lock")
    private boolean paused;

    private final FetchTask<E, SplitT> fetchTask;

    /** The task currently executing outside the lock, kept so that it can be woken up. */
    @Nullable
    @GuardedBy("lock")
    private SplitFetcherTask runningTask = null;

    // The lock must be initialized before the conditions derived from it.
    private final ReentrantLock lock = new ReentrantLock();

    /** Signalled when a task is enqueued or the fetcher thread must re-check its state. */
    @GuardedBy("lock")
    private final Condition nonEmpty = lock.newCondition();

    /** Signalled when the fetcher is resumed or shut down while paused. */
    @GuardedBy("lock")
    private final Condition resumed = lock.newCondition();

    private final boolean allowUnalignedSourceSplits;
    private final Consumer<Collection<String>> splitFinishedHook;

    /**
     * Creates a fetcher.
     *
     * @param id identifier used in log and error messages
     * @param elementsQueue queue passed to the fetch task and notified when the fetcher has
     *     nothing left to do; must not be null
     * @param splitReader reader passed to all tasks and closed when {@link #run()} exits; must
     *     not be null
     * @param errorHandler receives any throwable escaping the run loop or the reader's close;
     *     must not be null
     * @param shutdownHook executed as the very last step of {@link #run()}; must not be null
     * @param splitFinishedHook receives the ids of finished splits
     * @param allowUnalignedSourceSplits forwarded to {@link PauseOrResumeSplitsTask}
     */
    SplitFetcher(
            int id,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitReader<E, SplitT> splitReader,
            Consumer<Throwable> errorHandler,
            Runnable shutdownHook,
            Consumer<Collection<String>> splitFinishedHook,
            boolean allowUnalignedSourceSplits) {
        this.id = id;
        // checkNotNull is generic, so no (raw) casts are needed on its result.
        this.elementsQueue = Preconditions.checkNotNull(elementsQueue);
        this.splitReader = Preconditions.checkNotNull(splitReader);
        this.errorHandler = Preconditions.checkNotNull(errorHandler);
        this.shutdownHook = Preconditions.checkNotNull(shutdownHook);
        this.allowUnalignedSourceSplits = allowUnalignedSourceSplits;
        this.splitFinishedHook = splitFinishedHook;

        this.fetchTask =
                new FetchTask<>(
                        splitReader,
                        elementsQueue,
                        ids -> {
                            // Forget finished splits before telling the hook about them.
                            ids.forEach(assignedSplits::remove);
                            splitFinishedHook.accept(ids);
                            LOG.info("Finished reading from splits {}", ids);
                        },
                        id);
    }

    /**
     * Runs {@link #runOnce()} until it returns {@code false}. Any throwable from the loop, and
     * any exception from closing the split reader, is passed to the error handler. The shutdown
     * hook always runs last.
     */
    @Override
    public void run() {
        LOG.info("Starting split fetcher {}", id);
        try {
            while (runOnce()) {
                // Nothing to do here; all the work happens inside runOnce().
            }
        } catch (Throwable t) {
            errorHandler.accept(t);
        } finally {
            try {
                splitReader.close();
            } catch (Exception e) {
                errorHandler.accept(e);
            } finally {
                LOG.info("Split fetcher {} exited.", id);
                // Runs after any errorHandler invocation above.
                shutdownHook.run();
            }
        }
    }

    /**
     * Executes at most one task.
     *
     * @return {@code false} if the fetcher has been shut down, {@code true} otherwise (including
     *     when no task was available after waiting)
     * @throws RuntimeException wrapping any exception thrown by the task, or if the thread is
     *     interrupted while waiting for a task
     */
    boolean runOnce() {
        SplitFetcherTask task;
        lock.lock();
        try {
            if (closed) {
                return false;
            }

            task = getNextTaskUnsafe();
            if (task == null) {
                // Woken up without a task to run; let the caller loop again.
                return true;
            }

            LOG.debug("Prepare to run {}", task);
            // Remember the task so that wakeUpUnsafe() can reach it.
            runningTask = task;
        } finally {
            lock.unlock();
        }

        // Run the task without holding the lock so that other threads can wake it up.
        boolean taskFinished;
        try {
            taskFinished = task.run();
        } catch (Exception e) {
            throw new RuntimeException(
                    String.format(
                            "SplitFetcher thread %d received unexpected exception while polling the records",
                            id),
                    e);
        }

        // Post-processing touches guarded state, so re-acquire the lock.
        lock.lock();
        try {
            runningTask = null;
            processTaskResultUnsafe(task, taskFinished);
        } finally {
            lock.unlock();
        }
        return true;
    }

    /**
     * Handles the outcome of a task. Must be called with the lock held.
     *
     * @param task the task that just ran
     * @param taskFinished the value returned by {@code task.run()}
     */
    private void processTaskResultUnsafe(SplitFetcherTask task, boolean taskFinished) {
        assert lock.isHeldByCurrentThread();
        if (taskFinished) {
            LOG.debug("Finished running task {}", task);
            if (assignedSplits.isEmpty() && taskQueue.isEmpty()) {
                // No splits and no pending tasks remain: notify the elements queue.
                elementsQueue.notifyAvailable();
            }
        } else if (task != fetchTask) {
            // An unfinished control task goes back to the head of the queue to be retried first.
            // The fetch task is never queued; it is selected again while splits are assigned.
            taskQueue.addFirst(task);
            LOG.debug("Reenqueuing woken task {}", task);
        }
    }

    /**
     * Picks the next task to run, waiting if necessary. Must be called with the lock held.
     *
     * @return the next task, or {@code null} if the thread was woken up without a task being
     *     available (the caller should re-check its state and retry)
     * @throws RuntimeException if the thread is interrupted while waiting; the interrupt flag is
     *     restored and the {@link InterruptedException} is attached as the cause
     */
    @Nullable
    private SplitFetcherTask getNextTaskUnsafe() {
        assert lock.isHeldByCurrentThread();
        try {
            if (paused) {
                resumed.await();
                // Return to the caller so that it re-checks 'closed' before running anything.
                return null;
            }
            // Queued tasks run before the fetch task.
            if (!taskQueue.isEmpty()) {
                return taskQueue.poll();
            }
            // Nothing queued: keep fetching while there are assigned splits.
            if (!assignedSplits.isEmpty()) {
                return fetchTask;
            }
            // Nothing to do at all: wait until signalled. poll() may still return null here.
            nonEmpty.await();
            return taskQueue.poll();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Keep the original exception as the cause so that its stack trace is not lost.
            throw new RuntimeException(
                    "The thread was interrupted while waiting for a fetcher task.", e);
        }
    }

    /**
     * Enqueues an {@link AddSplitsTask} for the given splits and wakes up the running task, if
     * any.
     *
     * @param splitsToAdd the splits to hand to the add-splits task
     */
    public void addSplits(List<SplitT> splitsToAdd) {
        lock.lock();
        try {
            enqueueTaskUnsafe(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits));
            wakeUpUnsafe(true);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Enqueues a {@link RemoveSplitsTask} for the given splits and wakes up the running task, if
     * any.
     *
     * @param splitsToRemove the splits to hand to the remove-splits task
     */
    public void removeSplits(List<SplitT> splitsToRemove) {
        lock.lock();
        try {
            enqueueTaskUnsafe(
                    new RemoveSplitsTask<>(
                            splitReader, splitsToRemove, assignedSplits, splitFinishedHook));
            wakeUpUnsafe(true);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Enqueues a {@link PauseOrResumeSplitsTask} and wakes up the running task, if any.
     *
     * @param splitsToPause splits passed to the task as the ones to pause
     * @param splitsToResume splits passed to the task as the ones to resume
     */
    public void pauseOrResumeSplits(
            Collection<SplitT> splitsToPause, Collection<SplitT> splitsToResume) {
        lock.lock();
        try {
            enqueueTaskUnsafe(
                    new PauseOrResumeSplitsTask<>(
                            splitReader,
                            splitsToPause,
                            splitsToResume,
                            allowUnalignedSourceSplits));
            wakeUpUnsafe(true);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends a task to the queue. Unlike the split-related methods, this does not wake up a
     * currently running task.
     *
     * @param task the task to enqueue
     */
    public void enqueueTask(SplitFetcherTask task) {
        lock.lock();
        try {
            enqueueTaskUnsafe(task);
        } finally {
            lock.unlock();
        }
    }

    /** Appends a task and signals {@code nonEmpty}. Must be called with the lock held. */
    private void enqueueTaskUnsafe(SplitFetcherTask task) {
        assert lock.isHeldByCurrentThread();
        taskQueue.add(task);
        nonEmpty.signal();
    }

    /** Returns the split reader this fetcher was created with. */
    public SplitReader<E, SplitT> getSplitReader() {
        return splitReader;
    }

    /** Returns the id this fetcher was created with. */
    public int fetcherId() {
        return id;
    }

    /**
     * Marks the fetcher as closed so that {@link #runOnce()} returns {@code false}, clears the
     * paused flag, and wakes up either the running task or the waiting fetcher thread. Calling
     * this more than once has no further effect.
     */
    public void shutdown() {
        lock.lock();
        try {
            if (!closed) {
                closed = true;
                paused = false;
                LOG.info("Shutting down split fetcher {}", id);
                wakeUpUnsafe(false);
            }
        } finally {
            lock.unlock();
        }
    }

    /** Returns the live (not copied) map of assigned splits keyed by split id. */
    Map<String, SplitT> assignedSplits() {
        return assignedSplits;
    }

    /**
     * Reports whether the fetcher has nothing to do.
     *
     * @return {@code true} if there are no assigned splits, no queued tasks and no running task
     */
    boolean isIdle() {
        lock.lock();
        try {
            return assignedSplits.isEmpty() && taskQueue.isEmpty() && runningTask == null;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Locked variant of {@link #wakeUpUnsafe(boolean)}.
     *
     * @param taskOnly if {@code true}, only a running task is woken up; a waiting fetcher thread
     *     is left alone
     */
    void wakeUp(boolean taskOnly) {
        lock.lock();
        try {
            wakeUpUnsafe(taskOnly);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wakes up the running task if there is one; otherwise, unless {@code taskOnly} is set,
     * signals both conditions the fetcher thread may be waiting on. Must be called with the lock
     * held.
     */
    private void wakeUpUnsafe(boolean taskOnly) {
        assert lock.isHeldByCurrentThread();
        SplitFetcherTask currentTask = runningTask;
        if (currentTask != null) {
            LOG.debug("Waking up running task {}", currentTask);
            currentTask.wakeUp();
        } else if (!taskOnly) {
            // No task is running, so the fetcher thread may be waiting on either condition.
            LOG.debug("Waking up fetcher thread.");
            nonEmpty.signal();
            resumed.signal();
        }
    }

    /**
     * Sets the paused flag. The fetcher thread waits for {@link #resume()} (or {@link
     * #shutdown()}) the next time it looks for a task; a task that is already running is not
     * woken up by this call.
     */
    public void pause() {
        lock.lock();
        try {
            paused = true;
        } finally {
            lock.unlock();
        }
    }

    /** Clears the paused flag and signals the fetcher thread waiting on it. */
    public void resume() {
        lock.lock();
        try {
            paused = false;
            resumed.signal();
        } finally {
            lock.unlock();
        }
    }
}

