【采集调度器-后端TCP API】采集调度器下发任务命令 API

segment2.0
linrf 2 years ago
parent 0978dcbc80
commit 7afede7f3a

@ -6,7 +6,7 @@
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>collector-scheduling-management</artifactId>
<artifactId>collector-scheduling-management-linrf</artifactId>
<name>Archetype - collector-scheduling-management</name>
<url>http://maven.apache.org</url>

@ -2,19 +2,20 @@ package com.docus.server.common;
import com.docus.core.util.DateUtil;
import com.docus.core.util.Func;
import com.docus.core.util.ListUtils;
import com.docus.core.util.json.JSON;
import com.docus.server.api.taskdistribute.TaskDistributeApi;
import com.docus.server.common.netty.CommMsg;
import com.docus.server.common.netty.server.ChannelRepository;
import com.docus.server.convert.CommMsgConvert;
import com.docus.server.dto.scheduling.management.schcollector.task.ReportDownTwoDTO;
import com.docus.server.dto.scheduling.management.schcollector.task.ReportTaskTwoDTO;
import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO;
import com.docus.server.dto.scheduling.management.schcollector.task.SchCollectorTaskDTO;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.entity.scheduling.management.SchCollectRecord;
import com.docus.server.enums.BusyStateEnum;
import com.docus.server.enums.RetryTaskEnum;
import com.docus.server.service.ISchCollectRecordService;
import com.docus.server.service.ISchTerminatorService;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.util.CharsetUtil;
@ -24,10 +25,8 @@ import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
@Component
@Slf4j
@ -36,6 +35,10 @@ public class SchedulerTask {
private ChannelRepository channelRepository;
@Resource
private TaskDistributeApi taskDistributeApi;
@Resource
private ISchCollectRecordService iSchCollectRecordService;
@Resource
private ISchTerminatorService iSchTerminatorService;
//定时任务
// 5 * * * * ? 在每分钟的5秒执行
@ -52,44 +55,29 @@ public class SchedulerTask {
return;
}
List<ReportDownTwoDTO> reportDownTwoDTOList;
List<List<ReportTaskTwoDTO>> taskInfos = new ArrayList<>();
Map<String, ReportDownTwoDTO> reportDownTwoMap = Maps.newConcurrentMap();
if (!CollectionUtils.isEmpty(terminalList)) {
//:todo 任务平台需要修改发布任务策略
reportDownTwoDTOList = getTask(terminalList.size());
if (Func.isEmpty(reportDownTwoDTOList) || Func.isBlank(reportDownTwoDTOList.get(0).getPatientId())) {
return;
}
reportDownTwoMap = ListUtils.toMap(reportDownTwoDTOList, ReportDownTwoDTO::getPatientId);
taskInfos = ListUtils.select(reportDownTwoDTOList, ReportDownTwoDTO::getTasks);
//:todo 任务平台需要修改发布任务策略
List<ReportDownTwoDTO> reportDownTwoDTOList = getTask(terminalList.size());
if (Func.isEmpty(reportDownTwoDTOList) || Func.isBlank(reportDownTwoDTOList.get(0).getPatientId())) {
return;
}
//只采集,有优先级的
for (NettyTerminatorDTO terminal : terminalList) {
for (List<ReportTaskTwoDTO> taskInfo : taskInfos) {
for (ReportDownTwoDTO report : reportDownTwoDTOList) {
//先找出有只采集的任务。
ReportTaskTwoDTO reportTaskTwoDto = taskInfo.get(0);
if (terminal.getOnlyCollectorIds().contains(reportTaskTwoDto.getCollectorId())
ReportTaskTwoDTO reportTaskTwoDto = report.getTasks().get(0);
if (!CollectionUtils.isEmpty(terminal.getOnlyCollectorIds()) && terminal.getOnlyCollectorIds().contains(reportTaskTwoDto.getCollectorId())
&& BusyStateEnum.IDLE.equals(terminal.getBusyState())) {
//把这个任务派给这个终端,并且把这个终端设置成繁忙
if (terminal.getPriorityCollectorIds().contains(reportTaskTwoDto.getCollectorId())) {
if (!CollectionUtils.isEmpty(terminal.getPriorityCollectorIds()) && terminal.getPriorityCollectorIds().contains(reportTaskTwoDto.getCollectorId())) {
//把这个任务派给这个终端
terminal.setBusyState(BusyStateEnum.BUSY);
String patientId = reportTaskTwoDto.getPatientId();
if (reportDownTwoMap.containsKey(patientId)) {
ReportDownTwoDTO reportDownTwoDTO = reportDownTwoMap.get(patientId);
iSchTerminatorService.saveOrUpdate(terminal.getTerminatorIp(), terminal);
tcpToClient(terminal, reportDownTwoDTO);
}
return;
tcpToClient(terminal, report);
}
return;
}
}
}
@ -101,18 +89,16 @@ public class SchedulerTask {
continue;
}
for (List<ReportTaskTwoDTO> taskInfo : taskInfos) {
for (ReportDownTwoDTO report : reportDownTwoDTOList) {
//先找出有只采集的任务。
ReportTaskTwoDTO reportTaskTwoDto = taskInfo.get(0);
if (terminal.getOnlyCollectorIds().contains(reportTaskTwoDto.getCollectorId())
ReportTaskTwoDTO reportTaskTwoDto = report.getTasks().get(0);
if (!CollectionUtils.isEmpty(terminal.getOnlyCollectorIds()) && terminal.getOnlyCollectorIds().contains(reportTaskTwoDto.getCollectorId())
&& BusyStateEnum.IDLE.equals(terminal.getBusyState())) {
//把这个任务派给这个终端,并且把这个终端设置成繁忙
terminal.setBusyState(BusyStateEnum.BUSY);
String patientId = reportTaskTwoDto.getPatientId();
if (reportDownTwoMap.containsKey(patientId)) {
ReportDownTwoDTO reportDownTwoDTO = reportDownTwoMap.get(patientId);
tcpToClient(terminal, reportDownTwoDTO);
}
iSchTerminatorService.saveOrUpdate(terminal.getTerminatorIp(), terminal);
tcpToClient(terminal, report);
return;
}
}
@ -127,18 +113,16 @@ public class SchedulerTask {
continue;
}
for (List<ReportTaskTwoDTO> taskInfo : taskInfos) {
for (ReportDownTwoDTO report : reportDownTwoDTOList) {
//先找出有只采集的任务。
ReportTaskTwoDTO reportTaskTwoDto = taskInfo.get(0);
if (terminal.getPriorityCollectorIds().contains(reportTaskTwoDto.getCollectorId())
ReportTaskTwoDTO reportTaskTwoDto = report.getTasks().get(0);
if (!CollectionUtils.isEmpty(terminal.getPriorityCollectorIds()) && terminal.getPriorityCollectorIds().contains(reportTaskTwoDto.getCollectorId())
&& BusyStateEnum.IDLE.equals(terminal.getBusyState())) {
//把这个任务派给这个终端
terminal.setBusyState(BusyStateEnum.BUSY);
String patientId = reportTaskTwoDto.getPatientId();
if (reportDownTwoMap.containsKey(patientId)) {
ReportDownTwoDTO reportDownTwoDTO = reportDownTwoMap.get(patientId);
tcpToClient(terminal, reportDownTwoDTO);
}
iSchTerminatorService.saveOrUpdate(terminal.getTerminatorIp(), terminal);
tcpToClient(terminal, report);
return;
}
}
@ -153,40 +137,69 @@ public class SchedulerTask {
continue;
}
for (List<ReportTaskTwoDTO> taskInfo : taskInfos) {
for (ReportDownTwoDTO report : reportDownTwoDTOList) {
//先找出有只采集的任务。
//把这个任务派给这个终端
ReportTaskTwoDTO reportTaskTwoDto = taskInfo.get(0);
String collectorId = reportTaskTwoDto.getCollectorId();
ReportTaskTwoDTO reportTaskTwoDto = report.getTasks().get(0);
terminal.setBusyState(BusyStateEnum.BUSY);
String patientId = reportTaskTwoDto.getPatientId();
if (reportDownTwoMap.containsKey(patientId)) {
ReportDownTwoDTO reportDownTwoDTO = reportDownTwoMap.get(patientId);
tcpToClient(terminal, reportDownTwoDTO);
}
iSchTerminatorService.saveOrUpdate(terminal.getTerminatorIp(), terminal);
tcpToClient(terminal, report);
return;
}
}
//只采集,空闲的。
//获取只采集的任务,并且进行分配。
for (NettyTerminatorDTO terminal : terminalList) {
//把刚才已经分配任务过的采集器排除
if (BusyStateEnum.BUSY.equals(terminal.getBusyState())) {
continue;
}
List<ReportDownTwoDTO> onlyTaskInfos = getOnlyTaskInfos(terminal.getOnlyCollectorIds());
if (CollectionUtils.isEmpty(onlyTaskInfos) || Func.isBlank(onlyTaskInfos.get(0).getPatientId())) {
return;
}
for (ReportDownTwoDTO report : onlyTaskInfos) {
//将这条任务分配这个这个终端
//下发
terminal.setBusyState(BusyStateEnum.BUSY);
iSchTerminatorService.saveOrUpdate(terminal.getTerminatorIp(), terminal);
tcpToClient(terminal, report);
return;
}
}
log.info("定时任务: 执行完毕");
} catch (Exception e) {
} catch (
Exception e) {
log.error("定时任务执行出错", e);
}
}
private void tcpToClient(NettyTerminatorDTO terminal, ReportDownTwoDTO messageContent) {
private void tcpToClient(NettyTerminatorDTO terminal, ReportDownTwoDTO reportDownTwoDTO) {
Channel channel = channelRepository.get(terminal.getTerminatorIp());
CommMsgDTO commMsgDTO = CommMsgDTO.builder()
.content(JSON.toJSON(messageContent))
.messageType(MsgConstants.SCH_DISTRIBUTE_TASKS)
.build();
SchCollectRecord schCollectRecord = iSchCollectRecordService.saveOrUpdateRecord(terminal, reportDownTwoDTO);
CommMsg commMsg = CommMsgConvert.INSTANCE.convertDO(commMsgDTO);
SchCollectorTaskDTO messageContent = new SchCollectorTaskDTO();
messageContent.setCollectorRecordId(schCollectRecord.getId());
messageContent.setIsRetry(String.valueOf(RetryTaskEnum.NO_RETRY_TASK.getValue()));
messageContent.setTaskInfo(reportDownTwoDTO);
commMsg.setMessageTime(DateUtil.formatDateTime(new Date()));
CommMsg commMsg = CommMsg.builder()
.messageType(MsgConstants.SCH_DISTRIBUTE_TASKS)
.messageTime(DateUtil.formatDateTime(new Date()))
.content(JSON.toJSON(messageContent))
.build();
//tcp 下发任务到终端
if (channel != null) {
channel.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(commMsg), CharsetUtil.UTF_8));
}
@ -198,21 +211,171 @@ public class SchedulerTask {
//mock
ReportDownTwoDTO task = taskDistributeApi.getTask("1");
ReportDownTwoDTO task1 = taskDistributeApi.getTask("1");
return Lists.newArrayList(task, task1);
String collectorId1 = "{\n" +
" \"createTime\": \"2022-12-03 12:39:30\",\n" +
" \"hospitals\": [\n" +
" {\n" +
" \"admissDate\": \"2023-12-31 01:01:01\",\n" +
" \"admissId\": \"amid_999901\",\n" +
" \"admissTimes\": 1,\n" +
" \"disDate\": \"2023-12-31 01:01:01\",\n" +
" \"disDeptName\": \"22222\",\n" +
" \"patientId\": \"758878610105573376\"\n" +
" }\n" +
" ],\n" +
" \"jzh\": \"jzh_999901\",\n" +
" \"patient\": {\n" +
" \"inpatientNo\": \"999901\",\n" +
" \"name\": \"ceshi\",\n" +
" \"patientId\": \"758878610105573376\"\n" +
" },\n" +
" \"patientId\": \"758878610105573376\",\n" +
" \"recordType\": \"1\",\n" +
" \"tasks\": [\n" +
" {\n" +
" \"collectorId\": \"1\",\n" +
" \"patientId\": \"758878610105573376\",\n" +
" \"taskId\": 834292710565826560\n" +
" }\n" +
" ]\n" +
"}";
ReportDownTwoDTO reportDownTwoDTO1 = JSON.fromJSON(collectorId1, ReportDownTwoDTO.class);
String collectorId2 = "{\n" +
" \"createTime\": \"2022-12-03 12:39:30\",\n" +
" \"hospitals\": [\n" +
" {\n" +
" \"admissDate\": \"2023-12-31 01:01:01\",\n" +
" \"admissId\": \"amid_999901\",\n" +
" \"admissTimes\": 1,\n" +
" \"disDate\": \"2023-12-31 01:01:01\",\n" +
" \"disDeptName\": \"22222\",\n" +
" \"patientId\": \"758878610105573376\"\n" +
" }\n" +
" ],\n" +
" \"jzh\": \"jzh_999901\",\n" +
" \"patient\": {\n" +
" \"inpatientNo\": \"999901\",\n" +
" \"name\": \"ceshi\",\n" +
" \"patientId\": \"758878610105573376\"\n" +
" },\n" +
" \"patientId\": \"758878610105573376\",\n" +
" \"recordType\": \"1\",\n" +
" \"tasks\": [\n" +
" {\n" +
" \"collectorId\": \"2\",\n" +
" \"patientId\": \"758878610105573376\",\n" +
" \"taskId\": 834292712465846272\n" +
" }\n" +
" ]\n" +
"}";
ReportDownTwoDTO reportDownTwoDTO2 = JSON.fromJSON(collectorId2, ReportDownTwoDTO.class);
String collectorId3 = "{\n" +
" \"createTime\": \"2023-01-09 19:26:11\",\n" +
" \"hospitals\": [\n" +
" {\n" +
" \"admissDate\": \"2023-12-31 01:01:01\",\n" +
" \"admissId\": \"amid_297974\",\n" +
" \"admissTimes\": 21,\n" +
" \"disDate\": \"2023-12-31 01:01:01\",\n" +
" \"disDeptName\": \"普外二科(甲乳胸烧伤整形)\",\n" +
" \"patientId\": \"772389719349678080\"\n" +
" }\n" +
" ],\n" +
" \"jzh\": \"jzh_297974\",\n" +
" \"patient\": {\n" +
" \"inpatientNo\": \"297974\",\n" +
" \"name\": \"曾美英\",\n" +
" \"patientId\": \"772389719349678080\"\n" +
" },\n" +
" \"patientId\": \"772389719349678080\",\n" +
" \"recordType\": \"1\",\n" +
" \"tasks\": [\n" +
" {\n" +
" \"collectorId\": \"3\",\n" +
" \"patientId\": \"772389719349678080\",\n" +
" \"taskId\": 838201379426750464\n" +
" }\n" +
" ]\n" +
"}";
ReportDownTwoDTO reportDownTwoDTO3 = JSON.fromJSON(collectorId3, ReportDownTwoDTO.class);
return Lists.newArrayList(reportDownTwoDTO1, reportDownTwoDTO2, reportDownTwoDTO3);
}
//根据采集器id类型一次获取一批采集器类型任务
private ArrayList<ReportDownTwoDTO> getTask(List<String> collectorIds) {
private List<ReportDownTwoDTO> getOnlyTaskInfos(List<String> collectorIds) {
// return taskDistributeApi.getTask(collectorIds.get(0));
//mock
ReportDownTwoDTO task = taskDistributeApi.getTask("4");
ReportDownTwoDTO task1 = taskDistributeApi.getTask("5");
return Lists.newArrayList(task, task1);
String collectorId2 = "{\n" +
" \"createTime\": \"2022-12-03 12:39:30\",\n" +
" \"hospitals\": [\n" +
" {\n" +
" \"admissDate\": \"2023-12-31 01:01:01\",\n" +
" \"admissId\": \"amid_999901\",\n" +
" \"admissTimes\": 1,\n" +
" \"disDate\": \"2023-12-31 01:01:01\",\n" +
" \"disDeptName\": \"22222\",\n" +
" \"patientId\": \"758878610105573376\"\n" +
" }\n" +
" ],\n" +
" \"jzh\": \"jzh_999901\",\n" +
" \"patient\": {\n" +
" \"inpatientNo\": \"999901\",\n" +
" \"name\": \"ceshi\",\n" +
" \"patientId\": \"758878610105573376\"\n" +
" },\n" +
" \"patientId\": \"758878610105573376\",\n" +
" \"recordType\": \"1\",\n" +
" \"tasks\": [\n" +
" {\n" +
" \"collectorId\": \"2\",\n" +
" \"patientId\": \"758878610105573376\",\n" +
" \"taskId\": 834292712465846272\n" +
" }\n" +
" ]\n" +
"}";
ReportDownTwoDTO reportDownTwoDTO2 = JSON.fromJSON(collectorId2, ReportDownTwoDTO.class);
String collectorId3 = "{\n" +
" \"createTime\": \"2022-12-03 12:39:30\",\n" +
" \"hospitals\": [\n" +
" {\n" +
" \"admissDate\": \"2023-12-31 01:01:01\",\n" +
" \"admissId\": \"amid_999901\",\n" +
" \"admissTimes\": 1,\n" +
" \"disDate\": \"2023-12-31 01:01:01\",\n" +
" \"disDeptName\": \"22222\",\n" +
" \"patientId\": \"758878610105573376\"\n" +
" }\n" +
" ],\n" +
" \"jzh\": \"jzh_999901\",\n" +
" \"patient\": {\n" +
" \"inpatientNo\": \"999901\",\n" +
" \"name\": \"ceshi\",\n" +
" \"patientId\": \"758878610105573376\"\n" +
" },\n" +
" \"patientId\": \"758878610105573376\",\n" +
" \"recordType\": \"1\",\n" +
" \"tasks\": [\n" +
" {\n" +
" \"collectorId\": \"3\",\n" +
" \"patientId\": \"758878610105573376\",\n" +
" \"taskId\": 834292883635392512\n" +
" }\n" +
" ]\n" +
"}";
ReportDownTwoDTO reportDownTwoDTO3 = JSON.fromJSON(collectorId3, ReportDownTwoDTO.class);
return Lists.newArrayList(reportDownTwoDTO2, reportDownTwoDTO3);
}
}

@ -24,7 +24,6 @@ import java.util.stream.Collectors;
@Component
@Slf4j
public class ChannelRepository {
@Resource
private ISchTerminatorService iSchTerminatorService;
@ -46,7 +45,7 @@ public class ChannelRepository {
String terminatorIp = nettyTerminatorDTO.getTerminatorIp();
//更新数据库终端数据
SchTerminatorVO schTerminatorVO = iSchTerminatorService.saveOrUpdate(terminatorIp, nettyTerminatorDTO.getOnlineState());
SchTerminatorVO schTerminatorVO = iSchTerminatorService.saveOrUpdate(terminatorIp, nettyTerminatorDTO);
nettyTerminatorDTO.setId(schTerminatorVO.getId());
//缓存
@ -99,7 +98,9 @@ public class ChannelRepository {
public void remove(String key) {
IP_CHANNEL_CACHE_MAP.remove(key);
iSchTerminatorService.saveOrUpdate(key, OnlineStateEnum.OFFLINE);
NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO();
nettyTerminatorDTO.setOnlineState(OnlineStateEnum.OFFLINE);
iSchTerminatorService.saveOrUpdate(key, nettyTerminatorDTO);
}
}

@ -1,5 +1,6 @@
package com.docus.server.common.netty.server.handler;
import com.docus.core.util.Func;
import com.docus.core.util.StringUtils;
import com.docus.core.util.json.JSON;
import com.docus.server.common.MsgConstants;
@ -93,17 +94,18 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = ipSocket.getAddress().getHostAddress();
log.info("【采集器-终端IP】:{},连接上线,IP地址信息:{}", clientIp, clientIp);
Channel channel = repository.get(clientIp);
if (channel != null && channel.isOpen()) {
channel.close();
}
NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO();
nettyTerminatorDTO.setTerminatorIp(clientIp);
nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE);
nettyTerminatorDTO.setOnlineState(OnlineStateEnum.OFFLINE);
String clientId = repository.getClientKey(ctx.channel());
if (Func.isBlank(clientId)) {
repository.put(nettyTerminatorDTO, ctx.channel());
NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO();
nettyTerminatorDTO.setTerminatorIp(clientIp);
nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE);
nettyTerminatorDTO.setOnlineState(OnlineStateEnum.OFFLINE);
repository.put(nettyTerminatorDTO, ctx.channel());
}
DEFAULT_CHANNEL_GROUP.add(ctx.channel());

@ -1,5 +1,6 @@
package com.docus.server.common.netty.server.handler;
import com.docus.core.util.Func;
import com.docus.core.util.StringUtils;
import com.docus.core.util.json.JSON;
import com.docus.server.common.MsgConstants;
@ -10,7 +11,6 @@ import com.docus.server.enums.BusyStateEnum;
import com.docus.server.enums.OnlineStateEnum;
import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
@ -58,25 +58,27 @@ public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter {
log.info("【采集器-终端IP】:{},连接上线,IP地址信息:{}", clientIp, clientIp);
Channel channel = repository.get(clientIp);
if (channel != null && channel.isOpen()) {
channel.close();
}
String clientKey = repository.getClientKey(ctx.channel());
if (Func.isNotBlank(clientKey)) {
NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO();
nettyTerminatorDTO.setTerminatorIp(clientIp);
nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE);
nettyTerminatorDTO.setOnlineState(OnlineStateEnum.ONLINE);
NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO();
nettyTerminatorDTO.setTerminatorIp(clientIp);
nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE);
nettyTerminatorDTO.setOnlineState(OnlineStateEnum.ONLINE);
//将ip和channel进行映射
repository.put(nettyTerminatorDTO, ctx.channel());
//将ip和channel进行映射
repository.put(nettyTerminatorDTO, ctx.channel());
}
} else {
if (ctx.channel().isOpen()) {
//触发下一个handler
ctx.fireChannelRead(msg);
}
}
}
private ChannelRepository repository;

@ -4,9 +4,14 @@ import com.docus.core.util.Func;
import com.docus.log.context.TrackContext;
import com.docus.log.processor.AbstractProcessor;
import com.docus.server.common.netty.server.ChannelRepository;
import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO;
import com.docus.server.dto.scheduling.management.schterminator.EditSchTerminatorDTO;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.enums.BusyStateEnum;
import com.docus.server.enums.StateEnum;
import com.docus.server.service.ISchCollectRecordService;
import com.docus.server.service.ISchTerminatorService;
import com.docus.server.vo.scheduling.management.schcollectrecord.SchCollectRecordVO;
import com.docus.server.vo.scheduling.management.schterminator.SchTerminatorVO;
import javax.annotation.Resource;
@ -22,13 +27,57 @@ public class ChannelProcessor extends AbstractProcessor {
private ChannelRepository channelRepository;
@Resource
private ISchTerminatorService iSchTerminatorService;
@Resource
private ISchCollectRecordService iSchCollectRecordService;
@Override
protected Object doProcess(TrackContext context) {
return logProcess(context);
String group = context.getGroup();
switch (group) {
case "SchCollectRecordController-edit":
return doSchCollectRecordControllerEdit(context);
case "SchTerminatorController":
return doSchTerminatorController(context);
default:
return true;
}
}
private boolean doSchCollectRecordControllerEdit(TrackContext context) {
return logCollectRecord(context);
}
private boolean doSchTerminatorController(TrackContext context) {
return logTerminator(context);
}
private boolean logCollectRecord(TrackContext context) {
boolean error = context.isError();
EditSchCollectRecordDTO collectRecordDTO = (EditSchCollectRecordDTO) context.getArgs()[0];
if (!error) {
SchCollectRecordVO schCollectRecordVO = iSchCollectRecordService.findById(String.valueOf(collectRecordDTO.getId()));
SchTerminatorVO schTerminatorVO = iSchTerminatorService.findById(String.valueOf(schCollectRecordVO.getTerminatorId()));
NettyTerminatorDTO nettyTerminatorDTO = channelRepository.getTerminatorByIp(String.valueOf(schTerminatorVO.getTerminatorIp()));
if (Func.isEmpty(nettyTerminatorDTO)) {
return false;
}
List<StateEnum> stateEnums = Arrays.asList(StateEnum.values());
if (stateEnums.contains(schCollectRecordVO.getTaskExecState())) {
nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE);
}
}
return error;
}
private boolean logProcess(TrackContext context) {
private boolean logTerminator(TrackContext context) {
boolean error = context.isError();
EditSchTerminatorDTO terminatorDTO = (EditSchTerminatorDTO) context.getArgs()[0];
if (!error) {

@ -3,13 +3,16 @@ package com.docus.server.controller;
import com.docus.core.util.json.JSON;
import com.docus.infrastructure.web.request.SearchDTO;
import com.docus.infrastructure.web.response.PageResult;
import com.docus.log.annotation.TrackGroup;
import com.docus.server.api.scheduling.management.SchCollectRecordApi;
import com.docus.server.common.MsgConstants;
import com.docus.server.common.process.ChannelProcessor;
import com.docus.server.dto.scheduling.management.schcollector.task.SchCollectorTaskDTO;
import com.docus.server.dto.scheduling.management.schcollectrecord.AddSchCollectRecordDTO;
import com.docus.server.dto.scheduling.management.schcollectrecord.DeleteSchCollectRecordDTO;
import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO;
import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO;
import com.docus.server.entity.scheduling.management.SchCollectRecord;
import com.docus.server.service.ICommMsgService;
import com.docus.server.service.ISchCollectRecordService;
import com.docus.server.vo.scheduling.management.schcollectrecord.SchCollectRecordVO;
@ -98,7 +101,7 @@ public class SchCollectRecordController implements SchCollectRecordApi {
* @return
*/
@Override
public boolean add(AddSchCollectRecordDTO addSchCollectRecordDTO) {
public SchCollectRecord add(AddSchCollectRecordDTO addSchCollectRecordDTO) {
return iSchCollectRecordService.add(addSchCollectRecordDTO);
}
@ -108,6 +111,7 @@ public class SchCollectRecordController implements SchCollectRecordApi {
* @param editSchCollectRecordDTO
* @return
*/
@TrackGroup(group = "SchCollectRecordController-edit", processor = ChannelProcessor.class)
@Override
public boolean edit(EditSchCollectRecordDTO editSchCollectRecordDTO) {
return iSchCollectRecordService.edit(editSchCollectRecordDTO);

@ -2,9 +2,12 @@ package com.docus.server.service;
import com.docus.infrastructure.web.request.SearchDTO;
import com.docus.infrastructure.web.response.PageResult;
import com.docus.server.dto.scheduling.management.schcollector.task.ReportDownTwoDTO;
import com.docus.server.dto.scheduling.management.schcollectrecord.AddSchCollectRecordDTO;
import com.docus.server.dto.scheduling.management.schcollectrecord.DeleteSchCollectRecordDTO;
import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.entity.scheduling.management.SchCollectRecord;
import com.docus.server.vo.scheduling.management.schcollectrecord.SchCollectRecordVO;
/**
@ -28,7 +31,7 @@ public interface ISchCollectRecordService {
* @param addSchCollectRecordDTO
* @return
*/
boolean add(AddSchCollectRecordDTO addSchCollectRecordDTO);
SchCollectRecord add(AddSchCollectRecordDTO addSchCollectRecordDTO);
/**
*
@ -53,4 +56,6 @@ public interface ISchCollectRecordService {
* @return
*/
PageResult<SchCollectRecordVO> search(SearchDTO searchDTO);
SchCollectRecord saveOrUpdateRecord(NettyTerminatorDTO terminal, ReportDownTwoDTO messageContent);
}

@ -5,7 +5,7 @@ import com.docus.infrastructure.web.response.PageResult;
import com.docus.server.dto.scheduling.management.schterminator.AddSchTerminatorDTO;
import com.docus.server.dto.scheduling.management.schterminator.DeleteSchTerminatorDTO;
import com.docus.server.dto.scheduling.management.schterminator.EditSchTerminatorDTO;
import com.docus.server.enums.OnlineStateEnum;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.vo.scheduling.management.schterminator.SchTerminatorVO;
/**
@ -58,8 +58,8 @@ public interface ISchTerminatorService {
/**
*
* @param terminatorIp
* @param onlineState
* @param nettyTerminatorDTO
* @return
*/
SchTerminatorVO saveOrUpdate(String terminatorIp, OnlineStateEnum onlineState);
SchTerminatorVO saveOrUpdate(String terminatorIp, NettyTerminatorDTO nettyTerminatorDTO);
}

@ -1,26 +1,34 @@
package com.docus.server.service.impl;
import com.docus.core.util.DateUtil;
import com.docus.core.util.ListUtils;
import com.docus.core.util.json.JSON;
import com.docus.core.util.property.Setters;
import com.docus.infrastructure.redis.service.IdService;
import com.docus.infrastructure.web.exception.ApiException;
import com.docus.infrastructure.web.exception.ExceptionCode;
import com.docus.infrastructure.web.request.SearchDTO;
import com.docus.infrastructure.web.response.PageResult;
import com.docus.server.convert.SchCollectRecordConvert;
import com.docus.server.convert.SchCollectRecordRetryLogConvert;
import com.docus.server.dto.scheduling.management.schcollector.task.ReportDownTwoDTO;
import com.docus.server.dto.scheduling.management.schcollectrecord.AddSchCollectRecordDTO;
import com.docus.server.dto.scheduling.management.schcollectrecord.DeleteSchCollectRecordDTO;
import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.entity.scheduling.management.SchCollectRecord;
import com.docus.server.entity.scheduling.management.SchCollectRecordRetryLog;
import com.docus.server.entity.scheduling.management.SchSystemParams;
import com.docus.server.enums.RetryTaskEnum;
import com.docus.server.enums.SubStateEnum;
import com.docus.server.infrastructure.dao.ISchCollectRecordDao;
import com.docus.server.infrastructure.dao.ISchCollectRecordRetryLogDao;
import com.docus.server.infrastructure.dao.ISchSystemParamsDao;
import com.docus.server.infrastructure.dao.ISchTerminatorDao;
import com.docus.server.service.ISchCollectRecordService;
import com.docus.server.vo.scheduling.management.schcollectrecord.SchCollectRecordVO;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
@ -38,9 +46,13 @@ public class SchCollectRecordServiceImpl implements ISchCollectRecordService {
@Resource
private ISchCollectRecordDao iSchCollectRecordDao;
@Resource
private ISchTerminatorDao iSchTerminatorDao;
@Resource
private ISchCollectRecordRetryLogDao iSchCollectRecordRetryLogDao;
@Resource
private ISchSystemParamsDao iSchSystemParamsDao;
@Resource
private IdService idService;
/**
*
@ -80,6 +92,26 @@ public class SchCollectRecordServiceImpl implements ISchCollectRecordService {
return result;
}
@Transactional(rollbackFor = Exception.class)
@Override
public SchCollectRecord saveOrUpdateRecord(NettyTerminatorDTO terminal, ReportDownTwoDTO messageContent) {
//新增采集记录表
AddSchCollectRecordDTO addSchCollectRecordDTO = new AddSchCollectRecordDTO();
addSchCollectRecordDTO.setId(idService.getDateSeq());
addSchCollectRecordDTO.setCollectorId(Long.valueOf(messageContent.getTasks().get(0).getCollectorId()));
addSchCollectRecordDTO.setTerminatorId(terminal.getId());
addSchCollectRecordDTO.setTaskId(messageContent.getTasks().get(0).getTaskId());
SchSystemParams params = iSchSystemParamsDao.findOneBy("paramValue", messageContent.getTasks().get(0).getCollectorId());
addSchCollectRecordDTO.setTaskName(String.format("%s%s%s", messageContent.getPatient().getName(), params.getParamName(), "采集"));
addSchCollectRecordDTO.setTaskMemo(String.format("%s%s%s", messageContent.getPatient().getName(), params.getParamName(), "采集"));
addSchCollectRecordDTO.setStartTime(DateUtil.now());
addSchCollectRecordDTO.setTaskDetailInfo(String.format("病案号:%s姓名%s采集类型%s", messageContent.getPatientId(), messageContent.getPatient().getName(), params.getParamName()));
addSchCollectRecordDTO.setSubTaskExecState(SubStateEnum.RECEIVE);
addSchCollectRecordDTO.setTaskOriginJson(JSON.toJSON(messageContent));
return this.add(addSchCollectRecordDTO);
}
/**
*
*
@ -87,9 +119,10 @@ public class SchCollectRecordServiceImpl implements ISchCollectRecordService {
* @return
*/
@Override
public boolean add(AddSchCollectRecordDTO addSchCollectRecordDTO) {
public SchCollectRecord add(AddSchCollectRecordDTO addSchCollectRecordDTO) {
SchCollectRecord schCollectRecord = SchCollectRecordConvert.INSTANCE.convertDO(addSchCollectRecordDTO);
return iSchCollectRecordDao.add(schCollectRecord);
iSchCollectRecordDao.add(schCollectRecord);
return schCollectRecord;
}
/**
@ -102,6 +135,7 @@ public class SchCollectRecordServiceImpl implements ISchCollectRecordService {
public boolean edit(EditSchCollectRecordDTO editSchCollectRecordDTO) {
RetryTaskEnum isRetryTask = editSchCollectRecordDTO.getIsRetryTask();
Long id = editSchCollectRecordDTO.getId();
if (RetryTaskEnum.NO_RETRY_TASK.equals(isRetryTask)) {
//不是重试任务
SchCollectRecord schCollectRecord = iSchCollectRecordDao.findById(id);

@ -8,9 +8,9 @@ import com.docus.server.convert.SchTerminatorConvert;
import com.docus.server.dto.scheduling.management.schterminator.AddSchTerminatorDTO;
import com.docus.server.dto.scheduling.management.schterminator.DeleteSchTerminatorDTO;
import com.docus.server.dto.scheduling.management.schterminator.EditSchTerminatorDTO;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.entity.scheduling.management.SchTerminator;
import com.docus.server.enums.BusyStateEnum;
import com.docus.server.enums.OnlineStateEnum;
import com.docus.server.infrastructure.dao.ISchTerminatorDao;
import com.docus.server.service.ISchTerminatorService;
import com.docus.server.vo.scheduling.management.schterminator.SchTerminatorVO;
@ -55,26 +55,27 @@ public class SchTerminatorServiceImpl implements ISchTerminatorService {
}
@Override
public SchTerminatorVO saveOrUpdate(String terminatorIp, OnlineStateEnum onlineStateEnum) {
public SchTerminatorVO saveOrUpdate(String terminatorIp, NettyTerminatorDTO nettyTerminatorDTO) {
SchTerminator schTerminator = iSchTerminatorDao.findOneBy("terminatorIp", terminatorIp);
//新增
if (Func.isNull(schTerminator)) {
SchTerminator schTerminatorDO = new SchTerminator();
schTerminatorDO.setId(idService.getDateSeq());
schTerminatorDO.setTerminatorIp(terminatorIp);
schTerminatorDO.setTerminatorName(terminatorIp);
schTerminatorDO.setOnlineState(onlineStateEnum);
schTerminatorDO.setOnlineState(nettyTerminatorDTO.getOnlineState());
schTerminatorDO.setBusyState(BusyStateEnum.IDLE);
iSchTerminatorDao.save(schTerminatorDO);
return SchTerminatorConvert.INSTANCE.convertVO(schTerminatorDO);
} else {
schTerminator.setBusyState(BusyStateEnum.IDLE);
schTerminator.setOnlineState(onlineStateEnum);
//更新
schTerminator.setBusyState(nettyTerminatorDTO.getBusyState());
schTerminator.setOnlineState(nettyTerminatorDTO.getOnlineState());
schTerminator.setUpdateTime(new Date());
iSchTerminatorDao.updateById(schTerminator);

@ -5,6 +5,7 @@ import com.docus.infrastructure.web.response.PageResult;
import com.docus.server.dto.scheduling.management.schcollectrecord.AddSchCollectRecordDTO;
import com.docus.server.dto.scheduling.management.schcollectrecord.DeleteSchCollectRecordDTO;
import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollectRecordDTO;
import com.docus.server.entity.scheduling.management.SchCollectRecord;
import com.docus.server.vo.scheduling.management.schcollectrecord.SchCollectRecordVO;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
@ -57,7 +58,7 @@ public interface SchCollectRecordApi {
*/
@ApiOperation("新增")
@PostMapping("/add")
boolean add(@RequestBody AddSchCollectRecordDTO addSchCollectRecordDTO);
SchCollectRecord add(@RequestBody AddSchCollectRecordDTO addSchCollectRecordDTO);
/**
*

Loading…
Cancel
Save