|
@@ -1,214 +0,0 @@
|
|
|
-package com.its.rota.client.process.work;
|
|
|
|
|
-
|
|
|
|
|
-import com.its.app.common.utils.Elapsed;
|
|
|
|
|
-import com.its.rota.client.dto.SigGgitsTsinfoDto;
|
|
|
|
|
-import com.its.rota.client.process.AbstractAppWorker;
|
|
|
|
|
-import com.its.rota.client.process.dbms.DbmsDataProcess;
|
|
|
|
|
-import com.its.rota.client.repository.ApplicationRepository;
|
|
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
-import org.slf4j.MDC;
|
|
|
|
|
-
|
|
|
|
|
-import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
|
-
|
|
|
|
|
-@Slf4j
|
|
|
|
|
-public class DataPacketWorker extends AbstractAppWorker implements Runnable {
|
|
|
|
|
-
|
|
|
|
|
- private final ApplicationRepository repo;
|
|
|
|
|
- private final DbmsDataProcess dbmsDataProcess;
|
|
|
|
|
-
|
|
|
|
|
- private final LinkedBlockingQueue<SigGgitsTsinfoDto> DATA_QUEUE;
|
|
|
|
|
-
|
|
|
|
|
- public DataPacketWorker(int idx, int qSize, ApplicationRepository repo, DbmsDataProcess dbmsDataProcess) {
|
|
|
|
|
- this.idx = idx;
|
|
|
|
|
- this.qSize = qSize;
|
|
|
|
|
- this.repo = repo;
|
|
|
|
|
- this.dbmsDataProcess = dbmsDataProcess;
|
|
|
|
|
- this.DATA_QUEUE = new LinkedBlockingQueue<>(qSize);
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- @Override
|
|
|
|
|
- public void run() {
|
|
|
|
|
- log.info("{} Start. QSIZE: {}", Thread.currentThread().getName(), this.qSize);
|
|
|
|
|
- while (true) {
|
|
|
|
|
- try {
|
|
|
|
|
- Object packet = this.DATA_QUEUE.take();
|
|
|
|
|
- process(packet);
|
|
|
|
|
- }
|
|
|
|
|
- catch (Exception e) {
|
|
|
|
|
- log.error("Exception: {}", e.getMessage());
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- /*
|
|
|
|
|
- * 작업큐에 데이터 추가
|
|
|
|
|
- */
|
|
|
|
|
- public boolean add(Object object) {
|
|
|
|
|
- boolean offer = false;
|
|
|
|
|
- SigGgitsTsinfoDto packet = (SigGgitsTsinfoDto)object;
|
|
|
|
|
- try {
|
|
|
|
|
- //offer => full -> return
|
|
|
|
|
- //add => full -> wait
|
|
|
|
|
- offer = this.DATA_QUEUE.offer(packet);
|
|
|
|
|
- if (!offer) {
|
|
|
|
|
- MDC.put("id", Long.toString(packet.getLocalPort()));
|
|
|
|
|
- log.error("Packet Queue.offer: [{}]({})/{}, Queue Full: {} EA.",
|
|
|
|
|
- packet.getLocalPort(), this.idx, this.DATA_QUEUE.size(), this.qSize);
|
|
|
|
|
- MDC.clear();
|
|
|
|
|
- }
|
|
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- MDC.put("id", Long.toString(packet.getLocalPort()));
|
|
|
|
|
- log.error("Packet Queue.offer: Exception: {}, {}, {}", packet.getLocalPort(), Thread.currentThread().getName(), e.getMessage());
|
|
|
|
|
- MDC.clear();
|
|
|
|
|
- }
|
|
|
|
|
- return offer;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- public void process(Object object) {
|
|
|
|
|
-// SigGgitsTsinfoDto data = (SigGgitsTsinfoDto)object;
|
|
|
|
|
-// long popTimestamp = System.currentTimeMillis();
|
|
|
|
|
-// int idx = 0;
|
|
|
|
|
-// try {
|
|
|
|
|
-//// RegionCenter center = data.getCenter();//this.repo.getCenterMap().get(data.getLocalPort());
|
|
|
|
|
-//// if (center == null) {
|
|
|
|
|
-//// log.warn("WorkDataProcess.process: [{}],Not Found Region Center By RegionId: {}", data.getLocalPort(), data.getLocalPort());
|
|
|
|
|
-//// return;
|
|
|
|
|
-//// }
|
|
|
|
|
-// List<IntStatusDto> statusLists = new ArrayList<>();
|
|
|
|
|
-//
|
|
|
|
|
-// byte stx = data.buffer[idx++];
|
|
|
|
|
-// byte opCode = data.buffer[idx++];
|
|
|
|
|
-// short year = (short)(data.buffer[idx++] & 0xFF);
|
|
|
|
|
-// short month = (short)(data.buffer[idx++] & 0xFF);
|
|
|
|
|
-// short day = (short)(data.buffer[idx++] & 0xFF);
|
|
|
|
|
-// short hour = (short)(data.buffer[idx++] & 0xFF);
|
|
|
|
|
-// short min = (short)(data.buffer[idx++] & 0xFF);
|
|
|
|
|
-// short sec = (short)(data.buffer[idx++] & 0xFF);
|
|
|
|
|
-// String COLLCT_DTIME = String.format("%4d%02d%02d%02d%02d%02d", year+2000, month, day, hour, min, sec);
|
|
|
|
|
-//
|
|
|
|
|
-// int sequence = ((data.buffer[idx++] & 0xFF) << 8) | (data.buffer[idx++] & 0xFF);
|
|
|
|
|
-// int regionId = ((data.buffer[idx++] & 0xFF) << 8) | (data.buffer[idx++] & 0xFF);
|
|
|
|
|
-// int dataLength = ((data.buffer[idx++] & 0xFF) << 8) | (data.buffer[idx++] & 0xFF);
|
|
|
|
|
-// int count = ((data.buffer[idx++] & 0xFF) << 8) | (data.buffer[idx++] & 0xFF);
|
|
|
|
|
-//
|
|
|
|
|
-//// log.info("WorkDataProcess.process: [{}], Sequence {}, RegionId {}, DataLength {}, Count {}, {} Bytes. {} ms.",
|
|
|
|
|
-//// data.getLocalPort(), sequence, regionId, dataLength, count, data.getBuffer().length, timestamp - data.getTimestamp());
|
|
|
|
|
-//
|
|
|
|
|
-// if (opCode != SigProtocolConst.SIG_PHASE_CHANGE) {
|
|
|
|
|
-// log.warn("WorkDataProcess.process: [{}], Unknown OpCode: {}, {}", data.getLocalPort(), stx, opCode);
|
|
|
|
|
-// return;
|
|
|
|
|
-// }
|
|
|
|
|
-// if (dataLength != (count * 10)) {
|
|
|
|
|
-// log.warn("WorkDataProcess.process: [{}], Data Length & Data count Error(length/count): {}, {}", data.getLocalPort(), dataLength, count);
|
|
|
|
|
-// return;
|
|
|
|
|
-// }
|
|
|
|
|
-//
|
|
|
|
|
-// if (sequence < center.getRecvSeqNo()) {
|
|
|
|
|
-//// log.warn("WorkDataProcess.process: [{}], Receive Sequence Error(curr/old): {}, {}", data.getLocalPort(), sequence, center.getRecvSeqNo());
|
|
|
|
|
-// }
|
|
|
|
|
-// if (regionId != data.getLocalPort()) {
|
|
|
|
|
-// log.warn("WorkDataProcess.process: [{}], Receive Region Id Error(packet/port): {}, {}", data.getLocalPort(), regionId, data.getLocalPort());
|
|
|
|
|
-// }
|
|
|
|
|
-// center.setRecvSeqNo(sequence);
|
|
|
|
|
-//
|
|
|
|
|
-// int intLcNo;
|
|
|
|
|
-// int ringAP;
|
|
|
|
|
-// //int ringAS;
|
|
|
|
|
-// int ringBP;
|
|
|
|
|
-// //int ringBS;
|
|
|
|
|
-// int comm;
|
|
|
|
|
-// int mapNo;
|
|
|
|
|
-// //int crs;
|
|
|
|
|
-// int mode;
|
|
|
|
|
-//// int pnlManCont;
|
|
|
|
|
-//// int pnlMan;
|
|
|
|
|
-//// int pnlBlink;
|
|
|
|
|
-//// int pnlOut;
|
|
|
|
|
-//// int pnlAS;
|
|
|
|
|
-// int turnOff;
|
|
|
|
|
-// int blink;
|
|
|
|
|
-//// int pnlDb;
|
|
|
|
|
-//// byte ppc;
|
|
|
|
|
-//// byte omit;
|
|
|
|
|
-// byte flags1;
|
|
|
|
|
-// byte flags2;
|
|
|
|
|
-// byte flags3;
|
|
|
|
|
-// byte flags4;
|
|
|
|
|
-//
|
|
|
|
|
-// for (int ii = 0; ii < count; ii++) {
|
|
|
|
|
-// intLcNo = ((data.buffer[idx++] & 0xFF) << 24) |
|
|
|
|
|
-// ((data.buffer[idx++] & 0xFF) << 16) |
|
|
|
|
|
-// ((data.buffer[idx++] & 0xFF) << 8) |
|
|
|
|
|
-// (data.buffer[idx++] & 0xFF);
|
|
|
|
|
-//
|
|
|
|
|
-// flags1 = data.buffer[idx++];
|
|
|
|
|
-// flags2 = data.buffer[idx++];
|
|
|
|
|
-// flags3 = data.buffer[idx++];
|
|
|
|
|
-// flags4 = data.buffer[idx++];
|
|
|
|
|
-//// ppc = data.buffer[idx++];
|
|
|
|
|
-//// omit = data.buffer[idx++];
|
|
|
|
|
-// idx++;
|
|
|
|
|
-// idx++;
|
|
|
|
|
-// IntDto intDto = this.repo.getIntMap().get(intLcNo);
|
|
|
|
|
-// if (intDto == null) {
|
|
|
|
|
-// UnknownIntDto unknownDto = this.repo.getUnknownIntMap().get(intLcNo);
|
|
|
|
|
-// if (unknownDto == null) {
|
|
|
|
|
-// unknownDto = new UnknownIntDto(intLcNo, center.getRegionCd());
|
|
|
|
|
-// this.repo.getUnknownIntMap().put(intLcNo, unknownDto);
|
|
|
|
|
-// log.warn("WorkDataProcess.process: [{}], Not Found IntLcNo: {}, {}, {}", data.getLocalPort(), intLcNo, intLcNo % 10000, intLcNo / 10000);
|
|
|
|
|
-// }
|
|
|
|
|
-// continue;
|
|
|
|
|
-// }
|
|
|
|
|
-//
|
|
|
|
|
-// ringAP = (flags1 >> 5) & 0x07;
|
|
|
|
|
-// //ringAS = flags1 & 0x1F;
|
|
|
|
|
-//
|
|
|
|
|
-// ringBP = (flags2 >> 5) & 0x07;
|
|
|
|
|
-// //ringBS = flags2 & 0x1F;
|
|
|
|
|
-//
|
|
|
|
|
-// comm = (flags3 >> 7 & 0x01);
|
|
|
|
|
-// mapNo = (flags3 >> 4) & 0x07;
|
|
|
|
|
-// //crs = (flags3 & 0x08);
|
|
|
|
|
-// mode = (flags3) & 0x07;
|
|
|
|
|
-//
|
|
|
|
|
-//// pnlManCont = (flags4 >> 7 & 0x01);
|
|
|
|
|
-//// pnlMan = (flags4 >> 6 & 0x01);
|
|
|
|
|
-//// pnlBlink = (flags4 >> 5 & 0x01);
|
|
|
|
|
-//// pnlOut = (flags4 >> 4 & 0x01);
|
|
|
|
|
-//// pnlAS = (flags4 >> 3 & 0x01);
|
|
|
|
|
-// turnOff = (flags4 >> 2 & 0x01);
|
|
|
|
|
-// blink = (flags4 >> 1 & 0x01);
|
|
|
|
|
-//// pnlDb = (flags4 >> 0 & 0x01);
|
|
|
|
|
-//
|
|
|
|
|
-//
|
|
|
|
|
-// IntStatusDto status = intDto.getStatus();
|
|
|
|
|
-// status.COLLCT_DTIME = COLLCT_DTIME;
|
|
|
|
|
-// status.A_RING_PHASE_VAL = ringAP;
|
|
|
|
|
-// status.B_RING_PHASE_VAL = ringBP;
|
|
|
|
|
-// status.COMM_ON_OFF_FLAG = String.valueOf(comm);
|
|
|
|
|
-// status.MAP_NO = mapNo;
|
|
|
|
|
-// status.CONTRLR_OPER_MODE_CD = String.valueOf(mode);
|
|
|
|
|
-// status.SIGLIGHT_TURNOFF_FLAG = String.valueOf(turnOff);
|
|
|
|
|
-// status.SIGLIGHT_BLINK_FLAG = String.valueOf(blink);
|
|
|
|
|
-//
|
|
|
|
|
-// statusLists.add(status);
|
|
|
|
|
-// }
|
|
|
|
|
-// if (statusLists.isEmpty()) {
|
|
|
|
|
-// log.warn("WorkDataProcess.process: [{}], Int Status Data Empty: {}, {} ms.", data.getLocalPort(), COLLCT_DTIME, System.currentTimeMillis() - popTimestamp);
|
|
|
|
|
-// return;
|
|
|
|
|
-// }
|
|
|
|
|
-//// log.info("WorkDataProcess.process: [{}], {} EA. {} Bytes. Parse {} ms.", data.getLocalPort(), count, data.getBuffer().length, System.currentTimeMillis() - parsetime);
|
|
|
|
|
-// this.dbmsDataProcess.add(new DbmsData(data.getCenter(), this.idx, data.getTimestamp(), popTimestamp, System.currentTimeMillis(),
|
|
|
|
|
-// center.getRegionCd(), center.getRegionId(), DbmsDataType.DBMS_DATA_INT_STATUS_UPDATE, false, statusLists));
|
|
|
|
|
-//// log.info("WorkDataProcess.process: [{}], {} EA. {} Bytes. {} ms.[Q Added]", data.getLocalPort(), count, data.getBuffer().length, System.currentTimeMillis() - timestamp);
|
|
|
|
|
-// }
|
|
|
|
|
-// catch (Exception e) {
|
|
|
|
|
-// log.error("WorkDataProcess.process: Exception: [{}] {}", data.getLocalPort(), e.toString());
|
|
|
|
|
-// }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- public void report() {
|
|
|
|
|
- long avgTime = 0;
|
|
|
|
|
- log.info("Packet: Remain Q: {}, Average: {}, {}", this.DATA_QUEUE.size(), Elapsed.elapsedTimeStr(avgTime), Thread.currentThread().getName());
|
|
|
|
|
- }
|
|
|
|
|
-}
|
|
|