From f30621cb8d6660f7a269d0c46a48c9c971814184 Mon Sep 17 00:00:00 2001 From: beeajax <1105173470@qq.com> Date: Fri, 21 Jul 2023 22:55:52 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E4=B8=8B=E5=8F=91-=E9=87=8D?= =?UTF-8?q?=E8=AF=95=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../docus/server/common/SchedulerTask.java | 4 +- .../common/process/ChannelProcessor.java | 44 +++++++++++++++---- 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/SchedulerTask.java b/collector-scheduling-management/src/main/java/com/docus/server/common/SchedulerTask.java index c312141..292a18d 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/SchedulerTask.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/SchedulerTask.java @@ -27,6 +27,7 @@ import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import javax.annotation.Resource; +import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; @@ -331,7 +332,8 @@ public class SchedulerTask { "}"; ReportDownTwoDTO reportDownTwoDTO3 = JSON.fromJSON(collectorId3, ReportDownTwoDTO.class); - List allTaskList = Lists.newArrayList(reportDownTwoDTO1, reportDownTwoDTO2, reportDownTwoDTO3); +// List allTaskList = Lists.newArrayList(reportDownTwoDTO1, reportDownTwoDTO2, reportDownTwoDTO3); + List allTaskList = new ArrayList<>(); if (!CollectionUtils.isEmpty(this.retryTaskQueue)) { ReportDownTwoDTO retryTask = (ReportDownTwoDTO) this.retryTaskQueue.take(); diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java b/collector-scheduling-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java index d9e5838..cd6cb66 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java @@ -8,10 +8,13 @@ import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollec import com.docus.server.dto.scheduling.management.schterminator.EditSchTerminatorDTO; import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO; import com.docus.server.enums.BusyStateEnum; +import com.docus.server.enums.RetryTaskEnum; import com.docus.server.enums.StateEnum; +import com.docus.server.service.ISchCollectRecordRetryLogService; import com.docus.server.service.ISchCollectRecordService; import com.docus.server.service.ISchTerminatorService; import com.docus.server.vo.scheduling.management.schcollectrecord.SchCollectRecordVO; +import com.docus.server.vo.scheduling.management.schcollectrecordretrylog.SchCollectRecordRetryLogVO; import com.docus.server.vo.scheduling.management.schterminator.SchTerminatorVO; import javax.annotation.Resource; @@ -29,6 +32,8 @@ public class ChannelProcessor extends AbstractProcessor { private ISchTerminatorService iSchTerminatorService; @Resource private ISchCollectRecordService iSchCollectRecordService; + @Resource + private ISchCollectRecordRetryLogService iSchCollectRecordRetryLogService; @Override protected Object doProcess(TrackContext context) { @@ -57,21 +62,42 @@ public class ChannelProcessor extends AbstractProcessor { EditSchCollectRecordDTO collectRecordDTO = (EditSchCollectRecordDTO) context.getArgs()[0]; if (!error) { - SchCollectRecordVO schCollectRecordVO = iSchCollectRecordService.findById(String.valueOf(collectRecordDTO.getId())); - SchTerminatorVO schTerminatorVO = iSchTerminatorService.findById(String.valueOf(schCollectRecordVO.getTerminatorId())); + if (RetryTaskEnum.NO_RETRY_TASK.equals(collectRecordDTO.getIsRetryTask())) { + SchCollectRecordVO schCollectRecordVO = iSchCollectRecordService.findById(String.valueOf(collectRecordDTO.getId())); - NettyTerminatorDTO nettyTerminatorDTO = channelRepository.getTerminatorByIp(String.valueOf(schTerminatorVO.getTerminatorIp())); + SchTerminatorVO schTerminatorVO = iSchTerminatorService.findById(String.valueOf(schCollectRecordVO.getTerminatorId())); - if (Func.isEmpty(nettyTerminatorDTO)) { - return false; - } + NettyTerminatorDTO nettyTerminatorDTO = channelRepository.getTerminatorByIp(String.valueOf(schTerminatorVO.getTerminatorIp())); + + if (Func.isEmpty(nettyTerminatorDTO)) { + return false; + } + + List stateEnums = Arrays.asList(StateEnum.values()); + + if (stateEnums.contains(schCollectRecordVO.getTaskExecState())) { + nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE); + } + } else if (RetryTaskEnum.RETRY_TASK.equals(collectRecordDTO.getIsRetryTask())) { - List stateEnums = Arrays.asList(StateEnum.values()); + SchCollectRecordRetryLogVO retryLogVO = iSchCollectRecordRetryLogService.findById(String.valueOf(collectRecordDTO.getId())); - if (stateEnums.contains(schCollectRecordVO.getTaskExecState())) { - nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE); + SchTerminatorVO schTerminatorVO = iSchTerminatorService.findById(String.valueOf(retryLogVO.getTerminatorId())); + + NettyTerminatorDTO nettyTerminatorDTO = channelRepository.getTerminatorByIp(String.valueOf(schTerminatorVO.getTerminatorIp())); + + if (Func.isEmpty(nettyTerminatorDTO)) { + return false; + } + + List stateEnums = Arrays.asList(StateEnum.values()); + + if (stateEnums.contains(retryLogVO.getTaskExecState())) { + nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE); + } } + } return error;