Bladeren bron

vds traf inst_lane column add

shjung 3 jaren geleden
bovenliggende
commit
0d016a9f75
36 gewijzigde bestanden met toevoegingen van 1633 en 1 verwijderingen
  1. 4 0
      pom.xml
  2. 4 0
      src/main/java/com/its/api/ItsOpServerApplication.java
  3. 8 1
      src/main/java/com/its/api/config/ThreadPoolTaskExecutorConfig.java
  4. 12 0
      src/main/java/com/its/api/its/model/dto/cctv/TbCctvScnrDto.java
  5. 4 0
      src/main/java/com/its/api/its/model/dto/vds/TbVdsDtct15mStatDto.java
  6. 4 0
      src/main/java/com/its/api/its/model/dto/vds/TbVdsDtctDdStatDto.java
  7. 4 0
      src/main/java/com/its/api/its/model/dto/vds/TbVdsDtctHhStatDto.java
  8. 4 0
      src/main/java/com/its/api/its/model/dto/vds/TbVdsDtctMnStatDto.java
  9. 15 0
      src/main/java/com/its/api/its/model/entity/cctv/TbCctvScnr.java
  10. 1 0
      src/main/java/com/its/api/its/model/entity/vds/TbVdsDtct15mStat.java
  11. 1 0
      src/main/java/com/its/api/its/model/entity/vds/TbVdsDtctDdStat.java
  12. 1 0
      src/main/java/com/its/api/its/model/entity/vds/TbVdsDtctHhStat.java
  13. 1 0
      src/main/java/com/its/api/its/model/entity/vds/TbVdsDtctMnStat.java
  14. 17 0
      src/main/java/com/its/api/its/repository/cctv/TbCctvScnrRepository.java
  15. 11 0
      src/main/java/com/its/api/scheduler/ItsApiScheduler.java
  16. 78 0
      src/main/java/com/its/api/scheduler/job/CctvPsetScnrThread.java
  17. 23 0
      src/main/java/com/its/api/utils/Converter.java
  18. 39 0
      src/main/java/com/its/api/utils/Counter.java
  19. 19 0
      src/main/java/com/its/api/utils/ItsUtils.java
  20. 145 0
      src/main/java/com/its/api/websocket/SessionConsumerThread.java
  21. 30 0
      src/main/java/com/its/api/websocket/WebSocketConfig.java
  22. 154 0
      src/main/java/com/its/api/websocket/WebSocketSessionClient.java
  23. 135 0
      src/main/java/com/its/api/websocket/WebsocketHandler.java
  24. 71 0
      src/main/java/com/its/api/websocket/WebsocketSessionManager.java
  25. 75 0
      src/main/java/com/its/api/xnetudp/CenterCommUdpServer.java
  26. 78 0
      src/main/java/com/its/api/xnetudp/codec/CenterCommClientEncoder.java
  27. 54 0
      src/main/java/com/its/api/xnetudp/codec/CenterCommServerDecoder.java
  28. 41 0
      src/main/java/com/its/api/xnetudp/handler/CenterCommServerPacketHandler.java
  29. 175 0
      src/main/java/com/its/api/xnetudp/protocol/CENTER_COMM_DEFINE.java
  30. 137 0
      src/main/java/com/its/api/xnetudp/protocol/CENTER_COMM_MESSAGE.java
  31. 59 0
      src/main/java/com/its/api/xnetudp/protocol/CENTER_PG_STATE_RES.java
  32. 29 0
      src/main/java/com/its/api/xnetudp/service/CenterCommResponseService.java
  33. 54 0
      src/main/java/com/its/api/xnetudp/thread/CenterCommClientSender.java
  34. 141 0
      src/main/java/com/its/api/xnetudp/thread/CenterCommServerReceiver.java
  35. 3 0
      src/main/resources/application-dev.yml
  36. 2 0
      src/main/resources/application.yml

+ 4 - 0
pom.xml

@@ -223,6 +223,10 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-tomcat</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>io.netty</groupId>

+ 4 - 0
src/main/java/com/its/api/ItsOpServerApplication.java

@@ -2,6 +2,7 @@ package com.its.api;
 
 import com.its.api.config.AppUtils;
 import com.its.api.config.ProcessConfig;
+import com.its.api.xnetudp.CenterCommUdpServer;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.InitializingBean;
@@ -60,6 +61,9 @@ public class ItsOpServerApplication implements CommandLineRunner, ApplicationLis
         log.info("** process: {}", processConfig.getId());
         log.info("** startup: {}", processConfig.getBootingDateTime());
         log.info("************************************************************************************");
+
+        CenterCommUdpServer centerCommUdpServer = (CenterCommUdpServer)AppUtils.getBean(CenterCommUdpServer.class);
+        centerCommUdpServer.run();
     }
 
     @Override

+ 8 - 1
src/main/java/com/its/api/config/ThreadPoolTaskExecutorConfig.java

@@ -23,7 +23,7 @@ public class ThreadPoolTaskExecutorConfig extends AsyncConfigurerSupport {
     private void init() {
 
         this.poolCore = Runtime.getRuntime().availableProcessors();
-        if (this.poolCore < 8) {
+        if (this.poolCore <= 8) {
             this.poolCore = 16;
         }
 
@@ -49,6 +49,13 @@ public class ThreadPoolTaskExecutorConfig extends AsyncConfigurerSupport {
         threadPoolTaskExecutor.initialize();
         return threadPoolTaskExecutor;
     }
+    @Bean(name="centerCommExecutor")
+    public Executor getCenterCommExecutor() {
+        ThreadPoolTaskExecutor threadPoolTaskExecutor = this.getDefaultExecutor((int)(this.poolCore/2));
+        threadPoolTaskExecutor.setThreadNamePrefix("udp-comm-pool-");
+        threadPoolTaskExecutor.initialize();
+        return threadPoolTaskExecutor;
+    }
 
     @Bean(name="dbmsJobExecutor")
     public Executor getDbmsJobExecutor() {

+ 12 - 0
src/main/java/com/its/api/its/model/dto/cctv/TbCctvScnrDto.java

@@ -45,6 +45,18 @@ public class TbCctvScnrDto implements Serializable {
 
     // Code Description Field
 
+    public TbCctvScnr toEntity() {
+        TbCctvScnr entity = TbCctvScnr.builder()
+                .cctvMngmNmbr(this.cctvMngmNmbr)
+                .strtHms(this.strtHms)
+                .endHms(this.endHms)
+                .psetNmbr(this.psetNmbr)
+                .build();
+
+        entity.initEndHms();
+        return entity;
+    }
+
     @ApiModel("TbCctvScnrUpdReq(CCTV 시나리오 정보변경)")
     @Getter
     @Setter

+ 4 - 0
src/main/java/com/its/api/its/model/dto/vds/TbVdsDtct15mStatDto.java

@@ -21,6 +21,10 @@ public class TbVdsDtct15mStatDto implements Serializable {
     @JsonProperty("dtct_nmbr")
     private String dtctNmbr;
 
+    @ApiModelProperty("설치 차로, Nullable = Y, NUMBER(2)")  // Y NUMBER(2)
+    @JsonProperty("istl_lane")
+    private Integer istlLane;
+
     @ApiModelProperty("통계 일시")  // N VARCHAR(14)
     @JsonProperty("stat_dt")
     private String statDt;

+ 4 - 0
src/main/java/com/its/api/its/model/dto/vds/TbVdsDtctDdStatDto.java

@@ -21,6 +21,10 @@ public class TbVdsDtctDdStatDto implements Serializable {
     @JsonProperty("dtct_nmbr")
     private String dtctNmbr;
 
+    @ApiModelProperty("설치 차로, Nullable = Y, NUMBER(2)")  // Y NUMBER(2)
+    @JsonProperty("istl_lane")
+    private Integer istlLane;
+
     @ApiModelProperty("통계 일시")  // N VARCHAR(14)
     @JsonProperty("stat_dt")
     private String statDt;

+ 4 - 0
src/main/java/com/its/api/its/model/dto/vds/TbVdsDtctHhStatDto.java

@@ -21,6 +21,10 @@ public class TbVdsDtctHhStatDto implements Serializable {
     @JsonProperty("dtct_nmbr")
     private String dtctNmbr;
 
+    @ApiModelProperty("설치 차로, Nullable = Y, NUMBER(2)")  // Y NUMBER(2)
+    @JsonProperty("istl_lane")
+    private Integer istlLane;
+
     @ApiModelProperty("통계 일시")  // N VARCHAR(14)
     @JsonProperty("stat_dt")
     private String statDt;

+ 4 - 0
src/main/java/com/its/api/its/model/dto/vds/TbVdsDtctMnStatDto.java

@@ -21,6 +21,10 @@ public class TbVdsDtctMnStatDto implements Serializable {
     @JsonProperty("dtct_nmbr")
     private String dtctNmbr;
 
+    @ApiModelProperty("설치 차로, Nullable = Y, NUMBER(2)")  // Y NUMBER(2)
+    @JsonProperty("istl_lane")
+    private Integer istlLane;
+
     @ApiModelProperty("통계 일시")  // N VARCHAR(14)
     @JsonProperty("stat_dt")
     private String statDt;

+ 15 - 0
src/main/java/com/its/api/its/model/entity/cctv/TbCctvScnr.java

@@ -4,6 +4,8 @@ import com.its.api.its.model.dto.cctv.TbCctvScnrDto;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.*;
+import org.hibernate.annotations.NotFound;
+import org.hibernate.annotations.NotFoundAction;
 
 import javax.persistence.*;
 import java.io.Serializable;
@@ -49,6 +51,19 @@ public class TbCctvScnr implements Serializable {
     @Column(name = "CTRL_RES_YN", columnDefinition = "CHAR", length = 1)
     private String ctrlResYn;
 
+    @OneToOne
+    @JoinColumn(insertable=false, updatable=false, name="CCTV_MNGM_NMBR", referencedColumnName = "CCTV_MNGM_NMBR")
+    @NotFound(action = NotFoundAction.IGNORE)
+    private TbCctvCtlr cctv = new TbCctvCtlr();
+
+    @OneToOne
+    @JoinColumns( {
+        @JoinColumn(updatable=false, insertable=false, name="CCTV_MNGM_NMBR", referencedColumnName="CCTV_MNGM_NMBR"),
+        @JoinColumn(updatable=false, insertable=false, name="PSET_NMBR", referencedColumnName="PSET_NMBR")
+    })
+    @NotFound(action = NotFoundAction.IGNORE)
+    private TbCctvPset pset = new TbCctvPset();
+
     public void initEndHms() {
         if (this.endHms == null || this.endHms.length() != 6) {
             this.endHms = "000000";

+ 1 - 0
src/main/java/com/its/api/its/model/entity/vds/TbVdsDtct15mStat.java

@@ -77,6 +77,7 @@ public class TbVdsDtct15mStat implements Serializable {
                 .build();
 
         if (this.dtct != null) {
+            dto.setIstlLane(this.dtct.getIstlLane());
             dto.setVdsDtctNm(this.dtct.getVdsDtctNm());
         }
 

+ 1 - 0
src/main/java/com/its/api/its/model/entity/vds/TbVdsDtctDdStat.java

@@ -77,6 +77,7 @@ public class TbVdsDtctDdStat implements Serializable {
                 .build();
 
         if (this.dtct != null) {
+            dto.setIstlLane(this.dtct.getIstlLane());
             dto.setVdsDtctNm(this.dtct.getVdsDtctNm());
         }
 

+ 1 - 0
src/main/java/com/its/api/its/model/entity/vds/TbVdsDtctHhStat.java

@@ -77,6 +77,7 @@ public class TbVdsDtctHhStat implements Serializable {
                 .build();
 
         if (this.dtct != null) {
+            dto.setIstlLane(this.dtct.getIstlLane());
             dto.setVdsDtctNm(this.dtct.getVdsDtctNm());
         }
 

+ 1 - 0
src/main/java/com/its/api/its/model/entity/vds/TbVdsDtctMnStat.java

@@ -77,6 +77,7 @@ public class TbVdsDtctMnStat implements Serializable {
                 .build();
 
         if (this.dtct != null) {
+            dto.setIstlLane(this.dtct.getIstlLane());
             dto.setVdsDtctNm(this.dtct.getVdsDtctNm());
         }
 

+ 17 - 0
src/main/java/com/its/api/its/repository/cctv/TbCctvScnrRepository.java

@@ -4,9 +4,26 @@ import com.its.api.its.model.entity.cctv.TbCctvScnr;
 import com.its.api.its.model.entity.cctv.TbCctvScnrKey;
 import org.springframework.data.jpa.repository.JpaRepository;
 import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
+import org.springframework.data.jpa.repository.Modifying;
+import org.springframework.data.jpa.repository.Query;
 import org.springframework.stereotype.Repository;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.List;
 
 @Repository
 public interface TbCctvScnrRepository extends JpaRepository<TbCctvScnr, TbCctvScnrKey>, JpaSpecificationExecutor<TbCctvScnr> {
 
+    @Query("select p from TbCctvScnr p inner join fetch p.cctv cctv inner join fetch p.pset pset left outer join fetch cctv.state state where cctv.delYn = 'N'")
+    List<TbCctvScnr> findAllList();
+
+    @Transactional
+    @Modifying
+    @Query(value="update tb_cctv_scnr " +
+                 "   set ctrl_dt     = :ctrlDt, " +
+                 "       ctrl_res_yn = :ctrlResYn " +
+                 " where cctv_mngm_nmbr = :nmbr " +
+                 "   and strt_hms       = :strtHms " +
+                 "   and end_hms        = :endHms", nativeQuery = true)
+    Integer updateResult(Long nmbr, String strtHms, String endHms, String ctrlDt, String ctrlResYn);
 }

+ 11 - 0
src/main/java/com/its/api/scheduler/ItsApiScheduler.java

@@ -1,5 +1,7 @@
 package com.its.api.scheduler;
 
+import com.its.api.scheduler.job.CctvPsetScnrThread;
+import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.scheduling.annotation.EnableScheduling;
@@ -9,10 +11,13 @@ import org.springframework.stereotype.Component;
 import javax.annotation.PreDestroy;
 
 @Slf4j
+@AllArgsConstructor
 @EnableScheduling
 @Component
 public class ItsApiScheduler {
 
+    private final CctvPsetScnrThread cctvPsetScnrThread;
+
     @PreDestroy
     public void onShutDown() {
     }
@@ -24,6 +29,12 @@ public class ItsApiScheduler {
     @Scheduled(cron = "0 * * * * *")  // 1분 주기 작업 실행
     public void jobCctvPsetScnr() {
         log.info("START jobCctvPsetScnr");
+        this.cctvPsetScnrThread.run();
+    }
+
+    @Async
+    @Scheduled(cron = "0/2 * * * * *")  // 2초 주기 작업 실행
+    public void checkKafkaServerAlive() {
     }
 
 }

+ 78 - 0
src/main/java/com/its/api/scheduler/job/CctvPsetScnrThread.java

@@ -0,0 +1,78 @@
+package com.its.api.scheduler.job;
+
+import com.its.api.its.model.dto.cctv.TbCctvScnrDto;
+import com.its.api.its.model.entity.cctv.TbCctvScnr;
+import com.its.api.its.repository.cctv.TbCctvScnrRepository;
+import com.its.api.utils.ItsUtils;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Service;
+import org.springframework.util.StopWatch;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+@AllArgsConstructor
+@Service
+public class CctvPsetScnrThread {
+
+    private final TbCctvScnrRepository cctvScnrRepo;
+
+    @Async("cctvPsetScnrExecutor")
+    public void run() {
+
+        log.info("START: CctvPsetScnrThread.run: {}", Thread.currentThread().getName());
+
+        StopWatch stopWatch = new StopWatch();
+        stopWatch.start();
+
+        List<TbCctvScnrDto> cctvScnrList = new ArrayList<>();
+        List<TbCctvScnr> cctvScnr = this.cctvScnrRepo.findAllList();
+        cctvScnr.forEach(obj -> {
+            cctvScnrList.add(obj.toDto());
+        });
+
+        String currDay = ItsUtils.getSysTime("yyyyMMdd");
+        String currCtrlDt = currDay+ItsUtils.getSysTime("HHmm");
+
+        cctvScnrList.forEach(obj -> {
+
+            String controlDt = currDay + obj.getStrtHms().substring(0, 4);
+            int scheduleMin = ItsUtils.getDiffMinutes(controlDt+"00", currCtrlDt+"00");
+            if (scheduleMin < 0 || scheduleMin > 2) {
+                return;
+            }
+
+            // 제어시각 = 제어시작시각+0,1,2(분)
+            if (obj.getCtrlDt() != null && currCtrlDt.equals(obj.getCtrlDt().substring(0, 12))) {
+                if (("Y").equals(obj.getCtrlResYn())) {
+                    // 이미 제어를 수행하였고 결과가 성공이었다면 다시 제어할 필요 없음
+                    return;
+                }
+            }
+
+            // 제어를 시작한다.
+log.error("Control: {}, {}, {}, {}", obj.getCctvMngmNmbr(), obj.getStrtHms(), obj.getPsetNmbr(), currCtrlDt);
+
+            TbCctvScnr scnr = TbCctvScnr.builder()
+                    .cctvMngmNmbr(obj.getCctvMngmNmbr())
+                    .strtHms(obj.getStrtHms())
+                    .endHms(obj.getEndHms())
+                    .psetNmbr(obj.getPsetNmbr())
+                    .ctrlDt(controlDt+"00")
+                    .ctrlResYn("N")
+                    .build();
+            //this.cctvScnrRepo.(scnr);
+            String ctrlResYn = "N";
+            if (obj.getCctvMngmNmbr() % 2 == 0) {
+                ctrlResYn = "Y";
+            }
+            this.cctvScnrRepo.updateResult(obj.getCctvMngmNmbr(), obj.getStrtHms(), obj.getEndHms(), controlDt+"00", ctrlResYn);
+        });
+
+        stopWatch.stop();
+        log.info("--END: CctvPsetScnrThread.run: {}, {} ms.", Thread.currentThread().getName(), stopWatch.getTotalTimeMillis());
+    }
+}

+ 23 - 0
src/main/java/com/its/api/utils/Converter.java

@@ -0,0 +1,23 @@
+package com.its.api.utils;
+
+public class Converter {
+    private Converter() {
+    }
+
+    public static String getSize(double bytes) {
+        long kilo = 1024L;
+        double kb = bytes / 1024.0D;
+        double mb = kb / 1024.0D;
+        double gb = mb / 1024.0D;
+        double tb = gb / 1024.0D;
+        if (tb > 1.0D) {
+            return String.format("%.2f TB", tb);
+        } else if (gb > 1.0D) {
+            return String.format("%.2f GB", gb);
+        } else if (mb > 1.0D) {
+            return String.format("%.2f MB", mb);
+        } else {
+            return kb > 1.0D ? String.format("%.2f KB", kb) : String.format("%.2f Bytes", bytes);
+        }
+    }
+}

+ 39 - 0
src/main/java/com/its/api/utils/Counter.java

@@ -0,0 +1,39 @@
+package com.its.api.utils;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class Counter {
+    private AtomicLong counter = new AtomicLong(0L);
+
+    public Counter() {
+    }
+
+    public long reset() {
+        return this.counter.getAndSet(0L);
+    }
+
+    public long reset(long value) {
+        return this.counter.getAndSet(0L);
+    }
+
+    public long increment() {
+        return this.counter.incrementAndGet();
+    }
+
+    public long add(long value) {
+        return this.counter.addAndGet(value);
+    }
+
+    public long decrement() {
+        return this.counter.decrementAndGet();
+    }
+
+    public long get() {
+        return this.counter.get();
+    }
+
+    public String toString() {
+        return Converter.getSize(this.counter.doubleValue());
+    }
+
+}

+ 19 - 0
src/main/java/com/its/api/utils/ItsUtils.java

@@ -139,6 +139,18 @@ public final class ItsUtils
 		return to;
 	}
 
+	public static int getDiffMinutes(String fromDt, String toDt) {
+		Date dtFrom = stringToDate(fromDt);
+		Date dtTo = stringToDate(toDt);
+		GregorianCalendar gcFromDt= new GregorianCalendar();
+		GregorianCalendar gcToDt = new GregorianCalendar();
+		gcFromDt.setTime(dtFrom);
+		gcToDt.setTime(dtTo);
+		long gap = gcToDt.getTimeInMillis() - gcFromDt.getTimeInMillis();
+		long min = gap / 1000L / 60L;
+		return (int)min;
+	}
+
 	// 0--6: Sunday = 0, C/C++ struct tm, tm_wday
 	// 1--7: oracle, SELECT TO_CHAR(SYSDATE, 'D') FROM DUAL;, 주중의 일을 1~7로 표시(일요일이 1)
 	//     : java, Calendar cal; cal.get(Calendar.DAY_OF_WEEK, sunday=1, monday=2, ...)
@@ -184,6 +196,13 @@ public final class ItsUtils
 		Date dtNow = new Date();
 		return sdfDate.format(dtNow);
 	}
+
+	public static String getSysTime(String format)
+	{
+		SimpleDateFormat sdfDate = new SimpleDateFormat(format);
+		Date dtNow = new Date();
+		return sdfDate.format(dtNow);
+	}
 	public static String getSysMonth()
 	{
 		SimpleDateFormat sdfDate = new SimpleDateFormat("yyyyMM");

+ 145 - 0
src/main/java/com/its/api/websocket/SessionConsumerThread.java

@@ -0,0 +1,145 @@
+package com.its.api.websocket;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Slf4j
+@Getter
+@Setter
+public class SessionConsumerThread implements Runnable {
+
+    private AtomicBoolean stopFlag = new AtomicBoolean(false);
+    private AtomicBoolean updateTopics = new AtomicBoolean(false);
+
+    private final WebsocketHandler websocketHandler;
+    private final WebSocketSession session;
+    private String groupId;
+    private List<String> topics;
+    //private KafkaConsumer<String, byte[]> consumer;
+    private ObjectMapper mapper;
+
+    public SessionConsumerThread(WebsocketHandler websocketHandler, WebSocketSession session) {
+        this.websocketHandler = websocketHandler;
+        this.session = session;
+
+        this.topics = new CopyOnWriteArrayList();
+        this.mapper = new ObjectMapper();
+
+        this.groupId = this.session.getId();
+    }
+
+    /*public List<String> formatPartitions(Collection<TopicPartition> partitions) {
+        return partitions.stream().map(topicPartition ->
+                        String.format("\ntopic: %s, partition: %s", topicPartition.topic(), topicPartition.partition()))
+                .collect(Collectors.toList());
+    }*/
+
+    @Override
+    public void run() {
+
+        log.info("SessionConsumerThread start: {}", this.groupId);
+
+        try {
+            boolean unsubscribe = false;
+            while (!this.stopFlag.get() && (!Thread.currentThread().isInterrupted())) {
+/*
+                if (this.topics.size() == 0) {
+                    try {
+                        if (unsubscribe) {
+                            this.consumer.unsubscribe();
+                            unsubscribe = false;
+                        }
+                        Thread.sleep(100);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                    continue;
+                }
+
+                if (this.consumer == null) {
+                    TsiKafkaConsumerConfig config = (TsiKafkaConsumerConfig) AppUtils.getBean(TsiKafkaConsumerConfig.class);
+                    this.consumer = new KafkaConsumer<>(config.getConsumerProperties(this.groupId));
+                    log.info("Kafka Consumer Create: {}", this.consumer);
+                }
+
+                if (this.updateTopics.get()) {
+                    this.updateTopics.set(false);
+
+                    log.info("SessionConsumerThread Subscribe Start: {}, {}", this.groupId, this.topics);
+                    *//*this.consumer.unsubscribe();
+
+                    this.consumer.subscribe(this.topics);
+                    Set<TopicPartition> assignment = new HashSet<>();
+                    while (assignment.size() == 0) {
+                        consumer.poll(Duration.ofMillis(100));
+                        assignment = consumer.assignment();
+                    }
+                    log.info("SessionConsumerThread Subscribe Partitions: {} EA", assignment.size());
+                    consumer.seekToEnd(assignment);
+*//*
+                    this.consumer.subscribe(this.topics,
+                            new ConsumerRebalanceListener() {
+                                @Override
+                                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+                                    log.info("onPartitionsRevoked - consumerName: {}, partitions: {}", topics.toString(), formatPartitions(partitions));
+                                }
+
+                                @Override
+                                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+                                    log.info("onPartitionsAssigned - consumerName: {}, partitions: {}", topics.toString(), formatPartitions(partitions));
+                                    consumer.seekToEnd(partitions);
+                                }
+                            });
+                    log.info("SessionConsumerThread Subscribe ..End: {}, {}", this.groupId, this.topics);
+                    unsubscribe = true;
+                }
+
+                ConsumerRecords<String, byte[]> records = this.consumer.poll(Duration.ofMillis(100));
+                for (ConsumerRecord<String, byte[]> record : records) {
+                    try {
+                        CpuNodeStatusDTO status = TsiCpuNodePacket.getNodeStatusDTO(Long.parseLong(record.key()), record.value());
+                        if (status != null) {
+                            try {
+                                String jsonInString = this.mapper.writeValueAsString(status);
+                                 try {
+                                    this.websocketHandler.sendMessage(this.session, record.key(), new TextMessage(jsonInString));
+                                    log.info("Send to: {}, {}, {} bytes.", this.session.getRemoteAddress().getAddress(), record.key(), jsonInString.length());
+                                }
+                                catch(Exception e) {
+                                    log.error("Send Failed: {}, {}, {} bytes.", this.session, record.key(), jsonInString.length());
+                                }
+                            }
+                            catch(JsonProcessingException e) {
+                                log.error("ConsumerThread Json parsing Exception: {}, {}", this.topics.toString(), e.getMessage());
+                            }
+                        }
+                    } catch (Exception e) {
+                        log.error("SessionConsumerThread Exception: {}, {}, {}", this.topics.toString(), this.session.toString(), e.getMessage());
+                    }
+                }*/
+            }
+
+            log.info("SessionConsumerThread: {}, {}, {}, stopped.", this.groupId, this.topics.toString(), this.session.toString());
+        }
+        catch(Exception e) {
+            log.error("SessionConsumerThread Wakeup Exception: {}, {}", this.groupId, e.getMessage());
+        }
+        finally {
+        }
+    }
+
+    public void stop() {
+        this.stopFlag.set(true);
+    }
+
+    public void shutdown() {
+        //this.consumer.wakeup();
+    }
+}

+ 30 - 0
src/main/java/com/its/api/websocket/WebSocketConfig.java

@@ -0,0 +1,30 @@
+package com.its.api.websocket;
+
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
+import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
+import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
+
+@Configuration
+@EnableWebSocket
+public class WebSocketConfig implements WebSocketConfigurer {
+
+    private final WebsocketHandler websocketHandler;
+
+    public WebSocketConfig(WebsocketHandler websocketHandler) {
+        this.websocketHandler = websocketHandler;
+    }
+
+    @Override
+    public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {
+        webSocketHandlerRegistry.addHandler(this.websocketHandler, "/op/socket")
+                .setAllowedOrigins("*")
+                .withSockJS();  // sockjs
+        webSocketHandlerRegistry.addHandler(this.websocketHandler, "/op/socket")
+                .setAllowedOrigins("*"); // 그냥 websocket 지원
+
+        /*webSocketHandlerRegistry.addHandler(echoHandler, "/echo").setAllowedOrigins("*").withSockJS()
+                .setInterceptors(new HttpSessionHandshakeInterceptor())
+                .setClientLibraryUrl("http://localhost:8080/resources/sockjs.min.js");*/
+    }
+}

+ 154 - 0
src/main/java/com/its/api/websocket/WebSocketSessionClient.java

@@ -0,0 +1,154 @@
+package com.its.api.websocket;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+@Data
+public class WebSocketSessionClient {
+
+    private WebsocketHandler websocketHandler;
+    private WebSocketSession session;
+    private String groupId;
+    private List<String> topics = new ArrayList<>();
+    private List<String> threadTopics = new ArrayList<>();
+
+    private SessionConsumerThread consumerThread = null;
+    private Thread thread;
+
+    public WebSocketSessionClient(WebsocketHandler websocketHandler, WebSocketSession session) {
+        this.websocketHandler = websocketHandler;
+        this.session = session;
+    }
+
+    public void delSubscription(List<String> topics, boolean isExit) {
+        if (topics == null) {
+            return;
+        }
+/*
+        this.threadTopics = new ArrayList<>();
+        for (String topic: topics) {
+            boolean remain = true;
+            TsiKafkaVo kafkaVo = TsiKafkaConsumerManager.getInstance().get(Long.parseLong(topic));
+            if (kafkaVo != null) {
+                if (kafkaVo.getNodeConsumerThread() != null) {
+                    kafkaVo.getNodeConsumerThread().removeSession(this.session);
+                    remain = false;
+                    log.info("delSubscription: {}, consumer: {}, session: {}", topic, kafkaVo.getNodeConsumerThread(), session.toString());
+                }
+            }
+            if (remain) {
+                this.threadTopics.add(topic);
+            }
+        }*/
+
+        if (this.consumerThread != null && this.threadTopics.size() > 0) {
+            if (isExit) {
+                this.threadTopics = new ArrayList<>();
+            }
+            this.consumerThread.setTopics(this.threadTopics);
+            this.consumerThread.getUpdateTopics().set(true);
+        }
+    }
+
+    public void addSubscription(List<String> topics) {
+        if (topics == null) {
+            return;
+        }
+
+        long currMilliseconds = System.currentTimeMillis();
+        this.threadTopics = new ArrayList<>();
+        /*for (String topic: topics) {
+            boolean remain = true;
+            TsiKafkaVo kafkaVo = TsiKafkaConsumerManager.getInstance().get(Long.parseLong(topic));
+            if (kafkaVo != null) {
+                // 등록된 교차로인 경우 일단 요청해 놓는다.
+                if (kafkaVo.getNodeConsumerThread() != null) {
+                    kafkaVo.getNodeConsumerThread().addSession(this.session);
+
+                    log.info("addSubscription: {}, recvTm: {}, consumer: {}, session: {}", topic, currMilliseconds - kafkaVo.getNodeConsumerThread().getRecvTime(), kafkaVo.getNodeConsumerThread(), session.toString());
+
+                    // 통신이 OffLine 인 경우 offline 메시지 전송
+                    if (currMilliseconds - kafkaVo.getNodeConsumerThread().getRecvTime() > 5000) {
+                        try {
+                            this.websocketHandler.sendMessage(this.session, topic, new TextMessage(topic + ":offline"));
+                        } catch (Exception e) {
+                            //
+                        }
+                    }
+                }
+            }
+            if (remain) {
+                this.threadTopics.add(topic);
+            }
+        }*/
+
+        if (this.threadTopics.size() > 0) {
+            if (this.consumerThread == null) {
+                this.consumerThread = new SessionConsumerThread(this.websocketHandler, this.session);
+                this.thread = new Thread(this.consumerThread);
+                this.thread.setUncaughtExceptionHandler(
+                        new Thread.UncaughtExceptionHandler() {
+                            @Override
+                            public void uncaughtException(Thread th, Throwable ex) {
+                                log.error("Uncaught exception: {}, {}", groupId, ex.getMessage());
+                            }
+                        }
+                );
+                this.thread.start();
+            }
+            this.consumerThread.setTopics(this.threadTopics);
+            this.consumerThread.getUpdateTopics().set(true);
+        }
+    }
+
+    public void subscribe(List<String> topics) {
+        if (topics.size() == 1 && topics.get(0).equals("x")) {
+            // 세션 종료 메시지 안 경우임. 모든 요청 토픽 삭제
+            delSubscription(this.topics, true);
+            this.topics = new ArrayList<>();
+        }
+        else {
+            List<String> delTopics = new ArrayList<>();
+            List<String> newTopics = new ArrayList<>();
+
+            // 지금 요청중인 토픽 중에서 이번 요청에 포함되지 않은 토픽은 삭제한다.
+            for (String topic: this.topics) {
+                if (!topics.contains(topic)) {
+                    delTopics.add(topic);
+                }
+            }
+            if (delTopics.size() > 0) {
+                delSubscription(delTopics, false);
+            }
+
+            // 이번 요청 토픽중에 지금 요청중이 아닌 경우에만 새롭게 요청한다.
+            for (String topic: topics) {
+                if (!this.topics.contains(topic)) {
+                    newTopics.add(topic);
+                }
+            }
+            if (newTopics.size() > 0) {
+                addSubscription(newTopics);
+            }
+
+            this.topics = topics;
+        }
+    }
+
+    public void start(String groupId) {
+        this.groupId = groupId;
+    }
+
+    public void stop() {
+        delSubscription(this.topics, true);
+
+        if (this.consumerThread != null) {
+            this.consumerThread.stop();
+        }
+    }
+}

+ 135 - 0
src/main/java/com/its/api/websocket/WebsocketHandler.java

@@ -0,0 +1,135 @@
+package com.its.api.websocket;
+
+import com.its.api.utils.ItsUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.socket.CloseStatus;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.handler.TextWebSocketHandler;
+
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+@Controller
+@RequestMapping("/op/socket")
+public class WebsocketHandler extends TextWebSocketHandler {
+
+    public WebsocketHandler() {
+
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            for (Map.Entry<WebSocketSession, WebSocketSessionClient> obj : WebsocketSessionManager.getInstance().getMap().entrySet()) {
+                if (obj.getValue() != null) {
+                    if (obj.getValue().getConsumerThread() != null) {
+                        obj.getValue().getConsumerThread().shutdown();
+                    }
+                }
+            }
+        }));
+
+        log.info("WebsocketHandler() START");
+    }
+
+    @Override
+    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
+        // 클라이언트가 연결되었을때 실행
+        log.info("afterConnectionEstablished: " + session.getRemoteAddress() + ",  Uri: " + session.getUri() + ", UUID: " + session.getId());
+        super.afterConnectionEstablished(session);
+
+        WebSocketSessionClient vo = new WebSocketSessionClient(this, session);
+        WebsocketSessionManager.getInstance().addSession(session, vo);
+    }
+
+    //클라이언트가 웹소켓 서버로 메시지를 전송했을 때 실행
+    @Override
+    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
+        log.info("handleTextMessage: " + session.getRemoteAddress() + ",  Uri[" + session.getUri() + "], UUID[" + session.getId() + "], message[" + message.getPayload() + "]");
+        List<String> requests = ItsUtils.split(message.getPayload(), ":");
+        if (requests.size() != 2) {
+            log.error("Request data parsing error: {}", requests.toString());
+            session.close();
+            return;
+        }
+
+        WebSocketSessionClient sessionClient = WebsocketSessionManager.getInstance().getSessionVo(session);
+        if (sessionClient == null) {
+            log.error("Request session not found: {}", session.toString());
+            session.close();
+            return;
+        }
+        log.info("Payload: {}", message.getPayload());
+
+        String command = requests.get(0).trim();
+        String data = requests.get(1).trim();
+        if (command.equals("group")) {
+            String groupId = data;
+            sessionClient.start(groupId);
+            log.info("Request group: {}", groupId);
+        }
+        else if (command.equals("topics")) {
+
+            String topics = data.replaceAll("[ ]", "");
+            topics = topics.replaceAll(" ", "");
+            List<String> oldTopics = sessionClient.getTopics();
+            List<String> newTopics = ItsUtils.split(topics, ",");
+
+            log.info("Request topic: old: {}, new: {}", oldTopics, newTopics);
+
+            // 현재 요청 토픽목록을 새롭게 할당한다.
+            sessionClient.subscribe(newTopics);
+            log.info("Subscribe topic ok: {}", sessionClient.toString());
+        }
+    }
+
+    //클라이언트 연결을 끊었을 때 실행
+    @Override
+    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
+        log.info("afterConnectionClosed: " + session.getRemoteAddress() + ",  Uri: " + session.getUri() + ", UUID: " + session.getId());
+        super.afterConnectionClosed(session, status);
+
+        removeSession(session, status);
+    }
+
+    public void removeSession(WebSocketSession session, CloseStatus status) {
+        WebSocketSessionClient sessionClient = WebsocketSessionManager.getInstance().getSessionVo(session);
+        if (sessionClient != null) {
+            sessionClient.stop();
+        }
+        WebsocketSessionManager.getInstance().removeSession(session);
+
+    }
+
+    // 세션에 해당 하는 토픽스레드 삭제
+    /*public void removeConsumerThread(WebSocketSession session, ConsumerThread consumerThread) {
+        try {
+            if (consumerThread == null) {
+                return;
+            }
+            int remainSessions = consumerThread.removeSession(session);
+            if (remainSessions == 0) {
+                String topicName = consumerThread.getTopicName();
+                consumerThread.stop();
+                this.consumerMap.remove(topicName);
+                log.info("topic: {}, consumer terminated.", topicName);
+            }
+        }
+        catch(Exception e) {
+            log.error("removeConsumerThread: Exception: {}", e.getMessage());
+        }
+    }*/
+
+    public void sendMessage(WebSocketSession session, String nodeId, TextMessage message) {
+
+        if (session != null && session.isOpen()) {
+            synchronized (session) {
+                try {
+                    session.sendMessage(message);
+                } catch (Exception e) {
+                    log.error("sendMessage: nodeId: {}, session: {}, {}", nodeId, session, e.getMessage());
+                }
+            }
+        }
+    }
+}

+ 71 - 0
src/main/java/com/its/api/websocket/WebsocketSessionManager.java

@@ -0,0 +1,71 @@
+package com.its.api.websocket;
+
+import com.its.api.utils.Counter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+@Slf4j
+public class WebsocketSessionManager {
+
+    private static WebsocketSessionManager _instance = null;
+
+    public static WebsocketSessionManager getInstance() {
+        if (_instance == null) {
+            synchronized (WebsocketSessionManager.class) {
+                if (_instance == null)
+                    _instance = new WebsocketSessionManager();
+            }
+        }
+        return _instance;
+    }
+
+    private WebsocketSessionManager() {
+        this.sessions = new Counter();
+        this.sessionMap = new ConcurrentHashMap<>();
+    }
+
+    private volatile boolean serverRun;
+    private Counter sessions;
+    private ConcurrentHashMap<WebSocketSession, WebSocketSessionClient> sessionMap;
+
+    public ConcurrentHashMap<WebSocketSession, WebSocketSessionClient> getMap() {
+        return this.sessionMap;
+    }
+
+    public int add() {
+        return (int) this.sessions.increment();
+    }
+    public int remove() {
+        return (int) this.sessions.decrement();
+    }
+    public int get() {
+        return (int) this.sessions.get();
+    }
+
+    public void addSession(WebSocketSession session, WebSocketSessionClient vo) {
+        this.sessionMap.put(session, vo);
+    }
+    public void removeSession(WebSocketSession session) {
+        this.sessionMap.remove(session);
+    }
+
+    public WebSocketSessionClient getSessionVo(WebSocketSession session) {
+        return this.sessionMap.get(session);
+    }
+
+    public void start() {
+        this.serverRun = true;
+    }
+    public void stop() {
+        this.serverRun = false;
+    }
+    public boolean isServerRun() {
+        return this.serverRun;
+    }
+
+    public void reportSession() {
+        log.info("Sessions: {}", this.sessionMap.toString());
+    }
+}

+ 75 - 0
src/main/java/com/its/api/xnetudp/CenterCommUdpServer.java

@@ -0,0 +1,75 @@
+package com.its.api.xnetudp;
+
+import com.its.api.config.CenterCommConfig;
+import com.its.api.xnetudp.codec.CenterCommServerDecoder;
+import com.its.api.xnetudp.handler.CenterCommServerPacketHandler;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@ToString
+@RequiredArgsConstructor
+@Component
+public class CenterCommUdpServer {
+
+    private final CenterCommConfig serverConfig;
+
+    private EventLoopGroup nioEventLoopGroup;
+    private Bootstrap bootstrap;
+
+    public void run() {
+        log.info("UdpServerCenterComm.init: Start.");
+
+        try {
+            this.nioEventLoopGroup = new NioEventLoopGroup();
+            this.bootstrap = new Bootstrap();
+            this.bootstrap.channel(NioDatagramChannel.class);
+            this.bootstrap.group(this.nioEventLoopGroup);
+            this.bootstrap.option(ChannelOption.SO_BROADCAST, Boolean.valueOf(true));
+            this.bootstrap.option(ChannelOption.SO_REUSEADDR, Boolean.valueOf(true));
+            this.bootstrap.handler(new ChannelInitializer<Channel>() {
+                @Override
+                protected void initChannel(Channel channel) throws Exception
+                {
+                    channel.pipeline().addLast("centerCommServerDecoder", new CenterCommServerDecoder());
+                    channel.pipeline().addLast("centerCommServerPacketHandler", new CenterCommServerPacketHandler());
+                }
+            });
+        }
+        catch (Exception e) {
+            this.log.error("UdpServerCenterComm.init: Exception: {}", e.toString());
+        }
+
+        log.info("===============================================================");
+        log.info("=  UDP Center Communication Server Start.......................");
+        log.info("=  bindAddress: {}", serverConfig.getBindingAddr());
+        log.info("=     bindPort: {}", serverConfig.getBindingPort());
+        log.info("===============================================================");
+
+        try {
+            if (this.serverConfig.getBindingAddr().equals("0.0.0.0")) {
+                this.bootstrap.bind(this.serverConfig.getBindingPort());
+            }
+            else {
+                this.bootstrap.bind(this.serverConfig.getBindingAddr(), serverConfig.getBindingPort());
+            }
+        }
+        catch (Exception e) {
+            this.log.error("UdpServerCenterComm.init: Bind Exception: {}", e.toString());
+            this.nioEventLoopGroup.shutdownGracefully();
+        }
+        finally {
+        }
+        log.info("UdpServerCenterComm.init: ..End. {}", toString());
+    }
+
+}

+ 78 - 0
src/main/java/com/its/api/xnetudp/codec/CenterCommClientEncoder.java

@@ -0,0 +1,78 @@
+package com.its.api.xnetudp.codec;
+
+import com.its.api.config.AppUtils;
+import com.its.api.config.CenterCommConfig;
+import com.its.api.utils.SysUtils;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageEncoder;
+import lombok.extern.slf4j.Slf4j;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+@Slf4j
+public class CenterCommClientEncoder extends MessageToMessageEncoder<Object> {
+
+	private String remoteIpAddr;
+	private int remotePort;
+
+	public CenterCommClientEncoder(String hostIp, int hostPort) {
+		this.remoteIpAddr = hostIp;
+		this.remotePort = hostPort;
+	}
+
+	@Override
+	protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
+
+		CenterCommConfig serverConfig = (CenterCommConfig) AppUtils.getBean(CenterCommConfig.class);
+
+		Channel channel = ctx.channel();
+		if (!channel.isOpen()) {
+			log.error("SEND UDP: Channel Closed.");
+		}
+		if (!channel.isActive()) {
+			log.error("SEND UDP: Channel InActive.");
+			channel.flush();
+			return;
+		}
+
+		if (!(msg instanceof ByteBuffer)) {
+			log.error("SEND UDP: Encoding Data source Unknown Type.");
+			return;
+		}
+
+		int sendBytes = ((ByteBuffer) msg).array().length;
+		if (this.remoteIpAddr == null) {
+			log.info("SEND UDP: [ALL], {} Bytes.", sendBytes);
+		}
+		else {
+			log.info("SEND UDP: [{}:{}], {} Bytes.", this.remoteIpAddr, this.remotePort, sendBytes);
+		}
+
+		log.info("SEND UDP, DUMP: {}", SysUtils.byteArrayToHex(((ByteBuffer) msg).array()));
+/*
+		// 브로드 캐스팅(모든 운영단말로 전송) 및 개별 메시지 응답
+		UnitSystService unitSystService = (UnitSystService) AppUtils.getBean(UnitSystService.class);
+		ConcurrentHashMap<String, voUnitSyst> untiSystMap = unitSystService.getUntiSystMap();
+		for (Map.Entry<String, voUnitSyst> e : untiSystMap.entrySet()) {
+			if (this.remoteIpAddr == null || this.remoteIpAddr.equals(e.getValue().getSYST_IP_1())) {
+				try {
+					ByteBuf byteBuf = Unpooled.buffer(sendBytes);
+					byteBuf.writeBytes(((ByteBuffer) msg).array());
+					InetSocketAddress addr = new InetSocketAddress(e.getValue().getSYST_IP_1(), e.getValue().getPRGM_PORT());
+					DatagramPacket packet = new DatagramPacket(byteBuf, addr);
+					out.add(packet);
+					if (this.remoteIpAddr == null) {
+						log.debug("SEND: [{}] [{}:{}], {} Bytes. OK....", e.getValue().getSYST_ID(), e.getValue().getSYST_IP_1(), e.getValue().getPRGM_PORT(), sendBytes);
+					}
+					else {
+						log.info("SEND: [{}] [{}:{}], {} Bytes. OK....", e.getValue().getSYST_ID(), e.getValue().getSYST_IP_1(), e.getValue().getPRGM_PORT(), sendBytes);
+					}
+				} catch (Exception e2) {
+					log.error("SEND: [{}] [{}:{}], {} Bytes. Failed. Exception: {}", e.getValue().getSYST_ID(), e.getValue().getSYST_IP_1(), e.getValue().getPRGM_PORT(), sendBytes, e2.toString());
+				}
+			}
+		}*/
+	}
+}

+ 54 - 0
src/main/java/com/its/api/xnetudp/codec/CenterCommServerDecoder.java

@@ -0,0 +1,54 @@
+package com.its.api.xnetudp.codec;
+
+import com.its.api.utils.SysUtils;
+import com.its.api.xnetudp.protocol.CENTER_COMM_MESSAGE;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.DatagramPacket;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+
+@Slf4j
+public class CenterCommServerDecoder extends MessageToMessageDecoder<DatagramPacket> {
+
+	@Override
+	protected void decode(ChannelHandlerContext ctx, DatagramPacket packet, List<Object> out) throws Exception {
+
+		ByteBuf buf = packet.content();
+
+		int recvBytes = buf.readableBytes();
+		log.info("RECV UDP: [{}], {} Bytes.", packet.sender().getAddress().getHostAddress(), recvBytes);
+
+		if (buf.readableBytes() < CENTER_COMM_MESSAGE.CENTER_HEADER_SIZE) {
+			log.error("RECV UDP: UDP Message header length error: {}/{} Bytes.", recvBytes, CENTER_COMM_MESSAGE.CENTER_HEADER_SIZE);
+			return;
+		}
+
+		byte[] buffer = new byte[recvBytes];
+		buf.getBytes(buf.readerIndex(), buffer);
+		log.error("RECV UDP, DUMP: {}, {}", packet.sender().getAddress().getHostAddress(), SysUtils.byteArrayToHex(buffer));
+//01 00 01 01 00 01 00 00 00 00
+//27 07 01 01 00 FF 00 00 04 D1   0C 00 29 31 30 30
+//27 07 01 01 00 FF 00 00 00 10   0B 00 32 30 32 32 30 33 32 34 31 35 34 38 31 30
+
+//#define INT_OP_VMS_STATE_REQ      0x01          /* VMS 통신 서버, VMS 시설물 상태정보 요청 */
+//#define INT_OP_VMS_STATE_RES      0x02          /* VMS 통신 서버, VMS 시설물 상태정보 전송 */
+//#define INT_OP_VMS_POWER_CTL      0x03          /* VMS 통신 서버, VMS 전광판 On/Off 제어 */
+//#define INT_OP_VMS_LUMINANCE_CTL  0x04          /* VMS 통신 서버, VMS 휘도 제어 */
+		//typedef struct int_head
+		//{
+		//	BYTE  SendId;							/* 송신 시스템 ID */
+		//	BYTE  RecvId;							/* 수신 시스템 ID */
+		//	BYTE  TotalFrame;						/* 전체 프레임 갯수 */
+		//	BYTE  CurrentFrame;						/* 현재 프레임 번호 */
+		//	BYTE  Reserved;							/* Reserved */
+		//	BYTE  OPCode;							/* 명령어 */
+		//	DWORD Length;							/* 데이터의 길이 */
+		//} INT_HEAD;
+
+		CENTER_COMM_MESSAGE msg = new CENTER_COMM_MESSAGE(buffer, packet.sender());
+		out.add(msg);
+	}
+}

+ 41 - 0
src/main/java/com/its/api/xnetudp/handler/CenterCommServerPacketHandler.java

@@ -0,0 +1,41 @@
+package com.its.api.xnetudp.handler;
+
+import com.its.api.config.AppUtils;
+import com.its.api.utils.NettyUtils;
+import com.its.api.xnetudp.protocol.CENTER_COMM_MESSAGE;
+import com.its.api.xnetudp.thread.CenterCommServerReceiver;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import lombok.extern.slf4j.Slf4j;
+
+import java.net.InetSocketAddress;
+
+@Slf4j
+public class CenterCommServerPacketHandler extends SimpleChannelInboundHandler<Object> {
+
+	@Override
+	protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
+
+		if (!(msg instanceof CENTER_COMM_MESSAGE)) {
+			log.error("CenterCommServerPacketHandler: Received Data Unknown Type: {}", NettyUtils.getTcpAddress(ctx.channel()));
+			return;
+		}
+
+		// UDP 통신은 스레드 풀로 처리한다.
+		CenterCommServerReceiver handler = (CenterCommServerReceiver) AppUtils.getBean(CenterCommServerReceiver.class);
+		handler.run((CENTER_COMM_MESSAGE) msg);
+	}
+
+	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+		ctx.channel().flush();
+	}
+
+	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
+	{
+		InetSocketAddress remoteAddress = (InetSocketAddress)ctx.channel().remoteAddress();
+        String ip = remoteAddress.getAddress().getHostAddress();
+        log.error("CenterCommServerPacketHandler::exceptionCaught: {}, {}", ip, cause.getMessage());
+	    cause.printStackTrace();
+	    super.exceptionCaught(ctx, cause);
+	}
+}

+ 175 - 0
src/main/java/com/its/api/xnetudp/protocol/CENTER_COMM_DEFINE.java

@@ -0,0 +1,175 @@
+package com.its.api.xnetudp.protocol;
+
+public class CENTER_COMM_DEFINE {
+
+	/*******************************************************************************
+	 * 시스템 ID
+	 *******************************************************************************/
+	public static final int INT_ID_TRAFFIC_SERVER		= 0x01;    /* 가공서버 */
+	public static final int INT_ID_SIGCTL_SERVER		= 0x02;    /* 신호제어서버 */
+	public static final int INT_ID_SIGCOM_SERVER		= 0x03;    /* 신호통신서버 */
+	public static final int INT_ID_VDS_SERVER			= 0x04;    /* VDS 서버 */
+	public static final int INT_ID_AVI_SERVER			= 0x05;    /* AVI 서버 */
+	public static final int INT_ID_DSRC_SERVER 			= 0x06;    /* DSRC 서버 */
+	public static final int INT_ID_VMS_SERVER 			= 0x07;    /* VMS 서버 */
+	public static final int INT_ID_CCTV_SERVER			= 0x08;    /* CCTV 서버 */
+	public static final int INT_ID_WEB_SERVER		   	= 0x09;    /* WEB 서버 */
+	public static final int INT_ID_UTIS_SERVER			= 0x0A;    /* UTIS 서버 */
+	public static final int INT_ID_KMA_SERVER			= 0x0B;    /* 기상청 연계 서버 */
+	public static final int INT_ID_WCAM_SERVER			= 0x0C;    /* 웹카메라 연계 서버 */
+	public static final int INT_ID_FCLT_SERVER			= 0x0D;    /* 시설물관리 서버 */
+	public static final int INT_ID_EXT01_SERVER			= 0x0E;    /* 연계 서버1 */
+	public static final int INT_ID_EXT02_SERVER			= 0x0F;    /* 연계 서버2 */
+	public static final int INT_ID_EXT03_SERVER			= 0x10;    /* 연계 서버3 */
+	public static final int INT_ID_CCAM_SERVER			= 0x11;    /* 교차로감시카메라 연계 서버 */
+	public static final int INT_ID_PARK_SERVER			= 0x12;    /* 주차장 연계 서버 */
+	public static final int INT_ID_RSE_SERVER			= 0x13;    /* RSE 연계 서버 */
+
+	public static final int INT_ID_MAIN_OPER 			= 0x21;    /* 통합운영단말 */
+	public static final int INT_ID_WALL_OPER 			= 0x22;    /* 상황판운영단말 */
+	public static final int INT_ID_SIG_OPER 			= 0x23;    /* 신호운영단말 */
+	public static final int INT_ID_VDS_OPER 			= 0x24;    /* VDS운영단말 */
+	public static final int INT_ID_AVI_OPER 			= 0x25;    /* AVI운영단말 */
+	public static final int INT_ID_DSRC_OPER 			= 0x26;    /* DSRC운영단말 */
+	public static final int INT_ID_VMS_OPER 			= 0x27;    /* VMS운영단말 */
+	public static final int INT_ID_CCTV_OPER 			= 0x28;    /* CCTV운영단말 */
+	public static final int INT_ID_FMS_OPER 			= 0x29;    /* 시설물관리단말 */
+	public static final int INT_ID_UTIS_OPER 			= 0x2A;    /* UTIS관리단말 */
+	public static final int INT_ID_WCAM_OPER 			= 0x2B;    /* WCAM관리단말 */
+
+	public static final int INT_ID_OPER_MIN 			= INT_ID_MAIN_OPER ;    /* 운영단말 최소값 */
+	public static final int INT_ID_OPER_MAX 			= INT_ID_WCAM_OPER ;    /* 운영단말 최대값 */
+
+	/*******************************************************************************
+	 * 프로토콜 OP CODE 정의
+	 *******************************************************************************/
+	public static final int INT_OP_TRAFFIC_CHANGE   	    = 0x01;        /* 교통정보가공완료 */
+	public static final int INT_OP_NODELINK_CHANGE          = 0x11;        /* 노드링크 기반 정보 변경 */
+	public static final int INT_OP_FACILITY_CHANGE          = 0x12;        /* 시설물 기반 정보 변경 */
+	public static final int INT_OP_PG_STATE_REQ             = 0x13;        /* 공통사항, 프로그램 동작상태 요청 */
+	public static final int INT_OP_PG_STATE_RES             = 0x14;        /* 공통사항, 프로그램 동작상태 응답 */
+	public static final int INT_OP_CHANGE_EVENT             = 0x15;        /* 공통사항, 마스터정보 변경 알림 */
+	public static final int INT_OP_INC_EVENT                = 0x16;        /* 공통사항, 돌발정보 전송 */
+	public static final int INT_OP_ACK                      = 0x17;        /* 공통사항, ACK(Acknowledge) */
+	public static final int INT_OP_NACK                     = 0x18;        /* 공통사항, NACK(Negative Acknowledge) */
+	public static final int INT_OP_EXTENSION                = 0x19;        /* 공통사항, EXTENSION 요청/응답 */
+
+	/*******************************************************************************
+	 * VMS Protocol OP Code
+	 *******************************************************************************/
+	public static final int INT_OP_VMS_STATE_REQ            = 0x21;        /* VMS 통신 서버, VMS 시설물 상태정보 요청 */
+	public static final int INT_OP_VMS_STATE_RES            = 0x22;        /* VMS 통신 서버, VMS 시설물 상태정보 전송 */
+	public static final int INT_OP_VMS_POWER_CTL            = 0x23;        /* VMS 통신 서버, VMS 전광판 On/Off 제어 */
+	public static final int INT_OP_VMS_LUMINANCE_CTL        = 0x24;        /* VMS 통신 서버, VMS 휘도 제어 */
+
+	public static final int INT_OP_VMS_PARA_SET             = 0x25;        /* VMS 통신 서버, VMS 환경설정정보 전송 */
+	public static final int INT_OP_VMS_PARA_SET2            = 0x26;        /* VMS 통신 서버, VMS 환경설정정보 전송(For Pasig) */
+	public static final int INT_OP_VMS_RESET                = 0x27;        /* VMS 통신 서버, VMS 제어기 리셋 전송 */
+	public static final int INT_OP_VMS_MODE                 = 0x28;        /* VMS 통신 서버, VMS 운영모드 변경 */
+	public static final int INT_OP_VMS_FORM_SAVE            = 0x29;        /* VMS 통신 서버, 실시간 VMS 제공정보 저장 */
+	//public static final int INT_OP_VMS_FORM_DOWNLOAD        = 0x2A;        /* VMS 통신 서버, 실시간 VMS 제공정보 저장 */
+	public static final int INT_OP_VMS_FORM_DOWNLOAD        = 0x0C;        /* VMS 통신 서버, 실시간 VMS 제공정보 저장 */
+	public static final int INT_OP_VMS_IMAGE_SYMBOL         = 0x2B;        /* VMS image symbol update */
+	public static final int INT_OP_VMS_IMAGE_TRAFFIC        = 0x2C;        /* VMS image traffic update */
+	public static final int INT_OP_VMS_FORM_UPDATE          = 0x2D;        /* VMS form update */
+	public static final int INT_OP_VMS_IFSC_UPDATE          = 0x2E;        /* VMS ifsc update */
+
+	public static final int INT_OP_VMS_PARAM_RES            = 0x2F;        /* VMS 통신 서버, VMS 시설물 파라미터정보 전송 ==> 목포시 싸인텔레콤 추가 */
+
+	public static final int INT_OP_VMS_COMMON               = 0x30;        /* 신규로 생성될 명령어를 처리하기 위한 OP Code */
+	public static final int INT_OP_VMS_COMMON_EXT           = 0x31;        /* 신규로 생성될 명령어를 처리하기 위한 OP Code */
+	public static final int INT_OP_VMS_IMMEDIATE_SCN_DNLD   = 0x32;        /* 폼 즉시 다운로드 OP Code */
+	public static final int INT_OP_VMS_PARAM_REQ            = 0x33;        /* VMS 파라미터 요청 */
+	public static final int INT_OP_VMS_DIRECT               = 0x34;        /* VMS Direct command */
+
+
+	/*******************************************************************************
+	 * VDS Protocol OP Code
+	 *******************************************************************************/
+	public static final int INT_OP_VDS_STOP_IMAGE_REQ       = 0x50;        /* VDS 통신 서버, VDS 정지영상 요청 */
+	public static final int INT_OP_VDS_STOP_IMAGE_RES       = 0x51;        /* VDS 통신 서버, VDS 정지영상 요청 응답 */
+	public static final int INT_OP_VDS_STATE_RES            = 0x52;        /* VDS 통신 서버, VDS 시설물 상태정보 전송 */
+	public static final int INT_OP_VDS_FAN_CONTROL          = 0x54;        /* VDS 통신 서버, VDS FAN 제어 */
+	public static final int INT_OP_VDS_RESET                = 0x59;        /* VDS 통신 서버, VDS 제어기 리셋 전송 */
+
+	public static final int INT_OP_VDS_PARAM_REQ            = 0x55;        /* VDS 통신 서버, VDS 파라미터 요청 */
+	public static final int INT_OP_VDS_PARAM_RES            = 0x56;        /* VDS 통신 서버, VDS 파라미터 요청 응답 */
+	public static final int INT_OP_VDS_PARAM_SET_REQ        = 0x57;        /* VDS 통신 서버, VDS 파라미터 설정 요청 */
+	public static final int INT_OP_VDS_PARAM_SET_RES        = 0x58;        /* VDS 통신 서버, VDS 파라미터 설정 요청 응답 */
+
+	/*******************************************************************************
+	 * DSRC Protocol OP Code
+	 *******************************************************************************/
+	public static final int INT_OP_DSRC_CONTROL_REQ         = 0x60;        /* DSRC 통신 서버, DSRC 제어 요청 */
+	public static final int INT_OP_DSRC_CONTROL_RES         = 0x61;        /* DSRC 통신 서버, DSRC 제어 요청 응답 */
+	public static final int INT_OP_DSRC_STATE_RES           = 0x62;        /* DSRC 통신 서버, DSRC 시설물 상태정보 전송 */
+
+
+	/*******************************************************************************
+	 * WCAM Protocol OP Code
+	 *******************************************************************************/
+	public static final int INT_OP_WCAM_STATE_RES           = 0x70;        /* WCAM 통신 서버, WCAM 시설물 상태정보 전송 */
+
+	/*******************************************************************************
+	 * CCAM Protocol OP Code
+	 *******************************************************************************/
+	public static final int INT_OP_CCAM_STATE_RES           = 0x71;        /* CCAM 통신 서버, CCAM 시설물 상태정보 전송 */
+
+	/*******************************************************************************
+	 * PARK Protocol OP Code
+	 *******************************************************************************/
+	public static final int INT_OP_PARK_STATE_RES           = 0x81;        /* PARK 통신 서버, PARK 시설물 상태정보 전송 */
+
+	/*******************************************************************************
+	 *  프로세스 상태정보 요청/응답
+	 *******************************************************************************/
+	public static final int INT_MAX_PG_STATE                = 146;        /* 최대 프로그램 상태정보 */
+
+	/*******************************************************************************
+	 *  돌발 발생 통보
+	 *******************************************************************************/
+	public static final int INT_INC_TIME_SIZE           = 14;        /* 'YYYYMMDDHH24MISS' */
+	public static final int INT_INC_ID_SIZE             = 12;        /* 돌발 ID */
+
+	/*******************************************************************************
+	 *  VMS
+	 *******************************************************************************/
+	public static final int INT_VMS_MAX_ID              = 15;       /* VMS 제어기 ID */
+	public static final int INT_VMS_MAX_MODULE          = 100;      /* 최대 VMS 모듈 */
+	public static final int INT_VMS_MAX_POWER           = 10;       /* 최대 VMS 전원 */
+	public static final int INT_VMS_MAX_MODULE_BIT      = 100;      /* 최대 VMS 모듈 비트*/
+	public static final int INT_VMS_MAX_POWER_BIT       = 10;       /* 최대 VMS 전원 비트 */
+
+	public static final int INT_VMS_MAX_DATETIME        = 14;       /* YYYYMMDDHHMMSS */
+	public static final int INT_VMS_MAX_OPER_ID         = 20;
+
+	public static final int INT_VMS_MAX_STATE           = 80;       /* 최대 VMS 시설물 상태정보 */
+	public static final int INT_VMS_MAX_PARAM           = 100;      /* 최대 VMS 파라미터 */
+	public static final int INT_VMS_MAX_PREP            = 93;       /* 최대 VMS 폼 갯수 정보 */
+	public static final int INT_VMS_MAX_FORM            = 40;       /* 최대 VMS 폼 */
+	public static final int INT_VMS_MAX_OBJECT          = 64;       /* 최대 VMS 오브젝트 */
+	public static final int INT_VMS_MAX_STRING_BUF      = 64;       /* 최대 VMS 문자열 데이터 버퍼 */
+	public static final int INT_VMS_MAX_FORM_DOWNLOAD   = 100;      /* 최대 VMS 제공정보 저장 (256개 까지 증가 가능) */
+	public static final int INT_VMS_MAX_MODE            = 100;      /* 최대 VMS 모드변경 갯수 */
+
+	/*******************************************************************************
+	 *  VDS
+	 *******************************************************************************/
+	public static final int INT_VDS_MAX_DATETIME        = 14;       /* YYYYMMDDHHMMSS */
+	public static final int INT_VDS_MAX_OPER_ID         = 20;
+	public static final int INT_VDS_MAX_STATE           = 200;      /* 최대 VDS 시설물 상태정보 */
+	public static final int INT_VDS_MAX_STOP_IMAGE      = 65535;    /* 정지영상 패킷 최대크기 */
+	/* VDS 정지영상 요청 응답 */
+	public static final int ERR_VDS_STOP_IMAGE_NORMAL           = 0x00;
+	public static final int ERR_VDS_STOP_IMAGE_NOT_CONNECT      = 0x01;
+	public static final int ERR_VDS_STOP_IMAGE_BUSY             = 0x02;
+	public static final int ERR_VDS_STOP_IMAGE_UNKNOWN_ID       = 0x03;
+	public static final int ERR_VDS_STOP_IMAGE_SIZE             = 0x04;
+
+	/*******************************************************************************
+	 *  DSRC
+	 *******************************************************************************/
+	public static final int INT_DSRC_MAX_DATETIME        = 14;      /* YYYYMMDDHHMMSS */
+	public static final int INT_DSRC_MAX_OPER_ID         = 20;
+	public static final int INT_DSRC_MAX_STATE           = 200;     /* 최대 DSRC 시설물 상태정보 */
+}

+ 137 - 0
src/main/java/com/its/api/xnetudp/protocol/CENTER_COMM_MESSAGE.java

@@ -0,0 +1,137 @@
+package com.its.api.xnetudp.protocol;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class CENTER_COMM_MESSAGE {
+
+	public static int CENTER_HEADER_SIZE = 8; 
+	
+	private InetSocketAddress sender;
+	
+	private byte sendId;
+	private byte recvId;
+	private byte totalFrame;
+	private byte currentFrame;
+	private byte msgSeq;
+	private byte opCode;
+	private int  length;		// 2byte unsigned short
+	
+	private byte[] body;
+
+	public CENTER_COMM_MESSAGE() {
+
+	}
+	public CENTER_COMM_MESSAGE(int senderId, int opCode, int length, byte msgSeq) {
+		this.sendId       = (byte)senderId;
+		this.recvId       = (byte) CENTER_COMM_DEFINE.INT_ID_MAIN_OPER;
+		this.totalFrame   = 1;
+		this.currentFrame = 1;
+		this.msgSeq       = msgSeq;
+		this.opCode       = (byte)opCode;
+		this.length       = length;
+	}
+	public CENTER_COMM_MESSAGE(byte[] buffer, InetSocketAddress inetSocketAddress) {
+
+		this.sender = inetSocketAddress;
+
+		ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
+		byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
+
+		this.sendId       = byteBuffer.get();
+		this.recvId       = byteBuffer.get();
+		this.totalFrame   = byteBuffer.get();
+		this.currentFrame = byteBuffer.get();
+		this.msgSeq       = byteBuffer.get();
+		this.opCode       = byteBuffer.get();
+		this.length       = byteBuffer.getInt();
+		this.length = byteBuffer.array().length - 10;
+		if (this.length > 0) {
+			this.body = new byte[this.length];
+			byteBuffer.get(this.body);
+		}
+	}
+
+	public String getSenderIp() {
+		return this.sender.getAddress().getHostAddress();
+	}
+	public int getSenderPort() {
+		return this.sender.getPort();
+	}
+	
+	public byte getOpCode() {
+		return this.opCode;
+	}
+	public void setOpCode(byte opCode) {
+		this.opCode = opCode;
+	}
+	
+	public int getLength() {
+		return this.length;
+	}
+	public void setLength(int length) {
+		this.length = length;
+	}
+	public InetSocketAddress getSender() {
+		return sender;
+	}
+
+	public void setSender(InetSocketAddress sender) {
+		this.sender = sender;
+	}
+
+	public byte getSendId() {
+		return sendId;
+	}
+
+	public void setSendId(byte sendId) {
+		this.sendId = sendId;
+	}
+
+	public byte getRecvId() {
+		return recvId;
+	}
+
+	public void setRecvId(byte recvId) {
+		this.recvId = recvId;
+	}
+
+	public byte getTotalFrame() {
+		return totalFrame;
+	}
+
+	public void setTotalFrame(byte totalFrame) {
+		this.totalFrame = totalFrame;
+	}
+
+	public byte getCurrentFrame() {
+		return currentFrame;
+	}
+
+	public void setCurrentFrame(byte currentFrame) {
+		this.currentFrame = currentFrame;
+	}
+
+	public byte getMsgSeq() {
+		return msgSeq;
+	}
+
+	public void setMsgSeq(byte msgSeq) {
+		this.msgSeq = msgSeq;
+	}
+
+	public byte[] getBody() {
+		return this.body;
+	}
+	public void setBody(byte[] body) {
+		this.body = body;
+	}
+	public byte[] getMessage() {
+		return null;
+	}
+
+	public ByteBuffer getBuffer() {
+		return null;
+	}
+}

+ 59 - 0
src/main/java/com/its/api/xnetudp/protocol/CENTER_PG_STATE_RES.java

@@ -0,0 +1,59 @@
+package com.its.api.xnetudp.protocol;
+
+import lombok.Data;
+import lombok.ToString;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+@Data
+@ToString
+public class CENTER_PG_STATE_RES extends CENTER_COMM_MESSAGE {
+
+	public static int STATE_RES_HEAD_SIZE = 5;
+	public static byte STATE_NORMAL = 0x00;
+	public static byte STATE_ERROR = 0x01;
+
+	private byte count;
+	private byte type;
+	private byte db;
+	private byte comm;
+	private byte action;
+
+	public CENTER_PG_STATE_RES(int senderId, byte db, byte comm, byte action) {
+
+		super(senderId, CENTER_COMM_DEFINE.INT_OP_PG_STATE_RES, 0, (byte)0x00);
+
+		this.count  = 0x01;
+		this.type   = (byte)senderId;
+		this.db     = db;
+		this.comm   = comm;
+		this.action = action;
+	}
+
+	@Override
+	public ByteBuffer getBuffer() {
+		int pktSize = STATE_RES_HEAD_SIZE;
+		setLength(pktSize);
+
+		ByteBuffer byteBuffer = ByteBuffer.allocate(CENTER_HEADER_SIZE + getLength());
+		byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
+
+		byteBuffer.put(getSendId());
+		byteBuffer.put(getRecvId());
+		byteBuffer.put(getTotalFrame());
+		byteBuffer.put(getCurrentFrame());
+		byteBuffer.put(getMsgSeq());
+		byteBuffer.put(getOpCode());
+		byteBuffer.putShort((short)getLength());
+
+		byteBuffer.put(this.count);
+		byteBuffer.put(this.type);
+		byteBuffer.put(this.db);
+		byteBuffer.put(this.comm);
+		byteBuffer.put(this.action);
+
+		return byteBuffer;
+	}
+
+}

+ 29 - 0
src/main/java/com/its/api/xnetudp/service/CenterCommResponseService.java

@@ -0,0 +1,29 @@
+package com.its.api.xnetudp.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+public class CenterCommResponseService {
+/*
+    public int response(voDsrcCtlr obj, voDsrcCtlrCntl ctlrCntl, byte result) {
+        int rspsType = (int)result;
+        ctlrCntl.setRSPS_TYPE(String.valueOf(rspsType));
+
+        // 제어명령 데이터베이스 저장
+        DbmsJobProcess dbmsJobProcess = (DbmsJobProcess) AppUtils.getBean(DbmsJobProcess.class);
+        dbmsJobProcess.add(new DbmsJobData(DbmsJobType.DATA_TYPE_CNTL_LOG, false, (Object)ctlrCntl));
+
+        // 제어명령 운영단말 응답
+        CENTER_DSRC_RES_CONTROL res = new CENTER_DSRC_RES_CONTROL(ctlrCntl.getReq().getMsgSeq());
+        res.setMsgSeq(ctlrCntl.getReq().getMsgSeq());
+        res.setCtlrNmbr(Integer.valueOf(ctlrCntl.getReq().getCtlrNmbr()).intValue());
+        res.setReqSeq(ctlrCntl.getReq().getReqSeq());
+        res.setResCode(result);
+        CenterCommClientSender response = (CenterCommClientSender) AppUtils.getBean(CenterCommClientSender.class);
+        response.run(ctlrCntl.getReqIpAddress(), ctlrCntl.getReqPort(), res.getBuffer());
+
+        return 0;
+    }*/
+}

+ 54 - 0
src/main/java/com/its/api/xnetudp/thread/CenterCommClientSender.java

@@ -0,0 +1,54 @@
+package com.its.api.xnetudp.thread;
+
+import com.its.api.xnetudp.codec.CenterCommClientEncoder;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Service;
+
+import java.nio.ByteBuffer;
+
+@Slf4j
+@Service
+public class CenterCommClientSender {
+
+    @Async("centerCommExecutor")
+    public void run(String ip, int port, ByteBuffer sendBuffer) {
+        if (sendBuffer == null) {
+            log.error("CenterCommClientSender.run: Notify Data NULL");
+            return;
+        }
+
+        EventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
+        try {
+            Bootstrap bootstrap = new Bootstrap();
+            bootstrap.channel(NioDatagramChannel.class);
+            bootstrap.group(nioEventLoopGroup);
+            bootstrap.option(ChannelOption.SO_BROADCAST, false);
+            bootstrap.handler(new ChannelInitializer<Channel>() {
+                @Override
+                protected void initChannel(Channel channel) throws Exception
+                {
+                    channel.pipeline().addLast(new CenterCommClientEncoder(ip, port));
+                }
+            });
+
+            Channel channel = bootstrap.bind(0).sync().channel();
+            ChannelFuture f = channel.writeAndFlush(sendBuffer).sync();
+            if (!f.isDone() || !f.isSuccess()) {
+                log.error("CenterCommClientSender.run: Send Failed. isDone: {}, isSuccess: {}", f.isDone(), f.isSuccess());
+            }
+        }
+        catch(Exception e) {
+            log.error("CenterCommClientSender.run: Exception: {}", e.toString());
+            e.printStackTrace();
+        }
+        finally {
+            nioEventLoopGroup.shutdownGracefully();
+            log.info("CenterCommClientSender.run: {} Bytes, ip: {}, port: {}", sendBuffer.array().length, ip, port);
+        }
+    }
+}

+ 141 - 0
src/main/java/com/its/api/xnetudp/thread/CenterCommServerReceiver.java

@@ -0,0 +1,141 @@
+package com.its.api.xnetudp.thread;
+
+import com.its.api.utils.SysUtils;
+import com.its.api.xnetudp.protocol.CENTER_COMM_DEFINE;
+import com.its.api.xnetudp.protocol.CENTER_COMM_MESSAGE;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Service;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+@Slf4j
+@Service
+public class CenterCommServerReceiver {
+
+    @Async("centerCommExecutor")
+    public void run(CENTER_COMM_MESSAGE data) {
+        if (data == null) {
+            log.error("CenterCommServerReceiver: RECV Data Packet NULL");
+            return;
+        }
+
+        if (CENTER_COMM_DEFINE.INT_ID_TRAFFIC_SERVER == data.getSendId()) {
+            if (CENTER_COMM_DEFINE.INT_OP_TRAFFIC_CHANGE == data.getOpCode()) {
+                log.error("RECV Traffic server job completed.");
+            }
+            return;
+        }
+
+        // VMS 통신 서버는 Sender, Receiver 위치가 바뀌었음.
+        if (CENTER_COMM_DEFINE.INT_ID_VMS_OPER == data.getSendId() && CENTER_COMM_DEFINE.INT_ID_VMS_SERVER == data.getRecvId()) {
+            log.error("RECV VMS Server Message: {}, Length: {} Bytes.", data.getOpCode(), data.getLength());
+            log.error("VMS PACKET: {}", SysUtils.byteArrayToHex(data.getBody()));
+            if (data.getLength() > 0) {
+                ByteBuffer byteBuffer = ByteBuffer.wrap(data.getBody());
+                byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
+
+                byte opCode = byteBuffer.get();
+                byte msgSeq = byteBuffer.get();
+                log.error("OP CODE: {}, MsgSeq: {}", opCode, msgSeq);
+                if (CENTER_COMM_DEFINE.INT_OP_VMS_FORM_DOWNLOAD == opCode) {
+                    byte count = byteBuffer.get();
+                    log.error("download count: {} EA", count);
+                    for (int ii = 0; ii < count; ii++) {
+                        byte[] vmsId = new byte[CENTER_COMM_DEFINE.INT_VMS_MAX_ID];
+                        byte[] downloadDt  = new byte[CENTER_COMM_DEFINE.INT_VMS_MAX_DATETIME];
+                        byteBuffer.get(vmsId);
+                        byteBuffer.get(downloadDt);
+                        byte result = byteBuffer.get();
+
+                        for (int jj = 0; jj < vmsId.length; jj++) {
+                            if (vmsId[jj] < '0' || vmsId[jj] > '9') {
+                                vmsId[jj] = ' ';
+                            }
+                        }
+                        log.error("{}, {}, {}, {}", ii+1, new String(vmsId).trim(), new String(downloadDt), result);
+
+                    }
+                }
+            }
+            //switch(data.getOpCode()) {
+//
+//            }
+        }
+
+        String reqIpAddr = data.getSenderIp();
+        int reqPort = 4603;
+        /*UnitSystService unitSystService = (UnitSystService) AppUtils.getBean(UnitSystService.class);
+        ConcurrentHashMap<String, voUnitSyst> untiSystMap = unitSystService.getUntiSystMap();
+        for (Map.Entry<String, voUnitSyst> e : untiSystMap.entrySet()) {
+            if (reqIpAddr.equals(e.getValue().getSYST_IP_1())) {
+                reqPort = e.getValue().getPRGM_PORT();
+                break;
+            }
+        }*/
+
+        /*byte opCode = data.getOpCode();
+        if (opCode == CENTER_COMM_DEFINE.INT_OP_DSRC_CONTROL_REQ) {
+
+            CENTER_DSRC_REQ_CONTROL req = new CENTER_DSRC_REQ_CONTROL(data.getBody());
+            log.info("DSRC REQ CONTROL: [{}] [{},{},{},{}], {}",
+                    req.getCtlrNmbr(), req.getOperId(), req.getCmdTime(), req.getDeviceType(), req.getControlType(), Thread.currentThread().getName());
+
+            int devcType = req.getDeviceType();
+            int cntlType = req.getControlType();
+            voDsrcCtlrCntl cntl = new voDsrcCtlrCntl();
+            cntl.setReq(req);
+            cntl.setReqIpAddress(data.getSender().getAddress().getHostAddress());
+            cntl.setReqPort(data.getSender().getPort());
+            cntl.setID(req.getCtlrNmbr());
+            cntl.setCNTL_DT(SysUtils.getSysTime());
+            cntl.setDEVC_TYPE(String.valueOf(devcType));
+            cntl.setCNTL_TYPE(String.valueOf(cntlType));
+
+            voDsrcCtlr dsrcCtlr = AppRepository.getInstance().getCtlrMap().get(String.valueOf(req.getCtlrNmbr()).toString());
+            if (dsrcCtlr == null) {
+                log.info("DSRC REQ CONTROL Unknown DSRC ID: [{}] [{},{},{},{}]",
+                        req.getCtlrNmbr(), req.getOperId(), req.getCmdTime(), req.getDeviceType(), req.getControlType());
+                responseOperator(null, cntl, (byte)0x01);
+                return;
+            }
+            if (dsrcCtlr.getChannel() == null || dsrcCtlr.getNetState() != NET.LOGINED) {
+                // 여기 들어오면 안된다. 운영단말에서 통신이 정상인 것만 제어를 내릴수 있도록 한다.
+                log.error("DSRC REQ CONTROL Not Connect: [{}] [{},{},{},{}]",
+                        req.getCtlrNmbr(), req.getOperId(), req.getCmdTime(), req.getDeviceType(), req.getControlType());
+                responseOperator(dsrcCtlr, cntl, (byte)0x01);
+                return;
+            }
+
+            // 제어요청 패킷을 만든다
+            eControlDeviceId controlDeviceId = eControlDeviceId.getByValue(devcType);
+            eControlCommand commandType = eControlCommand.getByValue(cntlType);
+            if (controlDeviceId == null || commandType == null) {
+                log.error("DSRC REQ CONTROL Value Error: [{}] [{},{},{},{}]",
+                        req.getCtlrNmbr(), req.getOperId(), req.getCmdTime(), req.getDeviceType(), req.getControlType());
+                responseOperator(dsrcCtlr, cntl, (byte)0x01);
+                return;
+            }
+            log.warn("{}, {}, controlDeviceId: {}, commandType: {}", dsrcCtlr.getID(), dsrcCtlr.getRSE_ID(), controlDeviceId.toString(), commandType.toString());
+            boolean res = ControlDeviceService.getInstance().requestSubscriptionDeviceCommand(true, dsrcCtlr, dsrcCtlr.getChannel(), controlDeviceId.getValue(), commandType.getValue());
+            if (res) {
+                log.info("DSRC REQ CONTROL SEND OK: [{}] [{},{},{},{}]",
+                        req.getCtlrNmbr(), req.getOperId(), req.getCmdTime(), req.getDeviceType(), req.getControlType());
+                responseOperator(dsrcCtlr, cntl, (byte)0x00);
+            } else {
+                log.error("DSRC REQ CONTROL SEND Error: [{}] [{},{},{},{}]",
+                        req.getCtlrNmbr(), req.getOperId(), req.getCmdTime(), req.getDeviceType(), req.getControlType());
+                responseOperator(dsrcCtlr, cntl, (byte)0x01);
+            }
+        }
+        else {
+            log.error("UDP RECV UNKNOWN MESSAGE: {}", opCode);
+        }*/
+    }
+/*
+    private int responseOperator(voDsrcCtlr obj, voDsrcCtlrCntl cntl, byte error) {
+        CenterCommResponseService responseService = (CenterCommResponseService) AppUtils.getBean(CenterCommResponseService.class);
+        return responseService.response(obj, cntl, error);
+    }*/
+}

+ 3 - 0
src/main/resources/application-dev.yml

@@ -5,6 +5,9 @@ spring:
       jdbc-url: jdbc:tibero:thin:@115.91.94.42:8629:tibero
       username: yiits
       password: yiits
+      minimumIdle: 5
+      maximumIdle: 10
+      idleTimeout: 30000
   jpa:
     database-platform: org.hibernate.dialect.Oracle10gDialect
     properties:

+ 2 - 0
src/main/resources/application.yml

@@ -5,6 +5,8 @@ application:
     center-id: L08
     region-code: 228
     region-name: 용인시
+  center-comm:
+    binding-port: 4602
 
 server:
   port: 80