瀏覽代碼

multiple topic input

shjung 11 月之前
父節點
當前提交
42ea6b654f

+ 5 - 0
pom.xml

@@ -91,6 +91,11 @@
             <artifactId>jackson-databind</artifactId>
             <version>2.10.3</version>
         </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
 
     </dependencies>
 

+ 19 - 3
src/main/java/com/tsi/comm/consumer/TsiCommConsumerApplication.java

@@ -13,6 +13,7 @@ import org.springframework.context.ApplicationListener;
 import org.springframework.context.event.ContextClosedEvent;
 
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Set;
@@ -28,7 +29,7 @@ public class TsiCommConsumerApplication implements ApplicationRunner, Applicatio
 
     private KafkaConsumerService kafkaConsumerService = null;
 
-    // --servers=172.24.0.30:9092,172.24.0.31:9093,172.24.0.32:9094 --topic=node --group=tsi-consumer1 --key=1510017600
+    // java -jar tsi-consumer.jar --servers=172.24.0.30:9092,172.24.0.31:9093,172.24.0.32:9094 --topic=node --group=tsi-consumer1 --key=4060000101,4060000112
     public static void main(String[] args) {
         SpringApplication application = new SpringApplicationBuilder()
                 .sources(TsiCommConsumerApplication.class)
@@ -78,16 +79,31 @@ public class TsiCommConsumerApplication implements ApplicationRunner, Applicatio
             }
         }
 
+        List<String> keyValues = new ArrayList<String>();
+        String[] nodeIds = nodeId.split(",");
+        for (int ii = 0; ii < nodeIds.length; ii++) {
+            String node = nodeIds[ii].trim();
+            if (node.isEmpty()) {
+                continue;
+            }
+            keyValues.add(node);
+        }
+        if (keyValues.isEmpty()) {
+            log.error("****** key options are required(all input data is blank).");
+            usage();
+            return;
+        }
+
         if (!"node".equals(topic) && !"test".equals(topic) && !"cvim".equals(topic)) {
             usage();
             return;
         }
-        kafkaConsumerService = new KafkaConsumerService(bootstrapServers, topic, consumerGroup, nodeId);
+        kafkaConsumerService = new KafkaConsumerService(bootstrapServers, topic, consumerGroup, keyValues);
         kafkaConsumerService.start();
     }
 
     private void usage() {
-        log.info("\r\n\n\n\n\nUsage: java -jar tsi-consumer --servers=xxx.xxx.xxx.xxx:nnnn --topic=node --group:tsi-consumer --key=1510017600\n" +
+        log.info("\r\n\n\n\n\nUsage: java -jar tsi-consumer --servers=xxx.xxx.xxx.xxx:nnnn --topic=node --group:tsi-consumer --key=4060000101\n" +
                 "   --servers=kafka bootstrap server[optional, default=172.24.0.30:9092,172.24.0.31:9093,172.24.0.32:9094]\n" +
                 "   --topic=topic name[optional, node/test/cvim, default=node]\n" +
                 "   --group=kafka consumer group name[optional, default=tsi-consumer]\n" +

+ 13 - 9
src/main/java/com/tsi/comm/consumer/kafka/KafkaConsumerService.java

@@ -11,9 +11,7 @@ import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
 import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
 import org.springframework.kafka.listener.ContainerProperties;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 
 @Slf4j
 @RequiredArgsConstructor
@@ -22,26 +20,32 @@ public class KafkaConsumerService {
     private final String bootstrapServers;
     private final String topic;
     private final String groupId;
-    private final String nodeId;
+    private final List<String> keyValues;
 
     private ConcurrentMessageListenerContainer<String, byte[]> kafkaListenerContainer;
 
     public void start() {
-        log.info("Starting Kafka: {}, {}, {}, {}", this.bootstrapServers, this.topic, this.groupId, this.nodeId);
+        log.info("Starting Kafka: {}, {}, {}, {}", this.bootstrapServers, this.topic, this.groupId, this.keyValues);
 
-        String consumerTopic = this.nodeId;
+        String consumerTopic = "node";
+        HashSet<String> keys = new HashSet<String>();
         if ("test".equals(this.topic)) {
             consumerTopic = "topic-for-ssd-test";
+            keys.add(consumerTopic);
         }
-        if ("cvim".equals(topic)) {
+        else if ("cvim".equals(topic)) {
             consumerTopic = "cvim-raw";
+            keys.add(consumerTopic);
+        }
+        else {
+            keys.addAll(this.keyValues);
         }
 
-        ContainerProperties containerProperties = new ContainerProperties(consumerTopic);
+        ContainerProperties containerProperties = new ContainerProperties(keys.toArray(new String[0]));
         containerProperties.setGroupId(this.groupId);
         containerProperties.setPollTimeout(5000);
         //containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL);
-        containerProperties.setMessageListener(new TsiKafkaConsumerWorker(consumerTopic, this.nodeId));
+        containerProperties.setMessageListener(new TsiKafkaConsumerWorker(consumerTopic, keys));
         containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
             @Override
             public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {

+ 9 - 6
src/main/java/com/tsi/comm/consumer/kafka/TsiKafkaConsumerWorker.java

@@ -8,18 +8,18 @@ import org.springframework.kafka.listener.MessageListener;
 
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.HashSet;
 
 @Slf4j
 @AllArgsConstructor
 public class TsiKafkaConsumerWorker implements MessageListener<String, byte[]> {
 
     private final String topic;
-    private final String nodeId;
-    private final String TEST_TOPIC = "topic-for-ssd-test";
+    private final HashSet<String> keyValues;
 
     @Override
     public void onMessage(ConsumerRecord<String, byte[]> record) {
-        if (!this.nodeId.equals(record.key())) {
+        if (!this.keyValues.contains(record.key())) {
             return;
         }
 
@@ -28,8 +28,10 @@ public class TsiKafkaConsumerWorker implements MessageListener<String, byte[]> {
 
         byte[] buffer = record.value();
         int idx = 0;
+        int tail = 0;
         if ("cvim-raw".equals(this.topic)) {
             idx = 27 + 6;
+            //tail = -2;
         }
         else if ("topic-for-ssd-test".equals(this.topic)) {
             int stx1    = buffer[idx++];
@@ -40,11 +42,12 @@ public class TsiKafkaConsumerWorker implements MessageListener<String, byte[]> {
             log.info("STX1: {}, STX2: {}, Length: {}, OpCode: {}, Version: {}, Idx: {}",
                     stx1, stx2, length, opCode, version, idx);
             //idx = 6;
+            tail = -2;
         }
-        displayPacket(record.key(), record.value(), idx);
+        displayPacket(record.key(), record.value(), idx, tail);
     }
 
-    public static void displayPacket(String topicKey, byte[] buffer, int start) {
+    public static void displayPacket(String topicKey, byte[] buffer, int start, int tail) {
         int idx = start;
 //        int stx1    = buffer[idx++];
 //        int stx2    = buffer[idx++];
@@ -112,7 +115,7 @@ public class TsiKafkaConsumerWorker implements MessageListener<String, byte[]> {
 //} tsc_cvim_hdr_t, *ptsc_cvim_hdr_t;
 
         final int SIGNAL_STATUS_SIZE = 5;
-        int remainLength = buffer.length - idx;
+        int remainLength = buffer.length - idx + tail;
         if (statusCount * SIGNAL_STATUS_SIZE != remainLength) {
             log.error("Signal Status Data length error: {} EA, {}, {}", statusCount, statusCount * SIGNAL_STATUS_SIZE, remainLength);
             return;

+ 1 - 1
src/main/resources/logback-spring.xml

@@ -6,7 +6,7 @@
     <property name="ROOT_LOG_LEVEL"  value="DEBUG"/>
     <property name="LOG_CHARSET"     value="UTF-8" />
 
-    <property name="LOG_PATTERN_CONSOLE"     value="[%d{HH:mm:ss.SSS}] %highlight([%5level]) %highlight(${PID:-}): %cyan(%msg) %n"/>
+    <property name="LOG_PATTERN_CONSOLE" value="[%d{HH:mm:ss.SSS}] %highlight([%5level]): %cyan(%msg) %n"/>
 
     <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
         <withJansi>true</withJansi>

+ 13 - 28
src/test/java/com/tsi/comm/consumer/TsiCommConsumerApplicationTests.java

@@ -1,26 +1,30 @@
 package com.tsi.comm.consumer;
 
-import com.tsi.app.common.utils.ByteUtils;
-import com.tsi.app.common.utils.CRC16Utils;
-import com.tsi.app.common.utils.HexString;
-import com.tsi.comm.consumer.protocol.TsiCpuPacket;
 import lombok.extern.slf4j.Slf4j;
-import org.junit.Test;
-import org.springframework.boot.test.context.SpringBootTest;
+import org.junit.jupiter.api.Test;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.HashSet;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 @Slf4j
-@SpringBootTest
+//@SpringBootTest
 public class TsiCommConsumerApplicationTests {
 
-    //@Test
-    void contextLoads() {
+    @Test
+    public void nodeAdd() {
+        int directionCode = 16;
+        log.info("{}, {}, {}", directionCode, directionCode/10, directionCode%10);
+        HashSet<String> keys = new HashSet<String>();
+        keys.add("a");
+        keys.add("b");
+        keys.add("c");
+        log.info("toString: {}", keys.toString());
+        log.info(" toArray: {}", (Object) keys.toArray(new String[0]));
     }
 
     @Test
@@ -126,23 +130,4 @@ public class TsiCommConsumerApplicationTests {
         }
         Thread.sleep(20000);
     }
-
-    @Test
-    public void checkSumTest() {
-        long nodeId = 110000001;
-        String strHex = "7E7E004E1301068E77810002310C60F1375E10CB280B0A20CB321F0A30CB320D0A10CA03011420CA32011430CA322B1410C9280B1E20C9321F1E30C9320D1E10C903012820C928012830C9282B28FD89";
-        byte[] packet = ByteUtils.hexToByteArray(strHex);
-        log.error("{}", HexString.fromBytes(packet));
-        int dataLength = ByteUtils.getUnsignedShort(packet, TsiCpuPacket.INDEX_LENGTH);
-        log.error("dataLength: {}, packetLength: {}", dataLength, packet.length);
-        int recvCheckSum = ByteUtils.getUnsignedShort(packet, packet.length-2);
-        int calcCheckSum = CRC16Utils.CRC16_ccitt_cvim(packet, 2, dataLength-2);
-        if (recvCheckSum != calcCheckSum) {
-            log.info("Node: {}, Check Sum Error: {},{}, recv: {}, calc: {}", nodeId, 0xFD89, 0x89FD, recvCheckSum, calcCheckSum);
-        }
-        else {
-            log.info("Node: {}, Check Sum OK: recv: {}, calc: {}", nodeId, recvCheckSum, calcCheckSum);
-        }
-    }
-
 }