From f5db2168f34cdeb54ce7c75f4c67524800d4fd54 Mon Sep 17 00:00:00 2001 From: beeajax <1105173470@qq.com> Date: Sun, 16 Jul 2023 17:06:41 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9netty=E7=BC=96=E7=A0=81?= =?UTF-8?q?=E5=92=8C=E8=A7=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../docus/server/common/netty/CommMsg.java | 16 ++++ .../docus/server/common/netty/Payload.java | 46 +++++------ .../netty/server/ChannelRepository.java | 4 + .../netty/server/NettyServerInitializer.java | 19 ++--- .../server/handler/EchoServerHandler.java | 77 +++++++++++++++++++ .../server/handler/NettyBusinessHandler.java | 72 ++++++++++++++++- .../server/handler/NettyHeartbeatHandler.java | 19 ++--- .../common/netty/server/handler/Test.java | 32 ++++++++ 8 files changed, 242 insertions(+), 43 deletions(-) create mode 100644 collector-scheduling-management/src/main/java/com/docus/server/common/netty/CommMsg.java create mode 100644 collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/EchoServerHandler.java create mode 100644 collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/Test.java diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/CommMsg.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/CommMsg.java new file mode 100644 index 0000000..ad80d91 --- /dev/null +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/CommMsg.java @@ -0,0 +1,16 @@ +package com.docus.server.common.netty; + +import lombok.Data; + +import java.io.Serializable; + +@Data +public class CommMsg implements Serializable { + + public String messageType; + + public String messageTime; + + public MSG_CONTENT content; + +} diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/Payload.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/Payload.java index 5d56684..603e6c1 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/Payload.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/Payload.java @@ -1,5 +1,5 @@ // Generated by the protocol buffer compiler. DO NOT EDIT! -// source: Message.proto +// source: CommMsg.proto package com.docus.server.common.netty; @@ -15,7 +15,7 @@ public final class Payload { (com.google.protobuf.ExtensionRegistryLite) registry); } public interface MessageOrBuilder extends - // @@protoc_insertion_point(interface_extends:Message) + // @@protoc_insertion_point(interface_extends:CommMsg) com.google.protobuf.MessageOrBuilder { /** @@ -31,12 +31,12 @@ public final class Payload { getClientBytes(); /** - * .Message.type cmd = 2; + * .CommMsg.type cmd = 2; * @return The enum numeric value on the wire for cmd. */ int getCmdValue(); /** - * .Message.type cmd = 2; + * .CommMsg.type cmd = 2; * @return The cmd. */ Message.type getCmd(); @@ -54,14 +54,14 @@ public final class Payload { getContentBytes(); } /** - * Protobuf type {@code Message} + * Protobuf type {@code CommMsg} */ public static final class Message extends com.google.protobuf.GeneratedMessageV3 implements - // @@protoc_insertion_point(message_implements:Message) + // @@protoc_insertion_point(message_implements:CommMsg) MessageOrBuilder { private static final long serialVersionUID = 0L; - // Use Message.newBuilder() to construct. + // Use CommMsg.newBuilder() to construct. private Message(com.google.protobuf.GeneratedMessageV3.Builder builder) { super(builder); } @@ -152,7 +152,7 @@ public final class Payload { } /** - * Protobuf enum {@code Message.type} + * Protobuf enum {@code CommMsg.type} */ public enum type implements com.google.protobuf.ProtocolMessageEnum { @@ -341,7 +341,7 @@ public final class Payload { this.value = value; } - // @@protoc_insertion_point(enum_scope:Message.type) + // @@protoc_insertion_point(enum_scope:CommMsg.type) } public static final int CLIENT_FIELD_NUMBER = 1; @@ -385,14 +385,14 @@ public final class Payload { public static final int CMD_FIELD_NUMBER = 2; private int cmd_; /** - * .Message.type cmd = 2; + * .CommMsg.type cmd = 2; * @return The enum numeric value on the wire for cmd. */ @Override public int getCmdValue() { return cmd_; } /** - * .Message.type cmd = 2; + * .CommMsg.type cmd = 2; * @return The cmd. */ @Override public type getCmd() { @@ -614,11 +614,11 @@ public final class Payload { return builder; } /** - * Protobuf type {@code Message} + * Protobuf type {@code CommMsg} */ public static final class Builder extends com.google.protobuf.GeneratedMessageV3.Builder implements - // @@protoc_insertion_point(builder_implements:Message) + // @@protoc_insertion_point(builder_implements:CommMsg) MessageOrBuilder { public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { @@ -633,7 +633,7 @@ public final class Payload { Message.class, Builder.class); } - // Construct using com.venus.common.protobuf.Payload.Message.newBuilder() + // Construct using com.venus.common.protobuf.Payload.CommMsg.newBuilder() private Builder() { maybeForceBuilderInitialization(); } @@ -854,14 +854,14 @@ public final class Payload { private int cmd_ = 0; /** - * .Message.type cmd = 2; + * .CommMsg.type cmd = 2; * @return The enum numeric value on the wire for cmd. */ @Override public int getCmdValue() { return cmd_; } /** - * .Message.type cmd = 2; + * .CommMsg.type cmd = 2; * @param value The enum numeric value on the wire for cmd to set. * @return This builder for chaining. */ @@ -872,7 +872,7 @@ public final class Payload { return this; } /** - * .Message.type cmd = 2; + * .CommMsg.type cmd = 2; * @return The cmd. */ @Override @@ -882,7 +882,7 @@ public final class Payload { return result == null ? type.UNRECOGNIZED : result; } /** - * .Message.type cmd = 2; + * .CommMsg.type cmd = 2; * @param value The cmd to set. * @return This builder for chaining. */ @@ -896,7 +896,7 @@ public final class Payload { return this; } /** - * .Message.type cmd = 2; + * .CommMsg.type cmd = 2; * @return This builder for chaining. */ public Builder clearCmd() { @@ -996,10 +996,10 @@ public final class Payload { } - // @@protoc_insertion_point(builder_scope:Message) + // @@protoc_insertion_point(builder_scope:CommMsg) } - // @@protoc_insertion_point(class_scope:Message) + // @@protoc_insertion_point(class_scope:CommMsg) private static final Message DEFAULT_INSTANCE; static { DEFAULT_INSTANCE = new Message(); @@ -1050,8 +1050,8 @@ public final class Payload { descriptor; static { String[] descriptorData = { - "\n\rMessage.proto\"\260\001\n\007Message\022\016\n\006client\030\001 " + - "\001(\t\022\032\n\003cmd\030\002 \001(\0162\r.Message.type\022\017\n\007conte" + + "\n\rCommMsg.proto\"\260\001\n\007Message\022\016\n\006client\030\001 " + + "\001(\t\022\032\n\003cmd\030\002 \001(\0162\r.CommMsg.type\022\017\n\007conte" + "nt\030\003 \001(\t\"h\n\004type\022\025\n\021HEARTBEAT_REQUEST\020\000\022" + "\026\n\022HEARTBEAT_RESPONSE\020\001\022\010\n\004AUTH\020\002\022\t\n\005PRI" + "NT\020\003\022\016\n\nPRINT_DONE\020\004\022\014\n\010WHATEVER\020\005B$\n\031co" + diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java index 2755879..59b19bc 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java @@ -44,6 +44,10 @@ public class ChannelRepository { return CHANNEL_CACHE_MAP.get(key); } + public Map getAll() { + return CHANNEL_CACHE_MAP; + } + /** * 工控机离线,工控机上所有设备离线 * diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/NettyServerInitializer.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/NettyServerInitializer.java index e4585f4..04f4600 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/NettyServerInitializer.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/NettyServerInitializer.java @@ -1,14 +1,9 @@ package com.docus.server.common.netty.server; -import com.docus.server.common.netty.Payload; import com.docus.server.common.netty.server.handler.NettyBusinessHandler; import com.docus.server.common.netty.server.handler.NettyHeartbeatHandler; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; -import io.netty.handler.codec.protobuf.ProtobufDecoder; -import io.netty.handler.codec.protobuf.ProtobufEncoder; -import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; -import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import io.netty.handler.timeout.IdleStateHandler; import org.springframework.stereotype.Component; @@ -40,12 +35,18 @@ public class NettyServerInitializer extends ChannelInitializer { TimeUnit.SECONDS) ) // 加载加码解码处理器,同时解决粘包拆包问题 - .addLast(new ProtobufVarint32FrameDecoder()) - .addLast(new ProtobufDecoder(Payload.Message.getDefaultInstance())) - .addLast(new ProtobufVarint32LengthFieldPrepender()) - .addLast(new ProtobufEncoder()) +// .addLast(new ProtobufVarint32FrameDecoder()) +// .addLast(new ProtobufDecoder(Payload.CommMsg.getDefaultInstance())) +// .addLast(new ProtobufVarint32LengthFieldPrepender()) +// .addLast(new ProtobufEncoder()) + +// .addLast(new LineBasedFrameDecoder(2048)) +// .addLast(new StringDecoder()) +// .addLast(new StringEncoder()) + // 加载业务处理器 .addLast(new NettyHeartbeatHandler(channelRepository)) .addLast(businessHandler); +// .addLast(new EchoServerHandler()); } } diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/EchoServerHandler.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/EchoServerHandler.java new file mode 100644 index 0000000..249f1c1 --- /dev/null +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/EchoServerHandler.java @@ -0,0 +1,77 @@ +package com.docus.server.common.netty.server.handler; + +import com.docus.core.util.json.JSON; +import com.docus.server.common.netty.CommMsg; +import com.fasterxml.jackson.core.type.TypeReference; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.CharsetUtil; + +import java.util.Date; + +public class EchoServerHandler extends ChannelInboundHandlerAdapter { + + + //客户端连接上来,没有心跳前,属于离线状态,有心跳后,才属于在线状态。 + //客户端断开后,自动剔除,agent。 + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + ctx.fireChannelRegistered(); + Channel incoming = ctx.channel(); + System.out.println("NettyClient:" + incoming.remoteAddress() + "在线1"); + } + + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + ctx.fireChannelUnregistered(); + Channel incoming = ctx.channel(); + System.out.println("NettyClient:" + incoming.remoteAddress() + "在线2"); + } + + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ctx.fireChannelActive(); + Channel incoming = ctx.channel(); + System.out.println("NettyClient:" + incoming.remoteAddress() + "在线3"); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + try { + ByteBuf buf = (ByteBuf) msg; + //创建目标大小的数组 + byte[] barray = new byte[buf.readableBytes()]; + //把数据从bytebuf转移到byte[] + buf.getBytes(0, barray); + //将byte[]转成字符串用于打印 + String str = new String(barray); + + CommMsg commMsg = JSON.fromJSONWithGeneric(str, new TypeReference>() { + }); + + if (str.length() > 0) { + System.out.println(str); + System.out.println("收到消息回复一条消息给客户端"); + System.out.println("client channelActive.."); + ctx.writeAndFlush(Unpooled.copiedBuffer("服务器端发一条数据给客户端" + new Date().toString(), CharsetUtil.UTF_8)); // 必须有flush + System.out.flush(); + } else { + System.out.println("不能读啊"); + } + } finally { + } + } + + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + System.out.println("server occur exception:" + cause.getMessage()); + cause.printStackTrace(); + ctx.close(); // 关闭发生异常的连接 + } +} diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java index ca77e1a..36f1cc1 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java @@ -1,11 +1,19 @@ package com.docus.server.common.netty.server.handler; +import com.docus.core.util.StringUtils; +import com.docus.core.util.json.JSON; +import com.docus.server.common.netty.CommMsg; import com.docus.server.common.netty.Payload; import com.docus.server.common.netty.server.ChannelRepository; +import com.fasterxml.jackson.core.type.TypeReference; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -18,12 +26,13 @@ import java.net.InetSocketAddress; @Slf4j @ChannelHandler.Sharable @Component -public class NettyBusinessHandler extends SimpleChannelInboundHandler { +public class NettyBusinessHandler extends SimpleChannelInboundHandler { + + private static final ChannelGroup GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Resource private ChannelRepository repository; - @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Payload.Message message) { if (message.getCmd() == Payload.Message.type.AUTH) { log.info("接受到工控机<{}>的认证消息:{}", message.getClient(), message.getContent()); @@ -44,6 +53,64 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler commMsg = JSON.fromJSONWithGeneric(message, new TypeReference>() { + }); + + String messageType = commMsg.getMessageType(); + +// if (message.getCmd() == Payload.Message.type.AUTH) { +// log.info("接受到工控机<{}>的认证消息:{}", message.getClient(), message.getContent()); +// InetSocketAddress ipSocket = (InetSocketAddress) channelHandlerContext.channel().remoteAddress(); +// String clientIp = ipSocket.getAddress().getHostAddress(); +// log.info("工控机:{},连接上线,IP地址信息:{}", message.getClient(), clientIp); +// Channel channel = repository.get(message.getClient()); +// if (channel != null && channel.isOpen()) { +// channel.close(); +// } +// repository.put(message.getClient(), channelHandlerContext.channel()); +// } +// if (message.getCmd() == Payload.Message.type.PRINT_DONE) { +// log.info("接受到打印回馈指令,内容{}", message.getContent()); +// } +// if (message.getCmd() == Payload.Message.type.WHATEVER) { +// log.info("收到测试消息:{}", message.getContent()); +// } + + //心跳 + //注册 + //业务消息 + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + + GROUP.add(channel); + System.out.println(channel.remoteAddress() + " 上线," + "在线数量:" + GROUP.size()); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + // 获取到当前要断开连接的Channel + Channel channel = ctx.channel(); + System.out.println(channel.remoteAddress() + "下线," + "在线数量:" + GROUP.size()); + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); @@ -53,4 +120,5 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler