
moct cluster algorithm commit

HANTE · 6 months ago
commit ec73ccab03

+ 2 - 1
conf/moct-utic-server-cluster.cfg

@@ -1,5 +1,6 @@
 server.id=1
 syncSeconds=5
-server.1=192.168.20.44:12888
+server.1=192.168.20.44:11888
 server.2=192.168.20.99:12888
+server.3=192.168.20.99:13888
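
Note: each server.N entry maps a cluster server id to the ip:port of its sync listener. Servers 2 and 3 share 192.168.20.99, so they must listen on distinct ports (12888 and 13888), and server 1 moves to 11888 so every server id gets a unique port. A minimal sketch of how entries in this shape could be loaded, using only java.util.Properties (a hypothetical helper, not the project's actual loader):

    // Hypothetical sketch: load server.N=ip:port entries from the cluster config.
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.Properties;
    import java.util.TreeMap;

    public class ClusterConfigSketch {
        public static void main(String[] args) throws IOException {
            Properties props = new Properties();
            try (FileInputStream in = new FileInputStream("conf/moct-utic-server-cluster.cfg")) {
                props.load(in);
            }
            int serverId = Integer.parseInt(props.getProperty("server.id"));
            int syncSeconds = Integer.parseInt(props.getProperty("syncSeconds"));
            TreeMap<Integer, String> servers = new TreeMap<>();
            for (String key : props.stringPropertyNames()) {
                if (key.startsWith("server.") && !key.equals("server.id")) {
                    servers.put(Integer.parseInt(key.substring("server.".length())), props.getProperty(key));
                }
            }
            System.out.printf("id=%d, syncSeconds=%d, servers=%s%n", serverId, syncSeconds, servers);
        }
    }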
 

+ 2 - 1
moct-utic-server/conf/moct-utic-server-cluster.cfg

@@ -1,4 +1,5 @@
 server.id=1
 syncSeconds=5
-server.1=192.168.20.44:12888
+server.1=192.168.20.44:11888
 server.2=192.168.20.99:12888
+server.3=192.168.20.99:13888

+ 1 - 0
moct-utic-server/conf/moct-utic-server.pid

@@ -0,0 +1 @@
+28168

BIN
moct-utic-server/moct-utic-server-0.0.1.jar


+ 4 - 2
moct-utic-server/src/main/java/com/its/moct/utic/server/config/HaClusterConfig.java

@@ -135,7 +135,8 @@ public class HaClusterConfig {
                                 .ipAddress(ipAddress)
                                 .syncPort(syncPort)
                                 .logging(false)
-                                .netState(new NetState())
+                                .electionState(new NetState())
+                                .syncState(new NetState())
                                 .build();
                         this.clusterMap.put(haCluster.getServerId(), haCluster);
                         log.info("{}", haCluster);
@@ -165,7 +166,8 @@ public class HaClusterConfig {
         private int syncPort;       // Port 1: port used for data synchronization
         private boolean logging;
 
-        private NetState netState;
+        private NetState electionState;
+        private NetState syncState;
 
         public String getLogKey() {
             return String.valueOf(this.serverId);
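
Note: the single netState field is split into electionState (the inbound connection the master handler watches for heartbeats) and syncState (the outbound connection the slave uses to push data), so a peer's two channels are tracked independently. NetState itself is not part of this diff; a sketch of what such a holder plausibly contains, stated as an assumption:

    // Assumption: NetState is not shown in this commit; this sketch only mirrors the
    // calls made on it elsewhere in the diff (connect, disConnect, setLastRecvTime, ...).
    import io.netty.channel.Channel;

    public class NetStateSketch {
        private volatile Channel channel;
        private volatile long lastRecvTime;
        private volatile int state;                      // e.g. NET.CLOSED vs. connected

        public synchronized void connect(Channel ch) {
            this.channel = ch;
            this.state = 1;                              // hypothetical "connected" value
            this.lastRecvTime = System.currentTimeMillis();
        }
        public synchronized void disConnect() {
            this.channel = null;
            this.state = 0;                              // hypothetical NET.CLOSED value
        }
        public void setLastRecvTime() { this.lastRecvTime = System.currentTimeMillis(); }
        public long getLastRecvTime() { return this.lastRecvTime; }
        public Channel getChannel()   { return this.channel; }
        public int getState()         { return this.state; }
    }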

+ 38 - 28
moct-utic-server/src/main/java/com/its/moct/utic/server/xnet/cluster/master/ClusterMasterHandler.java

@@ -4,6 +4,7 @@ import com.its.common.network.NettyUtils;
 import com.its.moct.utic.server.config.HaClusterConfig;
 import com.its.moct.utic.server.repository.ApplicationRepository;
 import com.its.moct.utic.server.xnet.cluster.utils.ClusterMessage;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.handler.timeout.IdleState;
@@ -26,7 +27,16 @@ public class ClusterMasterHandler extends ChannelInboundHandlerAdapter {
             log.info("ClusterMasterHandler.channelRead: [{}], {}, [FROM: serverId: {}, serverTime: {}, infos: {}]",
                     this.clusterConfig.getServerId(), NettyUtils.getTcpAddress(ctx.channel()),
                     clusterMsg.getServerId(), clusterMsg.getServerTime(), clusterMsg.getInfos().size());
-            ctx.writeAndFlush(clusterMsg);
+
+            HaClusterConfig.HaCluster cluster = ctx.channel().attr(HaClusterConfig.CLUSTER_ATTRIBUTE_KEY).get();
+            if (cluster == null) {
+                log.error("RECV: [{}]. Cluster object not found on channel. Will be closed.", NettyUtils.getAddress(ctx.channel()));
+                closeChannel(ctx.channel());
+                return;
+            }
+
+            cluster.getElectionState().setLastRecvTime();
+//            ctx.writeAndFlush(clusterMsg);
         }
     }
 
@@ -34,13 +44,13 @@ public class ClusterMasterHandler extends ChannelInboundHandlerAdapter {
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         HaClusterConfig.HaCluster cluster = ctx.channel().attr(HaClusterConfig.CLUSTER_ATTRIBUTE_KEY).get();
         if (cluster == null) {
-            log.error("{}.++channelInactive: Unknown Center: {}.", this.getClass().getSimpleName(), NettyUtils.getAddress(ctx.channel()));
+            log.error("{}.++channelInactive: Unknown Cluster: {}.", this.getClass().getSimpleName(), NettyUtils.getAddress(ctx.channel()));
             return;
         }
         try {
             MDC.put("id", cluster.getLogKey());
             log.info("{}.++channelInactive: [{}, {}].", this.getClass().getSimpleName(), cluster.getServerId(), cluster.getIpAddress());
-            cluster.getNetState().disConnect();
+            cluster.getElectionState().disConnect();
 
             ctx.channel().attr(HaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
             ctx.fireChannelInactive();
@@ -56,37 +66,24 @@ public class ClusterMasterHandler extends ChannelInboundHandlerAdapter {
         if (e instanceof IdleStateEvent) {
             HaClusterConfig.HaCluster cluster = ctx.channel().attr(HaClusterConfig.CLUSTER_ATTRIBUTE_KEY).get();
             if (cluster == null) {
-//            log.error("{}.userEventTriggered: Unknown Center: {}.", this.getClass().getSimpleName(), NettyUtils.getAddress(ctx.channel()));
+                log.error("{}.userEventTriggered: Unknown Cluster: {}.", this.getClass().getSimpleName(), NettyUtils.getAddress(ctx.channel()));
                 return;
             }
 
-            MDC.put("id", cluster.getLogKey());
-
             IdleStateEvent evt = (IdleStateEvent) e;
-//            log.info("{}.++userEventTriggered: {}. {}", this.getClass().getSimpleName(), NettyUtils.getAddress(ctx.channel()), evt.state());
 
-            // Handle here when no data has been sent or received for a while after the connection is established
-            if (evt.state() == IdleState.READER_IDLE) {
-                long recvTimeout = System.currentTimeMillis() - cluster.getNetState().getLastRecvTime();
+            MDC.put("id", cluster.getLogKey());
+            log.info("{}.++userEventTriggered: {}. {}", this.getClass().getSimpleName(), NettyUtils.getAddress(ctx.channel()), evt);
 
-//                if (cluster.getNetState().getState() <= NET.LOGIN_WAIT && (recvTimeout > (center.getResTime() * 1000L))) {
-//                    // Case: login has not completed after connecting
-//                    if (cluster.getNetState().getRetryCount() >= ItsAsn.SERVER_MAX_RETRY_COUNT) {
-//                        log.error("IDLE: [{}, {}]. Login Timeout, {}, {} ms. Will be closed.", cluster.getLogKey(), cluster.getIpAddress(), recvTimeout, center.getResTime() * 1000L);
-//                        ApplicationRepository.closeChannel(center, ctx.channel());
-//                    }
-//                    else {
-//                        cluster.getNetState().retry();
-//                        log.warn("IDLE: [{}, {}]. Login Initialize Request, {}, {} ms. {} Counts.", cluster.getLogKey(), cluster.getIpAddress(), recvTimeout, center.getResTime() * 1000L, center.getNetState().getRetryCount());
-//                    }
-//                    return;
-//                }
-//                if (recvTimeout > (ApplicationRepository.center.getHeartBeat() * 1000L)) {
-//                    // Close the connection when nothing is received within the heartbeat interval.
-//                    log.info("IDLE: [{}, {}]. Heartbeat timeout, {}, {} ms. Will be closed.", cluster.getLogKey(), cluster.getIpAddress(), recvTimeout, center.getHeartBeat() * 1000L);
-////                    ApplicationRepository.closeChannel(center, ctx.channel());
-//                    return;
-//                }
+            if (evt.state() == IdleState.READER_IDLE) {
+                long recvTimeout = System.currentTimeMillis() - cluster.getElectionState().getLastRecvTime();
+                long heartbeatTimeout = this.clusterConfig.getSyncSeconds() * 1000L * 3;
+                if (recvTimeout > heartbeatTimeout) {
+                    log.info("{}.++userEventTriggered: {}. [{}, {}]. Heartbeat timeout, {}, {} ms. Will be closed.",
+                            this.getClass().getSimpleName(), NettyUtils.getAddress(ctx.channel()),
+                            cluster.getLogKey(), cluster.getIpAddress(), recvTimeout, heartbeatTimeout);
+                    closeChannel(ctx.channel());
+                }
             }
             MDC.remove(cluster.getLogKey());
             MDC.clear();
@@ -94,6 +91,19 @@ public class ClusterMasterHandler extends ChannelInboundHandlerAdapter {
         ctx.fireUserEventTriggered(e);
     }
 
+    public static void closeChannel(Channel channel) {
+        try {
+            if (channel != null) {
+                channel.flush();
+                channel.disconnect();
+                channel.close();
+            }
+        }
+        catch (Exception e) {
+            log.error("ClusterMasterHandler.closeChannel Exception: {}", e.getMessage());
+        }
+    }
+
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
         ctx.close();
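
Note: the master now drops a peer when no heartbeat arrives within three sync periods. With syncSeconds=5 from the config, the IdleStateHandler fires READER_IDLE after 5 s of silence, and the channel is closed once the gap since the last receive exceeds 15 000 ms. The same arithmetic as a self-contained example (values assumed from the config above):

    // Worked example of the READER_IDLE timeout check above; syncSeconds assumed from the config.
    public class HeartbeatTimeoutSketch {
        public static void main(String[] args) {
            int syncSeconds = 5;                                      // moct-utic-server-cluster.cfg
            long heartbeatTimeout = syncSeconds * 1000L * 3;          // 15000 ms
            long lastRecvTime = System.currentTimeMillis() - 16_000;  // simulate 16 s of silence
            long recvTimeout = System.currentTimeMillis() - lastRecvTime;
            if (recvTimeout > heartbeatTimeout) {
                System.out.printf("Heartbeat timeout, %d > %d ms. Will be closed.%n",
                        recvTimeout, heartbeatTimeout);
            }
        }
    }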

+ 24 - 10
moct-utic-server/src/main/java/com/its/moct/utic/server/xnet/cluster/master/ClusterMasterInitializer.java

@@ -22,15 +22,29 @@ import java.util.concurrent.TimeUnit;
 public class ClusterMasterInitializer extends ChannelInitializer<Channel> {
 
     private final ApplicationRepository repo;
-    private final HaClusterConfig cluster;
+    private final HaClusterConfig clusterConfig;
 
     @Override
     protected void initChannel(Channel channel) throws Exception {
+//        InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
+//        String clientIP = remoteAddress.getAddress().getHostAddress();
+//        int clientPort = remoteAddress.getPort();
+
         String ipAddress  = NettyUtils.getRemoteIpAddress(channel);
-        log.info("ItsAsnCommServerInitializer.----initChannel: connected from: {}", ipAddress);
-        HaClusterConfig.HaCluster cluster = this.cluster.get(ipAddress);
+        int clientPort = NettyUtils.getRemotePort(channel);
+        int serverId = clientPort - this.clusterConfig.getSyncPort();
+        log.info("ClusterMasterInitializer.----initChannel: connected from: {}:{}, ServerId: {}.",
+                ipAddress, clientPort, serverId);
+//        HaClusterConfig.HaCluster cluster = this.clusterConfig.get(ipAddress);
+        HaClusterConfig.HaCluster cluster = this.clusterConfig.getClusterMap().get(serverId);
         if (cluster == null) {
-            log.error("ClusterMasterInitializer.----initChannel: [LXX, {}], Unknown ip address. will be closed.", ipAddress);
+            log.error("ClusterMasterInitializer.----initChannel: [ServerId: {}, IP Address: {}], Unknown Server Id. will be closed.", serverId, ipAddress);
+            channel.disconnect();
+            channel.close();
+            return;
+        }
+        if (!cluster.getIpAddress().equals(ipAddress)) {
+            log.error("ClusterMasterInitializer.----initChannel: [ServerId: {}, IP Address: {}], Unknown IP Address. will be closed.", serverId, ipAddress);
             channel.disconnect();
             channel.close();
             return;
@@ -40,29 +54,29 @@ public class ClusterMasterInitializer extends ChannelInitializer<Channel> {
             MDC.put("id", cluster.getLogKey());
 
             log.info("ClusterMasterInitializer.----initChannel: [{}, {}].", cluster.getLogKey(), cluster.getIpAddress());
-            if (cluster.getNetState().getChannel() != null) {
+            if (cluster.getElectionState().getChannel() != null) {
                 log.warn("ClusterMasterInitializer.----initChannel: {}, {}, Already Connected. Old Connection will be closed.", ipAddress, cluster.getServerId());
                 // Remove the attribute value so the event handler does not process the old channel again
                 channel.attr(HaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
-                cluster.getNetState().disConnect();
+                cluster.getElectionState().disConnect();
 
                 channel.disconnect();
                 channel.close();
             }
 
-            cluster.getNetState().connect(channel);
+            cluster.getElectionState().connect(channel);
             channel.attr(HaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(cluster);
 
-            IdleStateHandler idleStateHandler = new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS);
+            IdleStateHandler idleStateHandler = new IdleStateHandler(this.clusterConfig.getSyncSeconds(), 0, 0, TimeUnit.SECONDS);
             ChannelPipeline pipeline = channel.pipeline();
-            if (this.cluster.isLogging()) {
+            if (this.clusterConfig.isLogging()) {
                 pipeline.addLast(new LoggingHandler(LogLevel.INFO));
             }
             pipeline.addLast(idleStateHandler);
 
             pipeline.addLast(new ClusterMessageDecoder());
             pipeline.addLast(new ClusterMessageEncoder());
-            pipeline.addLast(new ClusterMasterHandler(this.repo, this.cluster));
+            pipeline.addLast(new ClusterMasterHandler(this.repo, this.clusterConfig));
         }
         finally {
             MDC.remove(cluster.getLogKey());
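
Note: peer identification switches from an IP lookup to port arithmetic. Because each slave pins its local source port to syncPort + serverId (see ClusterSlave below), the master recovers the peer id as clientPort - syncPort and then verifies the IP against the configured entry, which distinguishes the two peers sharing 192.168.20.99. A worked example under the ports in the config:

    // Worked example: derive the peer serverId from its source port (ports from the config).
    public class ServerIdFromPortSketch {
        public static void main(String[] args) {
            int masterSyncPort = 11888;                       // server.1 listens here
            int slaveServerId = 2;
            int clientPort = masterSyncPort + slaveServerId;  // slave binds local port 11890
            int derivedId = clientPort - masterSyncPort;      // master side: 11890 - 11888 = 2
            System.out.println("derived serverId = " + derivedId);
        }
    }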

+ 54 - 21
moct-utic-server/src/main/java/com/its/moct/utic/server/xnet/cluster/master/ClusterMasterService.java

@@ -4,6 +4,7 @@ import com.its.beanit.utils.ItsAsn;
 import com.its.common.network.NettyUtils;
 import com.its.common.utils.OsPlatform;
 import com.its.moct.utic.server.config.HaClusterConfig;
+import com.its.moct.utic.server.dto.NET;
 import com.its.moct.utic.server.repository.ApplicationRepository;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
@@ -13,20 +14,34 @@ import io.netty.channel.epoll.Epoll;
 import io.netty.channel.nio.NioEventLoopGroup;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.PostConstruct;
+import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
+
 @Slf4j
 @Service
 @RequiredArgsConstructor
 public class ClusterMasterService {
 
+    private final ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
+    private ScheduledFuture<?> taskFuture;
+
     private final ApplicationRepository repo;
-    private final HaClusterConfig cluster;
+    private final HaClusterConfig clusterConfig;
 
     private EventLoopGroup acceptGroup;
     private EventLoopGroup workerGroup;
     private ChannelFuture channelFuture;
 
+    @PostConstruct
+    void init() {
+        this.taskScheduler.setPoolSize(1);
+        this.taskScheduler.initialize();
+    }
+
     public void start() {
         if (!OsPlatform.isWindows()) {
             if (!Epoll.isAvailable()) {
@@ -34,41 +49,31 @@ public class ClusterMasterService {
             }
         }
         if (NettyUtils.isEpollAvailable()) {
-            log.info("클러스터 마스터가 리눅스 EPOLL 모드에서 실행됩니다.");
+            log.info("The Cluster Master runs in Linux EPOLL mode.");
         }
         else {
-            log.info("클러스터 마스터가 윈도우 NIO 모드에서 실행됩니다.");
+            log.info("The Cluster Master runs in Windows NIO mode.");
         }
 
         this.acceptGroup = new NioEventLoopGroup();
         this.workerGroup = new NioEventLoopGroup();
         ServerBootstrap serverBootstrap = createBootstrap();
-//        b.group(this.acceptGroup, this.workerGroup)
-//                .channel(NioServerSocketChannel.class)
-//                .childHandler(new ChannelInitializer<SocketChannel>() {
-//                    @Override
-//                    public void initChannel(SocketChannel ch) throws Exception {
-//                        ChannelPipeline p = ch.pipeline();
-//                        p.addLast(new ClusterMessageDecoder());
-//                        p.addLast(new ClusterMessageEncoder());
-//                        p.addLast(new ClusterMasterHandler(repo, cluster));
-//                    }
-//                });
 
         log.info("*********************************************************************************");
         log.info("**            UTIC MOCT HA Cluster Master Server Information                   **");
-        log.info("**     bindAddress: {}", this.cluster.getIpAddress());
-        log.info("**      listenPort: {}", this.cluster.getSyncPort());
-        log.info("**        isMaster: {}", this.cluster.isMaster());
+        log.info("**     bindAddress: {}", this.clusterConfig.getIpAddress());
+        log.info("**      listenPort: {}", this.clusterConfig.getSyncPort());
+        log.info("**        isMaster: {}", this.clusterConfig.isMaster());
         log.info("*********************************************************************************");
 
         try {
-            if (this.cluster.getIpAddress().equals("0.0.0.0")) {
-                this.channelFuture = serverBootstrap.bind(this.cluster.getSyncPort());
+            if (this.clusterConfig.getIpAddress().equals("0.0.0.0")) {
+                this.channelFuture = serverBootstrap.bind(this.clusterConfig.getSyncPort());
             }
             else {
-                this.channelFuture = serverBootstrap.bind(this.cluster.getIpAddress(), this.cluster.getSyncPort());
+                this.channelFuture = serverBootstrap.bind(this.clusterConfig.getIpAddress(), this.clusterConfig.getSyncPort());
             }
+            electionMasterSchedule();
         }
         catch (Exception e) {
             log.error("cluster start Exception: {}", e.getMessage());
@@ -98,14 +103,42 @@ public class ClusterMasterService {
 
         ClusterMasterInitializer clusterMasterInitializer = new ClusterMasterInitializer(
                 this.repo,
-                this.cluster
+                this.clusterConfig
         );
         serverBootstrap.childHandler(clusterMasterInitializer);
 
         return serverBootstrap;
     }
 
+    private void electionMasterSchedule() {
+        this.taskFuture = this.taskScheduler.scheduleAtFixedRate(() -> {
+            int masterId = Integer.MAX_VALUE;
+            for (Map.Entry<Integer, HaClusterConfig.HaCluster> entry : this.clusterConfig.getClusterMap().entrySet()) {
+                HaClusterConfig.HaCluster cluster = entry.getValue();
+                if (cluster.getElectionState().getState() != NET.CLOSED) {
+                    if (cluster.getServerId() < masterId) {
+                        masterId = cluster.getServerId();
+                    }
+                }
+            }
+//            log.info("ClusterMasterService:electionMasterSchedule: serverId: {}, masterId: {}", this.clusterConfig.getServerId(), masterId);
+            if (masterId == Integer.MAX_VALUE || masterId > this.clusterConfig.getServerId()) {
+                this.clusterConfig.setMaster(true);
+            }
+            else {
+                this.clusterConfig.setMaster(false);
+            }
+            log.info("ClusterMasterService:electionMasterSchedule: serverId: {}, Master: {}.",
+                    this.clusterConfig.getServerId(), this.clusterConfig.isMaster());
+        }, 2 * 1000L);
+    }
+
     public void shutdown() {
+        if (this.taskFuture != null) {
+            this.taskFuture.cancel(true);
+        }
+        this.taskScheduler.shutdown();
+
         try {
             if (this.acceptGroup != null) {
                 this.acceptGroup.shutdownGracefully();
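
Note: electionMasterSchedule runs every 2 s and implements a lowest-id-wins rule: scan the cluster map for entries whose election channel is not NET.CLOSED, take the lowest connected server id, and declare this server master when either nothing is connected or the lowest connected id is higher than its own. A self-contained sketch of the same rule (the NET.CLOSED stand-in and the peer states are assumed for illustration):

    // Sketch of the lowest-id-wins election above; constant and states are assumed.
    import java.util.Map;

    public class ElectionSketch {
        static final int NET_CLOSED = 0;                  // assumption: stands in for NET.CLOSED

        static boolean isMaster(int myServerId, Map<Integer, Integer> electionStates) {
            int masterId = Integer.MAX_VALUE;
            for (Map.Entry<Integer, Integer> e : electionStates.entrySet()) {
                if (e.getValue() != NET_CLOSED && e.getKey() < masterId) {
                    masterId = e.getKey();                // lowest connected server id so far
                }
            }
            // Master when nothing is connected, or every connected id is higher than ours.
            return masterId == Integer.MAX_VALUE || masterId > myServerId;
        }

        public static void main(String[] args) {
            System.out.println(isMaster(1, Map.of(2, 1, 3, 1)));   // true: 1 outranks 2 and 3
            System.out.println(isMaster(2, Map.of(1, 1, 3, 1)));   // false: 1 is connected
            System.out.println(isMaster(2, Map.of(1, 0, 3, 0)));   // true: no peer connected
        }
    }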

+ 23 - 26
moct-utic-server/src/main/java/com/its/moct/utic/server/xnet/cluster/slave/ClusterSlave.java

@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 @RequiredArgsConstructor
 public class ClusterSlave  implements Callable<Object> {
 
+    private final ClusterSlaveService slaveService;
     private final ApplicationRepository repo;
     private final HaClusterConfig clusterConfig;
     private final HaClusterConfig.HaCluster cluster;
@@ -47,9 +48,10 @@ public class ClusterSlave  implements Callable<Object> {
             this.ipAddress = this.cluster.getIpAddress();
             this.port = this.cluster.getSyncPort();
 
-            log.info("ClusterSlave >>>>>>>>Start: [{}, {}], {}", this.cluster.getServerId(), this.ipAddress, this.port);
             if (this.bootstrap == null) {
+                log.info("ClusterSlave >>>>>>>>Start: [{}, {}], {}", this.cluster.getServerId(), this.ipAddress, this.port);
                 this.bootstrap = this.bootstrapFactory.createBootstrap();
+                this.bootstrap.option(ChannelOption.SO_REUSEADDR, true);
                 this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5 * 1000);
                 this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                     // Handlers run in the order they were added. (Inbound: head=>tail, Outbound: tail=>head, name2ctx)
@@ -60,12 +62,14 @@ public class ClusterSlave  implements Callable<Object> {
                         }
                         IdleStateHandler idleStateHandler = new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS);
 
-                        ch.pipeline().addLast("itsClusterIdleStateHandler", idleStateHandler);
+                        ch.pipeline().addLast(idleStateHandler);
                         ch.pipeline().addLast(new ClusterMessageDecoder());
                         ch.pipeline().addLast(new ClusterMessageEncoder());
-                        ch.pipeline().addLast(new ClusterSlaveHandler(repo, clusterConfig, cluster));
+                        ch.pipeline().addLast(new ClusterSlaveHandler(slaveService, repo, clusterConfig, cluster));
                     }
                 });
+                // Set the local bind port
+                this.bootstrap.localAddress(new InetSocketAddress(this.port + this.clusterConfig.getServerId()));
             }
 
             log.info("ClusterSlave >>Connect Try: [{}, {}], {}", this.cluster.getServerId(), this.ipAddress, this.port);
@@ -79,17 +83,7 @@ public class ClusterSlave  implements Callable<Object> {
             this.channelFuture.addListener(new ChannelFutureListener() {
                 @Override
                 public void operationComplete(ChannelFuture future) {
-                    try {
-                        if (future.isSuccess()) {
-                            channelOpen(future.channel());
-                        } else {
-                            log.warn("ClusterSlave ConnectFailed: [{}, {}], {}, Exception {}", cluster.getServerId(), cluster.getIpAddress(), cluster.getSyncPort(), future.cause().getMessage());
-                        }
-                    }
-                    finally {
-                        MDC.remove(cluster.getLogKey());
-                        MDC.clear();
-                    }
+                    channelOpen(future);
                 }
             });
 
@@ -97,7 +91,7 @@ public class ClusterSlave  implements Callable<Object> {
             this.channelFuture.channel().closeFuture().addListener(new ChannelFutureListener() {
                 @Override
                 public void operationComplete(ChannelFuture future) {
-                    channelClosed(future.channel());
+                    channelClosed(future);
                 }
             });
             return null;
@@ -111,15 +105,18 @@ public class ClusterSlave  implements Callable<Object> {
     /**
      * Event invoked when the connection succeeds
      */
-    protected void channelOpen(Channel channel) {
-        log.info("ClusterSlave ..channelOpen: [{}, {}], {}, Channel: {}", this.cluster.getServerId(), this.ipAddress, this.port, channel);
-        channel.attr(HaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(this.cluster);
-        this.cluster.getNetState().connect(channel);
-
+    protected void channelOpen(ChannelFuture future) {
         try {
             MDC.put("id", this.cluster.getLogKey());
-            // Send the AI_Login message on first connection to the server
-//            AiLogin.run(this.center);
+            if (future.isSuccess()) {
+                Channel channel = future.channel();
+                log.info("ClusterSlave ..channelOpen: [{}, {}], {}, Channel: {}", this.cluster.getServerId(), this.ipAddress, this.port, channel);
+                channel.attr(HaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(this.cluster);
+                this.cluster.getSyncState().connect(channel);
+            }
+            else {
+                log.warn("ClusterSlave ConnectFailed: [{}, {}], {}, Cause: {}", cluster.getServerId(), cluster.getIpAddress(), cluster.getSyncPort(), future.cause().getMessage());
+            }
         }
         finally {
             MDC.remove(this.cluster.getLogKey());
@@ -129,17 +126,17 @@ public class ClusterSlave  implements Callable<Object> {
 
     /**
      * Event invoked when the connection is closed
-     * @param channel
+     * @param future
      */
-    protected synchronized void channelClosed(Channel channel) {
+    protected synchronized void channelClosed(ChannelFuture future) {
         try {
             MDC.put("id", this.cluster.getLogKey());
 
+            Channel channel = future.channel();
             log.warn("ClusterSlave channelClosed: [{}, {}], {}, Channel: {}", this.cluster.getServerId(), this.ipAddress, this.port, channel);
 
             channel.attr(HaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
-//            this.center.setRecvPktNmbr(0);
-            this.cluster.getNetState().disConnect();
+            this.cluster.getSyncState().disConnect();
             channel.close();
             channel.eventLoop().schedule(this, 5, TimeUnit.SECONDS);
         }
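
Note: on the slave side, SO_REUSEADDR plus an explicit localAddress of syncPort + serverId pins the outgoing connection's source port, which is what makes the master-side clientPort - syncPort lookup above work, and lets a quick reconnect rebind the same port while it sits in TIME_WAIT. A minimal Netty sketch of those bootstrap options (host, ports, and ids assumed):

    // Sketch of the pinned-source-port client bootstrap; host/port/id values are assumed.
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import java.net.InetSocketAddress;

    public class SlaveBootstrapSketch {
        public static void main(String[] args) throws InterruptedException {
            int myServerId = 2;
            int peerSyncPort = 11888;                     // server.1 from the config
            NioEventLoopGroup group = new NioEventLoopGroup(1);
            try {
                Bootstrap b = new Bootstrap()
                        .group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.SO_REUSEADDR, true)
                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5 * 1000)
                        // Pin the local source port so the master can derive our serverId.
                        .localAddress(new InetSocketAddress(peerSyncPort + myServerId))
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) {
                                // codecs and the slave handler would be added here
                            }
                        });
                b.connect("192.168.20.44", peerSyncPort).sync().channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    }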

+ 3 - 44
moct-utic-server/src/main/java/com/its/moct/utic/server/xnet/cluster/slave/ClusterSlaveHandler.java

@@ -1,28 +1,19 @@
 package com.its.moct.utic.server.xnet.cluster.slave;
 
-import com.its.common.network.NettyUtils;
-import com.its.common.utils.SysUtils;
 import com.its.moct.utic.server.config.HaClusterConfig;
-import com.its.moct.utic.server.dto.CenterDto;
 import com.its.moct.utic.server.repository.ApplicationRepository;
-import com.its.moct.utic.server.xnet.cluster.utils.ClusterMessage;
-import com.its.moct.utic.server.xnet.cluster.utils.ClusterMessageData;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.util.concurrent.ScheduledFuture;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 
 @Slf4j
 @RequiredArgsConstructor
 public class ClusterSlaveHandler extends ChannelInboundHandlerAdapter {
 
+    private final ClusterSlaveService slaveService;
     private final ApplicationRepository repo;
     private final HaClusterConfig clusterConfig;
     private final HaClusterConfig.HaCluster cluster;
@@ -31,40 +22,7 @@ public class ClusterSlaveHandler extends ChannelInboundHandlerAdapter {
 
     @Override
     public void channelActive(final ChannelHandlerContext ctx) {
-        sendData(ctx);
-//        scheduleNextDataSend(ctx);
-    }
-
-    private void sendData(ChannelHandlerContext ctx) {
-        List<ClusterMessageData> details = new ArrayList<>();
-        List<String> keySet = new ArrayList<>(this.repo.getCenterMap().keySet());
-        Collections.sort(keySet);
-        for (String key : keySet) {
-            CenterDto region = this.repo.getCenterMap().get(key);
-            if (region == null) {
-                continue;
-            }
-//            details.add(region.getClusterData());
-        }
-        ClusterMessage clusterMsg = ClusterMessage.builder()
-                .serverId(this.clusterConfig.getServerId())
-                .serverTime(SysUtils.getSysTime())
-                .infos(details)
-                .build();
-        log.info("-ClusterSlaveHandler.sendData___: [{}], {}, [--TO: serverId: {}, serverTime: {}, infos: {}]",
-                this.clusterConfig.getServerId(), NettyUtils.getTcpAddress(ctx.channel()),
-                clusterMsg.getServerId(), clusterMsg.getServerTime(), clusterMsg.getInfos().size());
-        ctx.writeAndFlush(clusterMsg);
-    }
-
-    private void scheduleNextDataSend(final ChannelHandlerContext ctx) {
-        future = ctx.executor().schedule(new Runnable() {
-            @Override
-            public void run() {
-                sendData(ctx);
-                scheduleNextDataSend(ctx);
-            }
-        }, 5, TimeUnit.SECONDS);
+        this.slaveService.sendSyncData(this.cluster, ctx.channel(), null);
     }
 
 //    @Override
@@ -81,6 +39,7 @@ public class ClusterSlaveHandler extends ChannelInboundHandlerAdapter {
         if (future != null) {
             future.cancel(true);
         }
+//        this.cluster.getSyncState().disConnect();
         super.channelInactive(ctx);
     }
 

+ 34 - 29
moct-utic-server/src/main/java/com/its/moct/utic/server/xnet/cluster/slave/ClusterSlaveService.java

@@ -13,7 +13,7 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.scheduling.annotation.Scheduled;
+import org.slf4j.MDC;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 import org.springframework.stereotype.Service;
 
@@ -33,6 +33,7 @@ import java.util.concurrent.ScheduledFuture;
 public class ClusterSlaveService {
 
     private final ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
+    private ScheduledFuture<?> taskFuture;
 
     private final ApplicationRepository repo;
     private final HaClusterConfig clusterConfig;
@@ -41,8 +42,6 @@ public class ClusterSlaveService {
     private final ExecutorService executorService= Executors.newFixedThreadPool(1);
     private final List<ClusterSlave> clientTasks = Collections.synchronizedList(new ArrayList<>());
 
-    private ScheduledFuture<?> taskFuture;
-
     @PostConstruct
     void init() {
         this.bootstrapFactory = new ClusterSlaveBootstrapFactory(1, 5);
@@ -61,7 +60,7 @@ public class ClusterSlaveService {
             if (cluster.getServerId() == this.clusterConfig.getServerId()) {
                 continue;
             }
-            ClusterSlave slaveClient = new ClusterSlave(repo, clusterConfig, cluster, this.bootstrapFactory);
+            ClusterSlave slaveClient = new ClusterSlave(this, repo, clusterConfig, cluster, this.bootstrapFactory);
             this.clientTasks.add(slaveClient);
         }
 
@@ -69,7 +68,7 @@ public class ClusterSlaveService {
             List<Future<Object>> futures = this.executorService.invokeAll(this.clientTasks);
             log.info("ClusterSlaveService.run: futures, {} EA.", (long)futures.size());
 
-            startSyncSchedule();
+            dataSyncSchedule();
         }
         catch(InterruptedException e) {
             log.error("ClusterSlaveService.run: Exception: InterruptedException");
@@ -89,7 +88,7 @@ public class ClusterSlaveService {
             if (cluster.getServerId() == this.clusterConfig.getServerId()) {
                 continue;
             }
-            channelClose(cluster.getNetState().getChannel());
+            channelClose(cluster.getSyncState().getChannel());
         }
         try {
             if (this.bootstrapFactory != null && this.bootstrapFactory.getEventLoopGroup() != null) {
@@ -129,8 +128,8 @@ public class ClusterSlaveService {
                 .build();
     }
 
-    private void startSyncSchedule() {
-        log.info("ClusterSlaveService: startSyncSchedule: {} seconds.", this.clusterConfig.getSyncSeconds());
+    private void dataSyncSchedule() {
+        log.info("ClusterSlaveService:dataSyncSchedule: {} seconds.", this.clusterConfig.getSyncSeconds());
         this.taskFuture = this.taskScheduler.scheduleAtFixedRate(() -> {
             ClusterMessage clusterMsg = getClusterMessage();
             for (Map.Entry<Integer, HaClusterConfig.HaCluster> entry : this.clusterConfig.getClusterMap().entrySet()) {
@@ -138,33 +137,39 @@ public class ClusterSlaveService {
                 if (cluster.getServerId() == this.clusterConfig.getServerId()) {
                     continue;
                 }
-                if (cluster.getNetState().getState() != NET.CLOSED) {
-                    try {
-                        ChannelFuture f = cluster.getNetState().getChannel().writeAndFlush(clusterMsg);
-                        f.awaitUninterruptibly();
-                        if (f.isDone() || f.isSuccess()) {
-                            log.info("-ClusterSlaveHandler.sendData___: [{}], {}, [--TO: serverId: {}, serverTime: {}, infos: {}]",
-                                    this.clusterConfig.getServerId(), NettyUtils.getTcpAddress(cluster.getNetState().getChannel()),
-                                    clusterMsg.getServerId(), clusterMsg.getServerTime(), clusterMsg.getInfos().size());
-                        }
-                    }
-                    catch (Exception e) {
-                        log.info("-ClusterSlaveHandler.sendData___: [{}], {}, Failed: [--TO: serverId: {}, serverTime: {}, infos: {}]",
-                                this.clusterConfig.getServerId(), NettyUtils.getTcpAddress(cluster.getNetState().getChannel()),
-                                clusterMsg.getServerId(), clusterMsg.getServerTime(), clusterMsg.getInfos().size());
-                        log.info("-ClusterSlaveHandler.sendData___: [{}], {}, Failed: {}",
-                                this.clusterConfig.getServerId(), NettyUtils.getTcpAddress(cluster.getNetState().getChannel()), e.getMessage());
-                    }
+                if (cluster.getSyncState().getState() != NET.CLOSED) {
+                    sendSyncData(cluster, cluster.getSyncState().getChannel(), clusterMsg);
                 }
             }
         }, this.clusterConfig.getSyncSeconds() * 1000L);
 
     }
 
-    @Scheduled(fixedRate = 5000)
-    public void logPeriodicMessage() {
-        log.info("This is a periodic log message every 5 seconds.");
+    public void sendSyncData(final HaClusterConfig.HaCluster cluster, final Channel channel, ClusterMessage clusterMsg) {
+        if (null == clusterMsg) {
+            clusterMsg = getClusterMessage();
+        }
+        try {
+            MDC.put("id", cluster.getLogKey());
+            ChannelFuture f = channel.writeAndFlush(clusterMsg);
+            f.awaitUninterruptibly();
+            if (f.isSuccess()) {
+                log.info("ClusterSlaveService.sendSyncData: [{}], {}, [--TO: serverId: {}, serverTime: {}, infos: {}]",
+                        this.clusterConfig.getServerId(), NettyUtils.getTcpAddress(channel),
+                        clusterMsg.getServerId(), clusterMsg.getServerTime(), clusterMsg.getInfos().size());
+            }
+        }
+        catch (Exception e) {
+            log.info("ClusterSlaveService.sendSyncData: [{}], {}, Failed: [--TO: serverId: {}, serverTime: {}, infos: {}]",
+                    this.clusterConfig.getServerId(), NettyUtils.getTcpAddress(channel),
+                    clusterMsg.getServerId(), clusterMsg.getServerTime(), clusterMsg.getInfos().size());
+            log.info("ClusterSlaveService.sendSyncData: [{}], {}, Failed: {}",
+                    this.clusterConfig.getServerId(), NettyUtils.getTcpAddress(channel), e.getMessage());
+        }
+        finally {
+            MDC.remove(cluster.getLogKey());
+            MDC.clear();
+        }
     }
-
 }
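
Note: the periodic broadcast moves off the @Scheduled test method onto the service's own ThreadPoolTaskScheduler, firing every syncSeconds * 1000 ms, and sendSyncData is shared with ClusterSlaveHandler.channelActive so the first message goes out as soon as a channel opens. A minimal sketch of that scheduling pattern using Spring's ThreadPoolTaskScheduler (period value assumed from the config):

    // Sketch of the fixed-rate sync schedule above; the period is assumed from the config.
    import java.time.Duration;
    import java.util.concurrent.ScheduledFuture;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

    public class SyncScheduleSketch {
        public static void main(String[] args) throws InterruptedException {
            ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
            scheduler.setPoolSize(1);
            scheduler.initialize();
            int syncSeconds = 5;                          // moct-utic-server-cluster.cfg
            ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(
                    () -> System.out.println("send ClusterMessage to every connected peer"),
                    Duration.ofMillis(syncSeconds * 1000L));
            Thread.sleep(12_000);                         // let a couple of ticks fire
            future.cancel(true);                          // mirrors shutdown(): cancel, then stop
            scheduler.shutdown();
        }
    }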
 

+ 2 - 2
moct-utic-server/src/main/java/com/its/moct/utic/server/xnet/server/ItsAsnCommServerService.java

@@ -38,10 +38,10 @@ public class ItsAsnCommServerService {
             }
         }
         if (NettyUtils.isEpollAvailable()) {
-            log.info("서버가 리눅스 EPOLL 모드에서 실행됩니다.");
+            log.info("The MOCT ASN.1 Server runs in Linux EPOLL mode.");
         }
         else {
-            log.info("서버가 윈도우 NIO 모드에서 실행됩니다.");
+            log.info("The MOCT ASN.1 Server runs in Windows NIO mode.");
         }
 
         this.serverBootstrap = createBootstrap();

+ 1 - 1
moct-utic-server/src/main/resources/logback-spring-appender.xml

@@ -1,7 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <included>
     <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
-                <withJansi>true</withJansi>
+<!--                <withJansi>true</withJansi>-->
         <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
             <charset>${LOG_CHARSET}</charset>
             <pattern>${LOG_PATTERN_CONSOLE}</pattern>

+ 2 - 0
moct-utic-server/src/main/resources/logback-spring.xml

@@ -29,6 +29,8 @@
     <property name="LOG_PATTERN_SCHEDULE"    value="[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%-5level] %msg%n"/>
     <property name="LOG_PATTERN_ASPECT"      value="[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%-5level] %msg%n"/>
     <property name="LOG_PATTERN_CONSOLE"     value="[%d{HH:mm:ss.SSS}] [%5level] %msg %n"/>
+<!--    <property name="LOG_PATTERN_CONSOLE"     value="[%d{HH:mm:ss.SSS}] %highlight([%5level]) %cyan(%msg) %n"/>-->
+<!--    <property name="LOG_PATTERN_CONSOLE"     value="[%d{HH:mm:ss.SSS}] %highlight([%5level]) %msg %n"/>-->
 
     <springProfile name="!xxx">
         <include resource="logback-spring-appender.xml"/>