package com.tsi.comm.consumer.kafka;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
 * Creates, starts and stops a single-threaded Spring Kafka listener container
 * that hands records to a {@code TsiKafkaConsumerWorker}.
 *
 * <p>Records are consumed with a {@code String} key deserializer and a
 * {@code byte[]} value deserializer. Every time partitions are assigned the
 * consumer seeks to the end of each partition, so only records produced after
 * the assignment are delivered.
 */
@Slf4j
@RequiredArgsConstructor
public class KafkaConsumerService {

    private final String bootstrapServers;
    private final String topic;
    private final String groupId;
    private final String nodeId;

    /** Container created by {@link #start()}; {@code null} until the first start. */
    private ConcurrentMessageListenerContainer<String, byte[]> kafkaListenerContainer;

    /**
     * Builds the listener container for the resolved topic and starts it.
     *
     * <p>If a container created by a previous call is still running, the call is
     * ignored so the running container (and its consumer thread) is not orphaned.
     * When the listener reports an error, the error is logged with its stack
     * trace and the container is asked to stop.
     */
    public void start() {
        log.info("Starting Kafka: {}, {}, {}, {}", this.bootstrapServers, this.topic, this.groupId, this.nodeId);

        if (this.kafkaListenerContainer != null && this.kafkaListenerContainer.isRunning()) {
            log.warn("Kafka listener container is already running; ignoring start()");
            return;
        }

        final String consumerTopic = resolveConsumerTopic();

        // Ack mode is not set here, so the container's default applies.
        ContainerProperties containerProperties = new ContainerProperties(consumerTopic);
        containerProperties.setGroupId(this.groupId);
        containerProperties.setPollTimeout(5000);
        containerProperties.setMessageListener(new TsiKafkaConsumerWorker(consumerTopic, this.nodeId));
        containerProperties.setConsumerRebalanceListener(new SeekToEndRebalanceListener());

        ConsumerFactory<String, byte[]> consumerFactory =
                new DefaultKafkaConsumerFactory<>(getConsumerPropertiesMap());

        // Keep a local reference so the error handler stops exactly the container
        // it was registered on, even if the field is reassigned later.
        final ConcurrentMessageListenerContainer<String, byte[]> container =
                new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
        container.setBeanName("consumer");
        container.setConcurrency(1);
        container.setErrorHandler((thrownException, data) -> {
            // Pass the exception as the last argument so the stack trace is logged too.
            log.error("kafkaListenerContainer error: {}", thrownException.getMessage(), thrownException);
            // The handler runs on the consumer thread. The blocking stop() would wait
            // for that very thread to finish, so use the callback variant instead.
            container.stop(() -> log.info("kafkaListenerContainer stopped after error"));
        });

        this.kafkaListenerContainer = container;
        container.start();
    }

    /**
     * Builds the Kafka consumer configuration used by {@link #start()}.
     *
     * @return a new mutable map of consumer properties
     */
    public Map<String, Object> getConsumerPropertiesMap() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // NOTE(review): a 1 ms auto-commit interval is very aggressive - confirm it is intended.
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1);
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // NOTE(review): max.poll.interval.ms of 100 ms leaves the listener very little
        // time per record before the consumer is considered failed - confirm it is intended.
        properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "100");
        properties.put(ConsumerConfig.CHECK_CRCS_CONFIG, false);
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.ByteArrayDeserializer.class);
        return properties;
    }

    /**
     * Stops the listener container if one was created. Best effort: a failure
     * while stopping is logged and never propagated to the caller.
     */
    public void shutdown() {
        try {
            if (this.kafkaListenerContainer != null) {
                this.kafkaListenerContainer.stop();
            }
        } catch (Exception e) {
            log.warn("Failed to stop kafkaListenerContainer: {}", e.getMessage(), e);
        }
    }

    /**
     * Maps the configured topic alias to the topic name that is actually subscribed.
     *
     * @return {@code "topic-for-ssd-test"} for alias {@code "test"}, {@code "cvim-raw"}
     *         for alias {@code "cvim"}, otherwise the node id
     */
    private String resolveConsumerTopic() {
        if ("test".equals(this.topic)) {
            return "topic-for-ssd-test";
        }
        if ("cvim".equals(this.topic)) {
            return "cvim-raw";
        }
        // NOTE(review): the fallback is the node id, not the configured topic -
        // presumably each node has its own topic; confirm against the producer side.
        return this.nodeId;
    }

    /** Rebalance listener that moves every newly assigned partition to its end offset. */
    private static final class SeekToEndRebalanceListener implements ConsumerAwareRebalanceListener {

        @Override
        public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
                                                    Collection<TopicPartition> partitions) {
            // Nothing to do on revocation.
        }

        @Override
        public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer,
                                                   Collection<TopicPartition> partitions) {
            // Nothing to do on revocation.
        }

        @Override
        public void onPartitionsAssigned(Consumer<?, ?> consumer,
                                         Collection<TopicPartition> partitions) {
            consumer.seekToEnd(partitions);
        }
    }
}