
kafka send options added: ggits-comm-server, sig-comm-server; sig-consumer updated

shjung committed 9 months ago (commit 6aa80ae1b1)

+ 2 - 0
conf/ggits-comm-server.yml

@@ -11,3 +11,5 @@ application:
   kafka:
     bootstrap-servers: 192.168.11.23:9092
     group-id: ggits-comm-server
+    enable-node: false
+    enable-sig: true

+ 5 - 3
conf/sig-comm-server.yml

@@ -7,6 +7,8 @@ server:
 application:
   process-id: 81010
   comm-binding-port: 7900
-  relay-binding-port: 7901
-  relay-ip-address:
-    - 192.168.11.27
+  kafka:
+    bootstrap-servers: 192.168.11.23:9092
+    group-id: sig-comm-server
+    enable-node: false
+    enable-sig: true

+ 2 - 0
ggits-comm-server/conf/ggits-comm-server.yml

@@ -11,3 +11,5 @@ application:
   kafka:
     bootstrap-servers: 192.168.11.23:9092
     group-id: ggits-comm-server
+    enable-node: false
+    enable-sig: true

+ 21 - 0
ggits-comm-server/src/main/java/com/sig/ggits/comm/server/config/KafkaConfig.java

@@ -1,11 +1,17 @@
 package com.sig.ggits.comm.server.config;
 
+import com.sig.ggits.comm.server.GgitsCommServerApplication;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.stereotype.Component;
+import org.yaml.snakeyaml.Yaml;
 
 import javax.annotation.PostConstruct;
+import java.io.File;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -30,4 +36,19 @@ public class KafkaConfig {
         log.info("{}", this);
     }
 
+    @SuppressWarnings("unchecked")
+    public void reload() {
+        String configFile = System.getProperty("user.dir") + File.separator + "conf" + File.separator + GgitsCommServerApplication.APPLICATION_NAME + ".yml";
+        Yaml yaml = new Yaml();
+        try (InputStream inputStream = Files.newInputStream(Paths.get(configFile))) {
+            Map<String, Object> obj = yaml.load(inputStream);
+            Map<String, Object> appConf = (Map<String, Object>) obj.get("application");
+            Map<String, Object> kafkaConf = (Map<String, Object>) appConf.get("kafka");
+            this.enableNode = (boolean) kafkaConf.get("enable-node");
+            this.enableSig = (boolean) kafkaConf.get("enable-sig");
+        }
+        catch (Exception e) {
+//            log.error("KafkaConfig.reload: Failed to load configuration file. {}", e.getMessage());
+        }
+    }
 }

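A note on the reload() added above (and its twin in sig-comm-server below): it re-reads the YAML file on every invocation, and because the log.error in the catch block is commented out, a missing or malformed file fails silently and leaves the flags at their previous values. A more defensive reader would fall back explicitly and log the failure. The sketch below is an illustration only, not part of this commit; the helper class name, the null guard, and the use of getOrDefault are assumptions:

    import org.yaml.snakeyaml.Yaml;

    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.Collections;
    import java.util.Map;

    // Hypothetical helper -- reads application.kafka.<key> from a YAML file,
    // returning the supplied fallback when the file, section, or key is missing.
    public final class KafkaFlagReader {

        @SuppressWarnings("unchecked")
        public static boolean readFlag(String configFile, String key, boolean fallback) {
            try (InputStream in = Files.newInputStream(Paths.get(configFile))) {
                Map<String, Object> root = new Yaml().load(in);
                if (root == null) {
                    return fallback;  // empty file
                }
                Map<String, Object> app = (Map<String, Object>) root.getOrDefault("application", Collections.emptyMap());
                Map<String, Object> kafka = (Map<String, Object>) app.getOrDefault("kafka", Collections.emptyMap());
                Object value = kafka.get(key);
                return (value instanceof Boolean) ? (Boolean) value : fallback;
            }
            catch (Exception e) {
                System.err.println("readFlag: failed to load " + configFile + ": " + e.getMessage());
                return fallback;
            }
        }
    }

With this shape, reload() would reduce to calls like `this.enableNode = KafkaFlagReader.readFlag(configFile, "enable-node", this.enableNode);`, keeping the last known value whenever the file cannot be read.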
+ 9 - 3
ggits-comm-server/src/main/java/com/sig/ggits/comm/server/process/kafka/KafkaDataProcess.java

@@ -1,6 +1,7 @@
 package com.sig.ggits.comm.server.process.kafka;
 
 import com.its.common.spring.SpringUtils;
+import com.sig.ggits.comm.server.config.KafkaConfig;
 import com.sig.ggits.comm.server.config.ThreadPoolInitializer;
 import com.sig.ggits.comm.server.dto.IntStatusDto;
 import com.sig.ggits.comm.server.dto.RegionCenter;
@@ -26,6 +27,7 @@ public class KafkaDataProcess {
     private final LinkedBlockingQueue<DbmsData> kafkaDataBlockingQueue = new LinkedBlockingQueue<>(2000);
     private final ThreadPoolExecutor taskExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
 
+    private final KafkaConfig kafkaConfig;
     private final KafkaDataAsyncTask asyncTask;
     private final KafkaProducerService kafkaProducer;
 
@@ -64,7 +66,7 @@ public class KafkaDataProcess {
             if (DbmsData.DBMS_DATA_INT_STATUS_UPDATE == data.getType()) {
                 RegionCenter center = data.getCenter();
                 if (center == null) {
-                    log.error("RegionCenter is null");
+                    log.error("KafkaDataProcess.process: RegionCenter is null.");
                     return;
                 }
                 List<IntStatusDto> intStatusLists = (List<IntStatusDto>)data.getData();
@@ -81,10 +83,14 @@ public class KafkaDataProcess {
                     for (IntStatusDto status : intStatusLists) {
                         if (status.getNodeId() > 1000000000) {
                             buffer.put(status.getKafkaData());
-                            this.kafkaProducer.sendNode(Long.toString(status.getNodeId()), status.getKafkaData());
+                            if (this.kafkaConfig.isEnableNode()) {
+                                this.kafkaProducer.sendNode(Long.toString(status.getNodeId()), status.getKafkaData());
+                            }
                         }
                     }
-                    this.kafkaProducer.sendSig(center.getRegionCd(), buffer.array());
+                    if (this.kafkaConfig.isEnableSig()) {
+                        this.kafkaProducer.sendSig(center.getRegionCd(), buffer.array());
+                    }
                 }
 //                intStatusLists.forEach(status -> {
 //                    if (status.getNodeId() > 1000000000) {

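One side effect of the guards added above (the same pattern appears in sig-comm-server's KafkaDataProcess further down): buffer.put(status.getKafkaData()) still runs unconditionally, so the sig payload is assembled even when enable-sig is false, and the loop still executes when both flags are off. If that overhead matters, the flags could be read once per batch and the whole block skipped. A minimal sketch, not part of this commit, assuming `buffer` only feeds the sig send:

    // Hypothetical restructuring -- read each flag once per batch.
    boolean sendNode = this.kafkaConfig.isEnableNode();
    boolean sendSig = this.kafkaConfig.isEnableSig();
    if (sendNode || sendSig) {
        for (IntStatusDto status : intStatusLists) {
            if (status.getNodeId() > 1000000000) {
                buffer.put(status.getKafkaData());  // only needed for the sig payload
                if (sendNode) {
                    this.kafkaProducer.sendNode(Long.toString(status.getNodeId()), status.getKafkaData());
                }
            }
        }
        if (sendSig) {
            this.kafkaProducer.sendSig(center.getRegionCd(), buffer.array());
        }
    }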
+ 47 - 35
ggits-comm-server/src/main/java/com/sig/ggits/comm/server/scheduler/SigCommScheduler.java

@@ -1,6 +1,6 @@
 package com.sig.ggits.comm.server.scheduler;
 
-import com.its.common.utils.Elapsed;
+import com.sig.ggits.comm.server.config.KafkaConfig;
 import com.sig.ggits.comm.server.repository.ApplicationRepository;
 import com.sig.ggits.comm.server.service.UnitSystService;
 import lombok.RequiredArgsConstructor;
@@ -18,6 +18,7 @@ import javax.annotation.PreDestroy;
 @Component
 public class SigCommScheduler {
 
+    private final KafkaConfig kafkaConfig;
     private final UnitSystService unitSystService;
     private final ApplicationRepository applicationRepository;
 
@@ -25,6 +26,17 @@ public class SigCommScheduler {
     public void onShutDown() {
     }
 
+    @Async
+    @Scheduled(cron = "30 0/5 * * * *")  // runs every 5 minutes, at second 30
+    public void reloadConfig() {
+        try {
+            this.kafkaConfig.reload();
+        }
+        catch(Exception e) {
+            log.error("SigCommScheduler.reloadConfig: Exception {}", e.getMessage());
+        }
+    }
+
 //    @Async
 //    @Scheduled(cron = "* * * * * *")  // runs every second
 //    public void staticsForPacketSecond() {
@@ -40,9 +52,9 @@ public class SigCommScheduler {
 ////        log.info("{}", String.format("%25s: %s", "checkKafkaServerAlive", TimeUtils.elapsedTimeStr(elapsed.nanoSeconds())));
 //    }
 
-    @Async
-    @Scheduled(cron = "0/5 * * * * *")  // runs every 5 seconds
-    public void checkSessionTimeout() {
+//    @Async
+//    @Scheduled(cron = "0/5 * * * * *")  // runs every 5 seconds
+//    public void checkSessionTimeout() {
 //        if (!TsiAlarmManager.getInstance().checkAlarm(TsiAlarmConfigVo.COMM_02)) {
 //            return;
 //        }
@@ -57,47 +69,47 @@ public class SigCommScheduler {
 //        }
 //        TsiNodeManager.getInstance().checkSessionTimeout(timeout);
 //        log.info("{}", String.format("%25s: %s", "checkSessionTimeout", TimeUtils.elapsedTimeStr(elapsed.nanoSeconds())));
-    }
+//    }
 
-    @Async
-    @Scheduled(cron = "0/10 * * * * *")  // runs every 10 seconds
-    public void reportNodeSessionAlive() {
+//    @Async
+//    @Scheduled(cron = "0/10 * * * * *")  // runs every 10 seconds
+//    public void reportNodeSessionAlive() {
         //Elapsed elapsed = new Elapsed();
         //this.appRepositoryService.reportChannelSessions();
         //log.info("{}", String.format("%25s: %s", "reportNodeSessionAlive", TimeUtils.elapsedTimeStr(elapsed.nanoSeconds())));
-    }
+//    }
 
-    @Async
-    @Scheduled(cron = "0/30 * * * * *")  // runs every 30 seconds
-    public void updateProcessState() {
+//    @Async
+//    @Scheduled(cron = "0/30 * * * * *")  // runs every 30 seconds
+//    public void updateProcessState() {
 //        Elapsed elapsed = new Elapsed();
 //        this.unitSystService.updateUnitSystStts(true);
 //        log.info("{}", String.format("%25s: %s", "updateProcessState", TimeUtils.elapsedTimeStr(elapsed.nanoSeconds())));
-    }
+//    }
 
-    @Async
-    @Scheduled(cron = "0 * * * * *")  // runs every minute
-    public void staticsForPacketMinute() {
-        Elapsed elapsed = new Elapsed();
-        //TsiTpmsManager.getInstance().resetMinute();
-        //log.info("{}", String.format("%25s: %s", "staticsForPacketMinute", TimeUtils.elapsedTimeStr(elapsed.nanoSeconds())));
-    }
+//    @Async
+//    @Scheduled(cron = "0 * * * * *")  // runs every minute
+//    public void staticsForPacketMinute() {
+//        Elapsed elapsed = new Elapsed();
+//        //TsiTpmsManager.getInstance().resetMinute();
+//        //log.info("{}", String.format("%25s: %s", "staticsForPacketMinute", TimeUtils.elapsedTimeStr(elapsed.nanoSeconds())));
+//    }
 
-    @Async
-    @Scheduled(cron = "0 0 0 * * *")  // runs daily at midnight
-    public void staticsForPacketDay() {
-        Elapsed elapsed = new Elapsed();
-        //TsiTpmsManager.getInstance().resetDay();
-        //log.info("{}", String.format("%25s: %s", "staticsForPacketDay", TimeUtils.elapsedTimeStr(elapsed.nanoSeconds())));
-    }
+//    @Async
+//    @Scheduled(cron = "0 0 0 * * *")  // runs daily at midnight
+//    public void staticsForPacketDay() {
+//        Elapsed elapsed = new Elapsed();
+//        //TsiTpmsManager.getInstance().resetDay();
+//        //log.info("{}", String.format("%25s: %s", "staticsForPacketDay", TimeUtils.elapsedTimeStr(elapsed.nanoSeconds())));
+//    }
 
-    @Async
-    @Scheduled(cron = "0 * * * * *")  // runs every minute
-    public void loadBaseDatabase() {
-        //Elapsed elapsed = new Elapsed();
-        //this.sigDatabaseService.loadDatabase();
-        //this.sigDatabaseService.updateProcessState(1);
-        //log.info("{}", String.format("%25s: %s", "loadBaseDatabase", TimeUtils.elapsedTimeStr(elapsed.nanoSeconds())));
-    }
+//    @Async
+//    @Scheduled(cron = "0 * * * * *")  // runs every minute
+//    public void loadBaseDatabase() {
+//        //Elapsed elapsed = new Elapsed();
+//        //this.sigDatabaseService.loadDatabase();
+//        //this.sigDatabaseService.updateProcessState(1);
+//        //log.info("{}", String.format("%25s: %s", "loadBaseDatabase", TimeUtils.elapsedTimeStr(elapsed.nanoSeconds())));
+//    }
 
 }

+ 1 - 1
ggits-comm-server/src/main/resources/application.yml

@@ -46,7 +46,7 @@ application:
   kafka:
     bootstrap-servers: 192.168.11.23:9092
     group-id: ggits-comm-server
-    enable-node: true
+    enable-node: false
     enable-sig: true
     props:
     #  - request.timeout.ms: 100

+ 0 - 2
ggits-etlp-server/src/main/java/com/sig/ggits/etlp/server/config/ApplicationConfig.java

@@ -37,8 +37,6 @@ public class ApplicationConfig {
 
         log.info("[{}] -------------------------", this.getClass().getSimpleName());
         log.info("[{}]     pageCount: {}", this.getClass().getSimpleName(), this.pageCount);
-
-        reload();
     }
 
     @SuppressWarnings("unchecked")

+ 5 - 3
sig-comm-server/conf/sig-comm-server.yml

@@ -7,6 +7,8 @@ server:
 application:
   process-id: 81010
   comm-binding-port: 7900
-  relay-binding-port: 7901
-  relay-ip-address:
-    - 192.168.11.27
+  kafka:
+    bootstrap-servers: 192.168.11.23:9092
+    group-id: sig-comm-server
+    enable-node: false
+    enable-sig: true

+ 21 - 1
sig-comm-server/src/main/java/com/sig/comm/server/config/KafkaConfig.java

@@ -1,11 +1,17 @@
 package com.sig.comm.server.config;
 
+import com.sig.comm.server.SigCommServerApplication;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.stereotype.Component;
+import org.yaml.snakeyaml.Yaml;
 
 import javax.annotation.PostConstruct;
+import java.io.File;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -28,8 +34,22 @@ public class KafkaConfig {
 
     @PostConstruct
     private void init() {
-
         log.info("{}", this);
     }
 
+    @SuppressWarnings("unchecked")
+    public void reload() {
+        String configFile = System.getProperty("user.dir") + File.separator + "conf" + File.separator + SigCommServerApplication.APPLICATION_NAME + ".yml";
+        Yaml yaml = new Yaml();
+        try (InputStream inputStream = Files.newInputStream(Paths.get(configFile))) {
+            Map<String, Object> obj = yaml.load(inputStream);
+            Map<String, Object> appConf = (Map<String, Object>) obj.get("application");
+            Map<String, Object> kafkaConf = (Map<String, Object>) appConf.get("kafka");
+            this.enableNode = (boolean) kafkaConf.get("enable-node");
+            this.enableSig = (boolean) kafkaConf.get("enable-sig");
+        }
+        catch (Exception e) {
+//            log.error("KafkaConfig.reload: Failed to load configuration file. {}", e.getMessage());
+        }
+    }
 }

+ 8 - 2
sig-comm-server/src/main/java/com/sig/comm/server/process/kafka/KafkaDataProcess.java

@@ -1,6 +1,7 @@
 package com.sig.comm.server.process.kafka;
 
 import com.its.common.spring.SpringUtils;
+import com.sig.comm.server.config.KafkaConfig;
 import com.sig.comm.server.config.ThreadPoolInitializer;
 import com.sig.comm.server.dto.IntStatusDto;
 import com.sig.comm.server.dto.RegionCenter;
@@ -26,6 +27,7 @@ public class KafkaDataProcess {
     private final LinkedBlockingQueue<DbmsData> kafkaDataBlockingQueue = new LinkedBlockingQueue<>(2000);
     private final ThreadPoolExecutor taskExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
 
+    private final KafkaConfig kafkaConfig;
     private final KafkaDataAsyncTask asyncTask;
     private final KafkaProducerService kafkaProducer;
 
@@ -81,10 +83,14 @@ public class KafkaDataProcess {
                     for (IntStatusDto status : intStatusLists) {
                         if (status.getNodeId() > 1000000000) {
                             buffer.put(status.getKafkaData());
-                            this.kafkaProducer.sendNode(Long.toString(status.getNodeId()), status.getKafkaData());
+                            if (this.kafkaConfig.isEnableNode()) {
+                                this.kafkaProducer.sendNode(Long.toString(status.getNodeId()), status.getKafkaData());
+                            }
                         }
                     }
-                    this.kafkaProducer.sendSig(center.getRegionCd(), buffer.array());
+                    if (this.kafkaConfig.isEnableSig()) {
+                        this.kafkaProducer.sendSig(center.getRegionCd(), buffer.array());
+                    }
                 }
 //                intStatusLists.forEach(status -> {
 //                    if (status.getNodeId() > 1000000000) {

+ 10 - 8
sig-comm-server/src/main/java/com/sig/comm/server/scheduler/ApplicationScheduler.java

@@ -1,5 +1,6 @@
 package com.sig.comm.server.scheduler;
 
+import com.sig.comm.server.config.KafkaConfig;
 import com.sig.comm.server.config.TraceConfig;
 import com.sig.comm.server.repository.ApplicationRepository;
 import com.sig.comm.server.service.UnitSystService;
@@ -18,6 +19,7 @@ import javax.annotation.PreDestroy;
 @Component
 public class ApplicationScheduler {
 
+    private final KafkaConfig kafkaConfig;
     private final TraceConfig traceConfig;
     private final UnitSystService unitSystService;
     private final ApplicationRepository applicationRepository;
@@ -28,14 +30,14 @@ public class ApplicationScheduler {
 // second(0-59) minute(0-59) hour(0-23) day(1-31) month(1-12) day-of-week(0-6) (0: Sun, 1: Mon, 2: Tue, 3: Wed, 4: Thu, 5: Fri, 6: Sat)
 
     @Async
-    @Scheduled(cron = "0/10 * * * * *")  // runs every 10 seconds
-    public void reportNodeSessionAlive() {
-//        try {
-//            this.applicationRepository.reportChannelSessions();
-//        }
-//        catch(Exception e) {
-//            log.error("ApplicationScheduler.updateProcessState: Exception {}", e.getMessage());
-//        }
+    @Scheduled(cron = "30 0/5 * * * *")  // runs every 5 minutes, at second 30
+    public void reloadConfig() {
+        try {
+            this.kafkaConfig.reload();
+        }
+        catch(Exception e) {
+            log.error("ApplicationScheduler.reloadConfig: Exception {}", e.getMessage());
+        }
     }
 
 //    @ScheduleElapsed

+ 1 - 1
sig-comm-server/src/main/resources/application.yml

@@ -59,7 +59,7 @@ application:
   kafka:
     bootstrap-servers: 192.168.11.23:9092
     group-id: sig-comm-server
-    enable-node: true
+    enable-node: false
     enable-sig: true
     props:
     #  - request.timeout.ms: 100

+ 5 - 1
sig-consumer/src/main/java/com/sig/comm/consumer/SigCommConsumerApplication.java

@@ -26,10 +26,11 @@ public class SigCommConsumerApplication implements ApplicationRunner, Applicatio
     private static String topic = "node";
     private static String consumerGroup = "sig-consumer";
     private static String nodeId = "";
+    public static boolean showAll = false;
 
     private KafkaConsumerService kafkaConsumerService = null;
 
-    // java -jar sig-consumer.jar --servers=192.168.11.23:9092 --topic=node --group=sig-consumer1 --key=1348004425
+    // java -jar sig-consumer.jar --servers=192.168.11.23:9092 --topic=node --group=sig-consumer1 --key=2440322900
     public static void main(String[] args) {
         SpringApplication application = new SpringApplicationBuilder()
                 .sources(SigCommConsumerApplication.class)
@@ -75,6 +76,9 @@ public class SigCommConsumerApplication implements ApplicationRunner, Applicatio
                 else if ("group".equals(optionName)) {
                     consumerGroup = optionValue;
                 }
+                else if ("showall".equals(optionName)) {
+                    showAll = true;
+                }
             }
         }
 

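For reference, the new showall option is parsed in the same loop as the existing servers/topic/group/key options, so a run that logs every record on the subscribed topic might look like the line below. This is a hypothetical invocation based on the parsing code above; whether the flag needs a value such as =true depends on how the option names and values are iterated:

    java -jar sig-consumer.jar --servers=192.168.11.23:9092 --topic=node --group=sig-consumer1 --key=2440322900 --showall=true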
+ 21 - 15
sig-consumer/src/main/java/com/sig/comm/consumer/kafka/KafkaConsumerWorker.java

@@ -2,6 +2,7 @@ package com.sig.comm.consumer.kafka;
 
 import com.its.common.utils.ByteUtils;
 import com.its.common.utils.SysUtils;
+import com.sig.comm.consumer.SigCommConsumerApplication;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -31,27 +32,32 @@ public class KafkaConsumerWorker implements MessageListener<String, byte[]> {
                 return;
             }
 
+            if (SigCommConsumerApplication.showAll) {
+                log.info("onMessage: Topic: {}, Key: {}, {} Bytes. {}, Node {} EA.", this.topic, record.key(), record.value().length, this.keyValues, count);
+            }
+
             boolean find = false;
             for (int ii = 0; ii < count; ii++) {
                 idx = 2 + (ii * 31);
                 long nodeId = ((long) (buffer[idx] & 0xFF) << 24) | ((buffer[idx+1] & 0xFF) << 16) | ((buffer[idx+2] & 0xFF) << 8) | (buffer[idx+3] & 0xFF);
                 String nodeKey = String.valueOf(nodeId);
                 if (this.keyValues.contains(nodeKey)) {
-                    find = true;
-                    break;
-                }
-            }
-            if (find) {
-                log.info("onMessage: Topic: {}, Key: {}, {} Bytes. {} EA.", this.topic, record.key(), record.value().length, count);
-                for (int ii = 0; ii < count; ii++) {
-                    idx = 2 + (ii * 31);
-                    long nodeId = ((long) (buffer[idx] & 0xFF) << 24) | ((buffer[idx+1] & 0xFF) << 16) | ((buffer[idx+2] & 0xFF) << 8) | (buffer[idx+3] & 0xFF);
-                    String nodeKey = String.valueOf(nodeId);
-                    if (this.keyValues.contains(nodeKey)) {
-                        displayPacket(record.key(), record.value(), idx, tail);
-                    }
+                    displayPacket(record.key(), record.value(), idx, tail);
+//                    find = true;
+//                    break;
                 }
             }
+//            if (find) {
+//                log.info("onMessage: Topic: {}, Key: {}, {} Bytes. {} EA.", this.topic, record.key(), record.value().length, count);
+//                for (int ii = 0; ii < count; ii++) {
+//                    idx = 2 + (ii * 31);
+//                    long nodeId = ((long) (buffer[idx] & 0xFF) << 24) | ((buffer[idx+1] & 0xFF) << 16) | ((buffer[idx+2] & 0xFF) << 8) | (buffer[idx+3] & 0xFF);
+//                    String nodeKey = String.valueOf(nodeId);
+//                    if (this.keyValues.contains(nodeKey)) {
+//                        displayPacket(record.key(), record.value(), idx, tail);
+//                    }
+//                }
+//            }
         }
         else {
             if (!this.keyValues.contains(record.key())) {
@@ -126,8 +132,8 @@ public class KafkaConsumerWorker implements MessageListener<String, byte[]> {
         blink         = (lcStts >> 1) & 0x01;
         ppc       = (lcStts     ) & 0x01;
 
-        log.info("             NodeId({}), Topic({})", nodeId, topicKey);
-        log.info("            Version({})", version);
+        log.info("-------------------------------------------------------------------------------------------");
+        log.info("             NodeId({}), Key({}), Version({})", nodeId, topicKey, version);
         log.info("          LocalTime({})", signalTime);
         log.info("         SystemTime({})", SysUtils.getSysTimeStr());
 log.info("               Comm({}), (0: normal, 1: comm FAIL)", centerComm);