From 7afede7f3ae6dd2b442fdcb19caaec86c9ede506 Mon Sep 17 00:00:00 2001 From: linrf Date: Fri, 21 Jul 2023 16:58:31 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E9=87=87=E9=9B=86=E8=B0=83=E5=BA=A6?= =?UTF-8?q?=E5=99=A8-=E5=90=8E=E7=AB=AFTCP=20API=E3=80=91=E9=87=87?= =?UTF-8?q?=E9=9B=86=E8=B0=83=E5=BA=A6=E5=99=A8=E4=B8=8B=E5=8F=91=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E5=91=BD=E4=BB=A4=20API?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- collector-scheduling-management/pom.xml | 2 +- .../docus/server/common/SchedulerTask.java | 307 ++++++++++++++---- .../netty/server/ChannelRepository.java | 7 +- .../server/handler/NettyBusinessHandler.java | 20 +- .../server/handler/NettyHeartbeatHandler.java | 24 +- .../common/process/ChannelProcessor.java | 53 ++- .../SchCollectRecordController.java | 6 +- .../service/ISchCollectRecordService.java | 7 +- .../server/service/ISchTerminatorService.java | 6 +- .../impl/SchCollectRecordServiceImpl.java | 38 ++- .../impl/SchTerminatorServiceImpl.java | 13 +- .../SchCollectRecordApi.java | 3 +- 12 files changed, 374 insertions(+), 112 deletions(-) diff --git a/collector-scheduling-management/pom.xml b/collector-scheduling-management/pom.xml index a35b222..c1e79e0 100644 --- a/collector-scheduling-management/pom.xml +++ b/collector-scheduling-management/pom.xml @@ -6,7 +6,7 @@ 1.0-SNAPSHOT 4.0.0 - collector-scheduling-management + collector-scheduling-management-linrf Archetype - collector-scheduling-management http://maven.apache.org diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/SchedulerTask.java b/collector-scheduling-management/src/main/java/com/docus/server/common/SchedulerTask.java index 1054bc0..052214c 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/SchedulerTask.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/SchedulerTask.java @@ -2,19 +2,20 @@ package com.docus.server.common; import com.docus.core.util.DateUtil; import com.docus.core.util.Func; -import com.docus.core.util.ListUtils; import com.docus.core.util.json.JSON; import com.docus.server.api.taskdistribute.TaskDistributeApi; import com.docus.server.common.netty.CommMsg; import com.docus.server.common.netty.server.ChannelRepository; -import com.docus.server.convert.CommMsgConvert; import com.docus.server.dto.scheduling.management.schcollector.task.ReportDownTwoDTO; import com.docus.server.dto.scheduling.management.schcollector.task.ReportTaskTwoDTO; -import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO; +import com.docus.server.dto.scheduling.management.schcollector.task.SchCollectorTaskDTO; import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO; +import com.docus.server.entity.scheduling.management.SchCollectRecord; import com.docus.server.enums.BusyStateEnum; +import com.docus.server.enums.RetryTaskEnum; +import com.docus.server.service.ISchCollectRecordService; +import com.docus.server.service.ISchTerminatorService; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.util.CharsetUtil; @@ -24,10 +25,8 @@ import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import javax.annotation.Resource; -import java.util.ArrayList; import java.util.Date; import java.util.List; -import java.util.Map; @Component @Slf4j @@ -36,6 +35,10 @@ public class SchedulerTask { private ChannelRepository channelRepository; @Resource private TaskDistributeApi taskDistributeApi; + @Resource + private ISchCollectRecordService iSchCollectRecordService; + @Resource + private ISchTerminatorService iSchTerminatorService; //定时任务 // 5 * * * * ? 在每分钟的5秒执行 @@ -52,44 +55,29 @@ public class SchedulerTask { return; } - List reportDownTwoDTOList; - - List> taskInfos = new ArrayList<>(); - - Map reportDownTwoMap = Maps.newConcurrentMap(); - if (!CollectionUtils.isEmpty(terminalList)) { - //:todo 任务平台需要修改发布任务策略 - reportDownTwoDTOList = getTask(terminalList.size()); - - if (Func.isEmpty(reportDownTwoDTOList) || Func.isBlank(reportDownTwoDTOList.get(0).getPatientId())) { - return; - } - - reportDownTwoMap = ListUtils.toMap(reportDownTwoDTOList, ReportDownTwoDTO::getPatientId); - - taskInfos = ListUtils.select(reportDownTwoDTOList, ReportDownTwoDTO::getTasks); + //:todo 任务平台需要修改发布任务策略 + List reportDownTwoDTOList = getTask(terminalList.size()); + if (Func.isEmpty(reportDownTwoDTOList) || Func.isBlank(reportDownTwoDTOList.get(0).getPatientId())) { + return; } //只采集,有优先级的 for (NettyTerminatorDTO terminal : terminalList) { - for (List taskInfo : taskInfos) { + for (ReportDownTwoDTO report : reportDownTwoDTOList) { //先找出有只采集的任务。 - ReportTaskTwoDTO reportTaskTwoDto = taskInfo.get(0); - if (terminal.getOnlyCollectorIds().contains(reportTaskTwoDto.getCollectorId()) + ReportTaskTwoDTO reportTaskTwoDto = report.getTasks().get(0); + if (!CollectionUtils.isEmpty(terminal.getOnlyCollectorIds()) && terminal.getOnlyCollectorIds().contains(reportTaskTwoDto.getCollectorId()) && BusyStateEnum.IDLE.equals(terminal.getBusyState())) { //把这个任务派给这个终端,并且把这个终端设置成繁忙 - if (terminal.getPriorityCollectorIds().contains(reportTaskTwoDto.getCollectorId())) { + if (!CollectionUtils.isEmpty(terminal.getPriorityCollectorIds()) && terminal.getPriorityCollectorIds().contains(reportTaskTwoDto.getCollectorId())) { //把这个任务派给这个终端 terminal.setBusyState(BusyStateEnum.BUSY); - String patientId = reportTaskTwoDto.getPatientId(); - if (reportDownTwoMap.containsKey(patientId)) { - ReportDownTwoDTO reportDownTwoDTO = reportDownTwoMap.get(patientId); + iSchTerminatorService.saveOrUpdate(terminal.getTerminatorIp(), terminal); - tcpToClient(terminal, reportDownTwoDTO); - } - return; + tcpToClient(terminal, report); } + return; } } } @@ -101,18 +89,16 @@ public class SchedulerTask { continue; } - for (List taskInfo : taskInfos) { + for (ReportDownTwoDTO report : reportDownTwoDTOList) { //先找出有只采集的任务。 - ReportTaskTwoDTO reportTaskTwoDto = taskInfo.get(0); - if (terminal.getOnlyCollectorIds().contains(reportTaskTwoDto.getCollectorId()) + ReportTaskTwoDTO reportTaskTwoDto = report.getTasks().get(0); + if (!CollectionUtils.isEmpty(terminal.getOnlyCollectorIds()) && terminal.getOnlyCollectorIds().contains(reportTaskTwoDto.getCollectorId()) && BusyStateEnum.IDLE.equals(terminal.getBusyState())) { //把这个任务派给这个终端,并且把这个终端设置成繁忙 terminal.setBusyState(BusyStateEnum.BUSY); - String patientId = reportTaskTwoDto.getPatientId(); - if (reportDownTwoMap.containsKey(patientId)) { - ReportDownTwoDTO reportDownTwoDTO = reportDownTwoMap.get(patientId); - tcpToClient(terminal, reportDownTwoDTO); - } + iSchTerminatorService.saveOrUpdate(terminal.getTerminatorIp(), terminal); + + tcpToClient(terminal, report); return; } } @@ -127,18 +113,16 @@ public class SchedulerTask { continue; } - for (List taskInfo : taskInfos) { + for (ReportDownTwoDTO report : reportDownTwoDTOList) { //先找出有只采集的任务。 - ReportTaskTwoDTO reportTaskTwoDto = taskInfo.get(0); - if (terminal.getPriorityCollectorIds().contains(reportTaskTwoDto.getCollectorId()) + ReportTaskTwoDTO reportTaskTwoDto = report.getTasks().get(0); + if (!CollectionUtils.isEmpty(terminal.getPriorityCollectorIds()) && terminal.getPriorityCollectorIds().contains(reportTaskTwoDto.getCollectorId()) && BusyStateEnum.IDLE.equals(terminal.getBusyState())) { //把这个任务派给这个终端 terminal.setBusyState(BusyStateEnum.BUSY); - String patientId = reportTaskTwoDto.getPatientId(); - if (reportDownTwoMap.containsKey(patientId)) { - ReportDownTwoDTO reportDownTwoDTO = reportDownTwoMap.get(patientId); - tcpToClient(terminal, reportDownTwoDTO); - } + iSchTerminatorService.saveOrUpdate(terminal.getTerminatorIp(), terminal); + + tcpToClient(terminal, report); return; } } @@ -153,40 +137,69 @@ public class SchedulerTask { continue; } - for (List taskInfo : taskInfos) { + for (ReportDownTwoDTO report : reportDownTwoDTOList) { //先找出有只采集的任务。 //把这个任务派给这个终端 - ReportTaskTwoDTO reportTaskTwoDto = taskInfo.get(0); - String collectorId = reportTaskTwoDto.getCollectorId(); + ReportTaskTwoDTO reportTaskTwoDto = report.getTasks().get(0); terminal.setBusyState(BusyStateEnum.BUSY); - String patientId = reportTaskTwoDto.getPatientId(); - if (reportDownTwoMap.containsKey(patientId)) { - ReportDownTwoDTO reportDownTwoDTO = reportDownTwoMap.get(patientId); - tcpToClient(terminal, reportDownTwoDTO); - } + iSchTerminatorService.saveOrUpdate(terminal.getTerminatorIp(), terminal); + + tcpToClient(terminal, report); return; } } + //只采集,空闲的。 + //获取只采集的任务,并且进行分配。 + for (NettyTerminatorDTO terminal : terminalList) { + //把刚才已经分配任务过的采集器排除 + if (BusyStateEnum.BUSY.equals(terminal.getBusyState())) { + continue; + } + + List onlyTaskInfos = getOnlyTaskInfos(terminal.getOnlyCollectorIds()); + + + if (CollectionUtils.isEmpty(onlyTaskInfos) || Func.isBlank(onlyTaskInfos.get(0).getPatientId())) { + return; + } + for (ReportDownTwoDTO report : onlyTaskInfos) { + //将这条任务分配这个这个终端 + //下发 + terminal.setBusyState(BusyStateEnum.BUSY); + iSchTerminatorService.saveOrUpdate(terminal.getTerminatorIp(), terminal); + + tcpToClient(terminal, report); + + return; + } + } log.info("定时任务: 执行完毕"); - } catch (Exception e) { + } catch ( + Exception e) { log.error("定时任务执行出错", e); } + } - private void tcpToClient(NettyTerminatorDTO terminal, ReportDownTwoDTO messageContent) { + private void tcpToClient(NettyTerminatorDTO terminal, ReportDownTwoDTO reportDownTwoDTO) { Channel channel = channelRepository.get(terminal.getTerminatorIp()); - CommMsgDTO commMsgDTO = CommMsgDTO.builder() - .content(JSON.toJSON(messageContent)) - .messageType(MsgConstants.SCH_DISTRIBUTE_TASKS) - .build(); + SchCollectRecord schCollectRecord = iSchCollectRecordService.saveOrUpdateRecord(terminal, reportDownTwoDTO); - CommMsg commMsg = CommMsgConvert.INSTANCE.convertDO(commMsgDTO); + SchCollectorTaskDTO messageContent = new SchCollectorTaskDTO(); + messageContent.setCollectorRecordId(schCollectRecord.getId()); + messageContent.setIsRetry(String.valueOf(RetryTaskEnum.NO_RETRY_TASK.getValue())); + messageContent.setTaskInfo(reportDownTwoDTO); - commMsg.setMessageTime(DateUtil.formatDateTime(new Date())); + CommMsg commMsg = CommMsg.builder() + .messageType(MsgConstants.SCH_DISTRIBUTE_TASKS) + .messageTime(DateUtil.formatDateTime(new Date())) + .content(JSON.toJSON(messageContent)) + .build(); + //tcp 下发任务到终端 if (channel != null) { channel.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(commMsg), CharsetUtil.UTF_8)); } @@ -198,21 +211,171 @@ public class SchedulerTask { //mock - ReportDownTwoDTO task = taskDistributeApi.getTask("1"); - ReportDownTwoDTO task1 = taskDistributeApi.getTask("1"); - - return Lists.newArrayList(task, task1); + String collectorId1 = "{\n" + + " \"createTime\": \"2022-12-03 12:39:30\",\n" + + " \"hospitals\": [\n" + + " {\n" + + " \"admissDate\": \"2023-12-31 01:01:01\",\n" + + " \"admissId\": \"amid_999901\",\n" + + " \"admissTimes\": 1,\n" + + " \"disDate\": \"2023-12-31 01:01:01\",\n" + + " \"disDeptName\": \"22222\",\n" + + " \"patientId\": \"758878610105573376\"\n" + + " }\n" + + " ],\n" + + " \"jzh\": \"jzh_999901\",\n" + + " \"patient\": {\n" + + " \"inpatientNo\": \"999901\",\n" + + " \"name\": \"ceshi\",\n" + + " \"patientId\": \"758878610105573376\"\n" + + " },\n" + + " \"patientId\": \"758878610105573376\",\n" + + " \"recordType\": \"1\",\n" + + " \"tasks\": [\n" + + " {\n" + + " \"collectorId\": \"1\",\n" + + " \"patientId\": \"758878610105573376\",\n" + + " \"taskId\": 834292710565826560\n" + + " }\n" + + " ]\n" + + "}"; + ReportDownTwoDTO reportDownTwoDTO1 = JSON.fromJSON(collectorId1, ReportDownTwoDTO.class); + + + String collectorId2 = "{\n" + + " \"createTime\": \"2022-12-03 12:39:30\",\n" + + " \"hospitals\": [\n" + + " {\n" + + " \"admissDate\": \"2023-12-31 01:01:01\",\n" + + " \"admissId\": \"amid_999901\",\n" + + " \"admissTimes\": 1,\n" + + " \"disDate\": \"2023-12-31 01:01:01\",\n" + + " \"disDeptName\": \"22222\",\n" + + " \"patientId\": \"758878610105573376\"\n" + + " }\n" + + " ],\n" + + " \"jzh\": \"jzh_999901\",\n" + + " \"patient\": {\n" + + " \"inpatientNo\": \"999901\",\n" + + " \"name\": \"ceshi\",\n" + + " \"patientId\": \"758878610105573376\"\n" + + " },\n" + + " \"patientId\": \"758878610105573376\",\n" + + " \"recordType\": \"1\",\n" + + " \"tasks\": [\n" + + " {\n" + + " \"collectorId\": \"2\",\n" + + " \"patientId\": \"758878610105573376\",\n" + + " \"taskId\": 834292712465846272\n" + + " }\n" + + " ]\n" + + "}"; + ReportDownTwoDTO reportDownTwoDTO2 = JSON.fromJSON(collectorId2, ReportDownTwoDTO.class); + + + String collectorId3 = "{\n" + + " \"createTime\": \"2023-01-09 19:26:11\",\n" + + " \"hospitals\": [\n" + + " {\n" + + " \"admissDate\": \"2023-12-31 01:01:01\",\n" + + " \"admissId\": \"amid_297974\",\n" + + " \"admissTimes\": 21,\n" + + " \"disDate\": \"2023-12-31 01:01:01\",\n" + + " \"disDeptName\": \"普外二科(甲乳胸烧伤整形)\",\n" + + " \"patientId\": \"772389719349678080\"\n" + + " }\n" + + " ],\n" + + " \"jzh\": \"jzh_297974\",\n" + + " \"patient\": {\n" + + " \"inpatientNo\": \"297974\",\n" + + " \"name\": \"曾美英\",\n" + + " \"patientId\": \"772389719349678080\"\n" + + " },\n" + + " \"patientId\": \"772389719349678080\",\n" + + " \"recordType\": \"1\",\n" + + " \"tasks\": [\n" + + " {\n" + + " \"collectorId\": \"3\",\n" + + " \"patientId\": \"772389719349678080\",\n" + + " \"taskId\": 838201379426750464\n" + + " }\n" + + " ]\n" + + "}"; + ReportDownTwoDTO reportDownTwoDTO3 = JSON.fromJSON(collectorId3, ReportDownTwoDTO.class); + + + return Lists.newArrayList(reportDownTwoDTO1, reportDownTwoDTO2, reportDownTwoDTO3); } //根据采集器id类型,一次获取一批采集器类型任务 - private ArrayList getTask(List collectorIds) { + private List getOnlyTaskInfos(List collectorIds) { // return taskDistributeApi.getTask(collectorIds.get(0)); //mock - ReportDownTwoDTO task = taskDistributeApi.getTask("4"); - ReportDownTwoDTO task1 = taskDistributeApi.getTask("5"); - return Lists.newArrayList(task, task1); + + String collectorId2 = "{\n" + + " \"createTime\": \"2022-12-03 12:39:30\",\n" + + " \"hospitals\": [\n" + + " {\n" + + " \"admissDate\": \"2023-12-31 01:01:01\",\n" + + " \"admissId\": \"amid_999901\",\n" + + " \"admissTimes\": 1,\n" + + " \"disDate\": \"2023-12-31 01:01:01\",\n" + + " \"disDeptName\": \"22222\",\n" + + " \"patientId\": \"758878610105573376\"\n" + + " }\n" + + " ],\n" + + " \"jzh\": \"jzh_999901\",\n" + + " \"patient\": {\n" + + " \"inpatientNo\": \"999901\",\n" + + " \"name\": \"ceshi\",\n" + + " \"patientId\": \"758878610105573376\"\n" + + " },\n" + + " \"patientId\": \"758878610105573376\",\n" + + " \"recordType\": \"1\",\n" + + " \"tasks\": [\n" + + " {\n" + + " \"collectorId\": \"2\",\n" + + " \"patientId\": \"758878610105573376\",\n" + + " \"taskId\": 834292712465846272\n" + + " }\n" + + " ]\n" + + "}"; + ReportDownTwoDTO reportDownTwoDTO2 = JSON.fromJSON(collectorId2, ReportDownTwoDTO.class); + + + String collectorId3 = "{\n" + + " \"createTime\": \"2022-12-03 12:39:30\",\n" + + " \"hospitals\": [\n" + + " {\n" + + " \"admissDate\": \"2023-12-31 01:01:01\",\n" + + " \"admissId\": \"amid_999901\",\n" + + " \"admissTimes\": 1,\n" + + " \"disDate\": \"2023-12-31 01:01:01\",\n" + + " \"disDeptName\": \"22222\",\n" + + " \"patientId\": \"758878610105573376\"\n" + + " }\n" + + " ],\n" + + " \"jzh\": \"jzh_999901\",\n" + + " \"patient\": {\n" + + " \"inpatientNo\": \"999901\",\n" + + " \"name\": \"ceshi\",\n" + + " \"patientId\": \"758878610105573376\"\n" + + " },\n" + + " \"patientId\": \"758878610105573376\",\n" + + " \"recordType\": \"1\",\n" + + " \"tasks\": [\n" + + " {\n" + + " \"collectorId\": \"3\",\n" + + " \"patientId\": \"758878610105573376\",\n" + + " \"taskId\": 834292883635392512\n" + + " }\n" + + " ]\n" + + "}"; + ReportDownTwoDTO reportDownTwoDTO3 = JSON.fromJSON(collectorId3, ReportDownTwoDTO.class); + + return Lists.newArrayList(reportDownTwoDTO2, reportDownTwoDTO3); } } diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java index e42a8bd..9a8a58c 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java @@ -24,7 +24,6 @@ import java.util.stream.Collectors; @Component @Slf4j public class ChannelRepository { - @Resource private ISchTerminatorService iSchTerminatorService; @@ -46,7 +45,7 @@ public class ChannelRepository { String terminatorIp = nettyTerminatorDTO.getTerminatorIp(); //更新数据库终端数据 - SchTerminatorVO schTerminatorVO = iSchTerminatorService.saveOrUpdate(terminatorIp, nettyTerminatorDTO.getOnlineState()); + SchTerminatorVO schTerminatorVO = iSchTerminatorService.saveOrUpdate(terminatorIp, nettyTerminatorDTO); nettyTerminatorDTO.setId(schTerminatorVO.getId()); //缓存 @@ -99,7 +98,9 @@ public class ChannelRepository { public void remove(String key) { IP_CHANNEL_CACHE_MAP.remove(key); - iSchTerminatorService.saveOrUpdate(key, OnlineStateEnum.OFFLINE); + NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO(); + nettyTerminatorDTO.setOnlineState(OnlineStateEnum.OFFLINE); + iSchTerminatorService.saveOrUpdate(key, nettyTerminatorDTO); } } diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java index b3f9acb..9770f10 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java @@ -1,5 +1,6 @@ package com.docus.server.common.netty.server.handler; +import com.docus.core.util.Func; import com.docus.core.util.StringUtils; import com.docus.core.util.json.JSON; import com.docus.server.common.MsgConstants; @@ -93,17 +94,18 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler { InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = ipSocket.getAddress().getHostAddress(); log.info("【采集器-终端IP】:{},连接上线,IP地址信息:{}", clientIp, clientIp); - Channel channel = repository.get(clientIp); - if (channel != null && channel.isOpen()) { - channel.close(); - } - NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO(); - nettyTerminatorDTO.setTerminatorIp(clientIp); - nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE); - nettyTerminatorDTO.setOnlineState(OnlineStateEnum.OFFLINE); + String clientId = repository.getClientKey(ctx.channel()); + + if (Func.isBlank(clientId)) { - repository.put(nettyTerminatorDTO, ctx.channel()); + NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO(); + nettyTerminatorDTO.setTerminatorIp(clientIp); + nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE); + nettyTerminatorDTO.setOnlineState(OnlineStateEnum.OFFLINE); + + repository.put(nettyTerminatorDTO, ctx.channel()); + } DEFAULT_CHANNEL_GROUP.add(ctx.channel()); diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java index 3f0849a..6bac1c7 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java @@ -1,5 +1,6 @@ package com.docus.server.common.netty.server.handler; +import com.docus.core.util.Func; import com.docus.core.util.StringUtils; import com.docus.core.util.json.JSON; import com.docus.server.common.MsgConstants; @@ -10,7 +11,6 @@ import com.docus.server.enums.BusyStateEnum; import com.docus.server.enums.OnlineStateEnum; import com.fasterxml.jackson.core.type.TypeReference; import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -58,25 +58,27 @@ public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter { log.info("【采集器-终端IP】:{},连接上线,IP地址信息:{}", clientIp, clientIp); - Channel channel = repository.get(clientIp); - if (channel != null && channel.isOpen()) { - channel.close(); - } + String clientKey = repository.getClientKey(ctx.channel()); + + if (Func.isNotBlank(clientKey)) { - NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO(); - nettyTerminatorDTO.setTerminatorIp(clientIp); - nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE); - nettyTerminatorDTO.setOnlineState(OnlineStateEnum.ONLINE); + NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO(); + nettyTerminatorDTO.setTerminatorIp(clientIp); + nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE); + nettyTerminatorDTO.setOnlineState(OnlineStateEnum.ONLINE); - //将ip和channel进行映射 - repository.put(nettyTerminatorDTO, ctx.channel()); + //将ip和channel进行映射 + repository.put(nettyTerminatorDTO, ctx.channel()); + } } else { + if (ctx.channel().isOpen()) { //触发下一个handler ctx.fireChannelRead(msg); } } + } private ChannelRepository repository; diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java b/collector-scheduling-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java index c50033d..d9e5838 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java @@ -4,9 +4,14 @@ import com.docus.core.util.Func; import com.docus.log.context.TrackContext; import com.docus.log.processor.AbstractProcessor; import com.docus.server.common.netty.server.ChannelRepository; +import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO; import com.docus.server.dto.scheduling.management.schterminator.EditSchTerminatorDTO; import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO; +import com.docus.server.enums.BusyStateEnum; +import com.docus.server.enums.StateEnum; +import com.docus.server.service.ISchCollectRecordService; import com.docus.server.service.ISchTerminatorService; +import com.docus.server.vo.scheduling.management.schcollectrecord.SchCollectRecordVO; import com.docus.server.vo.scheduling.management.schterminator.SchTerminatorVO; import javax.annotation.Resource; @@ -22,13 +27,57 @@ public class ChannelProcessor extends AbstractProcessor { private ChannelRepository channelRepository; @Resource private ISchTerminatorService iSchTerminatorService; + @Resource + private ISchCollectRecordService iSchCollectRecordService; @Override protected Object doProcess(TrackContext context) { - return logProcess(context); + String group = context.getGroup(); + + switch (group) { + case "SchCollectRecordController-edit": + return doSchCollectRecordControllerEdit(context); + case "SchTerminatorController": + return doSchTerminatorController(context); + default: + return true; + } + } + + private boolean doSchCollectRecordControllerEdit(TrackContext context) { + return logCollectRecord(context); + } + + private boolean doSchTerminatorController(TrackContext context) { + return logTerminator(context); + } + + private boolean logCollectRecord(TrackContext context) { + boolean error = context.isError(); + EditSchCollectRecordDTO collectRecordDTO = (EditSchCollectRecordDTO) context.getArgs()[0]; + + if (!error) { + SchCollectRecordVO schCollectRecordVO = iSchCollectRecordService.findById(String.valueOf(collectRecordDTO.getId())); + + SchTerminatorVO schTerminatorVO = iSchTerminatorService.findById(String.valueOf(schCollectRecordVO.getTerminatorId())); + + NettyTerminatorDTO nettyTerminatorDTO = channelRepository.getTerminatorByIp(String.valueOf(schTerminatorVO.getTerminatorIp())); + + if (Func.isEmpty(nettyTerminatorDTO)) { + return false; + } + + List stateEnums = Arrays.asList(StateEnum.values()); + + if (stateEnums.contains(schCollectRecordVO.getTaskExecState())) { + nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE); + } + } + return error; + } - private boolean logProcess(TrackContext context) { + private boolean logTerminator(TrackContext context) { boolean error = context.isError(); EditSchTerminatorDTO terminatorDTO = (EditSchTerminatorDTO) context.getArgs()[0]; if (!error) { diff --git a/collector-scheduling-management/src/main/java/com/docus/server/controller/SchCollectRecordController.java b/collector-scheduling-management/src/main/java/com/docus/server/controller/SchCollectRecordController.java index 3f4748e..ae86405 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/controller/SchCollectRecordController.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/controller/SchCollectRecordController.java @@ -3,13 +3,16 @@ package com.docus.server.controller; import com.docus.core.util.json.JSON; import com.docus.infrastructure.web.request.SearchDTO; import com.docus.infrastructure.web.response.PageResult; +import com.docus.log.annotation.TrackGroup; import com.docus.server.api.scheduling.management.SchCollectRecordApi; import com.docus.server.common.MsgConstants; +import com.docus.server.common.process.ChannelProcessor; import com.docus.server.dto.scheduling.management.schcollector.task.SchCollectorTaskDTO; import com.docus.server.dto.scheduling.management.schcollectrecord.AddSchCollectRecordDTO; import com.docus.server.dto.scheduling.management.schcollectrecord.DeleteSchCollectRecordDTO; import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO; import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO; +import com.docus.server.entity.scheduling.management.SchCollectRecord; import com.docus.server.service.ICommMsgService; import com.docus.server.service.ISchCollectRecordService; import com.docus.server.vo.scheduling.management.schcollectrecord.SchCollectRecordVO; @@ -98,7 +101,7 @@ public class SchCollectRecordController implements SchCollectRecordApi { * @return 成功或失败 */ @Override - public boolean add(AddSchCollectRecordDTO addSchCollectRecordDTO) { + public SchCollectRecord add(AddSchCollectRecordDTO addSchCollectRecordDTO) { return iSchCollectRecordService.add(addSchCollectRecordDTO); } @@ -108,6 +111,7 @@ public class SchCollectRecordController implements SchCollectRecordApi { * @param editSchCollectRecordDTO 编辑参数 * @return 成功或失败 */ + @TrackGroup(group = "SchCollectRecordController-edit", processor = ChannelProcessor.class) @Override public boolean edit(EditSchCollectRecordDTO editSchCollectRecordDTO) { return iSchCollectRecordService.edit(editSchCollectRecordDTO); diff --git a/collector-scheduling-management/src/main/java/com/docus/server/service/ISchCollectRecordService.java b/collector-scheduling-management/src/main/java/com/docus/server/service/ISchCollectRecordService.java index 3f0e973..c06f13b 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/service/ISchCollectRecordService.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/service/ISchCollectRecordService.java @@ -2,9 +2,12 @@ package com.docus.server.service; import com.docus.infrastructure.web.request.SearchDTO; import com.docus.infrastructure.web.response.PageResult; +import com.docus.server.dto.scheduling.management.schcollector.task.ReportDownTwoDTO; import com.docus.server.dto.scheduling.management.schcollectrecord.AddSchCollectRecordDTO; import com.docus.server.dto.scheduling.management.schcollectrecord.DeleteSchCollectRecordDTO; import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO; +import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO; +import com.docus.server.entity.scheduling.management.SchCollectRecord; import com.docus.server.vo.scheduling.management.schcollectrecord.SchCollectRecordVO; /** @@ -28,7 +31,7 @@ public interface ISchCollectRecordService { * @param addSchCollectRecordDTO 新增参数 * @return 成功或失败 */ - boolean add(AddSchCollectRecordDTO addSchCollectRecordDTO); + SchCollectRecord add(AddSchCollectRecordDTO addSchCollectRecordDTO); /** * 编辑 @@ -53,4 +56,6 @@ public interface ISchCollectRecordService { * @return 分页列表 */ PageResult search(SearchDTO searchDTO); + + SchCollectRecord saveOrUpdateRecord(NettyTerminatorDTO terminal, ReportDownTwoDTO messageContent); } diff --git a/collector-scheduling-management/src/main/java/com/docus/server/service/ISchTerminatorService.java b/collector-scheduling-management/src/main/java/com/docus/server/service/ISchTerminatorService.java index e0e1573..dc135ed 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/service/ISchTerminatorService.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/service/ISchTerminatorService.java @@ -5,7 +5,7 @@ import com.docus.infrastructure.web.response.PageResult; import com.docus.server.dto.scheduling.management.schterminator.AddSchTerminatorDTO; import com.docus.server.dto.scheduling.management.schterminator.DeleteSchTerminatorDTO; import com.docus.server.dto.scheduling.management.schterminator.EditSchTerminatorDTO; -import com.docus.server.enums.OnlineStateEnum; +import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO; import com.docus.server.vo.scheduling.management.schterminator.SchTerminatorVO; /** @@ -58,8 +58,8 @@ public interface ISchTerminatorService { /** * 新增或更新 * @param terminatorIp - * @param onlineState + * @param nettyTerminatorDTO * @return */ - SchTerminatorVO saveOrUpdate(String terminatorIp, OnlineStateEnum onlineState); + SchTerminatorVO saveOrUpdate(String terminatorIp, NettyTerminatorDTO nettyTerminatorDTO); } diff --git a/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchCollectRecordServiceImpl.java b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchCollectRecordServiceImpl.java index 671aa68..152f416 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchCollectRecordServiceImpl.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchCollectRecordServiceImpl.java @@ -1,26 +1,34 @@ package com.docus.server.service.impl; +import com.docus.core.util.DateUtil; import com.docus.core.util.ListUtils; +import com.docus.core.util.json.JSON; import com.docus.core.util.property.Setters; +import com.docus.infrastructure.redis.service.IdService; import com.docus.infrastructure.web.exception.ApiException; import com.docus.infrastructure.web.exception.ExceptionCode; import com.docus.infrastructure.web.request.SearchDTO; import com.docus.infrastructure.web.response.PageResult; import com.docus.server.convert.SchCollectRecordConvert; import com.docus.server.convert.SchCollectRecordRetryLogConvert; +import com.docus.server.dto.scheduling.management.schcollector.task.ReportDownTwoDTO; import com.docus.server.dto.scheduling.management.schcollectrecord.AddSchCollectRecordDTO; import com.docus.server.dto.scheduling.management.schcollectrecord.DeleteSchCollectRecordDTO; import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO; +import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO; import com.docus.server.entity.scheduling.management.SchCollectRecord; import com.docus.server.entity.scheduling.management.SchCollectRecordRetryLog; import com.docus.server.entity.scheduling.management.SchSystemParams; import com.docus.server.enums.RetryTaskEnum; +import com.docus.server.enums.SubStateEnum; import com.docus.server.infrastructure.dao.ISchCollectRecordDao; import com.docus.server.infrastructure.dao.ISchCollectRecordRetryLogDao; import com.docus.server.infrastructure.dao.ISchSystemParamsDao; +import com.docus.server.infrastructure.dao.ISchTerminatorDao; import com.docus.server.service.ISchCollectRecordService; import com.docus.server.vo.scheduling.management.schcollectrecord.SchCollectRecordVO; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; import javax.annotation.Resource; @@ -38,9 +46,13 @@ public class SchCollectRecordServiceImpl implements ISchCollectRecordService { @Resource private ISchCollectRecordDao iSchCollectRecordDao; @Resource + private ISchTerminatorDao iSchTerminatorDao; + @Resource private ISchCollectRecordRetryLogDao iSchCollectRecordRetryLogDao; @Resource private ISchSystemParamsDao iSchSystemParamsDao; + @Resource + private IdService idService; /** * 按主键查询 @@ -80,6 +92,26 @@ public class SchCollectRecordServiceImpl implements ISchCollectRecordService { return result; } + @Transactional(rollbackFor = Exception.class) + @Override + public SchCollectRecord saveOrUpdateRecord(NettyTerminatorDTO terminal, ReportDownTwoDTO messageContent) { + //新增采集记录表 + AddSchCollectRecordDTO addSchCollectRecordDTO = new AddSchCollectRecordDTO(); + addSchCollectRecordDTO.setId(idService.getDateSeq()); + addSchCollectRecordDTO.setCollectorId(Long.valueOf(messageContent.getTasks().get(0).getCollectorId())); + addSchCollectRecordDTO.setTerminatorId(terminal.getId()); + addSchCollectRecordDTO.setTaskId(messageContent.getTasks().get(0).getTaskId()); + SchSystemParams params = iSchSystemParamsDao.findOneBy("paramValue", messageContent.getTasks().get(0).getCollectorId()); + addSchCollectRecordDTO.setTaskName(String.format("%s%s%s", messageContent.getPatient().getName(), params.getParamName(), "采集")); + addSchCollectRecordDTO.setTaskMemo(String.format("%s%s%s", messageContent.getPatient().getName(), params.getParamName(), "采集")); + addSchCollectRecordDTO.setStartTime(DateUtil.now()); + addSchCollectRecordDTO.setTaskDetailInfo(String.format("病案号:%s,姓名:%s,采集类型:%s", messageContent.getPatientId(), messageContent.getPatient().getName(), params.getParamName())); + addSchCollectRecordDTO.setSubTaskExecState(SubStateEnum.RECEIVE); + addSchCollectRecordDTO.setTaskOriginJson(JSON.toJSON(messageContent)); + + return this.add(addSchCollectRecordDTO); + } + /** * 新增 * @@ -87,9 +119,10 @@ public class SchCollectRecordServiceImpl implements ISchCollectRecordService { * @return 成功或失败 */ @Override - public boolean add(AddSchCollectRecordDTO addSchCollectRecordDTO) { + public SchCollectRecord add(AddSchCollectRecordDTO addSchCollectRecordDTO) { SchCollectRecord schCollectRecord = SchCollectRecordConvert.INSTANCE.convertDO(addSchCollectRecordDTO); - return iSchCollectRecordDao.add(schCollectRecord); + iSchCollectRecordDao.add(schCollectRecord); + return schCollectRecord; } /** @@ -102,6 +135,7 @@ public class SchCollectRecordServiceImpl implements ISchCollectRecordService { public boolean edit(EditSchCollectRecordDTO editSchCollectRecordDTO) { RetryTaskEnum isRetryTask = editSchCollectRecordDTO.getIsRetryTask(); Long id = editSchCollectRecordDTO.getId(); + if (RetryTaskEnum.NO_RETRY_TASK.equals(isRetryTask)) { //不是重试任务 SchCollectRecord schCollectRecord = iSchCollectRecordDao.findById(id); diff --git a/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchTerminatorServiceImpl.java b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchTerminatorServiceImpl.java index f746912..130fcb1 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchTerminatorServiceImpl.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchTerminatorServiceImpl.java @@ -8,9 +8,9 @@ import com.docus.server.convert.SchTerminatorConvert; import com.docus.server.dto.scheduling.management.schterminator.AddSchTerminatorDTO; import com.docus.server.dto.scheduling.management.schterminator.DeleteSchTerminatorDTO; import com.docus.server.dto.scheduling.management.schterminator.EditSchTerminatorDTO; +import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO; import com.docus.server.entity.scheduling.management.SchTerminator; import com.docus.server.enums.BusyStateEnum; -import com.docus.server.enums.OnlineStateEnum; import com.docus.server.infrastructure.dao.ISchTerminatorDao; import com.docus.server.service.ISchTerminatorService; import com.docus.server.vo.scheduling.management.schterminator.SchTerminatorVO; @@ -55,26 +55,27 @@ public class SchTerminatorServiceImpl implements ISchTerminatorService { } @Override - public SchTerminatorVO saveOrUpdate(String terminatorIp, OnlineStateEnum onlineStateEnum) { + public SchTerminatorVO saveOrUpdate(String terminatorIp, NettyTerminatorDTO nettyTerminatorDTO) { SchTerminator schTerminator = iSchTerminatorDao.findOneBy("terminatorIp", terminatorIp); + //新增 if (Func.isNull(schTerminator)) { SchTerminator schTerminatorDO = new SchTerminator(); schTerminatorDO.setId(idService.getDateSeq()); schTerminatorDO.setTerminatorIp(terminatorIp); schTerminatorDO.setTerminatorName(terminatorIp); - schTerminatorDO.setOnlineState(onlineStateEnum); + schTerminatorDO.setOnlineState(nettyTerminatorDTO.getOnlineState()); schTerminatorDO.setBusyState(BusyStateEnum.IDLE); iSchTerminatorDao.save(schTerminatorDO); return SchTerminatorConvert.INSTANCE.convertVO(schTerminatorDO); } else { - - schTerminator.setBusyState(BusyStateEnum.IDLE); - schTerminator.setOnlineState(onlineStateEnum); + //更新 + schTerminator.setBusyState(nettyTerminatorDTO.getBusyState()); + schTerminator.setOnlineState(nettyTerminatorDTO.getOnlineState()); schTerminator.setUpdateTime(new Date()); iSchTerminatorDao.updateById(schTerminator); diff --git a/docus-client-interface/src/main/java/com/docus/server/api/scheduling.management/SchCollectRecordApi.java b/docus-client-interface/src/main/java/com/docus/server/api/scheduling.management/SchCollectRecordApi.java index 3c88de9..1e82555 100644 --- a/docus-client-interface/src/main/java/com/docus/server/api/scheduling.management/SchCollectRecordApi.java +++ b/docus-client-interface/src/main/java/com/docus/server/api/scheduling.management/SchCollectRecordApi.java @@ -5,6 +5,7 @@ import com.docus.infrastructure.web.response.PageResult; import com.docus.server.dto.scheduling.management.schcollectrecord.AddSchCollectRecordDTO; import com.docus.server.dto.scheduling.management.schcollectrecord.DeleteSchCollectRecordDTO; import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO; +import com.docus.server.entity.scheduling.management.SchCollectRecord; import com.docus.server.vo.scheduling.management.schcollectrecord.SchCollectRecordVO; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @@ -57,7 +58,7 @@ public interface SchCollectRecordApi { */ @ApiOperation("新增") @PostMapping("/add") - boolean add(@RequestBody AddSchCollectRecordDTO addSchCollectRecordDTO); + SchCollectRecord add(@RequestBody AddSchCollectRecordDTO addSchCollectRecordDTO); /** * 编辑