123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- package com.its.vds.process;
- import com.its.app.AppUtils;
- import com.its.vds.config.ThreadPoolInitializer;
- import com.its.vds.dao.mapper.UnitSystMapper;
- import com.its.vds.dao.mapper.VdsCtlrMapper;
- import com.its.vds.dao.mapper.VdsDtctMapper;
- import com.its.vds.dao.mapper.VdsStatMapper;
- import com.its.vds.dao.mapper.batch.VdsCtlrDao;
- import com.its.vds.dao.mapper.batch.VdsDtctDao;
- import com.its.vds.entity.TbUnitSystStts;
- import com.its.vds.entity.TbVdsCtlrStts;
- import com.its.vds.entity.voVdsDtctClct;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Service;
- import javax.annotation.PostConstruct;
- import java.util.List;
- import java.util.concurrent.Executors;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- @Slf4j
- @RequiredArgsConstructor
- @Service
- public class DbmsDataProcess {
- private final LinkedBlockingQueue<DbmsData> DBMS_DATA_QUEUE = new LinkedBlockingQueue<>(1000);
- private final ThreadPoolExecutor taskExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
- private final DbmsDataAsyncTask asyncTask;
- private final VdsCtlrDao vdsCtlrDao;
- private final VdsDtctDao vdsDtctDao;
- private final UnitSystMapper unitSystMapper;
- private final VdsCtlrMapper vdsCtlrMapper;
- private final VdsDtctMapper vdsDtctMapper;
- private final VdsStatMapper vdsStatMapper;
- int MAX_CORE = Runtime.getRuntime().availableProcessors();
- @PostConstruct
- void init() {
- }
- public void run() {
- log.info("DbmsDataProcess.run: Start.");
- if (this.MAX_CORE < 8) {
- this.MAX_CORE = 8;
- }
- ThreadPoolInitializer poolInitializer = (ThreadPoolInitializer) AppUtils.getBean(ThreadPoolInitializer.class);
- int executePool = Math.max(this.MAX_CORE, poolInitializer.getWork());
- for (int ii = 0; ii < executePool; ii++) {
- log.info("DbmsDataProcess.Task: {}", ii);
- this.taskExecutor.execute(() -> {
- while (true) {
- try {
- DbmsData data = DBMS_DATA_QUEUE.take();
- if (data != null) {
- asyncTask.run(this, data);
- }
- else {
- log.error("DbmsDataProcess.Task: Received data null");
- }
- }
- catch (Exception e) {
- log.error("DbmsDataProcess.Task: Exception: {}", e.getMessage(), e);
- }
- }
- });
- }
- log.info("DbmsDataProcess.run: ..End.");
- }
- /**
- * 비동기 타스크에서 실행되는 함수.
- * 클래스가 달라야 비동기 타스크로 실행된다.
- * @param data
- */
- public void runJob(DbmsData data) {
- if (data.getType() == DbmsData.DBMS_DATA_UNIT_SYST_STTS) {
- TbUnitSystStts stts = (TbUnitSystStts) data.getData();
- this.unitSystMapper.updateUnitSystStts(stts); // 상태정보 업데이트
- if (data.isHistory()) {
- this.unitSystMapper.insertUnitSystSttsHs(stts); // 상태정보 이력저장
- }
- }
- else {
- process(data);
- }
- }
- public void process(DbmsData data) {
- try {
- int type = data.getType();
- switch(type) {
- case DbmsData.DBMS_DATA_CTLR_STTS:
- List<TbVdsCtlrStts> ctlrSttsList = (List<TbVdsCtlrStts>)data.getData();
- this.vdsCtlrDao.updateStts(ctlrSttsList, data.isHistory()); // 상태정보 업데이트
- if (data.isHistory()) {
- this.vdsCtlrDao.insertStts(ctlrSttsList); // 상태정보 이력저장
- }
- ctlrSttsList.clear();
- break;
- case DbmsData.DBMS_DATA_DTCT_CLCT:
- List<voVdsDtctClct> dtctClctList = (List<voVdsDtctClct>)data.getData();
- this.vdsDtctDao.updateClct(dtctClctList, data.isHistory()); // 수집정보 업데이트
- this.vdsDtctDao.insertClct(dtctClctList); // 수집정보 이력저장
- dtctClctList.clear();
- break;
- }
- } catch (Exception e) {
- log.error("DbmsJobProcess.process: Exception: {}", e.toString());
- }
- }
- /*
- * 작업큐에 데이터 추가
- */
- public boolean add(DbmsData data) {
- boolean offer = false;
- try {
- //offer => full -> return
- //add => full -> wait
- //큐가 차더라도 바로 리턴함.
- offer = DBMS_DATA_QUEUE.offer(data);
- if (!offer) {
- log.error("DbmsDataProcess.add: Queue Full Error, Size: {} EA", DBMS_DATA_QUEUE.size());
- }
- } catch (Exception e) {
- log.error("DbmsDataProcess.add: Exception: {}", e.getMessage(), e);
- }
- return offer;
- }
- }
|