diff --git a/collector-scheduling-management/src/main/java/com/docus/server/AppRunBootstrap.java b/collector-scheduling-management/src/main/java/com/docus/server/AppRunBootstrap.java index 4690a2f..56e96f3 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/AppRunBootstrap.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/AppRunBootstrap.java @@ -6,7 +6,7 @@ import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.scheduling.annotation.EnableAsync; @EnableAsync -@EnableFeignClients(basePackages = ("com.docus.core.excel.feign")) +@EnableFeignClients(basePackages = {"com.docus.core.excel.feign", "com.docus.server.api.taskdistribute"}) @SpringBootApplication(scanBasePackages = {"com.docus"}) public class AppRunBootstrap { public static void main(String[] args) { diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/SchedulerTask.java b/collector-scheduling-management/src/main/java/com/docus/server/common/SchedulerTask.java index 1f814f4..b47ed67 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/SchedulerTask.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/SchedulerTask.java @@ -1,21 +1,33 @@ package com.docus.server.common; +import com.docus.core.util.Func; +import com.docus.core.util.ListUtils; import com.docus.core.util.json.JSON; +import com.docus.server.api.taskdistribute.TaskDistributeApi; import com.docus.server.common.netty.server.ChannelRepository; +import com.docus.server.dto.scheduling.management.schcollector.task.ReportDownTwoDTO; import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO; +import com.docus.server.enums.BusyStateEnum; +import com.docus.server.enums.OnlineStateEnum; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import javax.annotation.Resource; +import java.util.Collection; +import java.util.List; import java.util.Map; -import java.util.Set; @Component @Slf4j public class SchedulerTask { @Resource private ChannelRepository channelRepository; + @Resource + private TaskDistributeApi taskDistributeApi; //定时任务 // 5 * * * * ? 在每分钟的5秒执行 @@ -26,22 +38,98 @@ public class SchedulerTask { Map ipToTerminatorCacheMap = channelRepository.getIpToTerminatorCacheMap(); - Set keySet = ipToTerminatorCacheMap.keySet(); + //没有可用的通道 + if (Func.isEmpty(ipToTerminatorCacheMap)) { + return; + } + + Collection values = ipToTerminatorCacheMap.values(); + + List AllTerminators = Lists.newArrayList(); + List onlyTerminators = Lists.newArrayList(); + List priorityTerminators = Lists.newArrayList(); + List noOnlyTerminators = Lists.newArrayList(); + + Map onlyCollectorIdMap = Maps.newConcurrentMap(); + Map priorityCollectorIdsMap = Maps.newConcurrentMap(); + + for (NettyTerminatorDTO t : values) { + + //是否空闲 + BusyStateEnum busyState = t.getBusyState(); + //是否在线 + OnlineStateEnum onlineState = t.getOnlineState(); + //只采集 + String onlyCollectorIds = t.getOnlyCollectorIds(); + //优先采集 + String priorityCollectorIds = t.getPriorityCollectorIds(); + + //在线且空闲的通道 + if (valid(busyState, onlineState)) { + + //所有空闲,在线的通道 + AllTerminators.add(t); + + //有无偏好属性的, + // 只采集的+ 没有优先采集的 + if (Func.isNotBlank(onlyCollectorIds) && Func.isBlank(priorityCollectorIds)) { + onlyTerminators.add(t); + + String[] split = onlyCollectorIds.split(","); + + for (String k : split) { + onlyCollectorIdMap.put(k, t); + } - for (String clientIp : keySet) { - NettyTerminatorDTO nettyTerminatorDTO = ipToTerminatorCacheMap.get(clientIp); + } - System.out.println(JSON.toJSON(nettyTerminatorDTO)); + //只采集的+优先采集的 + if (Func.isNotBlank(onlyCollectorIds) && Func.isNotBlank(priorityCollectorIds)) { + priorityTerminators.add(t); + String[] split = priorityCollectorIds.split(","); + for (String k : split) { + priorityCollectorIdsMap.put(k, t); + } + } + + //没有偏好属性的 + if (Func.isBlank(onlyCollectorIds)) { + noOnlyTerminators.add(t); + } + } + + System.out.println(JSON.toJSON(t)); + } + + // 只采集的+ 没有优先采集的,A + Map stringNettyTerminatorDTOMap = ListUtils.toMap(onlyTerminators, NettyTerminatorDTO::getTerminatorIp); + + //只采集的+优先采集的,A,B + Map stringNettyTerminatorDTOMap1 = ListUtils.toMap(priorityTerminators, NettyTerminatorDTO::getTerminatorIp); + + //没有偏好属性的,C + Map stringNettyTerminatorDTOMap2 = ListUtils.toMap(noOnlyTerminators, NettyTerminatorDTO::getTerminatorIp); + + //拿到所有偏好属性 + + + if (CollectionUtils.isEmpty(AllTerminators)) { + //:todo 任务平台需要修改发布任务策略 + ReportDownTwoDTO task = taskDistributeApi.getTask(String.valueOf(AllTerminators.size())); } - //todo:执行业务 log.info("定时任务: 执行完毕"); } catch (Exception e) { log.error("定时任务执行出错", e); } } + private boolean valid(BusyStateEnum busyState, OnlineStateEnum onlineState) { + return BusyStateEnum.IDLE.equals(busyState) + && OnlineStateEnum.ONLINE.equals(onlineState); + } + } diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java index ed971de..bc084b4 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/ChannelRepository.java @@ -3,6 +3,7 @@ package com.docus.server.common.netty.server; import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO; import com.docus.server.enums.OnlineStateEnum; import com.docus.server.service.ISchTerminatorService; +import com.docus.server.vo.scheduling.management.schterminator.SchTerminatorVO; import io.netty.channel.Channel; import io.netty.util.AttributeKey; import lombok.extern.slf4j.Slf4j; @@ -39,15 +40,18 @@ public class ChannelRepository { //客户端上线 String terminatorIp = nettyTerminatorDTO.getTerminatorIp(); - IP_CHANNEL_CACHE_MAP.put(terminatorIp, channel); + //更新数据库终端数据 + SchTerminatorVO schTerminatorVO = iSchTerminatorService.saveOrUpdate(terminatorIp, nettyTerminatorDTO.getOnlineState()); + nettyTerminatorDTO.setId(schTerminatorVO.getId()); + //缓存 + IP_CHANNEL_CACHE_MAP.put(terminatorIp, channel); IP_TERMINATOR_CACHE_MAP.put(terminatorIp, nettyTerminatorDTO); AttributeKey attributeKey = AttributeKey.valueOf("ip"); channel.attr(attributeKey).set(terminatorIp); - //更新数据库终端数据 - iSchTerminatorService.saveOrUpdate(terminatorIp, nettyTerminatorDTO.getOnlineState()); + } public String getClientKey(Channel channel) { diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java index edd2099..b3f9acb 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyBusinessHandler.java @@ -99,7 +99,6 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler { } NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO(); - nettyTerminatorDTO.setId(ctx.channel().id().asShortText()); nettyTerminatorDTO.setTerminatorIp(clientIp); nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE); nettyTerminatorDTO.setOnlineState(OnlineStateEnum.OFFLINE); diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java index 82728af..3f0849a 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/netty/server/handler/NettyHeartbeatHandler.java @@ -64,7 +64,6 @@ public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter { } NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO(); - nettyTerminatorDTO.setId(ctx.channel().id().asShortText()); nettyTerminatorDTO.setTerminatorIp(clientIp); nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE); nettyTerminatorDTO.setOnlineState(OnlineStateEnum.ONLINE); diff --git a/collector-scheduling-management/src/main/java/com/docus/server/service/ISchTerminatorService.java b/collector-scheduling-management/src/main/java/com/docus/server/service/ISchTerminatorService.java index 27b2a44..e0e1573 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/service/ISchTerminatorService.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/service/ISchTerminatorService.java @@ -57,9 +57,9 @@ public interface ISchTerminatorService { /** * 新增或更新 - * - * @param terminatorIp + * @param terminatorIp * @param onlineState + * @return */ - void saveOrUpdate(String terminatorIp, OnlineStateEnum onlineState); + SchTerminatorVO saveOrUpdate(String terminatorIp, OnlineStateEnum onlineState); } diff --git a/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchTerminatorServiceImpl.java b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchTerminatorServiceImpl.java index 1c1c44a..f746912 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchTerminatorServiceImpl.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchTerminatorServiceImpl.java @@ -55,20 +55,22 @@ public class SchTerminatorServiceImpl implements ISchTerminatorService { } @Override - public void saveOrUpdate(String terminatorIp, OnlineStateEnum onlineStateEnum) { + public SchTerminatorVO saveOrUpdate(String terminatorIp, OnlineStateEnum onlineStateEnum) { SchTerminator schTerminator = iSchTerminatorDao.findOneBy("terminatorIp", terminatorIp); if (Func.isNull(schTerminator)) { - SchTerminator schTerminatorVO = new SchTerminator(); + SchTerminator schTerminatorDO = new SchTerminator(); - schTerminatorVO.setId(idService.getDateSeq()); - schTerminatorVO.setTerminatorIp(terminatorIp); - schTerminatorVO.setTerminatorName(terminatorIp); - schTerminatorVO.setOnlineState(onlineStateEnum); - schTerminatorVO.setBusyState(BusyStateEnum.IDLE); + schTerminatorDO.setId(idService.getDateSeq()); + schTerminatorDO.setTerminatorIp(terminatorIp); + schTerminatorDO.setTerminatorName(terminatorIp); + schTerminatorDO.setOnlineState(onlineStateEnum); + schTerminatorDO.setBusyState(BusyStateEnum.IDLE); - iSchTerminatorDao.save(schTerminatorVO); + iSchTerminatorDao.save(schTerminatorDO); + + return SchTerminatorConvert.INSTANCE.convertVO(schTerminatorDO); } else { schTerminator.setBusyState(BusyStateEnum.IDLE); @@ -76,7 +78,10 @@ public class SchTerminatorServiceImpl implements ISchTerminatorService { schTerminator.setUpdateTime(new Date()); iSchTerminatorDao.updateById(schTerminator); + + return SchTerminatorConvert.INSTANCE.convertVO(schTerminator); } + } /** diff --git a/docus-client-interface/src/main/java/com/docus/server/api/taskdistribute/TaskDistributeApi.java b/docus-client-interface/src/main/java/com/docus/server/api/taskdistribute/TaskDistributeApi.java new file mode 100644 index 0000000..e534c53 --- /dev/null +++ b/docus-client-interface/src/main/java/com/docus/server/api/taskdistribute/TaskDistributeApi.java @@ -0,0 +1,27 @@ +package com.docus.server.api.taskdistribute; + +import com.docus.server.dto.scheduling.management.schcollector.task.ReportDownTwoDTO; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; + + +/** + * 任务平台接口 + * + * @author AutoGenerator + * @since 2023-07-15 + */ +@Api(value = "任务平台接口", tags = "任务平台接口") +@FeignClient(url = "${taskdistribute.api.url:http://192.168.16.85:9296}", name = "collector-scheduling-management.TaskDistributeApi") +@RequestMapping("/api") +public interface TaskDistributeApi { + + @ApiOperation("向客户端发送TCP命令") + @GetMapping("/noviewtask/GetTask") + ReportDownTwoDTO getTask(@RequestParam String collectid); + +} diff --git a/docus-client-interface/src/main/java/com/docus/server/dto/scheduling.management/schterminator/NettyTerminatorDTO.java b/docus-client-interface/src/main/java/com/docus/server/dto/scheduling.management/schterminator/NettyTerminatorDTO.java index 0121f33..58c5f73 100644 --- a/docus-client-interface/src/main/java/com/docus/server/dto/scheduling.management/schterminator/NettyTerminatorDTO.java +++ b/docus-client-interface/src/main/java/com/docus/server/dto/scheduling.management/schterminator/NettyTerminatorDTO.java @@ -21,8 +21,8 @@ import java.util.Date; @ApiModel(value = "NettyTerminatorDTO", description = "NettyTerminatorDTO") public class NettyTerminatorDTO implements Serializable { - @ApiModelProperty(value = "channelId主键") - private String id; + @ApiModelProperty(value = "终端主键") + private Long id; @ApiModelProperty(value = "终端IP") private String terminatorIp;