
update its-cluster

HANTE committed 1 month ago
Parent commit 67d63d3e57
21 changed files with 445 additions and 378 deletions
  1. + 5 - 5    its-cluster/src/main/java/com/its/common/cluster/codec/ClusterMessageDecoder.java
  2. + 5 - 5    its-cluster/src/main/java/com/its/common/cluster/codec/ClusterMessageEncoder.java
  3. + 27 - 27  its-cluster/src/main/java/com/its/common/cluster/handler/ClusterMasterHandler.java
  4. + 7 - 7    its-cluster/src/main/java/com/its/common/cluster/handler/ClusterSlaveHandler.java
  5. + 33 - 33  its-cluster/src/main/java/com/its/common/cluster/service/AbstractClusterMasterService.java
  6. + 31 - 31  its-cluster/src/main/java/com/its/common/cluster/service/AbstractClusterSlaveService.java
  7. + 24 - 24  its-cluster/src/main/java/com/its/common/cluster/service/ClusterMasterInitializer.java
  8. + 23 - 23  its-cluster/src/main/java/com/its/common/cluster/service/ClusterSlave.java
  9. + 4 - 5    its-cluster/src/main/java/com/its/common/cluster/service/ClusterSlaveBootstrapFactory.java
  10. + 4 - 4   its-cluster/src/main/java/com/its/common/cluster/utils/ClusterEvenAllocator.java
  11. + 2 - 2   its-cluster/src/main/java/com/its/common/cluster/utils/ClusterHashAllocator.java
  12. + 2 - 2   its-cluster/src/main/java/com/its/common/cluster/utils/ClusterPlatform.java
  13. + 2 - 2   its-cluster/src/main/java/com/its/common/cluster/utils/ClusterUtils.java
  14. + 236 - 0 its-cluster/src/main/java/com/its/common/cluster/vo/AbstractClusterConfig.java
  15. + 0 - 170 its-cluster/src/main/java/com/its/common/cluster/vo/AbstractHaClusterConfig.java
  16. + 3 - 3   its-cluster/src/main/java/com/its/common/cluster/vo/ClusterMessage.java
  17. + 1 - 4   its-cluster/src/main/java/com/its/common/cluster/vo/ClusterMessageData.java
  18. + 2 - 2   its-cluster/src/main/java/com/its/common/cluster/vo/ClusterNET.java
  19. + 8 - 8   its-cluster/src/main/java/com/its/common/cluster/vo/ClusterNetState.java
  20. + 26 - 0  its-cluster/src/main/java/com/its/common/cluster/vo/ClusterNode.java
  21. + 0 - 21  its-cluster/src/main/java/com/its/common/cluster/vo/HaInfo.java

+ 5 - 5
its-cluster/src/main/java/com/its/common/cluster/codec/HaClusterMessageDecoder.java → its-cluster/src/main/java/com/its/common/cluster/codec/ClusterMessageDecoder.java

@@ -1,6 +1,6 @@
 package com.its.common.cluster.codec;
 
-import com.its.common.cluster.vo.HaClusterMessage;
+import com.its.common.cluster.vo.ClusterMessage;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToMessageDecoder;
@@ -11,11 +11,11 @@ import java.io.ObjectInputStream;
 import java.util.List;
 
 @Slf4j
-public class HaClusterMessageDecoder extends MessageToMessageDecoder<ByteBuf> {
+public class ClusterMessageDecoder extends MessageToMessageDecoder<ByteBuf> {
 
     private final boolean isLogging;
 
-    public HaClusterMessageDecoder(boolean isLogging) {
+    public ClusterMessageDecoder(boolean isLogging) {
         this.isLogging = isLogging;
     }
 
@@ -24,7 +24,7 @@ public class HaClusterMessageDecoder extends MessageToMessageDecoder<ByteBuf> {
         // Keep the packet length (4 bytes) separately
         int packetLength = in.readInt();
         if (this.isLogging) {
-            log.info("HaClusterMessageDecoder.decode: RECV {} Bytes.", packetLength + 4);
+            log.info("ClusterMessageDecoder.decode: RECV {} Bytes.", packetLength + 4);
         }
 
         // Read the remaining data portion
@@ -33,7 +33,7 @@ public class HaClusterMessageDecoder extends MessageToMessageDecoder<ByteBuf> {
 
         ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
         ObjectInputStream ois = new ObjectInputStream(bis);
-        HaClusterMessage msg = (HaClusterMessage) ois.readObject();
+        ClusterMessage msg = (ClusterMessage) ois.readObject();
 
         out.add(msg);
 

+ 5 - 5
its-cluster/src/main/java/com/its/common/cluster/codec/HaClusterMessageEncoder.java → its-cluster/src/main/java/com/its/common/cluster/codec/ClusterMessageEncoder.java

@@ -1,6 +1,6 @@
 package com.its.common.cluster.codec;
 
-import com.its.common.cluster.vo.HaClusterMessage;
+import com.its.common.cluster.vo.ClusterMessage;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToByteEncoder;
@@ -10,16 +10,16 @@ import java.io.ByteArrayOutputStream;
 import java.io.ObjectOutputStream;
 
 @Slf4j
-public class HaClusterMessageEncoder extends MessageToByteEncoder<HaClusterMessage> {
+public class ClusterMessageEncoder extends MessageToByteEncoder<ClusterMessage> {
 
     private final boolean isLogging;
 
-    public HaClusterMessageEncoder(boolean isLogging) {
+    public ClusterMessageEncoder(boolean isLogging) {
         this.isLogging = isLogging;
     }
 
     @Override
-    protected void encode(ChannelHandlerContext ctx, HaClusterMessage msg, ByteBuf out) throws Exception {
+    protected void encode(ChannelHandlerContext ctx, ClusterMessage msg, ByteBuf out) throws Exception {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(bos);
         oos.writeObject(msg);
@@ -28,7 +28,7 @@ public class HaClusterMessageEncoder extends MessageToByteEncoder<HaClusterMessa
         byte[] bytes = bos.toByteArray();
 
         if (this.isLogging) {
-            log.info("HaClusterMessageEncoder.encode: SEND {} Bytes.", bytes.length + 4);
+            log.info("ClusterMessageEncoder.encode: SEND {} Bytes.", bytes.length + 4);
         }
 
         out.writeInt(bytes.length); // write the length field first

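(For reference: the renamed codec pair frames each message as a 4-byte length prefix followed by a Java-serialized ClusterMessage. Below is a minimal round-trip sketch through the two handlers, assuming the pipeline order that ClusterMasterInitializer installs further down, LengthFieldBasedFrameDecoder(8192, 0, 4) ahead of the decoder, and the ClusterMessage builder fields used in AbstractClusterSlaveService; the concrete values are placeholders.)

import com.its.common.cluster.codec.ClusterMessageDecoder;
import com.its.common.cluster.codec.ClusterMessageEncoder;
import com.its.common.cluster.vo.ClusterMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import java.util.Collections;

public class ClusterCodecRoundTrip {
    public static void main(String[] args) {
        // Outbound side: ClusterMessageEncoder writes [4-byte length][Java-serialized ClusterMessage].
        EmbeddedChannel outbound = new EmbeddedChannel(new ClusterMessageEncoder(true));
        outbound.writeOutbound(ClusterMessage.builder()
                .nodeId(1)
                .master(true)
                .serverTime("2024-01-01 12:00:00")   // placeholder; the real value comes from getSysTime()
                .infos(Collections.emptyList())
                .build());
        ByteBuf frame = outbound.readOutbound();

        // Inbound side: same ordering as ClusterMasterInitializer installs it.
        EmbeddedChannel inbound = new EmbeddedChannel(
                new LengthFieldBasedFrameDecoder(8192, 0, 4),
                new ClusterMessageDecoder(true));
        inbound.writeInbound(frame);
        ClusterMessage decoded = inbound.readInbound();
        System.out.println("nodeId=" + decoded.getNodeId() + ", master=" + decoded.isMaster());
    }
}
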
+ 27 - 27
its-cluster/src/main/java/com/its/common/cluster/handler/HaClusterMasterHandler.java → its-cluster/src/main/java/com/its/common/cluster/handler/ClusterMasterHandler.java

@@ -1,10 +1,10 @@
 package com.its.common.cluster.handler;
 
-import com.its.common.cluster.service.AbstractHaClusterMasterService;
-import com.its.common.cluster.vo.HaClusterMessage;
-import com.its.common.cluster.utils.HaUtils;
-import com.its.common.cluster.vo.AbstractHaClusterConfig;
-import com.its.common.cluster.vo.HaInfo;
+import com.its.common.cluster.service.AbstractClusterMasterService;
+import com.its.common.cluster.vo.ClusterMessage;
+import com.its.common.cluster.utils.ClusterUtils;
+import com.its.common.cluster.vo.AbstractClusterConfig;
+import com.its.common.cluster.vo.ClusterNode;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -16,20 +16,20 @@ import org.slf4j.MDC;
 
 @Slf4j
 @RequiredArgsConstructor
-public class HaClusterMasterHandler extends ChannelInboundHandlerAdapter {
+public class ClusterMasterHandler extends ChannelInboundHandlerAdapter {
 
-    private final AbstractHaClusterMasterService masterService;
-    private final AbstractHaClusterConfig clusterConfig;
+    private final AbstractClusterMasterService masterService;
+    private final AbstractClusterConfig clusterConfig;
 
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) {
-        if (msg instanceof HaClusterMessage) {
-            HaClusterMessage clusterMsg = (HaClusterMessage) msg;
-            HaInfo cluster = ctx.channel().attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).get();
+        if (msg instanceof ClusterMessage) {
+            ClusterMessage clusterMsg = (ClusterMessage) msg;
+            ClusterNode cluster = ctx.channel().attr(AbstractClusterConfig.CLUSTER_ATTRIBUTE_KEY).get();
             if (cluster == null) {
-                log.error("HaClusterMasterHandler.channelRead: [{}], {}, [FROM: serverId: {}, master: {}, serverTime: {}], Not Found Channel Cluster Object. Will be closed.",
-                        this.clusterConfig.getServerId(), HaUtils.getTcpAddress(ctx.channel()),
-                        clusterMsg.getServerId(), clusterMsg.isMaster(), clusterMsg.getServerTime());
+                log.error("ClusterMasterHandler.channelRead: [{}], {}, [FROM: nodeId: {}, master: {}, serverTime: {}], Not Found Channel Cluster Object. Will be closed.",
+                        this.clusterConfig.getId(), ClusterUtils.getTcpAddress(ctx.channel()),
+                        clusterMsg.getNodeId(), clusterMsg.isMaster(), clusterMsg.getServerTime());
 
                 closeChannel(ctx.channel());
                 return;
@@ -38,9 +38,9 @@ public class HaClusterMasterHandler extends ChannelInboundHandlerAdapter {
             if (this.clusterConfig.isLogging()) {
                 MDC.put("id", cluster.getLogKey());
 
-                log.info("HaClusterMasterHandler.channelRead: [{}], {}, [FROM: serverId: {}, master: {}, serverTime: {}]",
-                        this.clusterConfig.getServerId(), HaUtils.getTcpAddress(ctx.channel()),
-                        clusterMsg.getServerId(), clusterMsg.isMaster(), clusterMsg.getServerTime());
+                log.info("ClusterMasterHandler.channelRead: [{}], {}, [FROM: nodeId: {}, master: {}, serverTime: {}]",
+                        this.clusterConfig.getId(), ClusterUtils.getTcpAddress(ctx.channel()),
+                        clusterMsg.getNodeId(), clusterMsg.isMaster(), clusterMsg.getServerTime());
 
                 MDC.remove(cluster.getLogKey());
                 MDC.clear();
@@ -53,16 +53,16 @@ public class HaClusterMasterHandler extends ChannelInboundHandlerAdapter {
 
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-        HaInfo cluster = ctx.channel().attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).get();
+        ClusterNode cluster = ctx.channel().attr(AbstractClusterConfig.CLUSTER_ATTRIBUTE_KEY).get();
         if (cluster == null) {
-            log.error("HaClusterMasterHandler.channelInactive: Unknown Cluster: {}.", HaUtils.getAddress(ctx.channel()));
+            log.error("ClusterMasterHandler.channelInactive: Unknown Cluster: {}.", ClusterUtils.getAddress(ctx.channel()));
             return;
         }
 
         if (this.clusterConfig.isLogging()) {
             MDC.put("id", cluster.getLogKey());
 
-            log.info("HaClusterMasterHandler.channelInactive: [{}, {}].", cluster.getServerId(), cluster.getIpAddress());
+            log.info("ClusterMasterHandler.channelInactive: [{}, {}].", cluster.getId(), cluster.getIp());
 
             MDC.remove(cluster.getLogKey());
             MDC.clear();
@@ -70,16 +70,16 @@ public class HaClusterMasterHandler extends ChannelInboundHandlerAdapter {
 
         cluster.getElectionState().disConnect();
 
-        ctx.channel().attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
+        ctx.channel().attr(AbstractClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
         ctx.fireChannelInactive();
     }
 
     @Override
     public void userEventTriggered(ChannelHandlerContext ctx, Object e) throws Exception {
         if (e instanceof IdleStateEvent) {
-            HaInfo cluster = ctx.channel().attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).get();
+            ClusterNode cluster = ctx.channel().attr(AbstractClusterConfig.CLUSTER_ATTRIBUTE_KEY).get();
             if (cluster == null) {
-                log.error("HaClusterMasterHandler.userEventTriggered: Unknown Cluster: {}.", HaUtils.getAddress(ctx.channel()));
+                log.error("ClusterMasterHandler.userEventTriggered: Unknown Cluster: {}.", ClusterUtils.getAddress(ctx.channel()));
                 return;
             }
 
@@ -87,7 +87,7 @@ public class HaClusterMasterHandler extends ChannelInboundHandlerAdapter {
 
             if (this.clusterConfig.isLogging()) {
                 MDC.put("id", cluster.getLogKey());
-                log.info("HaClusterMasterHandler.userEventTriggered: {}. {}", HaUtils.getAddress(ctx.channel()), evt);
+                log.info("ClusterMasterHandler.userEventTriggered: {}. {}", ClusterUtils.getAddress(ctx.channel()), evt);
                 MDC.remove(cluster.getLogKey());
                 MDC.clear();
             }
@@ -96,8 +96,8 @@ public class HaClusterMasterHandler extends ChannelInboundHandlerAdapter {
                 long recvTimeout = System.currentTimeMillis() - cluster.getElectionState().getLastRecvTime();
                 long heartbeatTimeout = this.clusterConfig.getSyncSeconds() * 1000L * 3;
                 if (recvTimeout > heartbeatTimeout) {
-                    log.warn("HaClusterMasterHandler.userEventTriggered: {}. [{}, {}]. Heartbeat timeout, {}, {} ms. Will be closed.",
-                            HaUtils.getAddress(ctx.channel()), cluster.getLogKey(), cluster.getIpAddress(), recvTimeout, heartbeatTimeout);
+                    log.warn("ClusterMasterHandler.userEventTriggered: {}. [{}, {}]. Heartbeat timeout, {}, {} ms. Will be closed.",
+                            ClusterUtils.getAddress(ctx.channel()), cluster.getLogKey(), cluster.getIp(), recvTimeout, heartbeatTimeout);
 
                     closeChannel(ctx.channel());
                 }
@@ -115,7 +115,7 @@ public class HaClusterMasterHandler extends ChannelInboundHandlerAdapter {
             }
         }
         catch (Exception e) {
-            log.error("HaClusterMasterHandler.closeChannel: Exception: {}", e.getMessage());
+            log.error("ClusterMasterHandler.closeChannel: Exception: {}", e.getMessage());
         }
     }
 

+ 7 - 7
its-cluster/src/main/java/com/its/common/cluster/handler/HaClusterSlaveHandler.java → its-cluster/src/main/java/com/its/common/cluster/handler/ClusterSlaveHandler.java

@@ -1,7 +1,7 @@
 package com.its.common.cluster.handler;
 
-import com.its.common.cluster.service.AbstractHaClusterSlaveService;
-import com.its.common.cluster.vo.HaInfo;
+import com.its.common.cluster.service.AbstractClusterSlaveService;
+import com.its.common.cluster.vo.ClusterNode;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.util.concurrent.ScheduledFuture;
@@ -11,10 +11,10 @@ import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 @RequiredArgsConstructor
-public class HaClusterSlaveHandler extends ChannelInboundHandlerAdapter {
+public class ClusterSlaveHandler extends ChannelInboundHandlerAdapter {
 
-    private final AbstractHaClusterSlaveService slaveService;
-    private final HaInfo cluster;
+    private final AbstractClusterSlaveService slaveService;
+    private final ClusterNode cluster;
 
     private ScheduledFuture<?> future = null;
 
@@ -27,8 +27,8 @@ public class HaClusterSlaveHandler extends ChannelInboundHandlerAdapter {
 //    public void channelRead(ChannelHandlerContext ctx, Object msg) {
 //        if (msg instanceof ClusterMessage) {
 //            ClusterMessage clusterMsg = (ClusterMessage) msg;
-//            log.info("ClusterSlaveHandler.channelRead: serverId: {}, serverTime: {}, infos: {}",
-//                    clusterMsg.getServerId(), clusterMsg.getServerTime(), clusterMsg.getInfos().size());
+//            log.info("ClusterSlaveHandler.channelRead: nodeId: {}, serverTime: {}, infos: {}",
+//                    clusterMsg.getNodeId(), clusterMsg.getServerTime(), clusterMsg.getInfos().size());
 //        }
 //    }
 

+ 33 - 33
its-cluster/src/main/java/com/its/common/cluster/service/AbstractHaClusterMasterService.java → its-cluster/src/main/java/com/its/common/cluster/service/AbstractClusterMasterService.java

@@ -1,11 +1,11 @@
 package com.its.common.cluster.service;
 
-import com.its.common.cluster.vo.HaClusterMessage;
-import com.its.common.cluster.utils.HaPlatform;
-import com.its.common.cluster.utils.HaUtils;
-import com.its.common.cluster.vo.AbstractHaClusterConfig;
-import com.its.common.cluster.vo.HaInfo;
-import com.its.common.cluster.vo.HaNET;
+import com.its.common.cluster.vo.ClusterMessage;
+import com.its.common.cluster.utils.ClusterPlatform;
+import com.its.common.cluster.utils.ClusterUtils;
+import com.its.common.cluster.vo.AbstractClusterConfig;
+import com.its.common.cluster.vo.ClusterNode;
+import com.its.common.cluster.vo.ClusterNET;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelOption;
@@ -22,10 +22,10 @@ import java.util.concurrent.ScheduledFuture;
 
 @Slf4j
 @RequiredArgsConstructor
-public abstract class AbstractHaClusterMasterService {
+public abstract class AbstractClusterMasterService {
 
     private final ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
-    private final AbstractHaClusterConfig clusterConfig;
+    private final AbstractClusterConfig clusterConfig;
 
     private ScheduledFuture<?> taskFuture;
     private EventLoopGroup acceptGroup;
@@ -39,12 +39,12 @@ public abstract class AbstractHaClusterMasterService {
     }
 
     public void start() {
-        if (!HaPlatform.isWindows()) {
+        if (!ClusterPlatform.isWindows()) {
             if (!Epoll.isAvailable()) {
                 Epoll.unavailabilityCause().printStackTrace();
             }
         }
-        if (HaUtils.isEpollAvailable()) {
+        if (ClusterUtils.isEpollAvailable()) {
             log.info("The Cluster Master runs in LINUX EPOLL mode.");
         }
         else {
@@ -56,18 +56,18 @@ public abstract class AbstractHaClusterMasterService {
         ServerBootstrap serverBootstrap = createBootstrap();
 
         log.info("*********************************************************************************");
-        log.info("**              UTIC HA Cluster Master Server Information                      **");
-        log.info("**     bindAddress: {}", this.clusterConfig.getIpAddress());
-        log.info("**      listenPort: {}", this.clusterConfig.getSyncPort());
+        log.info("**             UTIC Center Cluster Master Server Information                   **");
+        log.info("**     bindAddress: {}", this.clusterConfig.getIp());
+        log.info("**      listenPort: {}", this.clusterConfig.getPort());
         log.info("**        isMaster: {}", this.clusterConfig.isMaster());
         log.info("*********************************************************************************");
 
         try {
-            if (this.clusterConfig.getIpAddress().equals("0.0.0.0")) {
-                this.channelFuture = serverBootstrap.bind(this.clusterConfig.getSyncPort());
+            if (this.clusterConfig.getIp().equals("0.0.0.0")) {
+                this.channelFuture = serverBootstrap.bind(this.clusterConfig.getPort());
             }
             else {
-                this.channelFuture = serverBootstrap.bind(this.clusterConfig.getIpAddress(), this.clusterConfig.getSyncPort());
+                this.channelFuture = serverBootstrap.bind(this.clusterConfig.getIp(), this.clusterConfig.getPort());
             }
             electionMasterSchedule();
         }
@@ -81,9 +81,9 @@ public abstract class AbstractHaClusterMasterService {
         EventLoopGroup acceptGroups;
         EventLoopGroup workerGroups;
 
-        acceptGroups = HaUtils.newEventLoopGroup(1, "Accept");
-        workerGroups = HaUtils.newEventLoopGroup(1, "Worker");
-        serverBootstrap.channel(HaUtils.getServerSocketChannel());
+        acceptGroups = ClusterUtils.newEventLoopGroup(1, "Accept");
+        workerGroups = ClusterUtils.newEventLoopGroup(1, "Worker");
+        serverBootstrap.channel(ClusterUtils.getServerSocketChannel());
         serverBootstrap.group(acceptGroups, workerGroups);
 
         serverBootstrap.option(ChannelOption.AUTO_READ, true);
@@ -97,28 +97,28 @@ public abstract class AbstractHaClusterMasterService {
         serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, true);     // reuse the socket address
         serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);      // disable Nagle's algorithm
 
-        HaClusterMasterInitializer haClusterMasterInitializer = new HaClusterMasterInitializer(this, this.clusterConfig);
-        serverBootstrap.childHandler(haClusterMasterInitializer);
+        ClusterMasterInitializer clusterMasterInitializer = new ClusterMasterInitializer(this, this.clusterConfig);
+        serverBootstrap.childHandler(clusterMasterInitializer);
 
         return serverBootstrap;
     }
 
-    public abstract void election(int serverId, boolean isMaster);
-    public abstract void onClusterMessage(HaClusterMessage message);
+    public abstract void election(int nodeId, boolean isMaster);
+    public abstract void onClusterMessage(ClusterMessage message);
 
     private void electionMasterSchedule() {
         this.taskFuture = this.taskScheduler.scheduleAtFixedRate(() -> {
             int masterId = Integer.MAX_VALUE;
-            for (Map.Entry<Integer, HaInfo> entry : this.clusterConfig.getClusterMap().entrySet()) {
-                HaInfo cluster = entry.getValue();
-                if (cluster.getElectionState().getState() != HaNET.CLOSED) {
-                    if (cluster.getServerId() < masterId) {
-                        masterId = cluster.getServerId();
+            for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
+                ClusterNode cluster = entry.getValue();
+                if (cluster.getElectionState().getState() != ClusterNET.CLOSED) {
+                    if (cluster.getId() < masterId) {
+                        masterId = cluster.getId();
                     }
                 }
             }
-//            log.info("ClusterMasterService:electionMasterSchedule: serverId: {}, masterId: {}", this.clusterConfig.getServerId(), masterId);
-            if (masterId == Integer.MAX_VALUE || masterId > this.clusterConfig.getServerId()) {
+//            log.info("ClusterMasterService:electionMasterSchedule: nodeId: {}, masterId: {}", this.clusterConfig.getNodeId(), masterId);
+            if (masterId == Integer.MAX_VALUE || masterId > this.clusterConfig.getId()) {
                 this.clusterConfig.setMaster(true);
             }
             else {
@@ -126,11 +126,11 @@ public abstract class AbstractHaClusterMasterService {
             }
 
             if (this.clusterConfig.isLogging()) {
-                log.info("ClusterMasterService:electionMasterSchedule: serverId: {}, Master: {}.",
-                        this.clusterConfig.getServerId(), this.clusterConfig.isMaster());
+                log.info("ClusterMasterService:electionMasterSchedule: nodeId: {}, Master: {}.",
+                        this.clusterConfig.getId(), this.clusterConfig.isMaster());
             }
 
-            election(this.clusterConfig.getServerId(), this.clusterConfig.isMaster());
+            election(this.clusterConfig.getId(), this.clusterConfig.isMaster());
         }, 2 * 1000L);
     }
 

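(The schedule above re-evaluates mastership every 2 seconds with a simple rule. A minimal sketch of that rule on its own, using the ClusterNode and ClusterNET types from this commit; the map is the same clusterMap held by AbstractClusterConfig.)

import com.its.common.cluster.vo.ClusterNET;
import com.its.common.cluster.vo.ClusterNode;

import java.util.Map;

public final class ElectionRule {
    private ElectionRule() {
    }

    // Among all nodes whose election channel is not CLOSED, the lowest id wins;
    // if no reachable node has a lower id than this node, this node is master.
    public static boolean isMaster(int ownId, Map<Integer, ClusterNode> clusterMap) {
        int lowestReachableId = Integer.MAX_VALUE;
        for (ClusterNode node : clusterMap.values()) {
            if (node.getElectionState().getState() != ClusterNET.CLOSED && node.getId() < lowestReachableId) {
                lowestReachableId = node.getId();
            }
        }
        return lowestReachableId == Integer.MAX_VALUE || lowestReachableId > ownId;
    }
}
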
+ 31 - 31
its-cluster/src/main/java/com/its/common/cluster/service/AbstractHaClusterSlaveService.java → its-cluster/src/main/java/com/its/common/cluster/service/AbstractClusterSlaveService.java

@@ -1,7 +1,7 @@
 package com.its.common.cluster.service;
 
 import com.its.common.cluster.vo.*;
-import com.its.common.cluster.utils.HaUtils;
+import com.its.common.cluster.utils.ClusterUtils;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import lombok.RequiredArgsConstructor;
@@ -19,21 +19,21 @@ import java.util.concurrent.ScheduledFuture;
 
 @Slf4j
 @RequiredArgsConstructor
-public abstract class AbstractHaClusterSlaveService {
+public abstract class AbstractClusterSlaveService {
 
     private final ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
     private final ExecutorService executorService= Executors.newFixedThreadPool(1);
-    private final List<HaClusterSlave> clientTasks = Collections.synchronizedList(new ArrayList<>());
+    private final List<ClusterSlave> clientTasks = Collections.synchronizedList(new ArrayList<>());
 
-    private final AbstractHaClusterConfig clusterConfig;
+    private final AbstractClusterConfig clusterConfig;
 
     private ScheduledFuture<?> taskFuture;
-    private HaClusterSlaveBootstrapFactory bootstrapFactory;
+    private ClusterSlaveBootstrapFactory bootstrapFactory;
 
 
     @PostConstruct
     void init() {
-        this.bootstrapFactory = new HaClusterSlaveBootstrapFactory(1, 5);
+        this.bootstrapFactory = new ClusterSlaveBootstrapFactory(1, 5);
         this.taskScheduler.setPoolSize(1);
         this.taskScheduler.initialize();
     }
@@ -44,12 +44,12 @@ public abstract class AbstractHaClusterSlaveService {
         }
 
         // Connect to all cluster (HaCluster) nodes
-        for (Map.Entry<Integer, HaInfo> entry : this.clusterConfig.getClusterMap().entrySet()) {
-            HaInfo cluster = entry.getValue();
-            if (cluster.getServerId() == this.clusterConfig.getServerId()) {
+        for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
+            ClusterNode cluster = entry.getValue();
+            if (cluster.getId() == this.clusterConfig.getId()) {
                 continue;
             }
-            HaClusterSlave slaveClient = new HaClusterSlave(this, clusterConfig, cluster, this.bootstrapFactory);
+            ClusterSlave slaveClient = new ClusterSlave(this, clusterConfig, cluster, this.bootstrapFactory);
             this.clientTasks.add(slaveClient);
         }
 
@@ -76,9 +76,9 @@ public abstract class AbstractHaClusterSlaveService {
         }
         this.taskScheduler.shutdown();
 
-        for (Map.Entry<Integer, HaInfo> entry : this.clusterConfig.getClusterMap().entrySet()) {
-            HaInfo cluster = entry.getValue();
-            if (cluster.getServerId() == this.clusterConfig.getServerId()) {
+        for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
+            ClusterNode cluster = entry.getValue();
+            if (cluster.getId() == this.clusterConfig.getId()) {
                 continue;
             }
             channelClose(cluster.getSyncState().getChannel());
@@ -108,12 +108,12 @@ public abstract class AbstractHaClusterSlaveService {
         return sdfDate.format(dtLog);
     }
 
-    public abstract List<HaClusterMessageData> getClusterMessageData();
+    public abstract List<ClusterMessageData> getClusterMessageData();
 
-    private HaClusterMessage getClusterMessage() {
-        List<HaClusterMessageData> details = getClusterMessageData();
-        return HaClusterMessage.builder()
-                .serverId(this.clusterConfig.getServerId())
+    private ClusterMessage getClusterMessage() {
+        List<ClusterMessageData> details = getClusterMessageData();
+        return ClusterMessage.builder()
+                .nodeId(this.clusterConfig.getId())
                 .master(this.clusterConfig.isMaster())
                 .serverTime(getSysTime())
                 .infos(details)
@@ -133,12 +133,12 @@ public abstract class AbstractHaClusterSlaveService {
             log.info("ClusterSlaveService:dataSyncSchedule: {} seconds.", this.clusterConfig.getSyncSeconds());
         }
         this.taskFuture = this.taskScheduler.scheduleAtFixedRate(() -> {
-            for (Map.Entry<Integer, HaInfo> entry : this.clusterConfig.getClusterMap().entrySet()) {
-                HaInfo cluster = entry.getValue();
-                if (cluster.getServerId() == this.clusterConfig.getServerId()) {
+            for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
+                ClusterNode cluster = entry.getValue();
+                if (cluster.getId() == this.clusterConfig.getId()) {
                     continue;
                 }
-                if (cluster.getSyncState().getState() != HaNET.CLOSED) {
+                if (cluster.getSyncState().getState() != ClusterNET.CLOSED) {
                     sendSyncData(cluster, cluster.getSyncState().getChannel());
                 }
             }
@@ -147,8 +147,8 @@ public abstract class AbstractHaClusterSlaveService {
 
     }
 
-    public void sendSyncData(final HaInfo cluster, final Channel channel) {
-        HaClusterMessage clusterMsg = getClusterMessage();
+    public void sendSyncData(final ClusterNode cluster, final Channel channel) {
+        ClusterMessage clusterMsg = getClusterMessage();
 
         if (this.clusterConfig.isLogging()) {
             MDC.put("id", cluster.getLogKey());
@@ -159,18 +159,18 @@ public abstract class AbstractHaClusterSlaveService {
             f.awaitUninterruptibly();
             if (f.isDone() || f.isSuccess()) {
                 if (this.clusterConfig.isLogging()) {
-                    log.info("ClusterSlaveService.sendSyncData: [{}], {}, [--TO: serverId: {}, (serverId: {}, serverTime: {})]",
-                            this.clusterConfig.getServerId(), HaUtils.getTcpAddress(channel),
-                            cluster.getServerId(), clusterMsg.getServerId(), clusterMsg.getServerTime());
+                    log.info("ClusterSlaveService.sendSyncData: [{}], {}, [--TO: nodeId: {}, (nodeId: {}, serverTime: {})]",
+                            this.clusterConfig.getId(), ClusterUtils.getTcpAddress(channel),
+                            cluster.getId(), clusterMsg.getNodeId(), clusterMsg.getServerTime());
                 }
             }
         }
         catch (Exception e) {
-            log.error("ClusterSlaveService.sendSyncData: [{}], {}, Failed: [--TO: serverId: {}, (serverId: {}, serverTime: {})]",
-                    this.clusterConfig.getServerId(), HaUtils.getTcpAddress(channel),
-                    cluster.getServerId(), clusterMsg.getServerId(), clusterMsg.getServerTime());
+            log.error("ClusterSlaveService.sendSyncData: [{}], {}, Failed: [--TO: nodeId: {}, (nodeId: {}, serverTime: {})]",
+                    this.clusterConfig.getId(), ClusterUtils.getTcpAddress(channel),
+                    cluster.getId(), clusterMsg.getNodeId(), clusterMsg.getServerTime());
             log.error("ClusterSlaveService.sendSyncData: [{}], {}, Failed: {}",
-                    this.clusterConfig.getServerId(), HaUtils.getTcpAddress(channel), e.getMessage());
+                    this.clusterConfig.getId(), ClusterUtils.getTcpAddress(channel), e.getMessage());
         }
 
         if (this.clusterConfig.isLogging()) {

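(The slave service leaves one hook to the application: getClusterMessageData() supplies the payload that dataSyncSchedule() wraps into a ClusterMessage every syncSeconds and sends to every peer whose sync channel is open. A hedged sketch of a concrete subclass follows; the snapshot supplier and its wiring are hypothetical, and ClusterMessageData's fields are not shown in this commit, so the sketch only forwards a prepared list.)

import com.its.common.cluster.service.AbstractClusterSlaveService;
import com.its.common.cluster.vo.AbstractClusterConfig;
import com.its.common.cluster.vo.ClusterMessageData;

import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical application-side subclass; only getClusterMessageData() is required by the parent.
public class AppClusterSlaveService extends AbstractClusterSlaveService {

    private final Supplier<List<ClusterMessageData>> snapshotSupplier;

    public AppClusterSlaveService(AbstractClusterConfig clusterConfig,
                                  Supplier<List<ClusterMessageData>> snapshotSupplier) {
        super(clusterConfig);   // @RequiredArgsConstructor on the parent takes the config
        this.snapshotSupplier = snapshotSupplier;
    }

    @Override
    public List<ClusterMessageData> getClusterMessageData() {
        // Whatever per-node state the application wants replicated on each sync tick.
        List<ClusterMessageData> snapshot = this.snapshotSupplier.get();
        return snapshot != null ? snapshot : Collections.emptyList();
    }
}
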
+ 24 - 24
its-cluster/src/main/java/com/its/common/cluster/service/HaClusterMasterInitializer.java → its-cluster/src/main/java/com/its/common/cluster/service/ClusterMasterInitializer.java

@@ -1,11 +1,11 @@
 package com.its.common.cluster.service;
 
-import com.its.common.cluster.codec.HaClusterMessageDecoder;
-import com.its.common.cluster.codec.HaClusterMessageEncoder;
-import com.its.common.cluster.handler.HaClusterMasterHandler;
-import com.its.common.cluster.utils.HaUtils;
-import com.its.common.cluster.vo.AbstractHaClusterConfig;
-import com.its.common.cluster.vo.HaInfo;
+import com.its.common.cluster.codec.ClusterMessageDecoder;
+import com.its.common.cluster.codec.ClusterMessageEncoder;
+import com.its.common.cluster.handler.ClusterMasterHandler;
+import com.its.common.cluster.utils.ClusterUtils;
+import com.its.common.cluster.vo.AbstractClusterConfig;
+import com.its.common.cluster.vo.ClusterNode;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
@@ -21,10 +21,10 @@ import java.util.concurrent.TimeUnit;
 
 @Slf4j
 @RequiredArgsConstructor
-public class HaClusterMasterInitializer extends ChannelInitializer<Channel> {
+public class ClusterMasterInitializer extends ChannelInitializer<Channel> {
 
-    private final AbstractHaClusterMasterService masterService;
-    private final AbstractHaClusterConfig clusterConfig;
+    private final AbstractClusterMasterService masterService;
+    private final AbstractClusterConfig clusterConfig;
 
     @Override
     protected void initChannel(Channel channel) throws Exception {
@@ -32,25 +32,25 @@ public class HaClusterMasterInitializer extends ChannelInitializer<Channel> {
 //        String clientIP = remoteAddress.getAddress().getHostAddress();
 //        int clientPort = remoteAddress.getPort();
 
-        String ipAddress  = HaUtils.getRemoteIpAddress(channel);
-        int clientPort = HaUtils.getRemotePort(channel);
-        int serverId = clientPort - this.clusterConfig.getSyncPort();
+        String ipAddress  = ClusterUtils.getRemoteIpAddress(channel);
+        int clientPort = ClusterUtils.getRemotePort(channel);
+        int nodeId = clientPort - this.clusterConfig.getPort();
 
         if (this.clusterConfig.isLogging()) {
-            log.info("HaClusterMasterInitializer.initChannel: connected from: {}:{}, ServerId: {}.", ipAddress, clientPort, serverId);
+            log.info("ClusterMasterInitializer.initChannel: connected from: {}:{}, nodeId: {}.", ipAddress, clientPort, nodeId);
         }
 
         // One host may run several cluster instances, so looking a node up by IP address alone is unsafe.
         // HaClusterConfig.HaCluster cluster = this.clusterConfig.get(ipAddress);
-        HaInfo cluster = this.clusterConfig.getClusterMap().get(serverId);
+        ClusterNode cluster = this.clusterConfig.getClusterMap().get(nodeId);
         if (cluster == null) {
-            log.error("HaClusterMasterInitializer.initChannel: [ServerId: {}, IP Address: {}], Unknown Server Id. will be closed.", serverId, ipAddress);
+            log.error("ClusterMasterInitializer.initChannel: [nodeId: {}, IP Address: {}], Unknown Server Id. will be closed.", nodeId, ipAddress);
             channel.disconnect();
             channel.close();
             return;
         }
-        if (!cluster.getIpAddress().equals(ipAddress)) {
-            log.error("HaClusterMasterInitializer.initChannel: [ServerId: {}, IP Address: {}], Unknown IP Address. will be closed.", serverId, ipAddress);
+        if (!cluster.getIp().equals(ipAddress)) {
+            log.error("ClusterMasterInitializer.initChannel: [nodeId: {}, IP Address: {}], Unknown IP Address. will be closed.", nodeId, ipAddress);
             channel.disconnect();
             channel.close();
             return;
@@ -58,13 +58,13 @@ public class HaClusterMasterInitializer extends ChannelInitializer<Channel> {
 
         if (this.clusterConfig.isLogging()) {
             MDC.put("id", cluster.getLogKey());
-            log.info("HaClusterMasterInitializer.initChannel: [{}, {}].", cluster.getLogKey(), cluster.getIpAddress());
+            log.info("ClusterMasterInitializer.initChannel: [{}, {}].", cluster.getLogKey(), cluster.getIp());
         }
 
         if (cluster.getElectionState().getChannel() != null) {
-            log.warn("HaClusterMasterInitializer.initChannel: {}, {}, Already Connected. Old Connection will be closed.", ipAddress, cluster.getServerId());
+            log.warn("ClusterMasterInitializer.initChannel: {}, {}, Already Connected. Old Connection will be closed.", ipAddress, cluster.getId());
             // Clear the attribute so the event handler does not process this channel twice
-            channel.attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
+            channel.attr(AbstractClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
             cluster.getElectionState().disConnect();
 
             channel.disconnect();
@@ -72,7 +72,7 @@ public class HaClusterMasterInitializer extends ChannelInitializer<Channel> {
         }
 
         cluster.getElectionState().connect(channel);
-        channel.attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(cluster);
+        channel.attr(AbstractClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(cluster);
 
         ChannelPipeline pipeline = channel.pipeline();
 
@@ -81,9 +81,9 @@ public class HaClusterMasterInitializer extends ChannelInitializer<Channel> {
         }
         pipeline.addLast(new IdleStateHandler(this.clusterConfig.getSyncSeconds(), 0, 0, TimeUnit.SECONDS));
         pipeline.addLast(new LengthFieldBasedFrameDecoder(8192, 0, 4));
-        pipeline.addLast(new HaClusterMessageDecoder(this.clusterConfig.isLogging()));
-        pipeline.addLast(new HaClusterMessageEncoder(this.clusterConfig.isLogging()));
-        pipeline.addLast(new HaClusterMasterHandler(this.masterService, this.clusterConfig));
+        pipeline.addLast(new ClusterMessageDecoder(this.clusterConfig.isLogging()));
+        pipeline.addLast(new ClusterMessageEncoder(this.clusterConfig.isLogging()));
+        pipeline.addLast(new ClusterMasterHandler(this.masterService, this.clusterConfig));
 
         if (this.clusterConfig.isLogging()) {
             MDC.remove(cluster.getLogKey());

+ 23 - 23
its-cluster/src/main/java/com/its/common/cluster/service/HaClusterSlave.java → its-cluster/src/main/java/com/its/common/cluster/service/ClusterSlave.java

@@ -1,10 +1,10 @@
 package com.its.common.cluster.service;
 
-import com.its.common.cluster.codec.HaClusterMessageDecoder;
-import com.its.common.cluster.codec.HaClusterMessageEncoder;
-import com.its.common.cluster.handler.HaClusterSlaveHandler;
-import com.its.common.cluster.vo.AbstractHaClusterConfig;
-import com.its.common.cluster.vo.HaInfo;
+import com.its.common.cluster.codec.ClusterMessageDecoder;
+import com.its.common.cluster.codec.ClusterMessageEncoder;
+import com.its.common.cluster.handler.ClusterSlaveHandler;
+import com.its.common.cluster.vo.AbstractClusterConfig;
+import com.its.common.cluster.vo.ClusterNode;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.*;
 import io.netty.channel.socket.SocketChannel;
@@ -26,12 +26,12 @@ import java.util.concurrent.TimeUnit;
 @Getter
 @Setter
 @RequiredArgsConstructor
-public class HaClusterSlave implements Callable<Object> {
+public class ClusterSlave implements Callable<Object> {
 
-    private final AbstractHaClusterSlaveService slaveService;
-    private final AbstractHaClusterConfig clusterConfig;
-    private final HaInfo cluster;
-    private final HaClusterSlaveBootstrapFactory bootstrapFactory;
+    private final AbstractClusterSlaveService slaveService;
+    private final AbstractClusterConfig clusterConfig;
+    private final ClusterNode cluster;
+    private final ClusterSlaveBootstrapFactory bootstrapFactory;
 
     private Bootstrap bootstrap = null;
     private EventLoopGroup nioEventLoopGroup = null;
@@ -47,12 +47,12 @@ public class HaClusterSlave implements Callable<Object> {
                 MDC.put("id", this.cluster.getLogKey());
             }
 
-            this.ipAddress = this.cluster.getIpAddress();
-            this.port = this.cluster.getSyncPort();
+            this.ipAddress = this.cluster.getIp();
+            this.port = this.cluster.getPort();
 
             if (this.bootstrap == null) {
                 if (this.clusterConfig.isLogging()) {
-                    log.info("HaClusterSlave: >>>>>>>>Start: [{}, {}], {}", this.cluster.getServerId(), this.ipAddress, this.port);
+                    log.info("ClusterSlave: >>>>>>>>Start: [{}, {}], {}", this.cluster.getId(), this.ipAddress, this.port);
                 }
                 this.bootstrap = this.bootstrapFactory.createBootstrap();
                 this.bootstrap.option(ChannelOption.SO_REUSEADDR, true);
@@ -66,19 +66,19 @@ public class HaClusterSlave implements Callable<Object> {
                     }
                     ch.pipeline().addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS));
                     ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(8192, 0, 4));
-                    ch.pipeline().addLast(new HaClusterMessageDecoder(clusterConfig.isLogging()));
-                    ch.pipeline().addLast(new HaClusterMessageEncoder(clusterConfig.isLogging()));
-                    ch.pipeline().addLast(new HaClusterSlaveHandler(slaveService, cluster));
+                    ch.pipeline().addLast(new ClusterMessageDecoder(clusterConfig.isLogging()));
+                    ch.pipeline().addLast(new ClusterMessageEncoder(clusterConfig.isLogging()));
+                    ch.pipeline().addLast(new ClusterSlaveHandler(slaveService, cluster));
                     }
                 });
 
                 // Bind the local port to (the bind port registered in the config file + server ID), so one host can run several cluster servers.
                 // Note: if several cluster servers do run on one host, the bind ports must be assigned so they do not collide with the rule above.
-                this.bootstrap.localAddress(new InetSocketAddress(this.port + this.clusterConfig.getServerId()));
+                this.bootstrap.localAddress(new InetSocketAddress(this.port + this.clusterConfig.getId()));
             }
 
             if (this.clusterConfig.isLogging()) {
-                log.info("HaClusterSlave: >>Connect Try: [{}, {}], {}", this.cluster.getServerId(), this.ipAddress, this.port);
+                log.info("ClusterSlave: >>Connect Try: [{}, {}], {}", this.cluster.getId(), this.ipAddress, this.port);
             }
 
             if (this.channelFuture != null && this.channelFuture.channel() != null) {
@@ -121,15 +121,15 @@ public class HaClusterSlave implements Callable<Object> {
             Channel channel = future.channel();
 
             if (this.clusterConfig.isLogging()) {
-                log.info("HaClusterSlave: channelOpen: [{}, {}], {}, Channel: {}", this.cluster.getServerId(), this.ipAddress, this.port, channel);
+                log.info("ClusterSlave: channelOpen: [{}, {}], {}, Channel: {}", this.cluster.getId(), this.ipAddress, this.port, channel);
             }
 
-            channel.attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(this.cluster);
+            channel.attr(AbstractClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(this.cluster);
             this.cluster.getSyncState().connect(channel);
         }
         else {
             if (this.clusterConfig.isLogging()) {
-                log.warn("HaClusterSlave: ConnectFailed: [{}, {}], {}, Cause: {}", cluster.getServerId(), cluster.getIpAddress(), cluster.getSyncPort(), future.cause().getMessage());
+                log.warn("ClusterSlave: ConnectFailed: [{}, {}], {}, Cause: {}", cluster.getId(), cluster.getIp(), cluster.getPort(), future.cause().getMessage());
             }
         }
 
@@ -148,12 +148,12 @@ public class HaClusterSlave implements Callable<Object> {
 
         if (this.clusterConfig.isLogging()) {
             MDC.put("id", this.cluster.getLogKey());
-            log.warn("ClusterSlave:channelClosed: [{}, {}], {}, Channel: {}", this.cluster.getServerId(), this.ipAddress, this.port, channel);
+            log.warn("ClusterSlave:channelClosed: [{}, {}], {}, Channel: {}", this.cluster.getId(), this.ipAddress, this.port, channel);
             MDC.remove(this.cluster.getLogKey());
             MDC.clear();
         }
 
-        channel.attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
+        channel.attr(AbstractClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
         this.cluster.getSyncState().disConnect();
         channel.close();
         channel.eventLoop().schedule(this, 5, TimeUnit.SECONDS);

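(ClusterSlave and ClusterMasterInitializer share a port convention: the slave binds its outgoing connection to a local port of peer sync port + its own node id, and the master recovers the caller's node id as remote client port minus its own listen port. The arithmetic only lines up when every node uses the same sync port, which the default of 13888 suggests. A small worked sketch; the concrete numbers are examples only.)

public class PortConvention {
    public static void main(String[] args) {
        int sharedSyncPort = 13888;   // example: every node listens on the same sync port
        int slaveNodeId = 2;          // example: the connecting node's id

        // ClusterSlave: bootstrap.localAddress(new InetSocketAddress(port + clusterConfig.getId()))
        int localBindPort = sharedSyncPort + slaveNodeId;        // 13890

        // ClusterMasterInitializer: nodeId = clientPort - clusterConfig.getPort()
        int recoveredNodeId = localBindPort - sharedSyncPort;    // 2

        System.out.println("slave binds local port " + localBindPort
                + ", master recovers nodeId " + recoveredNodeId);
    }
}
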
+ 4 - 5
its-cluster/src/main/java/com/its/common/cluster/service/HaClusterSlaveBootstrapFactory.java → its-cluster/src/main/java/com/its/common/cluster/service/ClusterSlaveBootstrapFactory.java

@@ -1,29 +1,28 @@
 package com.its.common.cluster.service;
 
-import com.its.common.cluster.utils.HaUtils;
+import com.its.common.cluster.utils.ClusterUtils;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.EpollSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
 
 @RequiredArgsConstructor
-public class HaClusterSlaveBootstrapFactory {
+public class ClusterSlaveBootstrapFactory {
     private final int workerThread;
     private final int connectTimeout;
     private EventLoopGroup nioEventLoopGroup = null;
 
     public Bootstrap createBootstrap() {
         if (this.nioEventLoopGroup == null) {
-            this.nioEventLoopGroup = HaUtils.newEventLoopGroup(this.workerThread, "itsClusterEventGroup");
+            this.nioEventLoopGroup = ClusterUtils.newEventLoopGroup(this.workerThread, "itsClusterEventGroup");
             //new NioEventLoopGroup(this.workerThread);  //EpollEventLoopGroup
         }
         Bootstrap bootstrap = new Bootstrap();
         bootstrap.group(this.nioEventLoopGroup);
 
-        if (HaUtils.isEpollAvailable()) {
+        if (ClusterUtils.isEpollAvailable()) {
             bootstrap.channel(EpollSocketChannel.class);
         }
         else {

+ 4 - 4
its-cluster/src/main/java/com/its/common/cluster/utils/HaClusterEvenAllocator.java → its-cluster/src/main/java/com/its/common/cluster/utils/ClusterEvenAllocator.java

@@ -5,12 +5,12 @@ import lombok.extern.slf4j.Slf4j;
 import java.util.*;
 
 @Slf4j
-public final class HaClusterEvenAllocator {
+public final class ClusterEvenAllocator {
 
     private final List<String> servers;
     private final Map<String, List<String>> allocation;
 
-    public HaClusterEvenAllocator() {
+    public ClusterEvenAllocator() {
         this.servers = new ArrayList<>();
         this.allocation = new HashMap<>();
     }
@@ -173,14 +173,14 @@ public final class HaClusterEvenAllocator {
         );
 
 //        Map<String, List<String>> allocation = HaClusterEvenAllocator.allocate(serverList, clientList);
-        Map<String, List<String>> allocation = HaClusterEvenAllocator.sequenceAllocate(serverList, clientList);
+        Map<String, List<String>> allocation = ClusterEvenAllocator.sequenceAllocate(serverList, clientList);
 
         // Print the allocation result
         allocation.forEach((server, assignedClients) -> {
             log.info("server {} -> clients: {}", server, assignedClients);
         });
 
-        HaClusterEvenAllocator allocator = new HaClusterEvenAllocator();
+        ClusterEvenAllocator allocator = new ClusterEvenAllocator();
         serverList.forEach(allocator::addServer);
         allocator.allocate(clientList);
         String server = "Server-1";

+ 2 - 2
its-cluster/src/main/java/com/its/common/cluster/utils/HaClusterHashAllocator.java → its-cluster/src/main/java/com/its/common/cluster/utils/ClusterHashAllocator.java

@@ -8,11 +8,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-public class HaClusterHashAllocator {
+public class ClusterHashAllocator {
     private TreeMap<Integer, String> hashRing = new TreeMap<>();
     private List<String> servers;
 
-    public HaClusterHashAllocator(List<String> servers) {
+    public ClusterHashAllocator(List<String> servers) {
         this.servers = servers;
         initializeHashRing();
     }

+ 2 - 2
its-cluster/src/main/java/com/its/common/cluster/utils/HaPlatform.java → its-cluster/src/main/java/com/its/common/cluster/utils/ClusterPlatform.java

@@ -1,9 +1,9 @@
 package com.its.common.cluster.utils;
 
-public class HaPlatform {
+public class ClusterPlatform {
     private static final String OS = System.getProperty("os.name").toLowerCase();
 
-    private HaPlatform() {
+    private ClusterPlatform() {
     }
 
     public static boolean isWindows() {

+ 2 - 2
its-cluster/src/main/java/com/its/common/cluster/utils/HaUtils.java → its-cluster/src/main/java/com/its/common/cluster/utils/ClusterUtils.java

@@ -17,12 +17,12 @@ import java.net.*;
 import java.util.ArrayList;
 import java.util.Enumeration;
 
-public class HaUtils {
+public class ClusterUtils {
     public static final String OS_NAME = System.getProperty("os.name");
     private static boolean isLinuxPlatform = false;
     private static boolean isWindowsPlatform = false;
 
-    private HaUtils() {
+    private ClusterUtils() {
     }
     public static String getAddress(Channel ch) {
         String localIp = "local-unknown";

+ 236 - 0
its-cluster/src/main/java/com/its/common/cluster/vo/AbstractClusterConfig.java

@@ -0,0 +1,236 @@
+package com.its.common.cluster.vo;
+
+import io.netty.util.AttributeKey;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.annotation.PostConstruct;
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+@Slf4j
+@Data
+public abstract class AbstractClusterConfig {
+
+    public static final AttributeKey<ClusterNode> CLUSTER_ATTRIBUTE_KEY = AttributeKey.valueOf("clusterInfo");
+
+    private boolean enabled = false;        // whether clustering is enabled
+    // Application cluster information
+    private boolean master = false;
+    private int id = -1;        // server ID (starts at 1; 0 is not used)
+    private int syncSeconds = -1;           // data sync interval in seconds (minimum 5, maximum 60)
+    private String ip = "127.0.0.1";          // IP address of this cluster server
+    private int port = 13888;      // port used for data synchronization
+    private boolean logging = false;        // whether the library logs internally
+    private boolean packetLogging = false;  // whether packets are logged
+    private List<ClusterNode> nodes;
+
+
+    private final HashMap<Integer, ClusterNode> clusterMap = new HashMap<>();
+
+    @PostConstruct
+    private void init() throws IOException {
+//        loadClusterConfig();
+    }
+
+    private void setDefaults() {
+        this.master = true;
+        this.id = 1;
+        this.syncSeconds = 5;
+        this.ip = "127.0.0.1";
+        this.port = 13888;
+        this.logging = false;
+        this.packetLogging = false;
+        this.nodes = new ArrayList<>();
+    }
+
+    public boolean validateClusterInfo() {
+        if (!this.enabled) {
+            setDefaults();
+            return true;
+        }
+
+        if (this.nodes == null || this.nodes.isEmpty()) {
+            log.error("The cluster node list is empty.");
+            return false;
+        }
+
+        // Extract only the IDs and sort them
+        List<Integer> ids = this.nodes.stream()
+                .map(ClusterNode::getId)
+                .sorted()
+                .collect(Collectors.toList());
+
+        // Compare sizes after removing duplicates
+        Set<Integer> uniqueIds = new HashSet<>(ids);
+        if (uniqueIds.size() != ids.size()) {
+            log.error("Duplicate cluster node IDs found: {}", ids);
+            return false;
+        }
+
+        // Sequence check: IDs must start at 1 and increase by 1
+        for (int ii = 0; ii < ids.size(); ii++) {
+            int expected = ii + 1;
+            if (ids.get(ii) != expected) {
+                log.error("Cluster node IDs are not sequential. expected: {}, actual: {}", expected, ids.get(ii));
+                return false;
+            }
+        }
+
+        // Validation passed; set up the cluster information
+        if (this.syncSeconds < 2) {
+            this.syncSeconds = 2;
+        }
+        if (this.syncSeconds > 60) {
+            this.syncSeconds = 60;
+        }
+        // Master and host information setting
+        int masterId = Integer.MAX_VALUE;
+        for (ClusterNode node : this.nodes) {
+            this.clusterMap.put(node.getId(), node);
+            int nodeId = node.getId();
+            if (nodeId == this.id) {
+                this.ip = node.getIp();
+                this.port = node.getPort();
+            }
+
+            if (nodeId < masterId) {
+                masterId = nodeId;
+            }
+        }
+        // Master setting
+        ClusterNode masterNode = this.clusterMap.get(masterId);
+        masterNode.setMaster(true);
+        this.master = (this.id == masterId);
+        return true;
+    }
+
+    public ClusterNode get(String ipAddress) {
+        for (Map.Entry<Integer, ClusterNode> entry : this.clusterMap.entrySet()) {
+            ClusterNode cluster = entry.getValue();
+            if (cluster.getIp().equals(ipAddress)) {
+                return cluster;
+            }
+        }
+        return null;
+    }
+
+    private int parseConfigData(String data) {
+        try {
+            return Integer.parseInt(data);
+        }
+        catch (NumberFormatException e) {
+            return -1;
+        }
+    }
+
+//    private String getStringValue(String item, String defValue) throws IOException {
+//        if (!this.enabled) {
+//            return defValue;
+//        }
+//        Pattern stringPattern = Pattern.compile(item + "=([a-zA-Z0-9]+)");
+//        try (BufferedReader reader = new BufferedReader(new FileReader(this.configFile))) {
+//            String line;
+//            while ((line = reader.readLine()) != null) {
+//                Matcher matcher = stringPattern.matcher(line);
+//                if (matcher.matches()) {
+//                    return matcher.group(1);
+//                }
+//            }
+//        }
+//        return defValue;
+//    }
+
+//    private int getIntValue(String item, int defValue) throws IOException {
+//        if (!this.enabled) {
+//            return defValue;
+//        }
+//        Pattern serverIdPattern = Pattern.compile(item+"=(\\d+)");
+//        try (BufferedReader reader = new BufferedReader(new FileReader(this.configFile))) {
+//            String line;
+//            while ((line = reader.readLine()) != null) {
+//                Matcher matcher = serverIdPattern.matcher(line);
+//                if (matcher.matches()) {
+//                    return parseConfigData(matcher.group(1));
+//                }
+//            }
+//        }
+//        return defValue;
+//    }
+
+//    private void setServerId() throws IOException {
+//        Pattern serverIdPattern = Pattern.compile("server\\.id=(\\d+)");
+//        try (BufferedReader reader = new BufferedReader(new FileReader(this.configFile))) {
+//            String line;
+//            while ((line = reader.readLine()) != null) {
+//                Matcher matcher = serverIdPattern.matcher(line);
+//                if (matcher.matches()) {
+//                    this.serverId = parseConfigData(matcher.group(1));
+//                    break;
+//                }
+//            }
+//        }
+//    }
+
+//    private void loadClusterConfig() throws IOException {
+//        this.serverId = getIntValue("server.id", 1);
+//        this.syncSeconds = getIntValue("syncSeconds", 5);
+//        if (this.syncSeconds < 2) {
+//            this.syncSeconds = 2;
+//        }
+//        if (this.syncSeconds > 60) {
+//            this.syncSeconds = 60;
+//        }
+//        if (!this.enabled) {
+//            return;
+//        }
+//        Pattern serverPattern = Pattern.compile("server\\.(\\d+)=([\\d\\.]+):(\\d+)");
+//        Pattern pattern = Pattern.compile("server\\.(\\d+)=([\\d\\.]+):(\\d+):(\\d+)");
+//        int masterId = Integer.MAX_VALUE;
+//        try (BufferedReader reader = new BufferedReader(new FileReader(this.configFile))) {
+//            String line;
+//            while ((line = reader.readLine()) != null) {
+//                if (line.startsWith("server.")) {
+//                    Matcher matcher = serverPattern.matcher(line);
+//                    if (matcher.matches()) {
+//                        int serverId = parseConfigData(matcher.group(1));
+//                        String ipAddress = matcher.group(2);
+//                        int syncPort = parseConfigData(matcher.group(3));
+//
+//                        if (serverId == this.serverId) {
+//                            this.ipAddress = ipAddress;
+//                            this.syncPort = syncPort;
+//                        }
+//                        if (serverId < masterId) {
+//                            masterId = serverId;
+//                        }
+//                        ClusterNode haCluster = ClusterNode.builder()
+//                                .master(false)
+//                                .id(serverId)
+//                                .ip(ipAddress)
+//                                .syncPort(syncPort)
+//                                .logging(this.logging)
+//                                .electionState(new ClusterNetState())
+//                                .syncState(new ClusterNetState())
+//                                .build();
+//                        this.clusterMap.put(haCluster.getId(), haCluster);
+//                        log.info("{}", haCluster);
+//                    }
+//                }
+//            }
+//        }
+//        if (this.serverId == masterId) {
+//            this.master = true;
+//        }
+//
+//        for (Map.Entry<Integer, ClusterNode> entry : this.clusterMap.entrySet()) {
+//            ClusterNode cluster = entry.getValue();
+//            if (cluster.getId() == masterId) {
+//                cluster.setMaster(true);
+//                break;
+//            }
+//        }
+//    }
+
+}

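(AbstractClusterConfig no longer parses an xxx.cfg file; it expects enabled, id, syncSeconds and the node list to be injected and then checked with validateClusterInfo(). A minimal sketch of a concrete subclass, assuming Spring Boot property binding; the @ConfigurationProperties prefix and the fail-fast behaviour are assumptions, not part of this commit.)

import com.its.common.cluster.vo.AbstractClusterConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

// Hypothetical concrete config: binds application.yml properties onto the inherited fields
// (enabled, id, syncSeconds, ip, port, logging, packetLogging, nodes) and validates them at startup.
@Slf4j
@Configuration
@ConfigurationProperties(prefix = "application.its-cluster")   // the prefix is an assumption
public class ItsClusterConfig extends AbstractClusterConfig {

    @PostConstruct
    public void validate() {
        if (!validateClusterInfo()) {
            throw new IllegalStateException("its-cluster configuration is invalid; see the log above");
        }
        log.info("its-cluster config loaded: id={}, master={}, nodes={}", getId(), isMaster(), getNodes().size());
    }
}
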
+ 0 - 170
its-cluster/src/main/java/com/its/common/cluster/vo/AbstractHaClusterConfig.java

@@ -1,170 +0,0 @@
-package com.its.common.cluster.vo;
-
-import io.netty.util.AttributeKey;
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-
-import javax.annotation.PostConstruct;
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-@Slf4j
-@Data
-public abstract class AbstractHaClusterConfig {
-
-    public static final AttributeKey<HaInfo> CLUSTER_ATTRIBUTE_KEY = AttributeKey.valueOf("clusterInfo");
-
-    private boolean master = false;
-
-    // Settings read from the cluster config file (xxx.cfg)
-    private int serverId = -1;      // Server ID (starts at 1; 0 is not used)
-    private int syncSeconds = -1;   // Data synchronization interval in seconds (min 5, max 60)
-    private String ipAddress;       // IP address of this cluster server
-    private int syncPort = -1;      // Port used for data synchronization
-
-    // Settings defined in application.yml (application.ha-cluster)
-    private boolean enabled = false;        // Whether the cluster feature is enabled
-    private boolean logging = false;        // Whether the library logs internally
-    private boolean packetLogging = false;  // Whether packets are logged
-    private String configFile;              // Path to the cluster config file
-
-    private final HashMap<Integer, HaInfo> clusterMap = new HashMap<>();
-
-    @PostConstruct
-    private void init() throws IOException {
-        loadClusterConfig();
-    }
-
-    public HaInfo get(String ipAddress) {
-        for (Map.Entry<Integer, HaInfo> entry : this.clusterMap.entrySet()) {
-            HaInfo cluster = entry.getValue();
-            if (cluster.getIpAddress().equals(ipAddress)) {
-                return cluster;
-            }
-        }
-        return null;
-    }
-
-    private int parseConfigData(String data) {
-        try {
-            return Integer.parseInt(data);
-        }
-        catch (NumberFormatException e) {
-            return -1;
-        }
-    }
-
-    private String getStringValue(String item, String defValue) throws IOException {
-        if (!this.enabled) {
-            return defValue;
-        }
-        Pattern stringPattern = Pattern.compile(item + "=([a-zA-Z0-9]+)");
-        try (BufferedReader reader = new BufferedReader(new FileReader(this.configFile))) {
-            String line;
-            while ((line = reader.readLine()) != null) {
-                Matcher matcher = stringPattern.matcher(line);
-                if (matcher.matches()) {
-                    return matcher.group(1);
-                }
-            }
-        }
-        return defValue;
-    }
-
-    private int getIntValue(String item, int defValue) throws IOException {
-        if (!this.enabled) {
-            return defValue;
-        }
-        Pattern serverIdPattern = Pattern.compile(item+"=(\\d+)");
-        try (BufferedReader reader = new BufferedReader(new FileReader(this.configFile))) {
-            String line;
-            while ((line = reader.readLine()) != null) {
-                Matcher matcher = serverIdPattern.matcher(line);
-                if (matcher.matches()) {
-                    return parseConfigData(matcher.group(1));
-                }
-            }
-        }
-        return defValue;
-    }
-
-//    private void setServerId() throws IOException {
-//        Pattern serverIdPattern = Pattern.compile("server\\.id=(\\d+)");
-//        try (BufferedReader reader = new BufferedReader(new FileReader(this.configFile))) {
-//            String line;
-//            while ((line = reader.readLine()) != null) {
-//                Matcher matcher = serverIdPattern.matcher(line);
-//                if (matcher.matches()) {
-//                    this.serverId = parseConfigData(matcher.group(1));
-//                    break;
-//                }
-//            }
-//        }
-//    }
-
-    private void loadClusterConfig() throws IOException {
-        this.serverId = getIntValue("server.id", 1);
-        this.syncSeconds = getIntValue("syncSeconds", 5);
-        if (this.syncSeconds < 2) {
-            this.syncSeconds = 2;
-        }
-        if (this.syncSeconds > 60) {
-            this.syncSeconds = 60;
-        }
-        if (!this.enabled) {
-            return;
-        }
-        Pattern serverPattern = Pattern.compile("server\\.(\\d+)=([\\d\\.]+):(\\d+)");
-//        Pattern pattern = Pattern.compile("server\\.(\\d+)=([\\d\\.]+):(\\d+):(\\d+)");
-        int masterId = Integer.MAX_VALUE;
-        try (BufferedReader reader = new BufferedReader(new FileReader(this.configFile))) {
-            String line;
-            while ((line = reader.readLine()) != null) {
-                if (line.startsWith("server.")) {
-                    Matcher matcher = serverPattern.matcher(line);
-                    if (matcher.matches()) {
-                        int serverId = parseConfigData(matcher.group(1));
-                        String ipAddress = matcher.group(2);
-                        int syncPort = parseConfigData(matcher.group(3));
-
-                        if (serverId == this.serverId) {
-                            this.ipAddress = ipAddress;
-                            this.syncPort = syncPort;
-                        }
-                        if (serverId < masterId) {
-                            masterId = serverId;
-                        }
-                        HaInfo haCluster = HaInfo.builder()
-                                .master(false)
-                                .serverId(serverId)
-                                .ipAddress(ipAddress)
-                                .syncPort(syncPort)
-                                .logging(this.logging)
-                                .electionState(new HaNetState())
-                                .syncState(new HaNetState())
-                                .build();
-                        this.clusterMap.put(haCluster.getServerId(), haCluster);
-                        log.info("{}", haCluster);
-                    }
-                }
-            }
-        }
-        if (this.serverId == masterId) {
-            this.master = true;
-        }
-
-        for (Map.Entry<Integer, HaInfo> entry : this.clusterMap.entrySet()) {
-            HaInfo cluster = entry.getValue();
-            if (cluster.getServerId() == masterId) {
-                cluster.setMaster(true);
-                break;
-            }
-        }
-    }
-
-}

+ 3 - 3
its-cluster/src/main/java/com/its/common/cluster/vo/HaClusterMessage.java → its-cluster/src/main/java/com/its/common/cluster/vo/ClusterMessage.java

@@ -8,12 +8,12 @@ import java.util.List;
 
 @Builder
 @Data
-public class HaClusterMessage implements Serializable {
+public class ClusterMessage implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    private int serverId;
+    private int nodeId;
     private boolean master;
     private String serverTime;
 
-    private List<HaClusterMessageData> infos;
+    private List<ClusterMessageData> infos;
 }
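
A minimal sketch of assembling the renamed message with its Lombok builder; the helper class name, the nodeId value and the timestamp format are assumptions, not taken from the diff.

import com.its.common.cluster.vo.ClusterMessage;
import com.its.common.cluster.vo.ClusterMessageData;

import java.util.Collections;

public class ClusterMessageDemo {
    // Assembles a message carrying a single payload; values here are illustrative.
    static ClusterMessage buildMessage(int nodeId, boolean master, ClusterMessageData payload) {
        return ClusterMessage.builder()
                .nodeId(nodeId)
                .master(master)
                .serverTime("2024-01-01 00:00:00")   // timestamp format assumed; the diff does not define it
                .infos(Collections.singletonList(payload))
                .build();
    }
}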

+ 1 - 4
its-cluster/src/main/java/com/its/common/cluster/vo/HaClusterMessageData.java → its-cluster/src/main/java/com/its/common/cluster/vo/ClusterMessageData.java

@@ -1,10 +1,7 @@
 package com.its.common.cluster.vo;
 
-import lombok.Builder;
-import lombok.Data;
-
 import java.io.Serializable;
 
-public interface HaClusterMessageData extends Serializable {
+public interface ClusterMessageData extends Serializable {
     String getType(); // method added to distinguish the payload type
 }
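
Payloads only need to be Serializable (inherited from the interface) and report a type string. A small illustrative implementation; the class name, fields and type value are assumptions.

import com.its.common.cluster.vo.ClusterMessageData;

public class NodeStatusData implements ClusterMessageData {
    private static final long serialVersionUID = 1L;

    private int nodeId;
    private boolean master;

    @Override
    public String getType() {
        return "NODE_STATUS";   // lets receivers distinguish this payload from other types
    }
}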

+ 2 - 2
its-cluster/src/main/java/com/its/common/cluster/vo/HaNET.java → its-cluster/src/main/java/com/its/common/cluster/vo/ClusterNET.java

@@ -1,8 +1,8 @@
 package com.its.common.cluster.vo;
 
-public final class HaNET {
+public final class ClusterNET {
 
-    private HaNET() {
+    private ClusterNET() {
         throw new IllegalStateException("HaNET class");
     }
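
Only the renamed constructor of the constant holder appears in this hunk; the state constants that ClusterNetState below relies on (CLOSED, LOGIN_WAIT, DATA_TRANS, TERMINATE) sit in the unchanged part of ClusterNET. A hypothetical stand-in shows what such a holder typically looks like; the numeric values are assumptions, only the names appear in the diff.

public final class ClusterNetStatesSketch {

    private ClusterNetStatesSketch() { }

    public static final int CLOSED     = 0;   // no connection
    public static final int LOGIN_WAIT = 1;   // channel up, login not yet confirmed
    public static final int DATA_TRANS = 2;   // login accepted, data flowing
    public static final int TERMINATE  = 3;   // shutting down
}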
 

+ 8 - 8
its-cluster/src/main/java/com/its/common/cluster/vo/HaNetState.java → its-cluster/src/main/java/com/its/common/cluster/vo/ClusterNetState.java

@@ -8,7 +8,7 @@ import java.util.Calendar;
 import java.util.Date;
 
 @Data
-public class HaNetState {
+public class ClusterNetState {
 
     private int state;
     private Channel channel;
@@ -19,12 +19,12 @@ public class HaNetState {
     private Date connectTime;
     private Date disconnectTime;
 
-    public HaNetState() {
+    public ClusterNetState() {
         init();
     }
 
     public void init() {
-        this.state = HaNET.CLOSED;
+        this.state = ClusterNET.CLOSED;
         this.channel = null;
         this.connectCount = 0;
         this.lastRecvTime = 0;
@@ -48,7 +48,7 @@ public class HaNetState {
     }
 
     public void connect(Channel channel) {
-        this.state = HaNET.LOGIN_WAIT;
+        this.state = ClusterNET.LOGIN_WAIT;
         this.channel = channel;
         this.connectCount++;
         this.connectTime = new Date();
@@ -56,18 +56,18 @@ public class HaNetState {
         setLastRecvTime();
     }
     public void loginOk() {
-        this.state = HaNET.DATA_TRANS;
+        this.state = ClusterNET.DATA_TRANS;
     }
     public void disConnect() {
-        if (this.state != HaNET.CLOSED) {
+        if (this.state != ClusterNET.CLOSED) {
             this.disconnectTime = new Date();
         }
-        this.state = HaNET.CLOSED;
+        this.state = ClusterNET.CLOSED;
         this.channel = null;
         this.retryCount = 0;
     }
     public void terminate() {
-        this.state = HaNET.TERMINATE;
+        this.state = ClusterNET.TERMINATE;
     }
     public boolean isActive() {
         return this.channel != null && this.channel.isActive();
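
A minimal sketch of the lifecycle a ClusterNetState goes through on a cluster link, using only the methods visible in this diff; the demo class and the way the Channel is obtained are illustrative.

import com.its.common.cluster.vo.ClusterNetState;

import io.netty.channel.Channel;

public class ClusterNetStateLifecycleDemo {
    // Walks one state object through the transitions used by the cluster handlers.
    static void demo(ClusterNetState syncState, Channel channel) {
        syncState.connect(channel);    // -> LOGIN_WAIT; counts the connect and stamps connectTime
        syncState.loginOk();           // -> DATA_TRANS once the peer accepts the login
        syncState.setLastRecvTime();   // refresh the receive timestamp on every inbound message
        if (!syncState.isActive()) {   // channel closed underneath us?
            syncState.disConnect();    // -> CLOSED; stamps disconnectTime
        }
    }
}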

+ 26 - 0
its-cluster/src/main/java/com/its/common/cluster/vo/ClusterNode.java

@@ -0,0 +1,26 @@
+package com.its.common.cluster.vo;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class ClusterNode {
+    private boolean master = false;
+    private int id = -1;
+    private String ip = "";
+    private int port = 13888;       // Port 1: port used for data synchronization
+    private int syncSeconds = 5;
+    private boolean logging = false;
+    private boolean packetLogging = false;
+
+    private ClusterNetState electionState = new ClusterNetState();
+    private ClusterNetState syncState = new ClusterNetState();
+
+    public String getLogKey() {
+        return String.valueOf(this.id);
+    }
+}
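
The new node class carries @Data, @NoArgsConstructor and @AllArgsConstructor but no @Builder in this diff, so a minimal sketch populates it through the generated no-args constructor and setters; the id, ip and port values are illustrative.

import com.its.common.cluster.vo.ClusterNode;

public class ClusterNodeDemo {
    // Builds the entry for the local node; all values here are illustrative.
    static ClusterNode localNode() {
        ClusterNode node = new ClusterNode();   // Lombok @NoArgsConstructor
        node.setId(1);
        node.setIp("192.168.0.10");
        node.setPort(13888);
        node.setSyncSeconds(5);
        node.setMaster(false);
        // electionState and syncState already default to fresh ClusterNetState instances
        return node;
    }
}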

+ 0 - 21
its-cluster/src/main/java/com/its/common/cluster/vo/HaInfo.java

@@ -1,21 +0,0 @@
-package com.its.common.cluster.vo;
-
-import lombok.Builder;
-import lombok.Data;
-
-@Data
-@Builder
-public class HaInfo {
-    private boolean master;
-    private int serverId;
-    private String ipAddress;
-    private int syncPort;       // Port 1: port used for data synchronization
-    private boolean logging;
-
-    private HaNetState electionState;
-    private HaNetState syncState;
-
-    public String getLogKey() {
-        return String.valueOf(this.serverId);
-    }
-}