package com.its.bis.service;

import com.its.app.utils.Elapsed;
import com.its.app.utils.SysUtils;
import com.its.app.utils.TimeUtils;
import com.its.bis.api.dto.AgipObeLoc;
import com.its.bis.config.ServerConfig;
import com.its.bis.dto.*;
import com.its.bis.entity.TbBisLinkTrafClct;
import com.its.bis.entity.TbBisVehLoc;
import com.its.bis.process.DbmsData;
import com.its.bis.process.DbmsDataProcess;
import com.its.bis.process.DbmsDataType;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Queue-backed worker that consumes {@link AgipObeLoc} location packets.
 *
 * <p>For every packet the worker updates the cached {@link BisObe} state of the reporting device,
 * looks up the nearest node within the configured maximum distance, tracks start/end node passage
 * to derive link travel records, and hands both the link record and the current vehicle location
 * to {@link DbmsDataProcess}.
 *
 * <p>Producers call {@link #add(AgipObeLoc)}; the consuming loop is {@link #run()}.
 */
@Slf4j
public class BisAgipWorker implements Runnable {

    /** Mean Earth radius in kilometres used by the haversine formula. */
    private static final double EARTH_RADIUS_KM = 6371.0;

    /** Speed (km/h) returned by {@link #calcSpeed(int, int)} when distance or time is not positive. */
    private static final int FALLBACK_SPEED_KMH = 2;

    /** MDC key under which the device id is published while logging queue problems. */
    private static final String MDC_DEVICE_KEY = "id";

    private long avgTime = 0;
    private final int idx;
    private final int qSize;

    private final ServerConfig config;
    private final AppRepositoryService repoService;
    private final DbmsDataProcess dbmsDataProcess;
    private final LinkedBlockingQueue<AgipObeLoc> dataQueue;

    /**
     * Creates a worker with a bounded packet queue.
     *
     * @param idx             worker index (stored only; not read inside this class)
     * @param qSize           capacity of the packet queue
     * @param config          server configuration (max distance, max travel minutes)
     * @param repoService     in-memory repository of devices, nodes and links
     * @param dbmsDataProcess sink for database work items
     */
    public BisAgipWorker(int idx, int qSize, ServerConfig config, AppRepositoryService repoService,
                         DbmsDataProcess dbmsDataProcess) {
        this.idx = idx;
        this.qSize = qSize;
        this.config = config;
        this.repoService = repoService;
        this.dbmsDataProcess = dbmsDataProcess;
        this.dataQueue = new LinkedBlockingQueue<>(qSize);
    }

    /**
     * Folds the elapsed time since {@code recvTime} into the running average.
     *
     * <p>The first sample becomes the average; each later sample is averaged 50/50 with the
     * previous value.
     *
     * @param recvTime receive timestamp taken from {@link System#nanoTime()}
     * @return the updated average in nanoseconds
     */
    protected long calcProcessTime(long recvTime) {
        long jobTime = System.nanoTime() - recvTime;
        if (this.avgTime == 0) {
            this.avgTime = jobTime;
        } else {
            this.avgTime = (this.avgTime + jobTime) / 2L;
        }
        return this.avgTime;
    }

    /**
     * Adds a packet to the work queue without blocking.
     *
     * @param obeLoc packet to enqueue; {@code null} is rejected
     * @return {@code true} if the packet was queued, {@code false} if it was rejected
     *         (null packet, queue full, or an unexpected error)
     */
    public boolean add(AgipObeLoc obeLoc) {
        if (obeLoc == null) {
            // offer(null) would throw, and the error handler below dereferences the packet.
            log.warn("Packet Queue.offer: null packet ignored, {}", Thread.currentThread().getName());
            return false;
        }

        boolean offer = false;
        try {
            // offer() returns immediately when the queue is full instead of blocking the producer.
            offer = this.dataQueue.offer(obeLoc);
            if (!offer) {
                MDC.put(MDC_DEVICE_KEY, obeLoc.getDeviceId());
                log.error("Packet Queue.offer: {}/{}, Queue Full: {} EA, {}, {}",
                        obeLoc.getDeviceId(), this.dataQueue.size(), this.qSize,
                        TimeUtils.elapsedTime(obeLoc.getRcvNanoSeconds()),
                        Thread.currentThread().getName());
            }
        } catch (Exception e) {
            MDC.put(MDC_DEVICE_KEY, obeLoc.getDeviceId());
            log.error("Packet Queue.offer: Exception: {}, {}, {}", obeLoc.getDeviceId(),
                    Thread.currentThread().getName(), e.getMessage());
        } finally {
            // Remove only our own key so the caller thread's other MDC entries survive.
            MDC.remove(MDC_DEVICE_KEY);
        }
        return offer;
    }

    /**
     * Processes one location packet: refreshes the device state, evaluates node passage and
     * queues the vehicle-location update.
     *
     * @param obeLoc packet taken from the work queue
     */
    public void process(AgipObeLoc obeLoc) {
        Location currLocation = toLocation(obeLoc);
        BisObe bisObe = resolveObe(obeLoc, currLocation);

        bisObe.setHeight(obeLoc.getGnssInfo().getHeight());
        bisObe.setSpeed(obeLoc.getGnssInfo().getSpeed());
        bisObe.setAngle(obeLoc.getGnssInfo().getAngle());

        // Find the nearest node the vehicle is currently passing (node id 0 means none).
        NearNodeDto passNode = getNearNode(currLocation);
        Long passNodeId = passNode.getNodeId();

        trackNodePassage(bisObe, passNodeId);
        publishVehicleLocation(bisObe, obeLoc, passNodeId);
    }

    /**
     * Builds the {@link Location} for a packet.
     *
     * <p>Sample data: latitude=36.56392018, longitude=128.74352336. The existing convention
     * stores the GNSS longitude in {@code mLatitude} and the GNSS latitude in {@code mLongitude};
     * it is kept because the location is also stored on {@link BisObe}, whose other readers are
     * not visible here.
     */
    private Location toLocation(AgipObeLoc obeLoc) {
        return Location.builder()
                .mDistance(0)
                .mInitialBearing(0)
                .mFinalBearing(0)
                .mLatitude(obeLoc.getGnssInfo().getLongitude())
                .mLongitude(obeLoc.getGnssInfo().getLatitude())
                .build();
    }

    /** Returns the cached device state with its location refreshed, registering it on first sight. */
    private BisObe resolveObe(AgipObeLoc obeLoc, Location currLocation) {
        BisObe bisObe = this.repoService.getObeMap(obeLoc.getDeviceId());
        if (bisObe != null) {
            bisObe.setLocation(currLocation);
            return bisObe;
        }

        bisObe = BisObe.builder()
                .deviceId(obeLoc.getDeviceId())
                .carId(obeLoc.getCarId())
                .carNumber(obeLoc.getCarNumber())
                .location(currLocation)
                .moveDist(0)
                .stNodeId(0L)
                .edNodeId(0L)
                .trvlHh(0)
                .stNodeTm(0)
                .edNodeTm(0)
                .build();
        this.repoService.putObeMap(bisObe);
        return bisObe;
    }

    /** Updates the start/end node state of the device according to the node it is passing now. */
    private void trackNodePassage(BisObe bisObe, Long passNodeId) {
        Long stNodeId = bisObe.getStNodeId();
        Long edNodeId = bisObe.getEdNodeId();

        if (passNodeId != 0L) {
            log.info("DEVICE ID: {}, 현재노드 통과: {}", bisObe.getDeviceId(), passNodeId);
            if (stNodeId == 0L) {
                // No start node yet: the passed node becomes the start node.
                bisObe.setStNodeId(passNodeId);
                bisObe.setStNodeTm(System.currentTimeMillis());
                log.info("DEVICE ID: {}, 시작노드 통과: {}", bisObe.getDeviceId(), passNodeId);
            } else if (!stNodeId.equals(passNodeId) && !edNodeId.equals(passNodeId)) {
                // A start node exists and the passed node differs from both start and end node:
                // the vehicle has reached the end of a section.
                log.info("DEVICE ID: {}, 종료노드 통과: {}", bisObe.getDeviceId(), passNodeId);
                closeSection(bisObe, passNodeId);
            } else {
                log.info("DEVICE ID: {}, 시작노드: {}, 종료노드: {}, 통과노드: {}",
                        bisObe.getDeviceId(), stNodeId, edNodeId, passNodeId);
            }
        } else if (stNodeId != 0 && edNodeId != 0) {
            // Start and end node are both set but no node matched this time, so the vehicle has
            // left the node: promote the end node to start node and clear the end node.
            log.info("DEVICE ID: {}, 시작노드: {}, 종료노드: {}, 통과노드: {}",
                    bisObe.getDeviceId(), stNodeId, edNodeId, passNodeId);
            bisObe.setStNodeId(edNodeId);
            bisObe.setStNodeTm(System.currentTimeMillis());
            bisObe.setEdNodeId(0L);
            bisObe.setEdNodeTm(0);
        } else {
            log.info("DEVICE ID: {}, 시작노드: {}, 종료노드: {}, 통과노드: {}, 구간 운행 중...",
                    bisObe.getDeviceId(), stNodeId, edNodeId, passNodeId);
        }
    }

    /**
     * Marks {@code passNodeId} as the end node, emits a link travel record when the section is
     * valid, and then makes the end node the new start node.
     */
    private void closeSection(BisObe bisObe, Long passNodeId) {
        bisObe.setEdNodeId(passNodeId);
        bisObe.setEdNodeTm(System.currentTimeMillis());
        int travelSec = (int) ((bisObe.getEdNodeTm() - bisObe.getStNodeTm()) / 1000);

        if (travelSec > (this.config.getMaxTrvlMin() * 60)) {
            // Travel time exceeds the configured limit, so no travel record is produced.
            log.warn("DEVICE ID: {}, 시작노드: {}, 종료노드: {}, 여행시간 오류: {} sec.",
                    bisObe.getDeviceId(), bisObe.getStNodeId(), bisObe.getEdNodeId(), travelSec);
        } else {
            TbLinkDto link = this.repoService.getSectMap(bisObe.getStNodeId(), bisObe.getEdNodeId());
            if (link == null) {
                // No link is registered for this start/end node pair.
                log.warn("DEVICE ID: {}, 시작노드: {}, 종료노드: {}, 구간 맷칭 오류...",
                        bisObe.getDeviceId(), bisObe.getStNodeId(), bisObe.getEdNodeId());
            } else {
                // The section was traversed: create the link traffic record.
                int speed = calcSpeed(link.getLinkLeng(), travelSec);
                log.warn("DEVICE ID: {}, 시작노드: {}, 종료노드: {}, 구간통과: {}, {} m, {} km/h, {} seconds.",
                        bisObe.getDeviceId(), bisObe.getStNodeId(), bisObe.getEdNodeId(),
                        link.getLinkId(), link.getLinkLeng(), speed, travelSec);

                TbBisLinkTrafClct bisLinkTrafClct = TbBisLinkTrafClct.builder()
                        .CLCT_DT(SysUtils.getSysTime())
                        .LINK_ID(link.getLinkId())
                        .DEVICE_ID(bisObe.getDeviceId())
                        .SPED(speed)
                        .TRVL_HH(travelSec)
                        .ST_NODE_ARR_DT(TimeUtils.millisToString(bisObe.getStNodeTm()))
                        .ED_NODE_ARR_DT(TimeUtils.millisToString(bisObe.getEdNodeTm()))
                        .build();
                this.dbmsDataProcess.add(
                        new DbmsData(DbmsDataType.DBMS_DATA_CRT_BIS_LINK_TRAF_CLCT, false, bisLinkTrafClct));
            }
        }

        // In every case the node just reached becomes the start of the next section.
        advanceToEndNode(bisObe);
    }

    /** Promotes the end node (id and arrival time) to start node and clears the end node. */
    private void advanceToEndNode(BisObe bisObe) {
        bisObe.setStNodeId(bisObe.getEdNodeId());
        bisObe.setStNodeTm(bisObe.getEdNodeTm());
        bisObe.setEdNodeId(0L);
        bisObe.setEdNodeTm(0);
    }

    /** Queues the update of the vehicle's current location. */
    private void publishVehicleLocation(BisObe bisObe, AgipObeLoc obeLoc, Long passNodeId) {
        TbBisVehLoc bisVehLoc = TbBisVehLoc.builder()
                .DEVICE_ID(bisObe.getDeviceId())
                .CLCT_DT(SysUtils.getSysTime())
                .LOC_TYPE(obeLoc.getGnssInfo().getType())
                .SPEED(obeLoc.getGnssInfo().getSpeed())
                .HEGT(obeLoc.getGnssInfo().getHeight())
                .ANGLE(obeLoc.getGnssInfo().getAngle())
                .LNG(obeLoc.getGnssInfo().getLongitude())
                .LAT(obeLoc.getGnssInfo().getLatitude())
                .PASS_NODE_ID(passNodeId)
                .build();
        this.dbmsDataProcess.add(new DbmsData(DbmsDataType.DBMS_DATA_UPD_BIS_VEH_LOC, false, bisVehLoc));
    }

    /**
     * Logs the remaining queue size and the running average maintained by
     * {@link #calcProcessTime(long)}.
     *
     * <p>The average stays 0 until {@code calcProcessTime} has been called at least once; nothing
     * in this class calls it.
     */
    public void report() {
        log.info("Packet: Remain Q: {}, Average: {}, {}", this.dataQueue.size(),
                TimeUtils.elapsedTimeStr(this.avgTime), Thread.currentThread().getName());
    }

    /**
     * Converts a distance in metres and a duration in seconds to a speed in km/h, rounded to the
     * nearest integer.
     *
     * @param distance distance in metres
     * @param seconds  travel time in seconds
     * @return speed in km/h, or a fixed fallback of 2 when either argument is not positive
     */
    public int calcSpeed(int distance, int seconds) {
        if (distance <= 0 || seconds <= 0) {
            return FALLBACK_SPEED_KMH;
        }
        // m/s * 3.6 = km/h; +0.5 before truncation rounds to nearest.
        return (int) ((distance * 3.6) / seconds + 0.5);
    }

    /**
     * Finds the node closest to {@code loc} that lies within the configured maximum distance.
     *
     * @param loc vehicle location, using the convention of {@code process()} (longitude stored in
     *            {@code mLatitude}, latitude stored in {@code mLongitude})
     * @return the nearest node and its distance in metres; node id 0 with the configured maximum
     *         distance when no node is close enough
     */
    public NearNodeDto getNearNode(Location loc) {
        Elapsed elapsed = new Elapsed();
        // Example input:
        // Location(mDistance=0.0, mInitialBearing=0.0, mFinalBearing=0.0, mLatitude=128.73881251, mLongitude=36.56128759)
        double maxDistance = this.config.getMaxDistance();
        NearNodeDto nearNode = NearNodeDto.builder().nodeId(0L).distance(maxDistance).build();

        // Undo the swapped field convention so the haversine formula receives real latitudes.
        double vehLat = loc.getMLongitude();
        double vehLon = loc.getMLatitude();

        for (Map.Entry<?, TbNodeDto> e : this.repoService.getEntrySetNode()) {
            TbNodeDto node = e.getValue();
            // NOTE(review): assumes node X/Y coordinates are longitude/latitude in degrees — confirm.
            double distance = getDistance(vehLat, vehLon, node.getYCrdn(), node.getXCrdn());
            if (distance < maxDistance) {
                maxDistance = distance;
                nearNode.setNodeId(node.getNodeId());
                nearNode.setDistance(distance);
            }
        }
        log.info("getNearNode: {} ms. {} m, {}", elapsed.milliSeconds(), maxDistance, nearNode);
        return nearNode;
    }

    /**
     * Great-circle distance between two points using the haversine formula.
     *
     * @param lat1 latitude of the first point in degrees
     * @param lon1 longitude of the first point in degrees
     * @param lat2 latitude of the second point in degrees
     * @param lon2 longitude of the second point in degrees
     * @return distance in metres
     */
    public double getDistance(double lat1, double lon1, double lat2, double lon2) {
        double halfDLat = Math.toRadians(lat2 - lat1) / 2;
        double halfDLon = Math.toRadians(lon2 - lon1) / 2;
        double a = Math.sin(halfDLat) * Math.sin(halfDLat)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(halfDLon) * Math.sin(halfDLon);
        // Rounding can push a marginally outside [0, 1]; clamp so sqrt(1 - a) never yields NaN.
        a = Math.min(1.0, Math.max(0.0, a));
        double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
        return EARTH_RADIUS_KM * c * 1000; // distance in metres
    }

    /**
     * Consumes packets from the queue until the thread is interrupted.
     *
     * <p>Errors raised while processing a single packet are logged and the loop continues; an
     * interrupt restores the interrupt flag and ends the loop.
     */
    @Override
    public void run() {
        log.info("{} Start. Q_SIZE: {}", Thread.currentThread().getName(), this.qSize);
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // take() blocks until a packet is available and never returns null.
                process(this.dataQueue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.info("{} interrupted, stopping.", Thread.currentThread().getName());
                break;
            } catch (Exception e) {
                log.error("Exception: {}", e.getMessage(), e);
            }
        }
    }
}