TsiKafkaConsumerWorker.java 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. package com.sig.comm.server.kafka;
  2. import com.sig.app.common.utils.TimeUtils;
  3. import com.sig.comm.server.process.dbms.DbmsDataProcess;
  4. import lombok.AllArgsConstructor;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.apache.kafka.clients.consumer.ConsumerRecord;
  7. import org.springframework.kafka.listener.MessageListener;
  8. import org.springframework.kafka.support.Acknowledgment;
  9. @Slf4j
  10. @AllArgsConstructor
  11. public class TsiKafkaConsumerWorker implements MessageListener<String, Long> {
  12. private DbmsDataProcess dbmsDataProcess;
  13. @Override
  14. public void onMessage(ConsumerRecord<String, Long> record) {
  15. Long sendNanoTime = record.value();
  16. Long recvNanoTime = System.nanoTime();
  17. // KafkaTransVo stat = new KafkaTransVo(AbstractDbmsVo.DBMS_KAFKA_TRANS_HS);
  18. // stat.setHostName(TsiTpmsManager.getInstance().getKafkaTransVo().getHostName());
  19. // stat.setStatus(1);
  20. // if (TsiTpmsManager.getInstance().getKafkaTransVo().getSendNanoTime() == sendNanoTime) {
  21. // stat.setSendTm(TsiTpmsManager.getInstance().getKafkaTransVo().getSendTm());
  22. // stat.setRecvTm(TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS));
  23. // }
  24. // else {
  25. // stat.setRecvTm(TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS));
  26. // stat.setSendTm(stat.getRecvTm());
  27. // log.info("recv ping success, sendNanoTime miss match: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime));
  28. // }
  29. // dbmsDataProcess.add(stat, (int)Thread.currentThread().getId());
  30. // log.info("recv ping success: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime));
  31. }
  32. @Override
  33. public void onMessage(ConsumerRecord<String, Long> record, Acknowledgment acknowledgment) {
  34. try {
  35. Long sendNanoTime = record.value();
  36. Long recvNanoTime = System.nanoTime();
  37. log.info("recv ping success, ack: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime));
  38. //acknowledgment.acknowledge();
  39. } catch (Exception e) {
  40. log.error("onMessage:" + e.getMessage());
  41. }
  42. }
  43. }