Browse Source

last before bytebuf use

shjung 1 week ago
parent
commit
9aff65daf0

+ 15 - 2
tsi-comm-server/src/main/java/com/tsi/comm/server/config/TsiCvimServerConfig.java

@@ -49,7 +49,7 @@ public class TsiCvimServerConfig extends NettyServerConfig {
 
         this.whiteListIpSet = getWhitelistIpSet();
 
-        log.info("[TsiCvimServerConfig] -------------------------");
+        log.info("[TsiCvimServerConfig] =======================================================");
         log.info("[TsiCvimServerConfig]              serverId: {}", this.serverId);
         log.info("[TsiCvimServerConfig]              MAX_CORE: {}", MAX_CORE);
         log.info("[TsiCvimServerConfig]         packetWorkers: {}", this.packetWorkers);
@@ -57,7 +57,20 @@ public class TsiCvimServerConfig extends NettyServerConfig {
         log.info("[TsiCvimServerConfig]           dbmsWorkers: {}", this.dbmsWorkers);
         log.info("[TsiCvimServerConfig]           checkPacket: {}", this.checkPacket);
         log.info("[TsiCvimServerConfig]        whiteListIpSet: {}", this.whiteListIpSet);
-        log.info("{}", super.toString());
+        log.info("[TsiCvimServerConfig] -------------------------");
+        log.info("[TsiCvimServerConfig]           bindingAddr: {}", this.bindingAddr);
+        log.info("[TsiCvimServerConfig]           bindingPort: {}", this.bindingPort);
+        log.info("[TsiCvimServerConfig]               backlog: {}", this.backlog);
+        log.info("[TsiCvimServerConfig]         acceptThreads: {}", this.acceptThreads);
+        log.info("[TsiCvimServerConfig]         workerThreads: {}", this.workerThreads);
+        log.info("[TsiCvimServerConfig]                rcvBuf: {}", this.rcvBuf);
+        log.info("[TsiCvimServerConfig]                sndBuf: {}", this.sndBuf);
+        log.info("[TsiCvimServerConfig] readerIdleTimeSeconds: {}", this.readerIdleTimeSeconds);
+        log.info("[TsiCvimServerConfig] writerIdleTimeSeconds: {}", this.writerIdleTimeSeconds);
+        log.info("[TsiCvimServerConfig]    allIdleTimeSeconds: {}", this.allIdleTimeSeconds);
+        log.info("[TsiCvimServerConfig] connectTimeoutSeconds: {}", this.connectTimeoutSeconds);
+        log.info("[TsiCvimServerConfig] =======================================================");
+
     }
 
     public Set<String> getWhitelistIpSet() {

+ 8 - 0
tsi-comm-server/src/main/java/com/tsi/comm/server/protocol/TsiCpuPacket.java

@@ -149,6 +149,14 @@ public class TsiCpuPacket extends AbstractTsiPacket {
     public static final byte CONNECT = 0x01;
     public static final byte DISCONNECT = 0x00;
 
+    // FOR Netty LengthFieldBasedFrameDecoder
+    public static final int MAX_FRAME_LENGTH = 2048;
+    public static final int LENGTH_FIELD_OFFSET = 2; // 길이 필드가 2번 인덱스에서 시작
+    public static final int LENGTH_FIELD_LENGTH = 2; // 길이 필드의 크기는 2바이트
+    public static final int LENGTH_ADJUSTMENT = -2; // 길이 필드 값에 더해야 할 조정값 (길이 필드 자체의 크기를 빼줌)
+    public static final int INITIAL_BYTES_TO_STRIP = 0; // 프레임에서 제거할 초기 바이트 수
+
+
     private Object obj;
     private int  length;
     private byte dataVer;

+ 12 - 0
tsi-comm-server/src/main/java/com/tsi/comm/server/repository/TsiNodeManager.java

@@ -8,6 +8,7 @@ import org.springframework.stereotype.Component;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
 
 @Slf4j
 @Getter
@@ -18,6 +19,17 @@ public class TsiNodeManager {
     private final ConcurrentHashMap<Long, TsiNodeVo> tsiNodeVoMap = new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String, TsiNodeVo> ipAddrMap = new ConcurrentHashMap<>();
 
+    /**
+     * nodeId에 해당하는 TsiNodeVo가 없으면, 주어진 함수를 통해 생성하고 맵에 추가한 후 반환합니다.
+     * 이 모든 과정은 ConcurrentHashMap의 computeIfAbsent를 통해 원자적으로 처리됩니다.
+     * @param nodeId 노드 ID
+     * @param mappingFunction TsiNodeVo가 없을 때 새로 생성하기 위한 람다 함수 (Java 8의 Function 인터페이스)
+     * @return 맵에 존재하거나 새로 생성된 TsiNodeVo 객체
+     */
+    public TsiNodeVo getOrCompute(long nodeId, Function<Long, TsiNodeVo> mappingFunction) {
+        return this.tsiNodeVoMap.computeIfAbsent(nodeId, mappingFunction);
+    }
+
     public TsiNodeVo get(long nodeId) {
         return this.tsiNodeVoMap.get(nodeId);
     }

+ 25 - 156
tsi-comm-server/src/main/java/com/tsi/comm/server/tcp/codec/CvimServerByteBufMessageDecoder.java

@@ -1,25 +1,8 @@
 package com.tsi.comm.server.tcp.codec;
 
-import com.tsi.comm.server.service.TsiQueueDistributorService;
-import com.tsi.comm.server.xnet.NettyUtils;
-import com.tsi.comm.server.config.TsiCvimServerConfig;
-import com.tsi.comm.server.vo.mariadb.AbstractDbmsVo;
-import com.tsi.comm.server.vo.mariadb.AlarmOccrVo;
-import com.tsi.comm.server.vo.mariadb.NodeIpAddrVo;
-import com.tsi.comm.server.vo.mariadb.NodeStatusVo;
-import com.tsi.comm.server.process.dbms.TsiCvimDbmsProcess;
-import com.tsi.comm.server.process.packet.TsiChannelSession;
-import com.tsi.comm.server.process.packet.TsiCvimPacketProcess;
 import com.tsi.comm.server.protocol.TsiCpuPacket;
-import com.tsi.comm.server.repository.TsiAlarmManager;
-import com.tsi.comm.server.repository.TsiNodeManager;
-import com.tsi.comm.server.repository.TsiSessionManager;
-import com.tsi.comm.server.repository.TsiTpmsManager;
-import com.tsi.comm.server.vo.TsiAlarmConfigVo;
-import com.tsi.comm.server.vo.TsiNodeVo;
 import com.tsi.common.utils.TimeUtils;
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToMessageDecoder;
@@ -35,154 +18,40 @@ import java.util.List;
 @ChannelHandler.Sharable
 public class CvimServerByteBufMessageDecoder extends MessageToMessageDecoder<ByteBuf> {
 
-    private final TsiCvimServerConfig config;
-    private final TsiNodeManager nodeManager;
-    private final TsiAlarmManager alarmManager;
-    private final TsiTpmsManager tpmsManager;
-    private final TsiSessionManager sessionManager;
-    private final TsiCvimPacketProcess packetProcess;
-    private final TsiCvimDbmsProcess dbmsProcess;
-    private final TsiQueueDistributorService queueDistributorService;
-
     @Override
-    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) {
-        try {
-            final long msec = TimeUtils.currentTimeSeconds();
-            final long nsec = System.nanoTime();
-
-            if (byteBuf == null) {
-                log.error("Receive: Packet frame packet error. length field data error");
-                ctx.close();
-                return;
-            }
-
-            final int readableBytes = byteBuf.readableBytes();
-            if (readableBytes < TsiCpuPacket.SIZE_HEAD) {
-                log.error("Receive: Packet readableBytes too small: {}, required min size: {}", readableBytes, TsiCpuPacket.SIZE_HEAD);
-                ctx.close();
-                return;
-            }
-
-            final String remoteIpAddress = NettyUtils.getRemoteIpAddress(ctx.channel());
-
-            long nodeId = byteBuf.getUnsignedInt(6);
-            TsiNodeVo nodeVo = ctx.channel().attr(TsiSessionManager.TSI_NODE_ATTRIBUTE_KEY).get();
-            if (nodeVo == null) {
-                // not found node information from channel map
-                nodeVo = attachNodeVo(nodeId, remoteIpAddress, ctx.channel());
-                if (nodeVo == null) {
-                    // node id 가 0 이하로 설정되어 있거나, 노드 정보가 등록되어 있지 않은 경우
-                    if (nodeId <= 0) {
-                        byte[] buf = new byte[readableBytes];
-                        byteBuf.readBytes(buf);
-                        this.alarmManager.loggingUnknownNodePacket(nodeId, ctx.channel(), buf);
-                    }
-//                    ctx.disconnect();
-                    ctx.close();
-                    return;
-                }
-
-                // 중복 로그인 체크
-                Channel oldChannel = nodeVo.getChannel();
-                if (oldChannel != null && oldChannel.isActive() && oldChannel != ctx.channel()) {
-                    // 중복 연결 로깅
-                    this.alarmManager.alarmDupConnect(nodeVo.getNodeId(), nodeVo.getIpAddr(), remoteIpAddress);
-                    oldChannel.close();
-                }
-
-                // first connection, save node information to channel attribute map
-                // 채널 맵에 채널에 대한 노드정보를 저장한다.
-                // 채널 목록에 접속 정보 등록
-                synchronized (nodeVo) {
-                    nodeVo.setConnect(ctx.channel());
-                }
-                this.sessionManager.addChannel(ctx.channel(), nodeVo);
-                TsiChannelSession.objectRegistered(nodeVo, remoteIpAddress);
-
-                this.queueDistributorService.setLoadedQueue(nodeVo);    // 작업큐를 할당한다.
-                ctx.channel().attr(TsiSessionManager.TSI_NODE_ATTRIBUTE_KEY).set(nodeVo);
-
-                if (this.sessionManager.isServerRun()) {
-                    NodeStatusVo status = new NodeStatusVo(AbstractDbmsVo.DBMS_NODE_STATUS);
-                    status.setServerId(this.config.getServerId());
-                    status.setNodeId(nodeId);
-                    status.setStatus(1);
-                    status.setIpAddr(NettyUtils.getRemoteIpAddress(ctx.channel()));
-
-                    this.dbmsProcess.add(status, (int)Thread.currentThread().getId());
-                }
-            }
-
-            nodeVo.setLastCommTm(System.currentTimeMillis());   // 통신 수신시각 저장
+    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) {
 
-            if (!remoteIpAddress.equals(nodeVo.getIpAddr())) {
-                // IP 주소가 변경된 경우
-                final String oldIpAddr = nodeVo.getIpAddr();
-                log.warn("Node IpAddr Changed: {} -> {}", oldIpAddr, remoteIpAddress);
-                nodeVo.setIpAddr(remoteIpAddress);
+        // 패킷을 cpu packet으로 변환하여 다음 핸들러로 전달
+        final long msec = TimeUtils.currentTimeSeconds();
+        final long nsec = System.nanoTime();
 
-                NodeIpAddrVo nodeIpAddrVo = new NodeIpAddrVo(AbstractDbmsVo.DBMS_NODE_IP_UPDATE);
-                nodeIpAddrVo.setNodeId(nodeVo.getNodeId());
-                nodeIpAddrVo.setIpAddr(remoteIpAddress);
-
-                this.dbmsProcess.add(nodeIpAddrVo, (int)Thread.currentThread().getId());
-                this.nodeManager.putIpAddr(remoteIpAddress, nodeVo); // IP 주소 갱신
-            }
-
-            TsiCpuPacket packet = new TsiCpuPacket(nodeId, msec, nsec, ctx.channel());
-            packet.setBuf(new byte[readableBytes]);
-            byteBuf.readBytes(packet.getBuf());
-            packet.setObj(nodeVo);  // TsiNodeVo 객체를 저장
-
-            // 작업큐로 데이터 전송
-            this.packetProcess.add(packet, nodeVo.getPktQIdx());    // 패킷인덱스큐로 데이터 전송
-
-            // 작업큐 데이터 전송 시각 셋
-            packet.setAdd(System.nanoTime());
-
-            // 패킷 통계정보 생성
-            this.tpmsManager.readPacket(packet);
-            if (nodeVo.isDump()) {
-                TsiChannelSession.recvPacketDump(nodeVo, packet);
-            }
+        if (byteBuf == null) {
+            log.error("Receive: Packet frame packet error. length field data error");
+            ctx.close();
+            return;
         }
-        catch (Exception e) {
-            log.error("decode Exception: {}", e.getMessage());
-        }
-    }
-
-    private TsiNodeVo attachNodeVo(long nodeId, String remoteIpAddress, Channel channel) {
-//        long start = System.nanoTime();
 
-        // 노드맵에서 노드 정보를 찾는다.
-        TsiNodeVo nodeVo = this.nodeManager.get(nodeId);
-        if (nodeVo == null) {
-            // 메모리맵에 노드정보가 존재하지 않은 경우 로깅
-            this.alarmManager.loggingUnknownNode(nodeId, channel);
-            if (nodeId <= 0) {
-                return null; // 잘못된 노드 ID인 경우 null 반환
-            }
+        // 패킷의 최소 유효성만 검사. (예: 최소 길이)
+        final int readableBytes = byteBuf.readableBytes();
+        if (readableBytes < TsiCpuPacket.SIZE_HEAD) {
+            log.error("Receive: Packet readableBytes too small: {}, required min size: {}", readableBytes, TsiCpuPacket.SIZE_HEAD);
+            ctx.close();
+            return;
+        }
 
-            // 등록되어 있지 않은 노드 ID로 접속한 경우(노드 ID가 0 이하인 경우 제외)
-            // 신규로 해당 노드정보 메모리를 생성해서 메모리 맵에 추가한다.
-            nodeVo = new TsiNodeVo(nodeId, remoteIpAddress, false, true, true);
-            nodeVo.setCheckInstalled(true);
-            nodeVo.setRegistered(false);
-            this.nodeManager.put(nodeId, nodeVo);
-            this.nodeManager.putIpAddr(remoteIpAddress, nodeVo); // IP 주소 갱신
+        // 노드 ID 추출
+        final long nodeId = byteBuf.getUnsignedInt(TsiCpuPacket.INDEX_NODE_ID);
 
-            // oops 알람 설정 되어 있으면 알람테이블에 저장하자.
-            if (this.alarmManager.checkAlarm(TsiAlarmConfigVo.COMM_01)) {
-                AlarmOccrVo alarm = new AlarmOccrVo(AbstractDbmsVo.DBMS_ALARM_OCCR_HS);
-                alarm.setAlarmCode(TsiAlarmConfigVo.COMM_01);
-                alarm.setAlarmTarget(Long.toString(nodeId));
-                alarm.setAlarmValue(remoteIpAddress);
+        // 프로세스 패킷 생성
+        TsiCpuPacket packet = new TsiCpuPacket(nodeId, msec, nsec, ctx.channel());
+        packet.setBuf(new byte[readableBytes]);
+        byteBuf.readBytes(packet.getBuf());
+        packet.setObj(null);  // TsiNodeVo 객체를 저장
 
-                this.dbmsProcess.add(alarm, (int)Thread.currentThread().getId());
-            }
-        }
+        // CvimServerPacketProcessHandler 로 TsiCpuPacket 전달
+        out.add(packet);
 
-        return nodeVo;
+        // Exception 이 발생하면 CvimServerPacketProcessHandler 의 exceptionCaught() 메소드가 호출됨
     }
 
 }

+ 6 - 138
tsi-comm-server/src/main/java/com/tsi/comm/server/tcp/handler/CvimServerInboundMessageHandler.java

@@ -1,25 +1,9 @@
 package com.tsi.comm.server.tcp.handler;
 
-import com.tsi.comm.server.config.TsiCvimServerConfig;
-import com.tsi.comm.server.process.dbms.TsiCvimDbmsProcess;
-import com.tsi.comm.server.process.packet.TsiChannelSession;
-import com.tsi.comm.server.process.packet.TsiCvimPacketProcess;
-import com.tsi.comm.server.protocol.AbstractTsiPacket;
-import com.tsi.comm.server.protocol.TsiCpuDisconnected;
-import com.tsi.comm.server.repository.TsiAlarmManager;
-import com.tsi.comm.server.repository.TsiNodeManager;
-import com.tsi.comm.server.repository.TsiSessionManager;
-import com.tsi.comm.server.service.TsiQueueDistributorService;
-import com.tsi.comm.server.vo.TsiAlarmConfigVo;
-import com.tsi.comm.server.vo.TsiNodeVo;
-import com.tsi.comm.server.vo.mariadb.AbstractDbmsVo;
-import com.tsi.comm.server.vo.mariadb.AlarmOccrVo;
-import com.tsi.comm.server.vo.mariadb.NodeStatusVo;
-import com.tsi.comm.server.xnet.NettyUtils;
+import com.tsi.comm.server.tcp.service.ConnectionLifecycleService;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -31,147 +15,31 @@ import org.springframework.stereotype.Component;
 @ChannelHandler.Sharable
 public class CvimServerInboundMessageHandler extends ChannelInboundHandlerAdapter {
 
-    private final TsiCvimServerConfig config;
-    private final TsiNodeManager nodeManager;
-    private final TsiSessionManager sessionManager;
-    private final TsiAlarmManager alarmManager;
-    private final TsiCvimPacketProcess packetProcess;
-    private final TsiCvimDbmsProcess dbmsProcess;
-    private final TsiQueueDistributorService queueDistributorService;
-
-    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        //log.error("Handler: channelRead: {}", NettyUtils.getRemoteIpAddress(ctx.channel()));
-        if (msg instanceof AbstractTsiPacket) {
-            //TsiCpuPacket packet = (TsiCpuPacket)msg;
-            //packet.setPop(System.nanoTime());
-        }
-        else {
-            log.error("Handler: channelRead: Unknown Instance Type: {}", msg.getClass().getSimpleName());
-        }
-    }
+    private final ConnectionLifecycleService connectionService;
 
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
-        // session count increment
-        int sessions = this.sessionManager.add();
-        // IP Address 로 제어기를 판달할 수 있으면 여기서 제어기 등록 여부를 판단해서
-        // 접속 여부를 처리할 수 있다.
-        final String remoteIpAddr = NettyUtils.getRemoteIpAddress(ctx.channel());
-        log.info("--channelActive: {}, {} Sessions.", remoteIpAddr, sessions);
-        TsiNodeVo ipNodeVo = this.nodeManager.getIpAddr(remoteIpAddr);
-        if (ipNodeVo == null) {
-            this.alarmManager.reportUnknownIp(remoteIpAddr);
-        }
-        else {
-            TsiChannelSession.sessionActive(ipNodeVo, remoteIpAddr);
-//            if (ipNodeVo.isConnect()) {
-//                // 이미 연결된 상태라면 새 채널을 종료, 이전 채널이 통신을 수행중일수 있으므로 현재 채널을 종료시킴
-//                log.warn("--channelActive: {}, Duplicate connection detected, closing new channel: {}",
-//                        ipNodeVo.getKey(), remoteIpAddr);
-//                ctx.channel().close();
-//                return;
-//            }
-        }
+        this.connectionService.handleChannelActive(ctx);
         ctx.fireChannelActive();
     }
 
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-        final String remoteIpAddr = NettyUtils.getRemoteIpAddress(ctx.channel());
-        int sessions = this.sessionManager.remove();
-        TsiNodeVo nodeVo = this.sessionManager.getNodeVo(ctx.channel());
-        if (nodeVo != null) {
-            NodeStatusVo status = new NodeStatusVo(AbstractDbmsVo.DBMS_NODE_STATUS);
-            status.setServerId(this.config.getServerId());
-            status.setNodeId(nodeVo.getNodeId());
-            status.setStatus(0);
-            status.setIpAddr(remoteIpAddr);
-            this.dbmsProcess.add(status, (int)Thread.currentThread().getId());
-
-            if (this.sessionManager.isServerRun()) {
-                TsiCpuDisconnected packet = new TsiCpuDisconnected(nodeVo.getNodeId(), ctx.channel());
-                packet.setBuf(null);
-                packet.setObj(nodeVo);
-                this.packetProcess.add(packet, nodeVo.getPktQIdx());    // 패킷인덱스큐로 데이터 전송
-            }
-//            log.info("channelInactive: Node {}, {}, sessions: {}", nodeVo.getNodeId(), remoteIpAddr, sessions);
-
-            TsiChannelSession.sessionInactive(nodeVo, remoteIpAddr);
-
-            this.sessionManager.removeChannel(ctx.channel());
-            ctx.channel().attr(TsiSessionManager.TSI_NODE_ATTRIBUTE_KEY).set(null);
-            this.queueDistributorService.releaseQueue(nodeVo);    // 작업큐를 할당을 해제한다.
-
-            synchronized (nodeVo) {
-                if (nodeVo.getChannel() == ctx.channel()) {
-                    // 현재 비활성화되는 채널이 nodeVo에 등록된 활성 채널과 동일한 경우에만
-                    nodeVo.setConnect(null);
-                    log.info("channelInactive: {}, {}, sessions: {}", nodeVo.getNodeId(), remoteIpAddr, sessions);
-                } else {
-                    // 새로운 연결에 의해 강제로 종료되었음
-                    log.warn("channelInactive: {}, {}, sessions: {}, Duplicate Connected.", nodeVo.getNodeId(), remoteIpAddr, sessions);
-                }
-            }
-        }
-        else {
-            log.info("channelInactive: {}, sessions: {}, Unknown node.", remoteIpAddr, sessions);
-        }
-
+        this.connectionService.handleChannelInactive(ctx);
         ctx.fireChannelInactive();
     }
 
     @Override
     public void userEventTriggered(ChannelHandlerContext ctx, Object e) throws Exception {
         if (e instanceof IdleStateEvent) {
-            IdleStateEvent evt = (IdleStateEvent) e;
-            TsiNodeVo nodeVo = this.sessionManager.getNodeVo(ctx.channel());
-            // 연결이 완료된 후 송수신 데이터가 일정시간 동안 없을 경우 이곳에서 처리
-            if (evt.state() == IdleState.READER_IDLE) {
-                String remoteIpAddr = NettyUtils.getRemoteIpAddress(ctx.channel());
-                if (nodeVo == null) {
-                    log.warn("userEventTriggered: Recv Timeout: {}", remoteIpAddr);
-                    // 통신 접속 후 수신 데이터가 없이 READ 타임아웃이 발생한 경우임
-                    if (this.alarmManager.checkAlarm(TsiAlarmConfigVo.COMM_02)) {
-                        AlarmOccrVo alarm = new AlarmOccrVo(AbstractDbmsVo.DBMS_ALARM_OCCR_HS);
-                        alarm.setAlarmCode(TsiAlarmConfigVo.COMM_02);
-                        alarm.setAlarmTarget(remoteIpAddr);
-                        alarm.setAlarmValue(remoteIpAddr);
-                        this.dbmsProcess.add(alarm, (int) Thread.currentThread().getId());
-                    }
-//                    ctx.channel().disconnect();
-                    ctx.channel().close();
-                }
-                else {
-                    log.warn("userEventTriggered: Recv Timeout: {}, {}", remoteIpAddr, nodeVo.getNodeId());
-                    // 통신 접속 후 데이터를 한번이라도 수신한 경우에는 스케쥴러에서 처리한다.
-                    if (this.alarmManager.checkAlarm(TsiAlarmConfigVo.COMM_02)) {
-                        AlarmOccrVo alarm = new AlarmOccrVo(AbstractDbmsVo.DBMS_ALARM_OCCR_HS);
-                        alarm.setAlarmCode(TsiAlarmConfigVo.COMM_02);
-                        alarm.setAlarmTarget(nodeVo.getKey());
-                        alarm.setAlarmValue(remoteIpAddr);
-                        this.dbmsProcess.add(alarm, (int) Thread.currentThread().getId());
-                    }
-
-                    TsiChannelSession.sessionTimeout(nodeVo, NettyUtils.getRemoteIpAddress(ctx.channel()));
-
-                    ctx.channel().close();
-                }
-            } else if (evt.state() == IdleState.WRITER_IDLE) {
-                log.error("{}.userEventTriggered: WRITER_IDLE: {}", this.getClass().getSimpleName(), NettyUtils.getAddress(ctx.channel()));
-            } else if (evt.state() == IdleState.ALL_IDLE) {
-                log.error("{}.userEventTriggered: ALL_IDLE: {}", this.getClass().getSimpleName(), NettyUtils.getAddress(ctx.channel()));
-            }
+            this.connectionService.handleIdleStateEvent(ctx, (IdleStateEvent) e);
         }
         ctx.fireUserEventTriggered(e);
     }
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        TsiNodeVo nodeVo = this.sessionManager.getNodeVo(ctx.channel());
-        if (nodeVo != null) {
-            log.error("{}.exceptionCaught: {}, {}", this.getClass().getSimpleName(), nodeVo.getNodeId(), ctx.channel());
-        }
-        log.error("{}.exceptionCaught: {}", this.getClass().getSimpleName(), NettyUtils.getAddress(ctx.channel()));
+        this.connectionService.handleExceptionCaught(ctx, cause);
         ctx.fireExceptionCaught(cause);
         ctx.channel().close();
     }

+ 170 - 0
tsi-comm-server/src/main/java/com/tsi/comm/server/tcp/handler/CvimServerPacketProcessHandler.java

@@ -0,0 +1,170 @@
+package com.tsi.comm.server.tcp.handler;
+
+import com.tsi.comm.server.config.TsiCvimServerConfig;
+import com.tsi.comm.server.process.dbms.TsiCvimDbmsProcess;
+import com.tsi.comm.server.process.packet.TsiChannelSession;
+import com.tsi.comm.server.process.packet.TsiCvimPacketProcess;
+import com.tsi.comm.server.protocol.TsiCpuPacket;
+import com.tsi.comm.server.repository.TsiAlarmManager;
+import com.tsi.comm.server.repository.TsiNodeManager;
+import com.tsi.comm.server.repository.TsiSessionManager;
+import com.tsi.comm.server.repository.TsiTpmsManager;
+import com.tsi.comm.server.service.TsiQueueDistributorService;
+import com.tsi.comm.server.vo.TsiAlarmConfigVo;
+import com.tsi.comm.server.vo.TsiNodeVo;
+import com.tsi.comm.server.vo.mariadb.AbstractDbmsVo;
+import com.tsi.comm.server.vo.mariadb.AlarmOccrVo;
+import com.tsi.comm.server.vo.mariadb.NodeIpAddrVo;
+import com.tsi.comm.server.vo.mariadb.NodeStatusVo;
+import com.tsi.comm.server.xnet.NettyUtils;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.CorruptedFrameException;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+@ChannelHandler.Sharable
+public class CvimServerPacketProcessHandler extends SimpleChannelInboundHandler<TsiCpuPacket> {
+
+    private final TsiCvimServerConfig config;
+    private final TsiNodeManager nodeManager;
+    private final TsiAlarmManager alarmManager;
+    private final TsiTpmsManager tpmsManager;
+    private final TsiSessionManager sessionManager;
+    private final TsiCvimPacketProcess packetProcess;
+    private final TsiCvimDbmsProcess dbmsProcess;
+    private final TsiQueueDistributorService queueDistributorService;
+
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, TsiCpuPacket packet) throws Exception {
+
+        final String remoteIpAddress = NettyUtils.getRemoteIpAddress(ctx.channel());
+        final long nodeId = packet.getNodeId();
+        TsiNodeVo nodeVo = ctx.channel().attr(TsiSessionManager.TSI_NODE_ATTRIBUTE_KEY).get();
+        if (nodeVo == null) {
+            // not found node information from channel map
+            nodeVo = attachNodeVo(nodeId, remoteIpAddress, ctx.channel());
+            if (nodeVo == null) {
+                // node id 가 0 이하로 설정되어 있거나, 노드 정보가 등록되어 있지 않은 경우
+                if (nodeId <= 0) {
+                    byte[] buf = packet.getBuf();
+                    this.alarmManager.loggingUnknownNodePacket(nodeId, ctx.channel(), buf);
+                }
+                ctx.close();
+                return;
+            }
+
+            // first connection, save node information to channel attribute map
+            // 채널 맵에 채널에 대한 노드정보를 저장한다.
+            // 채널 목록에 접속 정보 등록(중복 로그인 체크)
+            nodeVo.connectChannel(ctx.channel(), this.alarmManager, remoteIpAddress);
+
+            this.sessionManager.addChannel(ctx.channel(), nodeVo);
+            TsiChannelSession.objectRegistered(nodeVo, remoteIpAddress);
+
+            this.queueDistributorService.setLoadedQueue(nodeVo);    // 작업큐를 할당한다.
+            ctx.channel().attr(TsiSessionManager.TSI_NODE_ATTRIBUTE_KEY).set(nodeVo);
+
+            if (this.sessionManager.isServerRun()) {
+                NodeStatusVo status = new NodeStatusVo(AbstractDbmsVo.DBMS_NODE_STATUS);
+                status.setServerId(this.config.getServerId());
+                status.setNodeId(nodeId);
+                status.setStatus(1);
+                status.setIpAddr(NettyUtils.getRemoteIpAddress(ctx.channel()));
+
+                this.dbmsProcess.add(status, (int)Thread.currentThread().getId());
+            }
+        }
+
+        packet.setObj(nodeVo); // 핸들러에서 찾아낸 최종 nodeVo를 패킷에 설정
+
+        nodeVo.setLastCommTm(System.currentTimeMillis());   // 통신 수신시각 저장
+
+        if (!remoteIpAddress.equals(nodeVo.getIpAddr())) {
+            // IP 주소가 변경된 경우
+            final String oldIpAddr = nodeVo.getIpAddr();
+            log.warn("Node IpAddr Changed: {} -> {}", oldIpAddr, remoteIpAddress);
+            nodeVo.setIpAddr(remoteIpAddress);
+
+            NodeIpAddrVo nodeIpAddrVo = new NodeIpAddrVo(AbstractDbmsVo.DBMS_NODE_IP_UPDATE);
+            nodeIpAddrVo.setNodeId(nodeVo.getNodeId());
+            nodeIpAddrVo.setIpAddr(remoteIpAddress);
+
+            this.dbmsProcess.add(nodeIpAddrVo, (int)Thread.currentThread().getId());
+            this.nodeManager.putIpAddr(remoteIpAddress, nodeVo); // IP 주소 갱신
+        }
+
+        // 작업큐로 데이터 전송
+        this.packetProcess.add(packet, nodeVo.getPktQIdx());    // 패킷인덱스큐로 데이터 전송
+
+        // 작업큐 데이터 전송 시각 셋
+        packet.setAdd(System.nanoTime());
+
+        // 패킷 통계정보 생성
+        this.tpmsManager.readPacket(packet);
+        if (nodeVo.isDump()) {
+            TsiChannelSession.recvPacketDump(nodeVo, packet);
+        }
+    }
+
+    private TsiNodeVo attachNodeVo(long nodeId, String remoteIpAddress, Channel channel) {
+
+        // 노드맵에서 노드 정보를 찾는다.
+        return this.nodeManager.getOrCompute(nodeId, id -> {
+            // 맵에 nodeId가 존재하지 않을 때만 실행됨, 존재하는 경우는 해당 객체가 리턴됨
+            // 메모리맵에 노드정보가 존재하지 않은 경우 로깅
+            this.alarmManager.loggingUnknownNode(id, channel);
+            if (id <= 0) {
+                return null; // 잘못된 노드 ID인 경우 null 반환(맵에 저장되지 않음)
+            }
+
+            // 등록되어 있지 않은 노드 ID로 접속한 경우(노드 ID가 0 이하인 경우 제외)
+            // 신규로 해당 노드정보 메모리를 생성해서 메모리 맵에 추가한다.
+            // 신규 TsiNodeVo 객체 생성
+            log.info("Creating new TsiNodeVo for unregistered node ID: {}", id);
+            TsiNodeVo newNodeVo = new TsiNodeVo(id, remoteIpAddress, false, true, true);
+            newNodeVo.setCheckInstalled(true);
+            newNodeVo.setRegistered(false);
+
+            // this.nodeManager.put(id, newNodeVo); // computeIfAbsent 에서 알아서 put 해준다.
+            this.nodeManager.putIpAddr(remoteIpAddress, newNodeVo); // IP 주소 갱신
+
+            // oops 알람 설정 되어 있으면 알람테이블에 저장하자.
+            if (this.alarmManager.checkAlarm(TsiAlarmConfigVo.COMM_01)) {
+                AlarmOccrVo alarm = new AlarmOccrVo(AbstractDbmsVo.DBMS_ALARM_OCCR_HS);
+                alarm.setAlarmCode(TsiAlarmConfigVo.COMM_01);
+                alarm.setAlarmTarget(Long.toString(id));
+                alarm.setAlarmValue(remoteIpAddress);
+
+                this.dbmsProcess.add(alarm, (int)Thread.currentThread().getId());
+            }
+            return newNodeVo;
+        });
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        TsiNodeVo nodeVo = ctx.channel().attr(TsiSessionManager.TSI_NODE_ATTRIBUTE_KEY).get();
+
+        if (cause instanceof CorruptedFrameException) {
+            // 디코딩 단계에서 발생한, 예측된 프레임 오류
+            log.warn("Exception CorruptedFrameException {}: {}", NettyUtils.getAddress(ctx.channel()), cause.getMessage());
+        } else if (nodeVo != null) {
+            // 노드가 식별된 이후에 발생한 예외
+            log.error("Exception processing packet, Node ID {}:", nodeVo.getNodeId(), cause);
+        } else {
+            // 노드가 식별되기 전에 발생한 예외 (Decoder 포함)
+            log.error("Exception in pipeline for channel {}:", NettyUtils.getAddress(ctx.channel()), cause);
+        }
+
+        // 문제가 발생한 채널은 닫아서 추가적인 문제를 방지합니다.
+        ctx.close();
+    }
+}

+ 27 - 6
tsi-comm-server/src/main/java/com/tsi/comm/server/tcp/initializer/CvimServerInitializer.java

@@ -1,25 +1,26 @@
 package com.tsi.comm.server.tcp.initializer;
 
-import com.tsi.comm.server.repository.TsiAlarmManager;
-import com.tsi.comm.server.vo.TsiAlarmConfigVo;
-import com.tsi.comm.server.xnet.NettyUtils;
 import com.tsi.comm.server.config.TsiCvimServerConfig;
+import com.tsi.comm.server.protocol.TsiCpuPacket;
+import com.tsi.comm.server.repository.TsiAlarmManager;
 import com.tsi.comm.server.tcp.codec.CvimServerByteBufMessageDecoder;
 import com.tsi.comm.server.tcp.codec.CvimServerEncoder;
 import com.tsi.comm.server.tcp.handler.CvimServerInboundMessageHandler;
+import com.tsi.comm.server.tcp.handler.CvimServerPacketProcessHandler;
+import com.tsi.comm.server.vo.TsiAlarmConfigVo;
+import com.tsi.comm.server.xnet.NettyUtils;
 import com.tsi.common.spring.SpringUtils;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.timeout.IdleStateHandler;
-import lombok.AllArgsConstructor;
-import lombok.RequiredArgsConstructor;
 
 public class CvimServerInitializer extends ChannelInitializer<Channel> {
 
     private final TsiCvimServerConfig config;
     private final CvimServerByteBufMessageDecoder cvimServerByteBufMessageDecoder;
+    private final CvimServerPacketProcessHandler cvimServerPacketProcessHandler;
     private final CvimServerInboundMessageHandler cvimServerInboundMessageHandler;
     private final CvimServerEncoder cvimServerEncoder;
     private int readerIdleTimeSeconds;
@@ -27,6 +28,7 @@ public class CvimServerInitializer extends ChannelInitializer<Channel> {
     public CvimServerInitializer(TsiCvimServerConfig config) {
         this.config = config;
         this.cvimServerByteBufMessageDecoder = SpringUtils.getBean(CvimServerByteBufMessageDecoder.class);
+        this.cvimServerPacketProcessHandler = SpringUtils.getBean(CvimServerPacketProcessHandler.class);
         this.cvimServerInboundMessageHandler = SpringUtils.getBean(CvimServerInboundMessageHandler.class);
         this.cvimServerEncoder = SpringUtils.getBean(CvimServerEncoder.class);
         TsiAlarmManager alarmManager = SpringUtils.getBean(TsiAlarmManager.class);
@@ -54,10 +56,29 @@ public class CvimServerInitializer extends ChannelInitializer<Channel> {
         // 공단에서 교차로 제어기의 IP 주소를 관리하지 않는다.
         IdleStateHandler idleStateHandler = new IdleStateHandler(this.readerIdleTimeSeconds, 0, 0);
         ChannelPipeline pipeline = channel.pipeline();
+
+        // 유휴 상태 감지(read timeout 처리)
         pipeline.addLast("idleStateHandler", idleStateHandler);
-        pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(2048, 2, 2, -2, 0));
+
+        // 패킷 프레이밍: ByteBuf -> ByteBuf (하나의 완전한 패킷, 패킷길이 체크)
+        pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
+                TsiCpuPacket.MAX_FRAME_LENGTH,
+                TsiCpuPacket.LENGTH_FIELD_OFFSET,
+                TsiCpuPacket.LENGTH_FIELD_LENGTH,
+                TsiCpuPacket.LENGTH_ADJUSTMENT,
+                TsiCpuPacket.INITIAL_BYTES_TO_STRIP
+        ));
+
+        // 패킷 디코딩: ByteBuf -> TsiCpuPacket
         pipeline.addLast("packetDecoder", this.cvimServerByteBufMessageDecoder);
+
+        // TsiCpuPacket -> 각종 큐잉 및 상태 변경
+        pipeline.addLast("packetProcessor", this.cvimServerPacketProcessHandler);
+
+        // 채널 상태 이벤트 처리 (Active, Inactive 등)
         pipeline.addLast("packetInboundHandler", this.cvimServerInboundMessageHandler);
+
+        // Outbound 인코더(무선신호시스템에서는 전송하는 프로토콜 없음, 사용하지는 않음)
         pipeline.addLast("frameEncoder", this.cvimServerEncoder);
     }
 }

+ 144 - 0
tsi-comm-server/src/main/java/com/tsi/comm/server/tcp/service/ConnectionLifecycleService.java

@@ -0,0 +1,144 @@
+package com.tsi.comm.server.tcp.service;
+
+import com.tsi.comm.server.config.TsiCvimServerConfig;
+import com.tsi.comm.server.process.dbms.TsiCvimDbmsProcess;
+import com.tsi.comm.server.process.packet.TsiChannelSession;
+import com.tsi.comm.server.process.packet.TsiCvimPacketProcess;
+import com.tsi.comm.server.protocol.TsiCpuDisconnected;
+import com.tsi.comm.server.repository.TsiAlarmManager;
+import com.tsi.comm.server.repository.TsiNodeManager;
+import com.tsi.comm.server.repository.TsiSessionManager;
+import com.tsi.comm.server.service.TsiQueueDistributorService;
+import com.tsi.comm.server.vo.TsiAlarmConfigVo;
+import com.tsi.comm.server.vo.TsiNodeVo;
+import com.tsi.comm.server.vo.mariadb.AbstractDbmsVo;
+import com.tsi.comm.server.vo.mariadb.AlarmOccrVo;
+import com.tsi.comm.server.vo.mariadb.NodeStatusVo;
+import com.tsi.comm.server.xnet.NettyUtils;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class ConnectionLifecycleService {
+
+    private final TsiCvimServerConfig config;
+    private final TsiNodeManager nodeManager;
+    private final TsiSessionManager sessionManager;
+    private final TsiAlarmManager alarmManager;
+    private final TsiCvimPacketProcess packetProcess;
+    private final TsiCvimDbmsProcess dbmsProcess;
+    private final TsiQueueDistributorService queueDistributorService;
+
+    /**
+     * 채널 활성화 이벤트를 처리.
+     */
+    public void handleChannelActive(ChannelHandlerContext ctx) {
+        int sessions = this.sessionManager.add();
+        final String remoteIpAddr = NettyUtils.getRemoteIpAddress(ctx.channel());
+        log.info("--channelActive: {}, {} Sessions.", remoteIpAddr, sessions);
+
+        TsiNodeVo ipNodeVo = this.nodeManager.getIpAddr(remoteIpAddr);
+        if (ipNodeVo == null) {
+            this.alarmManager.reportUnknownIp(remoteIpAddr);
+        } else {
+            TsiChannelSession.sessionActive(ipNodeVo, remoteIpAddr);
+        }
+    }
+
+    /**
+     * 채널 비활성화 이벤트를 처리.
+     */
+    public void handleChannelInactive(ChannelHandlerContext ctx) {
+        final String remoteIpAddr = NettyUtils.getRemoteIpAddress(ctx.channel());
+        int sessions = this.sessionManager.remove();
+        TsiNodeVo nodeVo = this.sessionManager.getNodeVo(ctx.channel());
+
+        if (nodeVo != null) {
+            // DB 상태 업데이트 큐잉
+            NodeStatusVo status = new NodeStatusVo(AbstractDbmsVo.DBMS_NODE_STATUS);
+            status.setServerId(this.config.getServerId());
+            status.setNodeId(nodeVo.getNodeId());
+            status.setStatus(0);
+            status.setIpAddr(remoteIpAddr);
+            this.dbmsProcess.add(status, (int)Thread.currentThread().getId());
+
+            // Disconnected 패킷 큐잉
+            if (this.sessionManager.isServerRun()) {
+                TsiCpuDisconnected packet = new TsiCpuDisconnected(nodeVo.getNodeId(), ctx.channel());
+                packet.setBuf(null);
+                packet.setObj(nodeVo);
+                this.packetProcess.add(packet, nodeVo.getPktQIdx());    // 패킷인덱스큐로 데이터 전송
+            }
+
+            TsiChannelSession.sessionInactive(nodeVo, remoteIpAddr);
+
+            // 리소스 정리
+            this.sessionManager.removeChannel(ctx.channel());
+            ctx.channel().attr(TsiSessionManager.TSI_NODE_ATTRIBUTE_KEY).set(null);
+            this.queueDistributorService.releaseQueue(nodeVo);  // 작업큐를 할당을 해제
+
+            // --- 기존의 synchronized 블록 전체를 아래 코드로 대체 ---
+            // TsiNodeVo 객체가 스스로 상태를 확인하고 안전하게 변경하도록 책임을 위임합니다.
+            if (nodeVo.disconnectChannel(ctx.channel())) {
+                // 현재 비활성화되는 채널이 nodeVo에 등록된 활성 채널과 동일한 경우 (정상적인 연결 종료)
+                log.info("channelInactive: {}, {}, sessions: {}", nodeVo.getNodeId(), remoteIpAddr, sessions);
+            } else {
+                // 새로운 연결에 의해 강제로 종료됨(즉, 이전 연결된 채널이 종료되고 신규채널로 대체되는 상태)
+                log.warn("channelInactive: {}, {}, sessions: {}, Duplicate Connected.", nodeVo.getNodeId(), remoteIpAddr, sessions);
+            }
+        } else {
+            log.info("channelInactive: {}, sessions: {}, Unknown node.", remoteIpAddr, sessions);
+        }
+    }
+
+    /**
+     * 유휴 상태 이벤트를 처리.
+     */
+    public void handleIdleStateEvent(ChannelHandlerContext ctx, IdleStateEvent evt) {
+        if (evt.state() == IdleState.READER_IDLE) {
+            TsiNodeVo nodeVo = this.sessionManager.getNodeVo(ctx.channel());
+            String remoteIpAddr = NettyUtils.getRemoteIpAddress(ctx.channel());
+            if (nodeVo == null) {
+                log.warn("userEventTriggered: READER_IDLE: {}", remoteIpAddr);
+                // 통신 접속 후 수신 데이터가 없이 READ 타임아웃이 발생한 경우임
+                if (this.alarmManager.checkAlarm(TsiAlarmConfigVo.COMM_02)) {
+                    AlarmOccrVo alarm = new AlarmOccrVo(AbstractDbmsVo.DBMS_ALARM_OCCR_HS);
+                    alarm.setAlarmCode(TsiAlarmConfigVo.COMM_02);
+                    alarm.setAlarmTarget(remoteIpAddr);
+                    alarm.setAlarmValue(remoteIpAddr);
+                    this.dbmsProcess.add(alarm, (int) Thread.currentThread().getId());
+                }
+            } else {
+                log.warn("userEventTriggered: READER_IDLE: {}, {}", remoteIpAddr, nodeVo.getNodeId());
+                // 통신 접속 후 데이터를 한번이라도 수신한 경우에는 스케쥴러에서 처리한다.
+                if (this.alarmManager.checkAlarm(TsiAlarmConfigVo.COMM_02)) {
+                    AlarmOccrVo alarm = new AlarmOccrVo(AbstractDbmsVo.DBMS_ALARM_OCCR_HS);
+                    alarm.setAlarmCode(TsiAlarmConfigVo.COMM_02);
+                    alarm.setAlarmTarget(nodeVo.getKey());
+                    alarm.setAlarmValue(remoteIpAddr);
+                    this.dbmsProcess.add(alarm, (int) Thread.currentThread().getId());
+                }
+                TsiChannelSession.sessionTimeout(nodeVo, remoteIpAddr);
+            }
+            ctx.channel().close();
+        } else if (evt.state() == IdleState.WRITER_IDLE) {
+            log.error("userEventTriggered: WRITER_IDLE: {}", NettyUtils.getAddress(ctx.channel()));
+        } else if (evt.state() == IdleState.ALL_IDLE) {
+            log.error("userEventTriggered: ALL_IDLE: {}", NettyUtils.getAddress(ctx.channel()));
+        }
+    }
+
+    public void handleExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        TsiNodeVo nodeVo = this.sessionManager.getNodeVo(ctx.channel());
+        if (nodeVo != null) {
+            log.error("exceptionCaught: {}, {}", nodeVo.getNodeId(), ctx.channel());
+        }
+        log.error("exceptionCaught: {}", NettyUtils.getAddress(ctx.channel()));
+    }
+}

+ 33 - 3
tsi-comm-server/src/main/java/com/tsi/comm/server/vo/TsiNodeVo.java

@@ -2,6 +2,7 @@ package com.tsi.comm.server.vo;
 
 import com.tsi.comm.server.dto.TsiCvimDto;
 import com.tsi.comm.server.protocol.TsiCpuPacket;
+import com.tsi.comm.server.repository.TsiAlarmManager;
 import com.tsi.common.utils.Counter;
 import io.netty.channel.Channel;
 import lombok.Getter;
@@ -84,18 +85,47 @@ public class TsiNodeVo {
         return (this.channel != null);
     }
 
-    public void setConnect(Channel channel) {
+
+    /**
+     * 채널을 설정을 동기화를 처리하여 스레드 안전성을 보장.
+     */
+    public synchronized void setConnect(Channel channel) {
         this.channel = channel;
         if (channel != null) {
             this.connectCount.increment();
             this.connectTm = System.currentTimeMillis();
-        }
-        else {
+        } else {
             this.disconnectCount.increment();
             this.disconnectTm = System.currentTimeMillis();
         }
     }
 
+    /**
+     * 중복 연결을 확인하고, 필요한 경우 이전 채널을 닫은 후, 새로운 채널을 설정하
+     */
+    public synchronized void connectChannel(Channel newChannel, TsiAlarmManager alarmManager, String remoteIpAddress) {
+        Channel oldChannel = this.channel;
+
+        if (oldChannel != null && oldChannel.isActive() && oldChannel != newChannel) {
+            // 중복 연결 로깅 및 이전 채널 닫기
+            alarmManager.alarmDupConnect(this.getNodeId(), this.getIpAddr(), remoteIpAddress);
+            oldChannel.close();
+        }
+
+        setConnect(newChannel);
+    }
+
+    /**
+     * 현재 비활성화되는 채널이 자신에게 등록된 활성 채널과 동일한 경우에만 연결을 끊도록 함.
+     */
+    public synchronized boolean disconnectChannel(Channel channelToDisconnect) {
+        if (this.channel == channelToDisconnect) {
+            setConnect(null);
+            return true;
+        }
+        return false;
+    }
+
     public String getKey() {
         return String.valueOf(this.nodeId);
     }

+ 5 - 23
tsi-comm-server/src/main/java/com/tsi/comm/server/xnet/NettyServerConfig.java

@@ -18,15 +18,7 @@ public abstract class NettyServerConfig {
     protected int connectTimeoutSeconds = 0;
 
     protected void configure() {
-/*
-        int MAX_CORE = Runtime.getRuntime().availableProcessors();
-        if (MAX_CORE < 8) {
-            MAX_CORE = 8;
-        }
-        int totalThreads = MAX_CORE * 2;
-*/
-        final int DEFAULT_EVENT_THREADS  = Runtime.getRuntime().availableProcessors() * 2;
-
+        final int NETTY_DEFAULT_EVENT_THREADS  = Runtime.getRuntime().availableProcessors() * 2;  // Netty Default == 0
 
         if (this.bindingAddr.isEmpty()) {
             this.bindingAddr = "0.0.0.0";
@@ -34,23 +26,13 @@ public abstract class NettyServerConfig {
         if (this.backlog == 0) {
             this.backlog = 1024;
         }
-        /*if (this.acceptThreads == 0) {
-            if (MAX_CORE <= 8) {
-                this.acceptThreads = 4;
-            } else if (MAX_CORE >= 32) {
-                this.acceptThreads = 12;
-            } else {
-                this.acceptThreads = 6;
-            }
-        }
-        if (this.workerThreads == 0) {
-            this.workerThreads = totalThreads - this.acceptThreads;
-        }*/
+
         if (this.acceptThreads == 0) {
-            this.acceptThreads = DEFAULT_EVENT_THREADS;
+//            this.acceptThreads = NETTY_DEFAULT_EVENT_THREADS;
+            this.acceptThreads = 2;
         }
         if (this.workerThreads == 0) {
-            this.workerThreads = DEFAULT_EVENT_THREADS;
+            this.workerThreads = NETTY_DEFAULT_EVENT_THREADS;
         }
 
         if (this.rcvBuf == 0) {

+ 10 - 0
tsi-comm-server/src/main/java/com/tsi/comm/server/xnet/ServerBootstrapFactory.java

@@ -2,7 +2,9 @@ package com.tsi.comm.server.xnet;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.*;
+import lombok.extern.slf4j.Slf4j;
 
+@Slf4j
 public class ServerBootstrapFactory {
 
     public static ServerBootstrap createBootstrap(NettyServerConfig config, ChannelInitializer<Channel> channelInitializer) throws Exception {
@@ -10,6 +12,14 @@ public class ServerBootstrapFactory {
         EventLoopGroup acceptGroups;
         EventLoopGroup workerGroups;
 
+        log.info("[createBootstrap] ==================================");
+        log.info("[createBootstrap] -         acceptThreads: {}", config.getAcceptThreads());
+        log.info("[createBootstrap] -         workerThreads: {}", config.getWorkerThreads());
+        log.info("[createBootstrap] -               backlog: {}", config.getBacklog());
+        log.info("[createBootstrap] -                rcvBuf: {}", config.getRcvBuf());
+        log.info("[createBootstrap] - connectTimeoutSeconds: {}", config.getConnectTimeoutSeconds());
+        log.info("[createBootstrap] ==================================");
+
         acceptGroups = NettyUtils.newEventLoopGroup(config.getAcceptThreads(), "Accept");
         workerGroups = NettyUtils.newEventLoopGroup(config.getWorkerThreads(), "Worker");
         serverBootstrap.channel(NettyUtils.getServerSocketChannel());

+ 3 - 2
tsi-comm-server/src/main/resources/application.yml

@@ -27,14 +27,15 @@ spring:
       database: ssip
 
 management:
+  endpoint:
+    health:
+      show-details: always
   endpoints:
     prometheus:
       enabled: true
     web:
       exposure:
         include: "*"
-    health:
-      show-details: "always"
   security:
     enabled: false