|
@@ -39,8 +39,9 @@ public class TsiCvimPacketWorker extends AbstractTsiCvimWorker implements Runnab
|
|
|
public void run() {
|
|
|
log.info("{} Start. QSIZE: {}", Thread.currentThread().getName(), this.qSize);
|
|
|
while (true) {
|
|
|
+ Object packet = null;
|
|
|
try {
|
|
|
- Object packet = this.DATA_QUEUE.take();
|
|
|
+ packet = this.DATA_QUEUE.take();
|
|
|
if (packet != null) {
|
|
|
process(packet);
|
|
|
}
|
|
@@ -49,7 +50,16 @@ public class TsiCvimPacketWorker extends AbstractTsiCvimWorker implements Runnab
|
|
|
}
|
|
|
}
|
|
|
catch (Exception e) {
|
|
|
- log.error("Exception: {}", e.getMessage());
|
|
|
+ if (packet != null) {
|
|
|
+ TsiCpuPacket cpuPacket = (TsiCpuPacket)((AbstractTsiPacket)packet);
|
|
|
+ TsiNodeVo nodeVo = (TsiNodeVo)cpuPacket.getObj();
|
|
|
+ if (nodeVo != null) {
|
|
|
+ log.warn(" Node: {}, Network Conn: {}", nodeVo.getKey(), nodeVo.isConnect());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ log.error("Exception(PacketWorker while): {}", e.getMessage());
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -66,8 +76,8 @@ public class TsiCvimPacketWorker extends AbstractTsiCvimWorker implements Runnab
|
|
|
offer = this.DATA_QUEUE.offer(packet);
|
|
|
if (!offer) {
|
|
|
MDC.put("id", Long.toString(packet.getNodeId()));
|
|
|
- log.error("Packet Queue.offer: {}/{}, Queue Full: {} EA, {}, {}",
|
|
|
- packet.getNodeId(), this.DATA_QUEUE.size(), this.qSize, TimeUtils.elapsedTime(packet.getRcv()), Thread.currentThread().getName());
|
|
|
+ log.error("Packet Queue.offer: {}/{}, {}, Queue Full: {} EA, {}, {}",
|
|
|
+ packet.getNodeId(), this.DATA_QUEUE.size(), this.idx, this.qSize, TimeUtils.elapsedTime(packet.getRcv()), Thread.currentThread().getName());
|
|
|
MDC.clear();
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
@@ -82,6 +92,13 @@ public class TsiCvimPacketWorker extends AbstractTsiCvimWorker implements Runnab
|
|
|
AbstractTsiPacket packet = (AbstractTsiPacket)object;
|
|
|
TsiCpuPacket cpuPacket = (TsiCpuPacket)packet;
|
|
|
TsiNodeVo nodeVo = (TsiNodeVo)cpuPacket.getObj();//TsiNodeManager.getInstance().get(packet.getNodeId());
|
|
|
+ if (nodeVo == null) {
|
|
|
+ // 노드 정보가 없는 경우
|
|
|
+ log.error("Packet process NodeVo is null: {}, {}", packet.getNodeId(), Thread.currentThread().getName());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ int loggingIdx = nodeVo.getIdx();
|
|
|
|
|
|
long curr = System.nanoTime();
|
|
|
if (TimeUnit.MILLISECONDS.convert(curr - packet.getRcv(), TimeUnit.NANOSECONDS) > 3000) {
|
|
@@ -93,24 +110,22 @@ public class TsiCvimPacketWorker extends AbstractTsiCvimWorker implements Runnab
|
|
|
if (packet.getOpCode() == (byte) eOpCode.TSI_CPU_DISCONNECTED.getValue()) {
|
|
|
TsiCpuDisconnected disconnected = (TsiCpuDisconnected)packet;
|
|
|
disconnected.parsing(nodeVo);
|
|
|
- if (nodeVo != null) {
|
|
|
- if (nodeVo.isSendCvim() && disconnected.getCvimData() != null) {
|
|
|
- this.kafkaProducer.sendCvim(disconnected.getNodeId(), disconnected.getCvimData());
|
|
|
- }
|
|
|
+ if (nodeVo.isSendCvim() && disconnected.getCvimData() != null) {
|
|
|
+ this.kafkaProducer.sendCvim(disconnected.getNodeId(), disconnected.getCvimData());
|
|
|
+ }
|
|
|
|
|
|
- if (disconnected.getAddNodes() != null) {
|
|
|
- // 연등지 인 경우
|
|
|
- for (int ii = 0; ii < disconnected.getAddNodes().size(); ii++) {
|
|
|
- this.kafkaProducer.sendCvim(disconnected.getAddNodes().get(ii).getNodeId(), disconnected.getAddNodes().get(ii).getCvimData());
|
|
|
- }
|
|
|
+ if (disconnected.getAddNodes() != null) {
|
|
|
+ // 연등지 인 경우
|
|
|
+ for (int ii = 0; ii < disconnected.getAddNodes().size(); ii++) {
|
|
|
+ this.kafkaProducer.sendCvim(disconnected.getAddNodes().get(ii).getNodeId(), disconnected.getAddNodes().get(ii).getCvimData());
|
|
|
}
|
|
|
- this.loggingService.add(packet, this.idx);//(int)Thread.currentThread().getId());
|
|
|
}
|
|
|
+ this.loggingService.add(packet, loggingIdx);//(int)Thread.currentThread().getId());
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
// 메모리에서 객체를 찾은 후 패킷 파싱
|
|
|
- // 20250425: packet parsing의 로그를 노드별 로그파일에 저장되도록 MDC 위치 변경
|
|
|
+ // 20250425: packet parsing 의 로그를 노드별 로그파일에 저장되도록 MDC 위치 변경
|
|
|
// 20250425: parsing 함수에 packet-check 여부를 같이 넘겨줘서 CRC 체크여부 확인
|
|
|
MDC.put("id", nodeVo.getKey());
|
|
|
if (!cpuPacket.parsing(nodeVo, this.cvimServerConfig.isCheckPacket())) {
|
|
@@ -123,44 +138,34 @@ public class TsiCvimPacketWorker extends AbstractTsiCvimWorker implements Runnab
|
|
|
packet.setPar(System.nanoTime());
|
|
|
|
|
|
// 카프카 전송
|
|
|
- if (nodeVo != null) {
|
|
|
- if (nodeVo.isSendNode() && packet.getNodeData() != null) {
|
|
|
- this.kafkaProducer.sendNode(Long.toString(packet.getNodeId()), packet.getNodeData());
|
|
|
- }
|
|
|
- if (nodeVo.isSendTest()) {
|
|
|
- this.kafkaProducer.sendTest(packet.getNodeId(), packet.getTestData());
|
|
|
- }
|
|
|
- if (nodeVo.isSendCvim() && packet.getCvimData() != null) {
|
|
|
- this.kafkaProducer.sendCvim(packet.getNodeId(), packet.getCvimData());
|
|
|
- }
|
|
|
+ if (nodeVo.isSendNode() && packet.getNodeData() != null) {
|
|
|
+ this.kafkaProducer.sendNode(Long.toString(packet.getNodeId()), packet.getNodeData());
|
|
|
+ }
|
|
|
+ if (nodeVo.isSendTest()) {
|
|
|
+ this.kafkaProducer.sendTest(packet.getNodeId(), packet.getTestData());
|
|
|
+ }
|
|
|
+ if (nodeVo.isSendCvim() && packet.getCvimData() != null) {
|
|
|
+ this.kafkaProducer.sendCvim(packet.getNodeId(), packet.getCvimData());
|
|
|
+ }
|
|
|
|
|
|
- if (cpuPacket.getAddNodes() != null) {
|
|
|
- // 연등지 인 경우
|
|
|
- for (int ii = 0; ii < cpuPacket.getAddNodes().size(); ii++) {
|
|
|
- if (nodeVo.isSendNode()) {
|
|
|
- this.kafkaProducer.sendNode(Long.toString(cpuPacket.getAddNodes().get(ii).getNodeId()), cpuPacket.getAddNodes().get(ii).getNodeData());
|
|
|
- }
|
|
|
- if (nodeVo.isSendCvim()) {
|
|
|
- this.kafkaProducer.sendCvim(cpuPacket.getAddNodes().get(ii).getNodeId(), cpuPacket.getAddNodes().get(ii).getCvimData());
|
|
|
- }
|
|
|
+ if (cpuPacket.getAddNodes() != null) {
|
|
|
+ // 연등지 인 경우
|
|
|
+ for (int ii = 0; ii < cpuPacket.getAddNodes().size(); ii++) {
|
|
|
+ if (nodeVo.isSendNode()) {
|
|
|
+ this.kafkaProducer.sendNode(Long.toString(cpuPacket.getAddNodes().get(ii).getNodeId()), cpuPacket.getAddNodes().get(ii).getNodeData());
|
|
|
+ }
|
|
|
+ if (nodeVo.isSendCvim()) {
|
|
|
+ this.kafkaProducer.sendCvim(cpuPacket.getAddNodes().get(ii).getNodeId(), cpuPacket.getAddNodes().get(ii).getCvimData());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- else {
|
|
|
- // 여기 들어오면 안됌
|
|
|
- MDC.put("id", Long.toString(packet.getNodeId()));
|
|
|
- log.warn("Not found node object: {}", packet.getNodeId());
|
|
|
- MDC.clear();
|
|
|
- // kafka send to topic-for-ssd-test only
|
|
|
- this.kafkaProducer.sendTest(packet.getNodeId(), packet.getTestData());
|
|
|
- }
|
|
|
|
|
|
packet.setEnd(System.nanoTime());
|
|
|
|
|
|
packet.setAvg(calcProcessTime(packet.getRcv()));
|
|
|
|
|
|
// 로그큐로 전송한다.
|
|
|
- this.loggingService.add(packet, this.idx);//(int)Thread.currentThread().getId());
|
|
|
+ this.loggingService.add(packet, loggingIdx);//(int)Thread.currentThread().getId());
|
|
|
|
|
|
// 연등지 인 경우
|
|
|
if (cpuPacket.getAddNodes() != null) {
|
|
@@ -170,7 +175,7 @@ public class TsiCvimPacketWorker extends AbstractTsiCvimWorker implements Runnab
|
|
|
cpuPacket.getAddNodes().get(ii).setPar(packet.getPar());
|
|
|
cpuPacket.getAddNodes().get(ii).setEnd(packet.getEnd());
|
|
|
cpuPacket.getAddNodes().get(ii).setAvg(packet.getAvg());
|
|
|
- this.loggingService.add(cpuPacket.getAddNodes().get(ii), this.idx);
|
|
|
+ this.loggingService.add(cpuPacket.getAddNodes().get(ii), loggingIdx);
|
|
|
}
|
|
|
}
|
|
|
|