下发任务超时触发任务超时并重新给终端分配任务

segment2.0
beeajax 2 years ago
parent b71224ed9b
commit 25ec97a03c

@ -0,0 +1,26 @@
package com.docus.server;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
@Configuration
public class RedisListenerConfig {
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Bean
public ChannelTopic expiredTopic() {
return new ChannelTopic("__keyevent@0__:expired"); // 选择0号数据库
}
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer() {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
return redisMessageListenerContainer;
}
}

@ -0,0 +1,37 @@
package com.docus.server.common;
import com.docus.server.service.impl.RedisKeyExpirationService;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Resource
private RedisKeyExpirationService redisKeyExpirationService;
/**
* notify-keyspace-events Ex
*/
@Override
public void onMessage(Message message, byte[] pattern) {
String expireKey = message.toString();
if (validExpireKey(expireKey)) {
redisKeyExpirationService.expired(expireKey);
}
}
private boolean validExpireKey(String expireKey) {
return expireKey.startsWith("schCollectorRecord:noRetryTask:expireKey:")
|| expireKey.startsWith("schCollectorRecord:isRetryTask:expireKey:");
}
}

@ -3,6 +3,7 @@ package com.docus.server.common;
import com.docus.core.util.DateUtil; import com.docus.core.util.DateUtil;
import com.docus.core.util.Func; import com.docus.core.util.Func;
import com.docus.core.util.json.JSON; import com.docus.core.util.json.JSON;
import com.docus.infrastructure.redis.service.RedisOps;
import com.docus.server.api.taskdistribute.TaskDistributeApi; import com.docus.server.api.taskdistribute.TaskDistributeApi;
import com.docus.server.common.netty.CommMsg; import com.docus.server.common.netty.CommMsg;
import com.docus.server.common.netty.server.ChannelRepository; import com.docus.server.common.netty.server.ChannelRepository;
@ -12,10 +13,12 @@ import com.docus.server.dto.scheduling.management.schcollector.task.SchCollector
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO; import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.entity.scheduling.management.SchCollectRecord; import com.docus.server.entity.scheduling.management.SchCollectRecord;
import com.docus.server.entity.scheduling.management.SchCollectRecordRetryLog; import com.docus.server.entity.scheduling.management.SchCollectRecordRetryLog;
import com.docus.server.entity.scheduling.management.SchCollector;
import com.docus.server.enums.BusyStateEnum; import com.docus.server.enums.BusyStateEnum;
import com.docus.server.enums.RetryTaskEnum; import com.docus.server.enums.RetryTaskEnum;
import com.docus.server.service.ISchCollectRecordRetryLogService; import com.docus.server.service.ISchCollectRecordRetryLogService;
import com.docus.server.service.ISchCollectRecordService; import com.docus.server.service.ISchCollectRecordService;
import com.docus.server.service.ISchCollectorService;
import com.docus.server.service.ISchTerminatorService; import com.docus.server.service.ISchTerminatorService;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
@ -47,13 +50,17 @@ public class SchedulerTask {
private ISchCollectRecordRetryLogService iSchCollectRecordRetryLogService; private ISchCollectRecordRetryLogService iSchCollectRecordRetryLogService;
@Resource @Resource
private ISchTerminatorService iSchTerminatorService; private ISchTerminatorService iSchTerminatorService;
@Resource
private ISchCollectorService iSchCollectorService;
@Resource
private RedisOps redisOps;
private BlockingQueue<ReportDownTwoDTO> retryTaskQueue = new LinkedBlockingQueue<>(); private BlockingQueue<ReportDownTwoDTO> retryTaskQueue = new LinkedBlockingQueue<>();
//定时任务 //定时任务
// 5 * * * * ? 在每分钟的5秒执行 // 5 * * * * ? 在每分钟的5秒执行
@Scheduled(cron = "0/1 * * * * ?") @Scheduled(cron = "0/1 * * * * ?")
public void scheduleTask() { public void collectorTask() {
try { try {
log.info("定时任务: 开始执行"); log.info("定时任务: 开始执行");
@ -226,6 +233,19 @@ public class SchedulerTask {
if (channel != null) { if (channel != null) {
channel.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(commMsg), CharsetUtil.UTF_8)); channel.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(commMsg), CharsetUtil.UTF_8));
} }
String isRetry = messageContent.getIsRetry();
Long collectorRecordId = messageContent.getCollectorRecordId();
String collectorId = messageContent.getTaskInfo().getTasks().get(0).getCollectorId();
SchCollector schCollector = iSchCollectorService.findByCollectorId(collectorId);
if (RetryTaskEnum.NO_RETRY_TASK.equals(isRetry)) {
redisOps.setEx(String.format("schCollectorRecord:noRetryTask:expireKey:%s", collectorRecordId), String.valueOf(collectorRecordId), schCollector.getTaskTimeout());
} else {
redisOps.setEx(String.format("schCollectorRecord:isRetryTask:expireKey:%s", collectorRecordId), String.valueOf(collectorRecordId), schCollector.getTaskTimeout());
}
} }

@ -44,11 +44,32 @@ public class ChannelProcessor extends AbstractProcessor {
return doSchCollectRecordControllerEdit(context); return doSchCollectRecordControllerEdit(context);
case "SchTerminatorController": case "SchTerminatorController":
return doSchTerminatorController(context); return doSchTerminatorController(context);
case "RedisKeyExpirationService-expired":
return doRedisKeyExpired(context);
default: default:
return true; return true;
} }
} }
private Object doRedisKeyExpired(TrackContext context) {
boolean error = context.isError();
String expireKey = (String) context.getArgs()[0];
String recordId = expireKey.substring(expireKey.lastIndexOf(":") + 1);
if (!error) {
if (expireKey.startsWith("schCollectorRecord:isRetryTask:expireKey:")) {
retryTask(recordId);
}
if (expireKey.startsWith("schCollectorRecord:noRetryTask:expireKey:")) {
noRetryTask(recordId);
}
}
return null;
}
private boolean doSchCollectRecordControllerEdit(TrackContext context) { private boolean doSchCollectRecordControllerEdit(TrackContext context) {
return logCollectRecord(context); return logCollectRecord(context);
} }
@ -59,48 +80,54 @@ public class ChannelProcessor extends AbstractProcessor {
private boolean logCollectRecord(TrackContext context) { private boolean logCollectRecord(TrackContext context) {
boolean error = context.isError(); boolean error = context.isError();
EditSchCollectRecordDTO collectRecordDTO = (EditSchCollectRecordDTO) context.getArgs()[0]; EditSchCollectRecordDTO collectRecordDTO = (EditSchCollectRecordDTO) context.getArgs()[0];
if (!error) { if (!error) {
if (RetryTaskEnum.NO_RETRY_TASK.equals(collectRecordDTO.getIsRetryTask())) { if (RetryTaskEnum.NO_RETRY_TASK.equals(collectRecordDTO.getIsRetryTask())) {
SchCollectRecordVO schCollectRecordVO = iSchCollectRecordService.findById(String.valueOf(collectRecordDTO.getId()));
SchTerminatorVO schTerminatorVO = iSchTerminatorService.findById(String.valueOf(schCollectRecordVO.getTerminatorId())); noRetryTask(String.valueOf(collectRecordDTO.getId()));
NettyTerminatorDTO nettyTerminatorDTO = channelRepository.getTerminatorByIp(String.valueOf(schTerminatorVO.getTerminatorIp())); } else if (RetryTaskEnum.RETRY_TASK.equals(collectRecordDTO.getIsRetryTask())) {
if (Func.isEmpty(nettyTerminatorDTO)) { retryTask(String.valueOf(collectRecordDTO.getId()));
return false;
}
List<StateEnum> stateEnums = Arrays.asList(StateEnum.values()); }
if (stateEnums.contains(schCollectRecordVO.getTaskExecState())) { }
nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE); return error;
}
} else if (RetryTaskEnum.RETRY_TASK.equals(collectRecordDTO.getIsRetryTask())) {
SchCollectRecordRetryLogVO retryLogVO = iSchCollectRecordRetryLogService.findById(String.valueOf(collectRecordDTO.getId())); }
SchTerminatorVO schTerminatorVO = iSchTerminatorService.findById(String.valueOf(retryLogVO.getTerminatorId())); private void retryTask(String recordLogId) {
SchCollectRecordRetryLogVO retryLogVO = iSchCollectRecordRetryLogService.findById(recordLogId);
NettyTerminatorDTO nettyTerminatorDTO = channelRepository.getTerminatorByIp(String.valueOf(schTerminatorVO.getTerminatorIp())); updateTerminatorState(retryLogVO.getTerminatorId(), retryLogVO.getTaskExecState());
if (Func.isEmpty(nettyTerminatorDTO)) { }
return false;
}
List<StateEnum> stateEnums = Arrays.asList(StateEnum.values()); private void noRetryTask(String recordId) {
if (stateEnums.contains(retryLogVO.getTaskExecState())) { SchCollectRecordVO schCollectRecordVO = iSchCollectRecordService.findById(recordId);
nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE);
}
}
updateTerminatorState(schCollectRecordVO.getTerminatorId(), schCollectRecordVO.getTaskExecState());
}
private void updateTerminatorState(Long terminatorId, StateEnum taskExecState) {
SchTerminatorVO schTerminatorVO = iSchTerminatorService.findById(String.valueOf(terminatorId));
NettyTerminatorDTO nettyTerminatorDTO = channelRepository.getTerminatorByIp(String.valueOf(schTerminatorVO.getTerminatorIp()));
if (Func.isEmpty(nettyTerminatorDTO)) {
return;
} }
return error;
List<StateEnum> stateEnums = Arrays.asList(StateEnum.values());
if (stateEnums.contains(taskExecState)) {
nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE);
}
} }
private boolean logTerminator(TrackContext context) { private boolean logTerminator(TrackContext context) {

@ -6,6 +6,7 @@ import com.docus.server.dto.scheduling.management.schcollector.AddSchCollectorDT
import com.docus.server.dto.scheduling.management.schcollector.DeleteSchCollectorDTO; import com.docus.server.dto.scheduling.management.schcollector.DeleteSchCollectorDTO;
import com.docus.server.dto.scheduling.management.schcollector.EditSchCollectorDTO; import com.docus.server.dto.scheduling.management.schcollector.EditSchCollectorDTO;
import com.docus.server.dto.scheduling.management.schcollector.UpdateSchCollectorDTO; import com.docus.server.dto.scheduling.management.schcollector.UpdateSchCollectorDTO;
import com.docus.server.entity.scheduling.management.SchCollector;
import com.docus.server.vo.scheduling.management.schcollector.SchCollectorVO; import com.docus.server.vo.scheduling.management.schcollector.SchCollectorVO;
/** /**
@ -62,4 +63,6 @@ public interface ISchCollectorService {
*/ */
boolean updateVersion(UpdateSchCollectorDTO schCollector); boolean updateVersion(UpdateSchCollectorDTO schCollector);
SchCollector findByCollectorId(String collectorId);
} }

@ -0,0 +1,56 @@
package com.docus.server.service.impl;
import com.docus.core.util.DateUtil;
import com.docus.log.annotation.TrackGroup;
import com.docus.server.common.process.ChannelProcessor;
import com.docus.server.entity.scheduling.management.SchCollectRecord;
import com.docus.server.entity.scheduling.management.SchCollectRecordRetryLog;
import com.docus.server.enums.StateEnum;
import com.docus.server.enums.SubStateEnum;
import com.docus.server.infrastructure.dao.ISchCollectRecordDao;
import com.docus.server.infrastructure.dao.ISchCollectRecordRetryLogDao;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class RedisKeyExpirationService {
@Resource
private ISchCollectRecordDao iSchCollectRecordDao;
@Resource
private ISchCollectRecordRetryLogDao iSchCollectRecordRetryLogDao;
@TrackGroup(group = "RedisKeyExpirationService-expired", processor = ChannelProcessor.class)
public void expired(String expireKey) {
if (expireKey.startsWith("schCollectorRecord:noRetryTask:expireKey:")) {
System.out.println("expireKey过期了===" + expireKey);
String recordId = expireKey.substring(expireKey.lastIndexOf(":") + 1);
SchCollectRecord collectRecord = iSchCollectRecordDao.findById(recordId);
collectRecord.setTaskExecState(StateEnum.FAIL);
//超时作废
collectRecord.setSubTaskExecState(SubStateEnum.CANCEL);
collectRecord.setUpdateTime(DateUtil.now());
collectRecord.setLastTaskErrorMsg("任务超时,触发任务作废");
iSchCollectRecordDao.updateById(collectRecord);
}
if (expireKey.startsWith("schCollectorRecord:isRetryTask:expireKey:")) {
System.out.println("expireKey过期了===" + expireKey);
String logRecordId = expireKey.substring(expireKey.lastIndexOf(":") + 1);
SchCollectRecordRetryLog recordRetryLog = iSchCollectRecordRetryLogDao.findById(logRecordId);
recordRetryLog.setTaskExecState(StateEnum.FAIL);
recordRetryLog.setSubTaskExecState(SubStateEnum.CANCEL);
recordRetryLog.setUpdateTime(DateUtil.now());
recordRetryLog.setLastTaskErrorMsg("任务超时,触发任务作废");
iSchCollectRecordRetryLogDao.updateById(recordRetryLog);
}
}
}

@ -95,6 +95,11 @@ public class SchCollectorServiceImpl implements ISchCollectorService {
return iSchCollectorDao.updateById(schCollector); return iSchCollectorDao.updateById(schCollector);
} }
@Override
public SchCollector findByCollectorId(String collectorId) {
return iSchCollectorDao.findOneBy("collectorId", collectorId);
}
/** /**
* *
* *

Loading…
Cancel
Save