添加被动消息重试机制

segment2.0
beeajax 2 years ago
parent 4c51c95bcd
commit 798ab2be7e

@ -1,17 +1,11 @@
package com.docus.server.collect.web.job;
import com.docus.core.util.json.JSON;
import com.docus.log.executor.TrackRetrySpringExecutor;
import com.docus.log.handler.IJobHandler;
import com.docus.log.handler.MethodJobHandler;
import com.docus.server.collect.web.common.entity.TaskOriginalMessage;
import com.docus.server.collect.web.service.ITaskOriginalMessageService;
import com.docus.server.collect.web.service.TrackRetryService;
import com.docus.server.common.enums.StateEnum;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@ -26,24 +20,17 @@ public class TrackRetryJob {
@Resource
private ITaskOriginalMessageService messageService;
@Resource
private TrackRetryService trackRetryService;
@XxlJob("trackRetryJob")
public void retry() throws Exception {
List<TaskOriginalMessage> messages = messageService.findBy("state", StateEnum.FAIL);
for (TaskOriginalMessage message : messages) {
doRetry(message);
trackRetryService.doRetry(message);
}
}
@Async("recordMessage")
@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5))
public void doRetry(TaskOriginalMessage message) throws Exception {
IJobHandler jobHandler = TrackRetrySpringExecutor.loadJobHandler(message.getRetryKey());
String name = ((MethodJobHandler) jobHandler).getMethod().getParameterTypes()[0].getName();
Object o = JSON.fromJSON(message.getJsonStr(), Class.forName(name));
jobHandler.execute(o);
//to write retry log
}
}

@ -1,7 +1,7 @@
package com.docus.server.collect.web.service;
import com.docus.core.util.Func;
import com.docus.log.annotation.TrackRetry;
import com.docus.log.annotation.TrackRetryListener;
import com.docus.server.record.common.pojo.dto.TBasicDTO;
import com.docus.server.record.service.ITBasicService;
import com.docus.server.sys.common.pojo.dto.DeptDTO;
@ -10,7 +10,6 @@ import com.docus.server.sys.service.IPowerDeptService;
import com.docus.server.sys.service.IPowerUserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
@ -26,13 +25,9 @@ public class CollectService {
private ITBasicService tBasicService;
/**
* 3
*
*/
/*@Async("recordMessage")
@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5))*/
@Transactional(rollbackFor = Exception.class)
@TrackRetry("dept")
@TrackRetryListener("dept")
public void insertOrUpdateDept(DeptDTO deptDTO) {
//异步写入归档系统,失败自动重试。
if (checkType(deptDTO.getOperateType(), DEL_TYPE)) {
@ -44,11 +39,8 @@ public class CollectService {
/**
*
*
* @param userDTO
*/
@Transactional(rollbackFor = Exception.class)
@TrackRetry("user")
@TrackRetryListener("user")
public void insertOrUpdateUser(UserDTO userDTO) {
// 判断操作类型
if (checkType(userDTO.getOperateType(), DEL_TYPE)) {
@ -60,11 +52,8 @@ public class CollectService {
/**
*
*
* @param tBasicDTO
*/
@TrackRetry("basic")
@Transactional(rollbackFor = Exception.class)
@TrackRetryListener("basic")
public void insertOrUpdateBasic(TBasicDTO tBasicDTO) {
Integer num = tBasicService.findByJzh(tBasicDTO.getJzh());
if (num > 0) {

@ -0,0 +1,26 @@
package com.docus.server.collect.web.service;
import com.docus.core.util.json.JSON;
import com.docus.log.executor.TrackRetrySpringExecutor;
import com.docus.log.handler.IJobHandler;
import com.docus.log.handler.MethodJobHandler;
import com.docus.server.collect.web.common.entity.TaskOriginalMessage;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@Service
public class TrackRetryService {
@Async("recordMessage")
@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5))
public void doRetry(TaskOriginalMessage message) throws Exception {
IJobHandler jobHandler = TrackRetrySpringExecutor.loadJobHandler(message.getRetryKey());
String name = ((MethodJobHandler) jobHandler).getMethod().getParameterTypes()[0].getName();
Object o = JSON.fromJSON(message.getJsonStr(), Class.forName(name));
jobHandler.execute(o);
//to write retry log
}
}
Loading…
Cancel
Save