ClusterSlave.java 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package com.its.common.cluster.service;
  2. import com.its.common.cluster.codec.ClusterMessageDecoder;
  3. import com.its.common.cluster.codec.ClusterMessageEncoder;
  4. import com.its.common.cluster.config.AbstractClusterConfig;
  5. import com.its.common.cluster.handler.ClusterSlaveHandler;
  6. import com.its.common.cluster.vo.ClusterNode;
  7. import io.netty.bootstrap.Bootstrap;
  8. import io.netty.channel.*;
  9. import io.netty.channel.socket.SocketChannel;
  10. import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
  11. import io.netty.handler.logging.LogLevel;
  12. import io.netty.handler.logging.LoggingHandler;
  13. import io.netty.handler.timeout.IdleStateHandler;
  14. import io.netty.util.concurrent.GenericFutureListener;
  15. import lombok.Getter;
  16. import lombok.RequiredArgsConstructor;
  17. import lombok.Setter;
  18. import lombok.extern.slf4j.Slf4j;
  19. import java.net.InetSocketAddress;
  20. import java.util.concurrent.Callable;
  21. import java.util.concurrent.TimeUnit;
  22. @Slf4j
  23. @Getter
  24. @Setter
  25. @RequiredArgsConstructor
  26. public class ClusterSlave implements Callable<Object> {
  27. private final AbstractClusterSlaveService slaveService;
  28. private final AbstractClusterConfig clusterConfig;
  29. private final ClusterNode clusterNode;
  30. private final ClusterSlaveBootstrapFactory bootstrapFactory;
  31. private Bootstrap bootstrap = null;
  32. private ChannelFuture channelFuture = null;
  33. private String ipAddress;
  34. private int port;
  35. @Override
  36. public Object call() {
  37. this.ipAddress = this.clusterNode.getIp();
  38. this.port = this.clusterNode.getPort();
  39. try {
  40. initializeBootstrapIfNeeded();
  41. if (this.clusterConfig.isLogging()) {
  42. log.info("ClusterNodeId: {}, ClusterSlave.call: Try Connect: toClusterId: {}, [{}.{}]", this.clusterConfig.getId(), this.clusterNode.getId(), this.ipAddress, this.port);
  43. }
  44. closePreviousChannel();
  45. this.channelFuture = this.bootstrap.connect(new InetSocketAddress(this.ipAddress, this.port));
  46. // 채널 연결 리스너 등록
  47. this.channelFuture.addListener(new GenericFutureListener<ChannelFuture>() {
  48. @Override
  49. public void operationComplete(ChannelFuture future) throws Exception {
  50. channelOpen(future);
  51. }
  52. });
  53. // 채널 연결 종료 리스너 등록
  54. this.channelFuture.channel().closeFuture().addListener(new GenericFutureListener<ChannelFuture>() {
  55. @Override
  56. public void operationComplete(ChannelFuture future) throws Exception {
  57. channelClosed(future);
  58. }
  59. });
  60. return this.channelFuture != null && this.channelFuture.isSuccess();
  61. } catch (Exception e) {
  62. log.error("ClusterNodeId: {}, ClusterSlave.call: Connection error toClusterId: {}, [{}.{}], Exception: {}",
  63. this.clusterConfig.getId(), this.clusterNode.getId(), this.ipAddress, this.port, e.getMessage(), e);
  64. return false;
  65. }
  66. }
  67. private void initializeBootstrapIfNeeded() {
  68. if (this.bootstrap != null) {
  69. return;
  70. }
  71. if (this.clusterConfig.isLogging()) {
  72. log.info("ClusterNodeId: {}, ClusterSlave.call: Startup: toClusterId: {}, [{}.{}]", this.clusterConfig.getId(), this.clusterNode.getId(), this.ipAddress, this.port);
  73. }
  74. this.bootstrap = bootstrapFactory.createBootstrap();
  75. this.bootstrap.option(ChannelOption.SO_REUSEADDR, true);
  76. this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
  77. // 바인딩 로컬 포트 설정(설정파일에 등록된 바인딩 포트번호 + 서버 ID), 따라서 하나의 서버에 여러 개의 클러스터 서버가 존재할 수 있다.
  78. // 주의) 하나의 서버에 여러 개의 클러스터 서버가 존재할 경우, 바인딩 포트는 위의 규칙에 속하지 않는 포트번호를 할당해야 한다.
  79. this.bootstrap.localAddress(new InetSocketAddress(port + clusterConfig.getId()));
  80. this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
  81. @Override
  82. protected void initChannel(SocketChannel ch) {
  83. if (clusterConfig.isPacketLogging()) {
  84. ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
  85. }
  86. ch.pipeline().addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS));
  87. ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4));
  88. ch.pipeline().addLast(new ClusterMessageDecoder(clusterNode, clusterConfig));
  89. ch.pipeline().addLast(new ClusterMessageEncoder(clusterNode, clusterConfig));
  90. ch.pipeline().addLast(new ClusterSlaveHandler(clusterNode, slaveService));
  91. }
  92. });
  93. }
  94. private void closePreviousChannel() {
  95. if (this.channelFuture != null && this.channelFuture.channel().isOpen()) {
  96. this.channelFuture.channel().close();
  97. this.channelFuture = null;
  98. }
  99. }
  100. /**
  101. * 연결 성공시 처리 이벤트
  102. */
  103. protected void channelOpen(ChannelFuture future) {
  104. if (future.isSuccess()) {
  105. Channel channel = future.channel();
  106. channel.attr(AbstractClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(this.clusterNode);
  107. this.clusterNode.getSyncState().connect(channel);
  108. if (this.clusterConfig.isLogging()) {
  109. log.info("ClusterNodeId: {}, ClusterSlave.channelOpen: toClusterId: {}, [{}.{}], Channel: {}", this.clusterConfig.getId(), this.clusterNode.getId(), this.ipAddress, this.port, channel);
  110. }
  111. }
  112. else {
  113. if (this.clusterConfig.isLogging()) {
  114. log.warn("ClusterNodeId: {}, ClusterSlave.channelOpen: toClusterId: {}, [{}.{}], Failed: {}", this.clusterConfig.getId(), clusterNode.getId(), clusterNode.getIp(), clusterNode.getPort(), future.cause().getMessage());
  115. }
  116. }
  117. }
  118. /**
  119. * 연결 종료시 처리 이벤트
  120. */
  121. protected synchronized void channelClosed(ChannelFuture future) {
  122. Channel channel = future.channel();
  123. if (this.clusterConfig.isLogging()) {
  124. log.warn("ClusterNodeId: {}, ClusterSlave.channelClosed: toClusterId: {}, [{}.{}], Channel: {}", this.clusterConfig.getId(), this.clusterNode.getId(), this.ipAddress, this.port, channel);
  125. }
  126. channel.attr(AbstractClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
  127. this.clusterNode.getSyncState().disConnect();
  128. if (channel.isOpen()) {
  129. channel.close();
  130. }
  131. channel.eventLoop().schedule(this, 5, TimeUnit.SECONDS);
  132. }
  133. }