/*
 * Decompiled with CFR 0.152.
 */
package org.mule.extension.socket.api.source;

import java.io.InputStream;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;
import org.mule.extension.socket.api.ImmutableSocketAttributes;
import org.mule.extension.socket.api.config.ListenerConfig;
import org.mule.extension.socket.api.connection.ListenerConnection;
import org.mule.extension.socket.api.worker.SocketWorker;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.util.ExceptionUtils;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.dsl.xml.ParameterDsl;
import org.mule.runtime.extension.api.annotation.execution.OnError;
import org.mule.runtime.extension.api.annotation.execution.OnSuccess;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.source.EmitsResponse;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@EmitsResponse
@Alias(value="listener")
@MediaType(value="*/*", strict=false)
public final class SocketListener
extends Source<InputStream, ImmutableSocketAttributes> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SocketListener.class);
    @Inject
    private MuleContext muleContext;
    @Inject
    private SchedulerService schedulerService;
    @Connection
    private ConnectionProvider<ListenerConnection> connectionProvider;
    private ListenerConnection connection;
    @Config
    private ListenerConfig config;
    private ComponentLocation location;
    private AtomicBoolean stopRequested = new AtomicBoolean(false);
    private Scheduler workManager;
    private Scheduler listenerExecutor;
    private Future<?> submittedListenerTask;

    public void onStart(SourceCallback<InputStream, ImmutableSocketAttributes> sourceCallback) throws MuleException {
        this.connection = (ListenerConnection)this.connectionProvider.connect();
        this.workManager = this.schedulerService.ioScheduler(this.muleContext.getSchedulerBaseConfig().withName(String.format("%s.socket.worker", this.location.getRootContainerName())));
        this.stopRequested.set(false);
        this.listenerExecutor = this.schedulerService.customScheduler(this.muleContext.getSchedulerBaseConfig().withMaxConcurrentTasks(1).withName(String.format("%s.socket.listener", this.location.getRootContainerName())));
        this.submittedListenerTask = this.listenerExecutor.submit(() -> this.listen(sourceCallback));
    }

    @OnSuccess
    public void onSuccess(@Optional(defaultValue="#[payload]") @ParameterDsl(allowReferences=false) InputStream responseValue, SourceCallbackContext context) {
        context.getVariable("work").ifPresent(worker -> worker.onComplete(responseValue));
    }

    @OnError
    public void onError(Error error, SourceCallbackContext context) {
        context.getVariable("work").ifPresent(woker -> woker.onError(error.getCause()));
    }

    public void onStop() {
        this.stopRequested.set(true);
        if (this.submittedListenerTask != null) {
            this.submittedListenerTask.cancel(false);
        }
        if (this.listenerExecutor != null) {
            this.listenerExecutor.stop();
        }
        if (this.workManager != null) {
            this.workManager.stop();
        }
        if (this.connection != null) {
            this.connectionProvider.disconnect((Object)this.connection);
        }
    }

    private boolean isRequestedToStop() {
        return this.stopRequested.get() || Thread.currentThread().isInterrupted();
    }

    private void listen(SourceCallback<InputStream, ImmutableSocketAttributes> sourceCallback) {
        while (!this.isRequestedToStop()) {
            try {
                SocketWorker worker = this.connection.listen(sourceCallback);
                worker.onError(e -> {
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug(String.format("Got exception '%s'. Work being executed was: %s", e.getClass().getName(), worker.toString()));
                    }
                    ExceptionUtils.extractConnectionException((Throwable)e).ifPresent(arg_0 -> ((SourceCallback)sourceCallback).onConnectionException(arg_0));
                });
                this.workManager.execute((Runnable)worker);
                continue;
            }
            catch (ConnectionException e2) {
                if (this.isRequestedToStop()) continue;
                sourceCallback.onConnectionException(e2);
                continue;
            }
            catch (Exception e3) {
                if (this.isRequestedToStop()) {
                    return;
                }
                if (!LOGGER.isDebugEnabled()) continue;
                LOGGER.debug("An exception occurred while listening for new connections", (Throwable)e3);
                continue;
            }
            break;
        }
        return;
    }
}

