|
@@ -2,7 +2,9 @@ package com.tsi.comm.server.process.packet;
|
|
|
|
|
|
import com.tsi.app.common.app.AppUtils;
|
|
|
import com.tsi.app.common.cpu.enums.eOpCode;
|
|
|
+import com.tsi.app.common.utils.HexString;
|
|
|
import com.tsi.app.common.utils.TimeUtils;
|
|
|
+import com.tsi.comm.server.config.TraceConfig;
|
|
|
import com.tsi.comm.server.config.TsiCvimServerConfig;
|
|
|
import com.tsi.comm.server.kafka.KafkaProducerService;
|
|
|
import com.tsi.comm.server.process.AbstractTsiCvimWorker;
|
|
@@ -24,9 +26,11 @@ public class TsiCvimPacketWorker extends AbstractTsiCvimWorker implements Runnab
|
|
|
|
|
|
//private ConcurrentLinkedQueue<AbstractTsiPacket> DATA_QUEUE;
|
|
|
private final LinkedBlockingQueue<AbstractTsiPacket> DATA_QUEUE;
|
|
|
+
|
|
|
private final KafkaProducerService kafkaProducer;
|
|
|
private final TsiCvimLoggingProcess loggingService;
|
|
|
- private TsiCvimServerConfig cvimServerConfig;
|
|
|
+ private final TraceConfig traceConfig;
|
|
|
+ private final TsiCvimServerConfig cvimServerConfig;
|
|
|
|
|
|
public TsiCvimPacketWorker(int idx, int qSize, KafkaProducerService kafkaProducer) {
|
|
|
this.idx = idx;
|
|
@@ -34,6 +38,7 @@ public class TsiCvimPacketWorker extends AbstractTsiCvimWorker implements Runnab
|
|
|
this.kafkaProducer = kafkaProducer;
|
|
|
this.DATA_QUEUE = new LinkedBlockingQueue<>(qSize);
|
|
|
this.loggingService = (TsiCvimLoggingProcess) AppUtils.getBean(TsiCvimLoggingProcess.class);
|
|
|
+ this.traceConfig = (TraceConfig) AppUtils.getBean(TraceConfig.class);
|
|
|
this.cvimServerConfig = (TsiCvimServerConfig) AppUtils.getBean(TsiCvimServerConfig.class);
|
|
|
}
|
|
|
|
|
@@ -56,7 +61,7 @@ public class TsiCvimPacketWorker extends AbstractTsiCvimWorker implements Runnab
|
|
|
break;
|
|
|
} catch (Exception e) {
|
|
|
if (packet != null) {
|
|
|
- TsiCpuPacket cpuPacket = (TsiCpuPacket)((AbstractTsiPacket)packet);
|
|
|
+ TsiCpuPacket cpuPacket = (TsiCpuPacket) packet;
|
|
|
TsiNodeVo nodeVo = (TsiNodeVo)cpuPacket.getObj();
|
|
|
if (nodeVo != null) {
|
|
|
log.warn("PacketWorker({}): {} Node: {}, Network Conn: {}", this.idx, Thread.currentThread().getName(), nodeVo.getKey(), nodeVo.isConnect());
|
|
@@ -115,11 +120,13 @@ public class TsiCvimPacketWorker extends AbstractTsiCvimWorker implements Runnab
|
|
|
this.idx, packet.getNodeId(),
|
|
|
this.DATA_QUEUE.size(), this.qSize, this.DATA_QUEUE.remainingCapacity(),
|
|
|
TimeUtils.elapsedTime(packet.getRcv()), Thread.currentThread().getName());
|
|
|
+ MDC.remove(Long.toString(packet.getNodeId()));
|
|
|
MDC.clear();
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
MDC.put("id", Long.toString(packet.getNodeId()));
|
|
|
log.error("PacketWorker({}): Queue.offer: Exception: {}, {}, {}", this.idx, packet.getNodeId(), Thread.currentThread().getName(), e.getMessage());
|
|
|
+ MDC.remove(Long.toString(packet.getNodeId()));
|
|
|
MDC.clear();
|
|
|
}
|
|
|
return offer;
|
|
@@ -129,6 +136,7 @@ public class TsiCvimPacketWorker extends AbstractTsiCvimWorker implements Runnab
|
|
|
AbstractTsiPacket packet = (AbstractTsiPacket)object;
|
|
|
TsiCpuPacket cpuPacket = (TsiCpuPacket)packet;
|
|
|
TsiNodeVo nodeVo = (TsiNodeVo)cpuPacket.getObj();//TsiNodeManager.getInstance().get(packet.getNodeId());
|
|
|
+
|
|
|
long nodeId = packet.getNodeId();
|
|
|
if (nodeVo == null) {
|
|
|
// 노드 정보가 없는 경우
|
|
@@ -136,96 +144,123 @@ public class TsiCvimPacketWorker extends AbstractTsiCvimWorker implements Runnab
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
+ boolean isLogging = this.traceConfig.isNodeLogging() || nodeVo.isDump();
|
|
|
+ String logKey = nodeVo.getKey();
|
|
|
int loggingIdx = nodeVo.getIdx();
|
|
|
|
|
|
- long curr = System.nanoTime();
|
|
|
- if (TimeUnit.MILLISECONDS.convert(curr - packet.getRcv(), TimeUnit.NANOSECONDS) > 3000) {
|
|
|
- log.error("Packet skip::: {}, {} ms.", nodeId, TimeUnit.MILLISECONDS.convert(curr - packet.getRcv(), TimeUnit.NANOSECONDS));
|
|
|
- return;
|
|
|
- }
|
|
|
- packet.setPop(curr);
|
|
|
+ MDC.put("id", logKey);
|
|
|
+ try {
|
|
|
+ long curr = System.nanoTime();
|
|
|
+ if (TimeUnit.MILLISECONDS.convert(curr - packet.getRcv(), TimeUnit.NANOSECONDS) > 3000) {
|
|
|
+ log.error("Packet skip::: {}, {} ms.", nodeId, TimeUnit.MILLISECONDS.convert(curr - packet.getRcv(), TimeUnit.NANOSECONDS));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ packet.setPop(curr);
|
|
|
|
|
|
- if (packet.getOpCode() == (byte) eOpCode.TSI_CPU_DISCONNECTED.getValue()) {
|
|
|
- MDC.put("id", nodeVo.getKey());
|
|
|
- try {
|
|
|
- TsiCpuDisconnected disconnected = (TsiCpuDisconnected) packet;
|
|
|
- disconnected.parsing(nodeVo);
|
|
|
- if (nodeVo.isSendCvim() && disconnected.getCvimData() != null) {
|
|
|
- this.kafkaProducer.sendCvim(disconnected.getNodeId(), disconnected.getCvimData());
|
|
|
- }
|
|
|
+ if (packet.getOpCode() == (byte) eOpCode.TSI_CPU_DISCONNECTED.getValue()) {
|
|
|
+ try {
|
|
|
+ TsiCpuDisconnected disconnected = (TsiCpuDisconnected) packet;
|
|
|
+ disconnected.parsing(nodeVo);
|
|
|
+ if (nodeVo.isSendCvim() && disconnected.getCvimData() != null) {
|
|
|
+ this.kafkaProducer.sendCvim(disconnected.getNodeId(), disconnected.getCvimData());
|
|
|
+ }
|
|
|
|
|
|
- if (disconnected.getAddNodes() != null) {
|
|
|
- // 연등지 인 경우
|
|
|
- for (int ii = 0; ii < disconnected.getAddNodes().size(); ii++) {
|
|
|
- this.kafkaProducer.sendCvim(disconnected.getAddNodes().get(ii).getNodeId(), disconnected.getAddNodes().get(ii).getCvimData());
|
|
|
+ if (disconnected.getAddNodes() != null) {
|
|
|
+ // 연등지 인 경우
|
|
|
+ for (int ii = 0; ii < disconnected.getAddNodes().size(); ii++) {
|
|
|
+ this.kafkaProducer.sendCvim(disconnected.getAddNodes().get(ii).getNodeId(), disconnected.getAddNodes().get(ii).getCvimData());
|
|
|
+ }
|
|
|
}
|
|
|
+ if (isLogging) {
|
|
|
+ this.loggingService.add(packet, loggingIdx);//(int)Thread.currentThread().getId());
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Disconnect Packet parsing error: {}, {}, {}", nodeId, Thread.currentThread().getName(), e.getMessage());
|
|
|
}
|
|
|
- this.loggingService.add(packet, loggingIdx);//(int)Thread.currentThread().getId());
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("Disconnect Packet parsing error: {}, {}, {}", nodeId, Thread.currentThread().getName(), e.getMessage());
|
|
|
+ return;
|
|
|
}
|
|
|
- MDC.clear();
|
|
|
- return;
|
|
|
- }
|
|
|
|
|
|
- // 메모리에서 객체를 찾은 후 패킷 파싱
|
|
|
- // 20250425: packet parsing 의 로그를 노드별 로그파일에 저장되도록 MDC 위치 변경
|
|
|
- // 20250425: parsing 함수에 packet-check 여부를 같이 넘겨줘서 CRC 체크여부 확인
|
|
|
- MDC.put("id", nodeVo.getKey());
|
|
|
- try {
|
|
|
- if (!cpuPacket.parsing(nodeVo, this.cvimServerConfig.isCheckPacket())) {
|
|
|
- // MDC.put("id", nodeVo.getKey());
|
|
|
- log.error("Packet parsing failed: {}", nodeId);
|
|
|
- // MDC.clear();
|
|
|
+ // 메모리에서 객체를 찾은 후 패킷 파싱
|
|
|
+ // 20250425: packet parsing 의 로그를 노드별 로그파일에 저장되도록 MDC 위치 변경
|
|
|
+ // 20250425: parsing 함수에 packet-check 여부를 같이 넘겨줘서 CRC 체크여부 확인
|
|
|
+ try {
|
|
|
+ int result = cpuPacket.parsing(nodeVo, this.cvimServerConfig.isCheckPacket());
|
|
|
+ if (0 != result) {
|
|
|
+ byte[] buf = cpuPacket.getBuf();
|
|
|
+ int reqLength = TsiCpuPacket.SIZE_PACKET_DATA + (TsiCpuPacket.SIZE_STATUS_DATA * cpuPacket.getCount());
|
|
|
+ log.error("Node: {}, Packet parsing failed.", nodeId);
|
|
|
+ log.error("{}", HexString.fromBytes(buf));
|
|
|
+ switch (result) {
|
|
|
+ case -1:
|
|
|
+ log.info("Node: {}, STX Error: {}, {}", nodeId, buf[TsiCpuPacket.INDEX_STX1], buf[TsiCpuPacket.INDEX_STX2]);
|
|
|
+ break;
|
|
|
+ case -2:
|
|
|
+ log.info("Node: {}, Length Error: {}, Version: {}, status count: {}, {}", nodeId, cpuPacket.getLength(), buf[TsiCpuPacket.INDEX_VERSION],
|
|
|
+ cpuPacket.getCount(), reqLength);
|
|
|
+ break;
|
|
|
+ case -3:
|
|
|
+ log.error("Node: {}, Check Sum Error: Version: {}, recv: {}, calc: {}", nodeId, buf[TsiCpuPacket.INDEX_VERSION], cpuPacket.getCheckSum(),
|
|
|
+ cpuPacket.getCalcCheckSum());
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ log.error("Node: {}, Packet parsing error: {}", nodeId, result);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Cpu Packet parsing error: {}, {}, {}", nodeId, Thread.currentThread().getName(), e.getMessage());
|
|
|
}
|
|
|
- }
|
|
|
- catch (Exception e) {
|
|
|
- log.error("Cpu Packet parsing error: {}, {}, {}", nodeId, Thread.currentThread().getName(), e.getMessage());
|
|
|
- }
|
|
|
- MDC.clear();
|
|
|
|
|
|
- packet.setPar(System.nanoTime());
|
|
|
+ packet.setPar(System.nanoTime());
|
|
|
|
|
|
- // 카프카 전송
|
|
|
- if (nodeVo.isSendNode() && packet.getNodeData() != null) {
|
|
|
- this.kafkaProducer.sendNode(Long.toString(nodeId), packet.getNodeData());
|
|
|
- }
|
|
|
- if (nodeVo.isSendTest()) {
|
|
|
- this.kafkaProducer.sendTest(nodeId, packet.getTestData());
|
|
|
- }
|
|
|
- if (nodeVo.isSendCvim() && packet.getCvimData() != null) {
|
|
|
- this.kafkaProducer.sendCvim(nodeId, packet.getCvimData());
|
|
|
- }
|
|
|
+ // 카프카 전송
|
|
|
+ if (nodeVo.isSendNode() && packet.getNodeData() != null) {
|
|
|
+ this.kafkaProducer.sendNode(Long.toString(nodeId), packet.getNodeData());
|
|
|
+ }
|
|
|
+ if (nodeVo.isSendTest()) {
|
|
|
+ this.kafkaProducer.sendTest(nodeId, packet.getTestData());
|
|
|
+ }
|
|
|
+ if (nodeVo.isSendCvim() && packet.getCvimData() != null) {
|
|
|
+ this.kafkaProducer.sendCvim(nodeId, packet.getCvimData());
|
|
|
+ }
|
|
|
|
|
|
- if (cpuPacket.getAddNodes() != null) {
|
|
|
- // 연등지 인 경우
|
|
|
- for (int ii = 0; ii < cpuPacket.getAddNodes().size(); ii++) {
|
|
|
- if (nodeVo.isSendNode()) {
|
|
|
- this.kafkaProducer.sendNode(Long.toString(cpuPacket.getAddNodes().get(ii).getNodeId()), cpuPacket.getAddNodes().get(ii).getNodeData());
|
|
|
- }
|
|
|
- if (nodeVo.isSendCvim()) {
|
|
|
- this.kafkaProducer.sendCvim(cpuPacket.getAddNodes().get(ii).getNodeId(), cpuPacket.getAddNodes().get(ii).getCvimData());
|
|
|
+ if (cpuPacket.getAddNodes() != null) {
|
|
|
+ // 연등지 인 경우
|
|
|
+ for (int ii = 0; ii < cpuPacket.getAddNodes().size(); ii++) {
|
|
|
+ if (nodeVo.isSendNode()) {
|
|
|
+ this.kafkaProducer.sendNode(Long.toString(cpuPacket.getAddNodes().get(ii).getNodeId()), cpuPacket.getAddNodes().get(ii).getNodeData());
|
|
|
+ }
|
|
|
+ if (nodeVo.isSendCvim()) {
|
|
|
+ this.kafkaProducer.sendCvim(cpuPacket.getAddNodes().get(ii).getNodeId(), cpuPacket.getAddNodes().get(ii).getCvimData());
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- packet.setEnd(System.nanoTime());
|
|
|
+ packet.setEnd(System.nanoTime());
|
|
|
|
|
|
- packet.setAvg(calcProcessTime(packet.getRcv()));
|
|
|
+ packet.setAvg(calcProcessTime(packet.getRcv()));
|
|
|
|
|
|
- // 로그큐로 전송한다.
|
|
|
- this.loggingService.add(packet, loggingIdx);//(int)Thread.currentThread().getId());
|
|
|
+ // 로그큐로 전송한다.
|
|
|
+ if (isLogging) {
|
|
|
+ this.loggingService.add(packet, loggingIdx);//(int)Thread.currentThread().getId());
|
|
|
+ }
|
|
|
|
|
|
- // 연등지 인 경우
|
|
|
- if (cpuPacket.getAddNodes() != null) {
|
|
|
- for (int ii = 0; ii < cpuPacket.getAddNodes().size(); ii++) {
|
|
|
- cpuPacket.getAddNodes().get(ii).setAdd(packet.getAdd());
|
|
|
- cpuPacket.getAddNodes().get(ii).setPop(packet.getPop());
|
|
|
- cpuPacket.getAddNodes().get(ii).setPar(packet.getPar());
|
|
|
- cpuPacket.getAddNodes().get(ii).setEnd(packet.getEnd());
|
|
|
- cpuPacket.getAddNodes().get(ii).setAvg(packet.getAvg());
|
|
|
- this.loggingService.add(cpuPacket.getAddNodes().get(ii), loggingIdx);
|
|
|
+ // 연등지 인 경우
|
|
|
+ if (cpuPacket.getAddNodes() != null) {
|
|
|
+ for (int ii = 0; ii < cpuPacket.getAddNodes().size(); ii++) {
|
|
|
+ cpuPacket.getAddNodes().get(ii).setAdd(packet.getAdd());
|
|
|
+ cpuPacket.getAddNodes().get(ii).setPop(packet.getPop());
|
|
|
+ cpuPacket.getAddNodes().get(ii).setPar(packet.getPar());
|
|
|
+ cpuPacket.getAddNodes().get(ii).setEnd(packet.getEnd());
|
|
|
+ cpuPacket.getAddNodes().get(ii).setAvg(packet.getAvg());
|
|
|
+ if (isLogging) {
|
|
|
+ this.loggingService.add(cpuPacket.getAddNodes().get(ii), loggingIdx);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
+ } finally {
|
|
|
+ MDC.remove(logKey);
|
|
|
+ MDC.clear();
|
|
|
}
|
|
|
|
|
|
}
|