BisAgipWorker.java 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. package com.its.bis.service;
  2. import com.its.app.utils.Elapsed;
  3. import com.its.app.utils.SysUtils;
  4. import com.its.app.utils.TimeUtils;
  5. import com.its.bis.api.dto.AgipObeLoc;
  6. import com.its.bis.config.ServerConfig;
  7. import com.its.bis.dto.*;
  8. import com.its.bis.entity.TbBisLinkTrafClct;
  9. import com.its.bis.entity.TbBisVehLoc;
  10. import com.its.bis.process.DbmsData;
  11. import com.its.bis.process.DbmsDataProcess;
  12. import com.its.bis.process.DbmsDataType;
  13. import lombok.extern.slf4j.Slf4j;
  14. import org.slf4j.MDC;
  15. import java.util.Map;
  16. import java.util.concurrent.LinkedBlockingQueue;
  17. @Slf4j
  18. public class BisAgipWorker implements Runnable {
  19. private long avgTime = 0;
  20. private int idx;
  21. private int qSize;
  22. private final ServerConfig config;
  23. private final AppRepositoryService repoService;
  24. private final DbmsDataProcess dbmsDataProcess;
  25. private final LinkedBlockingQueue<AgipObeLoc> DATA_QUEUE;
  26. public BisAgipWorker(int idx, int qSize, ServerConfig config, AppRepositoryService repoService, DbmsDataProcess dbmsDataProcess) {
  27. this.idx = idx;
  28. this.qSize = qSize;
  29. this.config = config;
  30. this.repoService = repoService;
  31. this.dbmsDataProcess = dbmsDataProcess;
  32. this.DATA_QUEUE = new LinkedBlockingQueue<>(qSize);
  33. }
  34. protected long calcProcessTime(long recvTime) {
  35. long jobTime = System.nanoTime() - recvTime;
  36. if (this.avgTime == 0) {
  37. this.avgTime = jobTime;
  38. }
  39. else {
  40. this.avgTime = (this.avgTime + jobTime) / 2L;
  41. }
  42. return this.avgTime;
  43. }
  44. /*
  45. * 작업큐에 데이터 추가
  46. */
  47. public boolean add(AgipObeLoc obeLoc) {
  48. boolean offer = false;
  49. try {
  50. //offer => full -> return
  51. //add => full -> wait
  52. offer = this.DATA_QUEUE.offer(obeLoc);
  53. if (!offer) {
  54. MDC.put("id", obeLoc.getDeviceId());
  55. log.error("Packet Queue.offer: {}/{}, Queue Full: {} EA, {}, {}",
  56. obeLoc.getDeviceId(), this.DATA_QUEUE.size(), this.qSize, TimeUtils.elapsedTime(obeLoc.getRcvNanoSeconds()), Thread.currentThread().getName());
  57. MDC.clear();
  58. }
  59. } catch (Exception e) {
  60. MDC.put("id", obeLoc.getDeviceId());
  61. log.error("Packet Queue.offer: Exception: {}, {}, {}", obeLoc.getDeviceId(), Thread.currentThread().getName(), e.getMessage());
  62. MDC.clear();
  63. }
  64. return offer;
  65. }
  66. public void process(AgipObeLoc obeLoc) {
  67. // long curr = System.nanoTime();
  68. // 샘플데이터: latitude=36.56392018, longitude=128.74352336
  69. Location currLocation = Location.builder()
  70. .mDistance(0)
  71. .mInitialBearing(0)
  72. .mFinalBearing(0)
  73. .mLatitude(obeLoc.getGnssInfo().getLongitude())
  74. .mLongitude(obeLoc.getGnssInfo().getLatitude())
  75. // .mLatitude(obeLoc.getGnssInfo().getLatitude())
  76. // .mLongitude(obeLoc.getGnssInfo().getLongitude())
  77. .build();
  78. BisObe bisObe = this.repoService.getObeMap(obeLoc.getDeviceId());
  79. if (bisObe == null) {
  80. bisObe = BisObe.builder()
  81. .deviceId(obeLoc.getDeviceId())
  82. .carId(obeLoc.getCarId())
  83. .carNumber(obeLoc.getCarNumber())
  84. .location(currLocation)
  85. .moveDist(0)
  86. .stNodeId(0L)
  87. .edNodeId(0L)
  88. .trvlHh(0)
  89. .stNodeTm(0)
  90. .edNodeTm(0)
  91. .build();
  92. this.repoService.putObeMap(bisObe);
  93. }
  94. else {
  95. bisObe.setLocation(currLocation);
  96. }
  97. bisObe.setHeight(obeLoc.getGnssInfo().getHeight());
  98. bisObe.setSpeed(obeLoc.getGnssInfo().getSpeed());
  99. bisObe.setAngle(obeLoc.getGnssInfo().getAngle());
  100. /**
  101. * 최근접 통과노드정보를 구한다.
  102. */
  103. NearNodeDto passNode = getNearNode(currLocation);
  104. Long stNodeId = bisObe.getStNodeId();
  105. Long edNodeId = bisObe.getEdNodeId();
  106. Long passNodeId = passNode.getNodeId();
  107. if (passNodeId != 0L) {
  108. log.info("DEVICE ID: {}, 현재노드 통과: {}", bisObe.getDeviceId(), passNodeId);
  109. if (stNodeId == 0L) {
  110. // 시작노드가 없으면 시작노드 설정
  111. bisObe.setStNodeId(passNodeId);
  112. bisObe.setStNodeTm(System.currentTimeMillis());
  113. log.info("DEVICE ID: {}, 시작노드 통과: {}", bisObe.getDeviceId(), passNodeId);
  114. }
  115. else {
  116. if (!stNodeId.equals(passNodeId) && !edNodeId.equals(passNodeId)) {
  117. // 시작노드 값이 존재하면서 통과노드가 시작노드 및 종료노드와 같지 않으면 종료노드로 설정
  118. log.info("DEVICE ID: {}, 종료노드 통과: {}", bisObe.getDeviceId(), passNodeId);
  119. // 종료노드와 같지 않은 경우 종료노드에 도착한 것이므로 구간 소통정보를 생성하자....
  120. bisObe.setEdNodeId(passNodeId);
  121. bisObe.setEdNodeTm(System.currentTimeMillis());
  122. int travelSec = (int) ((bisObe.getEdNodeTm() - bisObe.getStNodeTm()) / 1000);
  123. if (travelSec > (this.config.getMaxTrvlMin() * 60)) {
  124. // 구간 통과 시간이 설정한 값보다 크기 때문에 통행시간을 계산하지 않는다.
  125. // 현재 통과 노드를 시작노드로 설정한다.
  126. log.warn("DEVICE ID: {}, 시작노드: {}, 종료노드: {}, 여행시간 오류: {} sec.", bisObe.getDeviceId(), bisObe.getStNodeId(), bisObe.getEdNodeId(), travelSec);
  127. bisObe.setStNodeId(bisObe.getEdNodeId());
  128. bisObe.setStNodeTm(bisObe.getEdNodeTm());
  129. bisObe.setEdNodeId(0L);
  130. bisObe.setEdNodeTm(0);
  131. }
  132. else {
  133. TbLinkDto link = this.repoService.getSectMap(bisObe.getStNodeId(), bisObe.getEdNodeId());
  134. if (link == null) {
  135. // 구간을 찾지 못하였음... 종료노드를 시작노드로 설정
  136. log.warn("DEVICE ID: {}, 시작노드: {}, 종료노드: {}, 구간 맷칭 오류...", bisObe.getDeviceId(), bisObe.getStNodeId(), bisObe.getEdNodeId());
  137. bisObe.setStNodeId(bisObe.getEdNodeId());
  138. bisObe.setStNodeTm(bisObe.getEdNodeTm());
  139. bisObe.setEdNodeId(0L);
  140. bisObe.setEdNodeTm(0);
  141. }
  142. else {
  143. // 구간을 통과하였으므로 구간교통정보를 생성한다.
  144. int speed = calcSpeed(link.getLinkLeng(), travelSec);
  145. log.warn("DEVICE ID: {}, 시작노드: {}, 종료노드: {}, 구간통과: {}, {} m, {} km/h, {} seconds.",
  146. bisObe.getDeviceId(), bisObe.getStNodeId(), bisObe.getEdNodeId(), link.getLinkId(), link.getLinkLeng(), speed, travelSec);
  147. TbBisLinkTrafClct bisLinkTrafClct = TbBisLinkTrafClct.builder()
  148. .CLCT_DT(SysUtils.getSysTime())
  149. .LINK_ID(link.getLinkId())
  150. .DEVICE_ID(bisObe.getDeviceId())
  151. .SPED(speed)
  152. .TRVL_HH(travelSec)
  153. .ST_NODE_ARR_DT(TimeUtils.millisToString(bisObe.getStNodeTm()))
  154. .ED_NODE_ARR_DT(TimeUtils.millisToString(bisObe.getEdNodeTm()))
  155. .build();
  156. this.dbmsDataProcess.add(new DbmsData(DbmsDataType.DBMS_DATA_CRT_BIS_LINK_TRAF_CLCT, false, bisLinkTrafClct));
  157. bisObe.setStNodeId(bisObe.getEdNodeId());
  158. bisObe.setStNodeTm(bisObe.getEdNodeTm());
  159. bisObe.setEdNodeId(0L);
  160. bisObe.setEdNodeTm(0);
  161. }
  162. }
  163. }
  164. else {
  165. log.info("DEVICE ID: {}, 시작노드: {}, 종료노드: {}, 통과노드: {}", bisObe.getDeviceId(), stNodeId, edNodeId, passNodeId);
  166. }
  167. }
  168. }
  169. else {
  170. if (stNodeId != 0 && edNodeId != 0) {
  171. // 시작노드와 종료노드가 설정된 상태에서 이번에 아무런 노드정보가 없으므로 노드를 빠져 나온것이므로
  172. // 종료노드를 시작노드로 설정하고 종료노드는 0으로 설정한다.
  173. log.info("DEVICE ID: {}, 시작노드: {}, 종료노드: {}, 통과노드: {}", bisObe.getDeviceId(), stNodeId, edNodeId, passNodeId);
  174. bisObe.setStNodeId(edNodeId);
  175. bisObe.setStNodeTm(System.currentTimeMillis());
  176. // 종료노드 초기화
  177. bisObe.setEdNodeId(0L);
  178. bisObe.setEdNodeTm(0);
  179. }
  180. else {
  181. log.info("DEVICE ID: {}, 시작노드: {}, 종료노드: {}, 통과노드: {}, 구간 운행 중...", bisObe.getDeviceId(), stNodeId, edNodeId, passNodeId);
  182. }
  183. }
  184. /**
  185. * 버스 현재위치 정보를 업데이트 한다.
  186. */
  187. TbBisVehLoc bisVehLoc = TbBisVehLoc.builder()
  188. .DEVICE_ID(bisObe.getDeviceId())
  189. .CLCT_DT(SysUtils.getSysTime())
  190. .LOC_TYPE(obeLoc.getGnssInfo().getType())
  191. .SPEED(obeLoc.getGnssInfo().getSpeed())
  192. .HEGT(obeLoc.getGnssInfo().getHeight())
  193. .ANGLE(obeLoc.getGnssInfo().getAngle())
  194. .LNG(obeLoc.getGnssInfo().getLongitude())
  195. .LAT(obeLoc.getGnssInfo().getLatitude())
  196. .PASS_NODE_ID(passNodeId)
  197. .build();
  198. this.dbmsDataProcess.add(new DbmsData(DbmsDataType.DBMS_DATA_UPD_BIS_VEH_LOC, false, bisVehLoc));
  199. }
  200. public void report() {
  201. long avgTime = 0;
  202. log.info("Packet: Remain Q: {}, Average: {}, {}", this.DATA_QUEUE.size(), TimeUtils.elapsedTimeStr(avgTime), Thread.currentThread().getName());
  203. }
  204. public int calcSpeed(int distance, int seconds) {
  205. int speed;
  206. if (distance <= 0 || seconds <= 0) {
  207. return 2;
  208. }
  209. speed = (int)(((distance * 3.6) / (float)seconds) + 0.5);
  210. return speed;
  211. }
  212. public NearNodeDto getNearNode(Location loc) {
  213. Elapsed elapsed = new Elapsed();
  214. //Location(mDistance=0.0, mInitialBearing=0.0, mFinalBearing=0.0, mLatitude=128.73881251, mLongitude=36.56128759)
  215. double maxDistance = this.config.getMaxDistance();
  216. NearNodeDto nearNode = NearNodeDto.builder().nodeId(0L).distance(maxDistance).build();
  217. for (Map.Entry<Long, TbNodeDto> e : this.repoService.getEntrySetNode()) {
  218. TbNodeDto node = e.getValue();
  219. double fDistance = (float) getDistance(loc.getMLatitude(), loc.getMLongitude(), node.getXCrdn(), node.getYCrdn());
  220. if (fDistance < maxDistance) {
  221. maxDistance = fDistance;
  222. nearNode.setNodeId(node.getNodeId());
  223. nearNode.setDistance(fDistance);
  224. }
  225. }
  226. log.info("getNearNode: {} ms. {} m, {}", elapsed.milliSeconds(), maxDistance, nearNode);
  227. return nearNode;
  228. }
  229. public double getDistance(double lat1, double lon1, double lat2, double lon2) {
  230. double EARTH_RADIUS = 6371.0;
  231. double dLat = Math.toRadians(lat2 - lat1);
  232. double dLon = Math.toRadians(lon2 - lon1);
  233. double a = Math.sin(dLat/2)* Math.sin(dLat/2)+ Math.cos(Math.toRadians(lat1))* Math.cos(Math.toRadians(lat2))* Math.sin(dLon/2)* Math.sin(dLon/2);
  234. double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1-a));
  235. return (EARTH_RADIUS * c * 1000); // Distance in m
  236. }
  237. @Override
  238. public void run() {
  239. log.info("{} Start. Q_SIZE: {}", Thread.currentThread().getName(), this.qSize);
  240. while (true) {
  241. try {
  242. AgipObeLoc obeLoc = this.DATA_QUEUE.take();
  243. if (obeLoc != null) {
  244. process(obeLoc);
  245. }
  246. else {
  247. Thread.yield();
  248. }
  249. }
  250. catch (Exception e) {
  251. log.error("Exception: {}", e.getMessage());
  252. }
  253. }
  254. }
  255. }