package org.apache.storm.utils;

import com.lmax.disruptor.AlertException;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.LiteBlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.storm.Config;
import org.apache.storm.metric.api.IStatefulObject;
import org.apache.storm.metric.internal.RateTracker;
import org.apache.storm.metrics2.DisruptorMetrics;
import org.apache.storm.metrics2.StormMetricRegistry;
import org.apache.storm.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/utils/DisruptorQueue.class */
public class DisruptorQueue implements IStatefulObject {
    private static final String PREFIX = "disruptor-";
    private final RingBuffer<AtomicReference<Object>> _buffer;
    private final Sequence _consumer;
    private final SequenceBarrier _barrier;
    private final int _inputBatchSize;
    private final Flusher _flusher;
    private final QueueMetrics _metrics;
    private final DisruptorMetrics _disruptorMetrics;
    private String _queueName;
    private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueue.class);
    private static final Object INTERRUPT = new Object();
    private static final FlusherPool FLUSHER = new FlusherPool();
    private static final ScheduledThreadPoolExecutor METRICS_REPORTER_EXECUTOR = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("disruptor-metrics-reporter").build());
    private final ConcurrentHashMap<Long, ThreadLocalInserter> _batchers = new ConcurrentHashMap<>();
    private DisruptorBackpressureCallback _cb = null;
    private int _highWaterMark = 0;
    private int _lowWaterMark = 0;
    private boolean _enableBackpressure = false;
    private final AtomicLong _overflowCount = new AtomicLong(0);
    private final AtomicLong tuplePopulation = new AtomicLong(0);
    private volatile boolean _throttleOn = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/utils/DisruptorQueue$Flusher.class */
    public class Flusher implements Runnable {
        private AtomicBoolean _isFlushing = new AtomicBoolean(false);
        private final long _flushInterval;

        public Flusher(long j, String str) {
            this._flushInterval = j;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (this._isFlushing.compareAndSet(false, true)) {
                for (ThreadLocalInserter threadLocalInserter : DisruptorQueue.this._batchers.values()) {
                    threadLocalInserter.forceBatch();
                    threadLocalInserter.flush(true);
                }
                this._isFlushing.set(false);
            }
        }

        public void start() {
            DisruptorQueue.FLUSHER.start(this, this._flushInterval);
        }

        public void close() {
            DisruptorQueue.FLUSHER.stop(this, this._flushInterval);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/utils/DisruptorQueue$FlusherPool.class */
    public static class FlusherPool {
        private static final String THREAD_PREFIX = "disruptor-flush";
        private Timer _timer = new Timer("disruptor-flush-trigger", true);
        private HashMap<Long, ArrayList<Flusher>> _pendingFlush = new HashMap<>();
        private HashMap<Long, TimerTask> _tt = new HashMap<>();
        private ThreadPoolExecutor _exec = new ThreadPoolExecutor(1, DisruptorQueue.access$000(), 10, TimeUnit.SECONDS, new ArrayBlockingQueue(1024), new ThreadPoolExecutor.DiscardPolicy());

        public FlusherPool() {
            this._exec.setThreadFactory(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("disruptor-flush-task-pool").build());
        }

        public synchronized void start(Flusher flusher, final long j) {
            ArrayList<Flusher> arrayList = this._pendingFlush.get(Long.valueOf(j));
            if (arrayList == null) {
                arrayList = new ArrayList<>();
                TimerTask timerTask = new TimerTask() { // from class: org.apache.storm.utils.DisruptorQueue.FlusherPool.1
                    @Override // java.util.TimerTask, java.lang.Runnable
                    public void run() {
                        FlusherPool.this.invokeAll(j);
                    }
                };
                this._pendingFlush.put(Long.valueOf(j), arrayList);
                this._timer.schedule(timerTask, j, j);
                this._tt.put(Long.valueOf(j), timerTask);
            }
            arrayList.add(flusher);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public synchronized void invokeAll(long j) {
            ArrayList<Flusher> arrayList = this._pendingFlush.get(Long.valueOf(j));
            if (arrayList != null) {
                Iterator<Flusher> it = arrayList.iterator();
                while (it.hasNext()) {
                    this._exec.submit(it.next());
                }
            }
        }

        public synchronized void stop(Flusher flusher, long j) {
            ArrayList<Flusher> arrayList = this._pendingFlush.get(Long.valueOf(j));
            if (arrayList != null) {
                arrayList.remove(flusher);
                if (arrayList.size() == 0) {
                    this._pendingFlush.remove(Long.valueOf(j));
                    this._tt.remove(Long.valueOf(j)).cancel();
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/storm/utils/DisruptorQueue$ObjectEventFactory.class */
    private static class ObjectEventFactory implements EventFactory<AtomicReference<Object>> {
        private ObjectEventFactory() {
        }

        /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
        public AtomicReference<Object> m6387newInstance() {
            return new AtomicReference<>();
        }
    }

    /* loaded from: input_file:org/apache/storm/utils/DisruptorQueue$QueueMetrics.class */
    public class QueueMetrics {
        private final RateTracker _rateTracker = new RateTracker(10000, 10);

        public QueueMetrics() {
        }

        public long writePos() {
            return DisruptorQueue.this._buffer.getCursor();
        }

        public long readPos() {
            return DisruptorQueue.this._consumer.get();
        }

        public long overflow() {
            return DisruptorQueue.this._overflowCount.get();
        }

        public long population() {
            return writePos() - readPos();
        }

        public long capacity() {
            return DisruptorQueue.this._buffer.getBufferSize();
        }

        public float pctFull() {
            return (1.0f * ((float) population())) / ((float) capacity());
        }

        public double arrivalRate() {
            return this._rateTracker.reportRate();
        }

        public double sojournTime() {
            return (DisruptorQueue.this.tuplePopulation.get() / Math.max(arrivalRate(), 1.0E-5d)) * 1000.0d;
        }

        public Object getState() {
            HashMap hashMap = new HashMap();
            long readPos = readPos();
            long writePos = writePos();
            long j = DisruptorQueue.this.tuplePopulation.get();
            double reportRate = this._rateTracker.reportRate();
            double max = (j / Math.max(reportRate, 1.0E-5d)) * 1000.0d;
            hashMap.put("capacity", Long.valueOf(capacity()));
            hashMap.put("population", Long.valueOf(writePos - readPos));
            hashMap.put("tuple_population", Long.valueOf(j));
            hashMap.put("write_pos", Long.valueOf(writePos));
            hashMap.put("read_pos", Long.valueOf(readPos));
            hashMap.put("arrival_rate_secs", Double.valueOf(reportRate));
            hashMap.put("sojourn_time_ms", Double.valueOf(max));
            hashMap.put("overflow", Long.valueOf(DisruptorQueue.this._overflowCount.get()));
            return hashMap;
        }

        public void notifyArrivals(long j) {
            this._rateTracker.notify(j);
            DisruptorQueue.this.tuplePopulation.getAndAdd(j);
        }

        public void notifyDepartures(long j) {
            DisruptorQueue.this.tuplePopulation.getAndAdd(-j);
        }

        public void close() {
            this._rateTracker.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/utils/DisruptorQueue$ThreadLocalBatcher.class */
    public class ThreadLocalBatcher implements ThreadLocalInserter {
        private final ReentrantLock _flushLock = new ReentrantLock();
        private final ConcurrentLinkedQueue<ArrayList<Object>> _overflow = new ConcurrentLinkedQueue<>();
        private ArrayList<Object> _currentBatch;

        public ThreadLocalBatcher() {
            this._currentBatch = new ArrayList<>(DisruptorQueue.this._inputBatchSize);
        }

        @Override // org.apache.storm.utils.DisruptorQueue.ThreadLocalInserter
        public synchronized void add(Object obj) {
            this._currentBatch.add(obj);
            DisruptorQueue.this._overflowCount.incrementAndGet();
            if (DisruptorQueue.this._enableBackpressure && DisruptorQueue.this._cb != null && DisruptorQueue.this._metrics.population() + DisruptorQueue.this._overflowCount.get() >= DisruptorQueue.this._highWaterMark) {
                try {
                    if (!DisruptorQueue.this._throttleOn) {
                        DisruptorQueue.this._throttleOn = true;
                        DisruptorQueue.this._cb.highWaterMark();
                    }
                } catch (Exception e) {
                    throw new RuntimeException("Exception during calling highWaterMark callback!", e);
                }
            }
            if (this._currentBatch.size() >= DisruptorQueue.this._inputBatchSize) {
                boolean z = false;
                if (this._overflow.isEmpty()) {
                    try {
                        DisruptorQueue.this.publishDirect(this._currentBatch, false);
                        DisruptorQueue.this._overflowCount.addAndGet(0 - this._currentBatch.size());
                        this._currentBatch.clear();
                        z = true;
                    } catch (InsufficientCapacityException e2) {
                    }
                }
                if (z) {
                    return;
                }
                this._overflow.add(this._currentBatch);
                this._currentBatch = new ArrayList<>(DisruptorQueue.this._inputBatchSize);
            }
        }

        @Override // org.apache.storm.utils.DisruptorQueue.ThreadLocalInserter
        public synchronized void forceBatch() {
            if (this._currentBatch.isEmpty()) {
                return;
            }
            this._overflow.add(this._currentBatch);
            this._currentBatch = new ArrayList<>(DisruptorQueue.this._inputBatchSize);
        }

        @Override // org.apache.storm.utils.DisruptorQueue.ThreadLocalInserter
        public void flush(boolean z) {
            if (z) {
                this._flushLock.lock();
            } else if (!this._flushLock.tryLock()) {
                return;
            }
            while (!this._overflow.isEmpty()) {
                try {
                    DisruptorQueue.this.publishDirect(this._overflow.peek(), z);
                    DisruptorQueue.this._overflowCount.addAndGet(0 - this._overflow.poll().size());
                } catch (InsufficientCapacityException e) {
                    return;
                } finally {
                    this._flushLock.unlock();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/utils/DisruptorQueue$ThreadLocalInserter.class */
    public interface ThreadLocalInserter {
        void add(Object obj);

        void forceBatch();

        void flush(boolean z);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/utils/DisruptorQueue$ThreadLocalJustInserter.class */
    public class ThreadLocalJustInserter implements ThreadLocalInserter {
        private final ReentrantLock _flushLock = new ReentrantLock();
        private final ConcurrentLinkedQueue<Object> _overflow = new ConcurrentLinkedQueue<>();

        public ThreadLocalJustInserter() {
        }

        @Override // org.apache.storm.utils.DisruptorQueue.ThreadLocalInserter
        public synchronized void add(Object obj) {
            boolean z = false;
            if (this._overflow.isEmpty()) {
                try {
                    DisruptorQueue.this.publishDirectSingle(obj, false);
                    z = true;
                } catch (InsufficientCapacityException e) {
                }
            }
            if (!z) {
                DisruptorQueue.this._overflowCount.incrementAndGet();
                this._overflow.add(obj);
            }
            if (!DisruptorQueue.this._enableBackpressure || DisruptorQueue.this._cb == null || DisruptorQueue.this._metrics.population() + DisruptorQueue.this._overflowCount.get() < DisruptorQueue.this._highWaterMark) {
                return;
            }
            try {
                if (!DisruptorQueue.this._throttleOn) {
                    DisruptorQueue.this._throttleOn = true;
                    DisruptorQueue.this._cb.highWaterMark();
                }
            } catch (Exception e2) {
                throw new RuntimeException("Exception during calling highWaterMark callback!", e2);
            }
        }

        @Override // org.apache.storm.utils.DisruptorQueue.ThreadLocalInserter
        public void forceBatch() {
        }

        @Override // org.apache.storm.utils.DisruptorQueue.ThreadLocalInserter
        public void flush(boolean z) {
            if (z) {
                this._flushLock.lock();
            } else if (!this._flushLock.tryLock()) {
                return;
            }
            while (!this._overflow.isEmpty()) {
                try {
                    DisruptorQueue.this.publishDirectSingle(this._overflow.peek(), z);
                    DisruptorQueue.this._overflowCount.addAndGet(-1L);
                    this._overflow.poll();
                } catch (InsufficientCapacityException e) {
                    return;
                } finally {
                    this._flushLock.unlock();
                }
            }
        }
    }

    private static int getNumFlusherPoolThreads() {
        int i = 100;
        try {
            i = Utils.getInt(Utils.readStormConfig().get(Config.STORM_WORKER_DISRUPTOR_FLUSHER_MAX_POOL_SIZE), 100).intValue();
        } catch (Exception e) {
            LOG.warn("Error while trying to read system config", e);
        }
        try {
            i = Integer.parseInt(System.getProperty("num_flusher_pool_threads", String.valueOf(i)));
        } catch (Exception e2) {
            LOG.warn("Error while parsing number of flusher pool threads", e2);
        }
        LOG.debug("Reading num_flusher_pool_threads Flusher pool threads: {}", Integer.valueOf(i));
        return i;
    }

    public DisruptorQueue(String str, ProducerType producerType, int i, long j, int i2, long j2, String str2, String str3, Integer num, int i3) {
        this._queueName = "";
        this._queueName = PREFIX + str;
        this._buffer = RingBuffer.create(producerType, new ObjectEventFactory(), i, j <= 0 ? new LiteBlockingWaitStrategy() : new TimeoutBlockingWaitStrategy(j, TimeUnit.MILLISECONDS));
        this._consumer = new Sequence();
        this._barrier = this._buffer.newBarrier(new Sequence[0]);
        this._buffer.addGatingSequences(new Sequence[]{this._consumer});
        this._metrics = new QueueMetrics();
        this._disruptorMetrics = StormMetricRegistry.disruptorMetrics(this._queueName, str2, str3, num, Integer.valueOf(i3));
        this._inputBatchSize = Math.max(1, Math.min(i2, i / 2));
        this._flusher = new Flusher(Math.max(j2, 1L), this._queueName);
        this._flusher.start();
        if (METRICS_REPORTER_EXECUTOR.isShutdown()) {
            return;
        }
        METRICS_REPORTER_EXECUTOR.scheduleAtFixedRate(new Runnable() { // from class: org.apache.storm.utils.DisruptorQueue.1
            @Override // java.lang.Runnable
            public void run() {
                DisruptorQueue.this._disruptorMetrics.set(DisruptorQueue.this._metrics);
            }
        }, 15L, 15L, TimeUnit.SECONDS);
    }

    public String getName() {
        return this._queueName;
    }

    public boolean isFull() {
        return this._metrics.population() + this._overflowCount.get() >= this._metrics.capacity();
    }

    public void haltWithInterrupt() {
        try {
            publishDirect(new ArrayList<>(Arrays.asList(INTERRUPT)), true);
            this._flusher.close();
            this._metrics.close();
            METRICS_REPORTER_EXECUTOR.shutdown();
        } catch (InsufficientCapacityException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    public void consumeBatch(EventHandler<Object> eventHandler) {
        if (this._metrics.population() > 0) {
            consumeBatchWhenAvailable(eventHandler);
        }
    }

    public void consumeBatchWhenAvailable(EventHandler<Object> eventHandler) {
        try {
            long j = this._consumer.get() + 1;
            long waitFor = this._barrier.waitFor(j);
            if (waitFor >= j) {
                consumeBatchToCursor(waitFor, eventHandler);
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (AlertException e2) {
            throw new RuntimeException((Throwable) e2);
        } catch (TimeoutException e3) {
        }
    }

    private void consumeBatchToCursor(long j, EventHandler<Object> eventHandler) {
        long j2 = this._consumer.get();
        while (true) {
            long j3 = j2 + 1;
            if (j3 > j) {
                this._consumer.set(j);
                return;
            }
            try {
                Object andSet = ((AtomicReference) this._buffer.get(j3)).getAndSet(null);
                if (andSet == INTERRUPT) {
                    throw new InterruptedException("Disruptor processing interrupted");
                }
                if (andSet == null) {
                    LOG.error("NULL found in {}:{}", getName(), Long.valueOf(j));
                } else {
                    this._metrics.notifyDepartures(getTupleCount(andSet));
                    eventHandler.onEvent(andSet, j3, j3 == j);
                    if (this._enableBackpressure && this._cb != null && (this._metrics.writePos() - j3) + this._overflowCount.get() <= this._lowWaterMark) {
                        try {
                            if (this._throttleOn) {
                                this._throttleOn = false;
                                this._cb.lowWaterMark();
                            }
                        } catch (Exception e) {
                            throw new RuntimeException("Exception during calling lowWaterMark callback!");
                        }
                    }
                }
                j2 = j3;
            } catch (Exception e2) {
                throw new RuntimeException(e2);
            }
        }
    }

    public void registerBackpressureCallback(DisruptorBackpressureCallback disruptorBackpressureCallback) {
        this._cb = disruptorBackpressureCallback;
    }

    private static Long getId() {
        return Long.valueOf(Thread.currentThread().getId());
    }

    private long getTupleCount(Object obj) {
        long j;
        if (obj instanceof ArrayList) {
            j = ((ArrayList) obj).size();
        } else if (obj instanceof HashMap) {
            j = 0;
            while (((HashMap) obj).values().iterator().hasNext()) {
                j += ((ArrayList) r0.next()).size();
            }
        } else {
            j = 1;
        }
        return j;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void publishDirectSingle(Object obj, boolean z) throws InsufficientCapacityException {
        long next = z ? this._buffer.next() : this._buffer.tryNext();
        ((AtomicReference) this._buffer.get(next)).set(obj);
        this._buffer.publish(next);
        this._metrics.notifyArrivals(getTupleCount(obj));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void publishDirect(ArrayList<Object> arrayList, boolean z) throws InsufficientCapacityException {
        int size = arrayList.size();
        if (size > 0) {
            long next = z ? this._buffer.next(size) : this._buffer.tryNext(size);
            long j = next - (size - 1);
            long j2 = j;
            long j3 = 0;
            Iterator<Object> it = arrayList.iterator();
            while (it.hasNext()) {
                Object next2 = it.next();
                ((AtomicReference) this._buffer.get(j2)).set(next2);
                j2++;
                j3 += getTupleCount(next2);
            }
            this._metrics.notifyArrivals(j3);
            this._buffer.publish(j, next);
        }
    }

    public void publish(Object obj) {
        Long id = getId();
        ThreadLocalInserter threadLocalInserter = this._batchers.get(id);
        if (threadLocalInserter == null) {
            threadLocalInserter = this._inputBatchSize > 1 ? new ThreadLocalBatcher() : new ThreadLocalJustInserter();
            this._batchers.put(id, threadLocalInserter);
        }
        threadLocalInserter.add(obj);
        threadLocalInserter.flush(false);
    }

    @Override // org.apache.storm.metric.api.IStatefulObject
    public Object getState() {
        return this._metrics.getState();
    }

    public DisruptorQueue setHighWaterMark(double d) {
        this._highWaterMark = (int) (this._metrics.capacity() * d);
        return this;
    }

    public DisruptorQueue setLowWaterMark(double d) {
        this._lowWaterMark = (int) (this._metrics.capacity() * d);
        return this;
    }

    public int getHighWaterMark() {
        return this._highWaterMark;
    }

    public int getLowWaterMark() {
        return this._lowWaterMark;
    }

    public DisruptorQueue setEnableBackpressure(boolean z) {
        this._enableBackpressure = z;
        return this;
    }

    public boolean getThrottleOn() {
        return this._throttleOn;
    }

    public QueueMetrics getMetrics() {
        return this._metrics;
    }

    static /* synthetic */ int access$000() {
        return getNumFlusherPoolThreads();
    }
}
