|
|
@@ -6,10 +6,9 @@ import com.utic.center.common.spring.SpringUtils;
|
|
|
import com.utic.center.common.utils.LogUtils;
|
|
|
import com.utic.center.utic.traf.server.config.TraceConfig;
|
|
|
import com.utic.center.utic.traf.server.dao.mapper.dwdb.DwdbTrafficMapper;
|
|
|
-import com.utic.center.utic.traf.server.dao.mapper.utic.UticTrafficMapper;
|
|
|
-import com.utic.center.utic.traf.server.dao.repository.dwdb.DwdbTrafficRepository;
|
|
|
-import com.utic.center.utic.traf.server.dao.repository.utic.UticTrafficRepository;
|
|
|
-import com.utic.center.utic.traf.server.dto.*;
|
|
|
+import com.utic.center.utic.traf.server.dto.DbmsBatchJobResult;
|
|
|
+import com.utic.center.utic.traf.server.dto.LinkDto;
|
|
|
+import com.utic.center.utic.traf.server.dto.LinkTrafCenterDto;
|
|
|
import com.utic.center.utic.traf.server.repository.ApplicationRepository;
|
|
|
import com.utic.center.utic.traf.server.service.worker.WorkerUtils;
|
|
|
import lombok.Data;
|
|
|
@@ -22,14 +21,11 @@ import org.apache.ibatis.session.SqlSessionFactory;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
-import java.util.List;
|
|
|
import java.util.Map;
|
|
|
-import java.util.Objects;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
-import java.util.stream.Collectors;
|
|
|
|
|
|
@Slf4j
|
|
|
@Data
|
|
|
@@ -40,13 +36,9 @@ public class LinkTrafSaveDwdbService implements AbstractProcessService {
|
|
|
|
|
|
private final TraceConfig trace;
|
|
|
private final LinkService linkService;
|
|
|
- private final LinkTrafSaveCenterService linkTrafSaveCenterService;
|
|
|
- private final UticTrafficRepository uticRepo;
|
|
|
- private final DwdbTrafficRepository dwdbRepo;
|
|
|
- private final UticTrafficMapper uticMapper;
|
|
|
- private final DwdbTrafficMapper dwdbMapper;
|
|
|
|
|
|
private SqlSessionFactory sqlSessionFactory;
|
|
|
+ private String histMonth;
|
|
|
|
|
|
@PostConstruct
|
|
|
public void init() {
|
|
|
@@ -54,12 +46,6 @@ public class LinkTrafSaveDwdbService implements AbstractProcessService {
|
|
|
this.sqlSessionFactory = (SqlSessionFactory) SpringUtils.getBean("dwdbSqlSessionFactory");
|
|
|
}
|
|
|
|
|
|
- private void insertLinkFusionLog() {
|
|
|
- LinkTrafFusionInfo fusionInfo = this.linkService.getFusionInfo();
|
|
|
- this.dwdbRepo.insertLinkFusionLog(fusionInfo);
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
private void batchTrafficCenterHist(SqlSessionFactory sqlSessionFactory, DbmsBatchJobResult dbmsJobResult, LinkService linkRepo, String jobTable) {
|
|
|
final long start = System.currentTimeMillis();
|
|
|
final String mapperName = "insertTrafficCenterHist";
|
|
|
@@ -90,7 +76,7 @@ public class LinkTrafSaveDwdbService implements AbstractProcessService {
|
|
|
|
|
|
LinkTrafCenterDto trafDto = link.getTrafCenter();
|
|
|
try {
|
|
|
- mapper.insertTrafficCenterHist(trafDto);
|
|
|
+ mapper.insertTrafficCenterHist(this.histMonth, trafDto);
|
|
|
linkCounts[trafDto.getLinkLevel()-1]++;
|
|
|
jobCnt++;
|
|
|
|
|
|
@@ -124,7 +110,7 @@ public class LinkTrafSaveDwdbService implements AbstractProcessService {
|
|
|
|
|
|
private void insertTrafficCenterHist() {
|
|
|
final long start = System.currentTimeMillis();
|
|
|
- final String jobTable = ApplicationRepository.TABLE_TRAFFIC_CENTER_HIST;
|
|
|
+ final String jobTable = ApplicationRepository.TABLE_TRAFFIC_CENTER_HIST + this.histMonth + "@DWDB";
|
|
|
|
|
|
// 모든 링크 레벨에 대하여 작업처리
|
|
|
// this.linkService.getLinkLists()
|
|
|
@@ -137,6 +123,10 @@ public class LinkTrafSaveDwdbService implements AbstractProcessService {
|
|
|
final ConcurrentHashMap<Integer, DbmsBatchJobResult> mapData = new ConcurrentHashMap<>();
|
|
|
final int prcsThreadCount = ApplicationRepository.PRCS_THREAD_COUNT;
|
|
|
final int threadPoolSize = WorkerUtils.allocateWorkerLinkJobs(mapData, prcsThreadCount, this.linkService.getValidTrafLinkList());
|
|
|
+ if (threadPoolSize < 0) {
|
|
|
+ log.warn("[WAN] insertTrafficCenterHist: getValidTrafLinkList no data, job return: [{}].", jobTable);
|
|
|
+ return;
|
|
|
+ }
|
|
|
final ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
|
|
|
|
|
|
mapData.forEach((key, dbmsJobResult) -> {
|
|
|
@@ -175,128 +165,12 @@ public class LinkTrafSaveDwdbService implements AbstractProcessService {
|
|
|
log.info("[INF] {}", LogUtils.elapsedLog(jobTable, resultCount, System.currentTimeMillis() - start));
|
|
|
}
|
|
|
|
|
|
- private void batchLinkMissingValueHist(SqlSessionFactory sqlSessionFactory, DbmsBatchJobResult dbmsJobResult, LinkService linkService, String jobTable) {
|
|
|
- final long start = System.currentTimeMillis();
|
|
|
- final String mapperName = "insertLinkMissingValueHist";
|
|
|
- final int maxBatchSize = ApplicationRepository.DBMS_BATCH_SIZE;
|
|
|
- int total = 0;
|
|
|
- int jobCnt = 0;
|
|
|
-
|
|
|
- String exeDate = ApplicationRepository.trafPrcsTime.getPrcsTime().substring(0, 12);
|
|
|
- try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) {
|
|
|
- DwdbTrafficMapper mapper = sqlSession.getMapper(DwdbTrafficMapper.class);
|
|
|
- for (String linkId : dbmsJobResult.getLinkIds()) {
|
|
|
- LinkDto link = linkService.getLinkMap().get(linkId);
|
|
|
- if (link == null) {
|
|
|
- log.error("[ERR] LinkTrafSaveDwdbService.batchLinkMissingValueHist: {} link not found.", linkId);
|
|
|
- continue;
|
|
|
- }
|
|
|
- LinkMissingValueHistDto valueHist = linkService.getMissingValueHist(link, exeDate);
|
|
|
- if (valueHist == null) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- total++;
|
|
|
-
|
|
|
- try {
|
|
|
- mapper.insertLinkMissingValueHist(valueHist);
|
|
|
- jobCnt++;
|
|
|
-
|
|
|
- if (jobCnt % maxBatchSize == 0) {
|
|
|
- sqlSession.flushStatements();
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("[ERR] batchLinkMissingValueHist({}): jobCnt({}), Exception, [{}], {}.",
|
|
|
- dbmsJobResult.getJobIndex(), jobCnt, valueHist, e.getMessage());
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- sqlSession.flushStatements();
|
|
|
- sqlSession.commit();
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("[ERR] batchLinkMissingValueHist({}): Exception, [{}], {}.", dbmsJobResult.getJobIndex(), jobTable, e.getMessage());
|
|
|
- }
|
|
|
-
|
|
|
- dbmsJobResult.setTotal(total);
|
|
|
- dbmsJobResult.setTarget(total);
|
|
|
- dbmsJobResult.setLink1(jobCnt);
|
|
|
- dbmsJobResult.setLink2(0);
|
|
|
- dbmsJobResult.setLink3(0);
|
|
|
- dbmsJobResult.setLink4(0);
|
|
|
- dbmsJobResult.setEffects(jobCnt);
|
|
|
- dbmsJobResult.setElapsedTime(System.currentTimeMillis() - start);
|
|
|
- if (this.trace.isDebug()) {
|
|
|
- log.info("[INS] {} | {}", LogUtils.elapsedLog(mapperName + dbmsJobResult.getJobIndex(), jobCnt, dbmsJobResult.getElapsedTime()), Thread.currentThread().getName());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void insertLinkMissingValueHist() {
|
|
|
- final long start = System.currentTimeMillis();
|
|
|
- final String jobTable = ApplicationRepository.TABLE_LINK_MISSING_VALUE_HIST;
|
|
|
-
|
|
|
- // 레벨1 링크에 대해서만, 스레드는 4개로 고정
|
|
|
- // this.linkService.getLink1Lists()
|
|
|
-// final List<String> level1LinkList = this.linkService.getLinkMap().entrySet().stream()
|
|
|
-// .filter(entry -> entry.getValue().getLinkLevel() == 1)
|
|
|
-// .filter(entry -> entry.getValue().isPrcsPtrnMissing())
|
|
|
-// .filter(entry -> entry.getValue().getTrafFsn().getState().getCrtDataType() != LinkTrafState.TRAFFIC_CRT_DATA_TYPE.CRT_DATA_NORMAL)
|
|
|
-// .filter(entry -> !"O".equals(entry.getValue().getTrafFsn().getState().getMissValueYn()))
|
|
|
-// .map(Map.Entry::getKey)
|
|
|
-// .collect(Collectors.toList());
|
|
|
-// final List<String> level1LinkList = this.linkService.getLink1Lists().stream()
|
|
|
-// .map(this.linkService.getLinkMap()::get)
|
|
|
-// .filter(Objects::nonNull)
|
|
|
-// .filter(LinkDto::isPrcsPtrnMissing)
|
|
|
-// .filter(dto -> dto.getTrafFsn().getState().getCrtDataType() != LinkTrafState.TRAFFIC_CRT_DATA_TYPE.CRT_DATA_NORMAL)
|
|
|
-// .filter(dto -> !"O".equals(dto.getTrafFsn().getState().getMissValueYn()))
|
|
|
-// .map(LinkDto::getLinkId)
|
|
|
-// .collect(Collectors.toList());
|
|
|
- final List<String> level1LinkList = this.linkService.getLink1Lists().parallelStream()
|
|
|
- .map(this.linkService.getLinkMap()::get)
|
|
|
- .filter(Objects::nonNull)
|
|
|
- .filter(LinkDto::isPrcsPtrnMissing)
|
|
|
- .filter(dto -> dto.getTrafFsn().getState().getCrtDataType() != LinkTrafState.TRAFFIC_CRT_DATA_TYPE.CRT_DATA_NORMAL)
|
|
|
- .filter(dto -> !"O".equals(dto.getTrafFsn().getState().getMissValueYn()))
|
|
|
- .map(LinkDto::getLinkId)
|
|
|
- .collect(Collectors.toList());
|
|
|
-
|
|
|
- final ConcurrentHashMap<Integer, DbmsBatchJobResult> mapData = new ConcurrentHashMap<>();
|
|
|
- final int prcsThreadCount = 4;
|
|
|
- final int threadPoolSize = WorkerUtils.allocateWorkerLinkJobs(mapData, prcsThreadCount, level1LinkList);
|
|
|
- final ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
|
|
|
-
|
|
|
- mapData.forEach((key, dbmsJobResult) -> {
|
|
|
- executorService.execute(() -> batchLinkMissingValueHist(this.sqlSessionFactory, dbmsJobResult, this.linkService, jobTable));
|
|
|
- });
|
|
|
-
|
|
|
- try {
|
|
|
- executorService.shutdown();
|
|
|
- if (!executorService.awaitTermination(100, TimeUnit.SECONDS)) {
|
|
|
- log.warn("[WAN] insertLinkMissingValueHist: Timeout while waiting for tasks to finish, [{}].", jobTable);
|
|
|
- executorService.shutdownNow();
|
|
|
- }
|
|
|
- } catch (InterruptedException e) {
|
|
|
- log.error("[ERR] insertLinkMissingValueHist: InterruptedException, [{}], {}.", jobTable, e.getMessage());
|
|
|
- executorService.shutdownNow();
|
|
|
- }
|
|
|
-
|
|
|
- int resultCount = 0;
|
|
|
- long elapsedTime = 0;
|
|
|
- for (Map.Entry<Integer, DbmsBatchJobResult> entry : mapData.entrySet()) {
|
|
|
- DbmsBatchJobResult result = entry.getValue();
|
|
|
- elapsedTime = Math.max(elapsedTime, result.getElapsedTime());
|
|
|
- resultCount += result.getLink1();
|
|
|
- }
|
|
|
-
|
|
|
- log.info("[INF] {}", LogUtils.elapsedLog(jobTable, resultCount, System.currentTimeMillis() - start));
|
|
|
- }
|
|
|
-
|
|
|
@ProcessingElapsed(type="DWDB", name="교통정보 DWDB 저장", starting = false)
|
|
|
@Override
|
|
|
public boolean processing() {
|
|
|
+ this.histMonth = ApplicationRepository.trafPrcsTime.getPrcsTime().substring(4, 6);
|
|
|
|
|
|
- insertLinkFusionLog();
|
|
|
insertTrafficCenterHist();
|
|
|
- insertLinkMissingValueHist();
|
|
|
|
|
|
return true;
|
|
|
}
|