处理netty 下发任务粘包分包问题

segment2.0
linrf 2 years ago
parent 12bb788009
commit fe21f972ae

@ -36,20 +36,7 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<CommMsg<Se
@Override @Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, CommMsg commMsg) throws Exception { protected void channelRead0(ChannelHandlerContext channelHandlerContext, CommMsg commMsg) throws Exception {
// ByteBuf buf = (ByteBuf) msg;
// //创建目标大小的数组
// byte[] barray = new byte[buf.readableBytes()];
// //把数据从bytebuf转移到byte[]
// buf.getBytes(0, barray);
// //将byte[]转成字符串用于打印
// String message = new String(barray);
//
// //空消息不处理
// if (!StringUtils.hasText(message)) {
// return;
// }
//
// CommMsg commMsg = JSON.fromJSON(message, CommMsg.class);
String messageType = commMsg.getMessageType(); String messageType = commMsg.getMessageType();
String messageTime = commMsg.getMessageTime(); String messageTime = commMsg.getMessageTime();

@ -24,19 +24,7 @@ public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// ByteBuf buf = (ByteBuf) msg;
// //创建目标大小的数组
// byte[] barray = new byte[buf.readableBytes()];
// //把数据从bytebuf转移到byte[]
// buf.getBytes(0, barray);
// //将byte[]转成字符串用于打印
// String message = new String(barray);
// //空消息不处理
// if (!StringUtils.hasText(message)) {
// return;
// }
// CommMsg commMsg = JSON.fromJSON((String) message, CommMsg.class);
CommMsg commMsg = (CommMsg) msg; CommMsg commMsg = (CommMsg) msg;
String messageType = commMsg.getMessageType(); String messageType = commMsg.getMessageType();

@ -31,20 +31,6 @@ public class ClientHandler extends SimpleChannelInboundHandler<CommMsg<Serializa
@Override @Override
protected void channelRead0(ChannelHandlerContext context, CommMsg commMsg) throws Exception { protected void channelRead0(ChannelHandlerContext context, CommMsg commMsg) throws Exception {
// ByteBuf buf = (ByteBuf) msg;
// //创建目标大小的数组
// byte[] barray = new byte[buf.readableBytes()];
// //把数据从bytebuf转移到byte[]
// buf.getBytes(0, barray);
// //将byte[]转成字符串用于打印
// String message = new String(barray);
//
// //空消息不处理
// if (!StringUtils.hasText(message)) {
// return;
// }
// CommMsg commMsg = JSON.fromJSON(message, CommMsg.class);
String messageType = commMsg.getMessageType(); String messageType = commMsg.getMessageType();
String messageTime = commMsg.getMessageTime(); String messageTime = commMsg.getMessageTime();
@ -98,9 +84,18 @@ public class ClientHandler extends SimpleChannelInboundHandler<CommMsg<Serializa
if (isExistProcess) { if (isExistProcess) {
collectorChannelCacheMap.writeAndFlush(collectorId, commMsg); collectorChannelCacheMap.writeAndFlush(collectorId, commMsg);
} else { } else {
//不存在进程,则启动指定采集器进程 //不存在进程,则启动指定采集器进程,等待启动完成下发任务
StartUpExeUtils.startUpExeOnly(saveCollectorPackagePath + collectorId + "\\collector\\" + processName); StartUpExeUtils.startUpExeOnly(saveCollectorPackagePath + collectorId + "\\collector\\" + processName);
while (true) {
Channel channel = collectorChannelCacheMap.get(collectorId);
log.info("等待采集器={}启动中", collectorId);
if (null != channel && channel.isOpen()) {
log.info("等待采集器={}启动成功并注册到终端", collectorId);
collectorChannelCacheMap.writeAndFlush(collectorId, commMsg); collectorChannelCacheMap.writeAndFlush(collectorId, commMsg);
break;
}
}
} }
} }
} }

@ -43,7 +43,6 @@ public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
.content(appName + " 我来了") .content(appName + " 我来了")
.build(); .build();
// ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(onlineRegister), CharsetUtil.UTF_8));
ctx.writeAndFlush(onlineRegister); ctx.writeAndFlush(onlineRegister);
super.channelActive(ctx); super.channelActive(ctx);
} }
@ -62,7 +61,6 @@ public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
.build(); .build();
//发送心跳消息,并在发送失败时关闭该接连 //发送心跳消息,并在发送失败时关闭该接连
// ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(heartbeat), CharsetUtil.UTF_8)).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
ctx.writeAndFlush(heartbeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); ctx.writeAndFlush(heartbeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} }
} else { } else {

@ -9,7 +9,6 @@ import com.docus.server.common.MsgConstants;
import com.docus.server.common.netty.server.CollectorChannelCacheMap; import com.docus.server.common.netty.server.CollectorChannelCacheMap;
import com.docus.server.common.netty.state.DeviceStateContext; import com.docus.server.common.netty.state.DeviceStateContext;
import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO; import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -17,7 +16,6 @@ import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -45,20 +43,6 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<CommMsg<Se
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, CommMsg commMsg) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, CommMsg commMsg) throws Exception {
DeviceStateContext deviceStateContext = collectorChannelCacheMap.getAttributeValue(ctx); DeviceStateContext deviceStateContext = collectorChannelCacheMap.getAttributeValue(ctx);
// ByteBuf buf = (ByteBuf) msg;
// //创建目标大小的数组
// byte[] barray = new byte[buf.readableBytes()];
// //把数据从bytebuf转移到byte[]
// buf.getBytes(0, barray);
// //将byte[]转成字符串用于打印
// String message = new String(barray);
//
// //空消息不处理
// if (!StringUtils.hasText(message)) {
// return;
// }
// CommMsg commMsg = JSON.fromJSON(message, CommMsg.class);
String messageType = commMsg.getMessageType(); String messageType = commMsg.getMessageType();
String messageTime = commMsg.getMessageTime(); String messageTime = commMsg.getMessageTime();
@ -68,12 +52,12 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<CommMsg<Se
//===========login ok,切换到已登录状态===============messageContent=collectorId========= //===========login ok,切换到已登录状态===============messageContent=collectorId=========
deviceStateContext.onLoginSucc(messageContent, System.currentTimeMillis(), 10, "采集器认证通过"); deviceStateContext.onLoginSucc(messageContent, System.currentTimeMillis(), 10, "采集器认证通过");
CommMsg<Serializable> login_ok = CommMsg.builder() CommMsg<Serializable> authMsg = CommMsg.builder()
.messageTime(DateUtil.formatDateTime(new Date())) .messageTime(DateUtil.formatDateTime(new Date()))
.content("login ok") .content("采集器认证通过")
.messageType("response") .messageType("response")
.build(); .build();
ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(login_ok), CharsetUtil.UTF_8)); ctx.writeAndFlush(authMsg);
Channel channel = collectorChannelCacheMap.get(messageContent); Channel channel = collectorChannelCacheMap.get(messageContent);
@ -97,7 +81,7 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<CommMsg<Se
.content("HEARTBEAT_REQUEST ok") .content("HEARTBEAT_REQUEST ok")
.messageType("response HEARTBEAT_REQUEST") .messageType("response HEARTBEAT_REQUEST")
.build(); .build();
ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(resp), CharsetUtil.UTF_8)); ctx.writeAndFlush(resp);
log.info("接受到【终端服务端-接收采集器消息】客户端的心跳消息:消息类型={},消息时间={},消息内容={}", messageType, messageTime, messageContent); log.info("接受到【终端服务端-接收采集器消息】客户端的心跳消息:消息类型={},消息时间={},消息内容={}", messageType, messageTime, messageContent);
@ -130,7 +114,6 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<CommMsg<Se
log.info("【终端服务端-接收采集器消息】收到采集器上报的任务状态,内容{}", messageContent); log.info("【终端服务端-接收采集器消息】收到采集器上报的任务状态,内容{}", messageContent);
EditSchCollectRecordDTO editSchCollectRecordDTO = JSON.fromJSON(JSON.toJSON(messageContent), EditSchCollectRecordDTO.class); EditSchCollectRecordDTO editSchCollectRecordDTO = JSON.fromJSON(JSON.toJSON(messageContent), EditSchCollectRecordDTO.class);
schCollectRecordApi.edit(editSchCollectRecordDTO); schCollectRecordApi.edit(editSchCollectRecordDTO);
System.out.println(1);
} }

Loading…
Cancel
Save