KafkaProducerService.java 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. package com.ggits.comm.server.kafka;
  2. import com.ggits.app.common.kafka.KafkaProducerFactory;
  3. import com.ggits.comm.server.config.KafkaConfig;
  4. import lombok.AllArgsConstructor;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.kafka.core.KafkaTemplate;
  7. import org.springframework.stereotype.Service;
  8. import javax.annotation.PostConstruct;
  9. @Slf4j
  10. @AllArgsConstructor
  11. @Service
  12. public class KafkaProducerService {
  13. private final KafkaConfig config;
  14. private KafkaTemplate<String, byte[]> sigProducer;
  15. private KafkaTemplate<String, byte[]> nodeProducer;
  16. @PostConstruct
  17. void init() {
  18. //this.callback = new ProducerResultCallback();
  19. // 동일한 KafkaTemplate 를 사용 한다.
  20. KafkaTemplate<String, byte[]> producer = KafkaProducerFactory.createByteArrayTemplate(this.config.getBootstrapServers(), this.config.props);
  21. if (this.config.isEnableNode()) {
  22. this.nodeProducer = producer;
  23. }
  24. if (this.config.isEnableSig()) {
  25. this.sigProducer = producer;
  26. }
  27. log.info("[{}] ------------------", this.getClass().getSimpleName());
  28. log.info("[{}] nodeProducer: {}, {}", this.getClass().getSimpleName(), this.config.isEnableNode(), this.nodeProducer);
  29. log.info("[{}] sigProducer: {}, {}", this.getClass().getSimpleName(), this.config.isEnableSig(), this.sigProducer);
  30. //this.producer = new KafkaProducer<String, byte[]>(KafkaProducerFactory.getProperties(this.config.getBootstrapServers(), this.config.props));
  31. }
  32. public void shutdown() {
  33. try {
  34. if (this.nodeProducer != null) {
  35. this.nodeProducer.destroy();
  36. }
  37. if (this.sigProducer != null) {
  38. this.sigProducer.destroy();
  39. }
  40. }
  41. catch(Exception e) {
  42. log.error("Failed to shutdown: {}", e.getMessage());
  43. }
  44. }
  45. public void sendSig(String key, byte[] data) {
  46. if (this.sigProducer != null) {
  47. try {
  48. this.sigProducer.send(KafkaConfig.SIG_ALL_TOPIC, key, data);
  49. }
  50. catch (Exception e) {
  51. log.error("sendSig: {}, {}: {}", KafkaConfig.SIG_ALL_TOPIC, key, e.toString());
  52. }
  53. }
  54. }
  55. public void sendNode(String key, byte[] data) {
  56. if (this.nodeProducer != null) {
  57. try {
  58. this.nodeProducer.send(key, key, data);
  59. }
  60. catch (Exception e) {
  61. log.error("sendNode: {}, {}: {}", key, key, e.toString());
  62. }
  63. }
  64. }
  65. }