package com.ggits.comm.server.kafka; import com.ggits.app.common.kafka.KafkaProducerFactory; import com.ggits.comm.server.config.KafkaConfig; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; @Slf4j @AllArgsConstructor @Service public class KafkaProducerService { private final KafkaConfig config; private KafkaTemplate sigProducer; private KafkaTemplate nodeProducer; @PostConstruct void init() { //this.callback = new ProducerResultCallback(); // 동일한 KafkaTemplate 를 사용 한다. KafkaTemplate producer = KafkaProducerFactory.createByteArrayTemplate(this.config.getBootstrapServers(), this.config.props); if (this.config.isEnableNode()) { this.nodeProducer = producer; } if (this.config.isEnableSig()) { this.sigProducer = producer; } log.info("[{}] ------------------", this.getClass().getSimpleName()); log.info("[{}] nodeProducer: {}, {}", this.getClass().getSimpleName(), this.config.isEnableNode(), this.nodeProducer); log.info("[{}] sigProducer: {}, {}", this.getClass().getSimpleName(), this.config.isEnableSig(), this.sigProducer); //this.producer = new KafkaProducer(KafkaProducerFactory.getProperties(this.config.getBootstrapServers(), this.config.props)); } public void shutdown() { try { if (this.nodeProducer != null) { this.nodeProducer.destroy(); } if (this.sigProducer != null) { this.sigProducer.destroy(); } } catch(Exception e) { log.error("Failed to shutdown: {}", e.getMessage()); } } public void sendSig(String key, byte[] data) { if (this.sigProducer != null) { try { this.sigProducer.send(KafkaConfig.SIG_ALL_TOPIC, key, data); } catch (Exception e) { log.error("sendSig: {}, {}: {}", KafkaConfig.SIG_ALL_TOPIC, key, e.toString()); } } } public void sendNode(String key, byte[] data) { if (this.nodeProducer != null) { try { this.nodeProducer.send(key, key, data); } catch (Exception e) { log.error("sendNode: {}, {}: {}", key, key, e.toString()); } } } }