package org.kuali.rice.kcb.quartz;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.jackson.JsonConstants;
import org.kuali.rice.kcb.api.exception.MessageDeliveryProcessingException;
import org.kuali.rice.kcb.bo.Message;
import org.kuali.rice.kcb.bo.MessageDelivery;
import org.kuali.rice.kcb.bo.MessageDeliveryStatus;
import org.kuali.rice.kcb.deliverer.BulkMessageDeliverer;
import org.kuali.rice.kcb.deliverer.MessageDeliverer;
import org.kuali.rice.kcb.quartz.ProcessingResult;
import org.kuali.rice.kcb.service.GlobalKCBServiceLocator;
import org.kuali.rice.kcb.service.MessageDelivererRegistryService;
import org.kuali.rice.kcb.service.MessageDeliveryService;
import org.kuali.rice.kcb.service.MessageService;
import org.kuali.rice.krad.data.DataObjectService;
import org.kuali.rice.krad.data.PersistenceOption;
import org.kuali.rice.krad.service.KRADServiceLocator;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.StatefulJob;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.context.annotation.AdviceModeImportSelector;

/* loaded from: input_file:WEB-INF/lib/rice-impl-2.5.3.1901.0004-kualico.jar:org/kuali/rice/kcb/quartz/MessageProcessingJob.class */
/**
 * Quartz job that processes KCB {@link MessageDelivery} records, either delivering them or
 * removing (dismissing) them depending on the configured {@link Mode}.
 *
 * <p>The job takes locked deliveries from the {@link MessageDeliveryService}, groups them into
 * work units (one unit per delivery, or one unit per deliverer-type/message pair for
 * {@link BulkMessageDeliverer}s), hands each unit to the matching deliverer, and finally deletes
 * any message that has no deliveries left.
 *
 * <p>When triggered by the scheduler through {@link #execute(JobExecutionContext)}, the mode,
 * user, cause and optional message id are read from the merged job data map.
 */
public class MessageProcessingJob extends ConcurrentJob<MessageDelivery> implements StatefulJob {
    /** Job name constant (not referenced inside this class; presumably used when scheduling). */
    public static final String NAME = "MessageProcessingJobDetail";
    /** Job group constant (not referenced inside this class; presumably used when scheduling). */
    public static final String GROUP = "KCB-Delivery";

    /** Job data map key holding the name of the {@link Mode} to run in. */
    private static final String JOB_DATA_MODE = "mode";
    /** Job data map key holding the user passed to the deliverer on dismissal. */
    private static final String JOB_DATA_USER = "user";
    /** Job data map key holding the cause passed to the deliverer on dismissal. */
    private static final String JOB_DATA_CAUSE = "cause";
    /** Job data map key holding the optional id of the single message to process. */
    private static final String JOB_DATA_MESSAGE_ID = "messageId";

    private static final Logger LOG = LogManager.getLogger(MessageProcessingJob.class);

    private DataObjectService dataObjectService;
    private MessageDelivererRegistryService registry;
    private MessageDeliveryService messageDeliveryService;
    /** Id of the message whose deliveries are processed; required in {@link Mode#REMOVE}. */
    private Long messageId;
    /** Processing mode; stays {@code null} until set by a constructor or by {@code execute}. */
    private Mode mode;
    private String user;
    private String cause;

    /** The kind of processing this job performs on the deliveries it takes. */
    public enum Mode {
        /** Deliver undelivered message deliveries. */
        DELIVER,
        /** Dismiss and delete the deliveries of a specific message. */
        REMOVE
    }

    /**
     * Creates a job that is fully configured up front.
     *
     * @param l id of the message to process (may be {@code null} for {@link Mode#DELIVER})
     * @param mode processing mode
     * @param str user passed to the deliverer when dismissing
     * @param str2 cause passed to the deliverer when dismissing
     */
    public MessageProcessingJob(Long l, Mode mode, String str, String str2) {
        this();
        this.messageId = l;
        this.mode = mode;
        this.user = str;
        this.cause = str2;
    }

    /**
     * Creates an unconfigured job and looks up its collaborators from
     * {@link GlobalKCBServiceLocator} and {@link KRADServiceLocator}. The mode is left
     * {@code null}; {@link #execute(JobExecutionContext)} sets it from the job data map.
     */
    public MessageProcessingJob() {
        this.mode = null;
        this.registry = GlobalKCBServiceLocator.getInstance().getMessageDelivererRegistryService();
        this.messageDeliveryService = GlobalKCBServiceLocator.getInstance().getMessageDeliveryService();
        this.txManager = GlobalKCBServiceLocator.getInstance().getTransactionManager();
        this.dataObjectService = KRADServiceLocator.getDataObjectService();
    }

    /**
     * Replaces the data object service used to persist deliveries.
     *
     * @param dataObjectService the service to use
     */
    public void setDataObjectService(DataObjectService dataObjectService) {
        this.dataObjectService = dataObjectService;
    }

    /**
     * Replaces the registry used to resolve the deliverer of a message delivery.
     *
     * @param messageDelivererRegistryService the registry to use
     */
    @Required
    public void setMessageDelivererRegistry(MessageDelivererRegistryService messageDelivererRegistryService) {
        this.registry = messageDelivererRegistryService;
    }

    /**
     * Replaces the service used to take, query and delete message deliveries.
     *
     * @param messageDeliveryService the service to use
     */
    @Required
    public void setMessageDeliveryService(MessageDeliveryService messageDeliveryService) {
        this.messageDeliveryService = messageDeliveryService;
    }

    /**
     * Locks and takes the deliveries this run should work on and bumps the process count of each.
     *
     * <p>{@link Mode#DELIVER} takes undelivered deliveries; {@link Mode#REMOVE} takes delivered
     * and undelivered deliveries and requires a message id.
     *
     * @return the deliveries taken from the delivery service
     * @throws IllegalStateException if no mode is set, or if the mode is {@link Mode#REMOVE} and
     *         no message id was specified
     */
    @Override
    protected Collection<MessageDelivery> takeAvailableWorkItems() {
        // Switching on a null enum would fail with a bare NullPointerException; report it clearly.
        if (this.mode == null) {
            throw new IllegalStateException("Processing mode must be set before taking message deliveries");
        }
        final MessageDeliveryStatus[] statuses;
        switch (this.mode) {
            case DELIVER:
                statuses = new MessageDeliveryStatus[]{MessageDeliveryStatus.UNDELIVERED};
                break;
            case REMOVE:
                if (this.messageId == null) {
                    throw new IllegalStateException("Message id must be specified for message removal mode");
                }
                statuses = new MessageDeliveryStatus[]{MessageDeliveryStatus.DELIVERED, MessageDeliveryStatus.UNDELIVERED};
                break;
            default:
                throw new RuntimeException("Invalid mode: " + this.mode);
        }
        for (MessageDeliveryStatus status : statuses) {
            LOG.debug("Taking message deliveries with status: {}", status);
        }
        Collection<MessageDelivery> taken = this.messageDeliveryService.lockAndTakeMessageDeliveries(this.messageId, statuses);
        LOG.debug("Took {} deliveries", taken.size());
        for (MessageDelivery delivery : taken) {
            LOG.debug(delivery);
            // A missing count is treated as zero so a null value cannot abort the whole batch.
            Integer previousCount = delivery.getProcessCount();
            int nextCount = (previousCount == null ? 0 : previousCount.intValue()) + 1;
            delivery.setProcessCount(Integer.valueOf(nextCount));
        }
        return taken;
    }

    /**
     * Clears the lock date of a delivery and saves it. Kept {@code public} so existing callers
     * continue to compile.
     *
     * @param messageDelivery the delivery to unlock
     */
    @Override
    public void unlockWorkItem(MessageDelivery messageDelivery) {
        messageDelivery.setLockedDate(null);
        this.dataObjectService.save(messageDelivery, new PersistenceOption[0]);
    }

    /**
     * Splits the taken deliveries into work units.
     *
     * <p>A delivery whose deliverer is a {@link BulkMessageDeliverer} is grouped with the other
     * deliveries that share its deliverer type name and message id; every other delivery becomes
     * a unit of its own. Deliveries without a resolvable deliverer are recorded as failures and
     * unlocked immediately.
     *
     * @param collection the deliveries to group
     * @param processingResult receives a failure for each delivery that has no deliverer
     * @return the work units: first the single-delivery units, then the bulk units
     */
    @Override
    protected Collection<Collection<MessageDelivery>> groupWorkItems(Collection<MessageDelivery> collection, ProcessingResult<MessageDelivery> processingResult) {
        Collection<Collection<MessageDelivery>> workUnits = new ArrayList<>(collection.size());
        HashMap<String, Collection<MessageDelivery>> bulkUnits = new HashMap<>();
        for (MessageDelivery messageDelivery : collection) {
            MessageDeliverer deliverer = this.registry.getDeliverer(messageDelivery);
            if (deliverer == null) {
                LOG.error("Error obtaining message deliverer for message delivery: {}", messageDelivery);
                processingResult.addFailure(new ProcessingResult.Failure<>(messageDelivery, "Error obtaining message deliverer for message delivery"));
                unlockWorkItemAtomically(messageDelivery);
                continue;
            }
            if (deliverer instanceof BulkMessageDeliverer) {
                // Bulk deliverers receive all deliveries of one message at once.
                String key = messageDelivery.getDelivererTypeName() + ":" + messageDelivery.getMessage().getId();
                Collection<MessageDelivery> bulkUnit = bulkUnits.get(key);
                if (bulkUnit == null) {
                    bulkUnit = new LinkedList<>();
                    bulkUnits.put(key, bulkUnit);
                }
                bulkUnit.add(messageDelivery);
            } else {
                Collection<MessageDelivery> singleUnit = new ArrayList<>(1);
                singleUnit.add(messageDelivery);
                workUnits.add(singleUnit);
            }
        }
        // The bulk units must be part of the result; otherwise they are never processed and
        // their deliveries stay locked.
        workUnits.addAll(bulkUnits.values());
        return workUnits;
    }

    /**
     * Processes one work unit with the deliverer registered for its first delivery.
     *
     * @param collection the deliveries of one work unit
     * @return the deliveries that were processed; empty if the unit is empty
     * @throws RuntimeException if no deliverer can be obtained, or if a unit with more than one
     *         delivery resolves to a deliverer that is not a {@link BulkMessageDeliverer}
     */
    @Override
    protected Collection<MessageDelivery> processWorkItems(Collection<MessageDelivery> collection) {
        if (collection.isEmpty()) {
            // Nothing to do; avoids a NoSuchElementException from iterator().next().
            return new ArrayList<MessageDelivery>(0);
        }
        MessageDelivery first = collection.iterator().next();
        MessageDeliverer deliverer = this.registry.getDeliverer(first);
        if (deliverer == null) {
            throw new RuntimeException("Message deliverer could not be obtained");
        }
        if (collection.size() <= 1) {
            return process(deliverer, first, this.mode);
        }
        if (deliverer instanceof BulkMessageDeliverer) {
            return bulkProcess((BulkMessageDeliverer) deliverer, collection, this.mode);
        }
        throw new RuntimeException("Discrepency in dispatch service: deliverer for list of message deliveries is not a BulkMessageDeliverer");
    }

    /**
     * Delivers or dismisses a single delivery.
     *
     * <p>In {@link Mode#DELIVER} the delivery is delivered, its process count reset to zero, and
     * it is saved as delivered and unlocked. In any other mode it is dismissed with the job's
     * user and cause and then deleted.
     *
     * @param messageDeliverer the deliverer to use
     * @param messageDelivery the delivery to process
     * @param mode the processing mode
     * @return a collection containing the processed delivery
     * @throws RuntimeException wrapping any {@link MessageDeliveryProcessingException}
     */
    protected Collection<MessageDelivery> process(MessageDeliverer messageDeliverer, MessageDelivery messageDelivery, Mode mode) {
        try {
            if (mode == Mode.DELIVER) {
                messageDeliverer.deliver(messageDelivery);
                messageDelivery.setProcessCount(0);
                updateStatusAndUnlock(messageDelivery, MessageDeliveryStatus.DELIVERED);
            } else {
                messageDeliverer.dismiss(messageDelivery, this.user, this.cause);
                this.messageDeliveryService.deleteMessageDelivery(messageDelivery);
            }
            LOG.debug("Message delivery '{}' for message '{}' was successfully processed.", messageDelivery.getId(), messageDelivery.getMessage().getId());
            Collection<MessageDelivery> processed = new ArrayList<>(1);
            processed.add(messageDelivery);
            return processed;
        } catch (MessageDeliveryProcessingException e) {
            LOG.error("Error processing message delivery " + messageDelivery, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Delivers or dismisses a group of deliveries with a single bulk call, then updates or
     * deletes each delivery individually.
     *
     * @param bulkMessageDeliverer the bulk deliverer to use
     * @param collection the deliveries of one bulk work unit
     * @param mode the processing mode
     * @return the processed deliveries
     * @throws RuntimeException wrapping any {@link MessageDeliveryProcessingException}
     */
    protected Collection<MessageDelivery> bulkProcess(BulkMessageDeliverer bulkMessageDeliverer, Collection<MessageDelivery> collection, Mode mode) {
        final MessageDeliveryStatus targetStatus = mode == Mode.DELIVER ? MessageDeliveryStatus.DELIVERED : MessageDeliveryStatus.REMOVED;
        try {
            if (mode == Mode.DELIVER) {
                bulkMessageDeliverer.bulkDeliver(collection);
            } else {
                bulkMessageDeliverer.bulkDismiss(collection);
            }
            Collection<MessageDelivery> processed = new ArrayList<>(collection.size());
            for (MessageDelivery messageDelivery : collection) {
                processed.add(messageDelivery);
                LOG.debug("Message delivery '{}' for notification '{}' was successfully delivered.", messageDelivery.getId(), messageDelivery.getMessage().getId());
                if (mode == Mode.REMOVE) {
                    this.messageDeliveryService.deleteMessageDelivery(messageDelivery);
                } else {
                    messageDelivery.setProcessCount(0);
                    updateStatusAndUnlock(messageDelivery, targetStatus);
                }
            }
            return processed;
        } catch (MessageDeliveryProcessingException e) {
            LOG.error("Error bulk-delivering messages " + collection, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Cleans up after a run: every message that had at least one successfully processed delivery
     * is deleted if it has no deliveries left.
     *
     * @param processingResult the successes and failures of the run
     */
    @Override
    protected void finishProcessing(ProcessingResult<MessageDelivery> processingResult) {
        LOG.debug("Message processing job: {} processed, {} failures", processingResult.getSuccesses().size(), processingResult.getFailures().size());
        // Collect distinct message ids so each message is examined once.
        HashSet<Long> messageIds = new HashSet<>(processingResult.getSuccesses().size());
        for (MessageDelivery processed : processingResult.getSuccesses()) {
            messageIds.add(processed.getMessage().getId());
        }
        MessageService kcbMessageService = GlobalKCBServiceLocator.getInstance().getKcbMessageService();
        for (Long id : messageIds) {
            LOG.debug("Finishing processing message {}", id);
            Message message = kcbMessageService.getMessage(id);
            if (message == null) {
                // The message is already gone; there is nothing left to clean up for it.
                LOG.warn("Message {} could not be found while finishing processing; skipping", id);
                continue;
            }
            Collection<MessageDelivery> remaining = this.messageDeliveryService.getMessageDeliveries(message);
            if (remaining.isEmpty()) {
                LOG.debug("Deleting message {}", message);
                kcbMessageService.deleteMessage(message);
            } else {
                LOG.debug("Message {} has {} deliveries", message.getId(), remaining.size());
                for (MessageDelivery delivery : remaining) {
                    LOG.debug(delivery);
                }
            }
        }
    }

    /**
     * Sets the delivery status, clears the lock date and saves the delivery.
     *
     * @param messageDelivery the delivery to update
     * @param messageDeliveryStatus the status to store
     */
    protected void updateStatusAndUnlock(MessageDelivery messageDelivery, MessageDeliveryStatus messageDeliveryStatus) {
        messageDelivery.setDeliveryStatus(messageDeliveryStatus);
        messageDelivery.setLockedDate(null);
        this.dataObjectService.save(messageDelivery, new PersistenceOption[0]);
    }

    /**
     * Logs the current thread, mode, user and cause at debug level and then runs the job through
     * the superclass.
     *
     * @return the result produced by the superclass run
     */
    @Override
    public ProcessingResult<MessageDelivery> run() {
        LOG.debug("MessageProcessingJob running in Thread {}: {} {} {}", Thread.currentThread(), this.mode, this.user, this.cause);
        return super.run();
    }

    /**
     * Quartz entry point. Configures the job from the merged job data map and runs it.
     *
     * <p>The mode defaults to {@link Mode#DELIVER} when the map has no mode entry; the message
     * id is only overwritten when the map contains one.
     *
     * @param jobExecutionContext the Quartz execution context supplying the job data
     * @throws JobExecutionException declared by the Quartz job contract
     * @throws IllegalArgumentException if the mode entry does not name a {@link Mode} constant
     */
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        // Fully qualified to avoid requiring an additional import in this file.
        final org.quartz.JobDataMap jobData = jobExecutionContext.getMergedJobDataMap();
        final String modeName = jobData.getString(JOB_DATA_MODE);
        if (modeName != null) {
            this.mode = Mode.valueOf(modeName);
        } else {
            this.mode = Mode.DELIVER;
        }
        this.user = jobData.getString(JOB_DATA_USER);
        this.cause = jobData.getString(JOB_DATA_CAUSE);
        if (jobData.containsKey(JOB_DATA_MESSAGE_ID)) {
            this.messageId = Long.valueOf(jobData.getLong(JOB_DATA_MESSAGE_ID));
        }
        LOG.debug("==== message processing job: {} message id: {}====", this.mode, this.messageId);
        // Invokes the superclass run directly, so the debug line in this class's run() override
        // is not emitted for scheduler-triggered executions.
        super.run();
    }
}
