shjung 10 months ago
parent
current commit
2f59c6d6cb

+ 4 - 29
src/main/java/com/sig/comm/server/config/KafkaConfig.java

@@ -1,15 +1,12 @@
 package com.sig.comm.server.config;
 
-import com.sig.app.common.xnet.NetUtils;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -19,16 +16,14 @@ import java.util.Map;
 @ConfigurationProperties(prefix = "application.kafka")
 public class KafkaConfig {
 
-    private String bootstrapServers;
-    private String groupId = "ggits-comm-server";
-    private String pingTopic = "ping-topic";
+    public static final String SIG_ALL_TOPIC = "sig-all";
 
-    private String consumerGroupId = "tsi-comm-server";
-    private String consumerAckConfig = "1";
+    private String bootstrapServers;
+    private String groupId = "sig-comm-server";
 
     private boolean multiConnect = false;
     private boolean enableNode = false;
-    private String nodeServers = "";
+    private boolean enableSig = false;
     public List<Map<String, String>> props = new ArrayList<Map<String, String>>();
 
     @PostConstruct
@@ -37,24 +32,4 @@ public class KafkaConfig {
         log.info("{}", this);
     }
 
-    public String getGroupId() {
-        return this.consumerGroupId + "-" + NetUtils.getHostName();
-    }
-
-    public Map<String, Object> getConsumerPropertiesMap() {
-        Map<String, Object> properties = new HashMap();
-        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
-        properties.put(ConsumerConfig.GROUP_ID_CONFIG, getGroupId());
-        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1);
-        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
-        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
-        properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "100");
-        properties.put(ConsumerConfig.CHECK_CRCS_CONFIG, false);
-        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
-        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
-        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.LongDeserializer.class);
-
-        return properties;
-    }
 }

+ 7 - 0
src/main/java/com/sig/comm/server/dto/IntDto.java

@@ -22,9 +22,12 @@ public class IntDto implements Serializable {
     private int intLampType;  /* lamp unit type (3: 3-color lamp, 4: 4-color lamp) */
     private int mainIntNo;    /* main intersection number when the intersection type is a linked intersection */
     private int groupNo;      /* group number */
+    private long nodeId;
 
     private boolean debug;
 
+    private IntStatusDto status;
+
     public void update(IntDto dto) {
         this.intType = dto.getIntType();
         this.intLcType = dto.getIntLcType();
@@ -32,5 +35,9 @@ public class IntDto implements Serializable {
         this.mainIntNo = dto.getMainIntNo();
         this.groupNo = dto.getGroupNo();
         this.debug = dto.isDebug();
+        if (this.nodeId != dto.getNodeId()) {
+            this.nodeId = dto.getNodeId();
+            this.status = new IntStatusDto(this.nodeId);
+        }
     }
 }
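
A note on the update() change above: the per-node status buffer is re-allocated only when nodeId actually changes, so routine refreshes of the other fields keep the existing IntStatusDto. A minimal sketch, assuming the Lombok builder that TbInt.toDto() already uses (all values hypothetical):

    IntDto current = IntDto.builder()
            .nodeId(1234567890L)
            .status(new IntStatusDto(1234567890L))
            .build();

    IntDto sameNode = IntDto.builder().nodeId(1234567890L).groupNo(7).build();
    current.update(sameNode);   // nodeId unchanged -> existing status object kept

    IntDto movedNode = IntDto.builder().nodeId(1234567891L).build();
    current.update(movedNode);  // nodeId changed -> fresh IntStatusDto allocated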

+ 121 - 0
src/main/java/com/sig/comm/server/dto/IntStatusDto.java

@@ -0,0 +1,121 @@
+package com.sig.comm.server.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+
+import java.io.Serializable;
+
+//@Data
+//@Builder
+@Getter
+@ToString
+@NoArgsConstructor//(access = AccessLevel.PROTECTED)
+@AllArgsConstructor
+public class IntStatusDto implements Serializable {
+    public static final long serialVersionUID = 1L;
+
+    public static final int MAX_KAFKA_DATA_SIZE = 31;
+
+    private long nodeId;
+    private byte[] kafkaData;
+
+
+//Node ID       controller number (node ID)                                      4  BYTE
+//Version       version info                                                     1  BYTE
+//                1 - operational phase info used; time-of-day left-turn flag and cycle count/length used
+//                2 - operational phase info not used; linked intersection; cycle count/length not used
+//
+//Time          UTC Time (seconds elapsed since 1970-01-01 00:00:00)             4  BYTE
+//Controller operating status                                                    1  BYTE
+//            7   communication-with-center FAIL flag (0: normal, 1: comm FAIL)
+//            6~4 currently running map number (0: normal plan, 1~5: time-of-day plans, 6: dedicated map)
+//            3   time-of-day left turn (version 1) 1: running, 0: not running
+//                linked intersection (version 2) 1: linked intersection, 0: normal intersection
+//            2~0 traffic signal operating mode
+//                0: SCU fixed-cycle mode
+//                1: non-actuated OFFLINE control mode
+//                2: actuated OFFLINE control mode
+//                4: actuated online control mode
+//                5: non-actuated online control mode
+//Ring A phase                                                                   1  BYTE
+//            7~5 RING A PHASE (0~7)
+//            4~0 RING A STEP (0~31)
+//Ring B phase                                                                   1  BYTE
+//            7~5 RING B PHASE (0~7)
+//            4~0 RING B STEP (0~31)
+//Controller status                                                              1  BYTE
+//            7   POLICE PANEL manual-advance switch state (1: ON, 0: OFF)
+//            6   POLICE PANEL manual switch state (1: ON, 0: OFF)
+//            5   POLICE PANEL flashing switch state (1: ON, 0: OFF)
+//            4   POLICE PANEL lights-out switch state (1: ON, 0: OFF)
+//            3   conflict state (1: conflict, 0: normal)
+//            2   lights-out state (1: lights out, 0: normal)
+//            1   flashing state (1: flashing, 0: normal)
+//            0   PPC control state (0: PPC Disabled, 1: PPC Enabled)
+//Cycle count   cycle count in seconds; valid only when version is 1                     1  BYTE
+//Cycle length  length of the current cycle in seconds; valid only when version is 1     1  BYTE
+//Phase values  operational phases (A Ring 8 BYTE + B Ring 8 BYTE); valid only when version is 1    16 BYTE
+
+
+    public IntStatusDto(long nodeId) {
+        this.nodeId = nodeId;
+        this.kafkaData = new byte[MAX_KAFKA_DATA_SIZE];
+
+        this.kafkaData[0] = (byte)((nodeId >> 24) & 0x000000FF);
+        this.kafkaData[1] = (byte)((nodeId >> 16) & 0x000000FF);
+        this.kafkaData[2] = (byte)((nodeId >> 8 ) & 0x000000FF);
+        this.kafkaData[3] = (byte)((nodeId      ) & 0x000000FF);
+        this.kafkaData[4] = (byte)0x01;
+
+        initStatus(0);
+    }
+
+    public void initStatus(long unixTimestamp) {
+        int idx = 5;
+        this.kafkaData[idx++] = (byte)((unixTimestamp >> 24) & 0x000000FF);
+        this.kafkaData[idx++] = (byte)((unixTimestamp >> 16) & 0x000000FF);
+        this.kafkaData[idx++] = (byte)((unixTimestamp >> 8 ) & 0x000000FF);
+        this.kafkaData[idx++] = (byte)((unixTimestamp      ) & 0x000000FF);
+
+        for (int ii = idx; ii < MAX_KAFKA_DATA_SIZE; ii++) {
+            this.kafkaData[ii] = (byte)0x00;
+        }
+    }
+
+    public void setOperStatus(int operStts) {
+        this.kafkaData[9] = (byte)(operStts & 0xFF);
+    }
+
+    public void setPhase(int ringA, int ringB) {
+        this.kafkaData[10] = (byte)(ringA & 0xFF);
+        this.kafkaData[11] = (byte)(ringB & 0xFF);
+    }
+    public void setStatus(int status) {
+        this.kafkaData[12] = (byte)(status & 0xFF);
+    }
+    public void setCycle(int count, int length) {
+        this.kafkaData[13] = (byte)(count & 0xFF);
+        this.kafkaData[14] = (byte)(length & 0xFF);
+    }
+    public void setPhaseVal(int a1, int a2, int a3, int a4, int a5, int a6, int a7, int a8, int b1, int b2, int b3, int b4, int b5, int b6, int b7, int b8) {
+        this.kafkaData[15] = (byte)(a1 & 0xFF);
+        this.kafkaData[16] = (byte)(a2 & 0xFF);
+        this.kafkaData[17] = (byte)(a3 & 0xFF);
+        this.kafkaData[18] = (byte)(a4 & 0xFF);
+        this.kafkaData[19] = (byte)(a5 & 0xFF);
+        this.kafkaData[20] = (byte)(a6 & 0xFF);
+        this.kafkaData[21] = (byte)(a7 & 0xFF);
+        this.kafkaData[22] = (byte)(a8 & 0xFF);
+
+        this.kafkaData[23] = (byte)(b1 & 0xFF);
+        this.kafkaData[24] = (byte)(b2 & 0xFF);
+        this.kafkaData[25] = (byte)(b3 & 0xFF);
+        this.kafkaData[26] = (byte)(b4 & 0xFF);
+        this.kafkaData[27] = (byte)(b5 & 0xFF);
+        this.kafkaData[28] = (byte)(b6 & 0xFF);
+        this.kafkaData[29] = (byte)(b7 & 0xFF);
+        this.kafkaData[30] = (byte)(b8 & 0xFF);
+    }
+}
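
For reference, the fields packed above read back as below. This decode sketch is not part of the commit; it just mirrors the big-endian layout documented in the comment block, and the class and helper names are illustrative:

    // Decode helpers for the 31-byte kafkaData record (big-endian fields).
    final class IntStatusCodec {
        static long readNodeId(byte[] d) {          // bytes 0..3
            return ((d[0] & 0xFFL) << 24) | ((d[1] & 0xFFL) << 16)
                 | ((d[2] & 0xFFL) << 8)  |  (d[3] & 0xFFL);
        }
        static long readUnixTime(byte[] d) {        // bytes 5..8 (byte 4 is the version)
            return ((d[5] & 0xFFL) << 24) | ((d[6] & 0xFFL) << 16)
                 | ((d[7] & 0xFFL) << 8)  |  (d[8] & 0xFFL);
        }
        static int readRingAPhase(byte[] d) { return (d[10] & 0xFF) >>> 5; } // bits 7~5
        static int readRingAStep(byte[] d)  { return d[10] & 0x1F; }        // bits 4~0
    }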

+ 4 - 0
src/main/java/com/sig/comm/server/entity/TbInt.java

@@ -1,6 +1,7 @@
 package com.sig.comm.server.entity;
 
 import com.sig.comm.server.dto.IntDto;
+import com.sig.comm.server.dto.IntStatusDto;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -22,6 +23,7 @@ public class TbInt implements Serializable {
     private int intLampType;    /* lamp unit type (3: 3-color lamp, 4: 4-color lamp) */
     private int mainIntNo;      /* main intersection number when the intersection type is a linked intersection */
     private int groupNo;        /* group number */
+    private long nodeId;
 
     public IntDto toDto() {
         return IntDto.builder()
@@ -33,7 +35,9 @@ public class TbInt implements Serializable {
                 .intLampType(this.intLampType)
                 .mainIntNo(this.mainIntNo)
                 .groupNo(this.groupNo)
+                .nodeId(this.nodeId)
                 .debug(false)
+                .status(new IntStatusDto(this.nodeId))
                 .build();
     }
 }

+ 0 - 86
src/main/java/com/sig/comm/server/kafka/KafkaConsumerService.java

@@ -1,86 +0,0 @@
-package com.sig.comm.server.kafka;
-
-import com.sig.comm.server.config.KafkaConfig;
-import com.sig.comm.server.process.dbms.DbmsDataProcess;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.common.TopicPartition;
-import org.springframework.kafka.core.ConsumerFactory;
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
-import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
-import org.springframework.kafka.listener.ContainerProperties;
-
-import javax.annotation.PostConstruct;
-import java.util.Collection;
-
-@Slf4j
-@RequiredArgsConstructor
-//@Service
-public class KafkaConsumerService {
-
-    private final KafkaConfig config;
-    private final DbmsDataProcess dbmsDataProcess;
-
-    private ConcurrentMessageListenerContainer<String, Long> kafkaListenerContainer;
-
-    @PostConstruct
-    void init() {
-        log.info("[{}] ------------------", this.getClass().getSimpleName());
-        start();
-    }
-
-    public void start() {
-
-        if (this.kafkaListenerContainer != null) {
-            if (!this.kafkaListenerContainer.isRunning()) {
-                log.warn("kafkaListenerContainer restart");
-                this.kafkaListenerContainer.start();
-            }
-            return;
-        }
-
-        ContainerProperties containerProperties = new ContainerProperties(this.config.getPingTopic());
-        containerProperties.setGroupId(this.config.getGroupId());
-        containerProperties.setPollTimeout(5000);
-        //containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL);
-        containerProperties.setMessageListener(new TsiKafkaConsumerWorker(this.dbmsDataProcess));
-        containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
-            @Override
-            public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
-            }
-            @Override
-            public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
-            }
-            @Override
-            public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
-                consumer.seekToEnd(partitions);
-            }
-        });
-
-        ConsumerFactory<String, Long> consumerFactory = new DefaultKafkaConsumerFactory<>(this.config.getConsumerPropertiesMap());
-        this.kafkaListenerContainer = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
-        this.kafkaListenerContainer.setBeanName("consumer");
-        this.kafkaListenerContainer.setConcurrency(1);
-        this.kafkaListenerContainer.setErrorHandler((thrownException, data) -> {
-            log.error("kafkaListenerContainer error: {}", thrownException.getMessage());
-            this.kafkaListenerContainer.stop();
-        });
-
-        this.kafkaListenerContainer.start();
-    }
-
-    public void shutdown() {
-        try {
-            //if (this.consumer != null) {
-            //    this.consumer.close();
-            //}
-            if (this.kafkaListenerContainer != null) {
-                this.kafkaListenerContainer.stop();
-            }
-        }
-        catch(Exception ignored) {
-        }
-    }
-}

+ 18 - 111
src/main/java/com/sig/comm/server/kafka/KafkaProducerService.java

@@ -1,24 +1,13 @@
 package com.sig.comm.server.kafka;
 
 import com.sig.app.common.kafka.KafkaProducerFactory;
-import com.sig.app.common.utils.TimeUtils;
 import com.sig.comm.server.config.KafkaConfig;
-import com.sig.comm.server.process.dbms.DbmsDataProcess;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.SendResult;
 import org.springframework.stereotype.Service;
-import org.springframework.util.concurrent.ListenableFuture;
-import org.springframework.util.concurrent.ListenableFutureCallback;
 
 import javax.annotation.PostConstruct;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 @Slf4j
 @AllArgsConstructor
@@ -26,113 +15,53 @@ import java.util.concurrent.TimeUnit;
 public class KafkaProducerService {
 
     private final KafkaConfig config;
-    private final DbmsDataProcess dbmsDataProcess;
 
+    private KafkaTemplate<String, byte[]> sigProducer;
     private KafkaTemplate<String, byte[]> nodeProducer;
-    private KafkaTemplate<String, Long> pingProducer;
 
 
     @PostConstruct
     void init() {
         //this.callback = new ProducerResultCallback();
 
-        if (this.config.isMultiConnect()) {
-            // Create a separate KafkaTemplate for each producer.
-            if (this.config.isEnableNode()) {
-                this.nodeProducer = KafkaProducerFactory.createByteArrayTemplate(this.config.getNodeServers(), this.config.props);
-            }
+        // Share a single KafkaTemplate.
+        KafkaTemplate<String, byte[]> producer = KafkaProducerFactory.createByteArrayTemplate(this.config.getBootstrapServers(), this.config.props);
+        if (this.config.isEnableNode()) {
+            this.nodeProducer = producer;
         }
-        else {
-            // When sending everything through one producer KafkaTemplate,
-            // the same template instance is reused.
-            KafkaTemplate<String, byte[]> producer = KafkaProducerFactory.createByteArrayTemplate(this.config.getBootstrapServers(), this.config.props);
-            if (this.config.isEnableNode()) {
-                this.nodeProducer = producer;
-            }
+        if (this.config.isEnableSig()) {
+            this.sigProducer = producer;
         }
 
-        createPingProducer();
-
         log.info("[{}] ------------------", this.getClass().getSimpleName());
-        log.info("[{}]   nodeProducer: {}", this.getClass().getSimpleName(), this.nodeProducer);
-        log.info("[{}]   pingProducer: {}", this.getClass().getSimpleName(), this.pingProducer);
+        log.info("[{}]   nodeProducer: {}, {}", this.getClass().getSimpleName(), this.config.isEnableNode(), this.nodeProducer);
+        log.info("[{}]    sigProducer: {}, {}", this.getClass().getSimpleName(), this.config.isEnableSig(), this.sigProducer);
 
         //this.producer = new KafkaProducer<String, byte[]>(KafkaProducerFactory.getProperties(this.config.getBootstrapServers(), this.config.props));
     }
 
-    public void createPingProducer() {
-        Map<String, Object> props = new HashMap<>();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getBootstrapServers());
-        props.put(ProducerConfig.ACKS_CONFIG, this.config.getConsumerAckConfig());
-        props.put(ProducerConfig.RETRIES_CONFIG, 0);
-        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
-        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
-        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 4000);
-        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 3000);
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.LongSerializer.class);
-
-        this.pingProducer = KafkaProducerFactory.createProducerTemplate(props);
-        this.pingProducer.setDefaultTopic(this.config.getPingTopic());
-    }
     public void shutdown() {
         try {
             if (this.nodeProducer != null) {
                 this.nodeProducer.destroy();
             }
-            if (this.pingProducer != null) {
-                this.pingProducer.destroy();
+            if (this.sigProducer != null) {
+                this.sigProducer.destroy();
             }
         }
         catch(Exception e) {
             log.error("Failed to shutdown: {}", e.getMessage());
         }
     }
-    public void sendPing() {
-        if (this.pingProducer == null ) {
-            log.info("sendPing: pingProducer == null");
-            return;
-        }
-
-        long sendNanoTime = System.nanoTime();
-//        TsiTpmsManager.getInstance().getKafkaTransVo().setSendNanoTime(sendNanoTime);   // nano seconds
-//        TsiTpmsManager.getInstance().getKafkaTransVo().setSendTm(0);                    // micro seconds
-//        TsiTpmsManager.getInstance().getKafkaTransVo().setRecvTm(0);                    // micro seconds
-
-        ListenableFuture<SendResult<String, Long>> future =  this.pingProducer.sendDefault("key", sendNanoTime);
-        future.addCallback(new ListenableFutureCallback<SendResult<String, Long>>() {
-
-            @Override
-            public void onSuccess(SendResult<String, Long> result) {
-                long recvNanoTime = System.nanoTime();
-                long sendTime = TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS);
-//                TsiTpmsManager.getInstance().getKafkaTransVo().setSendTm(sendTime);
-                log.info("send ping success: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime));
+    public void sendSig(String key, byte[] data) {
+        if (this.sigProducer != null) {
+            try {
+                this.sigProducer.send(KafkaConfig.SIG_ALL_TOPIC, key, data);
             }
-            @Override
-            public void onFailure(Throwable ex) {
-                long recvNanoTime = System.nanoTime();
-//                TsiTpmsManager.getInstance().getKafkaTransVo().setSendNanoTime(0);
-//                KafkaTransVo stat = new KafkaTransVo(AbstractDbmsVo.DBMS_KAFKA_TRANS_HS);
-//                stat.setHostName(TsiTpmsManager.getInstance().getKafkaTransVo().getHostName());
-//                stat.setStatus(0);
-//                stat.setSendTm(TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS));
-//                stat.setRecvTm(0);
-//                tsiCvimDbmsService.add(stat, (int)Thread.currentThread().getId());
-//                log.error("send ping failed: {}, {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime), ex.getMessage());
-//
-//                // save an alarm record for the Kafka send failure
-//                String value = "Send Failed";
-//                if (ex != null) {
-//                    value = ex.getMessage().substring(0, 99);
-//                }
-//                AlarmOccrVo alarm = new AlarmOccrVo(AbstractDbmsVo.DBMS_ALARM_OCCR_HS);
-//                alarm.setAlarmCode(TsiAlarmConfigVo.KAFKA_01);
-//                alarm.setAlarmTarget(producerConfig.getBootstrapServers());
-//                alarm.setAlarmValue(value);
-//                tsiCvimDbmsService.add(alarm, (int)Thread.currentThread().getId());
+            catch (Exception e) {
+                log.error("sendSig: {}, {}: {}", KafkaConfig.SIG_ALL_TOPIC, key, e.toString());
             }
-        });
+        }
     }
 
     public void sendNode(String key, byte[] data) {
@@ -145,26 +74,4 @@ public class KafkaProducerService {
             }
         }
     }
-
-    protected void send(KafkaTemplate<String, byte[]> kafka, String topic, String key, byte[] data) {
-        try {
-            kafka.send(topic, key, data);
-        }
-        catch(Exception e) {
-            log.error("kafka.send: {}, Exception: {}", topic, e.getMessage());
-        }
-    }
-
-    private static class ProducerResultCallback implements Callback {
-        @Override
-        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
-            if (e != null) {
-                log.error("Error while producing message to topic: {}, {}", recordMetadata, e.toString());
-            }
-            else {
-                String message = String.format("sent message to topic:%s partition:%s  offset:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
-                System.out.println(message);
-            }
-        }
-    }
 }
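
KafkaProducerFactory.createByteArrayTemplate is a project-internal helper. For orientation only, a rough spring-kafka equivalent, under the assumption that it merely wires the bootstrap servers with a String key serializer and a byte[] value serializer:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;

    // Sketch: build a KafkaTemplate<String, byte[]> the way the factory likely does.
    public final class ByteArrayTemplates {
        static KafkaTemplate<String, byte[]> create(String bootstrapServers) {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
            return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
        }
    }

With enable-node and enable-sig both set (as in the new application.yml), nodeProducer and sigProducer reference the same template instance.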

+ 0 - 50
src/main/java/com/sig/comm/server/kafka/TsiKafkaConsumerWorker.java

@@ -1,50 +0,0 @@
-package com.sig.comm.server.kafka;
-
-import com.sig.app.common.utils.TimeUtils;
-import com.sig.comm.server.process.dbms.DbmsDataProcess;
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.springframework.kafka.listener.MessageListener;
-import org.springframework.kafka.support.Acknowledgment;
-
-@Slf4j
-@AllArgsConstructor
-public class TsiKafkaConsumerWorker implements MessageListener<String, Long> {
-
-    private DbmsDataProcess dbmsDataProcess;
-
-    @Override
-    public void onMessage(ConsumerRecord<String, Long> record) {
-        Long sendNanoTime = record.value();
-        Long recvNanoTime = System.nanoTime();
-
-//        KafkaTransVo stat = new KafkaTransVo(AbstractDbmsVo.DBMS_KAFKA_TRANS_HS);
-//        stat.setHostName(TsiTpmsManager.getInstance().getKafkaTransVo().getHostName());
-//        stat.setStatus(1);
-//        if (TsiTpmsManager.getInstance().getKafkaTransVo().getSendNanoTime() == sendNanoTime) {
-//            stat.setSendTm(TsiTpmsManager.getInstance().getKafkaTransVo().getSendTm());
-//            stat.setRecvTm(TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS));
-//        }
-//        else {
-//            stat.setRecvTm(TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS));
-//            stat.setSendTm(stat.getRecvTm());
-//            log.info("recv ping success, sendNanoTime miss match: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime));
-//        }
-//        dbmsDataProcess.add(stat, (int)Thread.currentThread().getId());
-//        log.info("recv ping success: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime));
-    }
-
-    @Override
-    public void onMessage(ConsumerRecord<String, Long> record, Acknowledgment acknowledgment) {
-        try {
-            Long sendNanoTime = record.value();
-            Long recvNanoTime = System.nanoTime();
-            log.info("recv ping success, ack: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime));
-            //acknowledgment.acknowledge();
-        } catch (Exception e) {
-            log.error("onMessage:" + e.getMessage());
-        }
-    }
-
-}

+ 25 - 1
src/main/java/com/sig/comm/server/xnet/server/process/response/SigPhaseChange.java

@@ -1,5 +1,8 @@
 package com.sig.comm.server.xnet.server.process.response;
 
+import com.sig.comm.server.dto.IntDto;
+import com.sig.comm.server.dto.IntStatusDto;
+import com.sig.comm.server.kafka.KafkaProducerService;
 import com.sig.comm.server.process.dbms.DbmsData;
 import com.sig.comm.server.process.dbms.DbmsDataProcess;
 import com.sig.comm.server.xnet.server.process.protocol.SigProtocolConst;
@@ -13,9 +16,11 @@ import java.util.List;
 @Slf4j
 public class SigPhaseChange implements SigCommResponse {
     private final DbmsDataProcess dbmsDataProcess;
+    private final KafkaProducerService kafkaProducer;
 
-    public SigPhaseChange(DbmsDataProcess dbmsDataProcess) {
+    public SigPhaseChange(DbmsDataProcess dbmsDataProcess, KafkaProducerService kafkaProducer) {
         this.dbmsDataProcess = dbmsDataProcess;
+        this.kafkaProducer = kafkaProducer;
     }
 
     @Override
@@ -49,6 +54,8 @@ public class SigPhaseChange implements SigCommResponse {
 //                byte phase_b[8];
 //            } pkt_phase_change, *pkt_phase_changep;
 
+            long unixTimestamp = System.currentTimeMillis() / 1000L; // convert to seconds
+
             List<HashMap<String, Object>> lists = new ArrayList<>();
             String regionCd = packet.getCenter().getRegionCd();
             String commDt = packet.getPacket().getCommDate();
@@ -136,6 +143,23 @@ public class SigPhaseChange implements SigCommResponse {
                 param.put("GROUP_NO",                   groupNo);
 
                 lists.add(param);
+
+                IntDto intDto = packet.getCenter().getIntMap().get(intNo);
+                if (intDto != null && intDto.getNodeId() > 1000000000) {
+                    IntStatusDto intStatus = intDto.getStatus();
+                    if (intStatus != null) {
+                        intStatus.initStatus(unixTimestamp);
+                        intStatus.setOperStatus(mode);
+                        intStatus.setPhase(ringA, ringB);
+                        intStatus.setStatus(status);
+                        intStatus.setCycle(count, cycle);
+                        intStatus.setPhaseVal(phaseA1, phaseA2, phaseA3, phaseA4, phaseA5, phaseA6, phaseA7, phaseA8,
+                                phaseB1, phaseB2, phaseB3, phaseB4, phaseB5, phaseB6, phaseB7, phaseB8);
+
+                        this.kafkaProducer.sendNode(Long.toString(intDto.getNodeId()), intStatus.getKafkaData());
+                        this.kafkaProducer.sendSig(Long.toString(intDto.getNodeId()), intStatus.getKafkaData());
+                    }
+                }
             }
 
             this.dbmsDataProcess.add(new DbmsData(DbmsData.DBMS_DATA_INT_PHASE_CHANGE, packet.getCenter(), false, lists));
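
Not part of this commit, but for context: a downstream reader of the sig-all records published above could look roughly like this. It assumes a spring-kafka listener container configured with String key and byte[] value deserializers; the class name and group ID are hypothetical:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class SigStatusReader {
        // Hypothetical consumer of the sig-all topic (KafkaConfig.SIG_ALL_TOPIC).
        @KafkaListener(topics = "sig-all", groupId = "sig-status-reader")
        public void onSigStatus(ConsumerRecord<String, byte[]> record) {
            long nodeId = Long.parseLong(record.key()); // key is the node ID as a string
            byte[] status = record.value();             // 31-byte IntStatusDto layout
            // decode fields per the IntStatusDto byte layout ...
        }
    }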

+ 22 - 1
src/main/java/com/sig/comm/server/xnet/server/process/response/SigPhaseCycle.java

@@ -1,5 +1,6 @@
 package com.sig.comm.server.xnet.server.process.response;
 
+import com.sig.comm.server.kafka.KafkaProducerService;
 import com.sig.comm.server.process.dbms.DbmsData;
 import com.sig.comm.server.process.dbms.DbmsDataProcess;
 import com.sig.comm.server.xnet.server.process.protocol.SigProtocolConst;
@@ -13,9 +14,11 @@ import java.util.List;
 @Slf4j
 public class SigPhaseCycle implements SigCommResponse {
     private final DbmsDataProcess dbmsDataProcess;
+    private final KafkaProducerService kafkaProducer;
 
-    public SigPhaseCycle(DbmsDataProcess dbmsDataProcess) {
+    public SigPhaseCycle(DbmsDataProcess dbmsDataProcess, KafkaProducerService kafkaProducer) {
         this.dbmsDataProcess = dbmsDataProcess;
+        this.kafkaProducer = kafkaProducer;
     }
 
     @Override
@@ -46,6 +49,8 @@ public class SigPhaseCycle implements SigCommResponse {
 //                byte cycle  [1];
 //            } pkt_static_cycle, *pkt_static_cyclep;
 
+            long unixTimestamp = System.currentTimeMillis() / 1000L; // convert to seconds
+
             List<HashMap<String, Object>> lists = new ArrayList<>();
             String regionCd = packet.getCenter().getRegionCd();
             String commDt = packet.getPacket().getCommDate();
@@ -99,6 +104,22 @@ public class SigPhaseCycle implements SigCommResponse {
                 param.put("PPC_CONTRL_FLAG",            ppcControl);
 
                 lists.add(param);
+
+//                IntDto intDto = packet.getCenter().getIntMap().get(intNo);
+//                if (intDto != null && intDto.getNodeId() > 1000000000) {
+//                    IntStatusDto intStatus = intDto.getStatus();
+//                    if (intStatus != null) {
+//                        intStatus.initStatus(unixTimestamp);
+//                        intStatus.setOperStatus(oprCd);
+//                        intStatus.setPhase(ringA, ringB);
+//                        intStatus.setStatus(status);
+//                        intStatus.setCycle(count, cycle);
+////                        intStatus.setPhaseVal(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+//
+//                        this.kafkaProducer.sendNode(Long.toString(intDto.getNodeId()), intStatus.getKafkaData());
+//                        this.kafkaProducer.sendSig(Long.toString(intDto.getNodeId()), intStatus.getKafkaData());
+//                    }
+//                }
             }
 
             this.dbmsDataProcess.add(new DbmsData(DbmsData.DBMS_DATA_INT_PHASE_CYCLE, packet.getCenter(), false, lists));

+ 5 - 4
src/main/java/com/sig/comm/server/xnet/server/process/work/DataPacketProcess.java

@@ -3,11 +3,11 @@ package com.sig.comm.server.xnet.server.process.work;
 import com.sig.comm.server.common.SpringUtils;
 import com.sig.comm.server.config.ThreadPoolInitializer;
 import com.sig.comm.server.dto.RegionCenter;
+import com.sig.comm.server.kafka.KafkaProducerService;
 import com.sig.comm.server.process.dbms.DbmsDataProcess;
 import com.sig.comm.server.repository.ApplicationRepository;
-import com.sig.comm.server.xnet.server.process.response.RecvPacketDto;
-import com.sig.comm.server.xnet.server.process.response.*;
 import com.sig.comm.server.xnet.server.process.protocol.eSigOpCode;
+import com.sig.comm.server.xnet.server.process.response.*;
 import io.netty.channel.Channel;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -28,6 +28,7 @@ public class DataPacketProcess {
 
     private final DataPacketAsyncTask asyncTask;
     private final DbmsDataProcess dbmsDataProcess;
+    private final KafkaProducerService kafkaProducer;
 
     public void run() {
         log.info("DataPacketProcess.run: Start.");
@@ -66,10 +67,10 @@ public class DataPacketProcess {
                     response = new SigLogin(this.dbmsDataProcess);
                     break;
                 case SIG_PHASE_CHANGE:
-                    response = new SigPhaseChange(this.dbmsDataProcess);
+                    response = new SigPhaseChange(this.dbmsDataProcess, this.kafkaProducer);
                     break;
                 case SIG_STATIC_CYCLE:
-                    response = new SigPhaseCycle(this.dbmsDataProcess);
+                    response = new SigPhaseCycle(this.dbmsDataProcess, this.kafkaProducer);
                     break;
                 case SIG_EVENT_INFO:
                     response = new SigEventInfo(this.dbmsDataProcess);

+ 3 - 6
src/main/resources/application.yml

@@ -54,12 +54,9 @@ application:
 
   kafka:
     bootstrap-servers: 192.168.11.23:9092
-    group-id: ggits-comm-server
-    consumer-ack-config: 1
-    ping-topic: ping-topic
-    multi-connect: false
-    node-servers:
-    enable-node: false
+    group-id: sig-comm-server
+    enable-node: true
+    enable-sig: true
     props:
     #  - request.timeout.ms: 100
     #  - max.block.ms: 100

+ 2 - 1
src/main/resources/mybatis/mapper/IntMapper.xml

@@ -11,7 +11,8 @@
                NVL(A.INT_LCTYPE,   1) AS intLcType,
                NVL(A.INT_LAMPTYPE, 3) AS intLampType,
                NVL(A.MAIN_INTNO,   0) AS mainIntNo,
-               NVL(A.GROUP_NO,     0) AS groupNo
+               NVL(A.GROUP_NO,     0) AS groupNo,
+               NVL(A.NODE_ID,      0) AS nodeId
         FROM TB_INT A
         WHERE A.REGION_CD = #{regionCd}
         ]]>

+ 2 - 2
src/test/java/com/sig/comm/server/SigCommServerApplicationTests.java

@@ -57,7 +57,7 @@ public class SigCommServerApplicationTests {
                 .packet(sigCommPacket)
                 .build();
         SigSignalMap response = new SigSignalMap(null);
-        response.responseTEST(packet);
+//        response.responseTEST(packet);
     }
 
 
@@ -80,7 +80,7 @@ public class SigCommServerApplicationTests {
                 .packet(sigCommPacket)
                 .build();
         SigSignalMap response = new SigSignalMap(null);
-        response.responseTEST(packet);
+//        response.responseTEST(packet);
     }