增加启动等待采集器连接事件

segment2.0
linrf 2 years ago
parent 4d06e75487
commit 1e8edd88ef

@ -99,6 +99,7 @@ public class TerminalToChannelCacheMap {
IP_CHANNEL_CACHE_MAP.remove(key); IP_CHANNEL_CACHE_MAP.remove(key);
IP_TERMINATOR_CACHE_MAP.remove(key); IP_TERMINATOR_CACHE_MAP.remove(key);
NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO(); NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO();
nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE);
nettyTerminatorDTO.setOnlineState(OnlineStateEnum.OFFLINE); nettyTerminatorDTO.setOnlineState(OnlineStateEnum.OFFLINE);
iSchTerminatorService.saveOrUpdate(key, nettyTerminatorDTO); iSchTerminatorService.saveOrUpdate(key, nettyTerminatorDTO);
} }

@ -136,6 +136,10 @@ public class LoadPackageCommandLineRunner implements CommandLineRunner {
updatePart(loadSchCollectorVOList, value); updatePart(loadSchCollectorVOList, value);
} }
loadSchCollectorVOList.forEach(p -> {
p.setDeployPath(saveCollectorPackagePath + p.getCollectorId() + "\\collector");
});
//根据服务器端口区分,每次更新了哪些采集器版本 //根据服务器端口区分,每次更新了哪些采集器版本
template.opsForValue().set(key, JSON.toJSON(loadSchCollectorVOList)); template.opsForValue().set(key, JSON.toJSON(loadSchCollectorVOList));
} }

@ -4,7 +4,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.util.Attribute; import io.netty.util.Attribute;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
public class ChannelAttribute<T> { public class ChannelAttribute<T> {
/** /**
* *

@ -2,6 +2,7 @@ package com.docus.server.common.netty.client;
import com.docus.core.util.json.JSON; import com.docus.core.util.json.JSON;
import com.docus.server.common.CommMsg; import com.docus.server.common.CommMsg;
import com.docus.server.common.netty.event.StartUpEvent;
import com.docus.server.common.netty.server.CollectorChannelCacheMap; import com.docus.server.common.netty.server.CollectorChannelCacheMap;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -13,6 +14,7 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -46,6 +48,8 @@ public class NettyClient {
private StringRedisTemplate template; private StringRedisTemplate template;
@Resource @Resource
private Environment env; private Environment env;
@Resource
private ApplicationContext applicationContext;
/** /**
* *
@ -59,6 +63,15 @@ public class NettyClient {
} }
} }
/**
*
*
* @param message
*/
public void publishEvent(String message, CommMsg commMsg) {
applicationContext.publishEvent(new StartUpEvent(this, message, commMsg));
}
@PostConstruct @PostConstruct
public void start() { public void start() {
final EventLoopGroup group = new NioEventLoopGroup(); final EventLoopGroup group = new NioEventLoopGroup();

@ -138,16 +138,7 @@ public class ClientHandler extends SimpleChannelInboundHandler<CommMsg<Serializa
//不存在进程,则启动指定采集器进程,等待启动完成下发任务 //不存在进程,则启动指定采集器进程,等待启动完成下发任务
StartUpExeUtils.startUpExeOnly(saveCollectorPackagePath + collectorId + "\\collector\\" + processName); StartUpExeUtils.startUpExeOnly(saveCollectorPackagePath + collectorId + "\\collector\\" + processName);
while (true) { nettyClient.publishEvent(collectorId, commMsg);
Channel channel = collectorChannelCacheMap.get(collectorId);
log.info("等待采集器={}启动中", collectorId);
if (null != channel && channel.isOpen()) {
log.info("等待采集器={}启动成功并注册到终端", collectorId);
collectorChannelCacheMap.writeAndFlush(collectorId, commMsg);
break;
}
Thread.sleep(5000);
}
} }
} }
} }

@ -0,0 +1,27 @@
package com.docus.server.common.netty.event;
import com.docus.server.common.CommMsg;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
@Getter
public class StartUpEvent extends ApplicationEvent {
/**
* id ,
*/
private final String message;
private final CommMsg commMsg;
/**
* @param source
* @param message
*/
public StartUpEvent(Object source, String message, CommMsg commMsg) {
super(source);
this.message = message;
this.commMsg = commMsg;
}
}

@ -0,0 +1,41 @@
package com.docus.server.common.netty.event;
import com.docus.server.common.netty.server.CollectorChannelCacheMap;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
@Slf4j
public class StartUpEventListener {
@Resource
private CollectorChannelCacheMap collectorChannelCacheMap;
@EventListener
@Async("threadPoolExecutor")
public void startUp(StartUpEvent startUpEvent) {
while (true) {
String collectorId = startUpEvent.getMessage();
Channel channel = collectorChannelCacheMap.get(collectorId);
log.info("等待采集器={}启动中", collectorId);
if (null != channel && channel.isOpen()) {
log.info("等待采集器={}启动成功并注册到终端", collectorId);
collectorChannelCacheMap.writeAndFlush(collectorId, startUpEvent.getCommMsg());
break;
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Loading…
Cancel
Save