1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950 |
- package com.sig.comm.server.kafka;
- import com.sig.app.common.utils.TimeUtils;
- import com.sig.comm.server.process.dbms.DbmsDataProcess;
- import lombok.AllArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.springframework.kafka.listener.MessageListener;
- import org.springframework.kafka.support.Acknowledgment;
- @Slf4j
- @AllArgsConstructor
- public class TsiKafkaConsumerWorker implements MessageListener<String, Long> {
- private DbmsDataProcess dbmsDataProcess;
- @Override
- public void onMessage(ConsumerRecord<String, Long> record) {
- Long sendNanoTime = record.value();
- Long recvNanoTime = System.nanoTime();
- // KafkaTransVo stat = new KafkaTransVo(AbstractDbmsVo.DBMS_KAFKA_TRANS_HS);
- // stat.setHostName(TsiTpmsManager.getInstance().getKafkaTransVo().getHostName());
- // stat.setStatus(1);
- // if (TsiTpmsManager.getInstance().getKafkaTransVo().getSendNanoTime() == sendNanoTime) {
- // stat.setSendTm(TsiTpmsManager.getInstance().getKafkaTransVo().getSendTm());
- // stat.setRecvTm(TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS));
- // }
- // else {
- // stat.setRecvTm(TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS));
- // stat.setSendTm(stat.getRecvTm());
- // log.info("recv ping success, sendNanoTime miss match: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime));
- // }
- // dbmsDataProcess.add(stat, (int)Thread.currentThread().getId());
- // log.info("recv ping success: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime));
- }
- @Override
- public void onMessage(ConsumerRecord<String, Long> record, Acknowledgment acknowledgment) {
- try {
- Long sendNanoTime = record.value();
- Long recvNanoTime = System.nanoTime();
- log.info("recv ping success, ack: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime));
- //acknowledgment.acknowledge();
- } catch (Exception e) {
- log.error("onMessage:" + e.getMessage());
- }
- }
- }
|