diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/MsgConstants.java b/collector-scheduling-management/src/main/java/com/docus/server/common/MsgConstants.java index 019e9d9..22b78f6 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/MsgConstants.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/MsgConstants.java @@ -52,4 +52,8 @@ public class MsgConstants { */ public static final String SCH_DISTRIBUTE_TASKS = "9".trim(); + /** + * 是否有可用类型的采集器命令 + */ + public static final String HAS_VALID_COLLECTOR = "11".trim(); } diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/SchCollectorTask.java b/collector-scheduling-management/src/main/java/com/docus/server/common/SchCollectorTask.java index 287085c..0676124 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/SchCollectorTask.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/SchCollectorTask.java @@ -10,12 +10,14 @@ import com.docus.server.common.netty.server.ChannelRepository; import com.docus.server.dto.scheduling.management.schcollector.task.ReportDownTwoDTO; import com.docus.server.dto.scheduling.management.schcollector.task.ReportTaskTwoDTO; import com.docus.server.dto.scheduling.management.schcollector.task.SchCollectorTaskDTO; +import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO; import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO; import com.docus.server.entity.scheduling.management.SchCollectRecord; import com.docus.server.entity.scheduling.management.SchCollectRecordRetryLog; import com.docus.server.entity.scheduling.management.SchCollector; import com.docus.server.enums.BusyStateEnum; import com.docus.server.enums.RetryTaskEnum; +import com.docus.server.service.ICommMsgService; import com.docus.server.service.ISchCollectRecordRetryLogService; import com.docus.server.service.ISchCollectRecordService; import com.docus.server.service.ISchCollectorService; @@ -54,6 +56,8 @@ public class SchCollectorTask { private ISchCollectorService iSchCollectorService; @Resource private RedisOps redisOps; + @Resource + private ICommMsgService iCommMsgService; private BlockingQueue retryTaskQueue = new LinkedBlockingQueue<>(); @@ -106,7 +110,7 @@ public class SchCollectorTask { continue; } - for (ReportDownTwoDTO report : reportDownTwoDTOList) { + for (ReportDownTwoDTO report : reportDownTwoDTOList) { //先找出有只采集的任务。 ReportTaskTwoDTO reportTaskTwoDto = report.getTasks().get(0); if (!CollectionUtils.isEmpty(terminal.getOnlyCollectorIds()) && terminal.getOnlyCollectorIds().contains(reportTaskTwoDto.getCollectorId()) @@ -201,6 +205,9 @@ public class SchCollectorTask { } private void tcpToClient(NettyTerminatorDTO terminal, ReportDownTwoDTO reportDownTwoDTO) { + + hasValidCollector(terminal, reportDownTwoDTO); + Channel channel = channelRepository.get(terminal.getTerminatorIp()); Map params = reportDownTwoDTO.getParams(); @@ -248,6 +255,27 @@ public class SchCollectorTask { } + private void hasValidCollector(NettyTerminatorDTO terminal, ReportDownTwoDTO reportDownTwoDTO) { + /** + * 终端虚拟机配置了任务偏好,根据任务偏好计算出下发的任务,下发之前,先发一个命令给终端,让终端去找采集器进程和心跳,全部正常就下发这条任务, + * 反之,没有找到,就回报给调度器,调度器下发下载指令给终端,终端收到命令,去下载部署采集器,并运行打开采集器进程,建立心跳,正常后,就回报调度器,调度器下发这条任务 + */ + + //1、下发任务之前,询问终端是否有采集器进程和心跳 + //2、若没有,下发部署采集器指令给终端 + //3、若有,下发任务给终端 + String collectorId = reportDownTwoDTO.getTasks().get(0).getCollectorId(); + + CommMsgDTO commMsgDTO = CommMsgDTO.builder() + .terminatorIp(terminal.getTerminatorIp()) + .messageType(MsgConstants.HAS_VALID_COLLECTOR) + .content(collectorId) + .build(); + + iCommMsgService.clientCommand(commMsgDTO); + + } + public void addRetryTask(ReportDownTwoDTO reportDownTwoDTO) { this.retryTaskQueue.add(reportDownTwoDTO); diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java b/collector-scheduling-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java index 076813b..71c2335 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java @@ -41,14 +41,19 @@ public class ChannelProcessor extends AbstractProcessor { switch (group) { case "SchCollectRecordController-edit": - return doSchCollectRecordControllerEdit(context); + doSchCollectRecordControllerEdit(context); + break; case "SchTerminatorController": - return doSchTerminatorController(context); + doSchTerminatorController(context); + break; case "RedisKeyExpirationService-expired": - return doRedisKeyExpired(context); + doRedisKeyExpired(context); + break; default: - return true; + return null; } + + return null; } private Object doRedisKeyExpired(TrackContext context) { @@ -70,8 +75,8 @@ public class ChannelProcessor extends AbstractProcessor { return null; } - private boolean doSchCollectRecordControllerEdit(TrackContext context) { - return logCollectRecord(context); + private void doSchCollectRecordControllerEdit(TrackContext context) { + logCollectRecord(context); } private boolean doSchTerminatorController(TrackContext context) { diff --git a/collector-scheduling-management/src/main/java/com/docus/server/controller/SchCollectRecordController.java b/collector-scheduling-management/src/main/java/com/docus/server/controller/SchCollectRecordController.java index f39a5a1..a016ceb 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/controller/SchCollectRecordController.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/controller/SchCollectRecordController.java @@ -75,8 +75,8 @@ public class SchCollectRecordController implements SchCollectRecordApi { */ @TrackGroup(group = "SchCollectRecordController-edit", processor = ChannelProcessor.class) @Override - public boolean edit(EditSchCollectRecordDTO editSchCollectRecordDTO) { - return iSchCollectRecordService.edit(editSchCollectRecordDTO); + public void edit(EditSchCollectRecordDTO editSchCollectRecordDTO) { + iSchCollectRecordService.edit(editSchCollectRecordDTO); } /** diff --git a/collector-scheduling-management/src/main/java/com/docus/server/controller/SchTerminatorController.java b/collector-scheduling-management/src/main/java/com/docus/server/controller/SchTerminatorController.java index 65700e2..600f3cf 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/controller/SchTerminatorController.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/controller/SchTerminatorController.java @@ -65,12 +65,11 @@ public class SchTerminatorController implements SchTerminatorApi { * 编辑 * * @param editSchTerminatorDTO 编辑参数 - * @return 成功或失败 */ @TrackGroup(group = "SchTerminatorController", processor = ChannelProcessor.class) @Override - public boolean edit(EditSchTerminatorDTO editSchTerminatorDTO) { - return iSchTerminatorService.edit(editSchTerminatorDTO); + public void edit(EditSchTerminatorDTO editSchTerminatorDTO) { + iSchTerminatorService.edit(editSchTerminatorDTO); } /** diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/MsgConstants.java b/collector-terminal-management/src/main/java/com/docus/server/common/MsgConstants.java index 019e9d9..a155362 100644 --- a/collector-terminal-management/src/main/java/com/docus/server/common/MsgConstants.java +++ b/collector-terminal-management/src/main/java/com/docus/server/common/MsgConstants.java @@ -51,5 +51,13 @@ public class MsgConstants { * 采集调度器下发任务命令 */ public static final String SCH_DISTRIBUTE_TASKS = "9".trim(); + /** + * 接收采集器上报的任务 + */ + public static final String REV_COLLECTOR_TASK = "10".trim(); + /** + * 是否有可用类型的采集器命令 + */ + public static final String HAS_VALID_COLLECTOR = "11".trim(); } diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/ClientHandler.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/ClientHandler.java index 179c2ea..7c8dbb0 100644 --- a/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/ClientHandler.java +++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/ClientHandler.java @@ -4,16 +4,20 @@ import com.docus.core.util.StringUtils; import com.docus.core.util.json.JSON; import com.docus.server.common.MsgConstants; import com.docus.server.common.netty.CommMsg; +import com.docus.server.common.netty.server.CollectorChannelCacheMap; +import com.docus.server.dto.scheduling.management.schcollector.task.SchCollectorTaskDTO; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import lombok.extern.slf4j.Slf4j; -import java.io.Serializable; +import javax.annotation.Resource; @Slf4j public class ClientHandler extends SimpleChannelInboundHandler { + @Resource + private CollectorChannelCacheMap collectorChannelCacheMap; @Override protected void channelRead0(ChannelHandlerContext context, ByteBuf msg) throws Exception { @@ -34,32 +38,43 @@ public class ClientHandler extends SimpleChannelInboundHandler { String messageType = commMsg.getMessageType(); String messageTime = commMsg.getMessageTime(); - Serializable messageContent = commMsg.getContent(); + String messageContent = (String) commMsg.getContent(); - log.info("======== 收到服务端消息, 消息时间={}, 消息类型={}, 消息内容={}", messageTime, messageType, messageContent + " ======== "); + log.info("======== 【终端客户端-收到调度器消息】, 消息时间={}, 消息类型={}, 消息内容={}", messageTime, messageType, messageContent + " ======== "); if (messageType.equals(MsgConstants.TERMINATOR_RESTART)) { - log.info("接受到终端重启命令,内容={}", messageContent); - } + log.info("【终端客户端-收到调度器下发任务】接受到终端重启命令,内容={}", messageContent); - if (messageType.equals(MsgConstants.COLLECTOR_RESTART)) { - log.info("收到采集器重启命令,内容={}", messageContent); - } + } else if (messageType.equals(MsgConstants.COLLECTOR_RESTART)) { + log.info("【终端客户端-收到调度器下发任务】收到采集器重启命令,内容={}", messageContent); - if (messageType.equals(MsgConstants.VIRTUAL_RESTART)) { - log.info("收到虚拟机重启命令,内容={}", messageContent); - } + } else if (messageType.equals(MsgConstants.VIRTUAL_RESTART)) { + log.info("【终端客户端-收到调度器下发任务】收到虚拟机重启命令,内容={}", messageContent); - if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_FILE)) { - log.info("收到更新采集器文件命令,内容={}", messageContent); - } + } else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_FILE)) { + log.info("【终端客户端-收到调度器下发任务】收到更新采集器文件命令,内容={}", messageContent); + + } else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_CONFIG)) { + log.info("【终端客户端-收到调度器下发任务】收到更新采集器配置命令,内容={}", messageContent); + + } else if (messageType.equals(MsgConstants.SCH_DISTRIBUTE_TASKS)) { + log.info("【终端客户端-收到调度器下发任务】收到采集调度器下发任务命令,内容={}", messageContent); + + //下发任务: 判断这个任务采集器类型在指定目录是否存在、 + //如果不存在:下拉该采集器类型最新的版本更新包到指定的部署路径,并且启动该exe采集器,将任务tcp给对应采集器 + //如果存在: + + SchCollectorTaskDTO schCollectorTaskDTO = JSON.fromJSON(JSON.toJSON(messageContent), SchCollectorTaskDTO.class); + String collectorId = schCollectorTaskDTO.getTaskInfo().getTasks().get(0).getCollectorId(); + collectorChannelCacheMap.writeAndFlush(collectorId, commMsg); + + } else if (messageType.equals(MsgConstants.HAS_VALID_COLLECTOR)) { + log.info("【终端客户端-收到调度器下发任务】是否有可用类型的采集器命令,内容={}", messageContent); - if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_CONFIG)) { - log.info("收到更新采集器配置命令,内容={}", messageContent); - } - if (messageType.equals(MsgConstants.SCH_DISTRIBUTE_TASKS)) { - log.info("收到采集调度器下发任务命令,内容={}", messageContent); + //下发任务: 判断这个任务采集器类型在指定目录是否存在、 + //如果不存在:下拉该采集器类型最新的版本更新包到指定的部署路径,并且启动该exe采集器,将任务tcp给对应采集器 + //如果存在: } } diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/HeartbeatHandler.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/HeartbeatHandler.java index 4d2eb11..3384216 100644 --- a/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/HeartbeatHandler.java +++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/HeartbeatHandler.java @@ -40,11 +40,6 @@ public class HeartbeatHandler extends ChannelInboundHandlerAdapter { public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("通道激活......"); -// Payload.Message.Builder builder = Payload.Message.newBuilder() -// .setClient(appName) -// .setContent("我来了") -// .setCmd(Payload.Message.type.AUTH); - CommMsg onlineRegister = CommMsg.builder() .messageType(MsgConstants.ONLINE_REGISTER) .messageTime(DateUtil.formatDateTime(new Date())) @@ -61,12 +56,7 @@ public class HeartbeatHandler extends ChannelInboundHandlerAdapter { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; if (idleStateEvent.state() == IdleState.WRITER_IDLE) { // 一定时间内,通道内未传递消息,发送心跳,保证存活 - log.info("after {} seconds no message wrote", nettyProperties.getWriterIdleTimeSeconds()); - -// Payload.Message heartbeat = Payload.Message -// .newBuilder() -// .setCmd(Payload.Message.type.HEARTBEAT_REQUEST) -// .build(); + log.info("【终端客户端心跳】 after {} seconds no message wrote", nettyProperties.getWriterIdleTimeSeconds()); CommMsg heartbeat = CommMsg.builder() .messageType(MsgConstants.HEARTBEAT_REQUEST) diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java deleted file mode 100644 index e496ca6..0000000 --- a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java +++ /dev/null @@ -1,57 +0,0 @@ -package com.docus.server.common.netty.server; - -import io.netty.channel.Channel; -import io.netty.util.AttributeKey; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * 客户端IP和通信信道的映射 - */ -@Component -@Slf4j -public class ChannelRepository { - - /** - * - */ - private final static Map COLLECTOR_ID_CHANNEL_CACHE_MAP = new ConcurrentHashMap<>(); - - /** - * 终端上线 - */ - public void put(String collectorId, Channel channel) { - - //缓存 - COLLECTOR_ID_CHANNEL_CACHE_MAP.put(collectorId, channel); - - AttributeKey attributeKey = AttributeKey.valueOf("ip"); - channel.attr(attributeKey).set(collectorId); - - } - - public String getClientKey(Channel channel) { - - AttributeKey key = AttributeKey.valueOf("ip"); - - if (channel.hasAttr(key)) { - return channel.attr(key).get(); - } - return null; - } - - public Channel get(String key) { - return COLLECTOR_ID_CHANNEL_CACHE_MAP.get(key); - } - - /** - * 终端离线 - */ - public void remove(String key) { - COLLECTOR_ID_CHANNEL_CACHE_MAP.remove(key); - } - -} diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/CollectorChannelCacheMap.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/CollectorChannelCacheMap.java new file mode 100644 index 0000000..4a80006 --- /dev/null +++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/CollectorChannelCacheMap.java @@ -0,0 +1,106 @@ +package com.docus.server.common.netty.server; + +import com.docus.core.util.json.JSON; +import com.docus.server.common.netty.CommMsg; +import com.docus.server.common.netty.state.DeviceStateContext; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; +import io.netty.util.CharsetUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 采集器id和终端 + * 通信信道的映射 + */ +@Component +@Slf4j +public class CollectorChannelCacheMap { + + /** + * + */ + private final static Map COLLECTOR_ID_CHANNEL_CACHE_MAP = new ConcurrentHashMap<>(); + + /** + * 采集器上线 + */ + public void put(String collectorId, Channel channel) { + + //缓存 + COLLECTOR_ID_CHANNEL_CACHE_MAP.put(collectorId, channel); + + AttributeKey attributeKey = AttributeKey.valueOf("collectorId"); + channel.attr(attributeKey).set(collectorId); + } + + /** + * 根据channel获取采集器id + */ + public String getClientKey(Channel channel) { + + AttributeKey key = AttributeKey.valueOf("collectorId"); + + if (channel.hasAttr(key)) { + return channel.attr(key).get(); + } + return null; + } + + public void writeAndFlush(String collectorId, CommMsg commMsg) { + Channel channel = this.get(collectorId); + if (null != channel && channel.isOpen()) { + channel.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(commMsg), CharsetUtil.UTF_8)); + } + } + + public Channel get(String key) { + return COLLECTOR_ID_CHANNEL_CACHE_MAP.get(key); + } + + /** + * 终端离线 + */ + public void remove(String key) { + COLLECTOR_ID_CHANNEL_CACHE_MAP.remove(key); + } + + /** + * 设置属性 + * + * @param ctx 通道上下文 + * @param value 值 + */ + public void setAttribute(ChannelHandlerContext ctx, DeviceStateContext value) { + Attribute attribute = getAttribute(ctx); + attribute.set(value); + } + + /** + * 获取属性值 + * + * @param ctx 通道上下文 + * @return 属性值 有可能为 null + */ + public DeviceStateContext getAttributeValue(ChannelHandlerContext ctx) { + Attribute attribute = getAttribute(ctx); + DeviceStateContext value = attribute.get(); + return value; + } + + /** + * 永远不会返回 null + * + * @param ctx channel 上下 + * @return ctx 关联的 channle 的 attribute + */ + private Attribute getAttribute(ChannelHandlerContext ctx) { + return ctx.channel().attr(AttributeKey.valueOf("state")); + } +} diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/NettyServerInitializer.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/NettyServerInitializer.java index 2ea873c..04eb2ca 100644 --- a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/NettyServerInitializer.java +++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/NettyServerInitializer.java @@ -22,7 +22,7 @@ public class NettyServerInitializer extends ChannelInitializer { private NettyBusinessHandler businessHandler; @Resource - private ChannelRepository channelRepository; + private CollectorChannelCacheMap channelRepository; @Override protected void initChannel(Channel channel) throws Exception { diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java index 13e25fb..c148133 100644 --- a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java +++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java @@ -4,11 +4,12 @@ import com.docus.core.util.DateUtil; import com.docus.core.util.Func; import com.docus.core.util.StringUtils; import com.docus.core.util.json.JSON; +import com.docus.server.api.scheduling.management.SchCollectRecordApi; import com.docus.server.common.MsgConstants; -import com.docus.server.common.netty.ChannelAttribute; import com.docus.server.common.netty.CommMsg; -import com.docus.server.common.netty.server.ChannelRepository; +import com.docus.server.common.netty.server.CollectorChannelCacheMap; import com.docus.server.common.netty.state.DeviceStateContext; +import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; @@ -38,15 +39,14 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler { private static final ChannelGroup DEFAULT_CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); - public static final ChannelAttribute STATE_MACHINE_SESSION = new ChannelAttribute<>("state"); - @Resource - private ChannelRepository repository; + private CollectorChannelCacheMap collectorChannelCacheMap; + @Resource + private SchCollectRecordApi schCollectRecordApi; @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { - DeviceStateContext deviceStateContext = STATE_MACHINE_SESSION.getAttributeValue(ctx); - + DeviceStateContext deviceStateContext = collectorChannelCacheMap.getAttributeValue(ctx); ByteBuf buf = (ByteBuf) msg; //创建目标大小的数组 byte[] barray = new byte[buf.readableBytes()]; @@ -64,11 +64,11 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler { String messageType = commMsg.getMessageType(); String messageTime = commMsg.getMessageTime(); - Serializable messageContent = commMsg.getContent(); + String messageContent = (String) commMsg.getContent(); if (messageType.equals(MsgConstants.ONLINE_REGISTER)) { - //===========login ok,切换到已登录状态=============== - deviceStateContext.onLoginSucc("device-123", System.currentTimeMillis(), 10, "设备认证通过"); + //===========login ok,切换到已登录状态===============messageContent=collectorId========= + deviceStateContext.onLoginSucc(messageContent, System.currentTimeMillis(), 10, "采集器认证通过"); CommMsg login_ok = CommMsg.builder() .messageTime(DateUtil.formatDateTime(new Date())) @@ -77,10 +77,19 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler { .build(); ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(login_ok), CharsetUtil.UTF_8)); + + Channel channel = collectorChannelCacheMap.get(messageContent); + if (channel != null && channel.isOpen()) { + channel.close(); + } + if (Func.isNotBlank(messageContent)) { + collectorChannelCacheMap.put(messageContent, ctx.channel()); + } + } else if (messageType.equals(MsgConstants.HEARTBEAT_REQUEST)) { //============状态为上行数据============= - deviceStateContext.onHeartbeat(System.currentTimeMillis(), "设备上行了数据"); + deviceStateContext.onHeartbeat(System.currentTimeMillis(), "采集器上行了数据"); //==============处理数据================ System.out.println("收到数据:" + msg); @@ -93,32 +102,37 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler { ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(resp), CharsetUtil.UTF_8)); - log.info("接受到【终端-采集器】客户端的心跳消息:消息类型={},消息时间={},消息内容={}", messageType, messageTime, messageContent); + log.info("接受到【终端服务端-接收采集器消息】客户端的心跳消息:消息类型={},消息时间={},消息内容={}", messageType, messageTime, messageContent); InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = ipSocket.getAddress().getHostAddress(); - log.info("【终端-采集器IP】:{},连接上线,IP地址信息:{}", clientIp, clientIp); + log.info("【终端服务端-接收采集器消息】:{},连接上线,IP地址信息:{}", clientIp, clientIp); } else if (messageType.equals(MsgConstants.TERMINATOR_RESTART)) { - log.info("接受到终端重启命令,内容{}", messageContent); + log.info("【终端服务端-接收采集器消息】接受到终端重启命令,内容{}", messageContent); } else if (messageType.equals(MsgConstants.COLLECTOR_RESTART)) { - log.info("收到采集器重启命令,内容{}", messageContent); + log.info("【终端服务端-接收采集器消息】收到采集器重启命令,内容{}", messageContent); } else if (messageType.equals(MsgConstants.VIRTUAL_RESTART)) { - log.info("收到虚拟机重启命令,内容{}", messageContent); + log.info("【终端服务端-接收采集器消息】收到虚拟机重启命令,内容{}", messageContent); } else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_FILE)) { - log.info("收到更新采集器文件命令,内容{}", messageContent); + log.info("【终端服务端-接收采集器消息】收到更新采集器文件命令,内容{}", messageContent); } else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_CONFIG)) { - log.info("收到更新采集器配置命令,内容{}", messageContent); + log.info("【终端服务端-接收采集器消息】收到更新采集器配置命令,内容{}", messageContent); } else if (messageType.equals(MsgConstants.SCH_DISTRIBUTE_TASKS)) { - log.info("收到采集调度器下发任务命令,内容{}", messageContent); + log.info("【终端服务端-接收采集器消息】收到采集调度器下发任务命令,内容{}", messageContent); + } else if (messageType.equals(MsgConstants.REV_COLLECTOR_TASK)) { + log.info("【终端服务端-接收采集器消息】收到采集器上报的任务状态,内容{}", messageContent); + EditSchCollectRecordDTO editSchCollectRecordDTO = JSON.fromJSON(JSON.toJSON(messageContent), EditSchCollectRecordDTO.class); + schCollectRecordApi.edit(editSchCollectRecordDTO); + System.out.println(1); } @@ -132,14 +146,14 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { System.out.println(getClass().getSimpleName() + "." + "userEventTriggered" + ctx.channel().remoteAddress()); if (evt instanceof IdleStateEvent) { - DeviceStateContext deviceStateContext = STATE_MACHINE_SESSION.getAttributeValue(ctx); + DeviceStateContext deviceStateContext = collectorChannelCacheMap.getAttributeValue(ctx); long lastUpdateTime = deviceStateContext.getLastUpdateTime(); long currentTimeMillis = System.currentTimeMillis(); long intervalTime = currentTimeMillis - lastUpdateTime; if (intervalTime > deviceStateContext.getHeartRate()) { //==============发生超时,进入超时状态============== - deviceStateContext.onTimeout("设备发生超时"); + deviceStateContext.onTimeout("采集器发生超时"); System.out.println("userEventTriggered:" + deviceStateContext.toString()); } } else { @@ -160,18 +174,14 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler { //===========设置设备状态为 未登录================= deviceStateContext.onConnect(System.currentTimeMillis(), "设备 active"); //更新添加 state 属性 - STATE_MACHINE_SESSION.setAttribute(ctx, deviceStateContext); +// STATE_MACHINE_SESSION.setAttribute(ctx, deviceStateContext); + collectorChannelCacheMap.setAttribute(ctx, deviceStateContext); System.out.println("channelActive:" + deviceStateContext.toString()); //===========设置设备状态为 未登录================= InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = ipSocket.getAddress().getHostAddress(); - log.info("【终端-采集器IP】:{},连接上线,IP地址信息:{}", clientIp, clientIp); - - if (Func.isNotBlank(clientIp)) { - - repository.put(clientIp, ctx.channel()); - } + log.info("【终端服务端-采集器IP连接成功】:{},连接上线,IP地址信息:{}", clientIp, clientIp); DEFAULT_CHANNEL_GROUP.add(ctx.channel()); @@ -185,15 +195,15 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler { public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println(channel.remoteAddress() + " 下线," + "【采集器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size()); - String clientId = repository.getClientKey(channel); - log.error("客户端下线,终端连接:{}", clientId); + String collectorId = collectorChannelCacheMap.getClientKey(channel); + log.error("客户端下线,终端连接:{}", collectorId); //移除终端,终端离线 - if (clientId != null) { - repository.remove(clientId); + if (collectorId != null) { + collectorChannelCacheMap.remove(collectorId); } //================设置为断开================ - DeviceStateContext deviceStateContext = STATE_MACHINE_SESSION.getAttributeValue(ctx); + DeviceStateContext deviceStateContext = collectorChannelCacheMap.getAttributeValue(ctx); deviceStateContext.onDisconnect("设备 inactive"); System.out.println("channelInactive:" + deviceStateContext.toString()); } @@ -205,17 +215,17 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //==============发生异常切换到断开模式=============== System.out.println("exceptionCaught:" + cause.getMessage()); - DeviceStateContext deviceStateContext = STATE_MACHINE_SESSION.getAttributeValue(ctx); + DeviceStateContext deviceStateContext = collectorChannelCacheMap.getAttributeValue(ctx); deviceStateContext.onDisconnect(cause.getMessage()); System.out.println("exceptionCaught:" + deviceStateContext.toString()); cause.printStackTrace(); - String clientId = repository.getClientKey(ctx.channel()); - log.error("通道发生异常,终端连接:{}", clientId); + String collectorId = collectorChannelCacheMap.getClientKey(ctx.channel()); + log.error("通道发生异常,终端连接:{}", collectorId); //移除终端,终端离线 - if (clientId != null) { - repository.remove(clientId); + if (collectorId != null) { + collectorChannelCacheMap.remove(collectorId); } if (ctx.channel().isActive()) { ctx.close(); diff --git a/collector-terminal-management/src/test/java/com/docus/server/FileController.java b/collector-terminal-management/src/test/java/com/docus/server/FileController.java index 23c085c..1de4556 100644 --- a/collector-terminal-management/src/test/java/com/docus/server/FileController.java +++ b/collector-terminal-management/src/test/java/com/docus/server/FileController.java @@ -2,7 +2,7 @@ package com.docus.server; import cn.hutool.core.util.ZipUtil; import com.docus.server.common.download.downLoader.HttpDownloader; -import org.junit.jupiter.api.Test; +import com.docus.server.common.utils.StartUpExeUtils; import org.springframework.boot.test.context.SpringBootTest; /** @@ -14,17 +14,28 @@ import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest public class FileController { - @Test - void test() throws Exception { + public static void test() throws Exception { + /** + * 1、接收调度器下载采集器包命令 + * 2、下载保存到指定部署路径 + * 3、启动采集器 + */ + HttpDownloader httpDownloader = new HttpDownloader(null); - String url = "http://127.0.0.1:9113/sch/file/download?filePath=collector_packages/20230727/78c002bc-7674-4dfe-9247-ae594d03ccdf/docus-collector-scheduling.tar.gz"; - String fileName = "collector.tar.gz"; - String savePath = "/Users/linruifeng/Desktop/collector_packages"; + String url = "http://192.168.16.110:9113/sch/file/download?filePath=collector_packages/20230718/91d930e6-0490-44e5-9756-caee3251d645/navicat.zip"; + String fileName = "collector.zip"; + String savePath = "H:\\test"; //部署路径 + String procName = "navicat.exe"; //进程名称 + httpDownloader.downLoadFromUrl(url, fileName, savePath); + + ZipUtil.unzip(savePath + "\\" + fileName, savePath + "\\collector"); + + StartUpExeUtils.startUpExe(savePath + "\\collector\\" + procName, procName); } public static void main(String[] args) throws Exception { - test1(); + test(); } public static void test1() throws Exception { diff --git a/docus-client-interface/src/main/java/com/docus/server/api/scheduling.management/FileApi.java b/docus-client-interface/src/main/java/com/docus/server/api/scheduling.management/FileApi.java deleted file mode 100644 index d89c314..0000000 --- a/docus-client-interface/src/main/java/com/docus/server/api/scheduling.management/FileApi.java +++ /dev/null @@ -1,36 +0,0 @@ -//package com.docus.server.api.scheduling.management; -// -//import io.swagger.annotations.Api; -//import io.swagger.annotations.ApiImplicitParam; -//import io.swagger.annotations.ApiImplicitParams; -//import io.swagger.annotations.ApiOperation; -//import org.springframework.cloud.openfeign.FeignClient; -//import org.springframework.web.bind.annotation.*; -//import org.springframework.web.multipart.MultipartFile; -// -//import javax.servlet.http.HttpServletResponse; -// -// -///** -// * 文件上传下载 API -// * -// * @author AutoGenerator -// * @since 2023-07-15 -// */ -//@Api(value = "通用文件上传下载接口", tags = "通用文件上传下载接口") -//@FeignClient(value = "collector-scheduling-management", contextId = "collector-scheduling-management.FileApi") -//@RequestMapping("/sch/file") -//public interface FileApi { -// -// @ApiOperation("文件下载") -// @GetMapping("/download") -// void downloadFile(@RequestParam(value = "filePath") String filePath, HttpServletResponse response) throws Exception; -// -// @ApiOperation("文件上传") -// @PostMapping("/upload") -// @ApiImplicitParams({ -// @ApiImplicitParam(name = "files", value = "文件", required = true, dataTypeClass = MultipartFile.class) -// }) -// void uploadFile(@RequestPart MultipartFile[] files, String pathKey) throws Exception; -// -//} diff --git a/docus-client-interface/src/main/java/com/docus/server/api/scheduling.management/SchCollectRecordApi.java b/docus-client-interface/src/main/java/com/docus/server/api/scheduling.management/SchCollectRecordApi.java index e34fb79..0499af0 100644 --- a/docus-client-interface/src/main/java/com/docus/server/api/scheduling.management/SchCollectRecordApi.java +++ b/docus-client-interface/src/main/java/com/docus/server/api/scheduling.management/SchCollectRecordApi.java @@ -69,7 +69,7 @@ public interface SchCollectRecordApi { */ @ApiOperation("编辑") @PutMapping("/edit") - boolean edit(@RequestBody EditSchCollectRecordDTO editSchCollectRecordDTO); + void edit(@RequestBody EditSchCollectRecordDTO editSchCollectRecordDTO); /** * 批量删除 diff --git a/docus-client-interface/src/main/java/com/docus/server/api/scheduling.management/SchTerminatorApi.java b/docus-client-interface/src/main/java/com/docus/server/api/scheduling.management/SchTerminatorApi.java index 8bd43c3..d4c93e5 100644 --- a/docus-client-interface/src/main/java/com/docus/server/api/scheduling.management/SchTerminatorApi.java +++ b/docus-client-interface/src/main/java/com/docus/server/api/scheduling.management/SchTerminatorApi.java @@ -63,11 +63,10 @@ public interface SchTerminatorApi { * 编辑 * * @param editSchTerminatorDTO 编辑参数 - * @return 成功或失败 */ @ApiOperation("编辑") @PutMapping("/edit") - boolean edit(@RequestBody EditSchTerminatorDTO editSchTerminatorDTO); + void edit(@RequestBody EditSchTerminatorDTO editSchTerminatorDTO); /** * 批量删除 diff --git a/docus-client-interface/src/main/java/com/docus/server/dto/scheduling.management/schcollectrecord/EditSchCollectRecordDTO.java b/docus-client-interface/src/main/java/com/docus/server/dto/scheduling.management/schcollectrecord/EditSchCollectRecordDTO.java index 4a99201..7cc8164 100644 --- a/docus-client-interface/src/main/java/com/docus/server/dto/scheduling.management/schcollectrecord/EditSchCollectRecordDTO.java +++ b/docus-client-interface/src/main/java/com/docus/server/dto/scheduling.management/schcollectrecord/EditSchCollectRecordDTO.java @@ -6,7 +6,10 @@ import com.docus.server.enums.SubStateEnum; import com.fasterxml.jackson.annotation.JsonFormat; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import java.io.Serializable; import java.util.Date; @@ -17,6 +20,9 @@ import java.util.Date; * @author AutoGenerator * @since 2023-07-15 */ +@Builder +@AllArgsConstructor +@NoArgsConstructor @Data @ApiModel(value = "EditSchCollectRecordDTO对象", description = "采集记录表") public class EditSchCollectRecordDTO implements Serializable {