shjung 10 months ago
parent
commit
cc34c7aff3

+ 4 - 4
src/main/java/com/sig/comm/consumer/SigCommConsumerApplication.java

@@ -29,7 +29,7 @@ public class SigCommConsumerApplication implements ApplicationRunner, Applicatio
 
     private KafkaConsumerService kafkaConsumerService = null;
 
-    // java -jar sig-consumer.jar --servers=192.168.11.23:9092 --topic=node --group=sig-consumer1 --key=4060000101,4060000112
+    // java -jar sig-consumer.jar --servers=192.168.11.23:9092 --topic=node --group=sig-consumer1 --key=1348004425
     public static void main(String[] args) {
         SpringApplication application = new SpringApplicationBuilder()
                 .sources(SigCommConsumerApplication.class)
@@ -93,7 +93,7 @@ public class SigCommConsumerApplication implements ApplicationRunner, Applicatio
             return;
         }
 
-        if (!"node".equals(topic)) {
+        if (!"node".equals(topic) && !"sig".equals(topic)) {
             usage();
             return;
         }
@@ -102,9 +102,9 @@ public class SigCommConsumerApplication implements ApplicationRunner, Applicatio
     }
 
     private void usage() {
-        log.info("\r\n\n\n\n\nUsage: java -jar sig-consumer --servers=xxx.xxx.xxx.xxx:nnnn --topic=node --group:sig-consumer --key=4060000101\n" +
+        log.info("\r\n\n\n\n\nUsage: java -jar sig-consumer --servers=xxx.xxx.xxx.xxx:nnnn --topic=node --group:sig-consumer --key=1348004425\n" +
                 "   --servers=kafka bootstrap server[optional, default=192.168.11.23:9092\n" +
-                "   --topic=topic name[optional, node, default=node]\n" +
+                "   --topic=topic name[optional, node/sig, default=node]\n" +
                 "   --group=kafka consumer group name[optional, default=sig-consumer]\n" +
                 "   --key=node id\n\n\n\n");
     }

+ 2 - 6
src/main/java/com/sig/comm/consumer/kafka/KafkaConsumerService.java

@@ -31,12 +31,8 @@ public class KafkaConsumerService {
 
         HashSet<String> topics = new HashSet<>();
         String consumerTopic = "node";
-        if ("test".equals(this.topic)) {
-            consumerTopic = "topic-for-ssd-test";
-            topics.add(consumerTopic);
-        }
-        else if ("cvim".equals(topic)) {
-            consumerTopic = "cvim-raw";
+        if ("sig".equals(this.topic)) {
+            consumerTopic = "sig-all";
             topics.add(consumerTopic);
         }
         else {

+ 10 - 6
src/main/java/com/sig/comm/consumer/kafka/KafkaConsumerWorker.java

@@ -19,14 +19,18 @@ public class KafkaConsumerWorker implements MessageListener<String, byte[]> {
 
     @Override
     public void onMessage(ConsumerRecord<String, byte[]> record) {
+//        log.info("onMessage: Topic: {}, Key: {}, {} Bytes. {}",
+//                this.topic, record.key(), record.value().length, SysUtils.byteArrayToHex(record.value()));
+
         if (!this.keyValues.contains(record.key())) {
             return;
         }
 
+
         log.info("onMessage: Topic: {}, Key: {}, {} Bytes. {}",
                 this.topic, record.key(), record.value().length, SysUtils.byteArrayToHex(record.value()));
 
-        byte[] buffer = record.value();
+//        byte[] buffer = record.value();
         int idx = 0;
         int tail = 0;
         displayPacket(record.key(), record.value(), idx, tail);
@@ -36,8 +40,6 @@ public class KafkaConsumerWorker implements MessageListener<String, byte[]> {
         int idx = start;
         long nodeId = ((long) (buffer[idx++] & 0xFF) << 24) | ((buffer[idx++] & 0xFF) << 16) | ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
 
-        log.info("NodeId: {}, {}", topicKey, nodeId);
-
         int version  = buffer[idx++];
         long localTime = ((long) (buffer[idx++] & 0xFF) << 24) | ((buffer[idx++] & 0xFF) << 16) | ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
 
@@ -73,9 +75,9 @@ public class KafkaConsumerWorker implements MessageListener<String, byte[]> {
             int blink;              //1   점멸 상태,                      1 : 점멸, 0 : 정상
             int ppc;                //0   PPC Control,                    0: PPC Disable  1: PPC Enabled
 
+        operStts  = buffer[idx++] & 0xFF;
         aRingCode = buffer[idx++] & 0xFF;
         bRingCode = buffer[idx++] & 0xFF;
-        operStts  = buffer[idx++] & 0xFF;
         lcStts    = buffer[idx++] & 0xFF;
 
         aRingPhase = (aRingCode >> 5) & 0x07;   // 0x7 = 0000 0111
@@ -98,6 +100,7 @@ public class KafkaConsumerWorker implements MessageListener<String, byte[]> {
         blink         = (lcStts >> 1) & 0x01;
         ppc       = (lcStts     ) & 0x01;
 
+        log.info("             NodeId({}), Topic({})", nodeId, topicKey);
         log.info("            Version({})", version);
         log.info("          LocalTime({})", signalTime);
         log.info("         SystemTime({})", SysUtils.getSysTimeStr());
@@ -106,9 +109,10 @@ public class KafkaConsumerWorker implements MessageListener<String, byte[]> {
         log.info("  B Ring PHASE/STEP({}/{})", bRingPhase, bRingStep);
         log.info("            맵 번호({}), (0:일반제, 1~5:시차제, 6:전용맵)", operMapNo);
         log.info("          운영 모드({}), (0:SCU 고정주기 모드, 1:감응하지 않는 OFFLINE 제어모드, 2:감응되는 OFFLINE 제어모드, 4:감응되는 온라인 제어모드, 5:감응하지 않는 온라인 제어모드)", operMode);
-        log.info("             STATUS({}/{}/{}/{}/{}/{}/{}/{})", policeManProg, policeMan, policeBlink, policeTurnOff, contration, turnOff, blink, ppc);
+        log.info("             STATUS({}/{}/{}/{}/{}/{}/{}/{}), (Police ManualProg/Manual/Blink/Off, Cont/Off/Blink/PPC)",
+                policeManProg, policeMan, policeBlink, policeTurnOff, contration, turnOff, blink, ppc);
 
-//        POLICE PANEL수동진행S/W상태
+//        POLICE PANEL 수동진행S/W상태
 //        POLICE PANEL 수동 S/W 상태
 //        POLICE PANEL 점멸 S/W 상태
 //        POLICE PANEL 소등 S/W 상태