/*
 * Decompiled with CFR 0.152.
 */
package org.mule.extensions.vm.internal.operations;

import java.io.Serializable;
import java.util.Optional;
import javax.inject.Inject;
import org.mule.extensions.vm.api.VMError;
import org.mule.extensions.vm.api.VMMessageAttributes;
import org.mule.extensions.vm.internal.QueueDescriptor;
import org.mule.extensions.vm.internal.ReplyToCommand;
import org.mule.extensions.vm.internal.VMConnector;
import org.mule.extensions.vm.internal.VMConnectorQueueManager;
import org.mule.extensions.vm.internal.VMMessage;
import org.mule.extensions.vm.internal.connection.VMConnection;
import org.mule.extensions.vm.internal.operations.ConsumeErrorTypeProvider;
import org.mule.extensions.vm.internal.operations.PublishConsumeErrorTypeProvider;
import org.mule.extensions.vm.internal.operations.PublishErrorTypeProvider;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.core.api.util.queue.Queue;
import org.mule.runtime.extension.api.annotation.error.Throws;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.ConfigOverride;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.Content;
import org.mule.runtime.extension.api.annotation.param.ParameterGroup;
import org.mule.runtime.extension.api.error.ErrorTypeDefinition;
import org.mule.runtime.extension.api.exception.ModuleException;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.parameter.CorrelationInfo;
import org.mule.runtime.extension.api.runtime.parameter.OutboundCorrelationStrategy;

/**
 * Operations exposed by the VM connector: publishing to a queue, consuming from a queue,
 * and a combined publish-then-wait-for-reply operation.
 *
 * <p>Queues are resolved through the supplied {@link VMConnection} after the
 * {@link VMConnectorQueueManager} has validated the queue name against the connector config.
 */
public class VMOperations
implements Startable,
Stoppable {
    /** Validates queue usage and creates/disposes the temporary reply-to queues. */
    @Inject
    private VMConnectorQueueManager queueManager;
    /** Source of the scheduler obtained in {@link #start()}. */
    @Inject
    private SchedulerService schedulerService;
    /** Scheduler obtained on start and stopped on stop; not otherwise used within this class. */
    private Scheduler scheduler;

    /**
     * Obtains an IO scheduler from the injected {@link SchedulerService}.
     *
     * @throws MuleException declared by the lifecycle contract
     */
    @Override
    public void start() throws MuleException {
        this.scheduler = this.schedulerService.ioScheduler();
    }

    /**
     * Stops the scheduler obtained in {@link #start()}, if any, and clears the reference so
     * that a repeated call is a no-op.
     *
     * @throws MuleException declared by the lifecycle contract
     */
    @Override
    public void stop() throws MuleException {
        if (this.scheduler != null) {
            this.scheduler.stop();
            this.scheduler = null;
        }
    }

    /**
     * Wraps the content in a {@link VMMessage} and offers it to the target queue.
     *
     * @param content           the value to publish
     * @param queueDescriptor   names the queue and carries the timeout settings
     * @param sendCorrelationId strategy used to resolve the outbound correlation id
     * @param correlationId     optional explicit correlation id handed to the strategy
     * @param config            connector configuration the queue name is validated against
     * @param connection        connection from which the queue is obtained
     * @param correlationInfo   correlation information handed to the strategy
     * @throws ModuleException with {@code VMError.QUEUE_TIMEOUT} if the queue does not accept
     *                         the message within the timeout or the thread is interrupted
     */
    @Throws(value={PublishErrorTypeProvider.class})
    public void publish(@Content TypedValue<Serializable> content, @ParameterGroup(name="queue") QueueDescriptor queueDescriptor, @ConfigOverride OutboundCorrelationStrategy sendCorrelationId, @org.mule.runtime.extension.api.annotation.param.Optional String correlationId, @Config VMConnector config, @Connection VMConnection connection, CorrelationInfo correlationInfo) {
        Queue queue = this.getQueue(queueDescriptor, config, connection);
        // The correlation id is null when the strategy resolves to no outbound id.
        String outboundCorrelationId = sendCorrelationId.getOutboundCorrelationId(correlationInfo, correlationId).orElse(null);
        this.doPublish(new VMMessage(content, outboundCorrelationId), queueDescriptor, queue);
    }

    /**
     * Polls the target queue for a single message, waiting up to the descriptor's timeout.
     *
     * @param queueDescriptor names the queue and carries the timeout settings
     * @param config          connector configuration the queue name is validated against
     * @param connection      connection from which the queue is obtained
     * @param location        location of the invoking component, passed to the listener validation
     * @return the consumed value together with its {@link VMMessageAttributes}
     * @throws ModuleException with {@code VMError.EMPTY_QUEUE} if nothing was received in time
     */
    @Throws(value={ConsumeErrorTypeProvider.class})
    public Result<Serializable, VMMessageAttributes> consume(@ParameterGroup(name="queue") QueueDescriptor queueDescriptor, @Config VMConnector config, @Connection VMConnection connection, ComponentLocation location) {
        this.queueManager.validateNoListenerOnQueue(queueDescriptor.getQueueName(), "consume", location);
        Queue queue = this.getQueue(queueDescriptor, config, connection);
        return this.doConsume(queue, queueDescriptor)
                .map(value -> this.asConsumeResponse(value, queueDescriptor))
                .orElseThrow(() -> new ModuleException(String.format("Tried to consume messages from VM queue '%s' but it was empty after timeout of %d %s", queueDescriptor.getQueueName(), queueDescriptor.getTimeout(), queueDescriptor.getTimeoutUnit()), VMError.EMPTY_QUEUE));
    }

    /**
     * Publishes a {@link ReplyToCommand} to the target queue and then waits on a freshly
     * created reply-to queue for the response. The reply-to queue is always disposed once
     * the publish/consume attempt finishes, whether it succeeded or not.
     *
     * @param content           the value to publish
     * @param queueDescriptor   names the queue and carries the timeout settings
     * @param sendCorrelationId strategy used to resolve the outbound correlation id
     * @param correlationId     optional explicit correlation id handed to the strategy
     * @param config            connector configuration the queue name is validated against
     * @param connection        connection from which the queues are obtained
     * @param correlationInfo   correlation information handed to the strategy
     * @return the response value together with its {@link VMMessageAttributes}
     * @throws ModuleException      with {@code VMError.QUEUE_TIMEOUT} if publishing times out
     *                              or no response arrives within the timeout
     * @throws MuleRuntimeException if any other error occurs during the exchange
     */
    @Throws(value={PublishConsumeErrorTypeProvider.class})
    public Result<Serializable, VMMessageAttributes> publishConsume(@Content TypedValue<Serializable> content, @ParameterGroup(name="queue") QueueDescriptor queueDescriptor, @ConfigOverride OutboundCorrelationStrategy sendCorrelationId, @org.mule.runtime.extension.api.annotation.param.Optional String correlationId, @Config VMConnector config, @Connection VMConnection connection, CorrelationInfo correlationInfo) {
        Queue queue = this.getQueue(queueDescriptor, config, connection);
        Queue replyToQueue = this.queueManager.createReplyToQueue(queue, connection);
        ReplyToCommand message = new ReplyToCommand(content, replyToQueue.getName(), sendCorrelationId.getOutboundCorrelationId(correlationInfo, correlationId).orElse(null));
        try {
            this.doPublish(message, queueDescriptor, queue);
            return this.doConsume(replyToQueue, queueDescriptor)
                    .map(value -> this.asConsumeResponse(value, queueDescriptor))
                    .orElseThrow(() -> new ModuleException(String.format("Published messages to queue '%s' but got no response after timeout of %d %s", queueDescriptor.getQueueName(), queueDescriptor.getTimeout(), queueDescriptor.getTimeoutUnit()), VMError.QUEUE_TIMEOUT));
        }
        catch (ModuleException e) {
            // Already carries a connector error type; propagate untouched.
            throw e;
        }
        catch (Exception e) {
            throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage(String.format("Found error trying to perform publish-consume to VM queue '%s'", queueDescriptor.getQueueName())), e);
        }
        finally {
            this.queueManager.disposeReplyToQueue(replyToQueue);
        }
    }

    /**
     * Converts a value taken from a queue into an operation {@link Result}.
     *
     * <p>A {@link VMMessage} is unwrapped to its payload and correlation id; a
     * {@link TypedValue} payload contributes both its value and its media type. Anything
     * else is used as the output as-is.
     *
     * @param value           the raw value read from the queue
     * @param queueDescriptor supplies the queue name recorded in the attributes
     * @return the result carrying the payload and its {@link VMMessageAttributes}
     */
    private Result<Serializable, VMMessageAttributes> asConsumeResponse(Serializable value, QueueDescriptor queueDescriptor) {
        Result.Builder<Serializable, VMMessageAttributes> resultBuilder = Result.builder();
        String correlationId = null;
        if (value instanceof VMMessage) {
            VMMessage message = (VMMessage)value;
            value = message.getValue();
            correlationId = message.getCorrelationId().orElse(null);
        }
        if (value instanceof TypedValue) {
            // The type argument cannot be checked at runtime; the original code used the raw
            // type here, so this keeps the same runtime behavior.
            @SuppressWarnings("unchecked")
            TypedValue<Serializable> typedValue = (TypedValue<Serializable>)value;
            resultBuilder.output(typedValue.getValue()).mediaType(typedValue.getDataType().getMediaType());
        } else {
            resultBuilder.output(value);
        }
        resultBuilder.attributes(new VMMessageAttributes(queueDescriptor.getQueueName(), correlationId));
        return resultBuilder.build();
    }

    /**
     * Offers the content to the queue, waiting up to the descriptor's timeout.
     *
     * @param content         the message to enqueue
     * @param queueDescriptor supplies the timeout and the queue name used in the error message
     * @param queue           the destination queue
     * @throws ModuleException with {@code VMError.QUEUE_TIMEOUT} if the offer is rejected
     *                         within the timeout or the waiting thread is interrupted
     */
    private void doPublish(Serializable content, QueueDescriptor queueDescriptor, Queue queue) {
        try {
            if (!queue.offer(content, queueDescriptor.getQueueTimeoutInMillis())) {
                throw new ModuleException("Timeout publishing message to VM queue " + queueDescriptor.getQueueName(), VMError.QUEUE_TIMEOUT);
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            throw new ModuleException(VMError.QUEUE_TIMEOUT, e);
        }
    }

    /**
     * Polls the queue, waiting up to the descriptor's timeout.
     *
     * @param queue           the queue to read from
     * @param queueDescriptor supplies the timeout and the queue name used in the error message
     * @return the polled value, or an empty {@link Optional} if the poll returned {@code null}
     * @throws MuleRuntimeException if polling fails for any reason
     */
    private Optional<Serializable> doConsume(Queue queue, QueueDescriptor queueDescriptor) {
        try {
            return Optional.ofNullable(queue.poll(queueDescriptor.getQueueTimeoutInMillis()));
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag before wrapping, otherwise the interruption is lost.
                Thread.currentThread().interrupt();
            }
            throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage(String.format("Found error trying to consume messages from VM queue '%s'", queueDescriptor.getQueueName())), e);
        }
    }

    /**
     * Validates the descriptor's queue name against the connector config and then obtains
     * the queue from the connection.
     *
     * @param queueDescriptor names the queue
     * @param config          connector configuration used for validation
     * @param connection      connection from which the queue is obtained
     * @return the queue registered under the descriptor's name
     */
    private Queue getQueue(QueueDescriptor queueDescriptor, VMConnector config, VMConnection connection) {
        String queueName = queueDescriptor.getQueueName();
        this.queueManager.validateQueue(queueName, config);
        return connection.getQueue(queueName);
    }
}

