/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.odps.graph.local;

import com.aliyun.odps.Column;
import com.aliyun.odps.conf.Configuration;
import com.aliyun.odps.counter.Counter;
import com.aliyun.odps.counter.Counters;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.graph.Aggregator;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.Edge;
import com.aliyun.odps.graph.JobConf;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.local.LocalRecordReader;
import com.aliyun.odps.graph.local.LocalRecordWriter;
import com.aliyun.odps.graph.local.LocalVertexMutations;
import com.aliyun.odps.graph.local.RuntimeContext;
import com.aliyun.odps.graph.local.SQLRecord;
import com.aliyun.odps.graph.local.utils.LocalGraphRunUtils;
import com.aliyun.odps.graph.local.worker.Worker;
import com.aliyun.odps.graph.utils.VerifyUtils;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.io.WritableComparable;
import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.local.common.TableMeta;
import com.aliyun.odps.local.common.WareHouse;
import com.aliyun.odps.local.common.utils.LocalRunUtils;
import com.aliyun.odps.local.common.utils.SchemaUtils;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class TaskContextImpl<VERTEX_ID extends WritableComparable<?>, VERTEX_VALUE extends Writable, EDGE_VALUE extends Writable, MESSAGE extends Writable>
extends ComputeContext<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE, MESSAGE> {
    private static Log LOG = LogFactory.getLog(TaskContextImpl.class);
    private List<Aggregator> mAggregators;
    private Counters mCounters;
    private RuntimeContext mCtx;
    private JobConf mJob;
    private Map<String, WritableRecord> mOutputRecords;
    private Map<String, TableInfo> mOutputs;
    private int mTaskNum;
    long mTotalNumEdges;
    long mTotalNumVertices;
    private Worker mWorker;
    private int mWorkerID;
    private Map<String, LocalRecordWriter> mWriters;
    private int maxUserDefinedCountersNum = 64;
    private Map<String, Counter> userCounters = new HashMap<String, Counter>();

    public TaskContextImpl(RuntimeContext ctx, JobConf conf, Worker worker, int workerID, int taskNum, Map<String, TableInfo> outputs, Counters counters) {
        this.mWorker = worker;
        this.mCtx = ctx;
        this.mJob = conf;
        this.mWorkerID = workerID;
        this.mTaskNum = taskNum;
        this.mOutputs = outputs;
        this.mCounters = counters;
        this.mAggregators = LocalGraphRunUtils.getAggregator(this.mJob);
        this.mOutputRecords = new HashMap<String, WritableRecord>();
        this.maxUserDefinedCountersNum = this.mJob.getInt("odps.graph.job.max.user.defined.counters.num", 64);
    }

    public LocalVertexMutations getRealVertexMutations(VERTEX_ID id) {
        if (this.mJob.getRuntimePartitioning()) {
            return this.mWorker.getMaster().getVertexMutations(id);
        }
        return this.mWorker.getVertexMutations(id);
    }

    public void addVertexRequest(Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE, MESSAGE> vertex) throws IOException {
        VerifyUtils.verifyVertex(vertex);
        LocalVertexMutations vertexMutations = this.getRealVertexMutations(vertex.getId());
        vertexMutations.addVertex(vertex);
    }

    public void addEdgeRequest(VERTEX_ID sourceVertexId, Edge<VERTEX_ID, EDGE_VALUE> edge) throws IOException {
        VerifyUtils.verifyVertexId(sourceVertexId);
        VerifyUtils.verifyVertexEdge(edge);
        LocalVertexMutations vertexMutations = this.getRealVertexMutations(sourceVertexId);
        vertexMutations.addEdge(edge);
    }

    public void removeEdgeRequest(VERTEX_ID sourceVertexId, VERTEX_ID targetVertexId) throws IOException {
        VerifyUtils.verifyVertexId(sourceVertexId);
        VerifyUtils.verifyVertexId(targetVertexId);
        LocalVertexMutations vertexMutations = this.getRealVertexMutations(sourceVertexId);
        vertexMutations.removeEdge(targetVertexId);
    }

    public void removeVertexRequest(VERTEX_ID vertexId) throws IOException {
        VerifyUtils.verifyVertexId(vertexId);
        LocalVertexMutations vertexMutations = this.getRealVertexMutations(vertexId);
        vertexMutations.removeVertex();
    }

    public void sendMessage(VERTEX_ID destVertexID, MESSAGE msg) throws IOException {
        if (!this.mJob.getRuntimePartitioning()) {
            throw new RuntimeException("ODPS-0730001: vertex partitioning disabled, cannot send message");
        }
        if (msg == null) {
            throw new IllegalArgumentException("ODPS-0730001: sendMessage: Cannot send null message to " + destVertexID);
        }
        this.mWorker.getMaster().pushMsg(this.mCtx, this.getSuperstep() + 1L, destVertexID, (Writable)msg);
    }

    public void sendMessage(Iterable<VERTEX_ID> destVertexIDs, MESSAGE msg) throws IOException {
        for (WritableComparable vertexId : destVertexIDs) {
            this.sendMessage(vertexId, msg);
        }
    }

    public void sendMessageToNeighbors(Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE, MESSAGE> vertex, MESSAGE msg) throws IOException {
        if (vertex.hasEdges()) {
            for (Edge edge : vertex.getEdges()) {
                this.sendMessage(edge.getDestVertexId(), msg);
            }
        }
    }

    public void aggregate(Object item) throws IOException {
        for (int i = 0; i < this.mAggregators.size(); ++i) {
            this.aggregate(i, item);
        }
    }

    public void aggregate(int aggregatorIndex, Object value) throws IOException {
        List<Writable> aggregatorValues = this.mWorker.getAggregatorValues();
        Writable aggValue = aggregatorValues.get(aggregatorIndex);
        this.mAggregators.get(aggregatorIndex).aggregate(aggValue, value);
    }

    private WritableRecord createOutputRecord(String label) throws IOException {
        Column[] cols = SchemaUtils.readSchema((File)this.mCtx.getOutputDir(label)).getCols();
        return new SQLRecord(cols);
    }

    public Configuration getConfiguration() {
        return new JobConf((Configuration)this.mJob, JobConf.JobState.RUNNING);
    }

    public Counter getCounter(Enum<?> name) {
        if (name == null) {
            throw new RuntimeException("ODPS-0730001: Counter name must be not null.");
        }
        return this.getCounter(name.getDeclaringClass().getName(), name.toString());
    }

    public Counter getCounter(String group, String name) {
        String key = group + "#" + name;
        if (this.userCounters.containsKey(key)) {
            return this.userCounters.get(key);
        }
        this.checkUserDefinedCounters(group, name);
        Counter counter = this.mCounters.findCounter(group, name);
        this.userCounters.put(key, counter);
        return counter;
    }

    public <VALUE extends Writable> VALUE getLastAggregatedValue(int aggregatorIndex) {
        return (VALUE)this.mWorker.getLastAggregatedValue().get(aggregatorIndex);
    }

    public long getMaxIteration() {
        return this.getConfiguration().getLong("odps.graph.max.iteration", -1L);
    }

    public int getNumWorkers() {
        return this.mTaskNum;
    }

    public TableInfo getOutputTable() throws IOException {
        return this.getOutputTable("");
    }

    public TableInfo getOutputTable(String label) throws IOException {
        return this.mOutputs.get(label);
    }

    public long getSuperstep() {
        return this.mWorker.getMaster().getSuperStep();
    }

    public long getTotalNumEdges() {
        return this.mTotalNumEdges;
    }

    public long getTotalNumVertices() {
        return this.mTotalNumVertices;
    }

    public Writable getWorkerValue() {
        return this.mWorker.getWorkerValue();
    }

    public Writable getComputeValue() {
        return null;
    }

    public int getWorkerId() {
        return this.mWorkerID;
    }

    public long getWorkerNumEdges() {
        return this.mWorker.getEgeNumber();
    }

    public long getWorkerNumVertices() {
        return this.mWorker.getVertexNumber();
    }

    public void progress() {
        LOG.debug((Object)"Graph Local Mode Just Mock progress method. Not Calculate Time");
    }

    public void closeWriters() throws IOException {
        for (LocalRecordWriter writer : this.mWriters.values()) {
            writer.close();
        }
    }

    public void setOutputWriters(Map<String, LocalRecordWriter> writers) {
        this.mWriters = writers;
    }

    public void setTotalNumEdges(long totalEdges) {
        this.mTotalNumEdges = totalEdges;
    }

    public void setTotalNumVertices(long totalVertices) {
        this.mTotalNumVertices = totalVertices;
    }

    public void write(String label, Writable ... fieldVals) throws IOException {
        LocalRecordWriter writer = this.mWriters.get(label);
        if (writer == null) {
            throw new IOException("The label " + label + " is not found in output");
        }
        if (this.mOutputRecords.get(label) == null) {
            this.mOutputRecords.put(label, this.createOutputRecord(label));
        }
        WritableRecord record = this.mOutputRecords.get(label);
        record.set(fieldVals);
        writer.write(record);
    }

    public void write(Writable ... fieldVals) throws IOException {
        this.write("", fieldVals);
    }

    private void checkUserDefinedCounters(String groupName, String counterName) {
        if (counterName == null || counterName.isEmpty()) {
            throw new RuntimeException("ODPS-0730001: CounterName must be not null or empty.");
        }
        if (groupName == null || groupName.isEmpty()) {
            throw new RuntimeException("ODPS-0730001: groupName must be not null or empty.");
        }
        if (groupName.contains("#")) {
            throw new RuntimeException("ODPS-0730001: Group name: " + groupName + " is invalid, It should not contain '#'");
        }
        if (counterName.contains("#")) {
            throw new RuntimeException("ODPS-0730001: Counter name: " + counterName + " is invalid, It should not contain '#'");
        }
        int maxLength = 100;
        if (groupName.length() + counterName.length() > maxLength) {
            throw new RuntimeException("ODPS-0730001: Group name '" + groupName + "' and Counter name '" + counterName + "' is too long, sum of their length must <= " + maxLength);
        }
        if (this.userCounters.size() >= this.maxUserDefinedCountersNum) {
            throw new RuntimeException("ODPS-0730001: Total num of user defined counters is too more, must be <= " + this.maxUserDefinedCountersNum);
        }
    }

    public byte[] readCacheFile(String resourceName) throws IOException {
        return IOUtils.toByteArray((InputStream)this.readCacheFileAsStream(resourceName));
    }

    public BufferedInputStream readCacheFileAsStream(String resourceName) throws IOException {
        File file = new File(this.mCtx.getResourceDir(), resourceName);
        return new BufferedInputStream(new FileInputStream(file));
    }

    public Iterable<byte[]> readCacheArchive(String resourceName) throws IOException {
        return this.readCacheArchive(resourceName, "");
    }

    public Iterable<byte[]> readCacheArchive(String resourceName, String relativePath) throws IOException {
        File baseDir = new File(this.mCtx.getResourceDir(), resourceName);
        File dir = new File(baseDir, relativePath);
        File[] files = dir.listFiles();
        final ArrayList<byte[]> list = new ArrayList<byte[]>();
        for (File file : files) {
            list.add(IOUtils.toByteArray((InputStream)new BufferedInputStream(new FileInputStream(file))));
        }
        return new Iterable<byte[]>(){

            @Override
            public Iterator<byte[]> iterator() {
                return list.iterator();
            }
        };
    }

    public Iterable<BufferedInputStream> readCacheArchiveAsStream(String resourceName) throws IOException {
        return this.readCacheArchiveAsStream(resourceName, "");
    }

    public Iterable<BufferedInputStream> readCacheArchiveAsStream(String resourceName, String relativePath) throws IOException {
        File baseDir = new File(this.mCtx.getResourceDir(), resourceName);
        File dir = new File(baseDir, relativePath);
        File[] files = dir.listFiles();
        final ArrayList<BufferedInputStream> list = new ArrayList<BufferedInputStream>();
        for (File file : files) {
            list.add(new BufferedInputStream(new FileInputStream(file)));
        }
        return new Iterable<BufferedInputStream>(){

            @Override
            public Iterator<BufferedInputStream> iterator() {
                return list.iterator();
            }
        };
    }

    public Iterable<WritableRecord> readResourceTable(String resourceName) throws IOException {
        final File tableDir = new File(this.mCtx.getResourceDir(), resourceName);
        if (!tableDir.exists()) {
            throw new RuntimeException("resource " + resourceName + " not found!");
        }
        if (tableDir.isFile()) {
            throw new RuntimeException("resource " + resourceName + " is not a table resource!");
        }
        final ArrayList dataFiles = new ArrayList();
        LocalRunUtils.listAllDataFiles((File)tableDir, dataFiles);
        return new Iterable<WritableRecord>(){

            @Override
            public Iterator<WritableRecord> iterator() {
                return new WrappedRecordIterator(tableDir, dataFiles);
            }
        };
    }

    public TableInfo getResourceTable(String resourceName) throws IOException {
        File dir = new File(this.mCtx.getResourceDir(), resourceName);
        if (!dir.exists()) {
            throw new RuntimeException("resource " + resourceName + " not found!");
        }
        if (dir.isFile()) {
            throw new RuntimeException("resource " + resourceName + " is not a table resource!");
        }
        TableMeta meta = SchemaUtils.readSchema((File)dir);
        return WareHouse.getInstance().getReferencedTable(meta.getProjName(), resourceName);
    }

    private class WrappedRecordIterator
    implements Iterator<WritableRecord> {
        LocalRecordReader reader;
        WritableRecord current;
        boolean fetched;
        Iterator<File> fileIter;
        File tableDir;

        WrappedRecordIterator(File tableDir, List<File> dataFiles) {
            this.tableDir = tableDir;
            this.fileIter = dataFiles.iterator();
        }

        @Override
        public boolean hasNext() {
            if (this.fetched) {
                return this.current != null;
            }
            try {
                this.fetch();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            return this.current != null;
        }

        private void fetch() throws IOException {
            this.fetched = true;
            while (true) {
                if (this.reader == null) {
                    if (!this.fileIter.hasNext()) {
                        this.current = null;
                        return;
                    }
                    File dataFile = this.fileIter.next();
                    this.reader = new LocalRecordReader(dataFile.getParentFile(), this.tableDir, null, null);
                }
                if (this.reader.nextKeyValue()) {
                    this.current = this.reader.getCurrentValue();
                    return;
                }
                this.reader = null;
            }
        }

        @Override
        public WritableRecord next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            this.fetched = false;
            return this.current;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }
}

