package com.tsi.app.server.websocket; import com.tsi.app.server.repository.TsiKafkaConsumerManager; import com.tsi.app.server.vo.TsiKafkaVo; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import java.util.ArrayList; import java.util.List; @Slf4j @Data public class WebSocketSessionClient { private WebsocketHandler websocketHandler; private WebSocketSession session; private String groupId; private List topics = new ArrayList<>(); private List threadTopics = new ArrayList<>(); private SessionConsumerThread consumerThread = null; private Thread thread; public WebSocketSessionClient(WebsocketHandler websocketHandler, WebSocketSession session) { this.websocketHandler = websocketHandler; this.session = session; } public void delSubscription(List topics, boolean isExit) { if (topics == null) { return; } this.threadTopics = new ArrayList<>(); for (String topic: topics) { boolean remain = true; TsiKafkaVo kafkaVo = TsiKafkaConsumerManager.getInstance().get(Long.parseLong(topic)); if (kafkaVo != null) { if (kafkaVo.getNodeConsumerThread() != null) { kafkaVo.getNodeConsumerThread().removeSession(this.session); remain = false; log.info("delSubscription: {}, consumer: {}, session: {}", topic, kafkaVo.getNodeConsumerThread(), session.toString()); } } if (remain) { this.threadTopics.add(topic); } } if (this.consumerThread != null && this.threadTopics.size() > 0) { if (isExit) { this.threadTopics = new ArrayList<>(); } this.consumerThread.setTopics(this.threadTopics); this.consumerThread.getUpdateTopics().set(true); } } public void addSubscription(List topics) { if (topics == null) { return; } long currMilliseconds = System.currentTimeMillis(); this.threadTopics = new ArrayList<>(); for (String topic: topics) { boolean remain = true; TsiKafkaVo kafkaVo = TsiKafkaConsumerManager.getInstance().get(Long.parseLong(topic)); if (kafkaVo != null) { // 등록된 교차로인 경우 일단 요청해 놓는다. if (kafkaVo.getNodeConsumerThread() != null) { kafkaVo.getNodeConsumerThread().addSession(this.session); log.info("addSubscription: {}, recvTm: {}, consumer: {}, session: {}", topic, currMilliseconds - kafkaVo.getNodeConsumerThread().getRecvTime(), kafkaVo.getNodeConsumerThread(), session.toString()); // 통신이 OffLine 인 경우 offline 메시지 전송 if (currMilliseconds - kafkaVo.getNodeConsumerThread().getRecvTime() > 5000) { try { this.websocketHandler.sendMessage(this.session, topic, new TextMessage(topic + ":offline")); } catch (Exception e) { // } } } } if (remain) { this.threadTopics.add(topic); } } if (this.threadTopics.size() > 0) { if (this.consumerThread == null) { this.consumerThread = new SessionConsumerThread(this.websocketHandler, this.session); this.thread = new Thread(this.consumerThread); this.thread.setUncaughtExceptionHandler( new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(Thread th, Throwable ex) { log.error("Uncaught exception: {}, {}", groupId, ex.getMessage()); } } ); this.thread.start(); } this.consumerThread.setTopics(this.threadTopics); this.consumerThread.getUpdateTopics().set(true); } } public void subscribe(List topics) { if (topics.size() == 1 && topics.get(0).equals("x")) { // 세션 종료 메시지 안 경우임. 모든 요청 토픽 삭제 delSubscription(this.topics, true); this.topics = new ArrayList<>(); } else { List delTopics = new ArrayList<>(); List newTopics = new ArrayList<>(); // 지금 요청중인 토픽 중에서 이번 요청에 포함되지 않은 토픽은 삭제한다. for (String topic: this.topics) { if (!topics.contains(topic)) { delTopics.add(topic); } } if (delTopics.size() > 0) { delSubscription(delTopics, false); } // 이번 요청 토픽중에 지금 요청중이 아닌 경우에만 새롭게 요청한다. for (String topic: topics) { if (!this.topics.contains(topic)) { newTopics.add(topic); } } if (newTopics.size() > 0) { addSubscription(newTopics); } this.topics = topics; } } public void start(String groupId) { this.groupId = groupId; } public void stop() { delSubscription(this.topics, true); if (this.consumerThread != null) { this.consumerThread.stop(); } } }