DbmsDataProcess.java 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package com.its.vds.process;
  2. import com.its.app.AppUtils;
  3. import com.its.vds.config.ThreadPoolInitializer;
  4. import com.its.vds.dao.mapper.UnitSystMapper;
  5. import com.its.vds.dao.mapper.VdsCtlrMapper;
  6. import com.its.vds.dao.mapper.VdsDtctMapper;
  7. import com.its.vds.dao.mapper.VdsStatMapper;
  8. import com.its.vds.dao.mapper.batch.VdsCtlrDao;
  9. import com.its.vds.dao.mapper.batch.VdsDtctDao;
  10. import com.its.vds.entity.TbUnitSystStts;
  11. import com.its.vds.entity.TbVdsCtlrStts;
  12. import com.its.vds.entity.voVdsDtctClct;
  13. import lombok.RequiredArgsConstructor;
  14. import lombok.extern.slf4j.Slf4j;
  15. import org.springframework.stereotype.Service;
  16. import javax.annotation.PostConstruct;
  17. import java.util.List;
  18. import java.util.concurrent.Executors;
  19. import java.util.concurrent.LinkedBlockingQueue;
  20. import java.util.concurrent.ThreadPoolExecutor;
  21. @Slf4j
  22. @RequiredArgsConstructor
  23. @Service
  24. public class DbmsDataProcess {
  25. private final LinkedBlockingQueue<DbmsData> DBMS_DATA_QUEUE = new LinkedBlockingQueue<>(1000);
  26. private final ThreadPoolExecutor taskExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
  27. private final DbmsDataAsyncTask asyncTask;
  28. private final VdsCtlrDao vdsCtlrDao;
  29. private final VdsDtctDao vdsDtctDao;
  30. private final UnitSystMapper unitSystMapper;
  31. private final VdsCtlrMapper vdsCtlrMapper;
  32. private final VdsDtctMapper vdsDtctMapper;
  33. private final VdsStatMapper vdsStatMapper;
  34. int MAX_CORE = Runtime.getRuntime().availableProcessors();
  35. @PostConstruct
  36. void init() {
  37. }
  38. public void run() {
  39. log.info("DbmsDataProcess.run: Start.");
  40. if (this.MAX_CORE < 8) {
  41. this.MAX_CORE = 8;
  42. }
  43. ThreadPoolInitializer poolInitializer = (ThreadPoolInitializer) AppUtils.getBean(ThreadPoolInitializer.class);
  44. int executePool = Math.max(this.MAX_CORE, poolInitializer.getWork());
  45. for (int ii = 0; ii < executePool; ii++) {
  46. log.info("DbmsDataProcess.Task: {}", ii);
  47. this.taskExecutor.execute(() -> {
  48. while (true) {
  49. try {
  50. DbmsData data = DBMS_DATA_QUEUE.take();
  51. if (data != null) {
  52. asyncTask.run(this, data);
  53. }
  54. else {
  55. log.error("DbmsDataProcess.Task: Received data null");
  56. }
  57. }
  58. catch (Exception e) {
  59. log.error("DbmsDataProcess.Task: Exception: {}", e.getMessage(), e);
  60. }
  61. }
  62. });
  63. }
  64. log.info("DbmsDataProcess.run: ..End.");
  65. }
  66. /**
  67. * 비동기 타스크에서 실행되는 함수.
  68. * 클래스가 달라야 비동기 타스크로 실행된다.
  69. * @param data
  70. */
  71. public void runJob(DbmsData data) {
  72. if (data.getType() == DbmsData.DBMS_DATA_UNIT_SYST_STTS) {
  73. TbUnitSystStts stts = (TbUnitSystStts) data.getData();
  74. this.unitSystMapper.updateUnitSystStts(stts); // 상태정보 업데이트
  75. if (data.isHistory()) {
  76. this.unitSystMapper.insertUnitSystSttsHs(stts); // 상태정보 이력저장
  77. }
  78. }
  79. else {
  80. process(data);
  81. }
  82. }
  83. public void process(DbmsData data) {
  84. try {
  85. int type = data.getType();
  86. switch(type) {
  87. case DbmsData.DBMS_DATA_CTLR_STTS:
  88. List<TbVdsCtlrStts> ctlrSttsList = (List<TbVdsCtlrStts>)data.getData();
  89. this.vdsCtlrDao.updateStts(ctlrSttsList, data.isHistory()); // 상태정보 업데이트
  90. if (data.isHistory()) {
  91. this.vdsCtlrDao.insertStts(ctlrSttsList); // 상태정보 이력저장
  92. }
  93. ctlrSttsList.clear();
  94. break;
  95. case DbmsData.DBMS_DATA_DTCT_CLCT:
  96. List<voVdsDtctClct> dtctClctList = (List<voVdsDtctClct>)data.getData();
  97. this.vdsDtctDao.updateClct(dtctClctList, data.isHistory()); // 수집정보 업데이트
  98. this.vdsDtctDao.insertClct(dtctClctList); // 수집정보 이력저장
  99. dtctClctList.clear();
  100. break;
  101. }
  102. } catch (Exception e) {
  103. log.error("DbmsJobProcess.process: Exception: {}", e.toString());
  104. }
  105. }
  106. /*
  107. * 작업큐에 데이터 추가
  108. */
  109. public boolean add(DbmsData data) {
  110. boolean offer = false;
  111. try {
  112. //offer => full -> return
  113. //add => full -> wait
  114. //큐가 차더라도 바로 리턴함.
  115. offer = DBMS_DATA_QUEUE.offer(data);
  116. if (!offer) {
  117. log.error("DbmsDataProcess.add: Queue Full Error, Size: {} EA", DBMS_DATA_QUEUE.size());
  118. }
  119. } catch (Exception e) {
  120. log.error("DbmsDataProcess.add: Exception: {}", e.getMessage(), e);
  121. }
  122. return offer;
  123. }
  124. }