|
|
|
@ -2,9 +2,10 @@ package com.docus.server.common.netty.server.handler;
|
|
|
|
|
|
|
|
|
|
import com.docus.core.util.StringUtils;
|
|
|
|
|
import com.docus.core.util.json.JSON;
|
|
|
|
|
import com.docus.server.common.MsgConstants;
|
|
|
|
|
import com.docus.server.common.netty.CommMsg;
|
|
|
|
|
import com.docus.server.common.netty.Payload;
|
|
|
|
|
import com.docus.server.common.netty.server.ChannelRepository;
|
|
|
|
|
import com.docus.server.enums.OnlineStateEnum;
|
|
|
|
|
import com.fasterxml.jackson.core.type.TypeReference;
|
|
|
|
|
import io.netty.buffer.ByteBuf;
|
|
|
|
|
import io.netty.channel.Channel;
|
|
|
|
@ -28,33 +29,13 @@ import java.net.InetSocketAddress;
|
|
|
|
|
@Component
|
|
|
|
|
public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
|
|
|
|
|
|
|
|
|
private static final ChannelGroup GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
|
|
|
|
|
private static final ChannelGroup DEFAULT_CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
|
|
|
|
|
|
|
|
|
|
@Resource
|
|
|
|
|
private ChannelRepository repository;
|
|
|
|
|
|
|
|
|
|
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Payload.Message message) {
|
|
|
|
|
if (message.getCmd() == Payload.Message.type.AUTH) {
|
|
|
|
|
log.info("接受到工控机<{}>的认证消息:{}", message.getClient(), message.getContent());
|
|
|
|
|
InetSocketAddress ipSocket = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
|
|
|
|
|
String clientIp = ipSocket.getAddress().getHostAddress();
|
|
|
|
|
log.info("工控机:{},连接上线,IP地址信息:{}", message.getClient(), clientIp);
|
|
|
|
|
Channel channel = repository.get(message.getClient());
|
|
|
|
|
if (channel != null && channel.isOpen()) {
|
|
|
|
|
channel.close();
|
|
|
|
|
}
|
|
|
|
|
repository.put(message.getClient(), channelHandlerContext.channel());
|
|
|
|
|
}
|
|
|
|
|
if (message.getCmd() == Payload.Message.type.PRINT_DONE) {
|
|
|
|
|
log.info("接受到打印回馈指令,内容{}", message.getContent());
|
|
|
|
|
}
|
|
|
|
|
if (message.getCmd() == Payload.Message.type.WHATEVER) {
|
|
|
|
|
log.info("收到测试消息:{}", message.getContent());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
|
|
|
|
|
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf msg) throws Exception {
|
|
|
|
|
ByteBuf buf = (ByteBuf) msg;
|
|
|
|
|
//创建目标大小的数组
|
|
|
|
|
byte[] barray = new byte[buf.readableBytes()];
|
|
|
|
@ -68,47 +49,76 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
CommMsg<Test> commMsg = JSON.fromJSONWithGeneric(message, new TypeReference<CommMsg<Test>>() {
|
|
|
|
|
CommMsg<TerminatorContent> commMsg = JSON.fromJSONWithGeneric(message, new TypeReference<CommMsg<TerminatorContent>>() {
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
String messageType = commMsg.getMessageType();
|
|
|
|
|
String messageTime = commMsg.getMessageTime();
|
|
|
|
|
TerminatorContent messageContent = commMsg.getContent();
|
|
|
|
|
|
|
|
|
|
// if (message.getCmd() == Payload.Message.type.AUTH) {
|
|
|
|
|
// log.info("接受到工控机<{}>的认证消息:{}", message.getClient(), message.getContent());
|
|
|
|
|
// if (messageType.equals(MsgConstants.ONLINE_REGISTER)) {
|
|
|
|
|
// log.info("接受到【采集器-终端】的上线注册消息:消息类型={},消息时间={},消息内容={}", messageType, messageTime, messageContent);
|
|
|
|
|
// InetSocketAddress ipSocket = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
|
|
|
|
|
// String clientIp = ipSocket.getAddress().getHostAddress();
|
|
|
|
|
// log.info("工控机:{},连接上线,IP地址信息:{}", message.getClient(), clientIp);
|
|
|
|
|
// Channel channel = repository.get(message.getClient());
|
|
|
|
|
// log.info("【采集器-终端IP】:{},连接上线,IP地址信息:{}", clientIp, clientIp);
|
|
|
|
|
// Channel channel = repository.get(clientIp);
|
|
|
|
|
// if (channel != null && channel.isOpen()) {
|
|
|
|
|
// channel.close();
|
|
|
|
|
// }
|
|
|
|
|
// repository.put(message.getClient(), channelHandlerContext.channel());
|
|
|
|
|
// }
|
|
|
|
|
// if (message.getCmd() == Payload.Message.type.PRINT_DONE) {
|
|
|
|
|
// log.info("接受到打印回馈指令,内容{}", message.getContent());
|
|
|
|
|
// }
|
|
|
|
|
// if (message.getCmd() == Payload.Message.type.WHATEVER) {
|
|
|
|
|
// log.info("收到测试消息:{}", message.getContent());
|
|
|
|
|
//
|
|
|
|
|
// /*
|
|
|
|
|
// * 将ip和channel进行映射
|
|
|
|
|
// */
|
|
|
|
|
// repository.put(clientIp, channelHandlerContext.channel(), OnlineStateEnum.OFFLINE);
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
//心跳
|
|
|
|
|
//注册
|
|
|
|
|
//业务消息
|
|
|
|
|
if (messageType.equals(MsgConstants.TERMINATOR_RESTART)) {
|
|
|
|
|
log.info("接受到终端重启命令,内容{}", messageContent);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (messageType.equals(MsgConstants.COLLECTOR_RESTART)) {
|
|
|
|
|
log.info("收到采集器重启命令,内容{}", messageContent);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (messageType.equals(MsgConstants.VIRTUAL_RESTART)) {
|
|
|
|
|
log.info("收到虚拟机重启命令,内容{}", messageContent);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_FILE)) {
|
|
|
|
|
log.info("收到更新采集器文件命令,内容{}", messageContent);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_CONFIG)) {
|
|
|
|
|
log.info("收到更新采集器配置命令,内容{}", messageContent);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (messageType.equals(MsgConstants.SCH_DISTRIBUTE_TASKS)) {
|
|
|
|
|
log.info("收到采集调度器下发任务命令,内容{}", messageContent);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
|
|
|
|
Channel channel = ctx.channel();
|
|
|
|
|
ctx.fireChannelRegistered();
|
|
|
|
|
|
|
|
|
|
GROUP.add(channel);
|
|
|
|
|
System.out.println(channel.remoteAddress() + " 上线," + "在线数量:" + GROUP.size());
|
|
|
|
|
InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
|
|
|
|
|
String clientIp = ipSocket.getAddress().getHostAddress();
|
|
|
|
|
log.info("【采集器-终端IP】:{},连接上线,IP地址信息:{}", clientIp, clientIp);
|
|
|
|
|
Channel channel = repository.get(clientIp);
|
|
|
|
|
if (channel != null && channel.isOpen()) {
|
|
|
|
|
channel.close();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
repository.put(clientIp, ctx.channel(), OnlineStateEnum.OFFLINE);
|
|
|
|
|
|
|
|
|
|
DEFAULT_CHANNEL_GROUP.add(ctx.channel());
|
|
|
|
|
System.out.println(ctx.channel().remoteAddress() + " 上线," + "在线数量:" + DEFAULT_CHANNEL_GROUP.size());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
|
|
|
|
// 获取到当前要断开连接的Channel
|
|
|
|
|
Channel channel = ctx.channel();
|
|
|
|
|
System.out.println(channel.remoteAddress() + "下线," + "在线数量:" + GROUP.size());
|
|
|
|
|
System.out.println(channel.remoteAddress() + " 下线," + "在线数量:" + DEFAULT_CHANNEL_GROUP.size());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -118,6 +128,7 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
|
|
|
|
if (channel.isActive()) {
|
|
|
|
|
ctx.close();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
super.exceptionCaught(ctx, cause);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|