shjung hai 10 meses
pai
achega
b012c9c8fd
Modificáronse 1 ficheiros con 38 adicións e 101 borrados
  1. 38 101
      src/main/java/com/sig/comm/consumer/kafka/KafkaConsumerWorker.java

+ 38 - 101
src/main/java/com/sig/comm/consumer/kafka/KafkaConsumerWorker.java

@@ -19,21 +19,46 @@ public class KafkaConsumerWorker implements MessageListener<String, byte[]> {
 
     @Override
     public void onMessage(ConsumerRecord<String, byte[]> record) {
-//        log.info("onMessage: Topic: {}, Key: {}, {} Bytes. {}",
-//                this.topic, record.key(), record.value().length, SysUtils.byteArrayToHex(record.value()));
 
-        if (!this.keyValues.contains(record.key())) {
-            return;
-        }
-
-
-        log.info("onMessage: Topic: {}, Key: {}, {} Bytes. {}",
-                this.topic, record.key(), record.value().length, SysUtils.byteArrayToHex(record.value()));
-
-//        byte[] buffer = record.value();
         int idx = 0;
         int tail = 0;
-        displayPacket(record.key(), record.value(), idx, tail);
+        if ("sig-all".equals(topic)) {
+            byte[] buffer = record.value();
+            int count = ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
+            if (((count * 31) + 2) != buffer.length) {
+                log.error("onMessage: Data length Error, Topic: {}, Key: {}, {} EA. {} Bytes.", this.topic, record.key(), count, buffer.length);
+                return;
+            }
+
+            boolean find = false;
+            for (int ii = 0; ii < count; ii++) {
+                idx = 2 + (ii * 31);
+                long nodeId = ((long) (buffer[idx] & 0xFF) << 24) | ((buffer[idx+1] & 0xFF) << 16) | ((buffer[idx+2] & 0xFF) << 8) | (buffer[idx+3] & 0xFF);
+                String nodeKey = String.valueOf(nodeId);
+                if (this.keyValues.contains(nodeKey)) {
+                    find = true;
+                    break;
+                }
+            }
+            if (find) {
+                log.info("onMessage: Topic: {}, Key: {}, {} Bytes. {} EA.", this.topic, record.key(), record.value().length, count);
+                for (int ii = 0; ii < count; ii++) {
+                    idx = 2 + (ii * 31);
+                    long nodeId = ((long) (buffer[idx] & 0xFF) << 24) | ((buffer[idx+1] & 0xFF) << 16) | ((buffer[idx+2] & 0xFF) << 8) | (buffer[idx+3] & 0xFF);
+                    String nodeKey = String.valueOf(nodeId);
+                    if (this.keyValues.contains(nodeKey)) {
+                        displayPacket(record.key(), record.value(), idx, tail);
+                    }
+                }
+            }
+        }
+        else {
+            if (!this.keyValues.contains(record.key())) {
+                return;
+            }
+            log.info("onMessage: Topic: {}, Key: {}, {} Bytes. {}", this.topic, record.key(), record.value().length, SysUtils.byteArrayToHex(record.value()));
+            displayPacket(record.key(), record.value(), idx, tail);
+        }
     }
 
     public static void displayPacket(String topicKey, byte[] buffer, int start, int tail) {
@@ -146,95 +171,7 @@ public class KafkaConsumerWorker implements MessageListener<String, byte[]> {
             log.info(" A Ring Phase Value({}/{}/{}/{}/{}/{}/{}/{})", a1, a2, a3, a4, a5, a6, a7, a8);
             log.info(" B Ring Phase Value({}/{}/{}/{}/{}/{}/{}/{})", b1, b2, b3, b4, b5, b6, b7, b8);
         }
-
-//        log.info("      STATUS: 전이({}), 감응({}), 소등({}), 점멸({}), 수동({}) - (1:전이중, 0:전이완료... 1:상태, 0:정상)",
-//                trans, sensing, turnOff, blink, manual);
-//        log.info("       ERROR: SCU통신({}), 센터통신({}), 모순({}) - (1: 이상, 0: 정상)",
-//                scuComm, centerComm, conflict);
-//        log.info("CycleCounter: {} sec.", counter);
-//        log.info("     Signals: {} EA, DataSplit({}) - (0:First/Middle, 1:Single/Last)", statusCount, splitFlag);
-//
-//        final int SIGNAL_STATUS_SIZE = 5;
-//        int remainLength = buffer.length - idx + tail;
-//        if (statusCount * SIGNAL_STATUS_SIZE != remainLength) {
-//            log.error("Signal Status Data length error: {} EA, {}, {}", statusCount, statusCount * SIGNAL_STATUS_SIZE, remainLength);
-//            return;
-//        }
-//
-//        log.info("SEQ\t신호등정보\t방향\t시간정보신뢰성\t보행자   \t비보호신호\t신호등상태\t표출\t잔여\t방향코드");
-//        for (int ii = 0; ii < statusCount; ii++) {
-//            int dirInfo = buffer[idx++] & 0xFF;
-//            int sttsInfo = buffer[idx++] & 0xFF;
-//            int displayTm = buffer[idx++] & 0xFF;
-//            int remainTm = buffer[idx++] & 0xFF;
-//            int dirCode = buffer[idx++] & 0xFF;
-//
-//            int dirAdd     = ((dirInfo) & 0x0F);        //3 ~ 0, 방향추가정보, 해당 방향에 연등지 없음(0), 해당 방향의 첫번째 연등지(1), 해당 방향의 두번째 연등지(2)
-//            int lightsType = ((dirInfo >> 4) & 0x0F);   //7 ~ 4, 신호등 정보, ■ 미지정(0), 직진(1), 좌회전(2), 보행자(3), 자전거(4), 우회전(5), 버스(6), 유턴(7)
-//
-//            int lighting   = ((sttsInfo     ) & 0x07);  //2 ~ 0, 신호등 상태, ■ 소등(0), 적색점등(1), 황색점등(2), 녹색점등(3), 적색점멸(4), 황색점멸(5), 녹색점멸(6)
-//            int unprotect  = ((sttsInfo >> 3) & 0x01);  //3, 비보호 상태, ■ 신호등 정보 유턴/좌회전에 대한 비보호 여부, ■ 비보호 아님(0), 비보호(1)
-//            int walkerPush = ((sttsInfo >> 6) & 0x01);  //6, 보행자(푸쉬 또는 자동검지), ■ 없음(0), 버튼 눌림 or 자동검지(1)
-//            int timeFlag   = ((sttsInfo >> 7) & 0x01);  //7, 시간 정보 신뢰성, ■ 고정신호시간(0), 가변신호시간(1)
-//
-//            String plightInfo;
-//            switch (lightsType)
-//            {
-//                case 0: plightInfo = "미지정(0)"; break;
-//                case 1: plightInfo = "직진(1)  "; break;
-//                case 2: plightInfo = "좌회전(2)"; break;
-//                case 3: plightInfo = "보행자(3)"; break;
-//                case 4: plightInfo = "자전거(4)"; break;
-//                case 5: plightInfo = "우회전(5)"; break;
-//                case 6: plightInfo = "버스(6)  "; break;
-//                case 7: plightInfo = "유턴(7)  "; break;
-//                default: plightInfo = "XXX(" + lightsType + ")"; break;
-//            }
-//            String ptimeFlag;
-//            switch (timeFlag)
-//            {
-//                case 0: ptimeFlag = "고정신호(0)"; break;
-//                case 1: ptimeFlag = "가변신호(1)"; break;
-//                default: ptimeFlag = "XXX(" + timeFlag + ")"; break;
-//            }
-//            String pwalkerPush;
-//            switch (walkerPush)
-//            {
-//                case 0: pwalkerPush = "없음(0)    "; break;
-//                case 1: pwalkerPush = "자동검지(1)"; break;
-//                default: pwalkerPush = "XXX(" + walkerPush + ")"; break;
-//            }
-//            String punprotect;
-//            switch (unprotect)
-//            {
-//                case 0: punprotect = "아님(0)  "; break;
-//                case 1: punprotect = "비보호(1)"; break;
-//                default: punprotect = "XXX(" + unprotect + ")"; break;
-//            }
-//            String plighting;
-//            switch (lighting)
-//            {
-//                case 0: plighting = "소등(0)    "; break;
-//                case 1: plighting = "적색점등(1)"; break;
-//                case 2: plighting = "황색점등(2)"; break;
-//                case 3: plighting = "녹색점등(3)"; break;
-//                case 4: plighting = "적색점멸(4)"; break;
-//                case 5: plighting = "황색점멸(5)"; break;
-//                case 6: plighting = "녹색점멸(6)"; break;
-//                default: plighting = "XXX(" + lighting + ")"; break;
-//            }
-//            log.info("{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
-//                    String.format("%3d", ii),
-//                    plightInfo,
-//                    dirAdd,
-//                    ptimeFlag,
-//                    pwalkerPush,
-//                    punprotect,
-//                    plighting,
-//                    displayTm,
-//                    remainTm,
-//                    dirCode);
-//        }
+        log.info("");
     }
 
 }