package com.nr.instrumentation.kafka.config;

import com.newrelic.api.agent.Agent;
import com.newrelic.api.agent.NewRelic;
import com.nr.instrumentation.kafka.ThreadFactories;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;

/* loaded from: input_file:newrelic/newrelic-agent.jar:instrumentation/kafka-clients-config-1.1.0-1.0.jar:com/nr/instrumentation/kafka/config/ConfigEmitter.class */
public class ConfigEmitter {
    private static final ConfigEmitter INSTANCE;
    private static final List<String> environmentAttrs;
    private final Map<KafkaConfigKey, ConfigurationEvent> events = new ConcurrentHashMap();
    private final AtomicBoolean warnedAboutTooManyConfigurations = new AtomicBoolean(false);
    private volatile ConfigEmissionConfiguration config = null;
    protected ScheduledExecutorService emissionExecutor;
    static final /* synthetic */ boolean $assertionsDisabled;

    public static ConfigEmitter get() {
        return INSTANCE;
    }

    ConfigEmitter() {
    }

    private synchronized void checkInitialization() {
        if (this.config == null) {
            this.config = readConfiguration();
            if (this.config.isEnabled()) {
                scheduleEmissions();
            }
        }
    }

    protected void scheduleEmissions() {
        this.emissionExecutor = Executors.newSingleThreadScheduledExecutor(ThreadFactories.build("Kafka-configEmitter"));
        getAgent().getLogger().log(Level.INFO, "Kafka configurations will be emitted every {0}, starting in {1}. SSL reporting: {2}; SASL reporting: {3}", this.config.getReportingFrequency(), this.config.getReportingDelay(), Boolean.valueOf(this.config.isSslReportingEnabled()), Boolean.valueOf(this.config.isSaslReportingEnabled()));
        this.emissionExecutor.scheduleAtFixedRate(this::reportConfigs, this.config.getReportingDelay().getSeconds(), this.config.getReportingFrequency().getSeconds(), TimeUnit.SECONDS);
    }

    protected ConfigEmissionConfiguration readConfiguration() {
        return ConfigEmissionConfiguration.read();
    }

    public void registerConfiguration(String str, AbstractConfig abstractConfig, ConfigDef configDef) {
        checkInitialization();
        if (!this.config.isEnabled() || wouldConfigurationReportingCapBeExceeded(str, abstractConfig)) {
            return;
        }
        getAgent().getLogger().log(Level.FINE, "registering {0} for client {1}", abstractConfig.getClass().getSimpleName(), str);
        if (installConfigurationEvent(str, abstractConfig, configDef, ConfigScope.GENERAL).getNumRegistrations() == 3) {
            getAgent().getLogger().log(Level.WARNING, "There have been multiple constructions for client.id {0}; are these consumers/producers being instantiated too frequently? Can they be re-used? Query the {1} attribute on configuration events to review construction counts.", str, ConfigurationEvent.ATTR_CONSTRUCTIONS);
        }
        if (this.config.isGeneralOverriddenDefaultReportingEnabled()) {
            installConfigurationEvent(str, abstractConfig, configDef, ConfigScope.GENERAL_OVERRIDDEN_DEFAULTS);
        }
        if (this.config.isSslReportingEnabled()) {
            installConfigurationEvent(str, abstractConfig, configDef, ConfigScope.SSL);
        }
        if (this.config.isSaslReportingEnabled()) {
            installConfigurationEvent(str, abstractConfig, configDef, ConfigScope.SASL);
        }
    }

    private boolean wouldConfigurationReportingCapBeExceeded(String str, AbstractConfig abstractConfig) {
        if (this.events.containsKey(new KafkaConfigKey(str, abstractConfig, ConfigScope.GENERAL)) || this.events.size() < this.config.getConfigurationEventCap()) {
            return false;
        }
        if (this.warnedAboutTooManyConfigurations.getAndSet(true)) {
            return true;
        }
        getAgent().getLogger().log(Level.WARNING, "More than {0} Kafka client configurations have being used, possibly indicating over-construction of clients with unique clientIds. New clients will not have their configuration state reported. Config reporting is only really designed for stable producer/consumer sets -- consider disabling config reporting ({1}) or increasing the client cap ({2})", Integer.valueOf(this.config.getClientCountCap()), ConfigEmissionConfiguration.PROP_REPORTING_ENABLED, ConfigEmissionConfiguration.PROP_REPORTING_CONFIGURATION_CAP);
        return true;
    }

    private ConfigurationEvent installConfigurationEvent(String str, AbstractConfig abstractConfig, ConfigDef configDef, ConfigScope configScope) {
        ConfigurationEvent configurationEvent = new ConfigurationEvent(configToEventType(abstractConfig, configScope), configToAttributes(str, abstractConfig, configDef, configScope));
        ConfigurationEvent put = this.events.put(new KafkaConfigKey(str, abstractConfig, configScope), configurationEvent);
        if (put != null) {
            configurationEvent.replacing(put);
        }
        return configurationEvent;
    }

    private Map<String, Object> configToAttributes(String str, AbstractConfig abstractConfig, ConfigDef configDef, ConfigScope configScope) {
        Map values = abstractConfig.values();
        HashMap hashMap = new HashMap(values.size());
        Map<String, Object> defaultValues = configDef.defaultValues();
        for (Map.Entry<String, ?> entry : values.entrySet()) {
            if (isPropertyReported(entry, configScope, defaultValues)) {
                hashMap.put(configScope.configEventAttrName(entry.getKey()), getReportedConfigValue(entry, configScope, configDef));
            }
        }
        hashMap.put("clientId", str);
        hashMap.put("kafka.version", AppInfoParser.getVersion());
        addEnvironmentAttributes(hashMap);
        return hashMap;
    }

    private void addEnvironmentAttributes(Map<String, Object> map) {
        if (environmentAttrs == null || environmentAttrs.isEmpty()) {
            return;
        }
        for (String str : environmentAttrs) {
            String readFromEnvironment = readFromEnvironment(str);
            if (readFromEnvironment != null) {
                map.put(str, readFromEnvironment);
            }
        }
    }

    protected String readFromEnvironment(String str) {
        return System.getenv(str);
    }

    private Object getReportedConfigValue(Map.Entry<String, ?> entry, ConfigScope configScope, ConfigDef configDef) {
        return configValueToReportedAttribute(!configScope.isOverriddenDefault() ? entry.getValue() : configDef.defaultValues().get(entry.getKey()));
    }

    private Object configValueToReportedAttribute(Object obj) {
        if (!$assertionsDisabled && obj == null) {
            throw new AssertionError();
        }
        Object obj2 = obj;
        if (obj2 instanceof Class) {
            obj2 = ((Class) obj2).getName();
        } else if (obj2 instanceof List) {
            obj2 = ((List) obj2).stream().map(this::configValueToReportedAttribute).map((v0) -> {
                return v0.toString();
            }).collect(Collectors.joining("|", "|", "|"));
        }
        return obj2;
    }

    private boolean isPropertyReported(Map.Entry<String, ?> entry, ConfigScope configScope, Map<String, Object> map) {
        if (entry.getValue() == null || "client.id".equals(entry.getKey()) || !configScope.ownsProperty(entry.getKey())) {
            return false;
        }
        return !configScope.isOverriddenDefault() || isDefaultOverridden(entry, map);
    }

    private boolean isDefaultOverridden(Map.Entry<String, ?> entry, Map<String, Object> map) {
        Object obj = map.get(entry.getKey());
        return (obj == null || Objects.equals(entry.getValue(), obj)) ? false : true;
    }

    private String configToEventType(AbstractConfig abstractConfig, ConfigScope configScope) {
        return (abstractConfig instanceof ConsumerConfig ? "KafkaConsumer" : abstractConfig instanceof ProducerConfig ? "KafkaProducer" : abstractConfig instanceof AdminClientConfig ? "KafkaAdmin" : "KafkaUnknownClient") + configScope.getEventNameContribution();
    }

    protected void reportConfigs() {
        getAgent().getLogger().log(Level.FINE, "reporting {0} kafka config event(s)", Integer.valueOf(this.events.size()));
        for (ConfigurationEvent configurationEvent : this.events.values()) {
            getAgent().getInsights().recordCustomEvent(configurationEvent.getEventType(), configurationEvent.getAttributes());
        }
    }

    protected Agent getAgent() {
        return NewRelic.getAgent();
    }

    static {
        $assertionsDisabled = !ConfigEmitter.class.desiredAssertionStatus();
        INSTANCE = new ConfigEmitter();
        environmentAttrs = (List) NewRelic.getAgent().getConfig().getValue("kafka.config.environment_attributes", Collections.emptyList());
    }
}
