/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.cache.aggcache;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.TenantCache;
import org.apache.phoenix.cache.aggcache.SpillManager;
import org.apache.phoenix.coprocessor.BaseRegionScanner;
import org.apache.phoenix.coprocessor.GroupByCache;
import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver;
import org.apache.phoenix.expression.aggregator.Aggregator;
import org.apache.phoenix.expression.aggregator.ServerAggregators;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.memory.InsufficientMemoryException;
import org.apache.phoenix.memory.MemoryManager;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.KeyValueUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpillableGroupByCache
implements GroupByCache {
    private static final Logger logger = LoggerFactory.getLogger(SpillableGroupByCache.class);
    private static final int SPGBY_CACHE_MIN_SIZE = 4096;
    private final LinkedHashMap<ImmutableBytesWritable, Aggregator[]> cache;
    private SpillManager spillManager = null;
    private long totalNumElements = 0L;
    private final ServerAggregators aggregators;
    private final RegionCoprocessorEnvironment env;
    private final MemoryManager.MemoryChunk chunk;

    public SpillableGroupByCache(final RegionCoprocessorEnvironment env, ImmutableBytesPtr tenantId, ServerAggregators aggs, int estSizeNum) {
        this.aggregators = aggs;
        this.env = env;
        final int estValueSize = this.aggregators.getEstimatedByteSize();
        TenantCache tenantCache = GlobalCache.getTenantCache(env, tenantId);
        Configuration conf = env.getConfiguration();
        long maxCacheSizeConf = conf.getLong("phoenix.groupby.maxCacheSize", 0x6400000L);
        final int numSpillFilesConf = conf.getInt("phoenix.groupby.spillFiles", 2);
        int maxSizeNum = (int)(maxCacheSizeConf / (long)estValueSize);
        int minSizeNum = 4096 / estValueSize;
        final int maxCacheSize = Math.max(minSizeNum, Math.min(maxSizeNum, estSizeNum));
        long estSize = GroupedAggregateRegionObserver.sizeOfUnorderedGroupByMap(maxCacheSize, estValueSize);
        try {
            this.chunk = tenantCache.getMemoryManager().allocate(estSize);
        }
        catch (InsufficientMemoryException ime) {
            logger.error("Requested Map size exceeds memory limit, please decrease max size via config paramter: phoenix.groupby.maxCacheSize");
            throw ime;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Instantiating LRU groupby cache of element size: " + maxCacheSize);
        }
        this.cache = new LinkedHashMap<ImmutableBytesWritable, Aggregator[]>(maxCacheSize, 0.75f, true){
            boolean spill;
            int cacheSize;
            {
                super(x0, x1, x2);
                this.spill = false;
                this.cacheSize = maxCacheSize;
            }

            @Override
            protected boolean removeEldestEntry(Map.Entry<ImmutableBytesWritable, Aggregator[]> eldest) {
                if (!this.spill && this.size() > this.cacheSize) {
                    this.cacheSize = (int)((float)this.cacheSize * 1.5f);
                    long estSize = GroupedAggregateRegionObserver.sizeOfUnorderedGroupByMap(this.cacheSize, estValueSize);
                    try {
                        SpillableGroupByCache.this.chunk.resize(estSize);
                    }
                    catch (InsufficientMemoryException im) {
                        this.spill = true;
                    }
                }
                if (this.spill) {
                    try {
                        if (SpillableGroupByCache.this.spillManager == null) {
                            SpillableGroupByCache.this.spillManager = new SpillManager(numSpillFilesConf, SpillableGroupByCache.this.aggregators, env.getConfiguration(), new QueryCache());
                        }
                        SpillableGroupByCache.this.spillManager.spill(eldest.getKey(), eldest.getValue());
                    }
                    catch (IOException ioe) {
                        try {
                            throw new RuntimeException(ioe);
                        }
                        catch (Throwable throwable) {
                            Closeables.closeQuietly(SpillableGroupByCache.this);
                            throw throwable;
                        }
                    }
                    return true;
                }
                return false;
            }
        };
    }

    @Override
    public long size() {
        return this.totalNumElements;
    }

    @Override
    public Aggregator[] cache(ImmutableBytesPtr cacheKey) {
        ImmutableBytesPtr key = new ImmutableBytesPtr(cacheKey);
        Aggregator[] rowAggregators = this.cache.get((Object)key);
        if (rowAggregators == null) {
            if (this.spillManager != null) {
                try {
                    rowAggregators = this.spillManager.loadEntry(key);
                }
                catch (IOException ioe) {
                    try {
                        throw new RuntimeException(ioe);
                    }
                    catch (Throwable throwable) {
                        Closeables.closeQuietly(this);
                        throw throwable;
                    }
                }
            }
            if (rowAggregators == null) {
                rowAggregators = this.aggregators.newAggregators(this.env.getConfiguration());
                if (logger.isDebugEnabled()) {
                    logger.debug("Adding new aggregate bucket for row key " + Bytes.toStringBinary((byte[])key.get(), (int)key.getOffset(), (int)key.getLength()));
                }
            }
            if (this.cache.put(key, rowAggregators) == null) {
                ++this.totalNumElements;
            }
        }
        return rowAggregators;
    }

    @Override
    public void close() throws IOException {
        Closeables.closeQuietly(this.spillManager);
        Closeables.closeQuietly(this.chunk);
    }

    @Override
    public RegionScanner getScanner(final RegionScanner s) {
        final EntryIterator cacheIter = new EntryIterator();
        return new BaseRegionScanner(s){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void close() throws IOException {
                try {
                    s.close();
                }
                finally {
                    Closeables.closeQuietly(SpillableGroupByCache.this);
                }
            }

            @Override
            public boolean next(List<Cell> results) throws IOException {
                if (!cacheIter.hasNext()) {
                    return false;
                }
                Map.Entry ce = (Map.Entry)cacheIter.next();
                ImmutableBytesWritable key = (ImmutableBytesWritable)ce.getKey();
                Aggregator[] aggs = (Aggregator[])ce.getValue();
                byte[] value = SpillableGroupByCache.this.aggregators.toBytes(aggs);
                if (logger.isDebugEnabled()) {
                    logger.debug("Adding new distinct group: " + Bytes.toStringBinary((byte[])key.get(), (int)key.getOffset(), (int)key.getLength()) + " with aggregators " + aggs.toString() + " value = " + Bytes.toStringBinary((byte[])value));
                }
                results.add((Cell)KeyValueUtil.newKeyValue(key.get(), key.getOffset(), key.getLength(), QueryConstants.SINGLE_COLUMN_FAMILY, QueryConstants.SINGLE_COLUMN, Long.MAX_VALUE, value, 0, value.length));
                return cacheIter.hasNext();
            }
        };
    }

    private final class EntryIterator
    implements Iterator<Map.Entry<ImmutableBytesWritable, Aggregator[]>> {
        final Iterator<Map.Entry<ImmutableBytesWritable, Aggregator[]>> cacheIter;
        final Iterator<byte[]> spilledCacheIter;

        private EntryIterator() {
            this.cacheIter = SpillableGroupByCache.this.cache.entrySet().iterator();
            this.spilledCacheIter = SpillableGroupByCache.this.spillManager != null ? SpillableGroupByCache.this.spillManager.newDataIterator() : null;
        }

        @Override
        public boolean hasNext() {
            return this.cacheIter.hasNext();
        }

        @Override
        public Map.Entry<ImmutableBytesWritable, Aggregator[]> next() {
            if (this.spilledCacheIter != null && this.spilledCacheIter.hasNext()) {
                try {
                    byte[] value = this.spilledCacheIter.next();
                    SpillManager.CacheEntry<Object> spilledEntry = SpillableGroupByCache.this.spillManager.toCacheEntry(value);
                    boolean notFound = false;
                    while (SpillableGroupByCache.this.cache.containsKey(spilledEntry.getKey())) {
                        if (this.spilledCacheIter.hasNext()) {
                            value = this.spilledCacheIter.next();
                            spilledEntry = SpillableGroupByCache.this.spillManager.toCacheEntry(value);
                            continue;
                        }
                        notFound = true;
                        break;
                    }
                    if (!notFound) {
                        return spilledEntry;
                    }
                }
                catch (IOException ioe) {
                    throw new RuntimeException(ioe);
                }
            }
            Map.Entry<ImmutableBytesWritable, Aggregator[]> entry = this.cacheIter.next();
            return new SpillManager.CacheEntry<ImmutableBytesWritable>(entry.getKey(), entry.getValue());
        }

        @Override
        public void remove() {
            throw new IllegalAccessError("Remove is not supported for this type of iterator");
        }
    }

    public class QueryCache {
        public boolean isKeyContained(ImmutableBytesPtr key) {
            return SpillableGroupByCache.this.cache.containsKey((Object)key);
        }
    }
}

