diff --git a/collector-scheduling-management/pom.xml b/collector-scheduling-management/pom.xml
index a35b222..c1e79e0 100644
--- a/collector-scheduling-management/pom.xml
+++ b/collector-scheduling-management/pom.xml
@@ -6,7 +6,7 @@
1.0-SNAPSHOT
4.0.0
- collector-scheduling-management
+ collector-scheduling-management-linrf
Archetype - collector-scheduling-management
http://maven.apache.org
diff --git a/collector-scheduling-management/src/main/java/com/docus/server/CodeGenerator.java b/collector-scheduling-management/src/main/java/com/docus/server/CodeGenerator.java
index cf79a1d..bbe9875 100644
--- a/collector-scheduling-management/src/main/java/com/docus/server/CodeGenerator.java
+++ b/collector-scheduling-management/src/main/java/com/docus/server/CodeGenerator.java
@@ -1,33 +1,33 @@
-//package com.docus.server;
-//
-//import com.baomidou.mybatisplus.generator.AutoGenerator;
-//import com.docus.infrastructure.generator.BaseCodeGenerator;
-//
-//import java.util.HashMap;
-//import java.util.Map;
-//
-//public class CodeGenerator {
-// public static void main(String[] args) {
-// //字段名和枚举名的映射,不区分表名
-// Map enumColumnMap = new HashMap<>();
-// enumColumnMap.put("PROFILE_ITЕM", "AdminProfileItemEnum");
-//
-// enumColumnMap.put("ORGANIZATION_TYPE", "OrganizationTypeEnum ");
-// enumColumnMap.put("OPERATE_STATUS", "OperateStatusEnum");
-// enumColumnMap.put("STATE", "StateEnum");
-// enumColumnMap.put("PRIVILEGE_LEVEL", "PrivilegeLevelEnum");
-// enumColumnMap.put("FLAG", "FlagEnum");
-// enumColumnMap.put("collect_type", "CollectTypeEnum");
-// BaseCodeGenerator.setEnumColumnMap(enumColumnMap);
-// BaseCodeGenerator.setModuleName("collector-scheduling-management");//多个module,需要指定modulename
-// //指定entity生成到独立module里,并生成 api interface
-// BaseCodeGenerator.setClientInterfaceModuleName("docus-client-interface");
-// BaseCodeGenerator.setClientInterfaceSubFolder("scheduling.management");//文件夹
-// BaseCodeGenerator.setClientInterfaceBasePackage("com.docus.server");
-// AutoGenerator defaultConfig = BaseCodeGenerator.getDefaultConfig();
-// defaultConfig.getGlobalConfig().setSwagger2(true);
-// defaultConfig.getStrategy().setEntityLombokModel(true);
-// defaultConfig.getStrategy().setInclude("sch_virtual_log", "sch_collect_error_log", "sch_collect_record", "sch_collect_record_retry_log", "sch_collector", "sch_collector_config", "sch_collector_version", "sch_collector_version_file", "sch_collector_version_log", "sch_operation_log", "sch_system_params", "sch_terminator");//需要生成的表,可指定多个,留空为全部生成
-// BaseCodeGenerator.generate(defaultConfig);
-// }
-//}
+package com.docus.server;
+
+import com.baomidou.mybatisplus.generator.AutoGenerator;
+import com.docus.infrastructure.generator.BaseCodeGenerator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class CodeGenerator {
+ public static void main(String[] args) {
+ //字段名和枚举名的映射,不区分表名
+ Map enumColumnMap = new HashMap<>();
+ enumColumnMap.put("PROFILE_ITЕM", "AdminProfileItemEnum");
+
+ enumColumnMap.put("ORGANIZATION_TYPE", "OrganizationTypeEnum ");
+ enumColumnMap.put("OPERATE_STATUS", "OperateStatusEnum");
+ enumColumnMap.put("STATE", "StateEnum");
+ enumColumnMap.put("PRIVILEGE_LEVEL", "PrivilegeLevelEnum");
+ enumColumnMap.put("FLAG", "FlagEnum");
+ enumColumnMap.put("collect_type", "CollectTypeEnum");
+ BaseCodeGenerator.setEnumColumnMap(enumColumnMap);
+ BaseCodeGenerator.setModuleName("collector-scheduling-management");//多个module,需要指定modulename
+ //指定entity生成到独立module里,并生成 api interface
+ BaseCodeGenerator.setClientInterfaceModuleName("docus-client-interface");
+ BaseCodeGenerator.setClientInterfaceSubFolder("scheduling.management");//文件夹
+ BaseCodeGenerator.setClientInterfaceBasePackage("com.docus.server");
+ AutoGenerator defaultConfig = BaseCodeGenerator.getDefaultConfig();
+ defaultConfig.getGlobalConfig().setSwagger2(true);
+ defaultConfig.getStrategy().setEntityLombokModel(true);
+ defaultConfig.getStrategy().setInclude("sch_collector_copy1");//需要生成的表,可指定多个,留空为全部生成
+ BaseCodeGenerator.generate(defaultConfig);
+ }
+}
diff --git a/collector-scheduling-management/src/main/java/com/docus/server/AppRunBootstrap.java b/collector-scheduling-management/src/main/java/com/docus/server/ScheduleBootstrap.java
similarity index 83%
rename from collector-scheduling-management/src/main/java/com/docus/server/AppRunBootstrap.java
rename to collector-scheduling-management/src/main/java/com/docus/server/ScheduleBootstrap.java
index 1f665fd..47bbea7 100644
--- a/collector-scheduling-management/src/main/java/com/docus/server/AppRunBootstrap.java
+++ b/collector-scheduling-management/src/main/java/com/docus/server/ScheduleBootstrap.java
@@ -5,14 +5,12 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.scheduling.annotation.EnableAsync;
-import javax.annotation.PreDestroy;
-
@EnableAsync
@EnableFeignClients(basePackages = {"com.docus.core.excel.feign", "com.docus.server.api.taskdistribute"})
@SpringBootApplication(scanBasePackages = {"com.docus"})
-public class AppRunBootstrap {
+public class ScheduleBootstrap {
public static void main(String[] args) {
System.setProperty("javax.xml.parsers.DocumentBuilderFactory", "com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl");
- SpringApplication.run(AppRunBootstrap.class, args);
+ SpringApplication.run(ScheduleBootstrap.class, args);
}
}
diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java
index 9770f10..3d253b0 100644
--- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java
+++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java
@@ -9,7 +9,6 @@ import com.docus.server.common.netty.server.ChannelRepository;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.enums.BusyStateEnum;
import com.docus.server.enums.OnlineStateEnum;
-import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
@@ -22,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
+import java.io.Serializable;
import java.net.InetSocketAddress;
/**
@@ -52,12 +52,11 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler {
return;
}
- CommMsg commMsg = JSON.fromJSONWithGeneric(message, new TypeReference>() {
- });
+ CommMsg commMsg = JSON.fromJSON(message, CommMsg.class);
String messageType = commMsg.getMessageType();
String messageTime = commMsg.getMessageTime();
- TerminatorContent messageContent = commMsg.getContent();
+ Serializable messageContent = commMsg.getContent();
if (messageType.equals(MsgConstants.TERMINATOR_RESTART)) {
log.info("接受到终端重启命令,内容{}", messageContent);
diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java
index 6bac1c7..9d308c8 100644
--- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java
+++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java
@@ -9,13 +9,13 @@ import com.docus.server.common.netty.server.ChannelRepository;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.enums.BusyStateEnum;
import com.docus.server.enums.OnlineStateEnum;
-import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
+import java.io.Serializable;
import java.net.InetSocketAddress;
/**
@@ -39,19 +39,18 @@ public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter {
return;
}
- CommMsg commMsg = JSON.fromJSONWithGeneric(message, new TypeReference>() {
- });
+ CommMsg commMsg = JSON.fromJSON(message, CommMsg.class);
String messageType = commMsg.getMessageType();
String messageTime = commMsg.getMessageTime();
- TerminatorContent messageContent = commMsg.getContent();
+ Serializable content = commMsg.getContent();
if (messageType.equals(MsgConstants.HEARTBEAT_REQUEST)) {
log.info("接收到客户端的心跳");
- log.info("接受到【采集器-终端】的心跳消息:消息类型={},消息时间={},消息内容={}", messageType, messageTime, messageContent);
+ log.info("接受到【采集器-终端】的心跳消息:消息类型={},消息时间={},消息内容={}", messageType, messageTime, content);
InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = ipSocket.getAddress().getHostAddress();
diff --git a/collector-scheduling-management/src/main/resources/application.properties b/collector-scheduling-management/src/main/resources/application.properties
index 63a543c..bde7d71 100644
--- a/collector-scheduling-management/src/main/resources/application.properties
+++ b/collector-scheduling-management/src/main/resources/application.properties
@@ -3,7 +3,7 @@ api.base-package=com.docus.server
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://db.docus.cn:3306/docus-collector-scheduling?autoReconnect=true&allowMultiQueries=true&useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
-spring.datasource.username=root
-spring.datasource.password=root
+spring.datasource.username=docus
+spring.datasource.password=docus702
mybatis-plus.type-enums-package=com.docus.server.enums
diff --git a/collector-terminal-management/WinSW.exe b/collector-terminal-management/WinSW.exe
new file mode 100644
index 0000000..6806bb4
Binary files /dev/null and b/collector-terminal-management/WinSW.exe differ
diff --git a/collector-terminal-management/assembly.xml b/collector-terminal-management/assembly.xml
new file mode 100644
index 0000000..cbd39ff
--- /dev/null
+++ b/collector-terminal-management/assembly.xml
@@ -0,0 +1,73 @@
+
+
+ exe
+
+ dir
+
+ false
+
+
+
+
+ /lib
+ ${basedir}/target/lib
+
+
+
+ /config
+ ${basedir}/target/resources
+ 0755
+
+ *.xml
+ *.yml
+ *.properties
+
+
+
+
+ /dataConfig
+ ${basedir}/target/dataConfig
+ 0755
+
+ *.json
+
+
+
+
+ /
+ ${basedir}/target/resources/bin
+ 0755
+
+ *.bat
+
+
+
+
+ /
+ ${basedir}/target/resources/bin
+ 0755
+
+ *.xml
+
+
+
+
+ /
+ ${basedir}
+ 0755
+
+ *.exe
+
+
+
+
+ ${basedir}/target
+ /
+ 0755
+
+ ${project.build.finalName}.jar
+
+
+
+
\ No newline at end of file
diff --git a/collector-terminal-management/pom.xml b/collector-terminal-management/pom.xml
new file mode 100644
index 0000000..d2224c7
--- /dev/null
+++ b/collector-terminal-management/pom.xml
@@ -0,0 +1,185 @@
+
+
+ docus-collector-server
+ com.docus
+ 1.0-SNAPSHOT
+
+ 4.0.0
+ collector-terminal-management[0]
+ Archetype - collector-scheduling-management
+ http://maven.apache.org
+
+
+
+ com.docus
+ docus-api-common
+ 1.0-SNAPSHOT
+ compile
+
+
+
+ org.freemarker
+ freemarker
+ 2.3.30
+
+
+
+ cn.smallbun.screw
+ screw-core
+ 1.0.3
+
+
+
+
+
+
+ src/main/resources
+ true
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ 2.4.4
+
+ ZIP
+
+
+ non-exists
+ non-exists
+
+
+
+
+
+
+ repackage
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+
+ copy-dependencies
+ package
+
+ copy-dependencies
+
+
+
+ target/lib
+ false
+ false
+ runtime
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-resources-plugin
+ 3.2.0
+
+
+ copy-resources
+ package
+
+ copy-resources
+
+
+
+
+ src/main/resources
+
+ **/*.*
+
+
+
+ ${project.build.directory}/resources
+
+
+
+ copy-bin
+ package
+
+ copy-resources
+
+
+
+
+ src/main/resources
+ true
+
+ bin/*.xml
+ bin/*.bat
+ *.yml
+ *.ftl
+
+
+
+ ${project.build.directory}/resources
+
+
+
+ copy-data-config
+ package
+
+ copy-resources
+
+
+
+
+ ../../dataConfig
+ true
+
+
+ ${project.build.directory}/dataConfig
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+ 3.2.0
+
+
+ **/*.yml
+
+
+
+
+
+ maven-assembly-plugin
+
+
+
+ ${project.artifactId}
+ false
+
+ assembly.xml
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+
+
+
+
+
diff --git a/collector-terminal-management/src/main/java/com/docus/server/CodeGenerator.java b/collector-terminal-management/src/main/java/com/docus/server/CodeGenerator.java
new file mode 100644
index 0000000..bbe9875
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/CodeGenerator.java
@@ -0,0 +1,33 @@
+package com.docus.server;
+
+import com.baomidou.mybatisplus.generator.AutoGenerator;
+import com.docus.infrastructure.generator.BaseCodeGenerator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class CodeGenerator {
+ public static void main(String[] args) {
+ //字段名和枚举名的映射,不区分表名
+ Map enumColumnMap = new HashMap<>();
+ enumColumnMap.put("PROFILE_ITЕM", "AdminProfileItemEnum");
+
+ enumColumnMap.put("ORGANIZATION_TYPE", "OrganizationTypeEnum ");
+ enumColumnMap.put("OPERATE_STATUS", "OperateStatusEnum");
+ enumColumnMap.put("STATE", "StateEnum");
+ enumColumnMap.put("PRIVILEGE_LEVEL", "PrivilegeLevelEnum");
+ enumColumnMap.put("FLAG", "FlagEnum");
+ enumColumnMap.put("collect_type", "CollectTypeEnum");
+ BaseCodeGenerator.setEnumColumnMap(enumColumnMap);
+ BaseCodeGenerator.setModuleName("collector-scheduling-management");//多个module,需要指定modulename
+ //指定entity生成到独立module里,并生成 api interface
+ BaseCodeGenerator.setClientInterfaceModuleName("docus-client-interface");
+ BaseCodeGenerator.setClientInterfaceSubFolder("scheduling.management");//文件夹
+ BaseCodeGenerator.setClientInterfaceBasePackage("com.docus.server");
+ AutoGenerator defaultConfig = BaseCodeGenerator.getDefaultConfig();
+ defaultConfig.getGlobalConfig().setSwagger2(true);
+ defaultConfig.getStrategy().setEntityLombokModel(true);
+ defaultConfig.getStrategy().setInclude("sch_collector_copy1");//需要生成的表,可指定多个,留空为全部生成
+ BaseCodeGenerator.generate(defaultConfig);
+ }
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/DefaultWebConfig.java b/collector-terminal-management/src/main/java/com/docus/server/DefaultWebConfig.java
new file mode 100644
index 0000000..d9ec30b
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/DefaultWebConfig.java
@@ -0,0 +1,17 @@
+package com.docus.server;
+
+import com.docus.infrastructure.WebConfig;
+import com.docus.infrastructure.web.json.JsonSerializerModule;
+import com.docus.server.common.serializer.DefJsonSerializerModule;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class DefaultWebConfig extends WebConfig {
+
+ @Override
+ public JsonSerializerModule jsonSerializerModu1e() {
+ super.jsonSerializerModu1e();
+ return new DefJsonSerializerModule();
+ }
+
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/TerminalBootstrap.java b/collector-terminal-management/src/main/java/com/docus/server/TerminalBootstrap.java
new file mode 100644
index 0000000..93f3750
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/TerminalBootstrap.java
@@ -0,0 +1,16 @@
+package com.docus.server;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.cloud.openfeign.EnableFeignClients;
+import org.springframework.scheduling.annotation.EnableAsync;
+
+@EnableAsync
+@EnableFeignClients(basePackages = {"com.docus.core.excel.feign", "com.docus.server.api.taskdistribute"})
+@SpringBootApplication(scanBasePackages = {"com.docus"})
+public class TerminalBootstrap {
+ public static void main(String[] args) {
+ System.setProperty("javax.xml.parsers.DocumentBuilderFactory", "com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl");
+ SpringApplication.run(TerminalBootstrap.class, args);
+ }
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/MsgConstants.java b/collector-terminal-management/src/main/java/com/docus/server/common/MsgConstants.java
new file mode 100644
index 0000000..019e9d9
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/MsgConstants.java
@@ -0,0 +1,55 @@
+package com.docus.server.common;
+
+public class MsgConstants {
+
+ /**
+ * 客户端与采集调度器心跳
+ */
+ public static final String HEARTBEAT_REQUEST = "0".trim();
+
+ /**
+ * 客户端上线注册
+ */
+ public static final String ONLINE_REGISTER = "1".trim();
+
+ /**
+ * 客户端下线移除
+ */
+ public static final String OFFLINE_REMOVE = "2".trim();
+
+ /**
+ * 客户端异常注册
+ */
+ public static final String EXCEPTION_REMOVE = "3".trim();
+
+ /**
+ * 终端重启命令
+ */
+ public static final String TERMINATOR_RESTART = "4".trim();
+
+ /**
+ * 采集器重启命令
+ */
+ public static final String COLLECTOR_RESTART = "5".trim();
+
+ /**
+ * 虚拟机重启命令
+ */
+ public static final String VIRTUAL_RESTART = "6".trim();
+
+ /**
+ * 更新采集器文件命令
+ */
+ public static final String UPDATE_COLLECTOR_FILE = "7".trim();
+
+ /**
+ * 更新采集器配置命令
+ */
+ public static final String UPDATE_COLLECTOR_CONFIG = "8".trim();
+
+ /**
+ * 采集调度器下发任务命令
+ */
+ public static final String SCH_DISTRIBUTE_TASKS = "9".trim();
+
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/RedisCollectorTaskKeyExpirationListener.java b/collector-terminal-management/src/main/java/com/docus/server/common/RedisCollectorTaskKeyExpirationListener.java
new file mode 100644
index 0000000..ab0057e
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/RedisCollectorTaskKeyExpirationListener.java
@@ -0,0 +1,30 @@
+package com.docus.server.common;
+
+import com.docus.infrastructure.redis.listener.RedisKeyExpirationListener;
+import com.docus.server.service.impl.RedisKeyExpirationService;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+
+@Component
+public class RedisCollectorTaskKeyExpirationListener extends RedisKeyExpirationListener {
+
+ @Resource
+ private RedisKeyExpirationService redisKeyExpirationService;
+
+ public RedisCollectorTaskKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
+ super(listenerContainer);
+ }
+
+ @Override
+ protected void processExpireKey(String expireKey) {
+ redisKeyExpirationService.expired(expireKey);
+ }
+
+ @Override
+ protected boolean validExpireKey(String expireKey) {
+ return expireKey.startsWith("schCollectorRecord:noRetryTask:expireKey:")
+ || expireKey.startsWith("schCollectorRecord:isRetryTask:expireKey:");
+ }
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/SchCollectorTask.java b/collector-terminal-management/src/main/java/com/docus/server/common/SchCollectorTask.java
new file mode 100644
index 0000000..287085c
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/SchCollectorTask.java
@@ -0,0 +1,439 @@
+package com.docus.server.common;
+
+import com.docus.core.util.DateUtil;
+import com.docus.core.util.Func;
+import com.docus.core.util.json.JSON;
+import com.docus.infrastructure.redis.service.RedisOps;
+import com.docus.server.api.taskdistribute.TaskDistributeApi;
+import com.docus.server.common.netty.CommMsg;
+import com.docus.server.common.netty.server.ChannelRepository;
+import com.docus.server.dto.scheduling.management.schcollector.task.ReportDownTwoDTO;
+import com.docus.server.dto.scheduling.management.schcollector.task.ReportTaskTwoDTO;
+import com.docus.server.dto.scheduling.management.schcollector.task.SchCollectorTaskDTO;
+import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
+import com.docus.server.entity.scheduling.management.SchCollectRecord;
+import com.docus.server.entity.scheduling.management.SchCollectRecordRetryLog;
+import com.docus.server.entity.scheduling.management.SchCollector;
+import com.docus.server.enums.BusyStateEnum;
+import com.docus.server.enums.RetryTaskEnum;
+import com.docus.server.service.ISchCollectRecordRetryLogService;
+import com.docus.server.service.ISchCollectRecordService;
+import com.docus.server.service.ISchCollectorService;
+import com.docus.server.service.ISchTerminatorService;
+import com.google.common.collect.Lists;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.util.CharsetUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import javax.annotation.Resource;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+@Component
+@Slf4j
+public class SchCollectorTask {
+ @Resource
+ private ChannelRepository channelRepository;
+ @Resource
+ private TaskDistributeApi taskDistributeApi;
+ @Resource
+ private ISchCollectRecordService iSchCollectRecordService;
+ @Resource
+ private ISchCollectRecordRetryLogService iSchCollectRecordRetryLogService;
+ @Resource
+ private ISchTerminatorService iSchTerminatorService;
+ @Resource
+ private ISchCollectorService iSchCollectorService;
+ @Resource
+ private RedisOps redisOps;
+
+ private BlockingQueue retryTaskQueue = new LinkedBlockingQueue<>();
+
+ //定时任务
+ // 5 * * * * ? 在每分钟的5秒执行
+ @Scheduled(cron = "0/1 * * * * ?")
+ public void runTask() {
+ try {
+ log.info("定时任务: 开始执行");
+
+ //在线并且空闲的终端
+ List terminalList = channelRepository.getAvailTerminator();
+
+ //没有可用的通道
+ if (Func.isEmpty(terminalList)) {
+ return;
+ }
+
+ //:todo 任务平台需要修改发布任务策略
+ List reportDownTwoDTOList = getTask(terminalList.size());
+
+ if (Func.isEmpty(reportDownTwoDTOList) || Func.isBlank(reportDownTwoDTOList.get(0).getPatientId())) {
+ return;
+ }
+
+ //只采集,有优先级的
+ for (NettyTerminatorDTO terminal : terminalList) {
+ for (ReportDownTwoDTO report : reportDownTwoDTOList) {
+ //先找出有只采集的任务。
+ ReportTaskTwoDTO reportTaskTwoDto = report.getTasks().get(0);
+ if (!CollectionUtils.isEmpty(terminal.getOnlyCollectorIds()) && terminal.getOnlyCollectorIds().contains(reportTaskTwoDto.getCollectorId())
+ && BusyStateEnum.IDLE.equals(terminal.getBusyState())) {
+ //把这个任务派给这个终端,并且把这个终端设置成繁忙
+ if (!CollectionUtils.isEmpty(terminal.getPriorityCollectorIds()) && terminal.getPriorityCollectorIds().contains(reportTaskTwoDto.getCollectorId())) {
+ //把这个任务派给这个终端
+ terminal.setBusyState(BusyStateEnum.BUSY);
+ iSchTerminatorService.saveOrUpdate(terminal.getTerminatorIp(), terminal);
+
+ tcpToClient(terminal, report);
+ }
+ return;
+ }
+ }
+ }
+
+ //只采集没有优先级
+ for (NettyTerminatorDTO terminal : terminalList) {
+ //把刚才已经分配任务过的采集器排除
+ if (BusyStateEnum.BUSY.equals(terminal.getBusyState())) {
+ continue;
+ }
+
+ for (ReportDownTwoDTO report : reportDownTwoDTOList) {
+ //先找出有只采集的任务。
+ ReportTaskTwoDTO reportTaskTwoDto = report.getTasks().get(0);
+ if (!CollectionUtils.isEmpty(terminal.getOnlyCollectorIds()) && terminal.getOnlyCollectorIds().contains(reportTaskTwoDto.getCollectorId())
+ && BusyStateEnum.IDLE.equals(terminal.getBusyState())) {
+ //把这个任务派给这个终端,并且把这个终端设置成繁忙
+ terminal.setBusyState(BusyStateEnum.BUSY);
+ iSchTerminatorService.saveOrUpdate(terminal.getTerminatorIp(), terminal);
+
+ tcpToClient(terminal, report);
+ return;
+ }
+ }
+ }
+
+
+ //无只采集,有优先级
+ for (NettyTerminatorDTO terminal : terminalList) {
+
+ //把刚才已经分配任务过的采集器排除
+ if (BusyStateEnum.BUSY.equals(terminal.getBusyState())) {
+ continue;
+ }
+
+ for (ReportDownTwoDTO report : reportDownTwoDTOList) {
+ //先找出有只采集的任务。
+ ReportTaskTwoDTO reportTaskTwoDto = report.getTasks().get(0);
+ if (!CollectionUtils.isEmpty(terminal.getPriorityCollectorIds()) && terminal.getPriorityCollectorIds().contains(reportTaskTwoDto.getCollectorId())
+ && BusyStateEnum.IDLE.equals(terminal.getBusyState())) {
+ //把这个任务派给这个终端
+ terminal.setBusyState(BusyStateEnum.BUSY);
+ iSchTerminatorService.saveOrUpdate(terminal.getTerminatorIp(), terminal);
+
+ tcpToClient(terminal, report);
+ return;
+ }
+ }
+ }
+
+
+ //无只采集,无优先级
+ for (NettyTerminatorDTO terminal : terminalList) {
+
+ //把刚才已经分配任务过的采集器排除
+ if (BusyStateEnum.BUSY.equals(terminal.getBusyState())) {
+ continue;
+ }
+
+ for (ReportDownTwoDTO report : reportDownTwoDTOList) {
+ //先找出有只采集的任务。
+ //把这个任务派给这个终端
+ ReportTaskTwoDTO reportTaskTwoDto = report.getTasks().get(0);
+ terminal.setBusyState(BusyStateEnum.BUSY);
+ iSchTerminatorService.saveOrUpdate(terminal.getTerminatorIp(), terminal);
+
+ tcpToClient(terminal, report);
+ return;
+ }
+ }
+
+ //只采集,空闲的。
+ //获取只采集的任务,并且进行分配。
+ for (NettyTerminatorDTO terminal : terminalList) {
+ //把刚才已经分配任务过的采集器排除
+ if (BusyStateEnum.BUSY.equals(terminal.getBusyState())) {
+ continue;
+ }
+
+ List onlyTaskInfos = getOnlyTaskInfos(terminal.getOnlyCollectorIds());
+
+
+ if (CollectionUtils.isEmpty(onlyTaskInfos) || Func.isBlank(onlyTaskInfos.get(0).getPatientId())) {
+ return;
+ }
+
+ for (ReportDownTwoDTO report : onlyTaskInfos) {
+ //将这条任务分配这个这个终端
+ //下发
+ terminal.setBusyState(BusyStateEnum.BUSY);
+ iSchTerminatorService.saveOrUpdate(terminal.getTerminatorIp(), terminal);
+
+ tcpToClient(terminal, report);
+
+ return;
+ }
+ }
+ log.info("定时任务: 执行完毕");
+ } catch (
+ Exception e) {
+ log.error("定时任务执行出错", e);
+ }
+
+ }
+
+ private void tcpToClient(NettyTerminatorDTO terminal, ReportDownTwoDTO reportDownTwoDTO) {
+ Channel channel = channelRepository.get(terminal.getTerminatorIp());
+
+ Map params = reportDownTwoDTO.getParams();
+
+ SchCollectorTaskDTO messageContent = new SchCollectorTaskDTO();
+
+ if (CollectionUtils.isEmpty(params)) {
+ //不是重试任务
+ SchCollectRecord schCollectRecord = iSchCollectRecordService.saveOrUpdateRecord(terminal, reportDownTwoDTO);
+
+ messageContent.setCollectorRecordId(schCollectRecord.getId());
+ messageContent.setIsRetry(String.valueOf(RetryTaskEnum.NO_RETRY_TASK.getValue()));
+ messageContent.setTaskInfo(reportDownTwoDTO);
+ } else {
+ //重试任务
+ SchCollectRecordRetryLog schCollectRecordRetryLog = iSchCollectRecordRetryLogService.saveOrUpdateRecordRetryLog(terminal, reportDownTwoDTO);
+
+ messageContent.setCollectorRecordId(schCollectRecordRetryLog.getId());
+ messageContent.setIsRetry(String.valueOf(RetryTaskEnum.RETRY_TASK.getValue()));
+ messageContent.setTaskInfo(reportDownTwoDTO);
+ }
+
+ CommMsg commMsg = CommMsg.builder()
+ .messageType(MsgConstants.SCH_DISTRIBUTE_TASKS)
+ .messageTime(DateUtil.formatDateTime(new Date()))
+ .content(JSON.toJSON(messageContent))
+ .build();
+
+ //tcp 下发任务到终端
+ if (channel != null) {
+ channel.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(commMsg), CharsetUtil.UTF_8));
+ }
+
+ String isRetry = messageContent.getIsRetry();
+ Long collectorRecordId = messageContent.getCollectorRecordId();
+ String collectorId = messageContent.getTaskInfo().getTasks().get(0).getCollectorId();
+ SchCollector schCollector = iSchCollectorService.findByCollectorId(collectorId);
+
+ if (RetryTaskEnum.NO_RETRY_TASK.equals(isRetry)) {
+ redisOps.setEx(String.format("schCollectorRecord:noRetryTask:expireKey:%s", collectorRecordId), String.valueOf(collectorRecordId), schCollector.getTaskTimeout());
+ } else {
+ redisOps.setEx(String.format("schCollectorRecord:isRetryTask:expireKey:%s", collectorRecordId), String.valueOf(collectorRecordId), schCollector.getTaskTimeout());
+ }
+
+
+ }
+
+
+ public void addRetryTask(ReportDownTwoDTO reportDownTwoDTO) {
+ this.retryTaskQueue.add(reportDownTwoDTO);
+ }
+
+
+ //根据有效终端,一次获取一批任务,例如10个终端,获取10个不同类型任务
+ public List getTask(int size) throws InterruptedException {
+// return taskDistributeApi.getTask(String.valueOf(size));
+
+ //mock
+
+ String collectorId1 = "{\n" +
+ " \"createTime\": \"2022-12-03 12:39:30\",\n" +
+ " \"hospitals\": [\n" +
+ " {\n" +
+ " \"admissDate\": \"2023-12-31 01:01:01\",\n" +
+ " \"admissId\": \"amid_999901\",\n" +
+ " \"admissTimes\": 1,\n" +
+ " \"disDate\": \"2023-12-31 01:01:01\",\n" +
+ " \"disDeptName\": \"22222\",\n" +
+ " \"patientId\": \"758878610105573376\"\n" +
+ " }\n" +
+ " ],\n" +
+ " \"jzh\": \"jzh_999901\",\n" +
+ " \"patient\": {\n" +
+ " \"inpatientNo\": \"999901\",\n" +
+ " \"name\": \"ceshi\",\n" +
+ " \"patientId\": \"758878610105573376\"\n" +
+ " },\n" +
+ " \"patientId\": \"758878610105573376\",\n" +
+ " \"recordType\": \"1\",\n" +
+ " \"tasks\": [\n" +
+ " {\n" +
+ " \"collectorId\": \"1\",\n" +
+ " \"patientId\": \"758878610105573376\",\n" +
+ " \"taskId\": 834292710565826560\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ ReportDownTwoDTO reportDownTwoDTO1 = JSON.fromJSON(collectorId1, ReportDownTwoDTO.class);
+
+
+ String collectorId2 = "{\n" +
+ " \"createTime\": \"2022-12-03 12:39:30\",\n" +
+ " \"hospitals\": [\n" +
+ " {\n" +
+ " \"admissDate\": \"2023-12-31 01:01:01\",\n" +
+ " \"admissId\": \"amid_999901\",\n" +
+ " \"admissTimes\": 1,\n" +
+ " \"disDate\": \"2023-12-31 01:01:01\",\n" +
+ " \"disDeptName\": \"22222\",\n" +
+ " \"patientId\": \"758878610105573376\"\n" +
+ " }\n" +
+ " ],\n" +
+ " \"jzh\": \"jzh_999901\",\n" +
+ " \"patient\": {\n" +
+ " \"inpatientNo\": \"999901\",\n" +
+ " \"name\": \"ceshi\",\n" +
+ " \"patientId\": \"758878610105573376\"\n" +
+ " },\n" +
+ " \"patientId\": \"758878610105573376\",\n" +
+ " \"recordType\": \"1\",\n" +
+ " \"tasks\": [\n" +
+ " {\n" +
+ " \"collectorId\": \"2\",\n" +
+ " \"patientId\": \"758878610105573376\",\n" +
+ " \"taskId\": 834292712465846272\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ ReportDownTwoDTO reportDownTwoDTO2 = JSON.fromJSON(collectorId2, ReportDownTwoDTO.class);
+
+
+ String collectorId3 = "{\n" +
+ " \"createTime\": \"2023-01-09 19:26:11\",\n" +
+ " \"hospitals\": [\n" +
+ " {\n" +
+ " \"admissDate\": \"2023-12-31 01:01:01\",\n" +
+ " \"admissId\": \"amid_297974\",\n" +
+ " \"admissTimes\": 21,\n" +
+ " \"disDate\": \"2023-12-31 01:01:01\",\n" +
+ " \"disDeptName\": \"普外二科(甲乳胸烧伤整形)\",\n" +
+ " \"patientId\": \"772389719349678080\"\n" +
+ " }\n" +
+ " ],\n" +
+ " \"jzh\": \"jzh_297974\",\n" +
+ " \"patient\": {\n" +
+ " \"inpatientNo\": \"297974\",\n" +
+ " \"name\": \"曾美英\",\n" +
+ " \"patientId\": \"772389719349678080\"\n" +
+ " },\n" +
+ " \"patientId\": \"772389719349678080\",\n" +
+ " \"recordType\": \"1\",\n" +
+ " \"tasks\": [\n" +
+ " {\n" +
+ " \"collectorId\": \"3\",\n" +
+ " \"patientId\": \"772389719349678080\",\n" +
+ " \"taskId\": 838201379426750464\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ ReportDownTwoDTO reportDownTwoDTO3 = JSON.fromJSON(collectorId3, ReportDownTwoDTO.class);
+
+// List allTaskList = Lists.newArrayList(reportDownTwoDTO1, reportDownTwoDTO2, reportDownTwoDTO3);
+ List allTaskList = new ArrayList<>();
+
+ if (!CollectionUtils.isEmpty(this.retryTaskQueue)) {
+ ReportDownTwoDTO retryTask = (ReportDownTwoDTO) this.retryTaskQueue.take();
+ //重试任务
+ allTaskList.add(retryTask);
+ }
+
+ return allTaskList;
+
+ }
+
+ //根据采集器id类型,一次获取一批采集器类型任务
+ private List getOnlyTaskInfos(List collectorIds) {
+// return taskDistributeApi.getTask(collectorIds.get(0));
+
+ //mock
+
+
+ String collectorId2 = "{\n" +
+ " \"createTime\": \"2022-12-03 12:39:30\",\n" +
+ " \"hospitals\": [\n" +
+ " {\n" +
+ " \"admissDate\": \"2023-12-31 01:01:01\",\n" +
+ " \"admissId\": \"amid_999901\",\n" +
+ " \"admissTimes\": 1,\n" +
+ " \"disDate\": \"2023-12-31 01:01:01\",\n" +
+ " \"disDeptName\": \"22222\",\n" +
+ " \"patientId\": \"758878610105573376\"\n" +
+ " }\n" +
+ " ],\n" +
+ " \"jzh\": \"jzh_999901\",\n" +
+ " \"patient\": {\n" +
+ " \"inpatientNo\": \"999901\",\n" +
+ " \"name\": \"ceshi\",\n" +
+ " \"patientId\": \"758878610105573376\"\n" +
+ " },\n" +
+ " \"patientId\": \"758878610105573376\",\n" +
+ " \"recordType\": \"1\",\n" +
+ " \"tasks\": [\n" +
+ " {\n" +
+ " \"collectorId\": \"2\",\n" +
+ " \"patientId\": \"758878610105573376\",\n" +
+ " \"taskId\": 834292712465846272\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ ReportDownTwoDTO reportDownTwoDTO2 = JSON.fromJSON(collectorId2, ReportDownTwoDTO.class);
+
+
+ String collectorId3 = "{\n" +
+ " \"createTime\": \"2022-12-03 12:39:30\",\n" +
+ " \"hospitals\": [\n" +
+ " {\n" +
+ " \"admissDate\": \"2023-12-31 01:01:01\",\n" +
+ " \"admissId\": \"amid_999901\",\n" +
+ " \"admissTimes\": 1,\n" +
+ " \"disDate\": \"2023-12-31 01:01:01\",\n" +
+ " \"disDeptName\": \"22222\",\n" +
+ " \"patientId\": \"758878610105573376\"\n" +
+ " }\n" +
+ " ],\n" +
+ " \"jzh\": \"jzh_999901\",\n" +
+ " \"patient\": {\n" +
+ " \"inpatientNo\": \"999901\",\n" +
+ " \"name\": \"ceshi\",\n" +
+ " \"patientId\": \"758878610105573376\"\n" +
+ " },\n" +
+ " \"patientId\": \"758878610105573376\",\n" +
+ " \"recordType\": \"1\",\n" +
+ " \"tasks\": [\n" +
+ " {\n" +
+ " \"collectorId\": \"3\",\n" +
+ " \"patientId\": \"758878610105573376\",\n" +
+ " \"taskId\": 834292883635392512\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ ReportDownTwoDTO reportDownTwoDTO3 = JSON.fromJSON(collectorId3, ReportDownTwoDTO.class);
+
+ return Lists.newArrayList(reportDownTwoDTO2, reportDownTwoDTO3);
+ }
+}
+
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/TerminatorListener.java b/collector-terminal-management/src/main/java/com/docus/server/common/TerminatorListener.java
new file mode 100644
index 0000000..14dda2f
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/TerminatorListener.java
@@ -0,0 +1,31 @@
+package com.docus.server.common;
+
+import com.docus.server.entity.scheduling.management.SchTerminator;
+import com.docus.server.enums.BusyStateEnum;
+import com.docus.server.enums.OnlineStateEnum;
+import com.docus.server.service.ISchTerminatorService;
+import org.springframework.context.ApplicationListener;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.List;
+
+/**
+ * 重启置空
+ */
+@Component
+public class TerminatorListener implements ApplicationListener {
+ @Resource
+ private ISchTerminatorService iSchTerminatorService;
+
+ @Override
+ public void onApplicationEvent(ContextRefreshedEvent event) {
+ List terminators = iSchTerminatorService.findAll();
+ terminators.forEach(p -> {
+ p.setBusyState(BusyStateEnum.IDLE);
+ p.setOnlineState(OnlineStateEnum.OFFLINE);
+ });
+ iSchTerminatorService.batchUpdate(terminators);
+ }
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/CommMsg.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/CommMsg.java
new file mode 100644
index 0000000..ef7c42a
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/CommMsg.java
@@ -0,0 +1,22 @@
+package com.docus.server.common.netty;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@Data
+public class CommMsg implements Serializable {
+
+ public String messageType;
+
+ public String messageTime;
+
+ public MSG_CONTENT content;
+
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/Payload.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/Payload.java
new file mode 100644
index 0000000..603e6c1
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/Payload.java
@@ -0,0 +1,1073 @@
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: CommMsg.proto
+
+package com.docus.server.common.netty;
+
+public final class Payload {
+ private Payload() {}
+ public static void registerAllExtensions(
+ com.google.protobuf.ExtensionRegistryLite registry) {
+ }
+
+ public static void registerAllExtensions(
+ com.google.protobuf.ExtensionRegistry registry) {
+ registerAllExtensions(
+ (com.google.protobuf.ExtensionRegistryLite) registry);
+ }
+ public interface MessageOrBuilder extends
+ // @@protoc_insertion_point(interface_extends:CommMsg)
+ com.google.protobuf.MessageOrBuilder {
+
+ /**
+ * string client = 1;
+ * @return The client.
+ */
+ String getClient();
+ /**
+ * string client = 1;
+ * @return The bytes for client.
+ */
+ com.google.protobuf.ByteString
+ getClientBytes();
+
+ /**
+ * .CommMsg.type cmd = 2;
+ * @return The enum numeric value on the wire for cmd.
+ */
+ int getCmdValue();
+ /**
+ * .CommMsg.type cmd = 2;
+ * @return The cmd.
+ */
+ Message.type getCmd();
+
+ /**
+ * string content = 3;
+ * @return The content.
+ */
+ String getContent();
+ /**
+ * string content = 3;
+ * @return The bytes for content.
+ */
+ com.google.protobuf.ByteString
+ getContentBytes();
+ }
+ /**
+ * Protobuf type {@code CommMsg}
+ */
+ public static final class Message extends
+ com.google.protobuf.GeneratedMessageV3 implements
+ // @@protoc_insertion_point(message_implements:CommMsg)
+ MessageOrBuilder {
+ private static final long serialVersionUID = 0L;
+ // Use CommMsg.newBuilder() to construct.
+ private Message(com.google.protobuf.GeneratedMessageV3.Builder> builder) {
+ super(builder);
+ }
+ private Message() {
+ client_ = "";
+ cmd_ = 0;
+ content_ = "";
+ }
+
+ @Override
+ @SuppressWarnings({"unused"})
+ protected Object newInstance(
+ UnusedPrivateParameter unused) {
+ return new Message();
+ }
+
+ @Override
+ public final com.google.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private Message(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ this();
+ if (extensionRegistry == null) {
+ throw new NullPointerException();
+ }
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ case 10: {
+ String s = input.readStringRequireUtf8();
+
+ client_ = s;
+ break;
+ }
+ case 16: {
+ int rawValue = input.readEnum();
+
+ cmd_ = rawValue;
+ break;
+ }
+ case 26: {
+ String s = input.readStringRequireUtf8();
+
+ content_ = s;
+ break;
+ }
+ default: {
+ if (!parseUnknownField(
+ input, unknownFields, extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ }
+ }
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new com.google.protobuf.InvalidProtocolBufferException(
+ e).setUnfinishedMessage(this);
+ } finally {
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return Payload.internal_static_Message_descriptor;
+ }
+
+ @Override
+ protected FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return Payload.internal_static_Message_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ Message.class, Builder.class);
+ }
+
+ /**
+ * Protobuf enum {@code CommMsg.type}
+ */
+ public enum type
+ implements com.google.protobuf.ProtocolMessageEnum {
+ /**
+ *
+ *客户端心跳消息
+ *
+ *
+ * HEARTBEAT_REQUEST = 0;
+ */
+ HEARTBEAT_REQUEST(0),
+ /**
+ *
+ *服务端心跳消息
+ *
+ *
+ * HEARTBEAT_RESPONSE = 1;
+ */
+ HEARTBEAT_RESPONSE(1),
+ /**
+ *
+ * 认证
+ *
+ *
+ * AUTH = 2;
+ */
+ AUTH(2),
+ /**
+ *
+ *请求开始打印命令
+ *
+ *
+ * PRINT = 3;
+ */
+ PRINT(3),
+ /**
+ *
+ * 打印完成反馈
+ *
+ *
+ * PRINT_DONE = 4;
+ */
+ PRINT_DONE(4),
+ /**
+ *
+ * 随便是什么
+ *
+ *
+ * WHATEVER = 5;
+ */
+ WHATEVER(5),
+ UNRECOGNIZED(-1),
+ ;
+
+ /**
+ *
+ *客户端心跳消息
+ *
+ *
+ * HEARTBEAT_REQUEST = 0;
+ */
+ public static final int HEARTBEAT_REQUEST_VALUE = 0;
+ /**
+ *
+ *服务端心跳消息
+ *
+ *
+ * HEARTBEAT_RESPONSE = 1;
+ */
+ public static final int HEARTBEAT_RESPONSE_VALUE = 1;
+ /**
+ *
+ * 认证
+ *
+ *
+ * AUTH = 2;
+ */
+ public static final int AUTH_VALUE = 2;
+ /**
+ *
+ *请求开始打印命令
+ *
+ *
+ * PRINT = 3;
+ */
+ public static final int PRINT_VALUE = 3;
+ /**
+ *
+ * 打印完成反馈
+ *
+ *
+ * PRINT_DONE = 4;
+ */
+ public static final int PRINT_DONE_VALUE = 4;
+ /**
+ *
+ * 随便是什么
+ *
+ *
+ * WHATEVER = 5;
+ */
+ public static final int WHATEVER_VALUE = 5;
+
+
+ @Override
+ public final int getNumber() {
+ if (this == UNRECOGNIZED) {
+ throw new IllegalArgumentException(
+ "Can't get the number of an unknown enum value.");
+ }
+ return value;
+ }
+
+ /**
+ * @param value The numeric wire value of the corresponding enum entry.
+ * @return The enum associated with the given numeric wire value.
+ * @deprecated Use {@link #forNumber(int)} instead.
+ */
+ @Deprecated
+ public static type valueOf(int value) {
+ return forNumber(value);
+ }
+
+ /**
+ * @param value The numeric wire value of the corresponding enum entry.
+ * @return The enum associated with the given numeric wire value.
+ */
+ public static type forNumber(int value) {
+ switch (value) {
+ case 0: return HEARTBEAT_REQUEST;
+ case 1: return HEARTBEAT_RESPONSE;
+ case 2: return AUTH;
+ case 3: return PRINT;
+ case 4: return PRINT_DONE;
+ case 5: return WHATEVER;
+ default: return null;
+ }
+ }
+
+ public static com.google.protobuf.Internal.EnumLiteMap
+ internalGetValueMap() {
+ return internalValueMap;
+ }
+ private static final com.google.protobuf.Internal.EnumLiteMap<
+ type> internalValueMap =
+ new com.google.protobuf.Internal.EnumLiteMap() {
+ public type findValueByNumber(int number) {
+ return type.forNumber(number);
+ }
+ };
+
+ public final com.google.protobuf.Descriptors.EnumValueDescriptor
+ getValueDescriptor() {
+ if (this == UNRECOGNIZED) {
+ throw new IllegalStateException(
+ "Can't get the descriptor of an unrecognized enum value.");
+ }
+ return getDescriptor().getValues().get(ordinal());
+ }
+ public final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptorForType() {
+ return getDescriptor();
+ }
+ public static final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptor() {
+ return Message.getDescriptor().getEnumTypes().get(0);
+ }
+
+ private static final type[] VALUES = values();
+
+ public static type valueOf(
+ com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
+ if (desc.getType() != getDescriptor()) {
+ throw new IllegalArgumentException(
+ "EnumValueDescriptor is not for this type.");
+ }
+ if (desc.getIndex() == -1) {
+ return UNRECOGNIZED;
+ }
+ return VALUES[desc.getIndex()];
+ }
+
+ private final int value;
+
+ private type(int value) {
+ this.value = value;
+ }
+
+ // @@protoc_insertion_point(enum_scope:CommMsg.type)
+ }
+
+ public static final int CLIENT_FIELD_NUMBER = 1;
+ private volatile Object client_;
+ /**
+ * string client = 1;
+ * @return The client.
+ */
+ @Override
+ public String getClient() {
+ Object ref = client_;
+ if (ref instanceof String) {
+ return (String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ String s = bs.toStringUtf8();
+ client_ = s;
+ return s;
+ }
+ }
+ /**
+ * string client = 1;
+ * @return The bytes for client.
+ */
+ @Override
+ public com.google.protobuf.ByteString
+ getClientBytes() {
+ Object ref = client_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (String) ref);
+ client_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ public static final int CMD_FIELD_NUMBER = 2;
+ private int cmd_;
+ /**
+ * .CommMsg.type cmd = 2;
+ * @return The enum numeric value on the wire for cmd.
+ */
+ @Override public int getCmdValue() {
+ return cmd_;
+ }
+ /**
+ * .CommMsg.type cmd = 2;
+ * @return The cmd.
+ */
+ @Override public type getCmd() {
+ @SuppressWarnings("deprecation")
+ type result = type.valueOf(cmd_);
+ return result == null ? type.UNRECOGNIZED : result;
+ }
+
+ public static final int CONTENT_FIELD_NUMBER = 3;
+ private volatile Object content_;
+ /**
+ * string content = 3;
+ * @return The content.
+ */
+ @Override
+ public String getContent() {
+ Object ref = content_;
+ if (ref instanceof String) {
+ return (String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ String s = bs.toStringUtf8();
+ content_ = s;
+ return s;
+ }
+ }
+ /**
+ * string content = 3;
+ * @return The bytes for content.
+ */
+ @Override
+ public com.google.protobuf.ByteString
+ getContentBytes() {
+ Object ref = content_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (String) ref);
+ content_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ private byte memoizedIsInitialized = -1;
+ @Override
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized == 1) return true;
+ if (isInitialized == 0) return false;
+
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ @Override
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ if (!getClientBytes().isEmpty()) {
+ com.google.protobuf.GeneratedMessageV3.writeString(output, 1, client_);
+ }
+ if (cmd_ != type.HEARTBEAT_REQUEST.getNumber()) {
+ output.writeEnum(2, cmd_);
+ }
+ if (!getContentBytes().isEmpty()) {
+ com.google.protobuf.GeneratedMessageV3.writeString(output, 3, content_);
+ }
+ unknownFields.writeTo(output);
+ }
+
+ @Override
+ public int getSerializedSize() {
+ int size = memoizedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (!getClientBytes().isEmpty()) {
+ size += com.google.protobuf.GeneratedMessageV3.computeStringSize(1, client_);
+ }
+ if (cmd_ != type.HEARTBEAT_REQUEST.getNumber()) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeEnumSize(2, cmd_);
+ }
+ if (!getContentBytes().isEmpty()) {
+ size += com.google.protobuf.GeneratedMessageV3.computeStringSize(3, content_);
+ }
+ size += unknownFields.getSerializedSize();
+ memoizedSize = size;
+ return size;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof Message)) {
+ return super.equals(obj);
+ }
+ Message other = (Message) obj;
+
+ if (!getClient()
+ .equals(other.getClient())) return false;
+ if (cmd_ != other.cmd_) return false;
+ if (!getContent()
+ .equals(other.getContent())) return false;
+ if (!unknownFields.equals(other.unknownFields)) return false;
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ if (memoizedHashCode != 0) {
+ return memoizedHashCode;
+ }
+ int hash = 41;
+ hash = (19 * hash) + getDescriptor().hashCode();
+ hash = (37 * hash) + CLIENT_FIELD_NUMBER;
+ hash = (53 * hash) + getClient().hashCode();
+ hash = (37 * hash) + CMD_FIELD_NUMBER;
+ hash = (53 * hash) + cmd_;
+ hash = (37 * hash) + CONTENT_FIELD_NUMBER;
+ hash = (53 * hash) + getContent().hashCode();
+ hash = (29 * hash) + unknownFields.hashCode();
+ memoizedHashCode = hash;
+ return hash;
+ }
+
+ public static Message parseFrom(
+ java.nio.ByteBuffer data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static Message parseFrom(
+ java.nio.ByteBuffer data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static Message parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static Message parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static Message parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static Message parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static Message parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return com.google.protobuf.GeneratedMessageV3
+ .parseWithIOException(PARSER, input);
+ }
+ public static Message parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return com.google.protobuf.GeneratedMessageV3
+ .parseWithIOException(PARSER, input, extensionRegistry);
+ }
+ public static Message parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return com.google.protobuf.GeneratedMessageV3
+ .parseDelimitedWithIOException(PARSER, input);
+ }
+ public static Message parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return com.google.protobuf.GeneratedMessageV3
+ .parseDelimitedWithIOException(PARSER, input, extensionRegistry);
+ }
+ public static Message parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return com.google.protobuf.GeneratedMessageV3
+ .parseWithIOException(PARSER, input);
+ }
+ public static Message parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return com.google.protobuf.GeneratedMessageV3
+ .parseWithIOException(PARSER, input, extensionRegistry);
+ }
+
+ @Override
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder() {
+ return DEFAULT_INSTANCE.toBuilder();
+ }
+ public static Builder newBuilder(Message prototype) {
+ return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype);
+ }
+ @Override
+ public Builder toBuilder() {
+ return this == DEFAULT_INSTANCE
+ ? new Builder() : new Builder().mergeFrom(this);
+ }
+
+ @Override
+ protected Builder newBuilderForType(
+ BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code CommMsg}
+ */
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessageV3.Builder implements
+ // @@protoc_insertion_point(builder_implements:CommMsg)
+ MessageOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return Payload.internal_static_Message_descriptor;
+ }
+
+ @Override
+ protected FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return Payload.internal_static_Message_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ Message.class, Builder.class);
+ }
+
+ // Construct using com.venus.common.protobuf.Payload.CommMsg.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessageV3
+ .alwaysUseFieldBuilders) {
+ }
+ }
+ @Override
+ public Builder clear() {
+ super.clear();
+ client_ = "";
+
+ cmd_ = 0;
+
+ content_ = "";
+
+ return this;
+ }
+
+ @Override
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return Payload.internal_static_Message_descriptor;
+ }
+
+ @Override
+ public Message getDefaultInstanceForType() {
+ return Message.getDefaultInstance();
+ }
+
+ @Override
+ public Message build() {
+ Message result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ @Override
+ public Message buildPartial() {
+ Message result = new Message(this);
+ result.client_ = client_;
+ result.cmd_ = cmd_;
+ result.content_ = content_;
+ onBuilt();
+ return result;
+ }
+
+ @Override
+ public Builder clone() {
+ return super.clone();
+ }
+ @Override
+ public Builder setField(
+ com.google.protobuf.Descriptors.FieldDescriptor field,
+ Object value) {
+ return super.setField(field, value);
+ }
+ @Override
+ public Builder clearField(
+ com.google.protobuf.Descriptors.FieldDescriptor field) {
+ return super.clearField(field);
+ }
+ @Override
+ public Builder clearOneof(
+ com.google.protobuf.Descriptors.OneofDescriptor oneof) {
+ return super.clearOneof(oneof);
+ }
+ @Override
+ public Builder setRepeatedField(
+ com.google.protobuf.Descriptors.FieldDescriptor field,
+ int index, Object value) {
+ return super.setRepeatedField(field, index, value);
+ }
+ @Override
+ public Builder addRepeatedField(
+ com.google.protobuf.Descriptors.FieldDescriptor field,
+ Object value) {
+ return super.addRepeatedField(field, value);
+ }
+ @Override
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof Message) {
+ return mergeFrom((Message)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(Message other) {
+ if (other == Message.getDefaultInstance()) return this;
+ if (!other.getClient().isEmpty()) {
+ client_ = other.client_;
+ onChanged();
+ }
+ if (other.cmd_ != 0) {
+ setCmdValue(other.getCmdValue());
+ }
+ if (!other.getContent().isEmpty()) {
+ content_ = other.content_;
+ onChanged();
+ }
+ this.mergeUnknownFields(other.unknownFields);
+ onChanged();
+ return this;
+ }
+
+ @Override
+ public final boolean isInitialized() {
+ return true;
+ }
+
+ @Override
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ Message parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (Message) e.getUnfinishedMessage();
+ throw e.unwrapIOException();
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+
+ private Object client_ = "";
+ /**
+ * string client = 1;
+ * @return The client.
+ */
+ @Override
+ public String getClient() {
+ Object ref = client_;
+ if (!(ref instanceof String)) {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ String s = bs.toStringUtf8();
+ client_ = s;
+ return s;
+ } else {
+ return (String) ref;
+ }
+ }
+ /**
+ * string client = 1;
+ * @return The bytes for client.
+ */
+ @Override
+ public com.google.protobuf.ByteString
+ getClientBytes() {
+ Object ref = client_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (String) ref);
+ client_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * string client = 1;
+ * @param value The client to set.
+ * @return This builder for chaining.
+ */
+ public Builder setClient(
+ String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+
+ client_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * string client = 1;
+ * @return This builder for chaining.
+ */
+ public Builder clearClient() {
+
+ client_ = getDefaultInstance().getClient();
+ onChanged();
+ return this;
+ }
+ /**
+ * string client = 1;
+ * @param value The bytes for client to set.
+ * @return This builder for chaining.
+ */
+ public Builder setClientBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ checkByteStringIsUtf8(value);
+
+ client_ = value;
+ onChanged();
+ return this;
+ }
+
+ private int cmd_ = 0;
+ /**
+ * .CommMsg.type cmd = 2;
+ * @return The enum numeric value on the wire for cmd.
+ */
+ @Override public int getCmdValue() {
+ return cmd_;
+ }
+ /**
+ * .CommMsg.type cmd = 2;
+ * @param value The enum numeric value on the wire for cmd to set.
+ * @return This builder for chaining.
+ */
+ public Builder setCmdValue(int value) {
+
+ cmd_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * .CommMsg.type cmd = 2;
+ * @return The cmd.
+ */
+ @Override
+ public type getCmd() {
+ @SuppressWarnings("deprecation")
+ type result = type.valueOf(cmd_);
+ return result == null ? type.UNRECOGNIZED : result;
+ }
+ /**
+ * .CommMsg.type cmd = 2;
+ * @param value The cmd to set.
+ * @return This builder for chaining.
+ */
+ public Builder setCmd(type value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+
+ cmd_ = value.getNumber();
+ onChanged();
+ return this;
+ }
+ /**
+ * .CommMsg.type cmd = 2;
+ * @return This builder for chaining.
+ */
+ public Builder clearCmd() {
+
+ cmd_ = 0;
+ onChanged();
+ return this;
+ }
+
+ private Object content_ = "";
+ /**
+ * string content = 3;
+ * @return The content.
+ */
+ @Override
+ public String getContent() {
+ Object ref = content_;
+ if (!(ref instanceof String)) {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ String s = bs.toStringUtf8();
+ content_ = s;
+ return s;
+ } else {
+ return (String) ref;
+ }
+ }
+ /**
+ * string content = 3;
+ * @return The bytes for content.
+ */
+ @Override
+ public com.google.protobuf.ByteString
+ getContentBytes() {
+ Object ref = content_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (String) ref);
+ content_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * string content = 3;
+ * @param value The content to set.
+ * @return This builder for chaining.
+ */
+ public Builder setContent(
+ String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+
+ content_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * string content = 3;
+ * @return This builder for chaining.
+ */
+ public Builder clearContent() {
+
+ content_ = getDefaultInstance().getContent();
+ onChanged();
+ return this;
+ }
+ /**
+ * string content = 3;
+ * @param value The bytes for content to set.
+ * @return This builder for chaining.
+ */
+ public Builder setContentBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ checkByteStringIsUtf8(value);
+
+ content_ = value;
+ onChanged();
+ return this;
+ }
+ @Override
+ public final Builder setUnknownFields(
+ final com.google.protobuf.UnknownFieldSet unknownFields) {
+ return super.setUnknownFields(unknownFields);
+ }
+
+ @Override
+ public final Builder mergeUnknownFields(
+ final com.google.protobuf.UnknownFieldSet unknownFields) {
+ return super.mergeUnknownFields(unknownFields);
+ }
+
+
+ // @@protoc_insertion_point(builder_scope:CommMsg)
+ }
+
+ // @@protoc_insertion_point(class_scope:CommMsg)
+ private static final Message DEFAULT_INSTANCE;
+ static {
+ DEFAULT_INSTANCE = new Message();
+ }
+
+ public static Message getDefaultInstance() {
+ return DEFAULT_INSTANCE;
+ }
+
+ private static final com.google.protobuf.Parser
+ PARSER = new com.google.protobuf.AbstractParser() {
+ @Override
+ public Message parsePartialFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return new Message(input, extensionRegistry);
+ }
+ };
+
+ public static com.google.protobuf.Parser parser() {
+ return PARSER;
+ }
+
+ @Override
+ public com.google.protobuf.Parser getParserForType() {
+ return PARSER;
+ }
+
+ @Override
+ public Message getDefaultInstanceForType() {
+ return DEFAULT_INSTANCE;
+ }
+
+ }
+
+ private static final com.google.protobuf.Descriptors.Descriptor
+ internal_static_Message_descriptor;
+ private static final
+ com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+ internal_static_Message_fieldAccessorTable;
+
+ public static com.google.protobuf.Descriptors.FileDescriptor
+ getDescriptor() {
+ return descriptor;
+ }
+ private static com.google.protobuf.Descriptors.FileDescriptor
+ descriptor;
+ static {
+ String[] descriptorData = {
+ "\n\rCommMsg.proto\"\260\001\n\007Message\022\016\n\006client\030\001 " +
+ "\001(\t\022\032\n\003cmd\030\002 \001(\0162\r.CommMsg.type\022\017\n\007conte" +
+ "nt\030\003 \001(\t\"h\n\004type\022\025\n\021HEARTBEAT_REQUEST\020\000\022" +
+ "\026\n\022HEARTBEAT_RESPONSE\020\001\022\010\n\004AUTH\020\002\022\t\n\005PRI" +
+ "NT\020\003\022\016\n\nPRINT_DONE\020\004\022\014\n\010WHATEVER\020\005B$\n\031co" +
+ "m.venus.common.protobufB\007Payloadb\006proto3"
+ };
+ descriptor = com.google.protobuf.Descriptors.FileDescriptor
+ .internalBuildGeneratedFileFrom(descriptorData,
+ new com.google.protobuf.Descriptors.FileDescriptor[] {
+ });
+ internal_static_Message_descriptor =
+ getDescriptor().getMessageTypes().get(0);
+ internal_static_Message_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
+ internal_static_Message_descriptor,
+ new String[] { "Client", "Cmd", "Content", });
+ }
+
+ // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/NettyClient.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/NettyClient.java
new file mode 100644
index 0000000..93e1b7e
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/NettyClient.java
@@ -0,0 +1,71 @@
+package com.docus.server.common.netty.client;
+
+import com.docus.core.util.json.JSON;
+import com.docus.server.common.netty.CommMsg;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by 1-point at 2021/9/7
+ * Netty客户端
+ */
+@Slf4j
+@Component
+public class NettyClient {
+
+ @Resource
+ private NettyClientProperties nettyProperties;
+
+ @Value("${spring.application.name}")
+ private String appName;
+
+ private SocketChannel socketChannel;
+
+ /**
+ * 发送消息给服务端
+ *
+ * @param message 消息内容
+ */
+ public void sendMessage(CommMsg message) {
+ boolean success = socketChannel.writeAndFlush(JSON.toJSON(message)).isSuccess();
+ if (success) {
+ log.info("发送消息成功");
+ }
+ }
+
+ @PostConstruct
+ public void start() {
+ final EventLoopGroup group = new NioEventLoopGroup();
+ Bootstrap bootstrap = new Bootstrap();
+ bootstrap.group(group)
+ .channel(NioSocketChannel.class)
+ .remoteAddress(nettyProperties.getHost(), nettyProperties.getPort())
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .handler(new NettyClientInitializer(nettyProperties, this, appName));
+ ChannelFuture future = bootstrap.connect();
+ //客户端断线重连逻辑
+ future.addListener((ChannelFutureListener) status -> {
+ if (status.isSuccess()) {
+ log.info("连接Netty服务端成功");
+ } else {
+ log.warn("连接失败,进行断线重连");
+ status.channel().eventLoop().schedule(this::start, nettyProperties.getReconnectSeconds(), TimeUnit.SECONDS);
+ }
+ });
+ socketChannel = (SocketChannel) future.channel();
+ }
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/NettyClientInitializer.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/NettyClientInitializer.java
new file mode 100644
index 0000000..75d34ca
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/NettyClientInitializer.java
@@ -0,0 +1,41 @@
+package com.docus.server.common.netty.client;
+
+import com.docus.server.common.netty.client.handler.ClientHandler;
+import com.docus.server.common.netty.client.handler.HeartbeatHandler;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.handler.timeout.IdleStateHandler;
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+
+
+@AllArgsConstructor
+@NoArgsConstructor
+public class NettyClientInitializer extends ChannelInitializer {
+
+ private NettyClientProperties nettyProperties;
+
+ private NettyClient nettyClient;
+
+ private String appName;
+
+ @Override
+ protected void initChannel(Channel channel) throws Exception {
+ channel.pipeline()
+ // 加载空闲监听器
+ .addLast(new IdleStateHandler(nettyProperties.getReaderIdleTimeSeconds(),
+ nettyProperties.getWriterIdleTimeSeconds(), nettyProperties.getAllIdleTimeSeconds()))
+ // 加载加码解码处理器,同时解决粘包拆包问题
+// .addLast(new ProtobufVarint32FrameDecoder())
+// .addLast(new ProtobufDecoder(Payload.Message.getDefaultInstance()))
+// .addLast(new ProtobufVarint32LengthFieldPrepender())
+// .addLast(new ProtobufEncoder())
+ // 加载心跳处理器
+ .addLast(new HeartbeatHandler(nettyClient, nettyProperties, appName))
+ // 加载业务处理器
+ .addLast(new ClientHandler())
+ .addLast();
+ }
+
+
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/NettyClientProperties.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/NettyClientProperties.java
new file mode 100644
index 0000000..92e0001
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/NettyClientProperties.java
@@ -0,0 +1,32 @@
+package com.docus.server.common.netty.client;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+@Data
+@Component
+@ConfigurationProperties(prefix = NettyClientProperties.PREFIX)
+public class NettyClientProperties {
+
+ public static final String PREFIX = "netty.client";
+
+ // 读空闲等待时间
+ private int readerIdleTimeSeconds = 0;
+
+ // 写空闲等待时间
+ private int writerIdleTimeSeconds = 10;
+
+ // 读写空闲等待时间
+ private int allIdleTimeSeconds = 0;
+
+ // 服务主机
+ private String host;
+
+ // 服务端口
+ private Integer port;
+
+ // 重连时间/秒
+ private int reconnectSeconds = 10;
+
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/ClientHandler.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/ClientHandler.java
new file mode 100644
index 0000000..179c2ea
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/ClientHandler.java
@@ -0,0 +1,76 @@
+package com.docus.server.common.netty.client.handler;
+
+import com.docus.core.util.StringUtils;
+import com.docus.core.util.json.JSON;
+import com.docus.server.common.MsgConstants;
+import com.docus.server.common.netty.CommMsg;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Serializable;
+
+@Slf4j
+public class ClientHandler extends SimpleChannelInboundHandler {
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext context, ByteBuf msg) throws Exception {
+ ByteBuf buf = (ByteBuf) msg;
+ //创建目标大小的数组
+ byte[] barray = new byte[buf.readableBytes()];
+ //把数据从bytebuf转移到byte[]
+ buf.getBytes(0, barray);
+ //将byte[]转成字符串用于打印
+ String message = new String(barray);
+
+ //空消息不处理
+ if (!StringUtils.hasText(message)) {
+ return;
+ }
+
+ CommMsg commMsg = JSON.fromJSON(message, CommMsg.class);
+
+ String messageType = commMsg.getMessageType();
+ String messageTime = commMsg.getMessageTime();
+ Serializable messageContent = commMsg.getContent();
+
+ log.info("======== 收到服务端消息, 消息时间={}, 消息类型={}, 消息内容={}", messageTime, messageType, messageContent + " ======== ");
+
+ if (messageType.equals(MsgConstants.TERMINATOR_RESTART)) {
+ log.info("接受到终端重启命令,内容={}", messageContent);
+ }
+
+ if (messageType.equals(MsgConstants.COLLECTOR_RESTART)) {
+ log.info("收到采集器重启命令,内容={}", messageContent);
+ }
+
+ if (messageType.equals(MsgConstants.VIRTUAL_RESTART)) {
+ log.info("收到虚拟机重启命令,内容={}", messageContent);
+ }
+
+ if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_FILE)) {
+ log.info("收到更新采集器文件命令,内容={}", messageContent);
+ }
+
+ if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_CONFIG)) {
+ log.info("收到更新采集器配置命令,内容={}", messageContent);
+ }
+
+ if (messageType.equals(MsgConstants.SCH_DISTRIBUTE_TASKS)) {
+ log.info("收到采集调度器下发任务命令,内容={}", messageContent);
+ }
+
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ cause.printStackTrace();
+ Channel channel = ctx.channel();
+ if (channel.isActive()) {
+ ctx.close();
+ }
+ super.exceptionCaught(ctx, cause);
+ }
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/HeartbeatHandler.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/HeartbeatHandler.java
new file mode 100644
index 0000000..4d2eb11
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/client/handler/HeartbeatHandler.java
@@ -0,0 +1,100 @@
+package com.docus.server.common.netty.client.handler;
+
+import com.docus.core.util.DateUtil;
+import com.docus.core.util.json.JSON;
+import com.docus.server.common.MsgConstants;
+import com.docus.server.common.netty.CommMsg;
+import com.docus.server.common.netty.client.NettyClient;
+import com.docus.server.common.netty.client.NettyClientProperties;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.EventLoop;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.CharsetUtil;
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 处理发送心跳逻辑
+ */
+@Slf4j
+@AllArgsConstructor
+@NoArgsConstructor
+public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
+
+ private NettyClient nettyClient;
+
+ private NettyClientProperties nettyProperties;
+
+ private String appName;
+
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ log.info("通道激活......");
+
+// Payload.Message.Builder builder = Payload.Message.newBuilder()
+// .setClient(appName)
+// .setContent("我来了")
+// .setCmd(Payload.Message.type.AUTH);
+
+ CommMsg onlineRegister = CommMsg.builder()
+ .messageType(MsgConstants.ONLINE_REGISTER)
+ .messageTime(DateUtil.formatDateTime(new Date()))
+ .content(appName + " 我来了")
+ .build();
+
+ ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(onlineRegister), CharsetUtil.UTF_8));
+ super.channelActive(ctx);
+ }
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ if (evt instanceof IdleStateEvent) {
+ IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
+ if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
+ // 一定时间内,通道内未传递消息,发送心跳,保证存活
+ log.info("after {} seconds no message wrote", nettyProperties.getWriterIdleTimeSeconds());
+
+// Payload.Message heartbeat = Payload.Message
+// .newBuilder()
+// .setCmd(Payload.Message.type.HEARTBEAT_REQUEST)
+// .build();
+
+ CommMsg heartbeat = CommMsg.builder()
+ .messageType(MsgConstants.HEARTBEAT_REQUEST)
+ .messageTime(DateUtil.formatDateTime(new Date()))
+ .build();
+
+ //发送心跳消息,并在发送失败时关闭该接连
+ ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(heartbeat), CharsetUtil.UTF_8)).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
+ }
+ } else {
+ super.userEventTriggered(ctx, evt);
+ }
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ //如果运行过程中服务端挂了,执行重连机制
+ log.info("通道释放==================");
+ EventLoop eventLoop = ctx.channel().eventLoop();
+ eventLoop.schedule(() -> nettyClient.start(), nettyProperties.getReconnectSeconds(), TimeUnit.SECONDS);
+ super.channelInactive(ctx);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ cause.printStackTrace();
+ log.error("捕获的异常:{}", cause.getMessage());
+ ctx.close();
+ }
+
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java
new file mode 100644
index 0000000..9a8a58c
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java
@@ -0,0 +1,106 @@
+package com.docus.server.common.netty.server;
+
+import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
+import com.docus.server.enums.BusyStateEnum;
+import com.docus.server.enums.OnlineStateEnum;
+import com.docus.server.service.ISchTerminatorService;
+import com.docus.server.vo.scheduling.management.schterminator.SchTerminatorVO;
+import io.netty.channel.Channel;
+import io.netty.util.AttributeKey;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import javax.annotation.Resource;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * 客户端IP和通信信道的映射
+ */
+@Component
+@Slf4j
+public class ChannelRepository {
+ @Resource
+ private ISchTerminatorService iSchTerminatorService;
+
+ /**
+ *
+ */
+ private final static Map IP_CHANNEL_CACHE_MAP = new ConcurrentHashMap<>();
+
+ /**
+ *
+ */
+ private final static Map IP_TERMINATOR_CACHE_MAP = new ConcurrentHashMap<>();
+
+ /**
+ * 终端上线
+ */
+ public void put(NettyTerminatorDTO nettyTerminatorDTO, Channel channel) {
+ //客户端上线
+ String terminatorIp = nettyTerminatorDTO.getTerminatorIp();
+
+ //更新数据库终端数据
+ SchTerminatorVO schTerminatorVO = iSchTerminatorService.saveOrUpdate(terminatorIp, nettyTerminatorDTO);
+ nettyTerminatorDTO.setId(schTerminatorVO.getId());
+
+ //缓存
+ IP_CHANNEL_CACHE_MAP.put(terminatorIp, channel);
+ IP_TERMINATOR_CACHE_MAP.put(terminatorIp, nettyTerminatorDTO);
+
+ AttributeKey attributeKey = AttributeKey.valueOf("ip");
+ channel.attr(attributeKey).set(terminatorIp);
+
+
+ }
+
+ public String getClientKey(Channel channel) {
+
+ AttributeKey key = AttributeKey.valueOf("ip");
+
+ if (channel.hasAttr(key)) {
+ return channel.attr(key).get();
+ }
+ return null;
+ }
+
+ public Channel get(String key) {
+ return IP_CHANNEL_CACHE_MAP.get(key);
+ }
+
+ public NettyTerminatorDTO getTerminatorByIp(String key) {
+ return IP_TERMINATOR_CACHE_MAP.get(key);
+ }
+
+ public Map getIpToChannelCacheMap() {
+ return IP_CHANNEL_CACHE_MAP;
+ }
+
+ public Map getIpToTerminatorCacheMap() {
+ return IP_TERMINATOR_CACHE_MAP;
+ }
+
+ public List getAvailTerminator() {
+ if (!CollectionUtils.isEmpty(IP_TERMINATOR_CACHE_MAP)) {
+ return IP_TERMINATOR_CACHE_MAP.values().stream().filter(p -> OnlineStateEnum.ONLINE.equals(p.getOnlineState())
+ && BusyStateEnum.IDLE.equals(p.getBusyState())).collect(Collectors.toList());
+ }
+ return Collections.emptyList();
+ }
+
+ /**
+ * 终端离线
+ */
+ public void remove(String key) {
+ IP_CHANNEL_CACHE_MAP.remove(key);
+
+ NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO();
+ nettyTerminatorDTO.setOnlineState(OnlineStateEnum.OFFLINE);
+ iSchTerminatorService.saveOrUpdate(key, nettyTerminatorDTO);
+ }
+
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/NettyServer.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/NettyServer.java
new file mode 100644
index 0000000..f597647
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/NettyServer.java
@@ -0,0 +1,70 @@
+package com.docus.server.common.netty.server;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.annotation.Resource;
+import java.net.InetSocketAddress;
+
+/**
+ * netty服务端初始化
+ */
+@Component
+@Slf4j
+public class NettyServer {
+
+ /**
+ * boss 线程组用于处理连接工作
+ */
+ private EventLoopGroup boss;
+ /**
+ * worker 线程组用于数据处理
+ */
+ private EventLoopGroup worker;
+
+ @Resource
+ private NettyServerProperties serverProperties;
+
+ @Resource
+ private NettyServerInitializer serverInitializer;
+
+ @PostConstruct
+ public void start() throws InterruptedException {
+ boss = new NioEventLoopGroup(serverProperties.getBossThreadCount());
+ worker = new NioEventLoopGroup(serverProperties.getWorkerThreadCount());
+ ServerBootstrap bootstrap = new ServerBootstrap();
+ bootstrap.group(boss, worker)
+ // 指定Channel
+ .channel(NioServerSocketChannel.class)
+ //使用指定的端口设置套接字地址
+ .localAddress(new InetSocketAddress(serverProperties.getPort()))
+ //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
+ .option(ChannelOption.SO_BACKLOG, 1024)
+ //设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
+ .childOption(ChannelOption.SO_KEEPALIVE, true)
+ //将小的数据包包装成更大的帧进行传送,提高网络的负载
+ .childOption(ChannelOption.TCP_NODELAY, true)
+ .childHandler(serverInitializer);
+ ChannelFuture future = bootstrap.bind().sync();
+ if (future.isSuccess()) {
+ log.info("Start netty server successfully");
+ } else {
+ log.error("Start netty server failed");
+ }
+ }
+
+ @PreDestroy
+ public void destroy() throws InterruptedException {
+ boss.shutdownGracefully().sync();
+ worker.shutdownGracefully().sync();
+ log.info("关闭Netty");
+ }
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/NettyServerInitializer.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/NettyServerInitializer.java
new file mode 100644
index 0000000..04f4600
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/NettyServerInitializer.java
@@ -0,0 +1,52 @@
+package com.docus.server.common.netty.server;
+
+import com.docus.server.common.netty.server.handler.NettyBusinessHandler;
+import com.docus.server.common.netty.server.handler.NettyHeartbeatHandler;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.handler.timeout.IdleStateHandler;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * netty服务端信道初始化
+ */
+@Component
+public class NettyServerInitializer extends ChannelInitializer {
+
+ @Resource
+ private NettyServerProperties serverProperties;
+
+ @Resource
+ private NettyBusinessHandler businessHandler;
+
+ @Resource
+ private ChannelRepository channelRepository;
+
+ @Override
+ protected void initChannel(Channel channel) throws Exception {
+ channel.pipeline()
+ //空闲检测
+ .addLast(new IdleStateHandler(serverProperties.getReaderIdleTimeSeconds(),
+ serverProperties.getWriterIdleTimeSeconds(),
+ serverProperties.getAllIdleTimeSeconds(),
+ TimeUnit.SECONDS)
+ )
+ // 加载加码解码处理器,同时解决粘包拆包问题
+// .addLast(new ProtobufVarint32FrameDecoder())
+// .addLast(new ProtobufDecoder(Payload.CommMsg.getDefaultInstance()))
+// .addLast(new ProtobufVarint32LengthFieldPrepender())
+// .addLast(new ProtobufEncoder())
+
+// .addLast(new LineBasedFrameDecoder(2048))
+// .addLast(new StringDecoder())
+// .addLast(new StringEncoder())
+
+ // 加载业务处理器
+ .addLast(new NettyHeartbeatHandler(channelRepository))
+ .addLast(businessHandler);
+// .addLast(new EchoServerHandler());
+ }
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/NettyServerProperties.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/NettyServerProperties.java
new file mode 100644
index 0000000..02f515d
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/NettyServerProperties.java
@@ -0,0 +1,31 @@
+package com.docus.server.common.netty.server;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+/**
+ * netty服务端配置
+ */
+@Data
+@Component
+@ConfigurationProperties(prefix = NettyServerProperties.PREFIX)
+public class NettyServerProperties {
+
+ public static final String PREFIX = "netty.server";
+
+ // 读空闲等待时间
+ private int readerIdleTimeSeconds = 30;
+
+ // 写空闲等待时间
+ private int writerIdleTimeSeconds;
+
+ // 读写空闲等待时间
+ private int allIdleTimeSeconds;
+
+ private Integer port;
+
+ private int bossThreadCount;
+
+ private int workerThreadCount;
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/EchoServerHandler.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/EchoServerHandler.java
new file mode 100644
index 0000000..600aa47
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/EchoServerHandler.java
@@ -0,0 +1,77 @@
+package com.docus.server.common.netty.server.handler;
+
+import com.docus.core.util.json.JSON;
+import com.docus.server.common.netty.CommMsg;
+import com.fasterxml.jackson.core.type.TypeReference;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.CharsetUtil;
+
+import java.util.Date;
+
+public class EchoServerHandler extends ChannelInboundHandlerAdapter {
+
+
+ //客户端连接上来,没有心跳前,属于离线状态,有心跳后,才属于在线状态。
+ //客户端断开后,自动剔除,agent。
+ @Override
+ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+ ctx.fireChannelRegistered();
+ Channel incoming = ctx.channel();
+ System.out.println("NettyClient:" + incoming.remoteAddress() + "在线1");
+ }
+
+
+ @Override
+ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+ ctx.fireChannelUnregistered();
+ Channel incoming = ctx.channel();
+ System.out.println("NettyClient:" + incoming.remoteAddress() + "在线2");
+ }
+
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ ctx.fireChannelActive();
+ Channel incoming = ctx.channel();
+ System.out.println("NettyClient:" + incoming.remoteAddress() + "在线3");
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ try {
+ ByteBuf buf = (ByteBuf) msg;
+ //创建目标大小的数组
+ byte[] barray = new byte[buf.readableBytes()];
+ //把数据从bytebuf转移到byte[]
+ buf.getBytes(0, barray);
+ //将byte[]转成字符串用于打印
+ String str = new String(barray);
+
+ CommMsg commMsg = JSON.fromJSONWithGeneric(str, new TypeReference>() {
+ });
+
+ if (str.length() > 0) {
+ System.out.println(str);
+ System.out.println("收到消息回复一条消息给客户端");
+ System.out.println("client channelActive..");
+ ctx.writeAndFlush(Unpooled.copiedBuffer("服务器端发一条数据给客户端" + new Date().toString(), CharsetUtil.UTF_8)); // 必须有flush
+ System.out.flush();
+ } else {
+ System.out.println("不能读啊");
+ }
+ } finally {
+ }
+ }
+
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ System.out.println("server occur exception:" + cause.getMessage());
+ cause.printStackTrace();
+ ctx.close(); // 关闭发生异常的连接
+ }
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java
new file mode 100644
index 0000000..9770f10
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java
@@ -0,0 +1,144 @@
+package com.docus.server.common.netty.server.handler;
+
+import com.docus.core.util.Func;
+import com.docus.core.util.StringUtils;
+import com.docus.core.util.json.JSON;
+import com.docus.server.common.MsgConstants;
+import com.docus.server.common.netty.CommMsg;
+import com.docus.server.common.netty.server.ChannelRepository;
+import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
+import com.docus.server.enums.BusyStateEnum;
+import com.docus.server.enums.OnlineStateEnum;
+import com.fasterxml.jackson.core.type.TypeReference;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.net.InetSocketAddress;
+
+/**
+ * 业务消息处理
+ */
+@Slf4j
+@ChannelHandler.Sharable
+@Component
+public class NettyBusinessHandler extends SimpleChannelInboundHandler {
+
+ private static final ChannelGroup DEFAULT_CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+ @Resource
+ private ChannelRepository repository;
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf msg) throws Exception {
+ ByteBuf buf = (ByteBuf) msg;
+ //创建目标大小的数组
+ byte[] barray = new byte[buf.readableBytes()];
+ //把数据从bytebuf转移到byte[]
+ buf.getBytes(0, barray);
+ //将byte[]转成字符串用于打印
+ String message = new String(barray);
+
+ //空消息不处理
+ if (!StringUtils.hasText(message)) {
+ return;
+ }
+
+ CommMsg commMsg = JSON.fromJSONWithGeneric(message, new TypeReference>() {
+ });
+
+ String messageType = commMsg.getMessageType();
+ String messageTime = commMsg.getMessageTime();
+ TerminatorContent messageContent = commMsg.getContent();
+
+ if (messageType.equals(MsgConstants.TERMINATOR_RESTART)) {
+ log.info("接受到终端重启命令,内容{}", messageContent);
+ }
+
+ if (messageType.equals(MsgConstants.COLLECTOR_RESTART)) {
+ log.info("收到采集器重启命令,内容{}", messageContent);
+ }
+
+ if (messageType.equals(MsgConstants.VIRTUAL_RESTART)) {
+ log.info("收到虚拟机重启命令,内容{}", messageContent);
+ }
+
+ if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_FILE)) {
+ log.info("收到更新采集器文件命令,内容{}", messageContent);
+ }
+
+ if (messageType.equals(MsgConstants.UPDATE_COLLECTOR_CONFIG)) {
+ log.info("收到更新采集器配置命令,内容{}", messageContent);
+ }
+
+ if (messageType.equals(MsgConstants.SCH_DISTRIBUTE_TASKS)) {
+ log.info("收到采集调度器下发任务命令,内容{}", messageContent);
+ }
+ }
+
+ /**
+ * netty client 上线
+ */
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ ctx.fireChannelRegistered();
+
+ InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
+ String clientIp = ipSocket.getAddress().getHostAddress();
+ log.info("【采集器-终端IP】:{},连接上线,IP地址信息:{}", clientIp, clientIp);
+
+ String clientId = repository.getClientKey(ctx.channel());
+
+ if (Func.isBlank(clientId)) {
+
+ NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO();
+ nettyTerminatorDTO.setTerminatorIp(clientIp);
+ nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE);
+ nettyTerminatorDTO.setOnlineState(OnlineStateEnum.OFFLINE);
+
+ repository.put(nettyTerminatorDTO, ctx.channel());
+ }
+
+ DEFAULT_CHANNEL_GROUP.add(ctx.channel());
+
+ System.out.println(ctx.channel().remoteAddress() + " 上线," + "【采集器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size());
+ }
+
+ /**
+ * netty client 下线
+ */
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ Channel channel = ctx.channel();
+ System.out.println(channel.remoteAddress() + " 下线," + "【采集器-终端】在线数量:" + DEFAULT_CHANNEL_GROUP.size());
+ String clientId = repository.getClientKey(channel);
+ log.error("客户端下线,终端连接:{}", clientId);
+ //移除终端,终端离线
+ if (clientId != null) {
+ repository.remove(clientId);
+ }
+ }
+
+ /**
+ * netty exception 通道异常
+ */
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ cause.printStackTrace();
+ Channel channel = ctx.channel();
+ if (channel.isActive()) {
+ ctx.close();
+ }
+
+ super.exceptionCaught(ctx, cause);
+ }
+
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java
new file mode 100644
index 0000000..6bac1c7
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java
@@ -0,0 +1,104 @@
+package com.docus.server.common.netty.server.handler;
+
+import com.docus.core.util.Func;
+import com.docus.core.util.StringUtils;
+import com.docus.core.util.json.JSON;
+import com.docus.server.common.MsgConstants;
+import com.docus.server.common.netty.CommMsg;
+import com.docus.server.common.netty.server.ChannelRepository;
+import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
+import com.docus.server.enums.BusyStateEnum;
+import com.docus.server.enums.OnlineStateEnum;
+import com.fasterxml.jackson.core.type.TypeReference;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.net.InetSocketAddress;
+
+/**
+ * 客户端和服务端心跳
+ */
+@Slf4j
+@ChannelHandler.Sharable
+public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter {
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ ByteBuf buf = (ByteBuf) msg;
+ //创建目标大小的数组
+ byte[] barray = new byte[buf.readableBytes()];
+ //把数据从bytebuf转移到byte[]
+ buf.getBytes(0, barray);
+ //将byte[]转成字符串用于打印
+ String message = new String(barray);
+ //空消息不处理
+ if (!StringUtils.hasText(message)) {
+ return;
+ }
+
+ CommMsg commMsg = JSON.fromJSONWithGeneric(message, new TypeReference>() {
+ });
+
+ String messageType = commMsg.getMessageType();
+ String messageTime = commMsg.getMessageTime();
+ TerminatorContent messageContent = commMsg.getContent();
+
+ if (messageType.equals(MsgConstants.HEARTBEAT_REQUEST)) {
+
+ log.info("接收到客户端的心跳");
+
+
+ log.info("接受到【采集器-终端】的心跳消息:消息类型={},消息时间={},消息内容={}", messageType, messageTime, messageContent);
+
+ InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
+ String clientIp = ipSocket.getAddress().getHostAddress();
+
+ log.info("【采集器-终端IP】:{},连接上线,IP地址信息:{}", clientIp, clientIp);
+
+ String clientKey = repository.getClientKey(ctx.channel());
+
+ if (Func.isNotBlank(clientKey)) {
+
+ NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO();
+ nettyTerminatorDTO.setTerminatorIp(clientIp);
+ nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE);
+ nettyTerminatorDTO.setOnlineState(OnlineStateEnum.ONLINE);
+
+ //将ip和channel进行映射
+ repository.put(nettyTerminatorDTO, ctx.channel());
+ }
+
+ } else {
+
+ if (ctx.channel().isOpen()) {
+ //触发下一个handler
+ ctx.fireChannelRead(msg);
+ }
+ }
+
+ }
+
+ private ChannelRepository repository;
+
+ public NettyHeartbeatHandler(ChannelRepository repository) {
+ this.repository = repository;
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ cause.printStackTrace();
+ String clientId = repository.getClientKey(ctx.channel());
+ log.error("通道发生异常,终端连接:{}", clientId);
+ //移除终端,终端离线
+ if (clientId != null) {
+ repository.remove(clientId);
+ }
+ if (ctx.channel().isActive()) {
+ ctx.close();
+ }
+ super.exceptionCaught(ctx, cause);
+ }
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/TerminatorContent.java b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/TerminatorContent.java
new file mode 100644
index 0000000..24e60b6
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/netty/server/handler/TerminatorContent.java
@@ -0,0 +1,22 @@
+package com.docus.server.common.netty.server.handler;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@Data
+public class TerminatorContent implements Serializable {
+
+ /*终端IP*/
+ private String terminatorIp;
+
+ /*终端名称*/
+ private String terminatorName;
+
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java b/collector-terminal-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java
new file mode 100644
index 0000000..076813b
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java
@@ -0,0 +1,152 @@
+package com.docus.server.common.process;
+
+import com.docus.core.util.Func;
+import com.docus.log.context.TrackContext;
+import com.docus.log.processor.AbstractProcessor;
+import com.docus.server.common.netty.server.ChannelRepository;
+import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO;
+import com.docus.server.dto.scheduling.management.schterminator.EditSchTerminatorDTO;
+import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
+import com.docus.server.enums.BusyStateEnum;
+import com.docus.server.enums.RetryTaskEnum;
+import com.docus.server.enums.StateEnum;
+import com.docus.server.service.ISchCollectRecordRetryLogService;
+import com.docus.server.service.ISchCollectRecordService;
+import com.docus.server.service.ISchTerminatorService;
+import com.docus.server.vo.scheduling.management.schcollectrecord.SchCollectRecordVO;
+import com.docus.server.vo.scheduling.management.schcollectrecordretrylog.SchCollectRecordRetryLogVO;
+import com.docus.server.vo.scheduling.management.schterminator.SchTerminatorVO;
+
+import javax.annotation.Resource;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * channel 管理
+ */
+public class ChannelProcessor extends AbstractProcessor {
+ @Resource
+ private ChannelRepository channelRepository;
+ @Resource
+ private ISchTerminatorService iSchTerminatorService;
+ @Resource
+ private ISchCollectRecordService iSchCollectRecordService;
+ @Resource
+ private ISchCollectRecordRetryLogService iSchCollectRecordRetryLogService;
+
+ @Override
+ protected Object doProcess(TrackContext context) {
+ String group = context.getGroup();
+
+ switch (group) {
+ case "SchCollectRecordController-edit":
+ return doSchCollectRecordControllerEdit(context);
+ case "SchTerminatorController":
+ return doSchTerminatorController(context);
+ case "RedisKeyExpirationService-expired":
+ return doRedisKeyExpired(context);
+ default:
+ return true;
+ }
+ }
+
+ private Object doRedisKeyExpired(TrackContext context) {
+ boolean error = context.isError();
+ String expireKey = (String) context.getArgs()[0];
+
+ String recordId = expireKey.substring(expireKey.lastIndexOf(":") + 1);
+
+ if (!error) {
+
+ if (expireKey.startsWith("schCollectorRecord:isRetryTask:expireKey:")) {
+ retryTask(recordId);
+ }
+ if (expireKey.startsWith("schCollectorRecord:noRetryTask:expireKey:")) {
+ noRetryTask(recordId);
+
+ }
+ }
+ return null;
+ }
+
+ private boolean doSchCollectRecordControllerEdit(TrackContext context) {
+ return logCollectRecord(context);
+ }
+
+ private boolean doSchTerminatorController(TrackContext context) {
+ return logTerminator(context);
+ }
+
+ private boolean logCollectRecord(TrackContext context) {
+ boolean error = context.isError();
+
+ EditSchCollectRecordDTO collectRecordDTO = (EditSchCollectRecordDTO) context.getArgs()[0];
+
+ if (!error) {
+
+ if (RetryTaskEnum.NO_RETRY_TASK.equals(collectRecordDTO.getIsRetryTask())) {
+
+ noRetryTask(String.valueOf(collectRecordDTO.getId()));
+
+ } else if (RetryTaskEnum.RETRY_TASK.equals(collectRecordDTO.getIsRetryTask())) {
+
+ retryTask(String.valueOf(collectRecordDTO.getId()));
+
+ }
+
+ }
+ return error;
+
+ }
+
+ private void retryTask(String recordLogId) {
+ SchCollectRecordRetryLogVO retryLogVO = iSchCollectRecordRetryLogService.findById(recordLogId);
+
+ updateTerminatorState(retryLogVO.getTerminatorId(), retryLogVO.getTaskExecState());
+
+ }
+
+ private void noRetryTask(String recordId) {
+
+ SchCollectRecordVO schCollectRecordVO = iSchCollectRecordService.findById(recordId);
+
+ updateTerminatorState(schCollectRecordVO.getTerminatorId(), schCollectRecordVO.getTaskExecState());
+ }
+
+ private void updateTerminatorState(Long terminatorId, StateEnum taskExecState) {
+ SchTerminatorVO schTerminatorVO = iSchTerminatorService.findById(String.valueOf(terminatorId));
+
+ NettyTerminatorDTO nettyTerminatorDTO = channelRepository.getTerminatorByIp(String.valueOf(schTerminatorVO.getTerminatorIp()));
+
+ if (Func.isEmpty(nettyTerminatorDTO)) {
+ return;
+ }
+
+ List stateEnums = Arrays.asList(StateEnum.values());
+
+ if (stateEnums.contains(taskExecState)) {
+ nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE);
+ }
+ }
+
+ private boolean logTerminator(TrackContext context) {
+ boolean error = context.isError();
+ EditSchTerminatorDTO terminatorDTO = (EditSchTerminatorDTO) context.getArgs()[0];
+ if (!error) {
+ SchTerminatorVO terminatorVO = iSchTerminatorService.findById(String.valueOf(terminatorDTO.getId()));
+
+ NettyTerminatorDTO nettyTerminatorDTO = channelRepository.getTerminatorByIp(terminatorVO.getTerminatorIp());
+
+ if (Func.isNotBlank(terminatorDTO.getOnlyCollectorIds())) {
+ List onlyList = Arrays.stream(terminatorDTO.getOnlyCollectorIds().split(",")).map(String::valueOf).collect(Collectors.toList());
+ nettyTerminatorDTO.setOnlyCollectorIds(onlyList);
+ }
+ if (Func.isNotBlank(terminatorDTO.getPriorityCollectorIds())) {
+ List priList = Arrays.stream(terminatorDTO.getPriorityCollectorIds().split(",")).map(String::valueOf).collect(Collectors.toList());
+ nettyTerminatorDTO.setPriorityCollectorIds(priList);
+ }
+ }
+ return error;
+ }
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/process/CollectorVersionProcessor.java b/collector-terminal-management/src/main/java/com/docus/server/common/process/CollectorVersionProcessor.java
new file mode 100644
index 0000000..ac4c504
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/process/CollectorVersionProcessor.java
@@ -0,0 +1,72 @@
+package com.docus.server.common.process;
+
+import com.docus.infrastructure.redis.service.IdService;
+import com.docus.log.context.TrackContext;
+import com.docus.log.processor.AbstractProcessor;
+import com.docus.server.dto.scheduling.management.schcollector.UpdateSchCollectorDTO;
+import com.docus.server.entity.scheduling.management.SchCollector;
+import com.docus.server.entity.scheduling.management.SchCollectorVersion;
+import com.docus.server.entity.scheduling.management.SchCollectorVersionLog;
+import com.docus.server.entity.scheduling.management.SchSystemParams;
+import com.docus.server.enums.StateEnum;
+import com.docus.server.infrastructure.dao.ISchCollectorDao;
+import com.docus.server.infrastructure.dao.ISchCollectorVersionDao;
+import com.docus.server.infrastructure.dao.ISchCollectorVersionLogDao;
+import com.docus.server.infrastructure.dao.ISchSystemParamsDao;
+
+import javax.annotation.Resource;
+
+/**
+ * 日志管理
+ */
+public class CollectorVersionProcessor extends AbstractProcessor {
+ @Resource
+ private ISchCollectorVersionLogDao iSchCollectorVersionLogDao;
+ @Resource
+ private ISchCollectorVersionDao iSchCollectorVersionDao;
+ @Resource
+ private ISchSystemParamsDao iSchSystemParamsDao;
+ @Resource
+ private ISchCollectorDao iSchCollectorDao;
+ @Resource
+ private IdService idService;
+
+ @Override
+ public Object beforeProcess(TrackContext context) {
+ super.beforeProcess(context);
+ UpdateSchCollectorDTO updateDTO = (UpdateSchCollectorDTO) context.getArgs()[0];
+ Long collectorId = updateDTO.getCollectorId();
+ SchCollector schCollector = iSchCollectorDao.findOneBy("collectorId", collectorId);
+ SchCollectorVersion preVersion = iSchCollectorVersionDao.findById(schCollector.getCollectorVersionId());
+ return preVersion.getCollectVersion();
+ }
+
+ @Override
+ protected Object doProcess(TrackContext context) {
+ logProcess(context);
+ return null;
+ }
+
+ private void logProcess(TrackContext context) {
+
+ boolean error = context.isError();
+
+ UpdateSchCollectorDTO updateDTO = (UpdateSchCollectorDTO) context.getArgs()[0];
+ Long collectorId = updateDTO.getCollectorId();
+ String preVersion = (String) context.getBeforeResult();
+ SchCollectorVersion curVersion = iSchCollectorVersionDao.findById(updateDTO.getCollectorVersionId());
+ SchSystemParams schSystemParams = iSchSystemParamsDao.findOneBy("paramValue", collectorId);
+
+ SchCollectorVersionLog log = new SchCollectorVersionLog();
+ log.setId(idService.getDateSeq());
+ log.setCollectorId(collectorId);
+ log.setOperationModule(context.getGroup());
+ log.setOperationType(context.getAction());
+ log.setOperationDesc(context.getDesc());
+ log.setOperationContent(String.format("采集器:%s,上一个版本是:%s,更换成当前版本是:%s", schSystemParams.getParamName(), preVersion, curVersion.getCollectVersion()));
+ log.setState(error ? StateEnum.FAIL : StateEnum.OK);
+
+ iSchCollectorVersionLogDao.save(log);
+ }
+
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/process/TcpProcessor.java b/collector-terminal-management/src/main/java/com/docus/server/common/process/TcpProcessor.java
new file mode 100644
index 0000000..49831e6
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/process/TcpProcessor.java
@@ -0,0 +1,62 @@
+package com.docus.server.common.process;
+
+import com.docus.core.util.json.JSON;
+import com.docus.log.context.TrackContext;
+import com.docus.log.processor.AbstractProcessor;
+import com.docus.server.common.MsgConstants;
+import com.docus.server.dto.scheduling.management.schcollector.UpdateSchCollectorDTO;
+import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO;
+import com.docus.server.entity.scheduling.management.SchCollectorVersionFile;
+import com.docus.server.service.ICommMsgService;
+import com.docus.server.service.ISchCollectorVersionFileService;
+import com.docus.server.service.ISchCollectorVersionService;
+import com.docus.server.vo.scheduling.management.schcollectorversion.SchCollectorVersionVO;
+import com.docus.server.vo.scheduling.management.schcollectorversion.TcpSchCollectorVersionContentVO;
+import com.google.common.collect.Lists;
+
+import javax.annotation.Resource;
+import java.util.List;
+
+/**
+ * tcp 管理
+ */
+public class TcpProcessor extends AbstractProcessor {
+ @Resource
+ private ICommMsgService iCommMsgService;
+ @Resource
+ private ISchCollectorVersionFileService iSchCollectorVersionFileService;
+ @Resource
+ private ISchCollectorVersionService iSchCollectorVersionService;
+
+ @Override
+ protected Object doProcess(TrackContext context) {
+ return logProcess(context);
+ }
+
+ private boolean logProcess(TrackContext context) {
+ boolean error = context.isError();
+ if (!error) {
+ UpdateSchCollectorDTO updateSchCollectorDTO = (UpdateSchCollectorDTO) context.getArgs()[0];
+ Long collectorId = updateSchCollectorDTO.getCollectorId();
+ Long collectorVersionId = updateSchCollectorDTO.getCollectorVersionId();
+
+ SchCollectorVersionFile schCollectorVersionFile = iSchCollectorVersionFileService.findByCollectorIdAndVersionId(collectorId, collectorVersionId);
+ SchCollectorVersionVO schCollectorVersionVO = iSchCollectorVersionService.findById(String.valueOf(collectorVersionId));
+
+ TcpSchCollectorVersionContentVO tcpSchCollectorVersionContentVO = new TcpSchCollectorVersionContentVO();
+ tcpSchCollectorVersionContentVO.setCollectorId(collectorId);
+ tcpSchCollectorVersionContentVO.setFilePath(schCollectorVersionFile.getFilePath());
+ tcpSchCollectorVersionContentVO.setCollectorVersion(schCollectorVersionVO.getCollectVersion());
+
+ List tcpSchCollectorVersionContentVOList = Lists.newArrayList(tcpSchCollectorVersionContentVO);
+
+ CommMsgDTO commMsgDTO = CommMsgDTO.builder()
+ .content(JSON.toJSON(tcpSchCollectorVersionContentVOList))
+ .messageType(MsgConstants.UPDATE_COLLECTOR_FILE)
+ .build();
+
+ iCommMsgService.clientsCommand(commMsgDTO);
+ }
+ return error;
+ }
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/serializer/DefJsonSerializerModule.java b/collector-terminal-management/src/main/java/com/docus/server/common/serializer/DefJsonSerializerModule.java
new file mode 100644
index 0000000..b5033ed
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/serializer/DefJsonSerializerModule.java
@@ -0,0 +1,25 @@
+package com.docus.server.common.serializer;
+
+import com.docus.core.util.Convert;
+import com.docus.infrastructure.web.json.JsonSerializerModule;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.util.Date;
+import java.util.TimeZone;
+
+public class DefJsonSerializerModule extends JsonSerializerModule {
+ public DefJsonSerializerModule() {
+
+ addSerializer(Date.class, new JsonSerializer() {
+ @Override
+ public void serialize(Date date, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException {
+ jsonGenerator.writeString(date == null ? null : Convert.toString(date, Convert.DATA_FORMAT_DATETIME_SLASH, TimeZone.getTimeZone(ZoneId.systemDefault())));
+ }
+ });
+ }
+
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/serializer/StringToDateConverter.java b/collector-terminal-management/src/main/java/com/docus/server/common/serializer/StringToDateConverter.java
new file mode 100644
index 0000000..4663077
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/serializer/StringToDateConverter.java
@@ -0,0 +1,64 @@
+package com.docus.server.common.serializer;
+
+import org.springframework.core.convert.converter.Converter;
+import org.springframework.stereotype.Component;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * 全局handler前日期统一处理
+ */
+@Component
+public class StringToDateConverter implements Converter {
+
+ private static final List DATE_FORMARTS = new ArrayList<>(4);
+
+ static {
+ DATE_FORMARTS.add("yyyy-MM");
+ DATE_FORMARTS.add("yyyy-MM-dd");
+ DATE_FORMARTS.add("yyyy-MM-dd hh:mm");
+ DATE_FORMARTS.add("yyyy-MM-dd hh:mm:ss");
+ }
+
+ @Override
+ public Date convert(String source) {
+ String value = source.trim();
+ if ("".equals(value)) {
+ return null;
+ }
+ if (source.matches("^\\d{4}-\\d{1,2}$")) {
+ return parseDate(source, DATE_FORMARTS.get(0));
+ } else if (source.matches("^\\d{4}-\\d{1,2}-\\d{1,2}$")) {
+ return parseDate(source, DATE_FORMARTS.get(1));
+ } else if (source.matches("^\\d{4}-\\d{1,2}-\\d{1,2} {1}\\d{1,2}:\\d{1,2}$")) {
+ return parseDate(source, DATE_FORMARTS.get(2));
+ } else if (source.matches("^\\d{4}-\\d{1,2}-\\d{1,2} {1}\\d{1,2}:\\d{1,2}:\\d{1,2}$")) {
+ return parseDate(source, DATE_FORMARTS.get(3));
+ } else {
+ throw new IllegalArgumentException("Invalid boolean value '" + source + "'");
+ }
+ }
+
+ /**
+ * 格式化日期
+ *
+ * @param dateStr String 字符型日期
+ * @param format String 格式
+ * @return Date 日期
+ */
+ public Date parseDate(String dateStr, String format) {
+ Date date = null;
+ try {
+ DateFormat dateFormat = new SimpleDateFormat(format);
+ date = dateFormat.parse(dateStr);
+ } catch (Exception e) {
+ System.out.println(String.format("日期%s转换%s错误", dateStr, format));
+ }
+ return date;
+ }
+
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/test/DispatchService.java b/collector-terminal-management/src/main/java/com/docus/server/common/test/DispatchService.java
new file mode 100644
index 0000000..8132d99
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/test/DispatchService.java
@@ -0,0 +1,107 @@
+package com.docus.server.common.test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DispatchService {
+
+ public void dispatch() {
+
+
+ //获取所有空闲的终端
+ List terminalList = new ArrayList<>();
+ List taskInfos = this.getTaskInfos(terminalList.size());
+
+
+
+ //只采集,有优先级的
+ for (Terminal terminal : terminalList) {
+ for (TaskInfo taskInfo : taskInfos) {
+ //先找出有只采集的任务。
+ if (terminal.getOnlyTags().contains(taskInfo.getCollectType())) {
+ //把这个任务派给这个终端,并且把这个终端设置成繁忙
+ if (terminal.getPriorityTags().contains(taskInfo.getCollectType())) {
+ //把这个任务派给这个终端
+ terminal.setState(1);
+ return;
+ }
+ }
+ }
+ }
+
+ //只采集没有优先级
+ for (Terminal terminal : terminalList) {
+ //把刚才已经分配任务过的采集器排除
+ if (terminal.getState() == 1) {
+ continue;
+ }
+
+ for (TaskInfo taskInfo : taskInfos) {
+ //先找出有只采集的任务。
+ if (terminal.getOnlyTags().contains(taskInfo.getCollectType())) {
+ //把这个任务派给这个终端,并且把这个终端设置成繁忙
+
+ terminal.setState(1);
+ return;
+ }
+ }
+ }
+
+
+ //无只采集,有优先级
+ for (Terminal terminal : terminalList) {
+
+ //把刚才已经分配任务过的采集器排除
+ if (terminal.getState() == 1) {
+ continue;
+ }
+
+ for (TaskInfo taskInfo : taskInfos) {
+ //先找出有只采集的任务。
+ if (terminal.getPriorityTags().contains(taskInfo.getCollectType())) {
+ //把这个任务派给这个终端
+ terminal.setState(1);
+ return;
+ }
+ }
+ }
+
+
+ //无只采集,无优先级
+ for (Terminal terminal : terminalList) {
+
+ //把刚才已经分配任务过的采集器排除
+ if (terminal.getState() == 1) {
+ continue;
+ }
+
+ for (TaskInfo taskInfo : taskInfos) {
+ //先找出有只采集的任务。
+ //把这个任务派给这个终端
+ terminal.setState(1);
+ return;
+ }
+ }
+
+ }
+
+ private void dispatchTask(List taskInfos, Terminal terminal) {
+ for (TaskInfo taskInfo : taskInfos) {
+ //先找出有只采集的任务。
+ if (terminal.getOnlyTags().contains(taskInfo.getCollectType())) {
+ //把这个任务派给这个终端
+ return;
+ }
+ if (terminal.getPriorityTags().contains(taskInfo.getCollectType())) {
+ //把这个任务派给这个终端
+ return;
+ }
+ }
+ }
+
+
+ public List getTaskInfos(int size) {
+ return null;
+ }
+
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/test/TaskInfo.java b/collector-terminal-management/src/main/java/com/docus/server/common/test/TaskInfo.java
new file mode 100644
index 0000000..08f8b45
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/test/TaskInfo.java
@@ -0,0 +1,14 @@
+package com.docus.server.common.test;
+
+import lombok.Data;
+
+@Data
+public class TaskInfo {
+
+ private Long taskId;
+
+ private String collectType;
+
+ private String info;
+
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/common/test/Terminal.java b/collector-terminal-management/src/main/java/com/docus/server/common/test/Terminal.java
new file mode 100644
index 0000000..4af50fa
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/common/test/Terminal.java
@@ -0,0 +1,16 @@
+package com.docus.server.common.test;
+
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+public class Terminal {
+
+ private String id;
+ private String collectType;
+ private List priorityTags;
+ private List onlyTags;
+ private Integer state;
+
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/controller/CommMsgController.java b/collector-terminal-management/src/main/java/com/docus/server/controller/CommMsgController.java
new file mode 100644
index 0000000..437f03c
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/controller/CommMsgController.java
@@ -0,0 +1,25 @@
+package com.docus.server.controller;
+
+import com.docus.server.api.scheduling.management.CommMsgApi;
+import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO;
+import com.docus.server.service.ICommMsgService;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+
+/**
+ * 通用消息体 TCP API
+ *
+ * @author AutoGenerator
+ * @since 2023-07-15
+ */
+@RestController
+public class CommMsgController implements CommMsgApi {
+ @Resource
+ private ICommMsgService iCommMsgService;
+
+ @Override
+ public void clientCommand(CommMsgDTO commMsgDTO) {
+ iCommMsgService.clientCommand(commMsgDTO);
+ }
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/controller/FileController.java b/collector-terminal-management/src/main/java/com/docus/server/controller/FileController.java
new file mode 100644
index 0000000..dc42eab
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/controller/FileController.java
@@ -0,0 +1,31 @@
+package com.docus.server.controller;
+
+import com.docus.server.api.scheduling.management.FileApi;
+import com.docus.server.service.IFileUploadService;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.multipart.MultipartFile;
+
+import javax.annotation.Resource;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * 文件上传下载 API
+ *
+ * @author AutoGenerator
+ * @since 2023-07-15
+ */
+@RestController
+public class FileController implements FileApi {
+ @Resource
+ private IFileUploadService iFileUploadService;
+
+ @Override
+ public void downloadFile(String filePath, HttpServletResponse response) throws Exception {
+ iFileUploadService.downloadFile(filePath, response);
+ }
+
+ @Override
+ public void uploadFile(MultipartFile[] multipartFiles, String pathKey) throws Exception {
+ iFileUploadService.uploadFile(multipartFiles, pathKey);
+ }
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/controller/SchCollectErrorLogController.java b/collector-terminal-management/src/main/java/com/docus/server/controller/SchCollectErrorLogController.java
new file mode 100644
index 0000000..ca41df1
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/controller/SchCollectErrorLogController.java
@@ -0,0 +1,81 @@
+package com.docus.server.controller;
+
+import com.docus.infrastructure.web.request.SearchDTO;
+import com.docus.infrastructure.web.response.PageResult;
+import com.docus.server.api.scheduling.management.SchCollectErrorLogApi;
+import com.docus.server.dto.scheduling.management.schcollecterrorlog.AddSchCollectErrorLogDTO;
+import com.docus.server.dto.scheduling.management.schcollecterrorlog.DeleteSchCollectErrorLogDTO;
+import com.docus.server.dto.scheduling.management.schcollecterrorlog.EditSchCollectErrorLogDTO;
+import com.docus.server.service.ISchCollectErrorLogService;
+import com.docus.server.vo.scheduling.management.schcollecterrorlog.SchCollectErrorLogVO;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.multipart.MultipartFile;
+
+import javax.annotation.Resource;
+
+/**
+ * 采集器异常日志 控制器类
+ *
+ * @author AutoGenerator
+ * @since 2023-07-15
+ */
+@RestController
+public class SchCollectErrorLogController implements SchCollectErrorLogApi {
+ @Resource
+ private ISchCollectErrorLogService iSchCollectErrorLogService;
+
+ /**
+ * 按主键查询
+ *
+ * @param id 主键Id
+ * @return 实体
+ */
+ @Override
+ public SchCollectErrorLogVO findById(String id) {
+ return iSchCollectErrorLogService.findById(id);
+ }
+
+ /**
+ * 关键字搜索
+ *
+ * @param searchDTO 搜索参数
+ * @return 分页列表
+ */
+ @Override
+ public PageResult search(SearchDTO searchDTO) {
+ return iSchCollectErrorLogService.search(searchDTO);
+ }
+
+ /**
+ * 新增
+ *
+ * @param addSchCollectErrorLogDTO 编辑参数
+ * @return 成功或失败
+ */
+ @Override
+ public boolean add(AddSchCollectErrorLogDTO addSchCollectErrorLogDTO, MultipartFile[] multipartFiles) throws Exception {
+ return iSchCollectErrorLogService.add(addSchCollectErrorLogDTO, multipartFiles);
+ }
+
+ /**
+ * 编辑
+ *
+ * @param editSchCollectErrorLogDTO 编辑参数
+ * @return 成功或失败
+ */
+ @Override
+ public boolean edit(EditSchCollectErrorLogDTO editSchCollectErrorLogDTO) {
+ return iSchCollectErrorLogService.edit(editSchCollectErrorLogDTO);
+ }
+
+ /**
+ * 批量删除
+ *
+ * @param deleteSchCollectErrorLogDTO 删除参数
+ * @return 成功或失败
+ */
+ @Override
+ public int delete(DeleteSchCollectErrorLogDTO deleteSchCollectErrorLogDTO) {
+ return iSchCollectErrorLogService.delete(deleteSchCollectErrorLogDTO);
+ }
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/controller/SchCollectRecordController.java b/collector-terminal-management/src/main/java/com/docus/server/controller/SchCollectRecordController.java
new file mode 100644
index 0000000..f39a5a1
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/controller/SchCollectRecordController.java
@@ -0,0 +1,103 @@
+package com.docus.server.controller;
+
+import com.docus.core.util.ParamsUtils;
+import com.docus.core.util.json.JSON;
+import com.docus.infrastructure.web.request.SearchDTO;
+import com.docus.infrastructure.web.response.PageResult;
+import com.docus.log.annotation.TrackGroup;
+import com.docus.server.api.scheduling.management.SchCollectRecordApi;
+import com.docus.server.common.SchCollectorTask;
+import com.docus.server.common.process.ChannelProcessor;
+import com.docus.server.dto.scheduling.management.schcollector.task.ReportDownTwoDTO;
+import com.docus.server.dto.scheduling.management.schcollectrecord.AddSchCollectRecordDTO;
+import com.docus.server.dto.scheduling.management.schcollectrecord.DeleteSchCollectRecordDTO;
+import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO;
+import com.docus.server.dto.scheduling.management.schcollectrecord.RetrySchCollectRecordDTO;
+import com.docus.server.entity.scheduling.management.SchCollectRecord;
+import com.docus.server.enums.RetryTaskEnum;
+import com.docus.server.service.ISchCollectRecordService;
+import com.docus.server.vo.scheduling.management.schcollectrecord.SchCollectRecordVO;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+
+/**
+ * 采集记录表 控制器类
+ *
+ * @author AutoGenerator
+ * @since 2023-07-15
+ */
+@RestController
+public class SchCollectRecordController implements SchCollectRecordApi {
+ @Resource
+ private ISchCollectRecordService iSchCollectRecordService;
+ @Resource
+ private SchCollectorTask schedulerTask;
+
+ /**
+ * 按主键查询
+ *
+ * @param id 主键Id
+ * @return 实体
+ */
+ @Override
+ public SchCollectRecordVO findById(String id) {
+ return iSchCollectRecordService.findById(id);
+ }
+
+ /**
+ * 关键字搜索
+ *
+ * @param searchDTO 搜索参数
+ * @return 分页列表
+ */
+ @Override
+ public PageResult search(SearchDTO searchDTO) {
+ return iSchCollectRecordService.search(searchDTO);
+ }
+
+ /**
+ * 新增
+ *
+ * @param addSchCollectRecordDTO 编辑参数
+ * @return 成功或失败
+ */
+ @Override
+ public SchCollectRecord add(AddSchCollectRecordDTO addSchCollectRecordDTO) {
+ return iSchCollectRecordService.add(addSchCollectRecordDTO);
+ }
+
+ /**
+ * 编辑
+ *
+ * @param editSchCollectRecordDTO 编辑参数
+ * @return 成功或失败
+ */
+ @TrackGroup(group = "SchCollectRecordController-edit", processor = ChannelProcessor.class)
+ @Override
+ public boolean edit(EditSchCollectRecordDTO editSchCollectRecordDTO) {
+ return iSchCollectRecordService.edit(editSchCollectRecordDTO);
+ }
+
+ /**
+ * 批量删除
+ *
+ * @param deleteSchCollectRecordDTO 删除参数
+ * @return 成功或失败
+ */
+ @Override
+ public int delete(DeleteSchCollectRecordDTO deleteSchCollectRecordDTO) {
+ return iSchCollectRecordService.delete(deleteSchCollectRecordDTO);
+ }
+
+ @Override
+ public void retryTask(RetrySchCollectRecordDTO retrySchCollectRecordDTO) {
+ ReportDownTwoDTO report = JSON.fromJSON(retrySchCollectRecordDTO.getTaskOriginJson(), ReportDownTwoDTO.class);
+
+ report.setParams(ParamsUtils
+ .addParam("collectRecordId", retrySchCollectRecordDTO.getId())
+ .addParam("isRetryTask", RetryTaskEnum.RETRY_TASK.getValue())
+ .param());
+ schedulerTask.addRetryTask(report);
+ }
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/controller/SchCollectRecordRetryLogController.java b/collector-terminal-management/src/main/java/com/docus/server/controller/SchCollectRecordRetryLogController.java
new file mode 100644
index 0000000..cad13fa
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/controller/SchCollectRecordRetryLogController.java
@@ -0,0 +1,81 @@
+package com.docus.server.controller;
+
+import com.docus.infrastructure.web.request.SearchDTO;
+import com.docus.infrastructure.web.response.PageResult;
+import com.docus.server.api.scheduling.management.SchCollectRecordRetryLogApi;
+import com.docus.server.dto.scheduling.management.schcollectrecordretrylog.AddSchCollectRecordRetryLogDTO;
+import com.docus.server.dto.scheduling.management.schcollectrecordretrylog.DeleteSchCollectRecordRetryLogDTO;
+import com.docus.server.dto.scheduling.management.schcollectrecordretrylog.EditSchCollectRecordRetryLogDTO;
+import com.docus.server.entity.scheduling.management.SchCollectRecordRetryLog;
+import com.docus.server.service.ISchCollectRecordRetryLogService;
+import com.docus.server.vo.scheduling.management.schcollectrecordretrylog.SchCollectRecordRetryLogVO;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+
+/**
+ * 采集记录表重试表 控制器类
+ *
+ * @author AutoGenerator
+ * @since 2023-07-15
+ */
+@RestController
+public class SchCollectRecordRetryLogController implements SchCollectRecordRetryLogApi {
+ @Resource
+ private ISchCollectRecordRetryLogService iSchCollectRecordRetryLogService;
+
+ /**
+ * 按主键查询
+ *
+ * @param id 主键Id
+ * @return 实体
+ */
+ @Override
+ public SchCollectRecordRetryLogVO findById(String id) {
+ return iSchCollectRecordRetryLogService.findById(id);
+ }
+
+ /**
+ * 关键字搜索
+ *
+ * @param searchDTO 搜索参数
+ * @return 分页列表
+ */
+ @Override
+ public PageResult search(SearchDTO searchDTO) {
+ return iSchCollectRecordRetryLogService.search(searchDTO);
+ }
+
+ /**
+ * 新增
+ *
+ * @param addSchCollectRecordRetryLogDTO 编辑参数
+ * @return 成功或失败
+ */
+ @Override
+ public SchCollectRecordRetryLog add(AddSchCollectRecordRetryLogDTO addSchCollectRecordRetryLogDTO) {
+ return iSchCollectRecordRetryLogService.add(addSchCollectRecordRetryLogDTO);
+ }
+
+ /**
+ * 编辑
+ *
+ * @param editSchCollectRecordRetryLogDTO 编辑参数
+ * @return 成功或失败
+ */
+ @Override
+ public boolean edit(EditSchCollectRecordRetryLogDTO editSchCollectRecordRetryLogDTO) {
+ return iSchCollectRecordRetryLogService.edit(editSchCollectRecordRetryLogDTO);
+ }
+
+ /**
+ * 批量删除
+ *
+ * @param deleteSchCollectRecordRetryLogDTO 删除参数
+ * @return 成功或失败
+ */
+ @Override
+ public int delete(DeleteSchCollectRecordRetryLogDTO deleteSchCollectRecordRetryLogDTO) {
+ return iSchCollectRecordRetryLogService.delete(deleteSchCollectRecordRetryLogDTO);
+ }
+}
diff --git a/collector-terminal-management/src/main/java/com/docus/server/controller/SchCollectorConfigController.java b/collector-terminal-management/src/main/java/com/docus/server/controller/SchCollectorConfigController.java
new file mode 100644
index 0000000..e630329
--- /dev/null
+++ b/collector-terminal-management/src/main/java/com/docus/server/controller/SchCollectorConfigController.java
@@ -0,0 +1,94 @@
+package com.docus.server.controller;
+
+import com.docus.core.util.json.JSON;
+import com.docus.infrastructure.web.request.SearchDTO;
+import com.docus.infrastructure.web.response.PageResult;
+import com.docus.server.api.scheduling.management.SchCollectorConfigApi;
+import com.docus.server.dto.scheduling.management.schcollectorconfig.AddSchCollectorConfigDTO;
+import com.docus.server.dto.scheduling.management.schcollectorconfig.DeleteSchCollectorConfigDTO;
+import com.docus.server.dto.scheduling.management.schcollectorconfig.EditSchCollectorConfigDTO;
+import com.docus.server.service.ISchCollectorConfigService;
+import com.docus.server.vo.scheduling.management.schcollectorconfig.SchCollectorConfigVO;
+import com.google.common.collect.Maps;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+import java.util.HashMap;
+
+/**
+ * 采集器配置 控制器类
+ *
+ * @author AutoGenerator
+ * @since 2023-07-15
+ */
+@RestController
+public class SchCollectorConfigController implements SchCollectorConfigApi {
+ @Resource
+ private ISchCollectorConfigService iSchCollectorConfigService;
+
+ /**
+ * 按主键查询
+ *
+ * @param id 主键Id
+ * @return 实体
+ */
+ @Override
+ public SchCollectorConfigVO findById(String id) {
+ return iSchCollectorConfigService.findById(id);
+ }
+
+ /**
+ * 关键字搜索
+ *
+ * @param searchDTO 搜索参数
+ * @return 分页列表
+ */
+ @Override
+ public PageResult search(SearchDTO searchDTO) {
+ return iSchCollectorConfigService.search(searchDTO);
+ }
+
+ /**
+ * 新增
+ *
+ * @param addSchCollectorConfigDTO 编辑参数
+ * @return 成功或失败
+ */
+ @Override
+ public boolean add(AddSchCollectorConfigDTO addSchCollectorConfigDTO) {
+ return iSchCollectorConfigService.add(addSchCollectorConfigDTO);
+ }
+
+ /**
+ * 编辑
+ *
+ * @param editSchCollectorConfigDTO 编辑参数
+ * @return 成功或失败
+ */
+ @Override
+ public boolean edit(EditSchCollectorConfigDTO editSchCollectorConfigDTO) {
+ return iSchCollectorConfigService.edit(editSchCollectorConfigDTO);
+ }
+
+ /**
+ * 批量删除
+ *
+ * @param deleteSchCollectorConfigDTO 删除参数
+ * @return 成功或失败
+ */
+ @Override
+ public int delete(DeleteSchCollectorConfigDTO deleteSchCollectorConfigDTO) {
+ return iSchCollectorConfigService.delete(deleteSchCollectorConfigDTO);
+ }
+
+
+ public static void main(String[] args) {
+
+ HashMap