Bläddra i källkod

center communication add

shjung 3 år sedan
förälder
incheckning
f5d06b9b1e

+ 27 - 7
src/main/java/com/its/app/utils/SysUtils.java

@@ -205,21 +205,41 @@ public final class SysUtils
 		if (byteOrder == ByteOrder.BIG_ENDIAN) {
 			return (
 					((bytes[fromIdx+0] & 0xFF) << 24) |
-							((bytes[fromIdx+1] & 0xFF) << 16) |
-							((bytes[fromIdx+2] & 0xFF) << 8 ) |
-							((bytes[fromIdx+3] & 0xFF) << 0 )
+					((bytes[fromIdx+1] & 0xFF) << 16) |
+					((bytes[fromIdx+2] & 0xFF) << 8 ) |
+					((bytes[fromIdx+3] & 0xFF) << 0 )
 			);
 		}
 		return (
 				((bytes[fromIdx+3] & 0xFF) << 24) |
-						((bytes[fromIdx+2] & 0xFF) << 16) |
-						((bytes[fromIdx+1] & 0xFF) << 8 ) |
-						((bytes[fromIdx+0] & 0xFF) << 0 )
+				((bytes[fromIdx+2] & 0xFF) << 16) |
+				((bytes[fromIdx+1] & 0xFF) << 8 ) |
+				((bytes[fromIdx+0] & 0xFF) << 0 )
 		);
 	}
-
 	public static byte[] intToBytes(int value) {
 		// BIG_ENDIAN
 		return  ByteBuffer.allocate(4).putInt(value).array();
 	}
+
+	public static int bytesToShort(byte[] bytes) {
+		return ByteBuffer.wrap(bytes).getShort();
+	}
+	public static int bytesToShort(byte[] bytes, int fromIdx, ByteOrder byteOrder) {
+
+		if (byteOrder == ByteOrder.BIG_ENDIAN) {
+			return (
+					((bytes[fromIdx+0] & 0xFF) << 8) |
+					((bytes[fromIdx+1] & 0xFF) << 0 )
+			);
+		}
+		return (
+				((bytes[fromIdx+1] & 0xFF) << 8 ) |
+				((bytes[fromIdx+0] & 0xFF) << 0 )
+		);
+	}
+	public static byte[] shortToBytes(short value) {
+		// BIG_ENDIAN
+		return  ByteBuffer.allocate(2).putShort(value).array();
+	}
 }

+ 8 - 1
src/main/java/com/its/vds/VdsCommServerApplication.java

@@ -5,6 +5,7 @@ import com.its.vds.config.ProcessConfig;
 import com.its.vds.process.DbmsJobProcess;
 import com.its.vds.service.UnitSystService;
 import com.its.vds.service.VdsCtlrService;
+import com.its.vds.xnettcp.center.CenterTcpServerService;
 import com.its.vds.xnettcp.vds.VdsTcpClientCommService;
 import com.its.vds.xnettcp.vds.process.VdsDataProcess;
 import lombok.extern.slf4j.Slf4j;
@@ -78,7 +79,10 @@ public class VdsCommServerApplication implements CommandLineRunner, ApplicationL
         ctlrService.updateCtlrStts(true);
 
         VdsTcpClientCommService vdsCommClientService = (VdsTcpClientCommService)AppUtils.getBean(VdsTcpClientCommService.class);
-        vdsCommClientService.run();
+        //vdsCommClientService.run();
+
+        CenterTcpServerService centerService = (CenterTcpServerService)AppUtils.getBean(CenterTcpServerService.class);
+        centerService.run();
 //
 //        UdpServerCenterComm udpServerCenterComm = (UdpServerCenterComm)AppUtils.getBean(UdpServerCenterComm.class);
 //        udpServerCenterComm.run();
@@ -107,6 +111,9 @@ public class VdsCommServerApplication implements CommandLineRunner, ApplicationL
         VdsCtlrService ctlrService = (VdsCtlrService) AppUtils.getBean(VdsCtlrService.class);
         ctlrService.updateCtlrStts(false);
 
+        CenterTcpServerService centerService = (CenterTcpServerService)AppUtils.getBean(CenterTcpServerService.class);
+        centerService.stop();
+
         VdsTcpClientCommService vdsCommClientService = (VdsTcpClientCommService)AppUtils.getBean(VdsTcpClientCommService.class);
         vdsCommClientService.shutdown();
 

+ 43 - 0
src/main/java/com/its/vds/config/CenterCommConfig.java

@@ -19,9 +19,52 @@ public class CenterCommConfig {
 
     private boolean enable = true;
     private int listenPort = 9901;
+    protected String bindingAddr = "0.0.0.0";
+    protected int backlog = 0;
+    protected int acceptThreads = 0;
+    protected int workerThreads = 0;
+    protected int rcvBuf = 0;
+    protected int sndBuf = 0;
+    protected int readerIdleTimeSeconds = 0;
+    protected int writerIdleTimeSeconds = 0;
+    protected int allIdleTimeSeconds = 0;
+    protected int connectTimeoutSeconds = 0;
 
     @PostConstruct
     private void init() {
+        final int DEFAULT_EVENT_THREADS  = Runtime.getRuntime().availableProcessors() * 2;
+        if (this.bindingAddr.equals("")) {
+            this.bindingAddr = "0.0.0.0";
+        }
+        if (this.backlog == 0) {
+            this.backlog = 1024;
+        }
+        /*if (this.acceptThreads == 0) {
+            if (MAX_CORE <= 8) {
+                this.acceptThreads = 4;
+            } else if (MAX_CORE >= 32) {
+                this.acceptThreads = 12;
+            } else {
+                this.acceptThreads = 6;
+            }
+        }
+        if (this.workerThreads == 0) {
+            this.workerThreads = totalThreads - this.acceptThreads;
+        }*/
+        if (this.acceptThreads == 0) {
+            this.acceptThreads = DEFAULT_EVENT_THREADS;
+        }
+        if (this.workerThreads == 0) {
+            this.workerThreads = DEFAULT_EVENT_THREADS;
+        }
+
+        if (this.rcvBuf == 0) {
+            this.rcvBuf = Short.MAX_VALUE / 2;
+        }
+        if (this.sndBuf == 0) {
+            this.sndBuf = Short.MAX_VALUE / 2;
+        }
+
         log.info("{}", this);
     }
 

+ 42 - 4
src/main/java/com/its/vds/entity/TbVdsCtlr.java

@@ -12,9 +12,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 
 @Slf4j
 @Getter
@@ -62,6 +60,47 @@ public class TbVdsCtlr {
 	private VdsReqTemperature reqTemperature = null;
 	private VdsReqTraffic reqTraffic = null;
 
+	private boolean requestStopImage = false;
+	private List<Channel> requestImageList = new ArrayList<>();
+	private int frameNo;
+	private int cameraNo;
+	private int imageSize;
+	public void addImageSize(int imageSize) {
+		this.imageSize += imageSize;
+	}
+	public void setImageSize(int imageSize) {
+		this.imageSize = imageSize;
+	}
+	public int getImageSize() {
+		return this.imageSize;
+	}
+	public int getFrameNo() {
+		return this.frameNo;
+	}
+	public int getCameraNo() {
+		return this.cameraNo;
+	}
+	public List<Channel> getRequestImageList() {
+		return this.requestImageList;
+	}
+	public void setStopImageRequest(Channel channel, int cameraNo, int frameNo) {
+		if (!this.requestStopImage) {
+			this.requestImageList.clear();
+			this.requestImageList = new ArrayList<>();
+			this.cameraNo = cameraNo;
+			this.frameNo = frameNo;
+			this.imageSize = 0;
+		}
+		this.requestImageList.add(channel);
+		this.requestStopImage = true;
+	}
+	public void setStopImageResponse() {
+		this.requestStopImage = false;
+		this.cameraNo = 0;
+		this.frameNo = 0;
+		this.imageSize = 0;
+	}
+
 	public TbVdsCtlr() {
 		this.vdsDtctMap = Collections.synchronizedMap(new HashMap<String, TbVdsDtct>());
 		this.stts = new TbVdsCtlrStts();
@@ -170,7 +209,6 @@ public class TbVdsCtlr {
 			} else {
 				log.error("SEND_0: [{}], {}: {} sendBytes, Failed. [{}]", this.VDS_CTLR_IP, packetDesc, sendBuff.array().length, this.VDS_CTLR_ID);
 			}
-
 		}
 		return result;
 	}

+ 108 - 0
src/main/java/com/its/vds/xnettcp/center/CenterTcpServerService.java

@@ -0,0 +1,108 @@
+package com.its.vds.xnettcp.center;
+
+import com.its.app.utils.NettyUtils;
+import com.its.app.utils.OS;
+import com.its.vds.config.CenterCommConfig;
+import com.its.vds.xnettcp.center.codec.CenterTcpServerEncoder;
+import com.its.vds.xnettcp.center.handler.CenterTcpServerInboundHandler;
+import com.its.vds.xnettcp.center.initializer.CenterTcpServerInitializer;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@RequiredArgsConstructor
+@Service
+public class CenterTcpServerService {
+
+    private final CenterCommConfig config;
+    private final CenterTcpServerInboundHandler centerTcpServerInboundHandler;
+    private final CenterTcpServerEncoder centerTcpServerEncoder;
+
+    private EventLoopGroup acceptGroups = null;
+    private EventLoopGroup workerGroups = null;
+    private ServerBootstrap serverBootstrap = null;
+    private ChannelFuture channelFuture = null;
+
+    public void run() {
+        if (!OS.isWindows()) {
+            if (!Epoll.isAvailable()) {
+                Epoll.unavailabilityCause().printStackTrace();
+            }
+        }
+        if (NettyUtils.isEpollAvailable()) {
+            log.info("서버가 리눅스 EPOLL 모드에서 실행됩니다.");
+        }
+        else {
+            log.info("서버가 윈도우 NIO 모드에서 실행됩니다.");
+        }
+
+        this.serverBootstrap = createBootstrap();
+
+        log.info("************************************************************************************");
+        log.info("**                   Center Communication Server Information                      **");
+        log.info("**     bindAddress: {}", this.config.getBindingAddr());
+        log.info("**      listenPort: {}", this.config.getListenPort());
+        log.info("**         backlog: {}", this.config.getBacklog());
+        log.info("**   acceptThreads: {}", this.config.getAcceptThreads());
+        log.info("**   workerThreads: {}", this.config.getWorkerThreads());
+        log.info("************************************************************************************");
+
+        try {
+            if (this.config.getBindingAddr().equals("0.0.0.0")) {
+                this.channelFuture = this.serverBootstrap.bind(this.config.getListenPort());
+            }
+            else {
+                this.channelFuture = this.serverBootstrap.bind(this.config.getBindingAddr(), config.getListenPort());
+            }
+        }
+        catch (Exception e) {
+            log.error("start, InterruptedException");
+            this.acceptGroups.shutdownGracefully();
+            this.workerGroups.shutdownGracefully();
+        }
+    }
+
+    public void stop() {
+        try {
+            this.acceptGroups.shutdownGracefully().sync();
+            this.workerGroups.shutdownGracefully().sync();
+            this.channelFuture.channel().closeFuture().sync();
+        } catch (InterruptedException e) {
+            log.error("stop, InterruptedException");
+        }
+    }
+
+    public ServerBootstrap createBootstrap() {
+        final int DEFAULT_EVENT_THREADS  = Runtime.getRuntime().availableProcessors() * 2;
+        ServerBootstrap serverBootstrap = new ServerBootstrap();
+        EventLoopGroup acceptGroups;
+        EventLoopGroup workerGroups;
+
+        acceptGroups = NettyUtils.newEventLoopGroup(config.getAcceptThreads(), "Accept");
+        workerGroups = NettyUtils.newEventLoopGroup(config.getWorkerThreads(), "Worker");
+        serverBootstrap.channel(NettyUtils.getServerSocketChannel());
+        serverBootstrap.group(acceptGroups, workerGroups);
+
+        serverBootstrap.option(ChannelOption.AUTO_READ, true);
+        serverBootstrap.option(ChannelOption.SO_BACKLOG, config.getBacklog());
+        serverBootstrap.option(ChannelOption.SO_RCVBUF, config.getRcvBuf());
+        serverBootstrap.option(ChannelOption.SO_REUSEADDR, true);
+        serverBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeoutSeconds()*1000);
+
+        serverBootstrap.childOption(ChannelOption.SO_LINGER, 0);           // 4way-handshake 비활성
+        serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, false);    // KEEPALIVE 비활성(활성: true)
+        serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, true);     // 소켓 재사용
+        serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);      // Nagle 알고리즘 비활성화
+        serverBootstrap.childHandler((ChannelHandler)new CenterTcpServerInitializer(this.centerTcpServerInboundHandler, this.centerTcpServerEncoder));
+
+        return serverBootstrap;
+    }
+
+}

+ 17 - 0
src/main/java/com/its/vds/xnettcp/center/codec/CenterTcpServerDecoder.java

@@ -0,0 +1,17 @@
+package com.its.vds.xnettcp.center.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+
+@Slf4j
+public class CenterTcpServerDecoder extends ByteToMessageDecoder {
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+
+    }
+}

+ 19 - 0
src/main/java/com/its/vds/xnettcp/center/codec/CenterTcpServerEncoder.java

@@ -0,0 +1,19 @@
+package com.its.vds.xnettcp.center.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+@ChannelHandler.Sharable
+public class CenterTcpServerEncoder extends MessageToByteEncoder<Object> {
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
+
+    }
+}

+ 57 - 0
src/main/java/com/its/vds/xnettcp/center/handler/CenterTcpServerIdleStateHandler.java

@@ -0,0 +1,57 @@
+package com.its.vds.xnettcp.center.handler;
+
+import com.its.app.utils.NettyUtils;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.timeout.IdleStateHandler;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@ChannelHandler.Sharable
+public class CenterTcpServerIdleStateHandler extends IdleStateHandler {
+
+    public CenterTcpServerIdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
+        super(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds);
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        String ipAddress = NettyUtils.getRemoteIpAddress(ctx.channel());
+        int port = NettyUtils.getRemotePort(ctx.channel());
+        log.info("Connect: {}", NettyUtils.getRemoteAddress(ctx.channel()));
+    }
+
+    /*
+     *  클라이언트와의 연결 종료 처리
+     */
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        String ipAddress = NettyUtils.getRemoteIpAddress(ctx.channel());
+        int port = NettyUtils.getRemotePort(ctx.channel());
+        log.info("Disconnect: {}", NettyUtils.getRemoteAddress(ctx.channel()));
+
+        super.channelInactive(ctx);
+    }
+
+    /*
+     *  Remote 에서 연결을 강제로 종료하면 이벤트 발생
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        String ipAddress = NettyUtils.getRemoteIpAddress(ctx.channel());
+        int port = NettyUtils.getRemotePort(ctx.channel());
+        log.info("exceptionCaught: {}", NettyUtils.getRemoteAddress(ctx.channel()));
+        super.exceptionCaught(ctx, cause);
+    }
+
+    /*
+     *  클라이언트와의 연결 종료를 위해 호출하는 함수
+     */
+    public static void disconnectChannel(Channel channel) {
+        String ipAddress = NettyUtils.getRemoteIpAddress(channel);
+        int port = NettyUtils.getRemotePort(channel);
+        log.info("disconnectChannel: {}", NettyUtils.getRemoteAddress(channel));
+    }
+
+}

+ 28 - 0
src/main/java/com/its/vds/xnettcp/center/handler/CenterTcpServerInboundHandler.java

@@ -0,0 +1,28 @@
+package com.its.vds.xnettcp.center.handler;
+
+import com.its.app.utils.NettyUtils;
+import com.its.vds.xnettcp.vds.protocol.VdsResFramePacket;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+@ChannelHandler.Sharable
+public class CenterTcpServerInboundHandler extends ChannelInboundHandlerAdapter {
+
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+
+        String ipAddress = NettyUtils.getRemoteIpAddress(ctx.channel());
+        if (!(msg instanceof VdsResFramePacket)) {
+            log.error("[{}] | Received Data is not VdsResFramePacket Object", NettyUtils.getRemoteIpAddress(ctx.channel()));
+            return;
+        }
+
+//        VdsResFramePacket resFramePacket = (VdsResFramePacket)msg;
+//        this.vdsDataProcess.add(new VdsData(resFramePacket.getVdsObj(), ipAddress, ctx, resFramePacket));
+    }
+
+}

+ 22 - 0
src/main/java/com/its/vds/xnettcp/center/handler/CenterTcpServerOutboundHandler.java

@@ -0,0 +1,22 @@
+package com.its.vds.xnettcp.center.handler;
+
+import io.netty.channel.*;
+import org.springframework.stereotype.Component;
+
+@Component
+@ChannelHandler.Sharable
+public class CenterTcpServerOutboundHandler extends ChannelOutboundHandlerAdapter {
+
+    @Override
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
+        promise.addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture f) {
+                if (!f.isSuccess()) {
+                    f.cause().printStackTrace();
+                    f.channel().close();
+                }
+            }
+        });
+    }
+}

+ 27 - 0
src/main/java/com/its/vds/xnettcp/center/initializer/CenterTcpServerInitializer.java

@@ -0,0 +1,27 @@
+package com.its.vds.xnettcp.center.initializer;
+
+import com.its.vds.xnettcp.center.codec.CenterTcpServerDecoder;
+import com.its.vds.xnettcp.center.codec.CenterTcpServerEncoder;
+import com.its.vds.xnettcp.center.handler.CenterTcpServerIdleStateHandler;
+import com.its.vds.xnettcp.center.handler.CenterTcpServerInboundHandler;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
+public class CenterTcpServerInitializer extends ChannelInitializer<Channel> {
+
+    private final CenterTcpServerInboundHandler centerTcpServerInboundHandler;
+    private final CenterTcpServerEncoder centerTcpServerEncoder;
+
+    @Override
+    protected void initChannel(Channel channel) throws Exception {
+        CenterTcpServerIdleStateHandler idleStateHandler = new CenterTcpServerIdleStateHandler(0, 0, 0);
+        ChannelPipeline pipeline = channel.pipeline();
+        pipeline.addLast("centerServerIdleStateHandler", idleStateHandler);
+        pipeline.addLast("centerServerDecoder", new CenterTcpServerDecoder());
+        pipeline.addLast("centerServerInboundHandler", this.centerTcpServerInboundHandler);
+        pipeline.addLast("centerServerEncoder", this.centerTcpServerEncoder);
+    }
+}

+ 50 - 0
src/main/java/com/its/vds/xnettcp/center/protocol/CenterReqFrameHead.java

@@ -0,0 +1,50 @@
+package com.its.vds.xnettcp.center.protocol;
+
+import com.its.vds.xnettcp.vds.protocol.VdsProtocol;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.nio.ByteBuffer;
+
+@Slf4j
+@Getter
+public class CenterReqFrameHead {
+
+    public static int SIZE = 7;
+    public static int OPCODE_POS = 6;
+
+    private final byte dle = VdsProtocol.vds_DLE;
+    private final byte stx = VdsProtocol.vds_STX;
+
+    private short groupNo;
+    private short controllerNo;
+    private byte  opCode;
+
+    public CenterReqFrameHead() {
+        this.groupNo = 0;
+        this.controllerNo = 0;
+        this.opCode = 0;
+    }
+
+    public CenterReqFrameHead(short groupNo, short controllerNo, byte opCode) {
+        this.groupNo = groupNo;
+        this.controllerNo = controllerNo;
+        this.opCode = opCode;
+    }
+
+    public CenterReqFrameHead(byte[] headBytes) {
+        if (headBytes != null && headBytes.length >= SIZE) {
+            ByteBuffer byteBuffer = ByteBuffer.wrap(headBytes);
+            byteBuffer.order(VdsProtocol.byteOrder);
+            byte dle = byteBuffer.get();
+            byte stx = byteBuffer.get();
+            this.groupNo = byteBuffer.getShort();
+            this.controllerNo = byteBuffer.getShort();
+            this.opCode = byteBuffer.get();
+        }
+        else {
+            log.error("VdsReqFrameHead: create failed, headBytes error");
+        }
+    }
+
+}

+ 84 - 0
src/main/java/com/its/vds/xnettcp/vds/process/Job_Image.java

@@ -1,9 +1,18 @@
 package com.its.vds.xnettcp.vds.process;
 
+import com.its.app.utils.NettyUtils;
+import com.its.vds.entity.TbVdsCtlr;
+import com.its.vds.xnettcp.vds.protocol.VdsProtocol;
+import com.its.vds.xnettcp.vds.protocol.VdsReqImage;
 import com.its.vds.xnettcp.vds.protocol.VdsResFramePacket;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
+import java.nio.ByteBuffer;
+import java.util.List;
+
 @Slf4j
 @RequiredArgsConstructor
 public class Job_Image implements JobProtocol {
@@ -12,7 +21,82 @@ public class Job_Image implements JobProtocol {
 
 	@Override
 	public int parse() {
+		// 정지영상 데이터 수신
+		if (this.packet == null || this.packet.getVdsObj() == null) {
+			log.error("Job_Image, packet or object data null");
+			return 0;
+		}
+
+		byte[] body = this.packet.getBody();
+		if (body == null || body.length < 6) {
+			log.error("Job_Image, [{}]: Data Length Error, {}", this.packet.getObjectInfo(), (body == null) ? 0 : body.length);
+			return 0;
+		}
+
+		TbVdsCtlr obj = this.packet.getVdsObj();
+		ByteBuffer byteBuffer = ByteBuffer.wrap(body);
+		byteBuffer.order(VdsProtocol.byteOrder);
+
+		int total = byteBuffer.getShort();
+		int current = byteBuffer.getShort();
+		int dataLen = byteBuffer.getShort();
+
+		log.info("Job_Image: image data receive: total {}, current {}, dataLen {}", total, current, dataLen);
+
+		if (dataLen > 6) {
+			int imageSize = dataLen - 6;
+			byte[] imageData = new byte[imageSize-6];
+			byteBuffer.get(imageData);
+			obj.addImageSize(imageSize);
+		}
+
+		if (total == current) {
+			log.info("Job_Image, completed image data receive: {}", obj.getImageSize());
+			List<Channel> channels = obj.getRequestImageList();
+
+			byte[] head1 = new byte[10];
+			byte[] head2 = new byte[172];
+			byte[] data  = new byte[33];
+
+			ByteBuffer buffer1 = ByteBuffer.wrap(head1);
+
+			ByteBuffer buffer2 = ByteBuffer.wrap(head2);
+			ByteBuffer buffer3 = ByteBuffer.wrap(data);
+			channels.forEach(channel -> {
+				sendResponse(channel, buffer1);
+			});
+			channels.forEach(channel -> {
+				sendResponse(channel, buffer2);
+			});
+			channels.forEach(channel -> {
+				sendResponse(channel, buffer3);
+			});
+		}
+		else {
+			VdsReqImage reqImage = new VdsReqImage((short)obj.getGROUP_NO(), (short)obj.getVDS_CTLR_LOCAL_NO());
+			reqImage.setCameraNo(obj.getCameraNo());
+			reqImage.setFrameNo(current+1);
+			reqImage.makeCRC();
+			ByteBuffer sendBuffer = reqImage.getByteBuffer();
+			if (!obj.sendData(sendBuffer, 0, "vds_Image")) {
+				log.error("Job_Image, request data failed: total {}, frameNo {}", total, current+1);
+				obj.setStopImageResponse();
+			}
+		}
 		return 0;
 	}
 
+	public boolean sendResponse(Channel channel, ByteBuffer sendBuff) {
+		boolean result = false;
+		if (channel != null) {
+			ChannelFuture f = channel.writeAndFlush(sendBuff);
+			f.awaitUninterruptibly();
+			if (f.isDone() || f.isSuccess()) {
+				result = true;
+			} else {
+				log.error("Job_Image, sendResponse: {} sendBytes, Failed. {}", sendBuff.array().length, NettyUtils.getRemoteAddress(channel));
+			}
+		}
+		return result;
+	}
 }