Эх сурвалжийг харах

its-cluster code refectoring

shjung 2 долоо хоног өмнө
parent
commit
825d90b97f

+ 32 - 9
its-cluster/build.gradle

@@ -1,5 +1,6 @@
 plugins {
     id 'java'
+    id 'maven-publish'
 }
 
 group = 'com.its'
@@ -7,7 +8,6 @@ version = '0.0.1'
 
 sourceCompatibility = '1.8'
 targetCompatibility = '1.8'
-compileJava.options.encoding = 'UTF-8'
 
 repositories {
     mavenCentral()
@@ -17,18 +17,42 @@ dependencies {
     implementation 'io.netty:netty-all:4.1.52.Final'
 }
 
-processResources {
-    enabled = false
-}
+//processResources {
+//    enabled = false
+//}
 
-jar {
-    enabled = true
-}
+//jar {
+//    enabled = true
+//}
 
 test {
     useJUnitPlatform()
 }
 
+publishing {
+    publications {
+        mavenJava(MavenPublication) {
+            from components.java
+        }
+    }
+    repositories {
+        maven {
+            name = "customLocalRepo"
+            println "its-cluster library install mvn repository..."
+            def repoPath = org.gradle.internal.os.OperatingSystem.current().isWindows() ?
+                    "C:/java/repository" :
+                    "/Users/openvalue/Projects/java/repository"
+            url = uri(repoPath)
+        }
+
+        /*
+         * 참고: 더 표준적인 방법은 모든 프로젝트가 공유하는 기본 로컬 Maven 저장소(~/.m2/repository)를 사용하는 것입니다.
+         * 그렇게 하려면 위 'maven' 블록 대신 아래 'mavenLocal()'을 사용하면 됩니다.
+         * mavenLocal()
+         */
+    }
+}
+
 tasks.register('runInstallJarLibrary', Exec) {
     doFirst {
         println "its-cluster library install mvn repository..."
@@ -43,9 +67,8 @@ tasks.register('runInstallJarLibrary', Exec) {
 }
 jar.finalizedBy runInstallJarLibrary
 
-compileJava.options.encoding = 'UTF-8'
 tasks.withType(JavaCompile).configureEach {
+    options.encoding = 'UTF-8'
     options.compilerArgs << '-Xlint:unchecked'
     options.deprecation = true
-    options.encoding = 'UTF-8'
 }

+ 51 - 0
its-cluster/build.gradle.old

@@ -0,0 +1,51 @@
+plugins {
+    id 'java'
+}
+
+group = 'com.its'
+version = '0.0.1'
+
+sourceCompatibility = '1.8'
+targetCompatibility = '1.8'
+compileJava.options.encoding = 'UTF-8'
+
+repositories {
+    mavenCentral()
+}
+
+dependencies {
+    implementation 'io.netty:netty-all:4.1.52.Final'
+}
+
+processResources {
+    enabled = false
+}
+
+jar {
+    enabled = true
+}
+
+test {
+    useJUnitPlatform()
+}
+
+tasks.register('runInstallJarLibrary', Exec) {
+    doFirst {
+        println "its-cluster library install mvn repository..."
+        def os = org.gradle.internal.os.OperatingSystem.current()
+        workingDir = file('.')
+        if (os.isWindows()) {
+            commandLine 'cmd', '/C', 'start', 'install.bat'
+        } else {
+            commandLine 'sh', './install.sh'
+        }
+    }
+}
+jar.finalizedBy runInstallJarLibrary
+
+compileJava.options.encoding = 'UTF-8'
+tasks.withType(JavaCompile).configureEach {
+    options.compilerArgs << '-Xlint:unchecked'
+    options.deprecation = true
+    options.encoding = 'UTF-8'
+}

+ 10 - 0
its-cluster/src/main/java/com/its/common/cluster/config/AbstractClusterConfig.java

@@ -27,6 +27,10 @@ public abstract class AbstractClusterConfig {
     private int port = 13888;      // 데이터 동기화를 위한 포트
     private boolean logging = false;        // 라이브러리 내 로깅 여부
     private boolean packetLogging = false;  // 패킷 로깅 여부
+    private int electionScheduleSeconds = 2;    // 마스터 선출 스케줄 실행 주기 (단위: 초, 기본값 2초)
+    private int connectTimeoutSeconds = 5;      // 클라이언트(Slave)가 서버(Master)에 연결을 시도할 때의 타임아웃 (단위: 초, 기본값 5초)
+    private boolean witnessNode = false;        // 이 서버 인스턴스가 Witness 노드인지 여부
+
     private List<ClusterNode> nodes;
     private ClusterNode myCluster;
 
@@ -37,6 +41,12 @@ public abstract class AbstractClusterConfig {
         if (this.nodes == null) {
             this.nodes = new ArrayList<>();
         }
+        if (this.electionScheduleSeconds < 2) {
+            this.electionScheduleSeconds = 2;
+        }
+        if (this.electionScheduleSeconds > 10) {
+            this.electionScheduleSeconds = 10;
+        }
     }
 
     public boolean isClusterAlive(int clusterId) {

+ 176 - 16
its-cluster/src/main/java/com/its/common/cluster/service/AbstractClusterMasterService.java

@@ -1,11 +1,11 @@
 package com.its.common.cluster.service;
 
-import com.its.common.cluster.vo.ClusterMessage;
+import com.its.common.cluster.config.AbstractClusterConfig;
 import com.its.common.cluster.utils.ClusterPlatform;
 import com.its.common.cluster.utils.ClusterUtils;
-import com.its.common.cluster.config.AbstractClusterConfig;
-import com.its.common.cluster.vo.ClusterNode;
+import com.its.common.cluster.vo.ClusterMessage;
 import com.its.common.cluster.vo.ClusterNET;
+import com.its.common.cluster.vo.ClusterNode;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -19,6 +19,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 
 import javax.annotation.PostConstruct;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ScheduledFuture;
 
@@ -96,7 +98,10 @@ public abstract class AbstractClusterMasterService {
         serverBootstrap.option(ChannelOption.SO_BACKLOG, 2);
         serverBootstrap.option(ChannelOption.SO_RCVBUF, 65535);//config.getRcvBuf());
         serverBootstrap.option(ChannelOption.SO_REUSEADDR, true);
-        serverBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5*1000);
+
+        // serverBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5*1000);
+        int connectTimeoutMillis = this.clusterConfig.getConnectTimeoutSeconds() * 1000;
+        serverBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis);
 
         serverBootstrap.childOption(ChannelOption.SO_LINGER, 0);           // 4way-handshake 비활성
         serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, false);    // KEEPALIVE 비활성(활성: true)
@@ -185,48 +190,203 @@ public abstract class AbstractClusterMasterService {
                 entry.getValue().setMaster(cluster.getId() == minClusterNodeId);
             }
         }
-        return (minClusterNodeId >= this.clusterConfig.getId());
+        //return (minClusterNodeId >= this.clusterConfig.getId());
+        return (minClusterNodeId == this.clusterConfig.getId());    // 연결된 가장 작은 노드가 나의 노드와 같은 경우에 마스터임
+    }
+
+    private boolean electionMasterQuorum() {
+        // --- 1. 과반수(Quorum) 계산 로직 추가 ---
+        int totalNodes = this.clusterConfig.getClusterMap().size();
+        // 클러스터가 2개 이하일 때는 과반수 로직이 의미 없으므로, 3개 이상일 때만 적용
+        int quorum = (totalNodes >= 3) ? (totalNodes / 2) + 1 : totalNodes;
+
+        // --- 2. 현재 연결된 노드 수를 세고, 가장 작은 ID를 찾는 로직 ---
+        int connectedNodesCount = 0;
+        int minClusterNodeId = Integer.MAX_VALUE;
+
+        for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
+            ClusterNode cluster = entry.getValue();
+            if (cluster.getSyncState().getState() != ClusterNET.CLOSED) {
+                // 연결된 노드 수 증가
+                connectedNodesCount++;
+                // 가장 작은 ID 찾기
+                if (cluster.getId() < minClusterNodeId) {
+                    minClusterNodeId = cluster.getId();
+                }
+            }
+        }
+
+        // --- 3. 과반수 검증 로직 추가 ---
+        // 만약 연결된 노드 수가 과반수에 미치지 못하면, 절대 Master가 될 수 없다.
+        if (connectedNodesCount < quorum) {
+            log.warn("ClusterNodeId: {}, Quorum not met. Connected nodes: {}, Quorum: {}. Cannot become master.",
+                    this.clusterConfig.getId(), connectedNodesCount, quorum);
+            // Master ID를 유효하지 않은 값(-1)으로 설정하여 현재 Master가 없음을 명확히 함
+            this.clusterConfig.setMasterId(-1);
+            return false; // Master가 될 수 없음을 반환
+        }
+
+        // --- 4. 기존 로직 (과반수가 충족되었을 경우에만 실행) ---
+        if (minClusterNodeId == Integer.MAX_VALUE) {
+            minClusterNodeId = this.clusterConfig.getId();
+        }
+
+        if (this.clusterConfig.getMasterId() != minClusterNodeId) {
+            this.clusterConfig.setMasterId(minClusterNodeId);
+            for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
+                ClusterNode cluster = entry.getValue();
+                entry.getValue().setMaster(cluster.getId() == minClusterNodeId);
+            }
+        }
+
+        // 연결된 가장 작은 노드가 나의 노드와 같은 경우에 마스터임
+        return (minClusterNodeId == this.clusterConfig.getId());
+    }
+// 파일: src/main/java/com/its/common/cluster/service/AbstractClusterMasterService.java
+
+    /**
+     * 클러스터의 마스터를 선출하는 핵심 로직.
+     * 1. 과반수(Quorum) 규칙을 적용하여 스플릿 브레인을 방지합니다.
+     * 2. Witness 노드는 마스터 후보에서 제외합니다.
+     * @return 이 노드가 마스터가 되어야 하면 true, 아니면 false.
+     */
+    private boolean electionMasterWitness() {
+        // --- 1. 과반수(Quorum) 계산 ---
+        // 전체 노드 수를 기반으로 마스터가 되기 위해 필요한 최소 연결 노드 수를 계산.
+        int totalNodes = this.clusterConfig.getClusterMap().size();
+        // 클러스터가 2개 이하일 때는 과반수 로직이 큰 의미가 없지만, 일관성을 위해 규칙을 적용.
+        // (2개일 경우 Quorum=2, 1개일 경우 Quorum=1)
+        int quorum = (totalNodes / 2) + 1;
+
+        // --- 2. 연결된 노드 수와 마스터 후보(가장 작은 ID) 찾기 ---
+        int connectedNodesCount = 0;
+        int minClusterNodeId = Integer.MAX_VALUE;
+
+        for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
+            ClusterNode cluster = entry.getValue();
+
+            // 현재 노드가 연결된 상태인지 확인합니다.
+            if (cluster.getSyncState().getState() != ClusterNET.CLOSED) {
+                connectedNodesCount++; // 연결된 노드 수 증가
+
+                // [Witness 로직] 만약 노드가 Witness 노드라면, 마스터 후보에서 제외.
+                if (cluster.isWitness()) {
+                    continue; // 다음 노드로 넘어감
+                }
+
+                // Witness가 아닌 노드 중에서 가장 작은 ID를 찾습니다.
+                if (cluster.getId() < minClusterNodeId) {
+                    minClusterNodeId = cluster.getId();
+                }
+            }
+        }
+
+        // --- 3. 과반수(Quorum) 검증 ---
+        // 현재 연결된 노드 수가 과반수를 넘지 못하면, 클러스터는 마스터를 선출할 자격이 없음.
+        if (connectedNodesCount < quorum) {
+//            log.warn("ClusterNodeId: {}, Quorum not met. Connected nodes: {}, Quorum: {}. No master will be elected.",
+//                    this.clusterConfig.getId(), connectedNodesCount, quorum);
+
+            // 클러스터에 유효한 마스터가 없음을 명시적으로 설정합니다.
+            if (this.clusterConfig.getMasterId() != -1) {
+                this.clusterConfig.setMasterId(-1);
+                for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
+                    entry.getValue().setMaster(false);
+                }
+            }
+            return false; // 마스터가 될 수 없음
+        }
+
+        // --- 4. 마스터 ID 업데이트 (과반수 충족 시) ---
+        // 만약 minClusterNodeId가 초기값 그대로라면(모든 노드가 Witness이거나 연결이 끊긴 경우),
+        // 유효한 마스터 후보가 없는 것이므로 -1로 설정.
+        if (minClusterNodeId == Integer.MAX_VALUE) {
+            minClusterNodeId = -1;
+        }
+
+        // 클러스터의 마스터 정보가 변경되었다면, 모든 노드에 전파.
+        if (this.clusterConfig.getMasterId() != minClusterNodeId) {
+            this.clusterConfig.setMasterId(minClusterNodeId);
+            for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
+                ClusterNode cluster = entry.getValue();
+                entry.getValue().setMaster(cluster.getId() == minClusterNodeId);
+            }
+        }
+
+        // --- 5. 최종 결정 ---
+        // [Witness 로직] 이 코드를 실행하는 '나 자신'이 Witness 노드라면, 절대 마스터가 될 수 없습니다.
+        if (this.clusterConfig.isWitnessNode()) {
+            return false;
+        }
+
+        // 마스터로 선출된 노드 ID가 '나 자신'의 ID와 일치하는지 확인합니다.
+        // minClusterNodeId가 -1(유효한 후보 없음)인 경우, 이 조건은 항상 false가 됩니다.
+        return (minClusterNodeId == this.clusterConfig.getId());
     }
 
     private void electionMasterSchedule() {
         // 2초 주기로 실행되며 클러스터의 마스터/슬래이브 정보를 업데이트 함
         // 클러스터맵에는 나 자신의 정보가 포함되어 있음.
         // scheduleAtFixedRate ==> scheduleWithFixedDelay 로 변경(혹시 모를 작업 병목을 위해서)
-        this.taskFuture = this.taskScheduler.scheduleWithFixedDelay(this::electionMasterCluster, 2 * 1000L);
+        // this.taskFuture = this.taskScheduler.scheduleWithFixedDelay(this::electionMasterCluster, 2 * 1000L);
+        long scheduleMillis = this.clusterConfig.getElectionScheduleSeconds() * 1000L;
+        this.taskFuture = this.taskScheduler.scheduleWithFixedDelay(this::electionMasterCluster, scheduleMillis);
     }
 
     public void shutdown() {
-        log.info("ClusterNodeId: {}, ClusterMasterService.shutdown", this.clusterConfig.getId());
-        if (this.taskFuture != null) {
-            this.taskFuture.cancel(true);
+        log.info("ClusterNodeId: {}, ClusterMasterService shutdown process started.", this.clusterConfig.getId());
+
+        List<Throwable> shutdownErrors = new ArrayList<>();
+
+        try {
+            if (this.taskFuture != null) {
+                this.taskFuture.cancel(true);
+            }
+            this.taskScheduler.shutdown();
+        } catch (Exception e) {
+            shutdownErrors.add(new RuntimeException("taskFuture shutdown failed", e));
         }
-        this.taskScheduler.shutdown();
 
         try {
             if (this.acceptGroup != null) {
-                this.acceptGroup.shutdownGracefully();
+                // shutdownGracefully()는 Future를 반환하므로, await()로 완료를 기다릴 수 있습니다.
+                // this.acceptGroup.shutdownGracefully();
+                this.acceptGroup.shutdownGracefully().awaitUninterruptibly();
             }
         }
         catch (Exception e) {
-            log.error("ClusterNodeId: {}, ClusterMasterService.acceptGroup.shutdownGracefully", this.clusterConfig.getId());
+            shutdownErrors.add(new RuntimeException("acceptGroup shutdown failed", e));
         }
 
         try {
             if (this.workerGroup != null) {
-                this.workerGroup.shutdownGracefully();
+                // this.workerGroup.shutdownGracefully();
+                this.workerGroup.shutdownGracefully().awaitUninterruptibly();
             }
         }
         catch (Exception e) {
-            log.error("ClusterNodeId: {}, ClusterMasterService.workerGroup.shutdownGracefully", this.clusterConfig.getId());
+            shutdownErrors.add(new RuntimeException("workerGroup shutdown failed", e));
         }
 
         try {
             if (this.channelFuture != null && this.channelFuture.channel() != null) {
-                this.channelFuture.channel().closeFuture();
+                // this.channelFuture.channel().closeFuture();
+                this.channelFuture.channel().close().awaitUninterruptibly();
             }
         }
         catch (Exception e) {
-            log.error("ClusterNodeId: {}, ClusterMasterService.closeFuture", this.clusterConfig.getId());
+            shutdownErrors.add(new RuntimeException("channelFuture closure failed", e));
+        }
+
+        if (!shutdownErrors.isEmpty()) {
+            log.error("ClusterNodeId: {}, ClusterMasterService shutdown encountered {} error(s).",
+                    this.clusterConfig.getId(), shutdownErrors.size());
+            // 각 예외를 상세히 로깅합니다.
+            for (int ii = 0; ii < shutdownErrors.size(); ii++) {
+                log.error("Shutdown error #{}: {}", ii + 1, shutdownErrors.get(ii).getMessage(), shutdownErrors.get(ii));
+            }
+        } else {
+            log.info("ClusterNodeId: {}, ClusterMasterService shutdown completed gracefully.", this.clusterConfig.getId());
         }
     }
 }

+ 53 - 34
its-cluster/src/main/java/com/its/common/cluster/service/AbstractClusterSlaveService.java

@@ -39,7 +39,8 @@ public abstract class AbstractClusterSlaveService {
 
     @PostConstruct
     void init() {
-        this.bootstrapFactory = new ClusterSlaveBootstrapFactory(1, 5);
+        //this.bootstrapFactory = new ClusterSlaveBootstrapFactory(1, 5);
+        this.bootstrapFactory = new ClusterSlaveBootstrapFactory(1, this.clusterConfig.getConnectTimeoutSeconds());
         this.taskScheduler.setPoolSize(1);
         this.taskScheduler.initialize();
     }
@@ -77,39 +78,6 @@ public abstract class AbstractClusterSlaveService {
         }
     }
 
-    public void shutdown() {
-        if (this.taskFuture != null) {
-            this.taskFuture.cancel(true);
-        }
-        this.taskScheduler.shutdown();
-
-        for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
-            ClusterNode cluster = entry.getValue();
-            if (cluster.getId() == this.clusterConfig.getId()) {
-                continue;
-            }
-            channelClose(cluster.getSyncState().getChannel());
-        }
-        try {
-            if (this.bootstrapFactory != null && this.bootstrapFactory.getEventLoopGroup() != null) {
-                this.bootstrapFactory.getEventLoopGroup().shutdownGracefully();
-            }
-        }
-        catch (Exception e) {
-            log.info("ClusterNodeId: {}, ClusterSlaveService.shutdownGracefully", this.clusterConfig.getId());
-        }
-    }
-
-    private void channelClose(Channel channel) {
-        try {
-            if (channel != null) {
-                channel.close();
-            }
-        } catch (Exception e) {
-            log.error("ClusterNodeId: {}, ClusterSlaveService.channelClose", this.clusterConfig.getId());
-        }
-    }
-
     public abstract List<ClusterMessageData> getClusterMessageData();
 
     private ClusterMessage getClusterMessage() {
@@ -257,4 +225,55 @@ public abstract class AbstractClusterSlaveService {
         }
     }
 
+    public void shutdown() {
+        log.info("ClusterNodeId: {}, ClusterSlaveService shutdown process started.", this.clusterConfig.getId());
+
+        List<Throwable> shutdownErrors = new ArrayList<>();
+
+        try {
+            if (this.taskFuture != null) {
+                this.taskFuture.cancel(true);
+            }
+            this.taskScheduler.shutdown();
+        }
+        catch (Exception e) {
+            shutdownErrors.add(new RuntimeException("taskFuture shutdown failed", e));
+        }
+
+        for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
+            ClusterNode cluster = entry.getValue();
+            if (cluster.getId() == this.clusterConfig.getId()) {
+                continue;
+            }
+            Channel channel = cluster.getSyncState().getChannel();
+            if (channel != null) {
+                try {
+                    channel.close().awaitUninterruptibly();
+                } catch (Exception e) {
+                    // 어떤 채널에서 오류가 났는지 명시해주면 디버깅에 더 유리합니다.
+                    shutdownErrors.add(new RuntimeException("Failed to close channel for cluster node " + cluster.getId(), e));
+                }
+            }
+        }
+
+        try {
+            if (this.bootstrapFactory != null && this.bootstrapFactory.getEventLoopGroup() != null) {
+                // this.bootstrapFactory.getEventLoopGroup().shutdownGracefully();
+                this.bootstrapFactory.getEventLoopGroup().shutdownGracefully().awaitUninterruptibly();
+            }
+        } catch (Exception e) {
+            shutdownErrors.add(new RuntimeException("bootstrapFactory.getEventLoopGroup() shutdown failed", e));
+        }
+
+        if (!shutdownErrors.isEmpty()) {
+            log.error("ClusterNodeId: {}, ClusterSlaveService shutdown encountered {} error(s).",
+                    this.clusterConfig.getId(), shutdownErrors.size());
+            for (int ii = 0; ii < shutdownErrors.size(); ii++) {
+                log.error("Shutdown error #{}: {}", ii + 1, shutdownErrors.get(ii).getMessage(), shutdownErrors.get(ii));
+            }
+        } else {
+            log.info("ClusterNodeId: {}, ClusterSlaveService shutdown completed gracefully.", this.clusterConfig.getId());
+        }
+    }
+
 }

+ 4 - 2
its-cluster/src/main/java/com/its/common/cluster/service/ClusterSlaveBootstrapFactory.java

@@ -11,7 +11,7 @@ import lombok.RequiredArgsConstructor;
 @RequiredArgsConstructor
 public class ClusterSlaveBootstrapFactory {
     private final int workerThread;
-    private final int connectTimeout;
+    private final int connectTimeoutSeconds;
     private EventLoopGroup nioEventLoopGroup = null;
 
     public Bootstrap createBootstrap() {
@@ -34,7 +34,9 @@ public class ClusterSlaveBootstrapFactory {
         bootstrap.option(ChannelOption.SO_SNDBUF, 65536);
         bootstrap.option(ChannelOption.SO_KEEPALIVE, false);
         //bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(2048));
-        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.connectTimeout * 1000);
+
+        int connectTimeoutMillis = this.connectTimeoutSeconds * 1000;
+        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis);
         return bootstrap;
     }
 

+ 1 - 0
its-cluster/src/main/java/com/its/common/cluster/vo/ClusterNode.java

@@ -16,6 +16,7 @@ public class ClusterNode {
     private int port = 13888;       // 포트 1: 데이터 동기화를 위한 포트
     private boolean logging = false;
     private boolean packetLogging = false;
+    private boolean witness = false;        // Witness 노드인지 여부
 
     private ClusterNetState electionState = new ClusterNetState();  // 서버로 동작하며 클라이언트의 네트워크 상태를 저장
     private ClusterNetState syncState = new ClusterNetState();      // 클라이언트로 동작하며 서버와의 네트워크 상태를 저장