/*
 * Decompiled with CFR 0.152.
 */
package com.tsi.comm.server.process.packet;

import com.tsi.comm.server.config.TraceConfig;
import com.tsi.comm.server.config.TsiCvimServerConfig;
import com.tsi.comm.server.kafka.KafkaProducerService;
import com.tsi.comm.server.process.AbstractTsiCvimWorker;
import com.tsi.comm.server.process.logging.TsiCvimLoggingProcess;
import com.tsi.comm.server.protocol.AbstractTsiPacket;
import com.tsi.comm.server.protocol.TsiCpuAddPacket;
import com.tsi.comm.server.protocol.TsiCpuDisconnected;
import com.tsi.comm.server.protocol.TsiCpuPacket;
import com.tsi.comm.server.protocol.enums.eOpCode;
import com.tsi.comm.server.vo.TsiNodeVo;
import com.tsi.common.spring.SpringUtils;
import com.tsi.common.utils.TimeUtils;
import io.netty.buffer.ByteBufUtil;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/**
 * Queue-backed packet worker.
 *
 * <p>Producers hand packets over through {@link #add(Object)}; the {@link #run()} loop takes them
 * from a bounded {@link LinkedBlockingQueue}, parses them, forwards the parsed payloads to Kafka
 * through {@link KafkaProducerService} and, when tracing is enabled, emits a per-packet timing log
 * either inline or through {@link TsiCvimLoggingProcess}.
 *
 * <p>The loop stops when the thread is interrupted or when {@link #SHUTDOWN_PACKET} is dequeued.
 */
public class TsiCvimPacketWorker
extends AbstractTsiCvimWorker
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(TsiCvimPacketWorker.class);

    /** Sentinel compared by identity in {@link #run()}; enqueue it to make the worker exit. */
    public static final AbstractTsiPacket SHUTDOWN_PACKET = new TsiCpuPacket(-1L, 0L, 0L, 0L, 0);

    /** Packets that waited longer than this between receive and dequeue are skipped. */
    private static final long MAX_PACKET_DELAY_MILLIS = 3000L;

    private final LinkedBlockingQueue<AbstractTsiPacket> DATA_QUEUE;
    private final KafkaProducerService kafkaProducer;
    private final TsiCvimLoggingProcess loggingProcess;
    private final TraceConfig traceConfig;
    private final TsiCvimServerConfig serverConfig;
    private final boolean useLoggingThread;
    // NOTE(review): SimpleDateFormat is not thread-safe. Inside this class it is only used by
    // logging(), but it is also exposed through getSdf() - confirm no other thread formats with it.
    private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /**
     * Creates a worker with its own bounded queue and looks up its collaborators from Spring.
     *
     * @param idx              worker index, used in log messages
     * @param qSize            capacity of the packet queue
     * @param kafkaProducer    producer used to publish parsed payloads
     * @param useLoggingThread {@code true} to hand packet logs to {@link TsiCvimLoggingProcess},
     *                         {@code false} to log inline on the worker thread
     */
    public TsiCvimPacketWorker(int idx, int qSize, KafkaProducerService kafkaProducer, boolean useLoggingThread) {
        this.idx = idx;
        this.qSize = qSize;
        this.kafkaProducer = kafkaProducer;
        this.useLoggingThread = useLoggingThread;
        this.DATA_QUEUE = new LinkedBlockingQueue<>(qSize);
        this.loggingProcess = (TsiCvimLoggingProcess)SpringUtils.getBean(TsiCvimLoggingProcess.class);
        this.traceConfig = (TraceConfig)SpringUtils.getBean(TraceConfig.class);
        this.serverConfig = (TsiCvimServerConfig)SpringUtils.getBean(TsiCvimServerConfig.class);
    }

    /**
     * Worker loop: blocks on the queue, processes each packet and always notifies the packet's
     * node that processing finished, whether or not processing threw.
     *
     * <p>An exception raised while processing one packet is logged and the loop moves on to the
     * next packet; only interruption or {@link #SHUTDOWN_PACKET} ends the loop.
     */
    @Override
    public void run() {
        log.info("PacketWorker({}): {} Start. QSIZE: {}", this.idx, Thread.currentThread().getName(), this.qSize);
        try {
            while (!Thread.currentThread().isInterrupted()) {
                AbstractTsiPacket packet;
                try {
                    packet = this.DATA_QUEUE.take();
                }
                catch (InterruptedException ie) {
                    log.warn("PacketWorker({}): {} Interrupted. Exiting...", this.idx, Thread.currentThread().getName());
                    // Restore the flag so code further up the stack can observe the interruption.
                    Thread.currentThread().interrupt();
                    return;
                }
                if (packet == SHUTDOWN_PACKET) {
                    log.info("PacketWorker({}): Shutdown signal received. Exiting...", this.idx);
                    return;
                }
                try {
                    this.process(packet);
                }
                catch (Exception e) {
                    this.logProcessFailure(packet, e);
                }
                finally {
                    this.notifyProcessingFinished(packet);
                }
            }
        }
        finally {
            log.info("PacketWorker({}): {} Stopped.", this.idx, Thread.currentThread().getName());
        }
    }

    /**
     * Logs an exception that escaped {@link #process(Object)}. The exception is passed as the
     * trailing logger argument so that its type and stack trace are not lost.
     */
    private void logProcessFailure(AbstractTsiPacket packet, Exception e) {
        TsiNodeVo nodeVo = TsiCvimPacketWorker.nodeOf(packet);
        if (nodeVo != null) {
            log.warn("PacketWorker({}): {} Node: {}, Network Conn: {}", this.idx, Thread.currentThread().getName(), nodeVo.getKey(), nodeVo.isConnect(), e);
            return;
        }
        log.error("PacketWorker({}): {} Exception: {}", this.idx, Thread.currentThread().getName(), e.getMessage(), e);
    }

    /**
     * Tells the packet's node that the worker is done with the packet. A failure inside the
     * callback is logged instead of propagated, because this runs in a {@code finally} block and
     * an escaping exception would terminate the worker thread.
     */
    private void notifyProcessingFinished(AbstractTsiPacket packet) {
        if (!(packet instanceof TsiCpuPacket)) {
            return;
        }
        TsiCpuPacket cpuPacket = (TsiCpuPacket)packet;
        try {
            TsiNodeVo nodeVo = (TsiNodeVo)cpuPacket.getObj();
            if (nodeVo != null) {
                nodeVo.packetProcessingFinished(cpuPacket);
            }
        }
        catch (RuntimeException e) {
            log.error("PacketWorker({}): {} packetProcessingFinished failed, Node: {}", this.idx, Thread.currentThread().getName(), packet.getNodeId(), e);
        }
    }

    /** Returns the node attached to {@code packet}, or {@code null} when there is none. */
    private static TsiNodeVo nodeOf(AbstractTsiPacket packet) {
        if (!(packet instanceof TsiCpuPacket)) {
            return null;
        }
        return (TsiNodeVo)((TsiCpuPacket)packet).getObj();
    }

    /**
     * Offers a packet to the queue without blocking.
     *
     * @param object the packet to enqueue; must be an {@link AbstractTsiPacket}
     * @return {@code true} if the packet was queued, {@code false} if it was {@code null}, the
     *         queue was full, or the offer failed
     */
    public boolean add(Object object) {
        AbstractTsiPacket packet = (AbstractTsiPacket)object;
        if (packet == null) {
            // LinkedBlockingQueue rejects null elements; report it instead of failing in the handler below.
            log.error("PacketWorker({}): Queue.offer: null packet rejected, {}", this.idx, Thread.currentThread().getName());
            return false;
        }
        boolean offered = false;
        try {
            offered = this.DATA_QUEUE.offer(packet);
            if (!offered) {
                MDC.put("id", Long.toString(packet.getNodeId()));
                log.error("PacketWorker({}): Queue.offer(Full): {}, {}/{}/{}, {}, {}", this.idx, packet.getNodeId(), this.DATA_QUEUE.size(), this.qSize, this.DATA_QUEUE.remainingCapacity(), TimeUtils.elapsedTime((long)packet.getRcv()), Thread.currentThread().getName());
                MDC.clear();
            }
        }
        catch (Exception e) {
            MDC.put("id", Long.toString(packet.getNodeId()));
            log.error("PacketWorker({}): Queue.offer: Exception: {}, {}, {}", this.idx, packet.getNodeId(), Thread.currentThread().getName(), e.getMessage());
            MDC.clear();
        }
        return offered;
    }

    /**
     * Processes one dequeued packet: stamps the dequeue time, drops packets whose node is missing,
     * unregistered, or that waited too long, then dispatches to the disconnect or data handler.
     *
     * <p>The node key is placed in the logging MDC under {@code "id"} for the duration of the
     * handling and the MDC is cleared afterwards.
     *
     * @param object the packet to process; expected to be a {@link TsiCpuPacket}
     */
    public void process(Object object) {
        AbstractTsiPacket packet = (AbstractTsiPacket)object;
        TsiCpuPacket cpuPacket = (TsiCpuPacket)packet;
        TsiNodeVo nodeVo = (TsiNodeVo)cpuPacket.getObj();
        long nodeId = packet.getNodeId();
        if (nodeVo == null) {
            log.error("Packet process NodeVo is null: {}, {}", nodeId, Thread.currentThread().getName());
            return;
        }
        if (!nodeVo.isRegistered()) {
            return;
        }
        long curr = System.nanoTime();
        packet.setPop(curr);
        MDC.put("id", (String)nodeVo.getKey());
        try {
            if (this.isPacketDelayed(cpuPacket, curr)) {
                return;
            }
            if (this.isDisconnectedPacket(cpuPacket)) {
                this.handleDisconnectedPacket(cpuPacket, nodeVo);
            } else {
                this.handleDataPacket(cpuPacket, nodeVo);
            }
        }
        finally {
            MDC.clear();
        }
    }

    /** Parses a data packet, publishes it, records the timing stamps and logs it if tracing is on. */
    private void handleDataPacket(TsiCpuPacket cpuPacket, TsiNodeVo nodeVo) {
        if (!this.parseAndValidate(cpuPacket, nodeVo)) {
            return;
        }
        cpuPacket.setPar(System.nanoTime());
        this.sendToKafka(cpuPacket, nodeVo);
        cpuPacket.setEnd(System.nanoTime());
        cpuPacket.setAvg(this.calcProcessTime(cpuPacket.getRcv()));
        this.logPacketIfNeeded(cpuPacket, nodeVo);
    }

    /**
     * Publishes the node, test and CVIM payloads of the packet, and the node/CVIM payloads of each
     * additional node packet, according to the send flags of {@code nodeVo}.
     */
    private void sendToKafka(TsiCpuPacket cpuPacket, TsiNodeVo nodeVo) {
        if (nodeVo.isSendNode() && cpuPacket.getNodeData() != null) {
            this.kafkaProducer.sendNode(Long.toString(nodeVo.getNodeId()), cpuPacket.getNodeData());
        }
        if (nodeVo.isSendTest() && cpuPacket.getTestData() != null) {
            this.kafkaProducer.sendTest(nodeVo.getNodeId(), cpuPacket.getTestData());
        }
        if (nodeVo.isSendCvim() && cpuPacket.getCvimData() != null) {
            this.kafkaProducer.sendCvim(nodeVo.getNodeId(), cpuPacket.getCvimData());
        }
        if (cpuPacket.getAddNodes() == null) {
            return;
        }
        for (TsiCpuAddPacket addNodePacket : cpuPacket.getAddNodes()) {
            if (nodeVo.isSendNode()) {
                this.kafkaProducer.sendNode(Long.toString(addNodePacket.getNodeId()), addNodePacket.getNodeData());
            }
            if (nodeVo.isSendCvim()) {
                this.kafkaProducer.sendCvim(addNodePacket.getNodeId(), addNodePacket.getCvimData());
            }
        }
    }

    /**
     * Runs the packet parser and reports failures.
     *
     * @return {@code true} only when {@code parsing} returns {@code 0}; {@code false} on any other
     *         result code or when parsing throws
     */
    private boolean parseAndValidate(TsiCpuPacket cpuPacket, TsiNodeVo nodeVo) {
        try {
            int result = cpuPacket.parsing(nodeVo, this.serverConfig.isCheckPacket());
            if (result == 0) {
                return true;
            }
            if (!nodeVo.isConnect()) {
                log.error("parsing: errno({}), NodeId: {}, Disconnected.", result, nodeVo.getNodeId());
                return false;
            }
            byte[] buf = cpuPacket.getBuf();
            if (buf != null) {
                // The byte at offset 5 is reported as the version when the buffer is long enough.
                byte version = buf.length > 5 ? buf[5] : 0;
                // Argument order follows the placeholders: errno first, then node id.
                log.error("parsing: errno({}), NodeId: {}, version: {}", result, nodeVo.getNodeId(), version);
                log.error("parsing: {}", ByteBufUtil.hexDump(buf));
            }
            return false;
        }
        catch (Exception e) {
            log.warn("Node: {}, CPU Packet parsing error: {}, connect: {}, {}", nodeVo.getNodeId(), Thread.currentThread().getName(), nodeVo.isConnect(), e.getMessage());
            return false;
        }
    }

    /**
     * Handles a disconnect packet: parses it, publishes its CVIM payloads when the node has CVIM
     * sending enabled, records timing stamps and logs it if tracing is on. A failure is logged
     * only while the node still reports itself as connected.
     */
    private void handleDisconnectedPacket(TsiCpuPacket packet, TsiNodeVo nodeVo) {
        TsiCpuDisconnected disconnected = (TsiCpuDisconnected)packet;
        try {
            if (!disconnected.parsing(nodeVo)) {
                return;
            }
            packet.setPar(System.nanoTime());
            if (nodeVo.isSendCvim() && disconnected.getCvimData() != null) {
                this.kafkaProducer.sendCvim(disconnected.getNodeId(), disconnected.getCvimData());
            }
            if (nodeVo.isSendCvim() && disconnected.getAddNodes() != null) {
                for (TsiCpuPacket addNodePacket : disconnected.getAddNodes()) {
                    this.kafkaProducer.sendCvim(addNodePacket.getNodeId(), addNodePacket.getCvimData());
                }
            }
            packet.setEnd(System.nanoTime());
            packet.setAvg(this.calcProcessTime(packet.getRcv()));
            this.logPacketIfNeeded(packet, nodeVo);
        }
        catch (Exception e) {
            if (nodeVo.isConnect()) {
                log.warn("Node: {}, Disconnect parsing error: {}, connect: {}, {}", nodeVo.getNodeId(), Thread.currentThread().getName(), nodeVo.isConnect(), e.getMessage());
            }
        }
    }

    /** Returns whether the packet's op code equals {@code TSI_CPU_DISCONNECTED}. */
    private boolean isDisconnectedPacket(TsiCpuPacket packet) {
        return packet.getOpCode() == (byte)eOpCode.TSI_CPU_DISCONNECTED.getValue();
    }

    /**
     * Returns whether the time between the packet's receive stamp and {@code curr} exceeds
     * {@link #MAX_PACKET_DELAY_MILLIS}; a skipped packet is logged.
     *
     * @param curr current {@link System#nanoTime()} reading
     */
    private boolean isPacketDelayed(TsiCpuPacket packet, long curr) {
        long delayMillis = TimeUnit.NANOSECONDS.toMillis(curr - packet.getRcv());
        if (delayMillis > MAX_PACKET_DELAY_MILLIS) {
            log.warn("Packet skip::: {}, {} ms.", packet.getNodeId(), delayMillis);
            return true;
        }
        return false;
    }

    /**
     * Emits the timing log for the packet and for each of its additional node packets when global
     * node logging is enabled or the node is flagged for dumping. Additional node packets inherit
     * the parent packet's timing stamps before being logged.
     */
    private void logPacketIfNeeded(AbstractTsiPacket packet, TsiNodeVo nodeVo) {
        if (!this.traceConfig.isNodeLogging() && !nodeVo.isDump()) {
            return;
        }
        this.emitPacketLog(packet, nodeVo);
        if (!(packet instanceof TsiCpuPacket)) {
            return;
        }
        TsiCpuPacket cpuPacket = (TsiCpuPacket)packet;
        if (cpuPacket.getAddNodes() == null) {
            return;
        }
        for (TsiCpuAddPacket addNodePacket : cpuPacket.getAddNodes()) {
            addNodePacket.setAdd(packet.getAdd());
            addNodePacket.setPop(packet.getPop());
            addNodePacket.setPar(packet.getPar());
            addNodePacket.setEnd(packet.getEnd());
            addNodePacket.setAvg(packet.getAvg());
            this.emitPacketLog(addNodePacket, nodeVo);
        }
    }

    /** Routes one packet log either to the logging process queue or to the inline logger. */
    private void emitPacketLog(AbstractTsiPacket packet, TsiNodeVo nodeVo) {
        if (this.useLoggingThread) {
            this.loggingProcess.add(packet, nodeVo.getLogQIdx());
        } else {
            this.logging(packet);
        }
    }

    /** Writes the inline per-packet timing line (collect time, node, job time, size, average). */
    private void logging(AbstractTsiPacket packet) {
        long job = packet.getEnd() - packet.getRcv();
        // times() is multiplied by 1000 before building the Date, i.e. it is treated as seconds.
        Date date = new Date(packet.getTimespec().times() * 1000L);
        String collectTime = this.sdf.format(date);
        log.info("{} Node: {},        Job: {} {}, {} bytes, Average {}", collectTime, packet.getNodeId(), TimeUtils.elapsedTimeStr(job), Thread.currentThread().getName(), packet.getPacketLength(), TimeUtils.elapsedTimeStr((long)packet.getAvg()));
    }

    /** Logs the queue capacity, current size, remaining capacity and the average processing time. */
    public void report() {
        log.info("PacketWorker({}), Queue Total/Size/Remain: {}/{}/{}, Average: {}, {}", this.idx, this.qSize, this.DATA_QUEUE.size(), this.DATA_QUEUE.remainingCapacity(), TimeUtils.elapsedTimeStr((long)this.avgTime), Thread.currentThread().getName());
    }

    /** Returns the live packet queue (not a copy). */
    public LinkedBlockingQueue<AbstractTsiPacket> getDATA_QUEUE() {
        return this.DATA_QUEUE;
    }

    /** Returns the Kafka producer used by this worker. */
    public KafkaProducerService getKafkaProducer() {
        return this.kafkaProducer;
    }

    /** Returns the logging process that receives packet logs when the logging thread is used. */
    public TsiCvimLoggingProcess getLoggingProcess() {
        return this.loggingProcess;
    }

    /** Returns the trace configuration consulted before logging packets. */
    public TraceConfig getTraceConfig() {
        return this.traceConfig;
    }

    /** Returns the server configuration consulted when parsing packets. */
    public TsiCvimServerConfig getServerConfig() {
        return this.serverConfig;
    }

    /** Returns whether packet logs are handed to the logging process instead of logged inline. */
    public boolean isUseLoggingThread() {
        return this.useLoggingThread;
    }

    /** Returns the formatter used for the collect time in inline packet logs. */
    public SimpleDateFormat getSdf() {
        return this.sdf;
    }
}

