| 
					
				 | 
			
			
				@@ -1,24 +1,13 @@ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 package com.ggits.comm.server.kafka; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.ggits.app.common.kafka.KafkaProducerFactory; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import com.ggits.app.common.utils.TimeUtils; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import com.ggits.comm.server.config.KafkaConfig; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import com.ggits.comm.server.process.dbms.DbmsDataProcess; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import lombok.AllArgsConstructor; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import lombok.extern.slf4j.Slf4j; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import org.apache.kafka.clients.producer.Callback; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import org.apache.kafka.clients.producer.ProducerConfig; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import org.apache.kafka.clients.producer.RecordMetadata; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import org.springframework.kafka.core.KafkaTemplate; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import org.springframework.kafka.support.SendResult; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import org.springframework.stereotype.Service; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import org.springframework.util.concurrent.ListenableFuture; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import org.springframework.util.concurrent.ListenableFutureCallback; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 import javax.annotation.PostConstruct; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import java.util.HashMap; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import java.util.Map; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import java.util.concurrent.TimeUnit; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 @Slf4j 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 @AllArgsConstructor 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -26,113 +15,53 @@ import java.util.concurrent.TimeUnit; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 public class KafkaProducerService { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     private final KafkaConfig config; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    private final DbmsDataProcess dbmsDataProcess; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private KafkaTemplate<String, byte[]> sigProducer; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     private KafkaTemplate<String, byte[]> nodeProducer; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    private KafkaTemplate<String, Long> pingProducer; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     @PostConstruct 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     void init() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         //this.callback = new ProducerResultCallback(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        if (this.config.isMultiConnect()) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            // 각각의 Producer 에 대하여 KafkaTemplate 를 생성해서 사용 한다. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            if (this.config.isEnableNode()) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                this.nodeProducer = KafkaProducerFactory.createByteArrayTemplate(this.config.getNodeServers(), this.config.props); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        // 동일한 KafkaTemplate 를 사용 한다. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        KafkaTemplate<String, byte[]> producer = KafkaProducerFactory.createByteArrayTemplate(this.config.getBootstrapServers(), this.config.props); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if (this.config.isEnableNode()) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            this.nodeProducer = producer; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            // 하나의 Producer KafkaTemplate 로 데이터를 전송하는 경우 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            // 동일한 KafkaTemplate 를 사용 한다. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            KafkaTemplate<String, byte[]> producer = KafkaProducerFactory.createByteArrayTemplate(this.config.getBootstrapServers(), this.config.props); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            if (this.config.isEnableNode()) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                this.nodeProducer = producer; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if (this.config.isEnableSig()) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            this.sigProducer = producer; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        createPingProducer(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         log.info("[{}] ------------------", this.getClass().getSimpleName()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        log.info("[{}]   nodeProducer: {}", this.getClass().getSimpleName(), this.nodeProducer); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        log.info("[{}]   pingProducer: {}", this.getClass().getSimpleName(), this.pingProducer); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        log.info("[{}]   nodeProducer: {}, {}", this.getClass().getSimpleName(), this.config.isEnableNode(), this.nodeProducer); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        log.info("[{}]    sigProducer: {}, {}", this.getClass().getSimpleName(), this.config.isEnableSig(), this.sigProducer); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         //this.producer = new KafkaProducer<String, byte[]>(KafkaProducerFactory.getProperties(this.config.getBootstrapServers(), this.config.props)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    public void createPingProducer() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        Map<String, Object> props = new HashMap<>(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getBootstrapServers()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        props.put(ProducerConfig.ACKS_CONFIG, this.config.getConsumerAckConfig()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        props.put(ProducerConfig.RETRIES_CONFIG, 0); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        props.put(ProducerConfig.LINGER_MS_CONFIG, 1); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 4000); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 3000); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.LongSerializer.class); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        this.pingProducer = KafkaProducerFactory.createProducerTemplate(props); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        this.pingProducer.setDefaultTopic(this.config.getPingTopic()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     public void shutdown() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         try { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             if (this.nodeProducer != null) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 this.nodeProducer.destroy(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            if (this.pingProducer != null) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                this.pingProducer.destroy(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if (this.sigProducer != null) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                this.sigProducer.destroy(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         catch(Exception e) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             log.error("Failed to shutdown: {}", e.getMessage()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    public void sendPing() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        if (this.pingProducer == null ) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            log.info("sendPing: pingProducer == null"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            return; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        long sendNanoTime = System.nanoTime(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//        TsiTpmsManager.getInstance().getKafkaTransVo().setSendNanoTime(sendNanoTime);   // nano seconds 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//        TsiTpmsManager.getInstance().getKafkaTransVo().setSendTm(0);                    // micro seconds 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//        TsiTpmsManager.getInstance().getKafkaTransVo().setRecvTm(0);                    // micro seconds 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        ListenableFuture<SendResult<String, Long>> future =  this.pingProducer.sendDefault("key", sendNanoTime); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        future.addCallback(new ListenableFutureCallback<SendResult<String, Long>>() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            @Override 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            public void onSuccess(SendResult<String, Long> result) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                long recvNanoTime = System.nanoTime(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                long sendTime = TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//                TsiTpmsManager.getInstance().getKafkaTransVo().setSendTm(sendTime); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                log.info("send ping success: {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    public void sendSig(String key, byte[] data) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if (this.sigProducer != null) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            try { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                this.sigProducer.send(KafkaConfig.SIG_ALL_TOPIC, key, data); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            @Override 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            public void onFailure(Throwable ex) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                long recvNanoTime = System.nanoTime(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//                TsiTpmsManager.getInstance().getKafkaTransVo().setSendNanoTime(0); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//                KafkaTransVo stat = new KafkaTransVo(AbstractDbmsVo.DBMS_KAFKA_TRANS_HS); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//                stat.setHostName(TsiTpmsManager.getInstance().getKafkaTransVo().getHostName()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//                stat.setStatus(0); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//                stat.setSendTm(TimeUnit.MICROSECONDS.convert(Math.abs(recvNanoTime - sendNanoTime), TimeUnit.NANOSECONDS)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//                stat.setRecvTm(0); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//                tsiCvimDbmsService.add(stat, (int)Thread.currentThread().getId()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//                log.error("send ping failed: {}, {}, {}", sendNanoTime, TimeUtils.elapsedTimeStr(recvNanoTime - sendNanoTime), ex.getMessage()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-// 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//                // 카프카 전송 오류 알람 저장 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//                String value = "Send Failed"; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//                if (ex != null) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//                    value = ex.getMessage().substring(0, 99); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//                AlarmOccrVo alarm = new AlarmOccrVo(AbstractDbmsVo.DBMS_ALARM_OCCR_HS); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//                alarm.setAlarmCode(TsiAlarmConfigVo.KAFKA_01); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//                alarm.setAlarmTarget(producerConfig.getBootstrapServers()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//                alarm.setAlarmValue(value); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//                tsiCvimDbmsService.add(alarm, (int)Thread.currentThread().getId()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            catch (Exception e) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                log.error("sendSig: {}, {}: {}", KafkaConfig.SIG_ALL_TOPIC, key, e.toString()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     public void sendNode(String key, byte[] data) { 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -145,26 +74,4 @@ public class KafkaProducerService { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    protected void send(KafkaTemplate<String, byte[]> kafka, String topic, String key, byte[] data) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        try { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            kafka.send(topic, key, data); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        catch(Exception e) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            log.error("kafka.send: {}, Exception: {}", topic, e.getMessage()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    private static class ProducerResultCallback implements Callback { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        @Override 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        public void onCompletion(RecordMetadata recordMetadata, Exception e) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            if (e != null) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                log.error("Error while producing message to topic: {}, {}", recordMetadata, e.toString()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                String message = String.format("sent message to topic:%s partition:%s  offset:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                System.out.println(message); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 } 
			 |