shjung 11 months ago
parent
commit
f5501ff4d0

+ 60 - 0
src/main/java/com/sig/comm/server/config/KafkaConfig.java

@@ -0,0 +1,60 @@
+package com.sig.comm.server.config;
+
+import com.sig.app.common.xnet.NetUtils;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+@Data
+@Component
+@ConfigurationProperties(prefix = "application.kafka")
+public class KafkaConfig {
+
+    private String bootstrapServers;
+    private String groupId = "ggits-comm-server";
+    private String pingTopic = "ping-topic";
+
+    private String consumerGroupId = "tsi-comm-server";
+    private String consumerAckConfig = "1";
+
+    private boolean multiConnect = false;
+    private boolean enableNode = false;
+    private String nodeServers = "";
+    public List<Map<String, String>> props = new ArrayList<>();
+
+    @PostConstruct
+    private void init() {
+
+        log.info("{}", this);
+    }
+
+    public String getGroupId() {   // replaces the Lombok getter: one consumer group per host
+        return this.consumerGroupId + "-" + NetUtils.getHostName();
+    }
+
+    public Map<String, Object> getConsumerPropertiesMap() {
+        Map<String, Object> properties = new HashMap<>();
+        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, getGroupId());
+        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1);   // commit offsets almost immediately (1 ms)
+        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
+        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+        properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "100");   // note: far below the 300000 ms default; a stalled poll loop forces a rebalance
+        properties.put(ConsumerConfig.CHECK_CRCS_CONFIG, false);   // skip CRC checks to shave latency
+        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);   // one ping record per poll
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.LongDeserializer.class);
+
+        return properties;
+    }
+}
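For orientation, the map returned by getConsumerPropertiesMap() is plain Kafka client configuration, so it can also drive a raw consumer directly. A minimal sketch, assuming a bound kafkaConfig bean (the loop body is illustrative, not part of this commit):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    // Sketch: consume the ping topic with the exact properties built by KafkaConfig.
    Map<String, Object> props = kafkaConfig.getConsumerPropertiesMap();
    try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Collections.singletonList(kafkaConfig.getPingTopic()));
        // each record value is a producer-side System.nanoTime() stamp (see KafkaProducerService below)
        ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(500));
        records.forEach(r -> System.out.printf("offset=%d value=%d%n", r.offset(), r.value()));
    }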

+ 86 - 0
src/main/java/com/sig/comm/server/kafka/KafkaConsumerService.java

@@ -0,0 +1,86 @@
+package com.sig.comm.server.kafka;
+
+import com.sig.comm.server.config.KafkaConfig;
+import com.sig.comm.server.process.dbms.DbmsDataProcess;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
+import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
+import org.springframework.kafka.listener.ContainerProperties;
+
+import javax.annotation.PostConstruct;
+import java.util.Collection;
+
+@Slf4j
+@RequiredArgsConstructor
+//@Service
+public class KafkaConsumerService {
+
+    private final KafkaConfig config;
+    private final DbmsDataProcess dbmsDataProcess;
+
+    private ConcurrentMessageListenerContainer<String, Long> kafkaListenerContainer;
+
+    @PostConstruct
+    void init() {
+        log.info("[{}] ------------------", this.getClass().getSimpleName());
+        start();
+    }
+
+    public void start() {
+
+        if (this.kafkaListenerContainer != null) {
+            if (!this.kafkaListenerContainer.isRunning()) {
+                log.warn("kafkaListenerContainer restart");
+                this.kafkaListenerContainer.start();
+            }
+            return;
+        }
+
+        ContainerProperties containerProperties = new ContainerProperties(this.config.getPingTopic());
+        containerProperties.setGroupId(this.config.getGroupId());
+        containerProperties.setPollTimeout(5000);
+        //containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL);
+        containerProperties.setMessageListener(new TsiKafkaConsumerWorker(this.dbmsDataProcess));
+        containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
+            @Override
+            public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+            }
+            @Override
+            public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+            }
+            @Override
+            public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+                consumer.seekToEnd(partitions);   // skip any backlog; only pings produced from now on are measured
+            }
+        });
+
+        ConsumerFactory<String, Long> consumerFactory = new DefaultKafkaConsumerFactory<>(this.config.getConsumerPropertiesMap());
+        this.kafkaListenerContainer = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
+        this.kafkaListenerContainer.setBeanName("consumer");
+        this.kafkaListenerContainer.setConcurrency(1);
+        this.kafkaListenerContainer.setErrorHandler((thrownException, data) -> {
+            log.error("kafkaListenerContainer error: {}", thrownException.getMessage());
+            this.kafkaListenerContainer.stop();   // stop on error; a later start() call restarts the container
+        });
+
+        this.kafkaListenerContainer.start();
+    }
+
+    public void shutdown() {
+        try {
+            //if (this.consumer != null) {
+            //    this.consumer.close();
+            //}
+            if (this.kafkaListenerContainer != null) {
+                this.kafkaListenerContainer.stop();
+            }
+        }
+        catch(Exception ignored) {
+        }
+    }
+}
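Note that @Service is commented out on KafkaConsumerService, so the class is not auto-registered. A hedged sketch of one way it could be wired explicitly (the configuration class and bean method are assumptions, not part of the commit):

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class KafkaConsumerWiring {

        // Registers the consumer service manually; its @PostConstruct init() then starts the container.
        @Bean
        public KafkaConsumerService kafkaConsumerService(KafkaConfig config, DbmsDataProcess dbmsDataProcess) {
            return new KafkaConsumerService(config, dbmsDataProcess);
        }
    }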

+ 170 - 0
src/main/java/com/sig/comm/server/kafka/KafkaProducerService.java

@@ -0,0 +1,170 @@
+package com.sig.comm.server.kafka;
+
+import com.sig.app.common.kafka.KafkaProducerFactory;
+import com.sig.app.common.utils.TimeUtils;
+import com.sig.comm.server.config.KafkaConfig;
+import com.sig.comm.server.process.dbms.DbmsDataProcess;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.stereotype.Service;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+
+import javax.annotation.PostConstruct;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+@RequiredArgsConstructor   // only the final fields are constructor-injected; the KafkaTemplate fields are built in init()
+@Service
+public class KafkaProducerService {
+
+    private final KafkaConfig config;
+    private final DbmsDataProcess dbmsDataProcess;
+
+    private KafkaTemplate<String, byte[]> nodeProducer;
+    private KafkaTemplate<String, Long> pingProducer;
+
+
+    @PostConstruct
+    void init() {
+        //this.callback = new ProducerResultCallback();
+
+        if (this.config.isMultiConnect()) {
+            // create a dedicated KafkaTemplate for each producer
+            if (this.config.isEnableNode()) {
+                this.nodeProducer = KafkaProducerFactory.createByteArrayTemplate(this.config.getNodeServers(), this.config.props);
+            }
+        }
+        else {
+            // when sending through a single producer KafkaTemplate,
+            // the same KafkaTemplate instance is reused
+            KafkaTemplate<String, byte[]> producer = KafkaProducerFactory.createByteArrayTemplate(this.config.getBootstrapServers(), this.config.props);
+            if (this.config.isEnableNode()) {
+                this.nodeProducer = producer;
+            }
+        }
+
+        createPingProducer();
+
+        log.info("[{}] ------------------", this.getClass().getSimpleName());
+        log.info("[{}]   nodeProducer: {}", this.getClass().getSimpleName(), this.nodeProducer);
+        log.info("[{}]   pingProducer: {}", this.getClass().getSimpleName(), this.pingProducer);
+
+        //this.producer = new KafkaProducer<String, byte[]>(KafkaProducerFactory.getProperties(this.config.getBootstrapServers(), this.config.props));
+    }
+
+    public void createPingProducer() {
+        Map<String, Object> props = new HashMap<>();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getBootstrapServers());
+        props.put(ProducerConfig.ACKS_CONFIG, this.config.getConsumerAckConfig());
+        props.put(ProducerConfig.RETRIES_CONFIG, 0);
+        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
+        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 4000);
+        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 3000);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.LongSerializer.class);
+
+        this.pingProducer = KafkaProducerFactory.createProducerTemplate(props);
+        this.pingProducer.setDefaultTopic(this.config.getPingTopic());
+    }
+    public void shutdown() {
+        try {
+            if (this.nodeProducer != null) {
+                this.nodeProducer.destroy();
+            }
+            if (this.pingProducer != null) {
+                this.pingProducer.destroy();
+            }
+        }
+        catch(Exception e) {
+            log.error("Failed to shutdown: {}", e.getMessage());
+        }
+    }
+    public void sendPing() {
+        if (this.pingProducer == null) {
+            log.warn("sendPing: pingProducer == null");
+            return;
+        }
+
+        long sendNanoTime = System.nanoTime();
+//        TsiTpmsManager.getInstance().getKafkaTransVo().setSendNanoTime(sendNanoTime);   // nano seconds
+//        TsiTpmsManager.getInstance().getKafkaTransVo().setSendTm(0);                    // micro seconds
+//        TsiTpmsManager.getInstance().getKafkaTransVo().setRecvTm(0);                    // micro seconds
+
+        ListenableFuture<SendResult<String, Long>> future = this.pingProducer.sendDefault("key", sendNanoTime);
+        future.addCallback(new ListenableFutureCallback<SendResult<String, Long>>() {
+
+            @Override
+            public void onSuccess(SendResult<String, Long> result) {
+                long recvNanoTime = System.nanoTime();
+                long sendTime = TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS);
+//                TsiTpmsManager.getInstance().getKafkaTransVo().setSendTm(sendTime);
+                log.info("send ping success: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime));
+            }
+            @Override
+            public void onFailure(Throwable ex) {
+                long recvNanoTime = System.nanoTime();
+//                TsiTpmsManager.getInstance().getKafkaTransVo().setSendNanoTime(0);
+//                KafkaTransVo stat = new KafkaTransVo(AbstractDbmsVo.DBMS_KAFKA_TRANS_HS);
+//                stat.setHostName(TsiTpmsManager.getInstance().getKafkaTransVo().getHostName());
+//                stat.setStatus(0);
+//                stat.setSendTm(TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS));
+//                stat.setRecvTm(0);
+//                tsiCvimDbmsService.add(stat, (int)Thread.currentThread().getId());
+//                log.error("send ping failed: {}, {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime), ex.getMessage());
+//
+//                // persist an alarm for the Kafka send failure
+//                String value = "Send Failed";
+//                if (ex != null) {
+//                    value = ex.getMessage().substring(0, 99);
+//                }
+//                AlarmOccrVo alarm = new AlarmOccrVo(AbstractDbmsVo.DBMS_ALARM_OCCR_HS);
+//                alarm.setAlarmCode(TsiAlarmConfigVo.KAFKA_01);
+//                alarm.setAlarmTarget(producerConfig.getBootstrapServers());
+//                alarm.setAlarmValue(value);
+//                tsiCvimDbmsService.add(alarm, (int)Thread.currentThread().getId());
+            }
+        });
+    }
+
+    public void sendNode(String key, byte[] data) {
+        if (this.nodeProducer != null) {
+            try {
+                this.nodeProducer.send(key, key, data);   // the key doubles as the topic name here
+            }
+            catch (Exception e) {
+                log.error("sendNode: {}, {}: {}", key, key, e.toString());
+            }
+        }
+    }
+
+    protected void send(KafkaTemplate<String, byte[]> kafka, String topic, String key, byte[] data) {
+        try {
+            kafka.send(topic, key, data);
+        }
+        catch(Exception e) {
+            log.error("kafka.send: {}, Exception: {}", topic, e.getMessage());
+        }
+    }
+
+    private static class ProducerResultCallback implements Callback {
+        @Override
+        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
+            if (e != null) {
+                log.error("Error while producing message to topic: {}, {}", recordMetadata, e.toString());
+            }
+            else {
+                String message = String.format("sent message to topic:%s partition:%s offset:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
+                log.info(message);
+            }
+        }
+    }
+}
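sendPing() has no caller in this commit. One plausible driver is a fixed-rate scheduler like the sketch below; the class name, the 5-second interval, and @EnableScheduling being present on some configuration class are all assumptions:

    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;

    @Component
    public class KafkaPingScheduler {

        private final KafkaProducerService producerService;

        public KafkaPingScheduler(KafkaProducerService producerService) {
            this.producerService = producerService;
        }

        // Hypothetical driver: fire one Kafka round-trip ping every 5 seconds.
        @Scheduled(fixedRate = 5000)
        public void ping() {
            producerService.sendPing();
        }
    }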

+ 50 - 0
src/main/java/com/sig/comm/server/kafka/TsiKafkaConsumerWorker.java

@@ -0,0 +1,50 @@
+package com.sig.comm.server.kafka;
+
+import com.sig.app.common.utils.TimeUtils;
+import com.sig.comm.server.process.dbms.DbmsDataProcess;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.listener.MessageListener;
+import org.springframework.kafka.support.Acknowledgment;
+
+@Slf4j
+@AllArgsConstructor
+public class TsiKafkaConsumerWorker implements MessageListener<String, Long> {
+
+    private DbmsDataProcess dbmsDataProcess;
+
+    @Override
+    public void onMessage(ConsumerRecord<String, Long> record) {
+        Long sendNanoTime = record.value();      // the producer-side System.nanoTime() stamp
+        Long recvNanoTime = System.nanoTime();   // note: the statistics handling below is commented out in this commit
+
+//        KafkaTransVo stat = new KafkaTransVo(AbstractDbmsVo.DBMS_KAFKA_TRANS_HS);
+//        stat.setHostName(TsiTpmsManager.getInstance().getKafkaTransVo().getHostName());
+//        stat.setStatus(1);
+//        if (TsiTpmsManager.getInstance().getKafkaTransVo().getSendNanoTime() == sendNanoTime) {
+//            stat.setSendTm(TsiTpmsManager.getInstance().getKafkaTransVo().getSendTm());
+//            stat.setRecvTm(TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS));
+//        }
+//        else {
+//            stat.setRecvTm(TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS));
+//            stat.setSendTm(stat.getRecvTm());
+//            log.info("recv ping success, sendNanoTime miss match: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime));
+//        }
+//        dbmsDataProcess.add(stat, (int)Thread.currentThread().getId());
+//        log.info("recv ping success: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime));
+    }
+
+    @Override
+    public void onMessage(ConsumerRecord<String, Long> record, Acknowledgment acknowledgment) {
+        try {
+            Long sendNanoTime = record.value();
+            Long recvNanoTime = System.nanoTime();
+            log.info("recv ping success, ack: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime));
+            //acknowledgment.acknowledge();
+        } catch (Exception e) {
+            log.error("onMessage:" + e.getMessage());
+        }
+    }
+
+}
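The measurement the worker performs is plain nanosecond arithmetic: the producer stores System.nanoTime() in the record value and the consumer subtracts it on receipt. A self-contained sketch of that conversion (timestamps here are illustrative):

    import java.util.concurrent.TimeUnit;

    public class PingLatencyDemo {
        public static void main(String[] args) {
            long sendNanoTime = System.nanoTime();   // stamped by the producer
            // ... the record travels through Kafka ...
            long recvNanoTime = System.nanoTime();   // taken by the consumer
            long micros = TimeUnit.MICROSECONDS.convert(
                    Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS);
            System.out.println("round trip: " + micros + " us");
        }
    }

Since System.nanoTime() is only comparable within a single JVM, the per-host group id from KafkaConfig.getGroupId() matters: every instance consumes its own pings, so both stamps come from the same clock.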

+ 16 - 0
src/main/resources/application.yml

@@ -52,6 +52,22 @@ application:
     dbms: 10
     work: 5
 
+  kafka:
+    bootstrap-servers: 192.168.11.23:9092
+    group-id: ggits-comm-server
+    consumer-ack-config: 1
+    ping-topic: ping-topic
+    multi-connect: false
+    node-servers:
+    enable-node: false
+    props:
+    #  - request.timeout.ms: 100
+    #  - max.block.ms: 100
+    #  - transactional.id: tsi-comm-server-01
+    #  - acks: 0
+    #  - retries: 0
+    #  - linger.ms: 1
+
 ---
 spring:
   config:
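
For completeness, application.kafka.props binds to the List<Map<String, String>> field in KafkaConfig, each commented YAML entry becoming a single-entry map. KafkaProducerFactory is not included in this commit, so the merge helper below is only a plausible reconstruction of how such overrides might be folded into a producer config:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    final class ProducerProps {

        // Hypothetical helper: overlay the YAML-driven overrides onto a base property map.
        static Map<String, Object> merge(Map<String, Object> base, List<Map<String, String>> overrides) {
            Map<String, Object> merged = new HashMap<>(base);
            if (overrides != null) {
                overrides.forEach(merged::putAll);   // each list element is one key/value pair
            }
            return merged;
        }
    }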