package com.its.vds.process; import com.its.app.AppUtils; import com.its.vds.config.ThreadPoolInitializer; import com.its.vds.dao.mapper.UnitSystMapper; import com.its.vds.dao.mapper.VdsCtlrMapper; import com.its.vds.dao.mapper.VdsDtctMapper; import com.its.vds.dao.mapper.VdsStatMapper; import com.its.vds.dao.mapper.batch.VdsCtlrDao; import com.its.vds.dao.mapper.batch.VdsDtctDao; import com.its.vds.entity.TbUnitSystStts; import com.its.vds.entity.TbVdsCtlrStts; import com.its.vds.entity.voVdsDtctClct; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @Slf4j @RequiredArgsConstructor @Service public class DbmsDataProcess { private final LinkedBlockingQueue DBMS_DATA_QUEUE = new LinkedBlockingQueue<>(1000); private final ThreadPoolExecutor taskExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); private final DbmsDataAsyncTask asyncTask; private final VdsCtlrDao vdsCtlrDao; private final VdsDtctDao vdsDtctDao; private final UnitSystMapper unitSystMapper; private final VdsCtlrMapper vdsCtlrMapper; private final VdsDtctMapper vdsDtctMapper; private final VdsStatMapper vdsStatMapper; int MAX_CORE = Runtime.getRuntime().availableProcessors(); @PostConstruct void init() { } public void run() { log.info("DbmsDataProcess.run: Start."); if (this.MAX_CORE < 8) { this.MAX_CORE = 8; } ThreadPoolInitializer poolInitializer = (ThreadPoolInitializer) AppUtils.getBean(ThreadPoolInitializer.class); int executePool = Math.max(this.MAX_CORE, poolInitializer.getWork()); for (int ii = 0; ii < executePool; ii++) { log.info("DbmsDataProcess.Task: {}", ii); this.taskExecutor.execute(() -> { while (true) { try { DbmsData data = DBMS_DATA_QUEUE.take(); if (data != null) { asyncTask.run(this, data); } else { log.error("DbmsDataProcess.Task: Received data null"); } } catch (Exception e) { log.error("DbmsDataProcess.Task: Exception: {}", e.getMessage(), e); } } }); } log.info("DbmsDataProcess.run: ..End."); } /** * 비동기 타스크에서 실행되는 함수. * 클래스가 달라야 비동기 타스크로 실행된다. * @param data */ public void runJob(DbmsData data) { if (data.getType() == DbmsData.DBMS_DATA_UNIT_SYST_STTS) { TbUnitSystStts stts = (TbUnitSystStts) data.getData(); this.unitSystMapper.updateUnitSystStts(stts); // 상태정보 업데이트 if (data.isHistory()) { this.unitSystMapper.insertUnitSystSttsHs(stts); // 상태정보 이력저장 } } else { process(data); } } public void process(DbmsData data) { try { int type = data.getType(); switch(type) { case DbmsData.DBMS_DATA_CTLR_STTS: List ctlrSttsList = (List)data.getData(); this.vdsCtlrDao.updateStts(ctlrSttsList, data.isHistory()); // 상태정보 업데이트 if (data.isHistory()) { this.vdsCtlrDao.insertStts(ctlrSttsList); // 상태정보 이력저장 } ctlrSttsList.clear(); break; case DbmsData.DBMS_DATA_DTCT_CLCT: List dtctClctList = (List)data.getData(); this.vdsDtctDao.updateClct(dtctClctList, data.isHistory()); // 수집정보 업데이트 this.vdsDtctDao.insertClct(dtctClctList); // 수집정보 이력저장 dtctClctList.clear(); break; } } catch (Exception e) { log.error("DbmsJobProcess.process: Exception: {}", e.toString()); } } /* * 작업큐에 데이터 추가 */ public boolean add(DbmsData data) { boolean offer = false; try { //offer => full -> return //add => full -> wait //큐가 차더라도 바로 리턴함. offer = DBMS_DATA_QUEUE.offer(data); if (!offer) { log.error("DbmsDataProcess.add: Queue Full Error, Size: {} EA", DBMS_DATA_QUEUE.size()); } } catch (Exception e) { log.error("DbmsDataProcess.add: Exception: {}", e.getMessage(), e); } return offer; } }