|
@@ -1,6 +1,7 @@
|
|
|
-package com.its.common.cluster.master;
|
|
|
+package com.its.common.cluster.handler;
|
|
|
|
|
|
-import com.its.common.cluster.utils.HaClusterMessage;
|
|
|
+import com.its.common.cluster.service.AbstractHaClusterMasterService;
|
|
|
+import com.its.common.cluster.vo.HaClusterMessage;
|
|
|
import com.its.common.cluster.utils.HaUtils;
|
|
|
import com.its.common.cluster.vo.AbstractHaClusterConfig;
|
|
|
import com.its.common.cluster.vo.HaInfo;
|
|
@@ -24,20 +25,29 @@ public class HaClusterMasterHandler extends ChannelInboundHandlerAdapter {
|
|
|
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
|
|
if (msg instanceof HaClusterMessage) {
|
|
|
HaClusterMessage clusterMsg = (HaClusterMessage) msg;
|
|
|
- log.info("ClusterMasterHandler.channelRead: [{}], {}, [FROM: serverId: {}, serverTime: {}, infos: {}]",
|
|
|
- this.clusterConfig.getServerId(), HaUtils.getTcpAddress(ctx.channel()),
|
|
|
- clusterMsg.getServerId(), clusterMsg.getServerTime(), clusterMsg.getInfos().size());
|
|
|
-
|
|
|
HaInfo cluster = ctx.channel().attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).get();
|
|
|
if (cluster == null) {
|
|
|
- log.error("RECV: [{}]. Not Found Channel Cluster Object... Oops Will be closed.", HaUtils.getAddress(ctx.channel()));
|
|
|
+ log.error("HaClusterMasterHandler.channelRead: [{}], {}, [FROM: serverId: {}, master: {}, serverTime: {}], Not Found Channel Cluster Object. Will be closed.",
|
|
|
+ this.clusterConfig.getServerId(), HaUtils.getTcpAddress(ctx.channel()),
|
|
|
+ clusterMsg.getServerId(), clusterMsg.isMaster(), clusterMsg.getServerTime());
|
|
|
+
|
|
|
closeChannel(ctx.channel());
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
+ if (this.clusterConfig.isLogging()) {
|
|
|
+ MDC.put("id", cluster.getLogKey());
|
|
|
+
|
|
|
+ log.info("HaClusterMasterHandler.channelRead: [{}], {}, [FROM: serverId: {}, master: {}, serverTime: {}]",
|
|
|
+ this.clusterConfig.getServerId(), HaUtils.getTcpAddress(ctx.channel()),
|
|
|
+ clusterMsg.getServerId(), clusterMsg.isMaster(), clusterMsg.getServerTime());
|
|
|
+
|
|
|
+ MDC.remove(cluster.getLogKey());
|
|
|
+ MDC.clear();
|
|
|
+ }
|
|
|
+
|
|
|
cluster.getElectionState().setLastRecvTime();
|
|
|
this.masterService.onClusterMessage(clusterMsg);
|
|
|
-// ctx.writeAndFlush(clusterMsg);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -45,21 +55,23 @@ public class HaClusterMasterHandler extends ChannelInboundHandlerAdapter {
|
|
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
|
|
HaInfo cluster = ctx.channel().attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).get();
|
|
|
if (cluster == null) {
|
|
|
- log.error("{}.++channelInactive: Unknown Cluster: {}.", this.getClass().getSimpleName(), HaUtils.getAddress(ctx.channel()));
|
|
|
+ log.error("HaClusterMasterHandler.channelInactive: Unknown Cluster: {}.", HaUtils.getAddress(ctx.channel()));
|
|
|
return;
|
|
|
}
|
|
|
- try {
|
|
|
+
|
|
|
+ if (this.clusterConfig.isLogging()) {
|
|
|
MDC.put("id", cluster.getLogKey());
|
|
|
- log.info("{}.++channelInactive: [{}, {}].", this.getClass().getSimpleName(), cluster.getServerId(), cluster.getIpAddress());
|
|
|
- cluster.getElectionState().disConnect();
|
|
|
|
|
|
- ctx.channel().attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
|
|
|
- ctx.fireChannelInactive();
|
|
|
- }
|
|
|
- finally {
|
|
|
+ log.info("HaClusterMasterHandler.channelInactive: [{}, {}].", cluster.getServerId(), cluster.getIpAddress());
|
|
|
+
|
|
|
MDC.remove(cluster.getLogKey());
|
|
|
MDC.clear();
|
|
|
}
|
|
|
+
|
|
|
+ cluster.getElectionState().disConnect();
|
|
|
+
|
|
|
+ ctx.channel().attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
|
|
|
+ ctx.fireChannelInactive();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -67,27 +79,29 @@ public class HaClusterMasterHandler extends ChannelInboundHandlerAdapter {
|
|
|
if (e instanceof IdleStateEvent) {
|
|
|
HaInfo cluster = ctx.channel().attr(AbstractHaClusterConfig.CLUSTER_ATTRIBUTE_KEY).get();
|
|
|
if (cluster == null) {
|
|
|
- log.error("{}.userEventTriggered: Unknown Cluster: {}.", this.getClass().getSimpleName(), HaUtils.getAddress(ctx.channel()));
|
|
|
+ log.error("HaClusterMasterHandler.userEventTriggered: Unknown Cluster: {}.", HaUtils.getAddress(ctx.channel()));
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
IdleStateEvent evt = (IdleStateEvent) e;
|
|
|
|
|
|
- MDC.put("id", cluster.getLogKey());
|
|
|
- log.info("{}.++userEventTriggered: {}. {}", this.getClass().getSimpleName(), HaUtils.getAddress(ctx.channel()), evt);
|
|
|
+ if (this.clusterConfig.isLogging()) {
|
|
|
+ MDC.put("id", cluster.getLogKey());
|
|
|
+ log.info("HaClusterMasterHandler.userEventTriggered: {}. {}", HaUtils.getAddress(ctx.channel()), evt);
|
|
|
+ MDC.remove(cluster.getLogKey());
|
|
|
+ MDC.clear();
|
|
|
+ }
|
|
|
|
|
|
if (evt.state() == IdleState.READER_IDLE) {
|
|
|
long recvTimeout = System.currentTimeMillis() - cluster.getElectionState().getLastRecvTime();
|
|
|
long heartbeatTimeout = this.clusterConfig.getSyncSeconds() * 1000L * 3;
|
|
|
if (recvTimeout > heartbeatTimeout) {
|
|
|
- log.info("{}.++userEventTriggered: {}. [{}, {}]. Heartbeat timeout, {}, {} ms. Will be closed.",
|
|
|
- this.getClass().getSimpleName(), HaUtils.getAddress(ctx.channel()),
|
|
|
- cluster.getLogKey(), cluster.getIpAddress(), recvTimeout, heartbeatTimeout);
|
|
|
+ log.warn("HaClusterMasterHandler.userEventTriggered: {}. [{}, {}]. Heartbeat timeout, {}, {} ms. Will be closed.",
|
|
|
+ HaUtils.getAddress(ctx.channel()), cluster.getLogKey(), cluster.getIpAddress(), recvTimeout, heartbeatTimeout);
|
|
|
+
|
|
|
closeChannel(ctx.channel());
|
|
|
}
|
|
|
}
|
|
|
- MDC.remove(cluster.getLogKey());
|
|
|
- MDC.clear();
|
|
|
}
|
|
|
ctx.fireUserEventTriggered(e);
|
|
|
}
|
|
@@ -101,7 +115,7 @@ public class HaClusterMasterHandler extends ChannelInboundHandlerAdapter {
|
|
|
}
|
|
|
}
|
|
|
catch (Exception e) {
|
|
|
- log.error("ApplicationRepository.closeChannel Exception: {}", e.getMessage());
|
|
|
+ log.error("HaClusterMasterHandler.closeChannel: Exception: {}", e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
|