package gov.nanoraptor.core.connection.dataservices;

import gov.nanoraptor.Raptor;
import gov.nanoraptor.api.dataportal.IRaptorDataListener;
import gov.nanoraptor.api.dataportal.IRaptorDataPortal;
import gov.nanoraptor.api.dataportal.IRaptorDataPortalController;
import gov.nanoraptor.api.dataportal.IRaptorPropertyListener;
import gov.nanoraptor.api.dataservices.IChannelDefinition;
import gov.nanoraptor.api.dataservices.ICustomCommand;
import gov.nanoraptor.api.dataservices.NetworkNodeInfo;
import gov.nanoraptor.api.mapobject.IMapObject;
import gov.nanoraptor.api.messages.IMapEntity;
import gov.nanoraptor.api.messages.IMapObjectOfflineMessage;
import gov.nanoraptor.api.messages.IMapObjectProxy;
import gov.nanoraptor.api.messages.IPrePersistRaptorPropertyMessage;
import gov.nanoraptor.api.messages.IRaptorDataMessage;
import gov.nanoraptor.api.messages.IRaptorDataStructure;
import gov.nanoraptor.api.messages.IRaptorMessage;
import gov.nanoraptor.api.messages.IRaptorPropertyMessage;
import gov.nanoraptor.api.messages.internal.IPriorityAssignmentListener;
import gov.nanoraptor.api.persist.IRDMPersist;
import gov.nanoraptor.api.ping.IPingEvent;
import gov.nanoraptor.api.ping.IPingListener;
import gov.nanoraptor.api.services.IRaptorProjectSession;
import gov.nanoraptor.core.persist.PersistService;
import gov.nanoraptor.dataservices.channels.ADataMessageTask;
import gov.nanoraptor.dataservices.channels.BackfillGap;
import gov.nanoraptor.dataservices.channels.BackfillMessageTaskFields;
import gov.nanoraptor.dataservices.channels.BackfillTask;
import gov.nanoraptor.dataservices.channels.BatchDataMessageTask;
import gov.nanoraptor.dataservices.channels.CommandTask;
import gov.nanoraptor.dataservices.channels.CustomCommandTask;
import gov.nanoraptor.dataservices.channels.PropertyMessageTask;
import gov.nanoraptor.dataservices.channels.QueuedTask;
import gov.nanoraptor.dataservices.persist.TerraRaptorMessageManager;
import gov.nanoraptor.dataservices.protocol.APublisherMessagePump;
import gov.nanoraptor.dataservices.protocol.Command;
import gov.nanoraptor.dataservices.protocol.CommandType;
import gov.nanoraptor.dataservices.protocol.DeleteMapObjectCommand;
import gov.nanoraptor.dataservices.protocol.NetworkInfoCommand;
import gov.nanoraptor.dataservices.protocol.PingCommand;
import gov.nanoraptor.dataservices.protocol.PriorityCommand;
import gov.nanoraptor.dataservices.protocol.Protocol;
import gov.nanoraptor.dataservices.utils.HashUtils;
import gov.nanoraptor.dataservices.utils.RoundRobinPriorityBlockingQueue;
import gov.nanoraptor.platform.KeyUtils;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.log4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: classes.dex */
public class DataServicesPublisher extends APublisherMessagePump<ClientDataMessageTask> implements IRaptorDataListener, IRaptorPropertyListener, IPingListener, IPriorityAssignmentListener {
    /**
     * Repeat period, in milliseconds, of the periodic network-info report; this is the same
     * value (60 s) that {@link #enable()} hands to the timer as the period.
     */
    private static final long NETWORK_INFO_FREQUENCY = 60000;

    /**
     * Delay, in milliseconds, before the first network-info report after {@link #enable()}.
     * {@link #testBacklog()} also uses this value as its "nothing sent recently" threshold.
     */
    private static final long NETWORK_INFO_INIT = 30000;

    private static final Logger logger = Logger.getLogger(DataServicesPublisher.class);

    // Collaborators: assigned exactly once, in the constructor.
    private final DataServicesSubscriber connection;
    private final IRaptorDataPortal dataPortal;
    private final DataServicesPersist dsPersist;

    /** True between {@link #enable()} and {@link #disable()}. */
    private boolean enabled;

    /** Currently scheduled network-info task, or {@code null} when none is scheduled. */
    private NetworkInfoTimerTask networkInfoTimerTask;

    private final Protocol protocol;
    private final IRDMPersist rdmPersist;

    /** Channel id passed to {@code BackfillTask} when a backfill is requested. */
    private final int trChannelId;

    // Outbound queues; all four are registered with the pump via setupQueues(...) in the constructor.
    private final RoundRobinPriorityBlockingQueue<ClientDataMessageTask> RDMQueue = new RoundRobinPriorityBlockingQueue<>();
    private final RoundRobinPriorityBlockingQueue<ClientDataMessageTask> backfillRDMQueue = new RoundRobinPriorityBlockingQueue<>();
    private final BlockingQueue<QueuedTask> controlQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<QueuedTask> RPMQueue = new LinkedBlockingQueue<>();

    /** Last backlog state computed by {@link #testBacklog()}. */
    private boolean lastBacklogState = false;

    /** Unique map-entity key to the hash of the definition most recently queued for it. */
    private final Map<String, String> mapEntitiesSent = new HashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes.dex */
    /**
     * Timer task that snapshots this node's connection identity and outbound queue sizes
     * into a {@link NetworkNodeInfo} and queues it as a {@link NetworkInfoCommand} through
     * {@code queueCommand}.
     */
    public final class NetworkInfoTimerTask extends TimerTask {
        private NetworkInfoTimerTask() {
        }

        /** Builds the snapshot from the enclosing publisher's state and queues it. */
        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            final NetworkNodeInfo snapshot = new NetworkNodeInfo();
            final DataServicesSubscriber subscriber = connection;

            // Connection identity.
            snapshot.setNodeUUID(subscriber.getUUID());
            snapshot.setUserName(subscriber.getUsername());
            snapshot.setConnectionStartTime(subscriber.getConnectionStartTime());

            // Current depth of the live and backfill data queues.
            snapshot.setOutboundQueueSize(RDMQueue.size());
            snapshot.setBackfillQueueSize(backfillRDMQueue.size());

            queueCommand(new NetworkInfoCommand(snapshot));
        }
    }

    /**
     * Creates a publisher for the given subscriber connection.
     *
     * <p>Collaborators are looked up from the current project session, the four outbound
     * queues are registered with the pump, and the pump is named after the channel.
     *
     * @param dataServicesSubscriber the owning connection; must not be {@code null}
     * @param protocol the protocol that queued tasks are processed against; must not be {@code null}
     * @param i the channel id later passed to {@code BackfillTask}
     * @throws IllegalArgumentException if {@code dataServicesSubscriber} or {@code protocol} is {@code null}
     */
    public DataServicesPublisher(DataServicesSubscriber dataServicesSubscriber, Protocol protocol, int i) {
        if (dataServicesSubscriber == null) {
            throw new IllegalArgumentException("Parameter connection may not be null");
        }
        if (protocol == null) {
            throw new IllegalArgumentException("Parameter protocol may not be null");
        }
        logger.debug("Data Services publisher starting");

        // Session-provided collaborators.
        final IRaptorProjectSession session = Raptor.getProjectSession();
        this.dataPortal = session.getRaptorDataPortal();
        final PersistService persistence = session.getPersistService();
        this.dsPersist = new DataServicesPersist();
        this.rdmPersist = persistence.getRDMPersist();

        // Constructor arguments.
        this.protocol = protocol;
        this.trChannelId = i;
        this.connection = dataServicesSubscriber;

        setupQueues(controlQueue, RPMQueue, RDMQueue, backfillRDMQueue);
        final String channelName = dataServicesSubscriber.getChannelDescriptor().getName();
        setName("TRDS pub: " + channelName);
    }

    /**
     * Appends a task to the control queue and signals the pump that work is available.
     *
     * @param queuedTask the task to enqueue
     */
    private void addControlTask(QueuedTask queuedTask) {
        controlQueue.add(queuedTask);
        notifySomethingToSend();
    }

    /**
     * Decides whether data for the given map object may be published on this channel.
     *
     * <p>Publishing requires that the publisher is enabled, the connection is a publisher
     * and the channel definition matches the map object. When the answer is no and trace
     * logging is on, the first failing condition is logged.
     *
     * @param iMapObject the map object to test
     * @return {@code true} if a message for the map object may be sent
     */
    private boolean canSendMessage(IMapObject iMapObject) {
        final IChannelDefinition definition = connection.getChannelDefinition();
        final boolean sendable = enabled && connection.isPublisher() && definition.matches(iMapObject);
        if (sendable || !logger.isTraceEnabled()) {
            return sendable;
        }

        // Rejected with tracing on: report the first condition that fails.
        if (!enabled) {
            logger.trace("Not sending message because publisher is disabled");
        } else if (!connection.isPublisher()) {
            logger.trace("Not sending message because connection is not publisher");
        } else if (!definition.matches(iMapObject)) {
            logger.trace("Not sending message because it does not match channel definition");
        }
        return false;
    }

    /**
     * Decides whether the given map entity definition may be published on this channel.
     *
     * <p>Publishing requires that the publisher is enabled, the connection is a publisher,
     * the entity's source is not reported as TRDS by the connection, and the channel
     * definition matches the entity. When the answer is no and trace logging is on, the
     * first failing condition is logged.
     *
     * @param iMapEntity the map entity to test
     * @return {@code true} if the entity may be sent
     */
    private boolean canSendMessage(IMapEntity iMapEntity) {
        final IChannelDefinition definition = connection.getChannelDefinition();
        final boolean sendable = enabled
                && connection.isPublisher()
                && !connection.isFromTRDS(iMapEntity.getSource())
                && definition.matches(iMapEntity);
        if (sendable || !logger.isTraceEnabled()) {
            return sendable;
        }

        // Rejected with tracing on: report the first condition that fails.
        if (!enabled) {
            logger.trace("Not sending message because publisher is disabled");
        } else if (!connection.isPublisher()) {
            logger.trace("Not sending message because connection is not publisher");
        } else if (connection.isFromTRDS(iMapEntity.getSource())) {
            logger.trace("Not sending message because it originated from TRDS");
        } else if (!definition.matches(iMapEntity)) {
            logger.trace("Not sending message because it does not match channel definition");
        }
        return false;
    }

    /**
     * Decides whether the given message may be published on this channel.
     *
     * <p>Publishing requires that the publisher is enabled, the connection is a publisher,
     * the message's source is not reported as TRDS by the connection, and the channel
     * definition matches the message. When the answer is no and trace logging is on, the
     * first failing condition is logged.
     *
     * @param iRaptorMessage the message to test
     * @return {@code true} if the message may be sent
     */
    private boolean canSendMessage(IRaptorMessage iRaptorMessage) {
        final IChannelDefinition definition = connection.getChannelDefinition();
        final boolean sendable = enabled
                && connection.isPublisher()
                && !connection.isFromTRDS(iRaptorMessage.getSource())
                && definition.matches(iRaptorMessage);
        if (sendable || !logger.isTraceEnabled()) {
            return sendable;
        }

        // Rejected with tracing on: report the first condition that fails.
        if (!enabled) {
            logger.trace("Not sending message because publisher is disabled");
        } else if (!connection.isPublisher()) {
            logger.trace("Not sending message because connection is not publisher");
        } else if (connection.isFromTRDS(iRaptorMessage.getSource())) {
            logger.trace("Not sending message because it originated from TRDS");
        } else if (!definition.matches(iRaptorMessage)) {
            logger.trace("Not sending message because it does not match channel definition");
        }
        return false;
    }

    /**
     * Wraps a property message in a task and enqueues it.
     *
     * <p>State messages are flagged as such and placed on the control queue; all other
     * property messages go to the RPM queue. The pump is then signalled.
     *
     * @param iRaptorPropertyMessage the property message to send
     * @param z {@code true} if this is a full-state message
     * @throws IllegalStateException if the connection is not a publisher
     */
    private void queuePropertyMessage(IRaptorPropertyMessage iRaptorPropertyMessage, boolean z) {
        if (!connection.isPublisher()) {
            throw new IllegalStateException("You can't publish on this channel");
        }
        if (logger.isTraceEnabled()) {
            logger.trace("Queueing RPM " + iRaptorPropertyMessage);
        }

        final ClientPropertyMessageTask task = new ClientPropertyMessageTask(iRaptorPropertyMessage, connection, dsPersist);
        final BlockingQueue<QueuedTask> destination;
        if (z) {
            task.setStateRPM(true);
            destination = controlQueue;
        } else {
            destination = RPMQueue;
        }
        destination.add(task);
        notifySomethingToSend();
    }

    /**
     * Queues the given messages for backfill, skipping any whose id is already present on
     * the live data queue for the same round-robin group.
     *
     * @param list descriptors of the messages to backfill
     * @throws IllegalStateException if the connection is not a publisher
     */
    public void backfillMessages(List<BackfillMessageTaskFields> list) {
        if (!connection.isPublisher()) {
            throw new IllegalStateException("You can't publish on this channel");
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Adding " + list.size() + " to backfill queue");
        }

        for (BackfillMessageTaskFields fields : list) {
            final int messageId = fields.getMessageId();
            final boolean alreadyQueued = RDMQueue.containsById(messageId, fields.getRoundRobinGroupId().intValue());
            if (alreadyQueued) {
                if (logger.isDebugEnabled()) {
                    logger.debug("RDM already on queue: ID=" + messageId);
                }
                continue;
            }
            queueBackfillDataMessageTask(new ClientDataMessageTask(fields, rdmPersist));
        }
    }

    /**
     * Disables publishing: unregisters every listener added by {@link #enable()}, cancels
     * the network-info timer task if one is scheduled, and stops the pump thread.
     * Does nothing if the publisher is not enabled.
     */
    public void disable() {
        if (!enabled) {
            return;
        }
        enabled = false;

        // Unregister in the same order enable() registered.
        dataPortal.removeListener(this);
        dataPortal.removePropertyListener(this);
        Raptor.getProjectSession().getPingService().removeListener(this);
        TerraRaptorMessageManager.instance().removePriorityAssignmentListener(this);

        if (networkInfoTimerTask != null) {
            networkInfoTimerTask.cancel();
            networkInfoTimerTask = null;
        }
        stopThread();
    }

    /**
     * {@code APublisherMessagePump} hook; this publisher performs no work here.
     * NOTE(review): presumably invoked by the pump once a task has been handled — confirm
     * against the base class.
     *
     * @param queuedTask the task the hook is invoked for (unused)
     */
    @Override // gov.nanoraptor.dataservices.protocol.APublisherMessagePump
    protected void doFinally(QueuedTask queuedTask) {
    }

    /**
     * Enables publishing: registers this object as data, property, ping and
     * priority-assignment listener, calls {@code startSendingData()}, schedules a fresh
     * network-info timer task (first run after {@code NETWORK_INFO_INIT} ms, then every
     * {@code NETWORK_INFO_FREQUENCY} ms) and starts the pump.
     * Does nothing if the publisher is already enabled.
     */
    public void enable() {
        if (!enabled) {
            enabled = true;

            dataPortal.addListener(this);
            dataPortal.addPropertyListener(this);
            final IRaptorProjectSession session = Raptor.getProjectSession();
            session.getPingService().addListener(this);
            TerraRaptorMessageManager.instance().addPriorityAssignmentListener(this);
            startSendingData();

            // Drop any task left over from an earlier schedule before creating a new one.
            if (networkInfoTimerTask != null) {
                networkInfoTimerTask.cancel();
                networkInfoTimerTask = null;
            }
            networkInfoTimerTask = new NetworkInfoTimerTask();
            session.getTimerService().scheduleAtFixedRate(networkInfoTimerTask, NETWORK_INFO_INIT, NETWORK_INFO_FREQUENCY);
            start();
        }
    }

    /**
     * @return the number of tasks currently on the control queue
     */
    public int getControlQueueSize() {
        return controlQueue.size();
    }

    /**
     * @return the number of tasks currently on the backfill data-message queue
     */
    public int getOutboundBackfillQueueSize() {
        return backfillRDMQueue.size();
    }

    /**
     * @return the number of tasks currently on the live data-message queue
     */
    public int getOutboundQueueSize() {
        return RDMQueue.size();
    }

    /**
     * Reacts to an acknowledgement timeout: logs a warning naming the channel, breaks the
     * connection and stops the pump thread.
     */
    @Override // gov.nanoraptor.dataservices.protocol.APublisherMessagePump
    protected void handleAckTimeout() {
        final String channelName = connection.getChannelDescriptor().getName();
        logger.warn("Time out waiting for ack on channel " + channelName);
        connection.breakConnection(true);
        stopThread();
    }

    /**
     * Reacts to an I/O failure: logs a warning with the channel name and the exception,
     * breaks the connection and stops the pump thread.
     *
     * @param iOException the failure that was raised
     */
    @Override // gov.nanoraptor.dataservices.protocol.APublisherMessagePump
    protected void handleIOException(IOException iOException) {
        final String channelName = connection.getChannelDescriptor().getName();
        final String reason = iOException.getMessage();
        logger.warn("Lost connection to channel " + channelName + " (" + reason + ")", iOException);
        connection.breakConnection(true);
        stopThread();
    }

    /**
     * Handles an exception reported through the {@code APublisherMessagePump} hook
     * (presumably anything that is not an {@link IOException} — confirm in the base class).
     *
     * <p>The exception used to be dropped without a trace; it is now logged with its stack
     * trace so failures in the pump can be diagnosed. Unlike {@code handleIOException} and
     * {@code handleAckTimeout}, the connection is left intact and the pump is not stopped.
     *
     * @param exc the unexpected exception
     */
    @Override // gov.nanoraptor.dataservices.protocol.APublisherMessagePump
    protected void handleUnknownException(Exception exc) {
        logger.error("Unexpected error while publishing on channel " + this.connection.getChannelDescriptor().getName(), exc);
    }

    /**
     * @return the backlog state most recently computed by {@code testBacklog()}
     */
    public boolean isBacklogged() {
        return lastBacklogState;
    }

    /**
     * @return {@code true} if {@code enable()} has been called without a later {@code disable()}
     */
    public boolean isEnabled() {
        return enabled;
    }

    /**
     * Accepts every map object; the actual filtering is done per message by the
     * {@code canSendMessage} overloads.
     *
     * @param iMapObjectProxy the candidate map object (ignored)
     * @return always {@code true}
     */
    @Override // gov.nanoraptor.api.dataportal.IRaptorDataListener
    public boolean matchMapObject(IMapObjectProxy iMapObjectProxy) {
        final boolean acceptAll = true;
        return acceptAll;
    }

    /**
     * Queues an incoming data message for publication when it is persistable, passes
     * {@code canSendMessage} and is not already recorded as being on this channel.
     *
     * @param str unused by this listener
     * @param iRaptorDataMessage the data message that arrived
     * @param iRaptorDataStructure unused by this listener
     */
    @Override // gov.nanoraptor.api.dataportal.IRaptorDataListener
    public void onDataMessage(String str, IRaptorDataMessage iRaptorDataMessage, IRaptorDataStructure iRaptorDataStructure) {
        final IChannelDefinition definition = connection.getChannelDefinition();
        if (!iRaptorDataMessage.isPersistable()) {
            return;
        }
        if (!canSendMessage(iRaptorDataMessage)) {
            return;
        }
        if (dsPersist.isMessageOnChannel(iRaptorDataMessage, definition)) {
            return;
        }
        queueCurrentMessage(iRaptorDataMessage);
    }

    /**
     * {@code IRaptorDataListener} callback; this publisher takes no action on it.
     *
     * @param iRaptorDataPortalController the controller the callback refers to (unused)
     */
    @Override // gov.nanoraptor.api.dataportal.IRaptorDataListener
    public void onDisconnect(IRaptorDataPortalController iRaptorDataPortalController) {
    }

    /**
     * Handles a newly added map object: offers its entity definition for publication via
     * {@link #onMapObjectDefined(IMapEntity)}, then, if the map object may be sent, queues
     * its full-state property message as a state message.
     *
     * @param iMapObjectProxy proxy of the map object that was added
     * @param iRaptorDataPortalController unused by this listener
     */
    @Override // gov.nanoraptor.api.dataportal.IRaptorDataListener
    public void onMapObjectAdded(IMapObjectProxy iMapObjectProxy, IRaptorDataPortalController iRaptorDataPortalController) {
        final IMapEntity entity = iMapObjectProxy.getMapEntity();
        final IMapObject added = iMapObjectProxy.getMapObject();

        onMapObjectDefined(entity);
        if (!canSendMessage(added)) {
            return;
        }

        final IPrePersistRaptorPropertyMessage stateMessage = added.getFullStatePropertyMessage();
        if (logger.isDebugEnabled()) {
            logger.debug("Send STATE RPM for: " + added.getKey() + " :" + stateMessage);
        }
        queuePropertyMessage(stateMessage, true);
    }

    /**
     * Queues the given map entity definition for publication if it may be sent on this
     * channel; otherwise does nothing.
     *
     * @param iMapEntity the entity definition that was announced
     */
    @Override // gov.nanoraptor.api.dataportal.IRaptorDataListener
    public void onMapObjectDefined(IMapEntity iMapEntity) {
        if (!canSendMessage(iMapEntity)) {
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Sending MapEntity: " + iMapEntity);
        }
        queueDefinition(iMapEntity);
    }

    /**
     * Handles a map object going offline.
     *
     * <p>Only deletions that did not originate from TRDS are acted on. If the map object
     * may be sent on this channel, a {@link DeleteMapObjectCommand} is queued; otherwise the
     * deletion time is recorded on the connection.
     *
     * @param iMapObjectOfflineMessage the offline notification
     * @param iRaptorDataPortalController unused by this listener
     * @param z {@code true} if the map object was deleted (logged as {@code deleted})
     */
    @Override // gov.nanoraptor.api.dataportal.IRaptorDataListener
    public void onMapObjectOffline(IMapObjectOfflineMessage iMapObjectOfflineMessage, IRaptorDataPortalController iRaptorDataPortalController, boolean z) {
        final String key = iMapObjectOfflineMessage.getMapObjectKey();
        final boolean fromTrds = connection.isFromTRDS(iMapObjectOfflineMessage.getSource());
        if (logger.isDebugEnabled()) {
            logger.debug(String.format("MO Offline: key=%s deleted=%s fromTRDS=%s", key, Boolean.valueOf(z), Boolean.valueOf(fromTrds)));
        }

        final boolean locallyDeleted = z && !fromTrds;
        if (!locallyDeleted) {
            return;
        }

        final IMapObject deleted = iMapObjectOfflineMessage.getMOP().getMapObject();
        final String entityKey = KeyUtils.getUniqueKey(deleted.getMapentity());
        final long deletedAt = iMapObjectOfflineMessage.getTimeOfOperation();
        if (logger.isTraceEnabled()) {
            logger.trace("Testing MO: " + key);
        }

        if (!canSendMessage(deleted)) {
            // Not publishable on this channel: only remember when the deletion happened.
            if (logger.isDebugEnabled()) {
                logger.debug("Updating deleted MO: " + key + " time=" + deletedAt);
            }
            connection.updateDeletionTime(key, deletedAt);
            return;
        }

        logger.debug("Queueing MOState command for " + key);
        queueCommand(new DeleteMapObjectCommand(entityKey, deleted.getUnitID(), deletedAt));
    }

    /**
     * {@code IRaptorDataListener} callback; this publisher takes no action on it.
     * All three arguments are ignored.
     */
    @Override // gov.nanoraptor.api.dataportal.IRaptorDataListener
    public void onMapObjectUnsuppress(String str, String str2, String str3) {
    }

    /**
     * Forwards a ping as a {@link PingCommand} when the connection is a publisher, the
     * ping did not originate from TRDS and it is not a plugin event.
     *
     * @param iPingEvent the ping that was raised
     */
    @Override // gov.nanoraptor.api.ping.IPingListener
    public void onPing(IPingEvent iPingEvent) {
        final boolean forward = connection.isPublisher()
                && !connection.isFromTRDS(iPingEvent.getSource())
                && !iPingEvent.isPluginEvent();
        if (forward) {
            queueCommand(new PingCommand(iPingEvent));
        }
    }

    /**
     * Propagates a priority assignment for a data message.
     *
     * <p>Messages whose id is {@code -1} are ignored. Otherwise the inherited
     * {@code reprioritizeMessage} is tried first; if it reports {@code false} and the
     * message is recorded as being on this channel, a {@link PriorityCommand} is queued.
     *
     * @param i unused by this listener
     * @param i2 the priority value passed to {@code reprioritizeMessage} and {@code PriorityCommand}
     * @param iRaptorDataMessage the message whose priority was assigned
     */
    @Override // gov.nanoraptor.api.messages.internal.IPriorityAssignmentListener
    public void onPriorityAssignedToRDM(int i, int i2, IRaptorDataMessage iRaptorDataMessage) {
        final Integer messageId = Integer.valueOf(iRaptorDataMessage.getId());
        if (messageId.intValue() == -1) {
            return;
        }

        final int mapObjectId = Integer.valueOf(iRaptorDataMessage.getMapObjectProxy().getId()).intValue();
        final boolean handledLocally = reprioritizeMessage(messageId.intValue(), mapObjectId, i2);
        if (!handledLocally && dsPersist.isMessageOnChannel(iRaptorDataMessage, connection.getChannelDefinition())) {
            queueCommand(new PriorityCommand(i2, messageId));
        }
    }

    /**
     * Queues an incoming property message for publication if it may be sent on this
     * channel; otherwise does nothing.
     *
     * @param str unused by this listener
     * @param iRaptorPropertyMessage the property message that arrived
     */
    @Override // gov.nanoraptor.api.dataportal.IRaptorPropertyListener
    public void onPropertyMessage(String str, IRaptorPropertyMessage iRaptorPropertyMessage) {
        if (!canSendMessage(iRaptorPropertyMessage)) {
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Publish RPM: " + iRaptorPropertyMessage);
        }
        queuePropertyMessage(iRaptorPropertyMessage);
    }

    /**
     * Processes one queued task against the protocol and, for data and property message
     * tasks, reports the send to the connection.
     *
     * <p>The timestamp handed to {@code updateMessageSendTime} is {@link System#nanoTime()}
     * captured before the task is processed. Batch tasks report their batch size; single
     * data or property message tasks report a count of one; other tasks report nothing.
     *
     * @param queuedTask the task to process
     * @throws IOException if processing the task fails with an I/O error
     */
    @Override // gov.nanoraptor.dataservices.protocol.APublisherMessagePump
    protected void processTask(QueuedTask queuedTask) throws IOException {
        if (logger.isDebugEnabled()) {
            logger.debug("Processing " + queuedTask);
        }

        final long startedAt = System.nanoTime();
        queuedTask.process(protocol);

        if (queuedTask instanceof BatchDataMessageTask) {
            final BatchDataMessageTask batch = (BatchDataMessageTask) queuedTask;
            connection.updateMessageSendTime(startedAt, batch.getBatchSize());
            return;
        }
        final boolean singleMessage = (queuedTask instanceof ADataMessageTask) || (queuedTask instanceof PropertyMessageTask);
        if (singleMessage) {
            connection.updateMessageSendTime(startedAt, 1);
        }
    }

    /**
     * Queues a protocol command on the control queue.
     *
     * <p>{@code SYNC}, {@code RPC}, {@code NETWORK_INFO} and {@code PING} commands are
     * accepted on any connection; every other command type requires the connection to be
     * a publisher.
     *
     * @param command the command to send
     * @throws IllegalStateException if the command type requires a publisher and the
     *         connection is not one
     */
    public void queueCommand(Command command) {
        final CommandType type = command.getCommandType();
        final boolean allowedWithoutPublishing = type == CommandType.SYNC
                || type == CommandType.RPC
                || type == CommandType.NETWORK_INFO
                || type == CommandType.PING;
        if (!allowedWithoutPublishing && !connection.isPublisher()) {
            throw new IllegalStateException("You can't publish on this channel");
        }
        addControlTask(new CommandTask(command));
    }

    /**
     * Wraps a live data message in a {@code ClientDataMessageTask}, adds it to the live
     * data queue and signals the pump that work is available.
     *
     * @param iRaptorDataMessage the data message to publish
     * @throws IllegalStateException if the connection is not a publisher
     */
    public void queueCurrentMessage(IRaptorDataMessage iRaptorDataMessage) {
        if (!this.connection.isPublisher()) {
            throw new IllegalStateException("You can't publish on this channel");
        }
        if (logger.isTraceEnabled()) {
            logger.trace("Queueing RDM " + iRaptorDataMessage.getUUID());
        }
        // The queue's element type is ClientDataMessageTask, so the task is added as-is.
        // The decompiler had inserted a cast of the task to the queue type, which is not
        // a valid conversion for the element being added.
        this.RDMQueue.add(new ClientDataMessageTask(iRaptorDataMessage, this.rdmPersist));
        notifySomethingToSend();
    }

    /**
     * Queues a custom command on the control queue. No publisher check is made here.
     *
     * @param iCustomCommand the custom command to send
     */
    public void queueCustomCommand(ICustomCommand iCustomCommand) {
        final CustomCommandTask task = new CustomCommandTask(iCustomCommand);
        addControlTask(task);
    }

    /**
     * Queues a map entity definition unless an identical one has already been queued.
     *
     * <p>"Identical" means the hash computed by {@code HashUtils.getHash} equals the hash
     * remembered for the entity's unique key. For a new or changed definition, the hash is
     * recorded, the entity's structures are registered with the protocol key tracker and a
     * definition task is put on the control queue.
     *
     * @param iMapEntity the entity definition to publish
     * @throws IllegalStateException if the connection is not a publisher
     */
    public void queueDefinition(IMapEntity iMapEntity) {
        if (!connection.isPublisher()) {
            throw new IllegalStateException("You can't publish on this channel");
        }

        final String entityKey = KeyUtils.getUniqueKey(iMapEntity);
        final String currentHash = HashUtils.getHash(iMapEntity);
        final boolean unchanged = currentHash.equals(mapEntitiesSent.get(entityKey));
        if (unchanged) {
            if (logger.isDebugEnabled()) {
                logger.debug("Not sending ME, already sent: " + entityKey);
            }
            return;
        }

        mapEntitiesSent.put(entityKey, currentHash);
        protocol.getProtocolKeyTracker().addMapEntityStructures(iMapEntity);
        addControlTask(new ClientDefinitionTask(iMapEntity));
    }

    /**
     * Queues a property message as a regular (non-state) message on the RPM queue.
     *
     * @param iRaptorPropertyMessage the property message to send
     * @throws IllegalStateException if the connection is not a publisher
     */
    public void queuePropertyMessage(IRaptorPropertyMessage iRaptorPropertyMessage) {
        final boolean stateMessage = false;
        queuePropertyMessage(iRaptorPropertyMessage, stateMessage);
    }

    /**
     * Queues a {@link BackfillTask} for the given gaps on the control queue; an empty list
     * is ignored.
     *
     * @param list the gaps to request backfill for
     */
    public void requestBackfill(List<BackfillGap> list) {
        if (list.isEmpty()) {
            return;
        }
        addControlTask(new BackfillTask(list, trChannelId));
    }

    /**
     * Sends state for every map object that persistence reports on this channel.
     *
     * <p>For each map object, {@code map} is consulted by map-object key:
     * <ul>
     *   <li>no entry: the full-state message is queued as a state message;</li>
     *   <li>an empty collection: nothing is sent (no locally modified fields);</li>
     *   <li>otherwise: every field not named in the collection is removed from the
     *       full-state message and, if any field remains, the result is queued as a
     *       regular property message.</li>
     * </ul>
     * A message is only queued if it also passes {@code canSendMessage}.
     *
     * @param map map-object key to the names of that object's locally modified fields
     * @throws IllegalStateException if the connection is not a publisher
     */
    void sendStateRPMs(Map<String, Collection<String>> map) {
        if (!connection.isPublisher()) {
            throw new IllegalStateException("You can't publish on this channel");
        }

        for (IMapObject mapObject : dsPersist.findAllMapObjectsOnChannel(connection.getChannelDefinition())) {
            final String key = mapObject.getKey();
            final Collection<String> locallyModified = map.get(key);
            if (logger.isDebugEnabled()) {
                logger.debug("MO " + key + " LM fields: " + locallyModified);
            }
            final IPrePersistRaptorPropertyMessage stateMessage = mapObject.getFullStatePropertyMessage();
            if (logger.isTraceEnabled()) {
                logger.trace("MO " + key + " FULL STATE RPM: " + stateMessage);
            }

            // No entry for this map object: publish its complete state.
            if (locallyModified == null) {
                if (canSendMessage(stateMessage)) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Send STATE RPM for: " + key);
                    }
                    queuePropertyMessage(stateMessage, true);
                }
                continue;
            }

            // Entry present but empty: nothing to publish.
            if (locallyModified.isEmpty()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("No locally modified fields");
                }
                continue;
            }

            // Strip every field that is not locally modified.
            final Set<String> untouched = stateMessage.getFieldNames();
            untouched.removeAll(locallyModified);
            for (String fieldName : untouched) {
                stateMessage.removeField(fieldName);
            }

            final Set<String> remaining = stateMessage.getFieldNames();
            if (remaining.isEmpty()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Skip sending bcause no fields remain");
                }
                continue;
            }
            if (canSendMessage(stateMessage)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Send UPDATE RPM for " + key + " with fields: " + remaining);
                }
                queuePropertyMessage(stateMessage);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Enables the publisher and, when the connection is a publisher, sends state for the
     * map objects on this channel.
     *
     * @param map map-object key to locally modified field names, forwarded to
     *        {@code sendStateRPMs}
     */
    public void startupComplete(Map<String, Collection<String>> map) {
        enable();
        if (!connection.isPublisher()) {
            return;
        }
        sendStateRPMs(map);
    }

    /**
     * Recomputes the backlog state and notifies the connection when it changes.
     *
     * <p>The publisher counts as backlogged when the live and backfill data queues together
     * hold at least one task and more than {@code NETWORK_INFO_INIT} milliseconds have
     * passed since the connection's last-message-sent time.
     * NOTE(review): the last-sent time is compared against
     * {@link System#currentTimeMillis()} while {@code processTask} reports
     * {@link System#nanoTime()} values — confirm the connection converts between them.
     */
    @Override // gov.nanoraptor.dataservices.protocol.APublisherMessagePump
    protected void testBacklog() {
        final int pending = getOutboundQueueSize() + getOutboundBackfillQueueSize();
        final boolean backlogged = pending > 0
                && System.currentTimeMillis() - connection.getLastMessageSentTime() > NETWORK_INFO_INIT;
        if (backlogged == lastBacklogState) {
            return;
        }

        if (logger.isDebugEnabled()) {
            logger.debug("backlog state changed: now=" + backlogged);
        }
        lastBacklogState = backlogged;
        connection.fireChannelStateChanged();
    }

    /**
     * Final cleanup for the pump: logs that the publisher is stopping and empties all four
     * outbound queues.
     */
    @Override // gov.nanoraptor.dataservices.protocol.APublisherMessagePump
    protected void wrapUp() {
        final String channelName = connection.getChannelDescriptor().getName();
        logger.debug("Data services publisher for channel " + channelName + " stopping, clearing message queues");

        RDMQueue.clear();
        RPMQueue.clear();
        backfillRDMQueue.clear();
        controlQueue.clear();
    }
}
