/*
 * Decompiled with CFR 0.152.
 */
package com.dangdang.ddframe.job.executor.type;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor;
import com.dangdang.ddframe.job.executor.JobFacade;
import java.util.List;

/**
 * Executor that drives a {@link DataflowJob}: it fetches a batch of data for a
 * sharding context and hands that batch to the job for processing.
 *
 * <p>Depending on the job's {@link DataflowJobConfiguration}, the fetch/process
 * cycle runs either once (one-off) or repeatedly (streaming) until a fetch
 * yields no data or the job facade reports the job is no longer eligible to run.
 */
public final class DataflowJobExecutor extends AbstractElasticJobExecutor {

    /** The user-supplied job that provides and consumes the data batches. */
    private final DataflowJob<Object> dataflowJob;

    /**
     * Creates an executor for the given dataflow job.
     *
     * @param dataflowJob the job whose {@code fetchData}/{@code processData} are invoked
     * @param jobFacade   the facade passed to the parent executor
     */
    public DataflowJobExecutor(DataflowJob<Object> dataflowJob, JobFacade jobFacade) {
        super(jobFacade);
        this.dataflowJob = dataflowJob;
    }

    /**
     * Runs the job for one sharding context, choosing streaming or one-off
     * execution according to the dataflow type configuration.
     *
     * @param shardingContext the sharding context handed to the job callbacks
     */
    @Override
    protected void process(ShardingContext shardingContext) {
        DataflowJobConfiguration typeConfig =
                (DataflowJobConfiguration) getJobRootConfig().getTypeConfig();
        if (!typeConfig.isStreamingProcess()) {
            oneOffExecute(shardingContext);
            return;
        }
        streamingExecute(shardingContext);
    }

    /**
     * Repeats fetch-then-process until a fetch returns {@code null} or an empty
     * list, or until the job facade says the job is not eligible for running.
     * Eligibility is checked after each processed batch, before the next fetch.
     */
    private void streamingExecute(ShardingContext shardingContext) {
        for (List<Object> batch = dataflowJob.fetchData(shardingContext);
                hasData(batch);
                batch = dataflowJob.fetchData(shardingContext)) {
            dataflowJob.processData(shardingContext, batch);
            if (!getJobFacade().isEligibleForJobRunning()) {
                return;
            }
        }
    }

    /**
     * Fetches a single batch and processes it, skipping processing when the
     * batch is {@code null} or empty.
     */
    private void oneOffExecute(ShardingContext shardingContext) {
        List<Object> batch = dataflowJob.fetchData(shardingContext);
        if (!hasData(batch)) {
            return;
        }
        dataflowJob.processData(shardingContext, batch);
    }

    /** Returns {@code true} when the batch is non-null and contains at least one element. */
    private static boolean hasData(List<Object> batch) {
        return batch != null && !batch.isEmpty();
    }
}

