|
@@ -26,28 +26,36 @@ public class KafkaConsumerWorker implements MessageListener<String, byte[]> {
|
|
|
log.info("onMessage: Topic: {}, Key: {}, {} Bytes. {}",
|
|
|
this.topic, record.key(), record.value().length, SysUtils.byteArrayToHex(record.value()));
|
|
|
|
|
|
+ byte cpuVersion = 0x00;
|
|
|
byte[] buffer = record.value();
|
|
|
int idx = 0;
|
|
|
int tail = 0;
|
|
|
if ("cvim-raw".equals(this.topic)) {
|
|
|
+ //"cvim"
|
|
|
idx = 27 + 6;
|
|
|
tail = -2;
|
|
|
+ int stx1 = buffer[27];
|
|
|
+ int stx2 = buffer[28];
|
|
|
+ int length = ((buffer[29] & 0xFF) << 8) | (buffer[30] & 0xFF);
|
|
|
+ int opCode = buffer[31];
|
|
|
+ cpuVersion = buffer[32];
|
|
|
}
|
|
|
else if ("topic-for-ssd-test".equals(this.topic)) {
|
|
|
+ //"test"
|
|
|
int stx1 = buffer[idx++];
|
|
|
int stx2 = buffer[idx++];
|
|
|
int length = ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
|
|
|
int opCode = buffer[idx++];
|
|
|
- int version = buffer[idx++];
|
|
|
+ cpuVersion = buffer[idx++];
|
|
|
log.info("STX1: {}, STX2: {}, Length: {}, OpCode: {}, Version: {}, Idx: {}",
|
|
|
- stx1, stx2, length, opCode, version, idx);
|
|
|
+ stx1, stx2, length, opCode, cpuVersion, idx);
|
|
|
//idx = 6;
|
|
|
tail = -2;
|
|
|
}
|
|
|
- displayPacket(record.key(), record.value(), idx, tail);
|
|
|
+ displayPacket(record.key(), record.value(), idx, tail, cpuVersion);
|
|
|
}
|
|
|
|
|
|
- public static void displayPacket(String topicKey, byte[] buffer, int start, int tail) {
|
|
|
+ public static void displayPacket(String topicKey, byte[] buffer, int start, int tail, byte cpuVersion) {
|
|
|
int idx = start;
|
|
|
// int stx1 = buffer[idx++];
|
|
|
// int stx2 = buffer[idx++];
|
|
@@ -117,10 +125,26 @@ public class KafkaConsumerWorker implements MessageListener<String, byte[]> {
|
|
|
//} tsc_cvim_hdr_t, *ptsc_cvim_hdr_t;
|
|
|
|
|
|
final int SIGNAL_STATUS_SIZE = 5;
|
|
|
+ boolean isR28 = false;
|
|
|
int remainLength = buffer.length - idx + tail;
|
|
|
if (statusCount * SIGNAL_STATUS_SIZE != remainLength) {
|
|
|
- log.error("Signal Status Data length error: {} EA, {}, {}", statusCount, statusCount * SIGNAL_STATUS_SIZE, remainLength);
|
|
|
- return;
|
|
|
+ if ((statusCount * SIGNAL_STATUS_SIZE != remainLength-1)) {
|
|
|
+ log.error("Signal Status Data length error: {} EA, {}, {}", statusCount, statusCount * SIGNAL_STATUS_SIZE, remainLength);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ isR28 = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (isR28) {
|
|
|
+ byte cpuComp = buffer[buffer.length - 1 + tail]; // CRC 2 Bytes
|
|
|
+ if (tail == 0) {
|
|
|
+ // Topic 이 node 인경우 패킷 헤더 정보가 없다.
|
|
|
+ log.info(" CPU Company: {}.", String.format("0x%02X", cpuComp));
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ log.info(" CPU Company: {}. Version: {}", String.format("0x%02X", cpuComp), String.format("R%d", cpuVersion));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
log.info("SEQ\t신호등정보\t방향\t시간정보신뢰성\t보행자 \t보행/감응\t비보호신호\t신호등상태\t표출\t잔여\t방향코드");
|