123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- package com.evp.comm.server.process.dbms;
- import com.evp.app.common.utils.Elapsed;
- import com.evp.comm.server.common.SpringUtils;
- import com.evp.comm.server.config.ThreadPoolInitializer;
- import com.evp.comm.server.dao.mapper.EvpsServiceMapper;
- import com.evp.comm.server.dao.mapper.ProcessMapper;
- import com.evp.comm.server.dao.mapper.batch.EvpCommServerDao;
- import com.evp.comm.server.dto.EvpsCenter;
- import com.evp.comm.server.entity.TbEvpEvent;
- import com.evp.comm.server.entity.TbEvpService;
- import com.evp.comm.server.entity.TbRegionCenterComm;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.slf4j.MDC;
- import org.springframework.stereotype.Service;
- import java.util.HashMap;
- import java.util.List;
- import java.util.concurrent.Executors;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- @Slf4j
- @RequiredArgsConstructor
- @Service
- public class DbmsDataProcess {
- private final LinkedBlockingQueue<DbmsData> dbmsDataBlockingQueue = new LinkedBlockingQueue<>(1000);
- private final ThreadPoolExecutor taskExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
- private final DbmsDataAsyncTask asyncTask;
- private final ProcessMapper processMapper;
- private final EvpsServiceMapper serviceMapper;
- private final EvpCommServerDao evpCommServerDao;
- private int maxCore = Runtime.getRuntime().availableProcessors();
- public void run() {
- log.info("DbmsDataProcess.run: Start.");
- if (this.maxCore < 8) {
- this.maxCore = 8;
- }
- ThreadPoolInitializer poolInitializer = SpringUtils.getBean(ThreadPoolInitializer.class);
- int executePool = poolInitializer.getDbms();
- for (int ii = 0; ii < executePool; ii++) {
- log.info("DbmsDataProcess.Task: {}", ii);
- this.taskExecutor.execute(() -> {
- boolean isRunning = true;
- while (isRunning) {
- try {
- DbmsData data = dbmsDataBlockingQueue.take();
- this.asyncTask.run(this, data);
- }
- catch (Exception e) {
- log.error("DbmsDataProcess.Task: Exception: {}", e.getMessage(), e);
- Thread.currentThread().interrupt();
- isRunning = false;
- }
- }
- });
- }
- log.info("DbmsDataProcess.run: ..End.");
- }
- public void runJob(DbmsData data) {
- if (data.getType() == DbmsData.DBMS_DATA_PROCESS_STTS) {
- String processId = (String) data.getData();
- this.processMapper.updateProcessState(processId);
- }
- else {
- process(data);
- }
- }
- public void process(DbmsData data) {
- int result = -1;
- int type = -1;
- EvpsCenter center = data.getCenter();
- try {
- MDC.put("id", center.getLogKey());
- Elapsed elapsed1 = new Elapsed();
- type = data.getType();
- switch(type) {
- case DbmsData.DBMS_DATA_INS_SERVICE:
- TbEvpService newService = (TbEvpService)data.getData();
- result = this.serviceMapper.insertEvpService(newService);
- break;
- case DbmsData.DBMS_DATA_UPD_SERVICE:
- TbEvpService updService = (TbEvpService)data.getData();
- result = this.serviceMapper.updateEvpService(updService);
- break;
- case DbmsData.DBMS_DATA_INS_EVENT:
- TbEvpEvent event = (TbEvpEvent)data.getData();
- result = this.serviceMapper.insertEvpEvent(event);
- break;
- case DbmsData.DBMS_DATA_INS_ROUTE:
- List<HashMap<String, Object>> routeList = (List<HashMap<String, Object>>)data.getData();
- result = this.evpCommServerDao.insertEvpRoute(routeList);
- break;
- case DbmsData.DBMS_DATA_INS_NODE:
- List<HashMap<String, Object>> nodeList = (List<HashMap<String, Object>>)data.getData();
- result = this.evpCommServerDao.insertEvpNode(nodeList);
- break;
- case DbmsData.DBMS_DATA_INS_PHASE:
- List<HashMap<String, Object>> phaseList = (List<HashMap<String, Object>>)data.getData();
- result = this.evpCommServerDao.insertEvpPhase(phaseList);
- break;
- case DbmsData.DBMS_DATA_INS_SIGNAL:
- List<HashMap<String, Object>> signalList = (List<HashMap<String, Object>>)data.getData();
- result = this.evpCommServerDao.insertEvpSignal(signalList);
- break;
- case DbmsData.DBMS_DATA_CENTER_STTS_UPDATE:
- TbRegionCenterComm updStts = (TbRegionCenterComm) data.getData();
- // this.centerMapper.updateCommState(updStts);
- break;
- case DbmsData.DBMS_DATA_CENTER_STTS:
- TbRegionCenterComm stts = (TbRegionCenterComm) data.getData();
- // if (TbRegionCenterComm.CENTER_COMM_START.equals(stts.getCommState())) {
- // result = this.centerMapper.updateCommStateConnect(stts);
- // }
- // else if (TbRegionCenterComm.CENTER_COMM_STOP.equals(stts.getCommState())) {
- // result = this.centerMapper.updateCommStateDisconnect(stts);
- // }
- break;
- default:
- log.error("DbmsJobProcess.process: Unknown Request {}.", type);
- break;
- }
- // log.info("DbmsDataProcess.run: [{}]. {}, {} EA. {}",
- // center.getLogKey(), type, result, Elapsed.elapsedTimeStr(elapsed1.nanoSeconds()));
- }
- catch (Exception e) {
- log.error("DbmsJobProcess.process: [{}]. {}, Exception: {}", center.getLogKey(), type, e.toString());
- }
- finally {
- MDC.remove(center.getLogKey());
- MDC.clear();
- }
- }
- /*
- * 작업큐에 데이터 추가
- */
- public boolean add(DbmsData data) {
- boolean offer = false;
- try {
- offer = dbmsDataBlockingQueue.offer(data);
- if (!offer) {
- log.error("DbmsDataProcess.add: Queue Full Error, Size: {} EA", dbmsDataBlockingQueue.size());
- }
- } catch (Exception e) {
- log.error("DbmsDataProcess.add: Exception: {}", e.getMessage(), e);
- }
- return offer;
- }
- }
|