修改netty编码和解码

segment2.0
beeajax 2 years ago
parent edd19c1de4
commit f5db2168f3

@ -0,0 +1,16 @@
package com.docus.server.common.netty;
import lombok.Data;
import java.io.Serializable;
@Data
public class CommMsg<MSG_CONTENT extends Serializable> implements Serializable {
public String messageType;
public String messageTime;
public MSG_CONTENT content;
}

@ -1,5 +1,5 @@
// Generated by the protocol buffer compiler. DO NOT EDIT! // Generated by the protocol buffer compiler. DO NOT EDIT!
// source: Message.proto // source: CommMsg.proto
package com.docus.server.common.netty; package com.docus.server.common.netty;
@ -15,7 +15,7 @@ public final class Payload {
(com.google.protobuf.ExtensionRegistryLite) registry); (com.google.protobuf.ExtensionRegistryLite) registry);
} }
public interface MessageOrBuilder extends public interface MessageOrBuilder extends
// @@protoc_insertion_point(interface_extends:Message) // @@protoc_insertion_point(interface_extends:CommMsg)
com.google.protobuf.MessageOrBuilder { com.google.protobuf.MessageOrBuilder {
/** /**
@ -31,12 +31,12 @@ public final class Payload {
getClientBytes(); getClientBytes();
/** /**
* <code>.Message.type cmd = 2;</code> * <code>.CommMsg.type cmd = 2;</code>
* @return The enum numeric value on the wire for cmd. * @return The enum numeric value on the wire for cmd.
*/ */
int getCmdValue(); int getCmdValue();
/** /**
* <code>.Message.type cmd = 2;</code> * <code>.CommMsg.type cmd = 2;</code>
* @return The cmd. * @return The cmd.
*/ */
Message.type getCmd(); Message.type getCmd();
@ -54,14 +54,14 @@ public final class Payload {
getContentBytes(); getContentBytes();
} }
/** /**
* Protobuf type {@code Message} * Protobuf type {@code CommMsg}
*/ */
public static final class Message extends public static final class Message extends
com.google.protobuf.GeneratedMessageV3 implements com.google.protobuf.GeneratedMessageV3 implements
// @@protoc_insertion_point(message_implements:Message) // @@protoc_insertion_point(message_implements:CommMsg)
MessageOrBuilder { MessageOrBuilder {
private static final long serialVersionUID = 0L; private static final long serialVersionUID = 0L;
// Use Message.newBuilder() to construct. // Use CommMsg.newBuilder() to construct.
private Message(com.google.protobuf.GeneratedMessageV3.Builder<?> builder) { private Message(com.google.protobuf.GeneratedMessageV3.Builder<?> builder) {
super(builder); super(builder);
} }
@ -152,7 +152,7 @@ public final class Payload {
} }
/** /**
* Protobuf enum {@code Message.type} * Protobuf enum {@code CommMsg.type}
*/ */
public enum type public enum type
implements com.google.protobuf.ProtocolMessageEnum { implements com.google.protobuf.ProtocolMessageEnum {
@ -341,7 +341,7 @@ public final class Payload {
this.value = value; this.value = value;
} }
// @@protoc_insertion_point(enum_scope:Message.type) // @@protoc_insertion_point(enum_scope:CommMsg.type)
} }
public static final int CLIENT_FIELD_NUMBER = 1; public static final int CLIENT_FIELD_NUMBER = 1;
@ -385,14 +385,14 @@ public final class Payload {
public static final int CMD_FIELD_NUMBER = 2; public static final int CMD_FIELD_NUMBER = 2;
private int cmd_; private int cmd_;
/** /**
* <code>.Message.type cmd = 2;</code> * <code>.CommMsg.type cmd = 2;</code>
* @return The enum numeric value on the wire for cmd. * @return The enum numeric value on the wire for cmd.
*/ */
@Override public int getCmdValue() { @Override public int getCmdValue() {
return cmd_; return cmd_;
} }
/** /**
* <code>.Message.type cmd = 2;</code> * <code>.CommMsg.type cmd = 2;</code>
* @return The cmd. * @return The cmd.
*/ */
@Override public type getCmd() { @Override public type getCmd() {
@ -614,11 +614,11 @@ public final class Payload {
return builder; return builder;
} }
/** /**
* Protobuf type {@code Message} * Protobuf type {@code CommMsg}
*/ */
public static final class Builder extends public static final class Builder extends
com.google.protobuf.GeneratedMessageV3.Builder<Builder> implements com.google.protobuf.GeneratedMessageV3.Builder<Builder> implements
// @@protoc_insertion_point(builder_implements:Message) // @@protoc_insertion_point(builder_implements:CommMsg)
MessageOrBuilder { MessageOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() { getDescriptor() {
@ -633,7 +633,7 @@ public final class Payload {
Message.class, Builder.class); Message.class, Builder.class);
} }
// Construct using com.venus.common.protobuf.Payload.Message.newBuilder() // Construct using com.venus.common.protobuf.Payload.CommMsg.newBuilder()
private Builder() { private Builder() {
maybeForceBuilderInitialization(); maybeForceBuilderInitialization();
} }
@ -854,14 +854,14 @@ public final class Payload {
private int cmd_ = 0; private int cmd_ = 0;
/** /**
* <code>.Message.type cmd = 2;</code> * <code>.CommMsg.type cmd = 2;</code>
* @return The enum numeric value on the wire for cmd. * @return The enum numeric value on the wire for cmd.
*/ */
@Override public int getCmdValue() { @Override public int getCmdValue() {
return cmd_; return cmd_;
} }
/** /**
* <code>.Message.type cmd = 2;</code> * <code>.CommMsg.type cmd = 2;</code>
* @param value The enum numeric value on the wire for cmd to set. * @param value The enum numeric value on the wire for cmd to set.
* @return This builder for chaining. * @return This builder for chaining.
*/ */
@ -872,7 +872,7 @@ public final class Payload {
return this; return this;
} }
/** /**
* <code>.Message.type cmd = 2;</code> * <code>.CommMsg.type cmd = 2;</code>
* @return The cmd. * @return The cmd.
*/ */
@Override @Override
@ -882,7 +882,7 @@ public final class Payload {
return result == null ? type.UNRECOGNIZED : result; return result == null ? type.UNRECOGNIZED : result;
} }
/** /**
* <code>.Message.type cmd = 2;</code> * <code>.CommMsg.type cmd = 2;</code>
* @param value The cmd to set. * @param value The cmd to set.
* @return This builder for chaining. * @return This builder for chaining.
*/ */
@ -896,7 +896,7 @@ public final class Payload {
return this; return this;
} }
/** /**
* <code>.Message.type cmd = 2;</code> * <code>.CommMsg.type cmd = 2;</code>
* @return This builder for chaining. * @return This builder for chaining.
*/ */
public Builder clearCmd() { public Builder clearCmd() {
@ -996,10 +996,10 @@ public final class Payload {
} }
// @@protoc_insertion_point(builder_scope:Message) // @@protoc_insertion_point(builder_scope:CommMsg)
} }
// @@protoc_insertion_point(class_scope:Message) // @@protoc_insertion_point(class_scope:CommMsg)
private static final Message DEFAULT_INSTANCE; private static final Message DEFAULT_INSTANCE;
static { static {
DEFAULT_INSTANCE = new Message(); DEFAULT_INSTANCE = new Message();
@ -1050,8 +1050,8 @@ public final class Payload {
descriptor; descriptor;
static { static {
String[] descriptorData = { String[] descriptorData = {
"\n\rMessage.proto\"\260\001\n\007Message\022\016\n\006client\030\001 " + "\n\rCommMsg.proto\"\260\001\n\007Message\022\016\n\006client\030\001 " +
"\001(\t\022\032\n\003cmd\030\002 \001(\0162\r.Message.type\022\017\n\007conte" + "\001(\t\022\032\n\003cmd\030\002 \001(\0162\r.CommMsg.type\022\017\n\007conte" +
"nt\030\003 \001(\t\"h\n\004type\022\025\n\021HEARTBEAT_REQUEST\020\000\022" + "nt\030\003 \001(\t\"h\n\004type\022\025\n\021HEARTBEAT_REQUEST\020\000\022" +
"\026\n\022HEARTBEAT_RESPONSE\020\001\022\010\n\004AUTH\020\002\022\t\n\005PRI" + "\026\n\022HEARTBEAT_RESPONSE\020\001\022\010\n\004AUTH\020\002\022\t\n\005PRI" +
"NT\020\003\022\016\n\nPRINT_DONE\020\004\022\014\n\010WHATEVER\020\005B$\n\031co" + "NT\020\003\022\016\n\nPRINT_DONE\020\004\022\014\n\010WHATEVER\020\005B$\n\031co" +

@ -44,6 +44,10 @@ public class ChannelRepository {
return CHANNEL_CACHE_MAP.get(key); return CHANNEL_CACHE_MAP.get(key);
} }
public Map<String, Channel> getAll() {
return CHANNEL_CACHE_MAP;
}
/** /**
* 线,线 * 线,线
* *

@ -1,14 +1,9 @@
package com.docus.server.common.netty.server; package com.docus.server.common.netty.server;
import com.docus.server.common.netty.Payload;
import com.docus.server.common.netty.server.handler.NettyBusinessHandler; import com.docus.server.common.netty.server.handler.NettyBusinessHandler;
import com.docus.server.common.netty.server.handler.NettyHeartbeatHandler; import com.docus.server.common.netty.server.handler.NettyHeartbeatHandler;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -40,12 +35,18 @@ public class NettyServerInitializer extends ChannelInitializer<Channel> {
TimeUnit.SECONDS) TimeUnit.SECONDS)
) )
// 加载加码解码处理器,同时解决粘包拆包问题 // 加载加码解码处理器,同时解决粘包拆包问题
.addLast(new ProtobufVarint32FrameDecoder()) // .addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufDecoder(Payload.Message.getDefaultInstance())) // .addLast(new ProtobufDecoder(Payload.CommMsg.getDefaultInstance()))
.addLast(new ProtobufVarint32LengthFieldPrepender()) // .addLast(new ProtobufVarint32LengthFieldPrepender())
.addLast(new ProtobufEncoder()) // .addLast(new ProtobufEncoder())
// .addLast(new LineBasedFrameDecoder(2048))
// .addLast(new StringDecoder())
// .addLast(new StringEncoder())
// 加载业务处理器 // 加载业务处理器
.addLast(new NettyHeartbeatHandler(channelRepository)) .addLast(new NettyHeartbeatHandler(channelRepository))
.addLast(businessHandler); .addLast(businessHandler);
// .addLast(new EchoServerHandler());
} }
} }

@ -0,0 +1,77 @@
package com.docus.server.common.netty.server.handler;
import com.docus.core.util.json.JSON;
import com.docus.server.common.netty.CommMsg;
import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.Date;
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
//客户端连接上来,没有心跳前,属于离线状态,有心跳后,才属于在线状态。
//客户端断开后自动剔除agent。
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
Channel incoming = ctx.channel();
System.out.println("NettyClient:" + incoming.remoteAddress() + "在线1");
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
Channel incoming = ctx.channel();
System.out.println("NettyClient:" + incoming.remoteAddress() + "在线2");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
Channel incoming = ctx.channel();
System.out.println("NettyClient:" + incoming.remoteAddress() + "在线3");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
ByteBuf buf = (ByteBuf) msg;
//创建目标大小的数组
byte[] barray = new byte[buf.readableBytes()];
//把数据从bytebuf转移到byte[]
buf.getBytes(0, barray);
//将byte[]转成字符串用于打印
String str = new String(barray);
CommMsg<Test> commMsg = JSON.fromJSONWithGeneric(str, new TypeReference<CommMsg<Test>>() {
});
if (str.length() > 0) {
System.out.println(str);
System.out.println("收到消息回复一条消息给客户端");
System.out.println("client channelActive..");
ctx.writeAndFlush(Unpooled.copiedBuffer("服务器端发一条数据给客户端" + new Date().toString(), CharsetUtil.UTF_8)); // 必须有flush
System.out.flush();
} else {
System.out.println("不能读啊");
}
} finally {
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("server occur exception:" + cause.getMessage());
cause.printStackTrace();
ctx.close(); // 关闭发生异常的连接
}
}

@ -1,11 +1,19 @@
package com.docus.server.common.netty.server.handler; package com.docus.server.common.netty.server.handler;
import com.docus.core.util.StringUtils;
import com.docus.core.util.json.JSON;
import com.docus.server.common.netty.CommMsg;
import com.docus.server.common.netty.Payload; import com.docus.server.common.netty.Payload;
import com.docus.server.common.netty.server.ChannelRepository; import com.docus.server.common.netty.server.ChannelRepository;
import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -18,12 +26,13 @@ import java.net.InetSocketAddress;
@Slf4j @Slf4j
@ChannelHandler.Sharable @ChannelHandler.Sharable
@Component @Component
public class NettyBusinessHandler extends SimpleChannelInboundHandler<Payload.Message> { public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static final ChannelGroup GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Resource @Resource
private ChannelRepository repository; private ChannelRepository repository;
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Payload.Message message) { protected void channelRead0(ChannelHandlerContext channelHandlerContext, Payload.Message message) {
if (message.getCmd() == Payload.Message.type.AUTH) { if (message.getCmd() == Payload.Message.type.AUTH) {
log.info("接受到工控机<{}>的认证消息:{}", message.getClient(), message.getContent()); log.info("接受到工控机<{}>的认证消息:{}", message.getClient(), message.getContent());
@ -44,6 +53,64 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<Payload.Me
} }
} }
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
//创建目标大小的数组
byte[] barray = new byte[buf.readableBytes()];
//把数据从bytebuf转移到byte[]
buf.getBytes(0, barray);
//将byte[]转成字符串用于打印
String message = new String(barray);
//空消息不处理
if (!StringUtils.hasText(message)) {
return;
}
CommMsg<Test> commMsg = JSON.fromJSONWithGeneric(message, new TypeReference<CommMsg<Test>>() {
});
String messageType = commMsg.getMessageType();
// if (message.getCmd() == Payload.Message.type.AUTH) {
// log.info("接受到工控机<{}>的认证消息:{}", message.getClient(), message.getContent());
// InetSocketAddress ipSocket = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
// String clientIp = ipSocket.getAddress().getHostAddress();
// log.info("工控机:{},连接上线,IP地址信息:{}", message.getClient(), clientIp);
// Channel channel = repository.get(message.getClient());
// if (channel != null && channel.isOpen()) {
// channel.close();
// }
// repository.put(message.getClient(), channelHandlerContext.channel());
// }
// if (message.getCmd() == Payload.Message.type.PRINT_DONE) {
// log.info("接受到打印回馈指令,内容{}", message.getContent());
// }
// if (message.getCmd() == Payload.Message.type.WHATEVER) {
// log.info("收到测试消息:{}", message.getContent());
// }
//心跳
//注册
//业务消息
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
GROUP.add(channel);
System.out.println(channel.remoteAddress() + " 上线," + "在线数量:" + GROUP.size());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 获取到当前要断开连接的Channel
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + "下线," + "在线数量:" + GROUP.size());
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace(); cause.printStackTrace();
@ -53,4 +120,5 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<Payload.Me
} }
super.exceptionCaught(ctx, cause); super.exceptionCaught(ctx, cause);
} }
} }

@ -1,6 +1,5 @@
package com.docus.server.common.netty.server.handler; package com.docus.server.common.netty.server.handler;
import com.docus.server.common.netty.Payload;
import com.docus.server.common.netty.server.ChannelRepository; import com.docus.server.common.netty.server.ChannelRepository;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -19,16 +18,18 @@ public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Payload.Message message = (Payload.Message) msg; // Payload.Message message = (Payload.Message) msg;
if (message.getCmd().equals(Payload.Message.type.HEARTBEAT_REQUEST)) { // if (message.getCmd().equals(Payload.Message.type.HEARTBEAT_REQUEST)) {
log.info("接收到客户端的心跳"); log.info("接收到客户端的心跳");
} else { // } else {
if (ctx.channel().isOpen()) { if (ctx.channel().isOpen()) {
//触发下一个handler //触发下一个handler
ctx.fireChannelRead(msg); ctx.fireChannelRead(msg);
}
} }
// }
ReferenceCountUtil.release(msg); ReferenceCountUtil.release(msg);
//
} }
private ChannelRepository repository; private ChannelRepository repository;

@ -0,0 +1,32 @@
package com.docus.server.common.netty.server.handler;
import java.io.Serializable;
public class Test implements Serializable {
private int age;
private String name;
public Test() {
}
public Test(int age, String name) {
this.age = age;
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
Loading…
Cancel
Save