DbmsDataProcess.java 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. package com.evp.comm.server.process.dbms;
  2. import com.evp.app.common.utils.Elapsed;
  3. import com.evp.comm.server.common.SpringUtils;
  4. import com.evp.comm.server.config.ThreadPoolInitializer;
  5. import com.evp.comm.server.dao.mapper.EvpsServiceMapper;
  6. import com.evp.comm.server.dao.mapper.ProcessMapper;
  7. import com.evp.comm.server.dao.mapper.batch.EvpCommServerDao;
  8. import com.evp.comm.server.dto.EvpsCenter;
  9. import com.evp.comm.server.entity.TbEvpEvent;
  10. import com.evp.comm.server.entity.TbEvpService;
  11. import com.evp.comm.server.entity.TbRegionCenterComm;
  12. import lombok.RequiredArgsConstructor;
  13. import lombok.extern.slf4j.Slf4j;
  14. import org.slf4j.MDC;
  15. import org.springframework.stereotype.Service;
  16. import java.util.HashMap;
  17. import java.util.List;
  18. import java.util.concurrent.Executors;
  19. import java.util.concurrent.LinkedBlockingQueue;
  20. import java.util.concurrent.ThreadPoolExecutor;
  21. @Slf4j
  22. @RequiredArgsConstructor
  23. @Service
  24. public class DbmsDataProcess {
  25. private final LinkedBlockingQueue<DbmsData> dbmsDataBlockingQueue = new LinkedBlockingQueue<>(1000);
  26. private final ThreadPoolExecutor taskExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
  27. private final DbmsDataAsyncTask asyncTask;
  28. private final ProcessMapper processMapper;
  29. private final EvpsServiceMapper serviceMapper;
  30. private final EvpCommServerDao evpCommServerDao;
  31. private int maxCore = Runtime.getRuntime().availableProcessors();
  32. public void run() {
  33. log.info("DbmsDataProcess.run: Start.");
  34. if (this.maxCore < 8) {
  35. this.maxCore = 8;
  36. }
  37. ThreadPoolInitializer poolInitializer = SpringUtils.getBean(ThreadPoolInitializer.class);
  38. int executePool = poolInitializer.getDbms();
  39. for (int ii = 0; ii < executePool; ii++) {
  40. log.info("DbmsDataProcess.Task: {}", ii);
  41. this.taskExecutor.execute(() -> {
  42. boolean isRunning = true;
  43. while (isRunning) {
  44. try {
  45. DbmsData data = dbmsDataBlockingQueue.take();
  46. this.asyncTask.run(this, data);
  47. }
  48. catch (Exception e) {
  49. log.error("DbmsDataProcess.Task: Exception: {}", e.getMessage(), e);
  50. Thread.currentThread().interrupt();
  51. isRunning = false;
  52. }
  53. }
  54. });
  55. }
  56. log.info("DbmsDataProcess.run: ..End.");
  57. }
  58. public void runJob(DbmsData data) {
  59. if (data.getType() == DbmsData.DBMS_DATA_PROCESS_STTS) {
  60. String processId = (String) data.getData();
  61. this.processMapper.updateProcessState(processId);
  62. }
  63. else {
  64. process(data);
  65. }
  66. }
  67. public void process(DbmsData data) {
  68. int result = -1;
  69. int type = -1;
  70. EvpsCenter center = data.getCenter();
  71. try {
  72. MDC.put("id", center.getLogKey());
  73. Elapsed elapsed1 = new Elapsed();
  74. type = data.getType();
  75. switch(type) {
  76. case DbmsData.DBMS_DATA_INS_SERVICE:
  77. TbEvpService newService = (TbEvpService)data.getData();
  78. result = this.serviceMapper.insertEvpService(newService);
  79. break;
  80. case DbmsData.DBMS_DATA_UPD_SERVICE:
  81. TbEvpService updService = (TbEvpService)data.getData();
  82. result = this.serviceMapper.updateEvpService(updService);
  83. break;
  84. case DbmsData.DBMS_DATA_INS_EVENT:
  85. TbEvpEvent event = (TbEvpEvent)data.getData();
  86. result = this.serviceMapper.insertEvpEvent(event);
  87. break;
  88. case DbmsData.DBMS_DATA_INS_ROUTE:
  89. List<HashMap<String, Object>> routeList = (List<HashMap<String, Object>>)data.getData();
  90. result = this.evpCommServerDao.insertEvpRoute(routeList);
  91. break;
  92. case DbmsData.DBMS_DATA_INS_NODE:
  93. List<HashMap<String, Object>> nodeList = (List<HashMap<String, Object>>)data.getData();
  94. result = this.evpCommServerDao.insertEvpNode(nodeList);
  95. break;
  96. case DbmsData.DBMS_DATA_INS_PHASE:
  97. List<HashMap<String, Object>> phaseList = (List<HashMap<String, Object>>)data.getData();
  98. result = this.evpCommServerDao.insertEvpPhase(phaseList);
  99. break;
  100. case DbmsData.DBMS_DATA_INS_SIGNAL:
  101. List<HashMap<String, Object>> signalList = (List<HashMap<String, Object>>)data.getData();
  102. result = this.evpCommServerDao.insertEvpSignal(signalList);
  103. break;
  104. case DbmsData.DBMS_DATA_CENTER_STTS_UPDATE:
  105. TbRegionCenterComm updStts = (TbRegionCenterComm) data.getData();
  106. // this.centerMapper.updateCommState(updStts);
  107. break;
  108. case DbmsData.DBMS_DATA_CENTER_STTS:
  109. TbRegionCenterComm stts = (TbRegionCenterComm) data.getData();
  110. // if (TbRegionCenterComm.CENTER_COMM_START.equals(stts.getCommState())) {
  111. // result = this.centerMapper.updateCommStateConnect(stts);
  112. // }
  113. // else if (TbRegionCenterComm.CENTER_COMM_STOP.equals(stts.getCommState())) {
  114. // result = this.centerMapper.updateCommStateDisconnect(stts);
  115. // }
  116. break;
  117. default:
  118. log.error("DbmsJobProcess.process: Unknown Request {}.", type);
  119. break;
  120. }
  121. // log.info("DbmsDataProcess.run: [{}]. {}, {} EA. {}",
  122. // center.getLogKey(), type, result, Elapsed.elapsedTimeStr(elapsed1.nanoSeconds()));
  123. }
  124. catch (Exception e) {
  125. log.error("DbmsJobProcess.process: [{}]. {}, Exception: {}", center.getLogKey(), type, e.toString());
  126. }
  127. finally {
  128. MDC.remove(center.getLogKey());
  129. MDC.clear();
  130. }
  131. }
  132. /*
  133. * 작업큐에 데이터 추가
  134. */
  135. public boolean add(DbmsData data) {
  136. boolean offer = false;
  137. try {
  138. offer = dbmsDataBlockingQueue.offer(data);
  139. if (!offer) {
  140. log.error("DbmsDataProcess.add: Queue Full Error, Size: {} EA", dbmsDataBlockingQueue.size());
  141. }
  142. } catch (Exception e) {
  143. log.error("DbmsDataProcess.add: Exception: {}", e.getMessage(), e);
  144. }
  145. return offer;
  146. }
  147. }