处理netty 下发任务粘包分包问题

segment2.0
linrf 2 years ago
parent fe21f972ae
commit f25f2644e3

@ -56,4 +56,8 @@ public class MsgConstants {
*
*/
public static final String HAS_VALID_COLLECTOR = "11".trim();
/**
*
*/
public static final String HAS_COLLECTOR_COUNT = "12".trim();
}

@ -5,7 +5,7 @@ import com.docus.core.util.Func;
import com.docus.core.util.json.JSON;
import com.docus.infrastructure.redis.service.RedisOps;
import com.docus.server.api.taskdistribute.TaskDistributeApi;
import com.docus.server.common.netty.server.ChannelRepository;
import com.docus.server.common.netty.server.TerminalToChannelCacheMap;
import com.docus.server.dto.scheduling.management.schcollector.task.ReportDownTwoDTO;
import com.docus.server.dto.scheduling.management.schcollector.task.ReportTaskTwoDTO;
import com.docus.server.dto.scheduling.management.schcollector.task.SchCollectorTaskDTO;
@ -41,7 +41,7 @@ import java.util.concurrent.LinkedBlockingQueue;
@Slf4j
public class SchCollectorTask {
@Resource
private ChannelRepository channelRepository;
private TerminalToChannelCacheMap channelRepository;
@Resource
private TaskDistributeApi taskDistributeApi;
@Resource

@ -27,7 +27,7 @@ public class NettyServerInitializer extends ChannelInitializer<Channel> {
private NettyBusinessHandler businessHandler;
@Resource
private ChannelRepository channelRepository;
private TerminalToChannelCacheMap channelRepository;
@Override
protected void initChannel(Channel channel) throws Exception {

@ -23,7 +23,7 @@ import java.util.stream.Collectors;
*/
@Component
@Slf4j
public class ChannelRepository {
public class TerminalToChannelCacheMap {
@Resource
private ISchTerminatorService iSchTerminatorService;
@ -97,7 +97,7 @@ public class ChannelRepository {
*/
public void remove(String key) {
IP_CHANNEL_CACHE_MAP.remove(key);
IP_TERMINATOR_CACHE_MAP.remove(key);
NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO();
nettyTerminatorDTO.setOnlineState(OnlineStateEnum.OFFLINE);
iSchTerminatorService.saveOrUpdate(key, nettyTerminatorDTO);

@ -1,12 +1,16 @@
package com.docus.server.common.netty.server.handler;
import com.docus.core.util.Func;
import com.docus.core.util.json.JSON;
import com.docus.server.common.CommMsg;
import com.docus.server.common.MsgConstants;
import com.docus.server.common.netty.server.ChannelRepository;
import com.docus.server.common.netty.server.TerminalToChannelCacheMap;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.enums.BusyStateEnum;
import com.docus.server.enums.OnlineStateEnum;
import com.docus.server.service.ISchTerminatorService;
import com.docus.server.vo.scheduling.management.schcollector.LoadSchCollectorVO;
import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
@ -20,6 +24,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.List;
/**
*
@ -32,38 +37,45 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<CommMsg<Se
private static final ChannelGroup DEFAULT_CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Resource
private ChannelRepository repository;
private TerminalToChannelCacheMap terminalToChannelCacheMap;
@Resource
private ISchTerminatorService iSchTerminatorService;
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, CommMsg commMsg) throws Exception {
String messageType = commMsg.getMessageType();
String messageTime = commMsg.getMessageTime();
Serializable messageContent = commMsg.getContent();
String messageContent = (String) commMsg.getContent();
log.info("======== 【调度器服务端-收到终端消息】, 消息时间={}, 消息类型={}, 消息内容={}", messageTime, messageType, messageContent + " ======== ");
if (messageType.equals(MsgConstants.TERMINATOR_RESTART)) {
log.info("接受到终端重启命令,内容{}", messageContent);
}
if (messageType.equals(MsgConstants.COLLECTOR_RESTART)) {
} else if (messageType.equals(MsgConstants.COLLECTOR_RESTART)) {
log.info("收到采集器重启命令,内容{}", messageContent);
}
if (messageType.equals(MsgConstants.VIRTUAL_RESTART)) {
} else if (messageType.equals(MsgConstants.VIRTUAL_RESTART)) {
log.info("收到虚拟机重启命令,内容{}", messageContent);
}
if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_FILE)) {
} else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_FILE)) {
log.info("收到更新采集器文件命令,内容{}", messageContent);
}
if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_CONFIG)) {
} else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_CONFIG)) {
log.info("收到更新采集器配置命令,内容{}", messageContent);
}
if (messageType.equals(MsgConstants.SCH_DISTRIBUTE_TASKS)) {
} else if (messageType.equals(MsgConstants.SCH_DISTRIBUTE_TASKS)) {
log.info("收到采集调度器下发任务命令,内容{}", messageContent);
} else if (messageType.equals(MsgConstants.HAS_COLLECTOR_COUNT)) {
log.info("收到终端客户端返回的【采集器数量】消息,内容{}", messageContent);
String terminalIp = terminalToChannelCacheMap.getClientKey(channelHandlerContext.channel());
NettyTerminatorDTO nettyTerminatorDTO = terminalToChannelCacheMap.getTerminatorByIp(terminalIp);
if (Func.isNotBlank(messageContent)) {
nettyTerminatorDTO.getLoadSchCollectorVOList().clear();
List<LoadSchCollectorVO> loadSchCollectorVOList = JSON.fromJSONWithGeneric(messageContent, new TypeReference<List<LoadSchCollectorVO>>() {
});
nettyTerminatorDTO.getLoadSchCollectorVOList().addAll(loadSchCollectorVOList);
}
}
}
@ -78,7 +90,7 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<CommMsg<Se
String clientIp = ipSocket.getAddress().getHostAddress();
log.info("【采集器-终端IP】:{},连接上线,IP地址信息:{}", clientIp, clientIp);
String clientId = repository.getClientKey(ctx.channel());
String clientId = terminalToChannelCacheMap.getClientKey(ctx.channel());
if (Func.isBlank(clientId)) {
@ -87,7 +99,7 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<CommMsg<Se
nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE);
nettyTerminatorDTO.setOnlineState(OnlineStateEnum.OFFLINE);
repository.put(nettyTerminatorDTO, ctx.channel());
terminalToChannelCacheMap.put(nettyTerminatorDTO, ctx.channel());
}
DEFAULT_CHANNEL_GROUP.add(ctx.channel());
@ -102,11 +114,11 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<CommMsg<Se
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + " 下线," + "【调度器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size());
String clientId = repository.getClientKey(channel);
String clientId = terminalToChannelCacheMap.getClientKey(channel);
log.error("客户端下线,终端连接:{}", clientId);
//移除终端,终端离线
if (clientId != null) {
repository.remove(clientId);
terminalToChannelCacheMap.remove(clientId);
}
}

@ -3,7 +3,7 @@ package com.docus.server.common.netty.server.handler;
import com.docus.core.util.Func;
import com.docus.server.common.CommMsg;
import com.docus.server.common.MsgConstants;
import com.docus.server.common.netty.server.ChannelRepository;
import com.docus.server.common.netty.server.TerminalToChannelCacheMap;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.enums.BusyStateEnum;
import com.docus.server.enums.OnlineStateEnum;
@ -65,9 +65,9 @@ public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter {
}
}
private ChannelRepository repository;
private TerminalToChannelCacheMap repository;
public NettyHeartbeatHandler(ChannelRepository repository) {
public NettyHeartbeatHandler(TerminalToChannelCacheMap repository) {
this.repository = repository;
}

@ -3,7 +3,7 @@ package com.docus.server.common.process;
import com.docus.core.util.Func;
import com.docus.log.context.TrackContext;
import com.docus.log.processor.AbstractProcessor;
import com.docus.server.common.netty.server.ChannelRepository;
import com.docus.server.common.netty.server.TerminalToChannelCacheMap;
import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO;
import com.docus.server.dto.scheduling.management.schterminator.EditSchTerminatorDTO;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
@ -27,7 +27,7 @@ import java.util.stream.Collectors;
*/
public class ChannelProcessor extends AbstractProcessor {
@Resource
private ChannelRepository channelRepository;
private TerminalToChannelCacheMap channelRepository;
@Resource
private ISchTerminatorService iSchTerminatorService;
@Resource

@ -4,7 +4,7 @@ import com.docus.infrastructure.web.request.SearchDTO;
import com.docus.infrastructure.web.response.PageResult;
import com.docus.log.annotation.TrackGroup;
import com.docus.server.api.scheduling.management.SchTerminatorApi;
import com.docus.server.common.netty.server.ChannelRepository;
import com.docus.server.common.netty.server.TerminalToChannelCacheMap;
import com.docus.server.common.process.ChannelProcessor;
import com.docus.server.dto.scheduling.management.schterminator.AddSchTerminatorDTO;
import com.docus.server.dto.scheduling.management.schterminator.DeleteSchTerminatorDTO;
@ -26,7 +26,7 @@ public class SchTerminatorController implements SchTerminatorApi {
@Resource
private ISchTerminatorService iSchTerminatorService;
@Resource
private ChannelRepository channelRepository;
private TerminalToChannelCacheMap channelRepository;
/**
*

@ -69,4 +69,6 @@ public interface ISchTerminatorService {
List<SchTerminator> findAll();
void batchUpdate(List<SchTerminator> terminators);
SchTerminatorVO findByTerminalIP(String terminalIp);
}

@ -3,7 +3,7 @@ package com.docus.server.service.impl;
import com.docus.core.util.DateUtil;
import com.docus.core.util.json.JSON;
import com.docus.server.common.CommMsg;
import com.docus.server.common.netty.server.ChannelRepository;
import com.docus.server.common.netty.server.TerminalToChannelCacheMap;
import com.docus.server.convert.CommMsgConvert;
import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO;
import com.docus.server.service.ICommMsgService;
@ -23,7 +23,7 @@ import java.util.Set;
public class CommMsgServiceImpl implements ICommMsgService {
@Resource
private ChannelRepository channelRepository;
private TerminalToChannelCacheMap channelRepository;
@Override
public void clientCommand(CommMsgDTO commMsgDTO) {

@ -96,6 +96,12 @@ public class SchTerminatorServiceImpl implements ISchTerminatorService {
iSchTerminatorDao.updateBatchById(terminators, 20);
}
@Override
public SchTerminatorVO findByTerminalIP(String terminalIp) {
SchTerminator schTerminator = iSchTerminatorDao.findOneBy("terminatorIp", terminalIp);
return SchTerminatorConvert.INSTANCE.convertVO(schTerminator);
}
/**
*
*

@ -60,4 +60,9 @@ public class MsgConstants {
*
*/
public static final String HAS_VALID_COLLECTOR = "11".trim();
/**
*
*/
public static final String HAS_COLLECTOR_COUNT = "12".trim();
}

@ -1,5 +1,6 @@
package com.docus.server.common.netty.client.handler;
import com.docus.core.util.DateUtil;
import com.docus.core.util.Func;
import com.docus.core.util.ListUtils;
import com.docus.core.util.json.JSON;
@ -11,6 +12,7 @@ import com.docus.server.common.utils.StartUpExeUtils;
import com.docus.server.dto.scheduling.management.schcollector.task.SchCollectorTaskDTO;
import com.docus.server.vo.scheduling.management.schcollector.LoadSchCollectorVO;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Lists;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
@ -19,6 +21,7 @@ import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.io.Serializable;
import java.util.Date;
import java.util.List;
import java.util.Map;
@ -108,6 +111,31 @@ public class ClientHandler extends SimpleChannelInboundHandler<CommMsg<Serializa
//下发任务: 判断这个任务采集器类型在指定目录是否存在、
//如果不存在下拉该采集器类型最新的版本更新包到指定的部署路径并且启动该exe采集器将任务tcp给对应采集器
//如果存在:
} else if (messageType.equals(MsgConstants.HAS_COLLECTOR_COUNT)) {
log.info("【终端客户端-收到调度器消息】收到获取采集器数量命令,内容={}", messageContent);
CollectorChannelCacheMap collectorChannelCacheMap = nettyClient.getCollectorChannelCacheMap();
Map<String, Channel> collectorMap = collectorChannelCacheMap.getAll();
String packageValue = nettyClient.getRedisMsg();
List<LoadSchCollectorVO> redisLoadSchCollectors = JSON.fromJSONWithGeneric(packageValue, new TypeReference<List<LoadSchCollectorVO>>() {
});
List<LoadSchCollectorVO> objects = Lists.newArrayList();
for (LoadSchCollectorVO loadSchCollectorVO : redisLoadSchCollectors) {
if (collectorMap.containsKey(String.valueOf(loadSchCollectorVO.getCollectorId()))) {
objects.add(loadSchCollectorVO);
}
}
CommMsg<Serializable> collectorCountMsg = CommMsg.builder()
.messageType(MsgConstants.HAS_COLLECTOR_COUNT)
.messageTime(DateUtil.formatDateTime(new Date()))
.content(JSON.toJSON(objects))
.build();
context.channel().writeAndFlush(collectorCountMsg);
}
}

@ -64,6 +64,10 @@ public class CollectorChannelCacheMap {
return COLLECTOR_ID_CHANNEL_CACHE_MAP.get(key);
}
public Map<String, Channel> getAll() {
return COLLECTOR_ID_CHANNEL_CACHE_MAP;
}
/**
* 线
*/

@ -48,6 +48,8 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<CommMsg<Se
String messageTime = commMsg.getMessageTime();
String messageContent = (String) commMsg.getContent();
log.info("======== 【终端服务端-收到采集器消息】, 消息时间={}, 消息类型={}, 消息内容={}", messageTime, messageType, messageContent + " ======== ");
if (messageType.equals(MsgConstants.ONLINE_REGISTER)) {
//===========login ok,切换到已登录状态===============messageContent=collectorId=========
deviceStateContext.onLoginSucc(messageContent, System.currentTimeMillis(), 10, "采集器认证通过");
@ -72,13 +74,11 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<CommMsg<Se
//============状态为上行数据=============
deviceStateContext.onHeartbeat(System.currentTimeMillis(), "采集器上行了数据");
//==============处理数据================
System.out.println("收到数据:" + JSON.toJSON(commMsg));
//============返回消息==================
CommMsg<Serializable> resp = CommMsg.builder()
.messageTime(DateUtil.formatDateTime(new Date()))
.content("HEARTBEAT_REQUEST ok")
.content("HEARTBEAT_REQUEST ok 【终端服务器】收到【采集器客户端】心跳")
.messageType("response HEARTBEAT_REQUEST")
.build();
ctx.writeAndFlush(resp);
@ -112,7 +112,7 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<CommMsg<Se
} else if (messageType.equals(MsgConstants.REV_COLLECTOR_TASK)) {
log.info("【终端服务端-接收采集器消息】收到采集器上报的任务状态,内容{}", messageContent);
EditSchCollectRecordDTO editSchCollectRecordDTO = JSON.fromJSON(JSON.toJSON(messageContent), EditSchCollectRecordDTO.class);
EditSchCollectRecordDTO editSchCollectRecordDTO = JSON.fromJSON(messageContent, EditSchCollectRecordDTO.class);
schCollectRecordApi.edit(editSchCollectRecordDTO);
}
@ -127,9 +127,13 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<CommMsg<Se
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println(getClass().getSimpleName() + "." + "userEventTriggered" + ctx.channel().remoteAddress());
if (evt instanceof IdleStateEvent) {
DeviceStateContext deviceStateContext = collectorChannelCacheMap.getAttributeValue(ctx);
long lastUpdateTime = deviceStateContext.getLastUpdateTime();
long currentTimeMillis = System.currentTimeMillis();
long intervalTime = currentTimeMillis - lastUpdateTime;
if (intervalTime > deviceStateContext.getHeartRate()) {
@ -151,15 +155,14 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<CommMsg<Se
ctx.fireChannelRegistered();
//链接成功
DeviceStateContext deviceStateContext = new DeviceStateContext(ctx.channel(), true);
DeviceStateContext deviceStateContext = new DeviceStateContext(ctx.channel(), false);
//===========设置设备状态为 未登录=================
deviceStateContext.onConnect(System.currentTimeMillis(), "设备 active");
//更新添加 state 属性
// STATE_MACHINE_SESSION.setAttribute(ctx, deviceStateContext);
collectorChannelCacheMap.setAttribute(ctx, deviceStateContext);
System.out.println("channelActive:" + deviceStateContext.toString());
//===========设置设备状态为 未登录=================
InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = ipSocket.getAddress().getHostAddress();
log.info("【终端服务端-采集器IP连接成功】:{},连接上线,IP地址信息:{}", clientIp, clientIp);
@ -175,14 +178,18 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<CommMsg<Se
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + " 下线," + "【终端-采集器】在线数量:" + DEFAULT_CHANNEL_GROUP.size());
String collectorId = collectorChannelCacheMap.getClientKey(channel);
log.error("客户端下线,终端连接:{}", collectorId);
log.error("客户端下线采集器id{}", collectorId);
//移除终端,终端离线
if (collectorId != null) {
collectorChannelCacheMap.remove(collectorId);
}
System.out.println(channel.remoteAddress() + " 下线," + "【终端-采集器】在线数量:" + DEFAULT_CHANNEL_GROUP.size());
//================设置为断开================
DeviceStateContext deviceStateContext = collectorChannelCacheMap.getAttributeValue(ctx);
deviceStateContext.onDisconnect("设备 inactive");

@ -141,7 +141,9 @@ public class DeviceStateContext implements IDeviceState {
public void setState(IDeviceState state, String describe) {
this.state = state;
//把每次切换的状态加入到历史状态中
historyState.add(new HistoryInfoDTO(describe, state.getStateName()));
if (history) {
historyState.add(new HistoryInfoDTO(describe, state.getStateName()));
}
}

@ -3,6 +3,7 @@ package com.docus.server.dto.scheduling.management.schterminator;
import com.docus.server.enums.BusyStateEnum;
import com.docus.server.enums.OnlineStateEnum;
import com.docus.server.enums.RestrictStateEnum;
import com.docus.server.vo.scheduling.management.schcollector.LoadSchCollectorVO;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
@ -12,6 +13,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@ -70,4 +72,6 @@ public class NettyTerminatorDTO implements Serializable {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date updateTime;
@ApiModelProperty(value = "终端注册的采集器信息")
private List<LoadSchCollectorVO> loadSchCollectorVOList = new ArrayList<>();
}

@ -73,6 +73,10 @@ public class SchTerminator implements Serializable {
@TableField("last_task_error_msg")
private String lastTaskErrorMsg;
@ApiModelProperty(value = "当前终端注册上来的采集器信息")
@TableField("online_collectors_json")
private String onlineCollectorsJson;
@ApiModelProperty(value = "入库时间")
@TableField("create_time")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")

@ -1,9 +1,9 @@
package com.docus.server.vo.scheduling.management.schterminator;
import com.baomidou.mybatisplus.annotation.TableField;
import com.docus.server.enums.BusyStateEnum;
import com.docus.server.enums.OnlineStateEnum;
import com.docus.server.enums.RestrictStateEnum;
import com.docus.server.vo.scheduling.management.schcollector.LoadSchCollectorVO;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
@ -11,6 +11,7 @@ import lombok.Data;
import java.io.Serializable;
import java.util.Date;
import java.util.List;
/**
* VO
@ -26,53 +27,44 @@ public class SchTerminatorVO implements Serializable {
private Long id;
@ApiModelProperty(value = "终端IP")
@TableField("terminator_ip")
private String terminatorIp;
@ApiModelProperty(value = "资源管理器名称")
@TableField("terminator_name")
private String terminatorName;
@ApiModelProperty(value = "只干某些任务,任务偏好属性")
@TableField("only_collector_ids")
private String onlyCollectorIds;
@ApiModelProperty(value = "优先干某些任务,任务偏好属性(优先给哪个采集器,若不空闲再给任意采集器)")
@TableField("priority_collector_ids")
private String priorityCollectorIds;
@ApiModelProperty(value = "是否限制采集器类型01是")
@TableField("restrict_state")
private RestrictStateEnum restrictState;
@ApiModelProperty(value = "忙闲状态0空闲1繁忙")
@TableField("busy_state")
private BusyStateEnum busyState;
@ApiModelProperty(value = "在线状态0离线1在线")
@TableField("online_state")
private OnlineStateEnum onlineState;
@ApiModelProperty(value = "最近任务执行时间")
@TableField("last_task_exec_time")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date lastTaskExecTime;
@ApiModelProperty(value = "正在执行任务的名称")
@TableField("executing_task_name")
private String executingTaskName;
@ApiModelProperty(value = "最近一次任务执行失败原因")
@TableField("last_task_error_msg")
private String lastTaskErrorMsg;
@ApiModelProperty(value = "当前终端注册上来的采集器信息")
private List<LoadSchCollectorVO> loadSchCollectorVOList;
@ApiModelProperty(value = "入库时间")
@TableField("create_time")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date createTime;
@ApiModelProperty(value = "入库更新时间")
@TableField("update_time")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date updateTime;

Loading…
Cancel
Save