|
|
@@ -1,18 +1,19 @@
|
|
|
package com.tsi.comm.server.process.packet;
|
|
|
|
|
|
-import com.tsi.comm.server.protocol.enums.eOpCode;
|
|
|
import com.tsi.comm.server.config.TraceConfig;
|
|
|
import com.tsi.comm.server.config.TsiCvimServerConfig;
|
|
|
import com.tsi.comm.server.kafka.KafkaProducerService;
|
|
|
import com.tsi.comm.server.process.AbstractTsiCvimWorker;
|
|
|
import com.tsi.comm.server.process.logging.TsiCvimLoggingProcess;
|
|
|
import com.tsi.comm.server.protocol.AbstractTsiPacket;
|
|
|
+import com.tsi.comm.server.protocol.TsiCpuAddPacket;
|
|
|
import com.tsi.comm.server.protocol.TsiCpuDisconnected;
|
|
|
import com.tsi.comm.server.protocol.TsiCpuPacket;
|
|
|
+import com.tsi.comm.server.protocol.enums.eOpCode;
|
|
|
import com.tsi.comm.server.vo.TsiNodeVo;
|
|
|
import com.tsi.common.spring.SpringUtils;
|
|
|
-import com.tsi.common.utils.HexString;
|
|
|
import com.tsi.common.utils.TimeUtils;
|
|
|
+import io.netty.buffer.ByteBufUtil;
|
|
|
import lombok.Getter;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.slf4j.MDC;
|
|
|
@@ -31,7 +32,7 @@ public class TsiCvimPacketWorker extends AbstractTsiCvimWorker implements Runnab
|
|
|
private final KafkaProducerService kafkaProducer;
|
|
|
private final TsiCvimLoggingProcess loggingProcess;
|
|
|
private final TraceConfig traceConfig;
|
|
|
- private final TsiCvimServerConfig cvimServerConfig;
|
|
|
+ private final TsiCvimServerConfig serverConfig;
|
|
|
|
|
|
public TsiCvimPacketWorker(int idx, int qSize, KafkaProducerService kafkaProducer) {
|
|
|
this.idx = idx;
|
|
|
@@ -40,7 +41,7 @@ public class TsiCvimPacketWorker extends AbstractTsiCvimWorker implements Runnab
|
|
|
this.DATA_QUEUE = new LinkedBlockingQueue<>(qSize);
|
|
|
this.loggingProcess = SpringUtils.getBean(TsiCvimLoggingProcess.class);
|
|
|
this.traceConfig = SpringUtils.getBean(TraceConfig.class);
|
|
|
- this.cvimServerConfig = SpringUtils.getBean(TsiCvimServerConfig.class);
|
|
|
+ this.serverConfig = SpringUtils.getBean(TsiCvimServerConfig.class);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@@ -48,7 +49,7 @@ public class TsiCvimPacketWorker extends AbstractTsiCvimWorker implements Runnab
|
|
|
log.info("PacketWorker({}): {} Start. QSIZE: {}", this.idx, Thread.currentThread().getName(), this.qSize);
|
|
|
try {
|
|
|
while (!Thread.currentThread().isInterrupted()) {
|
|
|
- Object packet = null;
|
|
|
+ AbstractTsiPacket packet = null;
|
|
|
try {
|
|
|
packet = this.DATA_QUEUE.take();
|
|
|
if (packet == SHUTDOWN_PACKET) {
|
|
|
@@ -72,7 +73,7 @@ public class TsiCvimPacketWorker extends AbstractTsiCvimWorker implements Runnab
|
|
|
log.error("PacketWorker({}): {} Exception: {}", this.idx, Thread.currentThread().getName(), e.getMessage());
|
|
|
}
|
|
|
} finally {
|
|
|
- if (packet != null) {
|
|
|
+ if (packet != null && packet != SHUTDOWN_PACKET) {
|
|
|
TsiCpuPacket cpuPacket = (TsiCpuPacket)packet;
|
|
|
TsiNodeVo nodeVo = (TsiNodeVo)cpuPacket.getObj();
|
|
|
if (nodeVo != null) {
|
|
|
@@ -123,140 +124,204 @@ public class TsiCvimPacketWorker extends AbstractTsiCvimWorker implements Runnab
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- boolean isLogging = this.traceConfig.isNodeLogging() || nodeVo.isDump();
|
|
|
- String logKey = nodeVo.getKey();
|
|
|
- int loggingIdx = nodeVo.getLogQIdx(); // 로깅인덱스큐로 데이터 전송
|
|
|
+ if (!nodeVo.isRegistered()) {
|
|
|
+ // 등록되지 않은 노드인 경우 다른 처리를 하지 않는다.
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ long curr = System.nanoTime();
|
|
|
+ packet.setPop(curr);
|
|
|
|
|
|
- MDC.put("id", logKey);
|
|
|
+ MDC.put("id", nodeVo.getKey());
|
|
|
try {
|
|
|
- long curr = System.nanoTime();
|
|
|
- if (TimeUnit.MILLISECONDS.convert(curr - packet.getRcv(), TimeUnit.NANOSECONDS) > 3000) {
|
|
|
- log.warn("Packet skip::: {}, {} ms.", nodeId, TimeUnit.MILLISECONDS.convert(curr - packet.getRcv(), TimeUnit.NANOSECONDS));
|
|
|
+ if (isPacketDelayed(cpuPacket, curr)) {
|
|
|
return;
|
|
|
}
|
|
|
- packet.setPop(curr);
|
|
|
|
|
|
- if (packet.getOpCode() == (byte) eOpCode.TSI_CPU_DISCONNECTED.getValue()) {
|
|
|
- try {
|
|
|
- TsiCpuDisconnected disconnected = (TsiCpuDisconnected) packet;
|
|
|
- disconnected.parsing(nodeVo);
|
|
|
- if (nodeVo.isSendCvim() && disconnected.getCvimData() != null) {
|
|
|
- this.kafkaProducer.sendCvim(disconnected.getNodeId(), disconnected.getCvimData());
|
|
|
- }
|
|
|
-
|
|
|
- if (disconnected.getAddNodes() != null) {
|
|
|
- // 연등지 인 경우
|
|
|
- for (int ii = 0; ii < disconnected.getAddNodes().size(); ii++) {
|
|
|
- this.kafkaProducer.sendCvim(disconnected.getAddNodes().get(ii).getNodeId(), disconnected.getAddNodes().get(ii).getCvimData());
|
|
|
- }
|
|
|
- }
|
|
|
- if (isLogging) {
|
|
|
- this.loggingProcess.add(packet, loggingIdx);
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- if (nodeVo.isConnect()) {
|
|
|
- log.warn("Node: {}, Disconnect parsing error: {}, connect: {}", nodeId, Thread.currentThread().getName(), nodeVo.isConnect());
|
|
|
- }
|
|
|
- }
|
|
|
- return;
|
|
|
+ if (isDisconnectedPacket(cpuPacket)) {
|
|
|
+ handleDisconnectedPacket(cpuPacket, nodeVo);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ handleDataPacket(cpuPacket, nodeVo);
|
|
|
}
|
|
|
+ } finally {
|
|
|
+ MDC.clear();
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- // 20250425: packet parsing 의 로그를 노드별 로그파일에 저장되도록 MDC 위치 변경
|
|
|
- // 20250425: parsing 함수에 packet-check 여부를 같이 넘겨줘서 CRC 체크여부 확인
|
|
|
- try {
|
|
|
- int result = cpuPacket.parsing(nodeVo, this.cvimServerConfig.isCheckPacket());
|
|
|
- if (0 != result) {
|
|
|
- if (-4 == result) {
|
|
|
- return; // 패킷 버퍼가 null 이거나 너무 짧은 경우
|
|
|
- }
|
|
|
- if (!nodeVo.isConnect()) {
|
|
|
- return; // 연결이 끊어진 경우
|
|
|
- }
|
|
|
+ /**
|
|
|
+ * 일반 데이터 패킷에 대한 처리
|
|
|
+ */
|
|
|
+ private void handleDataPacket(TsiCpuPacket cpuPacket, TsiNodeVo nodeVo) {
|
|
|
+ // 20250425: packet parsing 의 로그를 노드별 로그파일에 저장되도록 MDC 위치 변경
|
|
|
+ // 20250425: parsing 함수에 packet-check 여부를 같이 넘겨줘서 CRC 체크여부 확인
|
|
|
|
|
|
- byte[] buf = cpuPacket.getBuf();
|
|
|
- log.error("Node: {}, CPU Packet parsing failed. error: {}.", nodeId, result);
|
|
|
- log.error("{}", HexString.fromBytes(buf));
|
|
|
- byte stx1 = 0x00, stx2 = 0x00, version = 0x00;
|
|
|
- if (buf.length > TsiCpuPacket.INDEX_VERSION) {
|
|
|
- stx1 = buf[TsiCpuPacket.INDEX_STX1];
|
|
|
- stx2 = buf[TsiCpuPacket.INDEX_STX2];
|
|
|
- version = buf[TsiCpuPacket.INDEX_VERSION];
|
|
|
- }
|
|
|
+ // 1. 파싱 및 유효성 검사
|
|
|
+ if (!parseAndValidate(cpuPacket, nodeVo)) {
|
|
|
+ return; // 파싱 실패 시 처리 중단
|
|
|
+ }
|
|
|
+ nodeVo.setPacketError(false);
|
|
|
+ cpuPacket.setPar(System.nanoTime());
|
|
|
|
|
|
- int reqLength = TsiCpuPacket.SIZE_PACKET_DATA + (TsiCpuPacket.SIZE_STATUS_DATA * cpuPacket.getCount());
|
|
|
- switch (result) {
|
|
|
- case -1:
|
|
|
- log.error("Node: {}, STX Error: {}, {}", nodeId, stx1, stx2);
|
|
|
- break;
|
|
|
- case -2:
|
|
|
- log.error("Node: {}, Length Error: {}, Version: {}, status count: {}, {}", nodeId, cpuPacket.getLength(), version,
|
|
|
- cpuPacket.getCount(), reqLength);
|
|
|
- break;
|
|
|
- case -3:
|
|
|
- log.error("Node: {}, Check Sum Error: Version: {}, recv: {}, calc: {}", nodeId, version, cpuPacket.getCheckSum(),
|
|
|
- cpuPacket.getCalcCheckSum());
|
|
|
- break;
|
|
|
- default:
|
|
|
- log.error("Node: {}, Packet parsing error: {}", nodeId, result);
|
|
|
- break;
|
|
|
- }
|
|
|
- return;
|
|
|
+ // 2. Kafka로 데이터 전송
|
|
|
+ sendToKafka(cpuPacket, nodeVo);
|
|
|
+ cpuPacket.setEnd(System.nanoTime());
|
|
|
+
|
|
|
+ // 3. 통계 및 로깅
|
|
|
+ cpuPacket.setAvg(calcProcessTime(cpuPacket.getRcv()));
|
|
|
+
|
|
|
+ logPacketIfNeeded(cpuPacket, nodeVo);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void sendToKafka(TsiCpuPacket cpuPacket, TsiNodeVo nodeVo) {
|
|
|
+ if (nodeVo.isSendNode() && cpuPacket.getNodeData() != null) {
|
|
|
+ this.kafkaProducer.sendNode(Long.toString(nodeVo.getNodeId()), cpuPacket.getNodeData());
|
|
|
+ }
|
|
|
+ if (nodeVo.isSendTest() && cpuPacket.getTestData() != null) {
|
|
|
+ this.kafkaProducer.sendTest(nodeVo.getNodeId(), cpuPacket.getTestData());
|
|
|
+ }
|
|
|
+ if (nodeVo.isSendCvim() && cpuPacket.getCvimData() != null) {
|
|
|
+ this.kafkaProducer.sendCvim(nodeVo.getNodeId(), cpuPacket.getCvimData());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (cpuPacket.getAddNodes() != null) {
|
|
|
+ for (TsiCpuAddPacket addNodePacket : cpuPacket.getAddNodes()) {
|
|
|
+ // 연등지 노드 패킷 카프카 전송
|
|
|
+ if (nodeVo.isSendNode()) {
|
|
|
+ this.kafkaProducer.sendNode(Long.toString(addNodePacket.getNodeId()), addNodePacket.getNodeData());
|
|
|
+ }
|
|
|
+ if (nodeVo.isSendCvim()) {
|
|
|
+ this.kafkaProducer.sendCvim(addNodePacket.getNodeId(), addNodePacket.getCvimData());
|
|
|
}
|
|
|
- } catch (Exception e) {
|
|
|
- log.warn("Node: {}, CPU Packet parsing error: {}, connect: {}", nodeId, Thread.currentThread().getName(), nodeVo.isConnect());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean parseAndValidate(TsiCpuPacket cpuPacket, TsiNodeVo nodeVo) {
|
|
|
+ try {
|
|
|
+ nodeVo.setCrcError(false);
|
|
|
+ nodeVo.setPacketError(false);
|
|
|
+
|
|
|
+ int result = cpuPacket.parsing(nodeVo, this.serverConfig.isCheckPacket());
|
|
|
+ if (result == 0) {
|
|
|
+ return true; // 파싱 성공
|
|
|
+ }
|
|
|
+ if (!nodeVo.isConnect()) {
|
|
|
+ return false; // 네트워크 연결이 끊어진 경우
|
|
|
}
|
|
|
|
|
|
- packet.setPar(System.nanoTime());
|
|
|
+ nodeVo.setPacketError(true);
|
|
|
|
|
|
- // 카프카 전송
|
|
|
- if (nodeVo.isSendNode() && packet.getNodeData() != null) {
|
|
|
- this.kafkaProducer.sendNode(Long.toString(nodeId), packet.getNodeData());
|
|
|
+ if (-4 == result) {
|
|
|
+ return false; // 패킷 버퍼가 null 이거나 너무 짧은 경우
|
|
|
}
|
|
|
- if (nodeVo.isSendTest()) {
|
|
|
- this.kafkaProducer.sendTest(nodeId, packet.getTestData());
|
|
|
+
|
|
|
+ long nodeId = nodeVo.getNodeId();
|
|
|
+ int reqLength = TsiCpuPacket.SIZE_PACKET_DATA + (TsiCpuPacket.SIZE_STATUS_DATA * cpuPacket.getCount());
|
|
|
+ byte stx1 = 0x00, stx2 = 0x00, version = 0x00;
|
|
|
+ if (cpuPacket.getBuf() != null) {
|
|
|
+ byte[] buf = cpuPacket.getBuf();
|
|
|
+ log.error("packet parse error: {}", ByteBufUtil.hexDump(buf));
|
|
|
+ if (buf.length > TsiCpuPacket.INDEX_VERSION) {
|
|
|
+ stx1 = buf[TsiCpuPacket.INDEX_STX1];
|
|
|
+ stx2 = buf[TsiCpuPacket.INDEX_STX2];
|
|
|
+ version = buf[TsiCpuPacket.INDEX_VERSION];
|
|
|
+ }
|
|
|
}
|
|
|
- if (nodeVo.isSendCvim() && packet.getCvimData() != null) {
|
|
|
- this.kafkaProducer.sendCvim(nodeId, packet.getCvimData());
|
|
|
+
|
|
|
+ switch (result) {
|
|
|
+ case -1:
|
|
|
+ log.error("Node: {}, STX Error: {}, {}", nodeId, stx1, stx2);
|
|
|
+ break;
|
|
|
+ case -2:
|
|
|
+ log.error("Node: {}, Length Error: {}, Version: {}, status count: {}, {}", nodeId, cpuPacket.getLength(), version,
|
|
|
+ cpuPacket.getCount(), reqLength);
|
|
|
+ break;
|
|
|
+ case -3:
|
|
|
+ log.error("Node: {}, Check Sum Error: Version: {}, recv: {}, calc: {}", nodeId, version, cpuPacket.getCheckSum(),
|
|
|
+ cpuPacket.getCalcCheckSum());
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ log.error("Node: {}, Packet parsing error: {}", nodeId, result);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.warn("Node: {}, CPU Packet parsing error: {}, connect: {}", nodeVo.getNodeId(), Thread.currentThread().getName(), nodeVo.isConnect(), e);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 연결 종료 패킷에 대한 처리
|
|
|
+ */
|
|
|
+ private void handleDisconnectedPacket(TsiCpuPacket packet, TsiNodeVo nodeVo) {
|
|
|
+ TsiCpuDisconnected disconnected = (TsiCpuDisconnected) packet;
|
|
|
+ try {
|
|
|
+ disconnected.parsing(nodeVo);
|
|
|
+
|
|
|
+ if (nodeVo.isSendCvim() && disconnected.getCvimData() != null) {
|
|
|
+ this.kafkaProducer.sendCvim(disconnected.getNodeId(), disconnected.getCvimData());
|
|
|
}
|
|
|
|
|
|
- if (cpuPacket.getAddNodes() != null) {
|
|
|
+ if (nodeVo.isSendCvim() && disconnected.getAddNodes() != null) {
|
|
|
// 연등지 인 경우
|
|
|
- for (int ii = 0; ii < cpuPacket.getAddNodes().size(); ii++) {
|
|
|
- if (nodeVo.isSendNode()) {
|
|
|
- this.kafkaProducer.sendNode(Long.toString(cpuPacket.getAddNodes().get(ii).getNodeId()), cpuPacket.getAddNodes().get(ii).getNodeData());
|
|
|
- }
|
|
|
- if (nodeVo.isSendCvim()) {
|
|
|
- this.kafkaProducer.sendCvim(cpuPacket.getAddNodes().get(ii).getNodeId(), cpuPacket.getAddNodes().get(ii).getCvimData());
|
|
|
- }
|
|
|
+ for (TsiCpuPacket addNodePacket : disconnected.getAddNodes()) {
|
|
|
+ this.kafkaProducer.sendCvim(addNodePacket.getNodeId(), addNodePacket.getCvimData());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- packet.setEnd(System.nanoTime());
|
|
|
+ logPacketIfNeeded(packet, nodeVo);
|
|
|
+ } catch (Exception e) {
|
|
|
+ if (nodeVo.isConnect()) {
|
|
|
+ log.warn("Node: {}, Disconnect parsing error: {}, connect: {}", nodeVo.getNodeId(), Thread.currentThread().getName(), nodeVo.isConnect(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- packet.setAvg(calcProcessTime(packet.getRcv()));
|
|
|
+ /**
|
|
|
+ * 연결 종료 패킷 여부 확인
|
|
|
+ */
|
|
|
+ private boolean isDisconnectedPacket(TsiCpuPacket packet) {
|
|
|
+ return packet.getOpCode() == (byte) eOpCode.TSI_CPU_DISCONNECTED.getValue();
|
|
|
+ }
|
|
|
|
|
|
- // 로그큐로 전송한다.
|
|
|
- if (isLogging) {
|
|
|
- this.loggingProcess.add(packet, loggingIdx);
|
|
|
- }
|
|
|
+ /**
|
|
|
+ * 수신한 패킷의 시간초과 여부 체크
|
|
|
+ * 큐에 수신한 시각과 큐에서 읽어 낸 시간의 차이를 계산
|
|
|
+ */
|
|
|
+ private boolean isPacketDelayed(TsiCpuPacket packet, long curr) {
|
|
|
+ final long delayMillis = TimeUnit.NANOSECONDS.toMillis(curr - packet.getRcv());
|
|
|
+ if (delayMillis > 3000) {
|
|
|
+ log.warn("Packet skip::: {}, {} ms.", packet.getNodeId(), delayMillis);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 로깅 여부에 따라 로길 프로세스로 데이터 전송
|
|
|
+ */
|
|
|
+ private void logPacketIfNeeded(AbstractTsiPacket packet, TsiNodeVo nodeVo) {
|
|
|
+ if (this.traceConfig.isNodeLogging() || nodeVo.isDump()) {
|
|
|
|
|
|
- // 연등지 인 경우
|
|
|
- if (cpuPacket.getAddNodes() != null) {
|
|
|
- for (int ii = 0; ii < cpuPacket.getAddNodes().size(); ii++) {
|
|
|
- cpuPacket.getAddNodes().get(ii).setAdd(packet.getAdd());
|
|
|
- cpuPacket.getAddNodes().get(ii).setPop(packet.getPop());
|
|
|
- cpuPacket.getAddNodes().get(ii).setPar(packet.getPar());
|
|
|
- cpuPacket.getAddNodes().get(ii).setEnd(packet.getEnd());
|
|
|
- cpuPacket.getAddNodes().get(ii).setAvg(packet.getAvg());
|
|
|
- if (isLogging) {
|
|
|
- this.loggingProcess.add(cpuPacket.getAddNodes().get(ii), loggingIdx);
|
|
|
+ this.loggingProcess.add(packet, nodeVo.getLogQIdx());
|
|
|
+
|
|
|
+ if (packet instanceof TsiCpuPacket) {
|
|
|
+ TsiCpuPacket cpuPacket = (TsiCpuPacket) packet;
|
|
|
+ if (cpuPacket.getAddNodes() != null) {
|
|
|
+ // 연등지 인 경우 연등지 패킷 로깅
|
|
|
+ for (TsiCpuAddPacket addNodePacket : cpuPacket.getAddNodes()) {
|
|
|
+ addNodePacket.setAdd(packet.getAdd());
|
|
|
+ addNodePacket.setPop(packet.getPop());
|
|
|
+ addNodePacket.setPar(packet.getPar());
|
|
|
+ addNodePacket.setEnd(packet.getEnd());
|
|
|
+ addNodePacket.setAvg(packet.getAvg());
|
|
|
+ this.loggingProcess.add(addNodePacket, nodeVo.getLogQIdx());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- } finally {
|
|
|
- MDC.clear();
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
|
|
|
public void report() {
|