|
@@ -0,0 +1,147 @@
|
|
|
+package com.its.vds.process;
|
|
|
+
|
|
|
+import com.its.app.AppUtils;
|
|
|
+import com.its.vds.config.ThreadPoolInitializer;
|
|
|
+import com.its.vds.dao.mapper.UnitSystMapper;
|
|
|
+import com.its.vds.dao.mapper.VdsCtlrMapper;
|
|
|
+import com.its.vds.dao.mapper.VdsDtctMapper;
|
|
|
+import com.its.vds.dao.mapper.VdsStatMapper;
|
|
|
+import com.its.vds.dao.mapper.batch.VdsCtlrDao;
|
|
|
+import com.its.vds.dao.mapper.batch.VdsDtctDao;
|
|
|
+import com.its.vds.entity.TbUnitSystStts;
|
|
|
+import com.its.vds.entity.TbVdsCtlrStts;
|
|
|
+import com.its.vds.entity.voVdsDtctClct;
|
|
|
+import lombok.RequiredArgsConstructor;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+import java.util.List;
|
|
|
+import java.util.concurrent.Executors;
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
+
|
|
|
+@Slf4j
|
|
|
+@RequiredArgsConstructor
|
|
|
+@Service
|
|
|
+public class DbmsDataProcess {
|
|
|
+
|
|
|
+ private final LinkedBlockingQueue<DbmsData> DBMS_DATA_QUEUE = new LinkedBlockingQueue<>(1000);
|
|
|
+ private final ThreadPoolExecutor taskExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
|
|
|
+
|
|
|
+ private final DbmsDataAsyncTask asyncTask;
|
|
|
+ private final VdsCtlrDao vdsCtlrDao;
|
|
|
+ private final VdsDtctDao vdsDtctDao;
|
|
|
+
|
|
|
+ private final UnitSystMapper unitSystMapper;
|
|
|
+ private final VdsCtlrMapper vdsCtlrMapper;
|
|
|
+ private final VdsDtctMapper vdsDtctMapper;
|
|
|
+ private final VdsStatMapper vdsStatMapper;
|
|
|
+ int MAX_CORE = Runtime.getRuntime().availableProcessors();
|
|
|
+
|
|
|
+ @PostConstruct
|
|
|
+ void init() {
|
|
|
+ }
|
|
|
+
|
|
|
+ public void run() {
|
|
|
+ log.info("DbmsDataProcess.run: Start.");
|
|
|
+ if (this.MAX_CORE < 8) {
|
|
|
+ this.MAX_CORE = 8;
|
|
|
+ }
|
|
|
+ ThreadPoolInitializer poolInitializer = (ThreadPoolInitializer) AppUtils.getBean(ThreadPoolInitializer.class);
|
|
|
+ int executePool = Math.max(this.MAX_CORE, poolInitializer.getWork());
|
|
|
+ for (int ii = 0; ii < executePool; ii++) {
|
|
|
+ log.info("DbmsDataProcess.Task: {}", ii);
|
|
|
+ this.taskExecutor.execute(() -> {
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ DbmsData data = DBMS_DATA_QUEUE.take();
|
|
|
+ if (data != null) {
|
|
|
+ asyncTask.run(this, data);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ log.error("DbmsDataProcess.Task: Received data null");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception e) {
|
|
|
+ log.error("DbmsDataProcess.Task: Exception: {}", e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("DbmsDataProcess.run: ..End.");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 비동기 타스크에서 실행되는 함수.
|
|
|
+ * 클래스가 달라야 비동기 타스크로 실행된다.
|
|
|
+ * @param data
|
|
|
+ */
|
|
|
+ public void runJob(DbmsData data) {
|
|
|
+ if (data.getType() == DbmsData.DBMS_DATA_UNIT_SYST_STTS) {
|
|
|
+ TbUnitSystStts stts = (TbUnitSystStts) data.getData();
|
|
|
+ this.unitSystMapper.updateUnitSystStts(stts); // 상태정보 업데이트
|
|
|
+ if (data.isHistory()) {
|
|
|
+ this.unitSystMapper.insertUnitSystSttsHs(stts); // 상태정보 이력저장
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ process(data);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void process(DbmsData data) {
|
|
|
+ try {
|
|
|
+ int type = data.getType();
|
|
|
+ switch(type) {
|
|
|
+ case DbmsData.DBMS_DATA_CTLR_STTS:
|
|
|
+ List<TbVdsCtlrStts> ctlrSttsList = (List<TbVdsCtlrStts>)data.getData();
|
|
|
+ this.vdsCtlrDao.updateStts(ctlrSttsList, data.isHistory());
|
|
|
+ if (data.isHistory()) {
|
|
|
+ this.vdsCtlrDao.insertStts(ctlrSttsList);
|
|
|
+ }
|
|
|
+// for (TbVdsCtlrStts vo : ctlrSttsList) {
|
|
|
+// this.vdsCtlrMapper.updateVdsCtlrStts(vo); // 상태정보 업데이트
|
|
|
+// if (data.isHistory()) {
|
|
|
+// this.vdsCtlrMapper.insertVdsCtlrSttsHs(vo); // 상태정보 이력저장
|
|
|
+// }
|
|
|
+// }
|
|
|
+ ctlrSttsList.clear();
|
|
|
+ break;
|
|
|
+ case DbmsData.DBMS_DATA_DTCT_CLCT:
|
|
|
+ List<voVdsDtctClct> dtctClctList = (List<voVdsDtctClct>)data.getData();
|
|
|
+ //this.vdsDtctDao.updateClct(dtctClctList, data.isHistory());
|
|
|
+ this.vdsDtctDao.insertClct(dtctClctList);
|
|
|
+// for (voVdsDtctClct vo : dtctClctList) {
|
|
|
+// this.vdsDtctMapper.updateVdsDtctClctPnst(vo); // 수집정보 업데이트
|
|
|
+// this.vdsDtctMapper.insertVdsDtctClct(vo); // 수집정보 이력저장
|
|
|
+// }
|
|
|
+ dtctClctList.clear();
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("DbmsJobProcess.process: Exception: {}", e.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * 작업큐에 데이터 추가
|
|
|
+ */
|
|
|
+ public boolean add(DbmsData data) {
|
|
|
+ boolean offer = false;
|
|
|
+ try {
|
|
|
+ //offer => full -> return
|
|
|
+ //add => full -> wait
|
|
|
+ //큐가 차더라도 바로 리턴함.
|
|
|
+ offer = DBMS_DATA_QUEUE.offer(data);
|
|
|
+ if (!offer) {
|
|
|
+ log.error("DbmsDataProcess.add: Queue Full Error, Size: {} EA", DBMS_DATA_QUEUE.size());
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("DbmsDataProcess.add: Exception: {}", e.getMessage(), e);
|
|
|
+ }
|
|
|
+ return offer;
|
|
|
+ }
|
|
|
+
|
|
|
+}
|