/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.HashMap;
import java.util.regex.Pattern;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages the on-disk state directory of a single application: the directory
 * {@code <state.dir>/<application.id>}, one sub-directory per task, and one
 * {@code global} sub-directory.
 *
 * <p>Task directories and the global directory are guarded by a {@code .lock}
 * file on which a {@link FileLock} is taken. Task locks are additionally
 * tracked in memory together with the name of the thread that acquired them,
 * so that a second thread of the same JVM is refused the lock.
 *
 * <p>When the instance is created with {@code hasPersistentStores == false}
 * no directory is created and every lock request trivially succeeds.
 *
 * <p>All lock/unlock/clean methods are {@code synchronized} on this instance.
 */
public class StateDirectory {
    /** Task directory names have the form {@code <digits>_<digits>}. */
    private static final Pattern PATH_NAME = Pattern.compile("\\d+_\\d+");
    private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);
    static final String LOCK_FILE_NAME = ".lock";
    private static final String CHECKPOINT_FILE_NAME = ".checkpoint";

    /** Serializes creation of task directories across threads of this JVM. */
    private final Object taskDirCreationLock = new Object();
    private final Time time;
    private final String appId;
    private final File stateDir;
    private final boolean hasPersistentStores;
    /** Open channels on task lock files, keyed by task. */
    private final HashMap<TaskId, FileChannel> channels = new HashMap<>();
    /** Task locks currently held through this instance, with their owning thread. */
    private final HashMap<TaskId, LockAndOwner> locks = new HashMap<>();
    private FileChannel globalStateChannel;
    private FileLock globalStateLock;

    /**
     * Resolves the application state directory and, when persistent stores are
     * in use, creates the base and application directories if they are missing.
     *
     * @param config              configuration providing {@code application.id} and {@code state.dir}
     * @param time                clock used to decide whether a task directory is obsolete
     * @param hasPersistentStores whether anything is actually stored on disk
     * @throws ProcessorStateException if a required directory is missing and cannot be created
     */
    public StateDirectory(final StreamsConfig config, final Time time, final boolean hasPersistentStores) {
        this.time = time;
        this.hasPersistentStores = hasPersistentStores;
        this.appId = config.getString("application.id");
        final String stateDirName = config.getString("state.dir");
        final File baseDir = new File(stateDirName);
        if (this.hasPersistentStores && !baseDir.exists() && !baseDir.mkdirs()) {
            throw new ProcessorStateException(
                String.format("base state directory [%s] doesn't exist and couldn't be created", stateDirName));
        }
        this.stateDir = new File(baseDir, this.appId);
        if (this.hasPersistentStores && !this.stateDir.exists() && !this.stateDir.mkdir()) {
            throw new ProcessorStateException(
                String.format("state directory [%s] doesn't exist and couldn't be created", this.stateDir.getPath()));
        }
    }

    /**
     * Returns the directory of the given task, creating it first when
     * persistent stores are in use and it does not exist yet.
     *
     * @param taskId the task
     * @return the task directory (not necessarily existing when there are no persistent stores)
     * @throws ProcessorStateException if the directory is missing and cannot be created
     */
    public File directoryForTask(final TaskId taskId) {
        final File taskDir = new File(stateDir, taskId.toString());
        if (hasPersistentStores && !taskDir.exists()) {
            synchronized (taskDirCreationLock) {
                // Re-check under the lock: another thread may have created it in the meantime.
                if (!taskDir.exists() && !taskDir.mkdir()) {
                    throw new ProcessorStateException(
                        String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath()));
                }
            }
        }
        return taskDir;
    }

    /**
     * Returns the {@code .checkpoint} file inside the task directory.
     * The task directory is created as described in {@link #directoryForTask(TaskId)}.
     */
    File checkpointFileFor(final TaskId taskId) {
        return new File(directoryForTask(taskId), CHECKPOINT_FILE_NAME);
    }

    /**
     * Tells whether the task directory contains nothing besides the lock and checkpoint files.
     * The task directory is created as described in {@link #directoryForTask(TaskId)}.
     */
    boolean directoryForTaskIsEmpty(final TaskId taskId) {
        return taskDirEmpty(directoryForTask(taskId));
    }

    /**
     * A task directory counts as empty when it holds no entry other than the
     * lock file and the checkpoint file, or when it cannot be listed at all.
     */
    private boolean taskDirEmpty(final File taskDir) {
        final File[] storeDirs = taskDir.listFiles(pathname -> {
            final String name = pathname.getName();
            return !name.equals(LOCK_FILE_NAME) && !name.equals(CHECKPOINT_FILE_NAME);
        });
        return storeDirs == null || storeDirs.length == 0;
    }

    /**
     * Returns the {@code global} directory, creating it first when persistent
     * stores are in use and it does not exist yet.
     *
     * @throws ProcessorStateException if the directory is missing and cannot be created
     */
    File globalStateDir() {
        final File dir = new File(stateDir, "global");
        if (hasPersistentStores && !dir.exists() && !dir.mkdir()) {
            throw new ProcessorStateException(
                String.format("global state directory [%s] doesn't exist and couldn't be created", dir.getPath()));
        }
        return dir;
    }

    private String logPrefix() {
        return String.format("stream-thread [%s]", Thread.currentThread().getName());
    }

    /**
     * Tries to take the lock of the given task directory for the calling thread.
     *
     * @param taskId the task whose directory should be locked
     * @return {@code true} if the calling thread holds the lock after the call (or there are
     *         no persistent stores); {@code false} if another thread of this JVM owns it, the
     *         file lock could not be obtained, or the task directory / lock file is unavailable
     * @throws IOException if opening or locking the lock file fails for another reason
     */
    synchronized boolean lock(final TaskId taskId) throws IOException {
        if (!hasPersistentStores) {
            return true;
        }

        final LockAndOwner lockAndOwner = locks.get(taskId);
        if (lockAndOwner != null) {
            if (lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
                log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId);
                return true;
            }
            // Held by a different thread of this JVM.
            return false;
        }

        final File lockFile;
        try {
            lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
        } catch (final ProcessorStateException e) {
            // The task directory could not be created; report "not locked" instead of failing.
            return false;
        }

        final FileChannel channel;
        try {
            channel = getOrCreateFileChannel(taskId, lockFile.toPath());
        } catch (final NoSuchFileException e) {
            // The directory vanished between creating it and opening the lock file.
            return false;
        }

        final FileLock lock = tryLock(channel);
        if (lock == null) {
            return false;
        }
        locks.put(taskId, new LockAndOwner(Thread.currentThread().getName(), lock));
        log.debug("{} Acquired state dir lock for task {}", logPrefix(), taskId);
        return true;
    }

    /**
     * Tries to take the lock of the global state directory.
     *
     * @return {@code true} if the lock is held after the call (or there are no persistent
     *         stores); {@code false} if it could not be obtained
     * @throws IOException if opening or locking the lock file fails
     */
    synchronized boolean lockGlobalState() throws IOException {
        if (!hasPersistentStores) {
            return true;
        }
        if (globalStateLock != null) {
            log.trace("{} Found cached state dir lock for the global task", logPrefix());
            return true;
        }

        final File lockFile = new File(globalStateDir(), LOCK_FILE_NAME);
        final FileChannel channel;
        try {
            channel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        } catch (final NoSuchFileException e) {
            return false;
        }

        FileLock fileLock = null;
        try {
            fileLock = tryLock(channel);
        } finally {
            // Covers both "lock unavailable" and "tryLock threw": never leak the channel.
            if (fileLock == null) {
                channel.close();
            }
        }
        if (fileLock == null) {
            return false;
        }

        globalStateChannel = channel;
        globalStateLock = fileLock;
        log.debug("{} Acquired global state dir lock", logPrefix());
        return true;
    }

    /**
     * Releases the global state directory lock if it is held; otherwise does nothing.
     * The channel is closed and the cached handles are cleared even if releasing fails.
     *
     * @throws IOException if releasing the lock or closing the channel fails
     */
    synchronized void unlockGlobalState() throws IOException {
        if (globalStateLock == null) {
            return;
        }
        try {
            globalStateLock.release();
        } finally {
            try {
                globalStateChannel.close();
            } finally {
                globalStateLock = null;
                globalStateChannel = null;
            }
        }
        log.debug("{} Released global state dir lock", logPrefix());
    }

    /**
     * Releases the lock of the given task directory if, and only if, the calling
     * thread is its recorded owner. The cached channel is closed even if
     * releasing the file lock fails.
     *
     * @param taskId the task whose directory should be unlocked
     * @throws IOException if releasing the lock or closing the channel fails
     */
    synchronized void unlock(final TaskId taskId) throws IOException {
        final LockAndOwner lockAndOwner = locks.get(taskId);
        if (lockAndOwner == null || !lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
            return;
        }

        locks.remove(taskId);
        try {
            lockAndOwner.lock.release();
            log.debug("{} Released state dir lock for task {}", logPrefix(), taskId);
        } finally {
            final FileChannel fileChannel = channels.remove(taskId);
            if (fileChannel != null) {
                fileChannel.close();
            }
        }
    }

    /**
     * Deletes every task directory that is not locked through this instance,
     * then deletes the global state directory.
     *
     * @throws StreamsException if any deletion or lock handling fails
     */
    public synchronized void clean() {
        try {
            cleanRemovedTasks(0L, true);
        } catch (final Exception e) {
            throw new StreamsException(e);
        }

        try {
            if (stateDir.exists()) {
                Utils.delete(globalStateDir().getAbsoluteFile());
            }
        } catch (final IOException e) {
            log.error("{} Failed to delete global state directory of {} due to an unexpected exception",
                logPrefix(), appId, e);
            throw new StreamsException(e);
        }
    }

    /**
     * Deletes the contents of task directories that are not locked through this
     * instance and were last modified more than {@code cleanupDelayMs} ago.
     * I/O and lock failures are swallowed.
     *
     * @param cleanupDelayMs minimum age, in milliseconds, before a directory is considered obsolete
     */
    public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
        try {
            cleanRemovedTasks(cleanupDelayMs, false);
        } catch (final Exception cannotHappen) {
            throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
        }
    }

    /**
     * Walks all task directories and cleans those whose lock can be acquired.
     *
     * @param cleanupDelayMs minimum age before a directory is considered obsolete
     * @param manualUserCall when {@code true}, directories are deleted regardless of age, the
     *                       directory itself (including its lock file) is removed after
     *                       unlocking, and failures are rethrown instead of swallowed
     * @throws Exception the I/O or lock failure, only when {@code manualUserCall} is {@code true}
     */
    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
                                                final boolean manualUserCall) throws Exception {
        final File[] taskDirs = listAllTaskDirectories();
        if (taskDirs == null || taskDirs.length == 0) {
            return;
        }

        for (final File taskDir : taskDirs) {
            final String dirName = taskDir.getName();
            final TaskId id = TaskId.parse(dirName);
            if (locks.containsKey(id)) {
                // Currently locked through this instance: leave it alone.
                continue;
            }

            Exception exception = null;
            try {
                if (lock(id)) {
                    final long now = time.milliseconds();
                    final long lastModifiedMs = taskDir.lastModified();
                    // Compare elapsed time against the delay; "lastModifiedMs + cleanupDelayMs"
                    // would overflow for very large delays and wrongly trigger deletion.
                    if (now - lastModifiedMs > cleanupDelayMs) {
                        log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                            logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
                        // Keep the lock file while we still hold the lock on it.
                        Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
                    } else if (manualUserCall) {
                        log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
                            logPrefix(), dirName, id);
                        Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
                    }
                }
            } catch (final IOException | OverlappingFileLockException e) {
                exception = e;
            } finally {
                try {
                    unlock(id);
                    if (manualUserCall) {
                        // The lock is released now, so the directory and its lock file can go.
                        Utils.delete(taskDir);
                    }
                } catch (final IOException e) {
                    if (exception != null) {
                        // Do not lose the earlier failure.
                        e.addSuppressed(exception);
                    }
                    exception = e;
                }
            }

            if (exception != null && manualUserCall) {
                log.error("{} Failed to release the state directory lock.", logPrefix(), exception);
                throw exception;
            }
        }
    }

    /**
     * Lists the task directories that contain something besides the lock and checkpoint files.
     *
     * @return the matching directories; an empty array when there are no persistent stores,
     *         the state directory does not exist, or it cannot be listed
     */
    File[] listNonEmptyTaskDirectories() {
        if (!hasPersistentStores || !stateDir.exists()) {
            return new File[0];
        }
        final File[] taskDirectories = stateDir.listFiles(pathname ->
            pathname.isDirectory()
                && PATH_NAME.matcher(pathname.getName()).matches()
                && !taskDirEmpty(pathname));
        return taskDirectories == null ? new File[0] : taskDirectories;
    }

    /**
     * Lists every directory under the state directory whose name looks like a task id.
     *
     * @return the matching directories; an empty array when there are no persistent stores,
     *         the state directory does not exist, or it cannot be listed
     */
    File[] listAllTaskDirectories() {
        if (!hasPersistentStores || !stateDir.exists()) {
            return new File[0];
        }
        final File[] taskDirectories = stateDir.listFiles(pathname ->
            pathname.isDirectory() && PATH_NAME.matcher(pathname.getName()).matches());
        return taskDirectories == null ? new File[0] : taskDirectories;
    }

    /**
     * Returns the cached channel on the task's lock file, opening (and creating
     * the file) on first use.
     */
    private FileChannel getOrCreateFileChannel(final TaskId taskId, final Path lockPath) throws IOException {
        FileChannel channel = channels.get(taskId);
        if (channel == null) {
            channel = FileChannel.open(lockPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            channels.put(taskId, channel);
        }
        return channel;
    }

    /**
     * Attempts a non-blocking file lock.
     *
     * @return the lock, or {@code null} if it is not available, including when an
     *         overlapping lock is already held within this JVM
     */
    private FileLock tryLock(final FileChannel channel) throws IOException {
        try {
            return channel.tryLock();
        } catch (final OverlappingFileLockException e) {
            return null;
        }
    }

    /** A held file lock together with the name of the thread that acquired it. */
    private static class LockAndOwner {
        final FileLock lock;
        final String owningThread;

        LockAndOwner(final String owningThread, final FileLock lock) {
            this.owningThread = owningThread;
            this.lock = lock;
        }
    }
}

