package com.lambdaworks.redis.cluster;

import com.lambdaworks.redis.AbstractRedisReactiveCommands;
import com.lambdaworks.redis.GeoArgs;
import com.lambdaworks.redis.GeoWithin;
import com.lambdaworks.redis.KeyScanCursor;
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.ScanArgs;
import com.lambdaworks.redis.ScanCursor;
import com.lambdaworks.redis.StreamScanCursor;
import com.lambdaworks.redis.api.rx.RedisKeyReactiveCommands;
import com.lambdaworks.redis.api.rx.Success;
import com.lambdaworks.redis.cluster.ClusterConnectionProvider;
import com.lambdaworks.redis.cluster.ClusterScanSupport;
import com.lambdaworks.redis.cluster.api.rx.RedisAdvancedClusterReactiveCommands;
import com.lambdaworks.redis.cluster.api.rx.RedisClusterReactiveCommands;
import com.lambdaworks.redis.cluster.models.partitions.Partitions;
import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.internal.LettuceLists;
import com.lambdaworks.redis.output.KeyStreamingChannel;
import com.lambdaworks.redis.output.ValueStreamingChannel;
import com.lambdaworks.redis.protocol.CommandType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import rx.Observable;
import rx.Single;

/* loaded from: input_file:WEB-INF/lib/lettuce-4.5.0.Final.jar:com/lambdaworks/redis/cluster/RedisAdvancedClusterReactiveCommandsImpl.class */
public class RedisAdvancedClusterReactiveCommandsImpl<K, V> extends AbstractRedisReactiveCommands<K, V> implements RedisAdvancedClusterReactiveCommands<K, V> {
    /** Node filter that accepts every cluster node; passed to {@code executeOnNodes} to address all nodes. */
    private static final Function<RedisClusterNode, Boolean> ALL_NODE_FILTER = node -> true;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/lettuce-4.5.0.Final.jar:com/lambdaworks/redis/cluster/RedisAdvancedClusterReactiveCommandsImpl$FlattenTransform.class */
    /**
     * Transformer that turns an {@code Observable} emitting {@code Iterable}s into an {@code Observable}
     * emitting the individual elements of those iterables.
     *
     * @param <T> element type
     */
    public static class FlattenTransform<T> implements Observable.Transformer<Iterable<T>, T> {
        FlattenTransform() {
        }

        /**
         * Flattens the given source.
         *
         * @param observable source emitting iterables
         * @return an observable emitting each element of every emitted iterable
         */
        @Override // rx.functions.Func1
        public Observable<T> call(Observable<Iterable<T>> observable) {
            return observable.flatMap(items -> Observable.from(items));
        }
    }

    /**
     * Creates the reactive command object; both arguments are handed to the superclass constructor.
     *
     * @param statefulRedisClusterConnectionImpl the cluster connection the commands operate on
     * @param redisCodec the codec for keys and values
     */
    public RedisAdvancedClusterReactiveCommandsImpl(StatefulRedisClusterConnectionImpl<K, V> statefulRedisClusterConnectionImpl, RedisCodec<K, V> redisCodec) {
        super(statefulRedisClusterConnectionImpl, redisCodec);
    }

    /**
     * Sets the client name via the superclass implementation and, for every node in the current partitions,
     * on the connection looked up by node id and on the connection looked up by host/port. Node connections
     * that are not open are skipped.
     *
     * @param k the client name
     * @return an observable emitting only the last reply of the merged commands
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisServerReactiveCommands
    public Observable<String> clientSetname(K k) {
        List<Observable<String>> results = new ArrayList<>();
        results.add(super.clientSetname(k));
        for (RedisClusterNode node : getStatefulConnection().getPartitions()) {
            // Address the node both by its id and by its host/port.
            results.add(getConnectionReactive(node.getNodeId()).flatMapObservable(
                    byNodeId -> byNodeId.isOpen() ? byNodeId.clientSetname(k) : Observable.<String>empty()));
            results.add(getConnectionReactive(node.getUri().getHost(), node.getUri().getPort()).flatMapObservable(
                    byAddress -> byAddress.isOpen() ? byAddress.clientSetname(k) : Observable.<String>empty()));
        }
        return Observable.merge(results).last();
    }

    /**
     * Counts the keys in a slot by issuing the command on the connection of the node that serves the slot.
     *
     * @param i the slot
     * @return the reply of {@code clusterCountKeysInSlot} from that node, or an error if no partition serves the slot
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.cluster.api.rx.RedisClusterReactiveCommands
    public Observable<Long> clusterCountKeysInSlot(int i) {
        Single<RedisClusterReactiveCommands<K, V>> slotOwner = findConnectionBySlotReactive(i);
        return slotOwner.flatMapObservable(commands -> commands.clusterCountKeysInSlot(i));
    }

    /**
     * Retrieves keys of a slot by issuing the command on the connection of the node that serves the slot.
     *
     * @param i the slot
     * @param i2 second argument forwarded unchanged to {@code clusterGetKeysInSlot} (presumably the maximum number of keys; confirm)
     * @return the keys emitted by that node, or an error if no partition serves the slot
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.cluster.api.rx.RedisClusterReactiveCommands
    public Observable<K> clusterGetKeysInSlot(int i, int i2) {
        Single<RedisClusterReactiveCommands<K, V>> slotOwner = findConnectionBySlotReactive(i);
        return slotOwner.flatMapObservable(commands -> commands.clusterGetKeysInSlot(i, i2));
    }

    /**
     * Runs {@code dbsize} on every master node and adds up the replies.
     *
     * @return the sum of all master replies
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisServerReactiveCommands
    public Observable<Long> dbsize() {
        Map<String, Observable<Long>> perMaster = executeOnMasters(commands -> commands.dbsize());
        return Observable.merge(perMaster.values()).reduce((total, size) -> total + size);
    }

    /**
     * Deletes the given keys; delegates to {@link #del(Iterable)}.
     *
     * @param kArr the keys
     * @return the result of {@link #del(Iterable)}
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisKeyReactiveCommands, com.lambdaworks.redis.cluster.api.rx.RedisClusterReactiveCommands
    public Observable<Long> del(K... kArr) {
        List<K> keys = Arrays.asList(kArr);
        return del(keys);
    }

    /**
     * Deletes the given keys. Keys are partitioned by hash slot; when they span fewer than two slots the
     * superclass command is issued once, otherwise one {@code del} is issued per slot group and the
     * replies are summed.
     *
     * @param iterable the keys
     * @return the summed reply of all issued commands
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands
    public Observable<Long> del(Iterable<K> iterable) {
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, iterable);
        if (partitioned.size() < 2) {
            return super.del(iterable);
        }
        List<Observable<Long>> perSlot = new ArrayList<>();
        for (List<K> slotKeys : partitioned.values()) {
            perSlot.add(super.del(slotKeys));
        }
        return Observable.merge(perSlot).reduce(Long::sum);
    }

    /**
     * Counts how many of the given keys exist; delegates to {@link #exists(Iterable)}.
     *
     * @param kArr the keys
     * @return the result of {@link #exists(Iterable)}
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisKeyReactiveCommands
    public Observable<Long> exists(K... kArr) {
        List<K> keys = Arrays.asList(kArr);
        return exists(keys);
    }

    /**
     * Runs {@code exists} for the given keys. Keys are partitioned by hash slot; when they span fewer than
     * two slots the superclass command is issued once, otherwise one command is issued per slot group
     * and the replies are summed.
     *
     * @param iterable the keys
     * @return the summed reply of all issued commands
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands
    public Observable<Long> exists(Iterable<K> iterable) {
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, iterable);
        if (partitioned.size() < 2) {
            return super.exists(iterable);
        }
        List<Observable<Long>> perSlot = new ArrayList<>();
        for (List<K> slotKeys : partitioned.values()) {
            perSlot.add(super.exists(slotKeys));
        }
        return Observable.merge(perSlot).reduce(Long::sum);
    }

    /**
     * Runs {@code flushall} on every master node.
     *
     * @return an observable emitting only the last reply of the merged commands
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisServerReactiveCommands
    public Observable<String> flushall() {
        Map<String, Observable<String>> perMaster = executeOnMasters(commands -> commands.flushall());
        return Observable.merge(perMaster.values()).last();
    }

    /**
     * Runs {@code flushdb} on every master node.
     *
     * @return an observable emitting only the last reply of the merged commands
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisServerReactiveCommands
    public Observable<String> flushdb() {
        Map<String, Observable<String>> perMaster = executeOnMasters(commands -> commands.flushdb());
        return Observable.merge(perMaster.values()).last();
    }

    /**
     * Runs a geo radius query. Uses {@code georadius_ro} when the connection state reports the
     * {@code GEORADIUS_RO} command, otherwise the regular {@code georadius}.
     *
     * @param k the key
     * @param d first coordinate argument, forwarded unchanged
     * @param d2 second coordinate argument, forwarded unchanged
     * @param d3 radius/distance argument, forwarded unchanged
     * @param unit the distance unit
     * @return the values emitted by the selected superclass command
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisGeoReactiveCommands
    public Observable<V> georadius(K k, double d, double d2, double d3, GeoArgs.Unit unit) {
        if (getStatefulConnection().getState().hasCommand(CommandType.GEORADIUS_RO)) {
            return super.georadius_ro(k, d, d2, d3, unit);
        }
        return super.georadius(k, d, d2, d3, unit);
    }

    /**
     * Runs a geo radius query with additional arguments. Uses {@code georadius_ro} when the connection
     * state reports the {@code GEORADIUS_RO} command, otherwise the regular {@code georadius}.
     *
     * @param k the key
     * @param d first coordinate argument, forwarded unchanged
     * @param d2 second coordinate argument, forwarded unchanged
     * @param d3 radius/distance argument, forwarded unchanged
     * @param unit the distance unit
     * @param geoArgs additional arguments forwarded to the command
     * @return the results emitted by the selected superclass command
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisGeoReactiveCommands
    public Observable<GeoWithin<V>> georadius(K k, double d, double d2, double d3, GeoArgs.Unit unit, GeoArgs geoArgs) {
        if (getStatefulConnection().getState().hasCommand(CommandType.GEORADIUS_RO)) {
            return super.georadius_ro(k, d, d2, d3, unit, geoArgs);
        }
        // The key is passed as-is; it must not be cast to this class type.
        return super.georadius(k, d, d2, d3, unit, geoArgs);
    }

    /**
     * Runs a geo radius-by-member query. Uses {@code georadiusbymember_ro} when the connection state
     * reports the {@code GEORADIUS_RO} command, otherwise the regular {@code georadiusbymember}.
     * NOTE(review): the availability check uses GEORADIUS_RO, not a GEORADIUSBYMEMBER-specific type; confirm this is intended.
     *
     * @param k the key
     * @param v the reference member
     * @param d radius/distance argument, forwarded unchanged
     * @param unit the distance unit
     * @return the values emitted by the selected superclass command
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisGeoReactiveCommands
    public Observable<V> georadiusbymember(K k, V v, double d, GeoArgs.Unit unit) {
        if (getStatefulConnection().getState().hasCommand(CommandType.GEORADIUS_RO)) {
            return super.georadiusbymember_ro(k, v, d, unit);
        }
        return super.georadiusbymember(k, v, d, unit);
    }

    /**
     * Runs a geo radius-by-member query with additional arguments. Uses {@code georadiusbymember_ro} when
     * the connection state reports the {@code GEORADIUS_RO} command, otherwise the regular
     * {@code georadiusbymember}.
     * NOTE(review): the availability check uses GEORADIUS_RO, not a GEORADIUSBYMEMBER-specific type; confirm this is intended.
     *
     * @param k the key
     * @param v the reference member
     * @param d radius/distance argument, forwarded unchanged
     * @param unit the distance unit
     * @param geoArgs additional arguments forwarded to the command
     * @return the results emitted by the selected superclass command
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisGeoReactiveCommands
    public Observable<GeoWithin<V>> georadiusbymember(K k, V v, double d, GeoArgs.Unit unit, GeoArgs geoArgs) {
        if (getStatefulConnection().getState().hasCommand(CommandType.GEORADIUS_RO)) {
            return super.georadiusbymember_ro(k, v, d, unit, geoArgs);
        }
        // Key and member are passed as-is; they must not be cast to unrelated types.
        return super.georadiusbymember(k, v, d, unit, geoArgs);
    }

    /**
     * Runs {@code keys} with the given pattern on every master node and merges the emitted keys.
     *
     * @param k the pattern
     * @return the merged keys of all masters
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisKeyReactiveCommands
    public Observable<K> keys(K k) {
        Map<String, Observable<K>> perMaster = executeOnMasters(commands -> commands.keys(k));
        return Observable.merge(perMaster.values());
    }

    /**
     * Runs the streaming {@code keys} command with the given pattern on every master node and sums the
     * numeric replies.
     *
     * @param keyStreamingChannel the channel handed to each per-node command
     * @param k the pattern
     * @return the sum of all master replies
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisKeyReactiveCommands
    public Observable<Long> keys(KeyStreamingChannel<K> keyStreamingChannel, K k) {
        Map<String, Observable<Long>> perMaster = executeOnMasters(commands -> commands.keys(keyStreamingChannel, k));
        return Observable.merge(perMaster.values()).reduce((total, count) -> total + count);
    }

    /**
     * Gets the values of the given keys; delegates to {@link #mget(Iterable)}.
     *
     * @param kArr the keys
     * @return the result of {@link #mget(Iterable)}
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisStringReactiveCommands, com.lambdaworks.redis.cluster.api.rx.RedisClusterReactiveCommands
    public Observable<V> mget(K... kArr) {
        List<K> keys = Arrays.asList(kArr);
        return mget(keys);
    }

    /**
     * Gets the values of the given keys. Keys are partitioned by hash slot; when they span fewer than two
     * slots the superclass command is issued once. Otherwise one {@code mget} is issued per slot group,
     * the replies are concatenated in slot-group order, and the values are then rearranged so that the
     * value at position {@code i} belongs to the key at position {@code i} of the input.
     *
     * @param iterable the keys
     * @return the values in input key order
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands
    public Observable<V> mget(Iterable<K> iterable) {
        List<K> keys = LettuceLists.newList(iterable);
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keys);
        if (partitioned.size() < 2) {
            return super.mget(keys);
        }
        List<Observable<V>> perSlot = new ArrayList<>();
        for (List<K> slotKeys : partitioned.values()) {
            perSlot.add(super.mget(slotKeys));
        }
        return Observable.concat(Observable.from(perSlot)).toList().map(values -> {
            Object[] ordered = new Object[values.size()];
            // Offset of the current slot group's first value within the concatenated replies.
            int offset = 0;
            for (List<K> slotKeys : partitioned.values()) {
                for (int i = 0; i < keys.size(); i++) {
                    int index = slotKeys.indexOf(keys.get(i));
                    if (index != -1) {
                        ordered[i] = values.get(offset + index);
                    }
                }
                offset += slotKeys.size();
            }
            // The array only ever holds elements taken from List<V>, so the cast is safe.
            @SuppressWarnings("unchecked")
            List<V> result = (List<V>) new ArrayList<>(Arrays.asList(ordered));
            return result;
        }).compose(new FlattenTransform<V>());
    }

    /**
     * Streams the values of the given keys to the channel; delegates to
     * {@link #mget(ValueStreamingChannel, Iterable)}.
     *
     * @param valueStreamingChannel the channel receiving the values
     * @param kArr the keys
     * @return the result of {@link #mget(ValueStreamingChannel, Iterable)}
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisStringReactiveCommands
    public Observable<Long> mget(ValueStreamingChannel<V> valueStreamingChannel, K... kArr) {
        List<K> keys = Arrays.asList(kArr);
        return mget(valueStreamingChannel, keys);
    }

    /**
     * Streams the values of the given keys to the channel. Keys are partitioned by hash slot; when they
     * span fewer than two slots the superclass command is issued once, otherwise one streaming
     * {@code mget} is issued per slot group and the numeric replies are summed.
     *
     * @param valueStreamingChannel the channel handed to every issued command
     * @param iterable the keys
     * @return the summed reply of all issued commands
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands
    public Observable<Long> mget(ValueStreamingChannel<V> valueStreamingChannel, Iterable<K> iterable) {
        List<K> keys = LettuceLists.newList(iterable);
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keys);
        if (partitioned.size() < 2) {
            return super.mget(valueStreamingChannel, keys);
        }
        List<Observable<Long>> perSlot = new ArrayList<>();
        for (List<K> slotKeys : partitioned.values()) {
            perSlot.add(super.mget(valueStreamingChannel, slotKeys));
        }
        return Observable.merge(perSlot).reduce(Long::sum);
    }

    /**
     * Runs {@code msetnx} through {@code pipeliningWithMap}; when several commands are issued their
     * boolean replies are combined with logical AND.
     *
     * @param map the key/value pairs
     * @return the (combined) reply
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisStringReactiveCommands, com.lambdaworks.redis.cluster.api.rx.RedisClusterReactiveCommands
    public Observable<Boolean> msetnx(Map<K, V> map) {
        Function<Map<K, V>, Observable<Boolean>> command = entries -> super.msetnx(entries);
        Function<Observable<Boolean>, Observable<Boolean>> allTrue =
                replies -> replies.reduce((left, right) -> left && right);
        return pipeliningWithMap(map, command, allTrue);
    }

    /**
     * Runs {@code mset} through {@code pipeliningWithMap}; when several commands are issued only the
     * last reply is emitted.
     *
     * @param map the key/value pairs
     * @return the (last) reply
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisStringReactiveCommands, com.lambdaworks.redis.cluster.api.rx.RedisClusterReactiveCommands
    public Observable<String> mset(Map<K, V> map) {
        Function<Map<K, V>, Observable<String>> command = entries -> super.mset(entries);
        return pipeliningWithMap(map, command, replies -> replies.last());
    }

    /**
     * Runs {@code randomkey} on a node picked at random from the current partitions.
     *
     * @return the reply of the chosen node
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisKeyReactiveCommands
    public Observable<V> randomkey() {
        Partitions partitions = getStatefulConnection().getPartitions();
        int index = ThreadLocalRandom.current().nextInt(partitions.size());
        String nodeId = partitions.getPartition(index).getNodeId();
        return getConnectionReactive(nodeId).flatMapObservable(commands -> commands.randomkey());
    }

    /**
     * Runs {@code scriptFlush} on every cluster node.
     *
     * @return an observable emitting only the last reply of the merged commands
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisScriptingReactiveCommands
    public Observable<String> scriptFlush() {
        Map<String, Observable<String>> perNode = executeOnNodes(commands -> commands.scriptFlush(), ALL_NODE_FILTER);
        return Observable.merge(perNode.values()).last();
    }

    /**
     * Runs {@code scriptKill} on every cluster node. An error in the merged stream is replaced by
     * {@code "OK"}.
     *
     * @return an observable emitting only the last item of the merged stream
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisScriptingReactiveCommands
    public Observable<String> scriptKill() {
        // Must dispatch scriptKill, not scriptFlush: flushing would wipe the script cache on every node.
        Map<String, Observable<String>> perNode = executeOnNodes(commands -> commands.scriptKill(), ALL_NODE_FILTER);
        return Observable.merge(perNode.values()).onErrorReturn(failure -> "OK").last();
    }

    /**
     * Runs {@code scriptLoad} with the given script on every cluster node.
     *
     * @param v the script
     * @return an observable emitting only the last reply of the merged commands
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisScriptingReactiveCommands
    public Observable<String> scriptLoad(V v) {
        Map<String, Observable<String>> perNode = executeOnNodes(commands -> commands.scriptLoad(v), ALL_NODE_FILTER);
        return Observable.merge(perNode.values()).last();
    }

    /**
     * Runs {@code shutdown} on every cluster node. An error in the merged stream is replaced by a
     * {@code null} item.
     *
     * @param z flag forwarded unchanged to each node's {@code shutdown}
     * @return an observable emitting only the last item of the merged stream
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisServerReactiveCommands
    public Observable<Success> shutdown(boolean z) {
        Map<String, Observable<Success>> perNode = executeOnNodes(commands -> commands.shutdown(z), ALL_NODE_FILTER);
        return Observable.merge(perNode.values()).onErrorReturn(failure -> null).last();
    }

    /**
     * Touches the given keys; delegates to {@link #touch(Iterable)}.
     *
     * @param kArr the keys
     * @return the result of {@link #touch(Iterable)}
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisKeyReactiveCommands
    public Observable<Long> touch(K... kArr) {
        List<K> keys = Arrays.asList(kArr);
        return touch(keys);
    }

    /**
     * Touches the given keys. Keys are partitioned by hash slot; when they span fewer than two slots the
     * superclass command is issued once, otherwise one {@code touch} is issued per slot group and the
     * replies are summed.
     *
     * @param iterable the keys
     * @return the summed reply of all issued commands
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands
    public Observable<Long> touch(Iterable<K> iterable) {
        List<K> keys = LettuceLists.newList(iterable);
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keys);
        if (partitioned.size() < 2) {
            return super.touch(keys);
        }
        List<Observable<Long>> perSlot = new ArrayList<>();
        for (List<K> slotKeys : partitioned.values()) {
            perSlot.add(super.touch(slotKeys));
        }
        return Observable.merge(perSlot).reduce(Long::sum);
    }

    /**
     * Unlinks the given keys; delegates to {@link #unlink(Iterable)}.
     *
     * @param kArr the keys
     * @return the result of {@link #unlink(Iterable)}
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisKeyReactiveCommands
    public Observable<Long> unlink(K... kArr) {
        List<K> keys = Arrays.asList(kArr);
        return unlink(keys);
    }

    /**
     * Unlinks the given keys. Keys are partitioned by hash slot; when they span fewer than two slots the
     * superclass command is issued once, otherwise one {@code unlink} is issued per slot group and the
     * replies are summed.
     *
     * @param iterable the keys
     * @return the summed reply of all issued commands
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands
    public Observable<Long> unlink(Iterable<K> iterable) {
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, iterable);
        if (partitioned.size() < 2) {
            return super.unlink(iterable);
        }
        List<Observable<Long>> perSlot = new ArrayList<>();
        for (List<K> slotKeys : partitioned.values()) {
            perSlot.add(super.unlink(slotKeys));
        }
        return Observable.merge(perSlot).reduce(Long::sum);
    }

    /**
     * Applies the function to the connection of every node flagged {@code MASTER}.
     *
     * @param function the command to run against each selected node's reactive API
     * @param <T> element type emitted by the command
     * @return the per-node observables keyed by node id, as produced by {@code executeOnNodes}
     */
    protected <T> Map<String, Observable<T>> executeOnMasters(Function<RedisClusterReactiveCommands<K, V>, Observable<T>> function) {
        Function<RedisClusterNode, Boolean> mastersOnly = node -> node.is(RedisClusterNode.NodeFlag.MASTER);
        return executeOnNodes(function, mastersOnly);
    }

    /**
     * Applies the function to the connection of every node in the current partitions that the filter
     * accepts. The connection is looked up by the node's URI host and port.
     *
     * @param function the command to run against each selected node's reactive API
     * @param function2 the node filter; nodes for which it returns {@code false} are skipped
     * @param <T> element type emitted by the command
     * @return a map from node id to the observable produced for that node
     */
    protected <T> Map<String, Observable<T>> executeOnNodes(Function<RedisClusterReactiveCommands<K, V>, Observable<T>> function, Function<RedisClusterNode, Boolean> function2) {
        Map<String, Observable<T>> executions = new HashMap<>();
        for (RedisClusterNode node : getStatefulConnection().getPartitions()) {
            if (!function2.apply(node)) {
                continue;
            }
            RedisURI uri = node.getUri();
            Single<RedisClusterReactiveCommands<K, V>> connection = getConnectionReactive(uri.getHost(), uri.getPort());
            // The bound method reference null-checks 'function' here, like the original getClass() call did.
            executions.put(node.getNodeId(), connection.flatMapObservable(function::apply));
        }
        return executions;
    }

    /**
     * Looks up the node serving the given slot and obtains a connection to it by host and port.
     *
     * @param i the slot
     * @return the connection's reactive API, or an error {@code Single} carrying a {@link RedisException}
     *         when no partition is registered for the slot
     */
    private Single<RedisClusterReactiveCommands<K, V>> findConnectionBySlotReactive(int i) {
        RedisClusterNode slotOwner = getStatefulConnection().getPartitions().getPartitionBySlot(i);
        if (slotOwner == null) {
            return Single.error(new RedisException("No partition for slot " + i));
        }
        return getConnectionReactive(slotOwner.getUri().getHost(), slotOwner.getUri().getPort());
    }

    /**
     * Returns the underlying connection, cast to the cluster connection implementation type.
     *
     * @return the connection field as {@code StatefulRedisClusterConnectionImpl}
     */
    @Override // com.lambdaworks.redis.cluster.api.rx.RedisAdvancedClusterReactiveCommands
    public StatefulRedisClusterConnectionImpl<K, V> getStatefulConnection() {
        return (StatefulRedisClusterConnectionImpl<K, V>) connection;
    }

    /**
     * Returns the reactive API of the connection that the stateful cluster connection provides for the
     * given argument.
     *
     * @param str lookup argument forwarded to {@code getConnection} (presumably the node id; confirm)
     * @return the reactive commands of that connection
     */
    @Override // com.lambdaworks.redis.cluster.api.rx.RedisAdvancedClusterReactiveCommands
    public RedisClusterReactiveCommands<K, V> getConnection(String str) {
        return getStatefulConnection().getConnection(str).reactive();
    }

    /**
     * Asynchronously obtains a connection with {@code WRITE} intent from the connection provider and
     * exposes its reactive API.
     *
     * @param str lookup argument forwarded to {@code getConnectionAsync} (callers pass a node id)
     * @return a {@code Single} emitting the connection's reactive commands
     */
    private Single<RedisClusterReactiveCommands<K, V>> getConnectionReactive(String str) {
        AsyncClusterConnectionProvider provider = getConnectionProvider();
        return getSingle(provider.getConnectionAsync(ClusterConnectionProvider.Intent.WRITE, str))
                .map(connection -> connection.reactive());
    }

    /**
     * Returns the reactive API of the connection that the stateful cluster connection provides for the
     * given arguments.
     *
     * @param str first lookup argument forwarded to {@code getConnection} (presumably the host; confirm)
     * @param i second lookup argument forwarded to {@code getConnection} (presumably the port; confirm)
     * @return the reactive commands of that connection
     */
    @Override // com.lambdaworks.redis.cluster.api.rx.RedisAdvancedClusterReactiveCommands
    public RedisClusterReactiveCommands<K, V> getConnection(String str, int i) {
        return getStatefulConnection().getConnection(str, i).reactive();
    }

    /**
     * Asynchronously obtains a connection with {@code WRITE} intent from the connection provider and
     * exposes its reactive API.
     *
     * @param str first lookup argument forwarded to {@code getConnectionAsync} (callers pass a URI host)
     * @param i second lookup argument forwarded to {@code getConnectionAsync} (callers pass a URI port)
     * @return a {@code Single} emitting the connection's reactive commands
     */
    private Single<RedisClusterReactiveCommands<K, V>> getConnectionReactive(String str, int i) {
        AsyncClusterConnectionProvider provider = getConnectionProvider();
        return getSingle(provider.getConnectionAsync(ClusterConnectionProvider.Intent.WRITE, str, i))
                .map(connection -> connection.reactive());
    }

    /**
     * Returns the cluster connection provider of the connection's distribution channel writer, cast to
     * {@code AsyncClusterConnectionProvider}.
     *
     * @return the provider used for asynchronous connection lookups
     */
    private AsyncClusterConnectionProvider getConnectionProvider() {
        return (AsyncClusterConnectionProvider) getStatefulConnection().getClusterDistributionChannelWriter().getClusterConnectionProvider();
    }

    /**
     * Starts a cluster scan from {@code ScanCursor.INITIAL}, calling the no-argument {@code scan} on the
     * selected node and mapping the result with the key scan cursor mapper.
     *
     * @return the mapped key scan cursor
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisKeyReactiveCommands
    public Observable<KeyScanCursor<K>> scan() {
        return clusterScan(ScanCursor.INITIAL, (commands, cursor) -> commands.scan(),
                ClusterScanSupport.reactiveClusterKeyScanCursorMapper());
    }

    /**
     * Starts a cluster scan from {@code ScanCursor.INITIAL}, calling {@code scan(ScanArgs)} on the
     * selected node and mapping the result with the key scan cursor mapper.
     *
     * @param scanArgs the scan arguments
     * @return the mapped key scan cursor
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisKeyReactiveCommands
    public Observable<KeyScanCursor<K>> scan(ScanArgs scanArgs) {
        return clusterScan(ScanCursor.INITIAL, (commands, cursor) -> commands.scan(scanArgs),
                ClusterScanSupport.reactiveClusterKeyScanCursorMapper());
    }

    /**
     * Continues a cluster scan from the given cursor, calling {@code scan(cursor, ScanArgs)} on the
     * selected node with the continuation cursor and mapping the result with the key scan cursor mapper.
     *
     * @param scanCursor the cursor to resume from
     * @param scanArgs the scan arguments
     * @return the mapped key scan cursor
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisKeyReactiveCommands
    public Observable<KeyScanCursor<K>> scan(ScanCursor scanCursor, ScanArgs scanArgs) {
        return clusterScan(scanCursor, (commands, cursor) -> commands.scan(cursor, scanArgs),
                ClusterScanSupport.reactiveClusterKeyScanCursorMapper());
    }

    /**
     * Continues a cluster scan from the given cursor, calling {@code scan(cursor)} on the selected node
     * with the continuation cursor and mapping the result with the key scan cursor mapper.
     *
     * @param scanCursor the cursor to resume from
     * @return the mapped key scan cursor
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisKeyReactiveCommands
    public Observable<KeyScanCursor<K>> scan(ScanCursor scanCursor) {
        return clusterScan(scanCursor, (commands, cursor) -> commands.scan(cursor),
                ClusterScanSupport.reactiveClusterKeyScanCursorMapper());
    }

    /**
     * Starts a streaming cluster scan from {@code ScanCursor.INITIAL}, calling {@code scan(channel)} on
     * the selected node and mapping the result with the stream scan cursor mapper.
     *
     * @param keyStreamingChannel the channel handed to the per-node scan
     * @return the mapped stream scan cursor
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisKeyReactiveCommands
    public Observable<StreamScanCursor> scan(KeyStreamingChannel<K> keyStreamingChannel) {
        return clusterScan(ScanCursor.INITIAL, (commands, cursor) -> commands.scan(keyStreamingChannel),
                ClusterScanSupport.reactiveClusterStreamScanCursorMapper());
    }

    /**
     * Starts a streaming cluster scan from {@code ScanCursor.INITIAL}, calling
     * {@code scan(channel, ScanArgs)} on the selected node and mapping the result with the stream scan
     * cursor mapper.
     *
     * @param keyStreamingChannel the channel handed to the per-node scan
     * @param scanArgs the scan arguments
     * @return the mapped stream scan cursor
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisKeyReactiveCommands
    public Observable<StreamScanCursor> scan(KeyStreamingChannel<K> keyStreamingChannel, ScanArgs scanArgs) {
        return clusterScan(ScanCursor.INITIAL, (commands, cursor) -> commands.scan(keyStreamingChannel, scanArgs),
                ClusterScanSupport.reactiveClusterStreamScanCursorMapper());
    }

    /**
     * Continues a streaming cluster scan from the given cursor, calling
     * {@code scan(channel, cursor, ScanArgs)} on the selected node with the continuation cursor and
     * mapping the result with the stream scan cursor mapper.
     *
     * @param keyStreamingChannel the channel handed to the per-node scan
     * @param scanCursor the cursor to resume from
     * @param scanArgs the scan arguments
     * @return the mapped stream scan cursor
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisKeyReactiveCommands
    public Observable<StreamScanCursor> scan(KeyStreamingChannel<K> keyStreamingChannel, ScanCursor scanCursor, ScanArgs scanArgs) {
        return clusterScan(scanCursor, (commands, cursor) -> commands.scan(keyStreamingChannel, cursor, scanArgs),
                ClusterScanSupport.reactiveClusterStreamScanCursorMapper());
    }

    /**
     * Continues a streaming cluster scan from the given cursor, calling {@code scan(channel, cursor)} on
     * the selected node with the continuation cursor and mapping the result with the stream scan cursor
     * mapper.
     *
     * @param keyStreamingChannel the channel handed to the per-node scan
     * @param scanCursor the cursor to resume from
     * @return the mapped stream scan cursor
     */
    @Override // com.lambdaworks.redis.AbstractRedisReactiveCommands, com.lambdaworks.redis.api.rx.RedisKeyReactiveCommands
    public Observable<StreamScanCursor> scan(KeyStreamingChannel<K> keyStreamingChannel, ScanCursor scanCursor) {
        return clusterScan(scanCursor, (commands, cursor) -> commands.scan(keyStreamingChannel, cursor),
                ClusterScanSupport.reactiveClusterStreamScanCursorMapper());
    }

    /**
     * Runs a cluster scan on this object's stateful connection; delegates to the static
     * {@code clusterScan} overload.
     *
     * @param scanCursor the cursor to start or resume from
     * @param biFunction the scan invocation, given a node's key commands and the continuation cursor
     * @param scanCursorMapper mapper applied to the scan result
     * @param <T> the cursor type produced by the scan
     * @return the mapped scan result
     */
    private <T extends ScanCursor> Observable<T> clusterScan(ScanCursor scanCursor, BiFunction<RedisKeyReactiveCommands<K, V>, ScanCursor, Observable<T>> biFunction, ClusterScanSupport.ScanCursorMapper<Observable<T>> scanCursorMapper) {
        return clusterScan(getStatefulConnection(), scanCursor, biFunction, scanCursorMapper);
    }

    /**
     * Runs one step of a cluster scan. Node ids, the current node id and the continuation cursor are
     * derived from the given cursor via {@code ClusterScanSupport}. A connection to the current node is
     * then obtained asynchronously with {@code WRITE} intent, the scan function is applied to its
     * reactive API, and the result is passed through the mapper together with the node ids.
     *
     * @param statefulRedisClusterConnectionImpl the cluster connection supplying partitions and the connection provider
     * @param scanCursor the cursor to start or resume from
     * @param biFunction the scan invocation, given a node's key commands and the continuation cursor
     * @param scanCursorMapper mapper applied to the scan result
     * @return the mapped scan result
     */
    private static <T extends ScanCursor, K, V> Observable<T> clusterScan(StatefulRedisClusterConnectionImpl<K, V> statefulRedisClusterConnectionImpl, ScanCursor scanCursor, BiFunction<RedisKeyReactiveCommands<K, V>, ScanCursor, Observable<T>> biFunction, ClusterScanSupport.ScanCursorMapper<Observable<T>> scanCursorMapper) {
        List<String> nodeIds = ClusterScanSupport.getNodeIds(statefulRedisClusterConnectionImpl, scanCursor);
        String currentNodeId = ClusterScanSupport.getCurrentNodeId(scanCursor, nodeIds);
        ScanCursor continuationCursor = ClusterScanSupport.getContinuationCursor(scanCursor);
        AsyncClusterConnectionProvider provider = (AsyncClusterConnectionProvider) statefulRedisClusterConnectionImpl
                .getClusterDistributionChannelWriter().getClusterConnectionProvider();
        Observable<T> scanResult = getSingle(provider.getConnectionAsync(ClusterConnectionProvider.Intent.WRITE, currentNodeId))
                .flatMapObservable(connection -> biFunction.apply(connection.reactive(), continuationCursor));
        return scanCursorMapper.map(nodeIds, currentNodeId, scanResult);
    }

    /**
     * Runs a map-based command, splitting it per hash slot when necessary. The map's keys are partitioned
     * by slot; when they span fewer than two slots the command is applied to the whole map. Otherwise
     * the command is applied to one sub-map per slot group, the resulting observables are merged, and
     * the merged stream is handed to the result function.
     *
     * @param map the key/value pairs
     * @param function the command to run for a (sub-)map
     * @param function2 combines the merged per-slot replies into the final result
     * @param <T> reply type
     * @return the command reply, or the combined reply of the per-slot commands
     */
    private <T> Observable<T> pipeliningWithMap(Map<K, V> map, Function<Map<K, V>, Observable<T>> function, Function<Observable<T>, Observable<T>> function2) {
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, map.keySet());
        if (partitioned.size() < 2) {
            return function.apply(map);
        }
        List<Observable<T>> perSlot = partitioned.values().stream().map(slotKeys -> {
            Map<K, V> slotEntries = new HashMap<>();
            slotKeys.forEach(key -> slotEntries.put(key, map.get(key)));
            return function.apply(slotEntries);
        }).collect(Collectors.toList());
        return function2.apply(Observable.merge(perSlot));
    }

    /**
     * Adapts a {@link CompletableFuture} to a {@code Single}. On subscription a completion callback is
     * registered on the future: a non-null throwable is signalled via {@code onError}, otherwise the
     * value is signalled via {@code onSuccess}.
     *
     * @param completableFuture the future to adapt
     * @param <T> value type
     * @return a {@code Single} mirroring the future's outcome
     */
    private static <T> Single<T> getSingle(CompletableFuture<T> completableFuture) {
        return Single.create(subscriber -> completableFuture.whenComplete((value, failure) -> {
            if (failure == null) {
                subscriber.onSuccess(value);
                return;
            }
            subscriber.onError(failure);
        }));
    }
}
