package org.springframework.data.redis.connection.lettuce;

import io.lettuce.core.Consumer;
import io.lettuce.core.Limit;
import io.lettuce.core.Range;
import io.lettuce.core.XAddArgs;
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.kuali.kfs.module.cam.CamsConstants;
import org.reactivestreams.Publisher;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveStreamCommands;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.util.ByteUtils;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;

/* loaded from: input_file:WEB-INF/lib/spring-data-redis-2.7.15.jar:org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.class */
/**
 * {@link ReactiveStreamCommands} implementation that forwards every stream command to the Lettuce
 * reactive API obtained through a {@link LettuceReactiveRedisConnection}.
 *
 * <p>Each public method consumes a {@link Publisher} of command objects and emits one response per
 * command. Commands with a single-valued result are chained with {@code concatMap}, so they are
 * subscribed one after another in publisher order. Commands with a multi-valued result are wrapped
 * with {@code map}; the inner {@link Flux} carried by the response is not subscribed here, so the
 * Redis call only happens once the caller subscribes to it.
 */
class LettuceReactiveStreamCommands implements ReactiveStreamCommands {

    private final LettuceReactiveRedisConnection connection;

    /**
     * Creates the command facade.
     *
     * @param lettuceReactiveRedisConnection connection used to run the commands; must not be {@code null}
     * @throws IllegalArgumentException if the connection is {@code null}
     */
    public LettuceReactiveStreamCommands(LettuceReactiveRedisConnection lettuceReactiveRedisConnection) {
        Assert.notNull(lettuceReactiveRedisConnection, "Connection must not be null!");
        this.connection = lettuceReactiveRedisConnection;
    }

    /**
     * Acknowledges the given record ids for a consumer group via Lettuce {@code xack}.
     *
     * @param publisher acknowledge commands; key, group and record ids must not be {@code null}
     * @return one numeric response per command carrying the value reported by {@code xack}
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.NumericResponse<ReactiveStreamCommands.AcknowledgeCommand, Long>> xAck(Publisher<ReactiveStreamCommands.AcknowledgeCommand> publisher) {
        return this.connection.execute(cmd -> Flux.from(publisher).concatMap(command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");
            Assert.notNull(command.getGroup(), "Group must not be null!");
            Assert.notNull(command.getRecordIds(), "recordIds must not be null!");

            ByteBuffer group = ByteUtils.getByteBuffer(command.getGroup());
            String[] ids = entryIdsToString(command.getRecordIds());
            return cmd.xack(command.getKey(), group, ids)
                    .map(value -> new ReactiveRedisConnection.NumericResponse<>(command, value));
        }));
    }

    /**
     * Appends a record to a stream via Lettuce {@code xadd}.
     *
     * <p>An explicit record id is only sent when the id is not flagged for auto-generation;
     * {@code maxlen} and {@code minId} are only sent when the command declares them.
     *
     * @param publisher add commands; key and body must not be {@code null}
     * @return one response per command carrying the {@link RecordId} built from the returned id
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.AddStreamRecord, RecordId>> xAdd(Publisher<ReactiveStreamCommands.AddStreamRecord> publisher) {
        return this.connection.execute(cmd -> Flux.from(publisher).concatMap(command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");
            Assert.notNull(command.getBody(), "Body must not be null!");

            XAddArgs args = new XAddArgs();
            if (!command.getRecord().getId().shouldBeAutoGenerated()) {
                args.id(command.getRecord().getId().getValue());
            }
            if (command.hasMaxlen()) {
                args.maxlen(command.getMaxlen().longValue());
            }
            if (command.hasMinId()) {
                args.minId(command.getMinId().getValue());
            }
            args.nomkstream(command.isNoMkStream());
            args.approximateTrimming(command.isApproximateTrimming());

            // Key and body are passed as-is: both are already ByteBuffer-typed, matching the
            // <ByteBuffer, ByteBuffer> commands object. No casts are needed (or valid) here.
            return cmd.xadd(command.getKey(), args, command.getBody())
                    .map(id -> new ReactiveRedisConnection.CommandResponse<>(command, RecordId.of(id)));
        }));
    }

    /**
     * Claims pending records for a new owner and reports only their ids ({@code justid}).
     *
     * @param publisher claim commands
     * @return one response per command whose payload emits the claimed {@link RecordId}s
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XClaimCommand, Flux<RecordId>>> xClaimJustId(Publisher<ReactiveStreamCommands.XClaimCommand> publisher) {
        return this.connection.execute(cmd -> Flux.from(publisher).map(command -> {
            String[] ids = command.getOptions().getIdsAsStringArray();
            Consumer<ByteBuffer> newOwner = Consumer.from(ByteUtils.getByteBuffer(command.getGroupName()),
                    ByteUtils.getByteBuffer(command.getNewOwner()));

            Flux<RecordId> claimed = cmd
                    .xclaim(command.getKey(), newOwner, StreamConverters.toXClaimArgs(command.getOptions()).justid(), ids)
                    .map(message -> RecordId.of(message.getId()));
            return new ReactiveRedisConnection.CommandResponse<>(command, claimed);
        }));
    }

    /**
     * Claims pending records for a new owner and returns the full records.
     *
     * @param publisher claim commands
     * @return one response per command whose payload emits the claimed records
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XClaimCommand, Flux<ByteBufferRecord>>> xClaim(Publisher<ReactiveStreamCommands.XClaimCommand> publisher) {
        return this.connection.execute(cmd -> Flux.from(publisher).map(command -> {
            String[] ids = command.getOptions().getIdsAsStringArray();
            Consumer<ByteBuffer> newOwner = Consumer.from(ByteUtils.getByteBuffer(command.getGroupName()),
                    ByteUtils.getByteBuffer(command.getNewOwner()));

            Flux<ByteBufferRecord> claimed = cmd
                    .xclaim(command.getKey(), newOwner, StreamConverters.toXClaimArgs(command.getOptions()), ids)
                    .map(message -> toRecord(message.getStream(), message.getId(), message.getBody()));
            return new ReactiveRedisConnection.CommandResponse<>(command, claimed);
        }));
    }

    /**
     * Deletes the given record ids from a stream via Lettuce {@code xdel}.
     *
     * @param publisher delete commands; key and record ids must not be {@code null}
     * @return one response per command carrying the value reported by {@code xdel}
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.DeleteCommand, Long>> xDel(Publisher<ReactiveStreamCommands.DeleteCommand> publisher) {
        return this.connection.execute(cmd -> Flux.from(publisher).concatMap(command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");
            Assert.notNull(command.getRecordIds(), "recordIds must not be null!");

            return cmd.xdel(command.getKey(), entryIdsToString(command.getRecordIds()))
                    .map(value -> new ReactiveRedisConnection.NumericResponse<>(command, value));
        }));
    }

    /**
     * Runs a consumer-group management command. Supported actions are {@code CREATE},
     * {@code DELETE_CONSUMER} and {@code DESTROY}.
     *
     * <ul>
     * <li>{@code CREATE}: the response carries whatever {@code xgroupCreate} returns.</li>
     * <li>{@code DELETE_CONSUMER}: the response is always {@code "OK"}; the value returned by
     * {@code xgroupDelconsumer} is discarded.</li>
     * <li>{@code DESTROY}: the response is {@code "OK"} when {@code xgroupDestroy} reports
     * {@code true}, otherwise {@code "Error"}.</li>
     * </ul>
     *
     * @param publisher group commands; key and group name must not be {@code null}, and
     *        {@code CREATE} additionally requires a read offset
     * @return one response per command
     * @throws IllegalArgumentException (signalled through the returned Flux) for any other action
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.GroupCommand, String>> xGroup(Publisher<ReactiveStreamCommands.GroupCommand> publisher) {
        return this.connection.execute(cmd -> Flux.from(publisher).concatMap(command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");
            Assert.notNull(command.getGroupName(), "GroupName must not be null!");

            if (command.getAction().equals(ReactiveStreamCommands.GroupCommand.GroupCommandAction.CREATE)) {
                Assert.notNull(command.getReadOffset(), "ReadOffset must not be null!");
                XReadArgs.StreamOffset<ByteBuffer> offset = XReadArgs.StreamOffset.from(command.getKey(),
                        command.getReadOffset().getOffset());
                return cmd
                        .xgroupCreate(offset, ByteUtils.getByteBuffer(command.getGroupName()),
                                XGroupCreateArgs.Builder.mkstream(command.isMkStream()))
                        .map(result -> new ReactiveRedisConnection.CommandResponse<>(command, result));
            }

            if (command.getAction().equals(ReactiveStreamCommands.GroupCommand.GroupCommandAction.DELETE_CONSUMER)) {
                Consumer<ByteBuffer> consumer = Consumer.from(ByteUtils.getByteBuffer(command.getGroupName()),
                        ByteUtils.getByteBuffer(command.getConsumerName()));
                return cmd.xgroupDelconsumer(command.getKey(), consumer)
                        .map(ignored -> new ReactiveRedisConnection.CommandResponse<>(command, "OK"));
            }

            if (command.getAction().equals(ReactiveStreamCommands.GroupCommand.GroupCommandAction.DESTROY)) {
                // Plain literal instead of a constant borrowed from an unrelated third-party module.
                return cmd.xgroupDestroy(command.getKey(), ByteUtils.getByteBuffer(command.getGroupName()))
                        .map(destroyed -> new ReactiveRedisConnection.CommandResponse<>(command,
                                Boolean.TRUE.equals(destroyed) ? "OK" : "Error"));
            }

            throw new IllegalArgumentException("Unknown group command " + command.getAction());
        }));
    }

    /**
     * Reads the length of a stream via Lettuce {@code xlen}.
     *
     * @param publisher key commands; the key must not be {@code null}
     * @return one numeric response per command
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.NumericResponse<ReactiveRedisConnection.KeyCommand, Long>> xLen(Publisher<ReactiveRedisConnection.KeyCommand> publisher) {
        return this.connection.execute(cmd -> Flux.from(publisher).concatMap(command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");

            return cmd.xlen(command.getKey())
                    .map(value -> new ReactiveRedisConnection.NumericResponse<>(command, value));
        }));
    }

    /**
     * Fetches the pending-messages summary of a consumer group.
     *
     * @param publisher pending commands; the key must not be {@code null}
     * @return one response per command carrying the converted {@link PendingMessagesSummary}
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.PendingRecordsCommand, PendingMessagesSummary>> xPendingSummary(Publisher<ReactiveStreamCommands.PendingRecordsCommand> publisher) {
        return this.connection.execute(cmd -> Flux.from(publisher).concatMap(command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");

            return cmd.xpending(command.getKey(), ByteUtils.getByteBuffer(command.getGroupName()))
                    .map(raw -> StreamConverters.toPendingMessagesInfo(command.getGroupName(), raw))
                    .map(summary -> new ReactiveRedisConnection.CommandResponse<>(command, summary));
        }));
    }

    /**
     * Fetches detailed pending messages of a consumer group, optionally restricted to one consumer.
     *
     * <p>The range falls back to {@code "-"} / {@code "+"} via
     * {@code RangeConverter.toRangeWithDefault}; the limit is unlimited unless the command is limited.
     *
     * @param publisher pending commands; the key must not be {@code null}
     * @return one response per command carrying the converted {@link PendingMessages}
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.PendingRecordsCommand, PendingMessages>> xPending(Publisher<ReactiveStreamCommands.PendingRecordsCommand> publisher) {
        return this.connection.execute(cmd -> Flux.from(publisher).concatMap(command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");

            ByteBuffer group = ByteUtils.getByteBuffer(command.getGroupName());
            Range<String> range = RangeConverter.toRangeWithDefault(command.getRange(), "-", "+");
            Limit limit = command.isLimited() ? Limit.from(command.getCount().longValue()) : Limit.unlimited();

            return (command.hasConsumer()
                    ? cmd.xpending(command.getKey(),
                            Consumer.from(group, ByteUtils.getByteBuffer(command.getConsumerName())), range, limit)
                    : cmd.xpending(command.getKey(), group, range, limit))
                            .collectList()
                            .map(raw -> StreamConverters.toPendingMessages(command.getGroupName(), command.getRange(), raw))
                            .map(pending -> new ReactiveRedisConnection.CommandResponse<>(command, pending));
        }));
    }

    /**
     * Reads records of a stream within a range via Lettuce {@code xrange}.
     *
     * @param publisher range commands; key, range and limit must not be {@code null}
     * @return one response per command whose payload emits the matching records
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.RangeCommand, Flux<ByteBufferRecord>>> xRange(Publisher<ReactiveStreamCommands.RangeCommand> publisher) {
        return this.connection.execute(cmd -> Flux.from(publisher).map(command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");
            Assert.notNull(command.getRange(), "Range must not be null!");
            Assert.notNull(command.getLimit(), "Limit must not be null!");

            Flux<ByteBufferRecord> records = cmd
                    .xrange(command.getKey(), RangeConverter.toRange(command.getRange(), Function.identity()),
                            LettuceConverters.toLimit(command.getLimit()))
                    .map(message -> toRecord(message.getStream(), message.getId(), message.getBody()));
            return new ReactiveRedisConnection.CommandResponse<>(command, records);
        }));
    }

    /**
     * Reads records from one or more streams, with or without a consumer group.
     *
     * <p>Blocking reads are routed through {@code connection.executeDedicated(...)}, non-blocking
     * reads through {@code connection.execute(...)}. NOTE(review): presumably the dedicated variant
     * keeps a blocking read from stalling the shared connection; confirm against
     * {@link LettuceReactiveRedisConnection}.
     *
     * @param publisher read commands; stream offsets and read options must not be {@code null}
     * @return one response per command whose payload emits the records read
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.ReadCommand, Flux<ByteBufferRecord>>> read(Publisher<ReactiveStreamCommands.ReadCommand> publisher) {
        return Flux.from(publisher).map(command -> {
            Assert.notNull(command.getStreamOffsets(), "StreamOffsets must not be null!");
            Assert.notNull(command.getReadOptions(), "ReadOptions must not be null!");

            StreamReadOptions readOptions = command.getReadOptions();
            if (readOptions.isBlocking()) {
                return new ReactiveRedisConnection.CommandResponse<>(command,
                        this.connection.executeDedicated(cmd -> doRead(command, readOptions, cmd)));
            }
            return new ReactiveRedisConnection.CommandResponse<>(command,
                    this.connection.execute(cmd -> doRead(command, readOptions, cmd)));
        });
    }

    /**
     * Issues the actual read: {@code xread} when the command has no consumer, {@code xreadgroup}
     * otherwise, and converts every stream message into a {@link ByteBufferRecord}.
     *
     * @param readCommand command supplying the stream offsets and the optional consumer
     * @param streamReadOptions read options converted into Lettuce read arguments
     * @param redisClusterReactiveCommands Lettuce commands to run against
     * @return the records read
     */
    public static Flux<ByteBufferRecord> doRead(ReactiveStreamCommands.ReadCommand readCommand, StreamReadOptions streamReadOptions, RedisClusterReactiveCommands<ByteBuffer, ByteBuffer> redisClusterReactiveCommands) {
        XReadArgs.StreamOffset<ByteBuffer>[] offsets = toStreamOffsets(readCommand.getStreamOffsets());
        XReadArgs readArgs = StreamConverters.toReadArgs(streamReadOptions);

        if (readCommand.getConsumer() == null) {
            return redisClusterReactiveCommands.xread(readArgs, offsets)
                    .map(message -> toRecord(message.getStream(), message.getId(), message.getBody()));
        }
        return redisClusterReactiveCommands.xreadgroup(toConsumer(readCommand.getConsumer()), readArgs, offsets)
                .map(message -> toRecord(message.getStream(), message.getId(), message.getBody()));
    }

    /**
     * Fetches general information about a stream via Lettuce {@code xinfoStream}.
     *
     * <p>Unlike the other single-result commands in this class, this one uses {@code flatMap}, so
     * inner calls may be subscribed concurrently and responses are not guaranteed to follow
     * publisher order.
     *
     * @param publisher info commands; the key must not be {@code null}
     * @return one response per command carrying the parsed {@link StreamInfo.XInfoStream}
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XInfoCommand, StreamInfo.XInfoStream>> xInfo(Publisher<ReactiveStreamCommands.XInfoCommand> publisher) {
        return this.connection.execute(cmd -> Flux.from(publisher).flatMap(command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");

            return cmd.xinfoStream(command.getKey())
                    .collectList()
                    .map(StreamInfo.XInfoStream::fromList)
                    .map(info -> new ReactiveRedisConnection.CommandResponse<>(command, info));
        }));
    }

    /**
     * Fetches information about the consumer groups of a stream via Lettuce {@code xinfoGroups}.
     *
     * @param publisher info commands; the key must not be {@code null}
     * @return one response per command whose payload emits one {@link StreamInfo.XInfoGroup} per element
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XInfoCommand, Flux<StreamInfo.XInfoGroup>>> xInfoGroups(Publisher<ReactiveStreamCommands.XInfoCommand> publisher) {
        return this.connection.execute(cmd -> Flux.from(publisher).map(command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");

            // Each emitted element is cast to a list before being handed to the parser.
            Flux<StreamInfo.XInfoGroup> groups = cmd.xinfoGroups(command.getKey())
                    .map(raw -> StreamInfo.XInfoGroup.fromList((List<Object>) raw));
            return new ReactiveRedisConnection.CommandResponse<>(command, groups);
        }));
    }

    /**
     * Fetches information about the consumers of a consumer group via Lettuce {@code xinfoConsumers}.
     *
     * @param publisher info commands; the key must not be {@code null}
     * @return one response per command whose payload emits one {@link StreamInfo.XInfoConsumer} per element
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XInfoCommand, Flux<StreamInfo.XInfoConsumer>>> xInfoConsumers(Publisher<ReactiveStreamCommands.XInfoCommand> publisher) {
        return this.connection.execute(cmd -> Flux.from(publisher).map(command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");

            Flux<StreamInfo.XInfoConsumer> consumers = cmd
                    .xinfoConsumers(command.getKey(), ByteUtils.getByteBuffer(command.getGroupName()))
                    .map(raw -> new StreamInfo.XInfoConsumer(command.getGroupName(), (List<Object>) raw));
            return new ReactiveRedisConnection.CommandResponse<>(command, consumers);
        }));
    }

    /**
     * Reads records of a stream within a range via Lettuce {@code xrevrange}.
     *
     * @param publisher range commands; key, range and limit must not be {@code null}
     * @return one response per command whose payload emits the matching records
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.RangeCommand, Flux<ByteBufferRecord>>> xRevRange(Publisher<ReactiveStreamCommands.RangeCommand> publisher) {
        return this.connection.execute(cmd -> Flux.from(publisher).map(command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");
            Assert.notNull(command.getRange(), "Range must not be null!");
            Assert.notNull(command.getLimit(), "Limit must not be null!");

            Flux<ByteBufferRecord> records = cmd
                    .xrevrange(command.getKey(), RangeConverter.toRange(command.getRange(), Function.identity()),
                            LettuceConverters.toLimit(command.getLimit()))
                    .map(message -> toRecord(message.getStream(), message.getId(), message.getBody()));
            return new ReactiveRedisConnection.CommandResponse<>(command, records);
        }));
    }

    /**
     * Trims a stream to the given count via Lettuce {@code xtrim}, optionally using approximate trimming.
     *
     * @param publisher trim commands; key and count must not be {@code null}
     * @return one numeric response per command carrying the value reported by {@code xtrim}
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.NumericResponse<ReactiveRedisConnection.KeyCommand, Long>> xTrim(Publisher<ReactiveStreamCommands.TrimCommand> publisher) {
        return this.connection.execute(cmd -> Flux.from(publisher).concatMap(command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");
            Assert.notNull(command.getCount(), "Count must not be null!");

            return cmd.xtrim(command.getKey(), command.isApproximateTrimming(), command.getCount().longValue())
                    .map(value -> new ReactiveRedisConnection.NumericResponse<>(command, value));
        }));
    }

    /**
     * Converts Spring Data stream offsets into the Lettuce representation.
     *
     * @param collection offsets to convert
     * @return an array of Lettuce stream offsets in the collection's iteration order
     */
    @SuppressWarnings("unchecked") // generic array creation: every element is built as StreamOffset<T> below
    private static <T> XReadArgs.StreamOffset<T>[] toStreamOffsets(Collection<StreamOffset<T>> collection) {
        return collection.stream()
                .map(offset -> XReadArgs.StreamOffset.from(offset.getKey(), offset.getOffset().getOffset()))
                .toArray(XReadArgs.StreamOffset[]::new);
    }

    /** Converts a Spring Data consumer (group + name) into the Lettuce {@link Consumer}. */
    private static Consumer<ByteBuffer> toConsumer(org.springframework.data.redis.connection.stream.Consumer consumer) {
        return Consumer.from(ByteUtils.getByteBuffer(consumer.getGroup()), ByteUtils.getByteBuffer(consumer.getName()));
    }

    /** Builds a {@link ByteBufferRecord} from the stream key, id and body of a Lettuce stream message. */
    private static ByteBufferRecord toRecord(ByteBuffer stream, String id, Map<ByteBuffer, ByteBuffer> body) {
        return StreamRecords.newRecord().in(stream).withId(id).ofBuffer(body);
    }

    /**
     * Extracts the string values of the given record ids.
     *
     * @param list record ids to convert
     * @return the id values in list order
     */
    private static String[] entryIdsToString(List<RecordId> list) {
        // Single-id fast path avoids setting up a stream for the most common call shape.
        if (list.size() == 1) {
            return new String[] {list.get(0).getValue()};
        }
        return list.stream().map(RecordId::getValue).toArray(String[]::new);
    }
}
