采集器重启命令

segment2.0
linrf 2 years ago
parent f25f2644e3
commit c3a6a74d6f

@ -5,59 +5,59 @@ public class MsgConstants {
/** /**
* *
*/ */
public static final String HEARTBEAT_REQUEST = "0".trim(); public static final String HEARTBEAT_REQUEST = "0";
/** /**
* 线 * 线
*/ */
public static final String ONLINE_REGISTER = "1".trim(); public static final String ONLINE_REGISTER = "1";
/** /**
* 线 * 线
*/ */
public static final String OFFLINE_REMOVE = "2".trim(); public static final String OFFLINE_REMOVE = "2";
/** /**
* *
*/ */
public static final String EXCEPTION_REMOVE = "3".trim(); public static final String EXCEPTION_REMOVE = "3";
/** /**
* *
*/ */
public static final String TERMINATOR_RESTART = "4".trim(); public static final String TERMINATOR_RESTART = "4";
/** /**
* *
*/ */
public static final String COLLECTOR_RESTART = "5".trim(); public static final String COLLECTOR_RESTART = "5";
/** /**
* *
*/ */
public static final String VIRTUAL_RESTART = "6".trim(); public static final String VIRTUAL_RESTART = "6";
/** /**
* *
*/ */
public static final String UPDATE_COLLECTOR_FILE = "7".trim(); public static final String UPDATE_COLLECTOR_FILE = "7";
/** /**
* *
*/ */
public static final String UPDATE_COLLECTOR_CONFIG = "8".trim(); public static final String UPDATE_COLLECTOR_CONFIG = "8";
/** /**
* *
*/ */
public static final String SCH_DISTRIBUTE_TASKS = "9".trim(); public static final String SCH_DISTRIBUTE_TASKS = "9";
/** /**
* *
*/ */
public static final String HAS_VALID_COLLECTOR = "11".trim(); public static final String HAS_VALID_COLLECTOR = "11";
/** /**
* *
*/ */
public static final String HAS_COLLECTOR_COUNT = "12".trim(); public static final String HAS_COLLECTOR_COUNT = "12";
} }

@ -3,12 +3,10 @@ package com.docus.server.common.netty.server.handler;
import com.docus.core.util.Func; import com.docus.core.util.Func;
import com.docus.core.util.json.JSON; import com.docus.core.util.json.JSON;
import com.docus.server.common.CommMsg; import com.docus.server.common.CommMsg;
import com.docus.server.common.MsgConstants;
import com.docus.server.common.netty.server.TerminalToChannelCacheMap; import com.docus.server.common.netty.server.TerminalToChannelCacheMap;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO; import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.enums.BusyStateEnum; import com.docus.server.enums.BusyStateEnum;
import com.docus.server.enums.OnlineStateEnum; import com.docus.server.enums.OnlineStateEnum;
import com.docus.server.service.ISchTerminatorService;
import com.docus.server.vo.scheduling.management.schcollector.LoadSchCollectorVO; import com.docus.server.vo.scheduling.management.schcollector.LoadSchCollectorVO;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.channel.Channel; import io.netty.channel.Channel;
@ -26,6 +24,8 @@ import java.io.Serializable;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.List; import java.util.List;
import static com.docus.server.common.MsgConstants.*;
/** /**
* *
*/ */
@ -38,44 +38,102 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<CommMsg<Se
@Resource @Resource
private TerminalToChannelCacheMap terminalToChannelCacheMap; private TerminalToChannelCacheMap terminalToChannelCacheMap;
@Resource
private ISchTerminatorService iSchTerminatorService;
@Override @Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, CommMsg commMsg) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, CommMsg commMsg) throws Exception {
String messageType = commMsg.getMessageType(); String messageType = commMsg.getMessageType();
String messageTime = commMsg.getMessageTime(); String messageTime = commMsg.getMessageTime();
String messageContent = (String) commMsg.getContent(); String messageContent = (String) commMsg.getContent();
InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = ipSocket.getAddress().getHostAddress();
log.info("======== 【调度器服务端-收到终端消息】, 消息时间={}, 消息类型={}, 消息内容={}", messageTime, messageType, messageContent + " ======== "); log.info("======== 【调度器服务端-收到终端消息】, 消息时间={}, 消息类型={}, 消息内容={}", messageTime, messageType, messageContent + " ======== ");
if (messageType.equals(MsgConstants.TERMINATOR_RESTART)) { switch (messageType) {
log.info("接受到终端重启命令,内容{}", messageContent); case ONLINE_REGISTER:
} else if (messageType.equals(MsgConstants.COLLECTOR_RESTART)) { onlineRegister(ctx, messageContent, clientIp);
log.info("收到采集器重启命令,内容{}", messageContent); break;
} else if (messageType.equals(MsgConstants.VIRTUAL_RESTART)) { case TERMINATOR_RESTART:
log.info("收到虚拟机重启命令,内容{}", messageContent); terminatorRestart(messageContent);
} else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_FILE)) { break;
log.info("收到更新采集器文件命令,内容{}", messageContent); case COLLECTOR_RESTART:
} else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_CONFIG)) { collectorRestart(messageContent);
log.info("收到更新采集器配置命令,内容{}", messageContent); break;
} else if (messageType.equals(MsgConstants.SCH_DISTRIBUTE_TASKS)) { case VIRTUAL_RESTART:
log.info("收到采集调度器下发任务命令,内容{}", messageContent); virtualRestart(messageContent);
} else if (messageType.equals(MsgConstants.HAS_COLLECTOR_COUNT)) { break;
log.info("收到终端客户端返回的【采集器数量】消息,内容{}", messageContent); case UPDATE_COLLECTOR_FILE:
updateCollectorFile(messageContent);
String terminalIp = terminalToChannelCacheMap.getClientKey(channelHandlerContext.channel()); break;
case UPDATE_COLLECTOR_CONFIG:
NettyTerminatorDTO nettyTerminatorDTO = terminalToChannelCacheMap.getTerminatorByIp(terminalIp); updateCollectorConfig(messageContent);
break;
if (Func.isNotBlank(messageContent)) { case SCH_DISTRIBUTE_TASKS:
nettyTerminatorDTO.getLoadSchCollectorVOList().clear(); schDistributeTasks(commMsg, messageContent);
List<LoadSchCollectorVO> loadSchCollectorVOList = JSON.fromJSONWithGeneric(messageContent, new TypeReference<List<LoadSchCollectorVO>>() { break;
}); case HAS_COLLECTOR_COUNT:
nettyTerminatorDTO.getLoadSchCollectorVOList().addAll(loadSchCollectorVOList); hasCollectorCount(ctx, messageContent);
} break;
default:
break;
}
}
private void hasCollectorCount(ChannelHandlerContext ctx, String messageContent) {
log.info("【调度器服务端-收到终端消息】收到终端客户端返回的【采集器数量】消息,内容{}", messageContent);
String terminalIp = terminalToChannelCacheMap.getClientKey(ctx.channel());
NettyTerminatorDTO nettyTerminatorDTO = terminalToChannelCacheMap.getTerminatorByIp(terminalIp);
if (Func.isNotBlank(messageContent)) {
nettyTerminatorDTO.getLoadSchCollectorVOList().clear();
List<LoadSchCollectorVO> loadSchCollectorVOList = JSON.fromJSONWithGeneric(messageContent, new TypeReference<List<LoadSchCollectorVO>>() {
});
nettyTerminatorDTO.getLoadSchCollectorVOList().addAll(loadSchCollectorVOList);
}
}
private void schDistributeTasks(CommMsg commMsg, String messageContent) {
log.info("【调度器服务端-收到终端消息】收到采集调度器下发任务命令,内容{}", messageContent);
}
private void updateCollectorConfig(String messageContent) {
log.info("【调度器服务端-收到终端消息】收到更新采集器配置命令,内容{}", messageContent);
}
private void updateCollectorFile(String messageContent) {
log.info("【调度器服务端-收到终端消息】收到更新采集器文件命令,内容{}", messageContent);
}
private void virtualRestart(String messageContent) {
log.info("【调度器服务端-收到终端消息】收到虚拟机重启命令,内容{}", messageContent);
}
private void collectorRestart(String messageContent) {
log.info("【调度器服务端-收到终端消息】收到采集器重启命令,内容{}", messageContent);
}
private void terminatorRestart(String messageContent) {
log.info("【调度器服务端-收到终端消息】接受到终端重启命令,内容{}", messageContent);
}
private void onlineRegister(ChannelHandlerContext ctx, String messageContent, String clientIp) {
log.info("【调度器服务端-收到终端消息】接受到终端注册命令,内容{}", messageContent);
String clientKey = terminalToChannelCacheMap.getClientKey(ctx.channel());
if (Func.isNotBlank(clientKey)) {
NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO();
nettyTerminatorDTO.setTerminatorIp(clientIp);
nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE);
nettyTerminatorDTO.setOnlineState(OnlineStateEnum.ONLINE);
//将ip和channel进行映射
terminalToChannelCacheMap.put(nettyTerminatorDTO, ctx.channel());
} }
} }
@ -116,6 +174,7 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<CommMsg<Se
System.out.println(channel.remoteAddress() + " 下线," + "【调度器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size()); System.out.println(channel.remoteAddress() + " 下线," + "【调度器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size());
String clientId = terminalToChannelCacheMap.getClientKey(channel); String clientId = terminalToChannelCacheMap.getClientKey(channel);
log.error("客户端下线,终端连接:{}", clientId); log.error("客户端下线,终端连接:{}", clientId);
//移除终端,终端离线 //移除终端,终端离线
if (clientId != null) { if (clientId != null) {
terminalToChannelCacheMap.remove(clientId); terminalToChannelCacheMap.remove(clientId);

@ -1,12 +1,8 @@
package com.docus.server.common.netty.server.handler; package com.docus.server.common.netty.server.handler;
import com.docus.core.util.Func;
import com.docus.server.common.CommMsg; import com.docus.server.common.CommMsg;
import com.docus.server.common.MsgConstants; import com.docus.server.common.MsgConstants;
import com.docus.server.common.netty.server.TerminalToChannelCacheMap; import com.docus.server.common.netty.server.TerminalToChannelCacheMap;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.enums.BusyStateEnum;
import com.docus.server.enums.OnlineStateEnum;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
@ -25,36 +21,18 @@ public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
CommMsg commMsg = (CommMsg) msg; CommMsg commMsg = (CommMsg) msg;
String messageType = commMsg.getMessageType(); String messageType = commMsg.getMessageType();
String messageTime = commMsg.getMessageTime(); String messageTime = commMsg.getMessageTime();
Serializable content = commMsg.getContent(); Serializable messageContent = commMsg.getContent();
if (messageType.equals(MsgConstants.HEARTBEAT_REQUEST)) { if (messageType.equals(MsgConstants.HEARTBEAT_REQUEST)) {
log.info("接收到客户端的心跳"); log.info("======== 【调度器服务端-收到终端消息】, 消息时间={}, 消息类型={}, 消息内容={}", messageTime, messageType, messageContent + " ======== ");
log.info("接受到【采集器-终端】的心跳消息:消息类型={},消息时间={},消息内容={}", messageType, messageTime, content);
InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress(); InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = ipSocket.getAddress().getHostAddress(); String clientIp = ipSocket.getAddress().getHostAddress();
log.info("接收到客户端的心跳,【调度器-终端IP】:{},连接上线,IP地址信息:{}", clientIp, clientIp);
log.info("【采集器-终端IP】:{},连接上线,IP地址信息:{}", clientIp, clientIp);
String clientKey = repository.getClientKey(ctx.channel());
if (Func.isNotBlank(clientKey)) {
NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO();
nettyTerminatorDTO.setTerminatorIp(clientIp);
nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE);
nettyTerminatorDTO.setOnlineState(OnlineStateEnum.ONLINE);
//将ip和channel进行映射
repository.put(nettyTerminatorDTO, ctx.channel());
}
} else { } else {

@ -1,9 +1,11 @@
package com.docus.server.service.impl; package com.docus.server.service.impl;
import com.docus.core.util.Func; import com.docus.core.util.Func;
import com.docus.core.util.property.Setters;
import com.docus.infrastructure.redis.service.IdService; import com.docus.infrastructure.redis.service.IdService;
import com.docus.infrastructure.web.request.SearchDTO; import com.docus.infrastructure.web.request.SearchDTO;
import com.docus.infrastructure.web.response.PageResult; import com.docus.infrastructure.web.response.PageResult;
import com.docus.server.common.netty.server.TerminalToChannelCacheMap;
import com.docus.server.convert.SchTerminatorConvert; import com.docus.server.convert.SchTerminatorConvert;
import com.docus.server.dto.scheduling.management.schterminator.AddSchTerminatorDTO; import com.docus.server.dto.scheduling.management.schterminator.AddSchTerminatorDTO;
import com.docus.server.dto.scheduling.management.schterminator.DeleteSchTerminatorDTO; import com.docus.server.dto.scheduling.management.schterminator.DeleteSchTerminatorDTO;
@ -15,10 +17,12 @@ import com.docus.server.infrastructure.dao.ISchTerminatorDao;
import com.docus.server.service.ISchTerminatorService; import com.docus.server.service.ISchTerminatorService;
import com.docus.server.vo.scheduling.management.schterminator.SchTerminatorVO; import com.docus.server.vo.scheduling.management.schterminator.SchTerminatorVO;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* *
@ -32,6 +36,8 @@ public class SchTerminatorServiceImpl implements ISchTerminatorService {
private ISchTerminatorDao iSchTerminatorDao; private ISchTerminatorDao iSchTerminatorDao;
@Resource @Resource
private IdService idService; private IdService idService;
@Resource
private TerminalToChannelCacheMap terminalToChannelCacheMap;
/** /**
* *
@ -52,7 +58,23 @@ public class SchTerminatorServiceImpl implements ISchTerminatorService {
*/ */
@Override @Override
public PageResult<SchTerminatorVO> search(SearchDTO searchDTO) { public PageResult<SchTerminatorVO> search(SearchDTO searchDTO) {
return SchTerminatorConvert.INSTANCE.convertVO(iSchTerminatorDao.search(searchDTO)); PageResult<SchTerminatorVO> result = SchTerminatorConvert.INSTANCE.convertVO(iSchTerminatorDao.search(searchDTO));
if (CollectionUtils.isEmpty(result.getList())) {
return new PageResult<>();
}
Map<String, NettyTerminatorDTO> ipToTerminatorCacheMap = terminalToChannelCacheMap.getIpToTerminatorCacheMap();
Setters.<SchTerminatorVO>instance().list(result.getList()).cycleSetProperties(p -> {
String terminatorIp = String.valueOf(p.getTerminatorIp());
if (ipToTerminatorCacheMap.containsKey(terminatorIp)) {
NettyTerminatorDTO nettyTerminatorDTO = ipToTerminatorCacheMap.get(terminatorIp);
p.setLoadSchCollectorVOList(nettyTerminatorDTO.getLoadSchCollectorVOList());
}
});
return result;
} }
@Override @Override

@ -5,64 +5,64 @@ public class MsgConstants {
/** /**
* *
*/ */
public static final String HEARTBEAT_REQUEST = "0".trim(); public static final String HEARTBEAT_REQUEST = "0";
/** /**
* 线 * 线
*/ */
public static final String ONLINE_REGISTER = "1".trim(); public static final String ONLINE_REGISTER = "1";
/** /**
* 线 * 线
*/ */
public static final String OFFLINE_REMOVE = "2".trim(); public static final String OFFLINE_REMOVE = "2";
/** /**
* *
*/ */
public static final String EXCEPTION_REMOVE = "3".trim(); public static final String EXCEPTION_REMOVE = "3";
/** /**
* *
*/ */
public static final String TERMINATOR_RESTART = "4".trim(); public static final String TERMINATOR_RESTART = "4";
/** /**
* *
*/ */
public static final String COLLECTOR_RESTART = "5".trim(); public static final String COLLECTOR_RESTART = "5";
/** /**
* *
*/ */
public static final String VIRTUAL_RESTART = "6".trim(); public static final String VIRTUAL_RESTART = "6";
/** /**
* *
*/ */
public static final String UPDATE_COLLECTOR_FILE = "7".trim(); public static final String UPDATE_COLLECTOR_FILE = "7";
/** /**
* *
*/ */
public static final String UPDATE_COLLECTOR_CONFIG = "8".trim(); public static final String UPDATE_COLLECTOR_CONFIG = "8";
/** /**
* *
*/ */
public static final String SCH_DISTRIBUTE_TASKS = "9".trim(); public static final String SCH_DISTRIBUTE_TASKS = "9";
/** /**
* *
*/ */
public static final String REV_COLLECTOR_TASK = "10".trim(); public static final String REV_COLLECTOR_TASK = "10";
/** /**
* *
*/ */
public static final String HAS_VALID_COLLECTOR = "11".trim(); public static final String HAS_VALID_COLLECTOR = "11";
/** /**
* *
*/ */
public static final String HAS_COLLECTOR_COUNT = "12".trim(); public static final String HAS_COLLECTOR_COUNT = "12";
} }

@ -25,6 +25,9 @@ import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static com.docus.server.common.MsgConstants.COLLECTOR_RESTART;
import static com.docus.server.common.MsgConstants.HAS_COLLECTOR_COUNT;
@Slf4j @Slf4j
@AllArgsConstructor @AllArgsConstructor
@NoArgsConstructor @NoArgsConstructor
@ -41,102 +44,161 @@ public class ClientHandler extends SimpleChannelInboundHandler<CommMsg<Serializa
log.info("======== 【终端客户端-收到调度器消息】, 消息时间={}, 消息类型={}, 消息内容={}", messageTime, messageType, messageContent + " ======== "); log.info("======== 【终端客户端-收到调度器消息】, 消息时间={}, 消息类型={}, 消息内容={}", messageTime, messageType, messageContent + " ======== ");
if (messageType.equals(MsgConstants.TERMINATOR_RESTART)) { switch (messageType) {
log.info("【终端客户端-收到调度器消息】接受到终端重启命令,内容={}", messageContent); case MsgConstants.TERMINATOR_RESTART:
terminatorRestart(messageContent);
break;
case MsgConstants.COLLECTOR_RESTART:
collectorRestart(messageContent);
break;
case MsgConstants.VIRTUAL_RESTART:
virtualRestart(messageContent);
break;
case MsgConstants.UPDATE_COLLECTOR_FILE:
updateCollectorFile(messageContent);
break;
case MsgConstants.UPDATE_COLLECTOR_CONFIG:
updateCollectorConfig(messageContent);
break;
case MsgConstants.SCH_DISTRIBUTE_TASKS:
schDistributeTasks(commMsg, messageContent);
break;
case MsgConstants.HAS_VALID_COLLECTOR:
hasValidCollector(messageContent);
break;
case MsgConstants.HAS_COLLECTOR_COUNT:
hasCollectorCount(context, messageContent);
break;
default:
break;
}
}
} else if (messageType.equals(MsgConstants.COLLECTOR_RESTART)) { private void hasCollectorCount(ChannelHandlerContext context, String messageContent) {
log.info("【终端客户端-收到调度器消息】收到采集器重启命令,内容={}", messageContent); log.info("【终端客户端-收到调度器消息】收到获取采集器数量命令,内容={}", messageContent);
} else if (messageType.equals(MsgConstants.VIRTUAL_RESTART)) { CollectorChannelCacheMap collectorChannelCacheMap = nettyClient.getCollectorChannelCacheMap();
log.info("【终端客户端-收到调度器消息】收到虚拟机重启命令,内容={}", messageContent); Map<String, Channel> collectorMap = collectorChannelCacheMap.getAll();
} else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_FILE)) { String packageValue = nettyClient.getRedisMsg();
log.info("【终端客户端-收到调度器消息】收到更新采集器文件命令,内容={}", messageContent); List<LoadSchCollectorVO> redisLoadSchCollectors = JSON.fromJSONWithGeneric(packageValue, new TypeReference<List<LoadSchCollectorVO>>() {
});
} else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_CONFIG)) { List<LoadSchCollectorVO> objects = Lists.newArrayList();
log.info("【终端客户端-收到调度器消息】收到更新采集器配置命令,内容={}", messageContent);
} else if (messageType.equals(MsgConstants.SCH_DISTRIBUTE_TASKS)) { for (LoadSchCollectorVO loadSchCollectorVO : redisLoadSchCollectors) {
log.info("【终端客户端-收到调度器消息】收到采集调度器下发任务命令,内容={}", messageContent); if (collectorMap.containsKey(String.valueOf(loadSchCollectorVO.getCollectorId()))) {
objects.add(loadSchCollectorVO);
}
}
//下发任务: 判断这个任务采集器类型在指定目录是否存在、 CommMsg<Serializable> collectorCountMsg = CommMsg.builder()
//如果不存在下拉该采集器类型最新的版本更新包到指定的部署路径并且启动该exe采集器将任务tcp给对应采集器 .messageType(HAS_COLLECTOR_COUNT)
//如果存在: .messageTime(DateUtil.formatDateTime(new Date()))
.content(JSON.toJSON(objects))
.build();
SchCollectorTaskDTO schCollectorTaskDTO = JSON.fromJSON(messageContent, SchCollectorTaskDTO.class); context.channel().writeAndFlush(collectorCountMsg);
String collectorId = schCollectorTaskDTO.getTaskInfo().getTasks().get(0).getCollectorId(); }
Long compareCollectorId = Long.valueOf(collectorId); private void hasValidCollector(String messageContent) {
log.info("【终端客户端-收到调度器消息】是否有可用类型的采集器命令,内容={}", messageContent);
}
String packageValue = nettyClient.getRedisMsg(); private void schDistributeTasks(CommMsg commMsg, String messageContent) throws Exception {
CollectorChannelCacheMap collectorChannelCacheMap = nettyClient.getCollectorChannelCacheMap(); log.info("【终端客户端-收到调度器消息】收到采集调度器下发任务命令,内容={}", messageContent);
String saveCollectorPackagePath = nettyClient.getSaveCollectorPackagePath();
List<LoadSchCollectorVO> redisLoadSchCollectors = JSON.fromJSONWithGeneric(packageValue, new TypeReference<List<LoadSchCollectorVO>>() { //下发任务: 判断这个任务采集器类型在指定目录是否存在、
}); //如果不存在下拉该采集器类型最新的版本更新包到指定的部署路径并且启动该exe采集器将任务tcp给对应采集器
//如果存在:
Map<Long, LoadSchCollectorVO> collectorVoMap = ListUtils.toMap(redisLoadSchCollectors, LoadSchCollectorVO::getCollectorId); SchCollectorTaskDTO schCollectorTaskDTO = JSON.fromJSON(messageContent, SchCollectorTaskDTO.class);
String collectorId = schCollectorTaskDTO.getTaskInfo().getTasks().get(0).getCollectorId();
if (collectorVoMap.containsKey(compareCollectorId)) { Long compareCollectorId = Long.valueOf(collectorId);
LoadSchCollectorVO loadSchCollectorVO = collectorVoMap.get(compareCollectorId);
String processName = loadSchCollectorVO.getProcessName();
if (Func.isNotBlank(processName)) { String packageValue = nettyClient.getRedisMsg();
boolean isExistProcess = StartUpExeUtils.checkProcessOnly(processName); CollectorChannelCacheMap collectorChannelCacheMap = nettyClient.getCollectorChannelCacheMap();
//存在指定采集器id进程 String saveCollectorPackagePath = nettyClient.getSaveCollectorPackagePath();
if (isExistProcess) {
collectorChannelCacheMap.writeAndFlush(collectorId, commMsg);
} else {
//不存在进程,则启动指定采集器进程,等待启动完成下发任务
StartUpExeUtils.startUpExeOnly(saveCollectorPackagePath + collectorId + "\\collector\\" + processName);
while (true) { List<LoadSchCollectorVO> redisLoadSchCollectors = JSON.fromJSONWithGeneric(packageValue, new TypeReference<List<LoadSchCollectorVO>>() {
Channel channel = collectorChannelCacheMap.get(collectorId); });
log.info("等待采集器={}启动中", collectorId);
if (null != channel && channel.isOpen()) {
log.info("等待采集器={}启动成功并注册到终端", collectorId);
collectorChannelCacheMap.writeAndFlush(collectorId, commMsg);
break;
}
}
}
}
}
Map<Long, LoadSchCollectorVO> collectorVoMap = ListUtils.toMap(redisLoadSchCollectors, LoadSchCollectorVO::getCollectorId);
} else if (messageType.equals(MsgConstants.HAS_VALID_COLLECTOR)) { if (collectorVoMap.containsKey(compareCollectorId)) {
log.info("【终端客户端-收到调度器消息】是否有可用类型的采集器命令,内容={}", messageContent); LoadSchCollectorVO loadSchCollectorVO = collectorVoMap.get(compareCollectorId);
String processName = loadSchCollectorVO.getProcessName();
if (Func.isNotBlank(processName)) {
boolean isExistProcess = StartUpExeUtils.checkProcessOnly(processName);
//存在指定采集器id进程
if (isExistProcess) {
collectorChannelCacheMap.writeAndFlush(collectorId, commMsg);
} else {
//不存在进程,则启动指定采集器进程,等待启动完成下发任务
StartUpExeUtils.startUpExeOnly(saveCollectorPackagePath + collectorId + "\\collector\\" + processName);
//下发任务: 判断这个任务采集器类型在指定目录是否存在、 while (true) {
//如果不存在下拉该采集器类型最新的版本更新包到指定的部署路径并且启动该exe采集器将任务tcp给对应采集器 Channel channel = collectorChannelCacheMap.get(collectorId);
//如果存在: log.info("等待采集器={}启动中", collectorId);
} else if (messageType.equals(MsgConstants.HAS_COLLECTOR_COUNT)) { if (null != channel && channel.isOpen()) {
log.info("【终端客户端-收到调度器消息】收到获取采集器数量命令,内容={}", messageContent); log.info("等待采集器={}启动成功并注册到终端", collectorId);
collectorChannelCacheMap.writeAndFlush(collectorId, commMsg);
break;
}
Thread.sleep(5000);
}
}
}
}
}
CollectorChannelCacheMap collectorChannelCacheMap = nettyClient.getCollectorChannelCacheMap(); private void updateCollectorConfig(String messageContent) {
Map<String, Channel> collectorMap = collectorChannelCacheMap.getAll(); log.info("【终端客户端-收到调度器消息】收到更新采集器配置命令,内容={}", messageContent);
}
String packageValue = nettyClient.getRedisMsg(); private void updateCollectorFile(String messageContent) {
List<LoadSchCollectorVO> redisLoadSchCollectors = JSON.fromJSONWithGeneric(packageValue, new TypeReference<List<LoadSchCollectorVO>>() { log.info("【终端客户端-收到调度器消息】收到更新采集器文件命令,内容={}", messageContent);
}); }
List<LoadSchCollectorVO> objects = Lists.newArrayList(); private void virtualRestart(String messageContent) {
log.info("【终端客户端-收到调度器消息】收到虚拟机重启命令,内容={}", messageContent);
}
for (LoadSchCollectorVO loadSchCollectorVO : redisLoadSchCollectors) { private void collectorRestart(String messageContent) {
if (collectorMap.containsKey(String.valueOf(loadSchCollectorVO.getCollectorId()))) { log.info("【终端客户端-收到调度器消息】收到采集器重启命令,内容={}", messageContent);
objects.add(loadSchCollectorVO); String packageValue = nettyClient.getRedisMsg();
String saveCollectorPackagePath = nettyClient.getSaveCollectorPackagePath();
List<LoadSchCollectorVO> redisLoadSchCollectors = JSON.fromJSONWithGeneric(packageValue, new TypeReference<List<LoadSchCollectorVO>>() {
});
Map<Long, LoadSchCollectorVO> collectorVoMap = ListUtils.toMap(redisLoadSchCollectors, LoadSchCollectorVO::getCollectorId);
Long collectorId = Long.valueOf(messageContent);
if (collectorVoMap.containsKey(collectorId)) {
LoadSchCollectorVO loadSchCollectorVO = collectorVoMap.get(collectorId);
String processName = loadSchCollectorVO.getProcessName();
if (Func.isNotBlank(processName)) {
boolean isExistProcess = StartUpExeUtils.checkProcessOnly(processName);
//存在指定采集器id进程
if (isExistProcess) {
boolean result = StartUpExeUtils.startUpExe(saveCollectorPackagePath + collectorId + "\\collector\\" + processName, processName);
CommMsg<Serializable> collectorRestartMsg = CommMsg.builder()
.content(result)
.messageTime(DateUtil.formatDateTime(new Date()))
.messageType(COLLECTOR_RESTART)
.build();
nettyClient.getCollectorChannelCacheMap().writeAndFlush(messageContent, collectorRestartMsg);
} }
} }
CommMsg<Serializable> collectorCountMsg = CommMsg.builder()
.messageType(MsgConstants.HAS_COLLECTOR_COUNT)
.messageTime(DateUtil.formatDateTime(new Date()))
.content(JSON.toJSON(objects))
.build();
context.channel().writeAndFlush(collectorCountMsg);
} }
}
private void terminatorRestart(String messageContent) {
log.info("【终端客户端-收到调度器消息】接受到终端重启命令,内容={}", messageContent);
} }

@ -5,7 +5,6 @@ import com.docus.core.util.Func;
import com.docus.core.util.json.JSON; import com.docus.core.util.json.JSON;
import com.docus.server.api.scheduling.management.SchCollectRecordApi; import com.docus.server.api.scheduling.management.SchCollectRecordApi;
import com.docus.server.common.CommMsg; import com.docus.server.common.CommMsg;
import com.docus.server.common.MsgConstants;
import com.docus.server.common.netty.server.CollectorChannelCacheMap; import com.docus.server.common.netty.server.CollectorChannelCacheMap;
import com.docus.server.common.netty.state.DeviceStateContext; import com.docus.server.common.netty.state.DeviceStateContext;
import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO; import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO;
@ -25,6 +24,8 @@ import java.io.Serializable;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Date; import java.util.Date;
import static com.docus.server.common.MsgConstants.*;
/** /**
* *
*/ */
@ -50,74 +51,110 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<CommMsg<Se
log.info("======== 【终端服务端-收到采集器消息】, 消息时间={}, 消息类型={}, 消息内容={}", messageTime, messageType, messageContent + " ======== "); log.info("======== 【终端服务端-收到采集器消息】, 消息时间={}, 消息类型={}, 消息内容={}", messageTime, messageType, messageContent + " ======== ");
if (messageType.equals(MsgConstants.ONLINE_REGISTER)) { switch (messageType) {
//===========login ok,切换到已登录状态===============messageContent=collectorId========= case ONLINE_REGISTER:
deviceStateContext.onLoginSucc(messageContent, System.currentTimeMillis(), 10, "采集器认证通过"); onlineRegister(deviceStateContext, ctx, messageContent);
break;
case HEARTBEAT_REQUEST:
heartbeatRequest(deviceStateContext, ctx, messageContent);
break;
case TERMINATOR_RESTART:
terminatorRestart(messageContent);
break;
case COLLECTOR_RESTART:
collectorRestart(messageContent);
break;
case VIRTUAL_RESTART:
virtualRestart(messageContent);
break;
case UPDATE_COLLECTOR_FILE:
updateCollectorFile(messageContent);
break;
case UPDATE_COLLECTOR_CONFIG:
updateCollectorConfig(messageContent);
break;
case SCH_DISTRIBUTE_TASKS:
schDistributeTasks(commMsg, messageContent);
break;
case REV_COLLECTOR_TASK:
revCollectorTask(messageContent);
break;
default:
break;
}
}
CommMsg<Serializable> authMsg = CommMsg.builder() private void revCollectorTask(String messageContent) {
.messageTime(DateUtil.formatDateTime(new Date())) log.info("【终端服务端-接收采集器消息】收到采集器上报的任务状态,内容{}", messageContent);
.content("采集器认证通过") EditSchCollectRecordDTO editSchCollectRecordDTO = JSON.fromJSON(messageContent, EditSchCollectRecordDTO.class);
.messageType("response") schCollectRecordApi.edit(editSchCollectRecordDTO);
.build(); }
ctx.writeAndFlush(authMsg);
private void schDistributeTasks(CommMsg commMsg, String messageContent) {
log.info("【终端服务端-接收采集器消息】收到采集调度器下发任务命令,内容{}", messageContent);
}
Channel channel = collectorChannelCacheMap.get(messageContent); private void updateCollectorConfig(String messageContent) {
if (channel != null && channel.isOpen()) { log.info("【终端服务端-接收采集器消息】收到更新采集器配置命令,内容{}", messageContent);
channel.close(); }
}
if (Func.isNotBlank(messageContent)) {
collectorChannelCacheMap.put(messageContent, ctx.channel());
}
} else if (messageType.equals(MsgConstants.HEARTBEAT_REQUEST)) { private void updateCollectorFile(String messageContent) {
log.info("【终端服务端-接收采集器消息】收到更新采集器文件命令,内容{}", messageContent);
}
//============状态为上行数据============= private void virtualRestart(String messageContent) {
deviceStateContext.onHeartbeat(System.currentTimeMillis(), "采集器上行了数据"); log.info("【终端服务端-接收采集器消息】收到虚拟机重启命令,内容{}", messageContent);
}
//============返回消息================== private void collectorRestart(String messageContent) {
CommMsg<Serializable> resp = CommMsg.builder() log.info("【终端服务端-接收采集器消息】收到采集器重启命令,内容{}", messageContent);
.messageTime(DateUtil.formatDateTime(new Date())) }
.content("HEARTBEAT_REQUEST ok 【终端服务器】收到【采集器客户端】心跳")
.messageType("response HEARTBEAT_REQUEST")
.build();
ctx.writeAndFlush(resp);
private void terminatorRestart(String messageContent) {
log.info("【终端服务端-接收采集器消息】接受到终端重启命令,内容{}", messageContent);
}
log.info("接受到【终端服务端-接收采集器消息】客户端的心跳消息:消息类型={},消息时间={},消息内容={}", messageType, messageTime, messageContent); private void heartbeatRequest(DeviceStateContext deviceStateContext, ChannelHandlerContext ctx, String messageContent) {
//============状态为上行数据=============
deviceStateContext.onHeartbeat(System.currentTimeMillis(), "采集器上行了数据");
InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress(); //============返回消息==================
String clientIp = ipSocket.getAddress().getHostAddress(); CommMsg<Serializable> resp = CommMsg.builder()
.messageTime(DateUtil.formatDateTime(new Date()))
.content("HEARTBEAT_REQUEST ok 【终端服务器】收到【采集器客户端】心跳")
.messageType("response HEARTBEAT_REQUEST")
.build();
ctx.writeAndFlush(resp);
log.info("【终端服务端-接收采集器消息】:{},连接上线,IP地址信息:{}", clientIp, clientIp);
log.info("接受到【终端服务端-接收采集器消息】客户端的心跳消息:消息内容={}", messageContent);
} else if (messageType.equals(MsgConstants.TERMINATOR_RESTART)) { InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
log.info("【终端服务端-接收采集器消息】接受到终端重启命令,内容{}", messageContent); String clientIp = ipSocket.getAddress().getHostAddress();
} else if (messageType.equals(MsgConstants.COLLECTOR_RESTART)) { log.info("【终端服务端-接收采集器消息】:{},连接上线,IP地址信息:{}", clientIp, clientIp);
log.info("【终端服务端-接收采集器消息】收到采集器重启命令,内容{}", messageContent);
} else if (messageType.equals(MsgConstants.VIRTUAL_RESTART)) { }
log.info("【终端服务端-接收采集器消息】收到虚拟机重启命令,内容{}", messageContent);
} else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_FILE)) { private void onlineRegister(DeviceStateContext deviceStateContext, ChannelHandlerContext ctx, String messageContent) {
log.info("【终端服务端-接收采集器消息】收到更新采集器文件命令,内容{}", messageContent); //===========login ok,切换到已登录状态===============messageContent=collectorId=========
deviceStateContext.onLoginSucc(messageContent, System.currentTimeMillis(), 10, "采集器认证通过");
} else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_CONFIG)) { CommMsg<Serializable> authMsg = CommMsg.builder()
log.info("【终端服务端-接收采集器消息】收到更新采集器配置命令,内容{}", messageContent); .messageTime(DateUtil.formatDateTime(new Date()))
.content("采集器认证通过")
.messageType("response")
.build();
ctx.writeAndFlush(authMsg);
} else if (messageType.equals(MsgConstants.SCH_DISTRIBUTE_TASKS)) {
log.info("【终端服务端-接收采集器消息】收到采集调度器下发任务命令,内容{}", messageContent);
} else if (messageType.equals(MsgConstants.REV_COLLECTOR_TASK)) { Channel channel = collectorChannelCacheMap.get(messageContent);
log.info("【终端服务端-接收采集器消息】收到采集器上报的任务状态,内容{}", messageContent); if (channel != null && channel.isOpen()) {
EditSchCollectRecordDTO editSchCollectRecordDTO = JSON.fromJSON(messageContent, EditSchCollectRecordDTO.class); channel.close();
schCollectRecordApi.edit(editSchCollectRecordDTO); }
if (Func.isNotBlank(messageContent)) {
collectorChannelCacheMap.put(messageContent, ctx.channel());
} }
System.out.println("channelRead0:" + deviceStateContext.toString());
} }
/** /**

@ -24,7 +24,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
* @author AutoGenerator * @author AutoGenerator
* @since 2023-07-15 * @since 2023-07-15
*/ */
@Api(value = "执行管理器管理接口", tags = "执行管理器管理接口") @Api(value = "终端管理器管理接口", tags = "终端管理器管理接口")
@FeignClient(value = "collector-scheduling-management", contextId = "collector-scheduling-management.SchTerminatorApi") @FeignClient(value = "collector-scheduling-management", contextId = "collector-scheduling-management.SchTerminatorApi")
@RequestMapping("/sch/terminator") @RequestMapping("/sch/terminator")
public interface SchTerminatorApi { public interface SchTerminatorApi {

@ -73,10 +73,6 @@ public class SchTerminator implements Serializable {
@TableField("last_task_error_msg") @TableField("last_task_error_msg")
private String lastTaskErrorMsg; private String lastTaskErrorMsg;
@ApiModelProperty(value = "当前终端注册上来的采集器信息")
@TableField("online_collectors_json")
private String onlineCollectorsJson;
@ApiModelProperty(value = "入库时间") @ApiModelProperty(value = "入库时间")
@TableField("create_time") @TableField("create_time")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")

Loading…
Cancel
Save