فهرست منبع

moct cluster add

HANTE 6 ماه پیش
والد
کامیت
9184655e8d

+ 0 - 1
conf/moct-utic-server.pid

@@ -1 +0,0 @@
-25680

+ 10 - 0
moct-utic-server/src/main/java/com/its/moct/utic/server/config/HaClusterConfig.java

@@ -34,6 +34,7 @@ public class HaClusterConfig {
 //    private int electionPort;   // 포트 2: 리더선출을 위한 포트
 
     private String configFile;
+    private boolean logging = false;
 
     private final HashMap<Integer, HaCluster> clusterMap = new HashMap<>();
 
@@ -171,4 +172,13 @@ public class HaClusterConfig {
         }
     }
 
+    public HaCluster get(String ipAddress) {
+        for (Map.Entry<Integer, HaCluster> entry : this.clusterMap.entrySet()) {
+            HaCluster cluster = entry.getValue();
+            if (cluster.getIpAddress().equals(ipAddress)) {
+                return cluster;
+            }
+        }
+        return null;
+    }
 }

+ 67 - 0
moct-utic-server/src/main/java/com/its/moct/utic/server/xnet/cluster/master/ClusterMasterHandler.java

@@ -6,8 +6,11 @@ import com.its.moct.utic.server.repository.ApplicationRepository;
 import com.its.moct.utic.server.xnet.cluster.utils.ClusterMessage;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.slf4j.MDC;
 
 @Slf4j
 @RequiredArgsConstructor
@@ -27,6 +30,70 @@ public class ClusterMasterHandler extends ChannelInboundHandlerAdapter {
         }
     }
 
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        HaClusterConfig.HaCluster cluster = ctx.channel().attr(HaClusterConfig.CLUSTER_ATTRIBUTE_KEY).get();
+        if (cluster == null) {
+            log.error("{}.++channelInactive: Unknown Center: {}.", this.getClass().getSimpleName(), NettyUtils.getAddress(ctx.channel()));
+            return;
+        }
+        try {
+            MDC.put("id", cluster.getLogKey());
+            log.info("{}.++channelInactive: [{}, {}].", this.getClass().getSimpleName(), cluster.getServerId(), cluster.getIpAddress());
+            cluster.getNetState().disConnect();
+
+            ctx.channel().attr(HaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
+            ctx.fireChannelInactive();
+        }
+        finally {
+            MDC.remove(cluster.getLogKey());
+            MDC.clear();
+        }
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object e) throws Exception {
+        if (e instanceof IdleStateEvent) {
+            HaClusterConfig.HaCluster cluster = ctx.channel().attr(HaClusterConfig.CLUSTER_ATTRIBUTE_KEY).get();
+            if (cluster == null) {
+//            log.error("{}.userEventTriggered: Unknown Center: {}.", this.getClass().getSimpleName(), NettyUtils.getAddress(ctx.channel()));
+                return;
+            }
+
+            MDC.put("id", cluster.getLogKey());
+
+            IdleStateEvent evt = (IdleStateEvent) e;
+//            log.info("{}.++userEventTriggered: {}. {}", this.getClass().getSimpleName(), NettyUtils.getAddress(ctx.channel()), evt.state());
+
+            // 연결이 완료된 후 송수신 데이터가 일정시간 동안 없을 경우 이곳에서 처리
+            if (evt.state() == IdleState.READER_IDLE) {
+                long recvTimeout = System.currentTimeMillis() - cluster.getNetState().getLastRecvTime();
+
+//                if (cluster.getNetState().getState() <= NET.LOGIN_WAIT && (recvTimeout > (center.getResTime() * 1000L))) {
+//                    // 접속 후 로그인 처리가 되지 않은 경우
+//                    if (cluster.getNetState().getRetryCount() >= ItsAsn.SERVER_MAX_RETRY_COUNT) {
+//                        log.error("IDLE: [{}, {}]. Login Timeout, {}, {} ms. Will be closed.", cluster.getLogKey(), cluster.getIpAddress(), recvTimeout, center.getResTime() * 1000L);
+//                        ApplicationRepository.closeChannel(center, ctx.channel());
+//                    }
+//                    else {
+//                        cluster.getNetState().retry();
+//                        log.warn("IDLE: [{}, {}]. Login Initialize Request, {}, {} ms. {} Counts.", cluster.getLogKey(), cluster.getIpAddress(), recvTimeout, center.getResTime() * 1000L, center.getNetState().getRetryCount());
+//                    }
+//                    return;
+//                }
+//                if (recvTimeout > (ApplicationRepository.center.getHeartBeat() * 1000L)) {
+//                    // Heartbeat 동안 데이터 수신을 하지 못한 경우 연결을 종료한다.
+//                    log.info("IDLE: [{}, {}]. Heartbeat timeout, {}, {} ms. Will be closed.", cluster.getLogKey(), cluster.getIpAddress(), recvTimeout, center.getHeartBeat() * 1000L);
+////                    ApplicationRepository.closeChannel(center, ctx.channel());
+//                    return;
+//                }
+            }
+            MDC.remove(cluster.getLogKey());
+            MDC.clear();
+        }
+        ctx.fireUserEventTriggered(e);
+    }
+
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
         ctx.close();

+ 73 - 0
moct-utic-server/src/main/java/com/its/moct/utic/server/xnet/cluster/master/ClusterMasterInitializer.java

@@ -0,0 +1,73 @@
+package com.its.moct.utic.server.xnet.cluster.master;
+
+import com.its.common.network.NettyUtils;
+import com.its.moct.utic.server.config.HaClusterConfig;
+import com.its.moct.utic.server.repository.ApplicationRepository;
+import com.its.moct.utic.server.xnet.cluster.utils.ClusterMessageDecoder;
+import com.its.moct.utic.server.xnet.cluster.utils.ClusterMessageEncoder;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.handler.timeout.IdleStateHandler;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.MDC;
+
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+@RequiredArgsConstructor
+public class ClusterMasterInitializer extends ChannelInitializer<Channel> {
+
+    private final ApplicationRepository repo;
+    private final HaClusterConfig cluster;
+
+    @Override
+    protected void initChannel(Channel channel) throws Exception {
+        String ipAddress  = NettyUtils.getRemoteIpAddress(channel);
+        log.info("ItsAsnCommServerInitializer.----initChannel: connected from: {}", ipAddress);
+        HaClusterConfig.HaCluster cluster = this.cluster.get(ipAddress);
+        if (cluster == null) {
+            log.error("ClusterMasterInitializer.----initChannel: [LXX, {}], Unknown ip address. will be closed.", ipAddress);
+            channel.disconnect();
+            channel.close();
+            return;
+        }
+
+        try {
+            MDC.put("id", cluster.getLogKey());
+
+            log.info("ClusterMasterInitializer.----initChannel: [{}, {}].", cluster.getLogKey(), cluster.getIpAddress());
+            if (cluster.getNetState().getChannel() != null) {
+                log.warn("ClusterMasterInitializer.----initChannel: {}, {}, Already Connected. Old Connection will be closed.", ipAddress, cluster.getServerId());
+                // 이벤트 핸들러 에서 중복 처리 되지 않도록 속성 값을 제거
+                channel.attr(HaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
+                cluster.getNetState().disConnect();
+
+                channel.disconnect();
+                channel.close();
+            }
+
+            cluster.getNetState().connect(channel);
+            channel.attr(HaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(cluster);
+
+            IdleStateHandler idleStateHandler = new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS);
+            ChannelPipeline pipeline = channel.pipeline();
+            if (this.cluster.isLogging()) {
+                pipeline.addLast(new LoggingHandler(LogLevel.INFO));
+            }
+            pipeline.addLast(idleStateHandler);
+
+            pipeline.addLast(new ClusterMessageDecoder());
+            pipeline.addLast(new ClusterMessageEncoder());
+            pipeline.addLast(new ClusterMasterHandler(this.repo, this.cluster));
+        }
+        finally {
+            MDC.remove(cluster.getLogKey());
+            MDC.clear();
+        }
+    }
+
+}

+ 84 - 28
moct-utic-server/src/main/java/com/its/moct/utic/server/xnet/cluster/master/ClusterMasterService.java

@@ -1,17 +1,16 @@
 package com.its.moct.utic.server.xnet.cluster.master;
 
+import com.its.beanit.utils.ItsAsn;
+import com.its.common.network.NettyUtils;
+import com.its.common.utils.OsPlatform;
 import com.its.moct.utic.server.config.HaClusterConfig;
 import com.its.moct.utic.server.repository.ApplicationRepository;
-import com.its.moct.utic.server.xnet.cluster.utils.ClusterMessageDecoder;
-import com.its.moct.utic.server.xnet.cluster.utils.ClusterMessageEncoder;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
 import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
@@ -23,25 +22,38 @@ public class ClusterMasterService {
 
     private final ApplicationRepository repo;
     private final HaClusterConfig cluster;
-    private EventLoopGroup bossGroup;
+
+    private EventLoopGroup acceptGroup;
     private EventLoopGroup workerGroup;
     private ChannelFuture channelFuture;
 
     public void start() {
-        this.bossGroup = new NioEventLoopGroup();
+        if (!OsPlatform.isWindows()) {
+            if (!Epoll.isAvailable()) {
+                Epoll.unavailabilityCause().printStackTrace();
+            }
+        }
+        if (NettyUtils.isEpollAvailable()) {
+            log.info("클러스터 마스터가 리눅스 EPOLL 모드에서 실행됩니다.");
+        }
+        else {
+            log.info("클러스터 마스터가 윈도우 NIO 모드에서 실행됩니다.");
+        }
+
+        this.acceptGroup = new NioEventLoopGroup();
         this.workerGroup = new NioEventLoopGroup();
-        ServerBootstrap b = new ServerBootstrap();
-        b.group(this.bossGroup, this.workerGroup)
-                .channel(NioServerSocketChannel.class)
-                .childHandler(new ChannelInitializer<SocketChannel>() {
-                    @Override
-                    public void initChannel(SocketChannel ch) throws Exception {
-                        ChannelPipeline p = ch.pipeline();
-                        p.addLast(new ClusterMessageDecoder());
-                        p.addLast(new ClusterMessageEncoder());
-                        p.addLast(new ClusterMasterHandler(repo, cluster));
-                    }
-                });
+        ServerBootstrap serverBootstrap = createBootstrap();
+//        b.group(this.acceptGroup, this.workerGroup)
+//                .channel(NioServerSocketChannel.class)
+//                .childHandler(new ChannelInitializer<SocketChannel>() {
+//                    @Override
+//                    public void initChannel(SocketChannel ch) throws Exception {
+//                        ChannelPipeline p = ch.pipeline();
+//                        p.addLast(new ClusterMessageDecoder());
+//                        p.addLast(new ClusterMessageEncoder());
+//                        p.addLast(new ClusterMasterHandler(repo, cluster));
+//                    }
+//                });
 
         log.info("*********************************************************************************");
         log.info("**            UTIC MOCT HA Cluster Master Server Information                   **");
@@ -52,10 +64,10 @@ public class ClusterMasterService {
 
         try {
             if (this.cluster.getIpAddress().equals("0.0.0.0")) {
-                this.channelFuture = b.bind(this.cluster.getSyncPort());
+                this.channelFuture = serverBootstrap.bind(this.cluster.getSyncPort());
             }
             else {
-                this.channelFuture = b.bind(this.cluster.getIpAddress(), this.cluster.getSyncPort());
+                this.channelFuture = serverBootstrap.bind(this.cluster.getIpAddress(), this.cluster.getSyncPort());
             }
         }
         catch (Exception e) {
@@ -63,16 +75,60 @@ public class ClusterMasterService {
             shutdown();
         }
     }
+    public ServerBootstrap createBootstrap() {
+        ServerBootstrap serverBootstrap = new ServerBootstrap();
+        EventLoopGroup acceptGroups;
+        EventLoopGroup workerGroups;
+
+        acceptGroups = NettyUtils.newEventLoopGroup(1, "Accept");
+        workerGroups = NettyUtils.newEventLoopGroup(1, "Worker");
+        serverBootstrap.channel(NettyUtils.getServerSocketChannel());
+        serverBootstrap.group(acceptGroups, workerGroups);
+
+        serverBootstrap.option(ChannelOption.AUTO_READ, true);
+        serverBootstrap.option(ChannelOption.SO_BACKLOG, 2);
+        serverBootstrap.option(ChannelOption.SO_RCVBUF, ItsAsn.ITS_ASN_PACKET_MAX_SIZE);//config.getRcvBuf());
+        serverBootstrap.option(ChannelOption.SO_REUSEADDR, true);
+        serverBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5*1000);
+
+        serverBootstrap.childOption(ChannelOption.SO_LINGER, 0);           // 4way-handshake 비활성
+        serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, false);    // KEEPALIVE 비활성(활성: true)
+        serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, true);     // 소켓 재사용
+        serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);      // Nagle 알고리즘 비활성화
+
+        ClusterMasterInitializer clusterMasterInitializer = new ClusterMasterInitializer(
+                this.repo,
+                this.cluster
+        );
+        serverBootstrap.childHandler(clusterMasterInitializer);
+
+        return serverBootstrap;
+    }
 
     public void shutdown() {
-        if (this.channelFuture != null) {
-            this.channelFuture.channel().close();
+        try {
+            if (this.acceptGroup != null) {
+                this.acceptGroup.shutdownGracefully();
+            }
+        }
+        catch (Exception e) {
+            log.info("ClusterMasterService.acceptGroup.shutdownGracefully");
+        }
+        try {
+            if (this.workerGroup != null) {
+                this.workerGroup.shutdownGracefully();
+            }
+        }
+        catch (Exception e) {
+            log.info("ClusterMasterService.workerGroup.shutdownGracefully");
         }
-        if (this.bossGroup != null) {
-            this.bossGroup.shutdownGracefully();
+        try {
+            if (this.channelFuture != null && this.channelFuture.channel() != null) {
+                this.channelFuture.channel().closeFuture();
+            }
         }
-        if (this.workerGroup != null) {
-            this.workerGroup.shutdownGracefully();
+        catch (Exception e) {
+            log.info("ClusterMasterService.closeFuture");
         }
     }
 }