/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.odps.udf.local.runner;

import com.aliyun.odps.Odps;
import com.aliyun.odps.local.common.security.SecurityClient;
import com.aliyun.odps.local.common.utils.SchemaUtils;
import com.aliyun.odps.type.TypeInfo;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.UDTFCollector;
import com.aliyun.odps.udf.annotation.Resolve;
import com.aliyun.odps.udf.local.InvalidFunctionException;
import com.aliyun.odps.udf.local.LocalRunException;
import com.aliyun.odps.udf.local.datasource.UDTFStdoutCollector;
import com.aliyun.odps.udf.local.runner.BaseRunner;
import com.aliyun.odps.udf.local.util.ArgumentConverterUtils;
import com.aliyun.odps.udf.local.util.ClassUtils;
import com.aliyun.odps.utils.StringUtils;
import java.io.IOException;
import java.util.List;

public class UDTFRunner
extends BaseRunner {
    private UDTF tf;
    private ArgumentConverterUtils.ArgumentConverter[] converters;

    public UDTFRunner(Odps odps, UDTF udtf) throws LocalRunException, UDFException {
        super(odps);
        this.tf = udtf;
        this.tf.setCollector((UDTFCollector)new UDTFStdoutCollector(this.buffer));
        this.checkArguments(this.tf);
        try {
            SecurityClient.open();
            this.tf.setup((ExecutionContext)this.context);
        }
        catch (Exception ex) {
            throw new UDFException((Throwable)ex);
        }
        finally {
            SecurityClient.close();
        }
    }

    public UDTFRunner(Odps odps, String className) throws LocalRunException, UDFException {
        super(odps);
        if (StringUtils.isBlank((String)className)) {
            throw new IllegalArgumentException("Missing arguments:className");
        }
        this.tf = (UDTF)ClassUtils.newInstance(UDTFRunner.class.getClassLoader(), className);
        this.tf.setCollector((UDTFCollector)new UDTFStdoutCollector(this.buffer));
        this.checkArguments(this.tf);
        try {
            SecurityClient.open();
            this.tf.setup((ExecutionContext)this.context);
        }
        catch (Exception ex) {
            throw new UDFException((Throwable)ex);
        }
        finally {
            SecurityClient.close();
        }
    }

    private void checkArguments(UDTF tf) throws LocalRunException {
        Resolve r = tf.getClass().getAnnotation(Resolve.class);
        if (r == null) {
            throw new LocalRunException("You must specify @Resolve annotation.");
        }
        String info = r.value()[0];
        String[] outs = UDTFRunner.parseResolveInfo(info);
        List inputTypes = SchemaUtils.parseResolveTypeInfo((String)outs[0]);
        this.converters = new ArgumentConverterUtils.ArgumentConverter[inputTypes.size()];
        for (int i = 0; i < inputTypes.size(); ++i) {
            String sigType = ArgumentConverterUtils.getSigType((TypeInfo)inputTypes.get(i));
            this.converters[i] = ArgumentConverterUtils.validSigType.get(sigType);
        }
    }

    public static String[] parseResolveInfo(String info) throws InvalidFunctionException {
        String errMsg = "@Resolve({\"" + info + "\"}) ";
        if (info.isEmpty()) {
            throw new InvalidFunctionException(errMsg + "must not be empty string");
        }
        int pos = info.indexOf("->");
        String args = "";
        if (pos > 0) {
            args = info.substring(0, pos);
        } else if (pos < 0) {
            throw new InvalidFunctionException(errMsg);
        }
        int tPos = info.indexOf("->", pos + 2);
        if (tPos >= 0) {
            throw new InvalidFunctionException(errMsg + "contains not exactly one '->'");
        }
        List argTypeInfos = SchemaUtils.parseResolveTypeInfo((String)args);
        if (!UDTFRunner.validTypeInfo(argTypeInfos)) {
            throw new InvalidFunctionException(errMsg + "annotates wrong arguments '" + args + "'");
        }
        String rtypes = info.substring(pos + 2);
        List rtTypeInfos = SchemaUtils.parseResolveTypeInfo((String)rtypes);
        if (rtTypeInfos.isEmpty()) {
            throw new InvalidFunctionException(errMsg + "annotates no output types '" + args + "'");
        }
        if (!UDTFRunner.validTypeInfo(rtTypeInfos)) {
            throw new InvalidFunctionException(errMsg + "annotates wrong output types '" + rtypes + "'");
        }
        return new String[]{args, rtypes};
    }

    public static boolean validTypeInfo(List<TypeInfo> typeInfos) {
        if (typeInfos.isEmpty()) {
            return true;
        }
        for (TypeInfo type : typeInfos) {
            String sigType = ArgumentConverterUtils.getSigType(type);
            if (ArgumentConverterUtils.validSigType.containsKey(sigType)) continue;
            return false;
        }
        return true;
    }

    @Override
    public BaseRunner internalFeed(Object[] input) throws LocalRunException {
        if (input.length != this.converters.length) {
            throw new LocalRunException("Input column count expected:" + this.converters.length + ", while is:" + input.length);
        }
        for (int i = 0; i < this.converters.length; ++i) {
            this.converters[i].check(input[i]);
        }
        try {
            SecurityClient.open();
            this.tf.process(input);
        }
        catch (UDFException e) {
            throw new LocalRunException(e);
        }
        catch (IOException e) {
            throw new LocalRunException(e);
        }
        finally {
            SecurityClient.close();
        }
        return this;
    }

    @Override
    public List<Object[]> internalYield() throws LocalRunException {
        try {
            SecurityClient.open();
            this.tf.close();
        }
        catch (UDFException e) {
            throw new LocalRunException(e);
        }
        finally {
            SecurityClient.close();
        }
        this.tf = null;
        return this.buffer;
    }
}

