WorkDataProcess.java 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package com.its.pis.process;
  2. import com.its.app.AppUtils;
  3. import com.its.pis.config.ThreadPoolInitializer;
  4. import com.its.pis.dao.mapper.PrkPlceMapper;
  5. import com.its.pis.dao.mapper.UnitSystMapper;
  6. import com.its.pis.dao.mapper.batch.PisInfrDao;
  7. import com.its.pis.dao.mapper.batch.PrkPlceDao;
  8. import lombok.RequiredArgsConstructor;
  9. import lombok.extern.slf4j.Slf4j;
  10. import org.springframework.stereotype.Service;
  11. import javax.annotation.PostConstruct;
  12. import java.util.concurrent.Executors;
  13. import java.util.concurrent.LinkedBlockingQueue;
  14. import java.util.concurrent.ThreadPoolExecutor;
  15. @Slf4j
  16. @RequiredArgsConstructor
  17. @Service
  18. public class WorkDataProcess {
  19. private final LinkedBlockingQueue<WorkData> WORK_DATA_QUEUE = new LinkedBlockingQueue<>(1000);
  20. private final ThreadPoolExecutor taskExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
  21. private final WorkDataAsyncTask asyncTask;
  22. private final PisInfrDao pisInfrDao;
  23. private final PrkPlceDao prkPlceDao;
  24. private UnitSystMapper unitSystMapper;
  25. private PrkPlceMapper prkPlceMapper;
  26. int MAX_CORE = Runtime.getRuntime().availableProcessors();
  27. @PostConstruct
  28. void init() {
  29. this.unitSystMapper = (UnitSystMapper) AppUtils.getBean(UnitSystMapper.class);
  30. this.prkPlceMapper = (PrkPlceMapper) AppUtils.getBean(PrkPlceMapper.class);
  31. }
  32. public void run() {
  33. log.info("WorkDataProcess.run: Start.");
  34. if (this.MAX_CORE < 8) {
  35. this.MAX_CORE = 8;
  36. }
  37. ThreadPoolInitializer poolInitializer = (ThreadPoolInitializer) AppUtils.getBean(ThreadPoolInitializer.class);
  38. int executePool = Math.max(this.MAX_CORE, poolInitializer.getWork());
  39. for (int ii = 0; ii < executePool; ii++) {
  40. log.info("WorkDataProcess.Task: {}", ii);
  41. this.taskExecutor.execute(() -> {
  42. while (true) {
  43. try {
  44. WorkData data = WORK_DATA_QUEUE.take();
  45. if (data != null) {
  46. asyncTask.run(this, data);
  47. }
  48. else {
  49. log.error("WorkDataProcess.Task: Received data null");
  50. }
  51. }
  52. catch (Exception e) {
  53. log.error("WorkDataProcess.Task: Exception: {}", e.getMessage(), e);
  54. }
  55. }
  56. });
  57. }
  58. log.info("WorkDataProcess.run: ..End.");
  59. }
  60. /**
  61. * 비동기 타스크에서 실행되는 함수.
  62. * 클래스가 달라야 비동기 타스크로 실행된다.
  63. * @param data
  64. */
  65. public void runJob(WorkData data) {
  66. log.error("WorkDataProcess.runJob: {}", Thread.currentThread().getName());
  67. process(data);
  68. }
  69. public void process(WorkData data) {
  70. // try {
  71. // int type = data.getType();
  72. // TbPisInfr pis = data.getObj();
  73. // switch(type) {
  74. // case DbmsData.DBMS_DATA_RL_TIME:
  75. // if (pis != null) {
  76. // String DATA_COLCT_TIME = SysUtils.getSysTime();
  77. // C2FMessage<PrkPlceRlTimeResponseInfo> rlTimeInfo = (C2FMessage<PrkPlceRlTimeResponseInfo>) data.getData();
  78. // if (rlTimeInfo != null) {
  79. // PrkPlceRlTimeResponseInfo info = rlTimeInfo.getData().getPayload();
  80. // TbPrkPlceRt rt = TbPrkPlceRt.toEntity(pis.getPIS_NMBR(), DATA_COLCT_TIME, info);
  81. //
  82. // this.prkPlceMapper.updatePrkPlceRlTime(rt);
  83. // this.prkPlceDao.updateRlTimeFlr(pis.getPIS_NMBR(), DATA_COLCT_TIME, info.getFlr_info());
  84. // this.prkPlceDao.updateRlTimeAr(pis.getPIS_NMBR(), info.getAr_info());
  85. // this.prkPlceDao.updateRlTimePrv(pis.getPIS_NMBR(), info.getPrvuse_prkar_info());
  86. // this.prkPlceDao.updateRlTimeDev(pis.getPIS_NMBR(), info.getPrk_colct_device_info());
  87. // }
  88. // else {
  89. // log.error("RL TIME Data NULL...............");
  90. // }
  91. // }
  92. // else {
  93. // log.error("PIS Object null");
  94. // }
  95. // break;
  96. // case DbmsData.DBMS_DATA_RL_TIME_HS:
  97. // TbPrkPlceRtHs rtHs = (TbPrkPlceRtHs)data.getData();
  98. // int result = this.prkPlceMapper.insertPrkPlceRlTimeHs(rtHs);
  99. // log.error("DATA_TYPE_RL_TIME_HS: {}, {} EA.", rtHs.getCrtnDt(), result);
  100. // break;
  101. // case DbmsData.DBMS_DATA_PIS_STTS:
  102. // List<TbPisInfrStts> pisSttsList = (List<TbPisInfrStts>)data.getData();
  103. // this.pisInfrDao.updateStts(pisSttsList, data.isHistory());
  104. // if (data.isHistory()) {
  105. // // 주차정보시스템 상태정보 이력은 저장하지 않는다.
  106. // //this.pisInfrDao.insertStts(pisSttsList);
  107. // }
  108. // pisSttsList.clear();
  109. // break;
  110. // case DbmsData.DBMS_DATA_PRK_PLCE_STTS:
  111. // List<TbPrkPlceStts> prltSttsList = (List<TbPrkPlceStts>)data.getData();
  112. // this.prkPlceDao.updatePrkPlceStts(prltSttsList, data.isHistory());
  113. // if (data.isHistory()) {
  114. // int res = this.prkPlceDao.insertPrkPlceSttsHs(prltSttsList);
  115. // log.error("DATA_TYPE_PRK_PLCE_STTS: {}, {}/{} EA. ", prltSttsList.get(0).getUPDT_DT(), prltSttsList.size(), res);
  116. // }
  117. // prltSttsList.clear();
  118. // break;
  119. // default:
  120. // log.error("WorkDataProcess.process: Unknown Job Type: {}", type);
  121. // break;
  122. // }
  123. // } catch (Exception e) {
  124. // log.error("WorkDataProcess.process: Exception: {}", e.toString());
  125. // }
  126. }
  127. /*
  128. * 작업큐에 데이터 추가
  129. */
  130. public boolean add(WorkData data) {
  131. boolean offer = false;
  132. try {
  133. //offer => full -> return
  134. //add => full -> wait
  135. //큐가 차더라도 바로 리턴함.
  136. offer = WORK_DATA_QUEUE.offer(data);
  137. if (!offer) {
  138. log.error("WorkDataProcess.add: Queue Full Error, Size: {} EA", WORK_DATA_QUEUE.size());
  139. }
  140. } catch (Exception e) {
  141. log.error("WorkDataProcess.add: Exception: {}", e.getMessage(), e);
  142. }
  143. return offer;
  144. }
  145. }