HaClusterMasterInitializer.java 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. package com.its.common.cluster.master;
  2. import com.its.common.cluster.utils.HaClusterMessageDecoder;
  3. import com.its.common.cluster.utils.HaClusterMessageEncoder;
  4. import com.its.common.cluster.utils.HaUtils;
  5. import com.its.common.cluster.vo.AbstractHaClusterConfig;
  6. import com.its.common.cluster.vo.HaInfo;
  7. import io.netty.channel.Channel;
  8. import io.netty.channel.ChannelInitializer;
  9. import io.netty.channel.ChannelPipeline;
  10. import io.netty.handler.logging.LogLevel;
  11. import io.netty.handler.logging.LoggingHandler;
  12. import io.netty.handler.timeout.IdleStateHandler;
  13. import lombok.RequiredArgsConstructor;
  14. import lombok.extern.slf4j.Slf4j;
  15. import org.slf4j.MDC;
  16. import java.util.concurrent.TimeUnit;
  17. @Slf4j
  18. @RequiredArgsConstructor
  19. public class HaClusterMasterInitializer extends ChannelInitializer<Channel> {
  20. private final AbstractHaClusterMasterService masterService;
  21. private final AbstractHaClusterConfig clusterConfig;
  22. @Override
  23. protected void initChannel(Channel channel) throws Exception {
  24. // InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
  25. // String clientIP = remoteAddress.getAddress().getHostAddress();
  26. // int clientPort = remoteAddress.getPort();
  27. String ipAddress = HaUtils.getRemoteIpAddress(channel);
  28. int clientPort = HaUtils.getRemotePort(channel);
  29. int serverId = clientPort - this.clusterConfig.getSyncPort();
  30. log.info("ClusterMasterInitializer.----initChannel: connected from: {}:{}, ServerId: {}.",
  31. ipAddress, clientPort, serverId);
  32. // HaClusterConfig.HaCluster cluster = this.clusterConfig.get(ipAddress);
  33. HaInfo cluster = this.clusterConfig.getClusterMap().get(serverId);
  34. if (cluster == null) {
  35. log.error("ClusterMasterInitializer.----initChannel: [ServerId: {}, IP Address: {}], Unknown Server Id. will be closed.", serverId, ipAddress);
  36. channel.disconnect();
  37. channel.close();
  38. return;
  39. }
  40. if (!cluster.getIpAddress().equals(ipAddress)) {
  41. log.error("ClusterMasterInitializer.----initChannel: [ServerId: {}, IP Address: {}], Unknown IP Address. will be closed.", serverId, ipAddress);
  42. channel.disconnect();
  43. channel.close();
  44. return;
  45. }
  46. try {
  47. MDC.put("id", cluster.getLogKey());
  48. log.info("ClusterMasterInitializer.----initChannel: [{}, {}].", cluster.getLogKey(), cluster.getIpAddress());
  49. if (cluster.getElectionState().getChannel() != null) {
  50. log.warn("ClusterMasterInitializer.----initChannel: {}, {}, Already Connected. Old Connection will be closed.", ipAddress, cluster.getServerId());
  51. // 이벤트 핸들러 에서 중복 처리 되지 않도록 속성 값을 제거
  52. channel.attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
  53. cluster.getElectionState().disConnect();
  54. channel.disconnect();
  55. channel.close();
  56. }
  57. cluster.getElectionState().connect(channel);
  58. channel.attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(cluster);
  59. IdleStateHandler idleStateHandler = new IdleStateHandler(this.clusterConfig.getSyncSeconds(), 0, 0, TimeUnit.SECONDS);
  60. ChannelPipeline pipeline = channel.pipeline();
  61. if (this.clusterConfig.isLogging()) {
  62. pipeline.addLast(new LoggingHandler(LogLevel.INFO));
  63. }
  64. pipeline.addLast(idleStateHandler);
  65. pipeline.addLast(new HaClusterMessageDecoder());
  66. pipeline.addLast(new HaClusterMessageEncoder());
  67. pipeline.addLast(new HaClusterMasterHandler(this.masterService, this.clusterConfig));
  68. }
  69. finally {
  70. MDC.remove(cluster.getLogKey());
  71. MDC.clear();
  72. }
  73. }
  74. }