浏览代码

buffering modify

shjung 10 月之前
父节点
当前提交
3c00fb1e17
共有 35 个文件被更改,包括 1078 次插入598 次删除
  1. 129 0
      .bashrc
  2. 27 0
      .profile
  3. 37 0
      src/main/java/com/evp/app/common/kafka/KafkaProducerFactory.java
  4. 6 0
      src/main/java/com/evp/app/common/utils/TimeUtils.java
  5. 5 5
      src/main/java/com/evp/comm/server/dao/mapper/EvpsServiceMapper.java
  6. 61 6
      src/main/java/com/evp/comm/server/dto/EvpsCenter.java
  7. 0 11
      src/main/java/com/evp/comm/server/dto/EvpsSequenceDto.java
  8. 0 67
      src/main/java/com/evp/comm/server/dto/EvpsServiceDto.java
  9. 0 4
      src/main/java/com/evp/comm/server/entity/TbEvpEvent.java
  10. 3 3
      src/main/java/com/evp/comm/server/entity/TbEvpsCenter.java
  11. 98 118
      src/main/java/com/evp/comm/server/kafka/KafkaProducerService.java
  12. 11 21
      src/main/java/com/evp/comm/server/kafka/dto/KafkaEvpsEventDto.java
  13. 12 10
      src/main/java/com/evp/comm/server/kafka/dto/KafkaEvpsNodeDto.java
  14. 87 0
      src/main/java/com/evp/comm/server/kafka/dto/KafkaEvpsPhaseDto.java
  15. 5 3
      src/main/java/com/evp/comm/server/kafka/dto/KafkaEvpsRouteDto.java
  16. 38 5
      src/main/java/com/evp/comm/server/kafka/dto/KafkaEvpsServiceDto.java
  17. 27 0
      src/main/java/com/evp/comm/server/kafka/dto/KafkaEvpsServiceEndDto.java
  18. 27 0
      src/main/java/com/evp/comm/server/kafka/dto/KafkaEvpsServiceRouteInfo.java
  19. 12 12
      src/main/java/com/evp/comm/server/kafka/dto/KafkaEvpsSignalDto.java
  20. 5 5
      src/main/java/com/evp/comm/server/process/dbms/DbmsDataProcess.java
  21. 2 2
      src/main/java/com/evp/comm/server/repository/ApplicationRepository.java
  22. 41 31
      src/main/java/com/evp/comm/server/xnet/server/process/response/EvpsEvent.java
  23. 101 72
      src/main/java/com/evp/comm/server/xnet/server/process/response/EvpsNode.java
  24. 72 89
      src/main/java/com/evp/comm/server/xnet/server/process/response/EvpsService.java
  25. 49 28
      src/main/java/com/evp/comm/server/xnet/server/process/response/EvpsServiceEnd.java
  26. 65 56
      src/main/java/com/evp/comm/server/xnet/server/process/response/EvpsSignal.java
  27. 8 7
      src/main/java/com/evp/comm/server/xnet/server/process/work/DataPacketProcess.java
  28. 1 1
      src/main/resources/application.yml
  29. 13 0
      src/main/resources/logback-spring-appender.xml
  30. 15 7
      src/main/resources/logback-spring.xml
  31. 4 4
      src/main/resources/mybatis/mapper/EvpsServiceMapper.xml
  32. 61 0
      src/test/java/com/evp/comm/server/EvpCommServerApplicationTests.java
  33. 28 12
      start.sh
  34. 4 11
      stat.sh
  35. 24 8
      stop.sh

+ 129 - 0
.bashrc

@@ -0,0 +1,129 @@
+# ~/.bashrc: executed by bash(1) for non-login shells.
+# see /usr/share/doc/bash/examples/startup-files (in the package bash-doc)
+# for examples
+
+# If not running interactively, don't do anything
+case $- in
+    *i*) ;;
+      *) return;;
+esac
+
+# don't put duplicate lines or lines starting with space in the history.
+# See bash(1) for more options
+HISTCONTROL=ignoreboth
+
+# append to the history file, don't overwrite it
+shopt -s histappend
+
+# for setting history length see HISTSIZE and HISTFILESIZE in bash(1)
+HISTSIZE=1000
+HISTFILESIZE=2000
+
+# check the window size after each command and, if necessary,
+# update the values of LINES and COLUMNS.
+shopt -s checkwinsize
+
+# If set, the pattern "**" used in a pathname expansion context will
+# match all files and zero or more directories and subdirectories.
+#shopt -s globstar
+
+# make less more friendly for non-text input files, see lesspipe(1)
+[ -x /usr/bin/lesspipe ] && eval "$(SHELL=/bin/sh lesspipe)"
+
+# set variable identifying the chroot you work in (used in the prompt below)
+if [ -z "${debian_chroot:-}" ] && [ -r /etc/debian_chroot ]; then
+    debian_chroot=$(cat /etc/debian_chroot)
+fi
+
+# set a fancy prompt (non-color, unless we know we "want" color)
+case "$TERM" in
+    xterm-color|*-256color) color_prompt=yes;;
+esac
+
+# uncomment for a colored prompt, if the terminal has the capability; turned
+# off by default to not distract the user: the focus in a terminal window
+# should be on the output of commands, not on the prompt
+#force_color_prompt=yes
+
+if [ -n "$force_color_prompt" ]; then
+    if [ -x /usr/bin/tput ] && tput setaf 1 >&/dev/null; then
+	# We have color support; assume it's compliant with Ecma-48
+	# (ISO/IEC-6429). (Lack of such support is extremely rare, and such
+	# a case would tend to support setf rather than setaf.)
+	color_prompt=yes
+    else
+	color_prompt=
+    fi
+fi
+
+if [ "$color_prompt" = yes ]; then
+    PS1='${debian_chroot:+($debian_chroot)}\[\033[01;32m\]\u@\h\[\033[00m\]:\[\033[01;34m\]\w\[\033[00m\]\$ '
+else
+    PS1='${debian_chroot:+($debian_chroot)}\u@\h:\w\$ '
+fi
+unset color_prompt force_color_prompt
+
+# If this is an xterm set the title to user@host:dir
+case "$TERM" in
+xterm*|rxvt*)
+    PS1="\[\e]0;${debian_chroot:+($debian_chroot)}\u@\h: \w\a\]$PS1"
+    ;;
+*)
+    ;;
+esac
+
+# enable color support of ls and also add handy aliases
+if [ -x /usr/bin/dircolors ]; then
+    test -r ~/.dircolors && eval "$(dircolors -b ~/.dircolors)" || eval "$(dircolors -b)"
+    alias ls='ls --color=auto'
+    #alias dir='dir --color=auto'
+    #alias vdir='vdir --color=auto'
+
+    alias grep='grep --color=auto'
+    alias fgrep='fgrep --color=auto'
+    alias egrep='egrep --color=auto'
+fi
+
+# colored GCC warnings and errors
+#export GCC_COLORS='error=01;31:warning=01;35:note=01;36:caret=01;32:locus=01:quote=01'
+
+# some more ls aliases
+alias ll='ls -alF'
+alias la='ls -A'
+alias l='ls -CF'
+
+# Add an "alert" alias for long running commands.  Use like so:
+#   sleep 10; alert
+alias alert='notify-send --urgency=low -i "$([ $? = 0 ] && echo terminal || echo error)" "$(history|tail -n1|sed -e '\''s/^\s*[0-9]\+\s*//;s/[;&|]\s*alert$//'\'')"'
+
+# Alias definitions.
+# You may want to put all your additions into a separate file like
+# ~/.bash_aliases, instead of adding them here directly.
+# See /usr/share/doc/bash-doc/examples in the bash-doc package.
+
+if [ -f ~/.bash_aliases ]; then
+    . ~/.bash_aliases
+fi
+
+# enable programmable completion features (you don't need to enable
+# this, if it's already enabled in /etc/bash.bashrc and /etc/profile
+# sources /etc/bash.bashrc).
+if ! shopt -oq posix; then
+  if [ -f /usr/share/bash-completion/bash_completion ]; then
+    . /usr/share/bash-completion/bash_completion
+  elif [ -f /etc/bash_completion ]; then
+    . /etc/bash_completion
+  fi
+fi
+export JAVA_HOME=~/jdks/java-17
+export JRE_HOME=$JAVA_HOME/jre
+export PATH=.:$JAVA_HOME/bin:$PATH
+PS1="[\u@\h \w]\\$ "
+
+alias ll='ls -alF'
+alias la='ls -A'
+alias l='ls -CF'
+
+alias logs='cd ~/logs/evp-comm-server'
+alias bin='cd ~/bin'
+

+ 27 - 0
.profile

@@ -0,0 +1,27 @@
+# ~/.profile: executed by the command interpreter for login shells.
+# This file is not read by bash(1), if ~/.bash_profile or ~/.bash_login
+# exists.
+# see /usr/share/doc/bash/examples/startup-files for examples.
+# the files are located in the bash-doc package.
+
+# the default umask is set in /etc/profile; for setting the umask
+# for ssh logins, install and configure the libpam-umask package.
+#umask 022
+
+# if running bash
+if [ -n "$BASH_VERSION" ]; then
+    # include .bashrc if it exists
+    if [ -f "$HOME/.bashrc" ]; then
+	. "$HOME/.bashrc"
+    fi
+fi
+
+# set PATH so it includes user's private bin if it exists
+if [ -d "$HOME/bin" ] ; then
+    PATH="$HOME/bin:$PATH"
+fi
+
+# set PATH so it includes user's private bin if it exists
+if [ -d "$HOME/.local/bin" ] ; then
+    PATH="$HOME/.local/bin:$PATH"
+fi

+ 37 - 0
src/main/java/com/evp/app/common/kafka/KafkaProducerFactory.java

@@ -1,8 +1,10 @@
 package com.evp.app.common.kafka;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.serializer.JsonSerializer;
 
 import java.util.HashMap;
 import java.util.List;
@@ -11,6 +13,41 @@ import java.util.Properties;
 
 public class KafkaProducerFactory {
 
+    public static <K,V> KafkaTemplate<K, V> createJsonTemplate(String bootstrapServers, List<Map<String, String>> props) {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        configs.put("enable.idempotence", false);
+        configs.put(ProducerConfig.ACKS_CONFIG, "0");
+        configs.put(ProducerConfig.RETRIES_CONFIG, 0);
+        configs.put(ProducerConfig.LINGER_MS_CONFIG, 1);
+        configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
+        configs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 4000);
+        //configs.put("queue.buffering.max.messages", 10000000);
+        //configs.put("queue.buffering.max.kbytes", 2147483647);
+        //configs.put("queue.buffering.max.ms", 0);
+        //configs.put("api.version.request", false);
+        configs.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 5000);
+        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+
+        for (Map<String, String> prop : props) {
+            for (Map.Entry<String, String> elem : prop.entrySet()) {
+                String key = elem.getKey();
+                String val = elem.getValue();
+                if (val != null) {
+                    if (val.equals("true") || val.equals("false")) {
+                        configs.put(key, val.equals("true"));
+                    } else {
+                        configs.put(key, val);
+                    }
+                }
+            }
+        }
+
+        DefaultKafkaProducerFactory<K, V> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(configs);
+        return new KafkaTemplate<>(defaultKafkaProducerFactory);
+    }
+
     public static <K,V> KafkaTemplate<K, V> createByteArrayTemplate(String bootstrapServers, List<Map<String, String>> props) {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

+ 6 - 0
src/main/java/com/evp/app/common/utils/TimeUtils.java

@@ -100,6 +100,12 @@ public class TimeUtils {
         return sdf.format(today);
     }
 
+    public static String now() {
+        Date today = new Date();
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        return sdf.format(today);
+    }
+
     public static long getCurrentTimeSeconds() {
         Calendar cal = Calendar.getInstance();
         return Math.round((double)cal.getTimeInMillis() / 1000.0D);

+ 5 - 5
src/main/java/com/evp/comm/server/dao/mapper/EvpsServiceMapper.java

@@ -1,7 +1,7 @@
 package com.evp.comm.server.dao.mapper;
 
-import com.evp.comm.server.entity.TbEvpEvent;
-import com.evp.comm.server.entity.TbEvpService;
+import com.evp.comm.server.kafka.dto.KafkaEvpsEventDto;
+import com.evp.comm.server.kafka.dto.KafkaEvpsServiceDto;
 import org.apache.ibatis.annotations.Mapper;
 import org.apache.ibatis.annotations.Param;
 
@@ -10,9 +10,9 @@ import java.util.Map;
 @Mapper
 public interface EvpsServiceMapper {
 
-    int insertEvpService(@Param("obj") TbEvpService obj);
-    int updateEvpService(@Param("obj") TbEvpService obj);
-    int insertEvpEvent(@Param("obj") TbEvpEvent obj);
+    int insertEvpService(@Param("obj") KafkaEvpsServiceDto obj);
+    int updateEvpService(@Param("obj") KafkaEvpsServiceDto obj);
+    int insertEvpEvent(@Param("obj") KafkaEvpsEventDto obj);
     int insertEvpRoute(Map<String, Object> obj);
     int insertEvpNode(Map<String, Object> obj);
     int insertEvpPhase(Map<String, Object> obj);

+ 61 - 6
src/main/java/com/evp/comm/server/dto/EvpsCenter.java

@@ -6,7 +6,8 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
-import java.util.HashMap;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
 @Slf4j
 @Data
@@ -19,29 +20,83 @@ public class EvpsCenter {
     private String centerId;        /* 지역 코드 */
     private String centerNm;        /* 지역 명 */
     private String ipAddress;       /* IP Address */
-    private String currentServiceId;
     private boolean useYn;
 
     private boolean dump;
-    private HashMap<Integer, EvpsSequenceDto> sequenceMap = new HashMap<>();
 
     // 센터 네트워크 상태 정보
     private NetState netState;
 
+    private ByteBuffer serviceBuffer = null;
+    private ByteBuffer nodeBuffer = null;
+    private ByteBuffer signalBuffer = null;
+
     public String getLogKey() {
         return this.centerId;
     }
 
     public void update(EvpsCenter dto) {
-        this.currentServiceId = dto.getCurrentServiceId();
         this.centerNm = dto.getCenterNm();
         this.ipAddress = dto.getIpAddress();
         this.useYn = dto.isUseYn();
         this.dump = dto.isDump();
     }
 
-    public boolean channelOpened() {
-        return this.netState.getChannel() != null && this.netState.getState() == NET.DATA_TRANS;
+    public void clearServiceBuffer() {
+        this.serviceBuffer = null;
+    }
+    public void clearNodeBuffer() {
+        this.nodeBuffer = null;
+    }
+    public void clearSignalBuffer() {
+        this.signalBuffer = null;
+    }
+
+    public void addServiceBuffer(byte[] buffer) {
+        if (buffer == null) {
+            return;
+        }
+        if (this.serviceBuffer == null) {
+            this.serviceBuffer = ByteBuffer.wrap(buffer);
+            serviceBuffer.order(ByteOrder.BIG_ENDIAN);
+        }
+        else {
+            byte[] prevBuffer = this.serviceBuffer.array();
+            this.serviceBuffer = ByteBuffer.allocate(prevBuffer.length + buffer.length);
+            this.serviceBuffer.put(prevBuffer);
+            this.serviceBuffer.put(buffer);
+        }
+    }
+
+    public void addNodeBuffer(byte[] buffer) {
+        if (buffer == null) {
+            return;
+        }
+        if (this.nodeBuffer == null) {
+            this.nodeBuffer = ByteBuffer.wrap(buffer);
+            nodeBuffer.order(ByteOrder.BIG_ENDIAN);
+        }
+        else {
+            byte[] prevBuffer = this.nodeBuffer.array();
+            this.nodeBuffer = ByteBuffer.allocate(prevBuffer.length + buffer.length);
+            this.nodeBuffer.put(prevBuffer);
+            this.nodeBuffer.put(buffer);
+        }
     }
 
+    public void addSignalBuffer(byte[] buffer) {
+        if (buffer == null) {
+            return;
+        }
+        if (this.signalBuffer == null) {
+            this.signalBuffer = ByteBuffer.wrap(buffer);
+            signalBuffer.order(ByteOrder.BIG_ENDIAN);
+        }
+        else {
+            byte[] prevBuffer = this.signalBuffer.array();
+            this.signalBuffer = ByteBuffer.allocate(prevBuffer.length + buffer.length);
+            this.signalBuffer.put(prevBuffer);
+            this.signalBuffer.put(buffer);
+        }
+    }
 }

+ 0 - 11
src/main/java/com/evp/comm/server/dto/EvpsSequenceDto.java

@@ -1,11 +0,0 @@
-package com.evp.comm.server.dto;
-
-import lombok.Data;
-
-@Data
-public class EvpsSequenceDto {
-
-    private Integer sequence;
-    private String serviceId;
-
-}

+ 0 - 67
src/main/java/com/evp/comm/server/dto/EvpsServiceDto.java

@@ -1,67 +0,0 @@
-package com.evp.comm.server.dto;
-
-import com.evp.comm.server.entity.TbEvpService;
-import lombok.Data;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-@Data
-public class EvpsServiceDto {
-
-    private boolean completed;
-    private String serviceId;
-
-    private TbEvpService service;
-
-    private ByteBuffer routeBuffer;
-    private ByteBuffer nodeBuffer;
-
-    public EvpsServiceDto(String serviceId) {
-        this.completed = false;
-        this.serviceId = serviceId;
-        this.service = new TbEvpService();
-        this.routeBuffer = null;
-        this.nodeBuffer = null;
-    }
-
-    public void clearRouteBuffer() {
-        this.routeBuffer = null;
-    }
-
-    public void addRouteBuffer(byte[] buffer) {
-        if (buffer == null) {
-            return;
-        }
-        if (this.routeBuffer == null) {
-            this.routeBuffer = ByteBuffer.wrap(buffer);
-            routeBuffer.order(ByteOrder.BIG_ENDIAN);
-        }
-        else {
-            byte[] prevBuffer = this.routeBuffer.array();
-            this.routeBuffer = ByteBuffer.allocate(prevBuffer.length + buffer.length);
-            this.routeBuffer.put(prevBuffer);
-            this.routeBuffer.put(buffer);
-        }
-    }
-
-    public void clearNodeBuffer() {
-        this.nodeBuffer = null;
-    }
-
-    public void addNodeBuffer(byte[] buffer) {
-        if (buffer == null) {
-            return;
-        }
-        if (this.nodeBuffer == null) {
-            this.nodeBuffer = ByteBuffer.wrap(buffer);
-            nodeBuffer.order(ByteOrder.BIG_ENDIAN);
-        }
-        else {
-            byte[] prevBuffer = this.nodeBuffer.array();
-            this.nodeBuffer = ByteBuffer.allocate(prevBuffer.length + buffer.length);
-            this.nodeBuffer.put(prevBuffer);
-            this.nodeBuffer.put(buffer);
-        }
-    }
-}

+ 0 - 4
src/main/java/com/evp/comm/server/entity/TbEvpEvent.java

@@ -12,10 +12,6 @@ import java.util.Date;
 @Data
 public class TbEvpEvent implements Serializable {
 
-    public static final int EVPS_EVENT_SERVICE_START = 0;
-    public static final int EVPS_EVENT_VEHICLE_MOVE = 1;
-    public static final int EVPS_EVENT_SERVICE_END = 2;
-
     /**
     * 수집시각
     */

+ 3 - 3
src/main/java/com/evp/comm/server/entity/TbEvpsCenter.java

@@ -7,7 +7,6 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 
 import java.io.Serializable;
-import java.util.HashMap;
 
 @Data
 @Builder
@@ -29,8 +28,9 @@ public class TbEvpsCenter implements Serializable {
                 .ipAddress(this.ipAddress.trim())
                 .useYn("Y".equals(this.useYn))
                 .dump(false)
-                .currentServiceId("")
-                .sequenceMap(new HashMap<>())
+                .serviceBuffer(null)
+                .nodeBuffer(null)
+                .signalBuffer(null)
                 .build();
     }
 }

+ 98 - 118
src/main/java/com/evp/comm/server/kafka/KafkaProducerService.java

@@ -1,24 +1,16 @@
 package com.evp.comm.server.kafka;
 
 import com.evp.app.common.kafka.KafkaProducerFactory;
-import com.evp.app.common.utils.TimeUtils;
 import com.evp.comm.server.config.KafkaConfig;
-import com.evp.comm.server.process.dbms.DbmsDataProcess;
+import com.evp.comm.server.kafka.dto.*;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.SendResult;
 import org.springframework.stereotype.Service;
 import org.springframework.util.concurrent.ListenableFuture;
-import org.springframework.util.concurrent.ListenableFutureCallback;
 
 import javax.annotation.PostConstruct;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 @Slf4j
 @AllArgsConstructor
@@ -26,147 +18,135 @@ import java.util.concurrent.TimeUnit;
 public class KafkaProducerService {
 
     private final KafkaConfig config;
-    private final DbmsDataProcess dbmsDataProcess;
 
-    private KafkaTemplate<String, byte[]> nodeProducer;
-    private KafkaTemplate<String, Long> pingProducer;
+    private final String EVPS_SERVICE_TOPIC = "evps-service";
+    private final String EVPS_ROUTE_TOPIC   = "evps-route";
+    private final String EVPS_NODE_TOPIC    = "evps-node";
+    private final String EVPS_PHASE_TOPIC   = "evps-phase";
+    private final String EVPS_SIGNAL_TOPIC  = "evps-signal";
+    private final String EVPS_EVENT_TOPIC   = "evps-event";
+
+    private KafkaTemplate<String, KafkaEvpsServiceDto> serviceProducer;
+    private KafkaTemplate<String, KafkaEvpsRouteDto> serviceRoute;
+    private KafkaTemplate<String, KafkaEvpsNodeDto> serviceNode;
+    private KafkaTemplate<String, KafkaEvpsPhaseDto> servicePhase;
+    private KafkaTemplate<String, KafkaEvpsSignalDto> serviceSignal;
+    private KafkaTemplate<String, KafkaEvpsEventDto> serviceEvent;
 
 
     @PostConstruct
     void init() {
-        //this.callback = new ProducerResultCallback();
+        this.serviceProducer = KafkaProducerFactory.createJsonTemplate(this.config.getBootstrapServers(), this.config.props);
+        this.serviceRoute = KafkaProducerFactory.createJsonTemplate(this.config.getBootstrapServers(), this.config.props);
+        this.serviceNode = KafkaProducerFactory.createJsonTemplate(this.config.getBootstrapServers(), this.config.props);
+        this.servicePhase = KafkaProducerFactory.createJsonTemplate(this.config.getBootstrapServers(), this.config.props);
+        this.serviceSignal = KafkaProducerFactory.createJsonTemplate(this.config.getBootstrapServers(), this.config.props);
+        this.serviceEvent = KafkaProducerFactory.createJsonTemplate(this.config.getBootstrapServers(), this.config.props);
 
-        if (this.config.isMultiConnect()) {
-            // 각각의 Producer 에 대하여 KafkaTemplate 를 생성해서 사용 한다.
-            if (this.config.isEnableNode()) {
-                this.nodeProducer = KafkaProducerFactory.createByteArrayTemplate(this.config.getNodeServers(), this.config.props);
+        log.info("[{}] ------------------", this.getClass().getSimpleName());
+        log.info("[{}]   serviceProducer: {}", this.getClass().getSimpleName(), this.serviceProducer);
+        log.info("[{}]      serviceRoute: {}", this.getClass().getSimpleName(), this.serviceRoute);
+        log.info("[{}]       serviceNode: {}", this.getClass().getSimpleName(), this.serviceNode);
+        log.info("[{}]      servicePhase: {}", this.getClass().getSimpleName(), this.servicePhase);
+        log.info("[{}]     serviceSignal: {}", this.getClass().getSimpleName(), this.serviceSignal);
+        log.info("[{}]      serviceEvent: {}", this.getClass().getSimpleName(), this.serviceEvent);
+    }
+    public void sendEvpsServiceTopic(KafkaEvpsServiceDto data) {
+        if (this.serviceProducer != null) {
+            try {
+                ListenableFuture<SendResult<String, KafkaEvpsServiceDto>> result = this.serviceProducer.send(EVPS_SERVICE_TOPIC, data.getServiceId(), data);
+                log.info("sendEvpsServiceTopic: {}, Key: {}, Data: {}", EVPS_SERVICE_TOPIC, data.getServiceId(), data);
             }
-        }
-        else {
-            // 하나의 Producer KafkaTemplate 로 데이터를 전송하는 경우
-            // 동일한 KafkaTemplate 를 사용 한다.
-            KafkaTemplate<String, byte[]> producer = KafkaProducerFactory.createByteArrayTemplate(this.config.getBootstrapServers(), this.config.props);
-            if (this.config.isEnableNode()) {
-                this.nodeProducer = producer;
+            catch (Exception e) {
+                log.error("sendEvpsServiceTopic: {}, {}: {}", EVPS_SERVICE_TOPIC, data.getServiceId(), e.toString());
             }
         }
-
-        createPingProducer();
-
-        log.info("[{}] ------------------", this.getClass().getSimpleName());
-        log.info("[{}]   nodeProducer: {}", this.getClass().getSimpleName(), this.nodeProducer);
-        log.info("[{}]   pingProducer: {}", this.getClass().getSimpleName(), this.pingProducer);
-
-        //this.producer = new KafkaProducer<String, byte[]>(KafkaProducerFactory.getProperties(this.config.getBootstrapServers(), this.config.props));
     }
 
-    public void createPingProducer() {
-        Map<String, Object> props = new HashMap<>();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getBootstrapServers());
-        props.put(ProducerConfig.ACKS_CONFIG, this.config.getConsumerAckConfig());
-        props.put(ProducerConfig.RETRIES_CONFIG, 0);
-        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
-        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
-        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 4000);
-        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 3000);
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.LongSerializer.class);
-
-        this.pingProducer = KafkaProducerFactory.createProducerTemplate(props);
-        this.pingProducer.setDefaultTopic(this.config.getPingTopic());
+    public void sendEvpsRouteTopic(KafkaEvpsRouteDto data) {
+        if (this.serviceRoute != null) {
+            try {
+                ListenableFuture<SendResult<String, KafkaEvpsRouteDto>> result = this.serviceRoute.send(EVPS_ROUTE_TOPIC, data.getServiceId(), data);
+                log.info("sendEvpsRouteTopic: {}, Key: {}, Data: {}", EVPS_ROUTE_TOPIC, data.getServiceId(), data);
+            }
+            catch (Exception e) {
+                log.error("sendEvpsRouteTopic: {}, {}: {}", EVPS_ROUTE_TOPIC, data.getServiceId(), e.toString());
+            }
+        }
     }
 
-    public void shutdown() {
-        try {
-            if (this.nodeProducer != null) {
-                this.nodeProducer.destroy();
+    public void sendEvpsNodeTopic(KafkaEvpsNodeDto data) {
+        if (this.serviceNode != null) {
+            try {
+                ListenableFuture<SendResult<String, KafkaEvpsNodeDto>> result = this.serviceNode.send(EVPS_NODE_TOPIC, data.getServiceId(), data);
+                log.info("sendEvpsNodeTopic: {}, Key: {}, Data: {}", EVPS_NODE_TOPIC, data.getServiceId(), data);
             }
-            if (this.pingProducer != null) {
-                this.pingProducer.destroy();
+            catch (Exception e) {
+                log.error("sendEvpsNodeTopic: {}, {}: {}", EVPS_NODE_TOPIC, data.getServiceId(), e.toString());
             }
         }
-        catch(Exception e) {
-            log.error("Failed to shutdown: {}", e.getMessage());
-        }
     }
-    public void sendPing() {
-        if (this.pingProducer == null ) {
-            log.info("sendPing: pingProducer == null");
-            return;
-        }
 
-        long sendNanoTime = System.nanoTime();
-//        TsiTpmsManager.getInstance().getKafkaTransVo().setSendNanoTime(sendNanoTime);   // nano seconds
-//        TsiTpmsManager.getInstance().getKafkaTransVo().setSendTm(0);                    // micro seconds
-//        TsiTpmsManager.getInstance().getKafkaTransVo().setRecvTm(0);                    // micro seconds
-
-        ListenableFuture<SendResult<String, Long>> future =  this.pingProducer.sendDefault("key", sendNanoTime);
-        future.addCallback(new ListenableFutureCallback<SendResult<String, Long>>() {
-
-            @Override
-            public void onSuccess(SendResult<String, Long> result) {
-                long recvNanoTime = System.nanoTime();
-                long sendTime = TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS);
-//                TsiTpmsManager.getInstance().getKafkaTransVo().setSendTm(sendTime);
-                log.info("send ping success: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime));
-            }
-            @Override
-            public void onFailure(Throwable ex) {
-                long recvNanoTime = System.nanoTime();
-//                TsiTpmsManager.getInstance().getKafkaTransVo().setSendNanoTime(0);
-//                KafkaTransVo stat = new KafkaTransVo(AbstractDbmsVo.DBMS_KAFKA_TRANS_HS);
-//                stat.setHostName(TsiTpmsManager.getInstance().getKafkaTransVo().getHostName());
-//                stat.setStatus(0);
-//                stat.setSendTm(TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS));
-//                stat.setRecvTm(0);
-//                tsiCvimDbmsService.add(stat, (int)Thread.currentThread().getId());
-//                log.error("send ping failed: {}, {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime), ex.getMessage());
-//
-//                // 카프카 전송 오류 알람 저장
-//                String value = "Send Failed";
-//                if (ex != null) {
-//                    value = ex.getMessage().substring(0, 99);
-//                }
-//                AlarmOccrVo alarm = new AlarmOccrVo(AbstractDbmsVo.DBMS_ALARM_OCCR_HS);
-//                alarm.setAlarmCode(TsiAlarmConfigVo.KAFKA_01);
-//                alarm.setAlarmTarget(producerConfig.getBootstrapServers());
-//                alarm.setAlarmValue(value);
-//                tsiCvimDbmsService.add(alarm, (int)Thread.currentThread().getId());
-            }
-        });
+    public void sendEvpsPhaseTopic(KafkaEvpsPhaseDto data) {
+        if (this.servicePhase != null) {
+            try {
+                ListenableFuture<SendResult<String, KafkaEvpsPhaseDto>> result = this.servicePhase.send(EVPS_PHASE_TOPIC, data.getServiceId(), data);
+                log.info("sendEvpsPhaseTopic: {}, Key: {}, Data: {}", EVPS_PHASE_TOPIC, data.getServiceId(), data);
+            }
+            catch (Exception e) {
+                log.error("sendEvpsPhaseTopic: {}, {}: {}", EVPS_PHASE_TOPIC, data.getServiceId(), e.toString());
+            }
+        }
     }
 
-    public void sendNode(String key, byte[] data) {
-        if (this.nodeProducer != null) {
+    public void sendEvpsSignalTopic(KafkaEvpsSignalDto data) {
+        if (this.serviceSignal != null) {
             try {
-                this.nodeProducer.send(key, key, data);
+                ListenableFuture<SendResult<String, KafkaEvpsSignalDto>> result = this.serviceSignal.send(EVPS_SIGNAL_TOPIC, data.getServiceId(), data);
+                log.info("sendEvpsSignalTopic: {}, Key: {}, Data: {}", EVPS_SIGNAL_TOPIC, data.getServiceId(), data);
             }
             catch (Exception e) {
-                log.error("sendNode: {}, {}: {}", key, key, e.toString());
+                log.error("sendEvpsSignalTopic: {}, {}: {}", EVPS_SIGNAL_TOPIC, data.getServiceId(), e.toString());
             }
         }
     }
 
-    protected void send(KafkaTemplate<String, byte[]> kafka, String topic, String key, byte[] data) {
-        try {
-            kafka.send(topic, key, data);
-        }
-        catch(Exception e) {
-            log.error("kafka.send: {}, Exception: {}", topic, e.getMessage());
+    public void sendEvpsEventTopic(KafkaEvpsEventDto data) {
+        if (this.serviceEvent != null) {
+            try {
+                ListenableFuture<SendResult<String, KafkaEvpsEventDto>> result = this.serviceEvent.send(EVPS_EVENT_TOPIC, data.getServiceId(), data);
+                log.info("sendEvpsEventTopic: {}, Key: {}, Data: {}", EVPS_EVENT_TOPIC, data.getServiceId(), data);
+            }
+            catch (Exception e) {
+                log.error("sendEvpsEventTopic: {}, {}: {}", EVPS_EVENT_TOPIC, data.getServiceId(), e.toString());
+            }
         }
     }
 
-    private static class ProducerResultCallback implements Callback {
-        @Override
-        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
-            if (e != null) {
-                log.error("Error while producing message to topic: {}, {}", recordMetadata, e.toString());
+    public void shutdown() {
+        try {
+            if (this.serviceProducer != null) {
+                this.serviceProducer.destroy();
+            }
+            if (this.serviceRoute != null) {
+                this.serviceRoute.destroy();
+            }
+            if (this.serviceNode != null) {
+                this.serviceNode.destroy();
+            }
+            if (this.servicePhase != null) {
+                this.servicePhase.destroy();
             }
-            else {
-                String message = String.format("sent message to topic:%s partition:%s  offset:%s",
-                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
-               log.error(message);
+            if (this.serviceSignal != null) {
+                this.serviceSignal.destroy();
+            }
+            if (this.serviceEvent != null) {
+                this.serviceEvent.destroy();
             }
         }
+        catch(Exception e) {
+            log.error("Failed to shutdown: {}", e.getMessage());
+        }
     }
+
 }

+ 11 - 21
src/main/java/com/evp/comm/server/dto/KafkaEvpsEventDto.java → src/main/java/com/evp/comm/server/kafka/dto/KafkaEvpsEventDto.java

@@ -1,23 +1,27 @@
-package com.evp.comm.server.dto;
+package com.evp.comm.server.kafka.dto;
 
 import lombok.AllArgsConstructor;
-import lombok.Getter;
+import lombok.Builder;
+import lombok.Data;
 import lombok.NoArgsConstructor;
 
-import java.util.Date;
-
 /**
 * 긴급차량 이벤트 정보
-* tb_evp_event
 */
-@Getter
+@Data
+@Builder
 @NoArgsConstructor
 @AllArgsConstructor
 public class KafkaEvpsEventDto {
+
+    public static final int EVPS_EVENT_SERVICE_START = 0;
+    public static final int EVPS_EVENT_VEHICLE_MOVE = 1;
+    public static final int EVPS_EVENT_SERVICE_END = 2;
+
     /**
     * 수집시각
     */
-    private Date clctDt;
+    private String clctDt;
     /**
     * 긴급차량 서비스 ID
     */
@@ -46,18 +50,4 @@ public class KafkaEvpsEventDto {
     * 목적지 남은거리(m)
     */
     private Integer remDist;
-
-    public KafkaEvpsEventDto(String serviceId, int eventCd) {
-        this.serviceId = serviceId;
-        this.eventCd = eventCd;
-        init();
-    }
-
-    private void init() {
-        this.curLat = 0.;
-        this.curLng = 0.;
-        this.curSpd = 0;
-        this.remDist = 0;
-        this.evNo = "";
-    }
 }

+ 12 - 10
src/main/java/com/evp/comm/server/dto/KafkaEvpsNodeDto.java → src/main/java/com/evp/comm/server/kafka/dto/KafkaEvpsNodeDto.java

@@ -1,15 +1,15 @@
-package com.evp.comm.server.dto;
+package com.evp.comm.server.kafka.dto;
 
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
+import lombok.*;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
 * 긴급차량 서비스 교차로 정보
 */
-@Getter
+@Data
+@Builder
 @NoArgsConstructor
 @AllArgsConstructor
 public class KafkaEvpsNodeDto {
@@ -17,9 +17,10 @@ public class KafkaEvpsNodeDto {
     * 긴급차량 서비스 ID
     */
     private String serviceId;
-    private List<EvpsNodeInfo> nodeList;
+    private List<EvpsNodeInfo> nodeList = new ArrayList<>();
 
     @Getter
+    @Builder
     @NoArgsConstructor
     @AllArgsConstructor
     public static class EvpsNodeInfo {
@@ -30,7 +31,7 @@ public class KafkaEvpsNodeDto {
         /**
          * 교차로 ID
          */
-        private Integer nodeId;
+        private Long nodeId;
         /**
          * 교차로명
          */
@@ -44,14 +45,15 @@ public class KafkaEvpsNodeDto {
          */
         private double lng;
 
-        private List<EvpsPhaseInfo> phaseList;
+        private List<EvpsPhaseInfo> phaseList = new ArrayList<>();
     }
 
-
     @Getter
+    @Builder
     @NoArgsConstructor
     @AllArgsConstructor
     public static class EvpsPhaseInfo {
+
         /**
          * 교차로 순서(1,...,N)
          */
@@ -59,7 +61,7 @@ public class KafkaEvpsNodeDto {
         /**
          * 교차로 ID
          */
-        private Integer nodeId;
+        private Long nodeId;
         /**
          * 링번호(1:A링, 2:B링)
          */

+ 87 - 0
src/main/java/com/evp/comm/server/kafka/dto/KafkaEvpsPhaseDto.java

@@ -0,0 +1,87 @@
+package com.evp.comm.server.kafka.dto;
+
+import lombok.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+* 긴급차량 서비스 교차로 현시 정보
+*/
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class KafkaEvpsPhaseDto {
+
+    /**
+     * 긴급차량 서비스 ID
+     */
+    private String serviceId;
+    private List<EvpsPhaseInfo> phaseList = new ArrayList<>();
+
+    @Getter
+    @Builder
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class EvpsPhaseInfo {
+
+        /**
+         * 교차로 순서(1,...,N)
+         */
+        private Integer seqNo;
+        /**
+         * 교차로 ID
+         */
+        private Long nodeId;
+        /**
+         * 링번호(1:A링, 2:B링)
+         */
+        private Integer ring;
+        /**
+         * 현시번호(1~8)
+         */
+        private Integer phaseNo;
+        /**
+         * 맵 번호(0:일반제, 1~5:시차제, 6:전용맵)
+         */
+        private Integer planClass;
+        /**
+         * 이동류번호(1-16, 17, 18)
+         */
+        private Integer flowNo;
+        /**
+         * 시작점 위치 위도
+         */
+        private Double headLat;
+        /**
+         * 시작점 위치 경로
+         */
+        private Double headLng;
+        /**
+         * 중심점 위치 위도
+         */
+        private Double midLat;
+        /**
+         * 중심점 위치 경로
+         */
+        private Double midLng;
+        /**
+         * 끝점 위치 위도
+         */
+        private Double endLat;
+        /**
+         * 끝점 위치 경로
+         */
+        private Double endLng;
+        /**
+         * 시작점 각도
+         */
+        private Integer headAngle;
+        /**
+         * 종점 각도
+         */
+        private Integer endAngle;
+
+    }
+}

+ 5 - 3
src/main/java/com/evp/comm/server/dto/KafkaEvpsRouteDto.java → src/main/java/com/evp/comm/server/kafka/dto/KafkaEvpsRouteDto.java

@@ -1,15 +1,17 @@
-package com.evp.comm.server.dto;
+package com.evp.comm.server.kafka.dto;
 
 import lombok.AllArgsConstructor;
+import lombok.Data;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
 * 긴급차량 서비스 경로 정보
 */
-@Getter
+@Data
 @NoArgsConstructor
 @AllArgsConstructor
 public class KafkaEvpsRouteDto {
@@ -18,7 +20,7 @@ public class KafkaEvpsRouteDto {
     * 긴급차량 서비스 ID
     */
     private String serviceId;
-    private List<EvpsRouteInfo> routeList;
+    private List<EvpsRouteInfo> routeList = new ArrayList<>();
 
     @Getter
     @NoArgsConstructor

+ 38 - 5
src/main/java/com/evp/comm/server/dto/KafkaEvpsServiceDto.java → src/main/java/com/evp/comm/server/kafka/dto/KafkaEvpsServiceDto.java

@@ -1,19 +1,24 @@
-package com.evp.comm.server.dto;
+package com.evp.comm.server.kafka.dto;
 
 import lombok.AllArgsConstructor;
-import lombok.Getter;
+import lombok.Builder;
+import lombok.Data;
 import lombok.NoArgsConstructor;
 
-import java.util.Date;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
 * 긴급차량 서비스 정보
 */
-@Getter
+@Data
+@Builder
 @NoArgsConstructor
 @AllArgsConstructor
 public class KafkaEvpsServiceDto {
 
+    public static final int SERVICE_START = 1;
+
     /**
     * 긴급차량 서비스 ID
     */
@@ -21,7 +26,7 @@ public class KafkaEvpsServiceDto {
     /**
     * 수집시각
     */
-    private Date clctDt;
+    private String clctDt;
     /**
     * 긴급차량 번호
     */
@@ -66,9 +71,37 @@ public class KafkaEvpsServiceDto {
      * 거리(단위:m)
      */
     private Integer serviceDist;
+
+    /**
+     * 현재 차량 속도(DTO 에서만 사용할 변수)
+     */
+    private Integer curSpd;
+
     /**
     * 서비스 상태 코드(1:진행중-서비스 진행중,2:정상종료-모든 교차로 제어 및 해제 완료,3:취소-아직 통과하지 않은 교차로 존재,4:센터강제종료-운영자가 서비스를 강제로 종료,5:비정상종료-서비스가 존재하지 않음,6:서비스시작실패-제어대상교차로가 없음,7:비정상종료-앱서버에 에러 발생,8:비정상종료-일정시간 앱에서 위치 및 속도 정보가 오지 않는 경우,9:자동종료-경로이탈,10:자동종료-경로진입 가능시간 초과,11:자동종료-정차가능시간 초과,12:취소-모든 교차로 제어및 해제 완료,13:실패-서비스 제어 요청 실패,14:실패-서비스 가능 교차로가 존재하지 않음,15:자동종료-위치정보 수신 가능 시간 초과)
     */
     private Integer statusCd;
 
+    private List<KafkaEvpsRouteInfo> routeList = new ArrayList<>();
+
+    @Data
+    @Builder
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class KafkaEvpsRouteInfo {
+
+        /**
+         * 경로 순서(1,...,N)
+         */
+        private Integer seqNo;
+        /**
+         * 위치 위도
+         */
+        private double lat;
+        /**
+         * 위치 경로
+         */
+        private double lng;
+    }
+
 }

+ 27 - 0
src/main/java/com/evp/comm/server/kafka/dto/KafkaEvpsServiceEndDto.java

@@ -0,0 +1,27 @@
+package com.evp.comm.server.kafka.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+* 긴급차량 서비스 종료 정보
+*/
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class KafkaEvpsServiceEndDto {
+
+    /**
+    * 긴급차량 서비스 ID
+    */
+    private String serviceId;
+
+    /**
+    * 서비스 상태 코드(1:진행중-서비스 진행중,2:정상종료-모든 교차로 제어 및 해제 완료,3:취소-아직 통과하지 않은 교차로 존재,4:센터강제종료-운영자가 서비스를 강제로 종료,5:비정상종료-서비스가 존재하지 않음,6:서비스시작실패-제어대상교차로가 없음,7:비정상종료-앱서버에 에러 발생,8:비정상종료-일정시간 앱에서 위치 및 속도 정보가 오지 않는 경우,9:자동종료-경로이탈,10:자동종료-경로진입 가능시간 초과,11:자동종료-정차가능시간 초과,12:취소-모든 교차로 제어및 해제 완료,13:실패-서비스 제어 요청 실패,14:실패-서비스 가능 교차로가 존재하지 않음,15:자동종료-위치정보 수신 가능 시간 초과)
+    */
+    private Integer reason;
+
+}

+ 27 - 0
src/main/java/com/evp/comm/server/kafka/dto/KafkaEvpsServiceRouteInfo.java

@@ -0,0 +1,27 @@
+package com.evp.comm.server.kafka.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+* 긴급차량 서비스 경로 정보
+*/
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class KafkaEvpsServiceRouteInfo {
+
+    /**
+     * 경로 순서(1,...,N)
+     */
+    private Integer seqNo;
+    /**
+     * 위치 위도
+     */
+    private double lat;
+    /**
+     * 위치 경로
+     */
+    private double lng;
+}

+ 12 - 12
src/main/java/com/evp/comm/server/dto/KafkaEvpsSignalDto.java → src/main/java/com/evp/comm/server/kafka/dto/KafkaEvpsSignalDto.java

@@ -1,16 +1,15 @@
-package com.evp.comm.server.dto;
+package com.evp.comm.server.kafka.dto;
 
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
+import lombok.*;
 
-import java.util.Date;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
 * 긴급차량 서비스 교차로 신호 정보
 */
-@Getter
+@Data
+@Builder
 @NoArgsConstructor
 @AllArgsConstructor
 public class KafkaEvpsSignalDto {
@@ -22,22 +21,23 @@ public class KafkaEvpsSignalDto {
     /**
      * 수집시각
      */
-    private Date clctDt;
+    private String clctDt;
 
-    private List<EvpsSignalInfo> signalList;
+    private List<EvpsSignalInfo> signalList = new ArrayList<>();
 
     @Getter
+    @Builder
     @NoArgsConstructor
     @AllArgsConstructor
     public static class EvpsSignalInfo {
-        /**
-         * 교차로 ID
-         */
-        private Integer nodeId;
         /**
          * 교차로 순서(1,...,N)
          */
         private Integer seqNo;
+        /**
+         * 교차로 ID
+         */
+        private Long nodeId;
         /**
          * 목적지 남은거리(m)
          */

+ 5 - 5
src/main/java/com/evp/comm/server/process/dbms/DbmsDataProcess.java

@@ -7,9 +7,9 @@ import com.evp.comm.server.dao.mapper.EvpsServiceMapper;
 import com.evp.comm.server.dao.mapper.ProcessMapper;
 import com.evp.comm.server.dao.mapper.batch.EvpCommServerDao;
 import com.evp.comm.server.dto.EvpsCenter;
-import com.evp.comm.server.entity.TbEvpEvent;
-import com.evp.comm.server.entity.TbEvpService;
 import com.evp.comm.server.entity.TbRegionCenterComm;
+import com.evp.comm.server.kafka.dto.KafkaEvpsEventDto;
+import com.evp.comm.server.kafka.dto.KafkaEvpsServiceDto;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.slf4j.MDC;
@@ -84,16 +84,16 @@ public class DbmsDataProcess {
             type = data.getType();
             switch(type) {
                 case DbmsData.DBMS_DATA_INS_SERVICE:
-                    TbEvpService newService = (TbEvpService)data.getData();
+                    KafkaEvpsServiceDto newService = (KafkaEvpsServiceDto)data.getData();
                     result = this.serviceMapper.insertEvpService(newService);
                     break;
                 case DbmsData.DBMS_DATA_UPD_SERVICE:
-                    TbEvpService updService = (TbEvpService)data.getData();
+                    KafkaEvpsServiceDto updService = (KafkaEvpsServiceDto)data.getData();
                     result = this.serviceMapper.updateEvpService(updService);
                     break;
 
                 case DbmsData.DBMS_DATA_INS_EVENT:
-                    TbEvpEvent event = (TbEvpEvent)data.getData();
+                    KafkaEvpsEventDto event = (KafkaEvpsEventDto)data.getData();
                     result = this.serviceMapper.insertEvpEvent(event);
                     break;
                 case DbmsData.DBMS_DATA_INS_ROUTE:

+ 2 - 2
src/main/java/com/evp/comm/server/repository/ApplicationRepository.java

@@ -4,10 +4,10 @@ import com.evp.app.common.utils.Elapsed;
 import com.evp.app.common.xnet.NettyUtils;
 import com.evp.comm.server.dao.mapper.EvpsCenterMapper;
 import com.evp.comm.server.dto.EvpsCenter;
-import com.evp.comm.server.dto.EvpsServiceDto;
 import com.evp.comm.server.dto.NET;
 import com.evp.comm.server.dto.NetState;
 import com.evp.comm.server.entity.TbEvpsCenter;
+import com.evp.comm.server.kafka.dto.KafkaEvpsServiceDto;
 import com.evp.comm.server.process.dbms.DbmsDataProcess;
 import io.netty.channel.Channel;
 import io.netty.util.AttributeKey;
@@ -42,7 +42,7 @@ public class ApplicationRepository {
             .dump(false)
             .build();
 
-    public static final ConcurrentHashMap<String, EvpsServiceDto> serviceMap = new ConcurrentHashMap<>();
+    public static final ConcurrentHashMap<String, KafkaEvpsServiceDto> serviceMap = new ConcurrentHashMap<>();
 
     private final ConcurrentHashMap<String, EvpsCenter> centerMap = new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String, EvpsCenter> ipAddrMap = new ConcurrentHashMap<>();

+ 41 - 31
src/main/java/com/evp/comm/server/xnet/server/process/response/EvpsEvent.java

@@ -1,13 +1,17 @@
 package com.evp.comm.server.xnet.server.process.response;
 
+import com.evp.app.common.utils.TimeUtils;
 import com.evp.comm.server.dto.EvpsCenter;
-import com.evp.comm.server.dto.EvpsServiceDto;
-import com.evp.comm.server.entity.TbEvpEvent;
+import com.evp.comm.server.kafka.KafkaProducerService;
+import com.evp.comm.server.kafka.dto.KafkaEvpsEventDto;
+import com.evp.comm.server.kafka.dto.KafkaEvpsServiceDto;
 import com.evp.comm.server.process.dbms.DbmsData;
 import com.evp.comm.server.process.dbms.DbmsDataProcess;
 import com.evp.comm.server.repository.ApplicationRepository;
 import com.evp.comm.server.xnet.server.process.protocol.EvpsProtocolConst;
+import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.MDC;
 
 import java.util.Arrays;
@@ -21,12 +25,10 @@ distance        잔여 거리         Integer      4   (단위: m), 목적지 
  */
 
 @Slf4j
+@AllArgsConstructor
 public class EvpsEvent implements EvpsCommResponse {
     private final DbmsDataProcess dbmsDataProcess;
-
-    public EvpsEvent(DbmsDataProcess dbmsDataProcess) {
-        this.dbmsDataProcess = dbmsDataProcess;
-    }
+    private final KafkaProducerService kafkaProducerService;
 
     @Override
     public boolean response(RecvPacketDto packet) {
@@ -48,15 +50,7 @@ public class EvpsEvent implements EvpsCommResponse {
         return result;
     }
 
-    private boolean makeData(RecvPacketDto packet, byte[] buffer) {
-        EvpsCenter center = packet.getCenter();
-        if (buffer == null || buffer.length != EvpsProtocolConst.EVENT_DATA_SIZE) {
-            log.error("[{}], EvpsEvent.response: Data Length Error: Required data length({}), Cur({}). will be closed.",
-                    center.getLogKey(), EvpsProtocolConst.EVENT_DATA_SIZE, buffer == null ? 0 : buffer.length);
-            return false;
-        }
-
-        int idx = 0;
+    private KafkaEvpsEventDto getData(String serviceId, byte[] buffer, int idx) {
         // 차량번호
         byte[] vehNoArr = Arrays.copyOfRange(buffer, idx, idx+EvpsProtocolConst.EVPS_VEHICLE_NO_SIZE);
         idx += EvpsProtocolConst.EVPS_VEHICLE_NO_SIZE;
@@ -69,26 +63,42 @@ public class EvpsEvent implements EvpsCommResponse {
         // 잔여 거리
         int distance   = ((buffer[idx++] & 0xFF) << 24) | ((buffer[idx++] & 0xFF) << 16) | ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
 
-        // 이벤트 정보 입력(차량 이동)
-        String serviceId = packet.getPacket().getServiceId();
-        TbEvpEvent event = new TbEvpEvent(serviceId, TbEvpEvent.EVPS_EVENT_VEHICLE_MOVE);
-        event.setEvNo(new String(vehNoArr).trim());
-        event.setCurLat(currentLat/EvpsProtocolConst.EVPS_GEO_CORRECT);
-        event.setCurLng(currentLng/EvpsProtocolConst.EVPS_GEO_CORRECT);
-        event.setCurSpd(speed);
-        event.setRemDist(distance);
+        return KafkaEvpsEventDto.builder()
+                .clctDt(TimeUtils.now())
+                .serviceId(serviceId)
+                .evNo(new String(vehNoArr).trim())
+                .eventCd(KafkaEvpsEventDto.EVPS_EVENT_VEHICLE_MOVE)
+                .curLat(currentLat/EvpsProtocolConst.EVPS_GEO_CORRECT)
+                .curLng(currentLng/EvpsProtocolConst.EVPS_GEO_CORRECT)
+                .curSpd(speed)
+                .remDist(distance)
+                .build();
+    }
+
+    private boolean makeData(@NotNull RecvPacketDto packet, byte[] buffer) {
+        EvpsCenter center = packet.getCenter();
+        if (buffer == null || buffer.length != EvpsProtocolConst.EVENT_DATA_SIZE) {
+            log.error("[{}], EvpsEvent.response: Data Length Error: Required data length({}), Cur({}). will be closed.",
+                    center.getLogKey(), EvpsProtocolConst.EVENT_DATA_SIZE, buffer == null ? 0 : buffer.length);
+            return false;
+        }
 
-        this.dbmsDataProcess.add(new DbmsData(DbmsData.DBMS_DATA_INS_EVENT, center, event));
+        KafkaEvpsEventDto data = getData(packet.getPacket().getServiceId(), buffer, 0);
+        // kafka 전송
+        this.kafkaProducerService.sendEvpsEventTopic(data);
+
+        // 이벤트 정보 입력(차량 이동)
+        this.dbmsDataProcess.add(new DbmsData(DbmsData.DBMS_DATA_INS_EVENT, center, data));
 
-        EvpsServiceDto currService = ApplicationRepository.serviceMap.get(event.getServiceId());
-        if (currService == null) {
-            log.error("[{}], EvpsEvent.response: Not Found Service Information. Service Id: {}", center.getLogKey(), event.getServiceId());
+        KafkaEvpsServiceDto service = ApplicationRepository.serviceMap.get(data.getServiceId());
+        if (service == null) {
+            log.error("[{}], EvpsEvent.response: Not Found Service Information. Service Id: {}", center.getLogKey(), data.getServiceId());
         }
         else {
-            currService.getService().setCurLat(event.getCurLat());
-            currService.getService().setCurLng(event.getCurLng());
-            currService.getService().setCurSpd(event.getCurSpd());
-            currService.getService().setServiceDist(event.getRemDist());
+            service.setCurLat(data.getCurLat());
+            service.setCurLng(data.getCurLng());
+            service.setCurSpd(data.getCurSpd());
+            service.setServiceDist(data.getRemDist());
         }
         return true;
     }

+ 101 - 72
src/main/java/com/evp/comm/server/xnet/server/process/response/EvpsNode.java

@@ -1,13 +1,16 @@
 package com.evp.comm.server.xnet.server.process.response;
 
 import com.evp.comm.server.dto.EvpsCenter;
-import com.evp.comm.server.dto.EvpsSequenceDto;
-import com.evp.comm.server.dto.EvpsServiceDto;
+import com.evp.comm.server.kafka.KafkaProducerService;
+import com.evp.comm.server.kafka.dto.KafkaEvpsNodeDto;
+import com.evp.comm.server.kafka.dto.KafkaEvpsServiceDto;
 import com.evp.comm.server.process.dbms.DbmsData;
 import com.evp.comm.server.process.dbms.DbmsDataProcess;
 import com.evp.comm.server.repository.ApplicationRepository;
 import com.evp.comm.server.xnet.server.process.protocol.EvpsProtocolConst;
+import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.MDC;
 
 import java.util.ArrayList;
@@ -24,12 +27,10 @@ lcList          교차로 갯수     Integer     2
  */
 
 @Slf4j
+@AllArgsConstructor
 public class EvpsNode implements EvpsCommResponse {
     private final DbmsDataProcess dbmsDataProcess;
-
-    public EvpsNode(DbmsDataProcess dbmsDataProcess) {
-        this.dbmsDataProcess = dbmsDataProcess;
-    }
+    private final KafkaProducerService kafkaProducerService;
 
     @Override
     public boolean response(RecvPacketDto packet) {
@@ -43,49 +44,22 @@ public class EvpsNode implements EvpsCommResponse {
                 return makeData(packet, packet.getPacket().getBuffer());
             }
 
-            // 패킷이 분할되어 전송된 경우
-            EvpsServiceDto serviceDto = null;
-            int sequence = packet.getPacket().getSequence();
+            // 패킷이 분할되어 전송된 경우 서비스 메모리에 패킷을 순차적으로 적재
             if (packet.getPacket().getCurrent() == 1) {
-                // 첫번째 프레임
-                // sequence 에 대항하는 서비스 정보를 메모리에 저장하고 서비스 ID에 대한 메모리 정보를 얻는다.
-                String serviceId = packet.getPacket().getServiceId();
-                EvpsSequenceDto sequenceDto = new EvpsSequenceDto();
-                sequenceDto.setSequence(sequence);
-                sequenceDto.setServiceId(serviceId);
-                packet.getCenter().getSequenceMap().put(sequence, sequenceDto); // SequenceMap 에 SequenceDto 를 저장한다.
-                // 서비스 정보를 메모리에서 얻어온다.
-                serviceDto = ApplicationRepository.serviceMap.get(serviceId);
-            }
-            else {
-                // 분할된 프레임
-                // sequence 에 대한 정보를 메모리에서 구하고 얻어온 서비스 ID로 서비스 정보를 메모리에서 얻어온다.
-                EvpsSequenceDto sequenceDto = packet.getCenter().getSequenceMap().get(sequence);
-                if (sequenceDto != null) {
-                    // 서비스 정보를 메모리에서 얻어온다.
-                    serviceDto = ApplicationRepository.serviceMap.get(sequenceDto.getServiceId());
-                }
+                // 다중 프레임인 경우 첫번째 프레임일때 메모리 클리어
+                packet.getCenter().clearNodeBuffer();
             }
+            packet.getCenter().addNodeBuffer(packet.getPacket().getBuffer());
 
-            if (serviceDto == null) {
-                // 서비스 정보를 메모리에서 얻어 오지 못한 경우.
-                log.error("[{}], EvpsNode.response: Not Found Service Information. Sequence: {}, Curr: ({}), Tot: ({})",
-                        packet.getCenter().getLogKey(), sequence, packet.getPacket().getCurrent(), packet.getPacket().getTotal());
-                return false;
-            }
-
-            // 서비스 메모리에 교차로 정보 패킷을 순차적으로 적재
-            serviceDto.addNodeBuffer(packet.getPacket().getBuffer());
             if (packet.getPacket().getCurrent() == packet.getPacket().getTotal()) {
                 // 분할된 프레임을 모두 받았음. 즉 마지막 프레임을 수신한 경우 데이터 처리
-                log.info("[{}], EvpsNode.response: Recv All Frames. Service Id: {}, Sequence: {}, Curr: ({}), Tot: ({})",
-                        packet.getCenter().getLogKey(), serviceDto.getServiceId(), sequence, packet.getPacket().getCurrent(), packet.getPacket().getTotal());
+                log.info("[{}], EvpsNode.response: Recv All Frames. Service Id: {}, Sequence: Curr: ({}), Tot: ({})",
+                        packet.getCenter().getLogKey(), packet.getPacket().getServiceId(), packet.getPacket().getCurrent(), packet.getPacket().getTotal());
 
                 // 저장한 프레임 전체를 이용해서 데이터 생성
-                result = makeData(packet, serviceDto.getNodeBuffer().array());
-
-                // SequenceMap 에서 SequenceDto 를 제거한다.
-                packet.getCenter().getSequenceMap().remove(sequence);
+                result = makeData(packet, packet.getCenter().getNodeBuffer().array());
+                // 다중 프레임인 경우 마지막 프레임일때 메모리 클리어
+                packet.getCenter().clearNodeBuffer();
             }
         }
         catch (Exception e) {
@@ -99,7 +73,7 @@ public class EvpsNode implements EvpsCommResponse {
         return result;
     }
 
-    private boolean makeData(RecvPacketDto packet, byte[] buffer) {
+    private boolean makeData(@NotNull RecvPacketDto packet, byte[] buffer) {
         EvpsCenter center = packet.getCenter();
         if (buffer == null || buffer.length < EvpsProtocolConst.MIN_NODE_DATA_SIZE) {
             log.error("[{}], EvpsNode.response: Data Length Error: Required minimum data length({}), Cur({}). will be closed.",
@@ -119,6 +93,11 @@ public class EvpsNode implements EvpsCommResponse {
             return false;
         }
 
+        KafkaEvpsNodeDto data = KafkaEvpsNodeDto.builder()
+                .serviceId(serviceId)
+                .nodeList(new ArrayList<>())
+                .build();
+
         List<HashMap<String, Object>> lists = new ArrayList<>();
         List<HashMap<String, Object>> phaseLists = new ArrayList<>();
         for (int ii = 0; ii < nodeCount; ii++) {
@@ -128,14 +107,17 @@ public class EvpsNode implements EvpsCommResponse {
             int currentLat   = ((buffer[idx++] & 0xFF) << 24) | ((buffer[idx++] & 0xFF) << 16) | ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
             int currentLng   = ((buffer[idx++] & 0xFF) << 24) | ((buffer[idx++] & 0xFF) << 16) | ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
 
-            HashMap<String, Object> param = new HashMap<>();
-            param.put("SERVICE_ID",  serviceId);
-            param.put("SEQ_NO",      ii+1);
-            param.put("NODE_ID",     nodeId);
-            param.put("NODE_NM",     new String(nodeNmArr).trim());
-            param.put("LAT",         currentLat/EvpsProtocolConst.EVPS_GEO_CORRECT);
-            param.put("LNG",         currentLng/EvpsProtocolConst.EVPS_GEO_CORRECT);
-
+            KafkaEvpsNodeDto.EvpsNodeInfo node = KafkaEvpsNodeDto.EvpsNodeInfo.builder()
+                    .seqNo(ii+1)
+                    .nodeId(nodeId)
+                    .nodeNm(new String(nodeNmArr).trim())
+                    .lat(currentLat/EvpsProtocolConst.EVPS_GEO_CORRECT)
+                    .lng(currentLng/EvpsProtocolConst.EVPS_GEO_CORRECT)
+                    .phaseList(new ArrayList<>())
+                    .build();
+            data.getNodeList().add(node);
+
+            HashMap<String, Object> param = getNodeInfoMap(serviceId, node);
             lists.add(param);
 
             int phaseCount = ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
@@ -156,33 +138,44 @@ public class EvpsNode implements EvpsCommResponse {
                 int midLng    = ((buffer[idx++] & 0xFF) << 24) | ((buffer[idx++] & 0xFF) << 16) | ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
                 int endLat    = ((buffer[idx++] & 0xFF) << 24) | ((buffer[idx++] & 0xFF) << 16) | ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
                 int endLng    = ((buffer[idx++] & 0xFF) << 24) | ((buffer[idx++] & 0xFF) << 16) | ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
-                int headAngle = ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
-                int endAngle  = ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
-
-                HashMap<String, Object> phaseParam = new HashMap<>();
-                param.put("SERVICE_ID",  serviceId);
-                param.put("SEQ_NO",      jj+1);
-                param.put("NODE_ID",     nodeId);
-
-                param.put("RING",       ring);
-                param.put("PHASE_NO",   phaseNo);
-                param.put("PLAN_CLASS", planClass);
-                param.put("FLOW_NO",    flowNo);
-                param.put("HEAD_LAT",   headLat/EvpsProtocolConst.EVPS_GEO_CORRECT);
-                param.put("HEAD_LNG",   headLng/EvpsProtocolConst.EVPS_GEO_CORRECT);
-                param.put("MID_LAT",    midLat/EvpsProtocolConst.EVPS_GEO_CORRECT);
-                param.put("MID_LNG",    midLng/EvpsProtocolConst.EVPS_GEO_CORRECT);
-                param.put("END_LAT",    endLat/EvpsProtocolConst.EVPS_GEO_CORRECT);
-                param.put("END_LNG",    endLng/EvpsProtocolConst.EVPS_GEO_CORRECT);
-                param.put("HEAD_ANGLE", headAngle);
-                param.put("END_ANGLE",  endAngle);
-
+                int headAngle = ((buffer[idx++] & 0xFF) <<  8) | ( buffer[idx++] & 0xFF);
+                int endAngle  = ((buffer[idx++] & 0xFF) <<  8) | ( buffer[idx++] & 0xFF);
+
+                KafkaEvpsNodeDto.EvpsPhaseInfo phase = KafkaEvpsNodeDto.EvpsPhaseInfo.builder()
+                        .seqNo(jj+1)
+                        .nodeId(nodeId)
+                        .ring(ring)
+                        .phaseNo(phaseNo)
+                        .planClass(planClass)
+                        .flowNo(flowNo)
+                        .headLat(headLat/EvpsProtocolConst.EVPS_GEO_CORRECT)
+                        .headLng(headLng/EvpsProtocolConst.EVPS_GEO_CORRECT)
+                        .midLat(midLat/EvpsProtocolConst.EVPS_GEO_CORRECT)
+                        .midLng(midLng/EvpsProtocolConst.EVPS_GEO_CORRECT)
+                        .endLat(endLat/EvpsProtocolConst.EVPS_GEO_CORRECT)
+                        .endLng(endLng/EvpsProtocolConst.EVPS_GEO_CORRECT)
+                        .headAngle(headAngle)
+                        .endAngle(endAngle)
+                        .build();
+                node.getPhaseList().add(phase);
+
+                HashMap<String, Object> phaseParam = getNodePhaseInfoMap(serviceId, nodeId, phase);
                 phaseLists.add(phaseParam);
             }
         }
         log.info("[{}], EvpsNode.response: Service Node List {} EA.", center.getLogKey(), lists.size());
 
+        KafkaEvpsServiceDto service = ApplicationRepository.serviceMap.get(data.getServiceId());
+        if (service == null) {
+            log.error("[{}], EvpsNode.response: Not Found Service Information. Service Id: {}", center.getLogKey(), data.getServiceId());
+        }
+        else {
+        }
+
         if (!lists.isEmpty()) {
+            // kafka 전송
+            this.kafkaProducerService.sendEvpsNodeTopic(data);
+
             this.dbmsDataProcess.add(new DbmsData(DbmsData.DBMS_DATA_INS_NODE, center, lists));
         }
         if (!phaseLists.isEmpty()) {
@@ -190,4 +183,40 @@ public class EvpsNode implements EvpsCommResponse {
         }
         return true;
     }
+
+    @NotNull
+    private static HashMap<String, Object> getNodeInfoMap(String serviceId, KafkaEvpsNodeDto.EvpsNodeInfo node) {
+        HashMap<String, Object> param = new HashMap<>();
+
+        param.put("SERVICE_ID",  serviceId);
+        param.put("SEQ_NO",      node.getSeqNo());
+        param.put("NODE_ID",     node.getNodeId());
+        param.put("NODE_NM",     node.getNodeNm());
+        param.put("LAT",         node.getLat());
+        param.put("LNG",         node.getLng());
+        return param;
+    }
+
+    @NotNull
+    private static HashMap<String, Object> getNodePhaseInfoMap(String serviceId, Long nodeId, KafkaEvpsNodeDto.EvpsPhaseInfo phase) {
+        HashMap<String, Object> param = new HashMap<>();
+
+        param.put("SERVICE_ID",  serviceId);
+        param.put("SEQ_NO",      phase.getSeqNo());
+        param.put("NODE_ID",     nodeId);
+
+        param.put("RING",       phase.getRing());
+        param.put("PHASE_NO",   phase.getPhaseNo());
+        param.put("PLAN_CLASS", phase.getPlanClass());
+        param.put("FLOW_NO",    phase.getFlowNo());
+        param.put("HEAD_LAT",   phase.getHeadLat());
+        param.put("HEAD_LNG",   phase.getHeadLng());
+        param.put("MID_LAT",    phase.getMidLat());
+        param.put("MID_LNG",    phase.getMidLng());
+        param.put("END_LAT",    phase.getEndLat());
+        param.put("END_LNG",    phase.getEndLng());
+        param.put("HEAD_ANGLE", phase.getHeadAngle());
+        param.put("END_ANGLE",  phase.getEndAngle());
+        return param;
+    }
 }

+ 72 - 89
src/main/java/com/evp/comm/server/xnet/server/process/response/EvpsService.java

@@ -1,15 +1,17 @@
 package com.evp.comm.server.xnet.server.process.response;
 
+import com.evp.app.common.utils.TimeUtils;
 import com.evp.comm.server.dto.EvpsCenter;
-import com.evp.comm.server.dto.EvpsSequenceDto;
-import com.evp.comm.server.dto.EvpsServiceDto;
-import com.evp.comm.server.entity.TbEvpEvent;
-import com.evp.comm.server.entity.TbEvpService;
+import com.evp.comm.server.kafka.KafkaProducerService;
+import com.evp.comm.server.kafka.dto.KafkaEvpsEventDto;
+import com.evp.comm.server.kafka.dto.KafkaEvpsServiceDto;
 import com.evp.comm.server.process.dbms.DbmsData;
 import com.evp.comm.server.process.dbms.DbmsDataProcess;
 import com.evp.comm.server.repository.ApplicationRepository;
 import com.evp.comm.server.xnet.server.process.protocol.EvpsProtocolConst;
+import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.MDC;
 
 import java.util.ArrayList;
@@ -36,12 +38,10 @@ routeList       경로수          Integer     2
  */
 
 @Slf4j
+@AllArgsConstructor
 public class EvpsService implements EvpsCommResponse {
     private final DbmsDataProcess dbmsDataProcess;
-
-    public EvpsService(DbmsDataProcess dbmsDataProcess) {
-        this.dbmsDataProcess = dbmsDataProcess;
-    }
+    private final KafkaProducerService kafkaProducerService;
 
     @Override
     public boolean response(RecvPacketDto packet) {
@@ -52,52 +52,25 @@ public class EvpsService implements EvpsCommResponse {
 
             if (packet.getPacket().getCurrent() == 1 && packet.getPacket().getTotal() == 1) {
                 // 모든 정보가 하나의 프레임으로 전송된 경우
-                return makeData(packet, packet.getPacket().getBuffer(), true);
+                return makeData(packet, packet.getPacket().getBuffer());
             }
 
-            // 패킷이 분할되어 전송된 경우
-            EvpsServiceDto serviceDto = null;
-            int sequence = packet.getPacket().getSequence();
+            // 패킷이 분할되어 전송된 경우 서비스 메모리에 패킷을 순차적으로 적재
             if (packet.getPacket().getCurrent() == 1) {
-                // 첫번째 프레임
-                // sequence 에 대항하는 서비스 정보를 메모리에 저장하고 서비스 ID에 대한 메모리 정보를 얻는다.
-                String serviceId = packet.getPacket().getServiceId();
-                EvpsSequenceDto sequenceDto = new EvpsSequenceDto();
-                sequenceDto.setSequence(sequence);
-                sequenceDto.setServiceId(serviceId);
-                packet.getCenter().getSequenceMap().put(sequence, sequenceDto); // SequenceMap 에 SequenceDto 를 저장한다.
-                // 서비스 정보를 메모리에서 얻어온다.
-                serviceDto = ApplicationRepository.serviceMap.get(serviceId);
-            }
-            else {
-                // 분할된 프레임
-                // sequence 에 대한 정보를 메모리에서 구하고 얻어온 서비스 ID로 서비스 정보를 메모리에서 얻어온다.
-                EvpsSequenceDto sequenceDto = packet.getCenter().getSequenceMap().get(sequence);
-                if (sequenceDto != null) {
-                    // 서비스 정보를 메모리에서 얻어온다.
-                    serviceDto = ApplicationRepository.serviceMap.get(sequenceDto.getServiceId());
-                }
-            }
-
-            if (serviceDto == null) {
-                // 서비스 정보를 메모리에서 얻어 오지 못한 경우.
-                log.error("[{}], EvpsService.response: Not Found Service Information. Sequence: {}, Curr: ({}), Tot: ({})",
-                        packet.getCenter().getLogKey(), sequence, packet.getPacket().getCurrent(), packet.getPacket().getTotal());
-                return false;
+                // 다중 프레임인 경우 첫번째 프레임일때 메모리 클리어
+                packet.getCenter().clearServiceBuffer();
             }
+            packet.getCenter().addServiceBuffer(packet.getPacket().getBuffer());
 
-            // 서비스 메모리에 교차로 정보 패킷을 순차적으로 적재
-            serviceDto.addNodeBuffer(packet.getPacket().getBuffer());
             if (packet.getPacket().getCurrent() == packet.getPacket().getTotal()) {
                 // 분할된 프레임을 모두 받았음. 즉 마지막 프레임을 수신한 경우 데이터 처리
-                log.info("[{}], EvpsService.response: Recv All Frames. Service Id: {}, Sequence: {}, Curr: ({}), Tot: ({})",
-                        packet.getCenter().getLogKey(), serviceDto.getServiceId(), sequence, packet.getPacket().getCurrent(), packet.getPacket().getTotal());
+                log.info("[{}], EvpsService.response: Recv All Frames. Service Id: {}, Sequence: Curr: ({}), Tot: ({})",
+                        packet.getCenter().getLogKey(), packet.getPacket().getServiceId(), packet.getPacket().getCurrent(), packet.getPacket().getTotal());
 
                 // 저장한 프레임 전체를 이용해서 데이터 생성
-                result = makeData(packet, serviceDto.getNodeBuffer().array(), false);
-
-                // SequenceMap 에서 SequenceDto 를 제거한다.
-                packet.getCenter().getSequenceMap().remove(sequence);
+                result = makeData(packet, packet.getCenter().getServiceBuffer().array());
+                // 다중 프레임인 경우 마지막 프레임일때 메모리 클리어
+                packet.getCenter().clearServiceBuffer();
             }
         }
         catch (Exception e) {
@@ -111,15 +84,14 @@ public class EvpsService implements EvpsCommResponse {
         return result;
     }
 
-    private boolean makeData(RecvPacketDto packet, byte[] buffer, boolean oneFrame) {
+    private boolean makeData(@NotNull RecvPacketDto packet, byte[] buffer) {
         EvpsCenter center = packet.getCenter();
         if (buffer == null || buffer.length < EvpsProtocolConst.MIN_SERVICE_DATA_SIZE) {
             log.error("[{}], EvpsService.response: Data Length Error: Required minimum data length({}), Cur({}). will be closed.",
                     center.getLogKey(), EvpsProtocolConst.MIN_SERVICE_DATA_SIZE, buffer == null ? 0 : buffer.length);
             return false;
         }
-        // 첫번째 패킷인 경우 서비스 정보를 메모리에 저장하고 데이터베이스에 저장한다.
-        // 만일 분할 된 패킷인 경우 패킷을 메노리에 저장하고 리턴
+
         int idx = 0;
 
         // 차량번호
@@ -149,41 +121,6 @@ public class EvpsService implements EvpsCommResponse {
         // 거리
         int distance = ((buffer[idx++] & 0xFF) << 24) | ((buffer[idx++] & 0xFF) << 16) | ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
 
-        // 서비스 시작 정보 입력
-        TbEvpService service = new TbEvpService();
-        service.setServiceId(packet.getPacket().getServiceId());
-        service.setEvNo(new String(vehNoArr).trim());
-        service.setCurLat(currentLat/EvpsProtocolConst.EVPS_GEO_CORRECT);
-        service.setCurLng(currentLng/EvpsProtocolConst.EVPS_GEO_CORRECT);
-        service.setServiceNm(new String(serviceNameArr).trim());
-        service.setArrLat(arrivalLat/EvpsProtocolConst.EVPS_GEO_CORRECT);
-        service.setArrLng(arrivalLng/EvpsProtocolConst.EVPS_GEO_CORRECT);
-        service.setVehLen(vehicleLength);
-        service.setOcrNo(new String(ocrNoArr).trim());
-        service.setOcrType(new String(ocrTypeArr).trim());
-        service.setArrTm(arrivalTime);
-        service.setServiceDist(distance);
-        service.setStatusCd(TbEvpService.SERVICE_START);
-
-        EvpsServiceDto serviceDto = new EvpsServiceDto(service.getServiceId());
-        serviceDto.setService(service);
-
-        if (oneFrame) {
-            service.setClctDt(packet.getPacket().getPacketTime());
-            ApplicationRepository.serviceMap.put(service.getServiceId(), serviceDto);
-
-            this.dbmsDataProcess.add(new DbmsData(DbmsData.DBMS_DATA_INS_SERVICE, center, service));
-
-            // 이벤트 정보 입력(서비스 시작)
-            TbEvpEvent event = new TbEvpEvent(service.getServiceId(), TbEvpEvent.EVPS_EVENT_SERVICE_START);
-            event.setCurLat(service.getCurLat());
-            event.setCurLng(service.getCurLng());
-            event.setCurSpd(service.getCurSpd());
-            event.setRemDist(service.getServiceDist());
-            this.dbmsDataProcess.add(new DbmsData(DbmsData.DBMS_DATA_INS_EVENT, center, event));
-            return true;
-        }
-
         int routeCount = ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
         int reqDataSize = EvpsProtocolConst.SERVICE_DATA_ROUTE_SIZE * routeCount;
         if ((buffer.length - idx) != reqDataSize) {
@@ -192,27 +129,73 @@ public class EvpsService implements EvpsCommResponse {
             return false;
         }
 
+        // 서비스 시작 정보 입력
+        KafkaEvpsServiceDto service = KafkaEvpsServiceDto.builder()
+                .serviceId(packet.getPacket().getServiceId())
+                .clctDt(TimeUtils.now())
+                .evNo(new String(vehNoArr).trim())
+                .curLat(currentLat/EvpsProtocolConst.EVPS_GEO_CORRECT)
+                .curLng(currentLng/EvpsProtocolConst.EVPS_GEO_CORRECT)
+                .serviceNm(new String(serviceNameArr).trim())
+                .arrLat(arrivalLat/EvpsProtocolConst.EVPS_GEO_CORRECT)
+                .arrLng(arrivalLng/EvpsProtocolConst.EVPS_GEO_CORRECT)
+                .arrTm(arrivalTime)
+                .vehLen(vehicleLength)
+                .ocrNo(new String(ocrNoArr).trim())
+                .ocrType(new String(ocrTypeArr).trim())
+                .serviceDist(distance)
+                .curSpd(0)
+                .statusCd(KafkaEvpsServiceDto.SERVICE_START)
+                .routeList(new ArrayList<>())
+                .build();
+
         List<HashMap<String, Object>> lists = new ArrayList<>();
         for (int ii = 0; ii < routeCount; ii++) {
-
             int routeLat = ((buffer[idx++] & 0xFF) << 24) | ((buffer[idx++] & 0xFF) << 16) | ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
             int routeLng = ((buffer[idx++] & 0xFF) << 24) | ((buffer[idx++] & 0xFF) << 16) | ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
 
-            service.setArrLng(arrivalLng / EvpsProtocolConst.EVPS_GEO_CORRECT);
+            KafkaEvpsServiceDto.KafkaEvpsRouteInfo route = KafkaEvpsServiceDto.KafkaEvpsRouteInfo.builder()
+                    .seqNo(ii + 1)
+                    .lat(routeLat / EvpsProtocolConst.EVPS_GEO_CORRECT)
+                    .lng(routeLng / EvpsProtocolConst.EVPS_GEO_CORRECT)
+                    .build();
+            service.getRouteList().add(route);
 
             HashMap<String, Object> param = new HashMap<>();
             param.put("SERVICE_ID", service.getServiceId());
-            param.put("SEQ_NO", ii + 1);
-            param.put("LAT", routeLat / EvpsProtocolConst.EVPS_GEO_CORRECT);
-            param.put("LNG", routeLng / EvpsProtocolConst.EVPS_GEO_CORRECT);
-
+            param.put("SEQ_NO",     route.getSeqNo());
+            param.put("LAT",        route.getLat());
+            param.put("LNG",        route.getLng());
             lists.add(param);
         }
         log.info("[{}], EvpsService.response: Service Route List {} EA.", center.getLogKey(), lists.size());
 
+        ApplicationRepository.serviceMap.put(service.getServiceId(), service);
+
+        // kafka 전송
+        this.kafkaProducerService.sendEvpsServiceTopic(service);
+
+        // 서비스 정보 입력
+        this.dbmsDataProcess.add(new DbmsData(DbmsData.DBMS_DATA_INS_SERVICE, center, service));
+
+        // 라우트 정보 입력
         if (!lists.isEmpty()) {
             this.dbmsDataProcess.add(new DbmsData(DbmsData.DBMS_DATA_INS_ROUTE, center, lists));
         }
+
+        // 이벤트 정보 입력(서비스 시작)
+        KafkaEvpsEventDto event = KafkaEvpsEventDto.builder()
+                .clctDt(TimeUtils.now())
+                .serviceId(service.getServiceId())
+                .evNo(new String(vehNoArr).trim())
+                .eventCd(KafkaEvpsEventDto.EVPS_EVENT_SERVICE_START)
+                .curLat(service.getCurLat())
+                .curLng(service.getCurLng())
+                .curSpd(service.getCurSpd())
+                .remDist(service.getServiceDist())
+                .build();
+        this.dbmsDataProcess.add(new DbmsData(DbmsData.DBMS_DATA_INS_EVENT, center, event));
+
         return true;
     }
 }

+ 49 - 28
src/main/java/com/evp/comm/server/xnet/server/process/response/EvpsServiceEnd.java

@@ -1,14 +1,18 @@
 package com.evp.comm.server.xnet.server.process.response;
 
+import com.evp.app.common.utils.TimeUtils;
 import com.evp.comm.server.dto.EvpsCenter;
-import com.evp.comm.server.dto.EvpsServiceDto;
-import com.evp.comm.server.entity.TbEvpEvent;
-import com.evp.comm.server.entity.TbEvpService;
+import com.evp.comm.server.kafka.KafkaProducerService;
+import com.evp.comm.server.kafka.dto.KafkaEvpsEventDto;
+import com.evp.comm.server.kafka.dto.KafkaEvpsServiceDto;
+import com.evp.comm.server.kafka.dto.KafkaEvpsServiceEndDto;
 import com.evp.comm.server.process.dbms.DbmsData;
 import com.evp.comm.server.process.dbms.DbmsDataProcess;
 import com.evp.comm.server.repository.ApplicationRepository;
 import com.evp.comm.server.xnet.server.process.protocol.EvpsProtocolConst;
+import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.MDC;
 
 /*
@@ -31,12 +35,10 @@ reason          종료코드        Integer     4
  */
 
 @Slf4j
+@AllArgsConstructor
 public class EvpsServiceEnd implements EvpsCommResponse {
     private final DbmsDataProcess dbmsDataProcess;
-
-    public EvpsServiceEnd(DbmsDataProcess dbmsDataProcess) {
-        this.dbmsDataProcess = dbmsDataProcess;
-    }
+    private final KafkaProducerService kafkaProducerService;
 
     @Override
     public boolean response(RecvPacketDto packet) {
@@ -58,7 +60,15 @@ public class EvpsServiceEnd implements EvpsCommResponse {
         return result;
     }
 
-    private boolean makeData(RecvPacketDto packet, byte[] buffer) {
+    private KafkaEvpsServiceEndDto getData(String serviceId, @NotNull byte[] buffer, int idx) {
+        int reason = ((buffer[idx++] & 0xFF) << 24) | ((buffer[idx++] & 0xFF) << 16) | ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
+        return KafkaEvpsServiceEndDto.builder()
+                .serviceId(serviceId)
+                .reason(reason)
+                .build();
+    }
+
+    private boolean makeData(@NotNull RecvPacketDto packet, byte[] buffer) {
         EvpsCenter center = packet.getCenter();
         if (buffer == null || buffer.length != EvpsProtocolConst.EVENT_SERVICE_END_SIZE) {
             log.error("[{}], EvpsServiceEnd.response: Data Length Error: Required data length({}), Cur({}). will be closed.",
@@ -66,35 +76,46 @@ public class EvpsServiceEnd implements EvpsCommResponse {
             return false;
         }
 
-        int idx = 0;
-        // 종료코드
-        int reason = ((buffer[idx++] & 0xFF) << 24) | ((buffer[idx++] & 0xFF) << 16) | ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
-
-        // 서비스 종료 정보 업데이트
-        TbEvpService service = new TbEvpService();
-        service.setServiceId(packet.getPacket().getServiceId());
-        service.setStatusCd(reason);
-        this.dbmsDataProcess.add(new DbmsData(DbmsData.DBMS_DATA_UPD_SERVICE, center, service));
+        KafkaEvpsServiceEndDto data = getData(packet.getPacket().getServiceId(), buffer, 0);
 
         // 이벤트 정보 입력(서비스 종료)
-        TbEvpEvent event = new TbEvpEvent(service.getServiceId(), TbEvpEvent.EVPS_EVENT_SERVICE_END);
+        KafkaEvpsEventDto event = KafkaEvpsEventDto.builder()
+                .clctDt(TimeUtils.now())
+                .serviceId(data.getServiceId())
+                .eventCd(KafkaEvpsEventDto.EVPS_EVENT_SERVICE_END)
+                .build();
 
-        EvpsServiceDto currService = ApplicationRepository.serviceMap.get(event.getServiceId());
-        if (currService == null) {
-            log.error("[{}], EvpsServiceEnd.response: Not Found Service Information. Service Id: {}", center.getLogKey(), event.getServiceId());
-        }
-        else {
-            event.setCurLat(currService.getService().getCurLat());
-            event.setCurLng(currService.getService().getCurLng());
-            event.setCurSpd(currService.getService().getCurSpd());
-            event.setRemDist(currService.getService().getServiceDist());
+        // 서비스 종료 정보 업데이트
+        KafkaEvpsServiceDto service = ApplicationRepository.serviceMap.get(data.getServiceId());
+        if (service == null) {
+            log.error("[{}], EvpsServiceEnd.response: Not Found Service Information. Service Id: {}", center.getLogKey(), data.getServiceId());
+            service = new KafkaEvpsServiceDto();
+            service.setServiceId(data.getServiceId());
+            service.setEvNo("-");
+            service.setCurLat(0.);
+            service.setCurLng(0.);
+            service.setCurSpd(0);
         }
+        service.setStatusCd(data.getReason());
+
+        event.setEvNo(service.getEvNo());
+        event.setCurLat(service.getCurLat());
+        event.setCurLng(service.getCurLng());
+        event.setCurSpd(service.getCurSpd());
+        event.setRemDist(service.getServiceDist());
+
+        // kafka 전송
+        this.kafkaProducerService.sendEvpsServiceTopic(service);
+
+        // kafka 전송
+        this.kafkaProducerService.sendEvpsEventTopic(event);
+
+        this.dbmsDataProcess.add(new DbmsData(DbmsData.DBMS_DATA_UPD_SERVICE, center, service));
 
         this.dbmsDataProcess.add(new DbmsData(DbmsData.DBMS_DATA_INS_EVENT, center, event));
 
         // 서비스를 메모리에서 삭제한다.
         ApplicationRepository.serviceMap.remove(service.getServiceId());
-        center.setCurrentServiceId("");
         return true;
     }
 }

+ 65 - 56
src/main/java/com/evp/comm/server/xnet/server/process/response/EvpsSignal.java

@@ -1,13 +1,17 @@
 package com.evp.comm.server.xnet.server.process.response;
 
+import com.evp.app.common.utils.TimeUtils;
 import com.evp.comm.server.dto.EvpsCenter;
-import com.evp.comm.server.dto.EvpsSequenceDto;
-import com.evp.comm.server.dto.EvpsServiceDto;
+import com.evp.comm.server.kafka.KafkaProducerService;
+import com.evp.comm.server.kafka.dto.KafkaEvpsServiceDto;
+import com.evp.comm.server.kafka.dto.KafkaEvpsSignalDto;
 import com.evp.comm.server.process.dbms.DbmsData;
 import com.evp.comm.server.process.dbms.DbmsDataProcess;
 import com.evp.comm.server.repository.ApplicationRepository;
 import com.evp.comm.server.xnet.server.process.protocol.EvpsProtocolConst;
+import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.MDC;
 
 import java.util.ArrayList;
@@ -26,12 +30,10 @@ signalList      교차로현황 리스트      SHORT       2
  */
 
 @Slf4j
+@AllArgsConstructor
 public class EvpsSignal implements EvpsCommResponse {
     private final DbmsDataProcess dbmsDataProcess;
-
-    public EvpsSignal(DbmsDataProcess dbmsDataProcess) {
-        this.dbmsDataProcess = dbmsDataProcess;
-    }
+    private final KafkaProducerService kafkaProducerService;
 
     @Override
     public boolean response(RecvPacketDto packet) {
@@ -45,49 +47,22 @@ public class EvpsSignal implements EvpsCommResponse {
                 return makeData(packet, packet.getPacket().getBuffer());
             }
 
-            // 패킷이 분할되어 전송된 경우
-            EvpsServiceDto serviceDto = null;
-            int sequence = packet.getPacket().getSequence();
+            // 패킷이 분할되어 전송된 경우 서비스 메모리에 패킷을 순차적으로 적재
             if (packet.getPacket().getCurrent() == 1) {
-                // 첫번째 프레임
-                // sequence 에 대항하는 서비스 정보를 메모리에 저장하고 서비스 ID에 대한 메모리 정보를 얻는다.
-                String serviceId = packet.getPacket().getServiceId();
-                EvpsSequenceDto sequenceDto = new EvpsSequenceDto();
-                sequenceDto.setSequence(sequence);
-                sequenceDto.setServiceId(serviceId);
-                packet.getCenter().getSequenceMap().put(sequence, sequenceDto); // SequenceMap 에 SequenceDto 를 저장한다.
-                // 서비스 정보를 메모리에서 얻어온다.
-                serviceDto = ApplicationRepository.serviceMap.get(serviceId);
-            }
-            else {
-                // 분할된 프레임
-                // sequence 에 대한 정보를 메모리에서 구하고 얻어온 서비스 ID로 서비스 정보를 메모리에서 얻어온다.
-                EvpsSequenceDto sequenceDto = packet.getCenter().getSequenceMap().get(sequence);
-                if (sequenceDto != null) {
-                    // 서비스 정보를 메모리에서 얻어온다.
-                    serviceDto = ApplicationRepository.serviceMap.get(sequenceDto.getServiceId());
-                }
-            }
-
-            if (serviceDto == null) {
-                // 서비스 정보를 메모리에서 얻어 오지 못한 경우.
-                log.error("[{}], EvpsSignal.response: Not Found Service Information. Sequence: {}, Curr: ({}), Tot: ({})",
-                        packet.getCenter().getLogKey(), sequence, packet.getPacket().getCurrent(), packet.getPacket().getTotal());
-                return false;
+                // 다중 프레임인 경우 첫번째 프레임일때 메모리 클리어
+                packet.getCenter().clearSignalBuffer();
             }
+            packet.getCenter().addSignalBuffer(packet.getPacket().getBuffer());
 
-            // 서비스 메모리에 교차로 정보 패킷을 순차적으로 적재
-            serviceDto.addNodeBuffer(packet.getPacket().getBuffer());
             if (packet.getPacket().getCurrent() == packet.getPacket().getTotal()) {
                 // 분할된 프레임을 모두 받았음. 즉 마지막 프레임을 수신한 경우 데이터 처리
-                log.info("[{}], EvpsSignal.response: Recv All Frames. Service Id: {}, Sequence: {}, Curr: ({}), Tot: ({})",
-                        packet.getCenter().getLogKey(), serviceDto.getServiceId(), sequence, packet.getPacket().getCurrent(), packet.getPacket().getTotal());
+                log.info("[{}], EvpsSignal.response: Recv All Frames. Service Id: {}, Sequence: Curr: ({}), Tot: ({})",
+                        packet.getCenter().getLogKey(), packet.getPacket().getServiceId(), packet.getPacket().getCurrent(), packet.getPacket().getTotal());
 
                 // 저장한 프레임 전체를 이용해서 데이터 생성
-                result = makeData(packet, serviceDto.getNodeBuffer().array());
-
-                // SequenceMap 에서 SequenceDto 를 제거한다.
-                packet.getCenter().getSequenceMap().remove(sequence);
+                result = makeData(packet, packet.getCenter().getSignalBuffer().array());
+                // 다중 프레임인 경우 마지막 프레임일때 메모리 클리어
+                packet.getCenter().clearSignalBuffer();
             }
         }
         catch (Exception e) {
@@ -101,11 +76,11 @@ public class EvpsSignal implements EvpsCommResponse {
         return result;
     }
 
-    private boolean makeData(RecvPacketDto packet, byte[] buffer) {
+    private boolean makeData(@NotNull RecvPacketDto packet, byte[] buffer) {
         EvpsCenter center = packet.getCenter();
         if (buffer == null || buffer.length < EvpsProtocolConst.MIN_SIGNAL_DATA_SIZE) {
             log.error("[{}], EvpsSignal.response: Data Length Error: Required minimum data length({}), Cur({}). will be closed.",
-                    center.getLogKey(), EvpsProtocolConst.MIN_NODE_DATA_SIZE, buffer == null ? 0: buffer.length);
+                    center.getLogKey(), EvpsProtocolConst.MIN_SIGNAL_DATA_SIZE, buffer == null ? 0: buffer.length);
             return false;
         }
 
@@ -121,6 +96,12 @@ public class EvpsSignal implements EvpsCommResponse {
             return false;
         }
 
+        KafkaEvpsSignalDto data = KafkaEvpsSignalDto.builder()
+                .serviceId(serviceId)
+                .clctDt(TimeUtils.now())
+                .signalList(new ArrayList<>())
+                .build();
+
         List<HashMap<String, Object>> lists = new ArrayList<>();
         for (int ii = 0; ii < nodeCount; ii++) {
             long nodeId     = ((long) (buffer[idx++] & 0xFF) << 24) | ((buffer[idx++] & 0xFF) << 16) | ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
@@ -131,25 +112,53 @@ public class EvpsSignal implements EvpsCommResponse {
             int bRingPhase  = buffer[idx++] & 0xFF;
             int holdPhase   = buffer[idx++] & 0xFF;
 
-            HashMap<String, Object> param = new HashMap<>();
-            param.put("SERVICE_ID",  serviceId);
-            param.put("SEQ_NO",      ii+1);
-            param.put("NODE_ID",     nodeId);
-
-            param.put("REM_DIST",     remDist);
-            param.put("STATE",        state);
-            param.put("PLAN_CLASS",   planClass);
-            param.put("A_RING_PHASE", aRingPhase);
-            param.put("B_RING_PHASE", bRingPhase);
-            param.put("HOLD_PHASE",   holdPhase);
-
+            KafkaEvpsSignalDto.EvpsSignalInfo signal = KafkaEvpsSignalDto.EvpsSignalInfo.builder()
+                    .nodeId(nodeId)
+                    .seqNo(ii+1)
+                    .remDist(remDist)
+                    .state(state)
+                    .planClass(planClass)
+                    .aRingPhase(aRingPhase)
+                    .bRingPhase(bRingPhase)
+                    .holdPhase(holdPhase)
+                    .build();
+            data.getSignalList().add(signal);
+
+            HashMap<String, Object> param = getSignalInfoMap(serviceId, signal);
             lists.add(param);
         }
         log.info("[{}], EvpsSignal.response: Service Signal List {} EA.", center.getLogKey(), lists.size());
 
+        KafkaEvpsServiceDto service = ApplicationRepository.serviceMap.get(data.getServiceId());
+        if (service == null) {
+            log.error("[{}], EvpsSignal.response: Not Found Service Information. Service Id: {}", center.getLogKey(), data.getServiceId());
+        }
+        else {
+        }
+
         if (!lists.isEmpty()) {
+            // kafka 전송
+            this.kafkaProducerService.sendEvpsSignalTopic(data);
+
             this.dbmsDataProcess.add(new DbmsData(DbmsData.DBMS_DATA_INS_SIGNAL, center, lists));
         }
         return true;
     }
+
+    @NotNull
+    private static HashMap<String, Object> getSignalInfoMap(String serviceId, KafkaEvpsSignalDto.EvpsSignalInfo signal) {
+        HashMap<String, Object> param = new HashMap<>();
+
+        param.put("SERVICE_ID",   serviceId);
+        param.put("SEQ_NO",       signal.getSeqNo());
+        param.put("NODE_ID",      signal.getNodeId());
+
+        param.put("REM_DIST",     signal.getRemDist());
+        param.put("STATE",        signal.getState());
+        param.put("PLAN_CLASS",   signal.getPlanClass());
+        param.put("A_RING_PHASE", signal.getARingPhase());
+        param.put("B_RING_PHASE", signal.getBRingPhase());
+        param.put("HOLD_PHASE",   signal.getHoldPhase());
+        return param;
+    }
 }

+ 8 - 7
src/main/java/com/evp/comm/server/xnet/server/process/work/DataPacketProcess.java

@@ -3,11 +3,11 @@ package com.evp.comm.server.xnet.server.process.work;
 import com.evp.comm.server.common.SpringUtils;
 import com.evp.comm.server.config.ThreadPoolInitializer;
 import com.evp.comm.server.dto.EvpsCenter;
+import com.evp.comm.server.kafka.KafkaProducerService;
 import com.evp.comm.server.process.dbms.DbmsDataProcess;
 import com.evp.comm.server.repository.ApplicationRepository;
-import com.evp.comm.server.xnet.server.process.response.RecvPacketDto;
-import com.evp.comm.server.xnet.server.process.response.*;
 import com.evp.comm.server.xnet.server.process.protocol.eEvpsOpCode;
+import com.evp.comm.server.xnet.server.process.response.*;
 import io.netty.channel.Channel;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -28,6 +28,7 @@ public class DataPacketProcess {
 
     private final DataPacketAsyncTask asyncTask;
     private final DbmsDataProcess dbmsDataProcess;
+    private final KafkaProducerService kafkaProducerService;
 
     public void run() {
         log.info("DataPacketProcess.run: Start.");
@@ -63,19 +64,19 @@ public class DataPacketProcess {
             eEvpsOpCode opCode = eEvpsOpCode.getValue(packet.getPacket().getOpCode());
             switch(opCode) {
                 case EVPS_SERVICE:
-                    response = new EvpsService(this.dbmsDataProcess);
+                    response = new EvpsService(this.dbmsDataProcess, this.kafkaProducerService);
                     break;
                 case EVPS_NODE:
-                    response = new EvpsNode(this.dbmsDataProcess);
+                    response = new EvpsNode(this.dbmsDataProcess, this.kafkaProducerService);
                     break;
                 case EVPS_SIGNAL:
-                    response = new EvpsSignal(this.dbmsDataProcess);
+                    response = new EvpsSignal(this.dbmsDataProcess, this.kafkaProducerService);
                     break;
                 case EVPS_EVENT:
-                    response = new EvpsEvent(this.dbmsDataProcess);
+                    response = new EvpsEvent(this.dbmsDataProcess, this.kafkaProducerService);
                     break;
                 case EVPS_SERVICE_END:
-                    response = new EvpsServiceEnd(this.dbmsDataProcess);
+                    response = new EvpsServiceEnd(this.dbmsDataProcess, this.kafkaProducerService);
                     break;
             }
             if (response != null) {

+ 1 - 1
src/main/resources/application.yml

@@ -47,7 +47,7 @@ application:
     work: 5
 
   kafka:
-    bootstrap-servers: 192.168.11.23:9092
+    bootstrap-servers: 172.24.0.30:9092,172.24.0.31:9093,172.24.0.32:9094
     group-id: evp-comm-server
     consumer-ack-config: 1
     ping-topic: ping-topic

+ 13 - 0
src/main/resources/logback-spring-appender.xml

@@ -55,6 +55,19 @@
         </rollingPolicy>
     </appender>
 
+    <appender name="FILE_KAFKA" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${LOG_PATH}${LOG_FILE_NAME_KAFKA}</file>
+        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+            <charset>${LOG_CHARSET}</charset>
+            <pattern>${LOG_PATTERN_KAFKA}</pattern>
+        </encoder>
+        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+            <fileNamePattern>${LOG_BACKUP_PATH}${LOG_FILE_KAFKA}.${LOG_FILE_NAME_BACKUP}</fileNamePattern>
+            <maxFileSize>${MAX_FILESIZE}</maxFileSize>
+            <maxHistory>${MAX_HISTORY}</maxHistory>
+        </rollingPolicy>
+    </appender>
+
     <appender name="FILE_ERROR" class="ch.qos.logback.core.rolling.RollingFileAppender">
         <filter class="ch.qos.logback.classic.filter.LevelFilter">
             <level>error</level>

+ 15 - 7
src/main/resources/logback-spring.xml

@@ -3,19 +3,20 @@
     <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
 
     <property name="APP_CLASS_PATH"  value="com.evp.comm.server"/>
-    <property name="PROJECT_PREFIX"  value="sig"/>
+    <property name="PROJECT_PREFIX"  value="evp"/>
     <property name="PROJECT_NAME"    value="${PROJECT_PREFIX}-comm-server"/>
     <property name="ROOT_LOG_LEVEL"  value="INFO"/>
     <property name="LOG_CHARSET"     value="UTF-8" />
     <property name="LOG_PATH"        value="${user.home}/logs/${PROJECT_NAME}/"/>
     <property name="LOG_BACKUP_PATH" value="${user.home}/logs/${PROJECT_NAME}/backup/"/>
 
-    <property name="LOG_FILE_NAME"             value="${PROJECT_NAME}.log"/>
-    <property name="LOG_FILE_NAME_ERROR"       value="${PROJECT_NAME}.err.log"/>
-    <property name="LOG_FILE_NAME_BACKUP"     value="%d{yyyyMMdd}_%i.log.gz"/>
-    <property name="LOG_FILE_NAME_PACKET"      value="${PROJECT_PREFIX}-packet"/>
-    <property name="LOG_FILE_NAME_SESSION"     value="${PROJECT_PREFIX}-session.log"/>
-    <property name="LOG_FILE_NAME_ASPECT"      value="${PROJECT_PREFIX}-aspect.log"/>
+    <property name="LOG_FILE_NAME"         value="${PROJECT_NAME}.log"/>
+    <property name="LOG_FILE_NAME_ERROR"   value="${PROJECT_NAME}.err.log"/>
+    <property name="LOG_FILE_NAME_BACKUP"  value="%d{yyyyMMdd}_%i.log.gz"/>
+    <property name="LOG_FILE_NAME_PACKET"  value="${PROJECT_PREFIX}-packet"/>
+    <property name="LOG_FILE_NAME_SESSION" value="${PROJECT_PREFIX}-session.log"/>
+    <property name="LOG_FILE_NAME_ASPECT"  value="${PROJECT_PREFIX}-aspect.log"/>
+    <property name="LOG_FILE_NAME_KAFKA"   value="${PROJECT_PREFIX}-kafka.log"/>
 
     <property name="MAX_FILESIZE" value="10MB"/>
     <property name="MAX_HISTORY"  value="10"/>
@@ -24,6 +25,7 @@
     <property name="LOG_PATTERN_PACKET"      value="[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%-5level] %msg%n"/>
     <property name="LOG_PATTERN_SESSION"     value="[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%-5level] %msg%n"/>
     <property name="LOG_PATTERN_ASPECT"      value="[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%-5level] %msg%n"/>
+    <property name="LOG_PATTERN_KAFKA"       value="[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%-5level] %msg%n"/>
     <property name="LOG_PATTERN_CONSOLE"     value="[%d{HH:mm:ss.SSS}] [%5level] %msg %n"/>
 
     <springProfile name="!xxx">
@@ -42,6 +44,12 @@
         <appender-ref ref="FILE_ERROR"/>
     </logger>
 
+    <logger name="${APP_CLASS_PATH}.kafka" level="INFO" additivity="false">
+        <appender-ref ref="CONSOLE"/>
+        <appender-ref ref="FILE_KAFKA"/>
+        <appender-ref ref="FILE_ERROR"/>
+    </logger>
+
     <logger name="${APP_CLASS_PATH}.xnet" level="INFO" additivity="false">
         <appender-ref ref="CONSOLE"/>
         <appender-ref ref="FILE_PACKET"/>

+ 4 - 4
src/main/resources/mybatis/mapper/EvpsServiceMapper.xml

@@ -3,14 +3,14 @@
 
 <mapper namespace="com.evp.comm.server.dao.mapper.EvpsServiceMapper">
 
-    <insert id="insertEvpService" parameterType="com.evp.comm.server.entity.TbEvpService">
+    <insert id="insertEvpService" parameterType="com.evp.comm.server.kafka.dto.KafkaEvpsServiceDto">
     <![CDATA[
         INSERT INTO tb_evp_service (service_id,
                                     clct_dt,
                                     ev_no,
                                     cur_lat,
                                     cur_lng,
-                                    service_name,
+                                    service_nm,
                                     arr_lat,
                                     arr_lng,
                                     arr_tm,
@@ -38,7 +38,7 @@
         ]]>
     </insert>
 
-    <update id="updateEvpService" parameterType="com.evp.comm.server.entity.TbEvpService">
+    <update id="updateEvpService" parameterType="com.evp.comm.server.kafka.dto.KafkaEvpsServiceDto">
     <![CDATA[
         UPDATE tb_evp_service
            SET status_cd = #{obj.statusCd}
@@ -46,7 +46,7 @@
         ]]>
     </update>
 
-    <insert id="insertEvpEvent" parameterType="com.evp.comm.server.entity.TbEvpEvent">
+    <insert id="insertEvpEvent" parameterType="com.evp.comm.server.kafka.dto.KafkaEvpsEventDto">
     <![CDATA[
         INSERT INTO tb_evp_event (clct_dt,
                                   service_id,

+ 61 - 0
src/test/java/com/evp/comm/server/EvpCommServerApplicationTests.java

@@ -2,17 +2,25 @@ package com.evp.comm.server;
 
 import com.evp.app.common.utils.ByteUtils;
 import com.evp.app.common.utils.SysUtils;
+import com.evp.app.common.utils.TimeUtils;
 import com.evp.comm.server.dto.EvpsCenter;
 import com.evp.comm.server.dto.NetState;
+import com.evp.comm.server.kafka.KafkaProducerService;
+import com.evp.comm.server.kafka.dto.KafkaEvpsServiceDto;
 import com.evp.comm.server.xnet.server.process.protocol.EvpsProtocolConst;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.TimeZone;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -32,6 +40,59 @@ public class EvpCommServerApplicationTests {
             .dump(false)
             .build();
 
+    @Autowired
+    private KafkaProducerService kafkaService;
+    public static String toString( Date date ) {
+
+        SimpleDateFormat df = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );
+
+        TimeZone tz = TimeZone.getTimeZone( "UTC" );
+
+        df.setTimeZone( tz );
+
+        String output = df.format( date );
+
+        int inset0 = 9;
+        int inset1 = 6;
+
+        String s0 = output.substring( 0, output.length() - inset0 );
+        String s1 = output.substring( output.length() - inset1, output.length() );
+
+        String result = s0 + s1;
+
+        result = result.replaceAll( "UTC", "+00:00" );
+
+        return result;
+
+    }
+
+    @Test
+    void testDate() {
+        log.info("{}", toString(new Date()));
+        log.info("{}", TimeUtils.now());
+    }
+    @Test
+    void sendTopicService() {
+        KafkaEvpsServiceDto data = KafkaEvpsServiceDto.builder()
+                .serviceId("99920240906124848")
+                .clctDt(TimeUtils.getCurrentDate())
+                .evNo("373구2164")
+                .curLat(1.)
+                .curLng(2.)
+                .serviceNm("도로교통공단")
+                .arrLat(3.)
+                .arrLng(4.)
+                .arrTm(10)
+                .vehLen(20)
+                .ocrNo("")
+                .ocrType("")
+                .serviceDist(4300)
+                .statusCd(1)
+                .routeList(new ArrayList<>())
+                .build();
+        kafkaService.sendEvpsServiceTopic(data);
+    }
+
     @Test
     void test3() {
         String data = "000102030405060708090A000102030405060708090A000102030405060708090A";

+ 28 - 12
start.sh

@@ -1,18 +1,33 @@
-#!/bin/sh
+#!/bin/sh
 
-export SERVICE_HOME=/home/cvim/test
-export SERVICE_NAME=sig-comm-server
-export SERVICE_VERSION=0.0.1-SNAPSHOT
-export EXE_NAME=$SERVICE_NAME-$SERVICE_VERSION.jar
-export PID_NAME=$SERVICE_NAME.pid
-export SERVICE_PID=$SERVICE_HOME/conf/$PID_NAME
+#SERVICE_HOME=$(dirname $0)
+USER_HOME=/home/cvim
+SERVICE_HOME=/home/cvim/bin
 
-cd $SERVICE_HOME
+usage() {
+	echo "Usage:" $0 "[comm]"
+	echo "RUN evp-comm-server:" $0 "comm"
+	exit
+}
+
+case $1 in
+  comm)
+    SERVICE_NAME=evp-comm-server
+    SERVICE_VERSION=0.0.1
+    ;;
+  *)
+    usage
+    ;;
+esac
+
+
+EXE_NAME=${SERVICE_HOME}/$SERVICE_NAME-$SERVICE_VERSION.jar
+PID_NAME=$SERVICE_NAME.pid
+SERVICE_PID=$SERVICE_HOME/conf/$PID_NAME
 
 export JAVA_OPT="-server"
-export JAVA_OPT="$JAVA_OPT -Xms4096m -Xmx4096m"
-export JAVA_OPT="$JAVA_OPT -Xlog:gc*:file=logs/gc.log"
-export CONFIG_OPT="--spring.config.location=conf//application.yml"
+#export JAVA_OPT="$JAVA_OPT -Xms4096m -Xmx4096m"
+export JAVA_OPT="$JAVA_OPT -Xlog:gc*:file=${USER_HOME}/logs/${SERVICE_NAME}/${SERVICE_NAME}.gc.log"
 
 if [ ! -z "$SERVICE_PID" ]; then
   if [ -f "$SERVICE_PID" ]; then
@@ -21,7 +36,7 @@ if [ ! -z "$SERVICE_PID" ]; then
   fi
 fi
 
-nohup java $JAVA_OPT -jar ./$EXE_NAME $CONFIG_OPT 1> /dev/null 2>&1 &
+java $JAVA_OPT -jar $EXE_NAME &
 
 echo "$SERVICE_NAME is started...."
 
@@ -32,3 +47,4 @@ ps -eaf | grep $SERVICE_NAME | grep -v grep |wc -l
 sleep 1
 
 ps -eaf | grep $SERVICE_NAME | grep -v grep
+

+ 4 - 11
stat.sh

@@ -1,16 +1,9 @@
-#!/bin/sh
+#!/bin/sh
 
-export SERVICE_HOME=/home/cvim/test
-export SERVICE_NAME=sig-comm-server
-export SERVICE_VERSION=0.0.1-SNAPSHOT
-export EXE_NAME=$SERVICE_NAME-$SERVICE_VERSION.jar
-export PID_NAME=$SERVICE_NAME.pid
-export SERVICE_PID=$SERVICE_HOME/conf/$PID_NAME
+export SERVICE_NAME=evp-
 
-cd $SERVICE_HOME
-
-ps -eaf | grep $SERVICE_NAME | grep -v grep |wc -l
+ps -eaf | grep $SERVICE_NAME | grep -v grep | grep -v tail | grep -v kafka | grep java | wc -l
 
 sleep 1
 
-ps -eaf | grep $SERVICE_NAME | grep -v grep
+ps -eaf | grep $SERVICE_NAME | grep -v grep | grep -v tail | grep -v kafka | grep java

+ 24 - 8
stop.sh

@@ -1,13 +1,29 @@
-#!/bin/sh
+#!/bin/sh
 
-export SERVICE_HOME=/home/cvim/test
-export SERVICE_NAME=sig-comm-server
-export SERVICE_VERSION=0.0.1-SNAPSHOT
-export EXE_NAME=$SERVICE_NAME-$SERVICE_VERSION.jar
-export PID_NAME=$SERVICE_NAME.pid
-export SERVICE_PID=$SERVICE_HOME/conf/$PID_NAME
+#SERVICE_HOME=$(dirname $0)
+USER_HOME=/home/cvim
+SERVICE_HOME=/home/cvim/bin
 
-cd $SERVICE_HOME
+usage() {
+    echo "Usage:" $0 "[comm]"
+    echo "RUN evp-comm-server:" $0 "comm"
+    exit
+}
+
+case $1 in
+  comm)
+    SERVICE_NAME=evp-comm-server
+    SERVICE_VERSION=0.0.1
+    ;;
+  *)
+    usage
+    ;;
+esac
+
+
+EXE_NAME=$SERVICE_NAME-$SERVICE_VERSION.jar
+PID_NAME=$SERVICE_NAME.pid
+SERVICE_PID=$SERVICE_HOME/conf/$PID_NAME
 
 if [ ! -z "$SERVICE_PID" ]; then
   if [ -f "$SERVICE_PID" ]; then