123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- package com.tsi.app.server.websocket;
- import com.tsi.app.server.repository.TsiKafkaConsumerManager;
- import com.tsi.app.server.vo.TsiKafkaVo;
- import lombok.Data;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.web.socket.TextMessage;
- import org.springframework.web.socket.WebSocketSession;
- import java.util.ArrayList;
- import java.util.List;
- @Slf4j
- @Data
- public class WebSocketSessionClient {
- private WebsocketHandler websocketHandler;
- private WebSocketSession session;
- private String groupId;
- private List<String> topics = new ArrayList<>();
- private List<String> threadTopics = new ArrayList<>();
- private SessionConsumerThread consumerThread = null;
- private Thread thread;
- public WebSocketSessionClient(WebsocketHandler websocketHandler, WebSocketSession session) {
- this.websocketHandler = websocketHandler;
- this.session = session;
- }
- public void delSubscription(List<String> topics, boolean isExit) {
- if (topics == null) {
- return;
- }
- this.threadTopics = new ArrayList<>();
- for (String topic: topics) {
- boolean remain = true;
- TsiKafkaVo kafkaVo = TsiKafkaConsumerManager.getInstance().get(Long.parseLong(topic));
- if (kafkaVo != null) {
- if (kafkaVo.getNodeConsumerThread() != null) {
- kafkaVo.getNodeConsumerThread().removeSession(this.session);
- remain = false;
- log.info("delSubscription: {}, consumer: {}, session: {}", topic, kafkaVo.getNodeConsumerThread(), session.toString());
- }
- }
- if (remain) {
- this.threadTopics.add(topic);
- }
- }
- if (this.consumerThread != null && this.threadTopics.size() > 0) {
- if (isExit) {
- this.threadTopics = new ArrayList<>();
- }
- this.consumerThread.setTopics(this.threadTopics);
- this.consumerThread.getUpdateTopics().set(true);
- }
- }
- public void addSubscription(List<String> topics) {
- if (topics == null) {
- return;
- }
- long currMilliseconds = System.currentTimeMillis();
- this.threadTopics = new ArrayList<>();
- for (String topic: topics) {
- boolean remain = true;
- TsiKafkaVo kafkaVo = TsiKafkaConsumerManager.getInstance().get(Long.parseLong(topic));
- if (kafkaVo != null) {
- // 등록된 교차로인 경우 일단 요청해 놓는다.
- if (kafkaVo.getNodeConsumerThread() != null) {
- kafkaVo.getNodeConsumerThread().addSession(this.session);
- log.info("addSubscription: {}, recvTm: {}, consumer: {}, session: {}", topic, currMilliseconds - kafkaVo.getNodeConsumerThread().getRecvTime(), kafkaVo.getNodeConsumerThread(), session.toString());
- // 통신이 OffLine 인 경우 offline 메시지 전송
- if (currMilliseconds - kafkaVo.getNodeConsumerThread().getRecvTime() > 5000) {
- try {
- this.websocketHandler.sendMessage(this.session, topic, new TextMessage(topic + ":offline"));
- } catch (Exception e) {
- //
- }
- }
- }
- }
- if (remain) {
- this.threadTopics.add(topic);
- }
- }
- if (this.threadTopics.size() > 0) {
- if (this.consumerThread == null) {
- this.consumerThread = new SessionConsumerThread(this.websocketHandler, this.session);
- this.thread = new Thread(this.consumerThread);
- this.thread.setUncaughtExceptionHandler(
- new Thread.UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(Thread th, Throwable ex) {
- log.error("Uncaught exception: {}, {}", groupId, ex.getMessage());
- }
- }
- );
- this.thread.start();
- }
- this.consumerThread.setTopics(this.threadTopics);
- this.consumerThread.getUpdateTopics().set(true);
- }
- }
- public void subscribe(List<String> topics) {
- if (topics.size() == 1 && topics.get(0).equals("x")) {
- // 세션 종료 메시지 안 경우임. 모든 요청 토픽 삭제
- delSubscription(this.topics, true);
- this.topics = new ArrayList<>();
- }
- else {
- List<String> delTopics = new ArrayList<>();
- List<String> newTopics = new ArrayList<>();
- // 지금 요청중인 토픽 중에서 이번 요청에 포함되지 않은 토픽은 삭제한다.
- for (String topic: this.topics) {
- if (!topics.contains(topic)) {
- delTopics.add(topic);
- }
- }
- if (delTopics.size() > 0) {
- delSubscription(delTopics, false);
- }
- // 이번 요청 토픽중에 지금 요청중이 아닌 경우에만 새롭게 요청한다.
- for (String topic: topics) {
- if (!this.topics.contains(topic)) {
- newTopics.add(topic);
- }
- }
- if (newTopics.size() > 0) {
- addSubscription(newTopics);
- }
- this.topics = topics;
- }
- }
- public void start(String groupId) {
- this.groupId = groupId;
- }
- public void stop() {
- delSubscription(this.topics, true);
- if (this.consumerThread != null) {
- this.consumerThread.stop();
- }
- }
- }
|