|
@@ -1,6 +1,7 @@
|
|
|
package com.evps.consumer.service;
|
|
|
|
|
|
-import com.evps.common.kafka.dto.*;
|
|
|
+import com.evps.common.kafka.dto.EvpsKafkaConst;
|
|
|
+import com.evps.common.kafka.dto.KafkaEvpsData;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.kafka.clients.consumer.Consumer;
|
|
@@ -16,44 +17,23 @@ import org.springframework.kafka.support.serializer.JsonDeserializer;
|
|
|
|
|
|
import java.util.Collection;
|
|
|
import java.util.HashMap;
|
|
|
-import java.util.HashSet;
|
|
|
import java.util.Map;
|
|
|
|
|
|
@Slf4j
|
|
|
@RequiredArgsConstructor
|
|
|
public class KafkaConsumerService {
|
|
|
|
|
|
- private final String EVPS_SERVICE_TOPIC = "evps-service";
|
|
|
- private final String EVPS_ROUTE_TOPIC = "evps-route";
|
|
|
- private final String EVPS_NODE_TOPIC = "evps-node";
|
|
|
- private final String EVPS_PHASE_TOPIC = "evps-phase";
|
|
|
- private final String EVPS_SIGNAL_TOPIC = "evps-signal";
|
|
|
- private final String EVPS_EVENT_TOPIC = "evps-event";
|
|
|
-
|
|
|
private final String bootstrapServers;
|
|
|
private final String groupId;
|
|
|
|
|
|
private ConcurrentMessageListenerContainer<String, String> kafkaListenerContainer;
|
|
|
|
|
|
- private ConcurrentMessageListenerContainer<String, KafkaEvpsServiceDto> serviceListenerContainer;
|
|
|
- private ConcurrentMessageListenerContainer<String, KafkaEvpsRouteDto> routeListenerContainer;
|
|
|
- private ConcurrentMessageListenerContainer<String, KafkaEvpsNodeDto> nodeListenerContainer;
|
|
|
- private ConcurrentMessageListenerContainer<String, KafkaEvpsPhaseDto> phaseListenerContainer;
|
|
|
- private ConcurrentMessageListenerContainer<String, KafkaEvpsSignalDto> signalListenerContainer;
|
|
|
- private ConcurrentMessageListenerContainer<String, KafkaEvpsEventDto> eventListenerContainer;
|
|
|
+ private ConcurrentMessageListenerContainer<String, KafkaEvpsData> uticEvpsListenerContainer;
|
|
|
|
|
|
public void start() {
|
|
|
log.info("Starting Kafka Consumer: bootstrapServers: {}, group: {}", this.bootstrapServers, this.groupId);
|
|
|
|
|
|
- HashSet<String> topics = new HashSet<>();
|
|
|
- topics.add(EVPS_SERVICE_TOPIC);
|
|
|
- topics.add(EVPS_ROUTE_TOPIC);
|
|
|
- topics.add(EVPS_NODE_TOPIC);
|
|
|
- topics.add(EVPS_PHASE_TOPIC);
|
|
|
- topics.add(EVPS_SIGNAL_TOPIC);
|
|
|
- topics.add(EVPS_EVENT_TOPIC);
|
|
|
-
|
|
|
- ContainerProperties containerProperties = new ContainerProperties(topics.toArray(new String[0]));
|
|
|
+ ContainerProperties containerProperties = new ContainerProperties(EvpsKafkaConst.KAFKA_EVPS_TOPIC_NAME);
|
|
|
containerProperties.setGroupId(this.groupId+"Z");
|
|
|
containerProperties.setPollTimeout(5000);
|
|
|
//containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL);
|
|
@@ -72,115 +52,40 @@ public class KafkaConsumerService {
|
|
|
});
|
|
|
ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(getConsumerStringPropertiesMap());
|
|
|
this.kafkaListenerContainer = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
|
|
|
- this.kafkaListenerContainer.setBeanName("consumer");
|
|
|
+ this.kafkaListenerContainer.setBeanName("uticEvpsListenerContainer");
|
|
|
this.kafkaListenerContainer.setConcurrency(1);
|
|
|
this.kafkaListenerContainer.setErrorHandler((thrownException, data) -> {
|
|
|
- log.error("kafkaListenerContainer error: {}", thrownException.getMessage());
|
|
|
+ log.error("uticEvpsListenerContainer error: {}", thrownException.getMessage());
|
|
|
this.kafkaListenerContainer.stop();
|
|
|
+ this.kafkaListenerContainer.start();
|
|
|
});
|
|
|
this.kafkaListenerContainer.start();
|
|
|
|
|
|
-
|
|
|
- ContainerProperties serviceContainerProperties = new ContainerProperties(EVPS_SERVICE_TOPIC);
|
|
|
- serviceContainerProperties.setGroupId(this.groupId);
|
|
|
- serviceContainerProperties.setPollTimeout(5000);
|
|
|
- serviceContainerProperties.setMessageListener(new KafkaEvpsServiceConsumerWorker());
|
|
|
- serviceContainerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
|
|
|
- @Override
|
|
|
- public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
|
|
|
- }
|
|
|
- @Override
|
|
|
- public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
|
|
|
- }
|
|
|
- @Override
|
|
|
- public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
|
|
|
- consumer.seekToEnd(partitions);
|
|
|
- }
|
|
|
- });
|
|
|
- ConsumerFactory<String, KafkaEvpsServiceDto> serviceFactory = new DefaultKafkaConsumerFactory<>(getConsumerPropertiesMap(), new StringDeserializer(), new JsonDeserializer<>(KafkaEvpsServiceDto.class, false));
|
|
|
- this.serviceListenerContainer = new ConcurrentMessageListenerContainer<>(serviceFactory, serviceContainerProperties);
|
|
|
- this.serviceListenerContainer.setBeanName("serviceListenerContainer");
|
|
|
- this.serviceListenerContainer.setConcurrency(1);
|
|
|
- this.serviceListenerContainer.setErrorHandler((thrownException, data) -> {
|
|
|
- log.error("serviceListenerContainer error: {}", thrownException.getMessage());
|
|
|
- this.serviceListenerContainer.stop();
|
|
|
- });
|
|
|
- this.serviceListenerContainer.start();
|
|
|
-
|
|
|
-
|
|
|
-// ContainerProperties routeContainerProperties = new ContainerProperties(EVPS_ROUTE_TOPIC);
|
|
|
-// routeContainerProperties.setGroupId(this.groupId);
|
|
|
-// routeContainerProperties.setPollTimeout(5000);
|
|
|
-// routeContainerProperties.setMessageListener(new KafkaEvpsRouteConsumerWorker());
|
|
|
-// ConsumerFactory<String, KafkaEvpsRouteDto> routeFactory = new DefaultKafkaConsumerFactory<>(getConsumerPropertiesMap());
|
|
|
-// this.routeListenerContainer = new ConcurrentMessageListenerContainer<>(routeFactory, routeContainerProperties);
|
|
|
-// this.routeListenerContainer.setBeanName("routeListenerContainer");
|
|
|
-// this.routeListenerContainer.setConcurrency(1);
|
|
|
-// this.routeListenerContainer.setErrorHandler((thrownException, data) -> {
|
|
|
-// log.error("routeListenerContainer error: {}", thrownException.getMessage());
|
|
|
-// this.routeListenerContainer.stop();
|
|
|
+// ContainerProperties containerProperties = new ContainerProperties(EvpsKafkaConst.KAFKA_EVPS_TOPIC_NAME);
|
|
|
+// containerProperties.setGroupId(this.groupId);
|
|
|
+// containerProperties.setPollTimeout(5000);
|
|
|
+// containerProperties.setMessageListener(new KafkaUticEvpsConsumerWorker());
|
|
|
+// containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
|
|
|
+// @Override
|
|
|
+// public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
|
|
|
+// }
|
|
|
+// @Override
|
|
|
+// public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
|
|
|
+// }
|
|
|
+// @Override
|
|
|
+// public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
|
|
|
+// consumer.seekToEnd(partitions);
|
|
|
+// }
|
|
|
// });
|
|
|
-// this.routeListenerContainer.start();
|
|
|
-//
|
|
|
-//
|
|
|
- ContainerProperties nodeContainerProperties = new ContainerProperties(EVPS_NODE_TOPIC);
|
|
|
- nodeContainerProperties.setGroupId(this.groupId);
|
|
|
- nodeContainerProperties.setPollTimeout(5000);
|
|
|
- nodeContainerProperties.setMessageListener(new KafkaEvpsNodeConsumerWorker());
|
|
|
- ConsumerFactory<String, KafkaEvpsNodeDto> nodeFactory = new DefaultKafkaConsumerFactory<>(getConsumerPropertiesMap());
|
|
|
- this.nodeListenerContainer = new ConcurrentMessageListenerContainer<>(nodeFactory, nodeContainerProperties);
|
|
|
- this.nodeListenerContainer.setBeanName("nodeListenerContainer");
|
|
|
- this.nodeListenerContainer.setConcurrency(1);
|
|
|
- this.nodeListenerContainer.setErrorHandler((thrownException, data) -> {
|
|
|
- log.error("nodeListenerContainer error: {}", thrownException.getMessage());
|
|
|
- this.nodeListenerContainer.stop();
|
|
|
- });
|
|
|
- this.nodeListenerContainer.start();
|
|
|
-//
|
|
|
-//
|
|
|
-// ContainerProperties phaseContainerProperties = new ContainerProperties(EVPS_PHASE_TOPIC);
|
|
|
-// phaseContainerProperties.setGroupId(this.groupId);
|
|
|
-// phaseContainerProperties.setPollTimeout(5000);
|
|
|
-// phaseContainerProperties.setMessageListener(new KafkaEvpsPhaseConsumerWorker());
|
|
|
-// ConsumerFactory<String, KafkaEvpsPhaseDto> phaseFactory = new DefaultKafkaConsumerFactory<>(getConsumerPropertiesMap());
|
|
|
-// this.phaseListenerContainer = new ConcurrentMessageListenerContainer<>(phaseFactory, phaseContainerProperties);
|
|
|
-// this.phaseListenerContainer.setBeanName("phaseListenerContainer");
|
|
|
-// this.phaseListenerContainer.setConcurrency(1);
|
|
|
-// this.phaseListenerContainer.setErrorHandler((thrownException, data) -> {
|
|
|
-// log.error("phaseListenerContainer error: {}", thrownException.getMessage());
|
|
|
-// this.phaseListenerContainer.stop();
|
|
|
+// ConsumerFactory<String, KafkaEvpsData> serviceFactory = new DefaultKafkaConsumerFactory<>(getConsumerPropertiesMap(), new StringDeserializer(), new JsonDeserializer<>(KafkaEvpsData.class, false));
|
|
|
+// this.uticEvpsListenerContainer = new ConcurrentMessageListenerContainer<>(serviceFactory, containerProperties);
|
|
|
+// this.uticEvpsListenerContainer.setBeanName("uticEvpsListenerContainer");
|
|
|
+// this.uticEvpsListenerContainer.setConcurrency(1);
|
|
|
+// this.uticEvpsListenerContainer.setErrorHandler((thrownException, data) -> {
|
|
|
+// log.error("uticEvpsListenerContainer error: {}", thrownException.getMessage());
|
|
|
+// this.uticEvpsListenerContainer.stop();
|
|
|
// });
|
|
|
-// this.phaseListenerContainer.start();
|
|
|
-//
|
|
|
-//
|
|
|
- ContainerProperties signalContainerProperties = new ContainerProperties(EVPS_SIGNAL_TOPIC);
|
|
|
- signalContainerProperties.setGroupId(this.groupId);
|
|
|
- signalContainerProperties.setPollTimeout(5000);
|
|
|
- signalContainerProperties.setMessageListener(new KafkaEvpsSignalConsumerWorker());
|
|
|
- ConsumerFactory<String, KafkaEvpsSignalDto> signalFactory = new DefaultKafkaConsumerFactory<>(getConsumerPropertiesMap());
|
|
|
- this.signalListenerContainer = new ConcurrentMessageListenerContainer<>(signalFactory, signalContainerProperties);
|
|
|
- this.signalListenerContainer.setBeanName("signalListenerContainer");
|
|
|
- this.signalListenerContainer.setConcurrency(1);
|
|
|
- this.signalListenerContainer.setErrorHandler((thrownException, data) -> {
|
|
|
- log.error("signalListenerContainer error: {}", thrownException.getMessage());
|
|
|
- this.signalListenerContainer.stop();
|
|
|
- });
|
|
|
- this.signalListenerContainer.start();
|
|
|
-
|
|
|
-
|
|
|
- ContainerProperties eventContainerProperties = new ContainerProperties(EVPS_EVENT_TOPIC);
|
|
|
- eventContainerProperties.setGroupId(this.groupId);
|
|
|
- eventContainerProperties.setPollTimeout(5000);
|
|
|
- eventContainerProperties.setMessageListener(new KafkaEvpsEventConsumerWorker());
|
|
|
- ConsumerFactory<String, KafkaEvpsEventDto> eventFactory = new DefaultKafkaConsumerFactory<>(getConsumerPropertiesMap());
|
|
|
- this.eventListenerContainer = new ConcurrentMessageListenerContainer<>(eventFactory, eventContainerProperties);
|
|
|
- this.eventListenerContainer.setBeanName("eventListenerContainer");
|
|
|
- this.eventListenerContainer.setConcurrency(1);
|
|
|
- this.eventListenerContainer.setErrorHandler((thrownException, data) -> {
|
|
|
- log.error("eventListenerContainer error: {}", thrownException.getMessage());
|
|
|
- this.eventListenerContainer.stop();
|
|
|
- });
|
|
|
- this.eventListenerContainer.start();
|
|
|
+// this.uticEvpsListenerContainer.start();
|
|
|
}
|
|
|
|
|
|
public Map<String, Object> getConsumerPropertiesMap() {
|
|
@@ -220,23 +125,8 @@ public class KafkaConsumerService {
|
|
|
}
|
|
|
public void shutdown() {
|
|
|
try {
|
|
|
- if (this.serviceListenerContainer != null) {
|
|
|
- this.serviceListenerContainer.stop();
|
|
|
- }
|
|
|
- if (this.routeListenerContainer != null) {
|
|
|
- this.routeListenerContainer.stop();
|
|
|
- }
|
|
|
- if (this.phaseListenerContainer != null) {
|
|
|
- this.phaseListenerContainer.stop();
|
|
|
- }
|
|
|
- if (this.nodeListenerContainer != null) {
|
|
|
- this.nodeListenerContainer.stop();
|
|
|
- }
|
|
|
- if (this.signalListenerContainer != null) {
|
|
|
- this.signalListenerContainer.stop();
|
|
|
- }
|
|
|
- if (this.eventListenerContainer != null) {
|
|
|
- this.eventListenerContainer.stop();
|
|
|
+ if (this.uticEvpsListenerContainer != null) {
|
|
|
+ this.uticEvpsListenerContainer.stop();
|
|
|
}
|
|
|
}
|
|
|
catch(Exception ignored) {
|