任务下发

segment2.0
beeajax 2 years ago
parent abd6074895
commit 0978dcbc80

@ -1,23 +1,31 @@
package com.docus.server.common;
import com.docus.core.util.DateUtil;
import com.docus.core.util.Func;
import com.docus.core.util.ListUtils;
import com.docus.core.util.json.JSON;
import com.docus.server.api.taskdistribute.TaskDistributeApi;
import com.docus.server.common.netty.CommMsg;
import com.docus.server.common.netty.server.ChannelRepository;
import com.docus.server.convert.CommMsgConvert;
import com.docus.server.dto.scheduling.management.schcollector.task.ReportDownTwoDTO;
import com.docus.server.dto.scheduling.management.schcollector.task.ReportTaskTwoDTO;
import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.enums.BusyStateEnum;
import com.docus.server.enums.OnlineStateEnum;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.Collection;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
@ -36,87 +44,128 @@ public class SchedulerTask {
try {
log.info("定时任务: 开始执行");
Map<String, NettyTerminatorDTO> ipToTerminatorCacheMap = channelRepository.getIpToTerminatorCacheMap();
//在线并且空闲的终端
List<NettyTerminatorDTO> terminalList = channelRepository.getAvailTerminator();
//没有可用的通道
if (Func.isEmpty(ipToTerminatorCacheMap)) {
if (Func.isEmpty(terminalList)) {
return;
}
Collection<NettyTerminatorDTO> values = ipToTerminatorCacheMap.values();
List<ReportDownTwoDTO> reportDownTwoDTOList;
List<NettyTerminatorDTO> AllTerminators = Lists.newArrayList();
List<NettyTerminatorDTO> onlyTerminators = Lists.newArrayList();
List<NettyTerminatorDTO> priorityTerminators = Lists.newArrayList();
List<NettyTerminatorDTO> noOnlyTerminators = Lists.newArrayList();
List<List<ReportTaskTwoDTO>> taskInfos = new ArrayList<>();
Map<String, NettyTerminatorDTO> onlyCollectorIdMap = Maps.newConcurrentMap();
Map<String, NettyTerminatorDTO> priorityCollectorIdsMap = Maps.newConcurrentMap();
for (NettyTerminatorDTO t : values) {
//是否空闲
BusyStateEnum busyState = t.getBusyState();
//是否在线
OnlineStateEnum onlineState = t.getOnlineState();
//只采集
String onlyCollectorIds = t.getOnlyCollectorIds();
//优先采集
String priorityCollectorIds = t.getPriorityCollectorIds();
Map<String, ReportDownTwoDTO> reportDownTwoMap = Maps.newConcurrentMap();
if (!CollectionUtils.isEmpty(terminalList)) {
//:todo 任务平台需要修改发布任务策略
reportDownTwoDTOList = getTask(terminalList.size());
//在线且空闲的通道
if (valid(busyState, onlineState)) {
if (Func.isEmpty(reportDownTwoDTOList) || Func.isBlank(reportDownTwoDTOList.get(0).getPatientId())) {
return;
}
//所有空闲,在线的通道
AllTerminators.add(t);
reportDownTwoMap = ListUtils.toMap(reportDownTwoDTOList, ReportDownTwoDTO::getPatientId);
//有无偏好属性的,
// 只采集的+ 没有优先采集的
if (Func.isNotBlank(onlyCollectorIds) && Func.isBlank(priorityCollectorIds)) {
onlyTerminators.add(t);
taskInfos = ListUtils.select(reportDownTwoDTOList, ReportDownTwoDTO::getTasks);
String[] split = onlyCollectorIds.split(",");
}
for (String k : split) {
onlyCollectorIdMap.put(k, t);
//只采集,有优先级的
for (NettyTerminatorDTO terminal : terminalList) {
for (List<ReportTaskTwoDTO> taskInfo : taskInfos) {
//先找出有只采集的任务。
ReportTaskTwoDTO reportTaskTwoDto = taskInfo.get(0);
if (terminal.getOnlyCollectorIds().contains(reportTaskTwoDto.getCollectorId())
&& BusyStateEnum.IDLE.equals(terminal.getBusyState())) {
//把这个任务派给这个终端,并且把这个终端设置成繁忙
if (terminal.getPriorityCollectorIds().contains(reportTaskTwoDto.getCollectorId())) {
//把这个任务派给这个终端
terminal.setBusyState(BusyStateEnum.BUSY);
String patientId = reportTaskTwoDto.getPatientId();
if (reportDownTwoMap.containsKey(patientId)) {
ReportDownTwoDTO reportDownTwoDTO = reportDownTwoMap.get(patientId);
tcpToClient(terminal, reportDownTwoDTO);
}
return;
}
}
}
}
//只采集的+优先采集的
if (Func.isNotBlank(onlyCollectorIds) && Func.isNotBlank(priorityCollectorIds)) {
priorityTerminators.add(t);
//只采集没有优先级
for (NettyTerminatorDTO terminal : terminalList) {
//把刚才已经分配任务过的采集器排除
if (BusyStateEnum.BUSY.equals(terminal.getBusyState())) {
continue;
}
String[] split = priorityCollectorIds.split(",");
for (String k : split) {
priorityCollectorIdsMap.put(k, t);
for (List<ReportTaskTwoDTO> taskInfo : taskInfos) {
//先找出有只采集的任务。
ReportTaskTwoDTO reportTaskTwoDto = taskInfo.get(0);
if (terminal.getOnlyCollectorIds().contains(reportTaskTwoDto.getCollectorId())
&& BusyStateEnum.IDLE.equals(terminal.getBusyState())) {
//把这个任务派给这个终端,并且把这个终端设置成繁忙
terminal.setBusyState(BusyStateEnum.BUSY);
String patientId = reportTaskTwoDto.getPatientId();
if (reportDownTwoMap.containsKey(patientId)) {
ReportDownTwoDTO reportDownTwoDTO = reportDownTwoMap.get(patientId);
tcpToClient(terminal, reportDownTwoDTO);
}
}
//没有偏好属性的
if (Func.isBlank(onlyCollectorIds)) {
noOnlyTerminators.add(t);
return;
}
}
System.out.println(JSON.toJSON(t));
}
// 只采集的+ 没有优先采集的A
Map<String, NettyTerminatorDTO> stringNettyTerminatorDTOMap = ListUtils.toMap(onlyTerminators, NettyTerminatorDTO::getTerminatorIp);
//只采集的+优先采集的,A,B
Map<String, NettyTerminatorDTO> stringNettyTerminatorDTOMap1 = ListUtils.toMap(priorityTerminators, NettyTerminatorDTO::getTerminatorIp);
//无只采集,有优先级
for (NettyTerminatorDTO terminal : terminalList) {
//没有偏好属性的,C
Map<String, NettyTerminatorDTO> stringNettyTerminatorDTOMap2 = ListUtils.toMap(noOnlyTerminators, NettyTerminatorDTO::getTerminatorIp);
//把刚才已经分配任务过的采集器排除
if (BusyStateEnum.BUSY.equals(terminal.getBusyState())) {
continue;
}
//拿到所有偏好属性
for (List<ReportTaskTwoDTO> taskInfo : taskInfos) {
//先找出有只采集的任务。
ReportTaskTwoDTO reportTaskTwoDto = taskInfo.get(0);
if (terminal.getPriorityCollectorIds().contains(reportTaskTwoDto.getCollectorId())
&& BusyStateEnum.IDLE.equals(terminal.getBusyState())) {
//把这个任务派给这个终端
terminal.setBusyState(BusyStateEnum.BUSY);
String patientId = reportTaskTwoDto.getPatientId();
if (reportDownTwoMap.containsKey(patientId)) {
ReportDownTwoDTO reportDownTwoDTO = reportDownTwoMap.get(patientId);
tcpToClient(terminal, reportDownTwoDTO);
}
return;
}
}
}
if (CollectionUtils.isEmpty(AllTerminators)) {
//:todo 任务平台需要修改发布任务策略
ReportDownTwoDTO task = taskDistributeApi.getTask(String.valueOf(AllTerminators.size()));
//无只采集,无优先级
for (NettyTerminatorDTO terminal : terminalList) {
//把刚才已经分配任务过的采集器排除
if (BusyStateEnum.BUSY.equals(terminal.getBusyState())) {
continue;
}
for (List<ReportTaskTwoDTO> taskInfo : taskInfos) {
//先找出有只采集的任务。
//把这个任务派给这个终端
ReportTaskTwoDTO reportTaskTwoDto = taskInfo.get(0);
String collectorId = reportTaskTwoDto.getCollectorId();
terminal.setBusyState(BusyStateEnum.BUSY);
String patientId = reportTaskTwoDto.getPatientId();
if (reportDownTwoMap.containsKey(patientId)) {
ReportDownTwoDTO reportDownTwoDTO = reportDownTwoMap.get(patientId);
tcpToClient(terminal, reportDownTwoDTO);
}
return;
}
}
@ -126,10 +175,44 @@ public class SchedulerTask {
}
}
private boolean valid(BusyStateEnum busyState, OnlineStateEnum onlineState) {
return BusyStateEnum.IDLE.equals(busyState)
&& OnlineStateEnum.ONLINE.equals(onlineState);
private void tcpToClient(NettyTerminatorDTO terminal, ReportDownTwoDTO messageContent) {
Channel channel = channelRepository.get(terminal.getTerminatorIp());
CommMsgDTO commMsgDTO = CommMsgDTO.builder()
.content(JSON.toJSON(messageContent))
.messageType(MsgConstants.SCH_DISTRIBUTE_TASKS)
.build();
CommMsg commMsg = CommMsgConvert.INSTANCE.convertDO(commMsgDTO);
commMsg.setMessageTime(DateUtil.formatDateTime(new Date()));
if (channel != null) {
channel.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(commMsg), CharsetUtil.UTF_8));
}
}
//根据有效终端一次获取一批任务例如10个终端获取10个不同类型任务
private List<ReportDownTwoDTO> getTask(int size) {
// return taskDistributeApi.getTask(String.valueOf(size));
//mock
ReportDownTwoDTO task = taskDistributeApi.getTask("1");
ReportDownTwoDTO task1 = taskDistributeApi.getTask("1");
return Lists.newArrayList(task, task1);
}
//根据采集器id类型一次获取一批采集器类型任务
private ArrayList<ReportDownTwoDTO> getTask(List<String> collectorIds) {
// return taskDistributeApi.getTask(collectorIds.get(0));
//mock
ReportDownTwoDTO task = taskDistributeApi.getTask("4");
ReportDownTwoDTO task1 = taskDistributeApi.getTask("5");
return Lists.newArrayList(task, task1);
}
}

@ -1,6 +1,7 @@
package com.docus.server.common.netty.server;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.enums.BusyStateEnum;
import com.docus.server.enums.OnlineStateEnum;
import com.docus.server.service.ISchTerminatorService;
import com.docus.server.vo.scheduling.management.schterminator.SchTerminatorVO;
@ -8,10 +9,14 @@ import io.netty.channel.Channel;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* IP
@ -80,6 +85,14 @@ public class ChannelRepository {
return IP_TERMINATOR_CACHE_MAP;
}
public List<NettyTerminatorDTO> getAvailTerminator() {
if (!CollectionUtils.isEmpty(IP_TERMINATOR_CACHE_MAP)) {
return IP_TERMINATOR_CACHE_MAP.values().stream().filter(p -> OnlineStateEnum.ONLINE.equals(p.getOnlineState())
&& BusyStateEnum.IDLE.equals(p.getBusyState())).collect(Collectors.toList());
}
return Collections.emptyList();
}
/**
* 线
*/

@ -1,5 +1,6 @@
package com.docus.server.common.process;
import com.docus.core.util.Func;
import com.docus.log.context.TrackContext;
import com.docus.log.processor.AbstractProcessor;
import com.docus.server.common.netty.server.ChannelRepository;
@ -9,6 +10,9 @@ import com.docus.server.service.ISchTerminatorService;
import com.docus.server.vo.scheduling.management.schterminator.SchTerminatorVO;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/**
* channel
@ -32,10 +36,14 @@ public class ChannelProcessor extends AbstractProcessor {
NettyTerminatorDTO nettyTerminatorDTO = channelRepository.getTerminatorByIp(terminatorVO.getTerminatorIp());
nettyTerminatorDTO.setOnlyCollectorIds(terminatorDTO.getOnlyCollectorIds());
nettyTerminatorDTO.setPriorityCollectorIds(terminatorDTO.getPriorityCollectorIds());
if (Func.isNotBlank(terminatorDTO.getOnlyCollectorIds())) {
List<String> onlyList = Arrays.stream(terminatorDTO.getOnlyCollectorIds().split(",")).map(String::valueOf).collect(Collectors.toList());
nettyTerminatorDTO.setOnlyCollectorIds(onlyList);
}
if (Func.isNotBlank(terminatorDTO.getPriorityCollectorIds())) {
List<String> priList = Arrays.stream(terminatorDTO.getPriorityCollectorIds().split(",")).map(String::valueOf).collect(Collectors.toList());
nettyTerminatorDTO.setPriorityCollectorIds(priList);
}
}
return error;
}

@ -0,0 +1,107 @@
package com.docus.server.common.test;
import java.util.ArrayList;
import java.util.List;
public class DispatchService {
public void dispatch() {
//获取所有空闲的终端
List<Terminal> terminalList = new ArrayList<>();
List<TaskInfo> taskInfos = this.getTaskInfos(terminalList.size());
//只采集,有优先级的
for (Terminal terminal : terminalList) {
for (TaskInfo taskInfo : taskInfos) {
//先找出有只采集的任务。
if (terminal.getOnlyTags().contains(taskInfo.getCollectType())) {
//把这个任务派给这个终端,并且把这个终端设置成繁忙
if (terminal.getPriorityTags().contains(taskInfo.getCollectType())) {
//把这个任务派给这个终端
terminal.setState(1);
return;
}
}
}
}
//只采集没有优先级
for (Terminal terminal : terminalList) {
//把刚才已经分配任务过的采集器排除
if (terminal.getState() == 1) {
continue;
}
for (TaskInfo taskInfo : taskInfos) {
//先找出有只采集的任务。
if (terminal.getOnlyTags().contains(taskInfo.getCollectType())) {
//把这个任务派给这个终端,并且把这个终端设置成繁忙
terminal.setState(1);
return;
}
}
}
//无只采集,有优先级
for (Terminal terminal : terminalList) {
//把刚才已经分配任务过的采集器排除
if (terminal.getState() == 1) {
continue;
}
for (TaskInfo taskInfo : taskInfos) {
//先找出有只采集的任务。
if (terminal.getPriorityTags().contains(taskInfo.getCollectType())) {
//把这个任务派给这个终端
terminal.setState(1);
return;
}
}
}
//无只采集,无优先级
for (Terminal terminal : terminalList) {
//把刚才已经分配任务过的采集器排除
if (terminal.getState() == 1) {
continue;
}
for (TaskInfo taskInfo : taskInfos) {
//先找出有只采集的任务。
//把这个任务派给这个终端
terminal.setState(1);
return;
}
}
}
private void dispatchTask(List<TaskInfo> taskInfos, Terminal terminal) {
for (TaskInfo taskInfo : taskInfos) {
//先找出有只采集的任务。
if (terminal.getOnlyTags().contains(taskInfo.getCollectType())) {
//把这个任务派给这个终端
return;
}
if (terminal.getPriorityTags().contains(taskInfo.getCollectType())) {
//把这个任务派给这个终端
return;
}
}
}
public List<TaskInfo> getTaskInfos(int size) {
return null;
}
}

@ -0,0 +1,14 @@
package com.docus.server.common.test;
import lombok.Data;
@Data
public class TaskInfo {
private Long taskId;
private String collectType;
private String info;
}

@ -0,0 +1,16 @@
package com.docus.server.common.test;
import lombok.Data;
import java.util.List;
@Data
public class Terminal {
private String id;
private String collectType;
private List<String> priorityTags;
private List<String> onlyTags;
private Integer state;
}

@ -3,7 +3,7 @@ api.base-package=com.docus.server
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://db.docus.cn:3306/docus-collector-scheduling?autoReconnect=true&allowMultiQueries=true&useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
spring.datasource.username=docus
spring.datasource.password=docus702
spring.datasource.username=root
spring.datasource.password=root
mybatis-plus.type-enums-package=com.docus.server.enums

@ -13,8 +13,8 @@ spring:
#公司病案的文件服务数据库
master:
url: jdbc:log4jdbc:mysql://db.docus.cn:3306/docus-collector-scheduling?autoReconnect=true&allowMultiQueries=true&useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
username: docus
password: docus702
username: root
password: root
driver-class-name: net.sf.log4jdbc.sql.jdbcapi.DriverSpy
type: com.alibaba.druid.pool.DruidDataSource
servlet:
@ -23,7 +23,7 @@ spring:
max-request-size: 200MB
redis:
host: redis.docus.cn
password: JSdocus@702
# password: JSdocus@702
cloud:
nacos:
discovery:
@ -58,6 +58,6 @@ netty:
reader-idle-time-seconds: 30
writer-idle-time-seconds: 0
all-idle-time-seconds: 0
file:
uploadFolder: D://docus/
uploadFolder: D://docus/

@ -16,7 +16,7 @@ import org.springframework.web.bind.annotation.RequestParam;
* @since 2023-07-15
*/
@Api(value = "任务平台接口", tags = "任务平台接口")
@FeignClient(url = "${taskdistribute.api.url:http://192.168.16.85:9296}", name = "collector-scheduling-management.TaskDistributeApi")
@FeignClient(url = "${taskdistribute.api.url:http://222.76.205.50:9296}", name = "collector-scheduling-management.TaskDistributeApi")
@RequestMapping("/api")
public interface TaskDistributeApi {

@ -29,11 +29,11 @@ public class ReportDownTwoDTO {
@ApiModelProperty(value = "档案时间")
private String createTime;
@ApiModelProperty(value = "患者信息")
private ReportPatientTwoDto patient = new ReportPatientTwoDto();
private ReportPatientTwoDTO patient = new ReportPatientTwoDTO();
@ApiModelProperty(value = "住院信息")
private List<ReportHospitalTwoDto> hospitals = new ArrayList<>();
private List<ReportHospitalTwoDTO> hospitals = new ArrayList<>();
@ApiModelProperty(value = "任务信息")
private List<ReportTaskTwoDto> tasks = new ArrayList<>();
private List<ReportTaskTwoDTO> tasks = new ArrayList<>();
}

@ -14,8 +14,8 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
@Data
@ApiModel(value = "ReportHospitalTwoDto", description = "下发任务-医院查询信息")
public class ReportHospitalTwoDto {
@ApiModel(value = "ReportHospitalTwoDTO", description = "下发任务-医院查询信息")
public class ReportHospitalTwoDTO {
@ApiModelProperty(value = "患者id")
private String patientId;

@ -14,8 +14,8 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
@Data
@ApiModel(value = "ReportPatientTwoDto", description = "下发任务-病患信息")
public class ReportPatientTwoDto {
@ApiModel(value = "ReportPatientTwoDTO", description = "下发任务-病患信息")
public class ReportPatientTwoDTO {
@ApiModelProperty(value = "住院号/就诊号")
private String inpatientNo;

@ -14,8 +14,8 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
@Data
@ApiModel(value = "ReportTaskTwoDto", description = "下发任务-任务信息")
public class ReportTaskTwoDto {
@ApiModel(value = "ReportTaskTwoDTO", description = "下发任务-任务信息")
public class ReportTaskTwoDTO {
@ApiModelProperty(value = "采集器标识")
private String collectorId;
@ApiModelProperty(value = "任务id")

@ -6,10 +6,14 @@ import com.docus.server.enums.RestrictStateEnum;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Date;
import java.util.List;
/**
* NettyTerminatorDTO
@ -17,6 +21,9 @@ import java.util.Date;
* @author AutoGenerator
* @since 2023-07-15
*/
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Data
@ApiModel(value = "NettyTerminatorDTO", description = "NettyTerminatorDTO")
public class NettyTerminatorDTO implements Serializable {
@ -31,10 +38,10 @@ public class NettyTerminatorDTO implements Serializable {
private String terminatorName;
@ApiModelProperty(value = "只干某些任务,任务偏好属性")
private String onlyCollectorIds;
private List<String> onlyCollectorIds;
@ApiModelProperty(value = "优先干某些任务,任务偏好属性(优先给哪个采集器,若不空闲再给任意采集器)")
private String priorityCollectorIds;
private List<String> priorityCollectorIds;
@ApiModelProperty(value = "是否限制采集器类型01是")
private RestrictStateEnum restrictState;

Loading…
Cancel
Save