/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap.daemon.impl;

import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.Stack;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.io.encoded.TezCounterSource;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.counters.FileSystemCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.runtime.task.TaskRunner2Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/**
 * A {@link ThreadPoolExecutor} that wraps every {@link Callable} handed to
 * {@link #newTaskFor(Callable)} so that, around the task's execution, it:
 * <ul>
 *   <li>installs an optional {@link Thread.UncaughtExceptionHandler} on the worker thread,</li>
 *   <li>populates the SLF4J {@link MDC} with {@code dagId}, {@code queryId} and
 *       {@code fragmentId} taken from the callable's NDC stack, and</li>
 *   <li>adds the file system statistics accumulated while the task ran to the task's
 *       {@link TezCounters}.</li>
 * </ul>
 * Only the {@code Callable} overload of {@code newTaskFor} is overridden here, so tasks
 * created through the {@code Runnable} overload are not wrapped.
 */
public class StatsRecordingThreadPool extends ThreadPoolExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(StatsRecordingThreadPool.class);

    /** Handler installed on the worker thread before each wrapped task runs; may be {@code null}. */
    private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;

    /**
     * Creates a pool without an uncaught exception handler. All arguments are passed straight
     * to the {@link ThreadPoolExecutor} constructor.
     */
    public StatsRecordingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, (Thread.UncaughtExceptionHandler) null);
    }

    /**
     * Creates a pool whose wrapped tasks install {@code handler} on the executing thread.
     *
     * @param handler handler to install before each task runs, or {@code null} for none
     */
    public StatsRecordingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, Thread.UncaughtExceptionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.uncaughtExceptionHandler = handler;
    }

    /**
     * Wraps {@code callable} in a {@link WrappedCallable}. The handler is captured at the time
     * the task is created, so a later {@link #setUncaughtExceptionHandler} call does not affect
     * tasks that were already wrapped.
     */
    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(new WrappedCallable<T>(callable, this.uncaughtExceptionHandler));
    }

    /**
     * Replaces the handler used for tasks wrapped after this call.
     *
     * @param handler the new handler, or {@code null} to stop installing one
     */
    public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler handler) {
        this.uncaughtExceptionHandler = handler;
    }

    /**
     * Decorator that runs the real callable between MDC setup / file system statistics
     * snapshotting and the matching counter update / MDC cleanup.
     */
    private static class WrappedCallable<V> implements Callable<V> {
        private final Callable<V> actualCallable;
        private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler;

        WrappedCallable(Callable<V> callable, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
            this.actualCallable = callable;
            this.uncaughtExceptionHandler = uncaughtExceptionHandler;
        }

        /**
         * Runs the wrapped callable on the current thread.
         *
         * @return whatever the wrapped callable returns
         * @throws Exception whatever the wrapped callable throws, or a failure raised while
         *         updating the file system counters afterwards
         */
        @Override
        public V call() throws Exception {
            Thread thread = Thread.currentThread();
            if (this.uncaughtExceptionHandler != null) {
                thread.setUncaughtExceptionHandler(this.uncaughtExceptionHandler);
            }
            // Taken before the task runs so the deltas can be computed afterwards
            // (presumably a copy of this thread's file system statistics; see LlapUtil).
            List<LlapUtil.StatisticsData> statsBefore = LlapUtil.cloneThreadLocalFileSystemStatistics();
            this.setupMDCFromNDC(this.actualCallable);
            try {
                return this.actualCallable.call();
            } finally {
                try {
                    this.updateFileSystemCounters(statsBefore, this.actualCallable);
                } finally {
                    // Worker threads are reused: clear the MDC even when the counter update
                    // fails, otherwise this task's ids would leak into the next task's logs.
                    MDC.clear();
                }
            }
        }

        /**
         * Copies the top three entries of the callable's NDC stack into the MDC as
         * {@code fragmentId}, {@code queryId} and {@code dagId} (popped in that order).
         * This is best effort: any failure is logged at WARN level and otherwise ignored.
         */
        private void setupMDCFromNDC(Callable<V> actualCallable) {
            if (!(actualCallable instanceof CallableWithNdc)) {
                LOG.warn("Not setting up MDC as unknown callable instance type received: {}", actualCallable.getClass().getSimpleName());
                return;
            }
            try {
                Field field = findNdcStackField(actualCallable.getClass());
                field.setAccessible(true);
                Stack<?> ndcStack = (Stack<?>) field.get(actualCallable);
                // Pop from a copy so the callable's own stack is left untouched.
                Stack<?> clonedStack = (Stack<?>) ndcStack.clone();
                String fragmentId = (String) clonedStack.pop();
                String queryId = (String) clonedStack.pop();
                String dagId = (String) clonedStack.pop();
                MDC.put("dagId", dagId);
                MDC.put("queryId", queryId);
                MDC.put("fragmentId", fragmentId);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Received dagId: {} queryId: {} instanceType: {}", dagId, queryId, actualCallable.getClass().getSimpleName());
                }
            } catch (Exception e) {
                // Covers reflection failures, a stack with fewer than three entries and
                // non-String entries alike; only the exception type is reported.
                LOG.warn("Not setting up MDC as NDC stack cannot be accessed reflectively for instance type: {} exception type: {}", actualCallable.getClass().getSimpleName(), e.getClass().getSimpleName());
            }
        }

        /**
         * Finds the {@code ndcStack} field by walking up the superclass chain, starting at the
         * direct superclass of {@code callableClass}, so that callables more than one level
         * below the declaring class are handled too.
         *
         * @throws NoSuchFieldException if no superclass declares {@code ndcStack}
         */
        private static Field findNdcStackField(Class<?> callableClass) throws NoSuchFieldException {
            for (Class<?> current = callableClass.getSuperclass(); current != null; current = current.getSuperclass()) {
                try {
                    return current.getDeclaredField("ndcStack");
                } catch (NoSuchFieldException ignored) {
                    // Not declared at this level; keep climbing.
                }
            }
            throw new NoSuchFieldException("ndcStack");
        }

        /**
         * Increments the per-scheme {@link FileSystemCounter} values of the callable's
         * {@link TezCounters} by the difference between the current thread statistics and the
         * {@code statsBefore} snapshot. Logs a warning and does nothing when the callable
         * exposes no counters or when the snapshot is {@code null}.
         *
         * @param statsBefore    snapshot taken before the task ran; may be {@code null}
         * @param actualCallable the task that just ran and whose counters are updated
         */
        private void updateFileSystemCounters(List<LlapUtil.StatisticsData> statsBefore, Callable<V> actualCallable) {
            Thread thread = Thread.currentThread();
            TezCounters tezCounters = null;
            if (actualCallable instanceof TaskRunner2Callable) {
                TaskRunner2Callable taskRunner2Callable = (TaskRunner2Callable) actualCallable;
                tezCounters = taskRunner2Callable.addAndGetTezCounter(FileSystemCounter.class.getName());
            } else if (actualCallable instanceof TezCounterSource) {
                tezCounters = ((TezCounterSource) actualCallable).getTezCounters();
            }
            if (tezCounters == null) {
                LOG.warn("TezCounters is null for callable type: {}", actualCallable.getClass().getSimpleName());
                return;
            }
            if (statsBefore == null) {
                LOG.warn("File system statistics snapshot before execution of thread is null.Thread name: {} id: {} allStats: {}", thread.getName(), thread.getId(), statsBefore);
                return;
            }

            Map<String, FileSystem.Statistics> schemeToStats = LlapUtil.getCombinedFileSystemStatistics();
            for (Map.Entry<String, FileSystem.Statistics> entry : schemeToStats.entrySet()) {
                String scheme = entry.getKey();
                FileSystem.Statistics.StatisticsData threadFSStats = entry.getValue().getThreadStatistics();
                List<LlapUtil.StatisticsData> allStatsBefore = LlapUtil.getStatisticsForScheme(scheme, statsBefore);
                long bytesReadDelta = 0L;
                long bytesWrittenDelta = 0L;
                long readOpsDelta = 0L;
                long largeReadOpsDelta = 0L;
                long writeOpsDelta = 0L;
                if (allStatsBefore != null && !allStatsBefore.isEmpty()) {
                    // One (current - snapshot) contribution per snapshot entry for this scheme.
                    for (LlapUtil.StatisticsData sb : allStatsBefore) {
                        bytesReadDelta += threadFSStats.getBytesRead() - sb.getBytesRead();
                        bytesWrittenDelta += threadFSStats.getBytesWritten() - sb.getBytesWritten();
                        readOpsDelta += (long) (threadFSStats.getReadOps() - sb.getReadOps());
                        largeReadOpsDelta += (long) (threadFSStats.getLargeReadOps() - sb.getLargeReadOps());
                        writeOpsDelta += (long) (threadFSStats.getWriteOps() - sb.getWriteOps());
                    }
                } else {
                    // No snapshot for this scheme: everything recorded so far counts as the delta.
                    bytesReadDelta = threadFSStats.getBytesRead();
                    bytesWrittenDelta = threadFSStats.getBytesWritten();
                    readOpsDelta = threadFSStats.getReadOps();
                    largeReadOpsDelta = threadFSStats.getLargeReadOps();
                    writeOpsDelta = threadFSStats.getWriteOps();
                }
                tezCounters.findCounter(scheme, FileSystemCounter.BYTES_READ).increment(bytesReadDelta);
                tezCounters.findCounter(scheme, FileSystemCounter.BYTES_WRITTEN).increment(bytesWrittenDelta);
                tezCounters.findCounter(scheme, FileSystemCounter.READ_OPS).increment(readOpsDelta);
                tezCounters.findCounter(scheme, FileSystemCounter.LARGE_READ_OPS).increment(largeReadOpsDelta);
                tezCounters.findCounter(scheme, FileSystemCounter.WRITE_OPS).increment(writeOpsDelta);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Updated stats: instance: {} thread name: {} thread id: {} scheme: {} bytesRead: {} bytesWritten: {} readOps: {} largeReadOps: {} writeOps: {}", actualCallable.getClass().getSimpleName(), thread.getName(), thread.getId(), scheme, bytesReadDelta, bytesWrittenDelta, readOpsDelta, largeReadOpsDelta, writeOpsDelta);
                }
            }
        }
    }
}

