From 12bb7880097baa2e55b2a236355759f7156abc1f Mon Sep 17 00:00:00 2001 From: linrf Date: Tue, 8 Aug 2023 16:17:58 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A4=84=E7=90=86netty=20=E4=B8=8B=E5=8F=91?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E7=B2=98=E5=8C=85=E5=88=86=E5=8C=85=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../docus/server/common/SchCollectorTask.java | 11 +- .../netty/server/NettyServerInitializer.java | 20 +- .../server/handler/EchoServerHandler.java | 2 +- .../server/handler/NettyBusinessHandler.java | 41 ++-- .../server/handler/NettyHeartbeatHandler.java | 35 ++-- .../docus/server/convert/CommMsgConvert.java | 2 +- .../service/impl/CommMsgServiceImpl.java | 2 +- .../src/main/resources/bootstrap.yml | 8 +- .../common/LoadPackageCommandLineRunner.java | 6 +- .../docus/server/common/netty/CommMsg.java | 22 --- .../common/netty/client/NettyClient.java | 27 ++- .../netty/client/NettyClientInitializer.java | 16 +- .../netty/client/handler/ClientHandler.java | 100 +++++++--- .../client/handler/HeartbeatHandler.java | 11 +- .../server/CollectorChannelCacheMap.java | 2 +- .../netty/server/NettyServerInitializer.java | 18 +- .../server/handler/EchoServerHandler.java | 2 +- .../server/handler/NettyBusinessHandler.java | 42 ++-- .../src/main/resources/bootstrap.yml | 12 +- .../com/docus/server/common}/CommMsg.java | 2 +- .../com/docus/server/common/JsonDecoder.java | 22 +++ .../com/docus/server/common/JsonEncoder.java | 20 ++ .../com/docus/server/common/util/LogUtil.java | 185 ++++++++++++++++++ .../common/util/io/SerializableMsgCodec.java | 47 +++++ .../schcollector/task/ReportDownTwoDTO.java | 3 +- 25 files changed, 500 insertions(+), 158 deletions(-) delete mode 100644 collector-terminal-management/src/main/java/com/docus/server/common/netty/CommMsg.java rename {collector-scheduling-management/src/main/java/com/docus/server/common/netty => docus-api-common/src/main/java/com/docus/server/common}/CommMsg.java (90%) create mode 100644 docus-api-common/src/main/java/com/docus/server/common/JsonDecoder.java create mode 100644 docus-api-common/src/main/java/com/docus/server/common/JsonEncoder.java create mode 100644 docus-api-common/src/main/java/com/docus/server/common/util/LogUtil.java create mode 100644 docus-api-common/src/main/java/com/docus/server/common/util/io/SerializableMsgCodec.java diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/SchCollectorTask.java b/collector-scheduling-management/src/main/java/com/docus/server/common/SchCollectorTask.java index 0676124..f355c53 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/SchCollectorTask.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/SchCollectorTask.java @@ -5,7 +5,6 @@ import com.docus.core.util.Func; import com.docus.core.util.json.JSON; import com.docus.infrastructure.redis.service.RedisOps; import com.docus.server.api.taskdistribute.TaskDistributeApi; -import com.docus.server.common.netty.CommMsg; import com.docus.server.common.netty.server.ChannelRepository; import com.docus.server.dto.scheduling.management.schcollector.task.ReportDownTwoDTO; import com.docus.server.dto.scheduling.management.schcollector.task.ReportTaskTwoDTO; @@ -32,7 +31,6 @@ import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import javax.annotation.Resource; -import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; @@ -110,7 +108,7 @@ public class SchCollectorTask { continue; } - for (ReportDownTwoDTO report : reportDownTwoDTOList) { + for (ReportDownTwoDTO report : reportDownTwoDTOList) { //先找出有只采集的任务。 ReportTaskTwoDTO reportTaskTwoDto = report.getTasks().get(0); if (!CollectionUtils.isEmpty(terminal.getOnlyCollectorIds()) && terminal.getOnlyCollectorIds().contains(reportTaskTwoDto.getCollectorId()) @@ -206,7 +204,7 @@ public class SchCollectorTask { private void tcpToClient(NettyTerminatorDTO terminal, ReportDownTwoDTO reportDownTwoDTO) { - hasValidCollector(terminal, reportDownTwoDTO); +// hasValidCollector(terminal, reportDownTwoDTO); Channel channel = channelRepository.get(terminal.getTerminatorIp()); @@ -238,6 +236,7 @@ public class SchCollectorTask { //tcp 下发任务到终端 if (channel != null) { + System.out.println(JSON.toJSON(commMsg)); channel.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(commMsg), CharsetUtil.UTF_8)); } @@ -380,8 +379,8 @@ public class SchCollectorTask { "}"; ReportDownTwoDTO reportDownTwoDTO3 = JSON.fromJSON(collectorId3, ReportDownTwoDTO.class); -// List allTaskList = Lists.newArrayList(reportDownTwoDTO1, reportDownTwoDTO2, reportDownTwoDTO3); - List allTaskList = new ArrayList<>(); + List allTaskList = Lists.newArrayList(reportDownTwoDTO1, reportDownTwoDTO2, reportDownTwoDTO3); +// List allTaskList = new ArrayList<>(); if (!CollectionUtils.isEmpty(this.retryTaskQueue)) { ReportDownTwoDTO retryTask = (ReportDownTwoDTO) this.retryTaskQueue.take(); diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/NettyServerInitializer.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/NettyServerInitializer.java index 04f4600..160a4fa 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/NettyServerInitializer.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/NettyServerInitializer.java @@ -1,9 +1,13 @@ package com.docus.server.common.netty.server; +import com.docus.server.common.JsonDecoder; +import com.docus.server.common.JsonEncoder; import com.docus.server.common.netty.server.handler.NettyBusinessHandler; import com.docus.server.common.netty.server.handler.NettyHeartbeatHandler; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.timeout.IdleStateHandler; import org.springframework.stereotype.Component; @@ -40,13 +44,21 @@ public class NettyServerInitializer extends ChannelInitializer { // .addLast(new ProtobufVarint32LengthFieldPrepender()) // .addLast(new ProtobufEncoder()) -// .addLast(new LineBasedFrameDecoder(2048)) -// .addLast(new StringDecoder()) -// .addLast(new StringEncoder()) - +// .addLast(new LineBasedFrameDecoder(1024)) +// .addLast(new StringDecoder(CharsetUtil.UTF_8)) +// .addLast(new StringEncoder(CharsetUtil.UTF_8)) + .addLast(new LengthFieldBasedFrameDecoder(1024 * 1024, 0, 2, 0, 2)) + // LengthFieldPrepender是一个编码器,主要是在响应字节数据前面添加字节长度字段 + .addLast(new LengthFieldPrepender(2)) + // 对经过粘包和拆包处理之后的数据进行json反序列化,从而得到UserInfo对象 + .addLast(new JsonDecoder()) + // 对响应数据进行编码,主要是将UserInfo对象序列化为json + .addLast(new JsonEncoder()) // 加载业务处理器 .addLast(new NettyHeartbeatHandler(channelRepository)) .addLast(businessHandler); // .addLast(new EchoServerHandler()); } + + } diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/EchoServerHandler.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/EchoServerHandler.java index 600aa47..db6578f 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/EchoServerHandler.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/EchoServerHandler.java @@ -1,7 +1,7 @@ package com.docus.server.common.netty.server.handler; import com.docus.core.util.json.JSON; -import com.docus.server.common.netty.CommMsg; +import com.docus.server.common.CommMsg; import com.fasterxml.jackson.core.type.TypeReference; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java index 3d253b0..5da0ee8 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java @@ -1,15 +1,12 @@ package com.docus.server.common.netty.server.handler; import com.docus.core.util.Func; -import com.docus.core.util.StringUtils; -import com.docus.core.util.json.JSON; +import com.docus.server.common.CommMsg; import com.docus.server.common.MsgConstants; -import com.docus.server.common.netty.CommMsg; import com.docus.server.common.netty.server.ChannelRepository; import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO; import com.docus.server.enums.BusyStateEnum; import com.docus.server.enums.OnlineStateEnum; -import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; @@ -30,7 +27,7 @@ import java.net.InetSocketAddress; @Slf4j @ChannelHandler.Sharable @Component -public class NettyBusinessHandler extends SimpleChannelInboundHandler { +public class NettyBusinessHandler extends SimpleChannelInboundHandler> { private static final ChannelGroup DEFAULT_CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @@ -38,21 +35,21 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler { private ChannelRepository repository; @Override - protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf msg) throws Exception { - ByteBuf buf = (ByteBuf) msg; - //创建目标大小的数组 - byte[] barray = new byte[buf.readableBytes()]; - //把数据从bytebuf转移到byte[] - buf.getBytes(0, barray); - //将byte[]转成字符串用于打印 - String message = new String(barray); - - //空消息不处理 - if (!StringUtils.hasText(message)) { - return; - } - - CommMsg commMsg = JSON.fromJSON(message, CommMsg.class); + protected void channelRead0(ChannelHandlerContext channelHandlerContext, CommMsg commMsg) throws Exception { +// ByteBuf buf = (ByteBuf) msg; +// //创建目标大小的数组 +// byte[] barray = new byte[buf.readableBytes()]; +// //把数据从bytebuf转移到byte[] +// buf.getBytes(0, barray); +// //将byte[]转成字符串用于打印 +// String message = new String(barray); +// +// //空消息不处理 +// if (!StringUtils.hasText(message)) { +// return; +// } +// +// CommMsg commMsg = JSON.fromJSON(message, CommMsg.class); String messageType = commMsg.getMessageType(); String messageTime = commMsg.getMessageTime(); @@ -108,7 +105,7 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler { DEFAULT_CHANNEL_GROUP.add(ctx.channel()); - System.out.println(ctx.channel().remoteAddress() + " 上线," + "【采集器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size()); + System.out.println(ctx.channel().remoteAddress() + " 上线," + "【调度器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size()); } /** @@ -117,7 +114,7 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); - System.out.println(channel.remoteAddress() + " 下线," + "【采集器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size()); + System.out.println(channel.remoteAddress() + " 下线," + "【调度器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size()); String clientId = repository.getClientKey(channel); log.error("客户端下线,终端连接:{}", clientId); //移除终端,终端离线 diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java index 9d308c8..a70302b 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java @@ -1,15 +1,12 @@ package com.docus.server.common.netty.server.handler; import com.docus.core.util.Func; -import com.docus.core.util.StringUtils; -import com.docus.core.util.json.JSON; +import com.docus.server.common.CommMsg; import com.docus.server.common.MsgConstants; -import com.docus.server.common.netty.CommMsg; import com.docus.server.common.netty.server.ChannelRepository; import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO; import com.docus.server.enums.BusyStateEnum; import com.docus.server.enums.OnlineStateEnum; -import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -27,20 +24,21 @@ public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - ByteBuf buf = (ByteBuf) msg; - //创建目标大小的数组 - byte[] barray = new byte[buf.readableBytes()]; - //把数据从bytebuf转移到byte[] - buf.getBytes(0, barray); - //将byte[]转成字符串用于打印 - String message = new String(barray); - //空消息不处理 - if (!StringUtils.hasText(message)) { - return; - } - - CommMsg commMsg = JSON.fromJSON(message, CommMsg.class); - +// ByteBuf buf = (ByteBuf) msg; +// //创建目标大小的数组 +// byte[] barray = new byte[buf.readableBytes()]; +// //把数据从bytebuf转移到byte[] +// buf.getBytes(0, barray); +// //将byte[]转成字符串用于打印 +// String message = new String(barray); +// //空消息不处理 +// if (!StringUtils.hasText(message)) { +// return; +// } + +// CommMsg commMsg = JSON.fromJSON((String) message, CommMsg.class); + + CommMsg commMsg = (CommMsg) msg; String messageType = commMsg.getMessageType(); String messageTime = commMsg.getMessageTime(); Serializable content = commMsg.getContent(); @@ -77,7 +75,6 @@ public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter { ctx.fireChannelRead(msg); } } - } private ChannelRepository repository; diff --git a/collector-scheduling-management/src/main/java/com/docus/server/convert/CommMsgConvert.java b/collector-scheduling-management/src/main/java/com/docus/server/convert/CommMsgConvert.java index fae23c5..f9beeb9 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/convert/CommMsgConvert.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/convert/CommMsgConvert.java @@ -1,6 +1,6 @@ package com.docus.server.convert; -import com.docus.server.common.netty.CommMsg; +import com.docus.server.common.CommMsg; import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO; import org.mapstruct.Mapper; import org.mapstruct.Mappings; diff --git a/collector-scheduling-management/src/main/java/com/docus/server/service/impl/CommMsgServiceImpl.java b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/CommMsgServiceImpl.java index 7e4f4d0..ebb7e28 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/service/impl/CommMsgServiceImpl.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/CommMsgServiceImpl.java @@ -2,7 +2,7 @@ package com.docus.server.service.impl; import com.docus.core.util.DateUtil; import com.docus.core.util.json.JSON; -import com.docus.server.common.netty.CommMsg; +import com.docus.server.common.CommMsg; import com.docus.server.common.netty.server.ChannelRepository; import com.docus.server.convert.CommMsgConvert; import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO; diff --git a/collector-scheduling-management/src/main/resources/bootstrap.yml b/collector-scheduling-management/src/main/resources/bootstrap.yml index b617803..5de0b5b 100644 --- a/collector-scheduling-management/src/main/resources/bootstrap.yml +++ b/collector-scheduling-management/src/main/resources/bootstrap.yml @@ -13,8 +13,8 @@ spring: #公司病案的文件服务数据库 master: url: jdbc:log4jdbc:mysql://db.docus.cn:3306/docus-collector-scheduling?autoReconnect=true&allowMultiQueries=true&useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai - username: root - password: root + username: docus + password: docus702 driver-class-name: net.sf.log4jdbc.sql.jdbcapi.DriverSpy type: com.alibaba.druid.pool.DruidDataSource servlet: @@ -23,7 +23,7 @@ spring: max-request-size: 200MB redis: host: redis.docus.cn -# password: JSdocus@702 + password: JSdocus@702 cloud: nacos: discovery: @@ -60,7 +60,7 @@ netty: all-idle-time-seconds: 0 file: - uploadFolder: /Users/linruifeng/workspace/ + uploadFolder: D://docus/ docus: redisKeyExpiration: true diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/LoadPackageCommandLineRunner.java b/collector-terminal-management/src/main/java/com/docus/server/common/LoadPackageCommandLineRunner.java index 9297718..63be06e 100644 --- a/collector-terminal-management/src/main/java/com/docus/server/common/LoadPackageCommandLineRunner.java +++ b/collector-terminal-management/src/main/java/com/docus/server/common/LoadPackageCommandLineRunner.java @@ -152,7 +152,7 @@ public class LoadPackageCommandLineRunner implements CommandLineRunner { if (collectorVoMap.containsKey(loadSchCollectorVO.getCollectorId())) { LoadSchCollectorVO redisLoadSchCollectorVO = collectorVoMap.get(loadSchCollectorVO.getCollectorId()); if (!redisLoadSchCollectorVO.getCollectorVersionId().equals(loadSchCollectorVO.getCollectorVersionId())) { - actionDownLoad(loadSchCollectorVO); + downloadCollectorPackages(loadSchCollectorVO); } } } @@ -163,12 +163,12 @@ public class LoadPackageCommandLineRunner implements CommandLineRunner { private void updateAll(List loadSchCollectorVOList) throws Exception { StopWatch watch = new StopWatch(); for (LoadSchCollectorVO loadSchCollectorVO : loadSchCollectorVOList) { - actionDownLoad(loadSchCollectorVO); + downloadCollectorPackages(loadSchCollectorVO); } log.info("终端全量下载采集器更新包耗时:{}ms", watch.elapsedTime()); } - private void actionDownLoad(LoadSchCollectorVO loadSchCollectorVO) throws Exception { + private void downloadCollectorPackages(LoadSchCollectorVO loadSchCollectorVO) throws Exception { DOWNLOAD_LOCK.lock(); try { //下载更新包 diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/CommMsg.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/CommMsg.java deleted file mode 100644 index ef7c42a..0000000 --- a/collector-terminal-management/src/main/java/com/docus/server/common/netty/CommMsg.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.docus.server.common.netty; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.io.Serializable; - -@Builder -@NoArgsConstructor -@AllArgsConstructor -@Data -public class CommMsg implements Serializable { - - public String messageType; - - public String messageTime; - - public MSG_CONTENT content; - -} diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/NettyClient.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/NettyClient.java index 93e1b7e..43c9ab7 100644 --- a/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/NettyClient.java +++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/NettyClient.java @@ -1,7 +1,8 @@ package com.docus.server.common.netty.client; import com.docus.core.util.json.JSON; -import com.docus.server.common.netty.CommMsg; +import com.docus.server.common.CommMsg; +import com.docus.server.common.netty.server.CollectorChannelCacheMap; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -12,6 +13,8 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.env.Environment; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @@ -34,6 +37,16 @@ public class NettyClient { private SocketChannel socketChannel; + @Resource + private CollectorChannelCacheMap collectorChannelCacheMap; + + @Value("${docus.collector-package-download-savePath:H://packages/}") + private String saveCollectorPackagePath; + @Resource + private StringRedisTemplate template; + @Resource + private Environment env; + /** * 发送消息给服务端 * @@ -68,4 +81,16 @@ public class NettyClient { }); socketChannel = (SocketChannel) future.channel(); } + + public String getRedisMsg() { + return template.opsForValue().get("docus:collectorsPackagesDownload:" + env.getProperty("server.port")); + } + + public CollectorChannelCacheMap getCollectorChannelCacheMap() { + return collectorChannelCacheMap; + } + + public String getSaveCollectorPackagePath() { + return saveCollectorPackagePath; + } } diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/NettyClientInitializer.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/NettyClientInitializer.java index 75d34ca..c9bedac 100644 --- a/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/NettyClientInitializer.java +++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/NettyClientInitializer.java @@ -1,9 +1,13 @@ package com.docus.server.common.netty.client; +import com.docus.server.common.JsonDecoder; +import com.docus.server.common.JsonEncoder; import com.docus.server.common.netty.client.handler.ClientHandler; import com.docus.server.common.netty.client.handler.HeartbeatHandler; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.timeout.IdleStateHandler; import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; @@ -30,10 +34,20 @@ public class NettyClientInitializer extends ChannelInitializer { // .addLast(new ProtobufDecoder(Payload.Message.getDefaultInstance())) // .addLast(new ProtobufVarint32LengthFieldPrepender()) // .addLast(new ProtobufEncoder()) +// .addLast(new LineBasedFrameDecoder(2048)) +// .addLast(new LineBasedFrameDecoder(1024)) +// .addLast(new StringDecoder(CharsetUtil.UTF_8)) + .addLast(new LengthFieldBasedFrameDecoder(1024 * 1024, 0, 2, 0, 2)) + // LengthFieldPrepender是一个编码器,主要是在响应字节数据前面添加字节长度字段 + .addLast(new LengthFieldPrepender(2)) + // 对经过粘包和拆包处理之后的数据进行json反序列化,从而得到UserInfo对象 + .addLast(new JsonDecoder()) + // 对响应数据进行编码,主要是将UserInfo对象序列化为json + .addLast(new JsonEncoder()) // 加载心跳处理器 .addLast(new HeartbeatHandler(nettyClient, nettyProperties, appName)) // 加载业务处理器 - .addLast(new ClientHandler()) + .addLast(new ClientHandler(nettyClient)) .addLast(); } diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/ClientHandler.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/ClientHandler.java index 7c8dbb0..e6ebb0f 100644 --- a/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/ClientHandler.java +++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/ClientHandler.java @@ -1,40 +1,50 @@ package com.docus.server.common.netty.client.handler; -import com.docus.core.util.StringUtils; +import com.docus.core.util.Func; +import com.docus.core.util.ListUtils; import com.docus.core.util.json.JSON; +import com.docus.server.common.CommMsg; import com.docus.server.common.MsgConstants; -import com.docus.server.common.netty.CommMsg; +import com.docus.server.common.netty.client.NettyClient; import com.docus.server.common.netty.server.CollectorChannelCacheMap; +import com.docus.server.common.utils.StartUpExeUtils; import com.docus.server.dto.scheduling.management.schcollector.task.SchCollectorTaskDTO; -import io.netty.buffer.ByteBuf; +import com.docus.server.vo.scheduling.management.schcollector.LoadSchCollectorVO; +import com.fasterxml.jackson.core.type.TypeReference; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; -import javax.annotation.Resource; +import java.io.Serializable; +import java.util.List; +import java.util.Map; @Slf4j -public class ClientHandler extends SimpleChannelInboundHandler { - @Resource - private CollectorChannelCacheMap collectorChannelCacheMap; +@AllArgsConstructor +@NoArgsConstructor +public class ClientHandler extends SimpleChannelInboundHandler> { - @Override - protected void channelRead0(ChannelHandlerContext context, ByteBuf msg) throws Exception { - ByteBuf buf = (ByteBuf) msg; - //创建目标大小的数组 - byte[] barray = new byte[buf.readableBytes()]; - //把数据从bytebuf转移到byte[] - buf.getBytes(0, barray); - //将byte[]转成字符串用于打印 - String message = new String(barray); - - //空消息不处理 - if (!StringUtils.hasText(message)) { - return; - } + private NettyClient nettyClient; - CommMsg commMsg = JSON.fromJSON(message, CommMsg.class); + @Override + protected void channelRead0(ChannelHandlerContext context, CommMsg commMsg) throws Exception { +// ByteBuf buf = (ByteBuf) msg; +// //创建目标大小的数组 +// byte[] barray = new byte[buf.readableBytes()]; +// //把数据从bytebuf转移到byte[] +// buf.getBytes(0, barray); +// //将byte[]转成字符串用于打印 +// String message = new String(barray); +// +// //空消息不处理 +// if (!StringUtils.hasText(message)) { +// return; +// } + +// CommMsg commMsg = JSON.fromJSON(message, CommMsg.class); String messageType = commMsg.getMessageType(); String messageTime = commMsg.getMessageTime(); @@ -43,33 +53,61 @@ public class ClientHandler extends SimpleChannelInboundHandler { log.info("======== 【终端客户端-收到调度器消息】, 消息时间={}, 消息类型={}, 消息内容={}", messageTime, messageType, messageContent + " ======== "); if (messageType.equals(MsgConstants.TERMINATOR_RESTART)) { - log.info("【终端客户端-收到调度器下发任务】接受到终端重启命令,内容={}", messageContent); + log.info("【终端客户端-收到调度器消息】接受到终端重启命令,内容={}", messageContent); } else if (messageType.equals(MsgConstants.COLLECTOR_RESTART)) { - log.info("【终端客户端-收到调度器下发任务】收到采集器重启命令,内容={}", messageContent); + log.info("【终端客户端-收到调度器消息】收到采集器重启命令,内容={}", messageContent); } else if (messageType.equals(MsgConstants.VIRTUAL_RESTART)) { - log.info("【终端客户端-收到调度器下发任务】收到虚拟机重启命令,内容={}", messageContent); + log.info("【终端客户端-收到调度器消息】收到虚拟机重启命令,内容={}", messageContent); } else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_FILE)) { - log.info("【终端客户端-收到调度器下发任务】收到更新采集器文件命令,内容={}", messageContent); + log.info("【终端客户端-收到调度器消息】收到更新采集器文件命令,内容={}", messageContent); } else if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_CONFIG)) { - log.info("【终端客户端-收到调度器下发任务】收到更新采集器配置命令,内容={}", messageContent); + log.info("【终端客户端-收到调度器消息】收到更新采集器配置命令,内容={}", messageContent); } else if (messageType.equals(MsgConstants.SCH_DISTRIBUTE_TASKS)) { - log.info("【终端客户端-收到调度器下发任务】收到采集调度器下发任务命令,内容={}", messageContent); + log.info("【终端客户端-收到调度器消息】收到采集调度器下发任务命令,内容={}", messageContent); //下发任务: 判断这个任务采集器类型在指定目录是否存在、 //如果不存在:下拉该采集器类型最新的版本更新包到指定的部署路径,并且启动该exe采集器,将任务tcp给对应采集器 //如果存在: - SchCollectorTaskDTO schCollectorTaskDTO = JSON.fromJSON(JSON.toJSON(messageContent), SchCollectorTaskDTO.class); + SchCollectorTaskDTO schCollectorTaskDTO = JSON.fromJSON(messageContent, SchCollectorTaskDTO.class); String collectorId = schCollectorTaskDTO.getTaskInfo().getTasks().get(0).getCollectorId(); - collectorChannelCacheMap.writeAndFlush(collectorId, commMsg); + + Long compareCollectorId = Long.valueOf(collectorId); + + String packageValue = nettyClient.getRedisMsg(); + CollectorChannelCacheMap collectorChannelCacheMap = nettyClient.getCollectorChannelCacheMap(); + String saveCollectorPackagePath = nettyClient.getSaveCollectorPackagePath(); + + List redisLoadSchCollectors = JSON.fromJSONWithGeneric(packageValue, new TypeReference>() { + }); + + Map collectorVoMap = ListUtils.toMap(redisLoadSchCollectors, LoadSchCollectorVO::getCollectorId); + + if (collectorVoMap.containsKey(compareCollectorId)) { + LoadSchCollectorVO loadSchCollectorVO = collectorVoMap.get(compareCollectorId); + String processName = loadSchCollectorVO.getProcessName(); + + if (Func.isNotBlank(processName)) { + boolean isExistProcess = StartUpExeUtils.checkProcessOnly(processName); + //存在指定采集器id进程 + if (isExistProcess) { + collectorChannelCacheMap.writeAndFlush(collectorId, commMsg); + } else { + //不存在进程,则启动指定采集器进程 + StartUpExeUtils.startUpExeOnly(saveCollectorPackagePath + collectorId + "\\collector\\" + processName); + collectorChannelCacheMap.writeAndFlush(collectorId, commMsg); + } + } + } + } else if (messageType.equals(MsgConstants.HAS_VALID_COLLECTOR)) { - log.info("【终端客户端-收到调度器下发任务】是否有可用类型的采集器命令,内容={}", messageContent); + log.info("【终端客户端-收到调度器消息】是否有可用类型的采集器命令,内容={}", messageContent); //下发任务: 判断这个任务采集器类型在指定目录是否存在、 diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/HeartbeatHandler.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/HeartbeatHandler.java index 3384216..9805364 100644 --- a/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/HeartbeatHandler.java +++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/HeartbeatHandler.java @@ -1,19 +1,16 @@ package com.docus.server.common.netty.client.handler; import com.docus.core.util.DateUtil; -import com.docus.core.util.json.JSON; +import com.docus.server.common.CommMsg; import com.docus.server.common.MsgConstants; -import com.docus.server.common.netty.CommMsg; import com.docus.server.common.netty.client.NettyClient; import com.docus.server.common.netty.client.NettyClientProperties; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.EventLoop; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; -import io.netty.util.CharsetUtil; import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -46,7 +43,8 @@ public class HeartbeatHandler extends ChannelInboundHandlerAdapter { .content(appName + " 我来了") .build(); - ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(onlineRegister), CharsetUtil.UTF_8)); +// ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(onlineRegister), CharsetUtil.UTF_8)); + ctx.writeAndFlush(onlineRegister); super.channelActive(ctx); } @@ -64,7 +62,8 @@ public class HeartbeatHandler extends ChannelInboundHandlerAdapter { .build(); //发送心跳消息,并在发送失败时关闭该接连 - ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(heartbeat), CharsetUtil.UTF_8)).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); +// ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(heartbeat), CharsetUtil.UTF_8)).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + ctx.writeAndFlush(heartbeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } } else { super.userEventTriggered(ctx, evt); diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/CollectorChannelCacheMap.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/CollectorChannelCacheMap.java index 4a80006..3bbe9c1 100644 --- a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/CollectorChannelCacheMap.java +++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/CollectorChannelCacheMap.java @@ -1,7 +1,7 @@ package com.docus.server.common.netty.server; import com.docus.core.util.json.JSON; -import com.docus.server.common.netty.CommMsg; +import com.docus.server.common.CommMsg; import com.docus.server.common.netty.state.DeviceStateContext; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/NettyServerInitializer.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/NettyServerInitializer.java index 04eb2ca..3355d63 100644 --- a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/NettyServerInitializer.java +++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/NettyServerInitializer.java @@ -1,8 +1,12 @@ package com.docus.server.common.netty.server; +import com.docus.server.common.JsonDecoder; +import com.docus.server.common.JsonEncoder; import com.docus.server.common.netty.server.handler.NettyBusinessHandler; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.timeout.IdleStateHandler; import org.springframework.stereotype.Component; @@ -17,10 +21,8 @@ public class NettyServerInitializer extends ChannelInitializer { @Resource private NettyServerProperties serverProperties; - @Resource private NettyBusinessHandler businessHandler; - @Resource private CollectorChannelCacheMap channelRepository; @@ -40,9 +42,15 @@ public class NettyServerInitializer extends ChannelInitializer { // .addLast(new ProtobufEncoder()) // .addLast(new LineBasedFrameDecoder(2048)) -// .addLast(new StringDecoder()) -// .addLast(new StringEncoder()) - +// .addLast(new LineBasedFrameDecoder(1024)) +// .addLast(new StringDecoder(CharsetUtil.UTF_8)) + .addLast(new LengthFieldBasedFrameDecoder(1024 * 1024, 0, 2, 0, 2)) + // LengthFieldPrepender是一个编码器,主要是在响应字节数据前面添加字节长度字段 + .addLast(new LengthFieldPrepender(2)) + // 对经过粘包和拆包处理之后的数据进行json反序列化,从而得到UserInfo对象 + .addLast(new JsonDecoder()) + // 对响应数据进行编码,主要是将UserInfo对象序列化为json + .addLast(new JsonEncoder()) // 加载业务处理器 // .addLast(new NettyHeartbeatHandler(channelRepository)) .addLast(businessHandler); diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/EchoServerHandler.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/EchoServerHandler.java index 600aa47..db6578f 100644 --- a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/EchoServerHandler.java +++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/EchoServerHandler.java @@ -1,7 +1,7 @@ package com.docus.server.common.netty.server.handler; import com.docus.core.util.json.JSON; -import com.docus.server.common.netty.CommMsg; +import com.docus.server.common.CommMsg; import com.fasterxml.jackson.core.type.TypeReference; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java index c148133..4a959af 100644 --- a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java +++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java @@ -2,15 +2,13 @@ package com.docus.server.common.netty.server.handler; import com.docus.core.util.DateUtil; import com.docus.core.util.Func; -import com.docus.core.util.StringUtils; import com.docus.core.util.json.JSON; import com.docus.server.api.scheduling.management.SchCollectRecordApi; +import com.docus.server.common.CommMsg; import com.docus.server.common.MsgConstants; -import com.docus.server.common.netty.CommMsg; import com.docus.server.common.netty.server.CollectorChannelCacheMap; import com.docus.server.common.netty.state.DeviceStateContext; import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO; -import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; @@ -35,7 +33,7 @@ import java.util.Date; @Slf4j @ChannelHandler.Sharable @Component -public class NettyBusinessHandler extends SimpleChannelInboundHandler { +public class NettyBusinessHandler extends SimpleChannelInboundHandler> { private static final ChannelGroup DEFAULT_CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @@ -45,22 +43,22 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler { private SchCollectRecordApi schCollectRecordApi; @Override - protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + protected void channelRead0(ChannelHandlerContext ctx, CommMsg commMsg) throws Exception { DeviceStateContext deviceStateContext = collectorChannelCacheMap.getAttributeValue(ctx); - ByteBuf buf = (ByteBuf) msg; - //创建目标大小的数组 - byte[] barray = new byte[buf.readableBytes()]; - //把数据从bytebuf转移到byte[] - buf.getBytes(0, barray); - //将byte[]转成字符串用于打印 - String message = new String(barray); - - //空消息不处理 - if (!StringUtils.hasText(message)) { - return; - } - - CommMsg commMsg = JSON.fromJSON(message, CommMsg.class); +// ByteBuf buf = (ByteBuf) msg; +// //创建目标大小的数组 +// byte[] barray = new byte[buf.readableBytes()]; +// //把数据从bytebuf转移到byte[] +// buf.getBytes(0, barray); +// //将byte[]转成字符串用于打印 +// String message = new String(barray); +// +// //空消息不处理 +// if (!StringUtils.hasText(message)) { +// return; +// } + +// CommMsg commMsg = JSON.fromJSON(message, CommMsg.class); String messageType = commMsg.getMessageType(); String messageTime = commMsg.getMessageTime(); @@ -91,7 +89,7 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler { //============状态为上行数据============= deviceStateContext.onHeartbeat(System.currentTimeMillis(), "采集器上行了数据"); //==============处理数据================ - System.out.println("收到数据:" + msg); + System.out.println("收到数据:" + JSON.toJSON(commMsg)); //============返回消息================== CommMsg resp = CommMsg.builder() @@ -185,7 +183,7 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler { DEFAULT_CHANNEL_GROUP.add(ctx.channel()); - System.out.println(ctx.channel().remoteAddress() + " 上线," + "【采集器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size()); + System.out.println(ctx.channel().remoteAddress() + " 上线," + "【终端-采集器】在线数量:" + DEFAULT_CHANNEL_GROUP.size()); } /** @@ -194,7 +192,7 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); - System.out.println(channel.remoteAddress() + " 下线," + "【采集器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size()); + System.out.println(channel.remoteAddress() + " 下线," + "【终端-采集器】在线数量:" + DEFAULT_CHANNEL_GROUP.size()); String collectorId = collectorChannelCacheMap.getClientKey(channel); log.error("客户端下线,终端连接:{}", collectorId); //移除终端,终端离线 diff --git a/collector-terminal-management/src/main/resources/bootstrap.yml b/collector-terminal-management/src/main/resources/bootstrap.yml index a9036de..7769e6b 100644 --- a/collector-terminal-management/src/main/resources/bootstrap.yml +++ b/collector-terminal-management/src/main/resources/bootstrap.yml @@ -13,8 +13,8 @@ spring: #公司病案的文件服务数据库 master: url: jdbc:log4jdbc:mysql://db.docus.cn:3306/docus-collector-scheduling?autoReconnect=true&allowMultiQueries=true&useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai - username: root - password: root + username: docus + password: docus702 driver-class-name: net.sf.log4jdbc.sql.jdbcapi.DriverSpy type: com.alibaba.druid.pool.DruidDataSource servlet: @@ -23,7 +23,7 @@ spring: max-request-size: 200MB redis: host: redis.docus.cn - # password: JSdocus@702 + password: JSdocus@702 cloud: nacos: discovery: @@ -67,7 +67,9 @@ netty: all-idle-time-seconds: 0 file: - uploadFolder: /Users/linruifeng/workspace/ + uploadFolder: D://docus/ docus: - vm-task-cron: 0/15 * * * * ? + vm-task-cron: 0/30 * * * * ? + collector-package-download-url: http://192.168.16.110:9113 + collector-package-download-savePath: H:\\packages\\ \ No newline at end of file diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/CommMsg.java b/docus-api-common/src/main/java/com/docus/server/common/CommMsg.java similarity index 90% rename from collector-scheduling-management/src/main/java/com/docus/server/common/netty/CommMsg.java rename to docus-api-common/src/main/java/com/docus/server/common/CommMsg.java index ef7c42a..862de67 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/CommMsg.java +++ b/docus-api-common/src/main/java/com/docus/server/common/CommMsg.java @@ -1,4 +1,4 @@ -package com.docus.server.common.netty; +package com.docus.server.common; import lombok.AllArgsConstructor; import lombok.Builder; diff --git a/docus-api-common/src/main/java/com/docus/server/common/JsonDecoder.java b/docus-api-common/src/main/java/com/docus/server/common/JsonDecoder.java new file mode 100644 index 0000000..1e196ec --- /dev/null +++ b/docus-api-common/src/main/java/com/docus/server/common/JsonDecoder.java @@ -0,0 +1,22 @@ +package com.docus.server.common; + +import com.alibaba.fastjson.JSON; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.util.CharsetUtil; + +import java.util.List; + +public class JsonDecoder extends MessageToMessageDecoder { + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List out) throws Exception { + byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + CommMsg user = JSON.parseObject(new String(bytes, CharsetUtil.UTF_8), CommMsg.class); + if (null != user) { + out.add(user); + } + } +} diff --git a/docus-api-common/src/main/java/com/docus/server/common/JsonEncoder.java b/docus-api-common/src/main/java/com/docus/server/common/JsonEncoder.java new file mode 100644 index 0000000..e5088a4 --- /dev/null +++ b/docus-api-common/src/main/java/com/docus/server/common/JsonEncoder.java @@ -0,0 +1,20 @@ +package com.docus.server.common; + +import com.alibaba.fastjson.JSON; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; + +import java.io.Serializable; + +public class JsonEncoder extends MessageToByteEncoder> { + + @Override + protected void encode(ChannelHandlerContext ctx, CommMsg user, ByteBuf buf) throws Exception { + if (null != user) { + String json = JSON.toJSONString(user); + ctx.writeAndFlush(Unpooled.wrappedBuffer(json.getBytes())); + } + } +} diff --git a/docus-api-common/src/main/java/com/docus/server/common/util/LogUtil.java b/docus-api-common/src/main/java/com/docus/server/common/util/LogUtil.java new file mode 100644 index 0000000..3b96afe --- /dev/null +++ b/docus-api-common/src/main/java/com/docus/server/common/util/LogUtil.java @@ -0,0 +1,185 @@ +package com.docus.server.common.util; + +import com.baomidou.mybatisplus.extension.api.Assert; + +import java.io.PrintWriter; +import java.io.StringWriter; + +public abstract class LogUtil { + + private static String getStackTrace(Throwable t) { + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + try { + t.printStackTrace(pw); + return sw.toString(); + } finally { + pw.close(); + } + } + + public static String exceptionMsg(String msg, Throwable e) { + return msg + ",异常消息[" + getStackTrace(e) + "]"; + } + + public static String exceptionSimpleMsg(String msg, Throwable e) { + return msg + ",异常消息[" + e.getLocalizedMessage() + "]"; + } + + /** + * 获取计算程序运行的时钟 + * + * @return clock + */ + public static Clock getClock() { + return new Clock() { + /**开始时间*/ + private long startTime = System.nanoTime(); + + /** + * 消耗时间[毫秒] + * @return 消耗时间 + */ + public long consume() { + return LogUtil.consume(startTime, System.nanoTime()); + } + + /** + * 结束时间 + */ + public long getEnd() { + return System.nanoTime(); + } + + /** + * 开始时间 + */ + public long getStart() { + return startTime; + } + + public long consumeSecond() { + return LogUtil.consumeSecond(startTime, System.nanoTime()); + } + }; + } + + + /** + * 消耗时间[纳秒] + * + * @return 消耗时间 秒 + */ + public static long consumeNano(long startTime, long endTime) { + return (endTime - startTime) / 1000000000 + 1; + } + + /** + * 消耗时间[微秒] + * + * @return 消耗时间 秒 + */ + public static long consume(long startTime, long endTime) { + return (endTime - startTime) / 1000000 + 1; + } + + /** + * 消耗时间[耗秒] + * + * @return 消耗时间 秒 + */ + public static long consumeSecond(long startTime, long endTime) { + return (endTime - startTime) / 1000 + 1; + } + + /** + * getRootCause + * 获取异常的原始异常,用于反馈异常的真实原因。 + * + * @param ex 异常信息 + * @return 原始异常,最底层异常 + * @author 林小松 + */ + public static Throwable getRootCause(Throwable ex) { + Throwable cause = ex; + while (cause.getCause() != null) { + cause = cause.getCause(); + } + return cause; + } + + /** + * 递归寻找根异常异常信息 + * + * @param ex 异常 + * @return 异常信息 + */ + public static String traceRootExceptionInfo(Throwable ex) { + Throwable cause = ex; + while (cause.getCause() != null) { + cause = cause.getCause(); + } + return cause.toString(); + } + + /** + * 递归寻找引起异常的所有异常信息 + * + * @param ex 异常 + * @return 异常信息 + */ + public static String tarceExceptionInfo(Throwable ex) { + StringBuffer sb = new StringBuffer(); + Throwable cause = ex; + sb.insert(0, cause); + while (cause.getCause() != null) { + cause = cause.getCause(); + sb.insert(0, cause.toString()); + } + return sb.toString(); + } + + public static interface Clock { + + + /** + * 返回程序消耗的毫秒数 + * + * @return 消耗毫秒数 + */ + long consume(); + + /** + * 返回程序消耗的秒数 + * + * @return 消耗秒数 + */ + long consumeSecond(); + + /** + * 返回开始时间 + * + * @return 开始时间 + * @author:lwei + */ + long getStart(); + + /** + * 返回结束时间 + * + * @return 结束时间 + * @author:lwei + */ + long getEnd(); + } + + public static void main(String[] args) { + try { + Assert.fail("dd"); + } catch (Throwable e) { + System.out.println(exceptionMsg("aa", e)); + ; + } + } + +} \ No newline at end of file diff --git a/docus-api-common/src/main/java/com/docus/server/common/util/io/SerializableMsgCodec.java b/docus-api-common/src/main/java/com/docus/server/common/util/io/SerializableMsgCodec.java new file mode 100644 index 0000000..68d2944 --- /dev/null +++ b/docus-api-common/src/main/java/com/docus/server/common/util/io/SerializableMsgCodec.java @@ -0,0 +1,47 @@ +package com.docus.server.common.util.io; + +import com.baomidou.mybatisplus.extension.api.Assert; +import com.docus.core.util.pdf.util.IOUtils; +import com.docus.server.common.util.LogUtil; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +public class SerializableMsgCodec { + + public Serializable decode(byte[] bytes) { + ObjectInputStream ois = null; + try { + ois = new ObjectInputStream(new ByteArrayInputStream(bytes)); + + return (Serializable) ois.readObject(); + } catch (Exception e) { + Assert.fail(LogUtil.exceptionMsg("对象序列化异常,异常消息", e)); + return null; + } finally { + IOUtils.close(ois); + } + + } + + public byte[] encode(Serializable obj) { + ObjectOutputStream oos = null; + ByteArrayOutputStream baos = null; + try { + baos = new ByteArrayOutputStream(); + oos = new ObjectOutputStream(baos); + oos.writeObject(obj); + byte[] bytes = baos.toByteArray(); + return bytes; + } catch (Exception e) { + Assert.fail(LogUtil.exceptionMsg("对象序列化异常,异常消息", e)); + return null; + } finally { + IOUtils.close(baos); + IOUtils.close(oos); + } + } +} diff --git a/docus-client-interface/src/main/java/com/docus/server/dto/scheduling.management/schcollector/task/ReportDownTwoDTO.java b/docus-client-interface/src/main/java/com/docus/server/dto/scheduling.management/schcollector/task/ReportDownTwoDTO.java index 22f891f..1ef9df3 100644 --- a/docus-client-interface/src/main/java/com/docus/server/dto/scheduling.management/schcollector/task/ReportDownTwoDTO.java +++ b/docus-client-interface/src/main/java/com/docus/server/dto/scheduling.management/schcollector/task/ReportDownTwoDTO.java @@ -1,5 +1,6 @@ package com.docus.server.dto.scheduling.management.schcollector.task; +import com.fasterxml.jackson.annotation.JsonIgnore; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; @@ -20,7 +21,7 @@ import java.util.Map; @Data @ApiModel(value = "ReportDownTwoDTO", description = "下发任务") public class ReportDownTwoDTO { - + @JsonIgnore @ApiModelProperty(value = "扩展参数") private Map params;