فهرست منبع

update logging thread to asynclogger

shjung 4 روز پیش
والد
کامیت
44bab85e4b

+ 2 - 0
tsi-comm-server/src/main/java/com/tsi/comm/server/config/ApplicationConfig.java

@@ -19,6 +19,7 @@ import javax.annotation.PostConstruct;
 public class ApplicationConfig {
 
     private String processId = "tsi-comm-server";
+    private boolean useLoggingThread = false;
     private double cpuLimits = 75;
     private String bootingTime;
     private boolean startSchedule;
@@ -33,6 +34,7 @@ public class ApplicationConfig {
 
         log.info("[ApplicationConfig] -------------------------");
         log.info("[ApplicationConfig]         processId: {}", this.processId);
+        log.info("[ApplicationConfig]  useLoggingThread: {}", this.useLoggingThread);
         log.info("[ApplicationConfig]         cpuLimits: {}", this.cpuLimits);
     }
 

+ 7 - 0
tsi-comm-server/src/main/java/com/tsi/comm/server/controller/TsiCommServerRestController.java

@@ -217,6 +217,13 @@ public class TsiCommServerRestController {
 
         sb.append(" Logging Worker Queue Information.").append(sep);
 
+        if (!this.loggingProcess.isEnabled()) {
+            sb.append(heading).append(sep);
+            sb.append(" Logging Worker is disabled (useLoggingThread=false).").append(sep);
+            sb.append(heading).append(sep);
+            return sb.toString();
+        }
+
         List<AbstractTsiCvimWorker> loggingWorkerList = this.loggingProcess.getWorkerList();
         List<TsiCvimLoggingWorker> loggingWorkers = loggingWorkerList.stream()
                 .filter(worker -> worker instanceof TsiCvimLoggingWorker)

+ 37 - 1
tsi-comm-server/src/main/java/com/tsi/comm/server/process/logging/TsiCvimLoggingProcess.java

@@ -1,5 +1,6 @@
 package com.tsi.comm.server.process.logging;
 
+import com.tsi.comm.server.config.ApplicationConfig;
 import com.tsi.comm.server.config.TsiCvimServerConfig;
 import com.tsi.comm.server.process.AbstractTsiCvimProcess;
 import com.tsi.comm.server.process.AbstractTsiCvimWorker;
@@ -13,6 +14,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.slf4j.MDC;
 import org.springframework.stereotype.Service;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -24,12 +27,36 @@ import java.util.concurrent.TimeUnit;
 @Service
 public class TsiCvimLoggingProcess extends AbstractTsiCvimProcess {
 
+    private final ApplicationConfig config;
     private final TsiNodeManager nodeManager;
 
     private ExecutorService executor = null;
     private int qSize;
 
+    public boolean isEnabled() {
+        return this.config.isUseLoggingThread();
+    }
+
+    @Override
+    public List<AbstractTsiCvimWorker> getWorkerList() {
+        if (!isEnabled()) {
+            return Collections.emptyList(); // 비활성화 시 빈 리스트 반환
+        }
+        return super.getWorkerList();
+    }
+
+    public int getQSize() {
+        if (!isEnabled()) {
+            return 0; // 비활성화 시 0 반환
+        }
+        return this.qSize;
+    }
+
     public void start() {
+        if (!this.config.isUseLoggingThread()) {
+            return;
+        }
+
         ThreadGroup workerGroup = new ThreadGroup("loggingProcess");
         TsiCvimServerConfig tsiCvimServerConfig = SpringUtils.getBean(TsiCvimServerConfig.class);
 
@@ -45,7 +72,7 @@ public class TsiCvimLoggingProcess extends AbstractTsiCvimProcess {
             private int count = 0;
             public Thread newThread(Runnable r) {
                 Thread t = new Thread(workerGroup, r);
-                t.setName(String.format("logWorker-%02d.%02d", workers, ++count));
+                t.setName(String.format("loggingWorker-%02d.%02d", workers, ++count));
                 t.setDaemon(true);
                 return t;
             }
@@ -71,6 +98,9 @@ public class TsiCvimLoggingProcess extends AbstractTsiCvimProcess {
     }
 
     public boolean add(Object object, int idx) {
+        if (!this.config.isUseLoggingThread()) {
+            return false;
+        }
         boolean offer = false;
         AbstractTsiPacket packet = (AbstractTsiPacket)object;
         try {
@@ -91,6 +121,9 @@ public class TsiCvimLoggingProcess extends AbstractTsiCvimProcess {
 
     public void stop() {
         log.info("LoggingProcess Stopping...");
+        if (!this.config.isUseLoggingThread()) {
+            return;
+        }
 
         for (AbstractTsiCvimWorker worker : this.workerList) {
             try {
@@ -130,6 +163,9 @@ public class TsiCvimLoggingProcess extends AbstractTsiCvimProcess {
     }
 
     public void report() {
+        if (!this.config.isUseLoggingThread()) {
+            return;
+        }
         for (AbstractTsiCvimWorker worker : this.workerList) {
             worker.report();
         }

+ 4 - 2
tsi-comm-server/src/main/java/com/tsi/comm/server/process/packet/TsiCvimPacketProcess.java

@@ -1,5 +1,6 @@
 package com.tsi.comm.server.process.packet;
 
+import com.tsi.comm.server.config.ApplicationConfig;
 import com.tsi.comm.server.config.TsiCvimServerConfig;
 import com.tsi.comm.server.kafka.KafkaProducerService;
 import com.tsi.comm.server.process.AbstractTsiCvimProcess;
@@ -27,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 @Service
 public class TsiCvimPacketProcess extends AbstractTsiCvimProcess {
 
+    private final ApplicationConfig config;
     private final KafkaProducerService kafkaProducerService;
     private final TsiNodeManager nodeManager;
     private ExecutorService executor = null;
@@ -46,14 +48,14 @@ public class TsiCvimPacketProcess extends AbstractTsiCvimProcess {
             private int count = 0;
             public Thread newThread(@NotNull Runnable r) {
                 Thread t = new Thread(workerGroup, r);
-                t.setName(String.format("logWorker-%02d.%02d", workers, ++count));
+                t.setName(String.format("packetWorker-%02d.%02d", workers, ++count));
                 t.setDaemon(true);
                 return t;
             }
         });
 
         for (int ii = 0; ii < this.workers; ii++) {
-            TsiCvimPacketWorker packetWorker = new TsiCvimPacketWorker(ii, this.qSize, this.kafkaProducerService);
+            TsiCvimPacketWorker packetWorker = new TsiCvimPacketWorker(ii, this.qSize, this.kafkaProducerService, this.config.isUseLoggingThread());
             this.workerList.add(packetWorker);
             this.executor.submit(packetWorker); // 여기서 실행됨!
         }

+ 32 - 3
tsi-comm-server/src/main/java/com/tsi/comm/server/process/packet/TsiCvimPacketWorker.java

@@ -15,6 +15,8 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.slf4j.MDC;
 
+import java.util.Date;
+import java.text.SimpleDateFormat;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -30,11 +32,14 @@ public class TsiCvimPacketWorker extends AbstractTsiCvimWorker implements Runnab
     private final TsiCvimLoggingProcess loggingProcess;
     private final TraceConfig traceConfig;
     private final TsiCvimServerConfig serverConfig;
+    private final boolean useLoggingThread;
+    private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
-    public TsiCvimPacketWorker(int idx, int qSize, KafkaProducerService kafkaProducer) {
+    public TsiCvimPacketWorker(int idx, int qSize, KafkaProducerService kafkaProducer, boolean useLoggingThread) {
         this.idx = idx;
         this.qSize = qSize;
         this.kafkaProducer = kafkaProducer;
+        this.useLoggingThread = useLoggingThread;
         this.DATA_QUEUE = new LinkedBlockingQueue<>(qSize);
         this.loggingProcess = SpringUtils.getBean(TsiCvimLoggingProcess.class);
         this.traceConfig = SpringUtils.getBean(TraceConfig.class);
@@ -288,7 +293,12 @@ public class TsiCvimPacketWorker extends AbstractTsiCvimWorker implements Runnab
     private void logPacketIfNeeded(AbstractTsiPacket packet, TsiNodeVo nodeVo) {
         if (this.traceConfig.isNodeLogging() || nodeVo.isDump()) {
 
-            this.loggingProcess.add(packet, nodeVo.getLogQIdx());
+            if (this.useLoggingThread) {
+                this.loggingProcess.add(packet, nodeVo.getLogQIdx());
+            }
+            else {
+                logging(packet);
+            }
 
             if (packet instanceof TsiCpuPacket) {
                 TsiCpuPacket cpuPacket = (TsiCpuPacket) packet;
@@ -301,13 +311,32 @@ public class TsiCvimPacketWorker extends AbstractTsiCvimWorker implements Runnab
                         addNodePacket.setEnd(packet.getEnd());
                         addNodePacket.setAvg(packet.getAvg());
 
-                        this.loggingProcess.add(addNodePacket, nodeVo.getLogQIdx());
+                        if (this.useLoggingThread) {
+                            this.loggingProcess.add(addNodePacket, nodeVo.getLogQIdx());
+                        }
+                        else {
+                            logging(addNodePacket);
+                        }
                     }
                 }
             }
         }
     }
 
+    private void logging(AbstractTsiPacket packet) {
+        long job = packet.getEnd() - packet.getRcv();
+
+        Date date = new Date(packet.getTimespec().times() * 1000L);
+        String collectTime = this.sdf.format(date);
+        log.info("{} Node: {},        Job: {} {}, {} bytes, Average {}",
+                collectTime,
+                packet.getNodeId(),
+                TimeUtils.elapsedTimeStr(job),
+                Thread.currentThread().getName(),
+                packet.getPacketLength(),
+                TimeUtils.elapsedTimeStr(packet.getAvg()));
+    }
+
     public void report() {
         log.info("PacketWorker({}), Queue Total/Size/Remain: {}/{}/{}, Average: {}, {}",
                 this.idx,

+ 29 - 20
tsi-comm-server/src/main/java/com/tsi/comm/server/repository/TsiReportManager.java

@@ -1,8 +1,5 @@
 package com.tsi.comm.server.repository;
 
-import com.tsi.comm.server.xnet.NettyUtils;
-import com.tsi.comm.server.vo.mariadb.AbstractDbmsVo;
-import com.tsi.comm.server.vo.mariadb.AlarmOccrVo;
 import com.tsi.comm.server.process.AbstractTsiCvimWorker;
 import com.tsi.comm.server.process.dbms.TsiCvimDbmsProcess;
 import com.tsi.comm.server.process.dbms.TsiCvimDbmsWorker;
@@ -13,6 +10,9 @@ import com.tsi.comm.server.process.packet.TsiCvimPacketProcess;
 import com.tsi.comm.server.process.packet.TsiCvimPacketWorker;
 import com.tsi.comm.server.vo.TsiAlarmConfigVo;
 import com.tsi.comm.server.vo.TsiNodeVo;
+import com.tsi.comm.server.vo.mariadb.AbstractDbmsVo;
+import com.tsi.comm.server.vo.mariadb.AlarmOccrVo;
+import com.tsi.comm.server.xnet.NettyUtils;
 import com.tsi.common.utils.TimeUtils;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -20,7 +20,9 @@ import org.slf4j.MDC;
 import org.springframework.stereotype.Component;
 
 import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -185,25 +187,32 @@ public class TsiReportManager {
                 isTail = true;
             }
         }
-
-        List<AbstractTsiCvimWorker> loggingWorkerList = this.loggingProcess.getWorkerList();
-        remainingCapacity = isAlert ? 100 : this.loggingProcess.getQSize();
-        if (!isAlert) {
-            log.info("----- Logging Worker: {} EA, QSize: {} EA.", loggingWorkerList.size(), this.loggingProcess.getQSize());
+        if (this.loggingProcess.isEnabled()) {
+            List<AbstractTsiCvimWorker> loggingWorkerList = this.loggingProcess.getWorkerList();
+            remainingCapacity = isAlert ? 100 : this.loggingProcess.getQSize();
+            if (!isAlert) {
+                log.info("----- Logging Worker: {} EA, QSize: {} EA.", loggingWorkerList.size(), this.loggingProcess.getQSize());
+            }
+            List<TsiCvimLoggingWorker> loggingWorkers = loggingWorkerList.stream()
+                    .filter(worker -> worker instanceof TsiCvimLoggingWorker)
+                    .map(worker -> (TsiCvimLoggingWorker) worker)
+                    .collect(Collectors.toList());
+            for (TsiCvimLoggingWorker worker : loggingWorkers) {
+                if (worker.getDATA_QUEUE().remainingCapacity() <= remainingCapacity) {
+                    log.info("LoggingWorker({}), Total/Size/Remain: {}/{}/{}, Average: {}",
+                            worker.getIdx(),
+                            worker.getQSize(), worker.getDATA_QUEUE().size(), worker.getDATA_QUEUE().remainingCapacity(),
+                            TimeUtils.elapsedTimeStr(worker.getAvgTime()));
+                    isTail = true;
+                }
+            }
         }
-        List<TsiCvimLoggingWorker> loggingWorkers = loggingWorkerList.stream()
-                .filter(worker -> worker instanceof TsiCvimLoggingWorker)
-                .map(worker -> (TsiCvimLoggingWorker) worker)
-                .collect(Collectors.toList());
-        for (TsiCvimLoggingWorker worker : loggingWorkers) {
-            if (worker.getDATA_QUEUE().remainingCapacity() <= remainingCapacity) {
-                log.info("LoggingWorker({}), Total/Size/Remain: {}/{}/{}, Average: {}",
-                        worker.getIdx(),
-                        worker.getQSize(), worker.getDATA_QUEUE().size(), worker.getDATA_QUEUE().remainingCapacity(),
-                        TimeUtils.elapsedTimeStr(worker.getAvgTime()));
-                isTail = true;
+        else {
+            if (!isAlert) {
+                log.info(" Logging Worker is disabled (useLoggingThread=false).");
             }
         }
+
         if (isTail) {
             log.info("================================================================================================================");
         }

+ 1 - 0
tsi-comm-server/src/main/resources/application.yml

@@ -64,6 +64,7 @@ logging:
 # Application Configure
 application:
   process-id: tsi-comm-server
+  use-logging-thread: false
   thread-pool:
     pool-core: 1
   scheduling:

+ 23 - 4
tsi-comm-server/src/main/resources/logback-spring.xml

@@ -95,20 +95,39 @@
         </sift>
     </appender>
 
+    <appender name="ASYNC_PROCESS" class="ch.qos.logback.classic.AsyncAppender">
+        <!-- 큐 사이즈: 메모리가 허용하는 한 넉넉하게 (기본값 256) -->
+        <queueSize>10000</queueSize>
+
+        <!-- 큐가 80% 찼을 때 INFO, DEBUG, TRACE 레벨 로그는 버림 (WARN, ERROR는 유지) -->
+        <!-- 0으로 설정하면 절대 버리지 않음 (성능 저하 위험 감수) -->
+        <discardingThreshold>5</discardingThreshold>
+
+        <!-- 큐가 꽉 찼을 때 동작 설정 -->
+        <!-- true: 절대 대기하지 않음 (큐가 차면 로그를 버리고 즉시 리턴 -> 시스템 성능 보호, 로그 유실됨) -->
+        <!-- false: 대기함 (큐가 빌 때까지 스레드 멈춤 -> 로그 보존, 시스템 성능 저하 위험) -->
+        <neverBlock>true</neverBlock>
+
+        <!-- 호출자 데이터 포함 여부 (성능을 위해 false 권장) -->
+        <includeCallerData>false</includeCallerData>
+
+        <appender-ref ref="FILE_PROCESS" />
+    </appender>
+
     <logger name="com.tsi.comm.server.protocol" level="INFO" additivity="false">
-        <appender-ref ref="FILE_PROCESS"/>
+        <appender-ref ref="ASYNC_PROCESS"/>
         <appender-ref ref="FILE_ERROR"/>
     </logger>
     <logger name="com.tsi.comm.server.process.packet" level="INFO" additivity="false">
-        <appender-ref ref="FILE_PROCESS"/>
+        <appender-ref ref="ASYNC_PROCESS"/>
         <appender-ref ref="FILE_ERROR"/>
     </logger>
     <logger name="com.tsi.comm.server.process.logging" level="INFO" additivity="false">
-        <appender-ref ref="FILE_PROCESS"/>
+        <appender-ref ref="ASYNC_PROCESS"/>
         <appender-ref ref="FILE_ERROR"/>
     </logger>
     <logger name="com.tsi.comm.server.tcp.handler.MdcLoggingHandler" level="INFO" additivity="false">
-        <appender-ref ref="FILE_PROCESS"/>
+        <appender-ref ref="ASYNC_PROCESS"/>
         <appender-ref ref="FILE_ERROR"/>
     </logger>