KafkaProducerService.java 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. package com.tsi.comm.server.kafka;
  2. import com.tsi.comm.server.config.TsiCvimServerConfig;
  3. import com.tsi.comm.server.config.TsiKafkaProducerConfig;
  4. import com.tsi.comm.server.vo.mariadb.AbstractDbmsVo;
  5. import com.tsi.comm.server.vo.mariadb.AlarmOccrVo;
  6. import com.tsi.comm.server.vo.mariadb.KafkaTransVo;
  7. import com.tsi.comm.server.process.dbms.TsiCvimDbmsProcess;
  8. import com.tsi.comm.server.repository.TsiAlarmManager;
  9. import com.tsi.comm.server.repository.TsiTpmsManager;
  10. import com.tsi.comm.server.vo.TsiAlarmConfigVo;
  11. import com.tsi.common.utils.TimeUtils;
  12. import lombok.RequiredArgsConstructor;
  13. import lombok.extern.slf4j.Slf4j;
  14. import org.apache.kafka.clients.producer.ProducerConfig;
  15. import org.jetbrains.annotations.NotNull;
  16. import org.springframework.kafka.core.KafkaTemplate;
  17. import org.springframework.kafka.support.SendResult;
  18. import org.springframework.stereotype.Service;
  19. import org.springframework.util.concurrent.ListenableFuture;
  20. import org.springframework.util.concurrent.ListenableFutureCallback;
  21. import java.util.HashMap;
  22. import java.util.Map;
  23. import java.util.concurrent.TimeUnit;
  24. @Slf4j
  25. @RequiredArgsConstructor
  26. @Service
  27. public class KafkaProducerService {
  28. private final TsiCvimServerConfig config;
  29. private final TsiKafkaProducerConfig producerConfig;
  30. private final TsiAlarmManager alarmManager;
  31. private final TsiTpmsManager tpmsManager;
  32. private final TsiCvimDbmsProcess dbmsProcess;
  33. private KafkaTemplate<String, byte[]> cvimProducer;
  34. private KafkaTemplate<String, byte[]> nodeProducer;
  35. private KafkaTemplate<String, byte[]> testProducer;
  36. private KafkaTemplate<String, Long> pingProducer;
  37. public void start() {
  38. //this.callback = new ProducerResultCallback();
  39. if (this.producerConfig.isMultiConnect()) {
  40. // 각각의 Producer 에 대하여 KafkaTemplate 를 생성해서 사용한다.
  41. if (this.producerConfig.isEnableCvim()) {
  42. this.cvimProducer = KafkaProducerFactory.createByteArrayTemplate(this.producerConfig.getCvimServers(), this.producerConfig.getProps());
  43. this.cvimProducer.setDefaultTopic(TsiKafkaProducerConfig.CVIM_RAW_TOPIC);
  44. }
  45. if (this.producerConfig.isEnableNode()) {
  46. this.nodeProducer = KafkaProducerFactory.createByteArrayTemplate(this.producerConfig.getNodeServers(), this.producerConfig.getProps());
  47. }
  48. if (this.producerConfig.isEnableTest()) {
  49. this.testProducer = KafkaProducerFactory.createByteArrayTemplate(this.producerConfig.getTestServers(), this.producerConfig.getProps());
  50. this.testProducer.setDefaultTopic(TsiKafkaProducerConfig.TEST_TOPIC);
  51. }
  52. }
  53. else {
  54. // 하나의 Producer KafkaTemplate 로 데이터를 전송하는 경우
  55. // 동일한 KafkaTemplate 를 사용한다.
  56. KafkaTemplate<String, byte[]> producer = KafkaProducerFactory.createByteArrayTemplate(this.producerConfig.getBootstrapServers(), this.producerConfig.getProps());
  57. if (this.producerConfig.isEnableCvim()) {
  58. this.cvimProducer = producer;
  59. }
  60. if (this.producerConfig.isEnableNode()) {
  61. this.nodeProducer = producer;
  62. }
  63. if (this.producerConfig.isEnableTest()) {
  64. this.testProducer = producer;
  65. this.testProducer.setDefaultTopic(TsiKafkaProducerConfig.TEST_TOPIC);
  66. }
  67. }
  68. createPingProducer();
  69. log.info("[{}] ------------------", this.getClass().getSimpleName());
  70. log.info("[{}] cvimProducer: {}", this.getClass().getSimpleName(), this.cvimProducer);
  71. log.info("[{}] nodeProducer: {}", this.getClass().getSimpleName(), this.nodeProducer);
  72. log.info("[{}] testProducer: {}", this.getClass().getSimpleName(), this.testProducer);
  73. log.info("[{}] pingProducer: {}", this.getClass().getSimpleName(), this.pingProducer);
  74. //this.producer = new KafkaProducer<String, byte[]>(KafkaProducerFactory.getProperties(this.config.getBootstrapServers(), this.config.props));
  75. }
  76. public void createPingProducer() {
  77. Map<String, Object> props = new HashMap<>();
  78. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.producerConfig.getBootstrapServers());
  79. props.put(ProducerConfig.ACKS_CONFIG, this.producerConfig.getConsumerAckConfig());
  80. props.put(ProducerConfig.RETRIES_CONFIG, 0);
  81. props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
  82. props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
  83. props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 4000);
  84. props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 3000);
  85. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
  86. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.LongSerializer.class);
  87. this.pingProducer = KafkaProducerFactory.createProducerTemplate(props);
  88. this.pingProducer.setDefaultTopic(TsiKafkaProducerConfig.CVIM_PING+this.config.getServerId());
  89. }
  90. public void shutdown() {
  91. try {
  92. if (this.cvimProducer != null) {
  93. this.cvimProducer.destroy();
  94. }
  95. if (this.nodeProducer != null) {
  96. this.nodeProducer.destroy();
  97. }
  98. if (this.testProducer != null) {
  99. this.testProducer.destroy();
  100. }
  101. if (this.pingProducer != null) {
  102. this.pingProducer.destroy();
  103. }
  104. }
  105. catch(Exception e) {
  106. // 로그를 남기지 않고, 예외가 발생해도 무시한다.
  107. }
  108. }
  109. public void sendPing() {
  110. if (this.pingProducer == null ) {
  111. log.info("sendPing: pingProducer == null");
  112. return;
  113. }
  114. long sendNanoTime = System.nanoTime();
  115. this.tpmsManager.getKafkaTransVo().setSendNanoTime(sendNanoTime); // nano seconds
  116. this.tpmsManager.getKafkaTransVo().setSendTm(0); // micro seconds
  117. this.tpmsManager.getKafkaTransVo().setRecvTm(0); // micro seconds
  118. ListenableFuture<SendResult<String, Long>> future = this.pingProducer.sendDefault("key", sendNanoTime);
  119. future.addCallback(new ListenableFutureCallback<SendResult<String, Long>>() {
  120. @Override
  121. public void onSuccess(SendResult<String, Long> result) {
  122. long recvNanoTime = System.nanoTime();
  123. long sendTime = TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS);
  124. tpmsManager.getKafkaTransVo().setSendTm(sendTime);
  125. log.info("send ping success: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime));
  126. // 카프카 전송 지연 알람 저장
  127. if (alarmManager.checkAlarm(TsiAlarmConfigVo.KAFKA_02)) {
  128. TsiAlarmConfigVo alarmConfig = alarmManager.get(TsiAlarmConfigVo.KAFKA_02);
  129. if (alarmConfig != null && sendTime > alarmConfig.getValue()) {
  130. AlarmOccrVo alarm = new AlarmOccrVo(AbstractDbmsVo.DBMS_ALARM_OCCR_HS);
  131. alarm.setAlarmCode(TsiAlarmConfigVo.KAFKA_02);
  132. alarm.setAlarmTarget(producerConfig.getBootstrapServers());
  133. alarm.setAlarmValue(Long.toString(sendTime));
  134. dbmsProcess.add(alarm, 0);
  135. }
  136. }
  137. }
  138. @Override
  139. public void onFailure(@NotNull Throwable ex) {
  140. long recvNanoTime = System.nanoTime();
  141. tpmsManager.getKafkaTransVo().setSendNanoTime(0);
  142. KafkaTransVo stat = new KafkaTransVo(AbstractDbmsVo.DBMS_KAFKA_TRANS_HS);
  143. stat.setHostName(tpmsManager.getKafkaTransVo().getHostName());
  144. stat.setStatus(0);
  145. stat.setSendTm(TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS));
  146. stat.setRecvTm(0);
  147. dbmsProcess.add(stat, 0);
  148. log.error("send ping failed: {}, {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime), ex.getMessage());
  149. // 카프카 전송 오류 알람 저장
  150. String value = ex.getMessage().substring(0, 99);
  151. //if (ex != null) {
  152. // value = ex.getMessage().substring(0, 99);
  153. //}
  154. AlarmOccrVo alarm = new AlarmOccrVo(AbstractDbmsVo.DBMS_ALARM_OCCR_HS);
  155. alarm.setAlarmCode(TsiAlarmConfigVo.KAFKA_01);
  156. alarm.setAlarmTarget(producerConfig.getBootstrapServers());
  157. alarm.setAlarmValue(value);
  158. dbmsProcess.add(alarm, 0);
  159. }
  160. });
  161. }
  162. public void sendCvim(long key, byte[] data) {
  163. if (this.cvimProducer != null) {
  164. try {
  165. if (this.producerConfig.isMultiConnect()) {
  166. this.cvimProducer.sendDefault(Long.toString(key), data);
  167. }
  168. else {
  169. this.cvimProducer.send(TsiKafkaProducerConfig.CVIM_RAW_TOPIC, Long.toString(key), data);
  170. }
  171. }
  172. catch (Exception e) {
  173. log.error("sendCvim: {}, {}: {}", TsiKafkaProducerConfig.CVIM_RAW_TOPIC, key, e.toString());
  174. }
  175. }
  176. }
  177. public void sendNode(String key, byte[] data) {
  178. if (this.nodeProducer != null) {
  179. try {
  180. this.nodeProducer.send(key, key, data);
  181. }
  182. catch (Exception e) {
  183. log.error("sendNode: {}, {}: {}", key, key, e.toString());
  184. }
  185. }
  186. }
  187. public void sendTest(long key, byte[] data) {
  188. if (this.testProducer != null) {
  189. try {
  190. //this.producer.send(new ProducerRecord<String, byte[]>(Long.toString(key), data), this.callback);
  191. this.testProducer.sendDefault(Long.toString(key), data);
  192. }
  193. catch (Exception e) {
  194. log.error("sendTest: {}, {}: {}", TsiKafkaProducerConfig.TEST_TOPIC, key, e.toString());
  195. }
  196. }
  197. }
  198. }