KafkaConsumerService.java 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package com.tsi.comm.consumer.kafka;
  2. import lombok.RequiredArgsConstructor;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.kafka.clients.consumer.Consumer;
  5. import org.apache.kafka.clients.consumer.ConsumerConfig;
  6. import org.apache.kafka.common.TopicPartition;
  7. import org.springframework.kafka.core.ConsumerFactory;
  8. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  9. import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
  10. import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
  11. import org.springframework.kafka.listener.ContainerProperties;
  12. import java.util.Collection;
  13. import java.util.HashMap;
  14. import java.util.Map;
  15. @Slf4j
  16. @RequiredArgsConstructor
  17. public class KafkaConsumerService {
  18. private final String bootstrapServers;
  19. private final String topic;
  20. private final String groupId;
  21. private final String nodeId;
  22. private ConcurrentMessageListenerContainer<String, byte[]> kafkaListenerContainer;
  23. public void start() {
  24. log.info("Starting Kafka: {}, {}, {}, {}", this.bootstrapServers, this.topic, this.groupId, this.nodeId);
  25. String consumerTopic = this.nodeId;
  26. if ("test".equals(this.topic)) {
  27. consumerTopic = "topic-for-ssd-test";
  28. }
  29. if ("cvim".equals(topic)) {
  30. consumerTopic = "cvim-raw";
  31. }
  32. ContainerProperties containerProperties = new ContainerProperties(consumerTopic);
  33. containerProperties.setGroupId(this.groupId);
  34. containerProperties.setPollTimeout(5000);
  35. //containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL);
  36. containerProperties.setMessageListener(new TsiKafkaConsumerWorker(consumerTopic, this.nodeId));
  37. containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
  38. @Override
  39. public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
  40. }
  41. @Override
  42. public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
  43. }
  44. @Override
  45. public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
  46. consumer.seekToEnd(partitions);
  47. }
  48. });
  49. ConsumerFactory<String, byte[]> consumerFactory = new DefaultKafkaConsumerFactory<>(getConsumerPropertiesMap());
  50. this.kafkaListenerContainer = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
  51. this.kafkaListenerContainer.setBeanName("consumer");
  52. this.kafkaListenerContainer.setConcurrency(1);
  53. this.kafkaListenerContainer.setErrorHandler((thrownException, data) -> {
  54. log.error("kafkaListenerContainer error: {}", thrownException.getMessage());
  55. this.kafkaListenerContainer.stop();
  56. });
  57. this.kafkaListenerContainer.start();
  58. }
  59. public Map<String, Object> getConsumerPropertiesMap() {
  60. Map<String, Object> properties = new HashMap<>();
  61. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
  62. properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
  63. properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
  64. properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1);
  65. properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
  66. properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
  67. properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "100");
  68. properties.put(ConsumerConfig.CHECK_CRCS_CONFIG, false);
  69. properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
  70. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
  71. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.ByteArrayDeserializer.class);
  72. return properties;
  73. }
  74. public void shutdown() {
  75. try {
  76. if (this.kafkaListenerContainer != null) {
  77. this.kafkaListenerContainer.stop();
  78. }
  79. }
  80. catch(Exception ignored) {
  81. }
  82. }
  83. }