|
|
@@ -0,0 +1,161 @@
|
|
|
+package com.its.cctv.xnettcp.cctvserver.process;
|
|
|
+
|
|
|
+import com.its.app.AppUtils;
|
|
|
+import com.its.app.utils.NettyUtils;
|
|
|
+import com.its.cctv.config.RunningConfig;
|
|
|
+import com.its.cctv.entity.TbCctvCtlr;
|
|
|
+import com.its.cctv.global.AppRepository;
|
|
|
+import io.netty.channel.Channel;
|
|
|
+import io.netty.channel.ChannelHandlerContext;
|
|
|
+import lombok.RequiredArgsConstructor;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.slf4j.MDC;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+import java.util.concurrent.Executors;
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
+
|
|
|
+@Slf4j
|
|
|
+@RequiredArgsConstructor
|
|
|
+@Service
|
|
|
+public class CctvServerDataProcess {
|
|
|
+
|
|
|
+ private final RunningConfig runningConfig;
|
|
|
+
|
|
|
+ public static LinkedBlockingQueue<CctvServerData> SERVER_DATA_QUEUE = new LinkedBlockingQueue<>(1000);
|
|
|
+ private ThreadPoolExecutor taskExecutor = (ThreadPoolExecutor)Executors.newFixedThreadPool(1);
|
|
|
+ int MAX_CORE = Runtime.getRuntime().availableProcessors();
|
|
|
+
|
|
|
+ @PostConstruct
|
|
|
+ private void init() {
|
|
|
+ }
|
|
|
+ public void run() {
|
|
|
+ log.info("CctvServerDataProcess.run: Start.");
|
|
|
+ if (this.MAX_CORE < 8) {
|
|
|
+ this.MAX_CORE = 8;
|
|
|
+ }
|
|
|
+// ItsThreadPoolInitializer poolInitializer = (ItsThreadPoolInitializer) AppUtils.getBean(ItsThreadPoolInitializer.class);
|
|
|
+ int executePool = this.MAX_CORE;//Math.max(this.MAX_CORE, poolInitializer.getThreadPoolWork());
|
|
|
+ for (int ii = 0; ii < executePool; ii++) {
|
|
|
+ log.info("CctvServerDataProcess.Task: {}", ii);
|
|
|
+ this.taskExecutor.execute(() -> {
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ CctvServerData serverData = CctvServerDataProcess.SERVER_DATA_QUEUE.take();
|
|
|
+ if (serverData != null) {
|
|
|
+ CctvServerDataTask handler = (CctvServerDataTask) AppUtils.getBean(CctvServerDataTask.class);
|
|
|
+ handler.run(this, serverData);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ log.error("CctvServerDataProcess.Task: Received data null");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception e) {
|
|
|
+ log.error("CctvServerDataProcess.Task: Exception: {}", e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("CctvServerDataProcess.run: ..End.");
|
|
|
+ }
|
|
|
+
|
|
|
+ public void process(CctvServerData data) {
|
|
|
+ try {
|
|
|
+ ChannelHandlerContext ctx = data.getCtx();
|
|
|
+ Channel channel = ctx.channel();
|
|
|
+ //String tcpAddress = NettyUtils.getTcpAddress(channel); // local/remote address 를 폼함한 문자열
|
|
|
+ String ipAddress = NettyUtils.getRemoteIpAddress(channel);
|
|
|
+ TbCctvCtlr obj = AppRepository.getInstance().getCtlrIpMap().get(ipAddress);;
|
|
|
+// C2CAuthenticatedMessage c2c = (C2CAuthenticatedMessage)data.getData();
|
|
|
+
|
|
|
+// if (obj == null || channel == null || c2c == null) {
|
|
|
+// log.error("CctvServerDataProcess.process: {}, data null. controller: {}, channel: {}, c2c: {{}", ipAddress, obj, channel, c2c);
|
|
|
+// return;
|
|
|
+// }
|
|
|
+
|
|
|
+ MDC.put("id", obj.getLogKey());
|
|
|
+// eAuthInfo cmd = eAuthInfo.getByValue(c2c.getDatexAuthenticationInfoText().value[0]);
|
|
|
+
|
|
|
+// log.info("CctvServerDataProcess.process: {}, {}, ThreadId: {}", ipAddress, cmd.toString(), Thread.currentThread().getId());
|
|
|
+
|
|
|
+// DsrcAsn1Response response = null;
|
|
|
+// switch (cmd) {
|
|
|
+// case AI_Initiate : //(0x01, "AI_Initiate"), /* 초기 통신연결을 위한 개시 요청 데이터 패킷 */
|
|
|
+// // 서버모드 처리내용 없음(평택 서버인데 클라이언트가 서버로 동작 => 로그인 요청해야함)
|
|
|
+// response = new InitiateResponse(obj, ctx, c2c);
|
|
|
+// break;
|
|
|
+// case AI_Login : //(0x02, "AI_Login"), /* 서버에 접속하기 위한 클라이언트의 로그인 데이터 패킷 */
|
|
|
+// response = new LoginResponse(obj, ctx, c2c);
|
|
|
+// break;
|
|
|
+// case AI_FrED : //(0x03, "AI_FrED"), /* 서버와 클라이언트의 연결을 유지하기 위한 확인 데이터 패킷 */
|
|
|
+// response = new FredResponse(obj, ctx, c2c);
|
|
|
+// break;
|
|
|
+// case AI_Terminate : //(0x04, "AI_Terminate"), /* 연결을 종료하고자 할 때, 서버에서 클라이언트에 요청하는 데이터 패킷 */
|
|
|
+// response = new TerminateResponse(obj, ctx, c2c);
|
|
|
+// break;
|
|
|
+// case AI_Logout : //(0x05, "AI_Logout"), /* 접속을 종료하기 위한 클라이언트의 로그아웃 데이터 패킷 */
|
|
|
+// response = new LogoutResponse(obj, ctx, c2c);
|
|
|
+// break;
|
|
|
+// case AI_Subscription: //(0x06, "AI_Subscription"), /* 클라이언트가 서버에 정보를 요청할 경우 송신하는 데이터 패킷 */
|
|
|
+// response = new SubscriptionResponse(obj, ctx, c2c);
|
|
|
+// break;
|
|
|
+// case AI_Publication : //(0x07, "AI_Publication"), /* 클라이언트가 요청한 정보를 제공하기 위한 데이터 패킷 - 요청에 대한 정보공개*/
|
|
|
+// response = new PublicationResponse(obj, ctx, c2c);
|
|
|
+// break;
|
|
|
+// case AI_Accept : //(0x09, "AI_Accept"), /* 클라이언트의 요청에 대한 수용 */
|
|
|
+// response = new AcceptResponse(obj, ctx, c2c);
|
|
|
+// break;
|
|
|
+// case AI_Reject : //(0x0A, "AI_Reject"); /* 클라이언트의 요청에 대한 거부 */
|
|
|
+// // 운영단말 명령에 대한 거부도 발생할 수 있으므로 운영단말로 결과를 전송한다.
|
|
|
+// response = new RejectResponse(obj, ctx, c2c);
|
|
|
+// break;
|
|
|
+// case AI_TransferDone: //(0x08, "AI_TransferDone"), /* 클라이언트가 요청한 정보를 파일형태로 제공하기 위한 데이터 패킷 */
|
|
|
+// // 처리내용 없음
|
|
|
+// response = new TransferDoneResponse(obj, ctx, c2c);
|
|
|
+// break;
|
|
|
+// case AI_Null : //(0x00, "AI_Null"), /* NULL */
|
|
|
+// // 처리내용 없음
|
|
|
+// response = new NullResponse(obj, ctx, c2c);
|
|
|
+// break;
|
|
|
+// default:
|
|
|
+// log.warn("{}. process: Unknown packet: {}, {}", ipAddress, cmd.toString(), obj.toString());
|
|
|
+// break;
|
|
|
+// }
|
|
|
+
|
|
|
+// if (response != null) {
|
|
|
+// if (!response.response(this.runningConfig)) {
|
|
|
+// log.error("CctvServerDataProcess.process: {}, response error. will be closed.", ipAddress);
|
|
|
+//// DsrcAsn1ServerIdleStatePacketHandler.disconnectChannel(obj, ctx.channel());
|
|
|
+// }
|
|
|
+// }
|
|
|
+// else {
|
|
|
+// log.error("CctvServerDataProcess.process: {}, unknown packet. will be closed.", ipAddress);
|
|
|
+//// DsrcAsn1ServerIdleStatePacketHandler.disconnectChannel(obj, ctx.channel());
|
|
|
+// }
|
|
|
+ MDC.remove(obj.getLogKey());
|
|
|
+ MDC.clear();
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("CctvServerDataProcess.process: Exception: {}", e.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * 작업큐에 데이터 추가
|
|
|
+ */
|
|
|
+ public void add(CctvServerData data) {
|
|
|
+ try {
|
|
|
+ //offer => full -> return
|
|
|
+ //add => full -> wait
|
|
|
+ //큐가 차더라도 바로 리턴함.
|
|
|
+ if (!SERVER_DATA_QUEUE.offer(data)) {
|
|
|
+ log.error("CctvServerDataProcess-QueueFull: {}", SERVER_DATA_QUEUE.size());
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("CctvServerDataProcess-QueueAddError: {}", e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+}
|