新增下载平台的test方法,调用下载调度器的文件

segment2.0
linrf 2 years ago
parent fd55f7cfae
commit 6e34870174

@ -52,4 +52,8 @@ public class MsgConstants {
*/ */
public static final String SCH_DISTRIBUTE_TASKS = "9".trim(); public static final String SCH_DISTRIBUTE_TASKS = "9".trim();
/**
*
*/
public static final String HAS_VALID_COLLECTOR = "11".trim();
} }

@ -10,12 +10,14 @@ import com.docus.server.common.netty.server.ChannelRepository;
import com.docus.server.dto.scheduling.management.schcollector.task.ReportDownTwoDTO; import com.docus.server.dto.scheduling.management.schcollector.task.ReportDownTwoDTO;
import com.docus.server.dto.scheduling.management.schcollector.task.ReportTaskTwoDTO; import com.docus.server.dto.scheduling.management.schcollector.task.ReportTaskTwoDTO;
import com.docus.server.dto.scheduling.management.schcollector.task.SchCollectorTaskDTO; import com.docus.server.dto.scheduling.management.schcollector.task.SchCollectorTaskDTO;
import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO; import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.entity.scheduling.management.SchCollectRecord; import com.docus.server.entity.scheduling.management.SchCollectRecord;
import com.docus.server.entity.scheduling.management.SchCollectRecordRetryLog; import com.docus.server.entity.scheduling.management.SchCollectRecordRetryLog;
import com.docus.server.entity.scheduling.management.SchCollector; import com.docus.server.entity.scheduling.management.SchCollector;
import com.docus.server.enums.BusyStateEnum; import com.docus.server.enums.BusyStateEnum;
import com.docus.server.enums.RetryTaskEnum; import com.docus.server.enums.RetryTaskEnum;
import com.docus.server.service.ICommMsgService;
import com.docus.server.service.ISchCollectRecordRetryLogService; import com.docus.server.service.ISchCollectRecordRetryLogService;
import com.docus.server.service.ISchCollectRecordService; import com.docus.server.service.ISchCollectRecordService;
import com.docus.server.service.ISchCollectorService; import com.docus.server.service.ISchCollectorService;
@ -54,6 +56,8 @@ public class SchCollectorTask {
private ISchCollectorService iSchCollectorService; private ISchCollectorService iSchCollectorService;
@Resource @Resource
private RedisOps redisOps; private RedisOps redisOps;
@Resource
private ICommMsgService iCommMsgService;
private BlockingQueue<ReportDownTwoDTO> retryTaskQueue = new LinkedBlockingQueue<>(); private BlockingQueue<ReportDownTwoDTO> retryTaskQueue = new LinkedBlockingQueue<>();
@ -106,7 +110,7 @@ public class SchCollectorTask {
continue; continue;
} }
for (ReportDownTwoDTO report : reportDownTwoDTOList) { for (ReportDownTwoDTO report : reportDownTwoDTOList) {
//先找出有只采集的任务。 //先找出有只采集的任务。
ReportTaskTwoDTO reportTaskTwoDto = report.getTasks().get(0); ReportTaskTwoDTO reportTaskTwoDto = report.getTasks().get(0);
if (!CollectionUtils.isEmpty(terminal.getOnlyCollectorIds()) && terminal.getOnlyCollectorIds().contains(reportTaskTwoDto.getCollectorId()) if (!CollectionUtils.isEmpty(terminal.getOnlyCollectorIds()) && terminal.getOnlyCollectorIds().contains(reportTaskTwoDto.getCollectorId())
@ -201,6 +205,9 @@ public class SchCollectorTask {
} }
private void tcpToClient(NettyTerminatorDTO terminal, ReportDownTwoDTO reportDownTwoDTO) { private void tcpToClient(NettyTerminatorDTO terminal, ReportDownTwoDTO reportDownTwoDTO) {
hasValidCollector(terminal, reportDownTwoDTO);
Channel channel = channelRepository.get(terminal.getTerminatorIp()); Channel channel = channelRepository.get(terminal.getTerminatorIp());
Map<String, Object> params = reportDownTwoDTO.getParams(); Map<String, Object> params = reportDownTwoDTO.getParams();
@ -248,6 +255,27 @@ public class SchCollectorTask {
} }
private void hasValidCollector(NettyTerminatorDTO terminal, ReportDownTwoDTO reportDownTwoDTO) {
/**
*
*
*/
//1、下发任务之前询问终端是否有采集器进程和心跳
//2、若没有下发部署采集器指令给终端
//3、若有下发任务给终端
String collectorId = reportDownTwoDTO.getTasks().get(0).getCollectorId();
CommMsgDTO commMsgDTO = CommMsgDTO.builder()
.terminatorIp(terminal.getTerminatorIp())
.messageType(MsgConstants.HAS_VALID_COLLECTOR)
.content(collectorId)
.build();
iCommMsgService.clientCommand(commMsgDTO);
}
public void addRetryTask(ReportDownTwoDTO reportDownTwoDTO) { public void addRetryTask(ReportDownTwoDTO reportDownTwoDTO) {
this.retryTaskQueue.add(reportDownTwoDTO); this.retryTaskQueue.add(reportDownTwoDTO);

@ -41,14 +41,19 @@ public class ChannelProcessor extends AbstractProcessor {
switch (group) { switch (group) {
case "SchCollectRecordController-edit": case "SchCollectRecordController-edit":
return doSchCollectRecordControllerEdit(context); doSchCollectRecordControllerEdit(context);
break;
case "SchTerminatorController": case "SchTerminatorController":
return doSchTerminatorController(context); doSchTerminatorController(context);
break;
case "RedisKeyExpirationService-expired": case "RedisKeyExpirationService-expired":
return doRedisKeyExpired(context); doRedisKeyExpired(context);
break;
default: default:
return true; return null;
} }
return null;
} }
private Object doRedisKeyExpired(TrackContext context) { private Object doRedisKeyExpired(TrackContext context) {
@ -70,8 +75,8 @@ public class ChannelProcessor extends AbstractProcessor {
return null; return null;
} }
private boolean doSchCollectRecordControllerEdit(TrackContext context) { private void doSchCollectRecordControllerEdit(TrackContext context) {
return logCollectRecord(context); logCollectRecord(context);
} }
private boolean doSchTerminatorController(TrackContext context) { private boolean doSchTerminatorController(TrackContext context) {

@ -75,8 +75,8 @@ public class SchCollectRecordController implements SchCollectRecordApi {
*/ */
@TrackGroup(group = "SchCollectRecordController-edit", processor = ChannelProcessor.class) @TrackGroup(group = "SchCollectRecordController-edit", processor = ChannelProcessor.class)
@Override @Override
public boolean edit(EditSchCollectRecordDTO editSchCollectRecordDTO) { public void edit(EditSchCollectRecordDTO editSchCollectRecordDTO) {
return iSchCollectRecordService.edit(editSchCollectRecordDTO); iSchCollectRecordService.edit(editSchCollectRecordDTO);
} }
/** /**

@ -65,12 +65,11 @@ public class SchTerminatorController implements SchTerminatorApi {
* *
* *
* @param editSchTerminatorDTO * @param editSchTerminatorDTO
* @return
*/ */
@TrackGroup(group = "SchTerminatorController", processor = ChannelProcessor.class) @TrackGroup(group = "SchTerminatorController", processor = ChannelProcessor.class)
@Override @Override
public boolean edit(EditSchTerminatorDTO editSchTerminatorDTO) { public void edit(EditSchTerminatorDTO editSchTerminatorDTO) {
return iSchTerminatorService.edit(editSchTerminatorDTO); iSchTerminatorService.edit(editSchTerminatorDTO);
} }
/** /**

@ -51,5 +51,13 @@ public class MsgConstants {
* *
*/ */
public static final String SCH_DISTRIBUTE_TASKS = "9".trim(); public static final String SCH_DISTRIBUTE_TASKS = "9".trim();
/**
*
*/
public static final String REV_COLLECTOR_TASK = "10".trim();
/**
*
*/
public static final String HAS_VALID_COLLECTOR = "11".trim();
} }

@ -4,16 +4,20 @@ import com.docus.core.util.StringUtils;
import com.docus.core.util.json.JSON; import com.docus.core.util.json.JSON;
import com.docus.server.common.MsgConstants; import com.docus.server.common.MsgConstants;
import com.docus.server.common.netty.CommMsg; import com.docus.server.common.netty.CommMsg;
import com.docus.server.common.netty.server.CollectorChannelCacheMap;
import com.docus.server.dto.scheduling.management.schcollector.task.SchCollectorTaskDTO;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.io.Serializable; import javax.annotation.Resource;
@Slf4j @Slf4j
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Resource
private CollectorChannelCacheMap collectorChannelCacheMap;
@Override @Override
protected void channelRead0(ChannelHandlerContext context, ByteBuf msg) throws Exception { protected void channelRead0(ChannelHandlerContext context, ByteBuf msg) throws Exception {
@ -34,32 +38,43 @@ public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
String messageType = commMsg.getMessageType(); String messageType = commMsg.getMessageType();
String messageTime = commMsg.getMessageTime(); String messageTime = commMsg.getMessageTime();
Serializable messageContent = commMsg.getContent(); String messageContent = (String) commMsg.getContent();
log.info("======== 收到服务端消息, 消息时间={}, 消息类型={}, 消息内容={}", messageTime, messageType, messageContent + " ======== "); log.info("======== 【终端客户端-收到调度器消息】, 消息时间={}, 消息类型={}, 消息内容={}", messageTime, messageType, messageContent + " ======== ");
if (messageType.equals(MsgConstants.TERMINATOR_RESTART)) { if (messageType.equals(MsgConstants.TERMINATOR_RESTART)) {
log.info("接受到终端重启命令,内容={}", messageContent); log.info("【终端客户端-收到调度器下发任务】接受到终端重启命令,内容={}", messageContent);
}
if (messageType.equals(MsgConstants.COLLECTOR_RESTART)) { } else if (messageType.equals(MsgConstants.COLLECTOR_RESTART)) {
log.info("收到采集器重启命令,内容={}", messageContent); log.info("【终端客户端-收到调度器下发任务】收到采集器重启命令,内容={}", messageContent);
}
if (messageType.equals(MsgConstants.VIRTUAL_RESTART)) { } else if (messageType.equals(MsgConstants.VIRTUAL_RESTART)) {
log.info("收到虚拟机重启命令,内容={}", messageContent); log.info("【终端客户端-收到调度器下发任务】收到虚拟机重启命令,内容={}", messageContent);
}
if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_FILE)) { } else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_FILE)) {
log.info("收到更新采集器文件命令,内容={}", messageContent); log.info("【终端客户端-收到调度器下发任务】收到更新采集器文件命令,内容={}", messageContent);
}
} else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_CONFIG)) {
log.info("【终端客户端-收到调度器下发任务】收到更新采集器配置命令,内容={}", messageContent);
} else if (messageType.equals(MsgConstants.SCH_DISTRIBUTE_TASKS)) {
log.info("【终端客户端-收到调度器下发任务】收到采集调度器下发任务命令,内容={}", messageContent);
//下发任务: 判断这个任务采集器类型在指定目录是否存在、
//如果不存在下拉该采集器类型最新的版本更新包到指定的部署路径并且启动该exe采集器将任务tcp给对应采集器
//如果存在:
SchCollectorTaskDTO schCollectorTaskDTO = JSON.fromJSON(JSON.toJSON(messageContent), SchCollectorTaskDTO.class);
String collectorId = schCollectorTaskDTO.getTaskInfo().getTasks().get(0).getCollectorId();
collectorChannelCacheMap.writeAndFlush(collectorId, commMsg);
} else if (messageType.equals(MsgConstants.HAS_VALID_COLLECTOR)) {
log.info("【终端客户端-收到调度器下发任务】是否有可用类型的采集器命令,内容={}", messageContent);
if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_CONFIG)) {
log.info("收到更新采集器配置命令,内容={}", messageContent);
}
if (messageType.equals(MsgConstants.SCH_DISTRIBUTE_TASKS)) { //下发任务: 判断这个任务采集器类型在指定目录是否存在、
log.info("收到采集调度器下发任务命令,内容={}", messageContent); //如果不存在下拉该采集器类型最新的版本更新包到指定的部署路径并且启动该exe采集器将任务tcp给对应采集器
//如果存在:
} }
} }

@ -40,11 +40,6 @@ public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("通道激活......"); log.info("通道激活......");
// Payload.Message.Builder builder = Payload.Message.newBuilder()
// .setClient(appName)
// .setContent("我来了")
// .setCmd(Payload.Message.type.AUTH);
CommMsg onlineRegister = CommMsg.builder() CommMsg onlineRegister = CommMsg.builder()
.messageType(MsgConstants.ONLINE_REGISTER) .messageType(MsgConstants.ONLINE_REGISTER)
.messageTime(DateUtil.formatDateTime(new Date())) .messageTime(DateUtil.formatDateTime(new Date()))
@ -61,12 +56,7 @@ public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt; IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state() == IdleState.WRITER_IDLE) { if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
// 一定时间内,通道内未传递消息,发送心跳,保证存活 // 一定时间内,通道内未传递消息,发送心跳,保证存活
log.info("after {} seconds no message wrote", nettyProperties.getWriterIdleTimeSeconds()); log.info("【终端客户端心跳】 after {} seconds no message wrote", nettyProperties.getWriterIdleTimeSeconds());
// Payload.Message heartbeat = Payload.Message
// .newBuilder()
// .setCmd(Payload.Message.type.HEARTBEAT_REQUEST)
// .build();
CommMsg heartbeat = CommMsg.builder() CommMsg heartbeat = CommMsg.builder()
.messageType(MsgConstants.HEARTBEAT_REQUEST) .messageType(MsgConstants.HEARTBEAT_REQUEST)

@ -1,57 +0,0 @@
package com.docus.server.common.netty.server;
import io.netty.channel.Channel;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* IP
*/
@Component
@Slf4j
public class ChannelRepository {
/**
* <collectorId-Channel>
*/
private final static Map<String, Channel> COLLECTOR_ID_CHANNEL_CACHE_MAP = new ConcurrentHashMap<>();
/**
* 线
*/
public void put(String collectorId, Channel channel) {
//缓存
COLLECTOR_ID_CHANNEL_CACHE_MAP.put(collectorId, channel);
AttributeKey<String> attributeKey = AttributeKey.valueOf("ip");
channel.attr(attributeKey).set(collectorId);
}
public String getClientKey(Channel channel) {
AttributeKey<String> key = AttributeKey.valueOf("ip");
if (channel.hasAttr(key)) {
return channel.attr(key).get();
}
return null;
}
public Channel get(String key) {
return COLLECTOR_ID_CHANNEL_CACHE_MAP.get(key);
}
/**
* 线
*/
public void remove(String key) {
COLLECTOR_ID_CHANNEL_CACHE_MAP.remove(key);
}
}

@ -0,0 +1,106 @@
package com.docus.server.common.netty.server;
import com.docus.core.util.json.JSON;
import com.docus.server.common.netty.CommMsg;
import com.docus.server.common.netty.state.DeviceStateContext;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* id
*
*/
@Component
@Slf4j
public class CollectorChannelCacheMap {
/**
* <collectorId-Channel>
*/
private final static Map<String, Channel> COLLECTOR_ID_CHANNEL_CACHE_MAP = new ConcurrentHashMap<>();
/**
* 线
*/
public void put(String collectorId, Channel channel) {
//缓存
COLLECTOR_ID_CHANNEL_CACHE_MAP.put(collectorId, channel);
AttributeKey<String> attributeKey = AttributeKey.valueOf("collectorId");
channel.attr(attributeKey).set(collectorId);
}
/**
* channelid
*/
public String getClientKey(Channel channel) {
AttributeKey<String> key = AttributeKey.valueOf("collectorId");
if (channel.hasAttr(key)) {
return channel.attr(key).get();
}
return null;
}
public void writeAndFlush(String collectorId, CommMsg commMsg) {
Channel channel = this.get(collectorId);
if (null != channel && channel.isOpen()) {
channel.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(commMsg), CharsetUtil.UTF_8));
}
}
public Channel get(String key) {
return COLLECTOR_ID_CHANNEL_CACHE_MAP.get(key);
}
/**
* 线
*/
public void remove(String key) {
COLLECTOR_ID_CHANNEL_CACHE_MAP.remove(key);
}
/**
*
*
* @param ctx
* @param value
*/
public void setAttribute(ChannelHandlerContext ctx, DeviceStateContext value) {
Attribute<DeviceStateContext> attribute = getAttribute(ctx);
attribute.set(value);
}
/**
*
*
* @param ctx
* @return null
*/
public DeviceStateContext getAttributeValue(ChannelHandlerContext ctx) {
Attribute<DeviceStateContext> attribute = getAttribute(ctx);
DeviceStateContext value = attribute.get();
return value;
}
/**
* null
*
* @param ctx channel
* @return ctx channle attribute
*/
private Attribute<DeviceStateContext> getAttribute(ChannelHandlerContext ctx) {
return ctx.channel().attr(AttributeKey.valueOf("state"));
}
}

@ -22,7 +22,7 @@ public class NettyServerInitializer extends ChannelInitializer<Channel> {
private NettyBusinessHandler businessHandler; private NettyBusinessHandler businessHandler;
@Resource @Resource
private ChannelRepository channelRepository; private CollectorChannelCacheMap channelRepository;
@Override @Override
protected void initChannel(Channel channel) throws Exception { protected void initChannel(Channel channel) throws Exception {

@ -4,11 +4,12 @@ import com.docus.core.util.DateUtil;
import com.docus.core.util.Func; import com.docus.core.util.Func;
import com.docus.core.util.StringUtils; import com.docus.core.util.StringUtils;
import com.docus.core.util.json.JSON; import com.docus.core.util.json.JSON;
import com.docus.server.api.scheduling.management.SchCollectRecordApi;
import com.docus.server.common.MsgConstants; import com.docus.server.common.MsgConstants;
import com.docus.server.common.netty.ChannelAttribute;
import com.docus.server.common.netty.CommMsg; import com.docus.server.common.netty.CommMsg;
import com.docus.server.common.netty.server.ChannelRepository; import com.docus.server.common.netty.server.CollectorChannelCacheMap;
import com.docus.server.common.netty.state.DeviceStateContext; import com.docus.server.common.netty.state.DeviceStateContext;
import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
@ -38,15 +39,14 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static final ChannelGroup DEFAULT_CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private static final ChannelGroup DEFAULT_CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
public static final ChannelAttribute<DeviceStateContext> STATE_MACHINE_SESSION = new ChannelAttribute<>("state");
@Resource @Resource
private ChannelRepository repository; private CollectorChannelCacheMap collectorChannelCacheMap;
@Resource
private SchCollectRecordApi schCollectRecordApi;
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
DeviceStateContext deviceStateContext = STATE_MACHINE_SESSION.getAttributeValue(ctx); DeviceStateContext deviceStateContext = collectorChannelCacheMap.getAttributeValue(ctx);
ByteBuf buf = (ByteBuf) msg; ByteBuf buf = (ByteBuf) msg;
//创建目标大小的数组 //创建目标大小的数组
byte[] barray = new byte[buf.readableBytes()]; byte[] barray = new byte[buf.readableBytes()];
@ -64,11 +64,11 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
String messageType = commMsg.getMessageType(); String messageType = commMsg.getMessageType();
String messageTime = commMsg.getMessageTime(); String messageTime = commMsg.getMessageTime();
Serializable messageContent = commMsg.getContent(); String messageContent = (String) commMsg.getContent();
if (messageType.equals(MsgConstants.ONLINE_REGISTER)) { if (messageType.equals(MsgConstants.ONLINE_REGISTER)) {
//===========login ok,切换到已登录状态=============== //===========login ok,切换到已登录状态===============messageContent=collectorId=========
deviceStateContext.onLoginSucc("device-123", System.currentTimeMillis(), 10, "设备认证通过"); deviceStateContext.onLoginSucc(messageContent, System.currentTimeMillis(), 10, "采集器认证通过");
CommMsg<Serializable> login_ok = CommMsg.builder() CommMsg<Serializable> login_ok = CommMsg.builder()
.messageTime(DateUtil.formatDateTime(new Date())) .messageTime(DateUtil.formatDateTime(new Date()))
@ -77,10 +77,19 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
.build(); .build();
ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(login_ok), CharsetUtil.UTF_8)); ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(login_ok), CharsetUtil.UTF_8));
Channel channel = collectorChannelCacheMap.get(messageContent);
if (channel != null && channel.isOpen()) {
channel.close();
}
if (Func.isNotBlank(messageContent)) {
collectorChannelCacheMap.put(messageContent, ctx.channel());
}
} else if (messageType.equals(MsgConstants.HEARTBEAT_REQUEST)) { } else if (messageType.equals(MsgConstants.HEARTBEAT_REQUEST)) {
//============状态为上行数据============= //============状态为上行数据=============
deviceStateContext.onHeartbeat(System.currentTimeMillis(), "设备上行了数据"); deviceStateContext.onHeartbeat(System.currentTimeMillis(), "采集器上行了数据");
//==============处理数据================ //==============处理数据================
System.out.println("收到数据:" + msg); System.out.println("收到数据:" + msg);
@ -93,32 +102,37 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(resp), CharsetUtil.UTF_8)); ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(resp), CharsetUtil.UTF_8));
log.info("接受到【终端-采集器】客户端的心跳消息:消息类型={},消息时间={},消息内容={}", messageType, messageTime, messageContent); log.info("接受到【终端服务端-接收采集器消息】客户端的心跳消息:消息类型={},消息时间={},消息内容={}", messageType, messageTime, messageContent);
InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress(); InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = ipSocket.getAddress().getHostAddress(); String clientIp = ipSocket.getAddress().getHostAddress();
log.info("【终端-采集器IP】:{},连接上线,IP地址信息:{}", clientIp, clientIp); log.info("【终端服务端-接收采集器消息】:{},连接上线,IP地址信息:{}", clientIp, clientIp);
} else if (messageType.equals(MsgConstants.TERMINATOR_RESTART)) { } else if (messageType.equals(MsgConstants.TERMINATOR_RESTART)) {
log.info("接受到终端重启命令,内容{}", messageContent); log.info("【终端服务端-接收采集器消息】接受到终端重启命令,内容{}", messageContent);
} else if (messageType.equals(MsgConstants.COLLECTOR_RESTART)) { } else if (messageType.equals(MsgConstants.COLLECTOR_RESTART)) {
log.info("收到采集器重启命令,内容{}", messageContent); log.info("【终端服务端-接收采集器消息】收到采集器重启命令,内容{}", messageContent);
} else if (messageType.equals(MsgConstants.VIRTUAL_RESTART)) { } else if (messageType.equals(MsgConstants.VIRTUAL_RESTART)) {
log.info("收到虚拟机重启命令,内容{}", messageContent); log.info("【终端服务端-接收采集器消息】收到虚拟机重启命令,内容{}", messageContent);
} else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_FILE)) { } else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_FILE)) {
log.info("收到更新采集器文件命令,内容{}", messageContent); log.info("【终端服务端-接收采集器消息】收到更新采集器文件命令,内容{}", messageContent);
} else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_CONFIG)) { } else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_CONFIG)) {
log.info("收到更新采集器配置命令,内容{}", messageContent); log.info("【终端服务端-接收采集器消息】收到更新采集器配置命令,内容{}", messageContent);
} else if (messageType.equals(MsgConstants.SCH_DISTRIBUTE_TASKS)) { } else if (messageType.equals(MsgConstants.SCH_DISTRIBUTE_TASKS)) {
log.info("收到采集调度器下发任务命令,内容{}", messageContent); log.info("【终端服务端-接收采集器消息】收到采集调度器下发任务命令,内容{}", messageContent);
} else if (messageType.equals(MsgConstants.REV_COLLECTOR_TASK)) {
log.info("【终端服务端-接收采集器消息】收到采集器上报的任务状态,内容{}", messageContent);
EditSchCollectRecordDTO editSchCollectRecordDTO = JSON.fromJSON(JSON.toJSON(messageContent), EditSchCollectRecordDTO.class);
schCollectRecordApi.edit(editSchCollectRecordDTO);
System.out.println(1);
} }
@ -132,14 +146,14 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println(getClass().getSimpleName() + "." + "userEventTriggered" + ctx.channel().remoteAddress()); System.out.println(getClass().getSimpleName() + "." + "userEventTriggered" + ctx.channel().remoteAddress());
if (evt instanceof IdleStateEvent) { if (evt instanceof IdleStateEvent) {
DeviceStateContext deviceStateContext = STATE_MACHINE_SESSION.getAttributeValue(ctx); DeviceStateContext deviceStateContext = collectorChannelCacheMap.getAttributeValue(ctx);
long lastUpdateTime = deviceStateContext.getLastUpdateTime(); long lastUpdateTime = deviceStateContext.getLastUpdateTime();
long currentTimeMillis = System.currentTimeMillis(); long currentTimeMillis = System.currentTimeMillis();
long intervalTime = currentTimeMillis - lastUpdateTime; long intervalTime = currentTimeMillis - lastUpdateTime;
if (intervalTime > deviceStateContext.getHeartRate()) { if (intervalTime > deviceStateContext.getHeartRate()) {
//==============发生超时,进入超时状态============== //==============发生超时,进入超时状态==============
deviceStateContext.onTimeout("设备发生超时"); deviceStateContext.onTimeout("采集器发生超时");
System.out.println("userEventTriggered:" + deviceStateContext.toString()); System.out.println("userEventTriggered:" + deviceStateContext.toString());
} }
} else { } else {
@ -160,18 +174,14 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
//===========设置设备状态为 未登录================= //===========设置设备状态为 未登录=================
deviceStateContext.onConnect(System.currentTimeMillis(), "设备 active"); deviceStateContext.onConnect(System.currentTimeMillis(), "设备 active");
//更新添加 state 属性 //更新添加 state 属性
STATE_MACHINE_SESSION.setAttribute(ctx, deviceStateContext); // STATE_MACHINE_SESSION.setAttribute(ctx, deviceStateContext);
collectorChannelCacheMap.setAttribute(ctx, deviceStateContext);
System.out.println("channelActive:" + deviceStateContext.toString()); System.out.println("channelActive:" + deviceStateContext.toString());
//===========设置设备状态为 未登录================= //===========设置设备状态为 未登录=================
InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress(); InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = ipSocket.getAddress().getHostAddress(); String clientIp = ipSocket.getAddress().getHostAddress();
log.info("【终端-采集器IP】:{},连接上线,IP地址信息:{}", clientIp, clientIp); log.info("【终端服务端-采集器IP连接成功】:{},连接上线,IP地址信息:{}", clientIp, clientIp);
if (Func.isNotBlank(clientIp)) {
repository.put(clientIp, ctx.channel());
}
DEFAULT_CHANNEL_GROUP.add(ctx.channel()); DEFAULT_CHANNEL_GROUP.add(ctx.channel());
@ -185,15 +195,15 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel(); Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + " 下线," + "【采集器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size()); System.out.println(channel.remoteAddress() + " 下线," + "【采集器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size());
String clientId = repository.getClientKey(channel); String collectorId = collectorChannelCacheMap.getClientKey(channel);
log.error("客户端下线,终端连接:{}", clientId); log.error("客户端下线,终端连接:{}", collectorId);
//移除终端,终端离线 //移除终端,终端离线
if (clientId != null) { if (collectorId != null) {
repository.remove(clientId); collectorChannelCacheMap.remove(collectorId);
} }
//================设置为断开================ //================设置为断开================
DeviceStateContext deviceStateContext = STATE_MACHINE_SESSION.getAttributeValue(ctx); DeviceStateContext deviceStateContext = collectorChannelCacheMap.getAttributeValue(ctx);
deviceStateContext.onDisconnect("设备 inactive"); deviceStateContext.onDisconnect("设备 inactive");
System.out.println("channelInactive:" + deviceStateContext.toString()); System.out.println("channelInactive:" + deviceStateContext.toString());
} }
@ -205,17 +215,17 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//==============发生异常切换到断开模式=============== //==============发生异常切换到断开模式===============
System.out.println("exceptionCaught:" + cause.getMessage()); System.out.println("exceptionCaught:" + cause.getMessage());
DeviceStateContext deviceStateContext = STATE_MACHINE_SESSION.getAttributeValue(ctx); DeviceStateContext deviceStateContext = collectorChannelCacheMap.getAttributeValue(ctx);
deviceStateContext.onDisconnect(cause.getMessage()); deviceStateContext.onDisconnect(cause.getMessage());
System.out.println("exceptionCaught:" + deviceStateContext.toString()); System.out.println("exceptionCaught:" + deviceStateContext.toString());
cause.printStackTrace(); cause.printStackTrace();
String clientId = repository.getClientKey(ctx.channel()); String collectorId = collectorChannelCacheMap.getClientKey(ctx.channel());
log.error("通道发生异常,终端连接:{}", clientId); log.error("通道发生异常,终端连接:{}", collectorId);
//移除终端,终端离线 //移除终端,终端离线
if (clientId != null) { if (collectorId != null) {
repository.remove(clientId); collectorChannelCacheMap.remove(collectorId);
} }
if (ctx.channel().isActive()) { if (ctx.channel().isActive()) {
ctx.close(); ctx.close();

@ -2,7 +2,7 @@ package com.docus.server;
import cn.hutool.core.util.ZipUtil; import cn.hutool.core.util.ZipUtil;
import com.docus.server.common.download.downLoader.HttpDownloader; import com.docus.server.common.download.downLoader.HttpDownloader;
import org.junit.jupiter.api.Test; import com.docus.server.common.utils.StartUpExeUtils;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
/** /**
@ -14,17 +14,28 @@ import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest @SpringBootTest
public class FileController { public class FileController {
@Test public static void test() throws Exception {
void test() throws Exception { /**
* 1
* 2
* 3
*/
HttpDownloader httpDownloader = new HttpDownloader(null); HttpDownloader httpDownloader = new HttpDownloader(null);
String url = "http://127.0.0.1:9113/sch/file/download?filePath=collector_packages/20230727/78c002bc-7674-4dfe-9247-ae594d03ccdf/docus-collector-scheduling.tar.gz"; String url = "http://192.168.16.110:9113/sch/file/download?filePath=collector_packages/20230718/91d930e6-0490-44e5-9756-caee3251d645/navicat.zip";
String fileName = "collector.tar.gz"; String fileName = "collector.zip";
String savePath = "/Users/linruifeng/Desktop/collector_packages"; String savePath = "H:\\test"; //部署路径
String procName = "navicat.exe"; //进程名称
httpDownloader.downLoadFromUrl(url, fileName, savePath); httpDownloader.downLoadFromUrl(url, fileName, savePath);
ZipUtil.unzip(savePath + "\\" + fileName, savePath + "\\collector");
StartUpExeUtils.startUpExe(savePath + "\\collector\\" + procName, procName);
} }
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
test1(); test();
} }
public static void test1() throws Exception { public static void test1() throws Exception {

@ -1,36 +0,0 @@
//package com.docus.server.api.scheduling.management;
//
//import io.swagger.annotations.Api;
//import io.swagger.annotations.ApiImplicitParam;
//import io.swagger.annotations.ApiImplicitParams;
//import io.swagger.annotations.ApiOperation;
//import org.springframework.cloud.openfeign.FeignClient;
//import org.springframework.web.bind.annotation.*;
//import org.springframework.web.multipart.MultipartFile;
//
//import javax.servlet.http.HttpServletResponse;
//
//
///**
// * 文件上传下载 API
// *
// * @author AutoGenerator
// * @since 2023-07-15
// */
//@Api(value = "通用文件上传下载接口", tags = "通用文件上传下载接口")
//@FeignClient(value = "collector-scheduling-management", contextId = "collector-scheduling-management.FileApi")
//@RequestMapping("/sch/file")
//public interface FileApi {
//
// @ApiOperation("文件下载")
// @GetMapping("/download")
// void downloadFile(@RequestParam(value = "filePath") String filePath, HttpServletResponse response) throws Exception;
//
// @ApiOperation("文件上传")
// @PostMapping("/upload")
// @ApiImplicitParams({
// @ApiImplicitParam(name = "files", value = "文件", required = true, dataTypeClass = MultipartFile.class)
// })
// void uploadFile(@RequestPart MultipartFile[] files, String pathKey) throws Exception;
//
//}

@ -69,7 +69,7 @@ public interface SchCollectRecordApi {
*/ */
@ApiOperation("编辑") @ApiOperation("编辑")
@PutMapping("/edit") @PutMapping("/edit")
boolean edit(@RequestBody EditSchCollectRecordDTO editSchCollectRecordDTO); void edit(@RequestBody EditSchCollectRecordDTO editSchCollectRecordDTO);
/** /**
* *

@ -63,11 +63,10 @@ public interface SchTerminatorApi {
* *
* *
* @param editSchTerminatorDTO * @param editSchTerminatorDTO
* @return
*/ */
@ApiOperation("编辑") @ApiOperation("编辑")
@PutMapping("/edit") @PutMapping("/edit")
boolean edit(@RequestBody EditSchTerminatorDTO editSchTerminatorDTO); void edit(@RequestBody EditSchTerminatorDTO editSchTerminatorDTO);
/** /**
* *

@ -6,7 +6,10 @@ import com.docus.server.enums.SubStateEnum;
import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty; import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable; import java.io.Serializable;
import java.util.Date; import java.util.Date;
@ -17,6 +20,9 @@ import java.util.Date;
* @author AutoGenerator * @author AutoGenerator
* @since 2023-07-15 * @since 2023-07-15
*/ */
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data @Data
@ApiModel(value = "EditSchCollectRecordDTO对象", description = "采集记录表") @ApiModel(value = "EditSchCollectRecordDTO对象", description = "采集记录表")
public class EditSchCollectRecordDTO implements Serializable { public class EditSchCollectRecordDTO implements Serializable {

Loading…
Cancel
Save