新增主动任务重试监听

segment2.0
linrf 2 years ago
parent 9e8569101f
commit 7aa1921914

@ -3,9 +3,7 @@ package com.docus.server.archivefile.controller;
import com.docus.infrastructure.web.request.SearchRequest;
import com.docus.infrastructure.web.response.PageResult;
import com.docus.server.api.archivefile.TaskMessageApi;
import com.docus.server.archivefile.convert.TaskMessageConvert;
import com.docus.server.archivefile.service.ITaskMessageService;
import com.docus.server.entity.TaskMessage;
import com.docus.server.vo.TaskMessageVO;
import org.springframework.web.bind.annotation.RestController;
@ -29,8 +27,7 @@ public class TaskMessageController implements TaskMessageApi {
*/
@Override
public TaskMessageVO find(String id) {
TaskMessage taskMessage = taskMessageService.findById(id);
return TaskMessageConvert.INSTANCE.convert(taskMessage);
return taskMessageService.findById(id);
}
/**

@ -7,4 +7,6 @@ import com.docus.server.entity.TaskConfig;
public interface ITaskConfigDao extends IBaseDao<TaskConfig> {
PageResult<TaskConfig> searchTaskConfig(SearchRequest searchRequest);
int findByNameAndIdAndKey(Long id, String name, String retryKey);
}

@ -44,4 +44,21 @@ public class TaskConfigDaoImpl extends BaseDaoImpl<TaskConfigMapper, TaskConfig>
List<TaskConfig> list = super.find(query);
return new PageResult<>(list);
}
@Override
public int findByNameAndIdAndKey(Long id, String name, String retryKey) {
LambdaQueryWrapper<TaskConfig> query = Wrappers.lambdaQuery();
if (id != null) {
query.ne(TaskConfig::getId, id);
}
if (Func.isNotBlank(name)) {
query.eq(TaskConfig::getName, name);
}
if (Func.isNotBlank(retryKey)) {
query.eq(TaskConfig::getRetryKey, retryKey);
}
return baseMapper.selectCount(query);
}
}

@ -38,7 +38,9 @@ public class TaskConfigRetryLogDaoImpl extends BaseDaoImpl<TaskConfigRetryLogMap
if (searchRequest.getParams("type") != null) {
query.eq(TaskConfigRetryLog::getType, searchRequest.getParams("type"));
}
if (searchRequest.getParams("messageId") != null) {
query.eq(TaskConfigRetryLog::getMessageId, searchRequest.getParams("messageId"));
}
//默认createTime倒序排序
query.orderByDesc(TaskConfigRetryLog::getCreateTime);
List<TaskConfigRetryLog> list = super.find(query);

@ -37,6 +37,9 @@ public class TaskMessageRetryLogDaoImpl extends BaseDaoImpl<TaskMessageRetryLogM
if (searchRequest.getParams("collectType") != null) {
query.eq(TaskMessageRetryLog::getCollectType, searchRequest.getParams("collectType"));
}
if (searchRequest.getParams("messageId") != null) {
query.eq(TaskMessageRetryLog::getMessageId, searchRequest.getParams("messageId"));
}
//默认createTime倒序排序
query.orderByDesc(TaskMessageRetryLog::getCreateTime);

@ -17,7 +17,7 @@ public interface ITaskMessageService {
List<TaskMessage> findByState(StateEnum stateEnum);
TaskMessage findById(String id);
TaskMessageVO findById(String id);
PageResult<TaskMessageVO> search(SearchRequest searchRequest);

@ -1,12 +1,18 @@
package com.docus.server.archivefile.service.impl;
import com.docus.core.util.Func;
import com.docus.core.util.ListUtils;
import com.docus.core.util.property.Setters;
import com.docus.infrastructure.redis.service.IdService;
import com.docus.infrastructure.web.exception.ApiException;
import com.docus.infrastructure.web.exception.ExceptionCode;
import com.docus.infrastructure.web.request.SearchRequest;
import com.docus.infrastructure.web.response.PageResult;
import com.docus.server.archivefile.infrastructure.dao.ITaskConfigDao;
import com.docus.server.archivefile.infrastructure.dao.ITaskConfigRetryLogDao;
import com.docus.server.archivefile.service.ITaskConfigService;
import com.docus.server.entity.TaskConfig;
import com.docus.server.entity.TaskConfigRetryLog;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
@ -14,12 +20,15 @@ import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.Map;
@Service
public class TaskConfigServiceImpl implements ITaskConfigService {
@Resource
private ITaskConfigDao taskConfigDao;
@Resource
private ITaskConfigRetryLogDao taskConfigRetryLogDao;
@Resource
private IdService idService;
@Resource
private TrackRetryService trackRetryService;
@ -42,21 +51,50 @@ public class TaskConfigServiceImpl implements ITaskConfigService {
@Override
public TaskConfig getTaskConfigById(String taskConfigId) {
return taskConfigDao.findById(taskConfigId);
TaskConfig config = taskConfigDao.findById(taskConfigId);
List<TaskConfigRetryLog> taskConfigRetryLogs = taskConfigRetryLogDao.findBy("messageId", config.getId());
if (!CollectionUtils.isEmpty(taskConfigRetryLogs)) {
config.setHaveRetryLog(true);
}
return config;
}
@Transactional(rollbackFor = Exception.class)
@Override
public boolean updateTaskConfig(TaskConfig taskConfig) {
if (Func.isNull(taskConfig.getId())) {
validSave(taskConfig.getRetryKey(), taskConfig.getName());
taskConfig.setId(idService.getDateSeq());
} else {
validUpdate(taskConfig.getId(), taskConfig.getName(), taskConfig.getRetryKey());
}
return taskConfigDao.saveOrUpdate(taskConfig);
}
@Override
public PageResult<TaskConfig> search(SearchRequest searchRequest) {
return taskConfigDao.searchTaskConfig(searchRequest);
PageResult<TaskConfig> result = searchTaskMessage(searchRequest);
if (CollectionUtils.isEmpty(result.getList())) {
return new PageResult<>();
}
List<TaskConfigRetryLog> retryLogs = taskConfigRetryLogDao.findBy("messageId", ListUtils.distinctSelect(result.getList(), TaskConfig::getId));
Map<Long, TaskConfigRetryLog> taskConfigRetryLogMap = ListUtils.toMap(retryLogs, TaskConfigRetryLog::getMessageId);
Setters.<TaskConfig>instance().list(result.getList()).cycleSetProperties(taskConfig -> {
if (taskConfigRetryLogMap.containsKey(taskConfig.getId())) {
taskConfig.setHaveRetryLog(true);
}
});
return result;
}
@Transactional(rollbackFor = Exception.class)
@ -91,4 +129,32 @@ public class TaskConfigServiceImpl implements ITaskConfigService {
}
}
private void validUpdate(Long id, String name, String retryKey) {
int count = taskConfigDao.findByNameAndIdAndKey(id, name, null);
if (count > 0) {
throw new ApiException(ExceptionCode.ParamIllegal.getCode(), "任务名称不能重复");
}
int keyCount = taskConfigDao.findByNameAndIdAndKey(id, null, retryKey);
if (keyCount > 0) {
throw new ApiException(ExceptionCode.ParamIllegal.getCode(), "任务重试键不能重复");
}
}
private void validSave(String retryKey, String name) {
TaskConfig keyConfig = taskConfigDao.findOneBy("retryKey", retryKey);
if (Func.notNull(keyConfig)) {
throw new ApiException(ExceptionCode.ParamIllegal.getCode(), "任务重试键不能重复");
}
TaskConfig nameConfig = taskConfigDao.findOneBy("name", name);
if (Func.notNull(nameConfig)) {
throw new ApiException(ExceptionCode.ParamIllegal.getCode(), "任务名称不能重复");
}
}
}

@ -70,8 +70,14 @@ public class TaskMessageServiceImpl implements ITaskMessageService {
}
@Override
public TaskMessage findById(String id) {
return taskMessageDao.findById(id);
public TaskMessageVO findById(String id) {
TaskMessage taskMessage = taskMessageDao.findById(id);
TaskMessageVO taskMessageVO = TaskMessageConvert.INSTANCE.convert(taskMessage);
List<TaskMessageRetryLog> retryLogs = taskMessageRetryLogDao.findBy("messageId", id);
if (!CollectionUtils.isEmpty(retryLogs)) {
taskMessageVO.setHaveRetryLog(true);
}
return taskMessageVO;
}
@Override

@ -32,7 +32,7 @@ public class TrackRetryService {
@TrackLogGroup(group = "doRetryTaskConfig", processor = RetryProcessor.class)
@Async("recordMessage")
public void doRetryTaskConfig(TaskConfig config) throws Exception {
IJobHandler jobHandler = TrackRetrySpringExecutor.loadJobHandler(config.getType());
IJobHandler jobHandler = TrackRetrySpringExecutor.loadJobHandler(config.getRetryKey());
jobHandler.execute(String.valueOf(config.getId()));
}

@ -1,6 +1,7 @@
package com.docus.server.common;
import com.docus.infrastructure.web.api.CommonResult;
import com.docus.infrastructure.web.exception.ApiException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.MethodParameter;
@ -11,6 +12,7 @@ import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import org.springframework.web.servlet.NoHandlerFoundException;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyAdvice;
import javax.servlet.http.HttpServletRequest;
@ -46,6 +48,12 @@ public class GlobalResponseBodyAdvice implements ResponseBodyAdvice<Object> {
@ResponseBody
@ExceptionHandler(value = Exception.class)
public Object defaultErrorHandler(HttpServletRequest req, Exception ex) {
if (ex instanceof ApiException) {
logger.info("{} 错误 ,code:{}, message:{}", req.getRequestURL(), ((ApiException) ex).getCode(), ex.getMessage());
return CommonResult.failed(ex.getMessage());
} else if (ex instanceof NoHandlerFoundException) {
logger.info("{} 错误 ,code:{}, message:{}", req.getRequestURL(), 404, ex.getMessage());
}
return CommonResult.failed("系统出错");
}
}

@ -3,14 +3,21 @@ package com.docus.server.api.archivefile;
import com.docus.infrastructure.web.request.SearchRequest;
import com.docus.infrastructure.web.response.PageResult;
import com.docus.server.entity.TaskConfigRetryLog;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
/**
* API
* Generated on 2023-06-29
*/
@Api(value = "主动消息任务重试日志管理接口", tags = "主动消息任务重试日志管理接口")
@FeignClient(value = "tsms-auth-api", contextId = "tsms-auth-api.TaskConfigRetryLogApi")
@RequestMapping("/taskConfigRetryLog")
public interface TaskConfigRetryLogApi {
@ -21,6 +28,7 @@ public interface TaskConfigRetryLogApi {
* @param id id
* @return
*/
@ApiOperation("按主键查询")
@GetMapping("/find/{id}")
TaskConfigRetryLog find(@PathVariable(value = "id") String id);
@ -30,6 +38,7 @@ public interface TaskConfigRetryLogApi {
* @param searchRequest
* @return
*/
@ApiOperation("关键字搜素")
@PostMapping("/search")
PageResult<TaskConfigRetryLog> search(@RequestBody SearchRequest searchRequest);

@ -3,6 +3,8 @@ package com.docus.server.api.archivefile;
import com.docus.infrastructure.web.request.SearchRequest;
import com.docus.infrastructure.web.response.PageResult;
import com.docus.server.entity.TaskMessageRetryLog;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
@ -15,6 +17,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
* API
* Generated on 2023-06-28
*/
@Api(value = "被动消息任务重试日志管理接口", tags = "被动消息任务重试日志管理接口")
@FeignClient(value = "docus-collector-api", contextId = "docus-collector-api.TaskMessageRetryLogApi")
@RequestMapping("/taskMessageRetryLog")
public interface TaskMessageRetryLogApi {
@ -25,6 +28,7 @@ public interface TaskMessageRetryLogApi {
* @param id id
* @return
*/
@ApiOperation("按主键查询")
@GetMapping("/find/{id}")
TaskMessageRetryLog find(@PathVariable(value = "id") String id);
@ -33,6 +37,7 @@ public interface TaskMessageRetryLogApi {
* @param searchRequest
* @return
*/
@ApiOperation("关键字搜素")
@PostMapping("/search")
PageResult<TaskMessageRetryLog> search(@RequestBody SearchRequest searchRequest);

@ -26,8 +26,14 @@ public class TaskConfig {
*
*/
@ApiModelProperty(value = "任务名字")
@TableField("name")
private String name;
/**
*
*/
@ApiModelProperty(value = "任务重试键")
@TableField("retry_key")
private String retryKey;
/**
* deptuser....
*/

Loading…
Cancel
Save