12345678910111213141516171819202122232425262728293031323334353637383940414243444546 |
- package com.evps.consumer.service;
- import com.evps.common.kafka.dto.*;
- import com.fasterxml.jackson.core.JsonProcessingException;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import lombok.AllArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.springframework.kafka.listener.MessageListener;
- @Slf4j
- @AllArgsConstructor
- public class KafkaUticEvpsConsumerWorker implements MessageListener<String, String> {
- private static ObjectMapper mapper = new ObjectMapper();
- @Override
- public void onMessage(ConsumerRecord<String, String> record) {
- try {
- if (EvpsKafkaConst.KAFKA_EVPS_EVENT.equals(record.key())) {
- KafkaEvpsEventDto data = mapper.readValue(record.value(), KafkaEvpsEventDto.class);
- log.info("EvpsEvent: {}", data);
- }
- else if (EvpsKafkaConst.KAFKA_EVPS_SIGNAL.equals(record.key())) {
- KafkaEvpsSignalDto data = mapper.readValue(record.value(), KafkaEvpsSignalDto.class);
- log.info("EvpsSignal: {}", data);
- }
- else if (EvpsKafkaConst.KAFKA_EVPS_NODE.equals(record.key())) {
- KafkaEvpsNodeDto data = mapper.readValue(record.value(), KafkaEvpsNodeDto.class);
- log.info("EvpsNode: {}", data);
- }
- else if (EvpsKafkaConst.KAFKA_EVPS_SERVICE.equals(record.key())) {
- KafkaEvpsServiceDto data = mapper.readValue(record.value(), KafkaEvpsServiceDto.class);
- log.info("EvpsService: {}", data);
- }
- else {
- log.error("Unknown Utic Evps Kafka Key: {}, {}", record.key(), record.value());
- }
- }
- catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
- }
|