diff --git a/collector-scheduling-management/src/main/java/com/docus/server/JobConfig.java b/collector-scheduling-management/src/main/java/com/docus/server/JobConfig.java new file mode 100644 index 0000000..b253cad --- /dev/null +++ b/collector-scheduling-management/src/main/java/com/docus/server/JobConfig.java @@ -0,0 +1,50 @@ +package com.docus.server; + +import com.docus.core.util.DateUtil; +import com.docus.core.util.json.JSON; +import com.docus.server.common.netty.CommMsg; +import com.docus.server.common.netty.server.ChannelRepository; +import com.docus.server.convert.CommMsgConvert; +import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.util.CharsetUtil; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; + +import javax.annotation.Resource; +import java.util.Date; +import java.util.Map; +import java.util.Set; + +@Configuration +@EnableScheduling +@EnableAsync +public class JobConfig { + @Resource + private ChannelRepository channelRepository; + + @Async + @Scheduled(cron = "0/1 * * * * ?") + public void runTask() throws InterruptedException { + + Map channelMap = channelRepository.getIpToChannelCacheMap(); + Set keySet = channelMap.keySet(); + + for (String clientIp : keySet) { + Channel channel = channelMap.get(clientIp); + + CommMsg commMsg = CommMsgConvert.INSTANCE.convertDO(new CommMsgDTO()); + commMsg.setMessageTime(DateUtil.formatDateTime(new Date())); + + if (channel != null) { + channel.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(commMsg), CharsetUtil.UTF_8)); + } + + } + + } +} diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java index 55c4b68..7340159 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java @@ -1,5 +1,6 @@ package com.docus.server.common.netty.server; +import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO; import com.docus.server.enums.OnlineStateEnum; import com.docus.server.service.ISchTerminatorService; import io.netty.channel.Channel; @@ -24,22 +25,35 @@ public class ChannelRepository { /** * */ - private final static Map CHANNEL_CACHE_MAP = new ConcurrentHashMap<>(); + private final static Map IP_CHANNEL_CACHE_MAP = new ConcurrentHashMap<>(); + + /** + * + */ + private final static Map IP_TERMINATOR_CACHE_MAP = new ConcurrentHashMap<>(); /** * 终端上线 */ - public void put(String key, Channel value, OnlineStateEnum onlineState) { + public void put(NettyTerminatorDTO nettyTerminatorDTO, Channel channel) { //客户端上线 - CHANNEL_CACHE_MAP.put(key, value); - AttributeKey attributeKey = AttributeKey.valueOf("user"); - value.attr(attributeKey).set(key); + String terminatorIp = nettyTerminatorDTO.getTerminatorIp(); + + IP_CHANNEL_CACHE_MAP.put(terminatorIp, channel); - iSchTerminatorService.saveOrUpdate(key, onlineState); + IP_TERMINATOR_CACHE_MAP.put(terminatorIp, nettyTerminatorDTO); + + AttributeKey attributeKey = AttributeKey.valueOf("ip"); + channel.attr(attributeKey).set(terminatorIp); + + //更新数据库终端数据 + iSchTerminatorService.saveOrUpdate(terminatorIp, nettyTerminatorDTO.getOnlineState()); } public String getClientKey(Channel channel) { - AttributeKey key = AttributeKey.valueOf("user"); + + AttributeKey key = AttributeKey.valueOf("ip"); + if (channel.hasAttr(key)) { return channel.attr(key).get(); } @@ -47,18 +61,18 @@ public class ChannelRepository { } public Channel get(String key) { - return CHANNEL_CACHE_MAP.get(key); + return IP_CHANNEL_CACHE_MAP.get(key); } - public Map getAll() { - return CHANNEL_CACHE_MAP; + public Map getIpToChannelCacheMap() { + return IP_CHANNEL_CACHE_MAP; } /** * 终端离线 */ public void remove(String key) { - CHANNEL_CACHE_MAP.remove(key); + IP_CHANNEL_CACHE_MAP.remove(key); iSchTerminatorService.saveOrUpdate(key, OnlineStateEnum.OFFLINE); } diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java index 24be0d0..edd2099 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java @@ -5,6 +5,8 @@ import com.docus.core.util.json.JSON; import com.docus.server.common.MsgConstants; import com.docus.server.common.netty.CommMsg; import com.docus.server.common.netty.server.ChannelRepository; +import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO; +import com.docus.server.enums.BusyStateEnum; import com.docus.server.enums.OnlineStateEnum; import com.fasterxml.jackson.core.type.TypeReference; import io.netty.buffer.ByteBuf; @@ -81,6 +83,9 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler { } } + /** + * netty client 上线 + */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelRegistered(); @@ -93,19 +98,37 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler { channel.close(); } - repository.put(clientIp, ctx.channel(), OnlineStateEnum.OFFLINE); + NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO(); + nettyTerminatorDTO.setId(ctx.channel().id().asShortText()); + nettyTerminatorDTO.setTerminatorIp(clientIp); + nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE); + nettyTerminatorDTO.setOnlineState(OnlineStateEnum.OFFLINE); + + repository.put(nettyTerminatorDTO, ctx.channel()); DEFAULT_CHANNEL_GROUP.add(ctx.channel()); System.out.println(ctx.channel().remoteAddress() + " 上线," + "【采集器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size()); } + /** + * netty client 下线 + */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println(channel.remoteAddress() + " 下线," + "【采集器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size()); + String clientId = repository.getClientKey(channel); + log.error("客户端下线,终端连接:{}", clientId); + //移除终端,终端离线 + if (clientId != null) { + repository.remove(clientId); + } } + /** + * netty exception 通道异常 + */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java index 68d541a..82728af 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java @@ -5,6 +5,8 @@ import com.docus.core.util.json.JSON; import com.docus.server.common.MsgConstants; import com.docus.server.common.netty.CommMsg; import com.docus.server.common.netty.server.ChannelRepository; +import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO; +import com.docus.server.enums.BusyStateEnum; import com.docus.server.enums.OnlineStateEnum; import com.fasterxml.jackson.core.type.TypeReference; import io.netty.buffer.ByteBuf; @@ -24,7 +26,7 @@ import java.net.InetSocketAddress; public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter { @Override - public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception { + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; //创建目标大小的数组 byte[] barray = new byte[buf.readableBytes()]; @@ -51,7 +53,7 @@ public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter { log.info("接受到【采集器-终端】的心跳消息:消息类型={},消息时间={},消息内容={}", messageType, messageTime, messageContent); - InetSocketAddress ipSocket = (InetSocketAddress) channelHandlerContext.channel().remoteAddress(); + InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = ipSocket.getAddress().getHostAddress(); log.info("【采集器-终端IP】:{},连接上线,IP地址信息:{}", clientIp, clientIp); @@ -61,13 +63,19 @@ public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter { channel.close(); } + NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO(); + nettyTerminatorDTO.setId(ctx.channel().id().asShortText()); + nettyTerminatorDTO.setTerminatorIp(clientIp); + nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE); + nettyTerminatorDTO.setOnlineState(OnlineStateEnum.ONLINE); + //将ip和channel进行映射 - repository.put(clientIp, channelHandlerContext.channel(), OnlineStateEnum.ONLINE); + repository.put(nettyTerminatorDTO, ctx.channel()); } else { - if (channelHandlerContext.channel().isOpen()) { + if (ctx.channel().isOpen()) { //触发下一个handler - channelHandlerContext.fireChannelRead(msg); + ctx.fireChannelRead(msg); } } } diff --git a/collector-scheduling-management/src/main/java/com/docus/server/service/impl/CommMsgServiceImpl.java b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/CommMsgServiceImpl.java index 88399e3..7e4f4d0 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/service/impl/CommMsgServiceImpl.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/CommMsgServiceImpl.java @@ -40,7 +40,7 @@ public class CommMsgServiceImpl implements ICommMsgService { @Override public void clientsCommand(CommMsgDTO commMsgDTO) { - Map channelMap = channelRepository.getAll(); + Map channelMap = channelRepository.getIpToChannelCacheMap(); Set keySet = channelMap.keySet(); for (String clientIp : keySet) { diff --git a/docus-client-interface/src/main/java/com/docus/server/dto/scheduling.management/schterminator/NettyTerminatorDTO.java b/docus-client-interface/src/main/java/com/docus/server/dto/scheduling.management/schterminator/NettyTerminatorDTO.java new file mode 100644 index 0000000..0121f33 --- /dev/null +++ b/docus-client-interface/src/main/java/com/docus/server/dto/scheduling.management/schterminator/NettyTerminatorDTO.java @@ -0,0 +1,66 @@ +package com.docus.server.dto.scheduling.management.schterminator; + +import com.docus.server.enums.BusyStateEnum; +import com.docus.server.enums.OnlineStateEnum; +import com.docus.server.enums.RestrictStateEnum; +import com.fasterxml.jackson.annotation.JsonFormat; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + * 执行管理器 NettyTerminatorDTO + * + * @author AutoGenerator + * @since 2023-07-15 + */ +@Data +@ApiModel(value = "NettyTerminatorDTO", description = "NettyTerminatorDTO") +public class NettyTerminatorDTO implements Serializable { + + @ApiModelProperty(value = "channelId主键") + private String id; + + @ApiModelProperty(value = "终端IP") + private String terminatorIp; + + @ApiModelProperty(value = "资源管理器名称") + private String terminatorName; + + @ApiModelProperty(value = "只干某些任务,任务偏好属性") + private String onlyCollectorIds; + + @ApiModelProperty(value = "优先干某些任务,任务偏好属性(优先给哪个采集器,若不空闲再给任意采集器)") + private String priorityCollectorIds; + + @ApiModelProperty(value = "是否限制采集器类型,0:否,1:是") + private RestrictStateEnum restrictState; + + @ApiModelProperty(value = "忙闲状态,0:空闲,1:繁忙") + private BusyStateEnum busyState; + + @ApiModelProperty(value = "在线状态,0:离线,1:在线") + private OnlineStateEnum onlineState; + + @ApiModelProperty(value = "最近任务执行时间") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date lastTaskExecTime; + + @ApiModelProperty(value = "正在执行任务的名称") + private String executingTaskName; + + @ApiModelProperty(value = "最近一次任务执行失败原因") + private String lastTaskErrorMsg; + + @ApiModelProperty(value = "入库时间") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date createTime; + + @ApiModelProperty(value = "入库更新时间") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date updateTime; + +}