/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.alink.linksdk.channel.core.persistent.mqtt.send;

import com.aliyun.alink.linksdk.channel.core.base.ASend;
import com.aliyun.alink.linksdk.channel.core.persistent.BadNetworkException;
import com.aliyun.alink.linksdk.channel.core.persistent.ISendExecutor;
import com.aliyun.alink.linksdk.channel.core.persistent.PersistentConnectState;
import com.aliyun.alink.linksdk.channel.core.persistent.mqtt.MqttNet;
import com.aliyun.alink.linksdk.channel.core.persistent.mqtt.request.MqttPublishRequest;
import com.aliyun.alink.linksdk.channel.core.persistent.mqtt.request.MqttSubscribeRequest;
import com.aliyun.alink.linksdk.channel.core.persistent.mqtt.send.MqttRpcMessageCallback;
import com.aliyun.alink.linksdk.channel.core.persistent.mqtt.send.MqttSend;
import com.aliyun.alink.linksdk.channel.core.persistent.mqtt.send.MqttSendStatus;
import com.aliyun.alink.linksdk.channel.core.persistent.mqtt.send.MqttThrowable;
import com.aliyun.alink.linksdk.channel.core.persistent.mqtt.utils.MqttAlinkProtocolHelper;
import com.aliyun.alink.utils.LogUtils;
import com.aliyun.alink.utils.TextUtils;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MqttSendExecutor
implements ISendExecutor {
    private static final String TAG = "MqttSendExecutor";

    @Override
    public void asyncSend(ASend send) {
        if (null == send || null == send.getRequest()) {
            LogUtils.error(TAG, "asyncSend(): bad parameters: NULL");
            return;
        }
        IMqttAsyncClient mqttAsyncClient = MqttNet.getInstance().getClient();
        if (null == mqttAsyncClient) {
            LogUtils.error(TAG, "asyncSend(): MqttNet::getClient() return null");
            return;
        }
        if (!(send instanceof MqttSend)) {
            LogUtils.print(TAG, "asyncSend(): bad parameter: need MqttSend");
            return;
        }
        MqttSend mqttSend = (MqttSend)send;
        if (MqttNet.getInstance().getConnectState() != PersistentConnectState.CONNECTED) {
            LogUtils.print(TAG, "asyncSend(): gateway disconnect");
            mqttSend.setStatus(MqttSendStatus.completed);
            mqttSend.onFailure(null, new BadNetworkException());
            return;
        }
        if (send.getRequest() instanceof MqttPublishRequest) {
            MqttPublishRequest publishRequest = (MqttPublishRequest)send.getRequest();
            if (TextUtils.isEmpty(publishRequest.topic) || publishRequest.payloadObj == null) {
                LogUtils.error(TAG, "asyncSend(): bad parameters: topic or payload empty");
                mqttSend.onFailure(null, new NullPointerException("topic or payload empty"));
                return;
            }
            if (publishRequest.isRPC && (mqttSend.getStatus() == MqttSendStatus.waitingToSend || mqttSend.getStatus() == MqttSendStatus.completed)) {
                try {
                    String payloadStr = null;
                    if (publishRequest.payloadObj instanceof String) {
                        payloadStr = publishRequest.payloadObj.toString();
                    } else if (publishRequest.payloadObj instanceof byte[]) {
                        payloadStr = new String((byte[])publishRequest.payloadObj, "UTF-8");
                    } else {
                        try {
                            payloadStr = publishRequest.payloadObj.toString();
                        }
                        catch (Exception e) {
                            LogUtils.print(TAG, "asyncSend(), publish , toString error," + e.toString());
                            mqttSend.setStatus(MqttSendStatus.completed);
                            mqttSend.onFailure(null, new MqttThrowable("RPC request ,payload should be String or byte[]"));
                            return;
                        }
                    }
                    publishRequest.msgId = MqttAlinkProtocolHelper.parseMsgIdFromPayload(payloadStr);
                    if (TextUtils.isEmpty(publishRequest.replyTopic)) {
                        publishRequest.replyTopic = publishRequest.topic + "_reply";
                    }
                    LogUtils.print(TAG, "publish: RPC sub reply topic: [ " + publishRequest.replyTopic + " ]");
                    mqttSend.setStatus(MqttSendStatus.waitingToSubReply);
                    mqttAsyncClient.subscribe(publishRequest.replyTopic, 0, null, (IMqttActionListener)mqttSend, (IMqttMessageListener)new MqttRpcMessageCallback(publishRequest.replyTopic, mqttSend));
                }
                catch (Exception e) {
                    LogUtils.print(TAG, "asyncSend(), publish , send subsribe reply error, e = " + e.toString());
                    mqttSend.setStatus(MqttSendStatus.completed);
                    mqttSend.onFailure(null, new MqttThrowable(e.getMessage()));
                }
            } else {
                try {
                    byte[] payload = null;
                    if (publishRequest.payloadObj instanceof String) {
                        payload = publishRequest.payloadObj.toString().getBytes("utf-8");
                    } else if (publishRequest.payloadObj instanceof byte[]) {
                        payload = (byte[])publishRequest.payloadObj;
                    } else {
                        try {
                            ByteArrayOutputStream bos = new ByteArrayOutputStream();
                            ObjectOutputStream oos = new ObjectOutputStream(bos);
                            oos.writeObject(publishRequest.payloadObj);
                            oos.flush();
                            payload = bos.toByteArray();
                        }
                        catch (Exception e) {
                            LogUtils.error(TAG, "asyncSend(): convert payload Obj to byte array error");
                            e.printStackTrace();
                        }
                    }
                    if (publishRequest.payloadObj == null) {
                        LogUtils.print(TAG, "asyncSend(): payload is empty");
                        mqttSend.onFailure(null, new NullPointerException("payload empty"));
                        return;
                    }
                    LogUtils.print(TAG, "publish: topic: [ " + publishRequest.topic + " ]");
                    LogUtils.print(TAG, "publish: payload: [ " + publishRequest.payloadObj.toString() + " ]");
                    MqttMessage message = new MqttMessage(payload);
                    message.setQos(publishRequest.qos);
                    if (publishRequest.isRPC) {
                        mqttSend.setStatus(MqttSendStatus.waitingToPublish);
                    } else {
                        mqttSend.setStatus(MqttSendStatus.waitingToComplete);
                    }
                    mqttAsyncClient.publish(publishRequest.topic, message, null, (IMqttActionListener)mqttSend);
                }
                catch (Exception e) {
                    LogUtils.print(TAG, "asyncSend(), send publish error, e = " + e.toString());
                    mqttSend.setStatus(MqttSendStatus.completed);
                    mqttSend.onFailure(null, new MqttThrowable(e.getMessage()));
                }
            }
        } else if (send.getRequest() instanceof MqttSubscribeRequest) {
            MqttSubscribeRequest subscribeRequest = (MqttSubscribeRequest)send.getRequest();
            if (TextUtils.isEmpty(subscribeRequest.topic)) {
                LogUtils.error(TAG, "asyncSend(): bad parameters: subsribe req , topic empty");
                mqttSend.onFailure(null, new NullPointerException("subsribe req , topic empty"));
                return;
            }
            try {
                mqttSend.setStatus(MqttSendStatus.waitingToComplete);
                if (subscribeRequest.isSubscribe) {
                    LogUtils.print(TAG, "subscribe: topic: [ " + subscribeRequest.topic + " ]");
                    mqttAsyncClient.subscribe(subscribeRequest.topic, 0, null, (IMqttActionListener)mqttSend);
                } else {
                    LogUtils.print(TAG, "unsubscribe: topic: [ " + subscribeRequest.topic + " ]");
                    mqttAsyncClient.unsubscribe(subscribeRequest.topic, null, (IMqttActionListener)mqttSend);
                }
            }
            catch (Exception e) {
                LogUtils.print(TAG, "asyncSend(), send subsribe error, e = " + e.toString());
                mqttSend.setStatus(MqttSendStatus.completed);
                mqttSend.onFailure(null, new MqttThrowable(e.getMessage()));
            }
        }
    }
}

