| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392 |
- package com.its.common.cluster.service;
- import com.its.common.cluster.config.AbstractClusterConfig;
- import com.its.common.cluster.utils.ClusterPlatform;
- import com.its.common.cluster.utils.ClusterUtils;
- import com.its.common.cluster.vo.ClusterMessage;
- import com.its.common.cluster.vo.ClusterNET;
- import com.its.common.cluster.vo.ClusterNode;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.epoll.Epoll;
- import io.netty.channel.nio.NioEventLoopGroup;
- import lombok.Getter;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
- import javax.annotation.PostConstruct;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.ScheduledFuture;
- @Slf4j
- @RequiredArgsConstructor
- public abstract class AbstractClusterMasterService {
- private final ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
- @Getter
- protected final AbstractClusterConfig clusterConfig;
- private ScheduledFuture<?> taskFuture;
- private EventLoopGroup acceptGroup;
- private EventLoopGroup workerGroup;
- private ChannelFuture channelFuture;
- @PostConstruct
- void init() {
- this.taskScheduler.setPoolSize(1);
- this.taskScheduler.initialize();
- }
- public void start() {
- if (!ClusterPlatform.isWindows()) {
- if (!Epoll.isAvailable()) {
- // 윈도우 서버가 아닌데 EPOLL 을 사용할 수 없으면 오류 메시지 표출
- log.warn("ClusterNodeId: {}, Is not Windows system but can't use epoll networking: {}", this.clusterConfig.getId(), Epoll.unavailabilityCause().getMessage());
- }
- }
- if (ClusterUtils.isEpollAvailable()) {
- log.info("ClusterNodeId: {}, The Cluster Master runs in LINUX EPOLL mode.", this.clusterConfig.getId());
- }
- else {
- log.info("ClusterNodeId: {}, The Cluster Master runs in Windows NIO mode.", this.clusterConfig.getId());
- }
- this.acceptGroup = new NioEventLoopGroup();
- this.workerGroup = new NioEventLoopGroup();
- ServerBootstrap serverBootstrap = createBootstrap();
- log.info("*********************************************************************************");
- log.info("** UTIC Center Cluster Master Server Information **");
- log.info("** bindAddress: {}", this.clusterConfig.getIp());
- log.info("** listenPort: {}", this.clusterConfig.getPort());
- log.info("** clusterNodeId: {}", this.clusterConfig.getId());
- log.info("** isMaster: {}", this.clusterConfig.isMaster());
- log.info("*********************************************************************************");
- try {
- if (this.clusterConfig.getIp().equals("0.0.0.0")) {
- this.channelFuture = serverBootstrap.bind(this.clusterConfig.getPort());
- }
- else {
- this.channelFuture = serverBootstrap.bind(this.clusterConfig.getIp(), this.clusterConfig.getPort());
- }
- electionMasterSchedule();
- }
- catch (Exception e) {
- log.error("ClusterNodeId: {}, cluster start, InterruptedException", this.clusterConfig.getId());
- shutdown();
- }
- }
- public ServerBootstrap createBootstrap() {
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- EventLoopGroup acceptGroups;
- EventLoopGroup workerGroups;
- acceptGroups = ClusterUtils.newEventLoopGroup(1, "Accept");
- workerGroups = ClusterUtils.newEventLoopGroup(1, "Worker");
- serverBootstrap.channel(ClusterUtils.getServerSocketChannel());
- serverBootstrap.group(acceptGroups, workerGroups);
- serverBootstrap.option(ChannelOption.AUTO_READ, true);
- serverBootstrap.option(ChannelOption.SO_BACKLOG, 2);
- serverBootstrap.option(ChannelOption.SO_RCVBUF, 65535);//config.getRcvBuf());
- serverBootstrap.option(ChannelOption.SO_REUSEADDR, true);
- // serverBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5*1000);
- int connectTimeoutMillis = this.clusterConfig.getConnectTimeoutSeconds() * 1000;
- serverBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis);
- serverBootstrap.childOption(ChannelOption.SO_LINGER, 0); // 4way-handshake 비활성
- serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, false); // KEEPALIVE 비활성(활성: true)
- serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, true); // 소켓 재사용
- serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true); // Nagle 알고리즘 비활성화
- serverBootstrap.childHandler(new ClusterMasterInitializer(this, this.clusterConfig));
- return serverBootstrap;
- }
- // 추상 클래스 선언...
- public abstract void election(int clusterId, boolean isMaster, boolean isMasterChanged);
- public abstract void onClusterMessage(ClusterMessage message);
- public abstract void onClusterChannelActive(ClusterNode clusterNode);
- public abstract void onClusterChannelInactive(ClusterNode clusterNode);
- public void onNotifyClusterNetworkState(ClusterNode clusterNode, boolean isActive) {
- log.info("ClusterNodeId: {}, ClusterMasterService.onNotifyClusterNetworkState: fromClusterNodeId: {}, isNetActive; {}",
- this.clusterConfig.getId(), clusterNode.getId(), isActive);
- // slave의 네트워크 상태를 확인하고 slave 네트워크의 연결을 종료시킨다.
- if (!isActive && clusterNode.getSyncState().getState() != ClusterNET.CLOSED) {
- Channel channel = clusterNode.getSyncState().getChannel();
- // log.info("ClusterNodeId: {}, ClusterMasterService.onNotifyClusterNetworkState: fromClusterNodeId: {}, Try SlaveNetwork Close. channel: {}",
- // this.clusterConfig.getId(), clusterNode.getId(), channel);
- if (channel != null) {
- channel.flush();
- channel.disconnect();
- channel.close();
- }
- clusterNode.getSyncState().disConnect();
- }
- // 연결 수립/종료시 즉각 마스터를 선출한다.
- electionMasterCluster();
- if (isActive) {
- onClusterChannelActive(clusterNode);
- }
- else {
- onClusterChannelInactive(clusterNode);
- }
- }
- public synchronized void electionMasterCluster() {
- boolean isMaster = electionMaster();
- boolean isChanged = this.clusterConfig.isMaster() != isMaster;
- this.clusterConfig.setMaster(isMaster);
- if (isChanged) {
- log.info("ClusterNodeId: {}, ClusterMasterService:electionMasterSchedule: Changed Master: {}, masterClusterNodeId: {}",
- this.clusterConfig.getId(), this.clusterConfig.isMaster(), this.clusterConfig.getMasterId());
- }
- if (this.clusterConfig.isLogging()) {
- log.info("ClusterNodeId: {}, ClusterMasterService:electionMasterSchedule: Master: {}.",
- this.clusterConfig.getId(), this.clusterConfig.isMaster());
- }
- election(this.clusterConfig.getId(), this.clusterConfig.isMaster(), isChanged);
- }
- private boolean electionMaster() {
- int minClusterNodeId = Integer.MAX_VALUE;
- for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
- ClusterNode cluster = entry.getValue();
- // 마스터에 연결된 슬래이브 클러스터의 네트워크 상태를 기준으로 평가함.
- if (cluster.getSyncState().getState() != ClusterNET.CLOSED) {
- if (cluster.getId() < minClusterNodeId) {
- minClusterNodeId = cluster.getId();
- }
- }
- }
- if (minClusterNodeId == Integer.MAX_VALUE) {
- minClusterNodeId = this.clusterConfig.getId();
- }
- // 위의 클러스터의 나 자신의 네트워크 상태는 CLOSED로 되어 있기 때문에 나자신보다 현재 마스터 ID가
- // 현재 masterId 가 나 자신의 clusterId보다 같거나 크면 나 자신이 master가 된 것임
- if (this.clusterConfig.getMasterId() != minClusterNodeId) {
- // master id가 변경되었으므로 클러스터 노드 정보도 업데이트 한다
- this.clusterConfig.setMasterId(minClusterNodeId);
- for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
- ClusterNode cluster = entry.getValue();
- entry.getValue().setMaster(cluster.getId() == minClusterNodeId);
- }
- }
- //return (minClusterNodeId >= this.clusterConfig.getId());
- return (minClusterNodeId == this.clusterConfig.getId()); // 연결된 가장 작은 노드가 나의 노드와 같은 경우에 마스터임
- }
- private boolean electionMasterQuorum() {
- // --- 1. 과반수(Quorum) 계산 로직 추가 ---
- int totalNodes = this.clusterConfig.getClusterMap().size();
- // 클러스터가 2개 이하일 때는 과반수 로직이 의미 없으므로, 3개 이상일 때만 적용
- int quorum = (totalNodes >= 3) ? (totalNodes / 2) + 1 : totalNodes;
- // --- 2. 현재 연결된 노드 수를 세고, 가장 작은 ID를 찾는 로직 ---
- int connectedNodesCount = 0;
- int minClusterNodeId = Integer.MAX_VALUE;
- for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
- ClusterNode cluster = entry.getValue();
- if (cluster.getSyncState().getState() != ClusterNET.CLOSED) {
- // 연결된 노드 수 증가
- connectedNodesCount++;
- // 가장 작은 ID 찾기
- if (cluster.getId() < minClusterNodeId) {
- minClusterNodeId = cluster.getId();
- }
- }
- }
- // --- 3. 과반수 검증 로직 추가 ---
- // 만약 연결된 노드 수가 과반수에 미치지 못하면, 절대 Master가 될 수 없다.
- if (connectedNodesCount < quorum) {
- log.warn("ClusterNodeId: {}, Quorum not met. Connected nodes: {}, Quorum: {}. Cannot become master.",
- this.clusterConfig.getId(), connectedNodesCount, quorum);
- // Master ID를 유효하지 않은 값(-1)으로 설정하여 현재 Master가 없음을 명확히 함
- this.clusterConfig.setMasterId(-1);
- return false; // Master가 될 수 없음을 반환
- }
- // --- 4. 기존 로직 (과반수가 충족되었을 경우에만 실행) ---
- if (minClusterNodeId == Integer.MAX_VALUE) {
- minClusterNodeId = this.clusterConfig.getId();
- }
- if (this.clusterConfig.getMasterId() != minClusterNodeId) {
- this.clusterConfig.setMasterId(minClusterNodeId);
- for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
- ClusterNode cluster = entry.getValue();
- entry.getValue().setMaster(cluster.getId() == minClusterNodeId);
- }
- }
- // 연결된 가장 작은 노드가 나의 노드와 같은 경우에 마스터임
- return (minClusterNodeId == this.clusterConfig.getId());
- }
- // 파일: src/main/java/com/its/common/cluster/service/AbstractClusterMasterService.java
- /**
- * 클러스터의 마스터를 선출하는 핵심 로직.
- * 1. 과반수(Quorum) 규칙을 적용하여 스플릿 브레인을 방지합니다.
- * 2. Witness 노드는 마스터 후보에서 제외합니다.
- * @return 이 노드가 마스터가 되어야 하면 true, 아니면 false.
- */
- private boolean electionMasterWitness() {
- // --- 1. 과반수(Quorum) 계산 ---
- // 전체 노드 수를 기반으로 마스터가 되기 위해 필요한 최소 연결 노드 수를 계산.
- int totalNodes = this.clusterConfig.getClusterMap().size();
- // 클러스터가 2개 이하일 때는 과반수 로직이 큰 의미가 없지만, 일관성을 위해 규칙을 적용.
- // (2개일 경우 Quorum=2, 1개일 경우 Quorum=1)
- int quorum = (totalNodes / 2) + 1;
- // --- 2. 연결된 노드 수와 마스터 후보(가장 작은 ID) 찾기 ---
- int connectedNodesCount = 0;
- int minClusterNodeId = Integer.MAX_VALUE;
- for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
- ClusterNode cluster = entry.getValue();
- // 현재 노드가 연결된 상태인지 확인합니다.
- if (cluster.getSyncState().getState() != ClusterNET.CLOSED) {
- connectedNodesCount++; // 연결된 노드 수 증가
- // [Witness 로직] 만약 노드가 Witness 노드라면, 마스터 후보에서 제외.
- if (cluster.isWitness()) {
- continue; // 다음 노드로 넘어감
- }
- // Witness가 아닌 노드 중에서 가장 작은 ID를 찾습니다.
- if (cluster.getId() < minClusterNodeId) {
- minClusterNodeId = cluster.getId();
- }
- }
- }
- // --- 3. 과반수(Quorum) 검증 ---
- // 현재 연결된 노드 수가 과반수를 넘지 못하면, 클러스터는 마스터를 선출할 자격이 없음.
- if (connectedNodesCount < quorum) {
- // log.warn("ClusterNodeId: {}, Quorum not met. Connected nodes: {}, Quorum: {}. No master will be elected.",
- // this.clusterConfig.getId(), connectedNodesCount, quorum);
- // 클러스터에 유효한 마스터가 없음을 명시적으로 설정합니다.
- if (this.clusterConfig.getMasterId() != -1) {
- this.clusterConfig.setMasterId(-1);
- for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
- entry.getValue().setMaster(false);
- }
- }
- return false; // 마스터가 될 수 없음
- }
- // --- 4. 마스터 ID 업데이트 (과반수 충족 시) ---
- // 만약 minClusterNodeId가 초기값 그대로라면(모든 노드가 Witness이거나 연결이 끊긴 경우),
- // 유효한 마스터 후보가 없는 것이므로 -1로 설정.
- if (minClusterNodeId == Integer.MAX_VALUE) {
- minClusterNodeId = -1;
- }
- // 클러스터의 마스터 정보가 변경되었다면, 모든 노드에 전파.
- if (this.clusterConfig.getMasterId() != minClusterNodeId) {
- this.clusterConfig.setMasterId(minClusterNodeId);
- for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
- ClusterNode cluster = entry.getValue();
- entry.getValue().setMaster(cluster.getId() == minClusterNodeId);
- }
- }
- // --- 5. 최종 결정 ---
- // [Witness 로직] 이 코드를 실행하는 '나 자신'이 Witness 노드라면, 절대 마스터가 될 수 없습니다.
- if (this.clusterConfig.isWitnessNode()) {
- return false;
- }
- // 마스터로 선출된 노드 ID가 '나 자신'의 ID와 일치하는지 확인합니다.
- // minClusterNodeId가 -1(유효한 후보 없음)인 경우, 이 조건은 항상 false가 됩니다.
- return (minClusterNodeId == this.clusterConfig.getId());
- }
- private void electionMasterSchedule() {
- // 2초 주기로 실행되며 클러스터의 마스터/슬래이브 정보를 업데이트 함
- // 클러스터맵에는 나 자신의 정보가 포함되어 있음.
- // scheduleAtFixedRate ==> scheduleWithFixedDelay 로 변경(혹시 모를 작업 병목을 위해서)
- // this.taskFuture = this.taskScheduler.scheduleWithFixedDelay(this::electionMasterCluster, 2 * 1000L);
- long scheduleMillis = this.clusterConfig.getElectionScheduleSeconds() * 1000L;
- this.taskFuture = this.taskScheduler.scheduleWithFixedDelay(this::electionMasterCluster, scheduleMillis);
- }
- public void shutdown() {
- log.info("ClusterNodeId: {}, ClusterMasterService shutdown process started.", this.clusterConfig.getId());
- List<Throwable> shutdownErrors = new ArrayList<>();
- try {
- if (this.taskFuture != null) {
- this.taskFuture.cancel(true);
- }
- this.taskScheduler.shutdown();
- } catch (Exception e) {
- shutdownErrors.add(new RuntimeException("taskFuture shutdown failed", e));
- }
- try {
- if (this.acceptGroup != null) {
- // shutdownGracefully()는 Future를 반환하므로, await()로 완료를 기다릴 수 있습니다.
- // this.acceptGroup.shutdownGracefully();
- this.acceptGroup.shutdownGracefully().awaitUninterruptibly();
- }
- }
- catch (Exception e) {
- shutdownErrors.add(new RuntimeException("acceptGroup shutdown failed", e));
- }
- try {
- if (this.workerGroup != null) {
- // this.workerGroup.shutdownGracefully();
- this.workerGroup.shutdownGracefully().awaitUninterruptibly();
- }
- }
- catch (Exception e) {
- shutdownErrors.add(new RuntimeException("workerGroup shutdown failed", e));
- }
- try {
- if (this.channelFuture != null && this.channelFuture.channel() != null) {
- // this.channelFuture.channel().closeFuture();
- this.channelFuture.channel().close().awaitUninterruptibly();
- }
- }
- catch (Exception e) {
- shutdownErrors.add(new RuntimeException("channelFuture closure failed", e));
- }
- if (!shutdownErrors.isEmpty()) {
- log.error("ClusterNodeId: {}, ClusterMasterService shutdown encountered {} error(s).",
- this.clusterConfig.getId(), shutdownErrors.size());
- // 각 예외를 상세히 로깅합니다.
- for (int ii = 0; ii < shutdownErrors.size(); ii++) {
- log.error("Shutdown error #{}: {}", ii + 1, shutdownErrors.get(ii).getMessage(), shutdownErrors.get(ii));
- }
- } else {
- log.info("ClusterNodeId: {}, ClusterMasterService shutdown completed gracefully.", this.clusterConfig.getId());
- }
- }
- }
|