Parcourir la source

cluster logic update

shjung il y a 2 semaines
Parent
commit
95f12bef50

+ 20 - 0
conf/rota-utic-client-dev.yml

@@ -0,0 +1,20 @@
+server:
+  port: 9872
+
+application:
+  process-id: 70060
+
+logging:
+  file:
+    path: ${user.home}/logs/rota-utic-client/
+
+cluster:
+  id: 1
+  #auto-failback-enabled: true
+  nodes:
+    - id: 1               # must be 1,2,3,...
+      ip: 192.168.10.91
+      port: 13888
+    - id: 2
+      ip: 192.168.10.92
+      port: 13888

+ 1 - 0
moct-utic-server/src/main/java/com/utic/its/moct/utic/server/scheduler/ApplicationScheduler.java

@@ -47,6 +47,7 @@ public class ApplicationScheduler {
                     connCount++;
                 }
             }
+            ApplicationRepository.processState.setSndCnt(this.moctUticServerService.getSendTrafficCount());
             ApplicationRepository.processState.setEtcCnt((int)connCount);
             this.processStateService.processRunning();
         } catch(Exception e) {

+ 6 - 4
moct-utic-server/src/main/java/com/utic/its/moct/utic/server/service/MoctUticServerService.java

@@ -30,9 +30,11 @@ public class MoctUticServerService {
     private final MoctUticServerExecutor executor;
     private final MoctUticServerRepository repo;
     private boolean isRunning = false;
+    private int sendTrafficCount;
 
     @PostConstruct
     private void init() {
+        this.sendTrafficCount = 0;
         log.info("ItsMoctServerService.init: start.");
         log.info("ItsMoctServerService.init: ..end.");
     }
@@ -95,8 +97,8 @@ public class MoctUticServerService {
                 return ++result;
             }
 
-            final int trafficCount = traffics.size();
-            log.info("[TRAF] {}: [{}], {} EA.", LogUtils.elapsedLog("NewTraffic"), currTrafficTime, trafficCount);
+            this.sendTrafficCount = traffics.size();
+            log.info("[TRAF] {}: [{}], {} EA.", LogUtils.elapsedLog("NewTraffic"), currTrafficTime, this.sendTrafficCount);
 
             // 메모리에 현재 교통정보 시각과 교통정보 목록을 저장한다.
             ApplicationRepository.updateTrafficData(currTrafficTime, traffics);
@@ -106,7 +108,7 @@ public class MoctUticServerService {
                 this.executor.executeSendTraffic(center);
             });
 
-            ApplicationRepository.processState.setSndCnt(trafficCount);
+            ApplicationRepository.processState.setSndCnt(this.sendTrafficCount);
 
             // CENTER_SEND_INFO 로깅
             TbSndLog log = TbSndLog.builder()
@@ -114,7 +116,7 @@ public class MoctUticServerService {
                     .infoKind(DbmsData.D_INFOTYPE_MOCT)
                     .fromCenterId(ApplicationRepository.CENTER.getCenterId())
                     .toCenterId(ApplicationRepository.CENTER.getCenterId())
-                    .dataCnt(trafficCount)
+                    .dataCnt(this.sendTrafficCount)
                     .clusterId(this.clusterConfig.getId())
                     .build();
             this.dbmsDataProcess.add(new DbmsData(0, ApplicationRepository.CENTER, DbmsData.DBMS_CENTER_SEND_INFO, "", log, 0));

+ 58 - 20
rota-utic-client/src/main/java/com/utic/its/rota/utic/client/cluster/ClusterMasterService.java

@@ -36,26 +36,32 @@ public class ClusterMasterService extends AbstractClusterMasterService {
      * 센터를 운영할 클러스터 정보를 재 분배한다.
      */
     private void rebalanceRunClusterId() {
+
+        // Master 여부에 따라 할당 클러스터 노드를 확인한다.
+        // ActiveClusterId 에 대해서는 처리하지 않는다(이건 순수하게 통신 담당 클러스터 정보를 저장한다.)
+        // RealClusterId 를 설정해 주어야 스케쥴러에서 네트워크 연결을 시도하거나 처리하지 않는다.
         ApplicationRepository.CENTER_MAP.forEach((regionId, center) -> {
             if (this.clusterConfig.getId() == center.getClusterId()) {
                 center.setRealClusterId(this.clusterConfig.getId());
                 // 클러스터가 동일하면 추가
                 return;
             }
-            if (!this.clusterConfig.isMaster()) {
-                // 마스터가 아니면 리턴
+
+            if (this.clusterConfig.isMaster()) {
+                // 내가 마스터로 운영중일 경우 센터담당 클러스터가 통신이상이면 마스터가 담당한다.
+                int clusterId = center.getClusterId();
+                ClusterNode clusterNode = this.clusterConfig.getClusterNode(clusterId);
+                if (clusterNode != null && !clusterNode.getElectionState().isAlive()) {
+                    // 통신이 이상이면 추가
+                    center.setRealClusterId(this.clusterConfig.getId());
+                }
+            }
+            else {
+                // 마스터가 아닌데 실재 할당 클러스터가 나로 되어 있으면 이전에 내가 수행하던 센터이므로 원복 시킨다.
                 if (this.clusterConfig.getId() == center.getRealClusterId()) {
                     // 원복(내가 마스터일때 처리하던것)
                     center.setRealClusterId(center.getClusterId());
                 }
-                return;
-            }
-            // 내가 마스터로 운영중일 경우 센터담당 클러스터가 통신이상이면 마스터가 담당한다.
-            int clusterId = center.getClusterId();
-            ClusterNode clusterNode = this.clusterConfig.getClusterNode(clusterId);
-            if (clusterNode != null && !clusterNode.getElectionState().isAlive()) {
-                // 통신이 이상이면 추가
-                center.setRealClusterId(this.clusterConfig.getId());
             }
         });
     }
@@ -97,24 +103,24 @@ public class ClusterMasterService extends AbstractClusterMasterService {
                             this.clusterConfig.getId(), message.getClusterId(), rcvCenter.getCenterId());
                     continue;
                 }
-                if (center.getRealClusterId() == 0) {
+
+                if (center.getRealClusterId() == 0 || center.getActiveClusterId() == 0) {
                     // 최초 정보 수신한 경우에 해당함
-                    rcvCenter.toCopyInfo(center);
+                    rcvCenter.toCopyInfo(this.clusterConfig.getId(), center);
                     continue;
                 }
 
-                if (center.getRealClusterId() == message.getClusterId()) {
-                    // 기존에 연결된 클러스터의 정보임
-                    rcvCenter.toCopyInfo(center);
+                if (center.getActiveClusterId() == message.getClusterId()) {
+                    // 기존에 연결된 클러스터의 정보임(즉, 다른 노드에서 통신이 연결된 상태인 경우)
+                    rcvCenter.toCopyInfo(this.clusterConfig.getId(), center);
                     continue;
                 }
 
-                // 기타, 클러스터가 현재 서버인경우와 그렇치 않은경우(클러스터가 3개이상인 경우)
                 if (this.clusterConfig.getId() == center.getRealClusterId()) {
                     // 현재 서버에 할당된 센터의 정보를 클러스터로 부터 수신한 경우
                     if (rcvCenter.getNetState().isAlive() &&
                             center.getNetState().isAlive()) {
-                        // 현재 서버의 연결을 종료하고 원격서버 정보를 수용한다.
+                        // 수신한 네트워크 정보와 서버의 메모리상 네트워크 상태가 모두 연결된 상태인 경우
                         log.warn("ClusterNodeId: {}, ClusterMasterService.onClusterMessage: fromClusterNodeId: {}, realClusterNodeId: {}, center {} dup cluster connected.",
                                 this.clusterConfig.getId(), center.getRealClusterId(), message.getClusterId(), rcvCenter.getCenterId());
 
@@ -127,15 +133,14 @@ public class ClusterMasterService extends AbstractClusterMasterService {
                             center.getNetState().disConnect();
                         }
                     }
-                    rcvCenter.toCopyInfo(center);
+                    rcvCenter.toCopyInfo(this.clusterConfig.getId(), center);
                 }
                 else {
-                    // 클러스터가 2대 이상인 경우 해당
                     // 수신한 클러스터의 정보에서 네트워크가 정상인 경우에만 업데이트 한다.
                     log.warn("ClusterNodeId: {}, ClusterMasterService.onClusterMessage: fromClusterNodeId: {}, realClusterNodeId: {}, other cluster not working.",
                             this.clusterConfig.getId(), center.getRealClusterId(), message.getClusterId());
                     if (rcvCenter.getNetState().isAlive()) {
-                        rcvCenter.toCopyInfo(center);
+                        rcvCenter.toCopyInfo(this.clusterConfig.getId(), center);
                     }
                 }
             }
@@ -145,6 +150,34 @@ public class ClusterMasterService extends AbstractClusterMasterService {
     @Override
     public void onClusterChannelActive(ClusterNode clusterNode) {
         rebalanceRunClusterId();
+
+        if (this.clusterConfig.isAutoFailbackEnabled()) {
+            // 자동으로 Failback 을 해야 하는 경우 나한테 연결된 다른 노드의 센터 통신연결을 종료한다.
+            int activeClusterId = clusterNode.getId();
+            disconnectClusterNodeAll(activeClusterId);
+        }
+    }
+
+    private void disconnectClusterNodeAll(int clusterNodeId) {
+        // 할당된 클러스터 노드에 속하고 실재 통신 연결을 나하고 하고 있는 센터의 통신 연결을 종료한다.
+        ApplicationRepository.CENTER_MAP.forEach((regionId, center) -> {
+            if (clusterNodeId == center.getRealClusterId() &&
+                    this.clusterConfig.getId() == center.getActiveClusterId() &&
+                    center.getNetState().isAlive()) {
+                // 다시 살아난 노드에 속하는 센터 중에 현재 나하고 통신 중인 센터의 통신연결을 종료한다.
+                Channel channel = center.getNetState().getChannel();
+                try {
+                    if (channel != null) {
+                        channel.disconnect();
+                        channel.close();
+                    }
+                } catch (Exception e) {
+                    // no logging
+                }
+                center.getNetState().disConnect();
+                center.setActiveClusterId(0);
+            }
+        });
     }
 
     /**
@@ -154,6 +187,11 @@ public class ClusterMasterService extends AbstractClusterMasterService {
     @Override
     public void onClusterChannelInactive(ClusterNode clusterNode) {
         rebalanceRunClusterId();
+
+        if (this.clusterConfig.isAutoFailbackEnabled()) {
+            int inactiveClusterId = clusterNode.getId();
+            disconnectClusterNodeAll(inactiveClusterId);
+        }
     }
 
 }

+ 3 - 2
rota-utic-client/src/main/java/com/utic/its/rota/utic/client/cluster/ClusterSlaveService.java

@@ -35,8 +35,9 @@ public class ClusterSlaveService extends AbstractClusterSlaveService {
         result.add(clusterInfo);
 
         ApplicationRepository.CENTER_MAP.forEach((key, center) -> {
-            if (this.clusterConfig.getId() == center.getRealClusterId()) {
-                // 현재 서버에 운영중인 센터 정보만 전송
+            if (this.clusterConfig.getId() == center.getRealClusterId() ||
+                this.clusterConfig.getId() == center.getActiveClusterId()) {
+                // 현재 서버에 운영중인 센터 또는 실제 네트워크가 연결된 센터 정보만 전송
                 clusterInfo.getCenters().add(RegionCenterInfo.createInfo(center));
             }
         });

+ 11 - 3
rota-utic-client/src/main/java/com/utic/its/rota/utic/client/cluster/dto/RegionCenterInfo.java

@@ -19,6 +19,7 @@ public class RegionCenterInfo implements Serializable {
 
     private int clusterId;              // 서버 ID
     private int realClusterId;          // 실제 운영중인 서버 ID
+    private int activeClusterId;        // 실제 통신이 이루어지고 있는 서버 ID
 
     private String centerId;
 
@@ -30,6 +31,7 @@ public class RegionCenterInfo implements Serializable {
         return RegionCenterInfo.builder()
                 .clusterId(center.getClusterId())
                 .realClusterId(center.getRealClusterId())
+                .activeClusterId(center.getActiveClusterId())
                 .centerId(center.getCenterId())
                 .netState(new NetStateInfo(center.getNetState()))
                 .recvTraffic(center.getRecvTraffic())
@@ -37,14 +39,20 @@ public class RegionCenterInfo implements Serializable {
                 .build();
     }
 
-    public void toCopyInfo(CenterDto center) {
+    public void toCopyInfo(int clusterId, CenterDto center) {
         // this: 클러스터로 부터 수신한 메시지
         // center: 시스템의 메모리상에 위치한 센터 정보
         center.setRealClusterId(this.realClusterId);
 
-//        NetState state = center.getNetState();
+        if (clusterId == center.getActiveClusterId() && center.getNetState().isAlive()) {
+            // 통신이 이루어지고 있는 클러스터가 현재 클러스터와 같은 경우(네트워크가 연결된 상태) 복사하지 않는다.
+            return;
+        }
+
+        // 현재 메모리상에 통신이 이루어 지지 않은 경우 수신한 정보를 메모리에 복사한다.
+        // 즉, 나하고 통신중일 경우에는 복사하지 않는다.
+        center.setActiveClusterId(this.activeClusterId);
         center.getNetState().setState(this.netState.getState());
-//        center.getNetState().setChannel(this.netState.getChannel());
         center.getNetState().setConnectCount(this.netState.getConnectCount());
         center.getNetState().setLastRecvTime(this.netState.getLastRecvTime());
         center.getNetState().setLastSendTime(this.netState.getLastSendTime());

+ 7 - 2
rota-utic-client/src/main/java/com/utic/its/rota/utic/client/config/ApplicationConfig.java

@@ -40,6 +40,10 @@ public class ApplicationConfig extends NettyServerConfig {
 
         configure();
 
+        if (this.retryConnectSeconds < 10) {
+            this.retryConnectSeconds = 10;
+        }
+
         if (this.readerIdleTimeSeconds < 10) {
             this.readerIdleTimeSeconds = 12;
         }
@@ -61,12 +65,13 @@ public class ApplicationConfig extends NettyServerConfig {
 
         log.info("[ApplicationConfig] -------------------------");
         log.info("[ApplicationConfig]             processId: {}", this.processId);
-        log.info("[ApplicationConfig]       packetQueueSize: {}", this.packetQueueSize);
-        log.info("[ApplicationConfig]         dbmsQueueSize: {}", this.dbmsQueueSize);
+        log.info("[ApplicationConfig]   retryConnectSeconds: {}", this.retryConnectSeconds);
         log.info("[ApplicationConfig]     connectionTimeout: {}", this.connectionTimeout);
         log.info("[ApplicationConfig] readerIdleTimeSeconds: {}", this.readerIdleTimeSeconds);
         log.info("[ApplicationConfig]         acceptThreads: {}", this.acceptThreads);
         log.info("[ApplicationConfig]         workerThreads: {}", this.workerThreads);
+        log.info("[ApplicationConfig]       packetQueueSize: {}", this.packetQueueSize);
+        log.info("[ApplicationConfig]         dbmsQueueSize: {}", this.dbmsQueueSize);
     }
 
 }

+ 4 - 2
rota-utic-client/src/main/java/com/utic/its/rota/utic/client/controller/RotaUticClientRestController.java

@@ -121,7 +121,8 @@ public class RotaUticClientRestController {
 
             sb.append(String.format("%s %6.6s  %-15.15s %-8.8s %-20.20s %4d %-20.20s %-20.20s %-20.20s %9d %7s  %s",
                     region.getCenterId(),
-                    region.getClusterId() + "/" + region.getRealClusterId(),
+                    //region.getClusterId() + "/" + region.getRealClusterId() + "/" + region.getActiveClusterId(),
+                    region.getClusterId() + "/" + region.getActiveClusterId(),
                     region.getIpAddress(), commStatus,
                     region.getNetState().getConnectTimeString(),
                     Math.min(region.getNetState().getConnectCount(), 9999),
@@ -163,7 +164,8 @@ public class RotaUticClientRestController {
             }
 
             if (this.clusterConfig.isEnabled()) {
-                if (region.getNetState().isAlive()) {
+                if (region.getNetState().isAlive() && this.clusterConfig.getId() != region.getActiveClusterId()) {
+                    // 네트워크가 연결된 상태이면서 실제 연결된 네트워크가 현재 노드가 아닌경우
                     if (this.clusterConfig.getId() != region.getRealClusterId()) {
                         // 다른 노드에서 작업을 수행하기로 설정된 지역센터의 연결을 종료한다.
                         try {

+ 3 - 1
rota-utic-client/src/main/java/com/utic/its/rota/utic/client/service/ApplicationService.java

@@ -87,6 +87,7 @@ public class ApplicationService {
         dto.setIdx(0); // Center 는 항상 0번 인덱스
         dto.setClusterId(this.clusterConfig.getId());
         dto.setRealClusterId(this.clusterConfig.getId());
+        dto.setActiveClusterId(0);
         dto.setSyncCluster(true);
         // 클라이언트가 이전 버전이 있기 때문에 여기서 한번에 설정할 수 없다.
         // 기존에는 DATEX_MAX_TRAFFIC_COUNT(1000)을 사용하고 있었기 때문에 클라이언트가
@@ -127,6 +128,7 @@ public class ApplicationService {
 
             // 연결클러스터는 기본적으로 0 으로 설정한다.
             dto.setRealClusterId(0);   // 기본값(운영중이지 않음)
+            dto.setActiveClusterId(0);
             dto.setSyncCluster(false);
 
             ApplicationRepository.CENTER_MAP.put(dto.getCenterId(), dto);
@@ -173,7 +175,7 @@ public class ApplicationService {
             if (center == null) {
                 continue;
             }
-            log.info("Cluster: {} ==> {}", centerId, center.getClusterId());
+            log.info("Cluster: {} ==> {}, {}/{}", centerId, center.getClusterId(), center.getRealClusterId(), center.getActiveClusterId());
         }
     }
 

+ 1 - 0
utic-its-common/src/main/java/com/utic/its/common/dto/CenterDto.java

@@ -38,6 +38,7 @@ public class CenterDto implements Serializable {
     private int idx;
     private int clusterId;          // 서버 ID
     private int realClusterId;      // 실제 운영중인 서버 ID
+    private int activeClusterId;    // 네트워크에 연결된 클러스터 노드 ID
     private boolean syncCluster;    // 클러스터 간 정보 업데이트 여부
 
     private String centerId;

+ 2 - 0
utic-its-common/src/main/java/com/utic/its/common/entity/TbCenter.java

@@ -18,6 +18,7 @@ public class TbCenter {
 
     private int clusterId;
     private int realClusterId;
+    private int activeClusterId;
     private String centerId;
     private String centerInfo;
     private String ipAddress;
@@ -36,6 +37,7 @@ public class TbCenter {
                 .idx(0)
                 .clusterId(this.clusterId)
                 .realClusterId(this.realClusterId)
+                .activeClusterId(this.activeClusterId)
                 .centerId(this.centerId)
                 .centerInfo(this.centerInfo)
                 .ipAddress(this.ipAddress)

+ 3 - 0
utic-its-common/src/main/java/com/utic/its/common/xnet/client/handler/ItsAsnClient.java

@@ -56,6 +56,7 @@ public class ItsAsnClient implements Callable<Object> {
         if (this.clusterId > 0) {
             // 클러스터 운영모드 인 경우
             if (this.clusterId != this.center.getRealClusterId()) {
+                // 클러스터가 통신이상이면 다른 클러스터에서 수행됐던 작업을 할당한다(마스터인경우, setRealClusterId(clusterConfig.getId())
                 // 클러스터 ID 가 다르면 연결하지 않는다.
                 scheduleReconnect(this.retryConnectSeconds*2); // 재연결 예약
                 return null;
@@ -165,6 +166,7 @@ public class ItsAsnClient implements Callable<Object> {
         log.info("ItsAsnClient ..channelOpen: [{}, {}], {}, Channel: {}", this.center.getCenterId(), this.center.getIpAddress(), this.center.getCommPort(), channel);
         ApplicationRepository.setCenterObject(channel, this.center);
         this.center.getNetState().connect(channel);
+        this.center.setActiveClusterId(this.clusterId);
 
         try {
             // 서버에 최초 접속한 경우 AI_Login Message Send
@@ -191,6 +193,7 @@ public class ItsAsnClient implements Callable<Object> {
             ApplicationRepository.setCenterObject(channel, null);
             this.center.setRecvPktNmbr(0);
             this.center.getNetState().disConnect();
+            this.center.setActiveClusterId(0);
             channel.close();
 //            channel.eventLoop().schedule(this, this.retryConnectSeconds, TimeUnit.SECONDS);
             scheduleReconnect(this.retryConnectSeconds);