|
@@ -0,0 +1,126 @@
|
|
|
|
|
+package com.evps.simulator.kafka.producer.kafka;
|
|
|
|
|
+
|
|
|
|
|
+import org.apache.kafka.clients.producer.ProducerConfig;
|
|
|
|
|
+import org.apache.kafka.common.serialization.StringSerializer;
|
|
|
|
|
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
|
|
|
|
+import org.springframework.kafka.core.KafkaTemplate;
|
|
|
|
|
+import org.springframework.kafka.support.serializer.JsonSerializer;
|
|
|
|
|
+
|
|
|
|
|
+import java.util.HashMap;
|
|
|
|
|
+import java.util.List;
|
|
|
|
|
+import java.util.Map;
|
|
|
|
|
+import java.util.Properties;
|
|
|
|
|
+
|
|
|
|
|
+public class KafkaProducerFactory {
|
|
|
|
|
+
|
|
|
|
|
+ private KafkaProducerFactory() {
|
|
|
|
|
+ throw new IllegalStateException("KafkaProducerFactory class");
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public static <K,V> KafkaTemplate<K, V> createJsonTemplate(String bootstrapServers, List<Map<String, String>> props) {
|
|
|
|
|
+ Map<String, Object> configs = new HashMap<String, Object>();
|
|
|
|
|
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
|
|
|
|
+ configs.put("enable.idempotence", false);
|
|
|
|
|
+ configs.put(ProducerConfig.ACKS_CONFIG, "0");
|
|
|
|
|
+ configs.put(ProducerConfig.RETRIES_CONFIG, 0);
|
|
|
|
|
+ configs.put(ProducerConfig.LINGER_MS_CONFIG, 1);
|
|
|
|
|
+ configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
|
|
|
|
|
+ configs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 4000);
|
|
|
|
|
+ //configs.put("queue.buffering.max.messages", 10000000);
|
|
|
|
|
+ //configs.put("queue.buffering.max.kbytes", 2147483647);
|
|
|
|
|
+ //configs.put("queue.buffering.max.ms", 0);
|
|
|
|
|
+ //configs.put("api.version.request", false);
|
|
|
|
|
+ configs.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 5000);
|
|
|
|
|
+ configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
|
|
|
|
+ configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
|
|
|
|
|
+
|
|
|
|
|
+ for (Map<String, String> prop : props) {
|
|
|
|
|
+ for (Map.Entry<String, String> elem : prop.entrySet()) {
|
|
|
|
|
+ String key = elem.getKey();
|
|
|
|
|
+ String val = elem.getValue();
|
|
|
|
|
+ if (val != null) {
|
|
|
|
|
+ if (val.equals("true") || val.equals("false")) {
|
|
|
|
|
+ configs.put(key, val.equals("true"));
|
|
|
|
|
+ } else {
|
|
|
|
|
+ configs.put(key, val);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ DefaultKafkaProducerFactory<K, V> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(configs);
|
|
|
|
|
+ return new KafkaTemplate<>(defaultKafkaProducerFactory);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public static <K,V> KafkaTemplate<K, V> createByteArrayTemplate(String bootstrapServers, List<Map<String, String>> props) {
|
|
|
|
|
+ Map<String, Object> configs = new HashMap<String, Object>();
|
|
|
|
|
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
|
|
|
|
+ configs.put("enable.idempotence", false);
|
|
|
|
|
+ configs.put(ProducerConfig.ACKS_CONFIG, "0");
|
|
|
|
|
+ configs.put(ProducerConfig.RETRIES_CONFIG, 0);
|
|
|
|
|
+ configs.put(ProducerConfig.LINGER_MS_CONFIG, 1);
|
|
|
|
|
+ configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
|
|
|
|
|
+ configs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 4000);
|
|
|
|
|
+ //configs.put("queue.buffering.max.messages", 10000000);
|
|
|
|
|
+ //configs.put("queue.buffering.max.kbytes", 2147483647);
|
|
|
|
|
+ //configs.put("queue.buffering.max.ms", 0);
|
|
|
|
|
+ //configs.put("api.version.request", false);
|
|
|
|
|
+ configs.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 5000);
|
|
|
|
|
+ configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
|
|
|
|
+ configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.ByteArraySerializer.class);
|
|
|
|
|
+
|
|
|
|
|
+ for (Map<String, String> prop : props) {
|
|
|
|
|
+ for (Map.Entry<String, String> elem : prop.entrySet()) {
|
|
|
|
|
+ String key = elem.getKey();
|
|
|
|
|
+ String val = elem.getValue();
|
|
|
|
|
+ if (val != null) {
|
|
|
|
|
+ if (val.equals("true") || val.equals("false")) {
|
|
|
|
|
+ configs.put(key, val.equals("true"));
|
|
|
|
|
+ } else {
|
|
|
|
|
+ configs.put(key, val);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ DefaultKafkaProducerFactory<K, V> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<K, V>(configs);
|
|
|
|
|
+ return new KafkaTemplate<>(defaultKafkaProducerFactory);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public static <K,V> KafkaTemplate<K, V> createProducerTemplate(Map<String, Object> props) {
|
|
|
|
|
+ DefaultKafkaProducerFactory<K, V> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(props);
|
|
|
|
|
+ return new KafkaTemplate<>(defaultKafkaProducerFactory);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public static Properties getProperties(String bootstrapServers, List<Map<String, String>> props) {
|
|
|
|
|
+ Properties properties = new Properties();
|
|
|
|
|
+ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
|
|
|
|
+ properties.put("enable.idempotence", false);
|
|
|
|
|
+ properties.put(ProducerConfig.ACKS_CONFIG, "0");
|
|
|
|
|
+ properties.put(ProducerConfig.RETRIES_CONFIG, 0);
|
|
|
|
|
+ properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
|
|
|
|
|
+ properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
|
|
|
|
|
+ properties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 4000);
|
|
|
|
|
+ //properties.put("queue.buffering.max.messages", 10000000);
|
|
|
|
|
+ //properties.put("queue.buffering.max.kbytes", 2147483647);
|
|
|
|
|
+ //properties.put("queue.buffering.max.ms", 0);
|
|
|
|
|
+ //properties.put("api.version.request", false);
|
|
|
|
|
+ properties.put("transaction.timeout.ms", 5000);
|
|
|
|
|
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
|
|
|
|
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.ByteArraySerializer.class);
|
|
|
|
|
+ for (Map<String, String> prop : props) {
|
|
|
|
|
+ for (Map.Entry<String, String> elem : prop.entrySet()) {
|
|
|
|
|
+ String key = elem.getKey();
|
|
|
|
|
+ String val = elem.getValue();
|
|
|
|
|
+ if (val != null) {
|
|
|
|
|
+ if (val.equals("true") || val.equals("false")) {
|
|
|
|
|
+ properties.put(key, val.equals("true"));
|
|
|
|
|
+ } else {
|
|
|
|
|
+ properties.put(key, val);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ return properties ;
|
|
|
|
|
+ }
|
|
|
|
|
+}
|