/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.sofamq.spring;

import com.alipay.sofa.sofamq.api.MessageProducer;
import com.alipay.sofa.sofamq.api.Producer;
import com.alipay.sofa.sofamq.api.TransactionChecker;
import com.alipay.sofa.sofamq.client.TransactionProducerImpl;
import com.alipay.sofa.sofamq.spring.AbstractComponent;
import com.alipay.sofa.sofamq.spring.ProducerImpl;
import com.alipay.sofa.sofamq.spring.SofaMQProducer;
import com.alipay.sofa.sofamq.spring.TransactionCheckProcessor;
import io.openmessaging.api.GenericMessage;
import io.openmessaging.api.transaction.GenericLocalTransactionChecker;
import io.openmessaging.api.transaction.TransactionStatus;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ProducerManager
extends AbstractComponent {
    private ConcurrentMap<String, SofaMQProducer> producers = new ConcurrentHashMap<String, SofaMQProducer>();
    private ConcurrentMap<String, GenericLocalTransactionChecker> checkers = new ConcurrentHashMap<String, GenericLocalTransactionChecker>();
    private String endpoint;
    private String schemaRegistryUrl;

    public ProducerManager(String endpoint, String schemaRegistryUrl) {
        this.endpoint = endpoint;
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    public synchronized Producer getProducer(MessageProducer messageProducer) {
        if (this.producers.containsKey(messageProducer.group())) {
            return new ProducerImpl((SofaMQProducer)this.producers.get(messageProducer.group()), messageProducer.topic(), messageProducer.payload()[0]);
        }
        Properties properties = new Properties();
        properties.setProperty("endpoint", this.endpoint);
        properties.setProperty("schemaRegistryUrl", this.schemaRegistryUrl);
        properties.setProperty("groupId", messageProducer.group());
        SofaMQProducer producer = this.newProducer(properties);
        producer.start();
        this.producers.put(messageProducer.group(), producer);
        for (Map.Entry entry : this.checkers.entrySet()) {
            String topic = (String)entry.getKey();
            GenericLocalTransactionChecker checker = (GenericLocalTransactionChecker)entry.getValue();
            producer.registerGenericLocalTransactionChecker(topic, checker);
        }
        return new ProducerImpl((SofaMQProducer)this.producers.get(messageProducer.group()), messageProducer.topic(), messageProducer.payload()[0]);
    }

    public synchronized void registerTransactionChecker(TransactionChecker transactionChecker, TransactionCheckProcessor processor) {
        GenericLocalTransactionCheckerImpl checker = new GenericLocalTransactionCheckerImpl(processor, transactionChecker.payload());
        this.checkers.put(transactionChecker.topic(), checker);
        for (SofaMQProducer producer : this.producers.values()) {
            producer.registerGenericLocalTransactionChecker(transactionChecker.topic(), checker);
        }
    }

    @Override
    protected void start0() {
    }

    @Override
    protected void stop0() {
        for (TransactionProducerImpl producer : this.producers.values()) {
            producer.shutdown();
        }
    }

    protected SofaMQProducer newProducer(Properties properties) {
        return new SofaMQProducer(properties);
    }

    static class GenericLocalTransactionCheckerImpl<T>
    implements GenericLocalTransactionChecker<T> {
        private TransactionCheckProcessor<T> processor;
        private Class<T> clazz;

        public GenericLocalTransactionCheckerImpl(TransactionCheckProcessor<T> processor, Class<T> clazz) {
            this.processor = processor;
            this.clazz = clazz;
        }

        @Override
        public TransactionStatus check(GenericMessage<T> message) {
            return this.processor.check(message);
        }

        @Override
        public Class<T> payloadClass() {
            return this.clazz;
        }
    }
}

