|
|
@@ -4,7 +4,7 @@ import com.its.common.cluster.codec.ClusterMessageDecoder;
|
|
|
import com.its.common.cluster.codec.ClusterMessageEncoder;
|
|
|
import com.its.common.cluster.handler.ClusterMasterHandler;
|
|
|
import com.its.common.cluster.utils.ClusterUtils;
|
|
|
-import com.its.common.cluster.vo.AbstractClusterConfig;
|
|
|
+import com.its.common.cluster.config.AbstractClusterConfig;
|
|
|
import com.its.common.cluster.vo.ClusterNode;
|
|
|
import io.netty.channel.Channel;
|
|
|
import io.netty.channel.ChannelInitializer;
|
|
|
@@ -28,51 +28,49 @@ public class ClusterMasterInitializer extends ChannelInitializer<Channel> {
|
|
|
|
|
|
@Override
|
|
|
protected void initChannel(Channel channel) throws Exception {
|
|
|
-// InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
|
|
|
-// String clientIP = remoteAddress.getAddress().getHostAddress();
|
|
|
-// int clientPort = remoteAddress.getPort();
|
|
|
|
|
|
String ipAddress = ClusterUtils.getRemoteIpAddress(channel);
|
|
|
int clientPort = ClusterUtils.getRemotePort(channel);
|
|
|
int clusterId = clientPort - this.clusterConfig.getPort();
|
|
|
|
|
|
- if (this.clusterConfig.isLogging()) {
|
|
|
- log.info("ClusterMasterInitializer.initChannel: connected from: {}:{}, clusterId: {}.", ipAddress, clientPort, clusterId);
|
|
|
- }
|
|
|
+ log.info("ClusterMasterInitializer.initChannel: [clusterId: {}, IP Address: {}.{}], connected.", clusterId, ipAddress, clientPort);
|
|
|
|
|
|
// 하나의 서버에 여러 개의 클러스터가 있을 수 있기 때문에 IP Address 로 찾는 것은 위험함.
|
|
|
- // HaClusterConfig.HaCluster cluster = this.clusterConfig.get(ipAddress);
|
|
|
- ClusterNode cluster = this.clusterConfig.getClusterMap().get(clusterId);
|
|
|
- if (cluster == null) {
|
|
|
- log.error("ClusterMasterInitializer.initChannel: [clusterId: {}, IP Address: {}], Unknown Server Id. will be closed.", clusterId, ipAddress);
|
|
|
+ // 클러스터가 분산된 경우에는 이런 처리가 필요하지 않음.(IP Address 만으로도 해당 클러스터 서버를 찾을 수 있음.
|
|
|
+ // checking cluster id, find from clusterMap
|
|
|
+ ClusterNode clusterNode = this.clusterConfig.getClusterMap().get(clusterId);
|
|
|
+ if (clusterNode == null) {
|
|
|
+ log.error("ClusterMasterInitializer.initChannel: [clusterId: {}, IP Address: {}.{}], Unknown Server Id. will be closed.", clusterId, ipAddress, clientPort);
|
|
|
channel.disconnect();
|
|
|
channel.close();
|
|
|
return;
|
|
|
}
|
|
|
- if (!cluster.getIp().equals(ipAddress)) {
|
|
|
- log.error("ClusterMasterInitializer.initChannel: [clusterId: {}, IP Address: {}], Unknown IP Address. will be closed.", clusterId, ipAddress);
|
|
|
+
|
|
|
+ // checking ip address
|
|
|
+ if (!clusterNode.getIp().equals(ipAddress)) {
|
|
|
+ log.error("ClusterMasterInitializer.initChannel: [clusterId: {}, IP Address: {}.{}], Unknown IP Address. will be closed.", clusterId, ipAddress, clientPort);
|
|
|
channel.disconnect();
|
|
|
channel.close();
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
if (this.clusterConfig.isLogging()) {
|
|
|
- MDC.put("id", cluster.getLogKey());
|
|
|
- log.info("ClusterMasterInitializer.initChannel: [{}, {}].", cluster.getLogKey(), cluster.getIp());
|
|
|
+ MDC.put("id", clusterNode.getLogKey());
|
|
|
+ log.info("ClusterMasterInitializer.initChannel: [clusterId: {}, IP Address: {}.{}], completed.", clusterId, ipAddress, clientPort);
|
|
|
}
|
|
|
|
|
|
- if (cluster.getElectionState().getChannel() != null) {
|
|
|
- log.warn("ClusterMasterInitializer.initChannel: {}, {}, Already Connected. Old Connection will be closed.", ipAddress, cluster.getId());
|
|
|
+ if (clusterNode.getElectionState().getChannel() != null) {
|
|
|
+ log.warn("ClusterMasterInitializer.initChannel: [clusterId: {}, IP Address: {}.{}], Already Connected. Old Connection will be closed.", clusterId, ipAddress, clientPort);
|
|
|
// 이벤트 핸들러 에서 중복 처리 되지 않도록 속성 값을 제거
|
|
|
channel.attr(AbstractClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
|
|
|
- cluster.getElectionState().disConnect();
|
|
|
+ clusterNode.getElectionState().disConnect();
|
|
|
|
|
|
channel.disconnect();
|
|
|
channel.close();
|
|
|
}
|
|
|
|
|
|
- cluster.getElectionState().connect(channel);
|
|
|
- channel.attr(AbstractClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(cluster);
|
|
|
+ clusterNode.getElectionState().connect(channel);
|
|
|
+ channel.attr(AbstractClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(clusterNode);
|
|
|
|
|
|
ChannelPipeline pipeline = channel.pipeline();
|
|
|
|
|
|
@@ -86,7 +84,7 @@ public class ClusterMasterInitializer extends ChannelInitializer<Channel> {
|
|
|
pipeline.addLast(new ClusterMasterHandler(this.masterService, this.clusterConfig));
|
|
|
|
|
|
if (this.clusterConfig.isLogging()) {
|
|
|
- MDC.remove(cluster.getLogKey());
|
|
|
+ MDC.remove(clusterNode.getLogKey());
|
|
|
MDC.clear();
|
|
|
}
|
|
|
}
|