package io.lettuce.core.cluster;

import io.lettuce.core.AbstractRedisReactiveCommands;
import io.lettuce.core.FlushMode;
import io.lettuce.core.GeoArgs;
import io.lettuce.core.GeoWithin;
import io.lettuce.core.KeyScanCursor;
import io.lettuce.core.KeyValue;
import io.lettuce.core.RedisException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.ScanArgs;
import io.lettuce.core.ScanCursor;
import io.lettuce.core.StreamScanCursor;
import io.lettuce.core.api.reactive.RedisKeyReactiveCommands;
import io.lettuce.core.cluster.ClusterScanSupport;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.reactive.RedisAdvancedClusterReactiveCommands;
import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
import io.lettuce.core.cluster.models.partitions.Partitions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.internal.LettuceLists;
import io.lettuce.core.output.KeyStreamingChannel;
import io.lettuce.core.output.KeyValueStreamingChannel;
import io.lettuce.core.protocol.ConnectionIntent;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:WEB-INF/lib/lettuce-core-6.3.0.RELEASE.jar:io/lettuce/core/cluster/RedisAdvancedClusterReactiveCommandsImpl.class */
/**
 * Reactive command implementation for Redis Cluster connections.
 *
 * <p>Multi-key commands are split by hash slot (via {@code SlotHash.partition}) and issued once per
 * slot; node-wide commands are dispatched to every matching node of the current partition view and
 * their results merged.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class RedisAdvancedClusterReactiveCommandsImpl<K, V> extends AbstractRedisReactiveCommands<K, V> implements RedisAdvancedClusterReactiveCommands<K, V> {

    /** Node filter that accepts every node. */
    private static final Predicate<RedisClusterNode> ALL_NODES = node -> true;

    /** Codec handed to {@code SlotHash.partition} when grouping keys by slot. */
    private final RedisCodec<K, V> codec;

    /**
     * Creates the command facade on top of a concrete cluster connection implementation.
     *
     * @param statefulRedisClusterConnectionImpl the cluster connection
     * @param redisCodec the codec used for keys and values
     * @deprecated a constructor accepting the {@link StatefulRedisClusterConnection} interface exists
     *             alongside this one.
     */
    @Deprecated
    public RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnectionImpl<K, V> statefulRedisClusterConnectionImpl, RedisCodec<K, V> redisCodec) {
        super(statefulRedisClusterConnectionImpl, redisCodec);
        this.codec = redisCodec;
    }

    /**
     * Creates the command facade on top of a cluster connection.
     *
     * @param statefulRedisClusterConnection the cluster connection
     * @param redisCodec the codec used for keys and values
     */
    public RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnection<K, V> statefulRedisClusterConnection, RedisCodec<K, V> redisCodec) {
        super(statefulRedisClusterConnection, redisCodec);
        this.codec = redisCodec;
    }

    /**
     * Sets the client name on the default connection and on the node connections obtained for every
     * known node, both by node id and by host/port. Node connections that are not open are skipped.
     *
     * @param k the client name
     * @return the last reply emitted by the merged commands
     */
    @Override
    public Mono<String> clientSetname(K k) {
        List<Publisher<String>> publishers = new ArrayList<>();
        publishers.add(super.clientSetname(k));

        for (RedisClusterNode node : getStatefulConnection().getPartitions()) {
            publishers.add(getConnectionReactive(node.getNodeId())
                    .flatMap(commands -> setNameIfOpen(commands, k)));
            publishers.add(getConnectionReactive(node.getUri().getHost(), node.getUri().getPort())
                    .flatMap(commands -> setNameIfOpen(commands, k)));
        }
        return Flux.merge(publishers).last();
    }

    /**
     * Counts the keys in a slot on the node that serves the slot.
     *
     * @param i the hash slot
     * @return the key count; an error signal if no partition serves the slot
     */
    @Override
    public Mono<Long> clusterCountKeysInSlot(int i) {
        return findConnectionBySlotReactive(i).flatMap(commands -> commands.clusterCountKeysInSlot(i));
    }

    /**
     * Retrieves keys of a slot from the node that serves the slot.
     *
     * @param i the hash slot
     * @param i2 the count argument passed through to the node command
     * @return the keys; an error signal if no partition serves the slot
     */
    @Override
    public Flux<K> clusterGetKeysInSlot(int i, int i2) {
        return findConnectionBySlotReactive(i).flatMapMany(commands -> commands.clusterGetKeysInSlot(i, i2));
    }

    /**
     * Sums {@code DBSIZE} over all upstream nodes.
     *
     * @return the accumulated key count
     */
    @Override
    public Mono<Long> dbsize() {
        Map<String, Publisher<Long>> publishers = executeOnUpstream(commands -> commands.dbsize());
        return Flux.merge(publishers.values()).reduce(Long::sum);
    }

    /**
     * Deletes the given keys, see {@link #del(Iterable)}.
     *
     * @param kArr the keys
     * @return the accumulated number of removed keys
     */
    @Override
    public Mono<Long> del(K... kArr) {
        return del(Arrays.asList(kArr));
    }

    /**
     * Deletes the given keys, issuing one command per hash slot when the keys span several slots.
     *
     * @param iterable the keys
     * @return the accumulated number of removed keys
     */
    @Override
    public Mono<Long> del(Iterable<K> iterable) {
        return sumAcrossSlots(iterable, keys -> super.del(keys));
    }

    /**
     * Counts existing keys, see {@link #exists(Iterable)}.
     *
     * @param kArr the keys
     * @return the accumulated number of existing keys
     */
    @Override
    public Mono<Long> exists(K... kArr) {
        return exists(Arrays.asList(kArr));
    }

    /**
     * Counts existing keys, issuing one command per hash slot when the keys span several slots.
     *
     * @param iterable the keys
     * @return the accumulated number of existing keys
     */
    @Override
    public Mono<Long> exists(Iterable<K> iterable) {
        return sumAcrossSlots(iterable, keys -> super.exists(keys));
    }

    /**
     * Runs {@code FLUSHALL} on all upstream nodes.
     *
     * @return the last reply emitted by the merged commands
     */
    @Override
    public Mono<String> flushall() {
        Map<String, Publisher<String>> publishers = executeOnUpstream(commands -> commands.flushall());
        return Flux.merge(publishers.values()).last();
    }

    /**
     * Runs {@code FLUSHALL} with the given mode on all upstream nodes.
     *
     * @param flushMode the flush mode
     * @return the last reply emitted by the merged commands
     */
    @Override
    public Mono<String> flushall(FlushMode flushMode) {
        Map<String, Publisher<String>> publishers = executeOnUpstream(commands -> commands.flushall(flushMode));
        return Flux.merge(publishers.values()).last();
    }

    /**
     * Runs {@code flushallAsync} on all upstream nodes.
     *
     * @return the last reply emitted by the merged commands
     */
    @Override
    public Mono<String> flushallAsync() {
        Map<String, Publisher<String>> publishers = executeOnUpstream(commands -> commands.flushallAsync());
        return Flux.merge(publishers.values()).last();
    }

    /**
     * Runs {@code FLUSHDB} on all upstream nodes.
     *
     * @return the last reply emitted by the merged commands
     */
    @Override
    public Mono<String> flushdb() {
        Map<String, Publisher<String>> publishers = executeOnUpstream(commands -> commands.flushdb());
        return Flux.merge(publishers.values()).last();
    }

    /**
     * Runs {@code FLUSHDB} with the given mode on all upstream nodes.
     *
     * @param flushMode the flush mode
     * @return the last reply emitted by the merged commands
     */
    @Override
    public Mono<String> flushdb(FlushMode flushMode) {
        Map<String, Publisher<String>> publishers = executeOnUpstream(commands -> commands.flushdb(flushMode));
        return Flux.merge(publishers.values()).last();
    }

    /**
     * Delegates to {@code super.georadius_ro} with the same arguments.
     */
    @Override
    public Flux<V> georadius(K k, double d, double d2, double d3, GeoArgs.Unit unit) {
        return super.georadius_ro(k, d, d2, d3, unit);
    }

    /**
     * Delegates to {@code super.georadius_ro} with the same arguments.
     */
    @Override
    public Flux<GeoWithin<V>> georadius(K k, double d, double d2, double d3, GeoArgs.Unit unit, GeoArgs geoArgs) {
        return super.georadius_ro(k, d, d2, d3, unit, geoArgs);
    }

    /**
     * Delegates to {@code super.georadiusbymember_ro} with the same arguments.
     */
    @Override
    public Flux<V> georadiusbymember(K k, V v, double d, GeoArgs.Unit unit) {
        return super.georadiusbymember_ro(k, v, d, unit);
    }

    /**
     * Delegates to {@code super.georadiusbymember_ro} with the same arguments.
     */
    @Override
    public Flux<GeoWithin<V>> georadiusbymember(K k, V v, double d, GeoArgs.Unit unit, GeoArgs geoArgs) {
        return super.georadiusbymember_ro(k, v, d, unit, geoArgs);
    }

    /**
     * Runs {@code KEYS} on all upstream nodes and merges the results.
     *
     * @param k the key pattern
     * @return the merged keys of all upstream nodes
     */
    @Override
    public Flux<K> keys(K k) {
        Map<String, Publisher<K>> publishers = executeOnUpstream(commands -> commands.keys(k));
        return Flux.merge(publishers.values());
    }

    /**
     * Runs the streaming {@code KEYS} variant on all upstream nodes.
     *
     * @param keyStreamingChannel the channel receiving the keys
     * @param k the key pattern
     * @return the sum of the per-node results
     */
    @Override
    public Mono<Long> keys(KeyStreamingChannel<K> keyStreamingChannel, K k) {
        Map<String, Publisher<Long>> publishers = executeOnUpstream(commands -> commands.keys(keyStreamingChannel, k));
        return Flux.merge(publishers.values()).reduce(Long::sum);
    }

    /**
     * Fetches the values of the given keys, see {@link #mget(Iterable)}.
     *
     * @param kArr the keys
     * @return key/value pairs in the order of the requested keys
     */
    @Override
    public Flux<KeyValue<K, V>> mget(K... kArr) {
        return mget(Arrays.asList(kArr));
    }

    /**
     * Fetches the values of the given keys. When the keys span several hash slots, one {@code MGET}
     * is issued per slot and the results are re-arranged into the order of the requested keys.
     *
     * @param iterable the keys
     * @return key/value pairs in the order of the requested keys
     */
    @Override
    public Flux<KeyValue<K, V>> mget(Iterable<K> iterable) {
        List<K> keyList = LettuceLists.newList(iterable);
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keyList);
        if (partitioned.size() < 2) {
            return super.mget(keyList);
        }

        List<Publisher<KeyValue<K, V>>> publishers = new ArrayList<>();
        for (List<K> slotKeys : partitioned.values()) {
            publishers.add(super.mget(slotKeys));
        }

        // mergeSequential keeps the per-slot results in partition order, so the collected list is
        // the concatenation of the slot results; offset marks where each slot's results start.
        Mono<List<KeyValue<K, V>>> reordered = Flux.mergeSequential(publishers).collectList().map(values -> {
            @SuppressWarnings("unchecked")
            KeyValue<K, V>[] ordered = new KeyValue[values.size()];
            int offset = 0;
            for (List<K> slotKeys : partitioned.values()) {
                for (int index = 0; index < keyList.size(); index++) {
                    int position = slotKeys.indexOf(keyList.get(index));
                    if (position != -1) {
                        ordered[index] = values.get(offset + position);
                    }
                }
                offset += slotKeys.size();
            }
            return Arrays.asList(ordered);
        });
        return reordered.flatMapIterable(keyValues -> keyValues);
    }

    /**
     * Streams the values of the given keys, see {@link #mget(KeyValueStreamingChannel, Iterable)}.
     *
     * @param keyValueStreamingChannel the channel receiving key/value pairs
     * @param kArr the keys
     * @return the sum of the per-slot results
     */
    @Override
    public Mono<Long> mget(KeyValueStreamingChannel<K, V> keyValueStreamingChannel, K... kArr) {
        return mget(keyValueStreamingChannel, Arrays.asList(kArr));
    }

    /**
     * Streams the values of the given keys, issuing one command per hash slot when the keys span
     * several slots.
     *
     * @param keyValueStreamingChannel the channel receiving key/value pairs
     * @param iterable the keys
     * @return the sum of the per-slot results
     */
    @Override
    public Mono<Long> mget(KeyValueStreamingChannel<K, V> keyValueStreamingChannel, Iterable<K> iterable) {
        return sumAcrossSlots(iterable, keys -> super.mget(keyValueStreamingChannel, keys));
    }

    /**
     * Runs {@code MSETNX} per hash slot.
     *
     * @param map the key/value pairs
     * @return {@code true} only if every per-slot command returned {@code true}
     */
    @Override
    public Mono<Boolean> msetnx(Map<K, V> map) {
        Flux<Boolean> results = pipeliningWithMap(map, slotMap -> super.msetnx(slotMap).flux(), flux -> flux);
        return results.reduce((accumulated, next) -> accumulated && next);
    }

    /**
     * Runs {@code MSET} per hash slot.
     *
     * @param map the key/value pairs
     * @return the last reply emitted by the per-slot commands
     */
    @Override
    public Mono<String> mset(Map<K, V> map) {
        Flux<String> results = pipeliningWithMap(map, slotMap -> super.mset(slotMap).flux(), flux -> flux);
        return results.last();
    }

    /**
     * Returns a random key from a randomly chosen node of the partition view; falls back to the
     * default connection when no partitions are known.
     *
     * @return a random key
     */
    @Override
    public Mono<K> randomkey() {
        Partitions partitions = getStatefulConnection().getPartitions();
        if (partitions.isEmpty()) {
            return super.randomkey();
        }
        int index = ThreadLocalRandom.current().nextInt(partitions.size());
        Mono<RedisClusterReactiveCommands<K, V>> connection = getConnectionReactive(partitions.getPartition(index).getNodeId());
        return connection.flatMap(commands -> commands.randomkey());
    }

    /**
     * Runs {@code scriptFlush} on all nodes.
     *
     * @return the last reply emitted by the merged commands
     */
    @Override
    public Mono<String> scriptFlush() {
        Map<String, Publisher<String>> publishers = executeOnNodes(commands -> commands.scriptFlush(), ALL_NODES);
        return Flux.merge(publishers.values()).last();
    }

    /**
     * Runs {@code scriptKill} on all nodes; an error in the merged stream is replaced by {@code "OK"}.
     *
     * @return the last reply emitted by the merged commands
     */
    @Override
    public Mono<String> scriptKill() {
        Map<String, Publisher<String>> publishers = executeOnNodes(commands -> commands.scriptKill(), ALL_NODES);
        return Flux.merge(publishers.values()).onErrorReturn("OK").last();
    }

    /**
     * Runs {@code scriptLoad} on all nodes.
     *
     * @param bArr the script bytes
     * @return the last reply emitted by the merged commands
     */
    @Override
    public Mono<String> scriptLoad(byte[] bArr) {
        Map<String, Publisher<String>> publishers = executeOnNodes(commands -> commands.scriptLoad(bArr), ALL_NODES);
        return Flux.merge(publishers.values()).last();
    }

    /**
     * Runs {@code shutdown} on all nodes.
     *
     * @param z the flag passed through to each node's {@code shutdown} command
     * @return completion signal of the merged commands
     */
    @Override
    public Mono<Void> shutdown(boolean z) {
        Map<String, Publisher<Void>> publishers = executeOnNodes(commands -> commands.shutdown(z), ALL_NODES);
        return Flux.merge(publishers.values()).then();
    }

    /**
     * Touches the given keys, see {@link #touch(Iterable)}.
     *
     * @param kArr the keys
     * @return the accumulated number of touched keys
     */
    @Override
    public Mono<Long> touch(K... kArr) {
        return touch(Arrays.asList(kArr));
    }

    /**
     * Touches the given keys, issuing one command per hash slot when the keys span several slots.
     *
     * @param iterable the keys
     * @return the accumulated number of touched keys
     */
    @Override
    public Mono<Long> touch(Iterable<K> iterable) {
        return sumAcrossSlots(iterable, keys -> super.touch(keys));
    }

    /**
     * Unlinks the given keys, see {@link #unlink(Iterable)}.
     *
     * @param kArr the keys
     * @return the accumulated number of unlinked keys
     */
    @Override
    public Mono<Long> unlink(K... kArr) {
        return unlink(Arrays.asList(kArr));
    }

    /**
     * Unlinks the given keys, issuing one command per hash slot when the keys span several slots.
     *
     * @param iterable the keys
     * @return the accumulated number of unlinked keys
     */
    @Override
    public Mono<Long> unlink(Iterable<K> iterable) {
        return sumAcrossSlots(iterable, keys -> super.unlink(keys));
    }

    /**
     * Returns the reactive API of the node connection identified by node id.
     *
     * @param str the node id
     * @return the reactive commands of that node connection
     */
    @Override
    public RedisClusterReactiveCommands<K, V> getConnection(String str) {
        return getStatefulConnection().getConnection(str).reactive();
    }

    /**
     * Asynchronously obtains a {@link ConnectionIntent#WRITE} connection by node id.
     */
    private Mono<RedisClusterReactiveCommands<K, V>> getConnectionReactive(String str) {
        // Explicit <K, V>: there is no target type inside getMono(...).map(...) to infer them from.
        return getMono(getConnectionProvider().<K, V>getConnectionAsync(ConnectionIntent.WRITE, str))
                .map(connection -> connection.reactive());
    }

    /**
     * Returns the reactive API of the node connection identified by host and port.
     *
     * @param str the host
     * @param i the port
     * @return the reactive commands of that node connection
     */
    @Override
    public RedisClusterReactiveCommands<K, V> getConnection(String str, int i) {
        return getStatefulConnection().getConnection(str, i).reactive();
    }

    /**
     * Asynchronously obtains a {@link ConnectionIntent#WRITE} connection by host and port.
     */
    private Mono<RedisClusterReactiveCommands<K, V>> getConnectionReactive(String str, int i) {
        // Explicit <K, V>: there is no target type inside getMono(...).map(...) to infer them from.
        return getMono(getConnectionProvider().<K, V>getConnectionAsync(ConnectionIntent.WRITE, str, i))
                .map(connection -> connection.reactive());
    }

    /**
     * @return the underlying connection, cast to the cluster connection type
     */
    @Override
    public StatefulRedisClusterConnection<K, V> getStatefulConnection() {
        return (StatefulRedisClusterConnection<K, V>) super.getConnection();
    }

    /**
     * Starts a cluster-wide key scan.
     *
     * @return the scan cursor produced by the cluster scan cursor mapper
     */
    @Override
    public Mono<KeyScanCursor<K>> scan() {
        return clusterScan(ScanCursor.INITIAL, (commands, cursor) -> commands.scan(),
                ClusterScanSupport.reactiveClusterKeyScanCursorMapper());
    }

    /**
     * Starts a cluster-wide key scan with scan arguments.
     *
     * @param scanArgs the scan arguments
     * @return the scan cursor produced by the cluster scan cursor mapper
     */
    @Override
    public Mono<KeyScanCursor<K>> scan(ScanArgs scanArgs) {
        return clusterScan(ScanCursor.INITIAL, (commands, cursor) -> commands.scan(scanArgs),
                ClusterScanSupport.reactiveClusterKeyScanCursorMapper());
    }

    /**
     * Continues a cluster-wide key scan with scan arguments.
     *
     * @param scanCursor the cursor returned by a previous scan
     * @param scanArgs the scan arguments
     * @return the scan cursor produced by the cluster scan cursor mapper
     */
    @Override
    public Mono<KeyScanCursor<K>> scan(ScanCursor scanCursor, ScanArgs scanArgs) {
        return clusterScan(scanCursor, (commands, cursor) -> commands.scan(cursor, scanArgs),
                ClusterScanSupport.reactiveClusterKeyScanCursorMapper());
    }

    /**
     * Continues a cluster-wide key scan.
     *
     * @param scanCursor the cursor returned by a previous scan
     * @return the scan cursor produced by the cluster scan cursor mapper
     */
    @Override
    public Mono<KeyScanCursor<K>> scan(ScanCursor scanCursor) {
        return clusterScan(scanCursor, (commands, cursor) -> commands.scan(cursor),
                ClusterScanSupport.reactiveClusterKeyScanCursorMapper());
    }

    /**
     * Starts a cluster-wide streaming key scan.
     *
     * @param keyStreamingChannel the channel receiving the keys
     * @return the scan cursor produced by the cluster stream scan cursor mapper
     */
    @Override
    public Mono<StreamScanCursor> scan(KeyStreamingChannel<K> keyStreamingChannel) {
        return clusterScan(ScanCursor.INITIAL, (commands, cursor) -> commands.scan(keyStreamingChannel),
                ClusterScanSupport.reactiveClusterStreamScanCursorMapper());
    }

    /**
     * Starts a cluster-wide streaming key scan with scan arguments.
     *
     * @param keyStreamingChannel the channel receiving the keys
     * @param scanArgs the scan arguments
     * @return the scan cursor produced by the cluster stream scan cursor mapper
     */
    @Override
    public Mono<StreamScanCursor> scan(KeyStreamingChannel<K> keyStreamingChannel, ScanArgs scanArgs) {
        return clusterScan(ScanCursor.INITIAL, (commands, cursor) -> commands.scan(keyStreamingChannel, scanArgs),
                ClusterScanSupport.reactiveClusterStreamScanCursorMapper());
    }

    /**
     * Continues a cluster-wide streaming key scan with scan arguments.
     *
     * @param keyStreamingChannel the channel receiving the keys
     * @param scanCursor the cursor returned by a previous scan
     * @param scanArgs the scan arguments
     * @return the scan cursor produced by the cluster stream scan cursor mapper
     */
    @Override
    public Mono<StreamScanCursor> scan(KeyStreamingChannel<K> keyStreamingChannel, ScanCursor scanCursor, ScanArgs scanArgs) {
        return clusterScan(scanCursor, (commands, cursor) -> commands.scan(keyStreamingChannel, cursor, scanArgs),
                ClusterScanSupport.reactiveClusterStreamScanCursorMapper());
    }

    /**
     * Continues a cluster-wide streaming key scan.
     *
     * @param keyStreamingChannel the channel receiving the keys
     * @param scanCursor the cursor returned by a previous scan
     * @return the scan cursor produced by the cluster stream scan cursor mapper
     */
    @Override
    public Mono<StreamScanCursor> scan(KeyStreamingChannel<K> keyStreamingChannel, ScanCursor scanCursor) {
        return clusterScan(scanCursor, (commands, cursor) -> commands.scan(keyStreamingChannel, cursor),
                ClusterScanSupport.reactiveClusterStreamScanCursorMapper());
    }

    /**
     * Instance-level shortcut for the static cluster scan using this object's connection and
     * connection provider.
     */
    private <T extends ScanCursor> Mono<T> clusterScan(ScanCursor scanCursor, BiFunction<RedisKeyReactiveCommands<K, V>, ScanCursor, Mono<T>> biFunction, ClusterScanSupport.ScanCursorMapper<Mono<T>> scanCursorMapper) {
        return clusterScan(getStatefulConnection(), getConnectionProvider(), scanCursor, biFunction, scanCursorMapper);
    }

    /**
     * Applies {@code function} to the whole map when its keys fall into fewer than two slots;
     * otherwise applies it once per slot-local sub-map, merges the results and passes the merged
     * stream through {@code function2}.
     */
    private <T> Flux<T> pipeliningWithMap(Map<K, V> map, Function<Map<K, V>, Flux<T>> function, Function<Flux<T>, Flux<T>> function2) {
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, map.keySet());
        if (partitioned.size() < 2) {
            return function.apply(map);
        }

        List<Flux<T>> fluxes = partitioned.values().stream().map(slotKeys -> {
            Map<K, V> slotMap = new HashMap<>();
            slotKeys.forEach(key -> slotMap.put(key, map.get(key)));
            return function.apply(slotMap);
        }).collect(Collectors.toList());

        return function2.apply(Flux.merge(fluxes));
    }

    /**
     * Runs a command on every node flagged {@link RedisClusterNode.NodeFlag#UPSTREAM}.
     *
     * @param function the command to run against each node's reactive API
     * @param <T> element type of the command result
     * @return the per-node result publishers keyed by node id
     */
    protected <T> Map<String, Publisher<T>> executeOnUpstream(Function<RedisClusterReactiveCommands<K, V>, ? extends Publisher<T>> function) {
        return executeOnNodes(function, node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM));
    }

    /**
     * Runs a command on every node of the current partition view accepted by {@code predicate}.
     * Connections are obtained by the node's host and port.
     *
     * @param function the command to run against each node's reactive API
     * @param predicate the node filter
     * @param <T> element type of the command result
     * @return the per-node result publishers keyed by node id
     */
    protected <T> Map<String, Publisher<T>> executeOnNodes(Function<RedisClusterReactiveCommands<K, V>, ? extends Publisher<T>> function, Predicate<RedisClusterNode> predicate) {
        Map<String, Publisher<T>> executions = new HashMap<>();

        for (RedisClusterNode node : getStatefulConnection().getPartitions()) {
            if (!predicate.test(node)) {
                continue;
            }
            RedisURI uri = node.getUri();
            Mono<RedisClusterReactiveCommands<K, V>> connection = getConnectionReactive(uri.getHost(), uri.getPort());
            // Bound method reference: evaluates (and null-checks) `function` right here.
            executions.put(node.getNodeId(), connection.flatMapMany(function::apply));
        }
        return executions;
    }

    /**
     * Obtains a connection to the node serving the given slot, or an error signal carrying a
     * {@link RedisException} when no partition serves it.
     */
    private Mono<RedisClusterReactiveCommands<K, V>> findConnectionBySlotReactive(int i) {
        RedisClusterNode node = getStatefulConnection().getPartitions().getPartitionBySlot(i);
        if (node == null) {
            return Mono.error(new RedisException("No partition for slot " + i));
        }
        return getConnectionReactive(node.getUri().getHost(), node.getUri().getPort());
    }

    /**
     * Extracts the asynchronous connection provider from the connection's channel writer.
     */
    private AsyncClusterConnectionProvider getConnectionProvider() {
        ClusterDistributionChannelWriter writer = (ClusterDistributionChannelWriter) getStatefulConnection().getChannelWriter();
        return (AsyncClusterConnectionProvider) writer.getClusterConnectionProvider();
    }

    /**
     * Performs one scan step: resolves the node ids and current node for the cursor via
     * {@link ClusterScanSupport}, runs the scan function on a {@link ConnectionIntent#WRITE}
     * connection to the current node, and hands the result to the cursor mapper.
     */
    static <T extends ScanCursor, K, V> Mono<T> clusterScan(StatefulRedisClusterConnection<K, V> statefulRedisClusterConnection, AsyncClusterConnectionProvider asyncClusterConnectionProvider, ScanCursor scanCursor, BiFunction<RedisKeyReactiveCommands<K, V>, ScanCursor, Mono<T>> biFunction, ClusterScanSupport.ScanCursorMapper<Mono<T>> scanCursorMapper) {
        List<String> nodeIds = ClusterScanSupport.getNodeIds(statefulRedisClusterConnection, scanCursor);
        String currentNodeId = ClusterScanSupport.getCurrentNodeId(scanCursor, nodeIds);
        ScanCursor continuationCursor = ClusterScanSupport.getContinuationCursor(scanCursor);

        Mono<T> scanResult = getMono(asyncClusterConnectionProvider.<K, V>getConnectionAsync(ConnectionIntent.WRITE, currentNodeId))
                .flatMap(connection -> biFunction.apply(connection.reactive(), continuationCursor));
        return scanCursorMapper.map(nodeIds, currentNodeId, scanResult);
    }

    /**
     * Adapts a {@link CompletableFuture} to a {@link Mono}.
     */
    private static <T> Mono<T> getMono(CompletableFuture<T> completableFuture) {
        return Mono.fromCompletionStage(completableFuture);
    }

    /**
     * Issues {@code CLIENT SETNAME} on the given node commands if they report being open, otherwise
     * completes empty.
     */
    private Mono<String> setNameIfOpen(RedisClusterReactiveCommands<K, V> commands, K name) {
        return commands.isOpen() ? commands.clientSetname(name) : Mono.empty();
    }

    /**
     * Runs a counting multi-key command: directly when the keys fall into fewer than two slots,
     * otherwise once per slot with the per-slot counts summed.
     *
     * <p>The keys are copied into a list first so that the caller's {@link Iterable} is traversed
     * exactly once.
     */
    private Mono<Long> sumAcrossSlots(Iterable<K> keys, Function<Iterable<K>, Mono<Long>> command) {
        List<K> keyList = LettuceLists.newList(keys);
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keyList);
        if (partitioned.size() < 2) {
            return command.apply(keyList);
        }

        List<Publisher<Long>> publishers = new ArrayList<>();
        for (List<K> slotKeys : partitioned.values()) {
            publishers.add(command.apply(slotKeys));
        }
        return Flux.merge(publishers).reduce(Long::sum);
    }
}
