shjung 10 月之前
父节点
当前提交
4a2efe869e

+ 4 - 0
src/main/java/com/sig/comm/server/SigCommServerApplication.java

@@ -3,6 +3,7 @@ package com.sig.comm.server;
 import com.sig.comm.server.common.SpringUtils;
 import com.sig.comm.server.config.TraceConfig;
 import com.sig.comm.server.process.dbms.DbmsDataProcess;
+import com.sig.comm.server.process.kafka.KafkaDataProcess;
 import com.sig.comm.server.repository.ApplicationRepository;
 import com.sig.comm.server.xnet.server.SigCommServerService;
 import com.sig.comm.server.xnet.server.process.work.DataPacketProcess;
@@ -57,6 +58,9 @@ public class SigCommServerApplication implements CommandLineRunner, ApplicationL
         DbmsDataProcess dbmsDataProcess = SpringUtils.getBean(DbmsDataProcess.class);
         dbmsDataProcess.run();
 
+        KafkaDataProcess kafkaDataProcess = SpringUtils.getBean(KafkaDataProcess.class);
+        kafkaDataProcess.run();
+
         DataPacketProcess dataPacketProcess = SpringUtils.getBean(DataPacketProcess.class);
         dataPacketProcess.run();
 

+ 11 - 0
src/main/java/com/sig/comm/server/config/ThreadPoolInitializer.java

@@ -23,6 +23,7 @@ public class ThreadPoolInitializer extends AsyncConfigurerSupport {
 
     private int work = 0;
     private int dbms = 0;
+    private int kafka = 0;
 
     @PostConstruct
     private void init() {
@@ -33,6 +34,9 @@ public class ThreadPoolInitializer extends AsyncConfigurerSupport {
         if (this.dbms <= 0) {
             this.dbms = MAX_CORE;
         }
+        if (this.kafka <= 0) {
+            this.kafka = MAX_CORE;
+        }
         log.info("{}", this);
     }
 
@@ -62,4 +66,11 @@ public class ThreadPoolInitializer extends AsyncConfigurerSupport {
         return threadPoolTaskExecutor;
     }
 
+    @Bean(name="kafkaDataExecutor")
+    public Executor getKafkaDataExecutor() {
+        ThreadPoolTaskExecutor threadPoolTaskExecutor = getDefaultExecutor(this.kafka*2);
+        threadPoolTaskExecutor.setThreadNamePrefix("kafka-pool-");
+        threadPoolTaskExecutor.initialize();
+        return threadPoolTaskExecutor;
+    }
 }

+ 54 - 1
src/main/java/com/sig/comm/server/dao/mapper/batch/SigCommServerDao.java

@@ -2,10 +2,12 @@ package com.sig.comm.server.dao.mapper.batch;
 
 import com.sig.app.common.utils.Elapsed;
 import com.sig.comm.server.dao.mapper.BatchDaoService;
+import com.sig.comm.server.dto.IntStatusDto;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.ibatis.session.SqlSessionFactory;
 import org.springframework.stereotype.Repository;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 
@@ -18,7 +20,58 @@ public class SigCommServerDao extends BatchDaoService {
         this.serviceName = "SigCommServerDao";
     }
 
-    public int updateIntPhaseStts(List<HashMap<String, Object>> req) {
+    public List<HashMap<String, Object>> getIntPhaseSttsList(List<IntStatusDto> req) {
+        List<HashMap<String, Object>> lists = new ArrayList<>();
+        req.forEach(obj -> {
+            HashMap<String, Object> param = new HashMap<>();
+
+            param.put("REGION_CD",                  obj.regionCd);
+            param.put("INT_NO",                     obj.intNo);
+            param.put("COLLCT_DTIME",               obj.commDt);
+            param.put("COMM_ON_OFF_FLAG",           obj.commStts);
+            param.put("CONTRLR_OPER_MODE_CD",       obj.operMode);
+            param.put("A_RING_PHASE_VAL",           obj.ringPhaseA);
+            param.put("B_RING_PHASE_VAL",           obj.ringPhaseB);
+            param.put("SIGLIGHT_TURNOFF_FLAG",      obj.turnOffFlag);
+            param.put("SIGLIGHT_BLINK_FLAG",        obj.blinking);
+            param.put("CONTRLR_MANUAL_FLAG",        obj.manualFlag);
+            param.put("MAP_NO",                     obj.mapNo);
+            param.put("CONTRLR_TMDIFF_CONTRL_FLAG",	obj.tmDiffFlag);
+            param.put("INT_SIG_CYCLE_CNT",          obj.count);
+            param.put("INT_SIG_CYCLE_LEN",         	obj.cycle);
+            param.put("A_RING_1_PHASE_VAL",         obj.phaseA1);
+            param.put("A_RING_2_PHASE_VAL",         obj.phaseA2);
+            param.put("A_RING_3_PHASE_VAL",         obj.phaseA3);
+            param.put("A_RING_4_PHASE_VAL",         obj.phaseA4);
+            param.put("A_RING_5_PHASE_VAL",         obj.phaseA5);
+            param.put("A_RING_6_PHASE_VAL",         obj.phaseA6);
+            param.put("A_RING_7_PHASE_VAL",         obj.phaseA7);
+            param.put("A_RING_8_PHASE_VAL",         obj.phaseA8);
+            param.put("B_RING_1_PHASE_VAL",         obj.phaseB1);
+            param.put("B_RING_2_PHASE_VAL",         obj.phaseB2);
+            param.put("B_RING_3_PHASE_VAL",         obj.phaseB3);
+            param.put("B_RING_4_PHASE_VAL",         obj.phaseB4);
+            param.put("B_RING_5_PHASE_VAL",         obj.phaseB5);
+            param.put("B_RING_6_PHASE_VAL",         obj.phaseB6);
+            param.put("B_RING_7_PHASE_VAL",         obj.phaseB7);
+            param.put("B_RING_8_PHASE_VAL",         obj.phaseB8);
+            param.put("PPC_CONTRL_FLAG",            obj.ppcControl);
+            param.put("GROUP_NO",                   obj.groupNo);
+
+            lists.add(param);
+        });
+        return lists;
+    }
+
+    public int updateIntPhaseStts(List<IntStatusDto> req) {
+//        log.info("{}.updateStatus: START. {} EA", this.serviceName, req.size());
+//        Elapsed elapsed = new Elapsed();
+        int total = updateBatch("updateIntPhaseStts", getIntPhaseSttsList(req));
+//        log.info("{}.updateStatus: ..END. {} EA. {} ms.", this.serviceName, total, elapsed.milliSeconds());
+        return total;
+    }
+
+    public int updateIntPhaseStts_OLD(List<HashMap<String, Object>> req) {
         log.info("{}.updateIntPhaseStts: START. {} EA", this.serviceName, req.size());
         Elapsed elapsed = new Elapsed();
         int total = updateBatch("updateIntPhaseStts", req);

+ 1 - 1
src/main/java/com/sig/comm/server/dto/IntDto.java

@@ -37,7 +37,7 @@ public class IntDto implements Serializable {
         this.debug = dto.isDebug();
         if (this.nodeId != dto.getNodeId()) {
             this.nodeId = dto.getNodeId();
-            this.status = new IntStatusDto(this.nodeId);
+            this.status = new IntStatusDto(this.regionCd, this.intNo, this.nodeId);
         }
     }
 }

+ 65 - 20
src/main/java/com/sig/comm/server/dto/IntStatusDto.java

@@ -18,7 +18,44 @@ public class IntStatusDto implements Serializable {
 
     public static final int MAX_KAFKA_DATA_SIZE = 31;
 
-    private long nodeId;
+    public String regionCd;
+    public int intNo;
+    public long nodeId;
+    public String commDt;
+    public int groupNo;
+    public int mode;
+    public int ringA;
+    public int ringB;
+    public int status;
+    public int count;
+    public int cycle;
+    public int phaseA1;
+    public int phaseA2;
+    public int phaseA3;
+    public int phaseA4;
+    public int phaseA5;
+    public int phaseA6;
+    public int phaseA7;
+    public int phaseA8;
+    public int phaseB1;
+    public int phaseB2;
+    public int phaseB3;
+    public int phaseB4;
+    public int phaseB5;
+    public int phaseB6;
+    public int phaseB7;
+    public int phaseB8;
+    public int commStts;
+    public int mapNo;
+    public int tmDiffFlag;
+    public int operMode;
+    public int ringPhaseA;
+    public int ringPhaseB;
+    public int ppcControl;
+    public int blinking;
+    public int turnOffFlag;
+    public int manualFlag;
+
     private byte[] kafkaData;
 
 
@@ -59,7 +96,9 @@ public class IntStatusDto implements Serializable {
 //운영 현시 운영현시(A Ring 8 BYTE + B Ring 8 BYTE), 버전정보가 1인 경우 정보가 유효함    16 BYTE
 
 
-    public IntStatusDto(long nodeId) {
+    public IntStatusDto(String regionCd, int intNo, long nodeId) {
+        this.regionCd = regionCd;
+        this.intNo = intNo;
         this.nodeId = nodeId;
         this.kafkaData = new byte[MAX_KAFKA_DATA_SIZE];
 
@@ -99,23 +138,29 @@ public class IntStatusDto implements Serializable {
         this.kafkaData[13] = (byte)(count & 0xFF);
         this.kafkaData[14] = (byte)(length & 0xFF);
     }
-    public void setPhaseVal(int a1, int a2, int a3, int a4, int a5, int a6, int a7, int a8, int b1, int b2, int b3, int b4, int b5, int b6, int b7, int b8) {
-        this.kafkaData[15] = (byte)(a1 & 0xFF);
-        this.kafkaData[16] = (byte)(a2 & 0xFF);
-        this.kafkaData[17] = (byte)(a3 & 0xFF);
-        this.kafkaData[18] = (byte)(a4 & 0xFF);
-        this.kafkaData[19] = (byte)(a5 & 0xFF);
-        this.kafkaData[20] = (byte)(a6 & 0xFF);
-        this.kafkaData[21] = (byte)(a7 & 0xFF);
-        this.kafkaData[22] = (byte)(a8 & 0xFF);
-
-        this.kafkaData[23] = (byte)(b1 & 0xFF);
-        this.kafkaData[24] = (byte)(b2 & 0xFF);
-        this.kafkaData[25] = (byte)(b3 & 0xFF);
-        this.kafkaData[26] = (byte)(b4 & 0xFF);
-        this.kafkaData[27] = (byte)(b5 & 0xFF);
-        this.kafkaData[28] = (byte)(b6 & 0xFF);
-        this.kafkaData[29] = (byte)(b7 & 0xFF);
-        this.kafkaData[30] = (byte)(b8 & 0xFF);
+    public void setPhaseVal(byte[] buffer, int phaseIdx) {
+        int idx = 15;
+        for (int ii = phaseIdx; ii < phaseIdx+16; ii++) {
+            this.kafkaData[idx++] = buffer[ii];
+        }
     }
+//    public void setPhaseVal(int a1, int a2, int a3, int a4, int a5, int a6, int a7, int a8, int b1, int b2, int b3, int b4, int b5, int b6, int b7, int b8) {
+//        this.kafkaData[15] = (byte)(a1 & 0xFF);
+//        this.kafkaData[16] = (byte)(a2 & 0xFF);
+//        this.kafkaData[17] = (byte)(a3 & 0xFF);
+//        this.kafkaData[18] = (byte)(a4 & 0xFF);
+//        this.kafkaData[19] = (byte)(a5 & 0xFF);
+//        this.kafkaData[20] = (byte)(a6 & 0xFF);
+//        this.kafkaData[21] = (byte)(a7 & 0xFF);
+//        this.kafkaData[22] = (byte)(a8 & 0xFF);
+//
+//        this.kafkaData[23] = (byte)(b1 & 0xFF);
+//        this.kafkaData[24] = (byte)(b2 & 0xFF);
+//        this.kafkaData[25] = (byte)(b3 & 0xFF);
+//        this.kafkaData[26] = (byte)(b4 & 0xFF);
+//        this.kafkaData[27] = (byte)(b5 & 0xFF);
+//        this.kafkaData[28] = (byte)(b6 & 0xFF);
+//        this.kafkaData[29] = (byte)(b7 & 0xFF);
+//        this.kafkaData[30] = (byte)(b8 & 0xFF);
+//    }
 }

+ 1 - 1
src/main/java/com/sig/comm/server/entity/TbInt.java

@@ -37,7 +37,7 @@ public class TbInt implements Serializable {
                 .groupNo(this.groupNo)
                 .nodeId(this.nodeId)
                 .debug(false)
-                .status(new IntStatusDto(this.nodeId))
+                .status(new IntStatusDto(this.regionCd, this.intNo, this.nodeId))
                 .build();
     }
 }

+ 5 - 2
src/main/java/com/sig/comm/server/process/dbms/DbmsDataProcess.java

@@ -6,6 +6,7 @@ import com.sig.comm.server.config.ThreadPoolInitializer;
 import com.sig.comm.server.dao.mapper.ProcessMapper;
 import com.sig.comm.server.dao.mapper.RegionCenterMapper;
 import com.sig.comm.server.dao.mapper.batch.SigCommServerDao;
+import com.sig.comm.server.dto.IntStatusDto;
 import com.sig.comm.server.dto.RegionCenter;
 import com.sig.comm.server.entity.TbProcessState;
 import com.sig.comm.server.entity.TbRegionCenterComm;
@@ -92,8 +93,10 @@ public class DbmsDataProcess {
             type = data.getType();
             switch(type) {
                 case DbmsData.DBMS_DATA_INT_PHASE_CHANGE:
-                    List<HashMap<String, Object>> changeLists = (List<HashMap<String, Object>>)data.getData();
-                    result = this.sigCommServerDao.updateIntPhaseStts(changeLists);
+//                    List<HashMap<String, Object>> changeLists = (List<HashMap<String, Object>>)data.getData();
+//                    result = this.sigCommServerDao.updateIntPhaseStts(changeLists);
+                    List<IntStatusDto> intStatusLists = (List<IntStatusDto>)data.getData();
+                    result = this.sigCommServerDao.updateIntPhaseStts(intStatusLists);
                     break;
                 case DbmsData.DBMS_DATA_INT_PHASE_CYCLE:
                     List<HashMap<String, Object>> cycleLists = (List<HashMap<String, Object>>)data.getData();

+ 17 - 0
src/main/java/com/sig/comm/server/process/kafka/KafkaDataAsyncTask.java

@@ -0,0 +1,17 @@
+package com.sig.comm.server.process.kafka;
+
+import com.sig.comm.server.process.dbms.DbmsData;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+public class KafkaDataAsyncTask {
+
+    @Async("kafkaDataExecutor")
+    public void run(KafkaDataProcess process, DbmsData data) {
+        process.runJob(data);
+    }
+
+}

+ 116 - 0
src/main/java/com/sig/comm/server/process/kafka/KafkaDataProcess.java

@@ -0,0 +1,116 @@
+package com.sig.comm.server.process.kafka;
+
+import com.sig.comm.server.common.SpringUtils;
+import com.sig.comm.server.config.ThreadPoolInitializer;
+import com.sig.comm.server.dto.IntStatusDto;
+import com.sig.comm.server.dto.RegionCenter;
+import com.sig.comm.server.kafka.KafkaProducerService;
+import com.sig.comm.server.process.dbms.DbmsData;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+
+@Slf4j
+@RequiredArgsConstructor
+@Service
+public class KafkaDataProcess {
+
+    private final LinkedBlockingQueue<DbmsData> kafkaDataBlockingQueue = new LinkedBlockingQueue<>(2000);
+    private final ThreadPoolExecutor taskExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
+
+    private final KafkaDataAsyncTask asyncTask;
+    private final KafkaProducerService kafkaProducer;
+
+    public void run() {
+        log.info("KafkaDataProcess.run: Start.");
+        ThreadPoolInitializer poolInitializer = SpringUtils.getBean(ThreadPoolInitializer.class);
+        int executePool = poolInitializer.getKafka();
+        for (int ii = 0; ii < executePool; ii++) {
+            log.info("KafkaDataProcess.Task: {}", ii);
+            this.taskExecutor.execute(() -> {
+                boolean isRunning = true;
+                while (isRunning) {
+                    try {
+                        DbmsData data = kafkaDataBlockingQueue.take();
+                        asyncTask.run(this, data);
+//                        runJob(data);
+                    }
+                    catch (Exception e) {
+                        log.error("KafkaDataProcess.Task: Exception: {}", e.getMessage(), e);
+                        Thread.currentThread().interrupt();
+                        isRunning = false;
+                    }
+                }
+            });
+        }
+
+        log.info("KafkaDataProcess.run: ..End.");
+    }
+
+    public void runJob(DbmsData data) {
+        process(data);
+    }
+
+    public void process(DbmsData data) {
+        try {
+            if (DbmsData.DBMS_DATA_INT_PHASE_CHANGE == data.getType()) {
+                RegionCenter center = data.getCenter();
+                if (center == null) {
+                    log.error("KafkaDataProcess.process: RegionCenter is null.");
+                    return;
+                }
+                List<IntStatusDto> intStatusLists = (List<IntStatusDto>)data.getData();
+                int count = 0;
+                for (IntStatusDto status : intStatusLists) {
+                    if (status.getNodeId() > 1000000000) {
+                        count++;
+                    }
+                }
+                if (count > 0) {
+                    ByteBuffer buffer = ByteBuffer.allocate((count * IntStatusDto.MAX_KAFKA_DATA_SIZE) + 2);
+                    buffer.order(ByteOrder.BIG_ENDIAN);
+                    buffer.putShort((short)count);
+                    for (IntStatusDto status : intStatusLists) {
+                        if (status.getNodeId() > 1000000000) {
+                            buffer.put(status.getKafkaData());
+                        }
+                    }
+                    this.kafkaProducer.sendSig(center.getRegionCd(), buffer.array());
+                }
+//                intStatusLists.forEach(status -> {
+//                    if (status.getNodeId() > 1000000000) {
+//                        this.kafkaProducer.sendNode(Long.toString(status.getNodeId()), status.getKafkaData());
+//                        this.kafkaProducer.sendSig(Long.toString(status.getNodeId()), status.getKafkaData());
+//                    }
+//                });
+            }
+        }
+        catch (Exception e) {
+            log.error("KafkaDataProcess.process: Exception: {}", e.toString());
+        }
+    }
+
+    /*
+     *  작업큐에 데이터 추가
+     */
+    public boolean add(DbmsData data) {
+        boolean offer = false;
+        try {
+            offer = kafkaDataBlockingQueue.offer(data);
+            if (!offer) {
+                log.error("KafkaDataProcess.add: Queue Full Error, Size: {} EA", kafkaDataBlockingQueue.size());
+            }
+        } catch (Exception e) {
+            log.error("KafkaDataProcess.add: Exception: {}", e.getMessage(), e);
+        }
+        return offer;
+    }
+
+}

+ 84 - 56
src/main/java/com/sig/comm/server/xnet/server/process/response/SigPhaseChange.java

@@ -2,9 +2,9 @@ package com.sig.comm.server.xnet.server.process.response;
 
 import com.sig.comm.server.dto.IntDto;
 import com.sig.comm.server.dto.IntStatusDto;
-import com.sig.comm.server.kafka.KafkaProducerService;
 import com.sig.comm.server.process.dbms.DbmsData;
 import com.sig.comm.server.process.dbms.DbmsDataProcess;
+import com.sig.comm.server.process.kafka.KafkaDataProcess;
 import com.sig.comm.server.xnet.server.process.protocol.SigProtocolConst;
 import lombok.extern.slf4j.Slf4j;
 import org.slf4j.MDC;
@@ -16,11 +16,11 @@ import java.util.List;
 @Slf4j
 public class SigPhaseChange implements SigCommResponse {
     private final DbmsDataProcess dbmsDataProcess;
-    private final KafkaProducerService kafkaProducer;
+    private final KafkaDataProcess kafkaDataProcess;
 
-    public SigPhaseChange(DbmsDataProcess dbmsDataProcess, KafkaProducerService kafkaProducer) {
+    public SigPhaseChange(DbmsDataProcess dbmsDataProcess, KafkaDataProcess kafkaDataProcess) {
         this.dbmsDataProcess = dbmsDataProcess;
-        this.kafkaProducer = kafkaProducer;
+        this.kafkaDataProcess = kafkaDataProcess;
     }
 
     @Override
@@ -55,8 +55,11 @@ public class SigPhaseChange implements SigCommResponse {
 //            } pkt_phase_change, *pkt_phase_changep;
 
             long unixTimestamp = System.currentTimeMillis() / 1000L; // 초 단위로 변환
+            int phaseIdx;
+
+            List<IntStatusDto> statusLists = new ArrayList<>();
+//            List<HashMap<String, Object>> lists = new ArrayList<>();
 
-            List<HashMap<String, Object>> lists = new ArrayList<>();
             String regionCd = packet.getCenter().getRegionCd();
             String commDt = packet.getPacket().getCommDate();
             int idx = SigProtocolConst.SIG_HEAD_SIZE;
@@ -71,6 +74,7 @@ public class SigPhaseChange implements SigCommResponse {
                 int status  = (buffer[idx++] & 0xFF);
                 int count   = (buffer[idx++] & 0xFF);
                 int cycle   = (buffer[idx++] & 0xFF);
+                phaseIdx = idx;
                 int phaseA1 = (buffer[idx++] & 0xFF);
                 int phaseA2 = (buffer[idx++] & 0xFF);
                 int phaseA3 = (buffer[idx++] & 0xFF);
@@ -109,60 +113,84 @@ public class SigPhaseChange implements SigCommResponse {
                 int turnOffFlag = ((status&0x04)>0) ? 0x01 : 0x00;       /* 소등 상태, 1: 소등, 0: 정상 */
                 int manualFlag  = ((status&0xF0)>0) ? 0x01 : 0x00;
 
-                param.put("REGION_CD",                  regionCd);
-                param.put("INT_NO",                     intNo);
-                param.put("COLLCT_DTIME",               commDt);
-                param.put("COMM_ON_OFF_FLAG",           commStts);
-                param.put("CONTRLR_OPER_MODE_CD",       operMode);
-                param.put("A_RING_PHASE_VAL",           ringPhaseA);
-                param.put("B_RING_PHASE_VAL",           ringPhaseB);
-                param.put("SIGLIGHT_TURNOFF_FLAG",      turnOffFlag);
-                param.put("SIGLIGHT_BLINK_FLAG",        blinking);
-                param.put("CONTRLR_MANUAL_FLAG",        manualFlag);
-                param.put("MAP_NO",                     mapNo);
-                param.put("CONTRLR_TMDIFF_CONTRL_FLAG",	tmDiffFlag);
-                param.put("INT_SIG_CYCLE_CNT",          count);
-                param.put("INT_SIG_CYCLE_LEN",         	cycle);
-                param.put("A_RING_1_PHASE_VAL",         phaseA1);
-                param.put("A_RING_2_PHASE_VAL",         phaseA2);
-                param.put("A_RING_3_PHASE_VAL",         phaseA3);
-                param.put("A_RING_4_PHASE_VAL",         phaseA4);
-                param.put("A_RING_5_PHASE_VAL",         phaseA5);
-                param.put("A_RING_6_PHASE_VAL",         phaseA6);
-                param.put("A_RING_7_PHASE_VAL",         phaseA7);
-                param.put("A_RING_8_PHASE_VAL",         phaseA8);
-                param.put("B_RING_1_PHASE_VAL",         phaseB1);
-                param.put("B_RING_2_PHASE_VAL",         phaseB2);
-                param.put("B_RING_3_PHASE_VAL",         phaseB3);
-                param.put("B_RING_4_PHASE_VAL",         phaseB4);
-                param.put("B_RING_5_PHASE_VAL",         phaseB5);
-                param.put("B_RING_6_PHASE_VAL",         phaseB6);
-                param.put("B_RING_7_PHASE_VAL",         phaseB7);
-                param.put("B_RING_8_PHASE_VAL",         phaseB8);
-                param.put("PPC_CONTRL_FLAG",            ppcControl);
-                param.put("GROUP_NO",                   groupNo);
-
-                lists.add(param);
-
                 IntDto intDto = packet.getCenter().getIntMap().get(intNo);
-                if (intDto != null && intDto.getNodeId() > 1000000000) {
-                    IntStatusDto intStatus = intDto.getStatus();
-                    if (intStatus != null) {
-                        intStatus.initStatus(unixTimestamp);
-                        intStatus.setOperStatus(mode);
-                        intStatus.setPhase(ringA, ringB);
-                        intStatus.setStatus(status);
-                        intStatus.setCycle(count, cycle);
-                        intStatus.setPhaseVal(phaseA1, phaseA2, phaseA3, phaseA4, phaseA5, phaseA6, phaseA7, phaseA8,
-                                phaseB1, phaseB2, phaseB3, phaseB4, phaseB5, phaseB6, phaseB7, phaseB8);
-
-                        this.kafkaProducer.sendNode(Long.toString(intDto.getNodeId()), intStatus.getKafkaData());
-                        this.kafkaProducer.sendSig(Long.toString(intDto.getNodeId()), intStatus.getKafkaData());
-                    }
+                if (intDto == null) {
+                    log.warn("[{}], SigPhaseChange.response: Not Found IntNo: {}", packet.getCenter().getLogKey(), intNo);
+                    intDto = IntDto.builder()
+                            .isDeleted(false)
+                            .regionCd(packet.getCenter().getRegionCd())
+                            .intNo(intNo)
+                            .intType(0)
+                            .intLcType(1)
+                            .intLampType(3)
+                            .mainIntNo(0)
+                            .groupNo(0)
+                            .nodeId(0L)
+                            .debug(false)
+                            .status(new IntStatusDto(packet.getCenter().getRegionCd(), intNo, 0))
+                            .build();
+                    packet.getCenter().getIntMap().put(intNo, intDto);
                 }
-            }
 
-            this.dbmsDataProcess.add(new DbmsData(DbmsData.DBMS_DATA_INT_PHASE_CHANGE, packet.getCenter(), false, lists));
+                IntStatusDto intStatus = intDto.getStatus();
+                intStatus.initStatus(unixTimestamp);
+
+//                param.put("REGION_CD",                  regionCd);
+//                param.put("INT_NO",                     intNo);
+//                param.put("COLLCT_DTIME",               commDt);
+//                param.put("COMM_ON_OFF_FLAG",           commStts);
+//                param.put("CONTRLR_OPER_MODE_CD",       operMode);
+//                param.put("A_RING_PHASE_VAL",           ringPhaseA);
+//                param.put("B_RING_PHASE_VAL",           ringPhaseB);
+//                param.put("SIGLIGHT_TURNOFF_FLAG",      turnOffFlag);
+//                param.put("SIGLIGHT_BLINK_FLAG",        blinking);
+//                param.put("CONTRLR_MANUAL_FLAG",        manualFlag);
+//                param.put("MAP_NO",                     mapNo);
+//                param.put("CONTRLR_TMDIFF_CONTRL_FLAG",	tmDiffFlag);
+//                param.put("INT_SIG_CYCLE_CNT",          count);
+//                param.put("INT_SIG_CYCLE_LEN",         	cycle);
+//                param.put("A_RING_1_PHASE_VAL",         phaseA1);
+//                param.put("A_RING_2_PHASE_VAL",         phaseA2);
+//                param.put("A_RING_3_PHASE_VAL",         phaseA3);
+//                param.put("A_RING_4_PHASE_VAL",         phaseA4);
+//                param.put("A_RING_5_PHASE_VAL",         phaseA5);
+//                param.put("A_RING_6_PHASE_VAL",         phaseA6);
+//                param.put("A_RING_7_PHASE_VAL",         phaseA7);
+//                param.put("A_RING_8_PHASE_VAL",         phaseA8);
+//                param.put("B_RING_1_PHASE_VAL",         phaseB1);
+//                param.put("B_RING_2_PHASE_VAL",         phaseB2);
+//                param.put("B_RING_3_PHASE_VAL",         phaseB3);
+//                param.put("B_RING_4_PHASE_VAL",         phaseB4);
+//                param.put("B_RING_5_PHASE_VAL",         phaseB5);
+//                param.put("B_RING_6_PHASE_VAL",         phaseB6);
+//                param.put("B_RING_7_PHASE_VAL",         phaseB7);
+//                param.put("B_RING_8_PHASE_VAL",         phaseB8);
+//                param.put("PPC_CONTRL_FLAG",            ppcControl);
+//                param.put("GROUP_NO",                   groupNo);
+//
+//                lists.add(param);
+
+                if (intDto.getNodeId() > 1000000000) {
+                    intStatus.initStatus(unixTimestamp);
+                    intStatus.setOperStatus(mode);
+                    intStatus.setPhase(ringA, ringB);
+                    intStatus.setStatus(status);
+                    intStatus.setCycle(count, cycle);
+                    intStatus.setPhaseVal(buffer, phaseIdx);
+
+//                    intStatus.setPhaseVal(phaseA1, phaseA2, phaseA3, phaseA4, phaseA5, phaseA6, phaseA7, phaseA8,
+//                            phaseB1, phaseB2, phaseB3, phaseB4, phaseB5, phaseB6, phaseB7, phaseB8);
+//
+//                    this.kafkaProducer.sendNode(Long.toString(intDto.getNodeId()), intStatus.getKafkaData());
+//                    this.kafkaProducer.sendSig(Long.toString(intDto.getNodeId()), intStatus.getKafkaData());
+                }
+
+                statusLists.add(intStatus);
+            }
+            if (!statusLists.isEmpty()) {
+                this.kafkaDataProcess.add(new DbmsData(DbmsData.DBMS_DATA_INT_PHASE_CHANGE, packet.getCenter(), false, statusLists));
+                this.dbmsDataProcess.add(new DbmsData(DbmsData.DBMS_DATA_INT_PHASE_CHANGE, packet.getCenter(), false, statusLists));
+            }
         }
         catch (Exception e) {
             log.error("[{}], SigPhaseChange.response: Exception. will be closed. {}", packet.getCenter().getLogKey(), e.getMessage());

+ 4 - 4
src/main/java/com/sig/comm/server/xnet/server/process/response/SigPhaseCycle.java

@@ -1,8 +1,8 @@
 package com.sig.comm.server.xnet.server.process.response;
 
-import com.sig.comm.server.kafka.KafkaProducerService;
 import com.sig.comm.server.process.dbms.DbmsData;
 import com.sig.comm.server.process.dbms.DbmsDataProcess;
+import com.sig.comm.server.process.kafka.KafkaDataProcess;
 import com.sig.comm.server.xnet.server.process.protocol.SigProtocolConst;
 import lombok.extern.slf4j.Slf4j;
 import org.slf4j.MDC;
@@ -14,11 +14,11 @@ import java.util.List;
 @Slf4j
 public class SigPhaseCycle implements SigCommResponse {
     private final DbmsDataProcess dbmsDataProcess;
-    private final KafkaProducerService kafkaProducer;
+    private final KafkaDataProcess kafkaDataProcess;
 
-    public SigPhaseCycle(DbmsDataProcess dbmsDataProcess, KafkaProducerService kafkaProducer) {
+    public SigPhaseCycle(DbmsDataProcess dbmsDataProcess, KafkaDataProcess kafkaDataProcess) {
         this.dbmsDataProcess = dbmsDataProcess;
-        this.kafkaProducer = kafkaProducer;
+        this.kafkaDataProcess = kafkaDataProcess;
     }
 
     @Override

+ 4 - 4
src/main/java/com/sig/comm/server/xnet/server/process/work/DataPacketProcess.java

@@ -3,8 +3,8 @@ package com.sig.comm.server.xnet.server.process.work;
 import com.sig.comm.server.common.SpringUtils;
 import com.sig.comm.server.config.ThreadPoolInitializer;
 import com.sig.comm.server.dto.RegionCenter;
-import com.sig.comm.server.kafka.KafkaProducerService;
 import com.sig.comm.server.process.dbms.DbmsDataProcess;
+import com.sig.comm.server.process.kafka.KafkaDataProcess;
 import com.sig.comm.server.repository.ApplicationRepository;
 import com.sig.comm.server.xnet.server.process.protocol.eSigOpCode;
 import com.sig.comm.server.xnet.server.process.response.*;
@@ -28,7 +28,7 @@ public class DataPacketProcess {
 
     private final DataPacketAsyncTask asyncTask;
     private final DbmsDataProcess dbmsDataProcess;
-    private final KafkaProducerService kafkaProducer;
+    private final KafkaDataProcess kafkaDataProcess;
 
     public void run() {
         log.info("DataPacketProcess.run: Start.");
@@ -67,10 +67,10 @@ public class DataPacketProcess {
                     response = new SigLogin(this.dbmsDataProcess);
                     break;
                 case SIG_PHASE_CHANGE:
-                    response = new SigPhaseChange(this.dbmsDataProcess, this.kafkaProducer);
+                    response = new SigPhaseChange(this.dbmsDataProcess, this.kafkaDataProcess);
                     break;
                 case SIG_STATIC_CYCLE:
-                    response = new SigPhaseCycle(this.dbmsDataProcess, this.kafkaProducer);
+                    response = new SigPhaseCycle(this.dbmsDataProcess, this.kafkaDataProcess);
                     break;
                 case SIG_EVENT_INFO:
                     response = new SigEventInfo(this.dbmsDataProcess);

+ 1 - 0
src/main/resources/application.yml

@@ -51,6 +51,7 @@ application:
   thread-pool:
     dbms: 10
     work: 5
+    kafka: 0
 
   kafka:
     bootstrap-servers: 192.168.11.23:9092