123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- package com.its.pis.process;
- import com.its.app.AppUtils;
- import com.its.pis.config.ThreadPoolInitializer;
- import com.its.pis.dao.mapper.PrkPlceMapper;
- import com.its.pis.dao.mapper.UnitSystMapper;
- import com.its.pis.dao.mapper.batch.PisInfrDao;
- import com.its.pis.dao.mapper.batch.PrkPlceDao;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Service;
- import javax.annotation.PostConstruct;
- import java.util.concurrent.Executors;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- @Slf4j
- @RequiredArgsConstructor
- @Service
- public class WorkDataProcess {
- private final LinkedBlockingQueue<WorkData> WORK_DATA_QUEUE = new LinkedBlockingQueue<>(1000);
- private final ThreadPoolExecutor taskExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
- private final WorkDataAsyncTask asyncTask;
- private final PisInfrDao pisInfrDao;
- private final PrkPlceDao prkPlceDao;
- private UnitSystMapper unitSystMapper;
- private PrkPlceMapper prkPlceMapper;
- int MAX_CORE = Runtime.getRuntime().availableProcessors();
- @PostConstruct
- void init() {
- this.unitSystMapper = (UnitSystMapper) AppUtils.getBean(UnitSystMapper.class);
- this.prkPlceMapper = (PrkPlceMapper) AppUtils.getBean(PrkPlceMapper.class);
- }
- public void run() {
- log.info("WorkDataProcess.run: Start.");
- if (this.MAX_CORE < 8) {
- this.MAX_CORE = 8;
- }
- ThreadPoolInitializer poolInitializer = (ThreadPoolInitializer) AppUtils.getBean(ThreadPoolInitializer.class);
- int executePool = Math.max(this.MAX_CORE, poolInitializer.getWork());
- for (int ii = 0; ii < executePool; ii++) {
- log.info("WorkDataProcess.Task: {}", ii);
- this.taskExecutor.execute(() -> {
- while (true) {
- try {
- WorkData data = WORK_DATA_QUEUE.take();
- if (data != null) {
- asyncTask.run(this, data);
- }
- else {
- log.error("WorkDataProcess.Task: Received data null");
- }
- }
- catch (Exception e) {
- log.error("WorkDataProcess.Task: Exception: {}", e.getMessage(), e);
- }
- }
- });
- }
- log.info("WorkDataProcess.run: ..End.");
- }
- /**
- * 비동기 타스크에서 실행되는 함수.
- * 클래스가 달라야 비동기 타스크로 실행된다.
- * @param data
- */
- public void runJob(WorkData data) {
- log.error("WorkDataProcess.runJob: {}", Thread.currentThread().getName());
- process(data);
- }
- public void process(WorkData data) {
- // try {
- // int type = data.getType();
- // TbPisInfr pis = data.getObj();
- // switch(type) {
- // case DbmsData.DBMS_DATA_RL_TIME:
- // if (pis != null) {
- // String DATA_COLCT_TIME = SysUtils.getSysTime();
- // C2FMessage<PrkPlceRlTimeResponseInfo> rlTimeInfo = (C2FMessage<PrkPlceRlTimeResponseInfo>) data.getData();
- // if (rlTimeInfo != null) {
- // PrkPlceRlTimeResponseInfo info = rlTimeInfo.getData().getPayload();
- // TbPrkPlceRt rt = TbPrkPlceRt.toEntity(pis.getPIS_NMBR(), DATA_COLCT_TIME, info);
- //
- // this.prkPlceMapper.updatePrkPlceRlTime(rt);
- // this.prkPlceDao.updateRlTimeFlr(pis.getPIS_NMBR(), DATA_COLCT_TIME, info.getFlr_info());
- // this.prkPlceDao.updateRlTimeAr(pis.getPIS_NMBR(), info.getAr_info());
- // this.prkPlceDao.updateRlTimePrv(pis.getPIS_NMBR(), info.getPrvuse_prkar_info());
- // this.prkPlceDao.updateRlTimeDev(pis.getPIS_NMBR(), info.getPrk_colct_device_info());
- // }
- // else {
- // log.error("RL TIME Data NULL...............");
- // }
- // }
- // else {
- // log.error("PIS Object null");
- // }
- // break;
- // case DbmsData.DBMS_DATA_RL_TIME_HS:
- // TbPrkPlceRtHs rtHs = (TbPrkPlceRtHs)data.getData();
- // int result = this.prkPlceMapper.insertPrkPlceRlTimeHs(rtHs);
- // log.error("DATA_TYPE_RL_TIME_HS: {}, {} EA.", rtHs.getCrtnDt(), result);
- // break;
- // case DbmsData.DBMS_DATA_PIS_STTS:
- // List<TbPisInfrStts> pisSttsList = (List<TbPisInfrStts>)data.getData();
- // this.pisInfrDao.updateStts(pisSttsList, data.isHistory());
- // if (data.isHistory()) {
- // // 주차정보시스템 상태정보 이력은 저장하지 않는다.
- // //this.pisInfrDao.insertStts(pisSttsList);
- // }
- // pisSttsList.clear();
- // break;
- // case DbmsData.DBMS_DATA_PRK_PLCE_STTS:
- // List<TbPrkPlceStts> prltSttsList = (List<TbPrkPlceStts>)data.getData();
- // this.prkPlceDao.updatePrkPlceStts(prltSttsList, data.isHistory());
- // if (data.isHistory()) {
- // int res = this.prkPlceDao.insertPrkPlceSttsHs(prltSttsList);
- // log.error("DATA_TYPE_PRK_PLCE_STTS: {}, {}/{} EA. ", prltSttsList.get(0).getUPDT_DT(), prltSttsList.size(), res);
- // }
- // prltSttsList.clear();
- // break;
- // default:
- // log.error("WorkDataProcess.process: Unknown Job Type: {}", type);
- // break;
- // }
- // } catch (Exception e) {
- // log.error("WorkDataProcess.process: Exception: {}", e.toString());
- // }
- }
- /*
- * 작업큐에 데이터 추가
- */
- public boolean add(WorkData data) {
- boolean offer = false;
- try {
- //offer => full -> return
- //add => full -> wait
- //큐가 차더라도 바로 리턴함.
- offer = WORK_DATA_QUEUE.offer(data);
- if (!offer) {
- log.error("WorkDataProcess.add: Queue Full Error, Size: {} EA", WORK_DATA_QUEUE.size());
- }
- } catch (Exception e) {
- log.error("WorkDataProcess.add: Exception: {}", e.getMessage(), e);
- }
- return offer;
- }
- }
|