diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/MsgConstants.java b/collector-scheduling-management/src/main/java/com/docus/server/common/MsgConstants.java index d788ace..a9ba686 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/MsgConstants.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/MsgConstants.java @@ -5,59 +5,59 @@ public class MsgConstants { /** * 客户端与采集调度器心跳 */ - public static final String HEARTBEAT_REQUEST = "0".trim(); + public static final String HEARTBEAT_REQUEST = "0"; /** * 客户端上线注册 */ - public static final String ONLINE_REGISTER = "1".trim(); + public static final String ONLINE_REGISTER = "1"; /** * 客户端下线移除 */ - public static final String OFFLINE_REMOVE = "2".trim(); + public static final String OFFLINE_REMOVE = "2"; /** * 客户端异常注册 */ - public static final String EXCEPTION_REMOVE = "3".trim(); + public static final String EXCEPTION_REMOVE = "3"; /** * 终端重启命令 */ - public static final String TERMINATOR_RESTART = "4".trim(); + public static final String TERMINATOR_RESTART = "4"; /** * 采集器重启命令 */ - public static final String COLLECTOR_RESTART = "5".trim(); + public static final String COLLECTOR_RESTART = "5"; /** * 虚拟机重启命令 */ - public static final String VIRTUAL_RESTART = "6".trim(); + public static final String VIRTUAL_RESTART = "6"; /** * 更新采集器文件命令 */ - public static final String UPDATE_COLLECTOR_FILE = "7".trim(); + public static final String UPDATE_COLLECTOR_FILE = "7"; /** * 更新采集器配置命令 */ - public static final String UPDATE_COLLECTOR_CONFIG = "8".trim(); + public static final String UPDATE_COLLECTOR_CONFIG = "8"; /** * 采集调度器下发任务命令 */ - public static final String SCH_DISTRIBUTE_TASKS = "9".trim(); + public static final String SCH_DISTRIBUTE_TASKS = "9"; /** * 是否有可用类型的采集器命令 */ - public static final String HAS_VALID_COLLECTOR = "11".trim(); + public static final String HAS_VALID_COLLECTOR = "11"; /** * 获取终端采集器数据量命令 */ - public static final String HAS_COLLECTOR_COUNT = "12".trim(); + public static final String HAS_COLLECTOR_COUNT = "12"; } diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java index 2a23923..2b8ac94 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java @@ -3,12 +3,10 @@ package com.docus.server.common.netty.server.handler; import com.docus.core.util.Func; import com.docus.core.util.json.JSON; import com.docus.server.common.CommMsg; -import com.docus.server.common.MsgConstants; import com.docus.server.common.netty.server.TerminalToChannelCacheMap; import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO; import com.docus.server.enums.BusyStateEnum; import com.docus.server.enums.OnlineStateEnum; -import com.docus.server.service.ISchTerminatorService; import com.docus.server.vo.scheduling.management.schcollector.LoadSchCollectorVO; import com.fasterxml.jackson.core.type.TypeReference; import io.netty.channel.Channel; @@ -26,6 +24,8 @@ import java.io.Serializable; import java.net.InetSocketAddress; import java.util.List; +import static com.docus.server.common.MsgConstants.*; + /** * 业务消息处理 */ @@ -38,44 +38,102 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler loadSchCollectorVOList = JSON.fromJSONWithGeneric(messageContent, new TypeReference>() { - }); - nettyTerminatorDTO.getLoadSchCollectorVOList().addAll(loadSchCollectorVOList); - } + switch (messageType) { + case ONLINE_REGISTER: + onlineRegister(ctx, messageContent, clientIp); + break; + case TERMINATOR_RESTART: + terminatorRestart(messageContent); + break; + case COLLECTOR_RESTART: + collectorRestart(messageContent); + break; + case VIRTUAL_RESTART: + virtualRestart(messageContent); + break; + case UPDATE_COLLECTOR_FILE: + updateCollectorFile(messageContent); + break; + case UPDATE_COLLECTOR_CONFIG: + updateCollectorConfig(messageContent); + break; + case SCH_DISTRIBUTE_TASKS: + schDistributeTasks(commMsg, messageContent); + break; + case HAS_COLLECTOR_COUNT: + hasCollectorCount(ctx, messageContent); + break; + default: + break; + } + } + + private void hasCollectorCount(ChannelHandlerContext ctx, String messageContent) { + log.info("【调度器服务端-收到终端消息】收到终端客户端返回的【采集器数量】消息,内容{}", messageContent); + + String terminalIp = terminalToChannelCacheMap.getClientKey(ctx.channel()); + + NettyTerminatorDTO nettyTerminatorDTO = terminalToChannelCacheMap.getTerminatorByIp(terminalIp); + + if (Func.isNotBlank(messageContent)) { + nettyTerminatorDTO.getLoadSchCollectorVOList().clear(); + List loadSchCollectorVOList = JSON.fromJSONWithGeneric(messageContent, new TypeReference>() { + }); + nettyTerminatorDTO.getLoadSchCollectorVOList().addAll(loadSchCollectorVOList); + } + } + + private void schDistributeTasks(CommMsg commMsg, String messageContent) { + log.info("【调度器服务端-收到终端消息】收到采集调度器下发任务命令,内容{}", messageContent); + } + + private void updateCollectorConfig(String messageContent) { + log.info("【调度器服务端-收到终端消息】收到更新采集器配置命令,内容{}", messageContent); + } + + private void updateCollectorFile(String messageContent) { + log.info("【调度器服务端-收到终端消息】收到更新采集器文件命令,内容{}", messageContent); + } + + private void virtualRestart(String messageContent) { + log.info("【调度器服务端-收到终端消息】收到虚拟机重启命令,内容{}", messageContent); + } + + private void collectorRestart(String messageContent) { + log.info("【调度器服务端-收到终端消息】收到采集器重启命令,内容{}", messageContent); + } + + private void terminatorRestart(String messageContent) { + + log.info("【调度器服务端-收到终端消息】接受到终端重启命令,内容{}", messageContent); + } + private void onlineRegister(ChannelHandlerContext ctx, String messageContent, String clientIp) { + log.info("【调度器服务端-收到终端消息】接受到终端注册命令,内容{}", messageContent); + String clientKey = terminalToChannelCacheMap.getClientKey(ctx.channel()); + + if (Func.isNotBlank(clientKey)) { + + NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO(); + nettyTerminatorDTO.setTerminatorIp(clientIp); + nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE); + nettyTerminatorDTO.setOnlineState(OnlineStateEnum.ONLINE); + + //将ip和channel进行映射 + terminalToChannelCacheMap.put(nettyTerminatorDTO, ctx.channel()); } } @@ -116,6 +174,7 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler search(SearchDTO searchDTO) { - return SchTerminatorConvert.INSTANCE.convertVO(iSchTerminatorDao.search(searchDTO)); + PageResult result = SchTerminatorConvert.INSTANCE.convertVO(iSchTerminatorDao.search(searchDTO)); + + if (CollectionUtils.isEmpty(result.getList())) { + return new PageResult<>(); + } + + Map ipToTerminatorCacheMap = terminalToChannelCacheMap.getIpToTerminatorCacheMap(); + + Setters.instance().list(result.getList()).cycleSetProperties(p -> { + String terminatorIp = String.valueOf(p.getTerminatorIp()); + if (ipToTerminatorCacheMap.containsKey(terminatorIp)) { + NettyTerminatorDTO nettyTerminatorDTO = ipToTerminatorCacheMap.get(terminatorIp); + p.setLoadSchCollectorVOList(nettyTerminatorDTO.getLoadSchCollectorVOList()); + } + }); + + return result; } @Override diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/MsgConstants.java b/collector-terminal-management/src/main/java/com/docus/server/common/MsgConstants.java index f966724..93cfeed 100644 --- a/collector-terminal-management/src/main/java/com/docus/server/common/MsgConstants.java +++ b/collector-terminal-management/src/main/java/com/docus/server/common/MsgConstants.java @@ -5,64 +5,64 @@ public class MsgConstants { /** * 客户端与采集调度器心跳 */ - public static final String HEARTBEAT_REQUEST = "0".trim(); + public static final String HEARTBEAT_REQUEST = "0"; /** * 客户端上线注册 */ - public static final String ONLINE_REGISTER = "1".trim(); + public static final String ONLINE_REGISTER = "1"; /** * 客户端下线移除 */ - public static final String OFFLINE_REMOVE = "2".trim(); + public static final String OFFLINE_REMOVE = "2"; /** * 客户端异常注册 */ - public static final String EXCEPTION_REMOVE = "3".trim(); + public static final String EXCEPTION_REMOVE = "3"; /** * 终端重启命令 */ - public static final String TERMINATOR_RESTART = "4".trim(); + public static final String TERMINATOR_RESTART = "4"; /** * 采集器重启命令 */ - public static final String COLLECTOR_RESTART = "5".trim(); + public static final String COLLECTOR_RESTART = "5"; /** * 虚拟机重启命令 */ - public static final String VIRTUAL_RESTART = "6".trim(); + public static final String VIRTUAL_RESTART = "6"; /** * 更新采集器文件命令 */ - public static final String UPDATE_COLLECTOR_FILE = "7".trim(); + public static final String UPDATE_COLLECTOR_FILE = "7"; /** * 更新采集器配置命令 */ - public static final String UPDATE_COLLECTOR_CONFIG = "8".trim(); + public static final String UPDATE_COLLECTOR_CONFIG = "8"; /** * 采集调度器下发任务命令 */ - public static final String SCH_DISTRIBUTE_TASKS = "9".trim(); + public static final String SCH_DISTRIBUTE_TASKS = "9"; /** * 接收采集器上报的任务 */ - public static final String REV_COLLECTOR_TASK = "10".trim(); + public static final String REV_COLLECTOR_TASK = "10"; /** * 是否有可用类型的采集器命令 */ - public static final String HAS_VALID_COLLECTOR = "11".trim(); + public static final String HAS_VALID_COLLECTOR = "11"; /** * 获取终端采集器数据量命令 */ - public static final String HAS_COLLECTOR_COUNT = "12".trim(); + public static final String HAS_COLLECTOR_COUNT = "12"; } diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/ClientHandler.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/ClientHandler.java index e128f11..96b84e1 100644 --- a/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/ClientHandler.java +++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/ClientHandler.java @@ -25,6 +25,9 @@ import java.util.Date; import java.util.List; import java.util.Map; +import static com.docus.server.common.MsgConstants.COLLECTOR_RESTART; +import static com.docus.server.common.MsgConstants.HAS_COLLECTOR_COUNT; + @Slf4j @AllArgsConstructor @NoArgsConstructor @@ -41,102 +44,161 @@ public class ClientHandler extends SimpleChannelInboundHandler collectorMap = collectorChannelCacheMap.getAll(); - } else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_FILE)) { - log.info("【终端客户端-收到调度器消息】收到更新采集器文件命令,内容={}", messageContent); + String packageValue = nettyClient.getRedisMsg(); + List redisLoadSchCollectors = JSON.fromJSONWithGeneric(packageValue, new TypeReference>() { + }); - } else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_CONFIG)) { - log.info("【终端客户端-收到调度器消息】收到更新采集器配置命令,内容={}", messageContent); + List objects = Lists.newArrayList(); - } else if (messageType.equals(MsgConstants.SCH_DISTRIBUTE_TASKS)) { - log.info("【终端客户端-收到调度器消息】收到采集调度器下发任务命令,内容={}", messageContent); + for (LoadSchCollectorVO loadSchCollectorVO : redisLoadSchCollectors) { + if (collectorMap.containsKey(String.valueOf(loadSchCollectorVO.getCollectorId()))) { + objects.add(loadSchCollectorVO); + } + } - //下发任务: 判断这个任务采集器类型在指定目录是否存在、 - //如果不存在:下拉该采集器类型最新的版本更新包到指定的部署路径,并且启动该exe采集器,将任务tcp给对应采集器 - //如果存在: + CommMsg collectorCountMsg = CommMsg.builder() + .messageType(HAS_COLLECTOR_COUNT) + .messageTime(DateUtil.formatDateTime(new Date())) + .content(JSON.toJSON(objects)) + .build(); - SchCollectorTaskDTO schCollectorTaskDTO = JSON.fromJSON(messageContent, SchCollectorTaskDTO.class); - String collectorId = schCollectorTaskDTO.getTaskInfo().getTasks().get(0).getCollectorId(); + context.channel().writeAndFlush(collectorCountMsg); + } - Long compareCollectorId = Long.valueOf(collectorId); + private void hasValidCollector(String messageContent) { + log.info("【终端客户端-收到调度器消息】是否有可用类型的采集器命令,内容={}", messageContent); + } - String packageValue = nettyClient.getRedisMsg(); - CollectorChannelCacheMap collectorChannelCacheMap = nettyClient.getCollectorChannelCacheMap(); - String saveCollectorPackagePath = nettyClient.getSaveCollectorPackagePath(); + private void schDistributeTasks(CommMsg commMsg, String messageContent) throws Exception { + log.info("【终端客户端-收到调度器消息】收到采集调度器下发任务命令,内容={}", messageContent); - List redisLoadSchCollectors = JSON.fromJSONWithGeneric(packageValue, new TypeReference>() { - }); + //下发任务: 判断这个任务采集器类型在指定目录是否存在、 + //如果不存在:下拉该采集器类型最新的版本更新包到指定的部署路径,并且启动该exe采集器,将任务tcp给对应采集器 + //如果存在: - Map collectorVoMap = ListUtils.toMap(redisLoadSchCollectors, LoadSchCollectorVO::getCollectorId); + SchCollectorTaskDTO schCollectorTaskDTO = JSON.fromJSON(messageContent, SchCollectorTaskDTO.class); + String collectorId = schCollectorTaskDTO.getTaskInfo().getTasks().get(0).getCollectorId(); - if (collectorVoMap.containsKey(compareCollectorId)) { - LoadSchCollectorVO loadSchCollectorVO = collectorVoMap.get(compareCollectorId); - String processName = loadSchCollectorVO.getProcessName(); + Long compareCollectorId = Long.valueOf(collectorId); - if (Func.isNotBlank(processName)) { - boolean isExistProcess = StartUpExeUtils.checkProcessOnly(processName); - //存在指定采集器id进程 - if (isExistProcess) { - collectorChannelCacheMap.writeAndFlush(collectorId, commMsg); - } else { - //不存在进程,则启动指定采集器进程,等待启动完成下发任务 - StartUpExeUtils.startUpExeOnly(saveCollectorPackagePath + collectorId + "\\collector\\" + processName); + String packageValue = nettyClient.getRedisMsg(); + CollectorChannelCacheMap collectorChannelCacheMap = nettyClient.getCollectorChannelCacheMap(); + String saveCollectorPackagePath = nettyClient.getSaveCollectorPackagePath(); - while (true) { - Channel channel = collectorChannelCacheMap.get(collectorId); - log.info("等待采集器={}启动中", collectorId); - if (null != channel && channel.isOpen()) { - log.info("等待采集器={}启动成功并注册到终端", collectorId); - collectorChannelCacheMap.writeAndFlush(collectorId, commMsg); - break; - } - } - } - } - } + List redisLoadSchCollectors = JSON.fromJSONWithGeneric(packageValue, new TypeReference>() { + }); + Map collectorVoMap = ListUtils.toMap(redisLoadSchCollectors, LoadSchCollectorVO::getCollectorId); - } else if (messageType.equals(MsgConstants.HAS_VALID_COLLECTOR)) { - log.info("【终端客户端-收到调度器消息】是否有可用类型的采集器命令,内容={}", messageContent); + if (collectorVoMap.containsKey(compareCollectorId)) { + LoadSchCollectorVO loadSchCollectorVO = collectorVoMap.get(compareCollectorId); + String processName = loadSchCollectorVO.getProcessName(); + if (Func.isNotBlank(processName)) { + boolean isExistProcess = StartUpExeUtils.checkProcessOnly(processName); + //存在指定采集器id进程 + if (isExistProcess) { + collectorChannelCacheMap.writeAndFlush(collectorId, commMsg); + } else { + //不存在进程,则启动指定采集器进程,等待启动完成下发任务 + StartUpExeUtils.startUpExeOnly(saveCollectorPackagePath + collectorId + "\\collector\\" + processName); - //下发任务: 判断这个任务采集器类型在指定目录是否存在、 - //如果不存在:下拉该采集器类型最新的版本更新包到指定的部署路径,并且启动该exe采集器,将任务tcp给对应采集器 - //如果存在: - } else if (messageType.equals(MsgConstants.HAS_COLLECTOR_COUNT)) { - log.info("【终端客户端-收到调度器消息】收到获取采集器数量命令,内容={}", messageContent); + while (true) { + Channel channel = collectorChannelCacheMap.get(collectorId); + log.info("等待采集器={}启动中", collectorId); + if (null != channel && channel.isOpen()) { + log.info("等待采集器={}启动成功并注册到终端", collectorId); + collectorChannelCacheMap.writeAndFlush(collectorId, commMsg); + break; + } + Thread.sleep(5000); + } + } + } + } + } - CollectorChannelCacheMap collectorChannelCacheMap = nettyClient.getCollectorChannelCacheMap(); - Map collectorMap = collectorChannelCacheMap.getAll(); + private void updateCollectorConfig(String messageContent) { + log.info("【终端客户端-收到调度器消息】收到更新采集器配置命令,内容={}", messageContent); + } - String packageValue = nettyClient.getRedisMsg(); - List redisLoadSchCollectors = JSON.fromJSONWithGeneric(packageValue, new TypeReference>() { - }); + private void updateCollectorFile(String messageContent) { + log.info("【终端客户端-收到调度器消息】收到更新采集器文件命令,内容={}", messageContent); + } - List objects = Lists.newArrayList(); + private void virtualRestart(String messageContent) { + log.info("【终端客户端-收到调度器消息】收到虚拟机重启命令,内容={}", messageContent); + } - for (LoadSchCollectorVO loadSchCollectorVO : redisLoadSchCollectors) { - if (collectorMap.containsKey(String.valueOf(loadSchCollectorVO.getCollectorId()))) { - objects.add(loadSchCollectorVO); + private void collectorRestart(String messageContent) { + log.info("【终端客户端-收到调度器消息】收到采集器重启命令,内容={}", messageContent); + String packageValue = nettyClient.getRedisMsg(); + String saveCollectorPackagePath = nettyClient.getSaveCollectorPackagePath(); + List redisLoadSchCollectors = JSON.fromJSONWithGeneric(packageValue, new TypeReference>() { + }); + + Map collectorVoMap = ListUtils.toMap(redisLoadSchCollectors, LoadSchCollectorVO::getCollectorId); + Long collectorId = Long.valueOf(messageContent); + + if (collectorVoMap.containsKey(collectorId)) { + LoadSchCollectorVO loadSchCollectorVO = collectorVoMap.get(collectorId); + String processName = loadSchCollectorVO.getProcessName(); + + if (Func.isNotBlank(processName)) { + boolean isExistProcess = StartUpExeUtils.checkProcessOnly(processName); + //存在指定采集器id进程 + if (isExistProcess) { + boolean result = StartUpExeUtils.startUpExe(saveCollectorPackagePath + collectorId + "\\collector\\" + processName, processName); + + CommMsg collectorRestartMsg = CommMsg.builder() + .content(result) + .messageTime(DateUtil.formatDateTime(new Date())) + .messageType(COLLECTOR_RESTART) + .build(); + nettyClient.getCollectorChannelCacheMap().writeAndFlush(messageContent, collectorRestartMsg); } } - - CommMsg collectorCountMsg = CommMsg.builder() - .messageType(MsgConstants.HAS_COLLECTOR_COUNT) - .messageTime(DateUtil.formatDateTime(new Date())) - .content(JSON.toJSON(objects)) - .build(); - - context.channel().writeAndFlush(collectorCountMsg); } + } + + private void terminatorRestart(String messageContent) { + log.info("【终端客户端-收到调度器消息】接受到终端重启命令,内容={}", messageContent); } diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java index f38ed4e..aa69b40 100644 --- a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java +++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java @@ -5,7 +5,6 @@ import com.docus.core.util.Func; import com.docus.core.util.json.JSON; import com.docus.server.api.scheduling.management.SchCollectRecordApi; import com.docus.server.common.CommMsg; -import com.docus.server.common.MsgConstants; import com.docus.server.common.netty.server.CollectorChannelCacheMap; import com.docus.server.common.netty.state.DeviceStateContext; import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO; @@ -25,6 +24,8 @@ import java.io.Serializable; import java.net.InetSocketAddress; import java.util.Date; +import static com.docus.server.common.MsgConstants.*; + /** * 业务消息处理 */ @@ -50,74 +51,110 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler authMsg = CommMsg.builder() - .messageTime(DateUtil.formatDateTime(new Date())) - .content("采集器认证通过") - .messageType("response") - .build(); - ctx.writeAndFlush(authMsg); + private void revCollectorTask(String messageContent) { + log.info("【终端服务端-接收采集器消息】收到采集器上报的任务状态,内容{}", messageContent); + EditSchCollectRecordDTO editSchCollectRecordDTO = JSON.fromJSON(messageContent, EditSchCollectRecordDTO.class); + schCollectRecordApi.edit(editSchCollectRecordDTO); + } + private void schDistributeTasks(CommMsg commMsg, String messageContent) { + log.info("【终端服务端-接收采集器消息】收到采集调度器下发任务命令,内容{}", messageContent); + } - Channel channel = collectorChannelCacheMap.get(messageContent); - if (channel != null && channel.isOpen()) { - channel.close(); - } - if (Func.isNotBlank(messageContent)) { - collectorChannelCacheMap.put(messageContent, ctx.channel()); - } + private void updateCollectorConfig(String messageContent) { + log.info("【终端服务端-接收采集器消息】收到更新采集器配置命令,内容{}", messageContent); + } - } else if (messageType.equals(MsgConstants.HEARTBEAT_REQUEST)) { + private void updateCollectorFile(String messageContent) { + log.info("【终端服务端-接收采集器消息】收到更新采集器文件命令,内容{}", messageContent); + } - //============状态为上行数据============= - deviceStateContext.onHeartbeat(System.currentTimeMillis(), "采集器上行了数据"); + private void virtualRestart(String messageContent) { + log.info("【终端服务端-接收采集器消息】收到虚拟机重启命令,内容{}", messageContent); + } - //============返回消息================== - CommMsg resp = CommMsg.builder() - .messageTime(DateUtil.formatDateTime(new Date())) - .content("HEARTBEAT_REQUEST ok 【终端服务器】收到【采集器客户端】心跳") - .messageType("response HEARTBEAT_REQUEST") - .build(); - ctx.writeAndFlush(resp); + private void collectorRestart(String messageContent) { + log.info("【终端服务端-接收采集器消息】收到采集器重启命令,内容{}", messageContent); + } + private void terminatorRestart(String messageContent) { + log.info("【终端服务端-接收采集器消息】接受到终端重启命令,内容{}", messageContent); + } - log.info("接受到【终端服务端-接收采集器消息】客户端的心跳消息:消息类型={},消息时间={},消息内容={}", messageType, messageTime, messageContent); + private void heartbeatRequest(DeviceStateContext deviceStateContext, ChannelHandlerContext ctx, String messageContent) { + //============状态为上行数据============= + deviceStateContext.onHeartbeat(System.currentTimeMillis(), "采集器上行了数据"); - InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress(); - String clientIp = ipSocket.getAddress().getHostAddress(); + //============返回消息================== + CommMsg resp = CommMsg.builder() + .messageTime(DateUtil.formatDateTime(new Date())) + .content("HEARTBEAT_REQUEST ok 【终端服务器】收到【采集器客户端】心跳") + .messageType("response HEARTBEAT_REQUEST") + .build(); + ctx.writeAndFlush(resp); - log.info("【终端服务端-接收采集器消息】:{},连接上线,IP地址信息:{}", clientIp, clientIp); + log.info("接受到【终端服务端-接收采集器消息】客户端的心跳消息:消息内容={}", messageContent); - } else if (messageType.equals(MsgConstants.TERMINATOR_RESTART)) { - log.info("【终端服务端-接收采集器消息】接受到终端重启命令,内容{}", messageContent); + InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress(); + String clientIp = ipSocket.getAddress().getHostAddress(); - } else if (messageType.equals(MsgConstants.COLLECTOR_RESTART)) { - log.info("【终端服务端-接收采集器消息】收到采集器重启命令,内容{}", messageContent); + log.info("【终端服务端-接收采集器消息】:{},连接上线,IP地址信息:{}", clientIp, clientIp); - } else if (messageType.equals(MsgConstants.VIRTUAL_RESTART)) { - log.info("【终端服务端-接收采集器消息】收到虚拟机重启命令,内容{}", messageContent); + } - } else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_FILE)) { - log.info("【终端服务端-接收采集器消息】收到更新采集器文件命令,内容{}", messageContent); + private void onlineRegister(DeviceStateContext deviceStateContext, ChannelHandlerContext ctx, String messageContent) { + //===========login ok,切换到已登录状态===============messageContent=collectorId========= + deviceStateContext.onLoginSucc(messageContent, System.currentTimeMillis(), 10, "采集器认证通过"); - } else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_CONFIG)) { - log.info("【终端服务端-接收采集器消息】收到更新采集器配置命令,内容{}", messageContent); + CommMsg authMsg = CommMsg.builder() + .messageTime(DateUtil.formatDateTime(new Date())) + .content("采集器认证通过") + .messageType("response") + .build(); + ctx.writeAndFlush(authMsg); - } else if (messageType.equals(MsgConstants.SCH_DISTRIBUTE_TASKS)) { - log.info("【终端服务端-接收采集器消息】收到采集调度器下发任务命令,内容{}", messageContent); - } else if (messageType.equals(MsgConstants.REV_COLLECTOR_TASK)) { - log.info("【终端服务端-接收采集器消息】收到采集器上报的任务状态,内容{}", messageContent); - EditSchCollectRecordDTO editSchCollectRecordDTO = JSON.fromJSON(messageContent, EditSchCollectRecordDTO.class); - schCollectRecordApi.edit(editSchCollectRecordDTO); + Channel channel = collectorChannelCacheMap.get(messageContent); + if (channel != null && channel.isOpen()) { + channel.close(); + } + if (Func.isNotBlank(messageContent)) { + collectorChannelCacheMap.put(messageContent, ctx.channel()); } - - - System.out.println("channelRead0:" + deviceStateContext.toString()); } /** diff --git a/docus-client-interface/src/main/java/com/docus/server/api/scheduling.management/SchTerminatorApi.java b/docus-client-interface/src/main/java/com/docus/server/api/scheduling.management/SchTerminatorApi.java index d4c93e5..2de1882 100644 --- a/docus-client-interface/src/main/java/com/docus/server/api/scheduling.management/SchTerminatorApi.java +++ b/docus-client-interface/src/main/java/com/docus/server/api/scheduling.management/SchTerminatorApi.java @@ -24,7 +24,7 @@ import org.springframework.web.bind.annotation.RequestMapping; * @author AutoGenerator * @since 2023-07-15 */ -@Api(value = "执行管理器管理接口", tags = "执行管理器管理接口") +@Api(value = "终端管理器管理接口", tags = "终端管理器管理接口") @FeignClient(value = "collector-scheduling-management", contextId = "collector-scheduling-management.SchTerminatorApi") @RequestMapping("/sch/terminator") public interface SchTerminatorApi { diff --git a/docus-client-interface/src/main/java/com/docus/server/entity/scheduling.management/SchTerminator.java b/docus-client-interface/src/main/java/com/docus/server/entity/scheduling.management/SchTerminator.java index 9ba6781..301dd33 100644 --- a/docus-client-interface/src/main/java/com/docus/server/entity/scheduling.management/SchTerminator.java +++ b/docus-client-interface/src/main/java/com/docus/server/entity/scheduling.management/SchTerminator.java @@ -73,10 +73,6 @@ public class SchTerminator implements Serializable { @TableField("last_task_error_msg") private String lastTaskErrorMsg; - @ApiModelProperty(value = "当前终端注册上来的采集器信息") - @TableField("online_collectors_json") - private String onlineCollectorsJson; - @ApiModelProperty(value = "入库时间") @TableField("create_time") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")