/*
 * Decompiled with CFR 0.152.
 */
package com.yqsmartcity.data.swap.service.busi.impl.oozie;

import com.alibaba.druid.filter.config.ConfigTools;
import com.alibaba.fastjson.JSONObject;
import com.ohaotian.plugin.base.exception.ZTBusinessException;
import com.yqsmartcity.data.swap.api.azkaban.bo.AzkabanUploadZipRspBO;
import com.yqsmartcity.data.swap.api.azkaban.service.AzkabanLoginService;
import com.yqsmartcity.data.swap.api.bo.RemoteConnectBO;
import com.yqsmartcity.data.swap.api.dataworks.bo.TaskPublishFileBO;
import com.yqsmartcity.data.swap.api.hdfs.bo.UploadHdfsFileInfoBO;
import com.yqsmartcity.data.swap.api.hdfs.service.UploadFile2HdfsService;
import com.yqsmartcity.data.swap.api.oozie.bo.OoziePublishReqBO;
import com.yqsmartcity.data.swap.api.oozie.service.CreateJobPropertiesService;
import com.yqsmartcity.data.swap.api.oozie.service.PublishToOozieService;
import com.yqsmartcity.data.swap.constant.ConnectLinuxCommand;
import com.yqsmartcity.data.swap.dao.HostInfoMapper;
import com.yqsmartcity.data.swap.dao.TaskPublishFileMapper;
import com.yqsmartcity.data.swap.dao.TaskPublishMapper;
import com.yqsmartcity.data.swap.po.HostInfoPO;
import com.yqsmartcity.data.swap.po.TaskPublishPO;
import com.yqsmartcity.data.swap.service.busi.impl.dataworks.FileZipUtil;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/**
 * Publishes stored task files to HDFS and submits the matching jobs to Oozie.
 *
 * <p>For every publish id ("unid") in a request, the non-job files are written to HDFS and the
 * job descriptors (file type {@code mappers/job}) are submitted through an {@link OozieClient}.
 * The HDFS write and the Oozie submission run on a background thread; the outcome is written
 * back to the publish record through {@link TaskPublishMapper}.
 */
@Service
public class PublishToOozieServiceImpl
implements PublishToOozieService {
    private static final Logger logger = LoggerFactory.getLogger(PublishToOozieServiceImpl.class);
    /** File type that marks a stored file as an Oozie job descriptor rather than an HDFS payload. */
    private static final String JOB_FILE_TYPE = "mappers/job";
    /** Status written to the publish record when the publish is handed to the background task. */
    private static final String STATUS_SUBMITTED = "2";
    /** Status written to the publish record when the background publish fails. */
    private static final String STATUS_FAILED = "4";
    @Autowired
    AzkabanLoginService azkabanLogin;
    @Autowired
    private UploadFile2HdfsService uploadFile2HdfsService;
    @Autowired
    private TaskPublishMapper taskPublishMapper;
    @Autowired
    private TaskPublishFileMapper taskPublishFileMapper;
    @Autowired
    private HostInfoMapper hostInfoMapper;
    @Autowired
    private CreateJobPropertiesService createJobPropertiesService;
    @Value(value="${oozie.url}")
    private String oozieUrl;
    @Value(value="${hdfs.defalutFS}")
    private String hdfsDefalutFS;

    /**
     * Publishes every task identified by {@code reqBO.getUnidList()}.
     *
     * <p>The files of all requested publish ids are loaded in one query. For each id the record
     * is first marked as submitted and then the actual publish is started asynchronously, so
     * this method returns before the HDFS write and the Oozie submission have finished.
     *
     * @param reqBO publish request carrying the publish ids and the operator's id and name
     * @throws ZTBusinessException if the id list is empty or no files are found for the ids
     */
    public void publishOozie(OoziePublishReqBO reqBO) throws ZTBusinessException {
        logger.info("---------------------call publishOozie");
        this.validateParam(reqBO);
        List<TaskPublishFileBO> allFiles = this.taskPublishFileMapper.getFileLIstByPublishId(reqBO.getUnidList());
        if (CollectionUtils.isEmpty(allFiles)) {
            throw new ZTBusinessException("\u672a\u67e5\u8be2\u5230\u5f85\u53d1\u5e03\u4fe1\u606f");
        }
        for (final Long unid : reqBO.getUnidList()) {
            final List<UploadHdfsFileInfoBO> jobFile = new ArrayList<UploadHdfsFileInfoBO>();
            final List<UploadHdfsFileInfoBO> fileList = new ArrayList<UploadHdfsFileInfoBO>();
            for (TaskPublishFileBO fileBO : allFiles) {
                if (!fileBO.getUnid().equals(unid)) {
                    continue;
                }
                if (JOB_FILE_TYPE.equalsIgnoreCase(fileBO.getFileType())) {
                    jobFile.add(toJobFile(fileBO));
                } else {
                    fileList.add(toHdfsFile(fileBO));
                }
            }
            // Record the "submitted" state BEFORE starting the background task; doing it
            // afterwards could overwrite a failure status written by a fast-failing task.
            this.markSubmitted(unid, reqBO);
            this.startPublishTask(unid, fileList, jobFile);
        }
    }

    /** Builds the descriptor of an Oozie job file from a stored publish file. */
    private static UploadHdfsFileInfoBO toJobFile(TaskPublishFileBO fileBO) {
        UploadHdfsFileInfoBO jobFileInfoBO = new UploadHdfsFileInfoBO();
        jobFileInfoBO.setUnid(fileBO.getUnid());
        jobFileInfoBO.setLocalFile(fileBO.getFileName());
        jobFileInfoBO.setFileContent(fileBO.getFileContent().toString());
        return jobFileInfoBO;
    }

    /** Builds the descriptor of a file to be written to HDFS from a stored publish file. */
    private static UploadHdfsFileInfoBO toHdfsFile(TaskPublishFileBO fileBO) {
        UploadHdfsFileInfoBO uploadHdfsFileInfoBO = new UploadHdfsFileInfoBO();
        uploadHdfsFileInfoBO.setTargetFilePath(fileBO.getFilePath() + "/" + fileBO.getFileName());
        uploadHdfsFileInfoBO.setFileContent(fileBO.getFileContent().toString());
        return uploadHdfsFileInfoBO;
    }

    /** Marks the publish record as submitted and stores the operator taken from the request. */
    private void markSubmitted(Long unid, OoziePublishReqBO reqBO) {
        TaskPublishPO taskPublishPO = new TaskPublishPO();
        taskPublishPO.setUnid(unid);
        taskPublishPO.setStatus(STATUS_SUBMITTED);
        if (null != reqBO.getUserId()) {
            taskPublishPO.setPublishOperId(String.valueOf(reqBO.getUserId()));
        }
        taskPublishPO.setPublishOperName(reqBO.getName());
        this.taskPublishMapper.updateByPrimaryKeySelective(taskPublishPO);
    }

    /** Runs the publish of one id on its own short-lived executor. */
    private void startPublishTask(final Long unid, final List<UploadHdfsFileInfoBO> fileList, final List<UploadHdfsFileInfoBO> jobFile) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 2L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2));
        try {
            executor.execute(new Runnable(){

                @Override
                public void run() {
                    PublishToOozieServiceImpl.this.runPublish(unid, fileList, jobFile);
                }
            });
        }
        finally {
            // shutdown() lets the submitted task finish but releases the worker thread
            // afterwards; without it every call would leak a core thread.
            executor.shutdown();
        }
    }

    /**
     * Background part of a publish: writes the files to HDFS, submits the jobs and stores the
     * outcome on the publish record. Never throws; failures are logged and recorded.
     */
    private void runPublish(Long unid, List<UploadHdfsFileInfoBO> fileList, List<UploadHdfsFileInfoBO> jobFile) {
        try {
            String jobId;
            try {
                this.writeFile2Hdfs(fileList);
                jobId = PublishToOozieServiceImpl.submitJobPro(jobFile);
            }
            catch (Exception e) {
                // Any failure of the publish steps (not only ZTBusinessException) is recorded,
                // otherwise the record would stay in the submitted state forever.
                logger.error("publish to oozie failed, unid={}", unid, e);
                String reason = e.getLocalizedMessage() != null ? e.getLocalizedMessage() : e.toString();
                TaskPublishPO failedPO = new TaskPublishPO();
                failedPO.setUnid(unid);
                failedPO.setStatus(STATUS_FAILED);
                failedPO.setPublishTime(new Date());
                failedPO.setRemark(reason);
                this.taskPublishMapper.updateByPrimaryKeySelective(failedPO);
                return;
            }
            if (!StringUtils.isEmpty(jobId)) {
                TaskPublishPO taskPublishPO = new TaskPublishPO();
                taskPublishPO.setUnid(unid);
                taskPublishPO.setPublishTime(new Date());
                taskPublishPO.setJobId(jobId);
                this.taskPublishMapper.updateByPrimaryKeySelective(taskPublishPO);
            }
        }
        catch (Exception e) {
            logger.error("failed to record publish result, unid={}", unid, e);
        }
    }

    /**
     * Submits every job descriptor to the Oozie server named in the descriptor itself.
     *
     * <p>Each descriptor's content is a JSON object holding the job properties. When
     * {@code runCycle} is present the job is submitted with a coordinator application path,
     * otherwise with a workflow application path. The returned Oozie job id is also stored on
     * the descriptor.
     *
     * @param jobFile job descriptors to submit
     * @return the id of the last submitted job, or {@code null} if {@code jobFile} is empty
     * @throws ZTBusinessException if a required property is missing or Oozie rejects the job
     */
    private static String submitJobPro(List<UploadHdfsFileInfoBO> jobFile) throws ZTBusinessException {
        String jobId = null;
        for (UploadHdfsFileInfoBO uploadHdfsFileInfoBO : jobFile) {
            JSONObject confFile = JSONObject.parseObject((String)uploadHdfsFileInfoBO.getFileContent());
            if (confFile == null) {
                throw new ZTBusinessException("Oozie job configuration is empty: " + uploadHdfsFileInfoBO.getLocalFile());
            }
            OozieClient wc = new OozieClient(requiredConf(confFile, "oozieUrl"));
            Properties conf = wc.createConfiguration();
            conf.setProperty("nameNode", requiredConf(confFile, "nameNode"));
            conf.setProperty("jobTracker", requiredConf(confFile, "jobTracker"));
            conf.setProperty("queueName", requiredConf(confFile, "queueName"));
            conf.setProperty("examplesRoot", requiredConf(confFile, "examplesRoot"));
            conf.setProperty("workflowAppUri", requiredConf(confFile, "workflowAppUri"));
            conf.setProperty("outputpath", requiredConf(confFile, "outputpath"));
            conf.setProperty("user.name", requiredConf(confFile, "user.name"));
            conf.setProperty("oozie.use.system.libpath", "true");
            conf.setProperty("start", requiredConf(confFile, "start"));
            conf.setProperty("end", requiredConf(confFile, "end"));
            if (!StringUtils.isEmpty(confFile.get((Object)"runCycle"))) {
                conf.setProperty("runCycle", requiredConf(confFile, "runCycle"));
                conf.setProperty("oozie.coord.application.path", requiredConf(confFile, "oozie.coord.application.path"));
            } else {
                conf.setProperty("oozie.wf.application.path", requiredConf(confFile, "oozie.wf.application.path"));
            }
            logger.info("\u63d0\u4ea4oozie:{}", requiredConf(confFile, "workflowAppUri"));
            try {
                logger.info("--------------oozie run---------");
                jobId = wc.run(conf);
                logger.info("--------------oozie run----end-----");
                uploadHdfsFileInfoBO.setJobId(jobId);
                logger.info("--------------jobId={}", jobId);
            }
            catch (OozieClientException e) {
                logger.error("\u63d0\u4ea4oozie\u4efb\u52a1\u5931\u8d25\uff01", e);
                throw new ZTBusinessException("\u63d0\u4ea4oozie\u4efb\u52a1\u5931\u8d25\uff01" + e.getLocalizedMessage());
            }
        }
        return jobId;
    }

    /**
     * Reads a mandatory property from a job configuration.
     *
     * @throws ZTBusinessException if the key is absent, instead of a bare NullPointerException
     */
    private static String requiredConf(JSONObject confFile, String key) throws ZTBusinessException {
        Object value = confFile.get((Object)key);
        if (value == null) {
            throw new ZTBusinessException("Missing required Oozie job property: " + key);
        }
        return value.toString();
    }

    /**
     * Zips {@code filePath} into a sibling file named {@code filePath + ".zip"}.
     *
     * @param filePath path of the file or directory to compress
     * @return the simple name (without directory) of the created zip file
     * @throws ZTBusinessException if the zip file cannot be created or closed
     */
    private static String zipFile(String filePath) throws ZTBusinessException {
        File zipFile = new File(filePath + ".zip");
        // try-with-resources guarantees the output stream is released even if zipping fails.
        try (FileOutputStream fos1 = new FileOutputStream(zipFile)) {
            FileZipUtil.toZip(new File(filePath), fos1, false);
        }
        catch (IOException e) {
            throw new ZTBusinessException(e.getLocalizedMessage());
        }
        return zipFile.getName();
    }

    /**
     * Logs in to a remote host and copies the given files to it via scp.
     *
     * <p>Login uses a key file when the host's land mode is {@code key} and a decrypted
     * password when it is {@code password}. Each file's content is first written to a local
     * scratch directory under {@code datax_json_file/}, copied, and the scratch directory is
     * removed again. The remote session is always logged out.
     *
     * @param hostInfoPO connection details of the target host
     * @param jsonFile files to upload; content, local file name and target path are read
     * @throws ZTBusinessException if login fails, the password cannot be decrypted or the
     *         upload fails
     */
    private static void scpPutJson(HostInfoPO hostInfoPO, List<UploadHdfsFileInfoBO> jsonFile) throws ZTBusinessException {
        RemoteConnectBO remoteConnect = new RemoteConnectBO();
        remoteConnect.setIp(hostInfoPO.getHostIp());
        remoteConnect.setPort(hostInfoPO.getHostPort().intValue());
        remoteConnect.setUserName(hostInfoPO.getHostUser());
        try {
            // NOTE(review): any other land mode skips login entirely and still attempts the
            // copy below - confirm whether that is intended.
            if ("key".equals(hostInfoPO.getLandMode())) {
                File keyFile = new File(hostInfoPO.getPathAndKey());
                if (!ConnectLinuxCommand.loginByFileKey(remoteConnect, keyFile, null).booleanValue()) {
                    throw new ZTBusinessException("\u767b\u9646\u4e3b\u673a" + hostInfoPO.getHostIp() + "\u5931\u8d25");
                }
            } else if ("password".equals(hostInfoPO.getLandMode())) {
                try {
                    remoteConnect.setPassword(ConfigTools.decrypt((String)hostInfoPO.getHostPass()));
                }
                catch (Exception e) {
                    logger.error("failed to decrypt password of host {}", hostInfoPO.getHostIp(), e);
                    throw new ZTBusinessException("\u4e0a\u4f20\u811a\u672c\u4e3b\u673a\u7684\u5bc6\u7801\u9519\u8bef\uff01");
                }
                if (!ConnectLinuxCommand.login(remoteConnect).booleanValue()) {
                    throw new ZTBusinessException("\u767b\u9646\u4e3b\u673a" + hostInfoPO.getHostIp() + "\u5931\u8d25");
                }
            }
            // Unseeded generator: a fixed seed would yield the same scratch directory names on
            // every call, so concurrent uploads would overwrite or delete each other's files.
            Random random = new Random();
            for (UploadHdfsFileInfoBO fileName : jsonFile) {
                String scratchDir = "datax_json_file/" + random.nextInt(999999999);
                try {
                    FileZipUtil.txtFile(fileName.getFileContent(), scratchDir + "/", fileName.getLocalFile());
                    ConnectLinuxCommand.scpPut(scratchDir + "/" + fileName.getLocalFile(), fileName.getTargetFilePath());
                }
                finally {
                    // Remove the scratch directory even when the copy fails.
                    FileZipUtil.deleteDir(new File(scratchDir));
                }
            }
        }
        catch (IOException e) {
            logger.error("\u4e0a\u4f20\u6587\u4ef6\u81f3\u670d\u52a1\u5668\u5931\u8d25\uff01", e);
            throw new ZTBusinessException("\u4e0a\u4f20\u6587\u4ef6\u81f3\u670d\u52a1\u5668\u5931\u8d25\uff01");
        }
        finally {
            ConnectLinuxCommand.logOut();
        }
    }

    /**
     * Checks that the request carries at least one publish id.
     *
     * @throws ZTBusinessException if the request is {@code null} or its id list is empty
     */
    private void validateParam(OoziePublishReqBO reqBO) {
        if (reqBO == null || CollectionUtils.isEmpty((Collection)reqBO.getUnidList())) {
            throw new ZTBusinessException("\u5fc5\u4f20\u53c2\u6570\u8868\u540d\u5217\u8868\u3010unidList\u3011\u4e3a\u7a7a");
        }
    }

    /**
     * Writes the content of each file, UTF-8 encoded, to its target path on HDFS.
     *
     * <p>The file system is opened against the configured default FS as user {@code hdfs};
     * target paths are resolved from the HDFS root ({@code "/" + targetFilePath}).
     *
     * @param updateFile files to write; target path and content are read from each entry
     * @throws ZTBusinessException if the file system cannot be opened or a write fails
     */
    public void writeFile2Hdfs(List<UploadHdfsFileInfoBO> updateFile) throws ZTBusinessException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", this.hdfsDefalutFS);
        conf.set("user", "oozie");
        try {
            // try-with-resources closes the streams and the file system on failure as well.
            try (FileSystem fs = FileSystem.get((URI)new URI(this.hdfsDefalutFS), (Configuration)conf, (String)"hdfs")) {
                for (UploadHdfsFileInfoBO ploadHdfsFileInfoBO : updateFile) {
                    logger.info("------\u51991\uff1a{}", ploadHdfsFileInfoBO.getTargetFilePath());
                    try (FSDataOutputStream out = fs.create(new Path("/" + ploadHdfsFileInfoBO.getTargetFilePath()))) {
                        logger.info("------\u51992\uff1a");
                        String fileContent = ploadHdfsFileInfoBO.getFileContent();
                        out.write(fileContent.getBytes("UTF-8"));
                        logger.info("------\u51993\uff1a");
                    }
                }
            }
            logger.info("------close1\uff1a");
        }
        catch (IOException e) {
            logger.error("\u4e0a\u4f20\u6587\u4ef6\u5230HDFS\u5931\u8d25\uff01", e);
            throw new ZTBusinessException("\u4e0a\u4f20\u6587\u4ef6\u5230HDFS\u5931\u8d25\uff01");
        }
        catch (URISyntaxException e) {
            logger.error("\u4e0a\u4f20\u6587\u4ef6\u5230HDFS\u5931\u8d25\uff01", e);
            throw new ZTBusinessException("\u4e0a\u4f20\u6587\u4ef6\u5230HDFS\u5931\u8d25\uff01");
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread can still observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("\u4e0a\u4f20\u6587\u4ef6\u5230HDFS\u5931\u8d25\uff01", e);
            throw new ZTBusinessException("\u4e0a\u4f20\u6587\u4ef6\u5230HDFS\u5931\u8d25\uff01");
        }
    }
}

