From 25ec97a03c58246e995a630c525b7cc80027e008 Mon Sep 17 00:00:00 2001 From: beeajax <1105173470@qq.com> Date: Sat, 22 Jul 2023 21:20:20 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8B=E5=8F=91=E4=BB=BB=E5=8A=A1=E8=B6=85?= =?UTF-8?q?=E6=97=B6=E8=A7=A6=E5=8F=91=E4=BB=BB=E5=8A=A1=E8=B6=85=E6=97=B6?= =?UTF-8?q?=E5=B9=B6=E9=87=8D=E6=96=B0=E7=BB=99=E7=BB=88=E7=AB=AF=E5=88=86?= =?UTF-8?q?=E9=85=8D=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/docus/server/RedisListenerConfig.java | 26 +++++++ .../common/RedisKeyExpirationListener.java | 37 ++++++++++ .../docus/server/common/SchedulerTask.java | 22 +++++- .../common/process/ChannelProcessor.java | 73 +++++++++++++------ .../server/service/ISchCollectorService.java | 3 + .../impl/RedisKeyExpirationService.java | 56 ++++++++++++++ .../service/impl/SchCollectorServiceImpl.java | 5 ++ 7 files changed, 198 insertions(+), 24 deletions(-) create mode 100644 collector-scheduling-management/src/main/java/com/docus/server/RedisListenerConfig.java create mode 100644 collector-scheduling-management/src/main/java/com/docus/server/common/RedisKeyExpirationListener.java create mode 100644 collector-scheduling-management/src/main/java/com/docus/server/service/impl/RedisKeyExpirationService.java diff --git a/collector-scheduling-management/src/main/java/com/docus/server/RedisListenerConfig.java b/collector-scheduling-management/src/main/java/com/docus/server/RedisListenerConfig.java new file mode 100644 index 0000000..148c918 --- /dev/null +++ b/collector-scheduling-management/src/main/java/com/docus/server/RedisListenerConfig.java @@ -0,0 +1,26 @@ +package com.docus.server; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.listener.ChannelTopic; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; + +@Configuration +public class RedisListenerConfig { + @Autowired + private RedisConnectionFactory redisConnectionFactory; + + @Bean + public ChannelTopic expiredTopic() { + return new ChannelTopic("__keyevent@0__:expired"); // 选择0号数据库 + } + + @Bean + public RedisMessageListenerContainer redisMessageListenerContainer() { + RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer(); + redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory); + return redisMessageListenerContainer; + } +} diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/RedisKeyExpirationListener.java b/collector-scheduling-management/src/main/java/com/docus/server/common/RedisKeyExpirationListener.java new file mode 100644 index 0000000..a1d72f7 --- /dev/null +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/RedisKeyExpirationListener.java @@ -0,0 +1,37 @@ +package com.docus.server.common; + +import com.docus.server.service.impl.RedisKeyExpirationService; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +@Component +public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener { + + public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { + super(listenerContainer); + } + + @Resource + private RedisKeyExpirationService redisKeyExpirationService; + + /** + * 开启notify-keyspace-events Ex的配置 + */ + @Override + public void onMessage(Message message, byte[] pattern) { + + String expireKey = message.toString(); + if (validExpireKey(expireKey)) { + redisKeyExpirationService.expired(expireKey); + } + } + + private boolean validExpireKey(String expireKey) { + return expireKey.startsWith("schCollectorRecord:noRetryTask:expireKey:") + || expireKey.startsWith("schCollectorRecord:isRetryTask:expireKey:"); + } +} diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/SchedulerTask.java b/collector-scheduling-management/src/main/java/com/docus/server/common/SchedulerTask.java index 292a18d..1502927 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/SchedulerTask.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/SchedulerTask.java @@ -3,6 +3,7 @@ package com.docus.server.common; import com.docus.core.util.DateUtil; import com.docus.core.util.Func; import com.docus.core.util.json.JSON; +import com.docus.infrastructure.redis.service.RedisOps; import com.docus.server.api.taskdistribute.TaskDistributeApi; import com.docus.server.common.netty.CommMsg; import com.docus.server.common.netty.server.ChannelRepository; @@ -12,10 +13,12 @@ import com.docus.server.dto.scheduling.management.schcollector.task.SchCollector import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO; import com.docus.server.entity.scheduling.management.SchCollectRecord; import com.docus.server.entity.scheduling.management.SchCollectRecordRetryLog; +import com.docus.server.entity.scheduling.management.SchCollector; import com.docus.server.enums.BusyStateEnum; import com.docus.server.enums.RetryTaskEnum; import com.docus.server.service.ISchCollectRecordRetryLogService; import com.docus.server.service.ISchCollectRecordService; +import com.docus.server.service.ISchCollectorService; import com.docus.server.service.ISchTerminatorService; import com.google.common.collect.Lists; import io.netty.buffer.Unpooled; @@ -47,13 +50,17 @@ public class SchedulerTask { private ISchCollectRecordRetryLogService iSchCollectRecordRetryLogService; @Resource private ISchTerminatorService iSchTerminatorService; + @Resource + private ISchCollectorService iSchCollectorService; + @Resource + private RedisOps redisOps; private BlockingQueue retryTaskQueue = new LinkedBlockingQueue<>(); //定时任务 // 5 * * * * ? 在每分钟的5秒执行 @Scheduled(cron = "0/1 * * * * ?") - public void scheduleTask() { + public void collectorTask() { try { log.info("定时任务: 开始执行"); @@ -226,6 +233,19 @@ public class SchedulerTask { if (channel != null) { channel.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(commMsg), CharsetUtil.UTF_8)); } + + String isRetry = messageContent.getIsRetry(); + Long collectorRecordId = messageContent.getCollectorRecordId(); + String collectorId = messageContent.getTaskInfo().getTasks().get(0).getCollectorId(); + SchCollector schCollector = iSchCollectorService.findByCollectorId(collectorId); + + if (RetryTaskEnum.NO_RETRY_TASK.equals(isRetry)) { + redisOps.setEx(String.format("schCollectorRecord:noRetryTask:expireKey:%s", collectorRecordId), String.valueOf(collectorRecordId), schCollector.getTaskTimeout()); + } else { + redisOps.setEx(String.format("schCollectorRecord:isRetryTask:expireKey:%s", collectorRecordId), String.valueOf(collectorRecordId), schCollector.getTaskTimeout()); + } + + } diff --git a/collector-scheduling-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java b/collector-scheduling-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java index cd6cb66..076813b 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/common/process/ChannelProcessor.java @@ -44,11 +44,32 @@ public class ChannelProcessor extends AbstractProcessor { return doSchCollectRecordControllerEdit(context); case "SchTerminatorController": return doSchTerminatorController(context); + case "RedisKeyExpirationService-expired": + return doRedisKeyExpired(context); default: return true; } } + private Object doRedisKeyExpired(TrackContext context) { + boolean error = context.isError(); + String expireKey = (String) context.getArgs()[0]; + + String recordId = expireKey.substring(expireKey.lastIndexOf(":") + 1); + + if (!error) { + + if (expireKey.startsWith("schCollectorRecord:isRetryTask:expireKey:")) { + retryTask(recordId); + } + if (expireKey.startsWith("schCollectorRecord:noRetryTask:expireKey:")) { + noRetryTask(recordId); + + } + } + return null; + } + private boolean doSchCollectRecordControllerEdit(TrackContext context) { return logCollectRecord(context); } @@ -59,48 +80,54 @@ public class ChannelProcessor extends AbstractProcessor { private boolean logCollectRecord(TrackContext context) { boolean error = context.isError(); + EditSchCollectRecordDTO collectRecordDTO = (EditSchCollectRecordDTO) context.getArgs()[0]; if (!error) { if (RetryTaskEnum.NO_RETRY_TASK.equals(collectRecordDTO.getIsRetryTask())) { - SchCollectRecordVO schCollectRecordVO = iSchCollectRecordService.findById(String.valueOf(collectRecordDTO.getId())); - SchTerminatorVO schTerminatorVO = iSchTerminatorService.findById(String.valueOf(schCollectRecordVO.getTerminatorId())); + noRetryTask(String.valueOf(collectRecordDTO.getId())); - NettyTerminatorDTO nettyTerminatorDTO = channelRepository.getTerminatorByIp(String.valueOf(schTerminatorVO.getTerminatorIp())); + } else if (RetryTaskEnum.RETRY_TASK.equals(collectRecordDTO.getIsRetryTask())) { - if (Func.isEmpty(nettyTerminatorDTO)) { - return false; - } + retryTask(String.valueOf(collectRecordDTO.getId())); - List stateEnums = Arrays.asList(StateEnum.values()); + } - if (stateEnums.contains(schCollectRecordVO.getTaskExecState())) { - nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE); - } - } else if (RetryTaskEnum.RETRY_TASK.equals(collectRecordDTO.getIsRetryTask())) { + } + return error; - SchCollectRecordRetryLogVO retryLogVO = iSchCollectRecordRetryLogService.findById(String.valueOf(collectRecordDTO.getId())); + } - SchTerminatorVO schTerminatorVO = iSchTerminatorService.findById(String.valueOf(retryLogVO.getTerminatorId())); + private void retryTask(String recordLogId) { + SchCollectRecordRetryLogVO retryLogVO = iSchCollectRecordRetryLogService.findById(recordLogId); - NettyTerminatorDTO nettyTerminatorDTO = channelRepository.getTerminatorByIp(String.valueOf(schTerminatorVO.getTerminatorIp())); + updateTerminatorState(retryLogVO.getTerminatorId(), retryLogVO.getTaskExecState()); - if (Func.isEmpty(nettyTerminatorDTO)) { - return false; - } + } - List stateEnums = Arrays.asList(StateEnum.values()); + private void noRetryTask(String recordId) { - if (stateEnums.contains(retryLogVO.getTaskExecState())) { - nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE); - } - } + SchCollectRecordVO schCollectRecordVO = iSchCollectRecordService.findById(recordId); + updateTerminatorState(schCollectRecordVO.getTerminatorId(), schCollectRecordVO.getTaskExecState()); + } + + private void updateTerminatorState(Long terminatorId, StateEnum taskExecState) { + SchTerminatorVO schTerminatorVO = iSchTerminatorService.findById(String.valueOf(terminatorId)); + + NettyTerminatorDTO nettyTerminatorDTO = channelRepository.getTerminatorByIp(String.valueOf(schTerminatorVO.getTerminatorIp())); + + if (Func.isEmpty(nettyTerminatorDTO)) { + return; } - return error; + List stateEnums = Arrays.asList(StateEnum.values()); + + if (stateEnums.contains(taskExecState)) { + nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE); + } } private boolean logTerminator(TrackContext context) { diff --git a/collector-scheduling-management/src/main/java/com/docus/server/service/ISchCollectorService.java b/collector-scheduling-management/src/main/java/com/docus/server/service/ISchCollectorService.java index 8490aff..7278571 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/service/ISchCollectorService.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/service/ISchCollectorService.java @@ -6,6 +6,7 @@ import com.docus.server.dto.scheduling.management.schcollector.AddSchCollectorDT import com.docus.server.dto.scheduling.management.schcollector.DeleteSchCollectorDTO; import com.docus.server.dto.scheduling.management.schcollector.EditSchCollectorDTO; import com.docus.server.dto.scheduling.management.schcollector.UpdateSchCollectorDTO; +import com.docus.server.entity.scheduling.management.SchCollector; import com.docus.server.vo.scheduling.management.schcollector.SchCollectorVO; /** @@ -62,4 +63,6 @@ public interface ISchCollectorService { */ boolean updateVersion(UpdateSchCollectorDTO schCollector); + SchCollector findByCollectorId(String collectorId); + } diff --git a/collector-scheduling-management/src/main/java/com/docus/server/service/impl/RedisKeyExpirationService.java b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/RedisKeyExpirationService.java new file mode 100644 index 0000000..51230cc --- /dev/null +++ b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/RedisKeyExpirationService.java @@ -0,0 +1,56 @@ +package com.docus.server.service.impl; + +import com.docus.core.util.DateUtil; +import com.docus.log.annotation.TrackGroup; +import com.docus.server.common.process.ChannelProcessor; +import com.docus.server.entity.scheduling.management.SchCollectRecord; +import com.docus.server.entity.scheduling.management.SchCollectRecordRetryLog; +import com.docus.server.enums.StateEnum; +import com.docus.server.enums.SubStateEnum; +import com.docus.server.infrastructure.dao.ISchCollectRecordDao; +import com.docus.server.infrastructure.dao.ISchCollectRecordRetryLogDao; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +@Service +public class RedisKeyExpirationService { + @Resource + private ISchCollectRecordDao iSchCollectRecordDao; + @Resource + private ISchCollectRecordRetryLogDao iSchCollectRecordRetryLogDao; + + @TrackGroup(group = "RedisKeyExpirationService-expired", processor = ChannelProcessor.class) + public void expired(String expireKey) { + + if (expireKey.startsWith("schCollectorRecord:noRetryTask:expireKey:")) { + System.out.println("expireKey过期了===" + expireKey); + + String recordId = expireKey.substring(expireKey.lastIndexOf(":") + 1); + + SchCollectRecord collectRecord = iSchCollectRecordDao.findById(recordId); + collectRecord.setTaskExecState(StateEnum.FAIL); + //超时作废 + collectRecord.setSubTaskExecState(SubStateEnum.CANCEL); + collectRecord.setUpdateTime(DateUtil.now()); + collectRecord.setLastTaskErrorMsg("任务超时,触发任务作废"); + iSchCollectRecordDao.updateById(collectRecord); + + } + if (expireKey.startsWith("schCollectorRecord:isRetryTask:expireKey:")) { + System.out.println("expireKey过期了===" + expireKey); + + String logRecordId = expireKey.substring(expireKey.lastIndexOf(":") + 1); + + SchCollectRecordRetryLog recordRetryLog = iSchCollectRecordRetryLogDao.findById(logRecordId); + recordRetryLog.setTaskExecState(StateEnum.FAIL); + recordRetryLog.setSubTaskExecState(SubStateEnum.CANCEL); + recordRetryLog.setUpdateTime(DateUtil.now()); + recordRetryLog.setLastTaskErrorMsg("任务超时,触发任务作废"); + iSchCollectRecordRetryLogDao.updateById(recordRetryLog); + + } + + } + +} diff --git a/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchCollectorServiceImpl.java b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchCollectorServiceImpl.java index e068fd4..185a7d6 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchCollectorServiceImpl.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchCollectorServiceImpl.java @@ -95,6 +95,11 @@ public class SchCollectorServiceImpl implements ISchCollectorService { return iSchCollectorDao.updateById(schCollector); } + @Override + public SchCollector findByCollectorId(String collectorId) { + return iSchCollectorDao.findOneBy("collectorId", collectorId); + } + /** * 新增 *