package com.its.wthr.process; import com.its.app.AppUtils; import com.its.wthr.config.ThreadPoolInitializer; import com.its.wthr.mapper.UnitSystMapper; import com.its.wthr.vo.UnitSystSttsVo; import lombok.extern.slf4j.Slf4j; import javax.annotation.PostConstruct; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @Slf4j public abstract class AbstractDbmsJobProcess { public static LinkedBlockingQueue DBMS_DATA_QUEUE = new LinkedBlockingQueue<>(1000); private UnitSystMapper unitSystMapper; private final ThreadPoolExecutor taskExecutor = (ThreadPoolExecutor)Executors.newFixedThreadPool(1); int MAX_CORE = Runtime.getRuntime().availableProcessors(); @PostConstruct void init() { this.unitSystMapper = (UnitSystMapper) AppUtils.getBean(UnitSystMapper.class); postConstruct(); } protected abstract void postConstruct(); public abstract void process(DbmsJobData data); public void run() { log.info("AbstractDbmsJobProcess.run: Start."); if (this.MAX_CORE < 8) { this.MAX_CORE = 8; } ThreadPoolInitializer poolInitializer = (ThreadPoolInitializer) AppUtils.getBean(ThreadPoolInitializer.class); int executePool = Math.max(this.MAX_CORE, poolInitializer.getWork()); for (int ii = 0; ii < executePool; ii++) { log.info("AbstractDbmsJobProcess.Task: {}", ii); this.taskExecutor.execute(() -> { while (true) { try { DbmsJobData dbmsJobData = AbstractDbmsJobProcess.DBMS_DATA_QUEUE.take(); if (dbmsJobData != null) { //log.info("DBMS_DATA_QUEUE..take: {} EA, {}, {}", DBMS_DATA_QUEUE.size(), dbmsJobData, Thread.currentThread().getName()); DbmsJobTask handler = (DbmsJobTask) AppUtils.getBean(DbmsJobTask.class); handler.run(this, dbmsJobData); } else { log.error("AbstractDbmsJobProcess.Task: Received data null"); } } catch (Exception e) { log.error("AbstractDbmsJobProcess.Task: Exception: {}", e.getMessage(), e); } } }); } log.info("AbstractDbmsJobProcess.run: ..End."); } /* * 작업큐에 데이터 추가 */ public boolean add(DbmsJobData dbmsJobData) { boolean offer = false; try { //offer => full -> return //add => full -> wait //큐가 차더라도 바로 리턴함. offer = AbstractDbmsJobProcess.DBMS_DATA_QUEUE.offer(dbmsJobData); if (!offer) { log.error("DbmsJobProcess.add: Queue Full Error, Size: {} EA", DBMS_DATA_QUEUE.size()); } } catch (Exception e) { log.error("DbmsJobProcess.add: Exception: {}", e.getMessage(), e); } return offer; } public static boolean addQ(DbmsJobData dbmsJobData) { try { //offer => full -> return //add => full -> wait //큐가 차더라도 바로 리턴함. if (!AbstractDbmsJobProcess.DBMS_DATA_QUEUE.offer(dbmsJobData)) { log.error("AbstractDbmsJobProcess.add: Queue Full Error, Size: {} EA, {}", DBMS_DATA_QUEUE.size(), dbmsJobData); return false; } //log.info("DBMS_DATA_QUEUE.offer: {} EA, {}", DBMS_DATA_QUEUE.size(), dbmsJobData); return true; } catch (Exception e) { log.error("AbstractDbmsJobProcess.add: Exception: {}", e.getMessage(), e); } return false; } public void runJob(DbmsJobData dbmsJobData) { //log.info("AbstractDbmsJobProcess.runJob: type: {}, {}", dbmsJobData.getType(), Thread.currentThread().getName()); if (dbmsJobData.getType() == DbmsJobType.DATA_TYPE_UNIT_SYST_STTS) { // UNIT 상태정보 관리는 라이브러리에서 수행한다. UnitSystSttsVo stts = (UnitSystSttsVo)dbmsJobData.getData(); this.unitSystMapper.updateUnitSystStts(stts); // 상태정보 업데이트 if (dbmsJobData.isHistory()) { this.unitSystMapper.insertUnitSystSttsHs(stts); // 상태정보 이력저장 } } else { process(dbmsJobData); } } }