主动任务重试
parent
d120068846
commit
9460764c55
@ -1,24 +0,0 @@
|
||||
package com.docus.server.collect.web.process;
|
||||
|
||||
import com.docus.log.context.TrackContext;
|
||||
import com.docus.log.processor.AbstractProcessor;
|
||||
import com.docus.server.archivefile.service.ITaskMessageService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* 重试处理器
|
||||
*
|
||||
* @author linruifeng
|
||||
*/
|
||||
@Slf4j
|
||||
public class RetryProcessor extends AbstractProcessor {
|
||||
@Resource
|
||||
private ITaskMessageService messageService;
|
||||
|
||||
@Override
|
||||
public Object doProcess(TrackContext context) {
|
||||
return null;
|
||||
}
|
||||
}
|
@ -1,5 +0,0 @@
|
||||
package com.docus.server.record.controller;
|
||||
|
||||
public class Package {
|
||||
|
||||
}
|
@ -0,0 +1,41 @@
|
||||
package com.docus.server.archivefile.controller;
|
||||
|
||||
import com.docus.infrastructure.web.request.SearchRequest;
|
||||
import com.docus.infrastructure.web.response.PageResult;
|
||||
import com.docus.server.api.archivefile.TaskConfigRetryLogApi;
|
||||
import com.docus.server.archivefile.service.ITaskConfigRetryLogService;
|
||||
import com.docus.server.entity.TaskConfigRetryLog;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
/**
|
||||
* 主动任务配置重试表 Controller
|
||||
* Generated on 2023-06-29
|
||||
*/
|
||||
@RestController
|
||||
public class TaskConfigRetryLogController implements TaskConfigRetryLogApi {
|
||||
@Autowired
|
||||
private ITaskConfigRetryLogService taskConfigRetryLogService;
|
||||
|
||||
/**
|
||||
* 按主键查询
|
||||
*
|
||||
* @param id 主键Id
|
||||
* @return 实体
|
||||
*/
|
||||
@Override
|
||||
public TaskConfigRetryLog find(String id) {
|
||||
return taskConfigRetryLogService.findById(id);
|
||||
}
|
||||
|
||||
/**
|
||||
* 关键字搜素
|
||||
*
|
||||
* @param searchRequest 搜索参数
|
||||
* @return 分页列表
|
||||
*/
|
||||
@Override
|
||||
public PageResult<TaskConfigRetryLog> search(SearchRequest searchRequest) {
|
||||
return taskConfigRetryLogService.search(searchRequest);
|
||||
}
|
||||
}
|
@ -1,4 +0,0 @@
|
||||
package com.docus.server.record.controller.param;
|
||||
|
||||
public class Package {
|
||||
}
|
@ -1,4 +0,0 @@
|
||||
package com.docus.server.record.controller.vo;
|
||||
|
||||
public class Package {
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
package com.docus.server.archivefile.convert;
|
||||
|
||||
import com.docus.server.entity.TaskConfig;
|
||||
import com.docus.server.entity.TaskConfigRetryLog;
|
||||
import org.mapstruct.Mapper;
|
||||
import org.mapstruct.Mappings;
|
||||
import org.mapstruct.factory.Mappers;
|
||||
|
||||
@Mapper
|
||||
public interface TaskConfigRetryLogConvert {
|
||||
TaskConfigRetryLogConvert INSTANCE = Mappers.getMapper(TaskConfigRetryLogConvert.class);
|
||||
|
||||
@Mappings({})
|
||||
TaskConfigRetryLog convert(TaskConfig taskConfig);
|
||||
|
||||
default TaskConfigRetryLog toConvertRetryLog(TaskConfig taskConfig, Long id) {
|
||||
TaskConfigRetryLog taskConfigRetryLog = convert(taskConfig);
|
||||
taskConfigRetryLog.setId(id);
|
||||
taskConfigRetryLog.setMessageId(taskConfig.getId());
|
||||
return taskConfigRetryLog;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,10 @@
|
||||
package com.docus.server.archivefile.infrastructure.dao;
|
||||
|
||||
import com.docus.infrastructure.core.db.dao.IBaseDao;
|
||||
import com.docus.infrastructure.web.request.SearchRequest;
|
||||
import com.docus.infrastructure.web.response.PageResult;
|
||||
import com.docus.server.entity.TaskConfigRetryLog;
|
||||
|
||||
public interface ITaskConfigRetryLogDao extends IBaseDao<TaskConfigRetryLog> {
|
||||
PageResult<TaskConfigRetryLog> search(SearchRequest searchRequest);
|
||||
}
|
@ -0,0 +1,14 @@
|
||||
package com.docus.server.archivefile.infrastructure.mapper;
|
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import com.docus.server.entity.TaskConfigRetryLog;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
|
||||
/**
|
||||
* 主动任务配置重试表 Mapper 接口
|
||||
* Generated on 2023-06-29
|
||||
*/
|
||||
@Mapper
|
||||
public interface TaskConfigRetryLogMapper extends BaseMapper<TaskConfigRetryLog> {
|
||||
|
||||
}
|
@ -0,0 +1,96 @@
|
||||
package com.docus.server.archivefile.process;
|
||||
|
||||
import com.docus.infrastructure.redis.service.IdService;
|
||||
import com.docus.log.context.TrackContext;
|
||||
import com.docus.log.processor.AbstractProcessor;
|
||||
import com.docus.server.archivefile.convert.TaskConfigRetryLogConvert;
|
||||
import com.docus.server.archivefile.convert.TaskMessageRetryLogConvert;
|
||||
import com.docus.server.archivefile.infrastructure.dao.ITaskConfigDao;
|
||||
import com.docus.server.archivefile.infrastructure.dao.ITaskConfigRetryLogDao;
|
||||
import com.docus.server.archivefile.infrastructure.dao.ITaskMessageDao;
|
||||
import com.docus.server.archivefile.infrastructure.dao.ITaskMessageRetryLogDao;
|
||||
import com.docus.server.entity.TaskConfig;
|
||||
import com.docus.server.entity.TaskConfigRetryLog;
|
||||
import com.docus.server.entity.TaskMessage;
|
||||
import com.docus.server.entity.TaskMessageRetryLog;
|
||||
import com.docus.server.enums.StateEnum;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* 重试处理器
|
||||
*
|
||||
* @author linruifeng
|
||||
*/
|
||||
@Slf4j
|
||||
public class RetryProcessor extends AbstractProcessor {
|
||||
public static final String EMPTY_STRING = "";
|
||||
@Resource
|
||||
private ITaskMessageRetryLogDao taskMessageRetryLogDao;
|
||||
@Resource
|
||||
private ITaskConfigRetryLogDao taskConfigRetryLogDao;
|
||||
@Resource
|
||||
private ITaskMessageDao taskMessageDao;
|
||||
@Resource
|
||||
private ITaskConfigDao taskConfigDao;
|
||||
@Resource
|
||||
private IdService idService;
|
||||
|
||||
@Override
|
||||
public Object doProcess(TrackContext context) {
|
||||
|
||||
String group = context.getGroup();
|
||||
|
||||
if ("doRetryTaskMessage".equalsIgnoreCase(group)) {
|
||||
|
||||
doRetryTaskMessageLog(context);
|
||||
|
||||
} else if ("doRetryTaskConfig".equalsIgnoreCase(group)) {
|
||||
|
||||
doRetryTaskConfigLog(context);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private void doRetryTaskConfigLog(TrackContext context) {
|
||||
boolean error = context.isError();
|
||||
TaskConfig taskConfig = (TaskConfig) context.getArgs()[0];
|
||||
TaskConfig taskConfigDO = taskConfigDao.findById(taskConfig.getId());
|
||||
TaskConfigRetryLog taskConfigRetryLog = TaskConfigRetryLogConvert.INSTANCE.toConvertRetryLog(taskConfig, idService.getDateSeq());
|
||||
if (error) {
|
||||
taskConfigRetryLog.setLastErrorMsg(context.getExMessageResult());
|
||||
taskConfigRetryLog.setState(StateEnum.FAIL);
|
||||
taskConfigDO.setState(StateEnum.FAIL);
|
||||
taskConfigDO.setUpdateTime(new Date());
|
||||
} else {
|
||||
taskConfigRetryLog.setLastErrorMsg(EMPTY_STRING);
|
||||
taskConfigRetryLog.setState(StateEnum.OK);
|
||||
taskConfigDO.setState(StateEnum.OK);
|
||||
taskConfigDO.setUpdateTime(new Date());
|
||||
}
|
||||
taskConfigRetryLogDao.save(taskConfigRetryLog);
|
||||
taskConfigDao.updateById(taskConfigDO);
|
||||
}
|
||||
|
||||
private void doRetryTaskMessageLog(TrackContext context) {
|
||||
boolean error = context.isError();
|
||||
TaskMessage taskMessage = (TaskMessage) context.getArgs()[0];
|
||||
TaskMessage taskMessageDO = taskMessageDao.findById(taskMessage.getId());
|
||||
TaskMessageRetryLog taskMessageRetryLog = TaskMessageRetryLogConvert.INSTANCE.toConvertRetryLog(taskMessage, idService.getDateSeq());
|
||||
if (error) {
|
||||
taskMessageRetryLog.setErrorMsg(context.getExMessageResult());
|
||||
taskMessageRetryLog.setState(StateEnum.FAIL);
|
||||
taskMessageDO.setState(StateEnum.FAIL);
|
||||
taskMessageDO.setUpdateTime(new Date());
|
||||
} else {
|
||||
taskMessageRetryLog.setErrorMsg(EMPTY_STRING);
|
||||
taskMessageRetryLog.setState(StateEnum.OK);
|
||||
taskMessageDO.setState(StateEnum.OK);
|
||||
taskMessageDO.setUpdateTime(new Date());
|
||||
}
|
||||
taskMessageRetryLogDao.save(taskMessageRetryLog);
|
||||
taskMessageDao.updateById(taskMessageDO);
|
||||
}
|
||||
}
|
@ -0,0 +1,11 @@
|
||||
package com.docus.server.archivefile.service;
|
||||
|
||||
import com.docus.infrastructure.web.request.SearchRequest;
|
||||
import com.docus.infrastructure.web.response.PageResult;
|
||||
import com.docus.server.entity.TaskConfigRetryLog;
|
||||
|
||||
public interface ITaskConfigRetryLogService {
|
||||
TaskConfigRetryLog findById(String id);
|
||||
|
||||
PageResult<TaskConfigRetryLog> search(SearchRequest searchRequest);
|
||||
}
|
@ -0,0 +1,26 @@
|
||||
package com.docus.server.archivefile.service.impl;
|
||||
|
||||
import com.docus.infrastructure.web.request.SearchRequest;
|
||||
import com.docus.infrastructure.web.response.PageResult;
|
||||
import com.docus.server.archivefile.infrastructure.dao.ITaskConfigRetryLogDao;
|
||||
import com.docus.server.archivefile.service.ITaskConfigRetryLogService;
|
||||
import com.docus.server.entity.TaskConfigRetryLog;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
@Service
|
||||
public class TaskConfigRetryLogServiceImpl implements ITaskConfigRetryLogService {
|
||||
@Resource
|
||||
private ITaskConfigRetryLogDao taskConfigRetryLogDao;
|
||||
|
||||
@Override
|
||||
public TaskConfigRetryLog findById(String id) {
|
||||
return taskConfigRetryLogDao.findById(id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageResult<TaskConfigRetryLog> search(SearchRequest searchRequest) {
|
||||
return taskConfigRetryLogDao.search(searchRequest);
|
||||
}
|
||||
}
|
@ -1,54 +1,39 @@
|
||||
package com.docus.server.archivefile.service.impl;
|
||||
|
||||
import com.docus.core.util.json.JSON;
|
||||
import com.docus.infrastructure.redis.service.IdService;
|
||||
import com.docus.log.annotation.TrackLogGroup;
|
||||
import com.docus.log.executor.TrackRetrySpringExecutor;
|
||||
import com.docus.log.handler.IJobHandler;
|
||||
import com.docus.log.handler.impl.MethodJobHandler;
|
||||
import com.docus.server.archivefile.convert.TaskMessageRetryLogConvert;
|
||||
import com.docus.server.archivefile.infrastructure.dao.ITaskMessageRetryLogDao;
|
||||
import com.docus.server.archivefile.process.RetryProcessor;
|
||||
import com.docus.server.entity.TaskConfig;
|
||||
import com.docus.server.entity.TaskMessage;
|
||||
import com.docus.server.entity.TaskMessageRetryLog;
|
||||
import com.docus.server.util.ExceptionUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.retry.annotation.Backoff;
|
||||
import org.springframework.retry.annotation.Retryable;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* 重试service
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class TrackRetryService {
|
||||
@Resource
|
||||
private ITaskMessageRetryLogDao taskMessageRetryLogDao;
|
||||
@Resource
|
||||
private IdService idService;
|
||||
|
||||
@TrackLogGroup(group = "doRetryTaskMessage", processor = RetryProcessor.class)
|
||||
@Async("recordMessage")
|
||||
@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5))
|
||||
public void doRetry(TaskMessage message) {
|
||||
TaskMessageRetryLogConvert retryLogConvert = TaskMessageRetryLogConvert.INSTANCE;
|
||||
TaskMessageRetryLog taskMessageRetryLog = retryLogConvert.toConvertRetryLog(message, idService.getDateSeq());
|
||||
|
||||
try {
|
||||
|
||||
IJobHandler jobHandler = TrackRetrySpringExecutor.loadJobHandler(message.getRetryKey());
|
||||
String name = ((MethodJobHandler) jobHandler).getMethod().getParameterTypes()[0].getName();
|
||||
Object o = JSON.fromJSON(message.getJsonStr(), Class.forName(name));
|
||||
public void doRetryTaskMessage(TaskMessage message) throws Exception {
|
||||
IJobHandler jobHandler = TrackRetrySpringExecutor.loadJobHandler(message.getRetryKey());
|
||||
String name = ((MethodJobHandler) jobHandler).getMethod().getParameterTypes()[0].getName();
|
||||
Object o = JSON.fromJSON(message.getJsonStr(), Class.forName(name));
|
||||
|
||||
jobHandler.execute(o);
|
||||
jobHandler.execute(o);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error(e.getMessage(), e);
|
||||
retryLogConvert.afterThrowingProcess(ExceptionUtils.getExceptionMessage(e), taskMessageRetryLog);
|
||||
}
|
||||
@TrackLogGroup(group = "doRetryTaskConfig", processor = RetryProcessor.class)
|
||||
@Async("recordMessage")
|
||||
public void doRetryTaskConfig(TaskConfig config) throws Exception {
|
||||
IJobHandler jobHandler = TrackRetrySpringExecutor.loadJobHandler(config.getType());
|
||||
|
||||
taskMessageRetryLogDao.save(taskMessageRetryLog);
|
||||
jobHandler.execute(String.valueOf(config.getId()));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,30 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||
<mapper namespace="com.xmgps.tsms.auth.api.mapper.TaskConfigRetryLogMapper">
|
||||
|
||||
<!-- 通用查询映射结果 -->
|
||||
<resultMap id="BaseResultMap"
|
||||
type="com.docus.server.entity.TaskConfigRetryLog">
|
||||
<id column="id" property="id"/>
|
||||
<result column="message_id" property="messageId"/>
|
||||
<result column="name" property="name"/>
|
||||
<result column="type" property="type"/>
|
||||
<result column="start_time" property="startTime"/>
|
||||
<result column="end_time" property="endTime"/>
|
||||
<result column="all_pointer_time" property="allPointerTime"/>
|
||||
<result column="page_size" property="pageSize"/>
|
||||
<result column="spilt_period" property="spiltPeriod"/>
|
||||
<result column="inc_pointer_time" property="incPointerTime"/>
|
||||
<result column="param" property="param"/>
|
||||
<result column="state" property="state"/>
|
||||
<result column="last_error_msg" property="lastErrorMsg"/>
|
||||
<result column="create_time" property="createTime"/>
|
||||
<result column="update_time" property="updateTime"/>
|
||||
</resultMap>
|
||||
|
||||
<!-- 通用查询结果列 -->
|
||||
<sql id="Base_Column_List">
|
||||
id, message_id, name, type, start_time, end_time, all_pointer_time, page_size, spilt_period, inc_pointer_time, param, state, last_error_msg, create_time, update_time
|
||||
</sql>
|
||||
|
||||
</mapper>
|
@ -0,0 +1,36 @@
|
||||
package com.docus.server.api.archivefile;
|
||||
|
||||
import com.docus.infrastructure.web.request.SearchRequest;
|
||||
import com.docus.infrastructure.web.response.PageResult;
|
||||
import com.docus.server.entity.TaskConfigRetryLog;
|
||||
import org.springframework.cloud.openfeign.FeignClient;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
|
||||
/**
|
||||
* 主动任务配置重试表 API
|
||||
* Generated on 2023-06-29
|
||||
*/
|
||||
@FeignClient(value = "tsms-auth-api", contextId = "tsms-auth-api.TaskConfigRetryLogApi")
|
||||
@RequestMapping("/taskConfigRetryLog")
|
||||
public interface TaskConfigRetryLogApi {
|
||||
|
||||
/**
|
||||
* 按主键查询
|
||||
*
|
||||
* @param id 主键id
|
||||
* @return 实体
|
||||
*/
|
||||
@GetMapping("/find/{id}")
|
||||
TaskConfigRetryLog find(@PathVariable(value = "id") String id);
|
||||
|
||||
/**
|
||||
* 关键字搜素
|
||||
*
|
||||
* @param searchRequest 搜索参数
|
||||
* @return 分页列表
|
||||
*/
|
||||
@PostMapping("/search")
|
||||
PageResult<TaskConfigRetryLog> search(@RequestBody SearchRequest searchRequest);
|
||||
|
||||
}
|
@ -1,14 +0,0 @@
|
||||
package com.docus.server.entity;
|
||||
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@Data
|
||||
public class TaskMessageExt extends TaskMessage {
|
||||
|
||||
@ApiModelProperty(value = "是否有重试详情日志")
|
||||
private boolean haveRetryLog;
|
||||
|
||||
}
|
@ -1,18 +0,0 @@
|
||||
package com.docus.server.util;
|
||||
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
|
||||
public class ExceptionUtils {
|
||||
|
||||
public static String getExceptionMessage(Throwable ex) {
|
||||
StringWriter stringWriter = new StringWriter();
|
||||
ex.printStackTrace(new PrintWriter(stringWriter));
|
||||
String errorMessage = stringWriter.toString();
|
||||
|
||||
if (null != errorMessage && errorMessage.length() > 1000) {
|
||||
return errorMessage.substring(0, 1000);
|
||||
}
|
||||
return errorMessage;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue