123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- package com.tsi.comm.server.tcp.codec;
- import com.tsi.app.common.utils.SysUtils;
- import com.tsi.app.common.utils.TimeUtils;
- import com.tsi.app.common.xnet.NettyUtils;
- import com.tsi.comm.server.config.TsiCvimServerConfig;
- import com.tsi.comm.server.mybatis.vo.AbstractDbmsVo;
- import com.tsi.comm.server.mybatis.vo.AlarmOccrVo;
- import com.tsi.comm.server.mybatis.vo.NodeIpAddrVo;
- import com.tsi.comm.server.mybatis.vo.NodeStatusVo;
- import com.tsi.comm.server.process.dbms.TsiCvimDbmsProcess;
- import com.tsi.comm.server.process.packet.TsiChannelSession;
- import com.tsi.comm.server.process.packet.TsiCvimPacketProcess;
- import com.tsi.comm.server.protocol.TsiCpuPacket;
- import com.tsi.comm.server.repository.TsiAlarmManager;
- import com.tsi.comm.server.repository.TsiNodeManager;
- import com.tsi.comm.server.repository.TsiSessionManager;
- import com.tsi.comm.server.repository.TsiTpmsManager;
- import com.tsi.comm.server.vo.TsiAlarmConfigVo;
- import com.tsi.comm.server.vo.TsiNodeVo;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelHandler;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.MessageToMessageDecoder;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Component;
- import java.util.List;
- @Slf4j
- @Component
- @RequiredArgsConstructor
- @ChannelHandler.Sharable
- public class CvimServerByteBufMessageDecoder extends MessageToMessageDecoder<ByteBuf> {
- private final TsiCvimServerConfig config;
- private final TsiNodeManager nodeManager;
- private final TsiAlarmManager alarmManager;
- private final TsiTpmsManager tpmsManager;
- private final TsiSessionManager sessionManager;
- private final TsiCvimPacketProcess packetProcess;
- private final TsiCvimDbmsProcess dbmsProcess;
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) {
- try {
- long msec = TimeUtils.currentTimeSeconds();
- long nsec = System.nanoTime();
- if (byteBuf == null) {
- log.error("Receive: Packet frame packet error. length field data error");
- ctx.close();
- return;
- }
- final int readableBytes = byteBuf.readableBytes();
- if (readableBytes < TsiCpuPacket.SIZE_HEAD) {
- log.error("Receive: Packet readableBytes too small: {}, required min size: {}", readableBytes, TsiCpuPacket.SIZE_HEAD);
- ctx.close();
- return;
- }
- final String remoteIpAddress = NettyUtils.getRemoteIpAddress(ctx.channel());
- long nodeId = byteBuf.getUnsignedInt(6);
- TsiNodeVo nodeVo = ctx.channel().attr(TsiSessionManager.TSI_NODE_ATTRIBUTE_KEY).get();
- if (nodeVo == null) {
- // not found node information from channel map
- nodeVo = attachNodeVo(nodeId, remoteIpAddress, ctx.channel());
- if (nodeVo == null) {
- // node id 가 0 이하로 설정되어 있거나, 노드 정보가 등록되어 있지 않은 경우
- if (nodeId <= 0) {
- byte[] buf = new byte[readableBytes];
- byteBuf.readBytes(buf);
- this.alarmManager.loggingUnknownNodePacket(nodeId, ctx.channel(), buf);
- }
- ctx.disconnect();
- ctx.close();
- return;
- }
- // first connection, save node information to channel attribute map
- ctx.channel().attr(TsiSessionManager.TSI_NODE_ATTRIBUTE_KEY).set(nodeVo);
- NodeStatusVo status = new NodeStatusVo(AbstractDbmsVo.DBMS_NODE_STATUS);
- status.setServerId(this.config.getServerId());
- status.setNodeId(nodeId);
- status.setStatus(1);
- status.setIpAddr(NettyUtils.getRemoteIpAddress(ctx.channel()));
- if (this.sessionManager.isServerRun()) {
- this.dbmsProcess.add(status, (int)Thread.currentThread().getId());
- }
- else {
- log.error("Node Login but server not running: {}", status);
- }
- }
- nodeVo.setLastCommTm(System.currentTimeMillis()); // 통신 수신시각 저장
- if (!remoteIpAddress.equals(nodeVo.getIpAddr())) {
- // IP 주소가 변경된 경우
- nodeVo.setIpAddr(remoteIpAddress);
- NodeIpAddrVo nodeIpAddrVo = new NodeIpAddrVo(AbstractDbmsVo.DBMS_NODE_IP_UPDATE);
- nodeIpAddrVo.setNodeId(nodeVo.getNodeId());
- nodeIpAddrVo.setIpAddr(remoteIpAddress);
- this.dbmsProcess.add(nodeIpAddrVo, (int)Thread.currentThread().getId());
- this.nodeManager.putIpAddr(remoteIpAddress, nodeVo); // IP 주소 갱신
- }
- TsiCpuPacket packet = new TsiCpuPacket(nodeId, msec, nsec, ctx.channel());
- packet.setBuf(new byte[readableBytes]);
- byteBuf.readBytes(packet.getBuf());
- packet.setObj(nodeVo); // TsiNodeVo 객체를 저장
- // 작업큐로 데이터 전송
- this.packetProcess.add(packet, nodeVo.getIdx());
- // 작업큐 데이터 전송 시각 셋
- packet.setAdd(System.nanoTime());
- // 패킷 통계정보 생성
- this.tpmsManager.readPacket(packet);
- if (nodeVo.isDump()) {
- log.info("RECV: {}, {}", nodeVo.getNodeId(), SysUtils.byteArrayToHex(packet.getBuf()));
- }
- }
- catch (Exception e) {
- log.error("decode Exception: {}", e.getMessage());
- }
- }
- private TsiNodeVo attachNodeVo(long nodeId, String remoteIpAddress, Channel channel) {
- long start = System.nanoTime();
- // 노드맵에서 노드 정보를 찾는다.
- TsiNodeVo nodeVo = this.nodeManager.get(nodeId);
- if (nodeVo == null) {
- // 메모리맵에 노드정보가 존재하지 않은 경우 로깅
- this.alarmManager.loggingUnknownNode(nodeId, channel);
- if (nodeId <= 0) {
- return null; // 잘못된 노드 ID인 경우 null 반환
- }
- // 등록되어 있지 않은 노드 ID로 접속한 경우(노드 ID가 0 이하인 경우 제외)
- // 신규로 해당 노드정보 메모리를 생성해서 메모리 맵에 추가한다.
- nodeVo = new TsiNodeVo(nodeId, remoteIpAddress, false, true, true);
- nodeVo.setCheckInstalled(true);
- nodeVo.setRegistered(false);
- this.nodeManager.put(nodeId, nodeVo);
- this.nodeManager.putIpAddr(remoteIpAddress, nodeVo); // IP 주소 갱신
- // oops 알람 설정 되어 있으면 알람테이블에 저장하자.
- if (this.alarmManager.checkAlarm(TsiAlarmConfigVo.COMM_01)) {
- AlarmOccrVo alarm = new AlarmOccrVo(AbstractDbmsVo.DBMS_ALARM_OCCR_HS);
- alarm.setAlarmCode(TsiAlarmConfigVo.COMM_01);
- alarm.setAlarmTarget(Long.toString(nodeId));
- alarm.setAlarmValue(remoteIpAddress);
- this.dbmsProcess.add(alarm, (int)Thread.currentThread().getId());
- }
- }
- // 채널 목록에 접속 정보 등록
- nodeVo.setConnect(channel);
- // 채널 맵에 채널에 대한 노드정보를 저장한다.
- this.sessionManager.addChannel(channel, nodeVo);
- TsiChannelSession.objectRegistered(nodeVo, remoteIpAddress);
- // log.info("Node: {}, Object Register: {}, {}", nodeId, TimeUtils.elapsedTime(start), Thread.currentThread().getId());
- return nodeVo;
- }
- }
|