package com.evp.comm.server.process.dbms; import com.evp.app.common.utils.Elapsed; import com.evp.comm.server.common.SpringUtils; import com.evp.comm.server.config.ThreadPoolInitializer; import com.evp.comm.server.dao.mapper.EvpsServiceMapper; import com.evp.comm.server.dao.mapper.ProcessMapper; import com.evp.comm.server.dao.mapper.batch.EvpCommServerDao; import com.evp.comm.server.dto.EvpsCenter; import com.evp.comm.server.entity.TbEvpEvent; import com.evp.comm.server.entity.TbEvpService; import com.evp.comm.server.entity.TbRegionCenterComm; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.slf4j.MDC; import org.springframework.stereotype.Service; import java.util.HashMap; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @Slf4j @RequiredArgsConstructor @Service public class DbmsDataProcess { private final LinkedBlockingQueue dbmsDataBlockingQueue = new LinkedBlockingQueue<>(1000); private final ThreadPoolExecutor taskExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); private final DbmsDataAsyncTask asyncTask; private final ProcessMapper processMapper; private final EvpsServiceMapper serviceMapper; private final EvpCommServerDao evpCommServerDao; private int maxCore = Runtime.getRuntime().availableProcessors(); public void run() { log.info("DbmsDataProcess.run: Start."); if (this.maxCore < 8) { this.maxCore = 8; } ThreadPoolInitializer poolInitializer = SpringUtils.getBean(ThreadPoolInitializer.class); int executePool = poolInitializer.getDbms(); for (int ii = 0; ii < executePool; ii++) { log.info("DbmsDataProcess.Task: {}", ii); this.taskExecutor.execute(() -> { boolean isRunning = true; while (isRunning) { try { DbmsData data = dbmsDataBlockingQueue.take(); this.asyncTask.run(this, data); } catch (Exception e) { log.error("DbmsDataProcess.Task: Exception: {}", e.getMessage(), e); Thread.currentThread().interrupt(); isRunning = false; } } }); } log.info("DbmsDataProcess.run: ..End."); } public void runJob(DbmsData data) { if (data.getType() == DbmsData.DBMS_DATA_PROCESS_STTS) { String processId = (String) data.getData(); this.processMapper.updateProcessState(processId); } else { process(data); } } public void process(DbmsData data) { int result = -1; int type = -1; EvpsCenter center = data.getCenter(); try { MDC.put("id", center.getLogKey()); Elapsed elapsed1 = new Elapsed(); type = data.getType(); switch(type) { case DbmsData.DBMS_DATA_INS_SERVICE: TbEvpService newService = (TbEvpService)data.getData(); result = this.serviceMapper.insertEvpService(newService); break; case DbmsData.DBMS_DATA_UPD_SERVICE: TbEvpService updService = (TbEvpService)data.getData(); result = this.serviceMapper.updateEvpService(updService); break; case DbmsData.DBMS_DATA_INS_EVENT: TbEvpEvent event = (TbEvpEvent)data.getData(); result = this.serviceMapper.insertEvpEvent(event); break; case DbmsData.DBMS_DATA_INS_ROUTE: List> routeList = (List>)data.getData(); result = this.evpCommServerDao.insertEvpRoute(routeList); break; case DbmsData.DBMS_DATA_INS_NODE: List> nodeList = (List>)data.getData(); result = this.evpCommServerDao.insertEvpNode(nodeList); break; case DbmsData.DBMS_DATA_INS_PHASE: List> phaseList = (List>)data.getData(); result = this.evpCommServerDao.insertEvpPhase(phaseList); break; case DbmsData.DBMS_DATA_INS_SIGNAL: List> signalList = (List>)data.getData(); result = this.evpCommServerDao.insertEvpSignal(signalList); break; case DbmsData.DBMS_DATA_CENTER_STTS_UPDATE: TbRegionCenterComm updStts = (TbRegionCenterComm) data.getData(); // this.centerMapper.updateCommState(updStts); break; case DbmsData.DBMS_DATA_CENTER_STTS: TbRegionCenterComm stts = (TbRegionCenterComm) data.getData(); // if (TbRegionCenterComm.CENTER_COMM_START.equals(stts.getCommState())) { // result = this.centerMapper.updateCommStateConnect(stts); // } // else if (TbRegionCenterComm.CENTER_COMM_STOP.equals(stts.getCommState())) { // result = this.centerMapper.updateCommStateDisconnect(stts); // } break; default: log.error("DbmsJobProcess.process: Unknown Request {}.", type); break; } // log.info("DbmsDataProcess.run: [{}]. {}, {} EA. {}", // center.getLogKey(), type, result, Elapsed.elapsedTimeStr(elapsed1.nanoSeconds())); } catch (Exception e) { log.error("DbmsJobProcess.process: [{}]. {}, Exception: {}", center.getLogKey(), type, e.toString()); } finally { MDC.remove(center.getLogKey()); MDC.clear(); } } /* * 작업큐에 데이터 추가 */ public boolean add(DbmsData data) { boolean offer = false; try { offer = dbmsDataBlockingQueue.offer(data); if (!offer) { log.error("DbmsDataProcess.add: Queue Full Error, Size: {} EA", dbmsDataBlockingQueue.size()); } } catch (Exception e) { log.error("DbmsDataProcess.add: Exception: {}", e.getMessage(), e); } return offer; } }