package gov.nanoraptor.dataservices.protocol;

import gov.nanoraptor.api.dataservices.ChannelDescriptor;
import gov.nanoraptor.api.dataservices.DataServicesSyncOption;
import gov.nanoraptor.api.dataservices.ICustomCommand;
import gov.nanoraptor.api.messages.IMapEntity;
import gov.nanoraptor.api.messages.IRaptorDataMessage;
import gov.nanoraptor.api.messages.IRaptorPropertyMessage;
import gov.nanoraptor.api.registry.IDataStructureRegistry;
import gov.nanoraptor.commons.file.FileUtils;
import gov.nanoraptor.dataservices.channels.BackfillGap;
import gov.nanoraptor.dataservices.channels.ChannelDefinition;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;

/**
 * Command protocol spoken over one input/output stream pair.
 *
 * <p>What this class does, as visible in this file:
 * <ul>
 *   <li>Handshake: the ASCII magic {@code "TRDS"}, one protocol-version byte ({@code 17}) and an
 *       8-byte wall-clock value. One side writes it ({@link #writeServerHandshake()}), the other
 *       reads it ({@link #readServerHandshake()}); both then start the reader thread.</li>
 *   <li>Outgoing frames: one byte holding the {@code CommandType} ordinal, a 4-byte payload length,
 *       then the payload ({@link #writeCommand(Command)}).</li>
 *   <li>Incoming commands are read by a daemon thread and handed over through two single-slot
 *       queues: one for {@code AAckCommand} instances, one for everything else. Keep-alive
 *       commands are discarded on receipt.</li>
 *   <li>After every non-keep-alive write, a keep-alive command is scheduled every
 *       {@value #KEEP_ALIVE} ms until the next non-keep-alive write replaces the schedule.</li>
 * </ul>
 */
public class Protocol {
    /** Magic characters that open the handshake. */
    private static final String HANDSHAKE = "TRDS";
    /**
     * Charset used to turn {@link #HANDSHAKE} into the expected bytes. The writing side uses
     * {@link DataOutputStream#writeBytes(String)}, which emits the low byte of each char, so the
     * comparison must not depend on the platform default charset.
     */
    private static final String HANDSHAKE_CHARSET = "US-ASCII";
    /** Size in bytes of the clock value exchanged at the end of the handshake (one {@code long}). */
    private static final int HANDSHAKE_CLOCK_BYTES = 8;
    /** Per-frame overhead in bytes: one command-type byte plus one 4-byte length. */
    private static final int FRAME_HEADER_BYTES = 5;
    /** Delay and period, in milliseconds, of the keep-alive timer task. */
    private static final long KEEP_ALIVE = 5000;
    /** Protocol version byte sent and required in the handshake. */
    private static final byte PROTOCOL_VERSION = 17;
    /** Maximum time, in milliseconds, to wait for an acknowledge command. */
    private static final long TIMEOUT = 60000;

    private final Map<String, ICustomCommand> customCommands;
    private final DataInput input;
    /** Currently scheduled keep-alive task, or {@code null} before the first non-keep-alive write. */
    private KeepAliveTimerTask keepAliveTask;
    private final DataOutputStream output;
    private final ProtocolReader reader;
    private final IDataStructureRegistry registry;
    private final boolean server;
    /** Remote handshake clock minus the local clock at the time the handshake was read. */
    private long serverClockSkew;
    /** Optional hook run once when the reader thread's {@code run()} exits. */
    private Runnable threadCleanupCallback;
    private static final Logger logger = Logger.getLogger(Protocol.class);
    private static final Command OK_COMMAND = new OkCommand();
    private static final Command LOGOUT_COMMAND = new LogoutCommand();
    private static final Command KEEP_ALIVE_COMMAND = new KeepAliveCommand();
    /** Scratch buffer a command is serialized into before framing; guarded by {@link #writeLock}. */
    private final ByteArrayOutputStream outputBuffer = new ByteArrayOutputStream(FileUtils.CHUNK_SIZE);
    private final StructureSortedFieldsCache cache = new StructureSortedFieldsCache();
    /** Single-slot hand-off for {@code AAckCommand}s (or an exception raised while reading). */
    private final BlockingQueue<Object> acknowledge = new ArrayBlockingQueue<Object>(1);
    /** Single-slot hand-off for all other commands (or an exception raised while reading). */
    private final BlockingQueue<Object> normal = new ArrayBlockingQueue<Object>(1);
    private final Object writeLock = new Object();
    private final ReaderThread thread = new ReaderThread();
    private final Timer timer = new Timer("Data services protocol keep alive", true);
    /** Cleared by {@link #shutdown()}; every loop in this class re-checks it. */
    private volatile boolean running = true;
    private final MessageProtocolKeyTracker protocolKeyTracker = new MessageProtocolKeyTracker();
    private long totalBytesSent = 0;
    private long totalBytesReceived = 0;

    /**
     * Timer task that sends one keep-alive command per execution. If the write fails, the timer is
     * cancelled and the exception is published to the command queues so blocked readers see it.
     */
    public class KeepAliveTimerTask extends TimerTask {
        private KeepAliveTimerTask() {
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            try {
                Protocol.this.writeCommand(Protocol.KEEP_ALIVE_COMMAND);
            } catch (IOException e) {
                Protocol.this.timer.cancel();
                Protocol.this.putException(e);
            }
        }
    }

    /**
     * Daemon thread that reads commands from the {@link ProtocolReader} and routes each one to the
     * acknowledge or normal queue until the protocol is shut down or reading fails.
     */
    public class ReaderThread extends Thread {
        public ReaderThread() {
            super("Data services protocol reader");
            setDaemon(true);
        }

        /**
         * Reads and dispatches commands while the protocol is running. An {@link IOException} is
         * published as-is; any other exception is wrapped in a {@code CommandReaderException}.
         * The cleanup callback, if set, runs exactly once on every exit path.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                while (Protocol.this.running) {
                    Command command = Protocol.this.reader.readCommand();
                    // Keep-alives carry no information for consumers; drop them here.
                    if (command.getCommandType() != CommandType.KEEP_ALIVE) {
                        enqueue(command);
                    }
                }
            } catch (IOException e) {
                Protocol.this.putException(e);
            } catch (Exception e) {
                Protocol.this.putException(new CommandReaderException(e));
            } finally {
                // Read once into a local so a concurrent setter call cannot null it between
                // the check and the invocation.
                Runnable cleanup = Protocol.this.threadCleanupCallback;
                if (cleanup != null) {
                    cleanup.run();
                }
            }
        }

        /**
         * Blocks until {@code command} has been placed in its queue or the protocol stops running.
         */
        private void enqueue(Command command) {
            BlockingQueue<Object> target =
                    (command instanceof AAckCommand) ? Protocol.this.acknowledge : Protocol.this.normal;
            while (Protocol.this.running) {
                try {
                    target.put(command);
                    return;
                } catch (InterruptedException ignored) {
                    // shutdown() interrupts this thread; the loop condition decides whether to
                    // retry. Re-asserting the interrupt flag here would make put() fail again
                    // immediately and spin.
                }
            }
        }
    }

    /**
     * Wraps the given streams and creates the {@link ProtocolReader}. The reader thread is not
     * started here; it starts at the end of {@link #readServerHandshake()} or
     * {@link #writeServerHandshake()}.
     *
     * @param str                    if non-null, the reader thread is renamed to {@code str + " reader"}
     * @param inputStream            stream commands are read from
     * @param outputStream           stream commands are written to
     * @param iDataStructureRegistry registry handed to the reader and to message commands
     * @param z                      value later returned by {@link #isServer()} and passed to
     *                               {@code Command.check}, {@code ProtocolReader} and {@code DefinitionCommand}
     */
    public Protocol(String str, InputStream inputStream, OutputStream outputStream, IDataStructureRegistry iDataStructureRegistry, boolean z) {
        if (str != null) {
            this.thread.setName(str + " reader");
        }
        this.customCommands = new HashMap<String, ICustomCommand>();
        this.server = z;
        this.input = new DataInputStream(inputStream);
        this.output = new DataOutputStream(outputStream);
        this.registry = iDataStructureRegistry;
        this.reader = new ProtocolReader(this, this.input, this.registry, this.cache, this.server);
    }

    /**
     * Guards the handshake methods, which read/write the streams directly and then start the thread.
     *
     * @throws IllegalStateException if the reader thread is currently alive
     */
    private void checkThreadNotAlive() {
        if (this.thread.isAlive()) {
            throw new IllegalStateException("Protocol thread already started");
        }
    }

    /**
     * Unpacks an element taken from one of the hand-off queues: exceptions published by
     * {@link #putException(Exception)} are rethrown, anything else is returned as a command.
     *
     * @param obj queue element; {@code null} (a timed-out poll) is returned as {@code null}
     */
    private Command getCommand(Object obj) throws IOException, CommandReaderException {
        if (obj instanceof IOException) {
            throw ((IOException) obj);
        }
        if (obj instanceof CommandReaderException) {
            throw ((CommandReaderException) obj);
        }
        return (Command) obj;
    }

    /**
     * Publishes a failure to both hand-off queues so that threads waiting for commands receive it.
     *
     * <p>The acknowledge queue gets a non-blocking offer (dropped if its slot is occupied). The
     * normal queue gets a single offer that waits up to 5 seconds; the outcome of that offer is
     * not checked, and it is only repeated if the wait was interrupted while still running.
     */
    public void putException(Exception exc) {
        this.acknowledge.offer(exc);
        while (this.running) {
            try {
                this.normal.offer(exc, 5L, TimeUnit.SECONDS);
                return;
            } catch (InterruptedException ignored) {
                // Retry while running; see ReaderThread for why the flag is not re-asserted.
            }
        }
    }

    /** Returns the live (mutable) map of registered custom commands, keyed by command identifier. */
    public Map<String, ICustomCommand> getCustomCommands() {
        return this.customCommands;
    }

    public MessageProtocolKeyTracker getProtocolKeyTracker() {
        return this.protocolKeyTracker;
    }

    public ProtocolReader getReader() {
        return this.reader;
    }

    public IDataStructureRegistry getRegistry() {
        return this.registry;
    }

    /**
     * Returns the clock value read in {@link #readServerHandshake()} minus the local
     * {@link System#currentTimeMillis()} at that moment; {@code 0} if that method never ran.
     */
    public long getServerClockSkew() {
        return this.serverClockSkew;
    }

    /** Returns the running count of bytes accounted as received (handshake plus recorded commands). */
    public long getTotalReceivedBytes() {
        return this.totalBytesReceived;
    }

    /** Returns the running count of bytes accounted as sent (handshake plus framed commands). */
    public long getTotalSentBytes() {
        return this.totalBytesSent;
    }

    public boolean isServer() {
        return this.server;
    }

    /**
     * Waits for the next element of the acknowledge queue.
     *
     * @return the acknowledge command
     * @throws AcknowledgeTimeoutException if nothing arrives within {@value #TIMEOUT} ms
     * @throws ShutdownException           if the protocol is not (or no longer) running
     * @throws IOException                 if the reader published an I/O failure, or a
     *                                     {@code CommandReaderException} (wrapped as the cause)
     */
    public Command readAcknowledgeCommand() throws IOException {
        while (this.running) {
            try {
                Command command = getCommand(this.acknowledge.poll(TIMEOUT, TimeUnit.MILLISECONDS));
                if (command == null) {
                    throw new AcknowledgeTimeoutException(TIMEOUT);
                }
                return command;
            } catch (CommandReaderException e) {
                throw new IOException("Failed to read acknolwedge command", e);
            } catch (InterruptedException ignored) {
                // Interrupted while waiting: start a fresh wait if still running.
            }
        }
        throw new ShutdownException();
    }

    /**
     * Blocks, without a timeout, for the next element of the normal queue.
     *
     * @throws ShutdownException      if the protocol is not (or no longer) running
     * @throws IOException            if the reader or keep-alive task published an I/O failure
     * @throws CommandReaderException if the reader published a non-I/O failure
     */
    public Command readNormalCommand() throws IOException, CommandReaderException {
        while (this.running) {
            try {
                return getCommand(this.normal.take());
            } catch (InterruptedException ignored) {
                // Interrupted while waiting: retry if still running.
            }
        }
        throw new ShutdownException();
    }

    /**
     * Reads and validates the handshake, records the clock skew and starts the reader thread.
     *
     * @throws IllegalStateException         if the reader thread is already alive
     * @throws UnsupportedHandshakeException if the magic bytes or the version byte do not match
     * @throws IOException                   if reading from the input fails
     */
    public void readServerHandshake() throws IOException {
        checkThreadNotAlive();
        byte[] expectedMagic = HANDSHAKE.getBytes(HANDSHAKE_CHARSET);
        byte[] magic = new byte[expectedMagic.length];
        this.input.readFully(magic);
        byte version = this.input.readByte();
        int length = magic.length + 1;
        this.totalBytesReceived += length;
        if (logger.isDebugEnabled()) {
            logger.debug("Read handshake (" + length + " bytes), protocol version: " + ((int) version));
        }
        if (!Arrays.equals(magic, expectedMagic) || version != PROTOCOL_VERSION) {
            throw new UnsupportedHandshakeException(magic, version);
        }
        this.serverClockSkew = this.input.readLong() - System.currentTimeMillis();
        // The clock value is part of the handshake too; the writing side counts it as sent.
        this.totalBytesReceived += HANDSHAKE_CLOCK_BYTES;
        if (logger.isDebugEnabled()) {
            logger.debug("Server clock skew: " + this.serverClockSkew);
        }
        this.thread.start();
    }

    /**
     * Adds {@code i + i2} to the received-byte total and logs the command at debug level.
     *
     * @param command the command that was just read
     * @param i       first byte count to add
     * @param i2      second byte count to add
     */
    public void recordCommandRead(Command command, int i, int i2) {
        int i3 = i + i2;
        this.totalBytesReceived += i3;
        if (logger.isDebugEnabled()) {
            logger.debug("Read " + command.getCommandType() + " command (" + i3 + " bytes) total=" + this.totalBytesReceived);
        }
    }

    /** Registers a custom command under its {@code getCommandIdentifier()}, replacing any previous entry. */
    public void registerCustomCommand(ICustomCommand iCustomCommand) {
        this.customCommands.put(iCustomCommand.getCommandIdentifier(), iCustomCommand);
    }

    /** Registers every command of the collection via {@link #registerCustomCommand(ICustomCommand)}. */
    public void registerCustomCommand(Collection<ICustomCommand> collection) {
        for (ICustomCommand customCommand : collection) {
            registerCustomCommand(customCommand);
        }
    }

    /** Sets the hook the reader thread runs when its {@code run()} method exits. */
    public void setThreadCleanupCallback(Runnable runnable) {
        this.threadCleanupCallback = runnable;
    }

    /**
     * Stops the protocol: clears the running flag, interrupts the reader thread and cancels the
     * keep-alive timer. The streams are not closed here.
     */
    public void shutdown() {
        this.running = false;
        this.thread.interrupt();
        // Taken under the write lock so the cancel cannot interleave with a schedule() call in
        // writeCommand, which re-checks the running flag under the same lock.
        synchronized (this.writeLock) {
            this.timer.cancel();
        }
    }

    /** Sends a {@code BackfillCommand} built from the arguments. */
    public void writeBackfillCommand(int i, List<BackfillGap> list) throws IOException {
        writeCommand(new BackfillCommand(i, list));
    }

    /** Sends a {@code BatchDataMessageCommand} built from the list, the registry and the field cache. */
    public void writeBatchDataMessageCommand(List<MessageSequenceHolder> list) throws IOException {
        writeCommand(new BatchDataMessageCommand(list, this.registry, this.cache));
    }

    /** Sends a {@code BatchMessageResponseCommand} built from the list. */
    public void writeBatchMessageResponseCommand(List<DataMessageResponseHolder> list) throws IOException {
        writeCommand(new BatchMessageResponseCommand(list));
    }

    /** Sends a {@code ChannelCommand} built from the arguments. */
    public void writeChannelCommand(int i, ChannelDefinition channelDefinition) throws IOException {
        writeCommand(new ChannelCommand(i, channelDefinition));
    }

    /**
     * Sends a {@code ChannelsCommand} built from the list. A {@code null} list is logged as a
     * warning (with a stack trace for locating the caller) but is still sent.
     */
    public void writeChannelsCommand(List<ChannelDescriptor> list) throws IOException {
        if (list == null) {
            logger.warn("Sending ChannelsCommand with null channels", new RuntimeException("Stack"));
        }
        writeCommand(new ChannelsCommand(list));
    }

    /**
     * Serializes and sends one command as a frame: type-ordinal byte, 4-byte payload length, payload.
     *
     * <p>{@code command.check(server)} is invoked first, outside the write lock. Under the lock,
     * nothing is written if the protocol has been shut down. For any command other than a
     * keep-alive, the pending keep-alive task is cancelled before the write and a new one is
     * scheduled after it.
     *
     * @throws IOException if serializing the command or writing to the output fails
     */
    public void writeCommand(Command command) throws IOException {
        CommandType commandType = command.getCommandType();
        boolean resetsKeepAlive = commandType != CommandType.KEEP_ALIVE;
        command.check(this.server);
        synchronized (this.writeLock) {
            if (!this.running) {
                return;
            }
            if (resetsKeepAlive && this.keepAliveTask != null) {
                this.keepAliveTask.cancel();
            }
            // Serialize into the scratch buffer first so the payload length is known up front.
            this.outputBuffer.reset();
            DataOutputStream payload = new DataOutputStream(this.outputBuffer);
            command.write(payload, this);
            payload.flush();
            int size = this.outputBuffer.size();
            this.output.writeByte(commandType.ordinal());
            this.output.writeInt(size);
            this.outputBuffer.writeTo(this.output);
            this.output.flush();
            int frameBytes = size + FRAME_HEADER_BYTES;
            this.totalBytesSent += frameBytes;
            if (logger.isDebugEnabled()) {
                logger.debug("Wrote " + commandType + " command (" + frameBytes + " bytes) + total=" + this.totalBytesSent);
            }
            if (resetsKeepAlive) {
                this.keepAliveTask = new KeepAliveTimerTask();
                this.timer.schedule(this.keepAliveTask, KEEP_ALIVE, KEEP_ALIVE);
            }
        }
    }

    /** Sends the custom command wrapped in a {@code CustomCommandEnvelope}. */
    public void writeCustomCommand(ICustomCommand iCustomCommand) throws IOException {
        writeCommand(new CustomCommandEnvelope(iCustomCommand));
    }

    /** Sends a {@code DataMessageCommand} built from {@code i}, the message, the registry and the field cache. */
    public void writeDataMessageCommand(int i, IRaptorDataMessage iRaptorDataMessage) throws IOException {
        writeCommand(new DataMessageCommand(i, iRaptorDataMessage, this.registry, this.cache));
    }

    /** Sends a {@code DataMessageCommand} built from the message, the registry and the field cache. */
    public void writeDataMessageCommand(IRaptorDataMessage iRaptorDataMessage) throws IOException {
        writeCommand(new DataMessageCommand(iRaptorDataMessage, this.registry, this.cache));
    }

    /** Sends a {@code DefinitionCommand} built from the entity, the field cache and the server flag. */
    public void writeDefinitionCommand(IMapEntity iMapEntity) throws IOException {
        writeCommand(new DefinitionCommand(iMapEntity, this.cache, this.server));
    }

    /** Sends a {@code DataMessageCommand} like {@link #writeDataMessageCommand(int, IRaptorDataMessage)}, flagged with {@code setMapObjectDeleted(true)}. */
    public void writeDeletedDataMessageCommand(int i, IRaptorDataMessage iRaptorDataMessage) throws IOException {
        DataMessageCommand deleted = new DataMessageCommand(i, iRaptorDataMessage, this.registry, this.cache);
        deleted.setMapObjectDeleted(true);
        writeCommand(deleted);
    }

    /** Sends an {@code ErrorCommand} built from the string. */
    public void writeErrorCommand(String str) throws IOException {
        writeCommand(new ErrorCommand(str));
    }

    /** Sends a {@code LoginCommand}; the arguments are passed to its constructor in the same order. */
    public void writeLoginCommand(String str, String str2, UUID uuid, String str3) throws IOException {
        writeCommand(new LoginCommand(str, str2, uuid, str3));
    }

    /** Sends the shared {@code LogoutCommand} instance. */
    public void writeLogoutCommand() throws IOException {
        writeCommand(LOGOUT_COMMAND);
    }

    /** Sends a {@code MessageErrorCommand} built from the arguments. */
    public void writeMessageErrorCommand(int i, String str) throws IOException {
        writeCommand(new MessageErrorCommand(i, str));
    }

    /** Sends a {@code MessageOkCommand} built from the arguments. */
    public void writeMessageOkCommand(int i, int i2) throws IOException {
        writeCommand(new MessageOkCommand(i, i2));
    }

    /** Sends the shared {@code OkCommand} instance. */
    public void writeOkCommand() throws IOException {
        writeCommand(OK_COMMAND);
    }

    /** Sends an {@code OkWithReturnCommand} carrying {@code i}. */
    public void writeOkCommand(int i) throws IOException {
        writeCommand(new OkWithReturnCommand(i));
    }

    /**
     * Sends a {@code PropertyMessageCommand} for the message, with its state flag set to {@code z}
     * and its location taken from {@code getMapObjectProxy().getMapObject().getLocation()}.
     * NOTE(review): that accessor chain is not null-checked here — confirm none of the links can
     * be null for the messages passed in.
     */
    public void writePropertyMessageCommand(IRaptorPropertyMessage iRaptorPropertyMessage, boolean z) throws IOException {
        PropertyMessageCommand propertyCommand = new PropertyMessageCommand(iRaptorPropertyMessage, this.registry, this.cache);
        propertyCommand.setStateRPM(z);
        propertyCommand.setLocation(iRaptorPropertyMessage.getMapObjectProxy().getMapObject().getLocation());
        writeCommand(propertyCommand);
    }

    /** Sends a {@code RedirectCommand} built from the arguments. */
    public void writeRedirectCommand(int i, InetSocketAddress inetSocketAddress, String str) throws IOException {
        writeCommand(new RedirectCommand(i, inetSocketAddress, str));
    }

    /** Sends a {@code SelectCommand} carrying {@code i}. */
    public void writeSelectCommand(int i) throws IOException {
        writeCommand(new SelectCommand(i));
    }

    /**
     * Writes the handshake (magic, version byte, current wall-clock time), accounts for its bytes
     * and starts the reader thread.
     *
     * @throws IllegalStateException if the reader thread is already alive
     * @throws IOException           if writing to the output fails
     */
    public void writeServerHandshake() throws IOException {
        checkThreadNotAlive();
        this.output.writeBytes(HANDSHAKE);
        this.output.writeByte(PROTOCOL_VERSION);
        this.output.writeLong(System.currentTimeMillis());
        this.output.flush();
        // writeBytes emits exactly one byte per char, so the char count is the byte count.
        int length = HANDSHAKE.length() + HANDSHAKE_CLOCK_BYTES + 1;
        this.totalBytesSent += length;
        if (logger.isDebugEnabled()) {
            logger.debug("Wrote handshake (" + length + " bytes), protocol version: " + PROTOCOL_VERSION);
        }
        this.thread.start();
    }

    /** Sends a {@code SyncCommand} built from the option. */
    public void writeSyncCommand(DataServicesSyncOption dataServicesSyncOption) throws IOException {
        writeCommand(new SyncCommand(dataServicesSyncOption));
    }
}
