KafkaConsumerWorker.java 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. package com.sig.comm.consumer.kafka;
  2. import com.sig.app.common.utils.SysUtils;
  3. import lombok.AllArgsConstructor;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.apache.kafka.clients.consumer.ConsumerRecord;
  6. import org.springframework.kafka.listener.MessageListener;
  7. import java.text.SimpleDateFormat;
  8. import java.util.Date;
  9. import java.util.HashSet;
  10. @Slf4j
  11. @AllArgsConstructor
  12. public class KafkaConsumerWorker implements MessageListener<String, byte[]> {
  13. private final String topic;
  14. private final HashSet<String> keyValues;
  15. @Override
  16. public void onMessage(ConsumerRecord<String, byte[]> record) {
  17. // log.info("onMessage: Topic: {}, Key: {}, {} Bytes. {}",
  18. // this.topic, record.key(), record.value().length, SysUtils.byteArrayToHex(record.value()));
  19. if (!this.keyValues.contains(record.key())) {
  20. return;
  21. }
  22. log.info("onMessage: Topic: {}, Key: {}, {} Bytes. {}",
  23. this.topic, record.key(), record.value().length, SysUtils.byteArrayToHex(record.value()));
  24. // byte[] buffer = record.value();
  25. int idx = 0;
  26. int tail = 0;
  27. displayPacket(record.key(), record.value(), idx, tail);
  28. }
  29. public static void displayPacket(String topicKey, byte[] buffer, int start, int tail) {
  30. int idx = start;
  31. long nodeId = ((long) (buffer[idx++] & 0xFF) << 24) | ((buffer[idx++] & 0xFF) << 16) | ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
  32. int version = buffer[idx++];
  33. long localTime = ((long) (buffer[idx++] & 0xFF) << 24) | ((buffer[idx++] & 0xFF) << 16) | ((buffer[idx++] & 0xFF) << 8) | (buffer[idx++] & 0xFF);
  34. Date date = new java.util.Date(localTime * 1000L);
  35. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  36. sdf.setTimeZone(java.util.TimeZone.getTimeZone("GMT+9"));
  37. String signalTime = sdf.format(date);
  38. int aRingCode; // 현시코드-RING A
  39. int aRingPhase; //7-5 RING A의 PHASE (0 ~ 7)
  40. int aRingStep; //4-0 RING A의 STEP (0 ~ 31)
  41. int bRingCode; // 현시코드-RING B
  42. int bRingPhase; //7-5 RING B의 PHASE (0 ~ 7)
  43. int bRingStep; //4-0 RING B의 STEP (0 ~ 31)
  44. int operStts; // 제어기 운영 상태
  45. int centerComm; //7 센터와 통신 FAIL 상태 0 : 정상, 1 : 통신 FAIL
  46. int operMapNo; //6...4 현재 운영중인 맵 번호 ( 0 : 일반제, 1~5: 시차제, 6 : 전용맵)
  47. int addNode; //3 교차로 연등 0 : 일반교차로, 1 : 연등교차로
  48. int operMode; //2...0 교통신호기 운영 모드
  49. // 0 : SCU 고정주기 모드
  50. // 1 : 감응하지 않는 OFFLINE 제어모드
  51. // 2 : 감응되는 OFFLINE 제어모드
  52. // 4 : 감응되는 온라인 제어모드
  53. // 5 : 감응하지 않는 온라인 제어모드
  54. int lcStts; // 제어기 상태
  55. int policeManProg; //7 POLICE PANEL 수동 진행S/W상태, 1 : ON, 0 : OFF
  56. int policeMan; //6 POLICE PANEL 수동 S/W 상태, 1 : ON, 0 : OFF
  57. int policeBlink; //5 POLICE PANEL 점멸 S/W 상태, 1 : ON, 0 : OFF
  58. int policeTurnOff; //4 POLICE PANEL 소등 S/W 상태, 1 : ON, 0 : OFF
  59. int contration; //3 모순 상태, 1 : 모순, 0 : 정상
  60. int turnOff; //2 소등 상태, 1 : 소등, 0 : 정상
  61. int blink; //1 점멸 상태, 1 : 점멸, 0 : 정상
  62. int ppc; //0 PPC Control, 0: PPC Disable 1: PPC Enabled
  63. operStts = buffer[idx++] & 0xFF;
  64. aRingCode = buffer[idx++] & 0xFF;
  65. bRingCode = buffer[idx++] & 0xFF;
  66. lcStts = buffer[idx++] & 0xFF;
  67. aRingPhase = (aRingCode >> 5) & 0x07; // 0x7 = 0000 0111
  68. aRingStep = (aRingCode ) & 0x1F; // 0x1F == 0001 1111
  69. bRingPhase = (bRingCode >> 5) & 0x07;
  70. bRingStep =(bRingCode ) & 0x1F;
  71. centerComm = (operStts >> 7) & 0x01;
  72. operMapNo = (operStts >> 4) & 0x07;
  73. addNode = (operStts >> 2) & 0x01;
  74. operMode = (operStts ) & 0x07;
  75. policeManProg = (lcStts >> 7) & 0x01;
  76. policeMan = (lcStts >> 6) & 0x01;
  77. policeBlink = (lcStts >> 5) & 0x01;
  78. policeTurnOff = (lcStts >> 4) & 0x01;
  79. contration = (lcStts >> 3) & 0x01;
  80. turnOff = (lcStts >> 2) & 0x01;
  81. blink = (lcStts >> 1) & 0x01;
  82. ppc = (lcStts ) & 0x01;
  83. log.info(" NodeId({}), Topic({})", nodeId, topicKey);
  84. log.info(" Version({})", version);
  85. log.info(" LocalTime({})", signalTime);
  86. log.info(" SystemTime({})", SysUtils.getSysTimeStr());
  87. log.info(" 통신({}), (0:정상, 1:통신 FAIL)", centerComm);
  88. log.info(" A Ring PHASE/STEP({}/{})", aRingPhase, aRingStep);
  89. log.info(" B Ring PHASE/STEP({}/{})", bRingPhase, bRingStep);
  90. log.info(" 맵 번호({}), (0:일반제, 1~5:시차제, 6:전용맵)", operMapNo);
  91. log.info(" 운영 모드({}), (0:SCU 고정주기 모드, 1:감응하지 않는 OFFLINE 제어모드, 2:감응되는 OFFLINE 제어모드, 4:감응되는 온라인 제어모드, 5:감응하지 않는 온라인 제어모드)", operMode);
  92. log.info(" STATUS({}/{}/{}/{}/{}/{}/{}/{}), (Police ManualProg/Manual/Blink/Off, Cont/Off/Blink/PPC)",
  93. policeManProg, policeMan, policeBlink, policeTurnOff, contration, turnOff, blink, ppc);
  94. // POLICE PANEL 수동진행S/W상태
  95. // POLICE PANEL 수동 S/W 상태
  96. // POLICE PANEL 점멸 S/W 상태
  97. // POLICE PANEL 소등 S/W 상태
  98. // 모순 상태
  99. // 소등 상태
  100. // 점멸 상태
  101. // PPC 제어 상태
  102. if (version == 1) {
  103. int cycleCount = buffer[idx++] & 0xFF;
  104. int currentCycle = buffer[idx++] & 0xFF;
  105. log.info(" 주기 Count/Length({}/{})", cycleCount, currentCycle);
  106. log.info(" B Ring PHASE/STEP({}/{})", cycleCount, currentCycle);
  107. int a1 = buffer[idx++] & 0xFF;
  108. int a2 = buffer[idx++] & 0xFF;
  109. int a3 = buffer[idx++] & 0xFF;
  110. int a4 = buffer[idx++] & 0xFF;
  111. int a5 = buffer[idx++] & 0xFF;
  112. int a6 = buffer[idx++] & 0xFF;
  113. int a7 = buffer[idx++] & 0xFF;
  114. int a8 = buffer[idx++] & 0xFF;
  115. int b1 = buffer[idx++] & 0xFF;
  116. int b2 = buffer[idx++] & 0xFF;
  117. int b3 = buffer[idx++] & 0xFF;
  118. int b4 = buffer[idx++] & 0xFF;
  119. int b5 = buffer[idx++] & 0xFF;
  120. int b6 = buffer[idx++] & 0xFF;
  121. int b7 = buffer[idx++] & 0xFF;
  122. int b8 = buffer[idx++] & 0xFF;
  123. log.info(" A Ring Phase Value({}/{}/{}/{}/{}/{}/{}/{})", a1, a2, a3, a4, a5, a6, a7, a8);
  124. log.info(" B Ring Phase Value({}/{}/{}/{}/{}/{}/{}/{})", b1, b2, b3, b4, b5, b6, b7, b8);
  125. }
  126. // log.info(" STATUS: 전이({}), 감응({}), 소등({}), 점멸({}), 수동({}) - (1:전이중, 0:전이완료... 1:상태, 0:정상)",
  127. // trans, sensing, turnOff, blink, manual);
  128. // log.info(" ERROR: SCU통신({}), 센터통신({}), 모순({}) - (1: 이상, 0: 정상)",
  129. // scuComm, centerComm, conflict);
  130. // log.info("CycleCounter: {} sec.", counter);
  131. // log.info(" Signals: {} EA, DataSplit({}) - (0:First/Middle, 1:Single/Last)", statusCount, splitFlag);
  132. //
  133. // final int SIGNAL_STATUS_SIZE = 5;
  134. // int remainLength = buffer.length - idx + tail;
  135. // if (statusCount * SIGNAL_STATUS_SIZE != remainLength) {
  136. // log.error("Signal Status Data length error: {} EA, {}, {}", statusCount, statusCount * SIGNAL_STATUS_SIZE, remainLength);
  137. // return;
  138. // }
  139. //
  140. // log.info("SEQ\t신호등정보\t방향\t시간정보신뢰성\t보행자 \t비보호신호\t신호등상태\t표출\t잔여\t방향코드");
  141. // for (int ii = 0; ii < statusCount; ii++) {
  142. // int dirInfo = buffer[idx++] & 0xFF;
  143. // int sttsInfo = buffer[idx++] & 0xFF;
  144. // int displayTm = buffer[idx++] & 0xFF;
  145. // int remainTm = buffer[idx++] & 0xFF;
  146. // int dirCode = buffer[idx++] & 0xFF;
  147. //
  148. // int dirAdd = ((dirInfo) & 0x0F); //3 ~ 0, 방향추가정보, 해당 방향에 연등지 없음(0), 해당 방향의 첫번째 연등지(1), 해당 방향의 두번째 연등지(2)
  149. // int lightsType = ((dirInfo >> 4) & 0x0F); //7 ~ 4, 신호등 정보, ■ 미지정(0), 직진(1), 좌회전(2), 보행자(3), 자전거(4), 우회전(5), 버스(6), 유턴(7)
  150. //
  151. // int lighting = ((sttsInfo ) & 0x07); //2 ~ 0, 신호등 상태, ■ 소등(0), 적색점등(1), 황색점등(2), 녹색점등(3), 적색점멸(4), 황색점멸(5), 녹색점멸(6)
  152. // int unprotect = ((sttsInfo >> 3) & 0x01); //3, 비보호 상태, ■ 신호등 정보 유턴/좌회전에 대한 비보호 여부, ■ 비보호 아님(0), 비보호(1)
  153. // int walkerPush = ((sttsInfo >> 6) & 0x01); //6, 보행자(푸쉬 또는 자동검지), ■ 없음(0), 버튼 눌림 or 자동검지(1)
  154. // int timeFlag = ((sttsInfo >> 7) & 0x01); //7, 시간 정보 신뢰성, ■ 고정신호시간(0), 가변신호시간(1)
  155. //
  156. // String plightInfo;
  157. // switch (lightsType)
  158. // {
  159. // case 0: plightInfo = "미지정(0)"; break;
  160. // case 1: plightInfo = "직진(1) "; break;
  161. // case 2: plightInfo = "좌회전(2)"; break;
  162. // case 3: plightInfo = "보행자(3)"; break;
  163. // case 4: plightInfo = "자전거(4)"; break;
  164. // case 5: plightInfo = "우회전(5)"; break;
  165. // case 6: plightInfo = "버스(6) "; break;
  166. // case 7: plightInfo = "유턴(7) "; break;
  167. // default: plightInfo = "XXX(" + lightsType + ")"; break;
  168. // }
  169. // String ptimeFlag;
  170. // switch (timeFlag)
  171. // {
  172. // case 0: ptimeFlag = "고정신호(0)"; break;
  173. // case 1: ptimeFlag = "가변신호(1)"; break;
  174. // default: ptimeFlag = "XXX(" + timeFlag + ")"; break;
  175. // }
  176. // String pwalkerPush;
  177. // switch (walkerPush)
  178. // {
  179. // case 0: pwalkerPush = "없음(0) "; break;
  180. // case 1: pwalkerPush = "자동검지(1)"; break;
  181. // default: pwalkerPush = "XXX(" + walkerPush + ")"; break;
  182. // }
  183. // String punprotect;
  184. // switch (unprotect)
  185. // {
  186. // case 0: punprotect = "아님(0) "; break;
  187. // case 1: punprotect = "비보호(1)"; break;
  188. // default: punprotect = "XXX(" + unprotect + ")"; break;
  189. // }
  190. // String plighting;
  191. // switch (lighting)
  192. // {
  193. // case 0: plighting = "소등(0) "; break;
  194. // case 1: plighting = "적색점등(1)"; break;
  195. // case 2: plighting = "황색점등(2)"; break;
  196. // case 3: plighting = "녹색점등(3)"; break;
  197. // case 4: plighting = "적색점멸(4)"; break;
  198. // case 5: plighting = "황색점멸(5)"; break;
  199. // case 6: plighting = "녹색점멸(6)"; break;
  200. // default: plighting = "XXX(" + lighting + ")"; break;
  201. // }
  202. // log.info("{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
  203. // String.format("%3d", ii),
  204. // plightInfo,
  205. // dirAdd,
  206. // ptimeFlag,
  207. // pwalkerPush,
  208. // punprotect,
  209. // plighting,
  210. // displayTm,
  211. // remainTm,
  212. // dirCode);
  213. // }
  214. }
  215. }