package com.its.common.cluster.master; import com.its.common.cluster.utils.HaClusterMessageDecoder; import com.its.common.cluster.utils.HaClusterMessageEncoder; import com.its.common.cluster.utils.HaUtils; import com.its.common.cluster.vo.AbstractHaClusterConfig; import com.its.common.cluster.vo.HaInfo; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.IdleStateHandler; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.slf4j.MDC; import java.util.concurrent.TimeUnit; @Slf4j @RequiredArgsConstructor public class HaClusterMasterInitializer extends ChannelInitializer { private final AbstractHaClusterMasterService masterService; private final AbstractHaClusterConfig clusterConfig; @Override protected void initChannel(Channel channel) throws Exception { // InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress(); // String clientIP = remoteAddress.getAddress().getHostAddress(); // int clientPort = remoteAddress.getPort(); String ipAddress = HaUtils.getRemoteIpAddress(channel); int clientPort = HaUtils.getRemotePort(channel); int serverId = clientPort - this.clusterConfig.getSyncPort(); log.info("ClusterMasterInitializer.----initChannel: connected from: {}:{}, ServerId: {}.", ipAddress, clientPort, serverId); // HaClusterConfig.HaCluster cluster = this.clusterConfig.get(ipAddress); HaInfo cluster = this.clusterConfig.getClusterMap().get(serverId); if (cluster == null) { log.error("ClusterMasterInitializer.----initChannel: [ServerId: {}, IP Address: {}], Unknown Server Id. will be closed.", serverId, ipAddress); channel.disconnect(); channel.close(); return; } if (!cluster.getIpAddress().equals(ipAddress)) { log.error("ClusterMasterInitializer.----initChannel: [ServerId: {}, IP Address: {}], Unknown IP Address. will be closed.", serverId, ipAddress); channel.disconnect(); channel.close(); return; } try { MDC.put("id", cluster.getLogKey()); log.info("ClusterMasterInitializer.----initChannel: [{}, {}].", cluster.getLogKey(), cluster.getIpAddress()); if (cluster.getElectionState().getChannel() != null) { log.warn("ClusterMasterInitializer.----initChannel: {}, {}, Already Connected. Old Connection will be closed.", ipAddress, cluster.getServerId()); // 이벤트 핸들러 에서 중복 처리 되지 않도록 속성 값을 제거 channel.attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null); cluster.getElectionState().disConnect(); channel.disconnect(); channel.close(); } cluster.getElectionState().connect(channel); channel.attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(cluster); IdleStateHandler idleStateHandler = new IdleStateHandler(this.clusterConfig.getSyncSeconds(), 0, 0, TimeUnit.SECONDS); ChannelPipeline pipeline = channel.pipeline(); if (this.clusterConfig.isLogging()) { pipeline.addLast(new LoggingHandler(LogLevel.INFO)); } pipeline.addLast(idleStateHandler); pipeline.addLast(new HaClusterMessageDecoder()); pipeline.addLast(new HaClusterMessageEncoder()); pipeline.addLast(new HaClusterMasterHandler(this.masterService, this.clusterConfig)); } finally { MDC.remove(cluster.getLogKey()); MDC.clear(); } } }