123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
- package com.its.common.cluster.master;
- import com.its.common.cluster.utils.HaClusterMessageDecoder;
- import com.its.common.cluster.utils.HaClusterMessageEncoder;
- import com.its.common.cluster.utils.HaUtils;
- import com.its.common.cluster.vo.AbstractHaClusterConfig;
- import com.its.common.cluster.vo.HaInfo;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelPipeline;
- import io.netty.handler.logging.LogLevel;
- import io.netty.handler.logging.LoggingHandler;
- import io.netty.handler.timeout.IdleStateHandler;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.slf4j.MDC;
- import java.util.concurrent.TimeUnit;
- @Slf4j
- @RequiredArgsConstructor
- public class HaClusterMasterInitializer extends ChannelInitializer<Channel> {
- private final AbstractHaClusterMasterService masterService;
- private final AbstractHaClusterConfig clusterConfig;
- @Override
- protected void initChannel(Channel channel) throws Exception {
- // InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
- // String clientIP = remoteAddress.getAddress().getHostAddress();
- // int clientPort = remoteAddress.getPort();
- String ipAddress = HaUtils.getRemoteIpAddress(channel);
- int clientPort = HaUtils.getRemotePort(channel);
- int serverId = clientPort - this.clusterConfig.getSyncPort();
- log.info("ClusterMasterInitializer.----initChannel: connected from: {}:{}, ServerId: {}.",
- ipAddress, clientPort, serverId);
- // HaClusterConfig.HaCluster cluster = this.clusterConfig.get(ipAddress);
- HaInfo cluster = this.clusterConfig.getClusterMap().get(serverId);
- if (cluster == null) {
- log.error("ClusterMasterInitializer.----initChannel: [ServerId: {}, IP Address: {}], Unknown Server Id. will be closed.", serverId, ipAddress);
- channel.disconnect();
- channel.close();
- return;
- }
- if (!cluster.getIpAddress().equals(ipAddress)) {
- log.error("ClusterMasterInitializer.----initChannel: [ServerId: {}, IP Address: {}], Unknown IP Address. will be closed.", serverId, ipAddress);
- channel.disconnect();
- channel.close();
- return;
- }
- try {
- MDC.put("id", cluster.getLogKey());
- log.info("ClusterMasterInitializer.----initChannel: [{}, {}].", cluster.getLogKey(), cluster.getIpAddress());
- if (cluster.getElectionState().getChannel() != null) {
- log.warn("ClusterMasterInitializer.----initChannel: {}, {}, Already Connected. Old Connection will be closed.", ipAddress, cluster.getServerId());
- // 이벤트 핸들러 에서 중복 처리 되지 않도록 속성 값을 제거
- channel.attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
- cluster.getElectionState().disConnect();
- channel.disconnect();
- channel.close();
- }
- cluster.getElectionState().connect(channel);
- channel.attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(cluster);
- IdleStateHandler idleStateHandler = new IdleStateHandler(this.clusterConfig.getSyncSeconds(), 0, 0, TimeUnit.SECONDS);
- ChannelPipeline pipeline = channel.pipeline();
- if (this.clusterConfig.isLogging()) {
- pipeline.addLast(new LoggingHandler(LogLevel.INFO));
- }
- pipeline.addLast(idleStateHandler);
- pipeline.addLast(new HaClusterMessageDecoder());
- pipeline.addLast(new HaClusterMessageEncoder());
- pipeline.addLast(new HaClusterMasterHandler(this.masterService, this.clusterConfig));
- }
- finally {
- MDC.remove(cluster.getLogKey());
- MDC.clear();
- }
- }
- }
|