|
@@ -1,145 +0,0 @@
|
|
-package com.its.api.websocket;
|
|
|
|
-
|
|
|
|
-import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
|
-import lombok.Getter;
|
|
|
|
-import lombok.Setter;
|
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
|
-import org.springframework.web.socket.WebSocketSession;
|
|
|
|
-
|
|
|
|
-import java.util.List;
|
|
|
|
-import java.util.concurrent.CopyOnWriteArrayList;
|
|
|
|
-import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
-
|
|
|
|
-@Slf4j
|
|
|
|
-@Getter
|
|
|
|
-@Setter
|
|
|
|
-public class SessionConsumerThread implements Runnable {
|
|
|
|
-
|
|
|
|
- private AtomicBoolean stopFlag = new AtomicBoolean(false);
|
|
|
|
- private AtomicBoolean updateTopics = new AtomicBoolean(false);
|
|
|
|
-
|
|
|
|
- private final WebsocketHandler websocketHandler;
|
|
|
|
- private final WebSocketSession session;
|
|
|
|
- private String groupId;
|
|
|
|
- private List<String> topics;
|
|
|
|
- //private KafkaConsumer<String, byte[]> consumer;
|
|
|
|
- private ObjectMapper mapper;
|
|
|
|
-
|
|
|
|
- public SessionConsumerThread(WebsocketHandler websocketHandler, WebSocketSession session) {
|
|
|
|
- this.websocketHandler = websocketHandler;
|
|
|
|
- this.session = session;
|
|
|
|
-
|
|
|
|
- this.topics = new CopyOnWriteArrayList();
|
|
|
|
- this.mapper = new ObjectMapper();
|
|
|
|
-
|
|
|
|
- this.groupId = this.session.getId();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /*public List<String> formatPartitions(Collection<TopicPartition> partitions) {
|
|
|
|
- return partitions.stream().map(topicPartition ->
|
|
|
|
- String.format("\ntopic: %s, partition: %s", topicPartition.topic(), topicPartition.partition()))
|
|
|
|
- .collect(Collectors.toList());
|
|
|
|
- }*/
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void run() {
|
|
|
|
-
|
|
|
|
- log.info("SessionConsumerThread start: {}", this.groupId);
|
|
|
|
-
|
|
|
|
- try {
|
|
|
|
- boolean unsubscribe = false;
|
|
|
|
- while (!this.stopFlag.get() && (!Thread.currentThread().isInterrupted())) {
|
|
|
|
-/*
|
|
|
|
- if (this.topics.size() == 0) {
|
|
|
|
- try {
|
|
|
|
- if (unsubscribe) {
|
|
|
|
- this.consumer.unsubscribe();
|
|
|
|
- unsubscribe = false;
|
|
|
|
- }
|
|
|
|
- Thread.sleep(100);
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
- e.printStackTrace();
|
|
|
|
- }
|
|
|
|
- continue;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (this.consumer == null) {
|
|
|
|
- TsiKafkaConsumerConfig config = (TsiKafkaConsumerConfig) AppUtils.getBean(TsiKafkaConsumerConfig.class);
|
|
|
|
- this.consumer = new KafkaConsumer<>(config.getConsumerProperties(this.groupId));
|
|
|
|
- log.info("Kafka Consumer Create: {}", this.consumer);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (this.updateTopics.get()) {
|
|
|
|
- this.updateTopics.set(false);
|
|
|
|
-
|
|
|
|
- log.info("SessionConsumerThread Subscribe Start: {}, {}", this.groupId, this.topics);
|
|
|
|
- *//*this.consumer.unsubscribe();
|
|
|
|
-
|
|
|
|
- this.consumer.subscribe(this.topics);
|
|
|
|
- Set<TopicPartition> assignment = new HashSet<>();
|
|
|
|
- while (assignment.size() == 0) {
|
|
|
|
- consumer.poll(Duration.ofMillis(100));
|
|
|
|
- assignment = consumer.assignment();
|
|
|
|
- }
|
|
|
|
- log.info("SessionConsumerThread Subscribe Partitions: {} EA", assignment.size());
|
|
|
|
- consumer.seekToEnd(assignment);
|
|
|
|
-*//*
|
|
|
|
- this.consumer.subscribe(this.topics,
|
|
|
|
- new ConsumerRebalanceListener() {
|
|
|
|
- @Override
|
|
|
|
- public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
|
|
|
|
- log.info("onPartitionsRevoked - consumerName: {}, partitions: {}", topics.toString(), formatPartitions(partitions));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
|
|
|
|
- log.info("onPartitionsAssigned - consumerName: {}, partitions: {}", topics.toString(), formatPartitions(partitions));
|
|
|
|
- consumer.seekToEnd(partitions);
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
- log.info("SessionConsumerThread Subscribe ..End: {}, {}", this.groupId, this.topics);
|
|
|
|
- unsubscribe = true;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- ConsumerRecords<String, byte[]> records = this.consumer.poll(Duration.ofMillis(100));
|
|
|
|
- for (ConsumerRecord<String, byte[]> record : records) {
|
|
|
|
- try {
|
|
|
|
- CpuNodeStatusDTO status = TsiCpuNodePacket.getNodeStatusDTO(Long.parseLong(record.key()), record.value());
|
|
|
|
- if (status != null) {
|
|
|
|
- try {
|
|
|
|
- String jsonInString = this.mapper.writeValueAsString(status);
|
|
|
|
- try {
|
|
|
|
- this.websocketHandler.sendMessage(this.session, record.key(), new TextMessage(jsonInString));
|
|
|
|
- log.info("Send to: {}, {}, {} bytes.", this.session.getRemoteAddress().getAddress(), record.key(), jsonInString.length());
|
|
|
|
- }
|
|
|
|
- catch(Exception e) {
|
|
|
|
- log.error("Send Failed: {}, {}, {} bytes.", this.session, record.key(), jsonInString.length());
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- catch(JsonProcessingException e) {
|
|
|
|
- log.error("ConsumerThread Json parsing Exception: {}, {}", this.topics.toString(), e.getMessage());
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- log.error("SessionConsumerThread Exception: {}, {}, {}", this.topics.toString(), this.session.toString(), e.getMessage());
|
|
|
|
- }
|
|
|
|
- }*/
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- log.info("SessionConsumerThread: {}, {}, {}, stopped.", this.groupId, this.topics.toString(), this.session.toString());
|
|
|
|
- }
|
|
|
|
- catch(Exception e) {
|
|
|
|
- log.error("SessionConsumerThread Wakeup Exception: {}, {}", this.groupId, e.getMessage());
|
|
|
|
- }
|
|
|
|
- finally {
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public void stop() {
|
|
|
|
- this.stopFlag.set(true);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public void shutdown() {
|
|
|
|
- //this.consumer.wakeup();
|
|
|
|
- }
|
|
|
|
-}
|
|
|