|
|
@@ -2,19 +2,22 @@ package com.utic.center.utic.traf.server.service;
|
|
|
|
|
|
import com.utic.center.common.annotation.ProcessingElapsed;
|
|
|
import com.utic.center.common.service.AbstractProcessService;
|
|
|
+import com.utic.center.common.spring.SpringUtils;
|
|
|
import com.utic.center.common.utils.LogUtils;
|
|
|
import com.utic.center.utic.traf.server.dao.mapper.dwdb.DwdbTrafficMapper;
|
|
|
import com.utic.center.utic.traf.server.dao.mapper.utic.UticTrafficMapper;
|
|
|
+import com.utic.center.utic.traf.server.dao.repository.dwdb.DwdbTrafficRepository;
|
|
|
import com.utic.center.utic.traf.server.dao.repository.utic.UticTrafficRepository;
|
|
|
-import com.utic.center.utic.traf.server.dto.DbmsJobResult;
|
|
|
+import com.utic.center.utic.traf.server.dto.*;
|
|
|
import com.utic.center.utic.traf.server.repository.ApplicationRepository;
|
|
|
-import com.utic.center.utic.traf.server.service.worker.LinkTrafSaveDwdbCenterHistWorker;
|
|
|
-import com.utic.center.utic.traf.server.service.worker.LinkTrafSaveDwdbFusionLogWorker;
|
|
|
-import com.utic.center.utic.traf.server.service.worker.LinkTrafSaveDwdbMissingValueHistWorker;
|
|
|
+import com.utic.center.utic.traf.server.service.worker.WorkerUtils;
|
|
|
import lombok.Data;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
import lombok.ToString;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.ibatis.session.ExecutorType;
|
|
|
+import org.apache.ibatis.session.SqlSession;
|
|
|
+import org.apache.ibatis.session.SqlSessionFactory;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
@@ -36,68 +39,227 @@ public class LinkTrafSaveDwdbService implements AbstractProcessService {
|
|
|
private final UticTrafficRepository uticRepo;
|
|
|
private final UticTrafficMapper uticMapper;
|
|
|
private final DwdbTrafficMapper dwdbMapper;
|
|
|
+ private final DwdbTrafficRepository dwdbRepo;
|
|
|
|
|
|
- private ConcurrentHashMap<String, DbmsJobResult> mapData = new ConcurrentHashMap<>();
|
|
|
+ private SqlSessionFactory sqlSessionFactory;
|
|
|
|
|
|
@PostConstruct
|
|
|
public void init() {
|
|
|
// 초기화
|
|
|
- this.mapData.put(ApplicationRepository.TABLE_TRAFFIC_CENTER_HIST, DbmsJobResult.builder().tableName(ApplicationRepository.TABLE_TRAFFIC_CENTER_HIST).effects(0).elapsedTime(0).build());
|
|
|
- this.mapData.put(ApplicationRepository.TABLE_LINK_MISSING_VALUE_HIST, DbmsJobResult.builder().tableName(ApplicationRepository.TABLE_LINK_MISSING_VALUE_HIST).effects(0).elapsedTime(0).build());
|
|
|
- this.mapData.put(ApplicationRepository.TABLE_LINK_FUSION_LOG, DbmsJobResult.builder().tableName(ApplicationRepository.TABLE_LINK_FUSION_LOG).effects(0).elapsedTime(0).build());
|
|
|
+ this.sqlSessionFactory = (SqlSessionFactory) SpringUtils.getBean("dwdbSqlSessionFactory");
|
|
|
}
|
|
|
|
|
|
- private void initialize() {
|
|
|
- this.mapData.forEach((jobTable, result) -> {
|
|
|
- result.setEffects(0);
|
|
|
- result.setElapsedTime(0);
|
|
|
+ private void insertLinkFusionLog() {
|
|
|
+ LinkTrafFusionInfo fusionInfo = this.linkService.getFusionInfo();
|
|
|
+ log.info("[INF] DWDB 교통정보 처리결과: {}", fusionInfo);
|
|
|
+ this.dwdbRepo.insertLinkFusionLog(fusionInfo);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private void batchTrafficCenterHist(SqlSessionFactory sqlSessionFactory, DbmsBatchJobResult dbmsJobResult, LinkService linkRepo, String jobTable) {
|
|
|
+ final long start = System.currentTimeMillis();
|
|
|
+ final String mapperName = "insertTrafficCenterHist";
|
|
|
+ final int maxBatchSize = ApplicationRepository.DBMS_BATCH_SIZE;
|
|
|
+ int total, target;
|
|
|
+ int[] linkCounts = new int[4];
|
|
|
+ int jobCnt = 0;
|
|
|
+ total = target = linkCounts[0] = linkCounts[1] = linkCounts[2] = linkCounts[3] = 0;
|
|
|
+
|
|
|
+ try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) {
|
|
|
+ DwdbTrafficMapper mapper = sqlSession.getMapper(DwdbTrafficMapper.class);
|
|
|
+ for (String linkId : dbmsJobResult.getLinkIds()) {
|
|
|
+ LinkDto link = linkRepo.getLinkMap().get(linkId);
|
|
|
+ if (link == null) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ total++;
|
|
|
+
|
|
|
+ if (!link.getTrafFsn().getState().isState()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (link.getTrafFsn().getSpeed() == 0) {
|
|
|
+ // 운영자 입력 및 속도 정보가 0인 것은 제외
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ target++;
|
|
|
+
|
|
|
+ LinkTrafCenterDto trafDto = link.getTrafCenter();
|
|
|
+ try {
|
|
|
+ mapper.insertTrafficCenterHist(trafDto);
|
|
|
+ linkCounts[trafDto.getLinkLevel()-1]++;
|
|
|
+ jobCnt++;
|
|
|
+
|
|
|
+ if (jobCnt % maxBatchSize == 0) {
|
|
|
+ sqlSession.flushStatements();
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("[ERR] batchTrafficCenterHist({}): jobCnt({}), Exception, [{}], {}.",
|
|
|
+ dbmsJobResult.getJobIndex(), jobCnt, trafDto, e.getMessage());
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ sqlSession.flushStatements();
|
|
|
+ sqlSession.commit();
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("[ERR] batchTrafficCenterHist({}): Exception, [{}], {}.", dbmsJobResult.getJobIndex(), jobTable, e.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ dbmsJobResult.setTotal(total);
|
|
|
+ dbmsJobResult.setTarget(target);
|
|
|
+ dbmsJobResult.setLink1(linkCounts[0]);
|
|
|
+ dbmsJobResult.setLink2(linkCounts[1]);
|
|
|
+ dbmsJobResult.setLink3(linkCounts[2]);
|
|
|
+ dbmsJobResult.setLink4(linkCounts[3]);
|
|
|
+ dbmsJobResult.setEffects(jobCnt);
|
|
|
+ dbmsJobResult.setElapsedTime(System.currentTimeMillis() - start);
|
|
|
+ log.info("[INS] {} | {}", LogUtils.elapsedLog(mapperName + dbmsJobResult.getJobIndex(), jobCnt, dbmsJobResult.getElapsedTime()), Thread.currentThread().getName());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void insertTrafficCenterHist() {
|
|
|
+ final long start = System.currentTimeMillis();
|
|
|
+ final String jobTable = ApplicationRepository.TABLE_TRAFFIC_CENTER_HIST;
|
|
|
+
|
|
|
+ // 모든 링크 레벨에 대하여 작업처리
|
|
|
+ final ConcurrentHashMap<Integer, DbmsBatchJobResult> mapData = new ConcurrentHashMap<>();
|
|
|
+ final int prcsThreadCount = ApplicationRepository.PRCS_THREAD_COUNT;
|
|
|
+ final int threadPoolSize = WorkerUtils.allocateWorkerLinkJobs(mapData, prcsThreadCount, this.linkService.getLinkLists());
|
|
|
+ final ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
|
|
|
+
|
|
|
+ mapData.forEach((key, dbmsJobResult) -> {
|
|
|
+ executorService.execute(() -> batchTrafficCenterHist(this.sqlSessionFactory, dbmsJobResult, this.linkService, jobTable));
|
|
|
});
|
|
|
|
|
|
- long count = this.linkService.getLinkMap().values().parallelStream()
|
|
|
- .filter(link -> link.getLinkLevel() == 1)
|
|
|
- .filter(link -> link.getTrafFsn().getSpeed() == 0)
|
|
|
- .count();
|
|
|
- this.linkService.getFusionInfo().setZeroSpdCnt((int)count);
|
|
|
+ try {
|
|
|
+ executorService.shutdown();
|
|
|
+ if (!executorService.awaitTermination(100, TimeUnit.SECONDS)) {
|
|
|
+ log.warn("[WAN] insertTrafficCenterHist: Timeout while waiting for tasks to finish, [{}].", jobTable);
|
|
|
+ executorService.shutdownNow();
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ log.error("[ERR] insertTrafficCenterHist: InterruptedException, [{}], {}.", jobTable, e.getMessage());
|
|
|
+ executorService.shutdownNow();
|
|
|
+ }
|
|
|
+
|
|
|
+ int link1, link2, link3, link4;
|
|
|
+ long elapsedTime = 0;
|
|
|
+ link1 = link2 = link3 = link4 = 0;
|
|
|
+ for (Map.Entry<Integer, DbmsBatchJobResult> entry : mapData.entrySet()) {
|
|
|
+ DbmsBatchJobResult result = entry.getValue();
|
|
|
+ elapsedTime = Math.max(elapsedTime, result.getElapsedTime());
|
|
|
+ link1 += result.getLink1();
|
|
|
+ link2 += result.getLink2();
|
|
|
+ link3 += result.getLink3();
|
|
|
+ link4 += result.getLink4();
|
|
|
+ }
|
|
|
|
|
|
- // 모든 링크에 대해 센터 소통정보 생성
|
|
|
- this.linkService.makeTrafficCenterDto();
|
|
|
+ final int resultCount = link1 + link2 + link3 + link4;
|
|
|
+ final long totalTime = System.currentTimeMillis() - start;
|
|
|
+
|
|
|
+ log.info("[INF] {}", LogUtils.elapsedLog("레벨1 " + jobTable, link1, elapsedTime));
|
|
|
+ log.info("[INF] {}", LogUtils.elapsedLog("레벨2 " + jobTable, link2, elapsedTime));
|
|
|
+ log.info("[INF] {}", LogUtils.elapsedLog("레벨3 " + jobTable, link3, elapsedTime));
|
|
|
+ log.info("[INF] {}", LogUtils.elapsedLog("레벨4 " + jobTable, link4, elapsedTime));
|
|
|
+ log.info("[INF] {}", LogUtils.elapsedLog(jobTable, resultCount, totalTime));
|
|
|
}
|
|
|
|
|
|
- @ProcessingElapsed(type="TRAFFIC", name="교통정보 DWDB 저장", starting = false)
|
|
|
- @Override
|
|
|
- public boolean processing() {
|
|
|
- final int THREAD_POOL_SIZE = 3; // 작업 스레드 개수 설정
|
|
|
- final int TIMEOUT_SECONDS = 120; // 최대 대기 시간 설정
|
|
|
+ private void batchLinkMissingValueHist(SqlSessionFactory sqlSessionFactory, DbmsBatchJobResult dbmsJobResult, LinkService linkRepo, String jobTable) {
|
|
|
+ final long start = System.currentTimeMillis();
|
|
|
+ final String mapperName = "insertLinkMissingValueHist";
|
|
|
+ final int maxBatchSize = ApplicationRepository.DBMS_BATCH_SIZE;
|
|
|
+ int total = 0;
|
|
|
+ int jobCnt = 0;
|
|
|
+
|
|
|
+ String exeDate = ApplicationRepository.trafPrcsTime.getPrcsTime().substring(0, 12);
|
|
|
+ try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) {
|
|
|
+ DwdbTrafficMapper mapper = sqlSession.getMapper(DwdbTrafficMapper.class);
|
|
|
+ for (String linkId : dbmsJobResult.getLinkIds()) {
|
|
|
+ LinkDto link = linkRepo.getLinkMap().get(linkId);
|
|
|
+ if (link == null) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ LinkMissingValueHistDto valueHist = linkRepo.getMissingValueHist(link, exeDate);
|
|
|
+ if (valueHist == null) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ total++;
|
|
|
+
|
|
|
+ try {
|
|
|
+ mapper.insertLinkMissingValueHist(valueHist);
|
|
|
+ jobCnt++;
|
|
|
+
|
|
|
+ if (jobCnt % maxBatchSize == 0) {
|
|
|
+ sqlSession.flushStatements();
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("[ERR] batchLinkMissingValueHist({}): jobCnt({}), Exception, [{}], {}.",
|
|
|
+ dbmsJobResult.getJobIndex(), jobCnt, valueHist, e.getMessage());
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ sqlSession.flushStatements();
|
|
|
+ sqlSession.commit();
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("[ERR] batchLinkMissingValueHist({}): Exception, [{}], {}.", dbmsJobResult.getJobIndex(), jobTable, e.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ dbmsJobResult.setTotal(total);
|
|
|
+ dbmsJobResult.setTarget(total);
|
|
|
+ dbmsJobResult.setLink1(jobCnt);
|
|
|
+ dbmsJobResult.setLink2(0);
|
|
|
+ dbmsJobResult.setLink3(0);
|
|
|
+ dbmsJobResult.setLink4(0);
|
|
|
+ dbmsJobResult.setEffects(jobCnt);
|
|
|
+ dbmsJobResult.setElapsedTime(System.currentTimeMillis() - start);
|
|
|
+ log.info("[INS] {} | {}", LogUtils.elapsedLog(mapperName + dbmsJobResult.getJobIndex(), jobCnt, dbmsJobResult.getElapsedTime()), Thread.currentThread().getName());
|
|
|
+ }
|
|
|
|
|
|
- // 데이터 초기화
|
|
|
- initialize();
|
|
|
+ private void insertLinkMissingValueHist() {
|
|
|
+ final long start = System.currentTimeMillis();
|
|
|
+ final String jobTable = ApplicationRepository.TABLE_LINK_MISSING_VALUE_HIST;
|
|
|
|
|
|
- // 센터 교통정보 캐시 테이블 초기화
|
|
|
- this.uticRepo.truncateTrafficCenterCache();
|
|
|
+ // 레벨1 링크에 대해서만, 스레드는 4개로 고정
|
|
|
+ final ConcurrentHashMap<Integer, DbmsBatchJobResult> mapData = new ConcurrentHashMap<>();
|
|
|
+ final int prcsThreadCount = 4;
|
|
|
+ final int threadPoolSize = WorkerUtils.allocateWorkerLinkJobs(mapData, prcsThreadCount, this.linkService.getLink1Lists());
|
|
|
+ final ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
|
|
|
|
|
|
- ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
|
|
|
- executorService.execute(new LinkTrafSaveDwdbCenterHistWorker(this.linkService, this.mapData.get(ApplicationRepository.TABLE_TRAFFIC_CENTER_HIST)));
|
|
|
- executorService.execute(new LinkTrafSaveDwdbMissingValueHistWorker(this.linkService, this.mapData.get(ApplicationRepository.TABLE_LINK_MISSING_VALUE_HIST)));
|
|
|
- executorService.execute(new LinkTrafSaveDwdbFusionLogWorker(this.linkService, this.mapData.get(ApplicationRepository.TABLE_LINK_FUSION_LOG), this.dwdbMapper));
|
|
|
+ mapData.forEach((key, dbmsJobResult) -> {
|
|
|
+ executorService.execute(() -> batchLinkMissingValueHist(this.sqlSessionFactory, dbmsJobResult, this.linkService, jobTable));
|
|
|
+ });
|
|
|
|
|
|
try {
|
|
|
executorService.shutdown();
|
|
|
- if (!executorService.awaitTermination(TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
|
|
|
+ if (!executorService.awaitTermination(100, TimeUnit.SECONDS)) {
|
|
|
+ log.warn("[WAN] insertLinkMissingValueHist: Timeout while waiting for tasks to finish, [{}].", jobTable);
|
|
|
executorService.shutdownNow();
|
|
|
}
|
|
|
} catch (InterruptedException e) {
|
|
|
- log.error("[ERR] LinkTrafCollectService.processing: LinkCollectWorker InterruptedException, {}", e.getMessage());
|
|
|
+ log.error("[ERR] insertLinkMissingValueHist: InterruptedException, [{}], {}.", jobTable, e.getMessage());
|
|
|
executorService.shutdownNow();
|
|
|
}
|
|
|
|
|
|
- // 모든 작업이 완료된 후 결과를 출력
|
|
|
+ int resultCount = 0;
|
|
|
long elapsedTime = 0;
|
|
|
- for (Map.Entry<String, DbmsJobResult> entry : this.mapData.entrySet()) {
|
|
|
- DbmsJobResult result = entry.getValue();
|
|
|
+ for (Map.Entry<Integer, DbmsBatchJobResult> entry : mapData.entrySet()) {
|
|
|
+ DbmsBatchJobResult result = entry.getValue();
|
|
|
elapsedTime = Math.max(elapsedTime, result.getElapsedTime());
|
|
|
- log.info("[INF] {}", LogUtils.elapsedLog(result.getTableName(), result.getEffects(), result.getElapsedTime()));
|
|
|
+ resultCount += result.getLink1();
|
|
|
}
|
|
|
|
|
|
+ final long totalTime = System.currentTimeMillis() - start;
|
|
|
+
|
|
|
+ log.info("[INF] {}", LogUtils.elapsedLog(jobTable, resultCount, totalTime));
|
|
|
+ }
|
|
|
+
|
|
|
+ @ProcessingElapsed(type="DWDB", name="교통정보 DWDB 저장", starting = false)
|
|
|
+ @Override
|
|
|
+ public boolean processing() {
|
|
|
+
|
|
|
+ insertLinkFusionLog();
|
|
|
+ insertTrafficCenterHist();
|
|
|
+ insertLinkMissingValueHist();
|
|
|
+
|
|
|
return true;
|
|
|
}
|
|
|
|