/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.processor.utils;

import java.util.Collections;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.TestingBatchExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
import org.apache.flink.table.planner.plan.nodes.exec.processor.utils.InputPriorityConflictResolver;
import org.apache.flink.table.types.logical.RowType;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link InputPriorityConflictResolver}.
 *
 * <p>Each test wires a small graph of {@link TestingBatchExecNode}s, runs
 * {@link InputPriorityConflictResolver#detectAndResolve()} on it and then inspects the inputs of
 * the root node to see where {@link BatchExecExchange} nodes have been placed.
 */
public class InputPriorityConflictResolverTest {

    /**
     * Wires nine nodes with node 7 as the root and checks the root's inputs after resolution:
     * inputs 0, 1, 4 and 5 are still the originally wired nodes, while inputs 2 and 3 are
     * {@code BATCH} exchanges whose first input edge comes from nodes 3 and 4 respectively.
     */
    @Test
    public void testDetectAndResolve() {
        TestingBatchExecNode[] nodes = createNodes(9);
        TestingBatchExecNode root = nodes[7];

        // The order of addInput calls fixes the input indices asserted on below.
        nodes[1].addInput(nodes[0], withPriority(0));
        nodes[2].addInput(nodes[1], withPriority(0));
        nodes[3].addInput(nodes[0], withPriority(0));
        nodes[4].addInput(nodes[8], withPriority(0));
        nodes[4].addInput(nodes[0], withPriority(0));
        nodes[5].addInput(nodes[8], withPriority(0));
        nodes[5].addInput(
                nodes[0],
                InputProperty.builder()
                        .damBehavior(InputProperty.DamBehavior.END_INPUT)
                        .priority(0)
                        .build());
        root.addInput(nodes[1], withPriority(0));
        root.addInput(nodes[2], withPriority(0));
        root.addInput(nodes[3], withPriority(1));
        root.addInput(nodes[4], withPriority(10));
        root.addInput(nodes[5], withPriority(10));
        root.addInput(nodes[6], withPriority(100));

        new InputPriorityConflictResolver(
                        Collections.singletonList(root),
                        InputProperty.DamBehavior.END_INPUT,
                        StreamExchangeMode.BATCH,
                        new Configuration())
                .detectAndResolve();

        Assert.assertEquals(nodes[1], inputAt(root, 0));
        Assert.assertEquals(nodes[2], inputAt(root, 1));
        assertBatchExchangeOver(inputAt(root, 2), nodes[3]);
        assertBatchExchangeOver(inputAt(root, 3), nodes[4]);
        Assert.assertEquals(nodes[5], inputAt(root, 4));
        Assert.assertEquals(nodes[6], inputAt(root, 5));
    }

    /**
     * Feeds one {@code BATCH} exchange (sourced from node 0) into both inputs of node 1 with
     * priorities 0 and 1, then checks that after resolution the two inputs are different
     * instances, each a {@code BATCH} exchange whose first input edge comes from node 0.
     */
    @Test
    public void testDeadlockCausedByExchange() {
        TestingBatchExecNode[] nodes = createNodes(2);
        TestingBatchExecNode source = nodes[0];
        TestingBatchExecNode root = nodes[1];

        BatchExecExchange exchange =
                new BatchExecExchange(
                        new Configuration(),
                        InputProperty.builder()
                                .requiredDistribution(InputProperty.ANY_DISTRIBUTION)
                                .build(),
                        (RowType) source.getOutputType(),
                        "Exchange");
        exchange.setRequiredExchangeMode(StreamExchangeMode.BATCH);
        ExecEdge sourceToExchange = ExecEdge.builder().source(source).target(exchange).build();
        exchange.setInputEdges(Collections.singletonList(sourceToExchange));

        // The very same exchange instance is wired into both inputs of the root.
        root.addInput(exchange, withPriority(0));
        root.addInput(exchange, withPriority(1));

        new InputPriorityConflictResolver(
                        Collections.singletonList(root),
                        InputProperty.DamBehavior.END_INPUT,
                        StreamExchangeMode.BATCH,
                        new Configuration())
                .detectAndResolve();

        ExecNode<?> first = inputAt(root, 0);
        ExecNode<?> second = inputAt(root, 1);
        Assert.assertNotSame(first, second);
        assertBatchExchangeOver(first, source);
        assertBatchExchangeOver(second, source);
    }

    /** Creates {@code count} testing nodes named {@code TestingBatchExecNode<index>}. */
    private static TestingBatchExecNode[] createNodes(int count) {
        TestingBatchExecNode[] created = new TestingBatchExecNode[count];
        int index = 0;
        while (index < created.length) {
            created[index] = new TestingBatchExecNode("TestingBatchExecNode" + index);
            index++;
        }
        return created;
    }

    /** Builds a fresh {@link InputProperty} that only sets the given priority. */
    private static InputProperty withPriority(int priority) {
        return InputProperty.builder().priority(priority).build();
    }

    /** Returns the input node currently at position {@code index} of {@code node}. */
    private static ExecNode<?> inputAt(TestingBatchExecNode node, int index) {
        return node.getInputNodes().get(index);
    }

    /**
     * Asserts that {@code candidate} is a {@link BatchExecExchange} requiring
     * {@link StreamExchangeMode#BATCH} and that its first input edge originates from
     * {@code expectedSource}.
     */
    private static void assertBatchExchangeOver(ExecNode<?> candidate, ExecNode<?> expectedSource) {
        Assert.assertTrue(candidate instanceof BatchExecExchange);
        BatchExecExchange exchange = (BatchExecExchange) candidate;
        Assert.assertEquals(
                Optional.of(StreamExchangeMode.BATCH), exchange.getRequiredExchangeMode());
        ExecEdge firstEdge = (ExecEdge) exchange.getInputEdges().get(0);
        Assert.assertEquals(expectedSource, firstEdge.getSource());
    }
}

