package com.its.common.cluster.service; import com.its.common.cluster.config.AbstractClusterConfig; import com.its.common.cluster.utils.ClusterPlatform; import com.its.common.cluster.utils.ClusterUtils; import com.its.common.cluster.vo.ClusterMessage; import com.its.common.cluster.vo.ClusterNET; import com.its.common.cluster.vo.ClusterNode; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.Epoll; import io.netty.channel.nio.NioEventLoopGroup; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledFuture; @Slf4j @RequiredArgsConstructor public abstract class AbstractClusterMasterService { private final ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); @Getter protected final AbstractClusterConfig clusterConfig; private ScheduledFuture taskFuture; private EventLoopGroup acceptGroup; private EventLoopGroup workerGroup; private ChannelFuture channelFuture; @PostConstruct void init() { this.taskScheduler.setPoolSize(1); this.taskScheduler.initialize(); } public void start() { if (!ClusterPlatform.isWindows()) { if (!Epoll.isAvailable()) { // 윈도우 서버가 아닌데 EPOLL 을 사용할 수 없으면 오류 메시지 표출 log.warn("ClusterNodeId: {}, Is not Windows system but can't use epoll networking: {}", this.clusterConfig.getId(), Epoll.unavailabilityCause().getMessage()); } } if (ClusterUtils.isEpollAvailable()) { log.info("ClusterNodeId: {}, The Cluster Master runs in LINUX EPOLL mode.", this.clusterConfig.getId()); } else { log.info("ClusterNodeId: {}, The Cluster Master runs in Windows NIO mode.", this.clusterConfig.getId()); } this.acceptGroup = new NioEventLoopGroup(); this.workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = createBootstrap(); log.info("*********************************************************************************"); log.info("** UTIC Center Cluster Master Server Information **"); log.info("** bindAddress: {}", this.clusterConfig.getIp()); log.info("** listenPort: {}", this.clusterConfig.getPort()); log.info("** clusterNodeId: {}", this.clusterConfig.getId()); log.info("** isMaster: {}", this.clusterConfig.isMaster()); log.info("*********************************************************************************"); try { if (this.clusterConfig.getIp().equals("0.0.0.0")) { this.channelFuture = serverBootstrap.bind(this.clusterConfig.getPort()); } else { this.channelFuture = serverBootstrap.bind(this.clusterConfig.getIp(), this.clusterConfig.getPort()); } electionMasterSchedule(); } catch (Exception e) { log.error("ClusterNodeId: {}, cluster start, InterruptedException", this.clusterConfig.getId()); shutdown(); } } public ServerBootstrap createBootstrap() { ServerBootstrap serverBootstrap = new ServerBootstrap(); EventLoopGroup acceptGroups; EventLoopGroup workerGroups; acceptGroups = ClusterUtils.newEventLoopGroup(1, "Accept"); workerGroups = ClusterUtils.newEventLoopGroup(1, "Worker"); serverBootstrap.channel(ClusterUtils.getServerSocketChannel()); serverBootstrap.group(acceptGroups, workerGroups); serverBootstrap.option(ChannelOption.AUTO_READ, true); serverBootstrap.option(ChannelOption.SO_BACKLOG, 2); serverBootstrap.option(ChannelOption.SO_RCVBUF, 65535);//config.getRcvBuf()); serverBootstrap.option(ChannelOption.SO_REUSEADDR, true); // serverBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5*1000); int connectTimeoutMillis = this.clusterConfig.getConnectTimeoutSeconds() * 1000; serverBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis); serverBootstrap.childOption(ChannelOption.SO_LINGER, 0); // 4way-handshake 비활성 serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, false); // KEEPALIVE 비활성(활성: true) serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, true); // 소켓 재사용 serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true); // Nagle 알고리즘 비활성화 serverBootstrap.childHandler(new ClusterMasterInitializer(this, this.clusterConfig)); return serverBootstrap; } // 추상 클래스 선언... public abstract void election(int clusterId, boolean isMaster, boolean isMasterChanged); public abstract void onClusterMessage(ClusterMessage message); public abstract void onClusterChannelActive(ClusterNode clusterNode); public abstract void onClusterChannelInactive(ClusterNode clusterNode); public void onNotifyClusterNetworkState(ClusterNode clusterNode, boolean isActive) { log.info("ClusterNodeId: {}, ClusterMasterService.onNotifyClusterNetworkState: fromClusterNodeId: {}, isNetActive; {}", this.clusterConfig.getId(), clusterNode.getId(), isActive); // slave의 네트워크 상태를 확인하고 slave 네트워크의 연결을 종료시킨다. if (!isActive && clusterNode.getSyncState().getState() != ClusterNET.CLOSED) { Channel channel = clusterNode.getSyncState().getChannel(); // log.info("ClusterNodeId: {}, ClusterMasterService.onNotifyClusterNetworkState: fromClusterNodeId: {}, Try SlaveNetwork Close. channel: {}", // this.clusterConfig.getId(), clusterNode.getId(), channel); if (channel != null) { channel.flush(); channel.disconnect(); channel.close(); } clusterNode.getSyncState().disConnect(); } // 연결 수립/종료시 즉각 마스터를 선출한다. electionMasterCluster(); if (isActive) { onClusterChannelActive(clusterNode); } else { onClusterChannelInactive(clusterNode); } } public synchronized void electionMasterCluster() { boolean isMaster = electionMaster(); boolean isChanged = this.clusterConfig.isMaster() != isMaster; this.clusterConfig.setMaster(isMaster); if (isChanged) { log.info("ClusterNodeId: {}, ClusterMasterService:electionMasterSchedule: Changed Master: {}, masterClusterNodeId: {}", this.clusterConfig.getId(), this.clusterConfig.isMaster(), this.clusterConfig.getMasterId()); } if (this.clusterConfig.isLogging()) { log.info("ClusterNodeId: {}, ClusterMasterService:electionMasterSchedule: Master: {}.", this.clusterConfig.getId(), this.clusterConfig.isMaster()); } election(this.clusterConfig.getId(), this.clusterConfig.isMaster(), isChanged); } private boolean electionMaster() { int minClusterNodeId = Integer.MAX_VALUE; for (Map.Entry entry : this.clusterConfig.getClusterMap().entrySet()) { ClusterNode cluster = entry.getValue(); // 마스터에 연결된 슬래이브 클러스터의 네트워크 상태를 기준으로 평가함. if (cluster.getSyncState().getState() != ClusterNET.CLOSED) { if (cluster.getId() < minClusterNodeId) { minClusterNodeId = cluster.getId(); } } } if (minClusterNodeId == Integer.MAX_VALUE) { minClusterNodeId = this.clusterConfig.getId(); } // 위의 클러스터의 나 자신의 네트워크 상태는 CLOSED로 되어 있기 때문에 나자신보다 현재 마스터 ID가 // 현재 masterId 가 나 자신의 clusterId보다 같거나 크면 나 자신이 master가 된 것임 if (this.clusterConfig.getMasterId() != minClusterNodeId) { // master id가 변경되었으므로 클러스터 노드 정보도 업데이트 한다 this.clusterConfig.setMasterId(minClusterNodeId); for (Map.Entry entry : this.clusterConfig.getClusterMap().entrySet()) { ClusterNode cluster = entry.getValue(); entry.getValue().setMaster(cluster.getId() == minClusterNodeId); } } //return (minClusterNodeId >= this.clusterConfig.getId()); return (minClusterNodeId == this.clusterConfig.getId()); // 연결된 가장 작은 노드가 나의 노드와 같은 경우에 마스터임 } private boolean electionMasterQuorum() { // --- 1. 과반수(Quorum) 계산 로직 추가 --- int totalNodes = this.clusterConfig.getClusterMap().size(); // 클러스터가 2개 이하일 때는 과반수 로직이 의미 없으므로, 3개 이상일 때만 적용 int quorum = (totalNodes >= 3) ? (totalNodes / 2) + 1 : totalNodes; // --- 2. 현재 연결된 노드 수를 세고, 가장 작은 ID를 찾는 로직 --- int connectedNodesCount = 0; int minClusterNodeId = Integer.MAX_VALUE; for (Map.Entry entry : this.clusterConfig.getClusterMap().entrySet()) { ClusterNode cluster = entry.getValue(); if (cluster.getSyncState().getState() != ClusterNET.CLOSED) { // 연결된 노드 수 증가 connectedNodesCount++; // 가장 작은 ID 찾기 if (cluster.getId() < minClusterNodeId) { minClusterNodeId = cluster.getId(); } } } // --- 3. 과반수 검증 로직 추가 --- // 만약 연결된 노드 수가 과반수에 미치지 못하면, 절대 Master가 될 수 없다. if (connectedNodesCount < quorum) { log.warn("ClusterNodeId: {}, Quorum not met. Connected nodes: {}, Quorum: {}. Cannot become master.", this.clusterConfig.getId(), connectedNodesCount, quorum); // Master ID를 유효하지 않은 값(-1)으로 설정하여 현재 Master가 없음을 명확히 함 this.clusterConfig.setMasterId(-1); return false; // Master가 될 수 없음을 반환 } // --- 4. 기존 로직 (과반수가 충족되었을 경우에만 실행) --- if (minClusterNodeId == Integer.MAX_VALUE) { minClusterNodeId = this.clusterConfig.getId(); } if (this.clusterConfig.getMasterId() != minClusterNodeId) { this.clusterConfig.setMasterId(minClusterNodeId); for (Map.Entry entry : this.clusterConfig.getClusterMap().entrySet()) { ClusterNode cluster = entry.getValue(); entry.getValue().setMaster(cluster.getId() == minClusterNodeId); } } // 연결된 가장 작은 노드가 나의 노드와 같은 경우에 마스터임 return (minClusterNodeId == this.clusterConfig.getId()); } // 파일: src/main/java/com/its/common/cluster/service/AbstractClusterMasterService.java /** * 클러스터의 마스터를 선출하는 핵심 로직. * 1. 과반수(Quorum) 규칙을 적용하여 스플릿 브레인을 방지합니다. * 2. Witness 노드는 마스터 후보에서 제외합니다. * @return 이 노드가 마스터가 되어야 하면 true, 아니면 false. */ private boolean electionMasterWitness() { // --- 1. 과반수(Quorum) 계산 --- // 전체 노드 수를 기반으로 마스터가 되기 위해 필요한 최소 연결 노드 수를 계산. int totalNodes = this.clusterConfig.getClusterMap().size(); // 클러스터가 2개 이하일 때는 과반수 로직이 큰 의미가 없지만, 일관성을 위해 규칙을 적용. // (2개일 경우 Quorum=2, 1개일 경우 Quorum=1) int quorum = (totalNodes / 2) + 1; // --- 2. 연결된 노드 수와 마스터 후보(가장 작은 ID) 찾기 --- int connectedNodesCount = 0; int minClusterNodeId = Integer.MAX_VALUE; for (Map.Entry entry : this.clusterConfig.getClusterMap().entrySet()) { ClusterNode cluster = entry.getValue(); // 현재 노드가 연결된 상태인지 확인합니다. if (cluster.getSyncState().getState() != ClusterNET.CLOSED) { connectedNodesCount++; // 연결된 노드 수 증가 // [Witness 로직] 만약 노드가 Witness 노드라면, 마스터 후보에서 제외. if (cluster.isWitness()) { continue; // 다음 노드로 넘어감 } // Witness가 아닌 노드 중에서 가장 작은 ID를 찾습니다. if (cluster.getId() < minClusterNodeId) { minClusterNodeId = cluster.getId(); } } } // --- 3. 과반수(Quorum) 검증 --- // 현재 연결된 노드 수가 과반수를 넘지 못하면, 클러스터는 마스터를 선출할 자격이 없음. if (connectedNodesCount < quorum) { // log.warn("ClusterNodeId: {}, Quorum not met. Connected nodes: {}, Quorum: {}. No master will be elected.", // this.clusterConfig.getId(), connectedNodesCount, quorum); // 클러스터에 유효한 마스터가 없음을 명시적으로 설정합니다. if (this.clusterConfig.getMasterId() != -1) { this.clusterConfig.setMasterId(-1); for (Map.Entry entry : this.clusterConfig.getClusterMap().entrySet()) { entry.getValue().setMaster(false); } } return false; // 마스터가 될 수 없음 } // --- 4. 마스터 ID 업데이트 (과반수 충족 시) --- // 만약 minClusterNodeId가 초기값 그대로라면(모든 노드가 Witness이거나 연결이 끊긴 경우), // 유효한 마스터 후보가 없는 것이므로 -1로 설정. if (minClusterNodeId == Integer.MAX_VALUE) { minClusterNodeId = -1; } // 클러스터의 마스터 정보가 변경되었다면, 모든 노드에 전파. if (this.clusterConfig.getMasterId() != minClusterNodeId) { this.clusterConfig.setMasterId(minClusterNodeId); for (Map.Entry entry : this.clusterConfig.getClusterMap().entrySet()) { ClusterNode cluster = entry.getValue(); entry.getValue().setMaster(cluster.getId() == minClusterNodeId); } } // --- 5. 최종 결정 --- // [Witness 로직] 이 코드를 실행하는 '나 자신'이 Witness 노드라면, 절대 마스터가 될 수 없습니다. if (this.clusterConfig.isWitnessNode()) { return false; } // 마스터로 선출된 노드 ID가 '나 자신'의 ID와 일치하는지 확인합니다. // minClusterNodeId가 -1(유효한 후보 없음)인 경우, 이 조건은 항상 false가 됩니다. return (minClusterNodeId == this.clusterConfig.getId()); } private void electionMasterSchedule() { // 2초 주기로 실행되며 클러스터의 마스터/슬래이브 정보를 업데이트 함 // 클러스터맵에는 나 자신의 정보가 포함되어 있음. // scheduleAtFixedRate ==> scheduleWithFixedDelay 로 변경(혹시 모를 작업 병목을 위해서) // this.taskFuture = this.taskScheduler.scheduleWithFixedDelay(this::electionMasterCluster, 2 * 1000L); long scheduleMillis = this.clusterConfig.getElectionScheduleSeconds() * 1000L; this.taskFuture = this.taskScheduler.scheduleWithFixedDelay(this::electionMasterCluster, scheduleMillis); } public void shutdown() { log.info("ClusterNodeId: {}, ClusterMasterService shutdown process started.", this.clusterConfig.getId()); List shutdownErrors = new ArrayList<>(); try { if (this.taskFuture != null) { this.taskFuture.cancel(true); } this.taskScheduler.shutdown(); } catch (Exception e) { shutdownErrors.add(new RuntimeException("taskFuture shutdown failed", e)); } try { if (this.acceptGroup != null) { // shutdownGracefully()는 Future를 반환하므로, await()로 완료를 기다릴 수 있습니다. // this.acceptGroup.shutdownGracefully(); this.acceptGroup.shutdownGracefully().awaitUninterruptibly(); } } catch (Exception e) { shutdownErrors.add(new RuntimeException("acceptGroup shutdown failed", e)); } try { if (this.workerGroup != null) { // this.workerGroup.shutdownGracefully(); this.workerGroup.shutdownGracefully().awaitUninterruptibly(); } } catch (Exception e) { shutdownErrors.add(new RuntimeException("workerGroup shutdown failed", e)); } try { if (this.channelFuture != null && this.channelFuture.channel() != null) { // this.channelFuture.channel().closeFuture(); this.channelFuture.channel().close().awaitUninterruptibly(); } } catch (Exception e) { shutdownErrors.add(new RuntimeException("channelFuture closure failed", e)); } if (!shutdownErrors.isEmpty()) { log.error("ClusterNodeId: {}, ClusterMasterService shutdown encountered {} error(s).", this.clusterConfig.getId(), shutdownErrors.size()); // 각 예외를 상세히 로깅합니다. for (int ii = 0; ii < shutdownErrors.size(); ii++) { log.error("Shutdown error #{}: {}", ii + 1, shutdownErrors.get(ii).getMessage(), shutdownErrors.get(ii)); } } else { log.info("ClusterNodeId: {}, ClusterMasterService shutdown completed gracefully.", this.clusterConfig.getId()); } } }