shjung há 10 meses
pai
commit
1ddcf4af31

+ 2 - 1
src/main/java/com/ggits/comm/server/dto/IntStatusDto.java

@@ -10,12 +10,13 @@ import java.io.Serializable;
 //@Data
 //@Builder
 @ToString
+@Getter
 @NoArgsConstructor//(access = AccessLevel.PROTECTED)
 @AllArgsConstructor
 public class IntStatusDto implements Serializable {
     public static final long serialVersionUID = 1L;
 
-    public static final int MAX_KAFKA_DATA_SIZE = 51;
+    public static final int MAX_KAFKA_DATA_SIZE = 31;
 
     public String regionCd;                   	//	N	VARCHAR2(3)	N
     public int    intNo;                        //	N	NUMBER(6)	N

+ 10 - 6
src/main/java/com/ggits/comm/server/process/dbms/DbmsDataProcess.java

@@ -9,6 +9,7 @@ import com.ggits.comm.server.dto.IntStatusDto;
 import com.ggits.comm.server.dto.RegionCenter;
 import com.ggits.comm.server.entity.TbProcessState;
 import com.ggits.comm.server.entity.TbRegionCenterComm;
+import com.ggits.comm.server.kafka.KafkaProducerService;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
@@ -23,21 +24,17 @@ import java.util.concurrent.ThreadPoolExecutor;
 @Service
 public class DbmsDataProcess {
 
-    private final LinkedBlockingQueue<DbmsData> dbmsDataBlockingQueue = new LinkedBlockingQueue<>(1000);
+    private final LinkedBlockingQueue<DbmsData> dbmsDataBlockingQueue = new LinkedBlockingQueue<>(2000);
     private final ThreadPoolExecutor taskExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
 
     private final DbmsDataAsyncTask asyncTask;
+    private final KafkaProducerService kafkaProducer;
     private final ProcessMapper processMapper;
     private final RegionCenterMapper centerMapper;
     private final SigIntDao sigIntDao;
 
-    private int maxCore = Runtime.getRuntime().availableProcessors();
-
     public void run() {
         log.info("DbmsDataProcess.run: Start.");
-        if (this.maxCore < 8) {
-            this.maxCore = 8;
-        }
         ThreadPoolInitializer poolInitializer = SpringUtils.getBean(ThreadPoolInitializer.class);
         int executePool = poolInitializer.getDbms();
         for (int ii = 0; ii < executePool; ii++) {
@@ -94,6 +91,13 @@ public class DbmsDataProcess {
                         return;
                     }
                     List<IntStatusDto> intStatusLists = (List<IntStatusDto>)data.getData();
+                    intStatusLists.forEach(status -> {
+                        if (status.getNodeId() > 1000000000) {
+                            this.kafkaProducer.sendNode(Long.toString(status.getNodeId()), status.getKafkaData());
+                            this.kafkaProducer.sendSig(Long.toString(status.getNodeId()), status.getKafkaData());
+                        }
+                    });
+
                     cnt = this.sigIntDao.updateStatus(intStatusLists);
 //                    log.info("INT_STATUS_UPDATE: [{}, {}], {}",
 //                            data.getRegionCd(), data.getRegionId(),

+ 1 - 1
src/main/java/com/ggits/comm/server/process/work/GgitsPacketProcess.java

@@ -56,6 +56,7 @@ public class GgitsPacketProcess extends AbstractAppProcess {
             qSize = 1000;
         }
 
+//        qSize *= 2;
         qSize *= 4;
         qSize /= this.workers;
         for (int ii = 0; ii < this.workers; ii++) {
@@ -69,7 +70,6 @@ public class GgitsPacketProcess extends AbstractAppProcess {
         for (Thread worker : this.threadList) {
             worker.start();
         }
-
     }
 
     public void stop() {

+ 3 - 4
src/main/java/com/ggits/comm/server/process/work/GgitsPacketWorker.java

@@ -63,8 +63,7 @@ public class GgitsPacketWorker extends AbstractAppWorker implements Runnable {
             offer = this.DATA_QUEUE.offer(packet);
             if (!offer) {
                 MDC.put("id", Long.toString(packet.getLocalPort()));
-                log.error("Packet Queue.offer: [{}]({})/{}, Queue Full: {} EA.",
-                        packet.getLocalPort(), this.idx, this.DATA_QUEUE.size(), this.qSize);
+                log.error("Packet Queue.offer: [{}]({})/{}, Queue Full: {} EA.", packet.getLocalPort(), this.idx, this.DATA_QUEUE.size(), this.qSize);
                 MDC.clear();
             }
         } catch (Exception e) {
@@ -244,8 +243,8 @@ public class GgitsPacketWorker extends AbstractAppWorker implements Runnable {
                         status.setOperStatus(operStts);
                         status.setPhase(aRingCode, aRingStep, bRingCode, bRingStep, holdPhase);
                         status.setStatus(lcStts, holdPhase);
-                        this.kafkaProducer.sendNode(Long.toString(intDto.getNodeId()), status.getKafkaData());
-                        this.kafkaProducer.sendSig(Long.toString(intDto.getNodeId()), status.getKafkaData());
+//                        this.kafkaProducer.sendNode(Long.toString(intDto.getNodeId()), status.getKafkaData());
+//                        this.kafkaProducer.sendSig(Long.toString(intDto.getNodeId()), status.getKafkaData());
                     }
                 }
 

+ 0 - 1
src/main/resources/application.yml

@@ -48,7 +48,6 @@ application:
   kafka:
     bootstrap-servers: 192.168.11.23:9092
     group-id: ggits-comm-server
-    consumer-ack-config: 1
     enable-node: true
     enable-sig: true
     props: