package com.its.pis.process; import com.its.app.AppUtils; import com.its.pis.config.ThreadPoolInitializer; import com.its.pis.dao.mapper.PrkPlceMapper; import com.its.pis.dao.mapper.UnitSystMapper; import com.its.pis.dao.mapper.batch.PisInfrDao; import com.its.pis.dao.mapper.batch.PrkPlceDao; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @Slf4j @RequiredArgsConstructor @Service public class WorkDataProcess { private final LinkedBlockingQueue WORK_DATA_QUEUE = new LinkedBlockingQueue<>(1000); private final ThreadPoolExecutor taskExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); private final WorkDataAsyncTask asyncTask; private final PisInfrDao pisInfrDao; private final PrkPlceDao prkPlceDao; private UnitSystMapper unitSystMapper; private PrkPlceMapper prkPlceMapper; int MAX_CORE = Runtime.getRuntime().availableProcessors(); @PostConstruct void init() { this.unitSystMapper = (UnitSystMapper) AppUtils.getBean(UnitSystMapper.class); this.prkPlceMapper = (PrkPlceMapper) AppUtils.getBean(PrkPlceMapper.class); } public void run() { log.info("WorkDataProcess.run: Start."); if (this.MAX_CORE < 8) { this.MAX_CORE = 8; } ThreadPoolInitializer poolInitializer = (ThreadPoolInitializer) AppUtils.getBean(ThreadPoolInitializer.class); int executePool = Math.max(this.MAX_CORE, poolInitializer.getWork()); for (int ii = 0; ii < executePool; ii++) { log.info("WorkDataProcess.Task: {}", ii); this.taskExecutor.execute(() -> { while (true) { try { WorkData data = WORK_DATA_QUEUE.take(); if (data != null) { asyncTask.run(this, data); } else { log.error("WorkDataProcess.Task: Received data null"); } } catch (Exception e) { log.error("WorkDataProcess.Task: Exception: {}", e.getMessage(), e); } } }); } log.info("WorkDataProcess.run: ..End."); } /** * 비동기 타스크에서 실행되는 함수. * 클래스가 달라야 비동기 타스크로 실행된다. * @param data */ public void runJob(WorkData data) { log.error("WorkDataProcess.runJob: {}", Thread.currentThread().getName()); process(data); } public void process(WorkData data) { // try { // int type = data.getType(); // TbPisInfr pis = data.getObj(); // switch(type) { // case DbmsData.DBMS_DATA_RL_TIME: // if (pis != null) { // String DATA_COLCT_TIME = SysUtils.getSysTime(); // C2FMessage rlTimeInfo = (C2FMessage) data.getData(); // if (rlTimeInfo != null) { // PrkPlceRlTimeResponseInfo info = rlTimeInfo.getData().getPayload(); // TbPrkPlceRt rt = TbPrkPlceRt.toEntity(pis.getPIS_NMBR(), DATA_COLCT_TIME, info); // // this.prkPlceMapper.updatePrkPlceRlTime(rt); // this.prkPlceDao.updateRlTimeFlr(pis.getPIS_NMBR(), DATA_COLCT_TIME, info.getFlr_info()); // this.prkPlceDao.updateRlTimeAr(pis.getPIS_NMBR(), info.getAr_info()); // this.prkPlceDao.updateRlTimePrv(pis.getPIS_NMBR(), info.getPrvuse_prkar_info()); // this.prkPlceDao.updateRlTimeDev(pis.getPIS_NMBR(), info.getPrk_colct_device_info()); // } // else { // log.error("RL TIME Data NULL..............."); // } // } // else { // log.error("PIS Object null"); // } // break; // case DbmsData.DBMS_DATA_RL_TIME_HS: // TbPrkPlceRtHs rtHs = (TbPrkPlceRtHs)data.getData(); // int result = this.prkPlceMapper.insertPrkPlceRlTimeHs(rtHs); // log.error("DATA_TYPE_RL_TIME_HS: {}, {} EA.", rtHs.getCrtnDt(), result); // break; // case DbmsData.DBMS_DATA_PIS_STTS: // List pisSttsList = (List)data.getData(); // this.pisInfrDao.updateStts(pisSttsList, data.isHistory()); // if (data.isHistory()) { // // 주차정보시스템 상태정보 이력은 저장하지 않는다. // //this.pisInfrDao.insertStts(pisSttsList); // } // pisSttsList.clear(); // break; // case DbmsData.DBMS_DATA_PRK_PLCE_STTS: // List prltSttsList = (List)data.getData(); // this.prkPlceDao.updatePrkPlceStts(prltSttsList, data.isHistory()); // if (data.isHistory()) { // int res = this.prkPlceDao.insertPrkPlceSttsHs(prltSttsList); // log.error("DATA_TYPE_PRK_PLCE_STTS: {}, {}/{} EA. ", prltSttsList.get(0).getUPDT_DT(), prltSttsList.size(), res); // } // prltSttsList.clear(); // break; // default: // log.error("WorkDataProcess.process: Unknown Job Type: {}", type); // break; // } // } catch (Exception e) { // log.error("WorkDataProcess.process: Exception: {}", e.toString()); // } } /* * 작업큐에 데이터 추가 */ public boolean add(WorkData data) { boolean offer = false; try { //offer => full -> return //add => full -> wait //큐가 차더라도 바로 리턴함. offer = WORK_DATA_QUEUE.offer(data); if (!offer) { log.error("WorkDataProcess.add: Queue Full Error, Size: {} EA", WORK_DATA_QUEUE.size()); } } catch (Exception e) { log.error("WorkDataProcess.add: Exception: {}", e.getMessage(), e); } return offer; } }