package com.sig.comm.server.kafka; import com.sig.app.common.utils.TimeUtils; import com.sig.comm.server.process.dbms.DbmsDataProcess; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.listener.MessageListener; import org.springframework.kafka.support.Acknowledgment; @Slf4j @AllArgsConstructor public class TsiKafkaConsumerWorker implements MessageListener { private DbmsDataProcess dbmsDataProcess; @Override public void onMessage(ConsumerRecord record) { Long sendNanoTime = record.value(); Long recvNanoTime = System.nanoTime(); // KafkaTransVo stat = new KafkaTransVo(AbstractDbmsVo.DBMS_KAFKA_TRANS_HS); // stat.setHostName(TsiTpmsManager.getInstance().getKafkaTransVo().getHostName()); // stat.setStatus(1); // if (TsiTpmsManager.getInstance().getKafkaTransVo().getSendNanoTime() == sendNanoTime) { // stat.setSendTm(TsiTpmsManager.getInstance().getKafkaTransVo().getSendTm()); // stat.setRecvTm(TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS)); // } // else { // stat.setRecvTm(TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS)); // stat.setSendTm(stat.getRecvTm()); // log.info("recv ping success, sendNanoTime miss match: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime)); // } // dbmsDataProcess.add(stat, (int)Thread.currentThread().getId()); // log.info("recv ping success: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime)); } @Override public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment) { try { Long sendNanoTime = record.value(); Long recvNanoTime = System.nanoTime(); log.info("recv ping success, ack: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime)); //acknowledgment.acknowledge(); } catch (Exception e) { log.error("onMessage:" + e.getMessage()); } } }