package com.tsi.comm.server.kafka;

import com.tsi.comm.server.config.TsiCvimServerConfig;
import com.tsi.comm.server.config.TsiKafkaProducerConfig;
import com.tsi.comm.server.vo.mariadb.AbstractDbmsVo;
import com.tsi.comm.server.vo.mariadb.AlarmOccrVo;
import com.tsi.comm.server.vo.mariadb.KafkaTransVo;
import com.tsi.comm.server.process.dbms.TsiCvimDbmsProcess;
import com.tsi.comm.server.repository.TsiAlarmManager;
import com.tsi.comm.server.repository.TsiTpmsManager;
import com.tsi.comm.server.vo.TsiAlarmConfigVo;
import com.tsi.common.utils.TimeUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.jetbrains.annotations.NotNull;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@Slf4j
@RequiredArgsConstructor
@Service
public class KafkaProducerService {

    private final TsiCvimServerConfig config;
    private final TsiKafkaProducerConfig producerConfig;
    private final TsiAlarmManager alarmManager;
    private final TsiTpmsManager tpmsManager;
    private final TsiCvimDbmsProcess dbmsProcess;

    private KafkaTemplate<String, byte[]> cvimProducer;
    private KafkaTemplate<String, byte[]> nodeProducer;
    private KafkaTemplate<String, byte[]> testProducer;
    private KafkaTemplate<String, Long> pingProducer;

    public void start() {
        if (this.producerConfig.isMultiConnect()) {
            // Create and use a dedicated KafkaTemplate for each enabled producer.
            if (this.producerConfig.isEnableCvim()) {
                this.cvimProducer = KafkaProducerFactory.createByteArrayTemplate(this.producerConfig.getCvimServers(), this.producerConfig.getProps());
                this.cvimProducer.setDefaultTopic(TsiKafkaProducerConfig.CVIM_RAW_TOPIC);
            }
            if (this.producerConfig.isEnableNode()) {
                this.nodeProducer = KafkaProducerFactory.createByteArrayTemplate(this.producerConfig.getNodeServers(), this.producerConfig.getProps());
            }
            if (this.producerConfig.isEnableTest()) {
                this.testProducer = KafkaProducerFactory.createByteArrayTemplate(this.producerConfig.getTestServers(), this.producerConfig.getProps());
                this.testProducer.setDefaultTopic(TsiKafkaProducerConfig.TEST_TOPIC);
            }
        } else {
            // Single connection: send data through one producer KafkaTemplate,
            // shared by all enabled producers.
            KafkaTemplate<String, byte[]> producer = KafkaProducerFactory.createByteArrayTemplate(this.producerConfig.getBootstrapServers(), this.producerConfig.getProps());
            if (this.producerConfig.isEnableCvim()) {
                this.cvimProducer = producer;
            }
            if (this.producerConfig.isEnableNode()) {
                this.nodeProducer = producer;
            }
            if (this.producerConfig.isEnableTest()) {
                this.testProducer = producer;
                this.testProducer.setDefaultTopic(TsiKafkaProducerConfig.TEST_TOPIC);
            }
        }

        createPingProducer();

        log.info("[{}] ------------------", this.getClass().getSimpleName());
        log.info("[{}] cvimProducer: {}", this.getClass().getSimpleName(), this.cvimProducer);
        log.info("[{}] nodeProducer: {}", this.getClass().getSimpleName(), this.nodeProducer);
        log.info("[{}] testProducer: {}", this.getClass().getSimpleName(), this.testProducer);
        log.info("[{}] pingProducer: {}", this.getClass().getSimpleName(), this.pingProducer);
    }

    public void createPingProducer() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.producerConfig.getBootstrapServers());
        props.put(ProducerConfig.ACKS_CONFIG, this.producerConfig.getConsumerAckConfig());
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 4000);
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 3000);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.LongSerializer.class);

        this.pingProducer = KafkaProducerFactory.createProducerTemplate(props);
        this.pingProducer.setDefaultTopic(TsiKafkaProducerConfig.CVIM_PING + this.config.getServerId());
    }

    public void shutdown() {
        try {
            if (this.cvimProducer != null) {
                this.cvimProducer.destroy();
            }
            if (this.nodeProducer != null) {
                this.nodeProducer.destroy();
            }
            if (this.testProducer != null) {
                this.testProducer.destroy();
            }
            if (this.pingProducer != null) {
                this.pingProducer.destroy();
            }
        } catch (Exception e) {
            // Ignore any exception during shutdown; do not log it.
        }
    }

    public void sendPing() {
        if (this.pingProducer == null) {
            log.info("sendPing: pingProducer == null");
            return;
        }

        long sendNanoTime = System.nanoTime();
        this.tpmsManager.getKafkaTransVo().setSendNanoTime(sendNanoTime); // nanoseconds
        this.tpmsManager.getKafkaTransVo().setSendTm(0);                  // microseconds
        this.tpmsManager.getKafkaTransVo().setRecvTm(0);                  // microseconds

        ListenableFuture<SendResult<String, Long>> future = this.pingProducer.sendDefault("key", sendNanoTime);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Long>>() {
            @Override
            public void onSuccess(SendResult<String, Long> result) {
                long recvNanoTime = System.nanoTime();
                long sendTime = TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS);
                tpmsManager.getKafkaTransVo().setSendTm(sendTime);
                log.info("send ping success: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime));

                // Store a Kafka send-latency alarm when the configured threshold is exceeded.
                if (alarmManager.checkAlarm(TsiAlarmConfigVo.KAFKA_02)) {
                    TsiAlarmConfigVo alarmConfig = alarmManager.get(TsiAlarmConfigVo.KAFKA_02);
                    if (alarmConfig != null && sendTime > alarmConfig.getValue()) {
                        AlarmOccrVo alarm = new AlarmOccrVo(AbstractDbmsVo.DBMS_ALARM_OCCR_HS);
                        alarm.setAlarmCode(TsiAlarmConfigVo.KAFKA_02);
                        alarm.setAlarmTarget(producerConfig.getBootstrapServers());
                        alarm.setAlarmValue(Long.toString(sendTime));
                        dbmsProcess.add(alarm, 0);
                    }
                }
            }

            @Override
            public void onFailure(@NotNull Throwable ex) {
                long recvNanoTime = System.nanoTime();
                tpmsManager.getKafkaTransVo().setSendNanoTime(0);

                KafkaTransVo stat = new KafkaTransVo(AbstractDbmsVo.DBMS_KAFKA_TRANS_HS);
                stat.setHostName(tpmsManager.getKafkaTransVo().getHostName());
                stat.setStatus(0);
                stat.setSendTm(TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS));
                stat.setRecvTm(0);
                dbmsProcess.add(stat, 0);
                log.error("send ping failed: {}, {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime), ex.getMessage());

                // Store a Kafka send-failure alarm; guard against null or short exception messages.
                String message = (ex.getMessage() != null) ? ex.getMessage() : ex.toString();
                String value = message.substring(0, Math.min(message.length(), 99));
                AlarmOccrVo alarm = new AlarmOccrVo(AbstractDbmsVo.DBMS_ALARM_OCCR_HS);
                alarm.setAlarmCode(TsiAlarmConfigVo.KAFKA_01);
                alarm.setAlarmTarget(producerConfig.getBootstrapServers());
                alarm.setAlarmValue(value);
                dbmsProcess.add(alarm, 0);
            }
        });
    }

    public void sendCvim(long key, byte[] data) {
        if (this.cvimProducer != null) {
            try {
                if (this.producerConfig.isMultiConnect()) {
                    this.cvimProducer.sendDefault(Long.toString(key), data);
                } else {
                    this.cvimProducer.send(TsiKafkaProducerConfig.CVIM_RAW_TOPIC, Long.toString(key), data);
                }
            } catch (Exception e) {
                log.error("sendCvim: {}, {}: {}", TsiKafkaProducerConfig.CVIM_RAW_TOPIC, key, e.toString());
            }
        }
    }

    public void sendNode(String key, byte[] data) {
        if (this.nodeProducer != null) {
            try {
                // The node producer uses the key as both the topic name and the record key.
                this.nodeProducer.send(key, key, data);
            } catch (Exception e) {
                log.error("sendNode: {}, {}: {}", key, key, e.toString());
            }
        }
    }

    public void sendTest(long key, byte[] data) {
        if (this.testProducer != null) {
            try {
                this.testProducer.sendDefault(Long.toString(key), data);
            } catch (Exception e) {
                log.error("sendTest: {}, {}: {}", TsiKafkaProducerConfig.TEST_TOPIC, key, e.toString());
            }
        }
    }
}
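
/*
 * Hypothetical usage sketch (not part of the original source): one possible way to drive the
 * service lifecycle is a small package-private component that calls start() once the Spring
 * context is ready, sends the ping probe on a fixed schedule, and calls shutdown() when the
 * context closes. The class name, the 10-second interval, and the @EnableScheduling
 * prerequisite are assumptions, not something taken from the existing code.
 */
@org.springframework.stereotype.Component
@lombok.RequiredArgsConstructor
class KafkaProducerLifecycleSketch {

    private final KafkaProducerService kafkaProducerService;

    // Build the KafkaTemplate instances after the application context has been refreshed.
    @org.springframework.context.event.EventListener(org.springframework.context.event.ContextRefreshedEvent.class)
    public void onContextRefreshed() {
        this.kafkaProducerService.start();
    }

    // Periodically measure broker round-trip time; assumes @EnableScheduling on a configuration class.
    @org.springframework.scheduling.annotation.Scheduled(fixedDelay = 10_000L)
    public void pingBroker() {
        this.kafkaProducerService.sendPing();
    }

    // Release producer resources when the application context is closed.
    @org.springframework.context.event.EventListener(org.springframework.context.event.ContextClosedEvent.class)
    public void onContextClosed() {
        this.kafkaProducerService.shutdown();
    }
}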