package gov.nanoraptor.dataservices.protocol;

import gov.nanoraptor.api.dataservices.DataServicesSyncOption;
import gov.nanoraptor.commons.lang.ExceptionUtils;
import gov.nanoraptor.dataservices.channels.ADataMessageTask;
import gov.nanoraptor.dataservices.channels.BatchDataMessageTask;
import gov.nanoraptor.dataservices.channels.QueuedTask;
import gov.nanoraptor.dataservices.utils.RoundRobinPriorityBlockingQueue;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;

/* loaded from: classes.dex */
/**
 * Worker thread that drains four queues (control, RPM, live data and backfill data) and hands
 * each dequeued task to {@link #processTask(QueuedTask)}.
 *
 * <p>Control tasks are processed whenever they are available. RPM and data tasks are only
 * processed after {@link #startSendingData()} has been called. Data tasks are additionally
 * throttled: no more are sent while the number of un-ACKed data messages is at or above
 * {@code PENDING_MSG_LIMIT}; {@link #dataMessageACKed()} lowers that count.
 *
 * <p>{@link #setupQueues} must be called before the thread is started; the queue fields are
 * dereferenced without null checks.
 *
 * @param <T> concrete data message task type held by the data queues
 */
public abstract class APublisherMessagePump<T extends ADataMessageTask> extends Thread {
    /** Maximum number of data tasks folded into one {@link BatchDataMessageTask}. */
    protected static final int BATCH_DATA_MSG_COUNT = 5;
    /** Data sending pauses while this many (or more) data messages are awaiting an ACK. */
    private static final int PENDING_MSG_LIMIT = 10;
    /** Compile-time switch for batching of data tasks. */
    private static final boolean allowBatching = true;
    private static final Logger logger = Logger.getLogger(APublisherMessagePump.class);

    private RoundRobinPriorityBlockingQueue<T> RDMQueue;
    private BlockingQueue<QueuedTask> RPMQueue;
    private RoundRobinPriorityBlockingQueue<T> backfillRDMQueue;
    private BlockingQueue<QueuedTask> controlQueue;

    /**
     * Number of data messages sent but not yet ACKed. Atomic because it is incremented by the
     * pump thread and decremented through the public {@link #dataMessageACKed()}.
     */
    private final AtomicInteger pendingDataMsgCount = new AtomicInteger();
    private volatile boolean readyToSendData = false;
    private volatile boolean running = true;
    /** Monitor the pump thread waits on when it has nothing to do. */
    private final Object somethingToSend = new Object();

    /** Creates a pump thread with a default thread name. */
    public APublisherMessagePump() {
    }

    /**
     * Creates a pump thread with the given thread name.
     *
     * @param str thread name passed to {@link Thread#Thread(String)}
     */
    public APublisherMessagePump(String str) {
        super(str);
    }

    /**
     * Reports whether {@link #run()} could make progress right now.
     *
     * <p>Mirrors the gating in {@link #run()}: control tasks are always actionable; RPM and data
     * tasks only once data sending is enabled; data tasks only while below the pending limit.
     * Without the {@code readyToSendData} gate the run loop would spin on work it refuses to
     * process.
     */
    private boolean haveActionableWork() {
        if (!this.controlQueue.isEmpty()) {
            return true;
        }
        if (!this.readyToSendData) {
            return false;
        }
        if (!this.RPMQueue.isEmpty()) {
            return true;
        }
        boolean dataQueued = !this.RDMQueue.isEmpty() || !this.backfillRDMQueue.isEmpty();
        return dataQueued && this.pendingDataMsgCount.get() < PENDING_MSG_LIMIT;
    }

    /**
     * Sends the next data task (or a batch of up to {@code BATCH_DATA_MSG_COUNT} tasks) from the
     * given queue, unless the pending-ACK limit has been reached.
     *
     * @param blockingQueue data queue to drain from
     * @throws IOException propagated from {@link #processTask(QueuedTask)}
     */
    private void processRDMQueue(BlockingQueue<T> blockingQueue) throws IOException {
        if (this.pendingDataMsgCount.get() >= PENDING_MSG_LIMIT) {
            logger.warn("Exceeded pending msg limit, skipping " + (blockingQueue == this.RDMQueue ? "Live" : "Backfill"));
            return;
        }
        T first = blockingQueue.poll();
        if (first == null) {
            return;
        }
        // A lone task is sent as-is; batching only pays off when more tasks are waiting.
        if (!allowBatching || blockingQueue.peek() == null) {
            processTask(first);
            this.pendingDataMsgCount.incrementAndGet();
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Creating Batch Task");
        }
        BatchDataMessageTask batch = new BatchDataMessageTask();
        T next = first;
        int batched = 0;
        while (next != null) {
            batch.addTask(next);
            batched++;
            next = batched < BATCH_DATA_MSG_COUNT ? blockingQueue.poll() : null;
        }
        processTask(batch);
        this.pendingDataMsgCount.addAndGet(batch.getBatchSize());
    }

    /**
     * Removes the entry identified by {@code (i, i2)} from the queue, assigns it priority
     * {@code i3} and re-adds it.
     *
     * @return {@code true} if a matching entry was found and re-queued
     */
    private boolean reprioritizeQueueEntry(RoundRobinPriorityBlockingQueue<T> roundRobinPriorityBlockingQueue, int i, int i2, int i3) {
        T entry = roundRobinPriorityBlockingQueue.removeById(i, i2);
        if (entry == null) {
            return false;
        }
        entry.setMessagePriority(i3);
        roundRobinPriorityBlockingQueue.add(entry);
        return true;
    }

    /**
     * Blocks until there is actionable work or the pump has been stopped. Wakes at least every
     * ten seconds and calls {@link #testBacklog()} after each wake-up.
     */
    private void waitForWork() {
        synchronized (this.somethingToSend) {
            if (logger.isDebugEnabled()) {
                logger.debug("Waiting for message to send...");
            }
            while (this.running && !haveActionableWork()) {
                try {
                    this.somethingToSend.wait(10000L);
                } catch (InterruptedException ignored) {
                    // Deliberately not re-interrupting here: that would make the next wait()
                    // throw immediately and turn this loop into a spin. stopThread() clears
                    // 'running' before interrupting, so the loop condition ends the wait.
                }
                testBacklog();
            }
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Done wating for work");
        }
    }

    /**
     * Discards queued data tasks according to the sync option: {@code NONE} clears both data
     * queues, {@code MOST_RECENT} clears only the backfill queue, anything else clears nothing.
     *
     * @param dataServicesSyncOption sync option selecting which queues to clear
     */
    public void clearQueuesForSyncOption(DataServicesSyncOption dataServicesSyncOption) {
        if (dataServicesSyncOption == DataServicesSyncOption.NONE) {
            this.RDMQueue.clear();
            this.backfillRDMQueue.clear();
        } else if (dataServicesSyncOption == DataServicesSyncOption.MOST_RECENT) {
            this.backfillRDMQueue.clear();
        }
    }

    /** Records that one data message was ACKed and wakes the pump so it can send more. */
    public void dataMessageACKed() {
        if (logger.isTraceEnabled()) {
            logger.trace("ACK: pending=" + this.pendingDataMsgCount.get());
        }
        this.pendingDataMsgCount.decrementAndGet();
        notifySomethingToSend();
    }

    /**
     * Called at the end of every iteration of the processing loop in {@link #run()}.
     *
     * @param queuedTask the control task polled first in that iteration, or {@code null} if none
     */
    protected abstract void doFinally(QueuedTask queuedTask);

    /** Called when processing raised an {@code AcknowledgeTimeoutException}; the pump then stops. */
    protected abstract void handleAckTimeout();

    /**
     * Called when processing raised an {@link IOException}; the pump then stops.
     *
     * @param iOException the exception that was raised
     */
    protected abstract void handleIOException(IOException iOException);

    /**
     * Called when processing raised any other exception that is not interrupt-related; the pump
     * keeps running.
     *
     * @param exc the exception that was raised
     */
    protected abstract void handleUnknownException(Exception exc);

    /** Wakes the pump thread if it is waiting for work. */
    public void notifySomethingToSend() {
        synchronized (this.somethingToSend) {
            this.somethingToSend.notifyAll();
        }
    }

    /**
     * Performs the actual work for one dequeued task.
     *
     * @param queuedTask task to process
     * @throws IOException if processing fails; {@link #run()} treats this as fatal
     */
    protected abstract void processTask(QueuedTask queuedTask) throws IOException;

    /**
     * Adds a task to the backfill data queue and wakes the pump.
     *
     * @param t task to enqueue
     */
    public void queueBackfillDataMessageTask(T t) {
        this.backfillRDMQueue.offer(t);
        notifySomethingToSend();
    }

    /**
     * Changes the priority of a queued data task, looking first in the live queue and then in
     * the backfill queue.
     *
     * @param i first identifier component passed to {@code removeById}
     * @param i2 second identifier component passed to {@code removeById}
     * @param i3 new message priority
     * @return {@code true} if the task was found in either queue
     */
    public boolean reprioritizeMessage(int i, int i2, int i3) {
        return reprioritizeQueueEntry(this.RDMQueue, i, i2, i3) || reprioritizeQueueEntry(this.backfillRDMQueue, i, i2, i3);
    }

    /**
     * Main loop: processes work while any is actionable, otherwise waits; calls
     * {@link #wrapUp()} once the pump has been stopped or hit a fatal I/O error.
     */
    @Override
    public void run() {
        while (this.running) {
            while (this.running && haveActionableWork()) {
                QueuedTask controlTask = null;
                try {
                    controlTask = this.controlQueue.poll();
                    if (controlTask != null) {
                        processTask(controlTask);
                    } else if (this.readyToSendData) {
                        processRDMQueue(this.RDMQueue);
                        processRDMQueue(this.backfillRDMQueue);
                        testBacklog();
                        // Drain any control tasks that arrived while data was being sent.
                        QueuedTask pendingControl;
                        while ((pendingControl = this.controlQueue.poll()) != null) {
                            processTask(pendingControl);
                        }
                        QueuedTask rpmTask = this.RPMQueue.poll();
                        if (rpmTask != null) {
                            processTask(rpmTask);
                        }
                    }
                } catch (AcknowledgeTimeoutException e) {
                    // Most specific handler first; a broader catch placed earlier would shadow it.
                    handleAckTimeout();
                    this.running = false;
                } catch (IOException e) {
                    handleIOException(e);
                    this.running = false;
                } catch (Exception e) {
                    if (ExceptionUtils.anyInstanceOf(e, InterruptedException.class)) {
                        logger.debug("TRDS publisher thread interrupted");
                    } else {
                        logger.error("Exception sending message", e);
                        handleUnknownException(e);
                    }
                } finally {
                    doFinally(controlTask);
                }
            }
            waitForWork();
        }
        wrapUp();
    }

    /**
     * Installs the queues this pump drains. Must be called before the thread is started.
     *
     * @param blockingQueue control task queue
     * @param blockingQueue2 RPM task queue
     * @param roundRobinPriorityBlockingQueue live data task queue
     * @param roundRobinPriorityBlockingQueue2 backfill data task queue
     */
    public void setupQueues(BlockingQueue<QueuedTask> blockingQueue, BlockingQueue<QueuedTask> blockingQueue2, RoundRobinPriorityBlockingQueue<T> roundRobinPriorityBlockingQueue, RoundRobinPriorityBlockingQueue<T> roundRobinPriorityBlockingQueue2) {
        this.controlQueue = blockingQueue;
        this.RPMQueue = blockingQueue2;
        this.RDMQueue = roundRobinPriorityBlockingQueue;
        this.backfillRDMQueue = roundRobinPriorityBlockingQueue2;
    }

    /** Enables processing of RPM and data tasks and wakes the pump so queued work starts now. */
    public void startSendingData() {
        this.readyToSendData = true;
        notifySomethingToSend();
    }

    /** Asks the pump to stop and interrupts it in case it is blocked. */
    public void stopThread() {
        this.running = false;
        interrupt();
    }

    /**
     * Hook invoked after each data-sending pass and after each wake-up while waiting for work.
     * The default implementation does nothing.
     */
    protected void testBacklog() {
    }

    /** Called once on the pump thread after the main loop has exited. */
    protected abstract void wrapUp();
}
