Browse Source

logging file add

shjung 9 months ago
parent
commit
2ecbfcb89e

+ 6 - 43
src/main/java/com/evp/comm/consumer/EvpCommConsumerApplication.java

@@ -13,7 +13,6 @@ import org.springframework.context.ApplicationListener;
 import org.springframework.context.event.ContextClosedEvent;
 
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Set;
@@ -22,14 +21,12 @@ import java.util.Set;
 @SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
 public class EvpCommConsumerApplication implements ApplicationRunner, ApplicationListener<ContextClosedEvent> {
 
-    private static String bootstrapServers = "192.168.11.23:9092";
-    private static String topic = "node";
+    private static String bootstrapServers = "172.24.0.30:9092,172.24.0.31:9093,172.24.0.32:9094";
     private static String consumerGroup = "evp-consumer";
-    private static String nodeId = "";
 
     private KafkaConsumerService kafkaConsumerService = null;
 
-    // java -jar tsi-consumer.jar --servers=172.24.0.30:9092,172.24.0.31:9093,172.24.0.32:9094 --topic=node --group=tsi-consumer1 --key=4060000101,4060000112
+    // java -jar evp-consumer.jar --servers=192.168.11.23:9092 --group=evp-consumer1
     public static void main(String[] args) {
         SpringApplication application = new SpringApplicationBuilder()
                 .sources(EvpCommConsumerApplication.class)
@@ -54,60 +51,26 @@ public class EvpCommConsumerApplication implements ApplicationRunner, Applicatio
             return;
         }
 
-        boolean nodeOptions = args.containsOption("key");
-        if (!nodeOptions) {
-            log.error("****** key options are required.");
-            usage();
-            return;
-        }
-
         for (String optionName : optionNames) {
             List<String> optionValues = args.getOptionValues(optionName);
             for (String optionValue : optionValues) {
-                if ("key".equals(optionName)) {
-                    nodeId = optionValue;
-                }
-                else if ("servers".equals(optionName)) {
+                if ("servers".equals(optionName)) {
                     bootstrapServers = optionValue;
                 }
-                else if ("topic".equals(optionName)) {
-                    topic = optionValue;
-                }
                 else if ("group".equals(optionName)) {
                     consumerGroup = optionValue;
                 }
             }
         }
 
-        List<String> keyValues = new ArrayList<>();
-        String[] nodeIds = nodeId.split(",");
-        for (int ii = 0; ii < nodeIds.length; ii++) {
-            String node = nodeIds[ii].trim();
-            if (node.isEmpty()) {
-                continue;
-            }
-            keyValues.add(node);
-        }
-        if (keyValues.isEmpty()) {
-            log.error("****** key options are required(all input data is blank).");
-            usage();
-            return;
-        }
-
-        if (!"node".equals(topic) && !"test".equals(topic) && !"cvim".equals(topic)) {
-            usage();
-            return;
-        }
-        kafkaConsumerService = new KafkaConsumerService(bootstrapServers, topic, consumerGroup, keyValues);
+        kafkaConsumerService = new KafkaConsumerService(bootstrapServers, consumerGroup);
         kafkaConsumerService.start();
     }
 
     private void usage() {
-        log.info("\r\n\n\n\n\nUsage: java -jar tsi-consumer --servers=xxx.xxx.xxx.xxx:nnnn --topic=node --group:tsi-consumer --key=4060000101\n" +
+        log.info("\r\n\n\n\n\nUsage: java -jar evp-consumer --servers=xxx.xxx.xxx.xxx:nnnn --group:evp-consumer\n" +
                 "   --servers=kafka bootstrap server[optional, default=172.24.0.30:9092,172.24.0.31:9093,172.24.0.32:9094]\n" +
-                "   --topic=topic name[optional, node/test/cvim, default=node]\n" +
-                "   --group=kafka consumer group name[optional, default=tsi-consumer]\n" +
-                "   --key=node id\n\n\n\n");
+                "   --group=kafka consumer group name[optional, default=evp-consumer]\n\n\n\n");
     }
 
     @Override

+ 171 - 28
src/main/java/com/evp/comm/consumer/kafka/KafkaConsumerService.java

@@ -1,53 +1,63 @@
 package com.evp.comm.consumer.kafka;
 
+import com.evp.comm.consumer.kafka.dto.*;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
 import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
 import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
 
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
 
 @Slf4j
 @RequiredArgsConstructor
 public class KafkaConsumerService {
 
+    private final String EVPS_SERVICE_TOPIC = "evps-service";
+    private final String EVPS_ROUTE_TOPIC   = "evps-route";
+    private final String EVPS_NODE_TOPIC    = "evps-node";
+    private final String EVPS_PHASE_TOPIC   = "evps-phase";
+    private final String EVPS_SIGNAL_TOPIC  = "evps-signal";
+    private final String EVPS_EVENT_TOPIC   = "evps-event";
+
     private final String bootstrapServers;
-    private final String topic;
     private final String groupId;
-    private final List<String> keyValues;
 
-    private ConcurrentMessageListenerContainer<String, byte[]> kafkaListenerContainer;
+    private ConcurrentMessageListenerContainer<String, String> kafkaListenerContainer;
 
-    public void start() {
-        log.info("Starting Kafka Consumer: bootstrapServers: {}, topic: {}, group: {}, keys: {}", this.bootstrapServers, this.topic, this.groupId, this.keyValues);
+    private ConcurrentMessageListenerContainer<String, KafkaEvpsServiceDto> serviceListenerContainer;
+    private ConcurrentMessageListenerContainer<String, KafkaEvpsRouteDto> routeListenerContainer;
+    private ConcurrentMessageListenerContainer<String, KafkaEvpsNodeDto> nodeListenerContainer;
+    private ConcurrentMessageListenerContainer<String, KafkaEvpsPhaseDto> phaseListenerContainer;
+    private ConcurrentMessageListenerContainer<String, KafkaEvpsSignalDto> signalListenerContainer;
+    private ConcurrentMessageListenerContainer<String, KafkaEvpsEventDto> eventListenerContainer;
 
-        HashSet<String> keys = new HashSet<>(this.keyValues);
+    public void start() {
+        log.info("Starting Kafka Consumer: bootstrapServers: {}, group: {}", this.bootstrapServers, this.groupId);
 
         HashSet<String> topics = new HashSet<>();
-        String consumerTopic = "node";
-        if ("test".equals(this.topic)) {
-            consumerTopic = "topic-for-ssd-test";
-            topics.add(consumerTopic);
-        }
-        else if ("cvim".equals(topic)) {
-            consumerTopic = "cvim-raw";
-            topics.add(consumerTopic);
-        }
-        else {
-            topics = keys;
-        }
+        topics.add(EVPS_SERVICE_TOPIC);
+        topics.add(EVPS_ROUTE_TOPIC);
+        topics.add(EVPS_NODE_TOPIC);
+        topics.add(EVPS_PHASE_TOPIC);
+        topics.add(EVPS_SIGNAL_TOPIC);
+        topics.add(EVPS_EVENT_TOPIC);
 
         ContainerProperties containerProperties = new ContainerProperties(topics.toArray(new String[0]));
-        containerProperties.setGroupId(this.groupId);
+        containerProperties.setGroupId(this.groupId+"Z");
         containerProperties.setPollTimeout(5000);
         //containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL);
-        containerProperties.setMessageListener(new TsiKafkaConsumerWorker(consumerTopic, keys));
+        containerProperties.setMessageListener(new KafkaConsumerWorker());
         containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
             @Override
             public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
@@ -60,8 +70,7 @@ public class KafkaConsumerService {
                 consumer.seekToEnd(partitions);
             }
         });
-
-        ConsumerFactory<String, byte[]> consumerFactory = new DefaultKafkaConsumerFactory<>(getConsumerPropertiesMap());
+        ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(getConsumerStringPropertiesMap());
         this.kafkaListenerContainer = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
         this.kafkaListenerContainer.setBeanName("consumer");
         this.kafkaListenerContainer.setConcurrency(1);
@@ -69,8 +78,109 @@ public class KafkaConsumerService {
             log.error("kafkaListenerContainer error: {}", thrownException.getMessage());
             this.kafkaListenerContainer.stop();
         });
-
         this.kafkaListenerContainer.start();
+
+
+        ContainerProperties serviceContainerProperties = new ContainerProperties(EVPS_SERVICE_TOPIC);
+        serviceContainerProperties.setGroupId(this.groupId);
+        serviceContainerProperties.setPollTimeout(5000);
+        serviceContainerProperties.setMessageListener(new KafkaEvpsServiceConsumerWorker());
+        serviceContainerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
+            @Override
+            public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+            }
+            @Override
+            public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+            }
+            @Override
+            public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+                consumer.seekToEnd(partitions);
+            }
+        });
+        ConsumerFactory<String, KafkaEvpsServiceDto> serviceFactory = new DefaultKafkaConsumerFactory<>(getConsumerPropertiesMap(), new StringDeserializer(), new JsonDeserializer<>(KafkaEvpsServiceDto.class, false));
+        this.serviceListenerContainer = new ConcurrentMessageListenerContainer<>(serviceFactory, serviceContainerProperties);
+        this.serviceListenerContainer.setBeanName("serviceListenerContainer");
+        this.serviceListenerContainer.setConcurrency(1);
+        this.serviceListenerContainer.setErrorHandler((thrownException, data) -> {
+            log.error("serviceListenerContainer error: {}", thrownException.getMessage());
+            this.serviceListenerContainer.stop();
+        });
+        this.serviceListenerContainer.start();
+
+
+//        ContainerProperties routeContainerProperties = new ContainerProperties(EVPS_ROUTE_TOPIC);
+//        routeContainerProperties.setGroupId(this.groupId);
+//        routeContainerProperties.setPollTimeout(5000);
+//        routeContainerProperties.setMessageListener(new KafkaEvpsRouteConsumerWorker());
+//        ConsumerFactory<String, KafkaEvpsRouteDto> routeFactory = new DefaultKafkaConsumerFactory<>(getConsumerPropertiesMap());
+//        this.routeListenerContainer = new ConcurrentMessageListenerContainer<>(routeFactory, routeContainerProperties);
+//        this.routeListenerContainer.setBeanName("routeListenerContainer");
+//        this.routeListenerContainer.setConcurrency(1);
+//        this.routeListenerContainer.setErrorHandler((thrownException, data) -> {
+//            log.error("routeListenerContainer error: {}", thrownException.getMessage());
+//            this.routeListenerContainer.stop();
+//        });
+//        this.routeListenerContainer.start();
+//
+//
+        ContainerProperties nodeContainerProperties = new ContainerProperties(EVPS_NODE_TOPIC);
+        nodeContainerProperties.setGroupId(this.groupId);
+        nodeContainerProperties.setPollTimeout(5000);
+        nodeContainerProperties.setMessageListener(new KafkaEvpsNodeConsumerWorker());
+        ConsumerFactory<String, KafkaEvpsNodeDto> nodeFactory = new DefaultKafkaConsumerFactory<>(getConsumerPropertiesMap());
+        this.nodeListenerContainer = new ConcurrentMessageListenerContainer<>(nodeFactory, nodeContainerProperties);
+        this.nodeListenerContainer.setBeanName("nodeListenerContainer");
+        this.nodeListenerContainer.setConcurrency(1);
+        this.nodeListenerContainer.setErrorHandler((thrownException, data) -> {
+            log.error("nodeListenerContainer error: {}", thrownException.getMessage());
+            this.nodeListenerContainer.stop();
+        });
+        this.nodeListenerContainer.start();
+//
+//
+//        ContainerProperties phaseContainerProperties = new ContainerProperties(EVPS_PHASE_TOPIC);
+//        phaseContainerProperties.setGroupId(this.groupId);
+//        phaseContainerProperties.setPollTimeout(5000);
+//        phaseContainerProperties.setMessageListener(new KafkaEvpsPhaseConsumerWorker());
+//        ConsumerFactory<String, KafkaEvpsPhaseDto> phaseFactory = new DefaultKafkaConsumerFactory<>(getConsumerPropertiesMap());
+//        this.phaseListenerContainer = new ConcurrentMessageListenerContainer<>(phaseFactory, phaseContainerProperties);
+//        this.phaseListenerContainer.setBeanName("phaseListenerContainer");
+//        this.phaseListenerContainer.setConcurrency(1);
+//        this.phaseListenerContainer.setErrorHandler((thrownException, data) -> {
+//            log.error("phaseListenerContainer error: {}", thrownException.getMessage());
+//            this.phaseListenerContainer.stop();
+//        });
+//        this.phaseListenerContainer.start();
+//
+//
+        ContainerProperties signalContainerProperties = new ContainerProperties(EVPS_SIGNAL_TOPIC);
+        signalContainerProperties.setGroupId(this.groupId);
+        signalContainerProperties.setPollTimeout(5000);
+        signalContainerProperties.setMessageListener(new KafkaEvpsSignalConsumerWorker());
+        ConsumerFactory<String, KafkaEvpsSignalDto> signalFactory = new DefaultKafkaConsumerFactory<>(getConsumerPropertiesMap());
+        this.signalListenerContainer = new ConcurrentMessageListenerContainer<>(signalFactory, signalContainerProperties);
+        this.signalListenerContainer.setBeanName("signalListenerContainer");
+        this.signalListenerContainer.setConcurrency(1);
+        this.signalListenerContainer.setErrorHandler((thrownException, data) -> {
+            log.error("signalListenerContainer error: {}", thrownException.getMessage());
+            this.signalListenerContainer.stop();
+        });
+        this.signalListenerContainer.start();
+
+
+        ContainerProperties eventContainerProperties = new ContainerProperties(EVPS_EVENT_TOPIC);
+        eventContainerProperties.setGroupId(this.groupId);
+        eventContainerProperties.setPollTimeout(5000);
+        eventContainerProperties.setMessageListener(new KafkaEvpsEventConsumerWorker());
+        ConsumerFactory<String, KafkaEvpsEventDto> eventFactory = new DefaultKafkaConsumerFactory<>(getConsumerPropertiesMap());
+        this.eventListenerContainer = new ConcurrentMessageListenerContainer<>(eventFactory, eventContainerProperties);
+        this.eventListenerContainer.setBeanName("eventListenerContainer");
+        this.eventListenerContainer.setConcurrency(1);
+        this.eventListenerContainer.setErrorHandler((thrownException, data) -> {
+            log.error("eventListenerContainer error: {}", thrownException.getMessage());
+            this.eventListenerContainer.stop();
+        });
+        this.eventListenerContainer.start();
     }
 
     public Map<String, Object> getConsumerPropertiesMap() {
@@ -84,16 +194,49 @@ public class KafkaConsumerService {
         properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "100");
         properties.put(ConsumerConfig.CHECK_CRCS_CONFIG, false);
         properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
-        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
-        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.ByteArrayDeserializer.class);
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
+        properties.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
 
         return properties;
     }
 
+    public Map<String, Object> getConsumerStringPropertiesMap() {
+        Map<String, Object> properties = new HashMap<>();
+        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
+        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1);
+        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
+        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+        properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "100");
+        properties.put(ConsumerConfig.CHECK_CRCS_CONFIG, false);
+        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
+
+        return properties;
+    }
     public void shutdown() {
         try {
-            if (this.kafkaListenerContainer != null) {
-                this.kafkaListenerContainer.stop();
+            if (this.serviceListenerContainer != null) {
+                this.serviceListenerContainer.stop();
+            }
+            if (this.routeListenerContainer != null) {
+                this.routeListenerContainer.stop();
+            }
+            if (this.phaseListenerContainer != null) {
+                this.phaseListenerContainer.stop();
+            }
+            if (this.nodeListenerContainer != null) {
+                this.nodeListenerContainer.stop();
+            }
+            if (this.signalListenerContainer != null) {
+                this.signalListenerContainer.stop();
+            }
+            if (this.eventListenerContainer != null) {
+                this.eventListenerContainer.stop();
             }
         }
         catch(Exception ignored) {

+ 17 - 0
src/main/java/com/evp/comm/consumer/kafka/KafkaConsumerWorker.java

@@ -0,0 +1,17 @@
+package com.evp.comm.consumer.kafka;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.listener.MessageListener;
+
+@Slf4j
+@AllArgsConstructor
+public class KafkaConsumerWorker implements MessageListener<String, String> {
+
+    @Override
+    public void onMessage(ConsumerRecord<String, String> record) {
+        log.info("onMessage: Key: {}, Data: {}", record.key(), record.value());
+    }
+
+}

+ 18 - 0
src/main/java/com/evp/comm/consumer/kafka/KafkaEvpsEventConsumerWorker.java

@@ -0,0 +1,18 @@
+package com.evp.comm.consumer.kafka;
+
+import com.evp.comm.consumer.kafka.dto.KafkaEvpsEventDto;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.listener.MessageListener;
+
+@Slf4j
+@AllArgsConstructor
+public class KafkaEvpsEventConsumerWorker implements MessageListener<String, KafkaEvpsEventDto> {
+
+    @Override
+    public void onMessage(ConsumerRecord<String, KafkaEvpsEventDto> record) {
+        log.info("EvpsEvent: Key: {}, Data: {}", record.key(), record.value());
+    }
+
+}

+ 18 - 0
src/main/java/com/evp/comm/consumer/kafka/KafkaEvpsNodeConsumerWorker.java

@@ -0,0 +1,18 @@
+package com.evp.comm.consumer.kafka;
+
+import com.evp.comm.consumer.kafka.dto.KafkaEvpsNodeDto;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.listener.MessageListener;
+
+@Slf4j
+@AllArgsConstructor
+public class KafkaEvpsNodeConsumerWorker implements MessageListener<String, KafkaEvpsNodeDto> {
+
+    @Override
+    public void onMessage(ConsumerRecord<String, KafkaEvpsNodeDto> record) {
+        log.info("EvpsNode: Key: {}, Data: {}", record.key(), record.value());
+    }
+
+}

+ 18 - 0
src/main/java/com/evp/comm/consumer/kafka/KafkaEvpsPhaseConsumerWorker.java

@@ -0,0 +1,18 @@
+package com.evp.comm.consumer.kafka;
+
+import com.evp.comm.consumer.kafka.dto.KafkaEvpsPhaseDto;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.listener.MessageListener;
+
+@Slf4j
+@AllArgsConstructor
+public class KafkaEvpsPhaseConsumerWorker implements MessageListener<String, KafkaEvpsPhaseDto> {
+
+    @Override
+    public void onMessage(ConsumerRecord<String, KafkaEvpsPhaseDto> record) {
+        log.info("EvpsPhase: Key: {}, Data: {}", record.key(), record.value());
+    }
+
+}

+ 18 - 0
src/main/java/com/evp/comm/consumer/kafka/KafkaEvpsRouteConsumerWorker.java

@@ -0,0 +1,18 @@
+package com.evp.comm.consumer.kafka;
+
+import com.evp.comm.consumer.kafka.dto.KafkaEvpsRouteDto;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.listener.MessageListener;
+
+@Slf4j
+@AllArgsConstructor
+public class KafkaEvpsRouteConsumerWorker implements MessageListener<String, KafkaEvpsRouteDto> {
+
+    @Override
+    public void onMessage(ConsumerRecord<String, KafkaEvpsRouteDto> record) {
+        log.info("EvpsRoute: Key: {}, Data: {}", record.key(), record.value());
+    }
+
+}

+ 18 - 0
src/main/java/com/evp/comm/consumer/kafka/KafkaEvpsServiceConsumerWorker.java

@@ -0,0 +1,18 @@
+package com.evp.comm.consumer.kafka;
+
+import com.evp.comm.consumer.kafka.dto.KafkaEvpsServiceDto;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.listener.MessageListener;
+
+@Slf4j
+@AllArgsConstructor
+public class KafkaEvpsServiceConsumerWorker implements MessageListener<String, KafkaEvpsServiceDto> {
+
+    @Override
+    public void onMessage(ConsumerRecord<String, KafkaEvpsServiceDto> record) {
+        log.info("EvpsService: Key: {}, Data: {}", record.key(), record.value());
+    }
+
+}

+ 19 - 0
src/main/java/com/evp/comm/consumer/kafka/KafkaEvpsSignalConsumerWorker.java

@@ -0,0 +1,19 @@
+package com.evp.comm.consumer.kafka;
+
+import com.evp.comm.consumer.kafka.dto.KafkaEvpsSignalDto;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.listener.MessageListener;
+
+@Slf4j
+@AllArgsConstructor
+public class KafkaEvpsSignalConsumerWorker implements MessageListener<String, KafkaEvpsSignalDto> {
+
+    @Override
+    public void onMessage(ConsumerRecord<String, KafkaEvpsSignalDto> record) {
+        log.info("EvpsSignal: Key: {}, Data: {}", record.key(), record.value());
+
+    }
+
+}

+ 0 - 381
src/main/java/com/evp/comm/consumer/kafka/TsiKafkaConsumerWorker.java

@@ -1,381 +0,0 @@
-package com.evp.comm.consumer.kafka;
-
-import com.evp.app.common.utils.SysUtils;
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.springframework.kafka.listener.MessageListener;
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashSet;
-
-@Slf4j
-@AllArgsConstructor
-public class TsiKafkaConsumerWorker implements MessageListener<String, byte[]> {
-
-    private final String topic;
-    private final HashSet<String> keyValues;
-
-    @Override
-    public void onMessage(ConsumerRecord<String, byte[]> record) {
-        if (!this.keyValues.contains(record.key())) {
-            return;
-        }
-
-        log.info("onMessage: Topic: {}, Key: {}, {} Bytes. {}",
-                this.topic, record.key(), record.value().length, SysUtils.byteArrayToHex(record.value()));
-
-        byte[] buffer = record.value();
-        int idx = 0;
-        int tail = 0;
-        if ("cvim-raw".equals(this.topic)) {
-            idx = 27 + 6;
-            tail = -2;
-        }
-        else if ("topic-for-ssd-test".equals(this.topic)) {
-            int stx1    = buffer[idx++];
-            int stx2    = buffer[idx++];
-            int length  = ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
-            int opCode  = buffer[idx++];
-            int version = buffer[idx++];
-            log.info("STX1: {}, STX2: {}, Length: {}, OpCode: {}, Version: {}, Idx: {}",
-                    stx1, stx2, length, opCode, version, idx);
-            //idx = 6;
-            tail = -2;
-        }
-        displayPacket(record.key(), record.value(), idx, tail);
-    }
-
-    public static void displayPacket(String topicKey, byte[] buffer, int start, int tail) {
-        int idx = start;
-//        int stx1    = buffer[idx++];
-//        int stx2    = buffer[idx++];
-//        int length  = ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
-//        int opCode  = buffer[idx++];
-//        int version = buffer[idx++];
-        long nodeId = ((long) (buffer[idx++] & 0xFF) << 24) | ((buffer[idx++] & 0xFF) << 16) | ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
-//        log.info("STX1: {}, STX2: {}, LENTH: {}, OpCode: {}, Version: {}, NodeId: {}", stx1, stx2, length, opCode, version, nodeId);
-
-        log.info("NodeId: {}, {}", topicKey, nodeId);
-
-        int status  = buffer[idx++];
-        int error   = buffer[idx++];
-        int counter = buffer[idx++];                //주기 카운터, 초
-        int stts    = buffer[idx++];
-        long localTime = ((long) (buffer[idx++] & 0xFF) << 24) | ((buffer[idx++] & 0xFF) << 16) | ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
-
-        int manual  = ((status     ) & 0x01);       //수동, 1: 이상, 0: 정상
-        int blink   = ((status >> 1) & 0x01);       //점멸, 1: 점멸, 0: 정상
-        int turnOff = ((status >> 2) & 0x01);       //소등, 1: 소등, 0: 정상
-        int sensing = ((status >> 3) & 0x01);       //감응, 1: 감응, 0: 정상
-        int trans   = ((status >> 4) & 0x01);       //전이, 1: 전이중, 0: 전이완료
-
-        int conflict   = ((error     ) & 0x01);     //모순 이상,      1 : 이상, 0 : 정상
-        int centerComm = ((error >> 1) & 0x01);     //센터 통신 이상, 1: 센터 통신이상, 0 : 정상
-        int scuComm    = ((error >> 2) & 0x01);     //SCU 통신 이상,  1: MCU <--> SCU 통신 이상, 0: 정상
-
-        int statusCount = ((stts)      & 0x7F);     //총 신호상태정보의 개수, N 개
-        int splitFlag   = ((stts >> 7) & 0x01);     //데이터를 분할하여 전송할 경우 마지막 정보임을 나타내는 플래그(분할 패킷의 처음과 중간:0, 단일 또는 마지막 패킷:1)
-
-        Date date = new java.util.Date(localTime * 1000L);
-        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        sdf.setTimeZone(java.util.TimeZone.getTimeZone("GMT+9"));
-        String signalTime = sdf.format(date);
-
-        log.info("   LocalTime: ({})", signalTime);
-        log.info("  SystemTime: ({})", SysUtils.getSysTimeStr());
-        log.info("      STATUS: 전이({}), 감응({}), 소등({}), 점멸({}), 수동({}) - (1:전이중, 0:전이완료... 1:상태, 0:정상)",
-                trans, sensing, turnOff, blink, manual);
-        log.info("       ERROR: SCU통신({}), 센터통신({}), 모순({}) - (1: 이상, 0: 정상)",
-                scuComm, centerComm, conflict);
-        log.info("CycleCounter: {} sec.", counter);
-        log.info("     Signals: {} EA, DataSplit({}) - (0:First/Middle, 1:Single/Last)", statusCount, splitFlag);
-
-//typedef struct _tsc_cvim_hdr_s
-//{
-//    uint8_t manual      : 1;    /* 수동, 1: 이상, 0: 정상 */
-//    uint8_t blink       : 1;    /* 점멸, 1: 점멸, 0: 정상 */
-//    uint8_t turnOff     : 1;    /* 소등, 1: 소등, 0: 정상 */
-//    uint8_t response    : 1;    /* 감응, 1: 감응, 0: 정상 */
-//    uint8_t trans       : 1;    /* 전이, 1: 전이중, 0: 전이완료 */
-//    uint8_t byte1Res0   : 3;    /* 예약, bit 7 ~ 5 */
-//
-//    uint8_t conflict    : 1;    /* 모순 이상, 1 : 이상, 0 : 정상 */
-//    uint8_t centerComm  : 1;    /* 센터 통신 이상, 1: 센터 통신이상, 0 : 정상 */
-//    uint8_t scuComm     : 1;    /* SCU 통신 이상,  1: MCU <--> SCU 통신 이상, 0: 정상 */
-//    uint8_t byte2Res0   : 5;    /* 예약, bit 7 ~ 3 */
-//
-//    uint8_t cycleCounter;       /* 주기 카운터, 초 */
-//
-//    uint8_t sttsCount   : 7;    /* 총 신호상태정보의 개수, N 개 */
-//    uint8_t splitFlag   : 1;    /* 데이터를 분할하여 전송할 경우 마지막 정보임을 나타내는 플래그(분할 패킷의 처음과 중간:0, 단일 또는 마지막 패킷:1) */
-//
-//    uint8_t currTime[4];        /* 현재시간, time_t형, Big Endian */
-//} tsc_cvim_hdr_t, *ptsc_cvim_hdr_t;
-
-        final int SIGNAL_STATUS_SIZE = 5;
-        int remainLength = buffer.length - idx + tail;
-        if (statusCount * SIGNAL_STATUS_SIZE != remainLength) {
-            log.error("Signal Status Data length error: {} EA, {}, {}", statusCount, statusCount * SIGNAL_STATUS_SIZE, remainLength);
-            return;
-        }
-
-        log.info("SEQ\t신호등정보\t방향\t시간정보신뢰성\t보행자   \t비보호신호\t신호등상태\t표출\t잔여\t방향코드");
-        for (int ii = 0; ii < statusCount; ii++) {
-            int dirInfo = buffer[idx++] & 0xFF;
-            int sttsInfo = buffer[idx++] & 0xFF;
-            int displayTm = buffer[idx++] & 0xFF;
-            int remainTm = buffer[idx++] & 0xFF;
-            int dirCode = buffer[idx++] & 0xFF;
-
-            int dirAdd     = ((dirInfo) & 0x0F);        //3 ~ 0, 방향추가정보, 해당 방향에 연등지 없음(0), 해당 방향의 첫번째 연등지(1), 해당 방향의 두번째 연등지(2)
-            int lightsType = ((dirInfo >> 4) & 0x0F);   //7 ~ 4, 신호등 정보, ■ 미지정(0), 직진(1), 좌회전(2), 보행자(3), 자전거(4), 우회전(5), 버스(6), 유턴(7)
-
-            int lighting   = ((sttsInfo     ) & 0x07);  //2 ~ 0, 신호등 상태, ■ 소등(0), 적색점등(1), 황색점등(2), 녹색점등(3), 적색점멸(4), 황색점멸(5), 녹색점멸(6)
-            int unprotect  = ((sttsInfo >> 3) & 0x01);  //3, 비보호 상태, ■ 신호등 정보 유턴/좌회전에 대한 비보호 여부, ■ 비보호 아님(0), 비보호(1)
-            int walkerPush = ((sttsInfo >> 6) & 0x01);  //6, 보행자(푸쉬 또는 자동검지), ■ 없음(0), 버튼 눌림 or 자동검지(1)
-            int timeFlag   = ((sttsInfo >> 7) & 0x01);  //7, 시간 정보 신뢰성, ■ 고정신호시간(0), 가변신호시간(1)
-
-            String plightInfo;
-            switch (lightsType)
-            {
-                case 0: plightInfo = "미지정(0)"; break;
-                case 1: plightInfo = "직진(1)  "; break;
-                case 2: plightInfo = "좌회전(2)"; break;
-                case 3: plightInfo = "보행자(3)"; break;
-                case 4: plightInfo = "자전거(4)"; break;
-                case 5: plightInfo = "우회전(5)"; break;
-                case 6: plightInfo = "버스(6)  "; break;
-                case 7: plightInfo = "유턴(7)  "; break;
-                default: plightInfo = "XXX(" + lightsType + ")"; break;
-            }
-            String ptimeFlag;
-            switch (timeFlag)
-            {
-                case 0: ptimeFlag = "고정신호(0)"; break;
-                case 1: ptimeFlag = "가변신호(1)"; break;
-                default: ptimeFlag = "XXX(" + timeFlag + ")"; break;
-            }
-            String pwalkerPush;
-            switch (walkerPush)
-            {
-                case 0: pwalkerPush = "없음(0)    "; break;
-                case 1: pwalkerPush = "자동검지(1)"; break;
-                default: pwalkerPush = "XXX(" + walkerPush + ")"; break;
-            }
-            String punprotect;
-            switch (unprotect)
-            {
-                case 0: punprotect = "아님(0)  "; break;
-                case 1: punprotect = "비보호(1)"; break;
-                default: punprotect = "XXX(" + unprotect + ")"; break;
-            }
-            String plighting;
-            switch (lighting)
-            {
-                case 0: plighting = "소등(0)    "; break;
-                case 1: plighting = "적색점등(1)"; break;
-                case 2: plighting = "황색점등(2)"; break;
-                case 3: plighting = "녹색점등(3)"; break;
-                case 4: plighting = "적색점멸(4)"; break;
-                case 5: plighting = "황색점멸(5)"; break;
-                case 6: plighting = "녹색점멸(6)"; break;
-                default: plighting = "XXX(" + lighting + ")"; break;
-            }
-            log.info("{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
-                    String.format("%3d", ii),
-                    plightInfo,
-                    dirAdd,
-                    ptimeFlag,
-                    pwalkerPush,
-                    punprotect,
-                    plighting,
-                    displayTm,
-                    remainTm,
-                    dirCode);
-        }
-//        typedef struct _tsc_cvim_stts_s
-//        {
-//            uint8_t dirAdd      : 4;    /* 3 ~ 0, 방향추가정보, 해당 방향에 연등지 없음(0), 해당 방향의 첫번째 연등지(1), 해당 방향의 두번째 연등지(2) */
-//            uint8_t lightInfo   : 4;    /* 7 ~ 4, 신호등 정보, ■ 미지정(0), 직진(1), 좌회전(2), 보행자(3), 자전거(4), 우회전(5), 버스(6), 유턴(7) */
-//
-//            uint8_t lighting    : 3;    /* 2 ~ 0, 신호등 상태, ■ 소등(0), 적색점등(1), 황색점등(2), 녹색점등(3), 적색점멸(4), 황색점멸(5), 녹색점멸(6) */
-//            uint8_t unprotect   : 1;    /* 3, 비보호 상태, ■ 신호등 정보 유턴/좌회전에 대한 비보호 여부, ■ 비보호 아님(0), 비보호(1) */
-//            uint8_t Reserved0   : 2;    /* 5 ~ 4, 예비, ■ 예비 */
-//            uint8_t walkerPush  : 1;    /* 6, 보행자(푸쉬 또는 자동검지), ■ 없음(0), 버튼 눌림 or 자동검지(1) */
-//            uint8_t timeFlag    : 1;    /* 7, 시간 정보 신뢰성, ■ 고정신호시간(0), 가변신호시간(1) */
-//
-//            uint8_t displayTm;          /* 표출 시간, ■ 초 */
-//            uint8_t remainTm;           /* 잔여 시간, ■ 초 */
-//            uint8_t dirCode;            /* 방향 코드, ■ 출력지정 테이블에 지정된 방향코드 */
-//        } tsc_cvim_stts_t, *ptsc_cvim_stts_t;
-
-    }
-
-//    public static void displayPacket(String topicKey, byte[] buffer, int start) {
-//        int idx = start;
-////        int stx1    = buffer[idx++];
-////        int stx2    = buffer[idx++];
-////        int length  = ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
-////        int opCode  = buffer[idx++];
-////        int version = buffer[idx++];
-//        long nodeId = ((long) (buffer[idx++] & 0xFF) << 24) | ((buffer[idx++] & 0xFF) << 16) | ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
-////        log.info("STX1: {}, STX2: {}, LENTH: {}, OpCode: {}, Version: {}, NodeId: {}", stx1, stx2, length, opCode, version, nodeId);
-//
-//        log.info("NodeId: {}, {}", topicKey, nodeId);
-//
-//        int status  = buffer[idx++];
-//        int error   = buffer[idx++];
-//        int counter = buffer[idx++];                //주기 카운터, 초
-//        int stts    = buffer[idx++];
-//        long localTime = ((long) (buffer[idx++] & 0xFF) << 24) | ((buffer[idx++] & 0xFF) << 16) | ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
-//
-//        int manual  = ((status     ) & 0x01);       //수동, 1: 이상, 0: 정상
-//        int blink   = ((status >> 1) & 0x01);       //점멸, 1: 점멸, 0: 정상
-//        int turnOff = ((status >> 2) & 0x01);       //소등, 1: 소등, 0: 정상
-//        int sensing = ((status >> 3) & 0x01);       //감응, 1: 감응, 0: 정상
-//        int trans   = ((status >> 4) & 0x01);       //전이, 1: 전이중, 0: 전이완료
-//
-//        int conflict   = ((error     ) & 0x01);     //모순 이상,      1 : 이상, 0 : 정상
-//        int centerComm = ((error >> 1) & 0x01);     //센터 통신 이상, 1: 센터 통신이상, 0 : 정상
-//        int scuComm    = ((error >> 2) & 0x01);     //SCU 통신 이상,  1: MCU <--> SCU 통신 이상, 0: 정상
-//
-//        int statusCount = ((stts)      & 0x7F);     //총 신호상태정보의 개수, N 개
-//        int splitFlag   = ((stts >> 7) & 0x01);     //데이터를 분할하여 전송할 경우 마지막 정보임을 나타내는 플래그(분할 패킷의 처음과 중간:0, 단일 또는 마지막 패킷:1)
-//
-//        Date date = new java.util.Date(localTime * 1000L);
-//        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-//        sdf.setTimeZone(java.util.TimeZone.getTimeZone("GMT+9"));
-//        String signalTime = sdf.format(date);
-//
-//        log.info("   LocalTime: ({})", signalTime);
-//        log.info("  SystemTime: ({})", SysUtils.getSysTimeStr());
-//        log.info("      STATUS: 전이({}), 감응({}), 소등({}), 점멸({}), 수동({}) - (1:전이중, 0:전이완료... 1:상태, 0:정상)",
-//                trans, sensing, turnOff, blink, manual);
-//        log.info("       ERROR: SCU통신({}), 센터통신({}), 모순({}) - (1: 이상, 0: 정상)",
-//                scuComm, centerComm, conflict);
-//        log.info("CycleCounter: {} sec.", counter);
-//        log.info("     Signals: {} EA, DataSplit({}) - (0:First/Middle, 1:Single/Last)", statusCount, splitFlag);
-//
-////typedef struct _tsc_cvim_hdr_s
-////{
-////    uint8_t manual      : 1;    /* 수동, 1: 이상, 0: 정상 */
-////    uint8_t blink       : 1;    /* 점멸, 1: 점멸, 0: 정상 */
-////    uint8_t turnOff     : 1;    /* 소등, 1: 소등, 0: 정상 */
-////    uint8_t response    : 1;    /* 감응, 1: 감응, 0: 정상 */
-////    uint8_t trans       : 1;    /* 전이, 1: 전이중, 0: 전이완료 */
-////    uint8_t byte1Res0   : 3;    /* 예약, bit 7 ~ 5 */
-////
-////    uint8_t conflict    : 1;    /* 모순 이상, 1 : 이상, 0 : 정상 */
-////    uint8_t centerComm  : 1;    /* 센터 통신 이상, 1: 센터 통신이상, 0 : 정상 */
-////    uint8_t scuComm     : 1;    /* SCU 통신 이상,  1: MCU <--> SCU 통신 이상, 0: 정상 */
-////    uint8_t byte2Res0   : 5;    /* 예약, bit 7 ~ 3 */
-////
-////    uint8_t cycleCounter;       /* 주기 카운터, 초 */
-////
-////    uint8_t sttsCount   : 7;    /* 총 신호상태정보의 개수, N 개 */
-////    uint8_t splitFlag   : 1;    /* 데이터를 분할하여 전송할 경우 마지막 정보임을 나타내는 플래그(분할 패킷의 처음과 중간:0, 단일 또는 마지막 패킷:1) */
-////
-////    uint8_t currTime[4];        /* 현재시간, time_t형, Big Endian */
-////} tsc_cvim_hdr_t, *ptsc_cvim_hdr_t;
-//
-//        final int SIGNAL_STATUS_SIZE = 5;
-//        int remainLength = buffer.length - idx;
-//        if (statusCount * SIGNAL_STATUS_SIZE != remainLength) {
-//            log.error("Signal Status Data length error: {} EA, {}, {}", statusCount, statusCount * SIGNAL_STATUS_SIZE, remainLength);
-//            return;
-//        }
-//
-//        log.info("SEQ\t신호등정보\t방향\t시간정보신뢰성\t보행자   \t비보호신호\t신호등상태\t표출\t잔여\t방향코드");
-//        for (int ii = 0; ii < statusCount; ii++) {
-//            int dirInfo = buffer[idx++] & 0xFF;
-//            int sttsInfo = buffer[idx++] & 0xFF;
-//            int displayTm = buffer[idx++] & 0xFF;
-//            int remainTm = buffer[idx++] & 0xFF;
-//            int dirCode = buffer[idx++] & 0xFF;
-//
-//            int dirAdd     = ((dirInfo) & 0x0F);        //3 ~ 0, 방향추가정보, 해당 방향에 연등지 없음(0), 해당 방향의 첫번째 연등지(1), 해당 방향의 두번째 연등지(2)
-//            int lightsType = ((dirInfo >> 4) & 0x0F);   //7 ~ 4, 신호등 정보, ■ 미지정(0), 직진(1), 좌회전(2), 보행자(3), 자전거(4), 우회전(5), 버스(6), 유턴(7)
-//
-//            int lighting   = ((sttsInfo     ) & 0x07);  //2 ~ 0, 신호등 상태, ■ 소등(0), 적색점등(1), 황색점등(2), 녹색점등(3), 적색점멸(4), 황색점멸(5), 녹색점멸(6)
-//            int unprotect  = ((sttsInfo >> 3) & 0x01);  //3, 비보호 상태, ■ 신호등 정보 유턴/좌회전에 대한 비보호 여부, ■ 비보호 아님(0), 비보호(1)
-//            int walkerPush = ((sttsInfo >> 6) & 0x01);  //6, 보행자(푸쉬 또는 자동검지), ■ 없음(0), 버튼 눌림 or 자동검지(1)
-//            int timeFlag   = ((sttsInfo >> 7) & 0x01);  //7, 시간 정보 신뢰성, ■ 고정신호시간(0), 가변신호시간(1)
-//
-//            String plightInfo;
-//            switch (lightsType)
-//            {
-//                case 0: plightInfo = "미지정(0)"; break;
-//                case 1: plightInfo = "직진(1)  "; break;
-//                case 2: plightInfo = "좌회전(2)"; break;
-//                case 3: plightInfo = "보행자(3)"; break;
-//                case 4: plightInfo = "자전거(4)"; break;
-//                case 5: plightInfo = "우회전(5)"; break;
-//                case 6: plightInfo = "버스(6)  "; break;
-//                case 7: plightInfo = "유턴(7)  "; break;
-//                default: plightInfo = "XXX(" + lightsType + ")"; break;
-//            }
-//            String ptimeFlag;
-//            switch (timeFlag)
-//            {
-//                case 0: ptimeFlag = "고정신호(0)"; break;
-//                case 1: ptimeFlag = "가변신호(1)"; break;
-//                default: ptimeFlag = "XXX(" + timeFlag + ")"; break;
-//            }
-//            String pwalkerPush;
-//            switch (walkerPush)
-//            {
-//                case 0: pwalkerPush = "없음(0)    "; break;
-//                case 1: pwalkerPush = "자동검지(1)"; break;
-//                default: pwalkerPush = "XXX(" + walkerPush + ")"; break;
-//            }
-//            String punprotect;
-//            switch (unprotect)
-//            {
-//                case 0: punprotect = "아님(0)  "; break;
-//                case 1: punprotect = "비보호(1)"; break;
-//                default: punprotect = "XXX(" + unprotect + ")"; break;
-//            }
-//            String plighting;
-//            switch (lighting)
-//            {
-//                case 0: plighting = "소등(0)    "; break;
-//                case 1: plighting = "적색점등(1)"; break;
-//                case 2: plighting = "황색점등(2)"; break;
-//                case 3: plighting = "녹색점등(3)"; break;
-//                case 4: plighting = "적색점멸(4)"; break;
-//                case 5: plighting = "황색점멸(5)"; break;
-//                case 6: plighting = "녹색점멸(6)"; break;
-//                default: plighting = "XXX(" + lighting + ")"; break;
-//            }
-//            log.info("{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
-//                    String.format("%3d", ii),
-//                    plightInfo,
-//                    dirAdd,
-//                    ptimeFlag,
-//                    pwalkerPush,
-//                    punprotect,
-//                    plighting,
-//                    displayTm,
-//                    remainTm,
-//                    dirCode);
-//        }
-////        typedef struct _tsc_cvim_stts_s
-////        {
-////            uint8_t dirAdd      : 4;    /* 3 ~ 0, 방향추가정보, 해당 방향에 연등지 없음(0), 해당 방향의 첫번째 연등지(1), 해당 방향의 두번째 연등지(2) */
-////            uint8_t lightInfo   : 4;    /* 7 ~ 4, 신호등 정보, ■ 미지정(0), 직진(1), 좌회전(2), 보행자(3), 자전거(4), 우회전(5), 버스(6), 유턴(7) */
-////
-////            uint8_t lighting    : 3;    /* 2 ~ 0, 신호등 상태, ■ 소등(0), 적색점등(1), 황색점등(2), 녹색점등(3), 적색점멸(4), 황색점멸(5), 녹색점멸(6) */
-////            uint8_t unprotect   : 1;    /* 3, 비보호 상태, ■ 신호등 정보 유턴/좌회전에 대한 비보호 여부, ■ 비보호 아님(0), 비보호(1) */
-////            uint8_t Reserved0   : 2;    /* 5 ~ 4, 예비, ■ 예비 */
-////            uint8_t walkerPush  : 1;    /* 6, 보행자(푸쉬 또는 자동검지), ■ 없음(0), 버튼 눌림 or 자동검지(1) */
-////            uint8_t timeFlag    : 1;    /* 7, 시간 정보 신뢰성, ■ 고정신호시간(0), 가변신호시간(1) */
-////
-////            uint8_t displayTm;          /* 표출 시간, ■ 초 */
-////            uint8_t remainTm;           /* 잔여 시간, ■ 초 */
-////            uint8_t dirCode;            /* 방향 코드, ■ 출력지정 테이블에 지정된 방향코드 */
-////        } tsc_cvim_stts_t, *ptsc_cvim_stts_t;
-//
-//    }
-}

+ 8 - 17
src/main/java/com/evp/comm/consumer/kafka/dto/KafkaEvpsEventDto.java

@@ -1,22 +1,27 @@
 package com.evp.comm.consumer.kafka.dto;
 
 import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
-import java.util.Date;
-
 /**
 * 긴급차량 이벤트 정보
 */
 @Data
+@Builder
 @NoArgsConstructor
 @AllArgsConstructor
 public class KafkaEvpsEventDto {
+
+    public static final int EVPS_EVENT_SERVICE_START = 0;
+    public static final int EVPS_EVENT_VEHICLE_MOVE = 1;
+    public static final int EVPS_EVENT_SERVICE_END = 2;
+
     /**
     * 수집시각
     */
-    private Date clctDt;
+    private String clctDt;
     /**
     * 긴급차량 서비스 ID
     */
@@ -45,18 +50,4 @@ public class KafkaEvpsEventDto {
     * 목적지 남은거리(m)
     */
     private Integer remDist;
-
-    public KafkaEvpsEventDto(String serviceId, int eventCd) {
-        this.serviceId = serviceId;
-        this.eventCd = eventCd;
-        init();
-    }
-
-    private void init() {
-        this.curLat = 0.;
-        this.curLng = 0.;
-        this.curSpd = 0;
-        this.remDist = 0;
-        this.evNo = "";
-    }
 }

+ 70 - 5
src/main/java/com/evp/comm/consumer/kafka/dto/KafkaEvpsNodeDto.java

@@ -1,9 +1,6 @@
 package com.evp.comm.consumer.kafka.dto;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
+import lombok.*;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -12,6 +9,7 @@ import java.util.List;
 * 긴급차량 서비스 교차로 정보
 */
 @Data
+@Builder
 @NoArgsConstructor
 @AllArgsConstructor
 public class KafkaEvpsNodeDto {
@@ -22,6 +20,7 @@ public class KafkaEvpsNodeDto {
     private List<EvpsNodeInfo> nodeList = new ArrayList<>();
 
     @Getter
+    @Builder
     @NoArgsConstructor
     @AllArgsConstructor
     public static class EvpsNodeInfo {
@@ -32,7 +31,7 @@ public class KafkaEvpsNodeDto {
         /**
          * 교차로 ID
          */
-        private Integer nodeId;
+        private Long nodeId;
         /**
          * 교차로명
          */
@@ -45,6 +44,72 @@ public class KafkaEvpsNodeDto {
          * 위치 경로
          */
         private double lng;
+
+        private List<EvpsPhaseInfo> phaseList = new ArrayList<>();
     }
 
+    @Getter
+    @Builder
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class EvpsPhaseInfo {
+
+        /**
+         * 교차로 순서(1,...,N)
+         */
+        private Integer seqNo;
+        /**
+         * 교차로 ID
+         */
+        private Long nodeId;
+        /**
+         * 링번호(1:A링, 2:B링)
+         */
+        private Integer ring;
+        /**
+         * 현시번호(1~8)
+         */
+        private Integer phaseNo;
+        /**
+         * 맵 번호(0:일반제, 1~5:시차제, 6:전용맵)
+         */
+        private Integer planClass;
+        /**
+         * 이동류번호(1-16, 17, 18)
+         */
+        private Integer flowNo;
+        /**
+         * 시작점 위치 위도
+         */
+        private Double headLat;
+        /**
+         * 시작점 위치 경로
+         */
+        private Double headLng;
+        /**
+         * 중심점 위치 위도
+         */
+        private Double midLat;
+        /**
+         * 중심점 위치 경로
+         */
+        private Double midLng;
+        /**
+         * 끝점 위치 위도
+         */
+        private Double endLat;
+        /**
+         * 끝점 위치 경로
+         */
+        private Double endLng;
+        /**
+         * 시작점 각도
+         */
+        private Integer headAngle;
+        /**
+         * 종점 각도
+         */
+        private Integer endAngle;
+
+    }
 }

+ 4 - 5
src/main/java/com/evp/comm/consumer/kafka/dto/KafkaEvpsPhaseDto.java

@@ -1,9 +1,6 @@
 package com.evp.comm.consumer.kafka.dto;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
+import lombok.*;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -12,6 +9,7 @@ import java.util.List;
 * 긴급차량 서비스 교차로 현시 정보
 */
 @Data
+@Builder
 @NoArgsConstructor
 @AllArgsConstructor
 public class KafkaEvpsPhaseDto {
@@ -23,6 +21,7 @@ public class KafkaEvpsPhaseDto {
     private List<EvpsPhaseInfo> phaseList = new ArrayList<>();
 
     @Getter
+    @Builder
     @NoArgsConstructor
     @AllArgsConstructor
     public static class EvpsPhaseInfo {
@@ -34,7 +33,7 @@ public class KafkaEvpsPhaseDto {
         /**
          * 교차로 ID
          */
-        private Integer nodeId;
+        private Long nodeId;
         /**
          * 링번호(1:A링, 2:B링)
          */

+ 30 - 3
src/main/java/com/evp/comm/consumer/kafka/dto/KafkaEvpsServiceDto.java

@@ -6,7 +6,6 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
 
 /**
@@ -18,6 +17,8 @@ import java.util.List;
 @AllArgsConstructor
 public class KafkaEvpsServiceDto {
 
+    public static final int SERVICE_START = 1;
+
     /**
     * 긴급차량 서비스 ID
     */
@@ -25,7 +26,7 @@ public class KafkaEvpsServiceDto {
     /**
     * 수집시각
     */
-    private Date clctDt;
+    private String clctDt;
     /**
     * 긴급차량 번호
     */
@@ -70,11 +71,37 @@ public class KafkaEvpsServiceDto {
      * 거리(단위:m)
      */
     private Integer serviceDist;
+
+    /**
+     * 현재 차량 속도(DTO 에서만 사용할 변수)
+     */
+    private Integer curSpd;
+
     /**
     * 서비스 상태 코드(1:진행중-서비스 진행중,2:정상종료-모든 교차로 제어 및 해제 완료,3:취소-아직 통과하지 않은 교차로 존재,4:센터강제종료-운영자가 서비스를 강제로 종료,5:비정상종료-서비스가 존재하지 않음,6:서비스시작실패-제어대상교차로가 없음,7:비정상종료-앱서버에 에러 발생,8:비정상종료-일정시간 앱에서 위치 및 속도 정보가 오지 않는 경우,9:자동종료-경로이탈,10:자동종료-경로진입 가능시간 초과,11:자동종료-정차가능시간 초과,12:취소-모든 교차로 제어및 해제 완료,13:실패-서비스 제어 요청 실패,14:실패-서비스 가능 교차로가 존재하지 않음,15:자동종료-위치정보 수신 가능 시간 초과)
     */
     private Integer statusCd;
 
-    private List<KafkaEvpsServiceRouteInfo> routeList = new ArrayList<>();
+    private List<KafkaEvpsRouteInfo> routeList = new ArrayList<>();
+
+    @Data
+    @Builder
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class KafkaEvpsRouteInfo {
+
+        /**
+         * 경로 순서(1,...,N)
+         */
+        private Integer seqNo;
+        /**
+         * 위치 위도
+         */
+        private double lat;
+        /**
+         * 위치 경로
+         */
+        private double lng;
+    }
 
 }

+ 27 - 0
src/main/java/com/evp/comm/consumer/kafka/dto/KafkaEvpsServiceEndDto.java

@@ -0,0 +1,27 @@
+package com.evp.comm.consumer.kafka.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+* 긴급차량 서비스 종료 정보
+*/
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class KafkaEvpsServiceEndDto {
+
+    /**
+    * 긴급차량 서비스 ID
+    */
+    private String serviceId;
+
+    /**
+    * 서비스 상태 코드(1:진행중-서비스 진행중,2:정상종료-모든 교차로 제어 및 해제 완료,3:취소-아직 통과하지 않은 교차로 존재,4:센터강제종료-운영자가 서비스를 강제로 종료,5:비정상종료-서비스가 존재하지 않음,6:서비스시작실패-제어대상교차로가 없음,7:비정상종료-앱서버에 에러 발생,8:비정상종료-일정시간 앱에서 위치 및 속도 정보가 오지 않는 경우,9:자동종료-경로이탈,10:자동종료-경로진입 가능시간 초과,11:자동종료-정차가능시간 초과,12:취소-모든 교차로 제어및 해제 완료,13:실패-서비스 제어 요청 실패,14:실패-서비스 가능 교차로가 존재하지 않음,15:자동종료-위치정보 수신 가능 시간 초과)
+    */
+    private Integer reason;
+
+}

+ 26 - 3
src/main/resources/logback-spring.xml

@@ -2,22 +2,45 @@
 <configuration scan="true" scanPeriod="60 seconds">
     <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
 
-    <property name="PROJECT_NAME"    value="tsi-consumer"/>
+    <property name="PROJECT_NAME"    value="evp-consumer"/>
     <property name="ROOT_LOG_LEVEL"  value="DEBUG"/>
     <property name="LOG_CHARSET"     value="UTF-8" />
+    <property name="LOG_PATH"        value="${user.home}/logs/evp-comm-server/"/>
+    <property name="LOG_BACKUP_PATH" value="${user.home}/logs/evp-comm-server/backup/"/>
 
+    <property name="LOG_FILE_NAME"       value="${PROJECT_NAME}.log"/>
+    <property name="LOG_FILE_NAME_ERROR"   value="${PROJECT_NAME}.err.log"/>
+    <property name="LOG_FILE_NAME_BACKUP"  value="%d{yyyyMMdd}_%i.log.gz"/>
+    <property name="LOG_PATTERN_FILE"    value="[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%-5level] %msg%n"/>
     <property name="LOG_PATTERN_CONSOLE" value="[%d{HH:mm:ss.SSS}] %highlight([%5level]): %cyan(%msg) %n"/>
+    <property name="MAX_FILESIZE" value="10MB"/>
+    <property name="MAX_HISTORY"  value="10"/>
+
+    <appender name="FILE_LOG" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${LOG_PATH}${LOG_FILE_NAME}</file>
+        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+            <charset>${LOG_CHARSET}</charset>
+            <pattern>${LOG_PATTERN_FILE}</pattern>
+        </encoder>
+        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+            <fileNamePattern>${LOG_BACKUP_PATH}${LOG_FILE_NAME}.${LOG_FILE_NAME_BACKUP}</fileNamePattern>
+            <maxFileSize>${MAX_FILESIZE}</maxFileSize>
+            <maxHistory>${MAX_HISTORY}</maxHistory>
+        </rollingPolicy>
+    </appender>
 
     <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
         <withJansi>true</withJansi>
         <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
-<!--            <charset>${LOG_CHARSET}</charset>-->
+            <!--            <charset>${LOG_CHARSET}</charset>-->
             <pattern>${LOG_PATTERN_CONSOLE}</pattern>
         </encoder>
     </appender>
 
-    <root level="INFO" additivity="true">
+    <root level="INFO">
         <appender-ref ref="CONSOLE"/>
+        <appender-ref ref="FILE_LOG"/>
     </root>
 
+
 </configuration>