|
@@ -0,0 +1,171 @@
|
|
|
+package com.its.pis.process;
|
|
|
+
|
|
|
+import com.its.app.AppUtils;
|
|
|
+import com.its.app.utils.SysUtils;
|
|
|
+import com.its.pis.config.ThreadPoolInitializer;
|
|
|
+import com.its.pis.dao.mapper.PrkPlceMapper;
|
|
|
+import com.its.pis.dao.mapper.UnitSystMapper;
|
|
|
+import com.its.pis.dao.mapper.batch.PisInfrDao;
|
|
|
+import com.its.pis.dao.mapper.batch.PrkPlceDao;
|
|
|
+import com.its.pis.entity.*;
|
|
|
+import com.its.pis.websocket.message.c2f.C2FMessage;
|
|
|
+import com.its.pis.websocket.message.c2f.PrkPlceRlTimeResponseInfo;
|
|
|
+import lombok.RequiredArgsConstructor;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+import java.util.List;
|
|
|
+import java.util.concurrent.Executors;
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
+
|
|
|
+@Slf4j
|
|
|
+@RequiredArgsConstructor
|
|
|
+@Service
|
|
|
+public class DbmsDataProcess {
|
|
|
+
|
|
|
+ private final LinkedBlockingQueue<DbmsData> DBMS_DATA_QUEUE = new LinkedBlockingQueue<>(1000);
|
|
|
+ private final ThreadPoolExecutor taskExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
|
|
|
+
|
|
|
+ private final DbmsDataAsyncTask asyncTask;
|
|
|
+ private final PisInfrDao pisInfrDao;
|
|
|
+ private final PrkPlceDao prkPlceDao;
|
|
|
+
|
|
|
+ private UnitSystMapper unitSystMapper;
|
|
|
+ private PrkPlceMapper prkPlceMapper;
|
|
|
+ int MAX_CORE = Runtime.getRuntime().availableProcessors();
|
|
|
+
|
|
|
+ @PostConstruct
|
|
|
+ void init() {
|
|
|
+ this.unitSystMapper = (UnitSystMapper) AppUtils.getBean(UnitSystMapper.class);
|
|
|
+ this.prkPlceMapper = (PrkPlceMapper) AppUtils.getBean(PrkPlceMapper.class);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void run() {
|
|
|
+ log.info("DbmsDataProcess.run: Start.");
|
|
|
+ if (this.MAX_CORE < 8) {
|
|
|
+ this.MAX_CORE = 8;
|
|
|
+ }
|
|
|
+ ThreadPoolInitializer poolInitializer = (ThreadPoolInitializer) AppUtils.getBean(ThreadPoolInitializer.class);
|
|
|
+ int executePool = Math.max(this.MAX_CORE, poolInitializer.getWork());
|
|
|
+ for (int ii = 0; ii < executePool; ii++) {
|
|
|
+ log.info("DbmsDataProcess.Task: {}", ii);
|
|
|
+ this.taskExecutor.execute(() -> {
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ DbmsData data = DBMS_DATA_QUEUE.take();
|
|
|
+ if (data != null) {
|
|
|
+ asyncTask.run(this, data);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ log.error("DbmsDataProcess.Task: Received data null");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception e) {
|
|
|
+ log.error("DbmsDataProcess.Task: Exception: {}", e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("DbmsDataProcess.run: ..End.");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 비동기 타스크에서 실행되는 함수.
|
|
|
+ * 클래스가 달라야 비동기 타스크로 실행된다.
|
|
|
+ * @param data
|
|
|
+ */
|
|
|
+ public void runJob(DbmsData data) {
|
|
|
+ if (data.getType() == DbmsData.DBMS_DATA_UNIT_SYST_STTS) {
|
|
|
+ TbUnitSystStts stts = (TbUnitSystStts) data.getData();
|
|
|
+ this.unitSystMapper.updateUnitSystStts(stts); // 상태정보 업데이트
|
|
|
+ if (data.isHistory()) {
|
|
|
+ this.unitSystMapper.insertUnitSystSttsHs(stts); // 상태정보 이력저장
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ process(data);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void process(DbmsData data) {
|
|
|
+ try {
|
|
|
+ int type = data.getType();
|
|
|
+ TbPisInfr pis = data.getObj();
|
|
|
+ switch(type) {
|
|
|
+ case DbmsData.DBMS_DATA_RL_TIME:
|
|
|
+ if (pis != null) {
|
|
|
+ String DATA_COLCT_TIME = SysUtils.getSysTime();
|
|
|
+ C2FMessage<PrkPlceRlTimeResponseInfo> rlTimeInfo = (C2FMessage<PrkPlceRlTimeResponseInfo>) data.getData();
|
|
|
+ if (rlTimeInfo != null) {
|
|
|
+ PrkPlceRlTimeResponseInfo info = rlTimeInfo.getData().getPayload();
|
|
|
+ TbPrkPlceRt rt = TbPrkPlceRt.toEntity(pis.getPIS_NMBR(), DATA_COLCT_TIME, info);
|
|
|
+
|
|
|
+ this.prkPlceMapper.updatePrkPlceRlTime(rt);
|
|
|
+ this.prkPlceDao.updateRlTimeFlr(pis.getPIS_NMBR(), DATA_COLCT_TIME, info.getFlr_info());
|
|
|
+ this.prkPlceDao.updateRlTimeAr(pis.getPIS_NMBR(), info.getAr_info());
|
|
|
+ this.prkPlceDao.updateRlTimePrv(pis.getPIS_NMBR(), info.getPrvuse_prkar_info());
|
|
|
+ this.prkPlceDao.updateRlTimeDev(pis.getPIS_NMBR(), info.getPrk_colct_device_info());
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ log.error("RL TIME Data NULL...............");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ log.error("PIS Object null");
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ case DbmsData.DBMS_DATA_RL_TIME_HS:
|
|
|
+ TbPrkPlceRtHs rtHs = (TbPrkPlceRtHs)data.getData();
|
|
|
+ int result = this.prkPlceMapper.insertPrkPlceRlTimeHs(rtHs);
|
|
|
+ log.error("DATA_TYPE_RL_TIME_HS: {}, {} EA.", rtHs.getCrtnDt(), result);
|
|
|
+ break;
|
|
|
+ case DbmsData.DBMS_DATA_PIS_STTS:
|
|
|
+ List<TbPisInfrStts> pisSttsList = (List<TbPisInfrStts>)data.getData();
|
|
|
+ this.pisInfrDao.updateStts(pisSttsList, data.isHistory());
|
|
|
+ if (data.isHistory()) {
|
|
|
+ // 주차정보시스템 상태정보 이력은 저장하지 않는다.
|
|
|
+ //this.pisInfrDao.insertStts(pisSttsList);
|
|
|
+ }
|
|
|
+ pisSttsList.clear();
|
|
|
+ break;
|
|
|
+ case DbmsData.DBMS_DATA_PRK_PLCE_STTS:
|
|
|
+ List<TbPrkPlceStts> prltSttsList = (List<TbPrkPlceStts>)data.getData();
|
|
|
+ this.prkPlceDao.updatePrkPlceStts(prltSttsList, data.isHistory());
|
|
|
+ if (data.isHistory()) {
|
|
|
+ int res = this.prkPlceDao.insertPrkPlceSttsHs(prltSttsList);
|
|
|
+ log.error("DATA_TYPE_PRK_PLCE_STTS: {}, {}/{} EA. ", prltSttsList.get(0).getUPDT_DT(), prltSttsList.size(), res);
|
|
|
+ }
|
|
|
+ prltSttsList.clear();
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ log.error("DbmsDataProcess.process: Unknown Job Type: {}", type);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("DbmsDataProcess.process: Exception: {}", e.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * 작업큐에 데이터 추가
|
|
|
+ */
|
|
|
+ public boolean add(DbmsData data) {
|
|
|
+ boolean offer = false;
|
|
|
+ try {
|
|
|
+ //offer => full -> return
|
|
|
+ //add => full -> wait
|
|
|
+ //큐가 차더라도 바로 리턴함.
|
|
|
+ offer = DBMS_DATA_QUEUE.offer(data);
|
|
|
+ if (!offer) {
|
|
|
+ log.error("DbmsDataProcess.add: Queue Full Error, Size: {} EA", DBMS_DATA_QUEUE.size());
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("DbmsDataProcess.add: Exception: {}", e.getMessage(), e);
|
|
|
+ }
|
|
|
+ return offer;
|
|
|
+ }
|
|
|
+
|
|
|
+}
|