123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- package com.sig.comm.consumer.kafka;
- import com.sig.app.common.utils.SysUtils;
- import lombok.AllArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.springframework.kafka.listener.MessageListener;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import java.util.HashSet;
- @Slf4j
- @AllArgsConstructor
- public class KafkaConsumerWorker implements MessageListener<String, byte[]> {
- private final String topic;
- private final HashSet<String> keyValues;
- @Override
- public void onMessage(ConsumerRecord<String, byte[]> record) {
- // log.info("onMessage: Topic: {}, Key: {}, {} Bytes. {}",
- // this.topic, record.key(), record.value().length, SysUtils.byteArrayToHex(record.value()));
- if (!this.keyValues.contains(record.key())) {
- return;
- }
- log.info("onMessage: Topic: {}, Key: {}, {} Bytes. {}",
- this.topic, record.key(), record.value().length, SysUtils.byteArrayToHex(record.value()));
- // byte[] buffer = record.value();
- int idx = 0;
- int tail = 0;
- displayPacket(record.key(), record.value(), idx, tail);
- }
- public static void displayPacket(String topicKey, byte[] buffer, int start, int tail) {
- int idx = start;
- long nodeId = ((long) (buffer[idx++] & 0xFF) << 24) | ((buffer[idx++] & 0xFF) << 16) | ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
- int version = buffer[idx++];
- long localTime = ((long) (buffer[idx++] & 0xFF) << 24) | ((buffer[idx++] & 0xFF) << 16) | ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
- Date date = new java.util.Date(localTime * 1000L);
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- sdf.setTimeZone(java.util.TimeZone.getTimeZone("GMT+9"));
- String signalTime = sdf.format(date);
- int aRingCode; // 현시코드-RING A
- int aRingPhase; //7-5 RING A의 PHASE (0 ~ 7)
- int aRingStep; //4-0 RING A의 STEP (0 ~ 31)
- int bRingCode; // 현시코드-RING B
- int bRingPhase; //7-5 RING B의 PHASE (0 ~ 7)
- int bRingStep; //4-0 RING B의 STEP (0 ~ 31)
- int operStts; // 제어기 운영 상태
- int centerComm; //7 센터와 통신 FAIL 상태 0 : 정상, 1 : 통신 FAIL
- int operMapNo; //6...4 현재 운영중인 맵 번호 ( 0 : 일반제, 1~5: 시차제, 6 : 전용맵)
- int addNode; //3 교차로 연등 0 : 일반교차로, 1 : 연등교차로
- int operMode; //2...0 교통신호기 운영 모드
- // 0 : SCU 고정주기 모드
- // 1 : 감응하지 않는 OFFLINE 제어모드
- // 2 : 감응되는 OFFLINE 제어모드
- // 4 : 감응되는 온라인 제어모드
- // 5 : 감응하지 않는 온라인 제어모드
- int lcStts; // 제어기 상태
- int policeManProg; //7 POLICE PANEL 수동 진행S/W상태, 1 : ON, 0 : OFF
- int policeMan; //6 POLICE PANEL 수동 S/W 상태, 1 : ON, 0 : OFF
- int policeBlink; //5 POLICE PANEL 점멸 S/W 상태, 1 : ON, 0 : OFF
- int policeTurnOff; //4 POLICE PANEL 소등 S/W 상태, 1 : ON, 0 : OFF
- int contration; //3 모순 상태, 1 : 모순, 0 : 정상
- int turnOff; //2 소등 상태, 1 : 소등, 0 : 정상
- int blink; //1 점멸 상태, 1 : 점멸, 0 : 정상
- int ppc; //0 PPC Control, 0: PPC Disable 1: PPC Enabled
- operStts = buffer[idx++] & 0xFF;
- aRingCode = buffer[idx++] & 0xFF;
- bRingCode = buffer[idx++] & 0xFF;
- lcStts = buffer[idx++] & 0xFF;
- aRingPhase = (aRingCode >> 5) & 0x07; // 0x7 = 0000 0111
- aRingStep = (aRingCode ) & 0x1F; // 0x1F == 0001 1111
- bRingPhase = (bRingCode >> 5) & 0x07;
- bRingStep =(bRingCode ) & 0x1F;
- centerComm = (operStts >> 7) & 0x01;
- operMapNo = (operStts >> 4) & 0x07;
- addNode = (operStts >> 2) & 0x01;
- operMode = (operStts ) & 0x07;
- policeManProg = (lcStts >> 7) & 0x01;
- policeMan = (lcStts >> 6) & 0x01;
- policeBlink = (lcStts >> 5) & 0x01;
- policeTurnOff = (lcStts >> 4) & 0x01;
- contration = (lcStts >> 3) & 0x01;
- turnOff = (lcStts >> 2) & 0x01;
- blink = (lcStts >> 1) & 0x01;
- ppc = (lcStts ) & 0x01;
- log.info(" NodeId({}), Topic({})", nodeId, topicKey);
- log.info(" Version({})", version);
- log.info(" LocalTime({})", signalTime);
- log.info(" SystemTime({})", SysUtils.getSysTimeStr());
- log.info(" 통신({}), (0:정상, 1:통신 FAIL)", centerComm);
- log.info(" A Ring PHASE/STEP({}/{})", aRingPhase, aRingStep);
- log.info(" B Ring PHASE/STEP({}/{})", bRingPhase, bRingStep);
- log.info(" 맵 번호({}), (0:일반제, 1~5:시차제, 6:전용맵)", operMapNo);
- log.info(" 운영 모드({}), (0:SCU 고정주기 모드, 1:감응하지 않는 OFFLINE 제어모드, 2:감응되는 OFFLINE 제어모드, 4:감응되는 온라인 제어모드, 5:감응하지 않는 온라인 제어모드)", operMode);
- log.info(" STATUS({}/{}/{}/{}/{}/{}/{}/{}), (Police ManualProg/Manual/Blink/Off, Cont/Off/Blink/PPC)",
- policeManProg, policeMan, policeBlink, policeTurnOff, contration, turnOff, blink, ppc);
- // POLICE PANEL 수동진행S/W상태
- // POLICE PANEL 수동 S/W 상태
- // POLICE PANEL 점멸 S/W 상태
- // POLICE PANEL 소등 S/W 상태
- // 모순 상태
- // 소등 상태
- // 점멸 상태
- // PPC 제어 상태
- if (version == 1) {
- int cycleCount = buffer[idx++] & 0xFF;
- int currentCycle = buffer[idx++] & 0xFF;
- log.info(" 주기 Count/Length({}/{})", cycleCount, currentCycle);
- log.info(" B Ring PHASE/STEP({}/{})", cycleCount, currentCycle);
- int a1 = buffer[idx++] & 0xFF;
- int a2 = buffer[idx++] & 0xFF;
- int a3 = buffer[idx++] & 0xFF;
- int a4 = buffer[idx++] & 0xFF;
- int a5 = buffer[idx++] & 0xFF;
- int a6 = buffer[idx++] & 0xFF;
- int a7 = buffer[idx++] & 0xFF;
- int a8 = buffer[idx++] & 0xFF;
- int b1 = buffer[idx++] & 0xFF;
- int b2 = buffer[idx++] & 0xFF;
- int b3 = buffer[idx++] & 0xFF;
- int b4 = buffer[idx++] & 0xFF;
- int b5 = buffer[idx++] & 0xFF;
- int b6 = buffer[idx++] & 0xFF;
- int b7 = buffer[idx++] & 0xFF;
- int b8 = buffer[idx++] & 0xFF;
- log.info(" A Ring Phase Value({}/{}/{}/{}/{}/{}/{}/{})", a1, a2, a3, a4, a5, a6, a7, a8);
- log.info(" B Ring Phase Value({}/{}/{}/{}/{}/{}/{}/{})", b1, b2, b3, b4, b5, b6, b7, b8);
- }
- // log.info(" STATUS: 전이({}), 감응({}), 소등({}), 점멸({}), 수동({}) - (1:전이중, 0:전이완료... 1:상태, 0:정상)",
- // trans, sensing, turnOff, blink, manual);
- // log.info(" ERROR: SCU통신({}), 센터통신({}), 모순({}) - (1: 이상, 0: 정상)",
- // scuComm, centerComm, conflict);
- // log.info("CycleCounter: {} sec.", counter);
- // log.info(" Signals: {} EA, DataSplit({}) - (0:First/Middle, 1:Single/Last)", statusCount, splitFlag);
- //
- // final int SIGNAL_STATUS_SIZE = 5;
- // int remainLength = buffer.length - idx + tail;
- // if (statusCount * SIGNAL_STATUS_SIZE != remainLength) {
- // log.error("Signal Status Data length error: {} EA, {}, {}", statusCount, statusCount * SIGNAL_STATUS_SIZE, remainLength);
- // return;
- // }
- //
- // log.info("SEQ\t신호등정보\t방향\t시간정보신뢰성\t보행자 \t비보호신호\t신호등상태\t표출\t잔여\t방향코드");
- // for (int ii = 0; ii < statusCount; ii++) {
- // int dirInfo = buffer[idx++] & 0xFF;
- // int sttsInfo = buffer[idx++] & 0xFF;
- // int displayTm = buffer[idx++] & 0xFF;
- // int remainTm = buffer[idx++] & 0xFF;
- // int dirCode = buffer[idx++] & 0xFF;
- //
- // int dirAdd = ((dirInfo) & 0x0F); //3 ~ 0, 방향추가정보, 해당 방향에 연등지 없음(0), 해당 방향의 첫번째 연등지(1), 해당 방향의 두번째 연등지(2)
- // int lightsType = ((dirInfo >> 4) & 0x0F); //7 ~ 4, 신호등 정보, ■ 미지정(0), 직진(1), 좌회전(2), 보행자(3), 자전거(4), 우회전(5), 버스(6), 유턴(7)
- //
- // int lighting = ((sttsInfo ) & 0x07); //2 ~ 0, 신호등 상태, ■ 소등(0), 적색점등(1), 황색점등(2), 녹색점등(3), 적색점멸(4), 황색점멸(5), 녹색점멸(6)
- // int unprotect = ((sttsInfo >> 3) & 0x01); //3, 비보호 상태, ■ 신호등 정보 유턴/좌회전에 대한 비보호 여부, ■ 비보호 아님(0), 비보호(1)
- // int walkerPush = ((sttsInfo >> 6) & 0x01); //6, 보행자(푸쉬 또는 자동검지), ■ 없음(0), 버튼 눌림 or 자동검지(1)
- // int timeFlag = ((sttsInfo >> 7) & 0x01); //7, 시간 정보 신뢰성, ■ 고정신호시간(0), 가변신호시간(1)
- //
- // String plightInfo;
- // switch (lightsType)
- // {
- // case 0: plightInfo = "미지정(0)"; break;
- // case 1: plightInfo = "직진(1) "; break;
- // case 2: plightInfo = "좌회전(2)"; break;
- // case 3: plightInfo = "보행자(3)"; break;
- // case 4: plightInfo = "자전거(4)"; break;
- // case 5: plightInfo = "우회전(5)"; break;
- // case 6: plightInfo = "버스(6) "; break;
- // case 7: plightInfo = "유턴(7) "; break;
- // default: plightInfo = "XXX(" + lightsType + ")"; break;
- // }
- // String ptimeFlag;
- // switch (timeFlag)
- // {
- // case 0: ptimeFlag = "고정신호(0)"; break;
- // case 1: ptimeFlag = "가변신호(1)"; break;
- // default: ptimeFlag = "XXX(" + timeFlag + ")"; break;
- // }
- // String pwalkerPush;
- // switch (walkerPush)
- // {
- // case 0: pwalkerPush = "없음(0) "; break;
- // case 1: pwalkerPush = "자동검지(1)"; break;
- // default: pwalkerPush = "XXX(" + walkerPush + ")"; break;
- // }
- // String punprotect;
- // switch (unprotect)
- // {
- // case 0: punprotect = "아님(0) "; break;
- // case 1: punprotect = "비보호(1)"; break;
- // default: punprotect = "XXX(" + unprotect + ")"; break;
- // }
- // String plighting;
- // switch (lighting)
- // {
- // case 0: plighting = "소등(0) "; break;
- // case 1: plighting = "적색점등(1)"; break;
- // case 2: plighting = "황색점등(2)"; break;
- // case 3: plighting = "녹색점등(3)"; break;
- // case 4: plighting = "적색점멸(4)"; break;
- // case 5: plighting = "황색점멸(5)"; break;
- // case 6: plighting = "녹색점멸(6)"; break;
- // default: plighting = "XXX(" + lighting + ")"; break;
- // }
- // log.info("{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
- // String.format("%3d", ii),
- // plightInfo,
- // dirAdd,
- // ptimeFlag,
- // pwalkerPush,
- // punprotect,
- // plighting,
- // displayTm,
- // remainTm,
- // dirCode);
- // }
- }
- }
|