|
@@ -0,0 +1,170 @@
|
|
|
package com.ggits.comm.server.kafka;

import com.ggits.app.common.kafka.KafkaProducerFactory;
import com.ggits.app.common.utils.TimeUtils;
import com.ggits.comm.server.config.KafkaConfig;
import com.ggits.comm.server.process.dbms.DbmsDataProcess;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
+@Slf4j
|
|
|
+@AllArgsConstructor
|
|
|
+@Service
|
|
|
+public class KafkaProducerService {
|
|
|
+
|
|
|
+ private final KafkaConfig config;
|
|
|
+ private final DbmsDataProcess dbmsDataProcess;
|
|
|
+
|
|
|
+ private KafkaTemplate<String, byte[]> nodeProducer;
|
|
|
+ private KafkaTemplate<String, Long> pingProducer;
|
|
|
+
|
|
|
+
|
|
|
+ @PostConstruct
|
|
|
+ void init() {
|
|
|
+ //this.callback = new ProducerResultCallback();
|
|
|
+
|
|
|
+ if (this.config.isMultiConnect()) {
|
|
|
+ // 각각의 Producer 에 대하여 KafkaTemplate 를 생성해서 사용 한다.
|
|
|
+ if (this.config.isEnableNode()) {
|
|
|
+ this.nodeProducer = KafkaProducerFactory.createByteArrayTemplate(this.config.getNodeServers(), this.config.props);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ // 하나의 Producer KafkaTemplate 로 데이터를 전송하는 경우
|
|
|
+ // 동일한 KafkaTemplate 를 사용 한다.
|
|
|
+ KafkaTemplate<String, byte[]> producer = KafkaProducerFactory.createByteArrayTemplate(this.config.getBootstrapServers(), this.config.props);
|
|
|
+ if (this.config.isEnableNode()) {
|
|
|
+ this.nodeProducer = producer;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ createPingProducer();
|
|
|
+
|
|
|
+ log.info("[{}] ------------------", this.getClass().getSimpleName());
|
|
|
+ log.info("[{}] nodeProducer: {}", this.getClass().getSimpleName(), this.nodeProducer);
|
|
|
+ log.info("[{}] pingProducer: {}", this.getClass().getSimpleName(), this.pingProducer);
|
|
|
+
|
|
|
+ //this.producer = new KafkaProducer<String, byte[]>(KafkaProducerFactory.getProperties(this.config.getBootstrapServers(), this.config.props));
|
|
|
+ }
|
|
|
+
|
|
|
+ public void createPingProducer() {
|
|
|
+ Map<String, Object> props = new HashMap<>();
|
|
|
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getBootstrapServers());
|
|
|
+ props.put(ProducerConfig.ACKS_CONFIG, this.config.getConsumerAckConfig());
|
|
|
+ props.put(ProducerConfig.RETRIES_CONFIG, 0);
|
|
|
+ props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
|
|
|
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
|
|
|
+ props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 4000);
|
|
|
+ props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 3000);
|
|
|
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
|
|
|
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.LongSerializer.class);
|
|
|
+
|
|
|
+ this.pingProducer = KafkaProducerFactory.createProducerTemplate(props);
|
|
|
+ this.pingProducer.setDefaultTopic(this.config.getPingTopic());
|
|
|
+ }
|
|
|
+ public void shutdown() {
|
|
|
+ try {
|
|
|
+ if (this.nodeProducer != null) {
|
|
|
+ this.nodeProducer.destroy();
|
|
|
+ }
|
|
|
+ if (this.pingProducer != null) {
|
|
|
+ this.pingProducer.destroy();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch(Exception e) {
|
|
|
+ log.error("Failed to shutdown: {}", e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ public void sendPing() {
|
|
|
+ if (this.pingProducer == null ) {
|
|
|
+ log.info("sendPing: pingProducer == null");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ long sendNanoTime = System.nanoTime();
|
|
|
+// TsiTpmsManager.getInstance().getKafkaTransVo().setSendNanoTime(sendNanoTime); // nano seconds
|
|
|
+// TsiTpmsManager.getInstance().getKafkaTransVo().setSendTm(0); // micro seconds
|
|
|
+// TsiTpmsManager.getInstance().getKafkaTransVo().setRecvTm(0); // micro seconds
|
|
|
+
|
|
|
+ ListenableFuture<SendResult<String, Long>> future = this.pingProducer.sendDefault("key", sendNanoTime);
|
|
|
+ future.addCallback(new ListenableFutureCallback<SendResult<String, Long>>() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onSuccess(SendResult<String, Long> result) {
|
|
|
+ long recvNanoTime = System.nanoTime();
|
|
|
+ long sendTime = TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS);
|
|
|
+// TsiTpmsManager.getInstance().getKafkaTransVo().setSendTm(sendTime);
|
|
|
+ log.info("send ping success: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime));
|
|
|
+ }
|
|
|
+ @Override
|
|
|
+ public void onFailure(Throwable ex) {
|
|
|
+ long recvNanoTime = System.nanoTime();
|
|
|
+// TsiTpmsManager.getInstance().getKafkaTransVo().setSendNanoTime(0);
|
|
|
+// KafkaTransVo stat = new KafkaTransVo(AbstractDbmsVo.DBMS_KAFKA_TRANS_HS);
|
|
|
+// stat.setHostName(TsiTpmsManager.getInstance().getKafkaTransVo().getHostName());
|
|
|
+// stat.setStatus(0);
|
|
|
+// stat.setSendTm(TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS));
|
|
|
+// stat.setRecvTm(0);
|
|
|
+// tsiCvimDbmsService.add(stat, (int)Thread.currentThread().getId());
|
|
|
+// log.error("send ping failed: {}, {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime), ex.getMessage());
|
|
|
+//
|
|
|
+// // 카프카 전송 오류 알람 저장
|
|
|
+// String value = "Send Failed";
|
|
|
+// if (ex != null) {
|
|
|
+// value = ex.getMessage().substring(0, 99);
|
|
|
+// }
|
|
|
+// AlarmOccrVo alarm = new AlarmOccrVo(AbstractDbmsVo.DBMS_ALARM_OCCR_HS);
|
|
|
+// alarm.setAlarmCode(TsiAlarmConfigVo.KAFKA_01);
|
|
|
+// alarm.setAlarmTarget(producerConfig.getBootstrapServers());
|
|
|
+// alarm.setAlarmValue(value);
|
|
|
+// tsiCvimDbmsService.add(alarm, (int)Thread.currentThread().getId());
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ public void sendNode(String key, byte[] data) {
|
|
|
+ if (this.nodeProducer != null) {
|
|
|
+ try {
|
|
|
+ this.nodeProducer.send(key, key, data);
|
|
|
+ }
|
|
|
+ catch (Exception e) {
|
|
|
+ log.error("sendNode: {}, {}: {}", key, key, e.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ protected void send(KafkaTemplate<String, byte[]> kafka, String topic, String key, byte[] data) {
|
|
|
+ try {
|
|
|
+ kafka.send(topic, key, data);
|
|
|
+ }
|
|
|
+ catch(Exception e) {
|
|
|
+ log.error("kafka.send: {}, Exception: {}", topic, e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class ProducerResultCallback implements Callback {
|
|
|
+ @Override
|
|
|
+ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
|
|
|
+ if (e != null) {
|
|
|
+ log.error("Error while producing message to topic: {}, {}", recordMetadata, e.toString());
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ String message = String.format("sent message to topic:%s partition:%s offset:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
|
|
|
+ System.out.println(message);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|