package org.apache.rocketmq.streams.core.function.supplier;

import java.util.function.Supplier;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.streams.core.common.Constant;
import org.apache.rocketmq.streams.core.running.AbstractProcessor;
import org.apache.rocketmq.streams.core.running.Processor;
import org.apache.rocketmq.streams.core.running.StreamContext;
import org.apache.rocketmq.streams.core.serialization.KeyValueSerializer;
import org.apache.rocketmq.streams.core.util.Utils;

/* loaded from: input_file:org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.class */
/**
 * Supplies {@link Processor} instances that publish every processed value to a RocketMQ topic.
 *
 * <p>Each call to {@link #get()} creates a new {@link SinkProcessor} bound to the topic name and
 * serializer given to this supplier.
 *
 * @param <K> type of the key taken from the stream context
 * @param <T> type of the values written to the topic
 */
public class SinkSupplier<K, T> implements Supplier<Processor<T>> {
    private final String topicName;
    private final KeyValueSerializer<K, T> serializer;

    /**
     * Processor that serializes a value together with the key captured in
     * {@link #preProcess(StreamContext)} and sends the result to the configured topic.
     */
    public class SinkProcessor extends AbstractProcessor<T> {
        private final String topicName;
        private final KeyValueSerializer<K, T> serializer;
        // One selector per processor, reused for every keyed send instead of allocating a new
        // one per message. NOTE(review): relies on the selector keeping no per-message state;
        // confirm against the RocketMQ client version in use.
        private final SelectMessageQueueByHash queueSelector = new SelectMessageQueueByHash();
        private DefaultMQProducer producer;
        private K key;

        /**
         * @param str                destination topic name
         * @param keyValueSerializer serializer turning (key, value) into the message body
         */
        public SinkProcessor(String str, KeyValueSerializer<K, T> keyValueSerializer) {
            this.topicName = str;
            this.serializer = keyValueSerializer;
        }

        /**
         * Captures the context, its producer and its current key for the following
         * {@link #process(Object)} call.
         *
         * @param streamContext context supplying the producer and the key
         */
        @Override // org.apache.rocketmq.streams.core.running.AbstractProcessor, org.apache.rocketmq.streams.core.running.Processor
        public void preProcess(StreamContext<T> streamContext) {
            this.context = streamContext;
            this.producer = streamContext.getDefaultMQProducer();
            this.key = (K) streamContext.getKey();
        }

        /**
         * Serializes {@code t} and sends it to the topic.
         *
         * <p>Nothing is sent when {@code t} is {@code null} or the serializer yields a
         * {@code null} or empty byte array. When a key is present the message carries the key
         * (hex-encoded) and its class name, and the queue is chosen by hashing the key;
         * otherwise the message is sent with the producer's plain {@code send}.
         *
         * @param t value to publish; may be {@code null}
         * @throws Throwable whatever the serializer or the producer throws
         */
        @Override // org.apache.rocketmq.streams.core.running.Processor
        public void process(T t) throws Throwable {
            if (t == null) {
                return;
            }
            byte[] body = this.serializer.serialize(this.key, t);
            if (body == null || body.length == 0) {
                // No payload, so there is nothing to publish.
                return;
            }

            boolean keyed = this.key != null;
            Message message = new Message(this.topicName, body);
            if (keyed) {
                message.setKeys(Utils.toHexString(this.key));
                message.putUserProperty(Constant.SHUFFLE_KEY_CLASS_NAME, this.key.getClass().getName());
            }
            message.putUserProperty(Constant.SHUFFLE_VALUE_CLASS_NAME, t.getClass().getName());
            if (this.topicName.contains(Constant.SHUFFLE_TOPIC_SUFFIX)) {
                // Topics whose name contains the shuffle suffix also carry the context's data time.
                message.putUserProperty(Constant.SOURCE_TIMESTAMP, String.valueOf(this.context.getDataTime()));
            }

            if (keyed) {
                this.producer.send(message, this.queueSelector, this.key);
            } else {
                this.producer.send(message);
            }
        }
    }

    /**
     * @param str                destination topic name handed to every created processor
     * @param keyValueSerializer serializer handed to every created processor
     */
    public SinkSupplier(String str, KeyValueSerializer<K, T> keyValueSerializer) {
        this.topicName = str;
        this.serializer = keyValueSerializer;
    }

    /**
     * Creates a new sink processor for this supplier's topic and serializer.
     *
     * @return a fresh {@link SinkProcessor}
     */
    @Override // java.util.function.Supplier
    public Processor<T> get() {
        return new SinkProcessor(this.topicName, this.serializer);
    }
}
