|
|
@@ -21,11 +21,13 @@ import org.apache.ibatis.session.SqlSessionFactory;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
+import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
@Slf4j
|
|
|
@Data
|
|
|
@@ -37,9 +39,9 @@ public class LinkTrafSaveDwdbService implements AbstractProcessService {
|
|
|
private final LinkService linkService;
|
|
|
private final LinkTrafSaveCenterService linkTrafSaveCenterService;
|
|
|
private final UticTrafficRepository uticRepo;
|
|
|
+ private final DwdbTrafficRepository dwdbRepo;
|
|
|
private final UticTrafficMapper uticMapper;
|
|
|
private final DwdbTrafficMapper dwdbMapper;
|
|
|
- private final DwdbTrafficRepository dwdbRepo;
|
|
|
|
|
|
private SqlSessionFactory sqlSessionFactory;
|
|
|
|
|
|
@@ -51,7 +53,6 @@ public class LinkTrafSaveDwdbService implements AbstractProcessService {
|
|
|
|
|
|
private void insertLinkFusionLog() {
|
|
|
LinkTrafFusionInfo fusionInfo = this.linkService.getFusionInfo();
|
|
|
- log.info("[INF] DWDB 교통정보 처리결과: {}", fusionInfo);
|
|
|
this.dwdbRepo.insertLinkFusionLog(fusionInfo);
|
|
|
}
|
|
|
|
|
|
@@ -120,9 +121,15 @@ public class LinkTrafSaveDwdbService implements AbstractProcessService {
|
|
|
final String jobTable = ApplicationRepository.TABLE_TRAFFIC_CENTER_HIST;
|
|
|
|
|
|
// 모든 링크 레벨에 대하여 작업처리
|
|
|
+ // this.linkService.getLinkLists()
|
|
|
+ final List<String> linkList = this.linkService.getLinkMap().entrySet().stream()
|
|
|
+ .filter(entry -> entry.getValue().getTrafFsn().getState().isState())
|
|
|
+ .filter(entry -> entry.getValue().getTrafFsn().getSpeed() > 0)
|
|
|
+ .map(Map.Entry::getKey)
|
|
|
+ .collect(Collectors.toList());
|
|
|
final ConcurrentHashMap<Integer, DbmsBatchJobResult> mapData = new ConcurrentHashMap<>();
|
|
|
final int prcsThreadCount = ApplicationRepository.PRCS_THREAD_COUNT;
|
|
|
- final int threadPoolSize = WorkerUtils.allocateWorkerLinkJobs(mapData, prcsThreadCount, this.linkService.getLinkLists());
|
|
|
+ final int threadPoolSize = WorkerUtils.allocateWorkerLinkJobs(mapData, prcsThreadCount, linkList);
|
|
|
final ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
|
|
|
|
|
|
mapData.forEach((key, dbmsJobResult) -> {
|
|
|
@@ -162,7 +169,7 @@ public class LinkTrafSaveDwdbService implements AbstractProcessService {
|
|
|
log.info("[INF] {}", LogUtils.elapsedLog(jobTable, resultCount, totalTime));
|
|
|
}
|
|
|
|
|
|
- private void batchLinkMissingValueHist(SqlSessionFactory sqlSessionFactory, DbmsBatchJobResult dbmsJobResult, LinkService linkRepo, String jobTable) {
|
|
|
+ private void batchLinkMissingValueHist(SqlSessionFactory sqlSessionFactory, DbmsBatchJobResult dbmsJobResult, LinkService linkService, String jobTable) {
|
|
|
final long start = System.currentTimeMillis();
|
|
|
final String mapperName = "insertLinkMissingValueHist";
|
|
|
final int maxBatchSize = ApplicationRepository.DBMS_BATCH_SIZE;
|
|
|
@@ -173,12 +180,11 @@ public class LinkTrafSaveDwdbService implements AbstractProcessService {
|
|
|
try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) {
|
|
|
DwdbTrafficMapper mapper = sqlSession.getMapper(DwdbTrafficMapper.class);
|
|
|
for (String linkId : dbmsJobResult.getLinkIds()) {
|
|
|
- LinkDto link = linkRepo.getLinkMap().get(linkId);
|
|
|
+ LinkDto link = linkService.getLinkMap().get(linkId);
|
|
|
if (link == null) {
|
|
|
continue;
|
|
|
}
|
|
|
-
|
|
|
- LinkMissingValueHistDto valueHist = linkRepo.getMissingValueHist(link, exeDate);
|
|
|
+ LinkMissingValueHistDto valueHist = linkService.getMissingValueHist(link, exeDate);
|
|
|
if (valueHist == null) {
|
|
|
continue;
|
|
|
}
|
|
|
@@ -219,9 +225,18 @@ public class LinkTrafSaveDwdbService implements AbstractProcessService {
|
|
|
final String jobTable = ApplicationRepository.TABLE_LINK_MISSING_VALUE_HIST;
|
|
|
|
|
|
// 레벨1 링크에 대해서만, 스레드는 4개로 고정
|
|
|
+ // this.linkService.getLink1Lists()
|
|
|
+ final List<String> level1LinkList = this.linkService.getLinkMap().entrySet().stream()
|
|
|
+ .filter(entry -> entry.getValue().getLinkLevel() == 1)
|
|
|
+ .filter(entry -> entry.getValue().isPrcsPtrnMissing())
|
|
|
+ .filter(entry -> entry.getValue().getTrafFsn().getState().getCrtDataType() != LinkTrafState.TRAFFIC_CRT_DATA_TYPE.CRT_DATA_NORMAL)
|
|
|
+ .filter(entry -> !"O".equals(entry.getValue().getTrafFsn().getState().getMissValueYn()))
|
|
|
+ .map(Map.Entry::getKey)
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
final ConcurrentHashMap<Integer, DbmsBatchJobResult> mapData = new ConcurrentHashMap<>();
|
|
|
final int prcsThreadCount = 4;
|
|
|
- final int threadPoolSize = WorkerUtils.allocateWorkerLinkJobs(mapData, prcsThreadCount, this.linkService.getLink1Lists());
|
|
|
+ final int threadPoolSize = WorkerUtils.allocateWorkerLinkJobs(mapData, prcsThreadCount, level1LinkList);
|
|
|
final ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
|
|
|
|
|
|
mapData.forEach((key, dbmsJobResult) -> {
|