package cyclops.reactive;

import com.oath.cyclops.hkt.DataWitness;
import com.oath.cyclops.hkt.Higher;
import com.oath.cyclops.internal.stream.ReactiveStreamX;
import com.oath.cyclops.internal.stream.spliterators.UnfoldSpliterator;
import com.oath.cyclops.internal.stream.spliterators.push.AmbOperator;
import com.oath.cyclops.internal.stream.spliterators.push.ArrayOfValuesOperator;
import com.oath.cyclops.internal.stream.spliterators.push.BufferingSinkOperator;
import com.oath.cyclops.internal.stream.spliterators.push.GenerateOperator;
import com.oath.cyclops.internal.stream.spliterators.push.IterableSourceOperator;
import com.oath.cyclops.internal.stream.spliterators.push.IterateOperator;
import com.oath.cyclops.internal.stream.spliterators.push.IteratePredicateOperator;
import com.oath.cyclops.internal.stream.spliterators.push.LazyArrayConcatonatingOperator;
import com.oath.cyclops.internal.stream.spliterators.push.MergeLatestOperator;
import com.oath.cyclops.internal.stream.spliterators.push.Operator;
import com.oath.cyclops.internal.stream.spliterators.push.PublisherToOperator;
import com.oath.cyclops.internal.stream.spliterators.push.RangeIntOperator;
import com.oath.cyclops.internal.stream.spliterators.push.RangeLongOperator;
import com.oath.cyclops.internal.stream.spliterators.push.SingleValueOperator;
import com.oath.cyclops.internal.stream.spliterators.push.SpliteratorToOperator;
import com.oath.cyclops.types.reactive.AsyncSubscriber;
import com.oath.cyclops.types.reactive.BufferOverflowPolicy;
import com.oath.cyclops.types.reactive.PushSubscriber;
import com.oath.cyclops.types.reactive.ReactiveSubscriber;
import com.oath.cyclops.types.traversable.IterableX;
import com.oath.cyclops.util.ExceptionSoftener;
import cyclops.control.Future;
import cyclops.control.Option;
import cyclops.data.Seq;
import cyclops.data.tuple.Tuple2;
import cyclops.function.checked.CheckedSupplier;
import java.util.Queue;
import java.util.Spliterator;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;
import org.agrona.concurrent.ManyToManyConcurrentArrayQueue;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:cyclops/reactive/Spouts.class */
public interface Spouts {
    static <T> ReactiveSeq<T> once(CheckedSupplier<T> checkedSupplier) {
        return generate(ExceptionSoftener.softenSupplier(checkedSupplier)).take(1L);
    }

    @Deprecated
    static <T> AsyncSubscriber<T> asyncSubscriber() {
        return new AsyncSubscriber<>();
    }

    @Deprecated
    static <T> ReactiveSeq<T> async(Consumer<? super PushSubscriber<T>> consumer) {
        AsyncSubscriber asyncSubscriber = asyncSubscriber();
        return asyncSubscriber.registerAndstream(() -> {
            while (!asyncSubscriber.isInitialized()) {
                LockSupport.parkNanos(1L);
            }
            consumer.accept(asyncSubscriber);
        });
    }

    @Deprecated
    static <T> ReactiveSeq<T> async(Stream<T> stream, Executor executor) {
        return async(pushSubscriber -> {
            ReactiveSeq.fromStream(stream).foldFuture(executor, iterableX -> {
                pushSubscriber.getClass();
                Consumer consumer = pushSubscriber::onNext;
                pushSubscriber.getClass();
                Consumer<? super Throwable> consumer2 = pushSubscriber::onError;
                pushSubscriber.getClass();
                iterableX.forEach(consumer, consumer2, pushSubscriber::onComplete);
                return null;
            });
        });
    }

    static <T> ReactiveSeq<T> reactiveBuffer(int i, Consumer<? super Subscriber<T>> consumer) {
        return reactiveStream(new BufferingSinkOperator(new ManyToManyConcurrentArrayQueue(i), consumer, BufferOverflowPolicy.DROP));
    }

    static <T> ReactiveSeq<T> reactiveBufferBlock(int i, Consumer<? super Subscriber<T>> consumer) {
        return reactiveStream(new BufferingSinkOperator(new ManyToManyConcurrentArrayQueue(i), consumer, BufferOverflowPolicy.BLOCK));
    }

    static <T> ReactiveSeq<T> reactiveBuffer(Queue<T> queue, BufferOverflowPolicy bufferOverflowPolicy, Consumer<? super Subscriber<T>> consumer) {
        return reactiveStream(new BufferingSinkOperator(queue, consumer, bufferOverflowPolicy));
    }

    @Deprecated
    static <T> ReactiveSeq<T> asyncBuffer(int i, Consumer<? super PushSubscriber<T>> consumer) {
        return asyncStream(new BufferingSinkOperator(new ManyToManyConcurrentArrayQueue(i), subscriber -> {
            consumer.accept(PushSubscriber.of(subscriber));
        }, BufferOverflowPolicy.DROP));
    }

    @Deprecated
    static <T> ReactiveSeq<T> asyncBufferBlock(int i, Consumer<? super PushSubscriber<T>> consumer) {
        return asyncStream(new BufferingSinkOperator(new ManyToManyConcurrentArrayQueue(i), subscriber -> {
            consumer.accept(PushSubscriber.of(subscriber));
        }, BufferOverflowPolicy.BLOCK));
    }

    @Deprecated
    static <T> ReactiveSeq<T> asyncBuffer(Queue<T> queue, BufferOverflowPolicy bufferOverflowPolicy, Consumer<? super PushSubscriber<T>> consumer) {
        return asyncStream(new BufferingSinkOperator(queue, subscriber -> {
            consumer.accept(PushSubscriber.of(subscriber));
        }, bufferOverflowPolicy));
    }

    static <T> ReactiveSeq<T> reactive(Stream<T> stream, Executor executor) {
        final Future future = Future.future();
        final Future future2 = Future.future();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        AtomicLong atomicLong = new AtomicLong(0L);
        ReactiveSeq.fromStream(stream).foldFuture(executor, iterableX -> {
            Subscriber subscriber = (Subscriber) future.getFuture().join();
            subscriber.getClass();
            Consumer consumer = subscriber::onNext;
            subscriber.getClass();
            final Subscription forEach = iterableX.forEach(0L, consumer, subscriber::onError, () -> {
                atomicBoolean.set(true);
                subscriber.onComplete();
            });
            future2.complete(new Subscription() { // from class: cyclops.reactive.Spouts.1
                public void request(long j) {
                    atomicLong.addAndGet(j);
                }

                public void cancel() {
                    forEach.cancel();
                }
            });
            while (!atomicBoolean.get()) {
                long j = atomicLong.get();
                if (j == 0) {
                    Thread.yield();
                } else {
                    while (!atomicLong.compareAndSet(j, 0L)) {
                        j = atomicLong.get();
                    }
                    forEach.request(j);
                }
            }
            return null;
        });
        return reactiveStream(new PublisherToOperator(new Publisher<T>() { // from class: cyclops.reactive.Spouts.2
            public void subscribe(Subscriber<? super T> subscriber) {
                Future.this.complete(subscriber);
                subscriber.onSubscribe((Subscription) future2.getFuture().join());
            }
        }));
    }

    static <T> ReactiveSubscriber<T> reactiveSubscriber() {
        return new ReactiveSubscriber<>();
    }

    static <T> ReactiveSeq<T> reactive(Consumer<? super Subscriber<T>> consumer) {
        ReactiveSubscriber reactiveSubscriber = new ReactiveSubscriber();
        consumer.accept(reactiveSubscriber);
        return reactiveSubscriber.reactiveStream();
    }

    static <T> ReactiveSeq<T> reactiveStream(Operator<T> operator) {
        return new ReactiveStreamX(operator, ReactiveStreamX.Type.BACKPRESSURE);
    }

    @Deprecated
    static <T> ReactiveSeq<T> asyncStream(Operator<T> operator) {
        return new ReactiveStreamX(operator, ReactiveStreamX.Type.NO_BACKPRESSURE);
    }

    static <T> ReactiveSeq<T> syncStream(Operator<T> operator) {
        return new ReactiveStreamX(operator);
    }

    static <T> ReactiveSeq<T> iterate(T t, UnaryOperator<T> unaryOperator) {
        return syncStream(new IterateOperator(t, unaryOperator));
    }

    static <T> ReactiveSeq<T> iterate(T t, Predicate<? super T> predicate, UnaryOperator<T> unaryOperator) {
        return syncStream(new IteratePredicateOperator(t, unaryOperator, predicate));
    }

    static ReactiveSeq<Integer> range(int i, int i2) {
        return i < i2 ? syncStream(new RangeIntOperator(i, i2)) : syncStream(new RangeIntOperator(i2, i));
    }

    static ReactiveSeq<Long> rangeLong(long j, long j2) {
        return j < j2 ? syncStream(new RangeLongOperator(j, j2)) : syncStream(new RangeLongOperator(j2, j));
    }

    static <T> ReactiveSeq<T> of(T t) {
        return syncStream(new SingleValueOperator(t));
    }

    static <T> ReactiveSeq<T> ofNullable(T t) {
        return t == null ? empty() : of(t);
    }

    static <T> ReactiveSeq<T> empty() {
        return of(new Object[0]);
    }

    static <T> ReactiveSeq<T> of(T... tArr) {
        return syncStream(new ArrayOfValuesOperator(tArr));
    }

    static <T> ReactiveSeq<T> fromIterable(Iterable<T> iterable) {
        return iterable instanceof ReactiveStreamX ? (ReactiveSeq) iterable : syncStream(new IterableSourceOperator(iterable));
    }

    static <T> ReactiveSeq<T> fromSpliterator(Spliterator<T> spliterator) {
        return syncStream(new SpliteratorToOperator(spliterator));
    }

    static <T> ReactiveSeq<T> generate(Supplier<T> supplier) {
        return syncStream(new GenerateOperator(supplier));
    }

    static <T> ReactiveSeq<T> from(Publisher<? extends T> publisher) {
        return publisher instanceof ReactiveSeq ? (ReactiveSeq) publisher : reactiveStream(new PublisherToOperator(publisher));
    }

    @Deprecated
    static <T> ReactiveSeq<T> merge(Publisher<? extends Publisher<T>> publisher) {
        return mergeLatest(publisher);
    }

    static <T> ReactiveSeq<T> mergeLatestList(Seq<? extends Publisher<? extends T>> seq) {
        return mergeLatest((Publisher[]) seq.toArray(i -> {
            return new Publisher[i];
        }));
    }

    static <T> ReactiveSeq<T> mergeLatest(Publisher<? extends Publisher<T>> publisher) {
        return from(publisher).mergeMap(Integer.MAX_VALUE, publisher2 -> {
            return publisher2;
        });
    }

    static <T> ReactiveSeq<T> mergeLatest(int i, Publisher<T>... publisherArr) {
        return of((Object[]) publisherArr).mergeMap(i, publisher -> {
            return publisher;
        });
    }

    static <T> ReactiveSeq<T> mergeLatest(Publisher<T>... publisherArr) {
        Operator[] operatorArr = new Operator[publisherArr.length];
        for (int i = 0; i < publisherArr.length; i++) {
            if (publisherArr[i] instanceof ReactiveStreamX) {
                operatorArr[i] = ((ReactiveStreamX) publisherArr[i]).getSource();
            } else {
                operatorArr[i] = new PublisherToOperator(publisherArr[i]);
            }
        }
        return reactiveStream(new MergeLatestOperator(operatorArr));
    }

    static <T> ReactiveSeq<T> amb(IterableX<? extends Publisher<? extends T>> iterableX) {
        return amb((Publisher[]) iterableX.toArray(i -> {
            return new ReactiveSeq[i];
        }));
    }

    static <T> ReactiveSeq<T> amb(Publisher<? extends T>... publisherArr) {
        return ambWith(publisherArr);
    }

    static <T> ReactiveSeq<T> ambWith(Publisher<? extends T>[] publisherArr) {
        return reactiveStream(new AmbOperator(publisherArr));
    }

    static ReactiveSeq<Integer> interval(String str, ScheduledExecutorService scheduledExecutorService) {
        ReactiveSubscriber reactiveSubscriber = reactiveSubscriber();
        final AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        final Subscription[] subscriptionArr = {null};
        reactiveSubscriber.onSubscribe(new Subscription() { // from class: cyclops.reactive.Spouts.3
            public void request(long j) {
                subscriptionArr[0].request(j);
            }

            public void cancel() {
                atomicBoolean.set(false);
            }
        });
        subscriptionArr[0] = ReactiveSeq.iterate(1, (UnaryOperator<int>) num -> {
            return Integer.valueOf(num.intValue() + 1);
        }).takeWhile(num2 -> {
            return atomicBoolean.get();
        }).schedule(str, scheduledExecutorService).connect().forEach(1L, num3 -> {
            reactiveSubscriber.onNext(num3);
        });
        return reactiveSubscriber.reactiveStream();
    }

    static ReactiveSeq<Integer> interval(long j, ScheduledExecutorService scheduledExecutorService) {
        ReactiveSubscriber reactiveSubscriber = reactiveSubscriber();
        final AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        final Subscription[] subscriptionArr = {null};
        reactiveSubscriber.onSubscribe(new Subscription() { // from class: cyclops.reactive.Spouts.4
            public void request(long j2) {
                subscriptionArr[0].request(j2);
            }

            public void cancel() {
                atomicBoolean.set(false);
            }
        });
        subscriptionArr[0] = ReactiveSeq.iterate(1, (UnaryOperator<int>) num -> {
            return Integer.valueOf(num.intValue() + 1);
        }).takeWhile(num2 -> {
            return atomicBoolean.get();
        }).scheduleFixedDelay(j, scheduledExecutorService).connect().forEach(1L, num3 -> {
            reactiveSubscriber.onNext(num3);
        });
        return reactiveSubscriber.reactiveStream();
    }

    static <T> ReactiveSeq<T> schedule(Stream<T> stream, String str, ScheduledExecutorService scheduledExecutorService) {
        ReactiveSubscriber reactiveSubscriber = reactiveSubscriber();
        final AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        final Subscription[] subscriptionArr = {null};
        reactiveSubscriber.onSubscribe(new Subscription() { // from class: cyclops.reactive.Spouts.5
            public void request(long j) {
                subscriptionArr[0].request(j);
            }

            public void cancel() {
                atomicBoolean.set(false);
            }
        });
        subscriptionArr[0] = ReactiveSeq.fromStream(stream).takeWhile((Predicate) obj -> {
            return atomicBoolean.get();
        }).schedule(str, scheduledExecutorService).connect().forEach(0L, obj2 -> {
            reactiveSubscriber.onNext(obj2);
        }, th -> {
            reactiveSubscriber.onError(th);
        }, () -> {
            reactiveSubscriber.onComplete();
        });
        return reactiveSubscriber.reactiveStream();
    }

    static <T> ReactiveSeq<T> defer(Supplier<? extends Publisher<? extends T>> supplier) {
        return of(supplier).mergeMap(supplier2 -> {
            return (Publisher) supplier2.get();
        });
    }

    static <T> ReactiveSeq<T> deferFromStream(Supplier<? extends Stream<? extends T>> supplier) {
        return of(supplier).flatMap(supplier2 -> {
            return (Stream) supplier2.get();
        });
    }

    static <T> ReactiveSeq<T> deferFromIterable(Supplier<? extends Iterable<? extends T>> supplier) {
        return of(supplier).concatMap(supplier2 -> {
            return (Iterable) supplier2.get();
        });
    }

    static <U, T> ReactiveSeq<T> unfold(U u, Function<? super U, Option<Tuple2<T, U>>> function) {
        return reactiveStream(new SpliteratorToOperator(new UnfoldSpliterator(u, function)));
    }

    static <T> ReactiveSeq<T> concat(Publisher<Publisher<T>> publisher) {
        return reactiveStream(new LazyArrayConcatonatingOperator(from(publisher).seq().map((Function) publisher2 -> {
            return new PublisherToOperator(publisher2);
        })));
    }

    @Deprecated
    static <T> ReactiveSeq<T> lazyConcat(Publisher<Publisher<T>> publisher) {
        return reactiveStream(new LazyArrayConcatonatingOperator(from(publisher).seq().map((Function) publisher2 -> {
            return new PublisherToOperator(publisher2);
        })));
    }

    static <T> ReactiveSeq<T> concat(Stream<? extends T>... streamArr) {
        return concat(ReactiveSeq.of((Object[]) streamArr).map(ReactiveSeq::fromStream));
    }

    static <T> ReactiveSeq<T> narrowK(Higher<DataWitness.reactiveSeq, T> higher) {
        return (ReactiveSeq) higher;
    }
}
