123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- package com.tsi.comm.consumer.kafka;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.consumer.Consumer;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.common.TopicPartition;
- import org.springframework.kafka.core.ConsumerFactory;
- import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
- import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
- import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
- import org.springframework.kafka.listener.ContainerProperties;
- import java.util.Collection;
- import java.util.HashMap;
- import java.util.Map;
- @Slf4j
- @RequiredArgsConstructor
- public class KafkaConsumerService {
- private final String bootstrapServers;
- private final String topic;
- private final String groupId;
- private final String nodeId;
- private ConcurrentMessageListenerContainer<String, byte[]> kafkaListenerContainer;
- public void start() {
- log.info("Starting Kafka: {}, {}, {}, {}", this.bootstrapServers, this.topic, this.groupId, this.nodeId);
- String consumerTopic = this.nodeId;
- if ("test".equals(this.topic)) {
- consumerTopic = "topic-for-ssd-test";
- }
- if ("cvim".equals(topic)) {
- consumerTopic = "cvim-raw";
- }
- ContainerProperties containerProperties = new ContainerProperties(consumerTopic);
- containerProperties.setGroupId(this.groupId);
- containerProperties.setPollTimeout(5000);
- //containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL);
- containerProperties.setMessageListener(new TsiKafkaConsumerWorker(consumerTopic, this.nodeId));
- containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
- @Override
- public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
- }
- @Override
- public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
- }
- @Override
- public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
- consumer.seekToEnd(partitions);
- }
- });
- ConsumerFactory<String, byte[]> consumerFactory = new DefaultKafkaConsumerFactory<>(getConsumerPropertiesMap());
- this.kafkaListenerContainer = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
- this.kafkaListenerContainer.setBeanName("consumer");
- this.kafkaListenerContainer.setConcurrency(1);
- this.kafkaListenerContainer.setErrorHandler((thrownException, data) -> {
- log.error("kafkaListenerContainer error: {}", thrownException.getMessage());
- this.kafkaListenerContainer.stop();
- });
- this.kafkaListenerContainer.start();
- }
- public Map<String, Object> getConsumerPropertiesMap() {
- Map<String, Object> properties = new HashMap<>();
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
- properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1);
- properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
- properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
- properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "100");
- properties.put(ConsumerConfig.CHECK_CRCS_CONFIG, false);
- properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.ByteArrayDeserializer.class);
- return properties;
- }
- public void shutdown() {
- try {
- if (this.kafkaListenerContainer != null) {
- this.kafkaListenerContainer.stop();
- }
- }
- catch(Exception ignored) {
- }
- }
- }
|