1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- package com.ggits.comm.server.kafka;
- import com.ggits.app.common.kafka.KafkaProducerFactory;
- import com.ggits.comm.server.config.KafkaConfig;
- import lombok.AllArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.stereotype.Service;
- import javax.annotation.PostConstruct;
- @Slf4j
- @AllArgsConstructor
- @Service
- public class KafkaProducerService {
- private final KafkaConfig config;
- private KafkaTemplate<String, byte[]> sigProducer;
- private KafkaTemplate<String, byte[]> nodeProducer;
- @PostConstruct
- void init() {
- //this.callback = new ProducerResultCallback();
- // 동일한 KafkaTemplate 를 사용 한다.
- KafkaTemplate<String, byte[]> producer = KafkaProducerFactory.createByteArrayTemplate(this.config.getBootstrapServers(), this.config.props);
- if (this.config.isEnableNode()) {
- this.nodeProducer = producer;
- }
- if (this.config.isEnableSig()) {
- this.sigProducer = producer;
- }
- log.info("[{}] ------------------", this.getClass().getSimpleName());
- log.info("[{}] nodeProducer: {}, {}", this.getClass().getSimpleName(), this.config.isEnableNode(), this.nodeProducer);
- log.info("[{}] sigProducer: {}, {}", this.getClass().getSimpleName(), this.config.isEnableSig(), this.sigProducer);
- //this.producer = new KafkaProducer<String, byte[]>(KafkaProducerFactory.getProperties(this.config.getBootstrapServers(), this.config.props));
- }
- public void shutdown() {
- try {
- if (this.nodeProducer != null) {
- this.nodeProducer.destroy();
- }
- if (this.sigProducer != null) {
- this.sigProducer.destroy();
- }
- }
- catch(Exception e) {
- log.error("Failed to shutdown: {}", e.getMessage());
- }
- }
- public void sendSig(String key, byte[] data) {
- if (this.sigProducer != null) {
- try {
- this.sigProducer.send(KafkaConfig.SIG_ALL_TOPIC, key, data);
- }
- catch (Exception e) {
- log.error("sendSig: {}, {}: {}", KafkaConfig.SIG_ALL_TOPIC, key, e.toString());
- }
- }
- }
- public void sendNode(String key, byte[] data) {
- if (this.nodeProducer != null) {
- try {
- this.nodeProducer.send(key, key, data);
- }
- catch (Exception e) {
- log.error("sendNode: {}, {}: {}", key, key, e.toString());
- }
- }
- }
- }
|