|
@@ -25,23 +25,25 @@ public class KafkaConsumerService {
|
|
|
private ConcurrentMessageListenerContainer<String, byte[]> kafkaListenerContainer;
|
|
|
|
|
|
public void start() {
|
|
|
- log.info("Starting Kafka: {}, {}, {}, {}", this.bootstrapServers, this.topic, this.groupId, this.keyValues);
|
|
|
+ log.info("Starting Kafka Consumer: bootstrapServers: {}, topic: {}, group: {}, keys: {}", this.bootstrapServers, this.topic, this.groupId, this.keyValues);
|
|
|
|
|
|
+ HashSet<String> keys = new HashSet<>(this.keyValues);
|
|
|
+
|
|
|
+ HashSet<String> topics = new HashSet<>();
|
|
|
String consumerTopic = "node";
|
|
|
- HashSet<String> keys = new HashSet<String>();
|
|
|
if ("test".equals(this.topic)) {
|
|
|
consumerTopic = "topic-for-ssd-test";
|
|
|
- keys.add(consumerTopic);
|
|
|
+ topics.add(consumerTopic);
|
|
|
}
|
|
|
else if ("cvim".equals(topic)) {
|
|
|
consumerTopic = "cvim-raw";
|
|
|
- keys.add(consumerTopic);
|
|
|
+ topics.add(consumerTopic);
|
|
|
}
|
|
|
else {
|
|
|
- keys.addAll(this.keyValues);
|
|
|
+ topics = keys;
|
|
|
}
|
|
|
|
|
|
- ContainerProperties containerProperties = new ContainerProperties(keys.toArray(new String[0]));
|
|
|
+ ContainerProperties containerProperties = new ContainerProperties(topics.toArray(new String[0]));
|
|
|
containerProperties.setGroupId(this.groupId);
|
|
|
containerProperties.setPollTimeout(5000);
|
|
|
//containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL);
|