/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.akka;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionWithException;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class AkkaRpcActorOversizedResponseMessageTest
extends TestLogger {
    private static final Time TIMEOUT = Time.seconds((long)10L);
    private static final int FRAMESIZE = 32000;
    private static final String OVERSIZED_PAYLOAD = new String(new byte[32000]);
    private static final String PAYLOAD = "Hello";
    private static RpcService rpcService1;
    private static RpcService rpcService2;

    @BeforeClass
    public static void setupClass() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString(AkkaOptions.FRAMESIZE, "32000 b");
        rpcService1 = AkkaRpcServiceUtils.remoteServiceBuilder((Configuration)configuration, (String)"localhost", (int)0).createAndStart();
        rpcService2 = AkkaRpcServiceUtils.remoteServiceBuilder((Configuration)configuration, (String)"localhost", (int)0).createAndStart();
    }

    @AfterClass
    public static void teardownClass() throws Exception {
        RpcUtils.terminateRpcServices((Time)TIMEOUT, (RpcService[])new RpcService[]{rpcService1, rpcService2});
    }

    @Test
    public void testOverSizedResponseMsgAsync() throws Exception {
        try {
            this.runRemoteMessageResponseTest(OVERSIZED_PAYLOAD, this::requestMessageAsync);
            Assert.fail((String)"Expected the RPC to fail.");
        }
        catch (ExecutionException e) {
            Assert.assertThat((Object)ExceptionUtils.findThrowable((Throwable)e, AkkaRpcException.class).isPresent(), (Matcher)Matchers.is((Object)true));
            Assert.assertThat((Object)e.getCause().getMessage().contains(String.valueOf(32000)), (Matcher)Matchers.is((Object)true));
        }
    }

    @Test
    public void testNormalSizedResponseMsgAsync() throws Exception {
        String message = (String)this.runRemoteMessageResponseTest(PAYLOAD, this::requestMessageAsync);
        Assert.assertThat((Object)message, (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)PAYLOAD)));
    }

    @Test
    public void testNormalSizedResponseMsgSync() throws Exception {
        String message = (String)this.runRemoteMessageResponseTest(PAYLOAD, MessageRpcGateway::messageSync);
        Assert.assertThat((Object)message, (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)PAYLOAD)));
    }

    @Test
    public void testOverSizedResponseMsgSync() throws Exception {
        try {
            this.runRemoteMessageResponseTest(OVERSIZED_PAYLOAD, MessageRpcGateway::messageSync);
            Assert.fail((String)"Expected the RPC to fail.");
        }
        catch (RpcException e) {
            Assert.assertThat((Object)ExceptionUtils.findThrowable((Throwable)e, AkkaRpcException.class).isPresent(), (Matcher)Matchers.is((Object)true));
            Assert.assertThat((Object)e.getCause().getMessage().contains(String.valueOf(32000)), (Matcher)Matchers.is((Object)true));
        }
    }

    @Test
    public void testLocalOverSizedResponseMsgSync() throws Exception {
        String message = (String)this.runLocalMessageResponseTest(OVERSIZED_PAYLOAD, MessageRpcGateway::messageSync);
        Assert.assertThat((Object)message, (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)OVERSIZED_PAYLOAD)));
    }

    @Test
    public void testLocalOverSizedResponseMsgAsync() throws Exception {
        String message = (String)this.runLocalMessageResponseTest(OVERSIZED_PAYLOAD, this::requestMessageAsync);
        Assert.assertThat((Object)message, (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)OVERSIZED_PAYLOAD)));
    }

    private String requestMessageAsync(MessageRpcGateway messageRpcGateway) throws Exception {
        CompletableFuture<String> messageFuture = messageRpcGateway.messageAsync();
        return messageFuture.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <T> T runRemoteMessageResponseTest(String payload, FunctionWithException<MessageRpcGateway, T, Exception> rpcCall) throws Exception {
        MessageRpcEndpoint rpcEndpoint = new MessageRpcEndpoint(rpcService1, payload);
        try {
            rpcEndpoint.start();
            MessageRpcGateway rpcGateway = (MessageRpcGateway)rpcService2.connect(rpcEndpoint.getAddress(), MessageRpcGateway.class).get();
            Object object = rpcCall.apply((Object)rpcGateway);
            return (T)object;
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)rpcEndpoint, (Time)TIMEOUT);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <T> T runLocalMessageResponseTest(String payload, FunctionWithException<MessageRpcGateway, T, Exception> rpcCall) throws Exception {
        MessageRpcEndpoint rpcEndpoint = new MessageRpcEndpoint(rpcService1, payload);
        try {
            rpcEndpoint.start();
            MessageRpcGateway rpcGateway = (MessageRpcGateway)rpcService1.connect(rpcEndpoint.getAddress(), MessageRpcGateway.class).get();
            Object object = rpcCall.apply((Object)rpcGateway);
            return (T)object;
        }
        finally {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)rpcEndpoint, (Time)TIMEOUT);
        }
    }

    static class MessageRpcEndpoint
    extends RpcEndpoint
    implements MessageRpcGateway {
        @Nonnull
        private final String message;

        MessageRpcEndpoint(RpcService rpcService, @Nonnull String message) {
            super(rpcService);
            this.message = message;
        }

        @Override
        public CompletableFuture<String> messageAsync() {
            return CompletableFuture.completedFuture(this.message);
        }

        @Override
        public String messageSync() throws RpcException {
            return this.message;
        }
    }

    static interface MessageRpcGateway
    extends RpcGateway {
        public CompletableFuture<String> messageAsync();

        public String messageSync() throws RpcException;
    }
}

