/*
 * Decompiled with CFR 0.152.
 */
package org.mule.extensions.vm.internal.listener;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;
import org.mule.extensions.vm.api.VMMessageAttributes;
import org.mule.extensions.vm.internal.QueueDescriptor;
import org.mule.extensions.vm.internal.ReplyToCommand;
import org.mule.extensions.vm.internal.VMConnector;
import org.mule.extensions.vm.internal.VMConnectorQueueManager;
import org.mule.extensions.vm.internal.VMMessage;
import org.mule.extensions.vm.internal.connection.VMConnection;
import org.mule.extensions.vm.internal.listener.VMResponseBuilder;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.component.location.ConfigurationComponentLocator;
import org.mule.runtime.api.component.location.Location;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.api.tx.TransactionException;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.util.queue.Queue;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.execution.OnSuccess;
import org.mule.runtime.extension.api.annotation.execution.OnTerminate;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.ParameterGroup;
import org.mule.runtime.extension.api.annotation.source.EmitsResponse;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.parameter.CorrelationInfo;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Message source ({@code listener}) that consumes a VM queue.
 *
 * <p>On start it registers the configured queue with the {@link VMConnectorQueueManager} and submits
 * {@code numberOfConsumers} {@link Consumer} tasks to a dedicated scheduler. Each consumer polls the
 * queue and hands every dequeued value to the {@link SourceCallback}.
 *
 * <p>A shared {@link Semaphore}, sized from the max concurrency of the flow found at the source's root
 * container, limits how many messages are in flight. A permit is taken before each poll and given back
 * either when the poll is cancelled or from {@link #onTerminate()}.
 *
 * <p>NOTE(review): {@code location} is never assigned in this class; presumably the runtime injects
 * it — confirm.
 */
@Alias("listener")
@EmitsResponse
public class VMListener extends Source<Serializable, VMMessageAttributes> {

    private static final Logger LOGGER = LoggerFactory.getLogger(VMListener.class);

    /** Name of the callback-context variable that carries the reply-to queue name. */
    private static final String REPLY_TO_QUEUE_NAME = "replyTo";

    @Inject
    private VMConnectorQueueManager connectorQueueManager;

    @ParameterGroup(name = "queue")
    private QueueDescriptor queueDescriptor;

    /** Number of consumer tasks polling the queue concurrently. */
    @Parameter
    @Optional(defaultValue = "4")
    private int numberOfConsumers;

    @Config
    private VMConnector config;

    @Connection
    private ConnectionProvider<VMConnection> connectionProvider;

    @Inject
    private SchedulerService schedulerService;

    @Inject
    private SchedulerConfig schedulerConfig;

    @Inject
    private ConfigurationComponentLocator componentLocator;

    private ComponentLocation location;

    private List<Consumer> consumers;
    private Scheduler scheduler;
    private Semaphore semaphore;

    /**
     * Registers the listener queue and starts the consumer tasks.
     *
     * @param sourceCallback callback used to create contexts and to dispatch dequeued messages
     * @throws MuleException declared by the {@link Source} contract
     */
    @Override
    public void onStart(SourceCallback<Serializable, VMMessageAttributes> sourceCallback) throws MuleException {
        connectorQueueManager.registerListenerQueue(config, queueDescriptor.getQueueName(), location);
        startConsumers(sourceCallback);
    }

    /**
     * Flags every consumer to stop, shuts the scheduler down immediately and unregisters the queue.
     * Tolerates being called when the consumers or the scheduler were never created.
     */
    @Override
    public void onStop() {
        if (consumers != null) {
            consumers.forEach(Consumer::stop);
        }
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
        connectorQueueManager.unregisterListenerQueue(queueDescriptor.getQueueName());
    }

    /**
     * Sends the response to the reply-to queue, if the handled message asked for one.
     *
     * <p>This is best effort: failures to obtain the queue or to offer the response are logged as
     * warnings and never propagated.
     *
     * @param messageBuilder  provides the response content
     * @param correlationInfo provides the correlation id attached to the response
     * @param ctx             callback context holding the connection and the optional {@code replyTo} variable
     */
    @OnSuccess
    public void onSuccess(@ParameterGroup(name = "Response", showInDsl = true) VMResponseBuilder messageBuilder,
                          CorrelationInfo correlationInfo,
                          SourceCallbackContext ctx) {
        ctx.getVariable(REPLY_TO_QUEUE_NAME).ifPresent(replyTo -> {
            Queue replyQueue;
            try {
                VMConnection connection = (VMConnection) ctx.getConnection();
                replyQueue = connection.getQueue((String) replyTo);
            } catch (Exception e) {
                LOGGER.warn(String.format("Found exception trying to obtain replyTo queue '%s'", replyTo), e);
                return;
            }

            if (replyQueue == null) {
                LOGGER.warn("Could not send response to replyTo queue '{}' because it does not exists", replyTo);
                return;
            }

            VMMessage response = new VMMessage(messageBuilder.getContent(), correlationInfo.getCorrelationId());
            try {
                replyQueue.offer(response, queueDescriptor.getQueueTimeoutInMillis());
            } catch (Exception e) {
                LOGGER.warn(String.format("Found exception trying to send response to replyTo queue '%s'", replyTo), e);
            }
        });
    }

    /**
     * Gives back the concurrency permit that the consumer took before dispatching the message.
     */
    @OnTerminate
    public void onTerminate() {
        semaphore.release();
    }

    /**
     * Creates the scheduler and the concurrency semaphore, then submits one {@link Consumer} task per
     * configured consumer.
     */
    private void startConsumers(SourceCallback<Serializable, VMMessageAttributes> sourceCallback) {
        createScheduler();
        consumers = new ArrayList<>(numberOfConsumers);
        semaphore = new Semaphore(getMaxConcurrency(), false);
        for (int i = 0; i < numberOfConsumers; i++) {
            Consumer consumer = new Consumer(sourceCallback);
            consumers.add(consumer);
            scheduler.submit(consumer::start);
        }
    }

    /**
     * Builds a custom scheduler allowing one concurrent task per consumer, named after the root
     * container, with waiting allowed and the queue descriptor's timeout as shutdown timeout.
     */
    private void createScheduler() {
        SchedulerConfig listenerConfig = schedulerConfig
            .withMaxConcurrentTasks(numberOfConsumers)
            .withName("vm-listener-flow " + location.getRootContainerName())
            .withWaitAllowed(true)
            .withShutdownTimeout(queueDescriptor.getTimeout(), queueDescriptor.getTimeoutUnit());
        scheduler = schedulerService.customScheduler(listenerConfig);
    }

    /**
     * Looks up the flow registered under the root container name and returns its max concurrency.
     *
     * @return the max concurrency of that flow
     * @throws IllegalStateException if no component is found under the root container name
     */
    private int getMaxConcurrency() {
        String flowName = location.getRootContainerName();
        Location flowLocation = Location.builder().globalName(flowName).build();
        Flow flow = (Flow) componentLocator.find(flowLocation)
            .orElseThrow(() -> new IllegalStateException(
                String.format("Could not find flow '%s' owning this <vm:listener>", flowName)));
        return flow.getMaxConcurrency();
    }

    /**
     * Polling loop run on the listener scheduler. Each iteration takes a concurrency permit, binds a
     * connection to a fresh callback context, polls the queue and dispatches the value (if any).
     */
    private class Consumer {

        private final SourceCallback<Serializable, VMMessageAttributes> sourceCallback;
        private final AtomicBoolean stop = new AtomicBoolean(false);

        public Consumer(SourceCallback<Serializable, VMMessageAttributes> sourceCallback) {
            this.sourceCallback = sourceCallback;
        }

        /**
         * Polls until this consumer is stopped or its thread is interrupted.
         *
         * <p>Unexpected exceptions are logged and the loop carries on. An interruption stops the
         * consumer. In both cases only the resources actually obtained in the current iteration
         * are undone.
         */
        public void start() {
            final long timeout = queueDescriptor.getQueueTimeoutInMillis();

            while (isAlive()) {
                SourceCallbackContext ctx = sourceCallback.createContext();
                // Track what this iteration really obtained, so that cancel() never releases a permit
                // that was not taken nor asks the context for a connection that was never bound.
                boolean permitAcquired = false;
                VMConnection connection = null;
                try {
                    semaphore.acquire();
                    permitAcquired = true;

                    connection = connectionProvider.connect();
                    ctx.bindConnection(connection);

                    Queue queue = connection.getQueue(queueDescriptor.getQueueName());
                    // Typed as Serializable so the instanceof checks below are meaningful.
                    Serializable value = queue.poll(timeout);
                    if (value == null) {
                        // Nothing arrived within the timeout: undo this iteration and poll again.
                        cancel(ctx, permitAcquired, connection);
                        continue;
                    }

                    String correlationId = null;
                    if (value instanceof VMMessage) {
                        VMMessage message = (VMMessage) value;
                        correlationId = message.getCorrelationId().orElse(null);
                        if (value instanceof ReplyToCommand) {
                            // Remembered for onSuccess(), which sends the response to this queue.
                            ctx.addVariable(REPLY_TO_QUEUE_NAME, ((ReplyToCommand) value).getReplyToQueueName());
                        }
                        value = message.getValue();
                    }

                    Result.Builder<Serializable, VMMessageAttributes> resultBuilder = Result.builder();
                    if (value instanceof TypedValue) {
                        // The queue only exposes Serializable, so the payload type cannot be verified here.
                        @SuppressWarnings("unchecked")
                        TypedValue<Serializable> typedValue = (TypedValue<Serializable>) value;
                        resultBuilder.output(typedValue.getValue())
                            .mediaType(typedValue.getDataType().getMediaType());
                    } else {
                        resultBuilder.output(value);
                    }
                    resultBuilder.attributes(new VMMessageAttributes(queueDescriptor.getQueueName(), correlationId));
                    Result<Serializable, VMMessageAttributes> result = resultBuilder.build();

                    ctx.setCorrelationId(correlationId);

                    if (isAlive()) {
                        // On this path the permit is released later, from onTerminate().
                        sourceCallback.handle(result, ctx);
                    } else {
                        cancel(ctx, permitAcquired, connection);
                    }
                } catch (InterruptedException e) {
                    stop();
                    cancel(ctx, permitAcquired, connection);
                    LOGGER.info("Consumer for <vm:listener> on flow '{}' was interrupted. No more consuming for thread '{}'",
                                location.getRootContainerName(), Thread.currentThread().getName());
                    // Restore the interrupt status for whoever owns this thread.
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    cancel(ctx, permitAcquired, connection);
                    if (LOGGER.isErrorEnabled()) {
                        LOGGER.error(String.format("Consumer for <vm:listener> on flow '%s' found unexpected exception. Consuming will continue '",
                                                   location.getRootContainerName()),
                                     e);
                    }
                }
            }
        }

        /**
         * Undoes one polling iteration: rolls back the context's transaction handle, then releases the
         * permit and the connection, but only those that were actually obtained.
         *
         * @param ctx            context of the iteration being cancelled
         * @param permitAcquired whether the iteration took a semaphore permit
         * @param connection     connection obtained by the iteration, or {@code null} if none
         */
        private void cancel(SourceCallbackContext ctx, boolean permitAcquired, VMConnection connection) {
            try {
                ctx.getTransactionHandle().rollback();
            } catch (TransactionException e) {
                if (LOGGER.isWarnEnabled()) {
                    LOGGER.warn("Failed to rollback transaction: " + e.getMessage(), e);
                }
            }

            if (permitAcquired) {
                semaphore.release();
            }
            if (connection != null) {
                connectionProvider.disconnect(connection);
            }
        }

        /** @return {@code true} while not stopped and the current thread is not interrupted */
        private boolean isAlive() {
            return !stop.get() && !Thread.currentThread().isInterrupted();
        }

        /** Flags this consumer so that its loop exits at the next liveness check. */
        public void stop() {
            stop.set(true);
        }
    }
}

