Browse Source

utic-traf-server add workerUtils for laodblancing

HANTE 1 month ago
parent
commit
6b2f96fbc9

+ 1 - 14
utic-traf-server/src/main/java/com/utic/center/utic/traf/server/service/worker/LinkTrafSaveDwdbCenterHistWorker.java

@@ -36,20 +36,7 @@ public class LinkTrafSaveDwdbCenterHistWorker implements Runnable {
         ConcurrentHashMap<Integer, DbmsBatchJobResult> mapData = new ConcurrentHashMap<>();
         SqlSessionFactory sqlSessionFactory = (SqlSessionFactory) SpringUtils.getBean("dwdbSqlSessionFactory");
 
-        int linkTotalCount = this.linkRepo.getLinkLists().size();
-        int threadPoolSize = this.linkRepo.getConfig().getPrcsThreadCount();
-        int batchPerThread = (int) Math.ceil((double) linkTotalCount / threadPoolSize);
-
-        for (int ii = 0; ii < threadPoolSize; ii++) {
-            int startIdx = ii * batchPerThread;
-            int endIdx = Math.min(startIdx + batchPerThread, linkTotalCount);
-            DbmsBatchJobResult dbmsJobResult = DbmsBatchJobResult.builder()
-                    .jobIndex(ii).start(startIdx).end(endIdx)
-                    .total(0).target(0).link1(0).link2(0).link3(0).link4(0).effects(0).elapsedTime(0)
-                    .linkIds(this.linkRepo.getLinkLists().subList(startIdx, endIdx))
-                    .build();
-            mapData.put(ii, dbmsJobResult);
-        }
+        int threadPoolSize = WorkerUtils.allocateWorkerLinkJobs(mapData, this.linkRepo.getConfig().getPrcsThreadCount(), this.linkRepo.getLinkLists());
         ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
 
         mapData.forEach((key, dbmsJobResult) -> {

+ 2 - 14
utic-traf-server/src/main/java/com/utic/center/utic/traf/server/service/worker/LinkTrafSaveDwdbMissingValueHistWorker.java

@@ -36,20 +36,8 @@ public class LinkTrafSaveDwdbMissingValueHistWorker implements Runnable {
         ConcurrentHashMap<Integer, DbmsBatchJobResult> mapData = new ConcurrentHashMap<>();
         SqlSessionFactory sqlSessionFactory = (SqlSessionFactory) SpringUtils.getBean("dwdbSqlSessionFactory");
 
-        int linkTotalCount = this.linkRepo.getLink1Lists().size();
-        int threadPoolSize = 4;//this.linkRepo.getConfig().getPrcsThreadCount();
-        int batchPerThread = (int) Math.ceil((double) linkTotalCount / threadPoolSize);
-
-        for (int ii = 0; ii < threadPoolSize; ii++) {
-            int startIdx = ii * batchPerThread;
-            int endIdx = Math.min(startIdx + batchPerThread, linkTotalCount);
-            DbmsBatchJobResult dbmsJobResult = DbmsBatchJobResult.builder()
-                    .jobIndex(ii).start(startIdx).end(endIdx)
-                    .total(0).target(0).link1(0).link2(0).link3(0).link4(0).effects(0).elapsedTime(0)
-                    .linkIds(this.linkRepo.getLink1Lists().subList(startIdx, endIdx))
-                    .build();
-            mapData.put(ii, dbmsJobResult);
-        }
+        // 레벨1 링크에 대해서만, 스레드는 4개로 고정
+        int threadPoolSize = WorkerUtils.allocateWorkerLinkJobs(mapData, 4, this.linkRepo.getLink1Lists());
         ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
 
         mapData.forEach((key, dbmsJobResult) -> {

+ 70 - 0
utic-traf-server/src/main/java/com/utic/center/utic/traf/server/service/worker/WorkerUtils.java

@@ -0,0 +1,70 @@
+package com.utic.center.utic.traf.server.service.worker;
+
+import com.utic.center.utic.traf.server.dto.DbmsBatchJobResult;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+public final class WorkerUtils {
+
+    private WorkerUtils() {
+        throw new IllegalStateException("Utility class");
+    }
+
+    public static int allocateWorkerLinkJobs(ConcurrentHashMap<Integer, DbmsBatchJobResult> mapData, int configuredThreads, List<String> orgLinkLists) {
+        // 몫+나머지를 앞쪽에 고르게 분산
+        List<String> linkList = Optional.ofNullable(orgLinkLists) .orElse(Collections.emptyList());
+
+        int linkTotalCount = linkList.size();
+        int threadPoolSize = Math.min(linkTotalCount, configuredThreads);   // 스레드 풀 크기 설정, 그럴일은 없겠지만,,, 링크 개수보다 작거나 같게 설정
+        // 기본 할당 수와 여분 계산
+        int baseCount = linkTotalCount / threadPoolSize;
+        int remainder = linkTotalCount % threadPoolSize;
+        int startIdx = 0;
+        for (int ii = 0; ii < threadPoolSize; ii++) {
+            // 앞쪽 스레드부터 하나씩 여분을 추가
+            int additional = (ii < remainder) ? 1 : 0;
+            int endIdx = Math.min(startIdx + baseCount + additional, linkTotalCount);
+
+            List<String> subList = linkList.subList(startIdx, endIdx);
+
+            DbmsBatchJobResult dbmsJobResult = DbmsBatchJobResult.builder()
+                    .jobIndex(ii)
+                    .start(startIdx).end(endIdx)
+                    .total(0).target(0).link1(0).link2(0).link3(0).link4(0).effects(0).elapsedTime(0)
+                    .linkIds(subList)
+                    .build();
+
+            mapData.put(ii, dbmsJobResult);
+
+            startIdx = endIdx; // 다음 범위 갱신
+        }
+        return threadPoolSize;
+    }
+
+//    public static int allocateWorkerLinkJobs1(ConcurrentHashMap<Integer, DbmsBatchJobResult> mapData, int configuredThreads, List<String> orgLinkLists) {
+//        // 올림 후 스레드에 일괄 할당 분배(다만, 일정하게 분배되지 않을 수 있음)
+//        List<String> linkList = Optional.ofNullable(orgLinkLists) .orElse(Collections.emptyList());
+//        int linkTotalCount = linkList.size();
+//        int threadPoolSize = Math.min(linkTotalCount, configuredThreads);   // 스레드 풀 크기 설정, 그럴일은 없겠지만,,, 링크 개수보다 작거나 같게 설정
+//        int batchPerThread = (int) Math.ceil((double) linkTotalCount / threadPoolSize);
+//        for (int ii = 0; ii < threadPoolSize; ii++) {
+//            int startIdx = ii * batchPerThread;
+//            int endIdx = Math.min(startIdx + batchPerThread, linkTotalCount);
+//
+//            List<String> subList = linkList.subList(startIdx, endIdx);
+//
+//            DbmsBatchJobResult dbmsJobResult = DbmsBatchJobResult.builder()
+//                    .jobIndex(ii).start(startIdx).end(endIdx)
+//                    .total(0).target(0).link1(0).link2(0).link3(0).link4(0).effects(0).elapsedTime(0)
+//                    .linkIds(subList)
+//                    .build();
+//
+//            mapData.put(ii, dbmsJobResult);
+//        }
+//        return threadPoolSize;
+//    }
+
+}