package org.apache.shenyu.protocol.mqtt;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.concurrent.CompletableFuture;
import org.apache.shenyu.common.utils.Singleton;
import org.apache.shenyu.protocol.mqtt.repositories.SubscribeRepository;
import org.apache.shenyu.protocol.mqtt.repositories.TopicRepository;

/* loaded from: input_file:org/apache/shenyu/protocol/mqtt/Publish.class */
public class Publish extends MessageType {
    @Override // org.apache.shenyu.protocol.mqtt.MessageType, org.apache.shenyu.protocol.mqtt.AbstractMessageType
    public void publish(ChannelHandlerContext channelHandlerContext, MqttPublishMessage mqttPublishMessage) {
        if (isConnected()) {
            return;
        }
        String str = mqttPublishMessage.variableHeader().topicName();
        ByteBuf payload = mqttPublishMessage.payload();
        String byteBufToString = byteBufToString(payload);
        MqttQoS qosLevel = mqttPublishMessage.fixedHeader().qosLevel();
        ((TopicRepository) Singleton.INST.get(TopicRepository.class)).add(str, byteBufToString);
        int packetId = mqttPublishMessage.variableHeader().packetId();
        CompletableFuture.runAsync(() -> {
            send(str, payload, packetId);
        });
        switch (qosLevel.value()) {
            case 0:
            default:
                return;
            case 1:
                qos1(channelHandlerContext, packetId);
                return;
            case 2:
                qos2(channelHandlerContext, packetId);
                return;
        }
    }

    private void qos0() {
    }

    private void qos1(ChannelHandlerContext channelHandlerContext, int i) {
        channelHandlerContext.writeAndFlush(new MqttPubAckMessage(new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0), MqttMessageIdVariableHeader.from(i)));
    }

    private void qos2(ChannelHandlerContext channelHandlerContext, int i) {
        channelHandlerContext.writeAndFlush(new MqttPubAckMessage(new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.EXACTLY_ONCE, false, 0), MqttMessageIdVariableHeader.from(i)));
    }

    private String byteBufToString(ByteBuf byteBuf) {
        if (byteBuf.hasArray()) {
            return new String(byteBuf.array(), byteBuf.arrayOffset() + byteBuf.readerIndex(), byteBuf.readableBytes());
        }
        byte[] bArr = new byte[byteBuf.readableBytes()];
        byteBuf.getBytes(byteBuf.readerIndex(), bArr);
        return new String(bArr, 0, byteBuf.readableBytes());
    }

    private void send(String str, ByteBuf byteBuf, int i) {
        ((SubscribeRepository) Singleton.INST.get(SubscribeRepository.class)).get(str).parallelStream().forEach(channel -> {
            if (channel.isActive()) {
                channel.writeAndFlush(new MqttPublishMessage(new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttPublishVariableHeader(str, i), Unpooled.wrappedBuffer(byteBuf)));
            }
        });
    }
}
