package software.amazon.awssdk.utils.async;

import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;

@SdkProtectedApi
/* loaded from: input_file:BOOT-INF/lib/utils-2.28.26.jar:software/amazon/awssdk/utils/async/SimplePublisher.class */
public final class SimplePublisher<T> implements Publisher<T> {
    /** Logger used for trace output on every signal and for reporting failures during fatal-error handling. */
    private static final Logger log = Logger.loggerFor(SimplePublisher.class);

    /**
     * Items the subscriber has requested that have not been delivered yet. Raised by
     * {@code Subscription.request} (capped at {@link Long#MAX_VALUE}) and lowered by one per {@code onNext}.
     */
    private final AtomicLong outstandingDemand = new AtomicLong();

    /** Entries enqueued by {@link #send}, {@link #complete} and {@link #error}. */
    private final Queue<QueueEntry<T>> standardPriorityQueue = new ConcurrentLinkedQueue<>();

    /**
     * Entries enqueued by the subscription itself (cancellation and invalid {@code request} amounts).
     * The drain loop looks at this queue before the standard one.
     */
    private final Queue<QueueEntry<T>> highPriorityQueue = new ConcurrentLinkedQueue<>();

    /** True while some thread is draining the queues; acquired with compare-and-set so only one thread drains. */
    private final AtomicBoolean processingQueue = new AtomicBoolean(false);

    /** Once set (completion, error, cancellation or fatal failure), every remaining entry is failed with it. */
    private final FailureMessage failureMessage = new FailureMessage();

    /** The current subscriber; {@code null} until {@link #subscribe} is called and again after a cancel is processed. */
    private Subscriber<? super T> subscriber;

    /* loaded from: input_file:BOOT-INF/lib/utils-2.28.26.jar:software/amazon/awssdk/utils/async/SimplePublisher$CancelQueueEntry.class */
    /**
     * Queue entry recording that the subscriber cancelled its subscription. It carries no payload;
     * only its {@link QueueEntry.Type} matters to the drain loop.
     */
    private static final class CancelQueueEntry<T> extends QueueEntry<T> {

        /** @return always {@link QueueEntry.Type#CANCEL} */
        @Override
        protected QueueEntry.Type type() {
            return QueueEntry.Type.CANCEL;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/utils-2.28.26.jar:software/amazon/awssdk/utils/async/SimplePublisher$FailureMessage.class */
    /**
     * Holds the reason why queue entries can no longer be delivered. The first supplier registered wins;
     * the {@link Throwable} it produces is created on first use and then reused for every later caller.
     * The fields are plain (non-volatile) and this class does no locking of its own.
     */
    public static final class FailureMessage {
        private Supplier<Throwable> failureMessageSupplier;
        private Throwable failureMessage;

        private FailureMessage() {
        }

        /**
         * Registers the failure supplier unless one has already been registered.
         *
         * @param supplier produces the failure when it is first needed
         */
        public void trySet(Supplier<Throwable> supplier) {
            if (isSet()) {
                // First writer wins; later failures are ignored.
                return;
            }
            this.failureMessageSupplier = supplier;
        }

        /** @return {@code true} once a supplier has been registered */
        public boolean isSet() {
            return this.failureMessageSupplier != null;
        }

        /**
         * Returns the failure, invoking the supplier on the first call and caching its result.
         * Must only be called after {@link #isSet()} reports {@code true}.
         *
         * @return the cached failure
         */
        public Throwable get() {
            if (this.failureMessage != null) {
                return this.failureMessage;
            }
            this.failureMessage = this.failureMessageSupplier.get();
            return this.failureMessage;
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/utils-2.28.26.jar:software/amazon/awssdk/utils/async/SimplePublisher$NoOpSubscription.class */
    /**
     * A {@link Subscription} that ignores every call. It is handed to a subscriber that is about to be
     * rejected, so that {@code onSubscribe} can be invoked before {@code onError}.
     */
    private static final class NoOpSubscription implements Subscription {

        /** Ignored: this subscription never delivers data. */
        @Override
        public void request(long j) {
            // intentionally empty
        }

        /** Ignored: there is nothing to cancel. */
        @Override
        public void cancel() {
            // intentionally empty
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/utils-2.28.26.jar:software/amazon/awssdk/utils/async/SimplePublisher$OnCompleteQueueEntry.class */
    /**
     * Queue entry asking the drain loop to signal {@code onComplete} to the subscriber.
     * It carries no payload.
     */
    private static final class OnCompleteQueueEntry<T> extends QueueEntry<T> {

        /** @return always {@link QueueEntry.Type#ON_COMPLETE} */
        @Override
        protected QueueEntry.Type type() {
            return QueueEntry.Type.ON_COMPLETE;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/utils-2.28.26.jar:software/amazon/awssdk/utils/async/SimplePublisher$OnErrorQueueEntry.class */
    /**
     * Queue entry asking the drain loop to signal {@code onError} to the subscriber with a given failure.
     */
    public static final class OnErrorQueueEntry<T> extends QueueEntry<T> {
        /** The throwable that will be passed to the subscriber's {@code onError}. */
        private final Throwable failure;

        private OnErrorQueueEntry(Throwable failure) {
            this.failure = failure;
        }

        /** @return always {@link QueueEntry.Type#ON_ERROR} */
        @Override
        protected QueueEntry.Type type() {
            return QueueEntry.Type.ON_ERROR;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/utils-2.28.26.jar:software/amazon/awssdk/utils/async/SimplePublisher$OnNextQueueEntry.class */
    /**
     * Queue entry asking the drain loop to deliver one item to the subscriber via {@code onNext}.
     */
    public static final class OnNextQueueEntry<T> extends QueueEntry<T> {
        /** The item that will be passed to the subscriber's {@code onNext}. */
        private final T value;

        private OnNextQueueEntry(T value) {
            this.value = value;
        }

        /** @return always {@link QueueEntry.Type#ON_NEXT} */
        @Override
        protected QueueEntry.Type type() {
            return QueueEntry.Type.ON_NEXT;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/utils-2.28.26.jar:software/amazon/awssdk/utils/async/SimplePublisher$QueueEntry.class */
    /**
     * Base type for everything placed on the publisher's queues. Each entry owns a future that the drain
     * loop completes normally once the entry has been handled, or exceptionally if it could not be.
     */
    public static abstract class QueueEntry<T> {

        /** The kind of event an entry represents; the drain loop switches on this. */
        public enum Type {
            ON_NEXT,
            ON_COMPLETE,
            ON_ERROR,
            CANCEL
        }

        /** Outcome of this entry, returned to callers of {@code send}, {@code complete} and {@code error}. */
        protected final CompletableFuture<Void> resultFuture = new CompletableFuture<>();

        QueueEntry() {
        }

        /** @return the kind of event this entry represents */
        protected abstract Type type();
    }

    /* loaded from: input_file:BOOT-INF/lib/utils-2.28.26.jar:software/amazon/awssdk/utils/async/SimplePublisher$SubscriptionImpl.class */
    /**
     * The {@link Subscription} handed to the accepted subscriber. It is an inner (non-static) class because
     * it feeds the enclosing publisher's demand counter and high-priority queue directly.
     */
    private class SubscriptionImpl implements Subscription {

        /**
         * Adds {@code j} to the outstanding demand, saturating at {@link Long#MAX_VALUE}, then drains the
         * queues. A non-positive amount is not added; instead an {@link IllegalArgumentException} is queued
         * at high priority so that it is reported through {@code onError}.
         *
         * @param j number of additional items requested
         */
        @Override
        public void request(long j) {
            log.trace(() -> "Received request() with " + j);

            if (j <= 0) {
                IllegalArgumentException invalidDemand =
                    new IllegalArgumentException("A downstream publisher requested an invalid amount of data: " + j);
                highPriorityQueue.add(new OnErrorQueueEntry<T>(invalidDemand));
                processEventQueue();
                return;
            }

            long newDemand = outstandingDemand.updateAndGet(current -> {
                // Saturate instead of overflowing when the sum would exceed Long.MAX_VALUE.
                long headroom = Long.MAX_VALUE - current;
                return headroom < j ? Long.MAX_VALUE : current + j;
            });
            log.trace(() -> "Increased demand to " + newDemand);
            processEventQueue();
        }

        /**
         * Queues a cancellation at high priority and drains the queues.
         */
        @Override
        public void cancel() {
            log.trace(() -> "Received cancel() from " + subscriber);
            highPriorityQueue.add(new CancelQueueEntry<T>());
            processEventQueue();
        }
    }

    /**
     * Queues an item for delivery to the subscriber via {@code onNext}.
     *
     * @param t the item to deliver; a {@code null} item is rejected through the returned future
     * @return a future completed once the item has been handled by the drain loop, or completed
     *         exceptionally if validation, enqueueing or processing raised a {@link RuntimeException}
     */
    public CompletableFuture<Void> send(T t) {
        log.trace(() -> "Received send() with " + t);

        OnNextQueueEntry<T> entry = new OnNextQueueEntry<>(t);
        CompletableFuture<Void> outcome = entry.resultFuture;
        try {
            Validate.notNull(t, "Null cannot be written.");
            this.standardPriorityQueue.add(entry);
            processEventQueue();
        } catch (RuntimeException failure) {
            // Report the problem through the future rather than throwing at the caller.
            outcome.completeExceptionally(failure);
        }
        return outcome;
    }

    /**
     * Queues an {@code onComplete} signal behind everything already on the standard queue.
     *
     * @return a future completed once the signal has been handled by the drain loop, or completed
     *         exceptionally if enqueueing or processing raised a {@link RuntimeException}
     */
    public CompletableFuture<Void> complete() {
        log.trace(() -> "Received complete()");

        OnCompleteQueueEntry<T> entry = new OnCompleteQueueEntry<>();
        CompletableFuture<Void> outcome = entry.resultFuture;
        try {
            this.standardPriorityQueue.add(entry);
            processEventQueue();
        } catch (RuntimeException failure) {
            // Report the problem through the future rather than throwing at the caller.
            outcome.completeExceptionally(failure);
        }
        return outcome;
    }

    /**
     * Queues an {@code onError} signal behind everything already on the standard queue.
     *
     * @param th the failure to pass to the subscriber's {@code onError}
     * @return a future completed once the signal has been handled by the drain loop, or completed
     *         exceptionally if enqueueing or processing raised a {@link RuntimeException}
     */
    public CompletableFuture<Void> error(Throwable th) {
        log.trace(() -> "Received error() with " + th, th);

        OnErrorQueueEntry<T> entry = new OnErrorQueueEntry<>(th);
        CompletableFuture<Void> outcome = entry.resultFuture;
        try {
            this.standardPriorityQueue.add(entry);
            processEventQueue();
        } catch (RuntimeException failure) {
            // Report the problem through the future rather than throwing at the caller.
            outcome.completeExceptionally(failure);
        }
        return outcome;
    }

    /**
     * Attaches the single subscriber this publisher supports and starts draining any queued entries.
     *
     * <p>If a subscriber is already attached, the new one is rejected: it receives a no-op subscription
     * followed by {@code onError(IllegalStateException)} and nothing else. The existing subscriber is left
     * untouched.
     *
     * @param subscriber the subscriber to attach
     */
    @Override // org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super T> subscriber) {
        if (this.subscriber != null) {
            subscriber.onSubscribe(new NoOpSubscription());
            subscriber.onError(new IllegalStateException("Only one subscription may be active at a time."));
            // Stop here: the rejected subscriber has already received its terminal signal, so it must not
            // replace the active subscriber or be handed a second onSubscribe.
            return;
        }
        this.subscriber = subscriber;
        subscriber.onSubscribe(new SubscriptionImpl());
        processEventQueue();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Drains the queues on the calling thread unless another thread is already doing so.
     *
     * <p>The {@code processingQueue} flag is acquired with compare-and-set; a caller that loses the race
     * returns immediately and relies on the current drainer. After each drain pass the flag is released
     * <em>before</em> the queues are re-checked. An entry enqueued by a thread that lost the race is
     * therefore seen either by the drainer's pass or by this re-check, and cannot be left stranded.
     *
     * <p>Any throwable escaping the drain pass is routed to {@code panicAndDie} and ends processing.
     */
    public void processEventQueue() {
        do {
            if (!this.processingQueue.compareAndSet(false, true)) {
                // Another thread owns the drain; it will re-check the queues after releasing the flag.
                return;
            }
            try {
                doProcessQueue();
            } catch (Throwable th) {
                panicAndDie(th);
                return;
            } finally {
                this.processingQueue.set(false);
            }
            // Re-check only after the flag is released so that a concurrent enqueue is never missed.
        } while (shouldProcessQueueEntry(this.standardPriorityQueue.peek())
                 || shouldProcessQueueEntry(this.highPriorityQueue.peek()));
    }

    /**
     * Handles queue heads one at a time until the next head cannot be processed yet.
     *
     * <p>The high-priority queue is consulted first; the standard queue is used only when the high-priority
     * queue is empty. Once a failure has been recorded, entries are no longer signalled to the subscriber:
     * their futures are failed with that recorded failure instead. An entry is removed from its queue only
     * after it has been handled.
     */
    private void doProcessQueue() {
        for (;;) {
            Queue<QueueEntry<T>> source = this.highPriorityQueue;
            QueueEntry<T> head = source.peek();
            if (head == null) {
                source = this.standardPriorityQueue;
                head = source.peek();
            }

            if (!shouldProcessQueueEntry(head)) {
                return;
            }

            if (this.failureMessage.isSet()) {
                head.resultFuture.completeExceptionally(this.failureMessage.get());
            } else {
                signalSubscriber(head);
                head.resultFuture.complete(null);
            }
            source.remove();
        }
    }

    /**
     * Performs the subscriber-facing action for a single entry, according to its type.
     * Terminal entries record the failure used for later entries before the subscriber is signalled.
     */
    private void signalSubscriber(QueueEntry<T> entry) {
        switch (entry.type()) {
            case ON_NEXT: {
                OnNextQueueEntry<T> next = (OnNextQueueEntry<T>) entry;
                log.trace(() -> "Calling onNext() with " + next.value);
                this.subscriber.onNext(next.value);
                long remaining = this.outstandingDemand.decrementAndGet();
                log.trace(() -> "Decreased demand to " + remaining);
                return;
            }
            case ON_COMPLETE: {
                this.failureMessage.trySet(() -> new IllegalStateException("onComplete() was already invoked."));
                log.trace(() -> "Calling onComplete()");
                this.subscriber.onComplete();
                return;
            }
            case ON_ERROR: {
                OnErrorQueueEntry<T> error = (OnErrorQueueEntry<T>) entry;
                this.failureMessage.trySet(
                    () -> new IllegalStateException("onError() was already invoked.", error.failure));
                log.trace(() -> "Calling onError() with " + error.failure, error.failure);
                this.subscriber.onError(error.failure);
                return;
            }
            case CANCEL: {
                this.failureMessage.trySet(() -> new CancellationException("subscription has been cancelled."));
                // Drop the reference so nothing further is signalled to the cancelled subscriber.
                this.subscriber = null;
                return;
            }
            default:
                throw new IllegalStateException("Unknown entry type: " + entry.type());
        }
    }

    /**
     * Decides whether the drain loop may handle the given queue head right now.
     *
     * <p>An absent entry is never processable. Once a failure has been recorded every entry is processable
     * (it will simply be failed). Otherwise a subscriber must be attached, and an {@code ON_NEXT} entry
     * additionally needs outstanding demand.
     *
     * @param queueEntry the head of a queue, or {@code null} if that queue is empty
     * @return {@code true} if the entry can be handled immediately
     */
    private boolean shouldProcessQueueEntry(QueueEntry<T> queueEntry) {
        return queueEntry != null
               && (this.failureMessage.isSet()
                   || (this.subscriber != null
                       && (queueEntry.type() != QueueEntry.Type.ON_NEXT || this.outstandingDemand.get() > 0)));
    }

    /**
     * Last-resort handling for a throwable that escaped the drain loop.
     *
     * <p>Records a fatal failure, signals {@code onError} to the subscriber (passing {@code th} itself when
     * it is an {@link Error}, otherwise the wrapping exception), and fails the future of every entry still
     * on the standard queue. If this handling itself throws, the problem is only logged.
     *
     * @param th the throwable that escaped queue processing
     */
    private void panicAndDie(Throwable th) {
        try {
            // Built eagerly rather than inside the supplier, so the exception is created at this point.
            IllegalStateException fatal = new IllegalStateException("Encountered fatal error in publisher", th);
            this.failureMessage.trySet(() -> fatal);

            Throwable reported;
            if (th instanceof Error) {
                reported = th;
            } else {
                reported = fatal;
            }
            this.subscriber.onError(reported);

            for (QueueEntry<T> pending = this.standardPriorityQueue.poll();
                 pending != null;
                 pending = this.standardPriorityQueue.poll()) {
                pending.resultFuture.completeExceptionally(fatal);
            }
        } catch (Throwable secondary) {
            secondary.addSuppressed(th);
            log.error(() -> "Failed while processing a failure. This could result in stuck futures.", secondary);
        }
    }
}
