CvimServerByteBufMessageDecoder.java 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package com.tsi.comm.server.tcp.codec;
  2. import com.tsi.app.common.utils.SysUtils;
  3. import com.tsi.app.common.utils.TimeUtils;
  4. import com.tsi.app.common.xnet.NettyUtils;
  5. import com.tsi.comm.server.config.TsiCvimServerConfig;
  6. import com.tsi.comm.server.mybatis.vo.AbstractDbmsVo;
  7. import com.tsi.comm.server.mybatis.vo.AlarmOccrVo;
  8. import com.tsi.comm.server.mybatis.vo.NodeIpAddrVo;
  9. import com.tsi.comm.server.mybatis.vo.NodeStatusVo;
  10. import com.tsi.comm.server.process.dbms.TsiCvimDbmsProcess;
  11. import com.tsi.comm.server.process.packet.TsiChannelSession;
  12. import com.tsi.comm.server.process.packet.TsiCvimPacketProcess;
  13. import com.tsi.comm.server.protocol.TsiCpuPacket;
  14. import com.tsi.comm.server.repository.TsiAlarmManager;
  15. import com.tsi.comm.server.repository.TsiNodeManager;
  16. import com.tsi.comm.server.repository.TsiSessionManager;
  17. import com.tsi.comm.server.repository.TsiTpmsManager;
  18. import com.tsi.comm.server.vo.TsiAlarmConfigVo;
  19. import com.tsi.comm.server.vo.TsiNodeVo;
  20. import io.netty.buffer.ByteBuf;
  21. import io.netty.channel.Channel;
  22. import io.netty.channel.ChannelHandler;
  23. import io.netty.channel.ChannelHandlerContext;
  24. import io.netty.handler.codec.MessageToMessageDecoder;
  25. import lombok.RequiredArgsConstructor;
  26. import lombok.extern.slf4j.Slf4j;
  27. import org.springframework.stereotype.Component;
  28. import java.util.List;
  29. @Slf4j
  30. @Component
  31. @RequiredArgsConstructor
  32. @ChannelHandler.Sharable
  33. public class CvimServerByteBufMessageDecoder extends MessageToMessageDecoder<ByteBuf> {
  34. private final TsiCvimServerConfig config;
  35. private final TsiNodeManager nodeManager;
  36. private final TsiAlarmManager alarmManager;
  37. private final TsiTpmsManager tpmsManager;
  38. private final TsiSessionManager sessionManager;
  39. private final TsiCvimPacketProcess packetProcess;
  40. private final TsiCvimDbmsProcess dbmsProcess;
  41. @Override
  42. protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) {
  43. try {
  44. long msec = TimeUtils.currentTimeSeconds();
  45. long nsec = System.nanoTime();
  46. if (byteBuf == null) {
  47. log.error("Receive: Packet frame packet error. length field data error");
  48. ctx.close();
  49. return;
  50. }
  51. final int readableBytes = byteBuf.readableBytes();
  52. if (readableBytes < TsiCpuPacket.SIZE_HEAD) {
  53. log.error("Receive: Packet readableBytes too small: {}, required min size: {}", readableBytes, TsiCpuPacket.SIZE_HEAD);
  54. ctx.close();
  55. return;
  56. }
  57. final String remoteIpAddress = NettyUtils.getRemoteIpAddress(ctx.channel());
  58. long nodeId = byteBuf.getUnsignedInt(6);
  59. TsiNodeVo nodeVo = ctx.channel().attr(TsiSessionManager.TSI_NODE_ATTRIBUTE_KEY).get();
  60. if (nodeVo == null) {
  61. // not found node information from channel map
  62. nodeVo = attachNodeVo(nodeId, remoteIpAddress, ctx.channel());
  63. if (nodeVo == null) {
  64. // node id 가 0 이하로 설정되어 있거나, 노드 정보가 등록되어 있지 않은 경우
  65. if (nodeId <= 0) {
  66. byte[] buf = new byte[readableBytes];
  67. byteBuf.readBytes(buf);
  68. this.alarmManager.loggingUnknownNodePacket(nodeId, ctx.channel(), buf);
  69. }
  70. ctx.disconnect();
  71. ctx.close();
  72. return;
  73. }
  74. // first connection, save node information to channel attribute map
  75. ctx.channel().attr(TsiSessionManager.TSI_NODE_ATTRIBUTE_KEY).set(nodeVo);
  76. NodeStatusVo status = new NodeStatusVo(AbstractDbmsVo.DBMS_NODE_STATUS);
  77. status.setServerId(this.config.getServerId());
  78. status.setNodeId(nodeId);
  79. status.setStatus(1);
  80. status.setIpAddr(NettyUtils.getRemoteIpAddress(ctx.channel()));
  81. if (this.sessionManager.isServerRun()) {
  82. this.dbmsProcess.add(status, (int)Thread.currentThread().getId());
  83. }
  84. else {
  85. log.error("Node Login but server not running: {}", status);
  86. }
  87. }
  88. nodeVo.setLastCommTm(System.currentTimeMillis()); // 통신 수신시각 저장
  89. if (!remoteIpAddress.equals(nodeVo.getIpAddr())) {
  90. // IP 주소가 변경된 경우
  91. nodeVo.setIpAddr(remoteIpAddress);
  92. NodeIpAddrVo nodeIpAddrVo = new NodeIpAddrVo(AbstractDbmsVo.DBMS_NODE_IP_UPDATE);
  93. nodeIpAddrVo.setNodeId(nodeVo.getNodeId());
  94. nodeIpAddrVo.setIpAddr(remoteIpAddress);
  95. this.dbmsProcess.add(nodeIpAddrVo, (int)Thread.currentThread().getId());
  96. this.nodeManager.putIpAddr(remoteIpAddress, nodeVo); // IP 주소 갱신
  97. }
  98. TsiCpuPacket packet = new TsiCpuPacket(nodeId, msec, nsec, ctx.channel());
  99. packet.setBuf(new byte[readableBytes]);
  100. byteBuf.readBytes(packet.getBuf());
  101. packet.setObj(nodeVo); // TsiNodeVo 객체를 저장
  102. // 작업큐로 데이터 전송
  103. this.packetProcess.add(packet, nodeVo.getIdx());
  104. // 작업큐 데이터 전송 시각 셋
  105. packet.setAdd(System.nanoTime());
  106. // 패킷 통계정보 생성
  107. this.tpmsManager.readPacket(packet);
  108. if (nodeVo.isDump()) {
  109. log.info("RECV: {}, {}", nodeVo.getNodeId(), SysUtils.byteArrayToHex(packet.getBuf()));
  110. }
  111. }
  112. catch (Exception e) {
  113. log.error("decode Exception: {}", e.getMessage());
  114. }
  115. }
  116. private TsiNodeVo attachNodeVo(long nodeId, String remoteIpAddress, Channel channel) {
  117. long start = System.nanoTime();
  118. // 노드맵에서 노드 정보를 찾는다.
  119. TsiNodeVo nodeVo = this.nodeManager.get(nodeId);
  120. if (nodeVo == null) {
  121. // 메모리맵에 노드정보가 존재하지 않은 경우 로깅
  122. this.alarmManager.loggingUnknownNode(nodeId, channel);
  123. if (nodeId <= 0) {
  124. return null; // 잘못된 노드 ID인 경우 null 반환
  125. }
  126. // 등록되어 있지 않은 노드 ID로 접속한 경우(노드 ID가 0 이하인 경우 제외)
  127. // 신규로 해당 노드정보 메모리를 생성해서 메모리 맵에 추가한다.
  128. nodeVo = new TsiNodeVo(nodeId, remoteIpAddress, false, true, true);
  129. nodeVo.setCheckInstalled(true);
  130. nodeVo.setRegistered(false);
  131. this.nodeManager.put(nodeId, nodeVo);
  132. this.nodeManager.putIpAddr(remoteIpAddress, nodeVo); // IP 주소 갱신
  133. // oops 알람 설정 되어 있으면 알람테이블에 저장하자.
  134. if (this.alarmManager.checkAlarm(TsiAlarmConfigVo.COMM_01)) {
  135. AlarmOccrVo alarm = new AlarmOccrVo(AbstractDbmsVo.DBMS_ALARM_OCCR_HS);
  136. alarm.setAlarmCode(TsiAlarmConfigVo.COMM_01);
  137. alarm.setAlarmTarget(Long.toString(nodeId));
  138. alarm.setAlarmValue(remoteIpAddress);
  139. this.dbmsProcess.add(alarm, (int)Thread.currentThread().getId());
  140. }
  141. }
  142. // 채널 목록에 접속 정보 등록
  143. nodeVo.setConnect(channel);
  144. // 채널 맵에 채널에 대한 노드정보를 저장한다.
  145. this.sessionManager.addChannel(channel, nodeVo);
  146. TsiChannelSession.objectRegistered(nodeVo, remoteIpAddress);
  147. // log.info("Node: {}, Object Register: {}, {}", nodeId, TimeUtils.elapsedTime(start), Thread.currentThread().getId());
  148. return nodeVo;
  149. }
  150. }