AbstractClusterMasterService.java 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  1. package com.its.common.cluster.service;
  2. import com.its.common.cluster.config.AbstractClusterConfig;
  3. import com.its.common.cluster.utils.ClusterPlatform;
  4. import com.its.common.cluster.utils.ClusterUtils;
  5. import com.its.common.cluster.vo.ClusterMessage;
  6. import com.its.common.cluster.vo.ClusterNET;
  7. import com.its.common.cluster.vo.ClusterNode;
  8. import io.netty.bootstrap.ServerBootstrap;
  9. import io.netty.channel.Channel;
  10. import io.netty.channel.ChannelFuture;
  11. import io.netty.channel.ChannelOption;
  12. import io.netty.channel.EventLoopGroup;
  13. import io.netty.channel.epoll.Epoll;
  14. import io.netty.channel.nio.NioEventLoopGroup;
  15. import lombok.Getter;
  16. import lombok.RequiredArgsConstructor;
  17. import lombok.extern.slf4j.Slf4j;
  18. import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
  19. import javax.annotation.PostConstruct;
  20. import java.util.ArrayList;
  21. import java.util.List;
  22. import java.util.Map;
  23. import java.util.concurrent.ScheduledFuture;
  24. @Slf4j
  25. @RequiredArgsConstructor
  26. public abstract class AbstractClusterMasterService {
  27. private final ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
  28. @Getter
  29. protected final AbstractClusterConfig clusterConfig;
  30. private ScheduledFuture<?> taskFuture;
  31. private EventLoopGroup acceptGroup;
  32. private EventLoopGroup workerGroup;
  33. private ChannelFuture channelFuture;
  34. @PostConstruct
  35. void init() {
  36. this.taskScheduler.setPoolSize(1);
  37. this.taskScheduler.initialize();
  38. }
  39. public void start() {
  40. if (!ClusterPlatform.isWindows()) {
  41. if (!Epoll.isAvailable()) {
  42. // 윈도우 서버가 아닌데 EPOLL 을 사용할 수 없으면 오류 메시지 표출
  43. log.warn("ClusterNodeId: {}, Is not Windows system but can't use epoll networking: {}", this.clusterConfig.getId(), Epoll.unavailabilityCause().getMessage());
  44. }
  45. }
  46. if (ClusterUtils.isEpollAvailable()) {
  47. log.info("ClusterNodeId: {}, The Cluster Master runs in LINUX EPOLL mode.", this.clusterConfig.getId());
  48. }
  49. else {
  50. log.info("ClusterNodeId: {}, The Cluster Master runs in Windows NIO mode.", this.clusterConfig.getId());
  51. }
  52. this.acceptGroup = new NioEventLoopGroup();
  53. this.workerGroup = new NioEventLoopGroup();
  54. ServerBootstrap serverBootstrap = createBootstrap();
  55. log.info("*********************************************************************************");
  56. log.info("** UTIC Center Cluster Master Server Information **");
  57. log.info("** bindAddress: {}", this.clusterConfig.getIp());
  58. log.info("** listenPort: {}", this.clusterConfig.getPort());
  59. log.info("** clusterNodeId: {}", this.clusterConfig.getId());
  60. log.info("** isMaster: {}", this.clusterConfig.isMaster());
  61. log.info("*********************************************************************************");
  62. try {
  63. if (this.clusterConfig.getIp().equals("0.0.0.0")) {
  64. this.channelFuture = serverBootstrap.bind(this.clusterConfig.getPort());
  65. }
  66. else {
  67. this.channelFuture = serverBootstrap.bind(this.clusterConfig.getIp(), this.clusterConfig.getPort());
  68. }
  69. electionMasterSchedule();
  70. }
  71. catch (Exception e) {
  72. log.error("ClusterNodeId: {}, cluster start, InterruptedException", this.clusterConfig.getId());
  73. shutdown();
  74. }
  75. }
  76. public ServerBootstrap createBootstrap() {
  77. ServerBootstrap serverBootstrap = new ServerBootstrap();
  78. EventLoopGroup acceptGroups;
  79. EventLoopGroup workerGroups;
  80. acceptGroups = ClusterUtils.newEventLoopGroup(1, "Accept");
  81. workerGroups = ClusterUtils.newEventLoopGroup(1, "Worker");
  82. serverBootstrap.channel(ClusterUtils.getServerSocketChannel());
  83. serverBootstrap.group(acceptGroups, workerGroups);
  84. serverBootstrap.option(ChannelOption.AUTO_READ, true);
  85. serverBootstrap.option(ChannelOption.SO_BACKLOG, 2);
  86. serverBootstrap.option(ChannelOption.SO_RCVBUF, 65535);//config.getRcvBuf());
  87. serverBootstrap.option(ChannelOption.SO_REUSEADDR, true);
  88. // serverBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5*1000);
  89. int connectTimeoutMillis = this.clusterConfig.getConnectTimeoutSeconds() * 1000;
  90. serverBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis);
  91. serverBootstrap.childOption(ChannelOption.SO_LINGER, 0); // 4way-handshake 비활성
  92. serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, false); // KEEPALIVE 비활성(활성: true)
  93. serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, true); // 소켓 재사용
  94. serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true); // Nagle 알고리즘 비활성화
  95. serverBootstrap.childHandler(new ClusterMasterInitializer(this, this.clusterConfig));
  96. return serverBootstrap;
  97. }
  98. // 추상 클래스 선언...
  99. public abstract void election(int clusterId, boolean isMaster, boolean isMasterChanged);
  100. public abstract void onClusterMessage(ClusterMessage message);
  101. public abstract void onClusterChannelActive(ClusterNode clusterNode);
  102. public abstract void onClusterChannelInactive(ClusterNode clusterNode);
  103. public void onNotifyClusterNetworkState(ClusterNode clusterNode, boolean isActive) {
  104. log.info("ClusterNodeId: {}, ClusterMasterService.onNotifyClusterNetworkState: fromClusterNodeId: {}, isNetActive; {}",
  105. this.clusterConfig.getId(), clusterNode.getId(), isActive);
  106. // slave의 네트워크 상태를 확인하고 slave 네트워크의 연결을 종료시킨다.
  107. if (!isActive && clusterNode.getSyncState().getState() != ClusterNET.CLOSED) {
  108. Channel channel = clusterNode.getSyncState().getChannel();
  109. // log.info("ClusterNodeId: {}, ClusterMasterService.onNotifyClusterNetworkState: fromClusterNodeId: {}, Try SlaveNetwork Close. channel: {}",
  110. // this.clusterConfig.getId(), clusterNode.getId(), channel);
  111. if (channel != null) {
  112. channel.flush();
  113. channel.disconnect();
  114. channel.close();
  115. }
  116. clusterNode.getSyncState().disConnect();
  117. }
  118. // 연결 수립/종료시 즉각 마스터를 선출한다.
  119. electionMasterCluster();
  120. if (isActive) {
  121. onClusterChannelActive(clusterNode);
  122. }
  123. else {
  124. onClusterChannelInactive(clusterNode);
  125. }
  126. }
  127. public synchronized void electionMasterCluster() {
  128. boolean isMaster = electionMaster();
  129. boolean isChanged = this.clusterConfig.isMaster() != isMaster;
  130. this.clusterConfig.setMaster(isMaster);
  131. if (isChanged) {
  132. log.info("ClusterNodeId: {}, ClusterMasterService:electionMasterSchedule: Changed Master: {}, masterClusterNodeId: {}",
  133. this.clusterConfig.getId(), this.clusterConfig.isMaster(), this.clusterConfig.getMasterId());
  134. }
  135. if (this.clusterConfig.isLogging()) {
  136. log.info("ClusterNodeId: {}, ClusterMasterService:electionMasterSchedule: Master: {}.",
  137. this.clusterConfig.getId(), this.clusterConfig.isMaster());
  138. }
  139. election(this.clusterConfig.getId(), this.clusterConfig.isMaster(), isChanged);
  140. }
  141. private boolean electionMaster() {
  142. int minClusterNodeId = Integer.MAX_VALUE;
  143. for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
  144. ClusterNode cluster = entry.getValue();
  145. // 마스터에 연결된 슬래이브 클러스터의 네트워크 상태를 기준으로 평가함.
  146. if (cluster.getSyncState().getState() != ClusterNET.CLOSED) {
  147. if (cluster.getId() < minClusterNodeId) {
  148. minClusterNodeId = cluster.getId();
  149. }
  150. }
  151. }
  152. if (minClusterNodeId == Integer.MAX_VALUE) {
  153. minClusterNodeId = this.clusterConfig.getId();
  154. }
  155. // 위의 클러스터의 나 자신의 네트워크 상태는 CLOSED로 되어 있기 때문에 나자신보다 현재 마스터 ID가
  156. // 현재 masterId 가 나 자신의 clusterId보다 같거나 크면 나 자신이 master가 된 것임
  157. if (this.clusterConfig.getMasterId() != minClusterNodeId) {
  158. // master id가 변경되었으므로 클러스터 노드 정보도 업데이트 한다
  159. this.clusterConfig.setMasterId(minClusterNodeId);
  160. for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
  161. ClusterNode cluster = entry.getValue();
  162. entry.getValue().setMaster(cluster.getId() == minClusterNodeId);
  163. }
  164. }
  165. //return (minClusterNodeId >= this.clusterConfig.getId());
  166. return (minClusterNodeId == this.clusterConfig.getId()); // 연결된 가장 작은 노드가 나의 노드와 같은 경우에 마스터임
  167. }
  168. private boolean electionMasterQuorum() {
  169. // --- 1. 과반수(Quorum) 계산 로직 추가 ---
  170. int totalNodes = this.clusterConfig.getClusterMap().size();
  171. // 클러스터가 2개 이하일 때는 과반수 로직이 의미 없으므로, 3개 이상일 때만 적용
  172. int quorum = (totalNodes >= 3) ? (totalNodes / 2) + 1 : totalNodes;
  173. // --- 2. 현재 연결된 노드 수를 세고, 가장 작은 ID를 찾는 로직 ---
  174. int connectedNodesCount = 0;
  175. int minClusterNodeId = Integer.MAX_VALUE;
  176. for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
  177. ClusterNode cluster = entry.getValue();
  178. if (cluster.getSyncState().getState() != ClusterNET.CLOSED) {
  179. // 연결된 노드 수 증가
  180. connectedNodesCount++;
  181. // 가장 작은 ID 찾기
  182. if (cluster.getId() < minClusterNodeId) {
  183. minClusterNodeId = cluster.getId();
  184. }
  185. }
  186. }
  187. // --- 3. 과반수 검증 로직 추가 ---
  188. // 만약 연결된 노드 수가 과반수에 미치지 못하면, 절대 Master가 될 수 없다.
  189. if (connectedNodesCount < quorum) {
  190. log.warn("ClusterNodeId: {}, Quorum not met. Connected nodes: {}, Quorum: {}. Cannot become master.",
  191. this.clusterConfig.getId(), connectedNodesCount, quorum);
  192. // Master ID를 유효하지 않은 값(-1)으로 설정하여 현재 Master가 없음을 명확히 함
  193. this.clusterConfig.setMasterId(-1);
  194. return false; // Master가 될 수 없음을 반환
  195. }
  196. // --- 4. 기존 로직 (과반수가 충족되었을 경우에만 실행) ---
  197. if (minClusterNodeId == Integer.MAX_VALUE) {
  198. minClusterNodeId = this.clusterConfig.getId();
  199. }
  200. if (this.clusterConfig.getMasterId() != minClusterNodeId) {
  201. this.clusterConfig.setMasterId(minClusterNodeId);
  202. for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
  203. ClusterNode cluster = entry.getValue();
  204. entry.getValue().setMaster(cluster.getId() == minClusterNodeId);
  205. }
  206. }
  207. // 연결된 가장 작은 노드가 나의 노드와 같은 경우에 마스터임
  208. return (minClusterNodeId == this.clusterConfig.getId());
  209. }
  210. // 파일: src/main/java/com/its/common/cluster/service/AbstractClusterMasterService.java
  211. /**
  212. * 클러스터의 마스터를 선출하는 핵심 로직.
  213. * 1. 과반수(Quorum) 규칙을 적용하여 스플릿 브레인을 방지합니다.
  214. * 2. Witness 노드는 마스터 후보에서 제외합니다.
  215. * @return 이 노드가 마스터가 되어야 하면 true, 아니면 false.
  216. */
  217. private boolean electionMasterWitness() {
  218. // --- 1. 과반수(Quorum) 계산 ---
  219. // 전체 노드 수를 기반으로 마스터가 되기 위해 필요한 최소 연결 노드 수를 계산.
  220. int totalNodes = this.clusterConfig.getClusterMap().size();
  221. // 클러스터가 2개 이하일 때는 과반수 로직이 큰 의미가 없지만, 일관성을 위해 규칙을 적용.
  222. // (2개일 경우 Quorum=2, 1개일 경우 Quorum=1)
  223. int quorum = (totalNodes / 2) + 1;
  224. // --- 2. 연결된 노드 수와 마스터 후보(가장 작은 ID) 찾기 ---
  225. int connectedNodesCount = 0;
  226. int minClusterNodeId = Integer.MAX_VALUE;
  227. for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
  228. ClusterNode cluster = entry.getValue();
  229. // 현재 노드가 연결된 상태인지 확인합니다.
  230. if (cluster.getSyncState().getState() != ClusterNET.CLOSED) {
  231. connectedNodesCount++; // 연결된 노드 수 증가
  232. // [Witness 로직] 만약 노드가 Witness 노드라면, 마스터 후보에서 제외.
  233. if (cluster.isWitness()) {
  234. continue; // 다음 노드로 넘어감
  235. }
  236. // Witness가 아닌 노드 중에서 가장 작은 ID를 찾습니다.
  237. if (cluster.getId() < minClusterNodeId) {
  238. minClusterNodeId = cluster.getId();
  239. }
  240. }
  241. }
  242. // --- 3. 과반수(Quorum) 검증 ---
  243. // 현재 연결된 노드 수가 과반수를 넘지 못하면, 클러스터는 마스터를 선출할 자격이 없음.
  244. if (connectedNodesCount < quorum) {
  245. // log.warn("ClusterNodeId: {}, Quorum not met. Connected nodes: {}, Quorum: {}. No master will be elected.",
  246. // this.clusterConfig.getId(), connectedNodesCount, quorum);
  247. // 클러스터에 유효한 마스터가 없음을 명시적으로 설정합니다.
  248. if (this.clusterConfig.getMasterId() != -1) {
  249. this.clusterConfig.setMasterId(-1);
  250. for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
  251. entry.getValue().setMaster(false);
  252. }
  253. }
  254. return false; // 마스터가 될 수 없음
  255. }
  256. // --- 4. 마스터 ID 업데이트 (과반수 충족 시) ---
  257. // 만약 minClusterNodeId가 초기값 그대로라면(모든 노드가 Witness이거나 연결이 끊긴 경우),
  258. // 유효한 마스터 후보가 없는 것이므로 -1로 설정.
  259. if (minClusterNodeId == Integer.MAX_VALUE) {
  260. minClusterNodeId = -1;
  261. }
  262. // 클러스터의 마스터 정보가 변경되었다면, 모든 노드에 전파.
  263. if (this.clusterConfig.getMasterId() != minClusterNodeId) {
  264. this.clusterConfig.setMasterId(minClusterNodeId);
  265. for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
  266. ClusterNode cluster = entry.getValue();
  267. entry.getValue().setMaster(cluster.getId() == minClusterNodeId);
  268. }
  269. }
  270. // --- 5. 최종 결정 ---
  271. // [Witness 로직] 이 코드를 실행하는 '나 자신'이 Witness 노드라면, 절대 마스터가 될 수 없습니다.
  272. if (this.clusterConfig.isWitnessNode()) {
  273. return false;
  274. }
  275. // 마스터로 선출된 노드 ID가 '나 자신'의 ID와 일치하는지 확인합니다.
  276. // minClusterNodeId가 -1(유효한 후보 없음)인 경우, 이 조건은 항상 false가 됩니다.
  277. return (minClusterNodeId == this.clusterConfig.getId());
  278. }
  279. private void electionMasterSchedule() {
  280. // 2초 주기로 실행되며 클러스터의 마스터/슬래이브 정보를 업데이트 함
  281. // 클러스터맵에는 나 자신의 정보가 포함되어 있음.
  282. // scheduleAtFixedRate ==> scheduleWithFixedDelay 로 변경(혹시 모를 작업 병목을 위해서)
  283. // this.taskFuture = this.taskScheduler.scheduleWithFixedDelay(this::electionMasterCluster, 2 * 1000L);
  284. long scheduleMillis = this.clusterConfig.getElectionScheduleSeconds() * 1000L;
  285. this.taskFuture = this.taskScheduler.scheduleWithFixedDelay(this::electionMasterCluster, scheduleMillis);
  286. }
  287. public void shutdown() {
  288. log.info("ClusterNodeId: {}, ClusterMasterService shutdown process started.", this.clusterConfig.getId());
  289. List<Throwable> shutdownErrors = new ArrayList<>();
  290. try {
  291. if (this.taskFuture != null) {
  292. this.taskFuture.cancel(true);
  293. }
  294. this.taskScheduler.shutdown();
  295. } catch (Exception e) {
  296. shutdownErrors.add(new RuntimeException("taskFuture shutdown failed", e));
  297. }
  298. try {
  299. if (this.acceptGroup != null) {
  300. // shutdownGracefully()는 Future를 반환하므로, await()로 완료를 기다릴 수 있습니다.
  301. // this.acceptGroup.shutdownGracefully();
  302. this.acceptGroup.shutdownGracefully().awaitUninterruptibly();
  303. }
  304. }
  305. catch (Exception e) {
  306. shutdownErrors.add(new RuntimeException("acceptGroup shutdown failed", e));
  307. }
  308. try {
  309. if (this.workerGroup != null) {
  310. // this.workerGroup.shutdownGracefully();
  311. this.workerGroup.shutdownGracefully().awaitUninterruptibly();
  312. }
  313. }
  314. catch (Exception e) {
  315. shutdownErrors.add(new RuntimeException("workerGroup shutdown failed", e));
  316. }
  317. try {
  318. if (this.channelFuture != null && this.channelFuture.channel() != null) {
  319. // this.channelFuture.channel().closeFuture();
  320. this.channelFuture.channel().close().awaitUninterruptibly();
  321. }
  322. }
  323. catch (Exception e) {
  324. shutdownErrors.add(new RuntimeException("channelFuture closure failed", e));
  325. }
  326. if (!shutdownErrors.isEmpty()) {
  327. log.error("ClusterNodeId: {}, ClusterMasterService shutdown encountered {} error(s).",
  328. this.clusterConfig.getId(), shutdownErrors.size());
  329. // 각 예외를 상세히 로깅합니다.
  330. for (int ii = 0; ii < shutdownErrors.size(); ii++) {
  331. log.error("Shutdown error #{}: {}", ii + 1, shutdownErrors.get(ii).getMessage(), shutdownErrors.get(ii));
  332. }
  333. } else {
  334. log.info("ClusterNodeId: {}, ClusterMasterService shutdown completed gracefully.", this.clusterConfig.getId());
  335. }
  336. }
  337. }