/*
 * Decompiled with CFR 0.152.
 */
package com.lambdaworks.redis.protocol;

import com.lambdaworks.redis.ClientOptions;
import com.lambdaworks.redis.ConnectionEvents;
import com.lambdaworks.redis.RedisChannelHandler;
import com.lambdaworks.redis.RedisChannelWriter;
import com.lambdaworks.redis.RedisCommandInterruptedException;
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.protocol.RedisCommand;
import com.lambdaworks.redis.protocol.RedisStateMachine;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Netty duplex handler that sits between the Redis client API and the channel.
 *
 * <p>Outbound: {@link RedisCommand}s passed to {@link #write(RedisCommand)} are either written
 * to the current channel, completed with the last recorded connection error, or parked in
 * {@link #commandBuffer} until a channel becomes active. When a command reaches
 * {@link #write(ChannelHandlerContext, Object, ChannelPromise)} it is encoded into a
 * {@link ByteBuf}; commands that have an output are appended to {@link #queue} to await a reply.
 *
 * <p>Inbound: received bytes are accumulated in {@link #buffer} and decoded against the command
 * at the head of {@link #queue}; every fully decoded command is removed and completed.
 *
 * <p>{@code writeLock} guards the buffering/replay of commands, {@code readLock} guards the
 * receive buffer and the state machine.
 *
 * @param <K> key type of the commands handled
 * @param <V> value type of the commands handled
 */
@ChannelHandler.Sharable
public class CommandHandler<K, V>
extends ChannelDuplexHandler
implements RedisChannelWriter<K, V> {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(CommandHandler.class);
    protected ClientOptions clientOptions;
    /** Commands that were written to the channel and are waiting for their response. */
    protected BlockingQueue<RedisCommand<K, V, ?>> queue;
    /** Commands accepted while no channel was connected; replayed by {@link #executeQueuedCommands}. */
    protected BlockingQueue<RedisCommand<K, V, ?>> commandBuffer = new LinkedBlockingQueue<RedisCommand<K, V, ?>>();
    /** Accumulates inbound bytes until complete responses can be decoded; {@code null} when unregistered. */
    protected ByteBuf buffer;
    protected RedisStateMachine<K, V> rsm;
    private Channel channel;
    private boolean closed;
    private boolean connected;
    private RedisChannelHandler<K, V> redisChannelHandler;
    private final ReentrantLock writeLock = new ReentrantLock();
    private final ReentrantLock readLock = new ReentrantLock();
    /** Last exception seen while no connected channel was available; cleared when commands are replayed. */
    private Throwable connectionError;

    /**
     * Creates a handler.
     *
     * @param clientOptions options consulted when activating the channel fails
     * @param queue queue that holds commands awaiting a response
     */
    public CommandHandler(ClientOptions clientOptions, BlockingQueue<RedisCommand<K, V, ?>> queue) {
        this.clientOptions = clientOptions;
        this.queue = queue;
    }

    /**
     * Prepares per-channel state: a fresh receive buffer, a fresh state machine and the channel reference.
     */
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        this.closed = false;
        this.buffer = ctx.alloc().heapBuffer();
        this.rsm = new RedisStateMachine<K, V>();
        this.channel = ctx.channel();
    }

    /**
     * Releases the receive buffer and, if {@link #close()} was requested, cancels all outstanding commands.
     */
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        this.releaseBuffer();
        if (this.closed) {
            this.cancelCommands("Connection closed");
        }
        this.channel = null;
    }

    /**
     * Appends the inbound bytes to the receive buffer and decodes as many responses as possible.
     * The inbound buffer is always released before returning.
     *
     * @param msg the inbound message; cast to {@link ByteBuf}
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf input = (ByteBuf) msg;
        try {
            if (!input.isReadable() || input.refCnt() == 0) {
                return;
            }
            if (this.buffer == null) {
                logger.warn("CommandHandler is closed, incoming response will be discarded.");
                return;
            }
            // Acquire before the try so the finally never unlocks a lock that is not held.
            this.readLock.lock();
            try {
                this.buffer.writeBytes(input);
                if (logger.isTraceEnabled()) {
                    logger.trace("[" + ctx.channel().remoteAddress() + "] Received: " + this.buffer.toString(Charset.defaultCharset()).trim());
                }
                this.decode(ctx, this.buffer);
            } finally {
                this.readLock.unlock();
            }
        } finally {
            // Releasing an already released buffer throws; the early-return path above can get
            // here with refCnt() == 0.
            if (input.refCnt() > 0) {
                input.release();
            }
        }
    }

    /**
     * Decodes responses from {@code buffer} into the command at the head of {@link #queue}. Each
     * time the state machine reports a complete response, the head command is removed and
     * completed and the consumed bytes are discarded from the buffer.
     *
     * @param ctx the handler context (unused here)
     * @param buffer the accumulated inbound bytes
     * @throws InterruptedException kept in the signature for overriding subclasses
     */
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
        while (true) {
            // Peek once: the queue may be drained concurrently by cancelCommands(), so a second
            // peek() could return null.
            RedisCommand<K, V, ?> head = this.queue.peek();
            if (head == null || !this.rsm.decode(buffer, head, head.getOutput())) {
                return;
            }
            // poll() instead of take(): never block the reading thread if the queue was emptied
            // in the meantime.
            RedisCommand<K, V, ?> finished = this.queue.poll();
            if (finished != null) {
                finished.complete();
            }
            if (buffer != null && buffer.refCnt() != 0) {
                buffer.discardReadBytes();
            }
        }
    }

    /**
     * Fails the command at the head of {@link #queue} (if any) with {@code cause}. While no
     * connected channel exists the cause is only remembered as the connection error; otherwise
     * it is passed on to the superclass handling.
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Non-blocking removal: the queue can be emptied between an isEmpty() check and take().
        RedisCommand<K, V, ?> command = this.queue.poll();
        if (command != null) {
            command.setException(cause);
            command.complete();
        }
        if (this.channel == null || !this.connected) {
            this.connectionError = cause;
            return;
        }
        super.exceptionCaught(ctx, cause);
    }

    /**
     * Accepts a command for execution.
     *
     * <ul>
     * <li>With a connected channel the command is written and flushed.</li>
     * <li>Otherwise, if a connection error was recorded, the command is completed with that error.</li>
     * <li>Otherwise the command is buffered until a channel becomes active.</li>
     * </ul>
     *
     * @param command the command to execute
     * @return the same {@code command} instance
     * @throws RedisException if this handler is closed
     * @throws RedisCommandInterruptedException if the thread is interrupted while buffering; the
     *         interrupt flag is restored before throwing
     */
    @Override
    public <T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
        if (this.closed) {
            throw new RedisException("Connection is closed");
        }
        this.writeLock.lock();
        try {
            if (this.channel != null && this.connected) {
                if (logger.isDebugEnabled()) {
                    logger.debug("[" + this + "] write() writeAndFlush Command " + command);
                }
                this.channel.writeAndFlush(command);
                return command;
            }
            if (this.connectionError != null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("[" + this + "] write() completing Command " + command + " due to connection error");
                }
                command.setException(this.connectionError);
                command.complete();
                return command;
            }
            if (logger.isDebugEnabled()) {
                logger.debug("[" + this + "] write() buffering Command " + command);
            }
            this.commandBuffer.put(command);
            return command;
        } catch (InterruptedException e) {
            // Preserve the interruption for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RedisCommandInterruptedException(e);
        } finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Marks the handler connected and replays buffered/queued commands. If replaying fails and
     * the client options request it, all commands are cancelled via {@link #reset()} before the
     * exception is rethrown.
     */
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.connected = true;
        this.closed = false;
        if (logger.isDebugEnabled()) {
            logger.debug("[" + this + "] channelActive()");
        }
        try {
            this.executeQueuedCommands(ctx);
            if (logger.isDebugEnabled()) {
                logger.debug("[" + this + "] channelActive() done");
            }
        } catch (Exception e) {
            if (logger.isDebugEnabled()) {
                logger.debug("[" + this + "] channelActive() ran into an exception");
            }
            if (this.clientOptions.isCancelCommandsOnReconnectFailure()) {
                this.reset();
            }
            throw e;
        }
        super.channelActive(ctx);
    }

    /**
     * Moves all buffered commands, followed by all commands still awaiting a response, out of
     * their queues and writes every non-cancelled one to the channel of {@code ctx}. Also clears
     * the recorded connection error and notifies the registered {@link RedisChannelHandler}.
     *
     * @param ctx context whose channel receives the replayed commands
     */
    protected void executeQueuedCommands(ChannelHandlerContext ctx) {
        ArrayList<RedisCommand<K, V, ?>> pending =
                new ArrayList<RedisCommand<K, V, ?>>(this.queue.size() + this.commandBuffer.size());
        this.writeLock.lock();
        try {
            this.connectionError = null;
            // drainTo moves exactly what it removes, so a command enqueued concurrently can not
            // be cleared without being replayed.
            this.commandBuffer.drainTo(pending);
            this.queue.drainTo(pending);
            this.channel = ctx.channel();
            if (this.redisChannelHandler != null) {
                this.redisChannelHandler.activated();
            }
        } finally {
            this.writeLock.unlock();
        }
        for (RedisCommand<K, V, ?> pendingCommand : pending) {
            if (pendingCommand.isCancelled()) {
                continue;
            }
            if (logger.isDebugEnabled()) {
                logger.debug("[" + this + "] channelActive() triggering command " + pendingCommand);
            }
            ctx.channel().writeAndFlush(pendingCommand);
        }
        pending.clear();
    }

    /**
     * Encodes a {@link RedisCommand} into a heap buffer and writes it down the pipeline. Commands
     * with an output are queued to await their response before the bytes are written; commands
     * without an output are completed right after the write. Other messages are delegated to
     * the superclass.
     */
    @SuppressWarnings("unchecked")
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (!(msg instanceof RedisCommand)) {
            super.write(ctx, msg, promise);
            return;
        }
        // Unchecked: the pipeline only carries untyped messages; K and V are those of this handler.
        RedisCommand<K, V, ?> cmd = (RedisCommand<K, V, ?>) msg;
        ByteBuf buf = ctx.alloc().heapBuffer();
        boolean expectsResponse;
        boolean readyToWrite = false;
        try {
            cmd.encode(buf);
            if (logger.isTraceEnabled()) {
                logger.trace("[" + ctx.channel().remoteAddress() + "] Sent: " + buf.toString(Charset.defaultCharset()).trim());
            }
            expectsResponse = cmd.getOutput() != null;
            if (expectsResponse) {
                this.queue.put(cmd);
            }
            readyToWrite = true;
        } finally {
            // Encoding or queueing failed: the buffer never reaches the pipeline, so release it here.
            if (!readyToWrite) {
                buf.release();
            }
        }
        ctx.write(buf, promise);
        if (!expectsResponse) {
            cmd.complete();
        }
    }

    /**
     * Marks the handler disconnected and notifies the registered {@link RedisChannelHandler}.
     */
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (logger.isDebugEnabled()) {
            logger.debug("[" + this + "] channelInactive()");
        }
        this.connected = false;
        if (this.redisChannelHandler != null) {
            this.redisChannelHandler.deactivated();
        }
        if (logger.isDebugEnabled()) {
            logger.debug("[" + this + "] channelInactive() done");
        }
        super.channelInactive(ctx);
    }

    /**
     * Removes every command from {@link #queue} and {@link #commandBuffer}, sets {@code message}
     * as error on each command's output (when it has one) and cancels the command.
     *
     * @param message error text set on the outputs of the cancelled commands
     */
    private void cancelCommands(String message) {
        int expected = 0;
        if (this.queue != null) {
            expected += this.queue.size();
        }
        if (this.commandBuffer != null) {
            expected += this.commandBuffer.size();
        }
        ArrayList<RedisCommand<K, V, ?>> toCancel = new ArrayList<RedisCommand<K, V, ?>>(expected);
        // drainTo instead of addAll + clear: nothing is removed without also being cancelled.
        if (this.queue != null) {
            this.queue.drainTo(toCancel);
        }
        if (this.commandBuffer != null) {
            this.commandBuffer.drainTo(toCancel);
        }
        for (RedisCommand<K, V, ?> cancelled : toCancel) {
            if (cancelled.getOutput() != null) {
                cancelled.getOutput().setError(message);
            }
            cancelled.cancel(true);
        }
    }

    /**
     * Closes the handler once: fires the {@code PrepareClose} and {@code Close} user events on
     * the current channel's pipeline and waits for the channel's close future.
     *
     * @throws RedisException if the waiting thread is interrupted; the interrupt flag is restored
     *         before throwing
     */
    @Override
    public void close() {
        if (logger.isDebugEnabled()) {
            logger.debug("[" + this + "] close()");
        }
        if (this.closed) {
            return;
        }
        this.closed = true;
        Channel currentChannel = this.channel;
        if (currentChannel == null) {
            return;
        }
        currentChannel.pipeline().fireUserEventTriggered(new ConnectionEvents.PrepareClose());
        currentChannel.pipeline().fireUserEventTriggered(new ConnectionEvents.Close());
        try {
            currentChannel.closeFuture().await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RedisException(e);
        }
    }

    /**
     * Releases the receive buffer, if present, and drops the reference to it.
     */
    private void releaseBuffer() {
        this.readLock.lock();
        try {
            // Check and null out under the lock so reset()/channelRead() never observe a
            // half-released buffer.
            if (this.buffer != null) {
                this.buffer.release();
                this.buffer = null;
            }
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * @return {@code true} once {@link #close()} was called and the channel has not been
     *         registered or activated again since
     */
    public boolean isClosed() {
        return this.closed;
    }

    @Override
    public void setRedisChannelHandler(RedisChannelHandler<K, V> redisChannelHandler) {
        this.redisChannelHandler = redisChannelHandler;
    }

    /**
     * Cancels all outstanding commands with the error text {@code "Reset"} and, when a receive
     * buffer exists, resets the state machine and clears the buffer.
     */
    @Override
    public void reset() {
        if (logger.isDebugEnabled()) {
            logger.debug("[" + this + "] reset()");
        }
        this.writeLock.lock();
        try {
            this.cancelCommands("Reset");
        } finally {
            this.writeLock.unlock();
        }
        this.readLock.lock();
        try {
            // Checked under the lock: releaseBuffer() may null the buffer concurrently.
            if (this.buffer != null) {
                this.rsm.reset();
                this.buffer.clear();
            }
        } finally {
            this.readLock.unlock();
        }
    }
}

