/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.odps.graph.local.worker;

import com.aliyun.odps.conf.Configuration;
import com.aliyun.odps.counter.Counter;
import com.aliyun.odps.counter.Counters;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.graph.Aggregator;
import com.aliyun.odps.graph.Combiner;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.JobConf;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.VertexChanges;
import com.aliyun.odps.graph.VertexResolver;
import com.aliyun.odps.graph.WorkerComputer;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.graph.local.BaseRecordReader;
import com.aliyun.odps.graph.local.COUNTER;
import com.aliyun.odps.graph.local.EmptyRecordReader;
import com.aliyun.odps.graph.local.GraphTaskAttemptID;
import com.aliyun.odps.graph.local.InputSplit;
import com.aliyun.odps.graph.local.LocalRecordReader;
import com.aliyun.odps.graph.local.LocalRecordWriter;
import com.aliyun.odps.graph.local.LocalVertexMutations;
import com.aliyun.odps.graph.local.RuntimeContext;
import com.aliyun.odps.graph.local.SQLRecord;
import com.aliyun.odps.graph.local.TaskContextImpl;
import com.aliyun.odps.graph.local.master.Master;
import com.aliyun.odps.graph.local.message.MsgManager;
import com.aliyun.odps.graph.local.utils.LocalGraphRunUtils;
import com.aliyun.odps.graph.utils.VerifyUtils;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.io.WritableComparable;
import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.io.WritableUtils;
import com.aliyun.odps.utils.ReflectionUtils;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A single worker of the local graph runner.
 *
 * <p>The worker keeps an in-memory map of vertices, loads records from its
 * input split through the job's {@link GraphLoader}, runs {@code compute} on
 * its vertices, resolves pending vertex mutations, and tracks aggregator
 * values, messages and counters for its share of the job.
 *
 * @param <VERTEX_ID>    type of the vertex identifiers used as map keys
 * @param <VERTEX_VALUE> vertex value type (not referenced by the code in this class)
 * @param <EDGE_VALUE>   edge value type (not referenced by the code in this class)
 * @param <MESSAGE>      message type (not referenced by the code in this class)
 * @param <VALUE>        worker value type (not referenced by the code in this class)
 */
public class Worker<VERTEX_ID extends WritableComparable<?>, VERTEX_VALUE extends Writable, EDGE_VALUE extends Writable, MESSAGE extends Writable, VALUE extends Writable> {
    private static final Log LOG = LogFactory.getLog(Worker.class);
    /** Aggregators obtained from the job configuration. */
    private List<Aggregator> mAggregators;
    /** Aggregator values for the current step; rebuilt by {@link #processNextStep()}. */
    private List<Writable> mAggregatorValues;
    private Master master;
    private Counters mCounters;
    private RuntimeContext mCtx;
    /** Input split of this worker; {@code loadGraph} tolerates {@code null} (no input). */
    private InputSplit mInput;
    private JobConf mJob;
    /** Aggregator values handed over via {@link #setLastAggregatedValue(List)}. */
    private List<Writable> mLastAggregatorValues;
    /** Messages popped for each vertex by {@code prepareMsg}, consumed by {@link #Compute()}. */
    private Map<Vertex, Iterable<Writable>> mLastStepMessage = new HashMap<Vertex, Iterable<Writable>>();
    private Map<String, TableInfo> mOutputs;
    /** Mutations recorded per vertex id until {@link #processWorkerMutations(VertexResolver)} runs. */
    private Map<VERTEX_ID, LocalVertexMutations> mVertexMutations;
    private GraphTaskAttemptID mTaskAttemptID;
    private TaskContextImpl mTaskContext;
    private WorkerComputer mWorkerComputer;
    private int mWorkerID;
    private int mWorkerNum;
    /** Output writers keyed by output label; created in {@link #loadGraph()}, released in {@link #close()}. */
    private Map<String, LocalRecordWriter> mWriters;
    private Map<VERTEX_ID, Vertex> vertices = new HashMap<VERTEX_ID, Vertex>();
    private Writable mWorkerValue;
    private MsgManager mMsgManager;
    /** Optional message combiner; {@code null} when the job configures none. */
    private Combiner mCombiner;

    /**
     * Creates a worker and its task context.
     *
     * @param job           job configuration
     * @param ctx           runtime context of the local run
     * @param m             the master this worker reports to
     * @param taskAttemptID id used to name this worker's output and counter files
     * @param workerID      id of this worker
     * @param workerNum     total number of workers
     * @param input         input split for this worker (may be {@code null})
     * @param outputs       output tables keyed by label
     */
    public Worker(JobConf job, RuntimeContext ctx, Master m, GraphTaskAttemptID taskAttemptID, int workerID, int workerNum, InputSplit input, Map<String, TableInfo> outputs) throws InstantiationException, IllegalAccessException, IOException, ClassNotFoundException {
        this.mJob = job;
        this.mCtx = ctx;
        this.master = m;
        this.mTaskAttemptID = taskAttemptID;
        this.mWorkerID = workerID;
        this.mWorkerNum = workerNum;
        this.mInput = input;
        this.mOutputs = outputs;
        this.mCounters = new Counters();
        this.mAggregators = LocalGraphRunUtils.getAggregator(this.mJob);
        this.mMsgManager = new MsgManager();
        this.mTaskContext = new TaskContextImpl(this.mCtx, this.mJob, this, this.mWorkerID, this.mWorkerNum, this.mOutputs, this.mCounters);
        this.mVertexMutations = new HashMap<VERTEX_ID, LocalVertexMutations>();
    }

    /**
     * Tells whether this worker has nothing left to do.
     *
     * @return {@code true} when every vertex is halted and the message manager
     *         holds no messages for the next step
     */
    public boolean allVertexVoltHalt() {
        for (Vertex v : this.vertices.values()) {
            if (!v.isHalted()) {
                return false;
            }
        }
        return !this.mMsgManager.hasNextStepMessages();
    }

    /**
     * Instantiates and configures the job's combiner, or clears it when the
     * job does not declare a combiner class.
     */
    private void initCombiner() {
        Class combinerCls = this.mJob.getCombinerClass();
        if (combinerCls != null) {
            try {
                this.mCombiner = (Combiner)combinerCls.newInstance();
                this.mCombiner.configure((Configuration)this.mJob);
            }
            catch (Exception e) {
                throw new RuntimeException("exception occored when Instantiate combiner ", e);
            }
        } else {
            this.mCombiner = null;
        }
    }

    /**
     * Instantiates the job's {@link WorkerComputer}, creates the worker value
     * from its declared type argument (or {@link NullWritable} when none is
     * declared) and calls {@code setup} on the computer.
     *
     * @throws IOException if more than one worker value type is declared, or
     *                     wrapping any other failure during instantiation/setup
     */
    private void initWorkerComputer() throws IOException {
        try {
            Class workerComputerClass = this.mJob.getWorkerComputerClass();
            this.mWorkerComputer = (WorkerComputer)workerComputerClass.newInstance();
            List workerValueClass = ReflectionUtils.getTypeArguments(WorkerComputer.class, (Class)workerComputerClass);
            if (workerValueClass.size() > 1) {
                throw new IOException("more than ONE workerValue Type Declared");
            }
            if (workerValueClass.size() == 0 || workerValueClass.get(0) == null) {
                this.mWorkerValue = NullWritable.get();
            } else {
                this.mWorkerValue = (Writable)ReflectionUtils.newInstance((Class)((Class)workerValueClass.get(0)), (Configuration)this.mJob);
            }
            this.mWorkerComputer.setup((WorkerContext)this.mTaskContext, this.mWorkerValue);
        }
        catch (IOException e) {
            // Already the declared exception type: do not wrap it a second time.
            throw e;
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }

    /** Seeds the "last aggregated" list with each aggregator's startup value. */
    private void initAggregatorValues() throws IOException {
        this.mLastAggregatorValues = new ArrayList<Writable>();
        for (Aggregator agg : this.mAggregators) {
            this.mLastAggregatorValues.add(agg.createStartupValue((WorkerContext)this.mTaskContext));
        }
    }

    /**
     * Sets up the job's {@link GraphLoader}, creates one output writer per
     * output label and feeds every input record to the loader.
     *
     * <p>When the worker has no input split (or the split has no table) an
     * {@link EmptyRecordReader} is used and the loader's {@code setup} receives
     * a {@code null} table. The reader is closed even when loading fails.
     *
     * @throws IOException if reading, loading or writer creation fails, or if
     *                     the table directory cannot be found above the split file
     */
    public void loadGraph() throws IOException {
        Class loaderClass = this.mJob.getGraphLoaderClass();
        GraphLoader graphLoader = (GraphLoader)ReflectionUtils.newInstance((Class)loaderClass, (Configuration)this.mJob);
        // Resolve the table once, guarding against a missing split before any dereference.
        TableInfo inputTable = this.mInput == null ? null : this.mInput.getTable();
        graphLoader.setup(this.mTaskContext.getConfiguration(), this.mTaskContext.getWorkerId(), inputTable, (MutationContext)this.mTaskContext);
        BaseRecordReader reader = this.openReader(inputTable);
        // try/finally rather than try-with-resources: BaseRecordReader is not
        // known here to implement AutoCloseable.
        try {
            this.mWriters = new HashMap<String, LocalRecordWriter>();
            for (String label : this.mOutputs.keySet()) {
                Counter outputRecordCounter = this.mCounters.findCounter((Enum)COUNTER.TASK_OUTPUT_RECORD);
                Counter outputByteCounter = this.mCounters.findCounter((Enum)COUNTER.TASK_OUTPUT_BYTE);
                LocalRecordWriter writer = new LocalRecordWriter(new File(this.mCtx.getOutputDir(label), this.mTaskAttemptID.toString()), outputRecordCounter, outputByteCounter);
                this.mWriters.put(label, writer);
            }
            while (reader.nextKeyValue()) {
                // Hand the loader its own key object and a clone of the record
                // rather than the reader's current instances.
                LongWritable recordNum = new LongWritable();
                recordNum.set(((LongWritable)reader.getCurrentKey()).get());
                graphLoader.load(recordNum, (WritableRecord)((SQLRecord)reader.getCurrentValue()).clone(), (MutationContext)this.mTaskContext);
            }
        } finally {
            reader.close();
        }
        this.mTaskContext.setOutputWriters(this.mWriters);
    }

    /**
     * Opens the record reader for this worker's input.
     *
     * @param inputTable table of the input split, or {@code null} when there is no input
     * @return a reader over the split's directory, or an empty reader when
     *         {@code inputTable} is {@code null}
     * @throws IOException if no ancestor directory of the split file is named
     *                     after the table
     */
    private BaseRecordReader openReader(TableInfo inputTable) throws IOException {
        if (inputTable == null) {
            return new EmptyRecordReader();
        }
        File splitDir = this.mInput.getFile().getParentFile();
        // Walk upwards until the directory named after the table is reached.
        File tableDir = splitDir;
        while (tableDir != null && !tableDir.getName().equals(inputTable.getTableName())) {
            tableDir = tableDir.getParentFile();
        }
        if (tableDir == null) {
            throw new IOException("Cannot locate directory of table '" + inputTable.getTableName() + "' above input file " + this.mInput.getFile());
        }
        return new LocalRecordReader(splitDir, tableDir, this.mCounters.findCounter((Enum)COUNTER.TASK_INPUT_RECORD), this.mCounters.findCounter((Enum)COUNTER.TASK_INPUT_BYTE));
    }

    /**
     * Initializes the combiner, the worker computer and the aggregator startup
     * values, then calls {@code setup} on every vertex currently held.
     */
    public void init() throws IOException {
        this.initCombiner();
        this.initWorkerComputer();
        this.initAggregatorValues();
        for (Vertex v : this.vertices.values()) {
            v.setup((WorkerContext)this.mTaskContext);
        }
    }

    /**
     * Prepares the next step: resets the per-step aggregator values to each
     * aggregator's initial value and advances the message manager.
     *
     * @throws RuntimeException if an aggregator returns {@code null} as its initial value
     */
    public void processNextStep() throws IOException {
        this.mAggregatorValues = new ArrayList<Writable>();
        for (int i = 0; i < this.mAggregators.size(); ++i) {
            Writable initAggregatorValue = this.mAggregators.get(i).createInitialValue((WorkerContext)this.mTaskContext);
            if (initAggregatorValue == null) {
                throw new RuntimeException("ODPS-0730001: " + this.mAggregators.get(i).getClass().getName() + " createInitialValue return null");
            }
            this.mAggregatorValues.add(initAggregatorValue);
        }
        this.mMsgManager.nextSuperStep(this.mCtx);
    }

    /**
     * Runs one compute pass: pops the messages of the current step, wakes up
     * halted vertices that received messages and calls {@code compute} on every
     * vertex that is not halted.
     */
    public void Compute() throws IOException {
        this.prepareMsg();
        for (Vertex v : this.vertices.values()) {
            Iterable<Writable> msg = this.mLastStepMessage.get(v);
            if (v.isHalted() && msg.iterator().hasNext()) {
                v.wakeUp();
            }
            if (v.isHalted()) continue;
            v.compute((ComputeContext)this.mTaskContext, msg);
        }
    }

    /**
     * Resolves the pending mutations of one vertex and stores the outcome:
     * a {@code null} result removes the vertex, any other result is verified
     * and stored under {@code id}.
     *
     * @param id             id of the vertex being resolved
     * @param mutations      recorded mutations for the vertex (may be {@code null}
     *                       when the id only appears because of pending messages)
     * @param vertexResolver resolver deciding the resulting vertex
     * @throws IOException if {@code vertexResolver} is {@code null}
     */
    public void processMutation(VERTEX_ID id, LocalVertexMutations mutations, VertexResolver vertexResolver) throws IOException {
        // Fail fast before touching the vertex map or the message manager.
        if (vertexResolver == null) {
            throw new IOException("ODPS-0730001: encounter mutations in compute but not set the mutation resolver.");
        }
        Vertex v = this.vertices.get(id);
        boolean hasMessage = this.mMsgManager.hasMessageForVertex(this.mCtx, this.master.getSuperStep(), (WritableComparable<?>)id);
        Vertex new_v = vertexResolver.resolve(id, v, (VertexChanges)mutations, hasMessage);
        if (new_v == null) {
            this.vertices.remove(id);
        } else {
            VerifyUtils.verifyVertex((Vertex)new_v);
            this.vertices.put(id, new_v);
        }
    }

    /**
     * Resolves every pending mutation of this worker. Besides the ids with
     * recorded mutations, ids listed by the message manager that have no
     * vertex yet are resolved too. The mutation map is reset afterwards.
     *
     * @param vertexResolver resolver passed on to {@link #processMutation}
     */
    @SuppressWarnings("unchecked")
    public void processWorkerMutations(VertexResolver vertexResolver) throws IOException {
        // Work on a copy so the mutation map is not iterated while resolving.
        HashSet<VERTEX_ID> mutationIDs = new HashSet<VERTEX_ID>(this.mVertexMutations.keySet());
        for (WritableComparable<?> pendingId : this.mMsgManager.getVertexIDList()) {
            if (this.vertices.get(pendingId) != null) continue;
            // Unchecked: ids handled by this worker are assumed to be VERTEX_ID instances.
            mutationIDs.add((VERTEX_ID)pendingId);
        }
        for (VERTEX_ID id : mutationIDs) {
            this.processMutation(id, this.mVertexMutations.get(id), vertexResolver);
        }
        this.mVertexMutations = new HashMap<VERTEX_ID, LocalVertexMutations>();
    }

    /** @return the aggregator values of the current step */
    public List<Writable> getAggregatorValues() {
        return this.mAggregatorValues;
    }

    /** @return the counters of this worker */
    public Counters getCounters() {
        return this.mCounters;
    }

    /** @return the sum of {@code getNumEdges()} over all vertices held by this worker */
    public long getEgeNumber() {
        long egeNumber = 0L;
        for (Vertex v : this.vertices.values()) {
            egeNumber += (long)v.getNumEdges();
        }
        return egeNumber;
    }

    /** @return the aggregator values last set via {@link #setLastAggregatedValue(List)} or at init */
    public List<Writable> getLastAggregatedValue() {
        return this.mLastAggregatorValues;
    }

    /** @return the master passed to the constructor */
    public Master getMaster() {
        return this.master;
    }

    /** @return the task context of this worker */
    public WorkerContext getTaskContext() {
        return this.mTaskContext;
    }

    /** @return the number of vertices held by this worker */
    public long getVertexNumber() {
        return this.vertices.size();
    }

    /**
     * Returns the mutation record for a vertex id, creating and registering an
     * empty one on first access.
     *
     * @param id vertex id
     * @return the (never {@code null}) mutation record for {@code id}
     */
    public LocalVertexMutations getVertexMutations(VERTEX_ID id) {
        LocalVertexMutations ret = this.mVertexMutations.get(id);
        if (ret == null) {
            ret = new LocalVertexMutations();
            this.mVertexMutations.put(id, ret);
        }
        return ret;
    }

    /** @return the aggregator values of the current step (same list as {@link #getAggregatorValues()}) */
    public List<Writable> partialAggregate() {
        return this.mAggregatorValues;
    }

    /** Forwards a message to this worker's message manager. */
    public void pushMsg(RuntimeContext context, long superStep, WritableComparable<?> vertexId, Writable msg) {
        this.mMsgManager.pushMsg(context, superStep, vertexId, msg);
    }

    /**
     * Closes all output writers and writes the counters to a file named after
     * the task id in the counter directory.
     *
     * <p>Every writer is closed even if an earlier one fails; the first failure
     * is rethrown afterwards and the counter file is then not written. Calling
     * this method when no writers were created is allowed.
     *
     * @throws IOException if closing a writer or writing the counter file fails
     */
    public void close() throws IOException {
        IOException firstFailure = null;
        if (this.mWriters != null) {
            for (LocalRecordWriter writer : this.mWriters.values()) {
                try {
                    writer.close();
                }
                catch (IOException e) {
                    // Keep going so the remaining writers are still released.
                    if (firstFailure == null) {
                        firstFailure = e;
                    }
                }
            }
            this.mWriters = null;
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
        FileUtils.writeStringToFile((File)new File(this.mCtx.getCounterDir(), String.valueOf(this.mTaskAttemptID.getTaskId())), (String)this.mCounters.toString());
        LOG.debug((Object)this.mCounters);
    }

    /** Calls {@code cleanup} on every vertex and then on the worker computer. */
    public void cleanup() throws IOException {
        for (Vertex v : this.vertices.values()) {
            v.cleanup((WorkerContext)this.mTaskContext);
        }
        this.mWorkerComputer.cleanup((WorkerContext)this.mTaskContext);
    }

    /**
     * Folds the messages of one vertex with the configured combiner.
     *
     * <p>The first message is used as the accumulator and every further
     * message is passed to {@code Combiner.combine} together with it; the
     * combiner is presumably expected to merge into the accumulator in place.
     *
     * @param id   id of the receiving vertex
     * @param msgs messages addressed to the vertex
     * @return a list holding the accumulator (empty when there were no
     *         messages), or {@code msgs} unchanged when no combiner is set
     */
    private Iterable<Writable> combineMsg(WritableComparable id, Iterable<Writable> msgs) throws IOException {
        if (this.mCombiner == null) {
            return msgs;
        }
        Writable combineMsg = null;
        for (Writable msg : msgs) {
            if (combineMsg == null) {
                combineMsg = msg;
                continue;
            }
            this.mCombiner.combine(id, combineMsg, msg);
        }
        ArrayList<Writable> combinedMsgs = new ArrayList<Writable>();
        if (combineMsg != null) {
            combinedMsgs.add(combineMsg);
        }
        return combinedMsgs;
    }

    /**
     * Pops the messages of the master's current super step for every vertex,
     * combines them when a combiner is configured and caches them for
     * {@link #Compute()}.
     */
    private void prepareMsg() throws IOException {
        long superStep = this.master.getSuperStep();
        LOG.debug((Object)("worker super step " + superStep + ", vertices count " + this.vertices.size()));
        this.mLastStepMessage.clear();
        for (Vertex v : this.vertices.values()) {
            Iterable<Writable> msgs = this.mMsgManager.popMsges(this.mCtx, superStep, v.getId());
            if (this.mCombiner != null) {
                msgs = this.combineMsg(v.getId(), msgs);
            }
            this.mLastStepMessage.put(v, msgs);
        }
    }

    /**
     * Replaces the "last aggregated" values with clones of the given values;
     * {@code null} entries are kept as {@code null}.
     *
     * @param lastAggrValues values to copy, one per aggregator
     */
    public void setLastAggregatedValue(List<Writable> lastAggrValues) {
        this.mLastAggregatorValues = new ArrayList<Writable>(lastAggrValues.size());
        for (int i = 0; i < lastAggrValues.size(); ++i) {
            Writable value = null;
            if (lastAggrValues.get(i) != null) {
                value = WritableUtils.clone((Writable)lastAggrValues.get(i), (Configuration)this.mJob);
            }
            this.mLastAggregatorValues.add(value);
        }
    }

    /** Publishes the job-wide vertex and edge totals to the task context. */
    public void setTotalNumVerticesAndEdges(int totalVertices, int totalEdge) {
        this.mTaskContext.setTotalNumVertices(totalVertices);
        this.mTaskContext.setTotalNumEdges(totalEdge);
    }

    /** @return the worker value created by the worker-computer initialization */
    public Writable getWorkerValue() {
        return this.mWorkerValue;
    }
}

