KafkaUticEvpsConsumerWorker.java 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. package com.evps.consumer.service;
  2. import com.evps.common.kafka.dto.*;
  3. import com.fasterxml.jackson.core.JsonProcessingException;
  4. import com.fasterxml.jackson.databind.ObjectMapper;
  5. import lombok.AllArgsConstructor;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.apache.kafka.clients.consumer.ConsumerRecord;
  8. import org.springframework.kafka.listener.MessageListener;
  9. @Slf4j
  10. @AllArgsConstructor
  11. public class KafkaUticEvpsConsumerWorker implements MessageListener<String, String> {
  12. private static ObjectMapper mapper = new ObjectMapper();
  13. @Override
  14. public void onMessage(ConsumerRecord<String, String> record) {
  15. try {
  16. if (EvpsKafkaConst.KAFKA_EVPS_EVENT.equals(record.key())) {
  17. KafkaEvpsEventDto data = mapper.readValue(record.value(), KafkaEvpsEventDto.class);
  18. log.info("EvpsEvent: {}", data);
  19. }
  20. else if (EvpsKafkaConst.KAFKA_EVPS_SIGNAL.equals(record.key())) {
  21. KafkaEvpsSignalDto data = mapper.readValue(record.value(), KafkaEvpsSignalDto.class);
  22. log.info("EvpsSignal: {}", data);
  23. }
  24. else if (EvpsKafkaConst.KAFKA_EVPS_NODE.equals(record.key())) {
  25. KafkaEvpsNodeDto data = mapper.readValue(record.value(), KafkaEvpsNodeDto.class);
  26. log.info("EvpsNode: {}", data);
  27. }
  28. else if (EvpsKafkaConst.KAFKA_EVPS_SERVICE.equals(record.key())) {
  29. KafkaEvpsServiceDto data = mapper.readValue(record.value(), KafkaEvpsServiceDto.class);
  30. log.info("EvpsService: {}", data);
  31. }
  32. else {
  33. log.error("Unknown Utic Evps Kafka Key: {}, {}", record.key(), record.value());
  34. }
  35. }
  36. catch (JsonProcessingException e) {
  37. throw new RuntimeException(e);
  38. }
  39. }
  40. }