|
|
@@ -1,171 +0,0 @@
|
|
|
-package com.utic.center.utic.traf.server.service.worker;
|
|
|
-
|
|
|
-import com.utic.center.common.spring.SpringUtils;
|
|
|
-import com.utic.center.common.utils.LogUtils;
|
|
|
-import com.utic.center.utic.traf.server.dao.mapper.utic.UticTrafficMapper;
|
|
|
-import com.utic.center.utic.traf.server.dto.DbmsBatchJobResult;
|
|
|
-import com.utic.center.utic.traf.server.dto.DbmsJobResult;
|
|
|
-import com.utic.center.utic.traf.server.dto.LinkDto;
|
|
|
-import com.utic.center.utic.traf.server.dto.LinkTrafCenterDto;
|
|
|
-import com.utic.center.utic.traf.server.repository.ApplicationRepository;
|
|
|
-import com.utic.center.utic.traf.server.service.LinkService;
|
|
|
-import lombok.RequiredArgsConstructor;
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.apache.ibatis.session.ExecutorType;
|
|
|
-import org.apache.ibatis.session.SqlSession;
|
|
|
-import org.apache.ibatis.session.SqlSessionFactory;
|
|
|
-
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
-import java.util.concurrent.ExecutorService;
|
|
|
-import java.util.concurrent.Executors;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
-
|
|
|
-@Slf4j
|
|
|
-@RequiredArgsConstructor
|
|
|
-public class LinkTrafSaveUticWorker implements Runnable {
|
|
|
-
|
|
|
- private final String tableName;
|
|
|
- private final LinkService linkService;
|
|
|
- private final DbmsJobResult dbmsJobResult;
|
|
|
- private final boolean isDebug;
|
|
|
-
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- final long start = System.currentTimeMillis();
|
|
|
- String jobTable = this.dbmsJobResult.getTableName();
|
|
|
-
|
|
|
- ConcurrentHashMap<Integer, DbmsBatchJobResult> mapData = new ConcurrentHashMap<>();
|
|
|
- SqlSessionFactory sqlSessionFactory = (SqlSessionFactory) SpringUtils.getBean("sqlSessionFactory");
|
|
|
- if (sqlSessionFactory == null) {
|
|
|
- log.error("[ERR] LinkTrafSaveUticWorker.run: SqlSessionFactory is null, [{}].", jobTable);
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- // this.linkService.getLinkLists()
|
|
|
-// final List<String> linkList = this.linkService.getLinkMap().entrySet().stream()
|
|
|
-// .filter(entry -> entry.getValue().getTrafFsn().getState().isState())
|
|
|
-// .filter(entry -> entry.getValue().getTrafFsn().getSpeed() > 0)
|
|
|
-// .map(Map.Entry::getKey)
|
|
|
-// .collect(Collectors.toList());
|
|
|
-
|
|
|
- final int prcsThreadCount = ApplicationRepository.PRCS_THREAD_COUNT;
|
|
|
- final int threadPoolSize = WorkerUtils.allocateWorkerLinkJobs(mapData, prcsThreadCount, this.linkService.getValidTrafLinkList());
|
|
|
- if (threadPoolSize < 0) {
|
|
|
- log.warn("[WAN] LinkTrafSaveUticWorker.run: getValidTrafLinkList no data, job return: [{}].", jobTable);
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- log.info("[INF] {}: Links({}), PoolSize({})", LogUtils.elapsedLog("UTIC BATCH SAVE"), this.linkService.getValidTrafLinkList().size(), threadPoolSize);
|
|
|
-
|
|
|
- final ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
|
|
|
- mapData.forEach((key, dbmsJobResult) -> {
|
|
|
-// log.info("[INF] {}: Job({}), Link({})", LogUtils.elapsedLog("BatchThread"), dbmsJobResult.getJobIndex(), dbmsJobResult.getLinkIds().size());
|
|
|
- executorService.execute(() -> processBatch(sqlSessionFactory, dbmsJobResult, this.linkService, jobTable));
|
|
|
- });
|
|
|
-
|
|
|
- executorService.shutdown();
|
|
|
- try {
|
|
|
- if (!executorService.awaitTermination(120, TimeUnit.SECONDS)) {
|
|
|
- log.warn("[WAN] LinkTrafSaveUticWorker.run: Timeout while waiting for tasks to finish, [{}].", jobTable);
|
|
|
- List<Runnable> droppedTasks = executorService.shutdownNow();
|
|
|
- log.warn("[WAN] LinkTrafSaveUticWorker.run: {} tasks were dropped during shutdownNow, [{}].", droppedTasks.size(), jobTable);
|
|
|
- }
|
|
|
- } catch (InterruptedException e) {
|
|
|
- log.error("[ERR] LinkTrafSaveUticWorker.run: InterruptedException, [{}], {}.", jobTable, e.getMessage());
|
|
|
- List<Runnable> droppedTasks = executorService.shutdownNow();
|
|
|
- log.error("[ERR] LinkTrafSaveUticWorker.run: Forced shutdown due to interruption, [{}] tasks dropped.", droppedTasks.size());
|
|
|
- Thread.currentThread().interrupt(); // 현재 스레드의 인터럽트 상태 복원
|
|
|
- }
|
|
|
-
|
|
|
- int link1, link2, link3, link4;
|
|
|
- long elapsedTime = 0;
|
|
|
- link1 = link2 = link3 = link4 = 0;
|
|
|
- for (Map.Entry<Integer, DbmsBatchJobResult> entry : mapData.entrySet()) {
|
|
|
- DbmsBatchJobResult result = entry.getValue();
|
|
|
- elapsedTime = Math.max(elapsedTime, result.getElapsedTime());
|
|
|
- link1 += result.getLink1();
|
|
|
- link2 += result.getLink2();
|
|
|
- link3 += result.getLink3();
|
|
|
- link4 += result.getLink4();
|
|
|
- }
|
|
|
-
|
|
|
- int resultCount = link1 + link2 + link3 + link4;
|
|
|
- this.dbmsJobResult.setEffects(resultCount);
|
|
|
- this.dbmsJobResult.setElapsedTime(System.currentTimeMillis() - start);
|
|
|
-
|
|
|
- log.info("[INF] {}", LogUtils.elapsedLog("레벨1 " + jobTable, link1, elapsedTime));
|
|
|
- log.info("[INF] {}", LogUtils.elapsedLog("레벨2 " + jobTable, link2, elapsedTime));
|
|
|
- log.info("[INF] {}", LogUtils.elapsedLog("레벨3 " + jobTable, link3, elapsedTime));
|
|
|
- log.info("[INF] {}", LogUtils.elapsedLog("레벨4 " + jobTable, link4, elapsedTime));
|
|
|
- }
|
|
|
-
|
|
|
- private void processBatch(SqlSessionFactory sqlSessionFactory, DbmsBatchJobResult dbmsJobResult, LinkService linkService, String jobTable) {
|
|
|
- final long start = System.currentTimeMillis();
|
|
|
- final String mapperName = "insertTrafficCenterCache";
|
|
|
- final int maxBatchSize = ApplicationRepository.DBMS_BATCH_SIZE;
|
|
|
- int total, target;
|
|
|
- int[] linkCounts = new int[4];
|
|
|
- int jobCnt = 0;
|
|
|
- total = target = linkCounts[0] = linkCounts[1] = linkCounts[2] = linkCounts[3] = 0;
|
|
|
-
|
|
|
- try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) {
|
|
|
- UticTrafficMapper mapper = sqlSession.getMapper(UticTrafficMapper.class);
|
|
|
- for (String linkId : dbmsJobResult.getLinkIds()) {
|
|
|
- LinkDto link = linkService.getLink(linkId);
|
|
|
- if (link == null) {
|
|
|
- log.error("[ERR] LinkTrafSaveCacheWorker.processBatch: {} link not found.", linkId);
|
|
|
- continue;
|
|
|
- }
|
|
|
- total++;
|
|
|
-
|
|
|
- if (!link.getTrafFsn().getState().isState()) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- if (!"O".equals(link.getTrafFsn().getState().getMissValueYn()) && link.getTrafFsn().getSpeed() == 0) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- target++;
|
|
|
-
|
|
|
- LinkTrafCenterDto trafDto = link.getTrafCenter();
|
|
|
- try {
|
|
|
- mapper.insertTrafficCenter(this.tableName, trafDto);
|
|
|
- linkCounts[trafDto.getLinkLevel()-1]++;
|
|
|
- jobCnt++;
|
|
|
-
|
|
|
- if (jobCnt % maxBatchSize == 0) {
|
|
|
- sqlSession.flushStatements();
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("[ERR] LinkTrafSaveUticWorker.processBatch({}): jobCnt({}), Exception, [{}], {}.",
|
|
|
- dbmsJobResult.getJobIndex(), jobCnt, trafDto, e.getMessage());
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- try {
|
|
|
- sqlSession.flushStatements();
|
|
|
- sqlSession.commit();
|
|
|
- }
|
|
|
- catch (Exception e) {
|
|
|
- log.error("[ERR] LinkTrafSaveUticWorker.processBatch({}): commit Exception, [{}], {}.", dbmsJobResult.getJobIndex(), jobTable, e.getMessage());
|
|
|
- sqlSession.rollback(); // 예외 발생 시 롤백
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("[ERR] LinkTrafSaveUticWorker.processBatch({}): Exception, [{}], {}.", dbmsJobResult.getJobIndex(), jobTable, e.getMessage());
|
|
|
- }
|
|
|
-
|
|
|
- dbmsJobResult.setTotal(total);
|
|
|
- dbmsJobResult.setTarget(target);
|
|
|
- dbmsJobResult.setLink1(linkCounts[0]);
|
|
|
- dbmsJobResult.setLink2(linkCounts[1]);
|
|
|
- dbmsJobResult.setLink3(linkCounts[2]);
|
|
|
- dbmsJobResult.setLink4(linkCounts[3]);
|
|
|
- dbmsJobResult.setEffects(jobCnt);
|
|
|
- dbmsJobResult.setElapsedTime(System.currentTimeMillis() - start);
|
|
|
- if (this.isDebug) {
|
|
|
- log.info("[INS] {} | {}", LogUtils.elapsedLog(mapperName + dbmsJobResult.getJobIndex(), jobCnt, dbmsJobResult.getElapsedTime()), Thread.currentThread().getName());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-}
|