From f25f2644e3869752882bbd20f1d0d6b835badfd0 Mon Sep 17 00:00:00 2001 From: linrf Date: Wed, 9 Aug 2023 17:36:47 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A4=84=E7=90=86netty=20=E4=B8=8B=E5=8F=91?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E7=B2=98=E5=8C=85=E5=88=86=E5=8C=85=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/docus/server/common/MsgConstants.java | 4 ++ .../docus/server/common/SchCollectorTask.java | 4 +- .../netty/server/NettyServerInitializer.java | 2 +- ...ry.java => TerminalToChannelCacheMap.java} | 4 +- .../server/handler/NettyBusinessHandler.java | 58 +++++++++++-------- .../server/handler/NettyHeartbeatHandler.java | 6 +- .../common/process/ChannelProcessor.java | 4 +- .../controller/SchTerminatorController.java | 4 +- .../server/service/ISchTerminatorService.java | 2 + .../service/impl/CommMsgServiceImpl.java | 4 +- .../impl/SchTerminatorServiceImpl.java | 6 ++ .../com/docus/server/common/MsgConstants.java | 5 ++ .../netty/client/handler/ClientHandler.java | 28 +++++++++ .../server/CollectorChannelCacheMap.java | 4 ++ .../server/handler/NettyBusinessHandler.java | 25 +++++--- .../netty/state/DeviceStateContext.java | 4 +- .../schterminator/NettyTerminatorDTO.java | 4 ++ .../scheduling.management/SchTerminator.java | 4 ++ .../schterminator/SchTerminatorVO.java | 18 ++---- 19 files changed, 130 insertions(+), 60 deletions(-) rename collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/{ChannelRepository.java => TerminalToChannelCacheMap.java} (97%) diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/MsgConstants.java b/collector-scheduling-management/src/main/java/com/docus/server/common/MsgConstants.java index 22b78f6..d788ace 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/MsgConstants.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/MsgConstants.java @@ -56,4 +56,8 @@ public class MsgConstants { * 是否有可用类型的采集器命令 */ public static final String HAS_VALID_COLLECTOR = "11".trim(); + /** + * 获取终端采集器数据量命令 + */ + public static final String HAS_COLLECTOR_COUNT = "12".trim(); } diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/SchCollectorTask.java b/collector-scheduling-management/src/main/java/com/docus/server/common/SchCollectorTask.java index f355c53..7e2f9ee 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/SchCollectorTask.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/SchCollectorTask.java @@ -5,7 +5,7 @@ import com.docus.core.util.Func; import com.docus.core.util.json.JSON; import com.docus.infrastructure.redis.service.RedisOps; import com.docus.server.api.taskdistribute.TaskDistributeApi; -import com.docus.server.common.netty.server.ChannelRepository; +import com.docus.server.common.netty.server.TerminalToChannelCacheMap; import com.docus.server.dto.scheduling.management.schcollector.task.ReportDownTwoDTO; import com.docus.server.dto.scheduling.management.schcollector.task.ReportTaskTwoDTO; import com.docus.server.dto.scheduling.management.schcollector.task.SchCollectorTaskDTO; @@ -41,7 +41,7 @@ import java.util.concurrent.LinkedBlockingQueue; @Slf4j public class SchCollectorTask { @Resource - private ChannelRepository channelRepository; + private TerminalToChannelCacheMap channelRepository; @Resource private TaskDistributeApi taskDistributeApi; @Resource diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/NettyServerInitializer.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/NettyServerInitializer.java index 160a4fa..3e0c811 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/NettyServerInitializer.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/NettyServerInitializer.java @@ -27,7 +27,7 @@ public class NettyServerInitializer extends ChannelInitializer { private NettyBusinessHandler businessHandler; @Resource - private ChannelRepository channelRepository; + private TerminalToChannelCacheMap channelRepository; @Override protected void initChannel(Channel channel) throws Exception { diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/TerminalToChannelCacheMap.java similarity index 97% rename from collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java rename to collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/TerminalToChannelCacheMap.java index 9a8a58c..039bd13 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/TerminalToChannelCacheMap.java @@ -23,7 +23,7 @@ import java.util.stream.Collectors; */ @Component @Slf4j -public class ChannelRepository { +public class TerminalToChannelCacheMap { @Resource private ISchTerminatorService iSchTerminatorService; @@ -97,7 +97,7 @@ public class ChannelRepository { */ public void remove(String key) { IP_CHANNEL_CACHE_MAP.remove(key); - + IP_TERMINATOR_CACHE_MAP.remove(key); NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO(); nettyTerminatorDTO.setOnlineState(OnlineStateEnum.OFFLINE); iSchTerminatorService.saveOrUpdate(key, nettyTerminatorDTO); diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java index 6a3c84e..2a23923 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java @@ -1,12 +1,16 @@ package com.docus.server.common.netty.server.handler; import com.docus.core.util.Func; +import com.docus.core.util.json.JSON; import com.docus.server.common.CommMsg; import com.docus.server.common.MsgConstants; -import com.docus.server.common.netty.server.ChannelRepository; +import com.docus.server.common.netty.server.TerminalToChannelCacheMap; import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO; import com.docus.server.enums.BusyStateEnum; import com.docus.server.enums.OnlineStateEnum; +import com.docus.server.service.ISchTerminatorService; +import com.docus.server.vo.scheduling.management.schcollector.LoadSchCollectorVO; +import com.fasterxml.jackson.core.type.TypeReference; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; @@ -20,6 +24,7 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.io.Serializable; import java.net.InetSocketAddress; +import java.util.List; /** * 业务消息处理 @@ -32,38 +37,45 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler loadSchCollectorVOList = JSON.fromJSONWithGeneric(messageContent, new TypeReference>() { + }); + nettyTerminatorDTO.getLoadSchCollectorVOList().addAll(loadSchCollectorVOList); + } + } } @@ -78,7 +90,7 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler findAll(); void batchUpdate(List terminators); + + SchTerminatorVO findByTerminalIP(String terminalIp); } diff --git a/collector-scheduling-management/src/main/java/com/docus/server/service/impl/CommMsgServiceImpl.java b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/CommMsgServiceImpl.java index ebb7e28..ff1535c 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/service/impl/CommMsgServiceImpl.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/CommMsgServiceImpl.java @@ -3,7 +3,7 @@ package com.docus.server.service.impl; import com.docus.core.util.DateUtil; import com.docus.core.util.json.JSON; import com.docus.server.common.CommMsg; -import com.docus.server.common.netty.server.ChannelRepository; +import com.docus.server.common.netty.server.TerminalToChannelCacheMap; import com.docus.server.convert.CommMsgConvert; import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO; import com.docus.server.service.ICommMsgService; @@ -23,7 +23,7 @@ import java.util.Set; public class CommMsgServiceImpl implements ICommMsgService { @Resource - private ChannelRepository channelRepository; + private TerminalToChannelCacheMap channelRepository; @Override public void clientCommand(CommMsgDTO commMsgDTO) { diff --git a/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchTerminatorServiceImpl.java b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchTerminatorServiceImpl.java index 4421e83..4df2f9d 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchTerminatorServiceImpl.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchTerminatorServiceImpl.java @@ -96,6 +96,12 @@ public class SchTerminatorServiceImpl implements ISchTerminatorService { iSchTerminatorDao.updateBatchById(terminators, 20); } + @Override + public SchTerminatorVO findByTerminalIP(String terminalIp) { + SchTerminator schTerminator = iSchTerminatorDao.findOneBy("terminatorIp", terminalIp); + return SchTerminatorConvert.INSTANCE.convertVO(schTerminator); + } + /** * 新增 * diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/MsgConstants.java b/collector-terminal-management/src/main/java/com/docus/server/common/MsgConstants.java index a155362..f966724 100644 --- a/collector-terminal-management/src/main/java/com/docus/server/common/MsgConstants.java +++ b/collector-terminal-management/src/main/java/com/docus/server/common/MsgConstants.java @@ -60,4 +60,9 @@ public class MsgConstants { * 是否有可用类型的采集器命令 */ public static final String HAS_VALID_COLLECTOR = "11".trim(); + + /** + * 获取终端采集器数据量命令 + */ + public static final String HAS_COLLECTOR_COUNT = "12".trim(); } diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/ClientHandler.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/ClientHandler.java index 9cd858f..e128f11 100644 --- a/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/ClientHandler.java +++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/ClientHandler.java @@ -1,5 +1,6 @@ package com.docus.server.common.netty.client.handler; +import com.docus.core.util.DateUtil; import com.docus.core.util.Func; import com.docus.core.util.ListUtils; import com.docus.core.util.json.JSON; @@ -11,6 +12,7 @@ import com.docus.server.common.utils.StartUpExeUtils; import com.docus.server.dto.scheduling.management.schcollector.task.SchCollectorTaskDTO; import com.docus.server.vo.scheduling.management.schcollector.LoadSchCollectorVO; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.Lists; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; @@ -19,6 +21,7 @@ import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import java.io.Serializable; +import java.util.Date; import java.util.List; import java.util.Map; @@ -108,6 +111,31 @@ public class ClientHandler extends SimpleChannelInboundHandler collectorMap = collectorChannelCacheMap.getAll(); + + String packageValue = nettyClient.getRedisMsg(); + List redisLoadSchCollectors = JSON.fromJSONWithGeneric(packageValue, new TypeReference>() { + }); + + List objects = Lists.newArrayList(); + + for (LoadSchCollectorVO loadSchCollectorVO : redisLoadSchCollectors) { + if (collectorMap.containsKey(String.valueOf(loadSchCollectorVO.getCollectorId()))) { + objects.add(loadSchCollectorVO); + } + } + + CommMsg collectorCountMsg = CommMsg.builder() + .messageType(MsgConstants.HAS_COLLECTOR_COUNT) + .messageTime(DateUtil.formatDateTime(new Date())) + .content(JSON.toJSON(objects)) + .build(); + + context.channel().writeAndFlush(collectorCountMsg); } } diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/CollectorChannelCacheMap.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/CollectorChannelCacheMap.java index 3bbe9c1..0db8b77 100644 --- a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/CollectorChannelCacheMap.java +++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/CollectorChannelCacheMap.java @@ -64,6 +64,10 @@ public class CollectorChannelCacheMap { return COLLECTOR_ID_CHANNEL_CACHE_MAP.get(key); } + public Map getAll() { + return COLLECTOR_ID_CHANNEL_CACHE_MAP; + } + /** * 终端离线 */ diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java index 9395903..f38ed4e 100644 --- a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java +++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java @@ -48,6 +48,8 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler resp = CommMsg.builder() .messageTime(DateUtil.formatDateTime(new Date())) - .content("HEARTBEAT_REQUEST ok") + .content("HEARTBEAT_REQUEST ok 【终端服务器】收到【采集器客户端】心跳") .messageType("response HEARTBEAT_REQUEST") .build(); ctx.writeAndFlush(resp); @@ -112,7 +112,7 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler deviceStateContext.getHeartRate()) { @@ -151,15 +155,14 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler loadSchCollectorVOList = new ArrayList<>(); } diff --git a/docus-client-interface/src/main/java/com/docus/server/entity/scheduling.management/SchTerminator.java b/docus-client-interface/src/main/java/com/docus/server/entity/scheduling.management/SchTerminator.java index 301dd33..9ba6781 100644 --- a/docus-client-interface/src/main/java/com/docus/server/entity/scheduling.management/SchTerminator.java +++ b/docus-client-interface/src/main/java/com/docus/server/entity/scheduling.management/SchTerminator.java @@ -73,6 +73,10 @@ public class SchTerminator implements Serializable { @TableField("last_task_error_msg") private String lastTaskErrorMsg; + @ApiModelProperty(value = "当前终端注册上来的采集器信息") + @TableField("online_collectors_json") + private String onlineCollectorsJson; + @ApiModelProperty(value = "入库时间") @TableField("create_time") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") diff --git a/docus-client-interface/src/main/java/com/docus/server/vo/scheduling.management/schterminator/SchTerminatorVO.java b/docus-client-interface/src/main/java/com/docus/server/vo/scheduling.management/schterminator/SchTerminatorVO.java index 028fe57..44d5d4b 100644 --- a/docus-client-interface/src/main/java/com/docus/server/vo/scheduling.management/schterminator/SchTerminatorVO.java +++ b/docus-client-interface/src/main/java/com/docus/server/vo/scheduling.management/schterminator/SchTerminatorVO.java @@ -1,9 +1,9 @@ package com.docus.server.vo.scheduling.management.schterminator; -import com.baomidou.mybatisplus.annotation.TableField; import com.docus.server.enums.BusyStateEnum; import com.docus.server.enums.OnlineStateEnum; import com.docus.server.enums.RestrictStateEnum; +import com.docus.server.vo.scheduling.management.schcollector.LoadSchCollectorVO; import com.fasterxml.jackson.annotation.JsonFormat; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; @@ -11,6 +11,7 @@ import lombok.Data; import java.io.Serializable; import java.util.Date; +import java.util.List; /** * 执行管理器 VO @@ -26,53 +27,44 @@ public class SchTerminatorVO implements Serializable { private Long id; @ApiModelProperty(value = "终端IP") - @TableField("terminator_ip") private String terminatorIp; @ApiModelProperty(value = "资源管理器名称") - @TableField("terminator_name") private String terminatorName; @ApiModelProperty(value = "只干某些任务,任务偏好属性") - @TableField("only_collector_ids") private String onlyCollectorIds; @ApiModelProperty(value = "优先干某些任务,任务偏好属性(优先给哪个采集器,若不空闲再给任意采集器)") - @TableField("priority_collector_ids") private String priorityCollectorIds; @ApiModelProperty(value = "是否限制采集器类型,0:否,1:是") - @TableField("restrict_state") private RestrictStateEnum restrictState; @ApiModelProperty(value = "忙闲状态,0:空闲,1:繁忙") - @TableField("busy_state") private BusyStateEnum busyState; @ApiModelProperty(value = "在线状态,0:离线,1:在线") - @TableField("online_state") private OnlineStateEnum onlineState; @ApiModelProperty(value = "最近任务执行时间") - @TableField("last_task_exec_time") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date lastTaskExecTime; @ApiModelProperty(value = "正在执行任务的名称") - @TableField("executing_task_name") private String executingTaskName; @ApiModelProperty(value = "最近一次任务执行失败原因") - @TableField("last_task_error_msg") private String lastTaskErrorMsg; + @ApiModelProperty(value = "当前终端注册上来的采集器信息") + private List loadSchCollectorVOList; + @ApiModelProperty(value = "入库时间") - @TableField("create_time") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date createTime; @ApiModelProperty(value = "入库更新时间") - @TableField("update_time") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date updateTime;