package org.terracotta.modules.ehcache.async;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.terracotta.modules.ehcache.async.AsyncCoordinatorImpl;
import org.terracotta.modules.ehcache.async.exceptions.ProcessingException;
import org.terracotta.toolkit.cluster.ClusterInfo;
import org.terracotta.toolkit.internal.collections.ToolkitListInternal;

/* loaded from: input_file:WEB-INF/lib/ehcache-2.11.0.3.31.jar:org/terracotta/modules/ehcache/async/ProcessingBucket.class */
public class ProcessingBucket<E extends Serializable> {
    /** Logger used by the bucket and by its worker thread. */
    private static final Logger LOGGER = LoggerFactory.getLogger(ProcessingBucket.class.getName());
    // A configured max queue size of 0 means "no bound": add() only waits for space when the size is non-zero.
    private static final int UNLIMITED_QUEUE_SIZE = 0;
    // Prefix of the worker thread name; the constructor appends the bucket name to the same text.
    private static final String threadNamePrefix = "ProcessingWorker|";
    // Name given at construction; returned by getBucketName() and included in log messages.
    private final String bucketName;
    // Source of the work delay, batch size, rate limit, retry and max-queue-size settings read by this class.
    private final AsyncConfig config;
    // Consulted via areOperationsEnabled() before processing and when deciding how to log failures.
    private final ClusterInfo cluster;
    // Receives the queued items (process) and the items that exhausted their retries (throwAway).
    private final ItemProcessor<E> processor;
    // Optional filter applied to the list by filterQuarantined(); null until setItemsFilter() is called.
    private volatile ItemsFilter<E> filter;
    // Write and read halves of one ReentrantReadWriteLock created in the constructor.
    private final Lock bucketWriteLock;
    private final Lock bucketReadLock;
    // Conditions of bucketWriteLock.
    // bucketNotEmpty: awaited by the worker between rounds; signalled by add() when the list was empty and by stop()/stopNow().
    private final Condition bucketNotEmpty;
    // bucketNotFull: awaited by add() while the list is at the max queue size; signalled by removeFromQueue() and stop()/stopNow().
    private final Condition bucketNotFull;
    // stoppedButBucketNotEmpty: awaited by stop() until the list is empty; signalled by signalStop().
    private final Condition stoppedButBucketNotEmpty;
    // Backing list holding the items that still have to be processed.
    private final ToolkitListInternal<E> toolkitList;
    // Delay in milliseconds the worker waits between rounds; initialised from the config and set to 0 by stop().
    private final AtomicLong workDelay;
    // Runnable executed by processingWorkerThread; also the holder of the worker thread name.
    private final ProcessingBucket<E>.ProcessingWorker processingWorkerRunnable;
    // Created by start(); null before that. Interrupted by stop() and stopNow().
    private volatile Thread processingWorkerThread;
    // Invoked by the worker on exit when workingOnDeadBucket and destroyAfterStop are both true.
    private AsyncCoordinatorImpl.Callback cleanupCallback;
    // Constructor flag. When true the worker never blocks waiting for new items and exits once the list is empty.
    // NOTE(review): presumably set when draining a bucket left behind by another node - confirm with the caller.
    private final boolean workingOnDeadBucket;
    // True after construction; stopNow() clears it so the worker neither destroys the list nor runs the callback on exit.
    private volatile boolean destroyAfterStop;
    // Baselined time (see baselinedCurrentTimeMillis()) at which processItems() last started; -1 until the first round.
    private long lastProcessingTimeMillis = -1;
    // Baselined time at which processItems() last went on to process items; -1 until that happens.
    private long lastWorkDoneMillis = -1;
    // Current stop state; NORMAL until stop() or stopNow() is called.
    private volatile STOP_STATE stopState = STOP_STATE.NORMAL;
    // Wall-clock time at construction; subtracted from System.currentTimeMillis() by baselinedCurrentTimeMillis().
    private final long baselineTimestampMillis = System.currentTimeMillis();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/ehcache-2.11.0.3.31.jar:org/terracotta/modules/ehcache/async/ProcessingBucket$ProcessingWorker.class */
    /**
     * Runnable executed by the bucket's worker thread.
     *
     * <p>Each round processes the queued items (when cluster operations are enabled) and then waits,
     * under the bucket write lock, until the next round is due. When the loop ends the bucket is
     * cleaned up if {@code destroyAfterStop} is still set.
     */
    public final class ProcessingWorker implements Runnable {
        private final String threadName;

        /**
         * @param str name the worker thread is given by {@code start()}
         */
        public ProcessingWorker(String str) {
            this.threadName = str;
        }

        /**
         * @return the name assigned to the worker thread
         */
        public String getThreadName() {
            return this.threadName;
        }

        /**
         * Processes items round after round until {@code isCancelled()} reports true, then destroys the
         * list (or runs the cleanup callback for a dead bucket) when {@code destroyAfterStop} is set.
         */
        @Override // java.lang.Runnable
        public void run() {
            while (!ProcessingBucket.this.isCancelled()) {
                try {
                    if (ProcessingBucket.this.cluster.areOperationsEnabled()) {
                        try {
                            ProcessingBucket.this.processItems();
                        } catch (Throwable th) {
                            if (!ProcessingBucket.this.cluster.areOperationsEnabled()) {
                                ProcessingBucket.LOGGER.warn("Caught error on processing items, but looks like we were shut down. This can probably be safely ignored", th);
                            } else if (!ProcessingBucket.this.isTCNRE(th)) {
                                ProcessingBucket.LOGGER.error("Caught error on processing bucket " + ProcessingBucket.this.bucketName, th);
                            }
                        }
                    }
                    awaitNextRound(ProcessingBucket.this.getLastProcessing());
                } catch (Throwable th3) {
                    // Only the "cluster already shut down" case is reported; anything else is retried on the next pass.
                    if (ProcessingBucket.this.isTCNRE(th3) && !ProcessingBucket.this.cluster.areOperationsEnabled()) {
                        ProcessingBucket.LOGGER.warn("Caught TCNotRunningException on processing thread, but looks like we were shut down. This can safely be ignored!", th3);
                    }
                }
            }
            if (ProcessingBucket.this.destroyAfterStop) {
                if (ProcessingBucket.this.workingOnDeadBucket) {
                    ProcessingBucket.this.cleanupCallback.callback();
                } else {
                    ProcessingBucket.this.destroy();
                }
            }
        }

        /**
         * Blocks until the next processing round is due.
         *
         * <p>With a work delay of 0 the worker waits until an item arrives (unless it is draining a dead
         * bucket or a stop was requested). Otherwise it waits until the work delay, measured from
         * {@code lastProcessing}, has elapsed or the bucket leaves the NORMAL state. The write lock is
         * held for the whole wait and released exactly once; both waits need it because they await a
         * condition of that lock.
         *
         * @param lastProcessing baselined start time of the round that just finished
         */
        private void awaitNextRound(long lastProcessing) {
            ProcessingBucket.this.bucketWriteLock.lock();
            try {
                long remainingDelay = ProcessingBucket.this.workDelay.get();
                if (remainingDelay == 0) {
                    while (!ProcessingBucket.this.workingOnDeadBucket
                            && ProcessingBucket.this.stopState == STOP_STATE.NORMAL
                            && ProcessingBucket.this.toolkitList.isEmpty()) {
                        ProcessingBucket.this.bucketNotEmpty.await();
                    }
                } else {
                    do {
                        ProcessingBucket.this.bucketNotEmpty.await(remainingDelay, TimeUnit.MILLISECONDS);
                        // A signal may wake us early: keep waiting for whatever is left of the delay.
                        long elapsed = ProcessingBucket.this.baselinedCurrentTimeMillis() - lastProcessing;
                        remainingDelay = elapsed < remainingDelay ? remainingDelay - elapsed : 0L;
                    } while (remainingDelay > 0 && ProcessingBucket.this.stopState == STOP_STATE.NORMAL);
                }
            } catch (InterruptedException e) {
                // An interrupted worker behaves as if the bucket had been asked to stop.
                ProcessingBucket.this.stop();
                Thread.currentThread().interrupt();
            } finally {
                ProcessingBucket.this.bucketWriteLock.unlock();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/ehcache-2.11.0.3.31.jar:org/terracotta/modules/ehcache/async/ProcessingBucket$STOP_STATE.class */
    /** Stop state of the bucket, held in the {@code stopState} field. */
    public enum STOP_STATE {
        // Initial state; the worker keeps waiting for and processing items.
        NORMAL,
        // Set by stop() before it waits for the list to become empty.
        STOP_REQUESTED,
        // Set by stop() once the list is empty and by stopNow() immediately; the cancellation check tests for it.
        STOPPED
    }

    /**
     * Creates a bucket over the given list. The worker thread is not started here; see {@code start()}.
     *
     * @param str                 bucket name, also appended to the worker thread name prefix
     * @param asyncConfig         configuration; its work delay is read once here
     * @param toolkitListInternal list holding the items to process
     * @param clusterInfo         used to check whether cluster operations are enabled
     * @param itemProcessor       processor the items are handed to
     * @param z                   value stored as {@code workingOnDeadBucket}
     */
    public ProcessingBucket(String str, AsyncConfig asyncConfig, ToolkitListInternal<E> toolkitListInternal, ClusterInfo clusterInfo, ItemProcessor<E> itemProcessor, boolean z) {
        // Collaborators and identity.
        this.bucketName = str;
        this.config = asyncConfig;
        this.cluster = clusterInfo;
        this.processor = itemProcessor;
        this.toolkitList = toolkitListInternal;
        this.workingOnDeadBucket = z;

        // One read/write lock guards the bucket; all three conditions belong to its write lock.
        final ReentrantReadWriteLock bucketLock = new ReentrantReadWriteLock();
        this.bucketReadLock = bucketLock.readLock();
        this.bucketWriteLock = bucketLock.writeLock();
        this.bucketNotEmpty = this.bucketWriteLock.newCondition();
        this.bucketNotFull = this.bucketWriteLock.newCondition();
        this.stoppedButBucketNotEmpty = this.bucketWriteLock.newCondition();

        // Worker state.
        this.workDelay = new AtomicLong(asyncConfig.getWorkDelay());
        this.processingWorkerRunnable = new ProcessingWorker(threadNamePrefix + str);
        this.destroyAfterStop = true;
    }

    /**
     * @return the name this bucket was constructed with
     */
    public String getBucketName() {
        return bucketName;
    }

    /**
     * Reads, under the read lock, the baselined time at which the last processing round started.
     *
     * @return the last processing time in milliseconds, or -1 if no round has run yet
     */
    public long getLastProcessing() {
        final long snapshot;
        bucketReadLock.lock();
        try {
            snapshot = lastProcessingTimeMillis;
        } finally {
            bucketReadLock.unlock();
        }
        return snapshot;
    }

    /**
     * Installs the filter that is applied to the queued items before each processing round.
     *
     * @param itemsFilter the filter to use; {@code null} disables filtering
     */
    public void setItemsFilter(ItemsFilter<E> itemsFilter) {
        filter = itemsFilter;
    }

    /**
     * @return milliseconds elapsed on the wall clock since this bucket was constructed
     */
    private long baselinedCurrentTimeMillis() {
        final long now = System.currentTimeMillis();
        return now - baselineTimestampMillis;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates and starts the daemon worker thread for this bucket, under the write lock.
     *
     * @throws AssertionError if a worker thread was already created
     */
    public void start() {
        bucketWriteLock.lock();
        try {
            ensureNonExistingThread();
            final Thread worker = new Thread(processingWorkerRunnable);
            processingWorkerThread = worker;
            worker.setName(processingWorkerRunnable.getThreadName());
            worker.setDaemon(true);
            worker.start();
        } finally {
            bucketWriteLock.unlock();
        }
    }

    /**
     * Guards against starting the bucket twice.
     *
     * @throws AssertionError carrying the worker thread name if a worker thread already exists
     */
    private void ensureNonExistingThread() {
        if (processingWorkerThread == null) {
            return;
        }
        throw new AssertionError(processingWorkerRunnable.getThreadName());
    }

    /**
     * Tells whether the bucket should no longer be serviced.
     *
     * <p>That is the case when the bucket is STOPPED, or when it is working on a dead bucket and the
     * list has been drained. The check runs under the read lock. A {@code TCNotRunningException}
     * raised while checking is treated as cancellation; any other runtime exception is rethrown.
     *
     * @return {@code true} if the worker loop should end
     */
    private boolean isCancelled() {
        try {
            this.bucketReadLock.lock();
            try {
                return this.stopState == STOP_STATE.STOPPED
                        || (this.workingOnDeadBucket && this.toolkitList.isEmpty());
            } finally {
                this.bucketReadLock.unlock();
            }
        } catch (RuntimeException e) {
            if (isTCNRE(e)) {
                return true;
            }
            throw e;
        }
    }

    /**
     * Checks by class name, so without a compile-time dependency on the class, whether the throwable
     * is exactly a {@code com.tc.exception.TCNotRunningException}.
     *
     * @param th the throwable to inspect
     * @return {@code true} if the runtime class name matches
     */
    private boolean isTCNRE(Throwable th) {
        final String className = th.getClass().getName();
        return "com.tc.exception.TCNotRunningException".equals(className);
    }

    /**
     * @return the number of items currently in the list, read under the read lock
     */
    public int getWaitCount() {
        final int pending;
        bucketReadLock.lock();
        try {
            pending = toolkitList.size();
        } finally {
            bucketReadLock.unlock();
        }
        return pending;
    }

    /**
     * Stops the bucket immediately, without waiting for queued items to be processed.
     *
     * <p>Clears {@code destroyAfterStop} so the worker leaves the list in place when it exits, marks
     * the bucket STOPPED, wakes every thread waiting on the not-empty / not-full conditions and
     * interrupts the worker thread.
     */
    public void stopNow() {
        bucketWriteLock.lock();
        try {
            destroyAfterStop = false;
            stopState = STOP_STATE.STOPPED;
            // Wake the waiting worker and any blocked producers so they observe the new state.
            bucketNotEmpty.signalAll();
            bucketNotFull.signalAll();
            processingWorkerThread.interrupt();
        } finally {
            bucketWriteLock.unlock();
        }
    }

    /**
     * Stops the bucket after the queued items have been processed.
     *
     * <p>Sets the work delay to 0, requests a stop and waits until the list is empty. Then marks the
     * bucket STOPPED, wakes all waiters and interrupts the worker thread. If the calling thread is
     * interrupted while waiting, its interrupt flag is restored and the bucket is left in the
     * STOP_REQUESTED state.
     */
    public void stop() {
        bucketWriteLock.lock();
        try {
            workDelay.set(0L);
            stopState = STOP_STATE.STOP_REQUESTED;
            while (true) {
                if (toolkitList.isEmpty()) {
                    break;
                }
                stoppedButBucketNotEmpty.await();
            }
            stopState = STOP_STATE.STOPPED;
            bucketNotEmpty.signalAll();
            bucketNotFull.signalAll();
            processingWorkerThread.interrupt();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        } finally {
            bucketWriteLock.unlock();
        }
    }

    /**
     * Destroys the backing list. Failures never propagate: a {@code TCNotRunningException} raised
     * while cluster operations are disabled is logged as a warning, anything else is dropped.
     */
    public void destroy() {
        try {
            debug("destroying bucket " + bucketName + " " + toolkitList.size());
            toolkitList.destroy();
        } catch (Throwable th) {
            final boolean shutDownInProgress = isTCNRE(th) && !cluster.areOperationsEnabled();
            if (shutDownInProgress) {
                LOGGER.warn("destroyToolkitList caught TCNotRunningException on processing thread, but looks like we were shut down. This can safely be ignored!", th);
            }
        }
    }

    /**
     * @return the worker thread name, used as the prefix of this bucket's log messages
     */
    private String getThreadName() {
        return processingWorkerRunnable.getThreadName();
    }

    /**
     * Appends an item to the bucket; {@code null} is ignored.
     *
     * <p>When the configured max queue size is not {@code UNLIMITED_QUEUE_SIZE}, the caller blocks
     * while the list is at that size, until space is signalled or the bucket is cancelled. An
     * interrupt received while blocked does not abort the wait; it is remembered and the caller's
     * interrupt flag is restored before returning. The worker is signalled when the list was empty
     * before the add.
     *
     * <p>The write lock is held for the whole operation and released exactly once: the wait, the
     * add and the signal all require it.
     *
     * @param e the item to queue, may be {@code null}
     */
    public void add(E e) {
        if (null == e) {
            return;
        }
        final int maxQueueSize = this.config.getMaxQueueSize();
        boolean interrupted = false;
        this.bucketWriteLock.lock();
        try {
            if (maxQueueSize != UNLIMITED_QUEUE_SIZE) {
                while (!isCancelled() && this.toolkitList.size() >= maxQueueSize) {
                    try {
                        this.bucketNotFull.await();
                    } catch (InterruptedException ie) {
                        interrupted = true;
                    }
                }
            }
            final boolean wasEmpty = this.toolkitList.isEmpty();
            this.toolkitList.unlockedAdd(e);
            if (wasEmpty) {
                this.bucketNotEmpty.signalAll();
            }
        } finally {
            this.bucketWriteLock.unlock();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * @return the configured batch size, capped at the number of items currently in the list
     */
    private int determineBatchSize() {
        final int configured = config.getBatchSize();
        final int pending = toolkitList.size();
        return Math.min(configured, pending);
    }

    /**
     * Logs the message at debug level when debug logging is enabled.
     *
     * @param str the message to log
     */
    private void debug(String str) {
        if (!LOGGER.isDebugEnabled()) {
            return;
        }
        LOGGER.debug(str);
    }

    /**
     * Applies the installed filter, if any, to the list while holding the write lock. The filter
     * reference is re-read under the lock because it can be replaced concurrently.
     */
    private void filterQuarantined() {
        if (filter == null) {
            return;
        }
        bucketWriteLock.lock();
        try {
            final ItemsFilter<E> current = filter;
            if (current == null) {
                return;
            }
            debug(getThreadName() + " : filterQuarantined: filtering " + toolkitList.size() + " quarantined items");
            current.filter(toolkitList);
            debug(getThreadName() + " : filterQuarantined: retained " + toolkitList.size() + " quarantined items");
        } finally {
            bucketWriteLock.unlock();
        }
    }

    /**
     * Runs one processing round.
     *
     * <ol>
     * <li>Under the write lock: records the round start time, returns if the list is empty and
     *     otherwise applies the items filter.</li>
     * <li>With batching enabled and the bucket in the NORMAL state, defers the round (returns) when
     *     the batch is not full yet and the max allowed fall-behind has not expired, or when
     *     processing the batch now would exceed the configured rate limit.</li>
     * <li>Records the time work is done and hands over to {@code doProcessItems()}.</li>
     * </ol>
     *
     * <p>Every lock acquired here is released in a {@code finally} block. Items are processed
     * without holding the bucket lock; the queue helpers take the locks they need, so producers are
     * not blocked while the processor runs.
     *
     * @throws ProcessingException propagated from {@code doProcessItems()}
     */
    private void processItems() throws ProcessingException {
        final int size;
        this.bucketWriteLock.lock();
        try {
            this.lastProcessingTimeMillis = baselinedCurrentTimeMillis();
            size = this.toolkitList.size();
            if (0 == size) {
                debug(getThreadName() + " : processItems : nothing to process");
                return;
            }
            // The filter may remove items from the list, so it runs under the write lock.
            filterQuarantined();
        } finally {
            this.bucketWriteLock.unlock();
        }

        final int batchSize = this.config.getBatchSize();
        if (this.config.isBatchingEnabled() && batchSize > 0) {
            // Wait for another round while the batch is incomplete and we have not fallen too far behind.
            if (size < batchSize && this.config.getMaxAllowedFallBehind() > this.lastProcessingTimeMillis - this.lastWorkDoneMillis) {
                this.bucketReadLock.lock();
                try {
                    if (this.stopState == STOP_STATE.NORMAL) {
                        debug(getThreadName() + " : processItems : only " + size + " work items available, waiting for " + batchSize + " items to fill up a batch");
                        return;
                    }
                } finally {
                    this.bucketReadLock.unlock();
                }
            }
            // Enforce the rate limit: items allowed = rate limit (per second) * seconds since the last work.
            final int rateLimit = this.config.getRateLimit();
            if (rateLimit > 0) {
                this.bucketReadLock.lock();
                try {
                    if (this.stopState == STOP_STATE.NORMAL) {
                        final long secondsSinceLastWorkDone = (baselinedCurrentTimeMillis() - this.lastWorkDoneMillis) / 1000;
                        final int effectiveBatchSize = determineBatchSize();
                        if (effectiveBatchSize > rateLimit * secondsSinceLastWorkDone) {
                            debug(getThreadName() + " : processItems() : last work was done " + secondsSinceLastWorkDone + " seconds ago, processing " + effectiveBatchSize + " batch items would exceed the rate limit of " + rateLimit + ", waiting for a while.");
                            return;
                        }
                    }
                } finally {
                    this.bucketReadLock.unlock();
                }
            }
        }

        this.bucketWriteLock.lock();
        try {
            this.lastWorkDoneMillis = baselinedCurrentTimeMillis();
        } finally {
            this.bucketWriteLock.unlock();
        }
        doProcessItems();
    }

    /**
     * Processes the queued items, in batches when batching is enabled with a positive batch size and
     * one by one otherwise. Does nothing while cluster operations are disabled. When the list ends up
     * empty and a stop was requested, wakes the thread waiting in {@code stop()}.
     *
     * @throws ProcessingException propagated from the processing methods
     */
    private void doProcessItems() throws ProcessingException {
        if (!cluster.areOperationsEnabled()) {
            return;
        }
        if (config.isBatchingEnabled() && config.getBatchSize() > 0) {
            processBatchedItems();
        } else {
            processListSnapshot();
        }
        final boolean drained = toolkitList.isEmpty();
        if (drained && stopState == STOP_STATE.STOP_REQUESTED) {
            signalStop();
        }
    }

    /**
     * Wakes the threads waiting in {@code stop()} for the list to drain. The write lock is taken
     * because the condition belongs to it.
     */
    private void signalStop() {
        final Lock writeLock = bucketWriteLock;
        writeLock.lock();
        try {
            stoppedButBucketNotEmpty.signalAll();
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Processes, one at a time, as many items as the list held when this method was entered. Items
     * added in the meantime are left for a later round.
     *
     * @throws ProcessingException propagated from {@code processSingleItem()}
     */
    private void processListSnapshot() throws ProcessingException {
        final int size = toolkitList.size();
        debug(getThreadName() + " : processListSnapshot size " + size + " quarantined items");
        for (int remaining = size; remaining > 0; remaining--) {
            processSingleItem();
        }
    }

    /**
     * Processes the item at the head of the list and then removes it.
     *
     * <p>The item is attempted once plus the configured number of retries. After a failed attempt
     * with retries left, the worker sleeps for the configured retry delay. When the last attempt
     * fails, the item is handed to {@code throwAway} together with the failure. An interrupt during
     * the retry sleep restores the interrupt flag and rethrows the processing failure, leaving the
     * item in the list.
     *
     * @throws ProcessingException propagated from the processor
     */
    private void processSingleItem() throws ProcessingException {
        final E item = getItemsFromQueue(1).get(0);
        final int retryAttempts = this.config.getRetryAttempts();
        int executionsLeft = retryAttempts + 1;
        while (executionsLeft-- > 0) {
            try {
                this.processor.process(item);
                break;
            } catch (RuntimeException failure) {
                if (executionsLeft <= 0) {
                    // Out of attempts: give the item up, but never let throwAway break the round.
                    try {
                        this.processor.throwAway(item, failure);
                    } catch (Throwable th) {
                        LOGGER.warn("processSingleItem caught error while throwing away an item: " + item, th);
                    }
                } else {
                    final long retryDelayMillis = this.config.getRetryAttemptDelay();
                    LOGGER.warn(getThreadName() + " : processSingleItem() : exception during processing, retrying in " + retryDelayMillis + " milliseconds, " + executionsLeft + " retries left", failure);
                    try {
                        Thread.sleep(retryDelayMillis);
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        throw failure;
                    }
                }
            }
        }
        removeFromQueue(1);
    }

    /**
     * Processes one batch from the head of the list and then removes it.
     *
     * <p>The batch holds at most the configured batch size. It is attempted once plus the configured
     * number of retries, sleeping for the configured retry delay between attempts. When the last
     * attempt fails, every item of the batch is handed to {@code throwAway} together with the
     * failure. An interrupt during the retry sleep restores the interrupt flag and rethrows the
     * processing failure, leaving the batch in the list.
     *
     * @throws ProcessingException propagated from the processor
     */
    private void processBatchedItems() throws ProcessingException {
        final int effectiveBatchSize = determineBatchSize();
        final List<E> batch = getItemsFromQueue(effectiveBatchSize);
        final int retryAttempts = this.config.getRetryAttempts();
        int executionsLeft = retryAttempts + 1;
        while (executionsLeft-- > 0) {
            try {
                this.processor.process(batch);
                break;
            } catch (RuntimeException failure) {
                LOGGER.warn("processBatchedItems caught error while processing batch of " + batch.size(), failure);
                if (executionsLeft <= 0) {
                    // Out of attempts: give up every item, but never let throwAway break the round.
                    for (E item : batch) {
                        try {
                            this.processor.throwAway(item, failure);
                        } catch (Throwable th) {
                            LOGGER.warn("processBatchedItems caught error while throwing away an item: " + item, th);
                        }
                    }
                } else {
                    final long retryDelayMillis = this.config.getRetryAttemptDelay();
                    LOGGER.warn(getThreadName() + " : processBatchedItems() : exception during processing, retrying in " + retryDelayMillis + " milliseconds, " + executionsLeft + " retries left", failure);
                    try {
                        Thread.sleep(retryDelayMillis);
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        throw failure;
                    }
                }
            }
        }
        removeFromQueue(effectiveBatchSize);
    }

    /**
     * Copies the first {@code i} items of the list, in order, without removing them. Runs under the
     * read lock.
     *
     * @param i number of items to copy from the head of the list
     * @return a new list holding those items
     */
    private List<E> getItemsFromQueue(int i) {
        bucketReadLock.lock();
        try {
            final List<E> head = new ArrayList<E>(i);
            int index = 0;
            while (index < i) {
                head.add(toolkitList.get(index));
                index++;
            }
            return head;
        } finally {
            bucketReadLock.unlock();
        }
    }

    /**
     * Removes the first {@code i} items from the list under the write lock. Producers waiting for
     * space are woken when the list had reached the configured max queue size before the removal.
     *
     * @param i number of items to remove from the head of the list
     */
    private void removeFromQueue(int i) {
        bucketWriteLock.lock();
        try {
            // Decide before removing: only a previously full queue can have blocked producers.
            final boolean wasAtCapacity = toolkitList.size() >= config.getMaxQueueSize();
            int remaining = i;
            while (remaining-- > 0) {
                toolkitList.remove(0);
            }
            if (wasAtCapacity) {
                bucketNotFull.signalAll();
            }
        } finally {
            bucketWriteLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sets the callback the worker runs on exit when this bucket is working on a dead bucket.
     *
     * @param callback the callback to store
     */
    public void setCleanupCallback(AsyncCoordinatorImpl.Callback callback) {
        cleanupCallback = callback;
    }
}
