package com.its.common.cluster.service; import com.its.common.cluster.codec.ClusterMessageDecoder; import com.its.common.cluster.codec.ClusterMessageEncoder; import com.its.common.cluster.config.AbstractClusterConfig; import com.its.common.cluster.handler.ClusterSlaveHandler; import com.its.common.cluster.vo.ClusterNode; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.GenericFutureListener; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; @Slf4j @Getter @Setter @RequiredArgsConstructor public class ClusterSlave implements Callable { private final AbstractClusterSlaveService slaveService; private final AbstractClusterConfig clusterConfig; private final ClusterNode clusterNode; private final ClusterSlaveBootstrapFactory bootstrapFactory; private Bootstrap bootstrap = null; private ChannelFuture channelFuture = null; private String ipAddress; private int port; @Override public Object call() { this.ipAddress = this.clusterNode.getIp(); this.port = this.clusterNode.getPort(); try { initializeBootstrapIfNeeded(); if (this.clusterConfig.isLogging()) { log.info("ClusterNodeId: {}, ClusterSlave.call: Try Connect: toClusterId: {}, [{}.{}]", this.clusterConfig.getId(), this.clusterNode.getId(), this.ipAddress, this.port); } closePreviousChannel(); this.channelFuture = this.bootstrap.connect(new InetSocketAddress(this.ipAddress, this.port)); // 채널 연결 리스너 등록 this.channelFuture.addListener(new GenericFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { channelOpen(future); } }); // 채널 연결 종료 리스너 등록 this.channelFuture.channel().closeFuture().addListener(new GenericFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { channelClosed(future); } }); return this.channelFuture != null && this.channelFuture.isSuccess(); } catch (Exception e) { log.error("ClusterNodeId: {}, ClusterSlave.call: Connection error toClusterId: {}, [{}.{}], Exception: {}", this.clusterConfig.getId(), this.clusterNode.getId(), this.ipAddress, this.port, e.getMessage(), e); return false; } } private void initializeBootstrapIfNeeded() { if (this.bootstrap != null) { return; } if (this.clusterConfig.isLogging()) { log.info("ClusterNodeId: {}, ClusterSlave.call: Startup: toClusterId: {}, [{}.{}]", this.clusterConfig.getId(), this.clusterNode.getId(), this.ipAddress, this.port); } this.bootstrap = bootstrapFactory.createBootstrap(); this.bootstrap.option(ChannelOption.SO_REUSEADDR, true); this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000); // 바인딩 로컬 포트 설정(설정파일에 등록된 바인딩 포트번호 + 서버 ID), 따라서 하나의 서버에 여러 개의 클러스터 서버가 존재할 수 있다. // 주의) 하나의 서버에 여러 개의 클러스터 서버가 존재할 경우, 바인딩 포트는 위의 규칙에 속하지 않는 포트번호를 할당해야 한다. this.bootstrap.localAddress(new InetSocketAddress(port + clusterConfig.getId())); this.bootstrap.handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { if (clusterConfig.isPacketLogging()) { ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO)); } ch.pipeline().addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS)); ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4)); ch.pipeline().addLast(new ClusterMessageDecoder(clusterNode, clusterConfig)); ch.pipeline().addLast(new ClusterMessageEncoder(clusterNode, clusterConfig)); ch.pipeline().addLast(new ClusterSlaveHandler(clusterNode, slaveService)); } }); } private void closePreviousChannel() { if (this.channelFuture != null && this.channelFuture.channel().isOpen()) { this.channelFuture.channel().close(); this.channelFuture = null; } } /** * 연결 성공시 처리 이벤트 */ protected void channelOpen(ChannelFuture future) { if (future.isSuccess()) { Channel channel = future.channel(); channel.attr(AbstractClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(this.clusterNode); this.clusterNode.getSyncState().connect(channel); if (this.clusterConfig.isLogging()) { log.info("ClusterNodeId: {}, ClusterSlave.channelOpen: toClusterId: {}, [{}.{}], Channel: {}", this.clusterConfig.getId(), this.clusterNode.getId(), this.ipAddress, this.port, channel); } } else { if (this.clusterConfig.isLogging()) { log.warn("ClusterNodeId: {}, ClusterSlave.channelOpen: toClusterId: {}, [{}.{}], Failed: {}", this.clusterConfig.getId(), clusterNode.getId(), clusterNode.getIp(), clusterNode.getPort(), future.cause().getMessage()); } } } /** * 연결 종료시 처리 이벤트 */ protected synchronized void channelClosed(ChannelFuture future) { Channel channel = future.channel(); if (this.clusterConfig.isLogging()) { log.warn("ClusterNodeId: {}, ClusterSlave.channelClosed: toClusterId: {}, [{}.{}], Channel: {}", this.clusterConfig.getId(), this.clusterNode.getId(), this.ipAddress, this.port, channel); } channel.attr(AbstractClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null); this.clusterNode.getSyncState().disConnect(); if (channel.isOpen()) { channel.close(); } channel.eventLoop().schedule(this, 5, TimeUnit.SECONDS); } }