shjung пре 3 недеља
родитељ
комит
5d2d20800a

+ 1 - 1
its-cluster/src/main/java/com/its/common/ClusterMain.java → its-cluster/src/main/java/com/its/common/cluster/ClusterMain.java

@@ -1,4 +1,4 @@
-package com.its.common;
+package com.its.common.cluster;
 
 public class ClusterMain {
     public static void main(String[] args) {

+ 7 - 5
its-cluster/src/main/java/com/its/common/cluster/codec/ClusterMessageDecoder.java

@@ -1,5 +1,6 @@
 package com.its.common.cluster.codec;
 
+import com.its.common.cluster.config.AbstractClusterConfig;
 import com.its.common.cluster.vo.ClusterMessage;
 import com.its.common.cluster.vo.ClusterNode;
 import io.netty.buffer.ByteBuf;
@@ -15,19 +16,20 @@ import java.util.List;
 public class ClusterMessageDecoder extends MessageToMessageDecoder<ByteBuf> {
 
     private final ClusterNode clusterNode;
-    private final boolean isLogging;
+    private final AbstractClusterConfig clusterConfig;
 
-    public ClusterMessageDecoder(ClusterNode clusterNode, boolean isLogging) {
+    public ClusterMessageDecoder(ClusterNode clusterNode, AbstractClusterConfig clusterConfig) {
         this.clusterNode = clusterNode;
-        this.isLogging = isLogging;
+        this.clusterConfig = clusterConfig;
     }
 
     @Override
     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
         // 패킷 길이(4바이트)를 따로 저장
         int packetLength = in.readInt();
-        if (this.isLogging) {
-            log.info("ClusterNodeId: {}, ClusterMessageDecoder.decode: RECV {} Bytes.", this.clusterNode.getId(), packetLength + 4);
+        if (this.clusterConfig.isLogging()) {
+            log.info("ClusterNodeId: {}, ClusterMessageDecoder.decode: fromClusterNodeId: {}, RECV {} Bytes.",
+                    this.clusterConfig.getId(), this.clusterNode.getId(), packetLength + 4);
         }
 
         // 남은 데이터 부분 읽기

+ 7 - 5
its-cluster/src/main/java/com/its/common/cluster/codec/ClusterMessageEncoder.java

@@ -1,5 +1,6 @@
 package com.its.common.cluster.codec;
 
+import com.its.common.cluster.config.AbstractClusterConfig;
 import com.its.common.cluster.vo.ClusterMessage;
 import com.its.common.cluster.vo.ClusterNode;
 import io.netty.buffer.ByteBuf;
@@ -14,11 +15,11 @@ import java.io.ObjectOutputStream;
 public class ClusterMessageEncoder extends MessageToByteEncoder<ClusterMessage> {
 
     private final ClusterNode clusterNode;
-    private final boolean isLogging;
+    private final AbstractClusterConfig clusterConfig;
 
-    public ClusterMessageEncoder(ClusterNode clusterNode, boolean isLogging) {
+    public ClusterMessageEncoder(ClusterNode clusterNode, AbstractClusterConfig clusterConfig) {
         this.clusterNode = clusterNode;
-        this.isLogging = isLogging;
+        this.clusterConfig = clusterConfig;
     }
 
     @Override
@@ -30,8 +31,9 @@ public class ClusterMessageEncoder extends MessageToByteEncoder<ClusterMessage>
 
         byte[] bytes = bos.toByteArray();
 
-        if (this.isLogging) {
-            log.info("ClusterNodeId: {}, ClusterMessageEncoder.encode: SEND {} Bytes.", this.clusterNode.getId(), bytes.length + 4);
+        if (this.clusterConfig.isLogging()) {
+            log.info("ClusterNodeId: {}, ClusterMessageEncoder.encode: toClusterNodeId: {}, SEND {} Bytes.",
+                    this.clusterConfig.getId(), this.clusterNode.getId(), bytes.length + 4);
         }
 
         out.writeInt(bytes.length); // 길이 필드 추가

+ 3 - 0
its-cluster/src/main/java/com/its/common/cluster/config/AbstractClusterConfig.java

@@ -20,6 +20,7 @@ public abstract class AbstractClusterConfig {
     // 어플리케이션 클러스터 정보
     private boolean master = true;
     private int id = -1;        // 서버 ID (1부터 시작, 0은 사용하지 않음)
+    private int masterId = -1;
     private int syncSeconds = 5;           // 데이터 동기화 주기 (초 단위, 기본 5초, 최소 2초, 최대 60초)
     private String ip = "127.0.0.1";          // 클러스터 서버의 IP 주소
     private int port = 13888;      // 데이터 동기화를 위한 포트
@@ -60,6 +61,7 @@ public abstract class AbstractClusterConfig {
         // 클러스터가 하나라 단일 클러스터 정보로 설정
         this.master = true;
         this.id = 1;
+        this.masterId = 1;
         this.syncSeconds = 5;
         this.ip = "127.0.0.1";
         this.port = 13888;
@@ -152,6 +154,7 @@ public abstract class AbstractClusterConfig {
         ClusterNode masterNode = this.clusterMap.get(masterId);
         masterNode.setMaster(true);
         this.master = (this.id == masterId);
+        this.masterId = masterId;
 
         // 나 자신의 네트워크 상태정보를 항상 CONNECT로 지정
         ClusterNode localNode = this.clusterMap.get(this.id);

+ 27 - 20
its-cluster/src/main/java/com/its/common/cluster/handler/ClusterMasterHandler.java

@@ -27,18 +27,16 @@ public class ClusterMasterHandler extends ChannelInboundHandlerAdapter {
             ClusterMessage clusterMsg = (ClusterMessage) msg;
             ClusterNode cluster = ctx.channel().attr(AbstractClusterConfig.CLUSTER_ATTRIBUTE_KEY).get();
             if (cluster == null) {
-                log.error("ClusterNodeId: {}, ClusterMasterHandler.channelRead: [{}], [FROM: clusterId: {}, master: {}, serverTime: {}], Not Found Channel Cluster Object. Will be closed.",
-                        this.clusterNode.getId(), ClusterUtils.getTcpAddress(ctx.channel()),
-                        clusterMsg.getClusterId(), clusterMsg.isMaster(), clusterMsg.getCurrentTimeMillis());
+                log.error("ClusterNodeId: {}, ClusterMasterHandler.channelRead: [FROM: clusterId: {}, master: {}, serverTime: {}], Not Found Channel Cluster Object. Will be closed.",
+                        this.clusterConfig.getId(), clusterMsg.getClusterId(), clusterMsg.isMaster(), clusterMsg.getCurrentTimeMillis());
 
-                closeChannel(this.clusterNode, ctx.channel());
+                closeChannel(this.clusterNode, null, ctx.channel());
                 return;
             }
 
             if (this.clusterConfig.isLogging()) {
-                log.info("ClusterNodeId: {}, ClusterMasterHandler.channelRead: [{}], [FROM: clusterId: {}, master: {}, serverTime: {}]",
-                        this.clusterNode.getId(), ClusterUtils.getTcpAddress(ctx.channel()),
-                        clusterMsg.getClusterId(), clusterMsg.isMaster(), clusterMsg.getCurrentTimeMillis());
+                log.info("ClusterNodeId: {}, ClusterMasterHandler.channelRead: fromClusterId: {}, master: {}, serverTime: {}]",
+                        this.clusterConfig.getId(), clusterMsg.getClusterId(), clusterMsg.isMaster(), clusterMsg.getCurrentTimeMillis());
             }
 
             cluster.getElectionState().setLastRecvTime();
@@ -54,11 +52,13 @@ public class ClusterMasterHandler extends ChannelInboundHandlerAdapter {
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         ClusterNode cluster = ctx.channel().attr(AbstractClusterConfig.CLUSTER_ATTRIBUTE_KEY).get();
         if (cluster == null) {
-            log.error("ClusterNodeId: {}, ClusterMasterHandler.channelInactive: Unknown Cluster: {}.", this.clusterNode.getId(), ClusterUtils.getAddress(ctx.channel()));
+            log.error("ClusterNodeId: {}, ClusterMasterHandler.channelInactive: Unknown Cluster: [{}].",
+                    this.clusterConfig.getId(), ClusterUtils.getAddress(ctx.channel()));
             return;
         }
 
-        log.info("ClusterNodeId: {}, ClusterMasterHandler.channelInactive: fromClusterId: {}, [{}].", this.clusterNode.getId(), cluster.getId(), cluster.getIp());
+        log.warn("ClusterNodeId: {}, ClusterMasterHandler.channelInactive: fromClusterId: {}, [{}].",
+                this.clusterConfig.getId(), cluster.getId(), cluster.getIp());
 
         cluster.getElectionState().disConnect();
 
@@ -66,7 +66,7 @@ public class ClusterMasterHandler extends ChannelInboundHandlerAdapter {
         ctx.fireChannelInactive();
 
         // 클러스터가 연결 종료되었음을 추상함수를 호출해서 알리도록함.
-        this.masterService.onNotifyClusterNetworkState(cluster, true);
+        this.masterService.onNotifyClusterNetworkState(cluster, false);
     }
 
     @Override
@@ -74,31 +74,38 @@ public class ClusterMasterHandler extends ChannelInboundHandlerAdapter {
         if (e instanceof IdleStateEvent) {
             ClusterNode cluster = ctx.channel().attr(AbstractClusterConfig.CLUSTER_ATTRIBUTE_KEY).get();
             if (cluster == null) {
-                log.error("ClusterNodeId: {}, ClusterMasterHandler.userEventTriggered: Unknown Cluster: {}.", this.clusterNode.getId(), ClusterUtils.getAddress(ctx.channel()));
+                log.error("ClusterNodeId: {}, ClusterMasterHandler.userEventTriggered: Unknown Cluster: [{}].",
+                        this.clusterConfig.getId(), ClusterUtils.getAddress(ctx.channel()));
                 return;
             }
 
             IdleStateEvent evt = (IdleStateEvent) e;
-
-            if (this.clusterConfig.isLogging()) {
-                log.info("ClusterNodeId: {}, ClusterMasterHandler.userEventTriggered: {}. {}", this.clusterNode.getId(), ClusterUtils.getAddress(ctx.channel()), evt);
-            }
-
             if (evt.state() == IdleState.READER_IDLE) {
+                log.info("ClusterNodeId: {}, ClusterMasterHandler.userEventTriggered: fromClusterNodeId: {}.",
+                        this.clusterConfig.getId(), cluster.getId());
+
                 long recvTimeout = System.currentTimeMillis() - cluster.getElectionState().getLastRecvTime();
                 long heartbeatTimeout = this.clusterConfig.getSyncSeconds() * 1000L * 3;
                 if (recvTimeout > heartbeatTimeout) {
-                    log.warn("ClusterNodeId: {}, ClusterMasterHandler.userEventTriggered: {}. [{}, {}]. Heartbeat timeout, {}, {} ms. Will be closed.",
-                            this.clusterNode.getId(), ClusterUtils.getAddress(ctx.channel()), cluster.getLogKey(), cluster.getIp(), recvTimeout, heartbeatTimeout);
+                    log.warn("ClusterNodeId: {}, ClusterMasterHandler.userEventTriggered: fromClusterNodeId: {} [{}], Heartbeat timeout, {}/{} ms. Will be closed.",
+                            this.clusterConfig.getId(), cluster.getId(), cluster.getIp(), recvTimeout, heartbeatTimeout);
 
-                    closeChannel(this.clusterNode, ctx.channel());
+                    closeChannel(this.clusterNode, cluster, ctx.channel());
                 }
             }
         }
         ctx.fireUserEventTriggered(e);
     }
 
-    public static void closeChannel(ClusterNode clusterNode, Channel channel) {
+    public static void closeChannel(ClusterNode clusterNode, ClusterNode fromClusterNode, Channel channel) {
+        if (fromClusterNode == null) {
+            log.warn("ClusterNodeId: {}, ClusterMasterHandler.closeChannel: Unknown Cluster: [{}].",
+                    clusterNode.getId(), ClusterUtils.getAddress(channel));
+        }
+        else {
+            log.warn("ClusterNodeId: {}, ClusterMasterHandler.closeChannel: fromClusterNodeId: {} [{}].",
+                    clusterNode.getId(), fromClusterNode.getId(), fromClusterNode.getIp());
+        }
         try {
             if (channel != null) {
                 channel.flush();

+ 40 - 17
its-cluster/src/main/java/com/its/common/cluster/service/AbstractClusterMasterService.java

@@ -115,10 +115,14 @@ public abstract class AbstractClusterMasterService {
     public abstract void onClusterChannelInactive(ClusterNode clusterNode);
 
     public void onNotifyClusterNetworkState(ClusterNode clusterNode, boolean isActive) {
+        log.info("ClusterNodeId: {}, ClusterMasterService.onNotifyClusterNetworkState: fromClusterNodeId: {}, isNetActive; {}",
+                this.clusterConfig.getId(), clusterNode.getId(), isActive);
 
         // slave의 네트워크 상태를 확인하고 slave 네트워크의 연결을 종료시킨다.
         if (!isActive && clusterNode.getSyncState().getState() != ClusterNET.CLOSED) {
             Channel channel = clusterNode.getSyncState().getChannel();
+            log.info("ClusterNodeId: {}, ClusterMasterService.onNotifyClusterNetworkState: fromClusterNodeId: {}, Try SlaveNetwork Close. channel: {}",
+                    this.clusterConfig.getId(), clusterNode.getId(), channel);
             if (channel != null) {
                 channel.flush();
                 channel.disconnect();
@@ -139,33 +143,49 @@ public abstract class AbstractClusterMasterService {
     }
 
     private synchronized void electionMasterCluster() {
-        int masterId = Integer.MAX_VALUE;
+        boolean isMaster = electionMaster();
+        boolean isChanged = this.clusterConfig.isMaster() != isMaster;
+        this.clusterConfig.setMaster(isMaster);
+
+        if (isChanged) {
+            log.info("ClusterNodeId: {}, ClusterMasterService:electionMasterSchedule: Changed Master: {}, masterClusterNodeId: {}",
+                    this.clusterConfig.getId(), this.clusterConfig.isMaster(), this.clusterConfig.getMasterId());
+        }
+        if (this.clusterConfig.isLogging()) {
+            log.info("ClusterNodeId: {}, ClusterMasterService:electionMasterSchedule: Master: {}.",
+                    this.clusterConfig.getId(), this.clusterConfig.isMaster());
+        }
+
+        election(this.clusterConfig.getId(), this.clusterConfig.isMaster(), isChanged);
+    }
+
+    private boolean electionMaster() {
+        int minClusterNodeId = Integer.MAX_VALUE;
         for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
             ClusterNode cluster = entry.getValue();
             // 마스터에 연결된 슬래이브 클러스터의 네트워크 상태를 기준으로 평가함.
-            if (cluster.getElectionState().getState() != ClusterNET.CLOSED) {
-                if (cluster.getId() < masterId) {
-                    masterId = cluster.getId();
+            if (cluster.getSyncState().getState() != ClusterNET.CLOSED) {
+                if (cluster.getId() < minClusterNodeId) {
+                    minClusterNodeId = cluster.getId();
                 }
             }
         }
 
-        boolean isMaster = false;
-        if (masterId == Integer.MAX_VALUE || masterId >= this.clusterConfig.getId()) {
-            // 위의 클러스터의 나 자신의 네트워크 상태는 CLOSED로 되어 있기 때문에 나자신보다 현재 마스터 ID가
-            // 현재 masterId 가 나 자신의 clusterId보다 같거나 크면 나 자신이 master가 된 것임
-            isMaster = true;
+        if (minClusterNodeId == Integer.MAX_VALUE) {
+            minClusterNodeId = this.clusterConfig.getId();
         }
 
-        boolean isChanged = this.clusterConfig.isMaster() != isMaster;
-        this.clusterConfig.setMaster(isMaster);
-
-        if (this.clusterConfig.isLogging()) {
-            log.info("ClusterNodeId: {}, ClusterMasterService:electionMasterSchedule: clusterId: {}, Master: {}.",
-                    this.clusterConfig.getId(), this.clusterConfig.getId(), this.clusterConfig.isMaster());
+        // 위의 클러스터의 나 자신의 네트워크 상태는 CLOSED로 되어 있기 때문에 나자신보다 현재 마스터 ID가
+        // 현재 masterId 가 나 자신의 clusterId보다 같거나 크면 나 자신이 master가 된 것임
+        if (this.clusterConfig.getMasterId() != minClusterNodeId) {
+            // master id가 변경되었으므로 클러스터 노드 정보도 업데이트 한다
+            this.clusterConfig.setMasterId(minClusterNodeId);
+            for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
+                ClusterNode cluster = entry.getValue();
+                entry.getValue().setMaster(cluster.getId() == minClusterNodeId);
+            }
         }
-
-        election(this.clusterConfig.getId(), this.clusterConfig.isMaster(), isChanged);
+        return (minClusterNodeId >= this.clusterConfig.getId());
     }
 
     private void electionMasterSchedule() {
@@ -176,6 +196,7 @@ public abstract class AbstractClusterMasterService {
     }
 
     public void shutdown() {
+        log.info("ClusterNodeId: {}, ClusterMasterService.shutdown", this.clusterConfig.getId());
         if (this.taskFuture != null) {
             this.taskFuture.cancel(true);
         }
@@ -189,6 +210,7 @@ public abstract class AbstractClusterMasterService {
         catch (Exception e) {
             log.error("ClusterNodeId: {}, ClusterMasterService.acceptGroup.shutdownGracefully", this.clusterConfig.getId());
         }
+
         try {
             if (this.workerGroup != null) {
                 this.workerGroup.shutdownGracefully();
@@ -197,6 +219,7 @@ public abstract class AbstractClusterMasterService {
         catch (Exception e) {
             log.error("ClusterNodeId: {}, ClusterMasterService.workerGroup.shutdownGracefully", this.clusterConfig.getId());
         }
+
         try {
             if (this.channelFuture != null && this.channelFuture.channel() != null) {
                 this.channelFuture.channel().closeFuture();

+ 9 - 11
its-cluster/src/main/java/com/its/common/cluster/service/AbstractClusterSlaveService.java

@@ -1,7 +1,6 @@
 package com.its.common.cluster.service;
 
 import com.its.common.cluster.config.AbstractClusterConfig;
-import com.its.common.cluster.utils.ClusterUtils;
 import com.its.common.cluster.vo.ClusterMessage;
 import com.its.common.cluster.vo.ClusterMessageData;
 import com.its.common.cluster.vo.ClusterNET;
@@ -52,12 +51,12 @@ public abstract class AbstractClusterSlaveService {
 
         // 모든 HaCluster 접속
         for (Map.Entry<Integer, ClusterNode> entry : this.clusterConfig.getClusterMap().entrySet()) {
-            ClusterNode cluster = entry.getValue();
-            if (cluster.getId() == this.clusterConfig.getId()) {
+            ClusterNode clusterNode = entry.getValue();
+            if (clusterNode.getId() == this.clusterConfig.getId()) {
                 // 자기 자신은 접속하지 않도록 한다. 즉 타 클러스터 서버에 접속한다.
                 continue;
             }
-            ClusterSlave slaveClient = new ClusterSlave(this, this.clusterConfig, cluster, this.bootstrapFactory);
+            ClusterSlave slaveClient = new ClusterSlave(this, this.clusterConfig, clusterNode, this.bootstrapFactory);
             this.clientTasks.add(slaveClient);
         }
 
@@ -185,16 +184,14 @@ public abstract class AbstractClusterSlaveService {
                     cluster.getSyncState().setLastSendTime();
 
                     if (this.clusterConfig.isLogging()) {
-                        log.info("ClusterNodeId: {}, ClusterSlaveService.sendSyncData: [{}], [--TO: clusterId: {}, (clusterId: {}, serverTime: {})]",
-                                this.clusterConfig.getId(), ClusterUtils.getTcpAddress(channel),
-                                cluster.getId(), clusterMsg.getClusterId(), clusterMsg.getCurrentTimeMillis());
+                        log.info("ClusterNodeId: {}, ClusterSlaveService.sendSyncData: toClusterId: {}, (clusterId: {}, serverTime: {})",
+                                this.clusterConfig.getId(), cluster.getId(), clusterMsg.getClusterId(), clusterMsg.getCurrentTimeMillis());
                     }
                 }
             }
             catch (Exception e) {
-                log.error("ClusterNodeId: {}, ClusterSlaveService.sendSyncData: [{}], Failed: [--TO: clusterId: {}, (clusterId: {}, serverTime: {})], {}",
-                        this.clusterConfig.getId(), ClusterUtils.getTcpAddress(channel),
-                        cluster.getId(), clusterMsg.getClusterId(), clusterMsg.getCurrentTimeMillis(), e.getMessage());
+                log.error("ClusterNodeId: {}, ClusterSlaveService.sendSyncData: toClusterId: {}, (clusterId: {}, serverTime: {}), Failed, {}",
+                        this.clusterConfig.getId(), cluster.getId(), clusterMsg.getClusterId(), clusterMsg.getCurrentTimeMillis(), e.getMessage());
             }
         }
     }
@@ -227,7 +224,8 @@ public abstract class AbstractClusterSlaveService {
     }
 
     public void onNotifyClusterNetworkState(ClusterNode clusterNode, boolean isActive) {
-
+        log.info("ClusterNodeId: {}, ClusterSlaveService.onNotifyClusterNetworkState: fromClusterNodeId: {}, isNetActive; {}",
+                this.clusterConfig.getId(), clusterNode.getId(), isActive);
         if (isActive) {
             // 네트워크가 연결되었으면 클러스터 메시지를 전송
             ClusterMessage clusterMsg = getClusterMessage();

+ 5 - 5
its-cluster/src/main/java/com/its/common/cluster/service/ClusterMasterInitializer.java

@@ -75,14 +75,14 @@ public class ClusterMasterInitializer extends ChannelInitializer<Channel> {
         if (this.clusterConfig.isPacketLogging()) {
             pipeline.addLast(new LoggingHandler(LogLevel.INFO));
         }
-        pipeline.addLast(new IdleStateHandler(this.clusterConfig.getSyncSeconds(), 0, 0, TimeUnit.SECONDS));
-        pipeline.addLast(new LengthFieldBasedFrameDecoder(8192, 0, 4));
-        pipeline.addLast(new ClusterMessageDecoder(clusterNode, this.clusterConfig.isLogging()));
-        pipeline.addLast(new ClusterMessageEncoder(clusterNode, this.clusterConfig.isLogging()));
+        pipeline.addLast(new IdleStateHandler(this.clusterConfig.getSyncSeconds()+2, 0, 0, TimeUnit.SECONDS));
+        pipeline.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4));
+        pipeline.addLast(new ClusterMessageDecoder(clusterNode, this.clusterConfig));
+        pipeline.addLast(new ClusterMessageEncoder(clusterNode, this.clusterConfig));
         pipeline.addLast(new ClusterMasterHandler(clusterNode, this.masterService, this.clusterConfig));
 
         // 클러스터가 연결되었음을 추상함수를 호출하여 알림
-        this.masterService.onNotifyClusterNetworkState(clusterNode, false);
+        this.masterService.onNotifyClusterNetworkState(clusterNode, true);
     }
 
 }

+ 3 - 3
its-cluster/src/main/java/com/its/common/cluster/service/ClusterSlave.java

@@ -102,9 +102,9 @@ public class ClusterSlave implements Callable<Object> {
                     ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                 }
                 ch.pipeline().addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS));
-                ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(8192, 0, 4));
-                ch.pipeline().addLast(new ClusterMessageDecoder(clusterNode, clusterConfig.isLogging()));
-                ch.pipeline().addLast(new ClusterMessageEncoder(clusterNode, clusterConfig.isLogging()));
+                ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4));
+                ch.pipeline().addLast(new ClusterMessageDecoder(clusterNode, clusterConfig));
+                ch.pipeline().addLast(new ClusterMessageEncoder(clusterNode, clusterConfig));
                 ch.pipeline().addLast(new ClusterSlaveHandler(clusterNode, slaveService));
             }
         });

+ 2 - 2
its-cluster/src/main/java/com/its/common/cluster/service/ClusterSlaveBootstrapFactory.java

@@ -30,8 +30,8 @@ public class ClusterSlaveBootstrapFactory {
         }
         bootstrap.option(ChannelOption.AUTO_READ, true);
         bootstrap.option(ChannelOption.TCP_NODELAY, true);
-        bootstrap.option(ChannelOption.SO_RCVBUF, 8192);
-        bootstrap.option(ChannelOption.SO_SNDBUF, 8192);
+        bootstrap.option(ChannelOption.SO_RCVBUF, 65536);
+        bootstrap.option(ChannelOption.SO_SNDBUF, 65536);
         bootstrap.option(ChannelOption.SO_KEEPALIVE, false);
         //bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(2048));
         bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.connectTimeout * 1000);

+ 2 - 6
its-cluster/src/main/java/com/its/common/cluster/vo/ClusterNetState.java

@@ -1,12 +1,10 @@
 package com.its.common.cluster.vo;
 
+import com.its.common.cluster.utils.ClusterUtils;
 import io.netty.channel.Channel;
 import lombok.Data;
 
 import java.text.SimpleDateFormat;
-import java.time.Instant;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
 import java.util.Calendar;
 import java.util.Date;
 
@@ -84,9 +82,7 @@ public class ClusterNetState {
     }
 
     private String timeToString(long timestamp) {
-        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
-                .withZone(ZoneId.systemDefault());
-        return formatter.format(Instant.ofEpochMilli(timestamp));
+        return ClusterUtils.timeToStringYmd(timestamp);
 
     }
     public String getLastRecvTimeString() {