package com.tsi.comm.server.tcp.codec; import com.tsi.app.common.utils.SysUtils; import com.tsi.app.common.utils.TimeUtils; import com.tsi.app.common.xnet.NettyUtils; import com.tsi.comm.server.config.TsiCvimServerConfig; import com.tsi.comm.server.mybatis.vo.AbstractDbmsVo; import com.tsi.comm.server.mybatis.vo.AlarmOccrVo; import com.tsi.comm.server.mybatis.vo.NodeIpAddrVo; import com.tsi.comm.server.mybatis.vo.NodeStatusVo; import com.tsi.comm.server.process.dbms.TsiCvimDbmsProcess; import com.tsi.comm.server.process.packet.TsiChannelSession; import com.tsi.comm.server.process.packet.TsiCvimPacketProcess; import com.tsi.comm.server.protocol.TsiCpuPacket; import com.tsi.comm.server.repository.TsiAlarmManager; import com.tsi.comm.server.repository.TsiNodeManager; import com.tsi.comm.server.repository.TsiSessionManager; import com.tsi.comm.server.repository.TsiTpmsManager; import com.tsi.comm.server.vo.TsiAlarmConfigVo; import com.tsi.comm.server.vo.TsiNodeVo; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.util.List; @Slf4j @Component @RequiredArgsConstructor @ChannelHandler.Sharable public class CvimServerByteBufMessageDecoder extends MessageToMessageDecoder { private final TsiCvimServerConfig config; private final TsiNodeManager nodeManager; private final TsiAlarmManager alarmManager; private final TsiTpmsManager tpmsManager; private final TsiSessionManager sessionManager; private final TsiCvimPacketProcess packetProcess; private final TsiCvimDbmsProcess dbmsProcess; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List list) { try { long msec = TimeUtils.currentTimeSeconds(); long nsec = System.nanoTime(); if (byteBuf == null) { log.error("Receive: Packet frame packet error. length field data error"); ctx.close(); return; } final int readableBytes = byteBuf.readableBytes(); if (readableBytes < TsiCpuPacket.SIZE_HEAD) { log.error("Receive: Packet readableBytes too small: {}, required min size: {}", readableBytes, TsiCpuPacket.SIZE_HEAD); ctx.close(); return; } final String remoteIpAddress = NettyUtils.getRemoteIpAddress(ctx.channel()); long nodeId = byteBuf.getUnsignedInt(6); TsiNodeVo nodeVo = ctx.channel().attr(TsiSessionManager.TSI_NODE_ATTRIBUTE_KEY).get(); if (nodeVo == null) { // not found node information from channel map nodeVo = attachNodeVo(nodeId, remoteIpAddress, ctx.channel()); if (nodeVo == null) { // node id 가 0 이하로 설정되어 있거나, 노드 정보가 등록되어 있지 않은 경우 if (nodeId <= 0) { byte[] buf = new byte[readableBytes]; byteBuf.readBytes(buf); this.alarmManager.loggingUnknownNodePacket(nodeId, ctx.channel(), buf); } ctx.disconnect(); ctx.close(); return; } // first connection, save node information to channel attribute map ctx.channel().attr(TsiSessionManager.TSI_NODE_ATTRIBUTE_KEY).set(nodeVo); NodeStatusVo status = new NodeStatusVo(AbstractDbmsVo.DBMS_NODE_STATUS); status.setServerId(this.config.getServerId()); status.setNodeId(nodeId); status.setStatus(1); status.setIpAddr(NettyUtils.getRemoteIpAddress(ctx.channel())); if (this.sessionManager.isServerRun()) { this.dbmsProcess.add(status, (int)Thread.currentThread().getId()); } else { log.error("Node Login but server not running: {}", status); } } nodeVo.setLastCommTm(System.currentTimeMillis()); // 통신 수신시각 저장 if (!remoteIpAddress.equals(nodeVo.getIpAddr())) { // IP 주소가 변경된 경우 nodeVo.setIpAddr(remoteIpAddress); NodeIpAddrVo nodeIpAddrVo = new NodeIpAddrVo(AbstractDbmsVo.DBMS_NODE_IP_UPDATE); nodeIpAddrVo.setNodeId(nodeVo.getNodeId()); nodeIpAddrVo.setIpAddr(remoteIpAddress); this.dbmsProcess.add(nodeIpAddrVo, (int)Thread.currentThread().getId()); this.nodeManager.putIpAddr(remoteIpAddress, nodeVo); // IP 주소 갱신 } TsiCpuPacket packet = new TsiCpuPacket(nodeId, msec, nsec, ctx.channel()); packet.setBuf(new byte[readableBytes]); byteBuf.readBytes(packet.getBuf()); packet.setObj(nodeVo); // TsiNodeVo 객체를 저장 // 작업큐로 데이터 전송 this.packetProcess.add(packet, nodeVo.getIdx()); // 작업큐 데이터 전송 시각 셋 packet.setAdd(System.nanoTime()); // 패킷 통계정보 생성 this.tpmsManager.readPacket(packet); if (nodeVo.isDump()) { log.info("RECV: {}, {}", nodeVo.getNodeId(), SysUtils.byteArrayToHex(packet.getBuf())); } } catch (Exception e) { log.error("decode Exception: {}", e.getMessage()); } } private TsiNodeVo attachNodeVo(long nodeId, String remoteIpAddress, Channel channel) { long start = System.nanoTime(); // 노드맵에서 노드 정보를 찾는다. TsiNodeVo nodeVo = this.nodeManager.get(nodeId); if (nodeVo == null) { // 메모리맵에 노드정보가 존재하지 않은 경우 로깅 this.alarmManager.loggingUnknownNode(nodeId, channel); if (nodeId <= 0) { return null; // 잘못된 노드 ID인 경우 null 반환 } // 등록되어 있지 않은 노드 ID로 접속한 경우(노드 ID가 0 이하인 경우 제외) // 신규로 해당 노드정보 메모리를 생성해서 메모리 맵에 추가한다. nodeVo = new TsiNodeVo(nodeId, remoteIpAddress, false, true, true); nodeVo.setCheckInstalled(true); nodeVo.setRegistered(false); this.nodeManager.put(nodeId, nodeVo); this.nodeManager.putIpAddr(remoteIpAddress, nodeVo); // IP 주소 갱신 // oops 알람 설정 되어 있으면 알람테이블에 저장하자. if (this.alarmManager.checkAlarm(TsiAlarmConfigVo.COMM_01)) { AlarmOccrVo alarm = new AlarmOccrVo(AbstractDbmsVo.DBMS_ALARM_OCCR_HS); alarm.setAlarmCode(TsiAlarmConfigVo.COMM_01); alarm.setAlarmTarget(Long.toString(nodeId)); alarm.setAlarmValue(remoteIpAddress); this.dbmsProcess.add(alarm, (int)Thread.currentThread().getId()); } } // 채널 목록에 접속 정보 등록 nodeVo.setConnect(channel); // 채널 맵에 채널에 대한 노드정보를 저장한다. this.sessionManager.addChannel(channel, nodeVo); TsiChannelSession.objectRegistered(nodeVo, remoteIpAddress); // log.info("Node: {}, Object Register: {}, {}", nodeId, TimeUtils.elapsedTime(start), Thread.currentThread().getId()); return nodeVo; } }