DbmsDataProcess.java 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package com.evp.comm.server.process.dbms;
  2. import com.evp.app.common.utils.Elapsed;
  3. import com.evp.comm.server.common.SpringUtils;
  4. import com.evp.comm.server.config.ThreadPoolInitializer;
  5. import com.evp.comm.server.dao.mapper.EvpsCenterMapper;
  6. import com.evp.comm.server.dao.mapper.EvpsServiceMapper;
  7. import com.evp.comm.server.dao.mapper.ProcessMapper;
  8. import com.evp.comm.server.dao.mapper.batch.EvpCommServerDao;
  9. import com.evp.comm.server.dto.EvpsCenter;
  10. import com.evp.comm.server.entity.TbRegionCenterComm;
  11. import com.evp.comm.server.kafka.dto.KafkaEvpsEventDto;
  12. import com.evp.comm.server.kafka.dto.KafkaEvpsServiceDto;
  13. import lombok.RequiredArgsConstructor;
  14. import lombok.extern.slf4j.Slf4j;
  15. import org.slf4j.MDC;
  16. import org.springframework.stereotype.Service;
  17. import java.util.HashMap;
  18. import java.util.List;
  19. import java.util.concurrent.Executors;
  20. import java.util.concurrent.LinkedBlockingQueue;
  21. import java.util.concurrent.ThreadPoolExecutor;
  22. @Slf4j
  23. @RequiredArgsConstructor
  24. @Service
  25. public class DbmsDataProcess {
  26. private final LinkedBlockingQueue<DbmsData> dbmsDataBlockingQueue = new LinkedBlockingQueue<>(1000);
  27. private final ThreadPoolExecutor taskExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
  28. private final DbmsDataAsyncTask asyncTask;
  29. private final ProcessMapper processMapper;
  30. private final EvpsServiceMapper serviceMapper;
  31. private final EvpCommServerDao evpCommServerDao;
  32. private final EvpsCenterMapper evpsCenterMapper;
  33. private int maxCore = Runtime.getRuntime().availableProcessors();
  34. public void run() {
  35. log.info("DbmsDataProcess.run: Start.");
  36. if (this.maxCore < 8) {
  37. this.maxCore = 8;
  38. }
  39. ThreadPoolInitializer poolInitializer = SpringUtils.getBean(ThreadPoolInitializer.class);
  40. int executePool = poolInitializer.getDbms();
  41. for (int ii = 0; ii < executePool; ii++) {
  42. log.info("DbmsDataProcess.Task: {}", ii);
  43. this.taskExecutor.execute(() -> {
  44. boolean isRunning = true;
  45. while (isRunning) {
  46. try {
  47. DbmsData data = dbmsDataBlockingQueue.take();
  48. this.asyncTask.run(this, data);
  49. }
  50. catch (Exception e) {
  51. log.error("DbmsDataProcess.Task: Exception: {}", e.getMessage(), e);
  52. Thread.currentThread().interrupt();
  53. isRunning = false;
  54. }
  55. }
  56. });
  57. }
  58. log.info("DbmsDataProcess.run: ..End.");
  59. }
  60. public void runJob(DbmsData data) {
  61. if (data.getType() == DbmsData.DBMS_DATA_PROCESS_STTS) {
  62. String processId = (String) data.getData();
  63. this.processMapper.updateProcessState(processId);
  64. }
  65. else {
  66. process(data);
  67. }
  68. }
  69. public void process(DbmsData data) {
  70. int result = -1;
  71. int type = -1;
  72. EvpsCenter center = data.getCenter();
  73. try {
  74. MDC.put("id", center.getLogKey());
  75. Elapsed elapsed1 = new Elapsed();
  76. type = data.getType();
  77. switch(type) {
  78. case DbmsData.DBMS_DATA_INS_SERVICE:
  79. KafkaEvpsServiceDto newService = (KafkaEvpsServiceDto)data.getData();
  80. result = this.serviceMapper.insertEvpService(newService);
  81. break;
  82. case DbmsData.DBMS_DATA_UPD_SERVICE:
  83. KafkaEvpsServiceDto updService = (KafkaEvpsServiceDto)data.getData();
  84. result = this.serviceMapper.updateEvpService(updService);
  85. break;
  86. case DbmsData.DBMS_DATA_INS_EVENT:
  87. KafkaEvpsEventDto event = (KafkaEvpsEventDto)data.getData();
  88. result = this.serviceMapper.insertEvpEvent(event);
  89. break;
  90. case DbmsData.DBMS_DATA_INS_ROUTE:
  91. List<HashMap<String, Object>> routeList = (List<HashMap<String, Object>>)data.getData();
  92. result = this.evpCommServerDao.insertEvpRoute(routeList);
  93. break;
  94. case DbmsData.DBMS_DATA_INS_NODE:
  95. List<HashMap<String, Object>> nodeList = (List<HashMap<String, Object>>)data.getData();
  96. result = this.evpCommServerDao.insertEvpNode(nodeList);
  97. break;
  98. case DbmsData.DBMS_DATA_INS_PHASE:
  99. List<HashMap<String, Object>> phaseList = (List<HashMap<String, Object>>)data.getData();
  100. result = this.evpCommServerDao.insertEvpPhase(phaseList);
  101. break;
  102. case DbmsData.DBMS_DATA_INS_SIGNAL:
  103. List<HashMap<String, Object>> signalList = (List<HashMap<String, Object>>)data.getData();
  104. result = this.evpCommServerDao.insertEvpSignal(signalList);
  105. break;
  106. case DbmsData.DBMS_DATA_CENTER_STTS_UPDATE:
  107. TbRegionCenterComm updStts = (TbRegionCenterComm) data.getData();
  108. // this.centerMapper.updateCommState(updStts);
  109. break;
  110. case DbmsData.DBMS_DATA_CENTER_STTS:
  111. TbRegionCenterComm stts = (TbRegionCenterComm) data.getData();
  112. switch(stts.getCommState()) {
  113. case TbRegionCenterComm.CENTER_COMM_START:
  114. result = this.evpsCenterMapper.updateCommStateConnect(stts.getCenterId());
  115. break;
  116. case TbRegionCenterComm.CENTER_COMM_STOP:
  117. result = this.evpsCenterMapper.updateCommStateDisconnect(stts.getCenterId());
  118. break;
  119. case TbRegionCenterComm.CENTER_COMM_LAST:
  120. result = this.evpsCenterMapper.updateCommLastComm(stts.getCenterId());
  121. break;
  122. }
  123. break;
  124. default:
  125. log.error("DbmsJobProcess.process: Unknown Request {}.", type);
  126. break;
  127. }
  128. log.info("DbmsDataProcess.run: [{}]. {}, {} EA. {}",
  129. center.getLogKey(), type, result, Elapsed.elapsedTimeStr(elapsed1.nanoSeconds()));
  130. }
  131. catch (Exception e) {
  132. log.error("DbmsJobProcess.process: [{}]. {}, Exception: {}", center.getLogKey(), type, e.toString());
  133. }
  134. finally {
  135. MDC.remove(center.getLogKey());
  136. MDC.clear();
  137. }
  138. }
  139. /*
  140. * 작업큐에 데이터 추가
  141. */
  142. public boolean add(DbmsData data) {
  143. boolean offer = false;
  144. try {
  145. offer = dbmsDataBlockingQueue.offer(data);
  146. if (!offer) {
  147. log.error("DbmsDataProcess.add: Queue Full Error, Size: {} EA", dbmsDataBlockingQueue.size());
  148. }
  149. } catch (Exception e) {
  150. log.error("DbmsDataProcess.add: Exception: {}", e.getMessage(), e);
  151. }
  152. return offer;
  153. }
  154. }