// KafkaProducerService.java
package com.tsi.comm.server.kafka;

import com.tsi.app.common.kafka.KafkaProducerFactory;
import com.tsi.app.common.utils.TimeUtils;
import com.tsi.comm.server.config.TsiCvimServerConfig;
import com.tsi.comm.server.config.TsiKafkaProducerConfig;
import com.tsi.comm.server.mybatis.vo.AbstractDbmsVo;
import com.tsi.comm.server.mybatis.vo.AlarmOccrVo;
import com.tsi.comm.server.mybatis.vo.KafkaTransVo;
import com.tsi.comm.server.process.dbms.TsiCvimDbmsProcess;
import com.tsi.comm.server.repository.TsiAlarmManager;
import com.tsi.comm.server.repository.TsiTpmsManager;
import com.tsi.comm.server.vo.TsiAlarmConfigVo;

import lombok.extern.slf4j.Slf4j;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
  28. @Slf4j
  29. @Service
  30. public class KafkaProducerService {
  31. private final TsiCvimServerConfig config;
  32. private final TsiKafkaProducerConfig producerConfig;
  33. private final TsiCvimDbmsProcess dbmsProcess;
  34. private KafkaTemplate<String, byte[]> cvimProducer;
  35. private KafkaTemplate<String, byte[]> nodeProducer;
  36. private KafkaTemplate<String, byte[]> testProducer;
  37. private KafkaTemplate<String, Long> pingProducer;
  38. //ProducerResultCallback callback;
  39. public KafkaProducerService(TsiCvimServerConfig config, TsiKafkaProducerConfig producerConfig, TsiCvimDbmsProcess dbmsProcess) {
  40. this.config = config;
  41. this.producerConfig = producerConfig;
  42. this.dbmsProcess = dbmsProcess;
  43. }
  44. @PostConstruct
  45. void init() {
  46. //this.callback = new ProducerResultCallback();
  47. if (this.producerConfig.isMultiConnect()) {
  48. // 각각의 Producer 에 대하여 KafkaTemplate 를 생성해서 사용한다.
  49. if (this.producerConfig.isEnableCvim()) {
  50. this.cvimProducer = KafkaProducerFactory.createByteArrayTemplate(this.producerConfig.getCvimServers(), this.producerConfig.props);
  51. this.cvimProducer.setDefaultTopic(TsiKafkaProducerConfig.CVIM_RAW_TOPIC);
  52. }
  53. if (this.producerConfig.isEnableNode()) {
  54. this.nodeProducer = KafkaProducerFactory.createByteArrayTemplate(this.producerConfig.getNodeServers(), this.producerConfig.props);
  55. }
  56. if (this.producerConfig.isEnableTest()) {
  57. this.testProducer = KafkaProducerFactory.createByteArrayTemplate(this.producerConfig.getTestServers(), this.producerConfig.props);
  58. this.testProducer.setDefaultTopic(TsiKafkaProducerConfig.TEST_TOPIC);
  59. }
  60. }
  61. else {
  62. // 하나의 Producer KafkaTemplate 로 데이터를 전송하는 경우
  63. // 동일한 KafkaTemplate 를 사용한다.
  64. KafkaTemplate<String, byte[]> producer = KafkaProducerFactory.createByteArrayTemplate(this.producerConfig.getBootstrapServers(), this.producerConfig.props);
  65. if (this.producerConfig.isEnableCvim()) {
  66. this.cvimProducer = producer;
  67. }
  68. if (this.producerConfig.isEnableNode()) {
  69. this.nodeProducer = producer;
  70. }
  71. if (this.producerConfig.isEnableTest()) {
  72. this.testProducer = producer;
  73. this.testProducer.setDefaultTopic(TsiKafkaProducerConfig.TEST_TOPIC);
  74. }
  75. }
  76. createPingProducer();
  77. log.info("[{}] ------------------", this.getClass().getSimpleName());
  78. log.info("[{}] cvimProducer: {}", this.getClass().getSimpleName(), this.cvimProducer);
  79. log.info("[{}] nodeProducer: {}", this.getClass().getSimpleName(), this.nodeProducer);
  80. log.info("[{}] testProducer: {}", this.getClass().getSimpleName(), this.testProducer);
  81. log.info("[{}] pingProducer: {}", this.getClass().getSimpleName(), this.pingProducer);
  82. //this.producer = new KafkaProducer<String, byte[]>(KafkaProducerFactory.getProperties(this.config.getBootstrapServers(), this.config.props));
  83. }
  84. public void createPingProducer() {
  85. Map<String, Object> props = new HashMap<>();
  86. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.producerConfig.getBootstrapServers());
  87. props.put(ProducerConfig.ACKS_CONFIG, this.producerConfig.getConsumerAckConfig());
  88. props.put(ProducerConfig.RETRIES_CONFIG, 0);
  89. props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
  90. props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
  91. props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 4000);
  92. props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 3000);
  93. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
  94. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.LongSerializer.class);
  95. this.pingProducer = KafkaProducerFactory.createProducerTemplate(props);
  96. this.pingProducer.setDefaultTopic(TsiKafkaProducerConfig.CVIM_PING+this.config.getServerId());
  97. }
  98. public void shutdown() {
  99. try {
  100. if (this.cvimProducer != null) {
  101. this.cvimProducer.destroy();
  102. }
  103. if (this.nodeProducer != null) {
  104. this.nodeProducer.destroy();
  105. }
  106. if (this.testProducer != null) {
  107. this.testProducer.destroy();
  108. }
  109. if (this.pingProducer != null) {
  110. this.pingProducer.destroy();
  111. }
  112. }
  113. catch(Exception e) {
  114. }
  115. }
  116. public boolean initPartitions() {
  117. try {
  118. if (this.testProducer != null) {
  119. List<PartitionInfo> partitionInfos = this.testProducer.partitionsFor(TsiKafkaProducerConfig.TEST_TOPIC);
  120. log.info("{} partitions: {}", TsiKafkaProducerConfig.TEST_TOPIC, partitionInfos);
  121. }
  122. }
  123. catch(Exception e) {
  124. log.error("Request partitionFor {}: {}", TsiKafkaProducerConfig.TEST_TOPIC, e.toString());
  125. return false;
  126. }
  127. try {
  128. if (this.cvimProducer != null) {
  129. List<PartitionInfo> partitionInfos = this.cvimProducer.partitionsFor(TsiKafkaProducerConfig.CVIM_RAW_TOPIC);
  130. log.info("{} partitions: {}", TsiKafkaProducerConfig.CVIM_RAW_TOPIC, partitionInfos);
  131. }
  132. }
  133. catch (Exception e) {
  134. log.error("Request partitionFor {}: {}", TsiKafkaProducerConfig.CVIM_RAW_TOPIC, e.toString());
  135. return false;
  136. }
  137. return true;
  138. }
  139. public void sendPing() {
  140. if (this.pingProducer == null ) {
  141. log.info("sendPing: pingProducer == null");
  142. return;
  143. }
  144. long sendNanoTime = System.nanoTime();
  145. TsiTpmsManager.getInstance().getKafkaTransVo().setSendNanoTime(sendNanoTime); // nano seconds
  146. TsiTpmsManager.getInstance().getKafkaTransVo().setSendTm(0); // micro seconds
  147. TsiTpmsManager.getInstance().getKafkaTransVo().setRecvTm(0); // micro seconds
  148. ListenableFuture<SendResult<String, Long>> future = this.pingProducer.sendDefault("key", sendNanoTime);
  149. future.addCallback(new ListenableFutureCallback<SendResult<String, Long>>() {
  150. @Override
  151. public void onSuccess(SendResult<String, Long> result) {
  152. long recvNanoTime = System.nanoTime();
  153. long sendTime = TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS);
  154. TsiTpmsManager.getInstance().getKafkaTransVo().setSendTm(sendTime);
  155. log.info("send ping success: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime));
  156. // 카프카 전송 지연 알람 저장
  157. if (TsiAlarmManager.getInstance().checkAlarm(TsiAlarmConfigVo.KAFKA_02)) {
  158. TsiAlarmConfigVo alarmConfig = TsiAlarmManager.getInstance().get(TsiAlarmConfigVo.KAFKA_02);
  159. if (alarmConfig != null && sendTime > alarmConfig.getValue()) {
  160. AlarmOccrVo alarm = new AlarmOccrVo(AbstractDbmsVo.DBMS_ALARM_OCCR_HS);
  161. alarm.setAlarmCode(TsiAlarmConfigVo.KAFKA_02);
  162. alarm.setAlarmTarget(producerConfig.getBootstrapServers());
  163. alarm.setAlarmValue(Long.toString(sendTime));
  164. dbmsProcess.add(alarm, (int) Thread.currentThread().getId());
  165. }
  166. }
  167. }
  168. @Override
  169. public void onFailure(Throwable ex) {
  170. long recvNanoTime = System.nanoTime();
  171. TsiTpmsManager.getInstance().getKafkaTransVo().setSendNanoTime(0);
  172. KafkaTransVo stat = new KafkaTransVo(AbstractDbmsVo.DBMS_KAFKA_TRANS_HS);
  173. stat.setHostName(TsiTpmsManager.getInstance().getKafkaTransVo().getHostName());
  174. stat.setStatus(0);
  175. stat.setSendTm(TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS));
  176. stat.setRecvTm(0);
  177. dbmsProcess.add(stat, (int)Thread.currentThread().getId());
  178. log.error("send ping failed: {}, {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime), ex.getMessage());
  179. // 카프카 전송 오류 알람 저장
  180. String value = "Send Failed";
  181. if (ex != null) {
  182. value = ex.getMessage().substring(0, 99);
  183. }
  184. AlarmOccrVo alarm = new AlarmOccrVo(AbstractDbmsVo.DBMS_ALARM_OCCR_HS);
  185. alarm.setAlarmCode(TsiAlarmConfigVo.KAFKA_01);
  186. alarm.setAlarmTarget(producerConfig.getBootstrapServers());
  187. alarm.setAlarmValue(value);
  188. dbmsProcess.add(alarm, (int)Thread.currentThread().getId());
  189. }
  190. });
  191. }
  192. public void sendCvim(long key, byte[] data) {
  193. if (this.cvimProducer != null) {
  194. try {
  195. if (this.producerConfig.isMultiConnect()) {
  196. this.cvimProducer.sendDefault(Long.toString(key), data);
  197. }
  198. else {
  199. this.cvimProducer.send(TsiKafkaProducerConfig.CVIM_RAW_TOPIC, Long.toString(key), data);
  200. }
  201. }
  202. catch (Exception e) {
  203. log.error("sendCvim: {}, {}: {}", TsiKafkaProducerConfig.CVIM_RAW_TOPIC, key, e.toString());
  204. }
  205. }
  206. }
  207. public void sendNode(String key, byte[] data) {
  208. if (this.nodeProducer != null) {
  209. try {
  210. this.nodeProducer.send(key, key, data);
  211. }
  212. catch (Exception e) {
  213. log.error("sendNode: {}, {}: {}", key, key, e.toString());
  214. }
  215. }
  216. }
  217. public void sendTest(long key, byte[] data) {
  218. if (this.testProducer != null) {
  219. try {
  220. //this.producer.send(new ProducerRecord<String, byte[]>(Long.toString(key), data), this.callback);
  221. this.testProducer.sendDefault(Long.toString(key), data);
  222. }
  223. catch (Exception e) {
  224. log.error("sendTest: {}, {}: {}", TsiKafkaProducerConfig.TEST_TOPIC, key, e.toString());
  225. }
  226. }
  227. }
  228. protected void send(KafkaTemplate<String, byte[]> kafka, String topic, String key, byte[] data) {
  229. try {
  230. kafka.send(topic, key, data);
  231. }
  232. catch(Exception e) {
  233. log.error("kafka.send: {}, Exception: {}", topic, e.getMessage());
  234. }
  235. }
  236. private static class ProducerResultCallback implements Callback {
  237. @Override
  238. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  239. if (e != null) {
  240. log.error("Error while producing message to topic: {}, {}", recordMetadata, e.toString());
  241. }
  242. else {
  243. String message = String.format("sent message to topic:%s partition:%s offset:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
  244. System.out.println(message);
  245. }
  246. }
  247. }
  248. }