Browse Source

cluster library update logging

HANTE 1 month ago
parent
commit
c29698966c

+ 0 - 1
.idea/vcs.xml

@@ -2,6 +2,5 @@
 <project version="4">
   <component name="VcsDirectoryMappings">
     <mapping directory="$PROJECT_DIR$" vcs="Git" />
-    <mapping directory="$PROJECT_DIR$/its-common" vcs="Git" />
   </component>
 </project>

+ 1 - 1
its-cluster/src/main/java/com/its/common/cluster/codec/HaClusterMessageDecoder.java

@@ -24,7 +24,7 @@ public class HaClusterMessageDecoder extends MessageToMessageDecoder<ByteBuf> {
         // 패킷 길이(4바이트)를 따로 저장
         int packetLength = in.readInt();
         if (this.isLogging) {
-            log.info("HaClusterMessageDecoder.decode: packetLength: {}", packetLength);
+            log.info("HaClusterMessageDecoder.decode: RECV {} Bytes.", packetLength + 4);
         }
 
         // 남은 데이터 부분 읽기

+ 7 - 0
its-cluster/src/main/java/com/its/common/cluster/codec/HaClusterMessageEncoder.java

@@ -4,10 +4,12 @@ import com.its.common.cluster.vo.HaClusterMessage;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToByteEncoder;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectOutputStream;
 
+@Slf4j
 public class HaClusterMessageEncoder extends MessageToByteEncoder<HaClusterMessage> {
 
     private final boolean isLogging;
@@ -22,8 +24,13 @@ public class HaClusterMessageEncoder extends MessageToByteEncoder<HaClusterMessa
         ObjectOutputStream oos = new ObjectOutputStream(bos);
         oos.writeObject(msg);
         oos.flush();
+
         byte[] bytes = bos.toByteArray();
 
+        if (this.isLogging) {
+            log.info("HaClusterMessageEncoder.encode: SEND {} Bytes.", bytes.length + 4);
+        }
+
         out.writeInt(bytes.length); // 길이 필드 추가
         out.writeBytes(bytes);
 

+ 1 - 1
its-cluster/src/main/java/com/its/common/cluster/handler/HaClusterSlaveHandler.java

@@ -16,7 +16,7 @@ public class HaClusterSlaveHandler extends ChannelInboundHandlerAdapter {
     private final AbstractHaClusterSlaveService slaveService;
     private final HaInfo cluster;
 
-    private ScheduledFuture<?> future;
+    private ScheduledFuture<?> future = null;
 
     @Override
     public void channelActive(final ChannelHandlerContext ctx) {

+ 8 - 6
its-cluster/src/main/java/com/its/common/cluster/service/AbstractHaClusterMasterService.java

@@ -124,11 +124,13 @@ public abstract class AbstractHaClusterMasterService {
             else {
                 this.clusterConfig.setMaster(false);
             }
-            log.info("ClusterMasterService:electionMasterSchedule: serverId: {}, Master: {}.",
-                    this.clusterConfig.getServerId(), this.clusterConfig.isMaster());
 
-            election(this.clusterConfig.getServerId(), this.clusterConfig.isMaster());
+            if (this.clusterConfig.isLogging()) {
+                log.info("ClusterMasterService:electionMasterSchedule: serverId: {}, Master: {}.",
+                        this.clusterConfig.getServerId(), this.clusterConfig.isMaster());
+            }
 
+            election(this.clusterConfig.getServerId(), this.clusterConfig.isMaster());
         }, 2 * 1000L);
     }
 
@@ -144,7 +146,7 @@ public abstract class AbstractHaClusterMasterService {
             }
         }
         catch (Exception e) {
-            log.info("ClusterMasterService.acceptGroup.shutdownGracefully");
+            log.error("ClusterMasterService.acceptGroup.shutdownGracefully");
         }
         try {
             if (this.workerGroup != null) {
@@ -152,7 +154,7 @@ public abstract class AbstractHaClusterMasterService {
             }
         }
         catch (Exception e) {
-            log.info("ClusterMasterService.workerGroup.shutdownGracefully");
+            log.error("ClusterMasterService.workerGroup.shutdownGracefully");
         }
         try {
             if (this.channelFuture != null && this.channelFuture.channel() != null) {
@@ -160,7 +162,7 @@ public abstract class AbstractHaClusterMasterService {
             }
         }
         catch (Exception e) {
-            log.info("ClusterMasterService.closeFuture");
+            log.error("ClusterMasterService.closeFuture");
         }
     }
 }

+ 28 - 25
its-cluster/src/main/java/com/its/common/cluster/service/AbstractHaClusterSlaveService.java

@@ -39,11 +39,11 @@ public abstract class AbstractHaClusterSlaveService {
     }
 
     public void start() throws Exception {
-        log.info("ClusterSlaveService.run: Start.");
+        if (this.clusterConfig.isLogging()) {
+            log.info("ClusterSlaveService.run: Start.");
+        }
 
-        /**
-         * Cluster 접속
-         */
+        // 모든 HaCluster 접속
         for (Map.Entry<Integer, HaInfo> entry : this.clusterConfig.getClusterMap().entrySet()) {
             HaInfo cluster = entry.getValue();
             if (cluster.getServerId() == this.clusterConfig.getServerId()) {
@@ -55,7 +55,9 @@ public abstract class AbstractHaClusterSlaveService {
 
         try {
             List<Future<Object>> futures = this.executorService.invokeAll(this.clientTasks);
-            log.info("ClusterSlaveService.run: futures, {} EA.", (long)futures.size());
+            if (this.clusterConfig.isLogging()) {
+                log.info("ClusterSlaveService.run: futures, {} EA.", (long) futures.size());
+            }
 
             dataSyncSchedule();
         }
@@ -63,7 +65,9 @@ public abstract class AbstractHaClusterSlaveService {
             log.error("ClusterSlaveService.run: Exception: InterruptedException");
             Thread.currentThread().interrupt();
         }
-        log.info("ClusterSlaveService.run: ..End.");
+        if (this.clusterConfig.isLogging()) {
+            log.info("ClusterSlaveService.run: ..End.");
+        }
     }
 
     public void shutdown() {
@@ -95,7 +99,7 @@ public abstract class AbstractHaClusterSlaveService {
                 channel.close();
             }
         } catch (Exception e) {
-            log.info("ClusterSlaveService.channelClose");
+            log.error("ClusterSlaveService.channelClose");
         }
     }
     private String getSysTime() {
@@ -107,17 +111,7 @@ public abstract class AbstractHaClusterSlaveService {
     public abstract List<HaClusterMessageData> getClusterMessageData();
 
     private HaClusterMessage getClusterMessage() {
-//        List<HaClusterMessageData> details = new ArrayList<>();
         List<HaClusterMessageData> details = getClusterMessageData();
-//        List<String> keySet = new ArrayList<>(ApplicationRepository.CENTER_MAP.keySet());
-//        Collections.sort(keySet);
-//        for (String key : keySet) {
-//            CenterDto region = ApplicationRepository.CENTER_MAP.get(key);
-//            if (region == null) {
-//                continue;
-//            }
-////            details.add(region.getClusterData());
-//        }
         return HaClusterMessage.builder()
                 .serverId(this.clusterConfig.getServerId())
                 .master(this.clusterConfig.isMaster())
@@ -127,7 +121,9 @@ public abstract class AbstractHaClusterSlaveService {
     }
 
     private void dataSyncSchedule() {
-        log.info("ClusterSlaveService:dataSyncSchedule: {} seconds.", this.clusterConfig.getSyncSeconds());
+        if (this.clusterConfig.isLogging()) {
+            log.info("ClusterSlaveService:dataSyncSchedule: {} seconds.", this.clusterConfig.getSyncSeconds());
+        }
         this.taskFuture = this.taskScheduler.scheduleAtFixedRate(() -> {
             for (Map.Entry<Integer, HaInfo> entry : this.clusterConfig.getClusterMap().entrySet()) {
                 HaInfo cluster = entry.getValue();
@@ -145,24 +141,31 @@ public abstract class AbstractHaClusterSlaveService {
 
     public void sendSyncData(final HaInfo cluster, final Channel channel) {
         HaClusterMessage clusterMsg = getClusterMessage();
-        try {
+
+        if (this.clusterConfig.isLogging()) {
             MDC.put("id", cluster.getLogKey());
+        }
+
+        try {
             ChannelFuture f = channel.writeAndFlush(clusterMsg);
             f.awaitUninterruptibly();
             if (f.isDone() || f.isSuccess()) {
-                log.info("ClusterSlaveService.sendSyncData: [{}], {}, [--TO: serverId: {}, (serverId: {}, serverTime: {})]",
-                        this.clusterConfig.getServerId(), HaUtils.getTcpAddress(channel),
-                        cluster.getServerId(), clusterMsg.getServerId(), clusterMsg.getServerTime());
+                if (this.clusterConfig.isLogging()) {
+                    log.info("ClusterSlaveService.sendSyncData: [{}], {}, [--TO: serverId: {}, (serverId: {}, serverTime: {})]",
+                            this.clusterConfig.getServerId(), HaUtils.getTcpAddress(channel),
+                            cluster.getServerId(), clusterMsg.getServerId(), clusterMsg.getServerTime());
+                }
             }
         }
         catch (Exception e) {
-            log.info("ClusterSlaveService.sendSyncData: [{}], {}, Failed: [--TO: serverId: {}, (serverId: {}, serverTime: {})]",
+            log.error("ClusterSlaveService.sendSyncData: [{}], {}, Failed: [--TO: serverId: {}, (serverId: {}, serverTime: {})]",
                     this.clusterConfig.getServerId(), HaUtils.getTcpAddress(channel),
                     cluster.getServerId(), clusterMsg.getServerId(), clusterMsg.getServerTime());
-            log.info("ClusterSlaveService.sendSyncData: [{}], {}, Failed: {}",
+            log.error("ClusterSlaveService.sendSyncData: [{}], {}, Failed: {}",
                     this.clusterConfig.getServerId(), HaUtils.getTcpAddress(channel), e.getMessage());
         }
-        finally {
+
+        if (this.clusterConfig.isLogging()) {
             MDC.remove(cluster.getLogKey());
             MDC.clear();
         }

+ 36 - 32
its-cluster/src/main/java/com/its/common/cluster/service/HaClusterMasterInitializer.java

@@ -35,53 +35,57 @@ public class HaClusterMasterInitializer extends ChannelInitializer<Channel> {
         String ipAddress  = HaUtils.getRemoteIpAddress(channel);
         int clientPort = HaUtils.getRemotePort(channel);
         int serverId = clientPort - this.clusterConfig.getSyncPort();
-        log.info("ClusterMasterInitializer.----initChannel: connected from: {}:{}, ServerId: {}.",
-                ipAddress, clientPort, serverId);
-//        HaClusterConfig.HaCluster cluster = this.clusterConfig.get(ipAddress);
+
+        if (this.clusterConfig.isLogging()) {
+            log.info("HaClusterMasterInitializer.initChannel: connected from: {}:{}, ServerId: {}.", ipAddress, clientPort, serverId);
+        }
+
+        // 하나의 서버에 여러 개의 클러스터가 있을 수 있기 때문에 IP Address 로 찾는 것은 위험함.
+        // HaClusterConfig.HaCluster cluster = this.clusterConfig.get(ipAddress);
         HaInfo cluster = this.clusterConfig.getClusterMap().get(serverId);
         if (cluster == null) {
-            log.error("ClusterMasterInitializer.----initChannel: [ServerId: {}, IP Address: {}], Unknown Server Id. will be closed.", serverId, ipAddress);
+            log.error("HaClusterMasterInitializer.initChannel: [ServerId: {}, IP Address: {}], Unknown Server Id. will be closed.", serverId, ipAddress);
             channel.disconnect();
             channel.close();
             return;
         }
         if (!cluster.getIpAddress().equals(ipAddress)) {
-            log.error("ClusterMasterInitializer.----initChannel: [ServerId: {}, IP Address: {}], Unknown IP Address. will be closed.", serverId, ipAddress);
+            log.error("HaClusterMasterInitializer.initChannel: [ServerId: {}, IP Address: {}], Unknown IP Address. will be closed.", serverId, ipAddress);
             channel.disconnect();
             channel.close();
             return;
         }
 
-        try {
+        if (this.clusterConfig.isLogging()) {
             MDC.put("id", cluster.getLogKey());
+            log.info("HaClusterMasterInitializer.initChannel: [{}, {}].", cluster.getLogKey(), cluster.getIpAddress());
+        }
+
+        if (cluster.getElectionState().getChannel() != null) {
+            log.warn("HaClusterMasterInitializer.initChannel: {}, {}, Already Connected. Old Connection will be closed.", ipAddress, cluster.getServerId());
+            // 이벤트 핸들러 에서 중복 처리 되지 않도록 속성 값을 제거
+            channel.attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
+            cluster.getElectionState().disConnect();
+
+            channel.disconnect();
+            channel.close();
+        }
 
-            log.info("ClusterMasterInitializer.----initChannel: [{}, {}].", cluster.getLogKey(), cluster.getIpAddress());
-            if (cluster.getElectionState().getChannel() != null) {
-                log.warn("ClusterMasterInitializer.----initChannel: {}, {}, Already Connected. Old Connection will be closed.", ipAddress, cluster.getServerId());
-                // 이벤트 핸들러 에서 중복 처리 되지 않도록 속성 값을 제거
-                channel.attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
-                cluster.getElectionState().disConnect();
-
-                channel.disconnect();
-                channel.close();
-            }
-
-            cluster.getElectionState().connect(channel);
-            channel.attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(cluster);
-
-            IdleStateHandler idleStateHandler = new IdleStateHandler(this.clusterConfig.getSyncSeconds(), 0, 0, TimeUnit.SECONDS);
-            ChannelPipeline pipeline = channel.pipeline();
-//            if (this.clusterConfig.isLogging()) {
-//                pipeline.addLast(new LoggingHandler(LogLevel.INFO));
-//            }
-            pipeline.addLast(idleStateHandler);
-
-            pipeline.addLast(new LengthFieldBasedFrameDecoder(8192, 0, 4));
-            pipeline.addLast(new HaClusterMessageDecoder(this.clusterConfig.isLogging()));
-            pipeline.addLast(new HaClusterMessageEncoder(this.clusterConfig.isLogging()));
-            pipeline.addLast(new HaClusterMasterHandler(this.masterService, this.clusterConfig));
+        cluster.getElectionState().connect(channel);
+        channel.attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(cluster);
+
+        ChannelPipeline pipeline = channel.pipeline();
+
+        if (this.clusterConfig.isPacketLogging()) {
+            pipeline.addLast(new LoggingHandler(LogLevel.INFO));
         }
-        finally {
+        pipeline.addLast(new IdleStateHandler(this.clusterConfig.getSyncSeconds(), 0, 0, TimeUnit.SECONDS));
+        pipeline.addLast(new LengthFieldBasedFrameDecoder(8192, 0, 4));
+        pipeline.addLast(new HaClusterMessageDecoder(this.clusterConfig.isLogging()));
+        pipeline.addLast(new HaClusterMessageEncoder(this.clusterConfig.isLogging()));
+        pipeline.addLast(new HaClusterMasterHandler(this.masterService, this.clusterConfig));
+
+        if (this.clusterConfig.isLogging()) {
             MDC.remove(cluster.getLogKey());
             MDC.clear();
         }

+ 45 - 35
its-cluster/src/main/java/com/its/common/cluster/service/HaClusterSlave.java

@@ -43,13 +43,17 @@ public class HaClusterSlave implements Callable<Object> {
     public Object call() {
 
         try {
-            MDC.put("id", this.cluster.getLogKey());
+            if (this.clusterConfig.isLogging()) {
+                MDC.put("id", this.cluster.getLogKey());
+            }
 
             this.ipAddress = this.cluster.getIpAddress();
             this.port = this.cluster.getSyncPort();
 
             if (this.bootstrap == null) {
-                log.info("ClusterSlave >>>>>>>>Start: [{}, {}], {}", this.cluster.getServerId(), this.ipAddress, this.port);
+                if (this.clusterConfig.isLogging()) {
+                    log.info("HaClusterSlave: >>>>>>>>Start: [{}, {}], {}", this.cluster.getServerId(), this.ipAddress, this.port);
+                }
                 this.bootstrap = this.bootstrapFactory.createBootstrap();
                 this.bootstrap.option(ChannelOption.SO_REUSEADDR, true);
                 this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5 * 1000);
@@ -57,23 +61,26 @@ public class HaClusterSlave implements Callable<Object> {
                     // 핸들러가 실행되는 순서는 추가된 순서에 의해 결정된다.(Inbound: head=>tail, Outbound: tail=>head, name2ctx)
                     @Override
                     public void initChannel(SocketChannel ch) {
-//                        if (cluster.isLogging()) {
-//                            ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
-//                        }
-                        IdleStateHandler idleStateHandler = new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS);
-
-                        ch.pipeline().addLast(idleStateHandler);
-                        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(8192, 0, 4));
-                        ch.pipeline().addLast(new HaClusterMessageDecoder(clusterConfig.isLogging()));
-                        ch.pipeline().addLast(new HaClusterMessageEncoder(clusterConfig.isLogging()));
-                        ch.pipeline().addLast(new HaClusterSlaveHandler(slaveService, cluster));
+                    if (clusterConfig.isPacketLogging()) {
+                        ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
+                    }
+                    ch.pipeline().addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS));
+                    ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(8192, 0, 4));
+                    ch.pipeline().addLast(new HaClusterMessageDecoder(clusterConfig.isLogging()));
+                    ch.pipeline().addLast(new HaClusterMessageEncoder(clusterConfig.isLogging()));
+                    ch.pipeline().addLast(new HaClusterSlaveHandler(slaveService, cluster));
                     }
                 });
-                // 바인드 로컬 포트 설정
+
+                // 바인딩 로컬 포트 설정(설정파일에 등록된 바인딩 포트번호 + 서버 ID), 따라서 하나의 서버에 여러 개의 클러스터 서버가 존재할 수 있다.
+                // 주의) 하나의 서버에 여러 개의 클러스터 서버가 존재할 경우, 바인딩 포트는 위의 규칙에 속하지 않는 포트번호를 할당해야 한다.
                 this.bootstrap.localAddress(new InetSocketAddress(this.port + this.clusterConfig.getServerId()));
             }
 
-            log.info("ClusterSlave >>Connect Try: [{}, {}], {}", this.cluster.getServerId(), this.ipAddress, this.port);
+            if (this.clusterConfig.isLogging()) {
+                log.info("HaClusterSlave: >>Connect Try: [{}, {}], {}", this.cluster.getServerId(), this.ipAddress, this.port);
+            }
+
             if (this.channelFuture != null && this.channelFuture.channel() != null) {
                 this.channelFuture.channel().close();
                 this.channelFuture = null;
@@ -107,19 +114,24 @@ public class HaClusterSlave implements Callable<Object> {
      * 연결 성공시 처리 이벤트
      */
     protected void channelOpen(ChannelFuture future) {
-        try {
+        if (this.clusterConfig.isLogging()) {
             MDC.put("id", this.cluster.getLogKey());
-            if (future.isSuccess()) {
-                Channel channel = future.channel();
-                log.info("ClusterSlave ..channelOpen: [{}, {}], {}, Channel: {}", this.cluster.getServerId(), this.ipAddress, this.port, channel);
-                channel.attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(this.cluster);
-                this.cluster.getSyncState().connect(channel);
-            }
-            else {
-                log.warn("ClusterSlave ConnectFailed: [{}, {}], {}, Cause: {}", cluster.getServerId(), cluster.getIpAddress(), cluster.getSyncPort(), future.cause().getMessage());
+        }
+        if (future.isSuccess()) {
+            Channel channel = future.channel();
+
+            if (this.clusterConfig.isLogging()) {
+                log.info("HaClusterSlave: channelOpen: [{}, {}], {}, Channel: {}", this.cluster.getServerId(), this.ipAddress, this.port, channel);
             }
+
+            channel.attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(this.cluster);
+            this.cluster.getSyncState().connect(channel);
         }
-        finally {
+        else {
+            log.warn("HaClusterSlave: ConnectFailed: [{}, {}], {}, Cause: {}", cluster.getServerId(), cluster.getIpAddress(), cluster.getSyncPort(), future.cause().getMessage());
+        }
+
+        if (this.clusterConfig.isLogging()) {
             MDC.remove(this.cluster.getLogKey());
             MDC.clear();
         }
@@ -130,21 +142,19 @@ public class HaClusterSlave implements Callable<Object> {
      * @param future
      */
     protected synchronized void channelClosed(ChannelFuture future) {
-        try {
-            MDC.put("id", this.cluster.getLogKey());
+        Channel channel = future.channel();
 
-            Channel channel = future.channel();
-            log.warn("ClusterSlave channelClosed: [{}, {}], {}, Channel: {}", this.cluster.getServerId(), this.ipAddress, this.port, channel);
-
-            channel.attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
-            this.cluster.getSyncState().disConnect();
-            channel.close();
-            channel.eventLoop().schedule(this, 5, TimeUnit.SECONDS);
-        }
-        finally {
+        if (this.clusterConfig.isLogging()) {
+            MDC.put("id", this.cluster.getLogKey());
+            log.warn("ClusterSlave:channelClosed: [{}, {}], {}, Channel: {}", this.cluster.getServerId(), this.ipAddress, this.port, channel);
             MDC.remove(this.cluster.getLogKey());
             MDC.clear();
         }
+
+        channel.attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
+        this.cluster.getSyncState().disConnect();
+        channel.close();
+        channel.eventLoop().schedule(this, 5, TimeUnit.SECONDS);
     }
 
 }

+ 0 - 1
its-cluster/src/main/java/com/its/common/cluster/service/HaClusterSlaveBootstrapFactory.java

@@ -9,7 +9,6 @@ import io.netty.channel.socket.nio.NioSocketChannel;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
-@Slf4j
 @RequiredArgsConstructor
 public class HaClusterSlaveBootstrapFactory {
     private final int workerThread;

+ 4 - 3
its-cluster/src/main/java/com/its/common/cluster/vo/AbstractHaClusterConfig.java

@@ -28,9 +28,10 @@ public abstract class AbstractHaClusterConfig {
     private int syncPort = -1;      // 데이터 동기화를 위한 포트
 
     // application.yml 에 설정되는 정보(application.ha-cluster)
-    private boolean enabled = false;    // 클러스터 기능 사용 여부
-    private boolean logging = false;    // 라이브러리 내 로깅 여부
-    private String configFile;          // 클러스터 설정 파일 경로
+    private boolean enabled = false;        // 클러스터 기능 사용 여부
+    private boolean logging = false;        // 라이브러리 내 로깅 여부
+    private boolean packetLogging = false;  // 패킷 로깅 여부
+    private String configFile;              // 클러스터 설정 파일 경로
 
     private final HashMap<Integer, HaInfo> clusterMap = new HashMap<>();