WebSocketSessionClient.java 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package com.tsi.app.server.websocket;
  2. import com.tsi.app.server.repository.TsiKafkaConsumerManager;
  3. import com.tsi.app.server.vo.TsiKafkaVo;
  4. import lombok.Data;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.web.socket.TextMessage;
  7. import org.springframework.web.socket.WebSocketSession;
  8. import java.util.ArrayList;
  9. import java.util.List;
  10. @Slf4j
  11. @Data
  12. public class WebSocketSessionClient {
  13. private WebsocketHandler websocketHandler;
  14. private WebSocketSession session;
  15. private String groupId;
  16. private List<String> topics = new ArrayList<>();
  17. private List<String> threadTopics = new ArrayList<>();
  18. private SessionConsumerThread consumerThread = null;
  19. private Thread thread;
  20. public WebSocketSessionClient(WebsocketHandler websocketHandler, WebSocketSession session) {
  21. this.websocketHandler = websocketHandler;
  22. this.session = session;
  23. }
  24. public void delSubscription(List<String> topics, boolean isExit) {
  25. if (topics == null) {
  26. return;
  27. }
  28. this.threadTopics = new ArrayList<>();
  29. for (String topic: topics) {
  30. boolean remain = true;
  31. TsiKafkaVo kafkaVo = TsiKafkaConsumerManager.getInstance().get(Long.parseLong(topic));
  32. if (kafkaVo != null) {
  33. if (kafkaVo.getNodeConsumerThread() != null) {
  34. kafkaVo.getNodeConsumerThread().removeSession(this.session);
  35. remain = false;
  36. log.info("delSubscription: {}, consumer: {}, session: {}", topic, kafkaVo.getNodeConsumerThread(), session.toString());
  37. }
  38. }
  39. if (remain) {
  40. this.threadTopics.add(topic);
  41. }
  42. }
  43. if (this.consumerThread != null && this.threadTopics.size() > 0) {
  44. if (isExit) {
  45. this.threadTopics = new ArrayList<>();
  46. }
  47. this.consumerThread.setTopics(this.threadTopics);
  48. this.consumerThread.getUpdateTopics().set(true);
  49. }
  50. }
  51. public void addSubscription(List<String> topics) {
  52. if (topics == null) {
  53. return;
  54. }
  55. long currMilliseconds = System.currentTimeMillis();
  56. this.threadTopics = new ArrayList<>();
  57. for (String topic: topics) {
  58. boolean remain = true;
  59. TsiKafkaVo kafkaVo = TsiKafkaConsumerManager.getInstance().get(Long.parseLong(topic));
  60. if (kafkaVo != null) {
  61. // 등록된 교차로인 경우 일단 요청해 놓는다.
  62. if (kafkaVo.getNodeConsumerThread() != null) {
  63. kafkaVo.getNodeConsumerThread().addSession(this.session);
  64. log.info("addSubscription: {}, recvTm: {}, consumer: {}, session: {}", topic, currMilliseconds - kafkaVo.getNodeConsumerThread().getRecvTime(), kafkaVo.getNodeConsumerThread(), session.toString());
  65. // 통신이 OffLine 인 경우 offline 메시지 전송
  66. if (currMilliseconds - kafkaVo.getNodeConsumerThread().getRecvTime() > 5000) {
  67. try {
  68. this.websocketHandler.sendMessage(this.session, topic, new TextMessage(topic + ":offline"));
  69. } catch (Exception e) {
  70. //
  71. }
  72. }
  73. }
  74. }
  75. if (remain) {
  76. this.threadTopics.add(topic);
  77. }
  78. }
  79. if (this.threadTopics.size() > 0) {
  80. if (this.consumerThread == null) {
  81. this.consumerThread = new SessionConsumerThread(this.websocketHandler, this.session);
  82. this.thread = new Thread(this.consumerThread);
  83. this.thread.setUncaughtExceptionHandler(
  84. new Thread.UncaughtExceptionHandler() {
  85. @Override
  86. public void uncaughtException(Thread th, Throwable ex) {
  87. log.error("Uncaught exception: {}, {}", groupId, ex.getMessage());
  88. }
  89. }
  90. );
  91. this.thread.start();
  92. }
  93. this.consumerThread.setTopics(this.threadTopics);
  94. this.consumerThread.getUpdateTopics().set(true);
  95. }
  96. }
  97. public void subscribe(List<String> topics) {
  98. if (topics.size() == 1 && topics.get(0).equals("x")) {
  99. // 세션 종료 메시지 안 경우임. 모든 요청 토픽 삭제
  100. delSubscription(this.topics, true);
  101. this.topics = new ArrayList<>();
  102. }
  103. else {
  104. List<String> delTopics = new ArrayList<>();
  105. List<String> newTopics = new ArrayList<>();
  106. // 지금 요청중인 토픽 중에서 이번 요청에 포함되지 않은 토픽은 삭제한다.
  107. for (String topic: this.topics) {
  108. if (!topics.contains(topic)) {
  109. delTopics.add(topic);
  110. }
  111. }
  112. if (delTopics.size() > 0) {
  113. delSubscription(delTopics, false);
  114. }
  115. // 이번 요청 토픽중에 지금 요청중이 아닌 경우에만 새롭게 요청한다.
  116. for (String topic: topics) {
  117. if (!this.topics.contains(topic)) {
  118. newTopics.add(topic);
  119. }
  120. }
  121. if (newTopics.size() > 0) {
  122. addSubscription(newTopics);
  123. }
  124. this.topics = topics;
  125. }
  126. }
  127. public void start(String groupId) {
  128. this.groupId = groupId;
  129. }
  130. public void stop() {
  131. delSubscription(this.topics, true);
  132. if (this.consumerThread != null) {
  133. this.consumerThread.stop();
  134. }
  135. }
  136. }