diff --git a/collector-scheduling-management/pom.xml b/collector-scheduling-management/pom.xml
index a35b222..c1e79e0 100644
--- a/collector-scheduling-management/pom.xml
+++ b/collector-scheduling-management/pom.xml
@@ -6,7 +6,7 @@
1.0-SNAPSHOT
4.0.0
- collector-scheduling-management
+ collector-scheduling-management-linrf
Archetype - collector-scheduling-management
http://maven.apache.org
diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/SchedulerTask.java b/collector-scheduling-management/src/main/java/com/docus/server/common/SchedulerTask.java
index 1054bc0..052214c 100644
--- a/collector-scheduling-management/src/main/java/com/docus/server/common/SchedulerTask.java
+++ b/collector-scheduling-management/src/main/java/com/docus/server/common/SchedulerTask.java
@@ -2,19 +2,20 @@ package com.docus.server.common;
import com.docus.core.util.DateUtil;
import com.docus.core.util.Func;
-import com.docus.core.util.ListUtils;
import com.docus.core.util.json.JSON;
import com.docus.server.api.taskdistribute.TaskDistributeApi;
import com.docus.server.common.netty.CommMsg;
import com.docus.server.common.netty.server.ChannelRepository;
-import com.docus.server.convert.CommMsgConvert;
import com.docus.server.dto.scheduling.management.schcollector.task.ReportDownTwoDTO;
import com.docus.server.dto.scheduling.management.schcollector.task.ReportTaskTwoDTO;
-import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO;
+import com.docus.server.dto.scheduling.management.schcollector.task.SchCollectorTaskDTO;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
+import com.docus.server.entity.scheduling.management.SchCollectRecord;
import com.docus.server.enums.BusyStateEnum;
+import com.docus.server.enums.RetryTaskEnum;
+import com.docus.server.service.ISchCollectRecordService;
+import com.docus.server.service.ISchTerminatorService;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.util.CharsetUtil;
@@ -24,10 +25,8 @@ import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
-import java.util.ArrayList;
import java.util.Date;
import java.util.List;
-import java.util.Map;
@Component
@Slf4j
@@ -36,6 +35,10 @@ public class SchedulerTask {
private ChannelRepository channelRepository;
@Resource
private TaskDistributeApi taskDistributeApi;
+ @Resource
+ private ISchCollectRecordService iSchCollectRecordService;
+ @Resource
+ private ISchTerminatorService iSchTerminatorService;
//定时任务
// 5 * * * * ? 在每分钟的5秒执行
@@ -52,44 +55,29 @@ public class SchedulerTask {
return;
}
- List reportDownTwoDTOList;
-
- List> taskInfos = new ArrayList<>();
-
- Map reportDownTwoMap = Maps.newConcurrentMap();
- if (!CollectionUtils.isEmpty(terminalList)) {
- //:todo 任务平台需要修改发布任务策略
- reportDownTwoDTOList = getTask(terminalList.size());
-
- if (Func.isEmpty(reportDownTwoDTOList) || Func.isBlank(reportDownTwoDTOList.get(0).getPatientId())) {
- return;
- }
-
- reportDownTwoMap = ListUtils.toMap(reportDownTwoDTOList, ReportDownTwoDTO::getPatientId);
-
- taskInfos = ListUtils.select(reportDownTwoDTOList, ReportDownTwoDTO::getTasks);
+ //:todo 任务平台需要修改发布任务策略
+ List reportDownTwoDTOList = getTask(terminalList.size());
+ if (Func.isEmpty(reportDownTwoDTOList) || Func.isBlank(reportDownTwoDTOList.get(0).getPatientId())) {
+ return;
}
//只采集,有优先级的
for (NettyTerminatorDTO terminal : terminalList) {
- for (List taskInfo : taskInfos) {
+ for (ReportDownTwoDTO report : reportDownTwoDTOList) {
//先找出有只采集的任务。
- ReportTaskTwoDTO reportTaskTwoDto = taskInfo.get(0);
- if (terminal.getOnlyCollectorIds().contains(reportTaskTwoDto.getCollectorId())
+ ReportTaskTwoDTO reportTaskTwoDto = report.getTasks().get(0);
+ if (!CollectionUtils.isEmpty(terminal.getOnlyCollectorIds()) && terminal.getOnlyCollectorIds().contains(reportTaskTwoDto.getCollectorId())
&& BusyStateEnum.IDLE.equals(terminal.getBusyState())) {
//把这个任务派给这个终端,并且把这个终端设置成繁忙
- if (terminal.getPriorityCollectorIds().contains(reportTaskTwoDto.getCollectorId())) {
+ if (!CollectionUtils.isEmpty(terminal.getPriorityCollectorIds()) && terminal.getPriorityCollectorIds().contains(reportTaskTwoDto.getCollectorId())) {
//把这个任务派给这个终端
terminal.setBusyState(BusyStateEnum.BUSY);
- String patientId = reportTaskTwoDto.getPatientId();
- if (reportDownTwoMap.containsKey(patientId)) {
- ReportDownTwoDTO reportDownTwoDTO = reportDownTwoMap.get(patientId);
+ iSchTerminatorService.saveOrUpdate(terminal.getTerminatorIp(), terminal);
- tcpToClient(terminal, reportDownTwoDTO);
- }
- return;
+ tcpToClient(terminal, report);
}
+ return;
}
}
}
@@ -101,18 +89,16 @@ public class SchedulerTask {
continue;
}
- for (List taskInfo : taskInfos) {
+ for (ReportDownTwoDTO report : reportDownTwoDTOList) {
//先找出有只采集的任务。
- ReportTaskTwoDTO reportTaskTwoDto = taskInfo.get(0);
- if (terminal.getOnlyCollectorIds().contains(reportTaskTwoDto.getCollectorId())
+ ReportTaskTwoDTO reportTaskTwoDto = report.getTasks().get(0);
+ if (!CollectionUtils.isEmpty(terminal.getOnlyCollectorIds()) && terminal.getOnlyCollectorIds().contains(reportTaskTwoDto.getCollectorId())
&& BusyStateEnum.IDLE.equals(terminal.getBusyState())) {
//把这个任务派给这个终端,并且把这个终端设置成繁忙
terminal.setBusyState(BusyStateEnum.BUSY);
- String patientId = reportTaskTwoDto.getPatientId();
- if (reportDownTwoMap.containsKey(patientId)) {
- ReportDownTwoDTO reportDownTwoDTO = reportDownTwoMap.get(patientId);
- tcpToClient(terminal, reportDownTwoDTO);
- }
+ iSchTerminatorService.saveOrUpdate(terminal.getTerminatorIp(), terminal);
+
+ tcpToClient(terminal, report);
return;
}
}
@@ -127,18 +113,16 @@ public class SchedulerTask {
continue;
}
- for (List taskInfo : taskInfos) {
+ for (ReportDownTwoDTO report : reportDownTwoDTOList) {
//先找出有只采集的任务。
- ReportTaskTwoDTO reportTaskTwoDto = taskInfo.get(0);
- if (terminal.getPriorityCollectorIds().contains(reportTaskTwoDto.getCollectorId())
+ ReportTaskTwoDTO reportTaskTwoDto = report.getTasks().get(0);
+ if (!CollectionUtils.isEmpty(terminal.getPriorityCollectorIds()) && terminal.getPriorityCollectorIds().contains(reportTaskTwoDto.getCollectorId())
&& BusyStateEnum.IDLE.equals(terminal.getBusyState())) {
//把这个任务派给这个终端
terminal.setBusyState(BusyStateEnum.BUSY);
- String patientId = reportTaskTwoDto.getPatientId();
- if (reportDownTwoMap.containsKey(patientId)) {
- ReportDownTwoDTO reportDownTwoDTO = reportDownTwoMap.get(patientId);
- tcpToClient(terminal, reportDownTwoDTO);
- }
+ iSchTerminatorService.saveOrUpdate(terminal.getTerminatorIp(), terminal);
+
+ tcpToClient(terminal, report);
return;
}
}
@@ -153,40 +137,69 @@ public class SchedulerTask {
continue;
}
- for (List taskInfo : taskInfos) {
+ for (ReportDownTwoDTO report : reportDownTwoDTOList) {
//先找出有只采集的任务。
//把这个任务派给这个终端
- ReportTaskTwoDTO reportTaskTwoDto = taskInfo.get(0);
- String collectorId = reportTaskTwoDto.getCollectorId();
+ ReportTaskTwoDTO reportTaskTwoDto = report.getTasks().get(0);
terminal.setBusyState(BusyStateEnum.BUSY);
- String patientId = reportTaskTwoDto.getPatientId();
- if (reportDownTwoMap.containsKey(patientId)) {
- ReportDownTwoDTO reportDownTwoDTO = reportDownTwoMap.get(patientId);
- tcpToClient(terminal, reportDownTwoDTO);
- }
+ iSchTerminatorService.saveOrUpdate(terminal.getTerminatorIp(), terminal);
+
+ tcpToClient(terminal, report);
return;
}
}
+ //只采集,空闲的。
+ //获取只采集的任务,并且进行分配。
+ for (NettyTerminatorDTO terminal : terminalList) {
+ //把刚才已经分配任务过的采集器排除
+ if (BusyStateEnum.BUSY.equals(terminal.getBusyState())) {
+ continue;
+ }
+
+ List onlyTaskInfos = getOnlyTaskInfos(terminal.getOnlyCollectorIds());
+
+
+ if (CollectionUtils.isEmpty(onlyTaskInfos) || Func.isBlank(onlyTaskInfos.get(0).getPatientId())) {
+ return;
+ }
+ for (ReportDownTwoDTO report : onlyTaskInfos) {
+ //将这条任务分配这个这个终端
+ //下发
+ terminal.setBusyState(BusyStateEnum.BUSY);
+ iSchTerminatorService.saveOrUpdate(terminal.getTerminatorIp(), terminal);
+
+ tcpToClient(terminal, report);
+
+ return;
+ }
+ }
log.info("定时任务: 执行完毕");
- } catch (Exception e) {
+ } catch (
+ Exception e) {
log.error("定时任务执行出错", e);
}
+
}
- private void tcpToClient(NettyTerminatorDTO terminal, ReportDownTwoDTO messageContent) {
+ private void tcpToClient(NettyTerminatorDTO terminal, ReportDownTwoDTO reportDownTwoDTO) {
Channel channel = channelRepository.get(terminal.getTerminatorIp());
- CommMsgDTO commMsgDTO = CommMsgDTO.builder()
- .content(JSON.toJSON(messageContent))
- .messageType(MsgConstants.SCH_DISTRIBUTE_TASKS)
- .build();
+ SchCollectRecord schCollectRecord = iSchCollectRecordService.saveOrUpdateRecord(terminal, reportDownTwoDTO);
- CommMsg commMsg = CommMsgConvert.INSTANCE.convertDO(commMsgDTO);
+ SchCollectorTaskDTO messageContent = new SchCollectorTaskDTO();
+ messageContent.setCollectorRecordId(schCollectRecord.getId());
+ messageContent.setIsRetry(String.valueOf(RetryTaskEnum.NO_RETRY_TASK.getValue()));
+ messageContent.setTaskInfo(reportDownTwoDTO);
- commMsg.setMessageTime(DateUtil.formatDateTime(new Date()));
+ CommMsg commMsg = CommMsg.builder()
+ .messageType(MsgConstants.SCH_DISTRIBUTE_TASKS)
+ .messageTime(DateUtil.formatDateTime(new Date()))
+ .content(JSON.toJSON(messageContent))
+ .build();
+ //tcp 下发任务到终端
if (channel != null) {
channel.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(commMsg), CharsetUtil.UTF_8));
}
@@ -198,21 +211,171 @@ public class SchedulerTask {
//mock
- ReportDownTwoDTO task = taskDistributeApi.getTask("1");
- ReportDownTwoDTO task1 = taskDistributeApi.getTask("1");
-
- return Lists.newArrayList(task, task1);
+ String collectorId1 = "{\n" +
+ " \"createTime\": \"2022-12-03 12:39:30\",\n" +
+ " \"hospitals\": [\n" +
+ " {\n" +
+ " \"admissDate\": \"2023-12-31 01:01:01\",\n" +
+ " \"admissId\": \"amid_999901\",\n" +
+ " \"admissTimes\": 1,\n" +
+ " \"disDate\": \"2023-12-31 01:01:01\",\n" +
+ " \"disDeptName\": \"22222\",\n" +
+ " \"patientId\": \"758878610105573376\"\n" +
+ " }\n" +
+ " ],\n" +
+ " \"jzh\": \"jzh_999901\",\n" +
+ " \"patient\": {\n" +
+ " \"inpatientNo\": \"999901\",\n" +
+ " \"name\": \"ceshi\",\n" +
+ " \"patientId\": \"758878610105573376\"\n" +
+ " },\n" +
+ " \"patientId\": \"758878610105573376\",\n" +
+ " \"recordType\": \"1\",\n" +
+ " \"tasks\": [\n" +
+ " {\n" +
+ " \"collectorId\": \"1\",\n" +
+ " \"patientId\": \"758878610105573376\",\n" +
+ " \"taskId\": 834292710565826560\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ ReportDownTwoDTO reportDownTwoDTO1 = JSON.fromJSON(collectorId1, ReportDownTwoDTO.class);
+
+
+ String collectorId2 = "{\n" +
+ " \"createTime\": \"2022-12-03 12:39:30\",\n" +
+ " \"hospitals\": [\n" +
+ " {\n" +
+ " \"admissDate\": \"2023-12-31 01:01:01\",\n" +
+ " \"admissId\": \"amid_999901\",\n" +
+ " \"admissTimes\": 1,\n" +
+ " \"disDate\": \"2023-12-31 01:01:01\",\n" +
+ " \"disDeptName\": \"22222\",\n" +
+ " \"patientId\": \"758878610105573376\"\n" +
+ " }\n" +
+ " ],\n" +
+ " \"jzh\": \"jzh_999901\",\n" +
+ " \"patient\": {\n" +
+ " \"inpatientNo\": \"999901\",\n" +
+ " \"name\": \"ceshi\",\n" +
+ " \"patientId\": \"758878610105573376\"\n" +
+ " },\n" +
+ " \"patientId\": \"758878610105573376\",\n" +
+ " \"recordType\": \"1\",\n" +
+ " \"tasks\": [\n" +
+ " {\n" +
+ " \"collectorId\": \"2\",\n" +
+ " \"patientId\": \"758878610105573376\",\n" +
+ " \"taskId\": 834292712465846272\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ ReportDownTwoDTO reportDownTwoDTO2 = JSON.fromJSON(collectorId2, ReportDownTwoDTO.class);
+
+
+ String collectorId3 = "{\n" +
+ " \"createTime\": \"2023-01-09 19:26:11\",\n" +
+ " \"hospitals\": [\n" +
+ " {\n" +
+ " \"admissDate\": \"2023-12-31 01:01:01\",\n" +
+ " \"admissId\": \"amid_297974\",\n" +
+ " \"admissTimes\": 21,\n" +
+ " \"disDate\": \"2023-12-31 01:01:01\",\n" +
+ " \"disDeptName\": \"普外二科(甲乳胸烧伤整形)\",\n" +
+ " \"patientId\": \"772389719349678080\"\n" +
+ " }\n" +
+ " ],\n" +
+ " \"jzh\": \"jzh_297974\",\n" +
+ " \"patient\": {\n" +
+ " \"inpatientNo\": \"297974\",\n" +
+ " \"name\": \"曾美英\",\n" +
+ " \"patientId\": \"772389719349678080\"\n" +
+ " },\n" +
+ " \"patientId\": \"772389719349678080\",\n" +
+ " \"recordType\": \"1\",\n" +
+ " \"tasks\": [\n" +
+ " {\n" +
+ " \"collectorId\": \"3\",\n" +
+ " \"patientId\": \"772389719349678080\",\n" +
+ " \"taskId\": 838201379426750464\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ ReportDownTwoDTO reportDownTwoDTO3 = JSON.fromJSON(collectorId3, ReportDownTwoDTO.class);
+
+
+ return Lists.newArrayList(reportDownTwoDTO1, reportDownTwoDTO2, reportDownTwoDTO3);
}
//根据采集器id类型,一次获取一批采集器类型任务
- private ArrayList getTask(List collectorIds) {
+ private List getOnlyTaskInfos(List collectorIds) {
// return taskDistributeApi.getTask(collectorIds.get(0));
//mock
- ReportDownTwoDTO task = taskDistributeApi.getTask("4");
- ReportDownTwoDTO task1 = taskDistributeApi.getTask("5");
- return Lists.newArrayList(task, task1);
+
+ String collectorId2 = "{\n" +
+ " \"createTime\": \"2022-12-03 12:39:30\",\n" +
+ " \"hospitals\": [\n" +
+ " {\n" +
+ " \"admissDate\": \"2023-12-31 01:01:01\",\n" +
+ " \"admissId\": \"amid_999901\",\n" +
+ " \"admissTimes\": 1,\n" +
+ " \"disDate\": \"2023-12-31 01:01:01\",\n" +
+ " \"disDeptName\": \"22222\",\n" +
+ " \"patientId\": \"758878610105573376\"\n" +
+ " }\n" +
+ " ],\n" +
+ " \"jzh\": \"jzh_999901\",\n" +
+ " \"patient\": {\n" +
+ " \"inpatientNo\": \"999901\",\n" +
+ " \"name\": \"ceshi\",\n" +
+ " \"patientId\": \"758878610105573376\"\n" +
+ " },\n" +
+ " \"patientId\": \"758878610105573376\",\n" +
+ " \"recordType\": \"1\",\n" +
+ " \"tasks\": [\n" +
+ " {\n" +
+ " \"collectorId\": \"2\",\n" +
+ " \"patientId\": \"758878610105573376\",\n" +
+ " \"taskId\": 834292712465846272\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ ReportDownTwoDTO reportDownTwoDTO2 = JSON.fromJSON(collectorId2, ReportDownTwoDTO.class);
+
+
+ String collectorId3 = "{\n" +
+ " \"createTime\": \"2022-12-03 12:39:30\",\n" +
+ " \"hospitals\": [\n" +
+ " {\n" +
+ " \"admissDate\": \"2023-12-31 01:01:01\",\n" +
+ " \"admissId\": \"amid_999901\",\n" +
+ " \"admissTimes\": 1,\n" +
+ " \"disDate\": \"2023-12-31 01:01:01\",\n" +
+ " \"disDeptName\": \"22222\",\n" +
+ " \"patientId\": \"758878610105573376\"\n" +
+ " }\n" +
+ " ],\n" +
+ " \"jzh\": \"jzh_999901\",\n" +
+ " \"patient\": {\n" +
+ " \"inpatientNo\": \"999901\",\n" +
+ " \"name\": \"ceshi\",\n" +
+ " \"patientId\": \"758878610105573376\"\n" +
+ " },\n" +
+ " \"patientId\": \"758878610105573376\",\n" +
+ " \"recordType\": \"1\",\n" +
+ " \"tasks\": [\n" +
+ " {\n" +
+ " \"collectorId\": \"3\",\n" +
+ " \"patientId\": \"758878610105573376\",\n" +
+ " \"taskId\": 834292883635392512\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ ReportDownTwoDTO reportDownTwoDTO3 = JSON.fromJSON(collectorId3, ReportDownTwoDTO.class);
+
+ return Lists.newArrayList(reportDownTwoDTO2, reportDownTwoDTO3);
}
}
diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java
index e42a8bd..9a8a58c 100644
--- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java
+++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java
@@ -24,7 +24,6 @@ import java.util.stream.Collectors;
@Component
@Slf4j
public class ChannelRepository {
-
@Resource
private ISchTerminatorService iSchTerminatorService;
@@ -46,7 +45,7 @@ public class ChannelRepository {
String terminatorIp = nettyTerminatorDTO.getTerminatorIp();
//更新数据库终端数据
- SchTerminatorVO schTerminatorVO = iSchTerminatorService.saveOrUpdate(terminatorIp, nettyTerminatorDTO.getOnlineState());
+ SchTerminatorVO schTerminatorVO = iSchTerminatorService.saveOrUpdate(terminatorIp, nettyTerminatorDTO);
nettyTerminatorDTO.setId(schTerminatorVO.getId());
//缓存
@@ -99,7 +98,9 @@ public class ChannelRepository {
public void remove(String key) {
IP_CHANNEL_CACHE_MAP.remove(key);
- iSchTerminatorService.saveOrUpdate(key, OnlineStateEnum.OFFLINE);
+ NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO();
+ nettyTerminatorDTO.setOnlineState(OnlineStateEnum.OFFLINE);
+ iSchTerminatorService.saveOrUpdate(key, nettyTerminatorDTO);
}
}
diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java
index b3f9acb..9770f10 100644
--- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java
+++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java
@@ -1,5 +1,6 @@
package com.docus.server.common.netty.server.handler;
+import com.docus.core.util.Func;
import com.docus.core.util.StringUtils;
import com.docus.core.util.json.JSON;
import com.docus.server.common.MsgConstants;
@@ -93,17 +94,18 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler {
InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = ipSocket.getAddress().getHostAddress();
log.info("【采集器-终端IP】:{},连接上线,IP地址信息:{}", clientIp, clientIp);
- Channel channel = repository.get(clientIp);
- if (channel != null && channel.isOpen()) {
- channel.close();
- }
- NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO();
- nettyTerminatorDTO.setTerminatorIp(clientIp);
- nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE);
- nettyTerminatorDTO.setOnlineState(OnlineStateEnum.OFFLINE);
+ String clientId = repository.getClientKey(ctx.channel());
+
+ if (Func.isBlank(clientId)) {
- repository.put(nettyTerminatorDTO, ctx.channel());
+ NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO();
+ nettyTerminatorDTO.setTerminatorIp(clientIp);
+ nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE);
+ nettyTerminatorDTO.setOnlineState(OnlineStateEnum.OFFLINE);
+
+ repository.put(nettyTerminatorDTO, ctx.channel());
+ }
DEFAULT_CHANNEL_GROUP.add(ctx.channel());
diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java
index 3f0849a..6bac1c7 100644
--- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java
+++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java
@@ -1,5 +1,6 @@
package com.docus.server.common.netty.server.handler;
+import com.docus.core.util.Func;
import com.docus.core.util.StringUtils;
import com.docus.core.util.json.JSON;
import com.docus.server.common.MsgConstants;
@@ -10,7 +11,6 @@ import com.docus.server.enums.BusyStateEnum;
import com.docus.server.enums.OnlineStateEnum;
import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -58,25 +58,27 @@ public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter {
log.info("【采集器-终端IP】:{},连接上线,IP地址信息:{}", clientIp, clientIp);
- Channel channel = repository.get(clientIp);
- if (channel != null && channel.isOpen()) {
- channel.close();
- }
+ String clientKey = repository.getClientKey(ctx.channel());
+
+ if (Func.isNotBlank(clientKey)) {
- NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO();
- nettyTerminatorDTO.setTerminatorIp(clientIp);
- nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE);
- nettyTerminatorDTO.setOnlineState(OnlineStateEnum.ONLINE);
+ NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO();
+ nettyTerminatorDTO.setTerminatorIp(clientIp);
+ nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE);
+ nettyTerminatorDTO.setOnlineState(OnlineStateEnum.ONLINE);
- //将ip和channel进行映射
- repository.put(nettyTerminatorDTO, ctx.channel());
+ //将ip和channel进行映射
+ repository.put(nettyTerminatorDTO, ctx.channel());
+ }
} else {
+
if (ctx.channel().isOpen()) {
//触发下一个handler
ctx.fireChannelRead(msg);
}
}
+
}
private ChannelRepository repository;
diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java b/collector-scheduling-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java
index c50033d..d9e5838 100644
--- a/collector-scheduling-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java
+++ b/collector-scheduling-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java
@@ -4,9 +4,14 @@ import com.docus.core.util.Func;
import com.docus.log.context.TrackContext;
import com.docus.log.processor.AbstractProcessor;
import com.docus.server.common.netty.server.ChannelRepository;
+import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO;
import com.docus.server.dto.scheduling.management.schterminator.EditSchTerminatorDTO;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
+import com.docus.server.enums.BusyStateEnum;
+import com.docus.server.enums.StateEnum;
+import com.docus.server.service.ISchCollectRecordService;
import com.docus.server.service.ISchTerminatorService;
+import com.docus.server.vo.scheduling.management.schcollectrecord.SchCollectRecordVO;
import com.docus.server.vo.scheduling.management.schterminator.SchTerminatorVO;
import javax.annotation.Resource;
@@ -22,13 +27,57 @@ public class ChannelProcessor extends AbstractProcessor {
private ChannelRepository channelRepository;
@Resource
private ISchTerminatorService iSchTerminatorService;
+ @Resource
+ private ISchCollectRecordService iSchCollectRecordService;
@Override
protected Object doProcess(TrackContext context) {
- return logProcess(context);
+ String group = context.getGroup();
+
+ switch (group) {
+ case "SchCollectRecordController-edit":
+ return doSchCollectRecordControllerEdit(context);
+ case "SchTerminatorController":
+ return doSchTerminatorController(context);
+ default:
+ return true;
+ }
+ }
+
+ private boolean doSchCollectRecordControllerEdit(TrackContext context) {
+ return logCollectRecord(context);
+ }
+
+ private boolean doSchTerminatorController(TrackContext context) {
+ return logTerminator(context);
+ }
+
+ private boolean logCollectRecord(TrackContext context) {
+ boolean error = context.isError();
+ EditSchCollectRecordDTO collectRecordDTO = (EditSchCollectRecordDTO) context.getArgs()[0];
+
+ if (!error) {
+ SchCollectRecordVO schCollectRecordVO = iSchCollectRecordService.findById(String.valueOf(collectRecordDTO.getId()));
+
+ SchTerminatorVO schTerminatorVO = iSchTerminatorService.findById(String.valueOf(schCollectRecordVO.getTerminatorId()));
+
+ NettyTerminatorDTO nettyTerminatorDTO = channelRepository.getTerminatorByIp(String.valueOf(schTerminatorVO.getTerminatorIp()));
+
+ if (Func.isEmpty(nettyTerminatorDTO)) {
+ return false;
+ }
+
+ List stateEnums = Arrays.asList(StateEnum.values());
+
+ if (stateEnums.contains(schCollectRecordVO.getTaskExecState())) {
+ nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE);
+ }
+ }
+ return error;
+
}
- private boolean logProcess(TrackContext context) {
+ private boolean logTerminator(TrackContext context) {
boolean error = context.isError();
EditSchTerminatorDTO terminatorDTO = (EditSchTerminatorDTO) context.getArgs()[0];
if (!error) {
diff --git a/collector-scheduling-management/src/main/java/com/docus/server/controller/SchCollectRecordController.java b/collector-scheduling-management/src/main/java/com/docus/server/controller/SchCollectRecordController.java
index 3f4748e..ae86405 100644
--- a/collector-scheduling-management/src/main/java/com/docus/server/controller/SchCollectRecordController.java
+++ b/collector-scheduling-management/src/main/java/com/docus/server/controller/SchCollectRecordController.java
@@ -3,13 +3,16 @@ package com.docus.server.controller;
import com.docus.core.util.json.JSON;
import com.docus.infrastructure.web.request.SearchDTO;
import com.docus.infrastructure.web.response.PageResult;
+import com.docus.log.annotation.TrackGroup;
import com.docus.server.api.scheduling.management.SchCollectRecordApi;
import com.docus.server.common.MsgConstants;
+import com.docus.server.common.process.ChannelProcessor;
import com.docus.server.dto.scheduling.management.schcollector.task.SchCollectorTaskDTO;
import com.docus.server.dto.scheduling.management.schcollectrecord.AddSchCollectRecordDTO;
import com.docus.server.dto.scheduling.management.schcollectrecord.DeleteSchCollectRecordDTO;
import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO;
import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO;
+import com.docus.server.entity.scheduling.management.SchCollectRecord;
import com.docus.server.service.ICommMsgService;
import com.docus.server.service.ISchCollectRecordService;
import com.docus.server.vo.scheduling.management.schcollectrecord.SchCollectRecordVO;
@@ -98,7 +101,7 @@ public class SchCollectRecordController implements SchCollectRecordApi {
* @return 成功或失败
*/
@Override
- public boolean add(AddSchCollectRecordDTO addSchCollectRecordDTO) {
+ public SchCollectRecord add(AddSchCollectRecordDTO addSchCollectRecordDTO) {
return iSchCollectRecordService.add(addSchCollectRecordDTO);
}
@@ -108,6 +111,7 @@ public class SchCollectRecordController implements SchCollectRecordApi {
* @param editSchCollectRecordDTO 编辑参数
* @return 成功或失败
*/
+ @TrackGroup(group = "SchCollectRecordController-edit", processor = ChannelProcessor.class)
@Override
public boolean edit(EditSchCollectRecordDTO editSchCollectRecordDTO) {
return iSchCollectRecordService.edit(editSchCollectRecordDTO);
diff --git a/collector-scheduling-management/src/main/java/com/docus/server/service/ISchCollectRecordService.java b/collector-scheduling-management/src/main/java/com/docus/server/service/ISchCollectRecordService.java
index 3f0e973..c06f13b 100644
--- a/collector-scheduling-management/src/main/java/com/docus/server/service/ISchCollectRecordService.java
+++ b/collector-scheduling-management/src/main/java/com/docus/server/service/ISchCollectRecordService.java
@@ -2,9 +2,12 @@ package com.docus.server.service;
import com.docus.infrastructure.web.request.SearchDTO;
import com.docus.infrastructure.web.response.PageResult;
+import com.docus.server.dto.scheduling.management.schcollector.task.ReportDownTwoDTO;
import com.docus.server.dto.scheduling.management.schcollectrecord.AddSchCollectRecordDTO;
import com.docus.server.dto.scheduling.management.schcollectrecord.DeleteSchCollectRecordDTO;
import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO;
+import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
+import com.docus.server.entity.scheduling.management.SchCollectRecord;
import com.docus.server.vo.scheduling.management.schcollectrecord.SchCollectRecordVO;
/**
@@ -28,7 +31,7 @@ public interface ISchCollectRecordService {
* @param addSchCollectRecordDTO 新增参数
* @return 成功或失败
*/
- boolean add(AddSchCollectRecordDTO addSchCollectRecordDTO);
+ SchCollectRecord add(AddSchCollectRecordDTO addSchCollectRecordDTO);
/**
* 编辑
@@ -53,4 +56,6 @@ public interface ISchCollectRecordService {
* @return 分页列表
*/
PageResult search(SearchDTO searchDTO);
+
+ SchCollectRecord saveOrUpdateRecord(NettyTerminatorDTO terminal, ReportDownTwoDTO messageContent);
}
diff --git a/collector-scheduling-management/src/main/java/com/docus/server/service/ISchTerminatorService.java b/collector-scheduling-management/src/main/java/com/docus/server/service/ISchTerminatorService.java
index e0e1573..dc135ed 100644
--- a/collector-scheduling-management/src/main/java/com/docus/server/service/ISchTerminatorService.java
+++ b/collector-scheduling-management/src/main/java/com/docus/server/service/ISchTerminatorService.java
@@ -5,7 +5,7 @@ import com.docus.infrastructure.web.response.PageResult;
import com.docus.server.dto.scheduling.management.schterminator.AddSchTerminatorDTO;
import com.docus.server.dto.scheduling.management.schterminator.DeleteSchTerminatorDTO;
import com.docus.server.dto.scheduling.management.schterminator.EditSchTerminatorDTO;
-import com.docus.server.enums.OnlineStateEnum;
+import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.vo.scheduling.management.schterminator.SchTerminatorVO;
/**
@@ -58,8 +58,8 @@ public interface ISchTerminatorService {
/**
* 新增或更新
* @param terminatorIp
- * @param onlineState
+ * @param nettyTerminatorDTO
* @return
*/
- SchTerminatorVO saveOrUpdate(String terminatorIp, OnlineStateEnum onlineState);
+ SchTerminatorVO saveOrUpdate(String terminatorIp, NettyTerminatorDTO nettyTerminatorDTO);
}
diff --git a/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchCollectRecordServiceImpl.java b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchCollectRecordServiceImpl.java
index 671aa68..152f416 100644
--- a/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchCollectRecordServiceImpl.java
+++ b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchCollectRecordServiceImpl.java
@@ -1,26 +1,34 @@
package com.docus.server.service.impl;
+import com.docus.core.util.DateUtil;
import com.docus.core.util.ListUtils;
+import com.docus.core.util.json.JSON;
import com.docus.core.util.property.Setters;
+import com.docus.infrastructure.redis.service.IdService;
import com.docus.infrastructure.web.exception.ApiException;
import com.docus.infrastructure.web.exception.ExceptionCode;
import com.docus.infrastructure.web.request.SearchDTO;
import com.docus.infrastructure.web.response.PageResult;
import com.docus.server.convert.SchCollectRecordConvert;
import com.docus.server.convert.SchCollectRecordRetryLogConvert;
+import com.docus.server.dto.scheduling.management.schcollector.task.ReportDownTwoDTO;
import com.docus.server.dto.scheduling.management.schcollectrecord.AddSchCollectRecordDTO;
import com.docus.server.dto.scheduling.management.schcollectrecord.DeleteSchCollectRecordDTO;
import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO;
+import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.entity.scheduling.management.SchCollectRecord;
import com.docus.server.entity.scheduling.management.SchCollectRecordRetryLog;
import com.docus.server.entity.scheduling.management.SchSystemParams;
import com.docus.server.enums.RetryTaskEnum;
+import com.docus.server.enums.SubStateEnum;
import com.docus.server.infrastructure.dao.ISchCollectRecordDao;
import com.docus.server.infrastructure.dao.ISchCollectRecordRetryLogDao;
import com.docus.server.infrastructure.dao.ISchSystemParamsDao;
+import com.docus.server.infrastructure.dao.ISchTerminatorDao;
import com.docus.server.service.ISchCollectRecordService;
import com.docus.server.vo.scheduling.management.schcollectrecord.SchCollectRecordVO;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
@@ -38,9 +46,13 @@ public class SchCollectRecordServiceImpl implements ISchCollectRecordService {
@Resource
private ISchCollectRecordDao iSchCollectRecordDao;
@Resource
+ private ISchTerminatorDao iSchTerminatorDao;
+ @Resource
private ISchCollectRecordRetryLogDao iSchCollectRecordRetryLogDao;
@Resource
private ISchSystemParamsDao iSchSystemParamsDao;
+ @Resource
+ private IdService idService;
/**
* 按主键查询
@@ -80,6 +92,26 @@ public class SchCollectRecordServiceImpl implements ISchCollectRecordService {
return result;
}
+ @Transactional(rollbackFor = Exception.class)
+ @Override
+ public SchCollectRecord saveOrUpdateRecord(NettyTerminatorDTO terminal, ReportDownTwoDTO messageContent) {
+ //新增采集记录表
+ AddSchCollectRecordDTO addSchCollectRecordDTO = new AddSchCollectRecordDTO();
+ addSchCollectRecordDTO.setId(idService.getDateSeq());
+ addSchCollectRecordDTO.setCollectorId(Long.valueOf(messageContent.getTasks().get(0).getCollectorId()));
+ addSchCollectRecordDTO.setTerminatorId(terminal.getId());
+ addSchCollectRecordDTO.setTaskId(messageContent.getTasks().get(0).getTaskId());
+ SchSystemParams params = iSchSystemParamsDao.findOneBy("paramValue", messageContent.getTasks().get(0).getCollectorId());
+ addSchCollectRecordDTO.setTaskName(String.format("%s%s%s", messageContent.getPatient().getName(), params.getParamName(), "采集"));
+ addSchCollectRecordDTO.setTaskMemo(String.format("%s%s%s", messageContent.getPatient().getName(), params.getParamName(), "采集"));
+ addSchCollectRecordDTO.setStartTime(DateUtil.now());
+ addSchCollectRecordDTO.setTaskDetailInfo(String.format("病案号:%s,姓名:%s,采集类型:%s", messageContent.getPatientId(), messageContent.getPatient().getName(), params.getParamName()));
+ addSchCollectRecordDTO.setSubTaskExecState(SubStateEnum.RECEIVE);
+ addSchCollectRecordDTO.setTaskOriginJson(JSON.toJSON(messageContent));
+
+ return this.add(addSchCollectRecordDTO);
+ }
+
/**
* 新增
*
@@ -87,9 +119,10 @@ public class SchCollectRecordServiceImpl implements ISchCollectRecordService {
* @return 成功或失败
*/
@Override
- public boolean add(AddSchCollectRecordDTO addSchCollectRecordDTO) {
+ public SchCollectRecord add(AddSchCollectRecordDTO addSchCollectRecordDTO) {
SchCollectRecord schCollectRecord = SchCollectRecordConvert.INSTANCE.convertDO(addSchCollectRecordDTO);
- return iSchCollectRecordDao.add(schCollectRecord);
+ iSchCollectRecordDao.add(schCollectRecord);
+ return schCollectRecord;
}
/**
@@ -102,6 +135,7 @@ public class SchCollectRecordServiceImpl implements ISchCollectRecordService {
public boolean edit(EditSchCollectRecordDTO editSchCollectRecordDTO) {
RetryTaskEnum isRetryTask = editSchCollectRecordDTO.getIsRetryTask();
Long id = editSchCollectRecordDTO.getId();
+
if (RetryTaskEnum.NO_RETRY_TASK.equals(isRetryTask)) {
//不是重试任务
SchCollectRecord schCollectRecord = iSchCollectRecordDao.findById(id);
diff --git a/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchTerminatorServiceImpl.java b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchTerminatorServiceImpl.java
index f746912..130fcb1 100644
--- a/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchTerminatorServiceImpl.java
+++ b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchTerminatorServiceImpl.java
@@ -8,9 +8,9 @@ import com.docus.server.convert.SchTerminatorConvert;
import com.docus.server.dto.scheduling.management.schterminator.AddSchTerminatorDTO;
import com.docus.server.dto.scheduling.management.schterminator.DeleteSchTerminatorDTO;
import com.docus.server.dto.scheduling.management.schterminator.EditSchTerminatorDTO;
+import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.entity.scheduling.management.SchTerminator;
import com.docus.server.enums.BusyStateEnum;
-import com.docus.server.enums.OnlineStateEnum;
import com.docus.server.infrastructure.dao.ISchTerminatorDao;
import com.docus.server.service.ISchTerminatorService;
import com.docus.server.vo.scheduling.management.schterminator.SchTerminatorVO;
@@ -55,26 +55,27 @@ public class SchTerminatorServiceImpl implements ISchTerminatorService {
}
@Override
- public SchTerminatorVO saveOrUpdate(String terminatorIp, OnlineStateEnum onlineStateEnum) {
+ public SchTerminatorVO saveOrUpdate(String terminatorIp, NettyTerminatorDTO nettyTerminatorDTO) {
SchTerminator schTerminator = iSchTerminatorDao.findOneBy("terminatorIp", terminatorIp);
+ //新增
if (Func.isNull(schTerminator)) {
SchTerminator schTerminatorDO = new SchTerminator();
schTerminatorDO.setId(idService.getDateSeq());
schTerminatorDO.setTerminatorIp(terminatorIp);
schTerminatorDO.setTerminatorName(terminatorIp);
- schTerminatorDO.setOnlineState(onlineStateEnum);
+ schTerminatorDO.setOnlineState(nettyTerminatorDTO.getOnlineState());
schTerminatorDO.setBusyState(BusyStateEnum.IDLE);
iSchTerminatorDao.save(schTerminatorDO);
return SchTerminatorConvert.INSTANCE.convertVO(schTerminatorDO);
} else {
-
- schTerminator.setBusyState(BusyStateEnum.IDLE);
- schTerminator.setOnlineState(onlineStateEnum);
+ //更新
+ schTerminator.setBusyState(nettyTerminatorDTO.getBusyState());
+ schTerminator.setOnlineState(nettyTerminatorDTO.getOnlineState());
schTerminator.setUpdateTime(new Date());
iSchTerminatorDao.updateById(schTerminator);
diff --git a/docus-client-interface/src/main/java/com/docus/server/api/scheduling.management/SchCollectRecordApi.java b/docus-client-interface/src/main/java/com/docus/server/api/scheduling.management/SchCollectRecordApi.java
index 3c88de9..1e82555 100644
--- a/docus-client-interface/src/main/java/com/docus/server/api/scheduling.management/SchCollectRecordApi.java
+++ b/docus-client-interface/src/main/java/com/docus/server/api/scheduling.management/SchCollectRecordApi.java
@@ -5,6 +5,7 @@ import com.docus.infrastructure.web.response.PageResult;
import com.docus.server.dto.scheduling.management.schcollectrecord.AddSchCollectRecordDTO;
import com.docus.server.dto.scheduling.management.schcollectrecord.DeleteSchCollectRecordDTO;
import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO;
+import com.docus.server.entity.scheduling.management.SchCollectRecord;
import com.docus.server.vo.scheduling.management.schcollectrecord.SchCollectRecordVO;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
@@ -57,7 +58,7 @@ public interface SchCollectRecordApi {
*/
@ApiOperation("新增")
@PostMapping("/add")
- boolean add(@RequestBody AddSchCollectRecordDTO addSchCollectRecordDTO);
+ SchCollectRecord add(@RequestBody AddSchCollectRecordDTO addSchCollectRecordDTO);
/**
* 编辑