@@ -11,29 +11,29 @@ import com.tsi.comm.server.process.dbms.TsiCvimDbmsProcess;
 import com.tsi.comm.server.repository.TsiAlarmManager;
 import com.tsi.comm.server.repository.TsiTpmsManager;
 import com.tsi.comm.server.vo.TsiAlarmConfigVo;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
+import org.jetbrains.annotations.NotNull;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.SendResult;
 import org.springframework.stereotype.Service;
 import org.springframework.util.concurrent.ListenableFuture;
 import org.springframework.util.concurrent.ListenableFutureCallback;

-import javax.annotation.PostConstruct;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;

 @Slf4j
+@RequiredArgsConstructor
 @Service
 public class KafkaProducerService {

     private final TsiCvimServerConfig config;
     private final TsiKafkaProducerConfig producerConfig;
+    private final TsiAlarmManager alarmManager;
+    private final TsiTpmsManager tpmsManager;
     private final TsiCvimDbmsProcess dbmsProcess;

     private KafkaTemplate<String, byte[]> cvimProducer;
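Note: with @RequiredArgsConstructor on the class, Lombok generates a constructor over the final fields in declaration order, and Spring injects the two new TsiAlarmManager/TsiTpmsManager beans through it, which is why the hand-written constructor is dropped in the next hunk. A rough equivalent of what Lombok generates (a sketch, not the actual generated source):

public KafkaProducerService(TsiCvimServerConfig config, TsiKafkaProducerConfig producerConfig,
                            TsiAlarmManager alarmManager, TsiTpmsManager tpmsManager,
                            TsiCvimDbmsProcess dbmsProcess) {
    this.config = config;
    this.producerConfig = producerConfig;
    this.alarmManager = alarmManager;   // new dependency
    this.tpmsManager = tpmsManager;     // new dependency
    this.dbmsProcess = dbmsProcess;
}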
@@ -42,36 +42,27 @@ public class KafkaProducerService {
     private KafkaTemplate<String, Long> pingProducer;

-    //ProducerResultCallback callback;
-
-    public KafkaProducerService(TsiCvimServerConfig config, TsiKafkaProducerConfig producerConfig, TsiCvimDbmsProcess dbmsProcess) {
-        this.config = config;
-        this.producerConfig = producerConfig;
-        this.dbmsProcess = dbmsProcess;
-    }
-
-    @PostConstruct
-    void init() {
+    public void start() {
         //this.callback = new ProducerResultCallback();

         if (this.producerConfig.isMultiConnect()) {
             // Create and use a separate KafkaTemplate for each producer.
             if (this.producerConfig.isEnableCvim()) {
-                this.cvimProducer = KafkaProducerFactory.createByteArrayTemplate(this.producerConfig.getCvimServers(), this.producerConfig.props);
+                this.cvimProducer = KafkaProducerFactory.createByteArrayTemplate(this.producerConfig.getCvimServers(), this.producerConfig.getProps());
                 this.cvimProducer.setDefaultTopic(TsiKafkaProducerConfig.CVIM_RAW_TOPIC);
             }
             if (this.producerConfig.isEnableNode()) {
-                this.nodeProducer = KafkaProducerFactory.createByteArrayTemplate(this.producerConfig.getNodeServers(), this.producerConfig.props);
+                this.nodeProducer = KafkaProducerFactory.createByteArrayTemplate(this.producerConfig.getNodeServers(), this.producerConfig.getProps());
             }
             if (this.producerConfig.isEnableTest()) {
-                this.testProducer = KafkaProducerFactory.createByteArrayTemplate(this.producerConfig.getTestServers(), this.producerConfig.props);
+                this.testProducer = KafkaProducerFactory.createByteArrayTemplate(this.producerConfig.getTestServers(), this.producerConfig.getProps());
                 this.testProducer.setDefaultTopic(TsiKafkaProducerConfig.TEST_TOPIC);
             }
         }
         else {
             // When sending data through a single producer KafkaTemplate,
             // the same KafkaTemplate instance is shared.
-            KafkaTemplate<String, byte[]> producer = KafkaProducerFactory.createByteArrayTemplate(this.producerConfig.getBootstrapServers(), this.producerConfig.props);
+            KafkaTemplate<String, byte[]> producer = KafkaProducerFactory.createByteArrayTemplate(this.producerConfig.getBootstrapServers(), this.producerConfig.getProps());
             if (this.producerConfig.isEnableCvim()) {
                 this.cvimProducer = producer;
             }
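Because the @PostConstruct hook is gone, the template wiring above no longer runs automatically when the bean is created; some part of the server startup now has to invoke start() explicitly. A minimal sketch of such a caller, assuming a Spring Boot ApplicationRunner is available (the runner class itself is hypothetical and not part of this change):

import lombok.RequiredArgsConstructor;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

// Hypothetical bootstrap hook: runs once the application context is ready
// and triggers the Kafka template setup that @PostConstruct used to perform.
@Component
@RequiredArgsConstructor
public class KafkaProducerStarter implements ApplicationRunner {

    private final KafkaProducerService kafkaProducerService;

    @Override
    public void run(ApplicationArguments args) {
        kafkaProducerService.start();
    }
}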
@@ -126,33 +117,33 @@ public class KafkaProducerService {
             }
         }
         catch(Exception e) {
-
+            // Intentionally ignore any exception here without logging it.
         }
     }

-    public boolean initPartitions() {
-        try {
-            if (this.testProducer != null) {
-                List<PartitionInfo> partitionInfos = this.testProducer.partitionsFor(TsiKafkaProducerConfig.TEST_TOPIC);
-                log.info("{} partitions: {}", TsiKafkaProducerConfig.TEST_TOPIC, partitionInfos);
-            }
-        }
-        catch(Exception e) {
-            log.error("Request partitionFor {}: {}", TsiKafkaProducerConfig.TEST_TOPIC, e.toString());
-            return false;
-        }
-
-        try {
-            if (this.cvimProducer != null) {
-                List<PartitionInfo> partitionInfos = this.cvimProducer.partitionsFor(TsiKafkaProducerConfig.CVIM_RAW_TOPIC);
-                log.info("{} partitions: {}", TsiKafkaProducerConfig.CVIM_RAW_TOPIC, partitionInfos);
-            }
-        }
-        catch (Exception e) {
-            log.error("Request partitionFor {}: {}", TsiKafkaProducerConfig.CVIM_RAW_TOPIC, e.toString());
-            return false;
-        }
-        return true;
-    }
+//    public boolean initPartitions() {
+//        try {
+//            if (this.testProducer != null) {
+//                List<PartitionInfo> partitionInfos = this.testProducer.partitionsFor(TsiKafkaProducerConfig.TEST_TOPIC);
+//                log.info("{} partitions: {}", TsiKafkaProducerConfig.TEST_TOPIC, partitionInfos);
+//            }
+//        }
+//        catch(Exception e) {
+//            log.error("Request partitionFor {}: {}", TsiKafkaProducerConfig.TEST_TOPIC, e.toString());
+//            return false;
+//        }
+//
+//        try {
+//            if (this.cvimProducer != null) {
+//                List<PartitionInfo> partitionInfos = this.cvimProducer.partitionsFor(TsiKafkaProducerConfig.CVIM_RAW_TOPIC);
+//                log.info("{} partitions: {}", TsiKafkaProducerConfig.CVIM_RAW_TOPIC, partitionInfos);
+//            }
+//        }
+//        catch (Exception e) {
+//            log.error("Request partitionFor {}: {}", TsiKafkaProducerConfig.CVIM_RAW_TOPIC, e.toString());
+//            return false;
+//        }
+//        return true;
+//    }

     public void sendPing() {
         if (this.pingProducer == null ) {
@@ -161,9 +152,9 @@ public class KafkaProducerService {
         }

         long sendNanoTime = System.nanoTime();
-        TsiTpmsManager.getInstance().getKafkaTransVo().setSendNanoTime(sendNanoTime); // nano seconds
-        TsiTpmsManager.getInstance().getKafkaTransVo().setSendTm(0); // micro seconds
-        TsiTpmsManager.getInstance().getKafkaTransVo().setRecvTm(0); // micro seconds
+        tpmsManager.getKafkaTransVo().setSendNanoTime(sendNanoTime); // nano seconds
+        tpmsManager.getKafkaTransVo().setSendTm(0); // micro seconds
+        tpmsManager.getKafkaTransVo().setRecvTm(0); // micro seconds

         ListenableFuture<SendResult<String, Long>> future = this.pingProducer.sendDefault("key", sendNanoTime);
         future.addCallback(new ListenableFutureCallback<SendResult<String, Long>>() {
@@ -172,12 +163,12 @@ public class KafkaProducerService {
             public void onSuccess(SendResult<String, Long> result) {
                 long recvNanoTime = System.nanoTime();
                 long sendTime = TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS);
-                TsiTpmsManager.getInstance().getKafkaTransVo().setSendTm(sendTime);
+                tpmsManager.getKafkaTransVo().setSendTm(sendTime);
                 log.info("send ping success: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime));

                 // Save a Kafka send-delay alarm.
-                if (TsiAlarmManager.getInstance().checkAlarm(TsiAlarmConfigVo.KAFKA_02)) {
-                    TsiAlarmConfigVo alarmConfig = TsiAlarmManager.getInstance().get(TsiAlarmConfigVo.KAFKA_02);
+                if (alarmManager.checkAlarm(TsiAlarmConfigVo.KAFKA_02)) {
+                    TsiAlarmConfigVo alarmConfig = alarmManager.get(TsiAlarmConfigVo.KAFKA_02);
                     if (alarmConfig != null && sendTime > alarmConfig.getValue()) {
                         AlarmOccrVo alarm = new AlarmOccrVo(AbstractDbmsVo.DBMS_ALARM_OCCR_HS);
                         alarm.setAlarmCode(TsiAlarmConfigVo.KAFKA_02);
@@ -188,11 +179,11 @@ public class KafkaProducerService {
                 }
             }
             @Override
-            public void onFailure(Throwable ex) {
+            public void onFailure(@NotNull Throwable ex) {
                 long recvNanoTime = System.nanoTime();
-                TsiTpmsManager.getInstance().getKafkaTransVo().setSendNanoTime(0);
+                tpmsManager.getKafkaTransVo().setSendNanoTime(0);
                 KafkaTransVo stat = new KafkaTransVo(AbstractDbmsVo.DBMS_KAFKA_TRANS_HS);
-                stat.setHostName(TsiTpmsManager.getInstance().getKafkaTransVo().getHostName());
+                stat.setHostName(tpmsManager.getKafkaTransVo().getHostName());
                 stat.setStatus(0);
                 stat.setSendTm(TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS));
                 stat.setRecvTm(0);
@@ -200,10 +191,10 @@ public class KafkaProducerService {
                 log.error("send ping failed: {}, {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime), ex.getMessage());

                 // Save a Kafka send-failure alarm.
-                String value = "Send Failed";
-                if (ex != null) {
-                    value = ex.getMessage().substring(0, 99);
-                }
+                String value = (ex.getMessage() != null) ? ex.getMessage() : "Send Failed";
+                if (value.length() > 99) {
+                    value = value.substring(0, 99); // cap at 99 characters; an unconditional substring(0, 99) throws on shorter messages
+                }
                 AlarmOccrVo alarm = new AlarmOccrVo(AbstractDbmsVo.DBMS_ALARM_OCCR_HS);
                 alarm.setAlarmCode(TsiAlarmConfigVo.KAFKA_01);
                 alarm.setAlarmTarget(producerConfig.getBootstrapServers());
@@ -253,25 +244,25 @@ public class KafkaProducerService {
         }
     }

-    protected void send(KafkaTemplate<String, byte[]> kafka, String topic, String key, byte[] data) {
-        try {
-            kafka.send(topic, key, data);
-        }
-        catch(Exception e) {
-            log.error("kafka.send: {}, Exception: {}", topic, e.getMessage());
-        }
-    }
+//    protected void send(KafkaTemplate<String, byte[]> kafka, String topic, String key, byte[] data) {
+//        try {
+//            kafka.send(topic, key, data);
+//        }
+//        catch(Exception e) {
+//            log.error("kafka.send: {}, Exception: {}", topic, e.getMessage());
+//        }
+//    }

-    private static class ProducerResultCallback implements Callback {
-        @Override
-        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
-            if (e != null) {
-                log.error("Error while producing message to topic: {}, {}", recordMetadata, e.toString());
-            }
-            else {
-                String message = String.format("sent message to topic:%s partition:%s offset:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
-                System.out.println(message);
-            }
-        }
-    }
+//    private static class ProducerResultCallback implements Callback {
+//        @Override
+//        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
+//            if (e != null) {
+//                log.error("Error while producing message to topic: {}, {}", recordMetadata, e.toString());
+//            }
+//            else {
+//                String message = String.format("sent message to topic:%s partition:%s offset:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
+//                log.error(message);
+//            }
+//        }
+//    }
 }