
update its-cluster library: rename the Cluster* classes to Ha*/AbstractHa* and expose abstract election and cluster-message hooks

HANTE · 1 month ago
commit 64b8a5dbea

+ 23 - 20
its-cluster/src/main/java/com/its/common/cluster/master/ClusterMasterService.java → its-cluster/src/main/java/com/its/common/cluster/master/AbstractHaClusterMasterService.java

@@ -1,9 +1,10 @@
 package com.its.common.cluster.master;
 
-import com.its.common.cluster.utils.ClusterPlatform;
-import com.its.common.cluster.utils.ClusterUtils;
-import com.its.common.cluster.vo.HaClusterConfig;
-import com.its.common.cluster.vo.HaClusterInfo;
+import com.its.common.cluster.utils.HaClusterMessage;
+import com.its.common.cluster.utils.HaPlatform;
+import com.its.common.cluster.utils.HaUtils;
+import com.its.common.cluster.vo.AbstractHaClusterConfig;
+import com.its.common.cluster.vo.HaInfo;
 import com.its.common.cluster.vo.HaNET;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
@@ -14,7 +15,6 @@ import io.netty.channel.nio.NioEventLoopGroup;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
-import org.springframework.stereotype.Service;
 
 import javax.annotation.PostConstruct;
 import java.util.Map;
@@ -22,13 +22,12 @@ import java.util.concurrent.ScheduledFuture;
 
 @Slf4j
 @RequiredArgsConstructor
-public class ClusterMasterService {
+public abstract class AbstractHaClusterMasterService {
 
     private final ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
-    private ScheduledFuture<?> taskFuture;
-
-    private final HaClusterConfig clusterConfig;
+    private final AbstractHaClusterConfig clusterConfig;
 
+    private ScheduledFuture<?> taskFuture;
     private EventLoopGroup acceptGroup;
     private EventLoopGroup workerGroup;
     private ChannelFuture channelFuture;
@@ -40,12 +39,12 @@ public class ClusterMasterService {
     }
 
     public void start() {
-        if (!ClusterPlatform.isWindows()) {
+        if (!HaPlatform.isWindows()) {
             if (!Epoll.isAvailable()) {
                 Epoll.unavailabilityCause().printStackTrace();
             }
         }
-        if (ClusterUtils.isEpollAvailable()) {
+        if (HaUtils.isEpollAvailable()) {
             log.info("The Cluster Master runs in LINUX EPOLL mode.");
         }
         else {
@@ -82,9 +81,9 @@ public class ClusterMasterService {
         EventLoopGroup acceptGroups;
         EventLoopGroup workerGroups;
 
-        acceptGroups = ClusterUtils.newEventLoopGroup(1, "Accept");
-        workerGroups = ClusterUtils.newEventLoopGroup(1, "Worker");
-        serverBootstrap.channel(ClusterUtils.getServerSocketChannel());
+        acceptGroups = HaUtils.newEventLoopGroup(1, "Accept");
+        workerGroups = HaUtils.newEventLoopGroup(1, "Worker");
+        serverBootstrap.channel(HaUtils.getServerSocketChannel());
         serverBootstrap.group(acceptGroups, workerGroups);
 
         serverBootstrap.option(ChannelOption.AUTO_READ, true);
@@ -98,19 +97,20 @@ public class ClusterMasterService {
         serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, true);     // Reuse the socket address
         serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);      // Disable the Nagle algorithm
 
-        ClusterMasterInitializer clusterMasterInitializer = new ClusterMasterInitializer(
-                this.clusterConfig
-        );
-        serverBootstrap.childHandler(clusterMasterInitializer);
+        HaClusterMasterInitializer haClusterMasterInitializer = new HaClusterMasterInitializer(this, this.clusterConfig);
+        serverBootstrap.childHandler(haClusterMasterInitializer);
 
         return serverBootstrap;
     }
 
+    public abstract void election(boolean isMaster);
+    public abstract void onClusterMessage(HaClusterMessage message);
+
     private void electionMasterSchedule() {
         this.taskFuture = this.taskScheduler.scheduleAtFixedRate(() -> {
             int masterId = Integer.MAX_VALUE;
-            for (Map.Entry<Integer, HaClusterInfo> entry : this.clusterConfig.getClusterMap().entrySet()) {
-                HaClusterInfo cluster = entry.getValue();
+            for (Map.Entry<Integer, HaInfo> entry : this.clusterConfig.getClusterMap().entrySet()) {
+                HaInfo cluster = entry.getValue();
                 if (cluster.getElectionState().getState() != HaNET.CLOSED) {
                     if (cluster.getServerId() < masterId) {
                         masterId = cluster.getServerId();
@@ -126,6 +126,9 @@ public class ClusterMasterService {
             }
             log.info("ClusterMasterService:electionMasterSchedule: serverId: {}, Master: {}.",
                     this.clusterConfig.getServerId(), this.clusterConfig.isMaster());
+
+            election(this.clusterConfig.isMaster());
+
         }, 2 * 1000L);
     }
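
The master service is now abstract: electionMasterSchedule() reports each election result through election(boolean), and HaClusterMasterHandler forwards every received message to onClusterMessage(HaClusterMessage). With the Spring @Service import dropped, the application presumably registers its own concrete subclass. A minimal sketch of such a subclass (class name and method bodies are hypothetical, not part of this commit):

package com.its.common.cluster.master;

import com.its.common.cluster.utils.HaClusterMessage;
import com.its.common.cluster.vo.AbstractHaClusterConfig;

public class MyClusterMasterService extends AbstractHaClusterMasterService {

    public MyClusterMasterService(AbstractHaClusterConfig clusterConfig) {
        super(clusterConfig);   // constructor generated by @RequiredArgsConstructor on the abstract base
    }

    @Override
    public void election(boolean isMaster) {
        // Called from electionMasterSchedule() after each election round;
        // switch the application between master and standby roles here (assumption).
    }

    @Override
    public void onClusterMessage(HaClusterMessage message) {
        // Called from HaClusterMasterHandler.channelRead() for every received sync message.
    }
}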
 

+ 20 - 18
its-cluster/src/main/java/com/its/common/cluster/master/ClusterMasterHandler.java → its-cluster/src/main/java/com/its/common/cluster/master/HaClusterMasterHandler.java

@@ -1,9 +1,9 @@
 package com.its.common.cluster.master;
 
-import com.its.common.cluster.utils.ClusterMessage;
-import com.its.common.cluster.utils.ClusterUtils;
-import com.its.common.cluster.vo.HaClusterConfig;
-import com.its.common.cluster.vo.HaClusterInfo;
+import com.its.common.cluster.utils.HaClusterMessage;
+import com.its.common.cluster.utils.HaUtils;
+import com.its.common.cluster.vo.AbstractHaClusterConfig;
+import com.its.common.cluster.vo.HaInfo;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -15,35 +15,37 @@ import org.slf4j.MDC;
 
 @Slf4j
 @RequiredArgsConstructor
-public class ClusterMasterHandler extends ChannelInboundHandlerAdapter {
+public class HaClusterMasterHandler extends ChannelInboundHandlerAdapter {
 
-    private final HaClusterConfig clusterConfig;
+    private final AbstractHaClusterMasterService masterService;
+    private final AbstractHaClusterConfig clusterConfig;
 
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) {
-        if (msg instanceof ClusterMessage) {
-            ClusterMessage clusterMsg = (ClusterMessage) msg;
+        if (msg instanceof HaClusterMessage) {
+            HaClusterMessage clusterMsg = (HaClusterMessage) msg;
             log.info("ClusterMasterHandler.channelRead: [{}], {}, [FROM: serverId: {}, serverTime: {}, infos: {}]",
-                    this.clusterConfig.getServerId(), ClusterUtils.getTcpAddress(ctx.channel()),
+                    this.clusterConfig.getServerId(), HaUtils.getTcpAddress(ctx.channel()),
                     clusterMsg.getServerId(), clusterMsg.getServerTime(), clusterMsg.getInfos().size());
 
-            HaClusterInfo cluster = ctx.channel().attr(HaClusterConfig.CLUSTER_ATTRIBUTE_KEY).get();
+            HaInfo cluster = ctx.channel().attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).get();
             if (cluster == null) {
-                log.error("RECV: [{}]. Not Found Channel Cluster Object... Oops Will be closed.", ClusterUtils.getAddress(ctx.channel()));
+                log.error("RECV: [{}]. Not Found Channel Cluster Object... Oops Will be closed.", HaUtils.getAddress(ctx.channel()));
                 closeChannel(ctx.channel());
                 return;
             }
 
             cluster.getElectionState().setLastRecvTime();
+            this.masterService.onClusterMessage(clusterMsg);
 //            ctx.writeAndFlush(clusterMsg);
         }
     }
 
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-        HaClusterInfo cluster = ctx.channel().attr(HaClusterConfig.CLUSTER_ATTRIBUTE_KEY).get();
+        HaInfo cluster = ctx.channel().attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).get();
         if (cluster == null) {
-            log.error("{}.++channelInactive: Unknown Cluster: {}.", this.getClass().getSimpleName(), ClusterUtils.getAddress(ctx.channel()));
+            log.error("{}.++channelInactive: Unknown Cluster: {}.", this.getClass().getSimpleName(), HaUtils.getAddress(ctx.channel()));
             return;
         }
         try {
@@ -51,7 +53,7 @@ public class ClusterMasterHandler extends ChannelInboundHandlerAdapter {
             log.info("{}.++channelInactive: [{}, {}].", this.getClass().getSimpleName(), cluster.getServerId(), cluster.getIpAddress());
             cluster.getElectionState().disConnect();
 
-            ctx.channel().attr(HaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
+            ctx.channel().attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
             ctx.fireChannelInactive();
         }
         finally {
@@ -63,23 +65,23 @@ public class ClusterMasterHandler extends ChannelInboundHandlerAdapter {
     @Override
     public void userEventTriggered(ChannelHandlerContext ctx, Object e) throws Exception {
         if (e instanceof IdleStateEvent) {
-            HaClusterInfo cluster = ctx.channel().attr(HaClusterConfig.CLUSTER_ATTRIBUTE_KEY).get();
+            HaInfo cluster = ctx.channel().attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).get();
             if (cluster == null) {
-                log.error("{}.userEventTriggered: Unknown Cluster: {}.", this.getClass().getSimpleName(), ClusterUtils.getAddress(ctx.channel()));
+                log.error("{}.userEventTriggered: Unknown Cluster: {}.", this.getClass().getSimpleName(), HaUtils.getAddress(ctx.channel()));
                 return;
             }
 
             IdleStateEvent evt = (IdleStateEvent) e;
 
             MDC.put("id", cluster.getLogKey());
-            log.info("{}.++userEventTriggered: {}. {}", this.getClass().getSimpleName(), ClusterUtils.getAddress(ctx.channel()), evt);
+            log.info("{}.++userEventTriggered: {}. {}", this.getClass().getSimpleName(), HaUtils.getAddress(ctx.channel()), evt);
 
             if (evt.state() == IdleState.READER_IDLE) {
                 long recvTimeout = System.currentTimeMillis() - cluster.getElectionState().getLastRecvTime();
                 long heartbeatTimeout = this.clusterConfig.getSyncSeconds() * 1000L * 3;
                 if (recvTimeout > heartbeatTimeout) {
                     log.info("{}.++userEventTriggered: {}. [{}, {}]. Heartbeat timeout, {}, {} ms. Will be closed.",
-                            this.getClass().getSimpleName(), ClusterUtils.getAddress(ctx.channel()),
+                            this.getClass().getSimpleName(), HaUtils.getAddress(ctx.channel()),
                             cluster.getLogKey(), cluster.getIpAddress(), recvTimeout, heartbeatTimeout);
                     closeChannel(ctx.channel());
                 }

+ 16 - 15
its-cluster/src/main/java/com/its/common/cluster/master/ClusterMasterInitializer.java → its-cluster/src/main/java/com/its/common/cluster/master/HaClusterMasterInitializer.java

@@ -1,10 +1,10 @@
 package com.its.common.cluster.master;
 
-import com.its.common.cluster.utils.ClusterMessageDecoder;
-import com.its.common.cluster.utils.ClusterMessageEncoder;
-import com.its.common.cluster.utils.ClusterUtils;
-import com.its.common.cluster.vo.HaClusterConfig;
-import com.its.common.cluster.vo.HaClusterInfo;
+import com.its.common.cluster.utils.HaClusterMessageDecoder;
+import com.its.common.cluster.utils.HaClusterMessageEncoder;
+import com.its.common.cluster.utils.HaUtils;
+import com.its.common.cluster.vo.AbstractHaClusterConfig;
+import com.its.common.cluster.vo.HaInfo;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
@@ -19,9 +19,10 @@ import java.util.concurrent.TimeUnit;
 
 @Slf4j
 @RequiredArgsConstructor
-public class ClusterMasterInitializer extends ChannelInitializer<Channel> {
+public class HaClusterMasterInitializer extends ChannelInitializer<Channel> {
 
-    private final HaClusterConfig clusterConfig;
+    private final AbstractHaClusterMasterService masterService;
+    private final AbstractHaClusterConfig clusterConfig;
 
     @Override
     protected void initChannel(Channel channel) throws Exception {
@@ -29,13 +30,13 @@ public class ClusterMasterInitializer extends ChannelInitializer<Channel> {
 //        String clientIP = remoteAddress.getAddress().getHostAddress();
 //        int clientPort = remoteAddress.getPort();
 
-        String ipAddress  = ClusterUtils.getRemoteIpAddress(channel);
-        int clientPort = ClusterUtils.getRemotePort(channel);
+        String ipAddress  = HaUtils.getRemoteIpAddress(channel);
+        int clientPort = HaUtils.getRemotePort(channel);
         int serverId = clientPort - this.clusterConfig.getSyncPort();
         log.info("ClusterMasterInitializer.----initChannel: connected from: {}:{}, ServerId: {}.",
                 ipAddress, clientPort, serverId);
 //        HaClusterConfig.HaCluster cluster = this.clusterConfig.get(ipAddress);
-        HaClusterInfo cluster = this.clusterConfig.getClusterMap().get(serverId);
+        HaInfo cluster = this.clusterConfig.getClusterMap().get(serverId);
         if (cluster == null) {
             log.error("ClusterMasterInitializer.----initChannel: [ServerId: {}, IP Address: {}], Unknown Server Id. will be closed.", serverId, ipAddress);
             channel.disconnect();
@@ -56,7 +57,7 @@ public class ClusterMasterInitializer extends ChannelInitializer<Channel> {
             if (cluster.getElectionState().getChannel() != null) {
                 log.warn("ClusterMasterInitializer.----initChannel: {}, {}, Already Connected. Old Connection will be closed.", ipAddress, cluster.getServerId());
                 // Clear the attribute value so the event handler does not process the stale connection again
-                channel.attr(HaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
+                channel.attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
                 cluster.getElectionState().disConnect();
 
                 channel.disconnect();
@@ -64,7 +65,7 @@ public class ClusterMasterInitializer extends ChannelInitializer<Channel> {
             }
 
             cluster.getElectionState().connect(channel);
-            channel.attr(HaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(cluster);
+            channel.attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(cluster);
 
             IdleStateHandler idleStateHandler = new IdleStateHandler(this.clusterConfig.getSyncSeconds(), 0, 0, TimeUnit.SECONDS);
             ChannelPipeline pipeline = channel.pipeline();
@@ -73,9 +74,9 @@ public class ClusterMasterInitializer extends ChannelInitializer<Channel> {
             }
             pipeline.addLast(idleStateHandler);
 
-            pipeline.addLast(new ClusterMessageDecoder());
-            pipeline.addLast(new ClusterMessageEncoder());
-            pipeline.addLast(new ClusterMasterHandler(this.clusterConfig));
+            pipeline.addLast(new HaClusterMessageDecoder());
+            pipeline.addLast(new HaClusterMessageEncoder());
+            pipeline.addLast(new HaClusterMasterHandler(this.masterService, this.clusterConfig));
         }
         finally {
             MDC.remove(cluster.getLogKey());

+ 34 - 29
its-cluster/src/main/java/com/its/common/cluster/slave/ClusterSlaveService.java → its-cluster/src/main/java/com/its/common/cluster/slave/AbstractHaClusterSlaveService.java

@@ -1,11 +1,11 @@
 package com.its.common.cluster.slave;
 
-import com.its.common.cluster.utils.ClusterMessage;
-import com.its.common.cluster.utils.ClusterMessageData;
-import com.its.common.cluster.utils.ClusterSlaveBootstrapFactory;
-import com.its.common.cluster.utils.ClusterUtils;
-import com.its.common.cluster.vo.HaClusterConfig;
-import com.its.common.cluster.vo.HaClusterInfo;
+import com.its.common.cluster.utils.HaClusterMessage;
+import com.its.common.cluster.utils.AbstractHaClusterMessageData;
+import com.its.common.cluster.utils.HaClusterSlaveBootstrapFactory;
+import com.its.common.cluster.utils.HaUtils;
+import com.its.common.cluster.vo.AbstractHaClusterConfig;
+import com.its.common.cluster.vo.HaInfo;
 import com.its.common.cluster.vo.HaNET;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -13,7 +13,6 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.slf4j.MDC;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
-import org.springframework.stereotype.Service;
 
 import javax.annotation.PostConstruct;
 import java.text.SimpleDateFormat;
@@ -25,20 +24,21 @@ import java.util.concurrent.ScheduledFuture;
 
 @Slf4j
 @RequiredArgsConstructor
-public class ClusterSlaveService {
+public abstract class AbstractHaClusterSlaveService {
 
     private final ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
-    private ScheduledFuture<?> taskFuture;
+    private final ExecutorService executorService= Executors.newFixedThreadPool(1);
+    private final List<HaClusterSlave> clientTasks = Collections.synchronizedList(new ArrayList<>());
 
-    private final HaClusterConfig clusterConfig;
-    private ClusterSlaveBootstrapFactory bootstrapFactory;
+    private final AbstractHaClusterConfig clusterConfig;
+
+    private ScheduledFuture<?> taskFuture;
+    private HaClusterSlaveBootstrapFactory bootstrapFactory;
 
-    private final ExecutorService executorService= Executors.newFixedThreadPool(1);
-    private final List<ClusterSlave> clientTasks = Collections.synchronizedList(new ArrayList<>());
 
     @PostConstruct
     void init() {
-        this.bootstrapFactory = new ClusterSlaveBootstrapFactory(1, 5);
+        this.bootstrapFactory = new HaClusterSlaveBootstrapFactory(1, 5);
         this.taskScheduler.setPoolSize(1);
         this.taskScheduler.initialize();
     }
@@ -49,12 +49,12 @@ public class ClusterSlaveService {
         /**
          * Connect to the cluster
          */
-        for (Map.Entry<Integer, HaClusterInfo> entry : this.clusterConfig.getClusterMap().entrySet()) {
-            HaClusterInfo cluster = entry.getValue();
+        for (Map.Entry<Integer, HaInfo> entry : this.clusterConfig.getClusterMap().entrySet()) {
+            HaInfo cluster = entry.getValue();
             if (cluster.getServerId() == this.clusterConfig.getServerId()) {
                 continue;
             }
-            ClusterSlave slaveClient = new ClusterSlave(this, clusterConfig, cluster, this.bootstrapFactory);
+            HaClusterSlave slaveClient = new HaClusterSlave(this, clusterConfig, cluster, this.bootstrapFactory);
             this.clientTasks.add(slaveClient);
         }
 
@@ -77,8 +77,8 @@ public class ClusterSlaveService {
         }
         this.taskScheduler.shutdown();
 
-        for (Map.Entry<Integer, HaClusterInfo> entry : this.clusterConfig.getClusterMap().entrySet()) {
-            HaClusterInfo cluster = entry.getValue();
+        for (Map.Entry<Integer, HaInfo> entry : this.clusterConfig.getClusterMap().entrySet()) {
+            HaInfo cluster = entry.getValue();
             if (cluster.getServerId() == this.clusterConfig.getServerId()) {
                 continue;
             }
@@ -108,8 +108,12 @@ public class ClusterSlaveService {
         Date dtLog = new Date();
         return sdfDate.format(dtLog);
     }
-    private ClusterMessage getClusterMessage() {
-        List<ClusterMessageData> details = new ArrayList<>();
+
+    public abstract List<AbstractHaClusterMessageData> getClusterMessageData();
+
+    private HaClusterMessage getClusterMessage() {
+//        List<HaClusterMessageData> details = new ArrayList<>();
+        List<AbstractHaClusterMessageData> details = getClusterMessageData();
 //        List<String> keySet = new ArrayList<>(ApplicationRepository.CENTER_MAP.keySet());
 //        Collections.sort(keySet);
 //        for (String key : keySet) {
@@ -119,7 +123,7 @@ public class ClusterSlaveService {
 //            }
 ////            details.add(region.getClusterData());
 //        }
-        return ClusterMessage.builder()
+        return HaClusterMessage.builder()
                 .serverId(this.clusterConfig.getServerId())
                 .serverTime(getSysTime())
                 .infos(details)
@@ -129,9 +133,9 @@ public class ClusterSlaveService {
     private void dataSyncSchedule() {
         log.info("ClusterSlaveService:dataSyncSchedule: {} seconds.", this.clusterConfig.getSyncSeconds());
         this.taskFuture = this.taskScheduler.scheduleAtFixedRate(() -> {
-            ClusterMessage clusterMsg = getClusterMessage();
-            for (Map.Entry<Integer, HaClusterInfo> entry : this.clusterConfig.getClusterMap().entrySet()) {
-                HaClusterInfo cluster = entry.getValue();
+            HaClusterMessage clusterMsg = getClusterMessage();
+            for (Map.Entry<Integer, HaInfo> entry : this.clusterConfig.getClusterMap().entrySet()) {
+                HaInfo cluster = entry.getValue();
                 if (cluster.getServerId() == this.clusterConfig.getServerId()) {
                     continue;
                 }
@@ -139,11 +143,12 @@ public class ClusterSlaveService {
                     sendSyncData(cluster, cluster.getSyncState().getChannel(), clusterMsg);
                 }
             }
+
         }, this.clusterConfig.getSyncSeconds() * 1000L);
 
     }
 
-    public void sendSyncData(final HaClusterInfo cluster, final Channel channel, ClusterMessage clusterMsg) {
+    public void sendSyncData(final HaInfo cluster, final Channel channel, HaClusterMessage clusterMsg) {
         if (null == clusterMsg) {
             clusterMsg = getClusterMessage();
         }
@@ -153,16 +158,16 @@ public class ClusterSlaveService {
             f.awaitUninterruptibly();
             if (f.isDone() || f.isSuccess()) {
                 log.info("ClusterSlaveService.sendSyncData: [{}], {}, [--TO: serverId: {}, serverTime: {}, infos: {}]",
-                        this.clusterConfig.getServerId(), ClusterUtils.getTcpAddress(channel),
+                        this.clusterConfig.getServerId(), HaUtils.getTcpAddress(channel),
                         clusterMsg.getServerId(), clusterMsg.getServerTime(), clusterMsg.getInfos().size());
             }
         }
         catch (Exception e) {
             log.info("ClusterSlaveService.sendSyncData: [{}], {}, Failed: [--TO: serverId: {}, serverTime: {}, infos: {}]",
-                    this.clusterConfig.getServerId(), ClusterUtils.getTcpAddress(channel),
+                    this.clusterConfig.getServerId(), HaUtils.getTcpAddress(channel),
                     clusterMsg.getServerId(), clusterMsg.getServerTime(), clusterMsg.getInfos().size());
             log.info("ClusterSlaveService.sendSyncData: [{}], {}, Failed: {}",
-                    this.clusterConfig.getServerId(), ClusterUtils.getTcpAddress(channel), e.getMessage());
+                    this.clusterConfig.getServerId(), HaUtils.getTcpAddress(channel), e.getMessage());
         }
         finally {
             MDC.remove(cluster.getLogKey());
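
The slave service now delegates payload construction to the abstract getClusterMessageData(), whose result is wrapped into the HaClusterMessage broadcast by dataSyncSchedule(). A minimal sketch of a concrete slave service and payload type (names and values are hypothetical, not part of this commit):

package com.its.common.cluster.slave;

import com.its.common.cluster.utils.AbstractHaClusterMessageData;
import com.its.common.cluster.vo.AbstractHaClusterConfig;

import java.util.ArrayList;
import java.util.List;

public class MyClusterSlaveService extends AbstractHaClusterSlaveService {

    public MyClusterSlaveService(AbstractHaClusterConfig clusterConfig) {
        super(clusterConfig);   // constructor generated by @RequiredArgsConstructor on the abstract base
    }

    @Override
    public List<AbstractHaClusterMessageData> getClusterMessageData() {
        // Invoked from getClusterMessage() before every scheduled sync broadcast.
        List<AbstractHaClusterMessageData> details = new ArrayList<>();
        MyClusterMessageData data = new MyClusterMessageData();
        data.setCenterId("C001");   // setter comes from Lombok @Data on the abstract base; value is illustrative
        details.add(data);
        return details;
    }
}

// Concrete payload type: the serializable fields are inherited from the abstract base.
class MyClusterMessageData extends AbstractHaClusterMessageData {
}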

+ 15 - 15
its-cluster/src/main/java/com/its/common/cluster/slave/ClusterSlave.java → its-cluster/src/main/java/com/its/common/cluster/slave/HaClusterSlave.java

@@ -1,10 +1,10 @@
 package com.its.common.cluster.slave;
 
-import com.its.common.cluster.utils.ClusterMessageDecoder;
-import com.its.common.cluster.utils.ClusterMessageEncoder;
-import com.its.common.cluster.utils.ClusterSlaveBootstrapFactory;
-import com.its.common.cluster.vo.HaClusterConfig;
-import com.its.common.cluster.vo.HaClusterInfo;
+import com.its.common.cluster.utils.HaClusterMessageDecoder;
+import com.its.common.cluster.utils.HaClusterMessageEncoder;
+import com.its.common.cluster.utils.HaClusterSlaveBootstrapFactory;
+import com.its.common.cluster.vo.AbstractHaClusterConfig;
+import com.its.common.cluster.vo.HaInfo;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.*;
 import io.netty.channel.socket.SocketChannel;
@@ -25,12 +25,12 @@ import java.util.concurrent.TimeUnit;
 @Getter
 @Setter
 @RequiredArgsConstructor
-public class ClusterSlave  implements Callable<Object> {
+public class HaClusterSlave implements Callable<Object> {
 
-    private final ClusterSlaveService slaveService;
-    private final HaClusterConfig clusterConfig;
-    private final HaClusterInfo cluster;
-    private final ClusterSlaveBootstrapFactory bootstrapFactory;
+    private final AbstractHaClusterSlaveService slaveService;
+    private final AbstractHaClusterConfig clusterConfig;
+    private final HaInfo cluster;
+    private final HaClusterSlaveBootstrapFactory bootstrapFactory;
 
     private Bootstrap bootstrap = null;
     private EventLoopGroup nioEventLoopGroup = null;
@@ -62,9 +62,9 @@ public class ClusterSlave  implements Callable<Object> {
                         IdleStateHandler idleStateHandler = new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS);
 
                         ch.pipeline().addLast(idleStateHandler);
-                        ch.pipeline().addLast(new ClusterMessageDecoder());
-                        ch.pipeline().addLast(new ClusterMessageEncoder());
-                        ch.pipeline().addLast(new ClusterSlaveHandler(slaveService, cluster));
+                        ch.pipeline().addLast(new HaClusterMessageDecoder());
+                        ch.pipeline().addLast(new HaClusterMessageEncoder());
+                        ch.pipeline().addLast(new HaClusterSlaveHandler(slaveService, cluster));
                     }
                 });
                 // Set the local bind port
@@ -110,7 +110,7 @@ public class ClusterSlave  implements Callable<Object> {
             if (future.isSuccess()) {
                 Channel channel = future.channel();
                 log.info("ClusterSlave ..channelOpen: [{}, {}], {}, Channel: {}", this.cluster.getServerId(), this.ipAddress, this.port, channel);
-                channel.attr(HaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(this.cluster);
+                channel.attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(this.cluster);
                 this.cluster.getSyncState().connect(channel);
             }
             else {
@@ -134,7 +134,7 @@ public class ClusterSlave  implements Callable<Object> {
             Channel channel = future.channel();
             log.warn("ClusterSlave channelClosed: [{}, {}], {}, Channel: {}", this.cluster.getServerId(), this.ipAddress, this.port, channel);
 
-            channel.attr(HaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
+            channel.attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
             this.cluster.getSyncState().disConnect();
             channel.close();
             channel.eventLoop().schedule(this, 5, TimeUnit.SECONDS);

+ 4 - 4
its-cluster/src/main/java/com/its/common/cluster/slave/ClusterSlaveHandler.java → its-cluster/src/main/java/com/its/common/cluster/slave/HaClusterSlaveHandler.java

@@ -1,6 +1,6 @@
 package com.its.common.cluster.slave;
 
-import com.its.common.cluster.vo.HaClusterInfo;
+import com.its.common.cluster.vo.HaInfo;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.util.concurrent.ScheduledFuture;
@@ -10,10 +10,10 @@ import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 @RequiredArgsConstructor
-public class ClusterSlaveHandler extends ChannelInboundHandlerAdapter {
+public class HaClusterSlaveHandler extends ChannelInboundHandlerAdapter {
 
-    private final ClusterSlaveService slaveService;
-    private final HaClusterInfo cluster;
+    private final AbstractHaClusterSlaveService slaveService;
+    private final HaInfo cluster;
 
     private ScheduledFuture<?> future;
 

+ 3 - 3
its-cluster/src/main/java/com/its/common/cluster/utils/ClusterMessageData.java → its-cluster/src/main/java/com/its/common/cluster/utils/AbstractHaClusterMessageData.java

@@ -1,13 +1,11 @@
 package com.its.common.cluster.utils;
 
-import lombok.Builder;
 import lombok.Data;
 
 import java.io.Serializable;
 
-@Builder
 @Data
-public class ClusterMessageData implements Serializable {
+public abstract class AbstractHaClusterMessageData implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private String centerId;
@@ -21,4 +19,6 @@ public class ClusterMessageData implements Serializable {
     private long sendTm;
     private int sendSeconds;
 
+    private Object object;
+
 }

+ 2 - 2
its-cluster/src/main/java/com/its/common/cluster/utils/ClusterMessage.java → its-cluster/src/main/java/com/its/common/cluster/utils/HaClusterMessage.java

@@ -8,11 +8,11 @@ import java.util.List;
 
 @Builder
 @Data
-public class ClusterMessage implements Serializable {
+public class HaClusterMessage implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private int serverId;
     private String serverTime;
 
-    private List<ClusterMessageData> infos;
+    private List<AbstractHaClusterMessageData> infos;
 }

+ 1 - 1
its-cluster/src/main/java/com/its/common/cluster/utils/ClusterMessageDecoder.java → its-cluster/src/main/java/com/its/common/cluster/utils/HaClusterMessageDecoder.java

@@ -8,7 +8,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ObjectInputStream;
 import java.util.List;
 
-public class ClusterMessageDecoder extends ByteToMessageDecoder {
+public class HaClusterMessageDecoder extends ByteToMessageDecoder {
 
     @Override
     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

+ 2 - 2
its-cluster/src/main/java/com/its/common/cluster/utils/ClusterMessageEncoder.java → its-cluster/src/main/java/com/its/common/cluster/utils/HaClusterMessageEncoder.java

@@ -7,10 +7,10 @@ import io.netty.handler.codec.MessageToByteEncoder;
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectOutputStream;
 
-public class ClusterMessageEncoder extends MessageToByteEncoder<ClusterMessage> {
+public class HaClusterMessageEncoder extends MessageToByteEncoder<HaClusterMessage> {
 
     @Override
-    protected void encode(ChannelHandlerContext ctx, ClusterMessage msg, ByteBuf out) throws Exception {
+    protected void encode(ChannelHandlerContext ctx, HaClusterMessage msg, ByteBuf out) throws Exception {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(bos);
         oos.writeObject(msg);

+ 3 - 7
its-cluster/src/main/java/com/its/common/cluster/utils/ClusterSlaveBootstrapFactory.java → its-cluster/src/main/java/com/its/common/cluster/utils/HaClusterSlaveBootstrapFactory.java

@@ -3,31 +3,27 @@ package com.its.common.cluster.utils;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.Epoll;
-import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.concurrent.DefaultThreadFactory;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 @RequiredArgsConstructor
-public class ClusterSlaveBootstrapFactory {
+public class HaClusterSlaveBootstrapFactory {
     private final int workerThread;
     private final int connectTimeout;
     private EventLoopGroup nioEventLoopGroup = null;
 
     public Bootstrap createBootstrap() {
         if (this.nioEventLoopGroup == null) {
-            this.nioEventLoopGroup = ClusterUtils.newEventLoopGroup(this.workerThread, "itsClusterEventGroup");
+            this.nioEventLoopGroup = HaUtils.newEventLoopGroup(this.workerThread, "itsClusterEventGroup");
             //new NioEventLoopGroup(this.workerThread);  //EpollEventLoopGroup
         }
         Bootstrap bootstrap = new Bootstrap();
         bootstrap.group(this.nioEventLoopGroup);
 
-        if (ClusterUtils.isEpollAvailable()) {
+        if (HaUtils.isEpollAvailable()) {
             bootstrap.channel(EpollSocketChannel.class);
         }
         else {

+ 2 - 2
its-cluster/src/main/java/com/its/common/cluster/utils/ClusterPlatform.java → its-cluster/src/main/java/com/its/common/cluster/utils/HaPlatform.java

@@ -1,9 +1,9 @@
 package com.its.common.cluster.utils;
 
-public class ClusterPlatform {
+public class HaPlatform {
     private static final String OS = System.getProperty("os.name").toLowerCase();
 
-    private ClusterPlatform() {
+    private HaPlatform() {
     }
 
     public static boolean isWindows() {

+ 2 - 2
its-cluster/src/main/java/com/its/common/cluster/utils/ClusterUtils.java → its-cluster/src/main/java/com/its/common/cluster/utils/HaUtils.java

@@ -17,12 +17,12 @@ import java.net.*;
 import java.util.ArrayList;
 import java.util.Enumeration;
 
-public class ClusterUtils {
+public class HaUtils {
     public static final String OS_NAME = System.getProperty("os.name");
     private static boolean isLinuxPlatform = false;
     private static boolean isWindowsPlatform = false;
 
-    private ClusterUtils() {
+    private HaUtils() {
     }
     public static String getAddress(Channel ch) {
         String localIp = "local-unknown";

+ 10 - 14
its-cluster/src/main/java/com/its/common/cluster/vo/HaClusterConfig.java → its-cluster/src/main/java/com/its/common/cluster/vo/AbstractHaClusterConfig.java

@@ -1,11 +1,8 @@
 package com.its.common.cluster.vo;
 
 import io.netty.util.AttributeKey;
-import lombok.Builder;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
 import java.io.BufferedReader;
@@ -13,28 +10,27 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.atomic.LongAdder;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 @Slf4j
 @Data
-public class HaClusterConfig {
+public abstract class AbstractHaClusterConfig {
 
-    public static final AttributeKey<HaClusterInfo> CLUSTER_ATTRIBUTE_KEY = AttributeKey.valueOf("clusterInfo");
+    public static final AttributeKey<HaInfo> CLUSTER_ATTRIBUTE_KEY = AttributeKey.valueOf("clusterInfo");
 
     private boolean master = false;
 
     private int syncSeconds = -1;
     private int serverId = -1;
     private String ipAddress;
-    private int syncPort = -1;       // Port 1: port for data synchronization
+    private int syncPort = -1;  // Port 1: port for data synchronization
 //    private int electionPort;   // Port 2: port for leader election
 
     private String configFile;
     private boolean logging = false;
 
-    private final HashMap<Integer, HaClusterInfo> clusterMap = new HashMap<>();
+    private final HashMap<Integer, HaInfo> clusterMap = new HashMap<>();
 
     @PostConstruct
     private void init() throws IOException {
@@ -44,9 +40,9 @@ public class HaClusterConfig {
         log.info("{}", this);
     }
 
-    public HaClusterInfo get(String ipAddress) {
-        for (Map.Entry<Integer, HaClusterInfo> entry : this.clusterMap.entrySet()) {
-            HaClusterInfo cluster = entry.getValue();
+    public HaInfo get(String ipAddress) {
+        for (Map.Entry<Integer, HaInfo> entry : this.clusterMap.entrySet()) {
+            HaInfo cluster = entry.getValue();
             if (cluster.getIpAddress().equals(ipAddress)) {
                 return cluster;
             }
@@ -137,7 +133,7 @@ public class HaClusterConfig {
                         if (serverId < masterId) {
                             masterId = serverId;
                         }
-                        HaClusterInfo haCluster = HaClusterInfo.builder()
+                        HaInfo haCluster = HaInfo.builder()
                                 .master(false)
                                 .serverId(serverId)
                                 .ipAddress(ipAddress)
@@ -156,8 +152,8 @@ public class HaClusterConfig {
             this.master = true;
         }
 
-        for (Map.Entry<Integer, HaClusterInfo> entry : this.clusterMap.entrySet()) {
-            HaClusterInfo cluster = entry.getValue();
+        for (Map.Entry<Integer, HaInfo> entry : this.clusterMap.entrySet()) {
+            HaInfo cluster = entry.getValue();
             if (cluster.getServerId() == masterId) {
                 cluster.setMaster(true);
                 break;
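
With the config class now abstract (AbstractHaClusterConfig) and its Spring imports (ConfigurationProperties, Component) removed, property binding is presumably re-attached on an application-side subclass. A hedged sketch; the class name and the "its.cluster" prefix are assumptions, not part of this commit:

package com.its.common.cluster.vo;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties(prefix = "its.cluster")
public class MyHaClusterConfig extends AbstractHaClusterConfig {
    // No extra members needed: the fields, Lombok-generated setters and the
    // @PostConstruct initialization are inherited from AbstractHaClusterConfig.
}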

+ 1 - 3
its-cluster/src/main/java/com/its/common/cluster/vo/HaClusterInfo.java → its-cluster/src/main/java/com/its/common/cluster/vo/HaInfo.java

@@ -3,11 +3,9 @@ package com.its.common.cluster.vo;
 import lombok.Builder;
 import lombok.Data;
 
-import java.util.concurrent.atomic.LongAdder;
-
 @Data
 @Builder
-public class HaClusterInfo {
+public class HaInfo {
     private boolean master;
     private int serverId;
     private String ipAddress;