下发任务

segment2.0
beeajax 2 years ago
parent 50c8321678
commit 490d8bbbc1

@ -0,0 +1,50 @@
package com.docus.server;
import com.docus.core.util.DateUtil;
import com.docus.core.util.json.JSON;
import com.docus.server.common.netty.CommMsg;
import com.docus.server.common.netty.server.ChannelRepository;
import com.docus.server.convert.CommMsgConvert;
import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.util.CharsetUtil;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import javax.annotation.Resource;
import java.util.Date;
import java.util.Map;
import java.util.Set;
@Configuration
@EnableScheduling
@EnableAsync
public class JobConfig {
@Resource
private ChannelRepository channelRepository;
@Async
@Scheduled(cron = "0/1 * * * * ?")
public void runTask() throws InterruptedException {
Map<String, Channel> channelMap = channelRepository.getIpToChannelCacheMap();
Set<String> keySet = channelMap.keySet();
for (String clientIp : keySet) {
Channel channel = channelMap.get(clientIp);
CommMsg commMsg = CommMsgConvert.INSTANCE.convertDO(new CommMsgDTO());
commMsg.setMessageTime(DateUtil.formatDateTime(new Date()));
if (channel != null) {
channel.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(commMsg), CharsetUtil.UTF_8));
}
}
}
}

@ -1,5 +1,6 @@
package com.docus.server.common.netty.server; package com.docus.server.common.netty.server;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.enums.OnlineStateEnum; import com.docus.server.enums.OnlineStateEnum;
import com.docus.server.service.ISchTerminatorService; import com.docus.server.service.ISchTerminatorService;
import io.netty.channel.Channel; import io.netty.channel.Channel;
@ -24,22 +25,35 @@ public class ChannelRepository {
/** /**
* <IP-Channel> * <IP-Channel>
*/ */
private final static Map<String, Channel> CHANNEL_CACHE_MAP = new ConcurrentHashMap<>(); private final static Map<String, Channel> IP_CHANNEL_CACHE_MAP = new ConcurrentHashMap<>();
/**
* <IP-Terminator>
*/
private final static Map<String, NettyTerminatorDTO> IP_TERMINATOR_CACHE_MAP = new ConcurrentHashMap<>();
/** /**
* 线 * 线
*/ */
public void put(String key, Channel value, OnlineStateEnum onlineState) { public void put(NettyTerminatorDTO nettyTerminatorDTO, Channel channel) {
//客户端上线 //客户端上线
CHANNEL_CACHE_MAP.put(key, value); String terminatorIp = nettyTerminatorDTO.getTerminatorIp();
AttributeKey<String> attributeKey = AttributeKey.valueOf("user");
value.attr(attributeKey).set(key); IP_CHANNEL_CACHE_MAP.put(terminatorIp, channel);
iSchTerminatorService.saveOrUpdate(key, onlineState); IP_TERMINATOR_CACHE_MAP.put(terminatorIp, nettyTerminatorDTO);
AttributeKey<String> attributeKey = AttributeKey.valueOf("ip");
channel.attr(attributeKey).set(terminatorIp);
//更新数据库终端数据
iSchTerminatorService.saveOrUpdate(terminatorIp, nettyTerminatorDTO.getOnlineState());
} }
public String getClientKey(Channel channel) { public String getClientKey(Channel channel) {
AttributeKey<String> key = AttributeKey.valueOf("user");
AttributeKey<String> key = AttributeKey.valueOf("ip");
if (channel.hasAttr(key)) { if (channel.hasAttr(key)) {
return channel.attr(key).get(); return channel.attr(key).get();
} }
@ -47,18 +61,18 @@ public class ChannelRepository {
} }
public Channel get(String key) { public Channel get(String key) {
return CHANNEL_CACHE_MAP.get(key); return IP_CHANNEL_CACHE_MAP.get(key);
} }
public Map<String, Channel> getAll() { public Map<String, Channel> getIpToChannelCacheMap() {
return CHANNEL_CACHE_MAP; return IP_CHANNEL_CACHE_MAP;
} }
/** /**
* 线 * 线
*/ */
public void remove(String key) { public void remove(String key) {
CHANNEL_CACHE_MAP.remove(key); IP_CHANNEL_CACHE_MAP.remove(key);
iSchTerminatorService.saveOrUpdate(key, OnlineStateEnum.OFFLINE); iSchTerminatorService.saveOrUpdate(key, OnlineStateEnum.OFFLINE);
} }

@ -5,6 +5,8 @@ import com.docus.core.util.json.JSON;
import com.docus.server.common.MsgConstants; import com.docus.server.common.MsgConstants;
import com.docus.server.common.netty.CommMsg; import com.docus.server.common.netty.CommMsg;
import com.docus.server.common.netty.server.ChannelRepository; import com.docus.server.common.netty.server.ChannelRepository;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.enums.BusyStateEnum;
import com.docus.server.enums.OnlineStateEnum; import com.docus.server.enums.OnlineStateEnum;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
@ -81,6 +83,9 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
} }
} }
/**
* netty client 线
*/
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered(); ctx.fireChannelRegistered();
@ -93,19 +98,37 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
channel.close(); channel.close();
} }
repository.put(clientIp, ctx.channel(), OnlineStateEnum.OFFLINE); NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO();
nettyTerminatorDTO.setId(ctx.channel().id().asShortText());
nettyTerminatorDTO.setTerminatorIp(clientIp);
nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE);
nettyTerminatorDTO.setOnlineState(OnlineStateEnum.OFFLINE);
repository.put(nettyTerminatorDTO, ctx.channel());
DEFAULT_CHANNEL_GROUP.add(ctx.channel()); DEFAULT_CHANNEL_GROUP.add(ctx.channel());
System.out.println(ctx.channel().remoteAddress() + " 上线," + "【采集器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size()); System.out.println(ctx.channel().remoteAddress() + " 上线," + "【采集器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size());
} }
/**
* netty client 线
*/
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel(); Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + " 下线," + "【采集器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size()); System.out.println(channel.remoteAddress() + " 下线," + "【采集器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size());
String clientId = repository.getClientKey(channel);
log.error("客户端下线,终端连接:{}", clientId);
//移除终端,终端离线
if (clientId != null) {
repository.remove(clientId);
}
} }
/**
* netty exception
*/
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace(); cause.printStackTrace();

@ -5,6 +5,8 @@ import com.docus.core.util.json.JSON;
import com.docus.server.common.MsgConstants; import com.docus.server.common.MsgConstants;
import com.docus.server.common.netty.CommMsg; import com.docus.server.common.netty.CommMsg;
import com.docus.server.common.netty.server.ChannelRepository; import com.docus.server.common.netty.server.ChannelRepository;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.enums.BusyStateEnum;
import com.docus.server.enums.OnlineStateEnum; import com.docus.server.enums.OnlineStateEnum;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
@ -24,7 +26,7 @@ import java.net.InetSocketAddress;
public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter { public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg; ByteBuf buf = (ByteBuf) msg;
//创建目标大小的数组 //创建目标大小的数组
byte[] barray = new byte[buf.readableBytes()]; byte[] barray = new byte[buf.readableBytes()];
@ -51,7 +53,7 @@ public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter {
log.info("接受到【采集器-终端】的心跳消息:消息类型={},消息时间={},消息内容={}", messageType, messageTime, messageContent); log.info("接受到【采集器-终端】的心跳消息:消息类型={},消息时间={},消息内容={}", messageType, messageTime, messageContent);
InetSocketAddress ipSocket = (InetSocketAddress) channelHandlerContext.channel().remoteAddress(); InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = ipSocket.getAddress().getHostAddress(); String clientIp = ipSocket.getAddress().getHostAddress();
log.info("【采集器-终端IP】:{},连接上线,IP地址信息:{}", clientIp, clientIp); log.info("【采集器-终端IP】:{},连接上线,IP地址信息:{}", clientIp, clientIp);
@ -61,13 +63,19 @@ public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter {
channel.close(); channel.close();
} }
NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO();
nettyTerminatorDTO.setId(ctx.channel().id().asShortText());
nettyTerminatorDTO.setTerminatorIp(clientIp);
nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE);
nettyTerminatorDTO.setOnlineState(OnlineStateEnum.ONLINE);
//将ip和channel进行映射 //将ip和channel进行映射
repository.put(clientIp, channelHandlerContext.channel(), OnlineStateEnum.ONLINE); repository.put(nettyTerminatorDTO, ctx.channel());
} else { } else {
if (channelHandlerContext.channel().isOpen()) { if (ctx.channel().isOpen()) {
//触发下一个handler //触发下一个handler
channelHandlerContext.fireChannelRead(msg); ctx.fireChannelRead(msg);
} }
} }
} }

@ -40,7 +40,7 @@ public class CommMsgServiceImpl implements ICommMsgService {
@Override @Override
public void clientsCommand(CommMsgDTO commMsgDTO) { public void clientsCommand(CommMsgDTO commMsgDTO) {
Map<String, Channel> channelMap = channelRepository.getAll(); Map<String, Channel> channelMap = channelRepository.getIpToChannelCacheMap();
Set<String> keySet = channelMap.keySet(); Set<String> keySet = channelMap.keySet();
for (String clientIp : keySet) { for (String clientIp : keySet) {

@ -0,0 +1,66 @@
package com.docus.server.dto.scheduling.management.schterminator;
import com.docus.server.enums.BusyStateEnum;
import com.docus.server.enums.OnlineStateEnum;
import com.docus.server.enums.RestrictStateEnum;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
* NettyTerminatorDTO
*
* @author AutoGenerator
* @since 2023-07-15
*/
@Data
@ApiModel(value = "NettyTerminatorDTO", description = "NettyTerminatorDTO")
public class NettyTerminatorDTO implements Serializable {
@ApiModelProperty(value = "channelId主键")
private String id;
@ApiModelProperty(value = "终端IP")
private String terminatorIp;
@ApiModelProperty(value = "资源管理器名称")
private String terminatorName;
@ApiModelProperty(value = "只干某些任务,任务偏好属性")
private String onlyCollectorIds;
@ApiModelProperty(value = "优先干某些任务,任务偏好属性(优先给哪个采集器,若不空闲再给任意采集器)")
private String priorityCollectorIds;
@ApiModelProperty(value = "是否限制采集器类型01是")
private RestrictStateEnum restrictState;
@ApiModelProperty(value = "忙闲状态0空闲1繁忙")
private BusyStateEnum busyState;
@ApiModelProperty(value = "在线状态0离线1在线")
private OnlineStateEnum onlineState;
@ApiModelProperty(value = "最近任务执行时间")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date lastTaskExecTime;
@ApiModelProperty(value = "正在执行任务的名称")
private String executingTaskName;
@ApiModelProperty(value = "最近一次任务执行失败原因")
private String lastTaskErrorMsg;
@ApiModelProperty(value = "入库时间")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date createTime;
@ApiModelProperty(value = "入库更新时间")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date updateTime;
}
Loading…
Cancel
Save