123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277 |
- package com.tsi.comm.server.kafka;
- import com.tsi.app.common.kafka.KafkaProducerFactory;
- import com.tsi.app.common.utils.TimeUtils;
- import com.tsi.comm.server.config.TsiCvimServerConfig;
- import com.tsi.comm.server.config.TsiKafkaProducerConfig;
- import com.tsi.comm.server.mybatis.vo.AbstractDbmsVo;
- import com.tsi.comm.server.mybatis.vo.AlarmOccrVo;
- import com.tsi.comm.server.mybatis.vo.KafkaTransVo;
- import com.tsi.comm.server.process.dbms.TsiCvimDbmsProcess;
- import com.tsi.comm.server.repository.TsiAlarmManager;
- import com.tsi.comm.server.repository.TsiTpmsManager;
- import com.tsi.comm.server.vo.TsiAlarmConfigVo;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.producer.Callback;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.RecordMetadata;
- import org.apache.kafka.common.PartitionInfo;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.kafka.support.SendResult;
- import org.springframework.stereotype.Service;
- import org.springframework.util.concurrent.ListenableFuture;
- import org.springframework.util.concurrent.ListenableFutureCallback;
- import javax.annotation.PostConstruct;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.TimeUnit;
- @Slf4j
- @Service
- public class KafkaProducerService {
- private final TsiCvimServerConfig config;
- private final TsiKafkaProducerConfig producerConfig;
- private final TsiCvimDbmsProcess dbmsProcess;
- private KafkaTemplate<String, byte[]> cvimProducer;
- private KafkaTemplate<String, byte[]> nodeProducer;
- private KafkaTemplate<String, byte[]> testProducer;
- private KafkaTemplate<String, Long> pingProducer;
- //ProducerResultCallback callback;
- public KafkaProducerService(TsiCvimServerConfig config, TsiKafkaProducerConfig producerConfig, TsiCvimDbmsProcess dbmsProcess) {
- this.config = config;
- this.producerConfig = producerConfig;
- this.dbmsProcess = dbmsProcess;
- }
- @PostConstruct
- void init() {
- //this.callback = new ProducerResultCallback();
- if (this.producerConfig.isMultiConnect()) {
- // 각각의 Producer 에 대하여 KafkaTemplate 를 생성해서 사용한다.
- if (this.producerConfig.isEnableCvim()) {
- this.cvimProducer = KafkaProducerFactory.createByteArrayTemplate(this.producerConfig.getCvimServers(), this.producerConfig.props);
- this.cvimProducer.setDefaultTopic(TsiKafkaProducerConfig.CVIM_RAW_TOPIC);
- }
- if (this.producerConfig.isEnableNode()) {
- this.nodeProducer = KafkaProducerFactory.createByteArrayTemplate(this.producerConfig.getNodeServers(), this.producerConfig.props);
- }
- if (this.producerConfig.isEnableTest()) {
- this.testProducer = KafkaProducerFactory.createByteArrayTemplate(this.producerConfig.getTestServers(), this.producerConfig.props);
- this.testProducer.setDefaultTopic(TsiKafkaProducerConfig.TEST_TOPIC);
- }
- }
- else {
- // 하나의 Producer KafkaTemplate 로 데이터를 전송하는 경우
- // 동일한 KafkaTemplate 를 사용한다.
- KafkaTemplate<String, byte[]> producer = KafkaProducerFactory.createByteArrayTemplate(this.producerConfig.getBootstrapServers(), this.producerConfig.props);
- if (this.producerConfig.isEnableCvim()) {
- this.cvimProducer = producer;
- }
- if (this.producerConfig.isEnableNode()) {
- this.nodeProducer = producer;
- }
- if (this.producerConfig.isEnableTest()) {
- this.testProducer = producer;
- this.testProducer.setDefaultTopic(TsiKafkaProducerConfig.TEST_TOPIC);
- }
- }
- createPingProducer();
- log.info("[{}] ------------------", this.getClass().getSimpleName());
- log.info("[{}] cvimProducer: {}", this.getClass().getSimpleName(), this.cvimProducer);
- log.info("[{}] nodeProducer: {}", this.getClass().getSimpleName(), this.nodeProducer);
- log.info("[{}] testProducer: {}", this.getClass().getSimpleName(), this.testProducer);
- log.info("[{}] pingProducer: {}", this.getClass().getSimpleName(), this.pingProducer);
- //this.producer = new KafkaProducer<String, byte[]>(KafkaProducerFactory.getProperties(this.config.getBootstrapServers(), this.config.props));
- }
- public void createPingProducer() {
- Map<String, Object> props = new HashMap<>();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.producerConfig.getBootstrapServers());
- props.put(ProducerConfig.ACKS_CONFIG, this.producerConfig.getConsumerAckConfig());
- props.put(ProducerConfig.RETRIES_CONFIG, 0);
- props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
- props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
- props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 4000);
- props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 3000);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.LongSerializer.class);
- this.pingProducer = KafkaProducerFactory.createProducerTemplate(props);
- this.pingProducer.setDefaultTopic(TsiKafkaProducerConfig.CVIM_PING+this.config.getServerId());
- }
- public void shutdown() {
- try {
- if (this.cvimProducer != null) {
- this.cvimProducer.destroy();
- }
- if (this.nodeProducer != null) {
- this.nodeProducer.destroy();
- }
- if (this.testProducer != null) {
- this.testProducer.destroy();
- }
- if (this.pingProducer != null) {
- this.pingProducer.destroy();
- }
- }
- catch(Exception e) {
- }
- }
- public boolean initPartitions() {
- try {
- if (this.testProducer != null) {
- List<PartitionInfo> partitionInfos = this.testProducer.partitionsFor(TsiKafkaProducerConfig.TEST_TOPIC);
- log.info("{} partitions: {}", TsiKafkaProducerConfig.TEST_TOPIC, partitionInfos);
- }
- }
- catch(Exception e) {
- log.error("Request partitionFor {}: {}", TsiKafkaProducerConfig.TEST_TOPIC, e.toString());
- return false;
- }
- try {
- if (this.cvimProducer != null) {
- List<PartitionInfo> partitionInfos = this.cvimProducer.partitionsFor(TsiKafkaProducerConfig.CVIM_RAW_TOPIC);
- log.info("{} partitions: {}", TsiKafkaProducerConfig.CVIM_RAW_TOPIC, partitionInfos);
- }
- }
- catch (Exception e) {
- log.error("Request partitionFor {}: {}", TsiKafkaProducerConfig.CVIM_RAW_TOPIC, e.toString());
- return false;
- }
- return true;
- }
- public void sendPing() {
- if (this.pingProducer == null ) {
- log.info("sendPing: pingProducer == null");
- return;
- }
- long sendNanoTime = System.nanoTime();
- TsiTpmsManager.getInstance().getKafkaTransVo().setSendNanoTime(sendNanoTime); // nano seconds
- TsiTpmsManager.getInstance().getKafkaTransVo().setSendTm(0); // micro seconds
- TsiTpmsManager.getInstance().getKafkaTransVo().setRecvTm(0); // micro seconds
- ListenableFuture<SendResult<String, Long>> future = this.pingProducer.sendDefault("key", sendNanoTime);
- future.addCallback(new ListenableFutureCallback<SendResult<String, Long>>() {
- @Override
- public void onSuccess(SendResult<String, Long> result) {
- long recvNanoTime = System.nanoTime();
- long sendTime = TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS);
- TsiTpmsManager.getInstance().getKafkaTransVo().setSendTm(sendTime);
- log.info("send ping success: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime));
- // 카프카 전송 지연 알람 저장
- if (TsiAlarmManager.getInstance().checkAlarm(TsiAlarmConfigVo.KAFKA_02)) {
- TsiAlarmConfigVo alarmConfig = TsiAlarmManager.getInstance().get(TsiAlarmConfigVo.KAFKA_02);
- if (alarmConfig != null && sendTime > alarmConfig.getValue()) {
- AlarmOccrVo alarm = new AlarmOccrVo(AbstractDbmsVo.DBMS_ALARM_OCCR_HS);
- alarm.setAlarmCode(TsiAlarmConfigVo.KAFKA_02);
- alarm.setAlarmTarget(producerConfig.getBootstrapServers());
- alarm.setAlarmValue(Long.toString(sendTime));
- dbmsProcess.add(alarm, (int) Thread.currentThread().getId());
- }
- }
- }
- @Override
- public void onFailure(Throwable ex) {
- long recvNanoTime = System.nanoTime();
- TsiTpmsManager.getInstance().getKafkaTransVo().setSendNanoTime(0);
- KafkaTransVo stat = new KafkaTransVo(AbstractDbmsVo.DBMS_KAFKA_TRANS_HS);
- stat.setHostName(TsiTpmsManager.getInstance().getKafkaTransVo().getHostName());
- stat.setStatus(0);
- stat.setSendTm(TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS));
- stat.setRecvTm(0);
- dbmsProcess.add(stat, (int)Thread.currentThread().getId());
- log.error("send ping failed: {}, {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime), ex.getMessage());
- // 카프카 전송 오류 알람 저장
- String value = "Send Failed";
- if (ex != null) {
- value = ex.getMessage().substring(0, 99);
- }
- AlarmOccrVo alarm = new AlarmOccrVo(AbstractDbmsVo.DBMS_ALARM_OCCR_HS);
- alarm.setAlarmCode(TsiAlarmConfigVo.KAFKA_01);
- alarm.setAlarmTarget(producerConfig.getBootstrapServers());
- alarm.setAlarmValue(value);
- dbmsProcess.add(alarm, (int)Thread.currentThread().getId());
- }
- });
- }
- public void sendCvim(long key, byte[] data) {
- if (this.cvimProducer != null) {
- try {
- if (this.producerConfig.isMultiConnect()) {
- this.cvimProducer.sendDefault(Long.toString(key), data);
- }
- else {
- this.cvimProducer.send(TsiKafkaProducerConfig.CVIM_RAW_TOPIC, Long.toString(key), data);
- }
- }
- catch (Exception e) {
- log.error("sendCvim: {}, {}: {}", TsiKafkaProducerConfig.CVIM_RAW_TOPIC, key, e.toString());
- }
- }
- }
- public void sendNode(String key, byte[] data) {
- if (this.nodeProducer != null) {
- try {
- this.nodeProducer.send(key, key, data);
- }
- catch (Exception e) {
- log.error("sendNode: {}, {}: {}", key, key, e.toString());
- }
- }
- }
- public void sendTest(long key, byte[] data) {
- if (this.testProducer != null) {
- try {
- //this.producer.send(new ProducerRecord<String, byte[]>(Long.toString(key), data), this.callback);
- this.testProducer.sendDefault(Long.toString(key), data);
- }
- catch (Exception e) {
- log.error("sendTest: {}, {}: {}", TsiKafkaProducerConfig.TEST_TOPIC, key, e.toString());
- }
- }
- }
- protected void send(KafkaTemplate<String, byte[]> kafka, String topic, String key, byte[] data) {
- try {
- kafka.send(topic, key, data);
- }
- catch(Exception e) {
- log.error("kafka.send: {}, Exception: {}", topic, e.getMessage());
- }
- }
- private static class ProducerResultCallback implements Callback {
- @Override
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- if (e != null) {
- log.error("Error while producing message to topic: {}, {}", recordMetadata, e.toString());
- }
- else {
- String message = String.format("sent message to topic:%s partition:%s offset:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
- System.out.println(message);
- }
- }
- }
- }
|