|
@@ -5,7 +5,9 @@ import com.tsi.comm.server.kafka.KafkaProducerService;
|
|
|
import com.tsi.comm.server.process.AbstractTsiCvimProcess;
|
|
import com.tsi.comm.server.process.AbstractTsiCvimProcess;
|
|
|
import com.tsi.comm.server.process.AbstractTsiCvimWorker;
|
|
import com.tsi.comm.server.process.AbstractTsiCvimWorker;
|
|
|
import com.tsi.comm.server.protocol.AbstractTsiPacket;
|
|
import com.tsi.comm.server.protocol.AbstractTsiPacket;
|
|
|
|
|
+import com.tsi.comm.server.protocol.TsiCpuPacket;
|
|
|
import com.tsi.comm.server.repository.TsiNodeManager;
|
|
import com.tsi.comm.server.repository.TsiNodeManager;
|
|
|
|
|
+import com.tsi.comm.server.vo.TsiNodeVo;
|
|
|
import com.tsi.common.spring.SpringUtils;
|
|
import com.tsi.common.spring.SpringUtils;
|
|
|
import lombok.Getter;
|
|
import lombok.Getter;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
import lombok.RequiredArgsConstructor;
|
|
@@ -35,7 +37,7 @@ public class TsiCvimPacketProcess extends AbstractTsiCvimProcess {
|
|
|
TsiCvimServerConfig tsiCvimServerConfig = SpringUtils.getBean(TsiCvimServerConfig.class);
|
|
TsiCvimServerConfig tsiCvimServerConfig = SpringUtils.getBean(TsiCvimServerConfig.class);
|
|
|
|
|
|
|
|
this.workers = tsiCvimServerConfig.getPacketWorkers();
|
|
this.workers = tsiCvimServerConfig.getPacketWorkers();
|
|
|
- this.qSize = this.nodeManager.size();
|
|
|
|
|
|
|
+ this.qSize = Math.max(10, this.nodeManager.size());
|
|
|
if (this.workers == 1) {
|
|
if (this.workers == 1) {
|
|
|
this.qSize *= 3;
|
|
this.qSize *= 3;
|
|
|
}
|
|
}
|
|
@@ -55,19 +57,84 @@ public class TsiCvimPacketProcess extends AbstractTsiCvimProcess {
|
|
|
this.workerList.add(packetWorker);
|
|
this.workerList.add(packetWorker);
|
|
|
this.executor.submit(packetWorker); // 여기서 실행됨!
|
|
this.executor.submit(packetWorker); // 여기서 실행됨!
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+// warmUpWorkers();
|
|
|
|
|
+ }
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 모든 워커 스레드의 JIT 컴파일을 유도하기 위해 웜업을 수행합니다.
|
|
|
|
|
+ */
|
|
|
|
|
+ private void warmUpWorkers() {
|
|
|
|
|
+ int warmUpRounds = 10000; // JIT 컴파일을 유도하기에 충분한 반복 횟수 (튜닝 필요)
|
|
|
|
|
+ log.info("Starting JIT compiler warm-up for {} packet workers with {} rounds each...", this.workers, warmUpRounds);
|
|
|
|
|
+
|
|
|
|
|
+ // 가짜 TsiNodeVo 객체 생성 (실제 DB나 Manager를 통하지 않음)
|
|
|
|
|
+ TsiNodeVo dummyNodeVo = new TsiNodeVo(1L, "127.0.0.1", false, false, false);
|
|
|
|
|
+
|
|
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
|
|
+
|
|
|
|
|
+ // 모든 워커에게 웜업 작업을 분배
|
|
|
|
|
+ for (int i = 0; i < this.workers; i++) {
|
|
|
|
|
+ final int workerIndex = i;
|
|
|
|
|
+ // 각 워커를 위한 웜업 작업을 별도의 스레드에서 실행하여 병렬로 웜업
|
|
|
|
|
+ new Thread(() -> {
|
|
|
|
|
+ for (int j = 0; j < warmUpRounds; j++) {
|
|
|
|
|
+ // 웜업용 가짜 패킷 생성
|
|
|
|
|
+ TsiCpuPacket warmUpPacket = createWarmUpPacket(dummyNodeVo);
|
|
|
|
|
+ // 해당 워커의 큐에 직접 추가
|
|
|
|
|
+ this.workerList.get(workerIndex).add(warmUpPacket);
|
|
|
|
|
+ }
|
|
|
|
|
+ }).start();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 간단한 웜업에서는 별도의 완료 대기 없이, 백그라운드에서 웜업이 진행되도록 둡니다.
|
|
|
|
|
+ // 실제 트래픽이 들어오기 시작할 때쯤이면 대부분의 웜업이 완료될 것입니다.
|
|
|
|
|
+ log.info("Warm-up tasks submitted. Main process continues...");
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 웜업에 사용할 가짜 TsiCpuPacket을 생성.
|
|
|
|
|
+ * @param dummyNodeVo 가짜 TsiNodeVo 객체
|
|
|
|
|
+ * @return 웜업용 패킷
|
|
|
|
|
+ */
|
|
|
|
|
+ private TsiCpuPacket createWarmUpPacket(TsiNodeVo dummyNodeVo) {
|
|
|
|
|
+ // 실제 패킷과 유사한 데이터를 사용하여 생성
|
|
|
|
|
+ long nodeId = dummyNodeVo.getNodeId();
|
|
|
|
|
+ long msec = System.currentTimeMillis();
|
|
|
|
|
+ long nsec = System.nanoTime();
|
|
|
|
|
+
|
|
|
|
|
+ int packetSize = 100;
|
|
|
|
|
+ byte[] dummyBytes = new byte[packetSize];
|
|
|
|
|
+// ByteBuf dummyBuf = ByteBufAllocator.DEFAULT.buffer(packetSize);
|
|
|
|
|
+// try {
|
|
|
|
|
+// dummyBuf.writeShort(0x7E7E);
|
|
|
|
|
+// dummyBuf.writeShort(packetSize);
|
|
|
|
|
+// dummyBuf.writeByte(1);
|
|
|
|
|
+// dummyBuf.writeByte(0);
|
|
|
|
|
+// dummyBuf.writeInt((int)nodeId);
|
|
|
|
|
+// // 나머지 공간을 0x00으로 채움.
|
|
|
|
|
+// dummyBuf.writeBytes(new byte[packetSize - dummyBuf.writerIndex()]);
|
|
|
|
|
+// } catch (Exception e) {
|
|
|
|
|
+// dummyBuf.release();
|
|
|
|
|
+// throw e;
|
|
|
|
|
+// }
|
|
|
|
|
+
|
|
|
|
|
+ TsiCpuPacket packet = new TsiCpuPacket(nodeId, msec, nsec, null); // channel은 null
|
|
|
|
|
+ packet.setBuf(dummyBytes);
|
|
|
|
|
+ packet.setObj(dummyNodeVo);
|
|
|
|
|
+
|
|
|
|
|
+ //dummyBuf.release();
|
|
|
|
|
+ return packet;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
public boolean add(Object object, int idx) {
|
|
public boolean add(Object object, int idx) {
|
|
|
boolean offer = false;
|
|
boolean offer = false;
|
|
|
AbstractTsiPacket packet = (AbstractTsiPacket)object;
|
|
AbstractTsiPacket packet = (AbstractTsiPacket)object;
|
|
|
try {
|
|
try {
|
|
|
- // TODO: 20251117: 통신연결시 할당된 패킷큐 인덱스로 바로 전송
|
|
|
|
|
-// idx = Math.abs(idx % this.workers);
|
|
|
|
|
if (idx >= 0 && idx < this.workers) {
|
|
if (idx >= 0 && idx < this.workers) {
|
|
|
offer = this.workerList.get(idx).add(packet);
|
|
offer = this.workerList.get(idx).add(packet);
|
|
|
}
|
|
}
|
|
|
else {
|
|
else {
|
|
|
- log.error("Invalid packet queue index: {}", idx);
|
|
|
|
|
|
|
+ log.error("PacketProcess: Invalid packet queue index: {}", idx);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
catch (Exception e) {
|
|
catch (Exception e) {
|
|
@@ -80,15 +147,41 @@ public class TsiCvimPacketProcess extends AbstractTsiCvimProcess {
|
|
|
|
|
|
|
|
public void stop() {
|
|
public void stop() {
|
|
|
log.info("PacketProcess Stopping...");
|
|
log.info("PacketProcess Stopping...");
|
|
|
|
|
+
|
|
|
|
|
+ for (AbstractTsiCvimWorker worker : this.workerList) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ ((TsiCvimPacketWorker) worker).getDATA_QUEUE().put(TsiCvimPacketWorker.SHUTDOWN_PACKET);
|
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
|
+ log.warn("PacketProcess: Interrupted while sending shutdown signal to worker {}.", worker.getIdx());
|
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
this.executor.shutdown();
|
|
this.executor.shutdown();
|
|
|
|
|
+
|
|
|
try {
|
|
try {
|
|
|
if (!this.executor.awaitTermination(3, TimeUnit.SECONDS)) {
|
|
if (!this.executor.awaitTermination(3, TimeUnit.SECONDS)) {
|
|
|
this.executor.shutdownNow(); // 실행 중인 작업도 중단 시도
|
|
this.executor.shutdownNow(); // 실행 중인 작업도 중단 시도
|
|
|
|
|
+
|
|
|
|
|
+ // 큐에 남아있는 데이터 로깅
|
|
|
|
|
+ int remainingTasks = 0;
|
|
|
|
|
+ for (AbstractTsiCvimWorker worker : this.workerList) {
|
|
|
|
|
+ remainingTasks += ((TsiCvimPacketWorker) worker).getDATA_QUEUE().size();
|
|
|
|
|
+ }
|
|
|
|
|
+ if (remainingTasks > 0) {
|
|
|
|
|
+ log.warn("PacketProcess: queue remain {}, forced shutdown.", remainingTasks);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 강제 종료 후에도 종료될 때까지 조금 더 기다려줌
|
|
|
|
|
+ if (!this.executor.awaitTermination(1, TimeUnit.SECONDS)) {
|
|
|
|
|
+ log.error("PacketProcess: Executor did not terminate even after forced shutdown.");
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
|
this.executor.shutdownNow();
|
|
this.executor.shutdownNow();
|
|
|
Thread.currentThread().interrupt(); // 현재 스레드도 중단
|
|
Thread.currentThread().interrupt(); // 현재 스레드도 중단
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
log.info("PacketProcess Stopped...");
|
|
log.info("PacketProcess Stopped...");
|
|
|
}
|
|
}
|
|
|
|
|
|