添加被动消息重试机制
parent
fefdadc58c
commit
4c51c95bcd
@ -0,0 +1,52 @@
|
||||
package com.docus.server.collect.basic.http.test;
|
||||
|
||||
import com.docus.core.util.json.JSON;
|
||||
import com.docus.log.executor.TrackRetrySpringExecutor;
|
||||
import com.docus.log.handler.IJobHandler;
|
||||
import com.docus.log.handler.MethodJobHandler;
|
||||
import com.docus.server.collect.IConverter;
|
||||
import com.docus.server.collect.IHttpResult;
|
||||
import com.docus.server.collect.web.common.entity.TaskOriginalMessage;
|
||||
import com.docus.server.collect.web.enums.CollectTypeEnum;
|
||||
import com.docus.server.collect.web.process.VisitorProcessor;
|
||||
import com.docus.server.collect.web.service.CollectService;
|
||||
import com.docus.server.collect.web.service.ITaskOriginalMessageService;
|
||||
import io.swagger.annotations.Api;
|
||||
import io.swagger.annotations.ApiOperation;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
|
||||
/**
|
||||
* @author wen yongbin
|
||||
* @date 2023年2月25日21:56:33
|
||||
* @see CollectTypeEnum 枚举
|
||||
* @see IConverter 通用转化器
|
||||
* @see IHttpResult 通用返回结果
|
||||
* @see VisitorProcessor 通用处理器
|
||||
*/
|
||||
|
||||
@Api(value = "TEST-用户管理接口", tags = "TEST-用户管理接口")
|
||||
@Slf4j
|
||||
@RestController("a")
|
||||
@RequestMapping("/api/rest/user")
|
||||
public class TestRestController {
|
||||
@Resource
|
||||
private CollectService collectService;
|
||||
@Resource
|
||||
private ITaskOriginalMessageService messageService;
|
||||
|
||||
@ApiOperation("TEST-2")
|
||||
@GetMapping("/get2")
|
||||
public void get2() throws Exception {
|
||||
TaskOriginalMessage message = messageService.findAll().get(0);
|
||||
IJobHandler jobHandler = TrackRetrySpringExecutor.loadJobHandler(message.getRetryKey());
|
||||
String name = ((MethodJobHandler) jobHandler).getMethod().getParameterTypes()[0].getName();
|
||||
Object o = JSON.fromJSON(message.getJsonStr(), Class.forName(name));
|
||||
jobHandler.execute(o);
|
||||
}
|
||||
}
|
@ -0,0 +1,49 @@
|
||||
package com.docus.server.collect.web.job;
|
||||
|
||||
import com.docus.core.util.json.JSON;
|
||||
import com.docus.log.executor.TrackRetrySpringExecutor;
|
||||
import com.docus.log.handler.IJobHandler;
|
||||
import com.docus.log.handler.MethodJobHandler;
|
||||
import com.docus.server.collect.web.common.entity.TaskOriginalMessage;
|
||||
import com.docus.server.collect.web.service.ITaskOriginalMessageService;
|
||||
import com.docus.server.common.enums.StateEnum;
|
||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.retry.annotation.Backoff;
|
||||
import org.springframework.retry.annotation.Retryable;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 重试所有被动消息 job
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class TrackRetryJob {
|
||||
|
||||
@Resource
|
||||
private ITaskOriginalMessageService messageService;
|
||||
|
||||
@XxlJob("trackRetryJob")
|
||||
public void retry() throws Exception {
|
||||
List<TaskOriginalMessage> messages = messageService.findBy("state", StateEnum.FAIL);
|
||||
|
||||
for (TaskOriginalMessage message : messages) {
|
||||
doRetry(message);
|
||||
}
|
||||
}
|
||||
|
||||
@Async("recordMessage")
|
||||
@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5))
|
||||
public void doRetry(TaskOriginalMessage message) throws Exception {
|
||||
IJobHandler jobHandler = TrackRetrySpringExecutor.loadJobHandler(message.getRetryKey());
|
||||
String name = ((MethodJobHandler) jobHandler).getMethod().getParameterTypes()[0].getName();
|
||||
Object o = JSON.fromJSON(message.getJsonStr(), Class.forName(name));
|
||||
jobHandler.execute(o);
|
||||
|
||||
//to write retry log
|
||||
}
|
||||
}
|
@ -1,91 +0,0 @@
|
||||
package com.docus.server.collect.web.process;
|
||||
|
||||
import com.docus.core.util.Func;
|
||||
import com.docus.core.util.json.JSON;
|
||||
import com.docus.log.context.TrackContext;
|
||||
import com.docus.log.processor.AbstractProcessor;
|
||||
import com.docus.server.collect.IConverter;
|
||||
import com.docus.server.collect.IResult;
|
||||
import com.docus.server.collect.web.enums.CollectTypeEnum;
|
||||
import com.docus.server.collect.web.service.ITaskOriginalMessageService;
|
||||
import com.docus.server.common.enums.IIntegerEnum;
|
||||
import com.docus.server.common.enums.StateEnum;
|
||||
import com.docus.server.common.util.SpringUtils;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.xxl.job.core.context.XxlJobHelper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author linruifeng
|
||||
*/
|
||||
@Slf4j
|
||||
public class JobProcessor extends AbstractProcessor {
|
||||
private ITaskOriginalMessageService messageService;
|
||||
private IConverter converter;
|
||||
private IResult result;
|
||||
|
||||
/**
|
||||
* 前置通知
|
||||
*/
|
||||
@Override
|
||||
public Object beforeProcess(TrackContext context) {
|
||||
super.beforeProcess(context);
|
||||
initBeans(context.getBeanNames());
|
||||
String message = XxlJobHelper.getJobParam();
|
||||
context.setArgs(new Object[]{message});
|
||||
if (Func.isEmpty(message)) {
|
||||
throw new RuntimeException("参数为空");
|
||||
}
|
||||
// String jsonStr = JSON.toJSON(converter.handle(message, context.getGroup()));
|
||||
Long taskId = messageService.insertTaskOriginalMessage("", message, IIntegerEnum.fromDisplay(CollectTypeEnum.class, context.getGroup()));
|
||||
Map<String, Object> params = context.getParams();
|
||||
params.put("taskId", taskId);
|
||||
// params.put("jsonStr", jsonStr);
|
||||
params.put("group", context.getGroup());
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 后置和异常通知
|
||||
*/
|
||||
@Override
|
||||
public Object doProcess(TrackContext context) {
|
||||
Map<String, Object> params = context.getParams();
|
||||
try {
|
||||
Long taskId = (Long) params.get("taskId");
|
||||
String afterReturnResult = (String) params.get("jsonStr");
|
||||
params.put("msg", context.getExMessageResult());
|
||||
|
||||
if (Func.isNotEmpty(afterReturnResult)) {
|
||||
params.putAll(JSON.fromJSONWithGeneric(afterReturnResult, new TypeReference<Map<? extends String, ?>>() {
|
||||
}));
|
||||
}
|
||||
|
||||
if (!context.isError()) {
|
||||
log.info("=== AOP 后置通知 ===");
|
||||
params.put("msg", "操作成功!");
|
||||
messageService.updateTaskOriginalMessage(taskId, afterReturnResult, context.getExMessageResult(), StateEnum.OK);
|
||||
return result.ok(params);
|
||||
} else {
|
||||
log.info("=== AOP 异常通知 ===");
|
||||
log.error((String) params.get(context.getExMessageResult()));
|
||||
messageService.updateTaskOriginalMessage(taskId, afterReturnResult, context.getExMessageResult(), StateEnum.FAIL);
|
||||
return result.fail(params);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error(e.getMessage(), e);
|
||||
return result.fail(params);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 初始化bean
|
||||
*/
|
||||
private void initBeans(String[] beanNames) {
|
||||
// this.converter = (IConverter) SpringUtils.getBean(beanNames[0]);
|
||||
// this.result = (IResult) SpringUtils.getBean(beanNames[1]);
|
||||
this.messageService = SpringUtils.getBean(ITaskOriginalMessageService.class);
|
||||
}
|
||||
}
|
@ -0,0 +1,24 @@
|
||||
package com.docus.server.collect.web.process;
|
||||
|
||||
import com.docus.log.context.TrackContext;
|
||||
import com.docus.log.processor.AbstractProcessor;
|
||||
import com.docus.server.collect.web.service.ITaskOriginalMessageService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* 重试处理器
|
||||
*
|
||||
* @author linruifeng
|
||||
*/
|
||||
@Slf4j
|
||||
public class RetryProcessor extends AbstractProcessor {
|
||||
@Resource
|
||||
private ITaskOriginalMessageService messageService;
|
||||
|
||||
@Override
|
||||
public Object doProcess(TrackContext context) {
|
||||
return null;
|
||||
}
|
||||
}
|
@ -1,11 +1,13 @@
|
||||
package com.docus.server.collect.web.service;
|
||||
|
||||
import com.docus.server.collect.web.common.entity.TaskOriginalMessage;
|
||||
import com.docus.server.collect.web.enums.CollectTypeEnum;
|
||||
import com.docus.server.common.enums.StateEnum;
|
||||
import com.docus.server.common.service.IBaseService;
|
||||
|
||||
public interface ITaskOriginalMessageService {
|
||||
public interface ITaskOriginalMessageService extends IBaseService<TaskOriginalMessage> {
|
||||
|
||||
Long insertTaskOriginalMessage(String json, String xml, CollectTypeEnum collectType);
|
||||
Long insertTaskOriginalMessage(String json, String xml, CollectTypeEnum collectType, String retryKey);
|
||||
|
||||
void updateTaskOriginalMessage(Long id, String json, String exMessageResult, StateEnum stateEnum);
|
||||
}
|
||||
|
@ -1 +0,0 @@
|
||||
mybatis-plus.typeEnumsPackage=com.docus.server.common.enums;
|
Loading…
Reference in New Issue