| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- package com.its.common.cluster.service;
- import com.its.common.cluster.codec.ClusterMessageDecoder;
- import com.its.common.cluster.codec.ClusterMessageEncoder;
- import com.its.common.cluster.config.AbstractClusterConfig;
- import com.its.common.cluster.handler.ClusterSlaveHandler;
- import com.its.common.cluster.vo.ClusterNode;
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.*;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
- import io.netty.handler.logging.LogLevel;
- import io.netty.handler.logging.LoggingHandler;
- import io.netty.handler.timeout.IdleStateHandler;
- import io.netty.util.concurrent.GenericFutureListener;
- import lombok.Getter;
- import lombok.RequiredArgsConstructor;
- import lombok.Setter;
- import lombok.extern.slf4j.Slf4j;
- import java.net.InetSocketAddress;
- import java.util.concurrent.Callable;
- import java.util.concurrent.TimeUnit;
- @Slf4j
- @Getter
- @Setter
- @RequiredArgsConstructor
- public class ClusterSlave implements Callable<Object> {
- private final AbstractClusterSlaveService slaveService;
- private final AbstractClusterConfig clusterConfig;
- private final ClusterNode clusterNode;
- private final ClusterSlaveBootstrapFactory bootstrapFactory;
- private Bootstrap bootstrap = null;
- private ChannelFuture channelFuture = null;
- private String ipAddress;
- private int port;
- @Override
- public Object call() {
- this.ipAddress = this.clusterNode.getIp();
- this.port = this.clusterNode.getPort();
- try {
- initializeBootstrapIfNeeded();
- if (this.clusterConfig.isLogging()) {
- log.info("ClusterNodeId: {}, ClusterSlave.call: Try Connect: toClusterId: {}, [{}.{}]", this.clusterConfig.getId(), this.clusterNode.getId(), this.ipAddress, this.port);
- }
- closePreviousChannel();
- this.channelFuture = this.bootstrap.connect(new InetSocketAddress(this.ipAddress, this.port));
- // 채널 연결 리스너 등록
- this.channelFuture.addListener(new GenericFutureListener<ChannelFuture>() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- channelOpen(future);
- }
- });
- // 채널 연결 종료 리스너 등록
- this.channelFuture.channel().closeFuture().addListener(new GenericFutureListener<ChannelFuture>() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- channelClosed(future);
- }
- });
- return this.channelFuture != null && this.channelFuture.isSuccess();
- } catch (Exception e) {
- log.error("ClusterNodeId: {}, ClusterSlave.call: Connection error toClusterId: {}, [{}.{}], Exception: {}",
- this.clusterConfig.getId(), this.clusterNode.getId(), this.ipAddress, this.port, e.getMessage(), e);
- return false;
- }
- }
- private void initializeBootstrapIfNeeded() {
- if (this.bootstrap != null) {
- return;
- }
- if (this.clusterConfig.isLogging()) {
- log.info("ClusterNodeId: {}, ClusterSlave.call: Startup: toClusterId: {}, [{}.{}]", this.clusterConfig.getId(), this.clusterNode.getId(), this.ipAddress, this.port);
- }
- this.bootstrap = bootstrapFactory.createBootstrap();
- this.bootstrap.option(ChannelOption.SO_REUSEADDR, true);
- this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
- // 바인딩 로컬 포트 설정(설정파일에 등록된 바인딩 포트번호 + 서버 ID), 따라서 하나의 서버에 여러 개의 클러스터 서버가 존재할 수 있다.
- // 주의) 하나의 서버에 여러 개의 클러스터 서버가 존재할 경우, 바인딩 포트는 위의 규칙에 속하지 않는 포트번호를 할당해야 한다.
- this.bootstrap.localAddress(new InetSocketAddress(port + clusterConfig.getId()));
- this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) {
- if (clusterConfig.isPacketLogging()) {
- ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
- }
- ch.pipeline().addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS));
- ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4));
- ch.pipeline().addLast(new ClusterMessageDecoder(clusterNode, clusterConfig));
- ch.pipeline().addLast(new ClusterMessageEncoder(clusterNode, clusterConfig));
- ch.pipeline().addLast(new ClusterSlaveHandler(clusterNode, slaveService));
- }
- });
- }
- private void closePreviousChannel() {
- if (this.channelFuture != null && this.channelFuture.channel().isOpen()) {
- this.channelFuture.channel().close();
- this.channelFuture = null;
- }
- }
- /**
- * 연결 성공시 처리 이벤트
- */
- protected void channelOpen(ChannelFuture future) {
- if (future.isSuccess()) {
- Channel channel = future.channel();
- channel.attr(AbstractClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(this.clusterNode);
- this.clusterNode.getSyncState().connect(channel);
- if (this.clusterConfig.isLogging()) {
- log.info("ClusterNodeId: {}, ClusterSlave.channelOpen: toClusterId: {}, [{}.{}], Channel: {}", this.clusterConfig.getId(), this.clusterNode.getId(), this.ipAddress, this.port, channel);
- }
- }
- else {
- if (this.clusterConfig.isLogging()) {
- log.warn("ClusterNodeId: {}, ClusterSlave.channelOpen: toClusterId: {}, [{}.{}], Failed: {}", this.clusterConfig.getId(), clusterNode.getId(), clusterNode.getIp(), clusterNode.getPort(), future.cause().getMessage());
- }
- }
- }
- /**
- * 연결 종료시 처리 이벤트
- */
- protected synchronized void channelClosed(ChannelFuture future) {
- Channel channel = future.channel();
- if (this.clusterConfig.isLogging()) {
- log.warn("ClusterNodeId: {}, ClusterSlave.channelClosed: toClusterId: {}, [{}.{}], Channel: {}", this.clusterConfig.getId(), this.clusterNode.getId(), this.ipAddress, this.port, channel);
- }
- channel.attr(AbstractClusterConfig.CLUSTER_ATTRIBUTE_KEY).set(null);
- this.clusterNode.getSyncState().disConnect();
- if (channel.isOpen()) {
- channel.close();
- }
- channel.eventLoop().schedule(this, 5, TimeUnit.SECONDS);
- }
- }
|