diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/SchedulerTask.java b/collector-scheduling-management/src/main/java/com/docus/server/common/SchedulerTask.java index c312141..292a18d 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/SchedulerTask.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/SchedulerTask.java @@ -27,6 +27,7 @@ import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import javax.annotation.Resource; +import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; @@ -331,7 +332,8 @@ public class SchedulerTask { "}"; ReportDownTwoDTO reportDownTwoDTO3 = JSON.fromJSON(collectorId3, ReportDownTwoDTO.class); - List allTaskList = Lists.newArrayList(reportDownTwoDTO1, reportDownTwoDTO2, reportDownTwoDTO3); +// List allTaskList = Lists.newArrayList(reportDownTwoDTO1, reportDownTwoDTO2, reportDownTwoDTO3); + List allTaskList = new ArrayList<>(); if (!CollectionUtils.isEmpty(this.retryTaskQueue)) { ReportDownTwoDTO retryTask = (ReportDownTwoDTO) this.retryTaskQueue.take(); diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java b/collector-scheduling-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java index d9e5838..cd6cb66 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java @@ -8,10 +8,13 @@ import com.docus.server.dto.scheduling.management.schcollectrecord.EditSchCollec import com.docus.server.dto.scheduling.management.schterminator.EditSchTerminatorDTO; import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO; import com.docus.server.enums.BusyStateEnum; +import com.docus.server.enums.RetryTaskEnum; import com.docus.server.enums.StateEnum; +import com.docus.server.service.ISchCollectRecordRetryLogService; import com.docus.server.service.ISchCollectRecordService; import com.docus.server.service.ISchTerminatorService; import com.docus.server.vo.scheduling.management.schcollectrecord.SchCollectRecordVO; +import com.docus.server.vo.scheduling.management.schcollectrecordretrylog.SchCollectRecordRetryLogVO; import com.docus.server.vo.scheduling.management.schterminator.SchTerminatorVO; import javax.annotation.Resource; @@ -29,6 +32,8 @@ public class ChannelProcessor extends AbstractProcessor { private ISchTerminatorService iSchTerminatorService; @Resource private ISchCollectRecordService iSchCollectRecordService; + @Resource + private ISchCollectRecordRetryLogService iSchCollectRecordRetryLogService; @Override protected Object doProcess(TrackContext context) { @@ -57,21 +62,42 @@ public class ChannelProcessor extends AbstractProcessor { EditSchCollectRecordDTO collectRecordDTO = (EditSchCollectRecordDTO) context.getArgs()[0]; if (!error) { - SchCollectRecordVO schCollectRecordVO = iSchCollectRecordService.findById(String.valueOf(collectRecordDTO.getId())); - SchTerminatorVO schTerminatorVO = iSchTerminatorService.findById(String.valueOf(schCollectRecordVO.getTerminatorId())); + if (RetryTaskEnum.NO_RETRY_TASK.equals(collectRecordDTO.getIsRetryTask())) { + SchCollectRecordVO schCollectRecordVO = iSchCollectRecordService.findById(String.valueOf(collectRecordDTO.getId())); - NettyTerminatorDTO nettyTerminatorDTO = channelRepository.getTerminatorByIp(String.valueOf(schTerminatorVO.getTerminatorIp())); + SchTerminatorVO schTerminatorVO = iSchTerminatorService.findById(String.valueOf(schCollectRecordVO.getTerminatorId())); - if (Func.isEmpty(nettyTerminatorDTO)) { - return false; - } + NettyTerminatorDTO nettyTerminatorDTO = channelRepository.getTerminatorByIp(String.valueOf(schTerminatorVO.getTerminatorIp())); + + if (Func.isEmpty(nettyTerminatorDTO)) { + return false; + } + + List stateEnums = Arrays.asList(StateEnum.values()); + + if (stateEnums.contains(schCollectRecordVO.getTaskExecState())) { + nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE); + } + } else if (RetryTaskEnum.RETRY_TASK.equals(collectRecordDTO.getIsRetryTask())) { - List stateEnums = Arrays.asList(StateEnum.values()); + SchCollectRecordRetryLogVO retryLogVO = iSchCollectRecordRetryLogService.findById(String.valueOf(collectRecordDTO.getId())); - if (stateEnums.contains(schCollectRecordVO.getTaskExecState())) { - nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE); + SchTerminatorVO schTerminatorVO = iSchTerminatorService.findById(String.valueOf(retryLogVO.getTerminatorId())); + + NettyTerminatorDTO nettyTerminatorDTO = channelRepository.getTerminatorByIp(String.valueOf(schTerminatorVO.getTerminatorIp())); + + if (Func.isEmpty(nettyTerminatorDTO)) { + return false; + } + + List stateEnums = Arrays.asList(StateEnum.values()); + + if (stateEnums.contains(retryLogVO.getTaskExecState())) { + nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE); + } } + } return error;