package com.its.bis.process; import com.its.app.AppUtils; import com.its.bis.config.ThreadPoolInitializer; import com.its.bis.dao.mapper.NodeLinkMapper; import com.its.bis.dao.mapper.UnitSystMapper; import com.its.bis.entity.MakeTrafParam; import com.its.bis.entity.TbUnitSystStts; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @Slf4j @RequiredArgsConstructor @Service public class DbmsDataProcess { private final LinkedBlockingQueue dbmsDataBlockingQueue = new LinkedBlockingQueue<>(1000); private final ThreadPoolExecutor taskExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); private final DbmsDataAsyncTask asyncTask; private final UnitSystMapper unitSystMapper; private final NodeLinkMapper nodeLinkMapper; private int maxCore = Runtime.getRuntime().availableProcessors(); public void run() { log.info("DbmsDataProcess.run: Start."); if (this.maxCore < 8) { this.maxCore = 8; } ThreadPoolInitializer poolInitializer = (ThreadPoolInitializer) AppUtils.getBean(ThreadPoolInitializer.class); int executePool = Math.max(this.maxCore, poolInitializer.getWork()); for (int ii = 0; ii < executePool; ii++) { log.info("DbmsDataProcess.Task: {}", ii); this.taskExecutor.execute(() -> { boolean isRunning = true; while (isRunning) { try { DbmsData data = dbmsDataBlockingQueue.take(); asyncTask.run(this, data); } catch (Exception e) { log.error("DbmsDataProcess.Task: Exception: {}", e.getMessage(), e); Thread.currentThread().interrupt(); isRunning = false; } } }); } log.info("DbmsDataProcess.run: ..End."); } /** * 비동기 타스크에서 실행되는 함수. * 클래스가 달라야 비동기 타스크로 실행된다. * @param data */ public void runJob(DbmsData data) { if (data.getType() == DbmsDataType.DBMS_DATA_UNIT_SYST_STTS) { TbUnitSystStts stts = (TbUnitSystStts) data.getData(); this.unitSystMapper.updateUnitSystStts(stts); // 상태정보 업데이트 if (data.isHistory()) { this.unitSystMapper.insertUnitSystSttsHs(stts); // 상태정보 이력저장 } } else { process(data); } } public void process(DbmsData data) { int cnt = 0; try { DbmsDataType type = data.getType(); switch(type) { case DBMS_DATA_CRT_BIS_LINK_TRAF: MakeTrafParam trafParam = (MakeTrafParam)data.getData(); this.nodeLinkMapper.createBisLinkTraf(trafParam); break; default: log.error("DbmsJobProcess.process: Unknown Request {}.", type); break; } } catch (Exception e) { log.error("DbmsJobProcess.process: Exception: {}", e.toString()); } } /* * 작업큐에 데이터 추가 */ public boolean add(DbmsData data) { boolean offer = false; try { //offer => full -> return //add => full -> wait //큐가 차더라도 바로 리턴함. offer = dbmsDataBlockingQueue.offer(data); if (!offer) { log.error("DbmsDataProcess.add: Queue Full Error, Size: {} EA", dbmsDataBlockingQueue.size()); } } catch (Exception e) { log.error("DbmsDataProcess.add: Exception: {}", e.getMessage(), e); } return offer; } }