DbmsDataProcess.java 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package com.its.bis.process;
  2. import com.its.app.AppUtils;
  3. import com.its.bis.config.ThreadPoolInitializer;
  4. import com.its.bis.dao.mapper.NodeLinkMapper;
  5. import com.its.bis.dao.mapper.UnitSystMapper;
  6. import com.its.bis.entity.MakeTrafParam;
  7. import com.its.bis.entity.TbUnitSystStts;
  8. import lombok.RequiredArgsConstructor;
  9. import lombok.extern.slf4j.Slf4j;
  10. import org.springframework.stereotype.Service;
  11. import java.util.concurrent.Executors;
  12. import java.util.concurrent.LinkedBlockingQueue;
  13. import java.util.concurrent.ThreadPoolExecutor;
  14. @Slf4j
  15. @RequiredArgsConstructor
  16. @Service
  17. public class DbmsDataProcess {
  18. private final LinkedBlockingQueue<DbmsData> dbmsDataBlockingQueue = new LinkedBlockingQueue<>(1000);
  19. private final ThreadPoolExecutor taskExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
  20. private final DbmsDataAsyncTask asyncTask;
  21. private final UnitSystMapper unitSystMapper;
  22. private final NodeLinkMapper nodeLinkMapper;
  23. private int maxCore = Runtime.getRuntime().availableProcessors();
  24. public void run() {
  25. log.info("DbmsDataProcess.run: Start.");
  26. if (this.maxCore < 8) {
  27. this.maxCore = 8;
  28. }
  29. ThreadPoolInitializer poolInitializer = (ThreadPoolInitializer) AppUtils.getBean(ThreadPoolInitializer.class);
  30. int executePool = Math.max(this.maxCore, poolInitializer.getWork());
  31. for (int ii = 0; ii < executePool; ii++) {
  32. log.info("DbmsDataProcess.Task: {}", ii);
  33. this.taskExecutor.execute(() -> {
  34. boolean isRunning = true;
  35. while (isRunning) {
  36. try {
  37. DbmsData data = dbmsDataBlockingQueue.take();
  38. asyncTask.run(this, data);
  39. }
  40. catch (Exception e) {
  41. log.error("DbmsDataProcess.Task: Exception: {}", e.getMessage(), e);
  42. Thread.currentThread().interrupt();
  43. isRunning = false;
  44. }
  45. }
  46. });
  47. }
  48. log.info("DbmsDataProcess.run: ..End.");
  49. }
  50. /**
  51. * 비동기 타스크에서 실행되는 함수.
  52. * 클래스가 달라야 비동기 타스크로 실행된다.
  53. * @param data
  54. */
  55. public void runJob(DbmsData data) {
  56. if (data.getType() == DbmsDataType.DBMS_DATA_UNIT_SYST_STTS) {
  57. TbUnitSystStts stts = (TbUnitSystStts) data.getData();
  58. this.unitSystMapper.updateUnitSystStts(stts); // 상태정보 업데이트
  59. if (data.isHistory()) {
  60. this.unitSystMapper.insertUnitSystSttsHs(stts); // 상태정보 이력저장
  61. }
  62. }
  63. else {
  64. process(data);
  65. }
  66. }
  67. public void process(DbmsData data) {
  68. int cnt = 0;
  69. try {
  70. DbmsDataType type = data.getType();
  71. switch(type) {
  72. case DBMS_DATA_CRT_BIS_LINK_TRAF:
  73. MakeTrafParam trafParam = (MakeTrafParam)data.getData();
  74. this.nodeLinkMapper.createBisLinkTraf(trafParam);
  75. break;
  76. default:
  77. log.error("DbmsJobProcess.process: Unknown Request {}.", type);
  78. break;
  79. }
  80. } catch (Exception e) {
  81. log.error("DbmsJobProcess.process: Exception: {}", e.toString());
  82. }
  83. }
  84. /*
  85. * 작업큐에 데이터 추가
  86. */
  87. public boolean add(DbmsData data) {
  88. boolean offer = false;
  89. try {
  90. //offer => full -> return
  91. //add => full -> wait
  92. //큐가 차더라도 바로 리턴함.
  93. offer = dbmsDataBlockingQueue.offer(data);
  94. if (!offer) {
  95. log.error("DbmsDataProcess.add: Queue Full Error, Size: {} EA", dbmsDataBlockingQueue.size());
  96. }
  97. } catch (Exception e) {
  98. log.error("DbmsDataProcess.add: Exception: {}", e.getMessage(), e);
  99. }
  100. return offer;
  101. }
  102. }