/*
 * Decompiled with CFR 0.152.
 */
package org.apache.skywalking.apm.agent.core.kafka;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.kafka.KafkaConnectionStatus;
import org.apache.skywalking.apm.agent.core.kafka.KafkaConnectionStatusListener;
import org.apache.skywalking.apm.agent.core.kafka.KafkaReporterPluginConfig;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.plugin.loader.AgentClassLoader;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.apache.skywalking.apm.dependencies.org.apache.kafka.clients.admin.AdminClient;
import org.apache.skywalking.apm.dependencies.org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.skywalking.apm.dependencies.org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.skywalking.apm.dependencies.org.apache.kafka.common.KafkaFuture;
import org.apache.skywalking.apm.dependencies.org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.skywalking.apm.dependencies.org.apache.kafka.common.serialization.Serializer;
import org.apache.skywalking.apm.dependencies.org.apache.kafka.common.serialization.StringSerializer;
import org.apache.skywalking.apm.dependencies.org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.apm.util.StringUtil;

@DefaultImplementor
public class KafkaProducerManager
implements BootService,
Runnable {
    private static final ILog LOGGER = LogManager.getLogger(KafkaProducerManager.class);
    private Set<String> topics = new HashSet<String>();
    private List<KafkaConnectionStatusListener> listeners = new ArrayList<KafkaConnectionStatusListener>();
    private volatile KafkaProducer<String, Bytes> producer;
    private ScheduledFuture<?> bootProducerFuture;

    public void prepare() {
    }

    public void boot() {
        this.bootProducerFuture = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new DefaultNamedThreadFactory("org.apache.skywalking.apm.dependencies.kafkaProducerInitThread")).scheduleAtFixedRate((Runnable)new RunnableWithExceptionProtection((Runnable)this, t -> LOGGER.error("unexpected exception.", t)), 0L, 120L, TimeUnit.SECONDS);
    }

    String formatTopicNameThenRegister(String topic) {
        String topicName = StringUtil.isBlank((String)KafkaReporterPluginConfig.Plugin.Kafka.NAMESPACE) ? topic : KafkaReporterPluginConfig.Plugin.Kafka.NAMESPACE + "-" + topic;
        this.topics.add(topicName);
        return topicName;
    }

    public void addListener(KafkaConnectionStatusListener listener) {
        if (!this.listeners.contains(listener)) {
            this.listeners.add(listener);
        }
    }

    public void onComplete() {
    }

    @Override
    public void run() {
        Thread.currentThread().setContextClassLoader((ClassLoader)AgentClassLoader.getDefault());
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", KafkaReporterPluginConfig.Plugin.Kafka.BOOTSTRAP_SERVERS);
        KafkaReporterPluginConfig.Plugin.Kafka.PRODUCER_CONFIG.forEach(properties::setProperty);
        try (AdminClient adminClient = AdminClient.create(properties);){
            DescribeTopicsResult topicsResult = adminClient.describeTopics(this.topics);
            Set topics = topicsResult.values().entrySet().stream().map(entry -> {
                try {
                    ((KafkaFuture)entry.getValue()).get(KafkaReporterPluginConfig.Plugin.Kafka.GET_TOPIC_TIMEOUT, TimeUnit.SECONDS);
                    return null;
                }
                catch (InterruptedException | ExecutionException | TimeoutException e) {
                    LOGGER.error((Throwable)e, "Get KAFKA topic:{} error.", new Object[]{entry.getKey()});
                    return (String)entry.getKey();
                }
            }).filter(Objects::nonNull).collect(Collectors.toSet());
            if (!topics.isEmpty()) {
                LOGGER.warn("org.apache.skywalking.apm.dependencies.kafka topics {} is not exist, connect to kafka cluster abort", new Object[]{topics});
                return;
            }
            try {
                this.producer = new KafkaProducer<String, Bytes>(properties, (Serializer<String>)new StringSerializer(), (Serializer<Bytes>)new BytesSerializer());
            }
            catch (Exception e) {
                LOGGER.error((Throwable)e, "connect to kafka cluster '{}' failed", new Object[]{KafkaReporterPluginConfig.Plugin.Kafka.BOOTSTRAP_SERVERS});
                if (adminClient != null) {
                    if (var3_3 != null) {
                        try {
                            adminClient.close();
                        }
                        catch (Throwable throwable) {
                            var3_3.addSuppressed(throwable);
                        }
                    } else {
                        adminClient.close();
                    }
                }
                return;
            }
            this.notifyListeners(KafkaConnectionStatus.CONNECTED);
            this.bootProducerFuture.cancel(true);
        }
    }

    private void notifyListeners(KafkaConnectionStatus status) {
        for (KafkaConnectionStatusListener listener : this.listeners) {
            listener.onStatusChanged(status);
        }
    }

    public final KafkaProducer<String, Bytes> getProducer() {
        return this.producer;
    }

    public int priority() {
        return ((GRPCChannelManager)ServiceManager.INSTANCE.findService(GRPCChannelManager.class)).priority() - 1;
    }

    public void shutdown() {
        this.producer.flush();
        this.producer.close();
    }
}

