package com.evps.consumer.service; import com.evps.common.kafka.dto.*; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.listener.MessageListener; @Slf4j @AllArgsConstructor public class KafkaUticEvpsConsumerWorker implements MessageListener { private static ObjectMapper mapper = new ObjectMapper(); @Override public void onMessage(ConsumerRecord record) { try { if (EvpsKafkaConst.KAFKA_EVPS_EVENT.equals(record.key())) { KafkaEvpsEventDto data = mapper.readValue(record.value(), KafkaEvpsEventDto.class); log.info("EvpsEvent: {}", data); } else if (EvpsKafkaConst.KAFKA_EVPS_SIGNAL.equals(record.key())) { KafkaEvpsSignalDto data = mapper.readValue(record.value(), KafkaEvpsSignalDto.class); log.info("EvpsSignal: {}", data); } else if (EvpsKafkaConst.KAFKA_EVPS_NODE.equals(record.key())) { KafkaEvpsNodeDto data = mapper.readValue(record.value(), KafkaEvpsNodeDto.class); log.info("EvpsNode: {}", data); } else if (EvpsKafkaConst.KAFKA_EVPS_SERVICE.equals(record.key())) { KafkaEvpsServiceDto data = mapper.readValue(record.value(), KafkaEvpsServiceDto.class); log.info("EvpsService: {}", data); } else { log.error("Unknown Utic Evps Kafka Key: {}, {}", record.key(), record.value()); } } catch (JsonProcessingException e) { throw new RuntimeException(e); } } }