处理netty 下发任务粘包分包问题

segment2.0
linrf 2 years ago
parent 01055eeda0
commit 12bb788009

@ -5,7 +5,6 @@ import com.docus.core.util.Func;
import com.docus.core.util.json.JSON; import com.docus.core.util.json.JSON;
import com.docus.infrastructure.redis.service.RedisOps; import com.docus.infrastructure.redis.service.RedisOps;
import com.docus.server.api.taskdistribute.TaskDistributeApi; import com.docus.server.api.taskdistribute.TaskDistributeApi;
import com.docus.server.common.netty.CommMsg;
import com.docus.server.common.netty.server.ChannelRepository; import com.docus.server.common.netty.server.ChannelRepository;
import com.docus.server.dto.scheduling.management.schcollector.task.ReportDownTwoDTO; import com.docus.server.dto.scheduling.management.schcollector.task.ReportDownTwoDTO;
import com.docus.server.dto.scheduling.management.schcollector.task.ReportTaskTwoDTO; import com.docus.server.dto.scheduling.management.schcollector.task.ReportTaskTwoDTO;
@ -32,7 +31,6 @@ import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -110,7 +108,7 @@ public class SchCollectorTask {
continue; continue;
} }
for (ReportDownTwoDTO report : reportDownTwoDTOList) { for (ReportDownTwoDTO report : reportDownTwoDTOList) {
//先找出有只采集的任务。 //先找出有只采集的任务。
ReportTaskTwoDTO reportTaskTwoDto = report.getTasks().get(0); ReportTaskTwoDTO reportTaskTwoDto = report.getTasks().get(0);
if (!CollectionUtils.isEmpty(terminal.getOnlyCollectorIds()) && terminal.getOnlyCollectorIds().contains(reportTaskTwoDto.getCollectorId()) if (!CollectionUtils.isEmpty(terminal.getOnlyCollectorIds()) && terminal.getOnlyCollectorIds().contains(reportTaskTwoDto.getCollectorId())
@ -206,7 +204,7 @@ public class SchCollectorTask {
private void tcpToClient(NettyTerminatorDTO terminal, ReportDownTwoDTO reportDownTwoDTO) { private void tcpToClient(NettyTerminatorDTO terminal, ReportDownTwoDTO reportDownTwoDTO) {
hasValidCollector(terminal, reportDownTwoDTO); // hasValidCollector(terminal, reportDownTwoDTO);
Channel channel = channelRepository.get(terminal.getTerminatorIp()); Channel channel = channelRepository.get(terminal.getTerminatorIp());
@ -238,6 +236,7 @@ public class SchCollectorTask {
//tcp 下发任务到终端 //tcp 下发任务到终端
if (channel != null) { if (channel != null) {
System.out.println(JSON.toJSON(commMsg));
channel.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(commMsg), CharsetUtil.UTF_8)); channel.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(commMsg), CharsetUtil.UTF_8));
} }
@ -380,8 +379,8 @@ public class SchCollectorTask {
"}"; "}";
ReportDownTwoDTO reportDownTwoDTO3 = JSON.fromJSON(collectorId3, ReportDownTwoDTO.class); ReportDownTwoDTO reportDownTwoDTO3 = JSON.fromJSON(collectorId3, ReportDownTwoDTO.class);
// List<ReportDownTwoDTO> allTaskList = Lists.newArrayList(reportDownTwoDTO1, reportDownTwoDTO2, reportDownTwoDTO3); List<ReportDownTwoDTO> allTaskList = Lists.newArrayList(reportDownTwoDTO1, reportDownTwoDTO2, reportDownTwoDTO3);
List<ReportDownTwoDTO> allTaskList = new ArrayList<>(); // List<ReportDownTwoDTO> allTaskList = new ArrayList<>();
if (!CollectionUtils.isEmpty(this.retryTaskQueue)) { if (!CollectionUtils.isEmpty(this.retryTaskQueue)) {
ReportDownTwoDTO retryTask = (ReportDownTwoDTO) this.retryTaskQueue.take(); ReportDownTwoDTO retryTask = (ReportDownTwoDTO) this.retryTaskQueue.take();

@ -1,9 +1,13 @@
package com.docus.server.common.netty.server; package com.docus.server.common.netty.server;
import com.docus.server.common.JsonDecoder;
import com.docus.server.common.JsonEncoder;
import com.docus.server.common.netty.server.handler.NettyBusinessHandler; import com.docus.server.common.netty.server.handler.NettyBusinessHandler;
import com.docus.server.common.netty.server.handler.NettyHeartbeatHandler; import com.docus.server.common.netty.server.handler.NettyHeartbeatHandler;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -40,13 +44,21 @@ public class NettyServerInitializer extends ChannelInitializer<Channel> {
// .addLast(new ProtobufVarint32LengthFieldPrepender()) // .addLast(new ProtobufVarint32LengthFieldPrepender())
// .addLast(new ProtobufEncoder()) // .addLast(new ProtobufEncoder())
// .addLast(new LineBasedFrameDecoder(2048)) // .addLast(new LineBasedFrameDecoder(1024))
// .addLast(new StringDecoder()) // .addLast(new StringDecoder(CharsetUtil.UTF_8))
// .addLast(new StringEncoder()) // .addLast(new StringEncoder(CharsetUtil.UTF_8))
.addLast(new LengthFieldBasedFrameDecoder(1024 * 1024, 0, 2, 0, 2))
// LengthFieldPrepender是一个编码器主要是在响应字节数据前面添加字节长度字段
.addLast(new LengthFieldPrepender(2))
// 对经过粘包和拆包处理之后的数据进行json反序列化从而得到UserInfo对象
.addLast(new JsonDecoder())
// 对响应数据进行编码主要是将UserInfo对象序列化为json
.addLast(new JsonEncoder())
// 加载业务处理器 // 加载业务处理器
.addLast(new NettyHeartbeatHandler(channelRepository)) .addLast(new NettyHeartbeatHandler(channelRepository))
.addLast(businessHandler); .addLast(businessHandler);
// .addLast(new EchoServerHandler()); // .addLast(new EchoServerHandler());
} }
} }

@ -1,7 +1,7 @@
package com.docus.server.common.netty.server.handler; package com.docus.server.common.netty.server.handler;
import com.docus.core.util.json.JSON; import com.docus.core.util.json.JSON;
import com.docus.server.common.netty.CommMsg; import com.docus.server.common.CommMsg;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;

@ -1,15 +1,12 @@
package com.docus.server.common.netty.server.handler; package com.docus.server.common.netty.server.handler;
import com.docus.core.util.Func; import com.docus.core.util.Func;
import com.docus.core.util.StringUtils; import com.docus.server.common.CommMsg;
import com.docus.core.util.json.JSON;
import com.docus.server.common.MsgConstants; import com.docus.server.common.MsgConstants;
import com.docus.server.common.netty.CommMsg;
import com.docus.server.common.netty.server.ChannelRepository; import com.docus.server.common.netty.server.ChannelRepository;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO; import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.enums.BusyStateEnum; import com.docus.server.enums.BusyStateEnum;
import com.docus.server.enums.OnlineStateEnum; import com.docus.server.enums.OnlineStateEnum;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -30,7 +27,7 @@ import java.net.InetSocketAddress;
@Slf4j @Slf4j
@ChannelHandler.Sharable @ChannelHandler.Sharable
@Component @Component
public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> { public class NettyBusinessHandler extends SimpleChannelInboundHandler<CommMsg<Serializable>> {
private static final ChannelGroup DEFAULT_CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private static final ChannelGroup DEFAULT_CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@ -38,21 +35,21 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
private ChannelRepository repository; private ChannelRepository repository;
@Override @Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf msg) throws Exception { protected void channelRead0(ChannelHandlerContext channelHandlerContext, CommMsg commMsg) throws Exception {
ByteBuf buf = (ByteBuf) msg; // ByteBuf buf = (ByteBuf) msg;
//创建目标大小的数组 // //创建目标大小的数组
byte[] barray = new byte[buf.readableBytes()]; // byte[] barray = new byte[buf.readableBytes()];
//把数据从bytebuf转移到byte[] // //把数据从bytebuf转移到byte[]
buf.getBytes(0, barray); // buf.getBytes(0, barray);
//将byte[]转成字符串用于打印 // //将byte[]转成字符串用于打印
String message = new String(barray); // String message = new String(barray);
//
//空消息不处理 // //空消息不处理
if (!StringUtils.hasText(message)) { // if (!StringUtils.hasText(message)) {
return; // return;
} // }
//
CommMsg commMsg = JSON.fromJSON(message, CommMsg.class); // CommMsg commMsg = JSON.fromJSON(message, CommMsg.class);
String messageType = commMsg.getMessageType(); String messageType = commMsg.getMessageType();
String messageTime = commMsg.getMessageTime(); String messageTime = commMsg.getMessageTime();
@ -108,7 +105,7 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
DEFAULT_CHANNEL_GROUP.add(ctx.channel()); DEFAULT_CHANNEL_GROUP.add(ctx.channel());
System.out.println(ctx.channel().remoteAddress() + " 上线," + "【采集器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size()); System.out.println(ctx.channel().remoteAddress() + " 上线," + "【调度器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size());
} }
/** /**
@ -117,7 +114,7 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel(); Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + " 下线," + "【采集器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size()); System.out.println(channel.remoteAddress() + " 下线," + "【调度器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size());
String clientId = repository.getClientKey(channel); String clientId = repository.getClientKey(channel);
log.error("客户端下线,终端连接:{}", clientId); log.error("客户端下线,终端连接:{}", clientId);
//移除终端,终端离线 //移除终端,终端离线

@ -1,15 +1,12 @@
package com.docus.server.common.netty.server.handler; package com.docus.server.common.netty.server.handler;
import com.docus.core.util.Func; import com.docus.core.util.Func;
import com.docus.core.util.StringUtils; import com.docus.server.common.CommMsg;
import com.docus.core.util.json.JSON;
import com.docus.server.common.MsgConstants; import com.docus.server.common.MsgConstants;
import com.docus.server.common.netty.CommMsg;
import com.docus.server.common.netty.server.ChannelRepository; import com.docus.server.common.netty.server.ChannelRepository;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO; import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.enums.BusyStateEnum; import com.docus.server.enums.BusyStateEnum;
import com.docus.server.enums.OnlineStateEnum; import com.docus.server.enums.OnlineStateEnum;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
@ -27,20 +24,21 @@ public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg; // ByteBuf buf = (ByteBuf) msg;
//创建目标大小的数组 // //创建目标大小的数组
byte[] barray = new byte[buf.readableBytes()]; // byte[] barray = new byte[buf.readableBytes()];
//把数据从bytebuf转移到byte[] // //把数据从bytebuf转移到byte[]
buf.getBytes(0, barray); // buf.getBytes(0, barray);
//将byte[]转成字符串用于打印 // //将byte[]转成字符串用于打印
String message = new String(barray); // String message = new String(barray);
//空消息不处理 // //空消息不处理
if (!StringUtils.hasText(message)) { // if (!StringUtils.hasText(message)) {
return; // return;
} // }
CommMsg commMsg = JSON.fromJSON(message, CommMsg.class); // CommMsg commMsg = JSON.fromJSON((String) message, CommMsg.class);
CommMsg commMsg = (CommMsg) msg;
String messageType = commMsg.getMessageType(); String messageType = commMsg.getMessageType();
String messageTime = commMsg.getMessageTime(); String messageTime = commMsg.getMessageTime();
Serializable content = commMsg.getContent(); Serializable content = commMsg.getContent();
@ -77,7 +75,6 @@ public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter {
ctx.fireChannelRead(msg); ctx.fireChannelRead(msg);
} }
} }
} }
private ChannelRepository repository; private ChannelRepository repository;

@ -1,6 +1,6 @@
package com.docus.server.convert; package com.docus.server.convert;
import com.docus.server.common.netty.CommMsg; import com.docus.server.common.CommMsg;
import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO; import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO;
import org.mapstruct.Mapper; import org.mapstruct.Mapper;
import org.mapstruct.Mappings; import org.mapstruct.Mappings;

@ -2,7 +2,7 @@ package com.docus.server.service.impl;
import com.docus.core.util.DateUtil; import com.docus.core.util.DateUtil;
import com.docus.core.util.json.JSON; import com.docus.core.util.json.JSON;
import com.docus.server.common.netty.CommMsg; import com.docus.server.common.CommMsg;
import com.docus.server.common.netty.server.ChannelRepository; import com.docus.server.common.netty.server.ChannelRepository;
import com.docus.server.convert.CommMsgConvert; import com.docus.server.convert.CommMsgConvert;
import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO; import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO;

@ -13,8 +13,8 @@ spring:
#公司病案的文件服务数据库 #公司病案的文件服务数据库
master: master:
url: jdbc:log4jdbc:mysql://db.docus.cn:3306/docus-collector-scheduling?autoReconnect=true&allowMultiQueries=true&useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai url: jdbc:log4jdbc:mysql://db.docus.cn:3306/docus-collector-scheduling?autoReconnect=true&allowMultiQueries=true&useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
username: root username: docus
password: root password: docus702
driver-class-name: net.sf.log4jdbc.sql.jdbcapi.DriverSpy driver-class-name: net.sf.log4jdbc.sql.jdbcapi.DriverSpy
type: com.alibaba.druid.pool.DruidDataSource type: com.alibaba.druid.pool.DruidDataSource
servlet: servlet:
@ -23,7 +23,7 @@ spring:
max-request-size: 200MB max-request-size: 200MB
redis: redis:
host: redis.docus.cn host: redis.docus.cn
# password: JSdocus@702 password: JSdocus@702
cloud: cloud:
nacos: nacos:
discovery: discovery:
@ -60,7 +60,7 @@ netty:
all-idle-time-seconds: 0 all-idle-time-seconds: 0
file: file:
uploadFolder: /Users/linruifeng/workspace/ uploadFolder: D://docus/
docus: docus:
redisKeyExpiration: true redisKeyExpiration: true

@ -152,7 +152,7 @@ public class LoadPackageCommandLineRunner implements CommandLineRunner {
if (collectorVoMap.containsKey(loadSchCollectorVO.getCollectorId())) { if (collectorVoMap.containsKey(loadSchCollectorVO.getCollectorId())) {
LoadSchCollectorVO redisLoadSchCollectorVO = collectorVoMap.get(loadSchCollectorVO.getCollectorId()); LoadSchCollectorVO redisLoadSchCollectorVO = collectorVoMap.get(loadSchCollectorVO.getCollectorId());
if (!redisLoadSchCollectorVO.getCollectorVersionId().equals(loadSchCollectorVO.getCollectorVersionId())) { if (!redisLoadSchCollectorVO.getCollectorVersionId().equals(loadSchCollectorVO.getCollectorVersionId())) {
actionDownLoad(loadSchCollectorVO); downloadCollectorPackages(loadSchCollectorVO);
} }
} }
} }
@ -163,12 +163,12 @@ public class LoadPackageCommandLineRunner implements CommandLineRunner {
private void updateAll(List<LoadSchCollectorVO> loadSchCollectorVOList) throws Exception { private void updateAll(List<LoadSchCollectorVO> loadSchCollectorVOList) throws Exception {
StopWatch watch = new StopWatch(); StopWatch watch = new StopWatch();
for (LoadSchCollectorVO loadSchCollectorVO : loadSchCollectorVOList) { for (LoadSchCollectorVO loadSchCollectorVO : loadSchCollectorVOList) {
actionDownLoad(loadSchCollectorVO); downloadCollectorPackages(loadSchCollectorVO);
} }
log.info("终端全量下载采集器更新包耗时:{}ms", watch.elapsedTime()); log.info("终端全量下载采集器更新包耗时:{}ms", watch.elapsedTime());
} }
private void actionDownLoad(LoadSchCollectorVO loadSchCollectorVO) throws Exception { private void downloadCollectorPackages(LoadSchCollectorVO loadSchCollectorVO) throws Exception {
DOWNLOAD_LOCK.lock(); DOWNLOAD_LOCK.lock();
try { try {
//下载更新包 //下载更新包

@ -1,22 +0,0 @@
package com.docus.server.common.netty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Data
public class CommMsg<MSG_CONTENT extends Serializable> implements Serializable {
public String messageType;
public String messageTime;
public MSG_CONTENT content;
}

@ -1,7 +1,8 @@
package com.docus.server.common.netty.client; package com.docus.server.common.netty.client;
import com.docus.core.util.json.JSON; import com.docus.core.util.json.JSON;
import com.docus.server.common.netty.CommMsg; import com.docus.server.common.CommMsg;
import com.docus.server.common.netty.server.CollectorChannelCacheMap;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
@ -12,6 +13,8 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@ -34,6 +37,16 @@ public class NettyClient {
private SocketChannel socketChannel; private SocketChannel socketChannel;
@Resource
private CollectorChannelCacheMap collectorChannelCacheMap;
@Value("${docus.collector-package-download-savePath:H://packages/}")
private String saveCollectorPackagePath;
@Resource
private StringRedisTemplate template;
@Resource
private Environment env;
/** /**
* *
* *
@ -68,4 +81,16 @@ public class NettyClient {
}); });
socketChannel = (SocketChannel) future.channel(); socketChannel = (SocketChannel) future.channel();
} }
public String getRedisMsg() {
return template.opsForValue().get("docus:collectorsPackagesDownload:" + env.getProperty("server.port"));
}
public CollectorChannelCacheMap getCollectorChannelCacheMap() {
return collectorChannelCacheMap;
}
public String getSaveCollectorPackagePath() {
return saveCollectorPackagePath;
}
} }

@ -1,9 +1,13 @@
package com.docus.server.common.netty.client; package com.docus.server.common.netty.client;
import com.docus.server.common.JsonDecoder;
import com.docus.server.common.JsonEncoder;
import com.docus.server.common.netty.client.handler.ClientHandler; import com.docus.server.common.netty.client.handler.ClientHandler;
import com.docus.server.common.netty.client.handler.HeartbeatHandler; import com.docus.server.common.netty.client.handler.HeartbeatHandler;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
@ -30,10 +34,20 @@ public class NettyClientInitializer extends ChannelInitializer<Channel> {
// .addLast(new ProtobufDecoder(Payload.Message.getDefaultInstance())) // .addLast(new ProtobufDecoder(Payload.Message.getDefaultInstance()))
// .addLast(new ProtobufVarint32LengthFieldPrepender()) // .addLast(new ProtobufVarint32LengthFieldPrepender())
// .addLast(new ProtobufEncoder()) // .addLast(new ProtobufEncoder())
// .addLast(new LineBasedFrameDecoder(2048))
// .addLast(new LineBasedFrameDecoder(1024))
// .addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new LengthFieldBasedFrameDecoder(1024 * 1024, 0, 2, 0, 2))
// LengthFieldPrepender是一个编码器主要是在响应字节数据前面添加字节长度字段
.addLast(new LengthFieldPrepender(2))
// 对经过粘包和拆包处理之后的数据进行json反序列化从而得到UserInfo对象
.addLast(new JsonDecoder())
// 对响应数据进行编码主要是将UserInfo对象序列化为json
.addLast(new JsonEncoder())
// 加载心跳处理器 // 加载心跳处理器
.addLast(new HeartbeatHandler(nettyClient, nettyProperties, appName)) .addLast(new HeartbeatHandler(nettyClient, nettyProperties, appName))
// 加载业务处理器 // 加载业务处理器
.addLast(new ClientHandler()) .addLast(new ClientHandler(nettyClient))
.addLast(); .addLast();
} }

@ -1,40 +1,50 @@
package com.docus.server.common.netty.client.handler; package com.docus.server.common.netty.client.handler;
import com.docus.core.util.StringUtils; import com.docus.core.util.Func;
import com.docus.core.util.ListUtils;
import com.docus.core.util.json.JSON; import com.docus.core.util.json.JSON;
import com.docus.server.common.CommMsg;
import com.docus.server.common.MsgConstants; import com.docus.server.common.MsgConstants;
import com.docus.server.common.netty.CommMsg; import com.docus.server.common.netty.client.NettyClient;
import com.docus.server.common.netty.server.CollectorChannelCacheMap; import com.docus.server.common.netty.server.CollectorChannelCacheMap;
import com.docus.server.common.utils.StartUpExeUtils;
import com.docus.server.dto.scheduling.management.schcollector.task.SchCollectorTaskDTO; import com.docus.server.dto.scheduling.management.schcollector.task.SchCollectorTaskDTO;
import io.netty.buffer.ByteBuf; import com.docus.server.vo.scheduling.management.schcollector.LoadSchCollectorVO;
import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import javax.annotation.Resource; import java.io.Serializable;
import java.util.List;
import java.util.Map;
@Slf4j @Slf4j
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { @AllArgsConstructor
@Resource @NoArgsConstructor
private CollectorChannelCacheMap collectorChannelCacheMap; public class ClientHandler extends SimpleChannelInboundHandler<CommMsg<Serializable>> {
@Override private NettyClient nettyClient;
protected void channelRead0(ChannelHandlerContext context, ByteBuf msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
//创建目标大小的数组
byte[] barray = new byte[buf.readableBytes()];
//把数据从bytebuf转移到byte[]
buf.getBytes(0, barray);
//将byte[]转成字符串用于打印
String message = new String(barray);
//空消息不处理
if (!StringUtils.hasText(message)) {
return;
}
CommMsg commMsg = JSON.fromJSON(message, CommMsg.class); @Override
protected void channelRead0(ChannelHandlerContext context, CommMsg commMsg) throws Exception {
// ByteBuf buf = (ByteBuf) msg;
// //创建目标大小的数组
// byte[] barray = new byte[buf.readableBytes()];
// //把数据从bytebuf转移到byte[]
// buf.getBytes(0, barray);
// //将byte[]转成字符串用于打印
// String message = new String(barray);
//
// //空消息不处理
// if (!StringUtils.hasText(message)) {
// return;
// }
// CommMsg commMsg = JSON.fromJSON(message, CommMsg.class);
String messageType = commMsg.getMessageType(); String messageType = commMsg.getMessageType();
String messageTime = commMsg.getMessageTime(); String messageTime = commMsg.getMessageTime();
@ -43,33 +53,61 @@ public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
log.info("======== 【终端客户端-收到调度器消息】, 消息时间={}, 消息类型={}, 消息内容={}", messageTime, messageType, messageContent + " ======== "); log.info("======== 【终端客户端-收到调度器消息】, 消息时间={}, 消息类型={}, 消息内容={}", messageTime, messageType, messageContent + " ======== ");
if (messageType.equals(MsgConstants.TERMINATOR_RESTART)) { if (messageType.equals(MsgConstants.TERMINATOR_RESTART)) {
log.info("【终端客户端-收到调度器下发任务】接受到终端重启命令,内容={}", messageContent); log.info("【终端客户端-收到调度器消息】接受到终端重启命令,内容={}", messageContent);
} else if (messageType.equals(MsgConstants.COLLECTOR_RESTART)) { } else if (messageType.equals(MsgConstants.COLLECTOR_RESTART)) {
log.info("【终端客户端-收到调度器下发任务】收到采集器重启命令,内容={}", messageContent); log.info("【终端客户端-收到调度器消息】收到采集器重启命令,内容={}", messageContent);
} else if (messageType.equals(MsgConstants.VIRTUAL_RESTART)) { } else if (messageType.equals(MsgConstants.VIRTUAL_RESTART)) {
log.info("【终端客户端-收到调度器下发任务】收到虚拟机重启命令,内容={}", messageContent); log.info("【终端客户端-收到调度器消息】收到虚拟机重启命令,内容={}", messageContent);
} else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_FILE)) { } else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_FILE)) {
log.info("【终端客户端-收到调度器下发任务】收到更新采集器文件命令,内容={}", messageContent); log.info("【终端客户端-收到调度器消息】收到更新采集器文件命令,内容={}", messageContent);
} else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_CONFIG)) { } else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_CONFIG)) {
log.info("【终端客户端-收到调度器下发任务】收到更新采集器配置命令,内容={}", messageContent); log.info("【终端客户端-收到调度器消息】收到更新采集器配置命令,内容={}", messageContent);
} else if (messageType.equals(MsgConstants.SCH_DISTRIBUTE_TASKS)) { } else if (messageType.equals(MsgConstants.SCH_DISTRIBUTE_TASKS)) {
log.info("【终端客户端-收到调度器下发任务】收到采集调度器下发任务命令,内容={}", messageContent); log.info("【终端客户端-收到调度器消息】收到采集调度器下发任务命令,内容={}", messageContent);
//下发任务: 判断这个任务采集器类型在指定目录是否存在、 //下发任务: 判断这个任务采集器类型在指定目录是否存在、
//如果不存在下拉该采集器类型最新的版本更新包到指定的部署路径并且启动该exe采集器将任务tcp给对应采集器 //如果不存在下拉该采集器类型最新的版本更新包到指定的部署路径并且启动该exe采集器将任务tcp给对应采集器
//如果存在: //如果存在:
SchCollectorTaskDTO schCollectorTaskDTO = JSON.fromJSON(JSON.toJSON(messageContent), SchCollectorTaskDTO.class); SchCollectorTaskDTO schCollectorTaskDTO = JSON.fromJSON(messageContent, SchCollectorTaskDTO.class);
String collectorId = schCollectorTaskDTO.getTaskInfo().getTasks().get(0).getCollectorId(); String collectorId = schCollectorTaskDTO.getTaskInfo().getTasks().get(0).getCollectorId();
collectorChannelCacheMap.writeAndFlush(collectorId, commMsg);
Long compareCollectorId = Long.valueOf(collectorId);
String packageValue = nettyClient.getRedisMsg();
CollectorChannelCacheMap collectorChannelCacheMap = nettyClient.getCollectorChannelCacheMap();
String saveCollectorPackagePath = nettyClient.getSaveCollectorPackagePath();
List<LoadSchCollectorVO> redisLoadSchCollectors = JSON.fromJSONWithGeneric(packageValue, new TypeReference<List<LoadSchCollectorVO>>() {
});
Map<Long, LoadSchCollectorVO> collectorVoMap = ListUtils.toMap(redisLoadSchCollectors, LoadSchCollectorVO::getCollectorId);
if (collectorVoMap.containsKey(compareCollectorId)) {
LoadSchCollectorVO loadSchCollectorVO = collectorVoMap.get(compareCollectorId);
String processName = loadSchCollectorVO.getProcessName();
if (Func.isNotBlank(processName)) {
boolean isExistProcess = StartUpExeUtils.checkProcessOnly(processName);
//存在指定采集器id进程
if (isExistProcess) {
collectorChannelCacheMap.writeAndFlush(collectorId, commMsg);
} else {
//不存在进程,则启动指定采集器进程
StartUpExeUtils.startUpExeOnly(saveCollectorPackagePath + collectorId + "\\collector\\" + processName);
collectorChannelCacheMap.writeAndFlush(collectorId, commMsg);
}
}
}
} else if (messageType.equals(MsgConstants.HAS_VALID_COLLECTOR)) { } else if (messageType.equals(MsgConstants.HAS_VALID_COLLECTOR)) {
log.info("【终端客户端-收到调度器下发任务】是否有可用类型的采集器命令,内容={}", messageContent); log.info("【终端客户端-收到调度器消息】是否有可用类型的采集器命令,内容={}", messageContent);
//下发任务: 判断这个任务采集器类型在指定目录是否存在、 //下发任务: 判断这个任务采集器类型在指定目录是否存在、

@ -1,19 +1,16 @@
package com.docus.server.common.netty.client.handler; package com.docus.server.common.netty.client.handler;
import com.docus.core.util.DateUtil; import com.docus.core.util.DateUtil;
import com.docus.core.util.json.JSON; import com.docus.server.common.CommMsg;
import com.docus.server.common.MsgConstants; import com.docus.server.common.MsgConstants;
import com.docus.server.common.netty.CommMsg;
import com.docus.server.common.netty.client.NettyClient; import com.docus.server.common.netty.client.NettyClient;
import com.docus.server.common.netty.client.NettyClientProperties; import com.docus.server.common.netty.client.NettyClientProperties;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -46,7 +43,8 @@ public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
.content(appName + " 我来了") .content(appName + " 我来了")
.build(); .build();
ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(onlineRegister), CharsetUtil.UTF_8)); // ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(onlineRegister), CharsetUtil.UTF_8));
ctx.writeAndFlush(onlineRegister);
super.channelActive(ctx); super.channelActive(ctx);
} }
@ -64,7 +62,8 @@ public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
.build(); .build();
//发送心跳消息,并在发送失败时关闭该接连 //发送心跳消息,并在发送失败时关闭该接连
ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(heartbeat), CharsetUtil.UTF_8)).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); // ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(heartbeat), CharsetUtil.UTF_8)).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
ctx.writeAndFlush(heartbeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} }
} else { } else {
super.userEventTriggered(ctx, evt); super.userEventTriggered(ctx, evt);

@ -1,7 +1,7 @@
package com.docus.server.common.netty.server; package com.docus.server.common.netty.server;
import com.docus.core.util.json.JSON; import com.docus.core.util.json.JSON;
import com.docus.server.common.netty.CommMsg; import com.docus.server.common.CommMsg;
import com.docus.server.common.netty.state.DeviceStateContext; import com.docus.server.common.netty.state.DeviceStateContext;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;

@ -1,8 +1,12 @@
package com.docus.server.common.netty.server; package com.docus.server.common.netty.server;
import com.docus.server.common.JsonDecoder;
import com.docus.server.common.JsonEncoder;
import com.docus.server.common.netty.server.handler.NettyBusinessHandler; import com.docus.server.common.netty.server.handler.NettyBusinessHandler;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -17,10 +21,8 @@ public class NettyServerInitializer extends ChannelInitializer<Channel> {
@Resource @Resource
private NettyServerProperties serverProperties; private NettyServerProperties serverProperties;
@Resource @Resource
private NettyBusinessHandler businessHandler; private NettyBusinessHandler businessHandler;
@Resource @Resource
private CollectorChannelCacheMap channelRepository; private CollectorChannelCacheMap channelRepository;
@ -40,9 +42,15 @@ public class NettyServerInitializer extends ChannelInitializer<Channel> {
// .addLast(new ProtobufEncoder()) // .addLast(new ProtobufEncoder())
// .addLast(new LineBasedFrameDecoder(2048)) // .addLast(new LineBasedFrameDecoder(2048))
// .addLast(new StringDecoder()) // .addLast(new LineBasedFrameDecoder(1024))
// .addLast(new StringEncoder()) // .addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new LengthFieldBasedFrameDecoder(1024 * 1024, 0, 2, 0, 2))
// LengthFieldPrepender是一个编码器主要是在响应字节数据前面添加字节长度字段
.addLast(new LengthFieldPrepender(2))
// 对经过粘包和拆包处理之后的数据进行json反序列化从而得到UserInfo对象
.addLast(new JsonDecoder())
// 对响应数据进行编码主要是将UserInfo对象序列化为json
.addLast(new JsonEncoder())
// 加载业务处理器 // 加载业务处理器
// .addLast(new NettyHeartbeatHandler(channelRepository)) // .addLast(new NettyHeartbeatHandler(channelRepository))
.addLast(businessHandler); .addLast(businessHandler);

@ -1,7 +1,7 @@
package com.docus.server.common.netty.server.handler; package com.docus.server.common.netty.server.handler;
import com.docus.core.util.json.JSON; import com.docus.core.util.json.JSON;
import com.docus.server.common.netty.CommMsg; import com.docus.server.common.CommMsg;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;

@ -2,15 +2,13 @@ package com.docus.server.common.netty.server.handler;
import com.docus.core.util.DateUtil; import com.docus.core.util.DateUtil;
import com.docus.core.util.Func; import com.docus.core.util.Func;
import com.docus.core.util.StringUtils;
import com.docus.core.util.json.JSON; import com.docus.core.util.json.JSON;
import com.docus.server.api.scheduling.management.SchCollectRecordApi; import com.docus.server.api.scheduling.management.SchCollectRecordApi;
import com.docus.server.common.CommMsg;
import com.docus.server.common.MsgConstants; import com.docus.server.common.MsgConstants;
import com.docus.server.common.netty.CommMsg;
import com.docus.server.common.netty.server.CollectorChannelCacheMap; import com.docus.server.common.netty.server.CollectorChannelCacheMap;
import com.docus.server.common.netty.state.DeviceStateContext; import com.docus.server.common.netty.state.DeviceStateContext;
import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO; import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
@ -35,7 +33,7 @@ import java.util.Date;
@Slf4j @Slf4j
@ChannelHandler.Sharable @ChannelHandler.Sharable
@Component @Component
public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> { public class NettyBusinessHandler extends SimpleChannelInboundHandler<CommMsg<Serializable>> {
private static final ChannelGroup DEFAULT_CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private static final ChannelGroup DEFAULT_CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@ -45,22 +43,22 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
private SchCollectRecordApi schCollectRecordApi; private SchCollectRecordApi schCollectRecordApi;
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, CommMsg commMsg) throws Exception {
DeviceStateContext deviceStateContext = collectorChannelCacheMap.getAttributeValue(ctx); DeviceStateContext deviceStateContext = collectorChannelCacheMap.getAttributeValue(ctx);
ByteBuf buf = (ByteBuf) msg; // ByteBuf buf = (ByteBuf) msg;
//创建目标大小的数组 // //创建目标大小的数组
byte[] barray = new byte[buf.readableBytes()]; // byte[] barray = new byte[buf.readableBytes()];
//把数据从bytebuf转移到byte[] // //把数据从bytebuf转移到byte[]
buf.getBytes(0, barray); // buf.getBytes(0, barray);
//将byte[]转成字符串用于打印 // //将byte[]转成字符串用于打印
String message = new String(barray); // String message = new String(barray);
//
//空消息不处理 // //空消息不处理
if (!StringUtils.hasText(message)) { // if (!StringUtils.hasText(message)) {
return; // return;
} // }
CommMsg commMsg = JSON.fromJSON(message, CommMsg.class); // CommMsg commMsg = JSON.fromJSON(message, CommMsg.class);
String messageType = commMsg.getMessageType(); String messageType = commMsg.getMessageType();
String messageTime = commMsg.getMessageTime(); String messageTime = commMsg.getMessageTime();
@ -91,7 +89,7 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
//============状态为上行数据============= //============状态为上行数据=============
deviceStateContext.onHeartbeat(System.currentTimeMillis(), "采集器上行了数据"); deviceStateContext.onHeartbeat(System.currentTimeMillis(), "采集器上行了数据");
//==============处理数据================ //==============处理数据================
System.out.println("收到数据:" + msg); System.out.println("收到数据:" + JSON.toJSON(commMsg));
//============返回消息================== //============返回消息==================
CommMsg<Serializable> resp = CommMsg.builder() CommMsg<Serializable> resp = CommMsg.builder()
@ -185,7 +183,7 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
DEFAULT_CHANNEL_GROUP.add(ctx.channel()); DEFAULT_CHANNEL_GROUP.add(ctx.channel());
System.out.println(ctx.channel().remoteAddress() + " 上线," + "【采集器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size()); System.out.println(ctx.channel().remoteAddress() + " 上线," + "【终端-采集器】在线数量:" + DEFAULT_CHANNEL_GROUP.size());
} }
/** /**
@ -194,7 +192,7 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel(); Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + " 下线," + "【采集器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size()); System.out.println(channel.remoteAddress() + " 下线," + "【终端-采集器】在线数量:" + DEFAULT_CHANNEL_GROUP.size());
String collectorId = collectorChannelCacheMap.getClientKey(channel); String collectorId = collectorChannelCacheMap.getClientKey(channel);
log.error("客户端下线,终端连接:{}", collectorId); log.error("客户端下线,终端连接:{}", collectorId);
//移除终端,终端离线 //移除终端,终端离线

@ -13,8 +13,8 @@ spring:
#公司病案的文件服务数据库 #公司病案的文件服务数据库
master: master:
url: jdbc:log4jdbc:mysql://db.docus.cn:3306/docus-collector-scheduling?autoReconnect=true&allowMultiQueries=true&useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai url: jdbc:log4jdbc:mysql://db.docus.cn:3306/docus-collector-scheduling?autoReconnect=true&allowMultiQueries=true&useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
username: root username: docus
password: root password: docus702
driver-class-name: net.sf.log4jdbc.sql.jdbcapi.DriverSpy driver-class-name: net.sf.log4jdbc.sql.jdbcapi.DriverSpy
type: com.alibaba.druid.pool.DruidDataSource type: com.alibaba.druid.pool.DruidDataSource
servlet: servlet:
@ -23,7 +23,7 @@ spring:
max-request-size: 200MB max-request-size: 200MB
redis: redis:
host: redis.docus.cn host: redis.docus.cn
# password: JSdocus@702 password: JSdocus@702
cloud: cloud:
nacos: nacos:
discovery: discovery:
@ -67,7 +67,9 @@ netty:
all-idle-time-seconds: 0 all-idle-time-seconds: 0
file: file:
uploadFolder: /Users/linruifeng/workspace/ uploadFolder: D://docus/
docus: docus:
vm-task-cron: 0/15 * * * * ? vm-task-cron: 0/30 * * * * ?
collector-package-download-url: http://192.168.16.110:9113
collector-package-download-savePath: H:\\packages\\

@ -1,4 +1,4 @@
package com.docus.server.common.netty; package com.docus.server.common;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Builder; import lombok.Builder;

@ -0,0 +1,22 @@
package com.docus.server.common;
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.CharsetUtil;
import java.util.List;
public class JsonDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
CommMsg user = JSON.parseObject(new String(bytes, CharsetUtil.UTF_8), CommMsg.class);
if (null != user) {
out.add(user);
}
}
}

@ -0,0 +1,20 @@
package com.docus.server.common;
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.io.Serializable;
public class JsonEncoder extends MessageToByteEncoder<CommMsg<Serializable>> {
@Override
protected void encode(ChannelHandlerContext ctx, CommMsg<Serializable> user, ByteBuf buf) throws Exception {
if (null != user) {
String json = JSON.toJSONString(user);
ctx.writeAndFlush(Unpooled.wrappedBuffer(json.getBytes()));
}
}
}

@ -0,0 +1,185 @@
package com.docus.server.common.util;
import com.baomidou.mybatisplus.extension.api.Assert;
import java.io.PrintWriter;
import java.io.StringWriter;
public abstract class LogUtil {
private static String getStackTrace(Throwable t) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
try {
t.printStackTrace(pw);
return sw.toString();
} finally {
pw.close();
}
}
public static String exceptionMsg(String msg, Throwable e) {
return msg + ",异常消息[" + getStackTrace(e) + "]";
}
public static String exceptionSimpleMsg(String msg, Throwable e) {
return msg + ",异常消息[" + e.getLocalizedMessage() + "]";
}
/**
*
*
* @return clock
*/
public static Clock getClock() {
return new Clock() {
/**开始时间*/
private long startTime = System.nanoTime();
/**
* []
* @return
*/
public long consume() {
return LogUtil.consume(startTime, System.nanoTime());
}
/**
*
*/
public long getEnd() {
return System.nanoTime();
}
/**
*
*/
public long getStart() {
return startTime;
}
public long consumeSecond() {
return LogUtil.consumeSecond(startTime, System.nanoTime());
}
};
}
/**
* []
*
* @return
*/
public static long consumeNano(long startTime, long endTime) {
return (endTime - startTime) / 1000000000 + 1;
}
/**
* []
*
* @return
*/
public static long consume(long startTime, long endTime) {
return (endTime - startTime) / 1000000 + 1;
}
/**
* []
*
* @return
*/
public static long consumeSecond(long startTime, long endTime) {
return (endTime - startTime) / 1000 + 1;
}
/**
* getRootCause
*
*
* @param ex
* @return
* @author
*/
public static Throwable getRootCause(Throwable ex) {
Throwable cause = ex;
while (cause.getCause() != null) {
cause = cause.getCause();
}
return cause;
}
/**
*
*
* @param ex
* @return
*/
public static String traceRootExceptionInfo(Throwable ex) {
Throwable cause = ex;
while (cause.getCause() != null) {
cause = cause.getCause();
}
return cause.toString();
}
/**
*
*
* @param ex
* @return
*/
public static String tarceExceptionInfo(Throwable ex) {
StringBuffer sb = new StringBuffer();
Throwable cause = ex;
sb.insert(0, cause);
while (cause.getCause() != null) {
cause = cause.getCause();
sb.insert(0, cause.toString());
}
return sb.toString();
}
public static interface Clock {
/**
*
*
* @return
*/
long consume();
/**
*
*
* @return
*/
long consumeSecond();
/**
*
*
* @return
* @author:lwei
*/
long getStart();
/**
*
*
* @return
* @author:lwei
*/
long getEnd();
}
public static void main(String[] args) {
try {
Assert.fail("dd");
} catch (Throwable e) {
System.out.println(exceptionMsg("aa", e));
;
}
}
}

@ -0,0 +1,47 @@
package com.docus.server.common.util.io;
import com.baomidou.mybatisplus.extension.api.Assert;
import com.docus.core.util.pdf.util.IOUtils;
import com.docus.server.common.util.LogUtil;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
public class SerializableMsgCodec {
public Serializable decode(byte[] bytes) {
ObjectInputStream ois = null;
try {
ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
return (Serializable) ois.readObject();
} catch (Exception e) {
Assert.fail(LogUtil.exceptionMsg("对象序列化异常,异常消息", e));
return null;
} finally {
IOUtils.close(ois);
}
}
public byte[] encode(Serializable obj) {
ObjectOutputStream oos = null;
ByteArrayOutputStream baos = null;
try {
baos = new ByteArrayOutputStream();
oos = new ObjectOutputStream(baos);
oos.writeObject(obj);
byte[] bytes = baos.toByteArray();
return bytes;
} catch (Exception e) {
Assert.fail(LogUtil.exceptionMsg("对象序列化异常,异常消息", e));
return null;
} finally {
IOUtils.close(baos);
IOUtils.close(oos);
}
}
}

@ -1,5 +1,6 @@
package com.docus.server.dto.scheduling.management.schcollector.task; package com.docus.server.dto.scheduling.management.schcollector.task;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty; import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
@ -20,7 +21,7 @@ import java.util.Map;
@Data @Data
@ApiModel(value = "ReportDownTwoDTO", description = "下发任务") @ApiModel(value = "ReportDownTwoDTO", description = "下发任务")
public class ReportDownTwoDTO { public class ReportDownTwoDTO {
@JsonIgnore
@ApiModelProperty(value = "扩展参数") @ApiModelProperty(value = "扩展参数")
private Map<String, Object> params; private Map<String, Object> params;

Loading…
Cancel
Save