From 4c51c95bcd4b8b2287ed6e5bdbb527b4e1025c06 Mon Sep 17 00:00:00 2001 From: beeajax <1105173470@qq.com> Date: Sun, 18 Jun 2023 08:35:55 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E8=A2=AB=E5=8A=A8=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E9=87=8D=E8=AF=95=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- collect-sdry/src/main/resources/bootstrap.yml | 2 +- .../basic/http/BasicRestController.java | 4 +- .../basic/http/test/TestRestController.java | 52 +++++++++++ .../mq/impl/BasicJmsMqCollectServiceImpl.java | 4 +- .../impl/BasicRabbitMqCollectServiceImpl.java | 4 +- .../impl/BasicRocketMqCollectServiceImpl.java | 4 +- .../basic/ws/impl/BasicServerImpl.java | 4 +- .../collect/dept/http/DeptRestController.java | 2 +- .../mq/impl/DeptJmsMqCollectServiceImpl.java | 4 +- .../impl/DeptRabbitMqCollectServiceImpl.java | 4 +- .../impl/DeptRocketMqCollectServiceImpl.java | 4 +- .../collect/dept/ws/impl/DeptServerImpl.java | 2 +- .../service/impl/ReportServiceImpl.java | 6 +- .../report/ws/impl/ReportServerImpl.java | 10 +- .../collect/user/http/UserRestController.java | 2 +- .../mq/impl/UserJmsMqCollectServiceImpl.java | 4 +- .../impl/UserRabbitMqCollectServiceImpl.java | 4 +- .../impl/UserRocketMqCollectServiceImpl.java | 4 +- .../collect/user/ws/impl/UserServerImpl.java | 2 +- .../common/entity/TaskOriginalMessage.java | 4 + .../convert/TaskOriginalMessageConverter.java | 3 +- .../server/collect/web/job/TrackRetryJob.java | 49 ++++++++++ .../collect/web/process/JobProcessor.java | 91 ------------------- .../collect/web/process/RetryProcessor.java | 24 +++++ .../collect/web/process/VisitorProcessor.java | 3 +- .../collect/web/service/CollectService.java | 4 + .../service/ITaskOriginalMessageService.java | 6 +- .../impl/TaskOriginalMessageServiceImpl.java | 14 +-- .../server/common/service/IBaseService.java | 7 +- .../common/service/impl/BaseServiceImpl.java | 13 +-- .../src/main/resources/application.properties | 1 - 31 files changed, 194 insertions(+), 147 deletions(-) create mode 100644 common-collect/src/main/java/com/docus/server/collect/basic/http/test/TestRestController.java create mode 100644 common-collect/src/main/java/com/docus/server/collect/web/job/TrackRetryJob.java delete mode 100644 common-collect/src/main/java/com/docus/server/collect/web/process/JobProcessor.java create mode 100644 common-collect/src/main/java/com/docus/server/collect/web/process/RetryProcessor.java diff --git a/collect-sdry/src/main/resources/bootstrap.yml b/collect-sdry/src/main/resources/bootstrap.yml index ddbeaef..6963d99 100644 --- a/collect-sdry/src/main/resources/bootstrap.yml +++ b/collect-sdry/src/main/resources/bootstrap.yml @@ -174,7 +174,7 @@ mybatis-plus: field-strategy: NOT_EMPTY db-type: MYSQL mapper-locations: classpath*:/mapper/*Mapper.xml,file:mybatis.mapper/**/*Mapper.xml - type-enums-package: com.docus.server.collect.web.enums + typeEnumsPackage: com.docus.server.collect.web.enums,com.docus.server.common.enums xxl: job: diff --git a/common-collect/src/main/java/com/docus/server/collect/basic/http/BasicRestController.java b/common-collect/src/main/java/com/docus/server/collect/basic/http/BasicRestController.java index 7185209..a6d523b 100644 --- a/common-collect/src/main/java/com/docus/server/collect/basic/http/BasicRestController.java +++ b/common-collect/src/main/java/com/docus/server/collect/basic/http/BasicRestController.java @@ -47,7 +47,7 @@ public class BasicRestController { @ApiOperation("新增基础数据(根据记账号)") @PostMapping("/insert") @TrackGroup( - group = "HTTP_PASSIVE_BASIC", + group = "HTTP_PASSIVE_BASIC", retryKey = "basic", beanNames = {"httpTBasicConverter", "httpBasicResultImpl"}, processor = VisitorProcessor.class) public CommonResult insertHttpPassiveBasic(@RequestBody String message) { @@ -65,7 +65,7 @@ public class BasicRestController { @ApiOperation("编辑基础数据(根据记账号)") @PostMapping("/update") @TrackGroup( - group = "HTTP_PASSIVE_BASIC", + group = "HTTP_PASSIVE_BASIC", retryKey = "basic", beanNames = {"httpTBasicConverter", "httpBasicResultImpl"}, processor = VisitorProcessor.class) public CommonResult updateHttpPassiveBasic(@RequestBody String message) { diff --git a/common-collect/src/main/java/com/docus/server/collect/basic/http/test/TestRestController.java b/common-collect/src/main/java/com/docus/server/collect/basic/http/test/TestRestController.java new file mode 100644 index 0000000..4fcff35 --- /dev/null +++ b/common-collect/src/main/java/com/docus/server/collect/basic/http/test/TestRestController.java @@ -0,0 +1,52 @@ +package com.docus.server.collect.basic.http.test; + +import com.docus.core.util.json.JSON; +import com.docus.log.executor.TrackRetrySpringExecutor; +import com.docus.log.handler.IJobHandler; +import com.docus.log.handler.MethodJobHandler; +import com.docus.server.collect.IConverter; +import com.docus.server.collect.IHttpResult; +import com.docus.server.collect.web.common.entity.TaskOriginalMessage; +import com.docus.server.collect.web.enums.CollectTypeEnum; +import com.docus.server.collect.web.process.VisitorProcessor; +import com.docus.server.collect.web.service.CollectService; +import com.docus.server.collect.web.service.ITaskOriginalMessageService; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; + + +/** + * @author wen yongbin + * @date 2023年2月25日21:56:33 + * @see CollectTypeEnum 枚举 + * @see IConverter 通用转化器 + * @see IHttpResult 通用返回结果 + * @see VisitorProcessor 通用处理器 + */ + +@Api(value = "TEST-用户管理接口", tags = "TEST-用户管理接口") +@Slf4j +@RestController("a") +@RequestMapping("/api/rest/user") +public class TestRestController { + @Resource + private CollectService collectService; + @Resource + private ITaskOriginalMessageService messageService; + + @ApiOperation("TEST-2") + @GetMapping("/get2") + public void get2() throws Exception { + TaskOriginalMessage message = messageService.findAll().get(0); + IJobHandler jobHandler = TrackRetrySpringExecutor.loadJobHandler(message.getRetryKey()); + String name = ((MethodJobHandler) jobHandler).getMethod().getParameterTypes()[0].getName(); + Object o = JSON.fromJSON(message.getJsonStr(), Class.forName(name)); + jobHandler.execute(o); + } +} diff --git a/common-collect/src/main/java/com/docus/server/collect/basic/mq/impl/BasicJmsMqCollectServiceImpl.java b/common-collect/src/main/java/com/docus/server/collect/basic/mq/impl/BasicJmsMqCollectServiceImpl.java index 1f91b5f..3643614 100644 --- a/common-collect/src/main/java/com/docus/server/collect/basic/mq/impl/BasicJmsMqCollectServiceImpl.java +++ b/common-collect/src/main/java/com/docus/server/collect/basic/mq/impl/BasicJmsMqCollectServiceImpl.java @@ -23,7 +23,7 @@ public class BasicJmsMqCollectServiceImpl implements IBasicMqCollectService { @Override @TrackGroup( - group = "JMS_MQ_PASSIVE_INSERT_BASIC", + group = "JMS_MQ_PASSIVE_INSERT_BASIC", retryKey = "basic", beanNames = {"mqBasicConverter", "mqBasicResultImpl"}, processor = VisitorProcessor.class) public void addTBasic(String message) { @@ -33,7 +33,7 @@ public class BasicJmsMqCollectServiceImpl implements IBasicMqCollectService { @Override @TrackGroup( - group = "JMS_MQ_PASSIVE_UPDATE_BASIC", + group = "JMS_MQ_PASSIVE_UPDATE_BASIC", retryKey = "basic", beanNames = {"mqBasicConverter", "mqBasicResultImpl"}, processor = VisitorProcessor.class) public void updateTBasic(String message) { diff --git a/common-collect/src/main/java/com/docus/server/collect/basic/mq/impl/BasicRabbitMqCollectServiceImpl.java b/common-collect/src/main/java/com/docus/server/collect/basic/mq/impl/BasicRabbitMqCollectServiceImpl.java index 034fdf5..cea2ec4 100644 --- a/common-collect/src/main/java/com/docus/server/collect/basic/mq/impl/BasicRabbitMqCollectServiceImpl.java +++ b/common-collect/src/main/java/com/docus/server/collect/basic/mq/impl/BasicRabbitMqCollectServiceImpl.java @@ -28,7 +28,7 @@ public class BasicRabbitMqCollectServiceImpl implements IBasicMqCollectService { */ @Override @TrackGroup( - group = "RABBIT_MQ_PASSIVE_INSERT_BASIC", + group = "RABBIT_MQ_PASSIVE_INSERT_BASIC", retryKey = "basic", beanNames = {"mqBasicConverter", "mqBasicResultImpl"}, processor = VisitorProcessor.class) public void addTBasic(String message) { @@ -43,7 +43,7 @@ public class BasicRabbitMqCollectServiceImpl implements IBasicMqCollectService { */ @Override @TrackGroup( - group = "RABBIT_MQ_PASSIVE_UPDATE_BASIC", + group = "RABBIT_MQ_PASSIVE_UPDATE_BASIC", retryKey = "basic", beanNames = {"mqBasicConverter", "mqBasicResultImpl"}, processor = VisitorProcessor.class) public void updateTBasic(String message) { diff --git a/common-collect/src/main/java/com/docus/server/collect/basic/mq/impl/BasicRocketMqCollectServiceImpl.java b/common-collect/src/main/java/com/docus/server/collect/basic/mq/impl/BasicRocketMqCollectServiceImpl.java index 9ec1bcb..c869051 100644 --- a/common-collect/src/main/java/com/docus/server/collect/basic/mq/impl/BasicRocketMqCollectServiceImpl.java +++ b/common-collect/src/main/java/com/docus/server/collect/basic/mq/impl/BasicRocketMqCollectServiceImpl.java @@ -25,7 +25,7 @@ public class BasicRocketMqCollectServiceImpl implements IBasicMqCollectService { */ @Override @TrackGroup( - group = "ROCKET_MQ_PASSIVE_INSERT_DEPT", + group = "ROCKET_MQ_PASSIVE_INSERT_BASIC", retryKey = "basic", beanNames = {"mqBasicConverter", "mqBasicResultImpl"}, processor = VisitorProcessor.class) public void addTBasic(String message) { @@ -40,7 +40,7 @@ public class BasicRocketMqCollectServiceImpl implements IBasicMqCollectService { */ @Override @TrackGroup( - group = "ROCKET_MQ_PASSIVE_UPDATE_DEPT", + group = "ROCKET_MQ_PASSIVE_UPDATE_BASIC", retryKey = "basic", beanNames = {"mqBasicConverter", "mqBasicResultImpl"}, processor = VisitorProcessor.class) public void updateTBasic(String message) { diff --git a/common-collect/src/main/java/com/docus/server/collect/basic/ws/impl/BasicServerImpl.java b/common-collect/src/main/java/com/docus/server/collect/basic/ws/impl/BasicServerImpl.java index c44f5a6..17681cc 100644 --- a/common-collect/src/main/java/com/docus/server/collect/basic/ws/impl/BasicServerImpl.java +++ b/common-collect/src/main/java/com/docus/server/collect/basic/ws/impl/BasicServerImpl.java @@ -31,7 +31,7 @@ public class BasicServerImpl implements IBasicServer { @Override @TrackGroup( - group = "WEBSERVICE_XML_INSERT_BASIC", + group = "WEBSERVICE_XML_INSERT_BASIC", retryKey = "basic", beanNames = {"wsTBasicConverter", "wsBasicResultImpl"}, processor = VisitorProcessor.class) public String setTBasic(String message) { @@ -42,7 +42,7 @@ public class BasicServerImpl implements IBasicServer { @Override @TrackGroup( - group = "WEBSERVICE_XML_UPDATE_BASIC", + group = "WEBSERVICE_XML_UPDATE_BASIC", retryKey = "basic", beanNames = {"wsTBasicConverter", "wsBasicResultImpl"}, processor = VisitorProcessor.class) public String updateTBasic(String message) { diff --git a/common-collect/src/main/java/com/docus/server/collect/dept/http/DeptRestController.java b/common-collect/src/main/java/com/docus/server/collect/dept/http/DeptRestController.java index a42caef..2da8bc8 100644 --- a/common-collect/src/main/java/com/docus/server/collect/dept/http/DeptRestController.java +++ b/common-collect/src/main/java/com/docus/server/collect/dept/http/DeptRestController.java @@ -47,7 +47,7 @@ public class DeptRestController { @ApiOperation("新增/编辑科室(根据科室代码)") @PostMapping("/update") @TrackGroup( - group = "HTTP_PASSIVE_DEPT", + group = "HTTP_PASSIVE_DEPT", retryKey = "dept", beanNames = {"httpDeptConverter", "httpDeptResultImpl"}, processor = VisitorProcessor.class) public CommonResult httpPassiveDept(@RequestBody String message) { diff --git a/common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptJmsMqCollectServiceImpl.java b/common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptJmsMqCollectServiceImpl.java index bf6d3fd..eb41168 100644 --- a/common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptJmsMqCollectServiceImpl.java +++ b/common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptJmsMqCollectServiceImpl.java @@ -28,7 +28,7 @@ public class DeptJmsMqCollectServiceImpl implements IDeptMqCollectService { */ @Override @TrackGroup( - group = "JMS_MQ_PASSIVE_INSERT_DEPT", + group = "JMS_MQ_PASSIVE_INSERT_DEPT", retryKey = "dept", beanNames = {"mqDeptConverter", "mqDeptResultImpl"}, processor = VisitorProcessor.class) public void addDept(String message) { @@ -44,7 +44,7 @@ public class DeptJmsMqCollectServiceImpl implements IDeptMqCollectService { */ @Override @TrackGroup( - group = "JMS_MQ_PASSIVE_UPDATE_DEPT", + group = "JMS_MQ_PASSIVE_UPDATE_DEPT", retryKey = "dept", beanNames = {"mqDeptConverter", "mqDeptResultImpl"}, processor = VisitorProcessor.class) public void updateDept(String message) { diff --git a/common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptRabbitMqCollectServiceImpl.java b/common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptRabbitMqCollectServiceImpl.java index ffdf546..3837952 100644 --- a/common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptRabbitMqCollectServiceImpl.java +++ b/common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptRabbitMqCollectServiceImpl.java @@ -25,7 +25,7 @@ public class DeptRabbitMqCollectServiceImpl implements IDeptMqCollectService { */ @Override @TrackGroup( - group = "RABBIT_MQ_PASSIVE_INSERT_DEPT", + group = "RABBIT_MQ_PASSIVE_INSERT_DEPT", retryKey = "dept", beanNames = {"mqDeptConverter", "mqDeptResultImpl"}, processor = VisitorProcessor.class) public void addDept(String message) { @@ -41,7 +41,7 @@ public class DeptRabbitMqCollectServiceImpl implements IDeptMqCollectService { */ @Override @TrackGroup( - group = "RABBIT_MQ_PASSIVE_UPDATE_DEPT", + group = "RABBIT_MQ_PASSIVE_UPDATE_DEPT", retryKey = "dept", beanNames = {"mqDeptConverter", "mqDeptResultImpl"}, processor = VisitorProcessor.class) public void updateDept(String message) { diff --git a/common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptRocketMqCollectServiceImpl.java b/common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptRocketMqCollectServiceImpl.java index e29490f..7080a77 100644 --- a/common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptRocketMqCollectServiceImpl.java +++ b/common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptRocketMqCollectServiceImpl.java @@ -25,7 +25,7 @@ public class DeptRocketMqCollectServiceImpl implements IDeptMqCollectService { */ @Override @TrackGroup( - group = "ROCKET_MQ_PASSIVE_INSERT_DEPT", + group = "ROCKET_MQ_PASSIVE_INSERT_DEPT", retryKey = "dept", beanNames = {"mqDeptConverter", "mqDeptResultImpl"}, processor = VisitorProcessor.class) public void addDept(String message) { @@ -41,7 +41,7 @@ public class DeptRocketMqCollectServiceImpl implements IDeptMqCollectService { */ @Override @TrackGroup( - group = "ROCKET_MQ_PASSIVE_UPDATE_DEPT", + group = "ROCKET_MQ_PASSIVE_UPDATE_DEPT", retryKey = "dept", beanNames = {"mqDeptConverter", "mqDeptResultImpl"}, processor = VisitorProcessor.class) public void updateDept(String message) { diff --git a/common-collect/src/main/java/com/docus/server/collect/dept/ws/impl/DeptServerImpl.java b/common-collect/src/main/java/com/docus/server/collect/dept/ws/impl/DeptServerImpl.java index 9dc8939..3c3a51e 100644 --- a/common-collect/src/main/java/com/docus/server/collect/dept/ws/impl/DeptServerImpl.java +++ b/common-collect/src/main/java/com/docus/server/collect/dept/ws/impl/DeptServerImpl.java @@ -31,7 +31,7 @@ public class DeptServerImpl implements IDeptServer { @Override @TrackGroup( - group = "WEBSERVICE_XML_DEPT", + group = "WEBSERVICE_XML_DEPT", retryKey = "dept", beanNames = {"wsDeptConverter", "wsDeptResultImpl"}, processor = VisitorProcessor.class) public String deptModify(String message) { diff --git a/common-collect/src/main/java/com/docus/server/collect/report/service/impl/ReportServiceImpl.java b/common-collect/src/main/java/com/docus/server/collect/report/service/impl/ReportServiceImpl.java index 9b30d62..3894f6d 100644 --- a/common-collect/src/main/java/com/docus/server/collect/report/service/impl/ReportServiceImpl.java +++ b/common-collect/src/main/java/com/docus/server/collect/report/service/impl/ReportServiceImpl.java @@ -2,15 +2,16 @@ package com.docus.server.collect.report.service.impl; import com.docus.core.util.Func; import com.docus.infrastructure.redis.service.IdService; +import com.docus.log.annotation.TrackRetry; import com.docus.server.archivefile.mapper.AfCollectTaskMapper; import com.docus.server.archivefile.mapper.AfReportRecordMapper; import com.docus.server.archivefile.pojo.dto.ReportDTO; import com.docus.server.archivefile.pojo.entity.AfCollectTask; import com.docus.server.archivefile.pojo.entity.AfReportRecord; -import com.docus.server.collect.report.service.IReportService; -import com.docus.server.record.mapper.TBasicMapper; import com.docus.server.collect.report.event.TaskConsumptionReportDownEvent; import com.docus.server.collect.report.event.ThreePartyPushReportDownEvent; +import com.docus.server.collect.report.service.IReportService; +import com.docus.server.record.mapper.TBasicMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Service; @@ -40,6 +41,7 @@ public class ReportServiceImpl implements IReportService { private IdService idService; @Override + @TrackRetry("report") public void report(ReportDTO reportDTO) { // 系统无视图生成的任务,消费查询的报告 if (reportDTO.getTaskId() != null) { diff --git a/common-collect/src/main/java/com/docus/server/collect/report/ws/impl/ReportServerImpl.java b/common-collect/src/main/java/com/docus/server/collect/report/ws/impl/ReportServerImpl.java index ff59e7c..00b62c6 100644 --- a/common-collect/src/main/java/com/docus/server/collect/report/ws/impl/ReportServerImpl.java +++ b/common-collect/src/main/java/com/docus/server/collect/report/ws/impl/ReportServerImpl.java @@ -33,7 +33,7 @@ public class ReportServerImpl implements IReportServer { @Override @TrackGroup( - group = "WEBSERVICE_XML_SA_REPORT", + group = "WEBSERVICE_XML_SA_REPORT", retryKey = "report", beanNames = {CONVERTER, WS_RESULT}, processor = VisitorProcessor.class) public String pushSAReport(String message) { @@ -43,7 +43,7 @@ public class ReportServerImpl implements IReportServer { @Override @TrackGroup( - group = "WEBSERVICE_XML_ICU_REPORT", + group = "WEBSERVICE_XML_ICU_REPORT", retryKey = "report", beanNames = {CONVERTER, WS_RESULT}, processor = VisitorProcessor.class) public String pushICUReport(String message) { @@ -54,7 +54,7 @@ public class ReportServerImpl implements IReportServer { @Override @TrackGroup( - group = "WEBSERVICE_XML_EXAMINATION_REPORT", + group = "WEBSERVICE_XML_EXAMINATION_REPORT", retryKey = "report", beanNames = {CONVERTER, WS_RESULT}, processor = VisitorProcessor.class) public String pushExaminationReport(String message) { @@ -65,7 +65,7 @@ public class ReportServerImpl implements IReportServer { @Override @TrackGroup( - group = "WEBSERVICE_XML_INSERT_INSPECTION_REPORT", + group = "WEBSERVICE_XML_INSERT_INSPECTION_REPORT", retryKey = "report", beanNames = {CONVERTER, WS_RESULT}, processor = VisitorProcessor.class) public String pushAddInspectionReport(String message) { @@ -76,7 +76,7 @@ public class ReportServerImpl implements IReportServer { @Override @TrackGroup( - group = "WEBSERVICE_XML_UPDATE_INSPECTION_REPORT", + group = "WEBSERVICE_XML_UPDATE_INSPECTION_REPORT", retryKey = "report", beanNames = {CONVERTER, WS_RESULT}, processor = VisitorProcessor.class) public String pushUpdateInspectionReport(String message) { diff --git a/common-collect/src/main/java/com/docus/server/collect/user/http/UserRestController.java b/common-collect/src/main/java/com/docus/server/collect/user/http/UserRestController.java index 22a2b20..384f517 100644 --- a/common-collect/src/main/java/com/docus/server/collect/user/http/UserRestController.java +++ b/common-collect/src/main/java/com/docus/server/collect/user/http/UserRestController.java @@ -47,7 +47,7 @@ public class UserRestController { @ApiOperation("新增/编辑用户(根据用户工号)") @PostMapping("/update") @TrackGroup( - group = "HTTP_PASSIVE_USER", + group = "HTTP_PASSIVE_USER", retryKey = "user", beanNames = {"httpUserConverter", "httpUserResultImpl"}, processor = VisitorProcessor.class) public CommonResult httpPassiveUser(@RequestBody String message) { diff --git a/common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserJmsMqCollectServiceImpl.java b/common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserJmsMqCollectServiceImpl.java index b3d1d19..651f4d6 100644 --- a/common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserJmsMqCollectServiceImpl.java +++ b/common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserJmsMqCollectServiceImpl.java @@ -29,7 +29,7 @@ public class UserJmsMqCollectServiceImpl implements IUserMqCollectService { */ @Override @TrackGroup( - group = "JMS_MQ_PASSIVE_INSERT_USER", + group = "JMS_MQ_PASSIVE_INSERT_USER", retryKey = "user", beanNames = {"mqUserConverter", "mqUserResultImpl"}, processor = VisitorProcessor.class) public void addUser(String message) { @@ -44,7 +44,7 @@ public class UserJmsMqCollectServiceImpl implements IUserMqCollectService { */ @Override @TrackGroup( - group = "JMS_MQ_PASSIVE_UPDATE_USER", + group = "JMS_MQ_PASSIVE_UPDATE_USER", retryKey = "user", beanNames = {"mqUserConverter", "mqUserResultImpl"}, processor = VisitorProcessor.class) public void updateUser(String message) { diff --git a/common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserRabbitMqCollectServiceImpl.java b/common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserRabbitMqCollectServiceImpl.java index 38cd1f4..3fb652e 100644 --- a/common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserRabbitMqCollectServiceImpl.java +++ b/common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserRabbitMqCollectServiceImpl.java @@ -25,7 +25,7 @@ public class UserRabbitMqCollectServiceImpl implements IUserMqCollectService { */ @Override @TrackGroup( - group = "RABBIT_MQ_PASSIVE_INSERT_USER", + group = "RABBIT_MQ_PASSIVE_INSERT_USER", retryKey = "user", beanNames = {"mqUserConverter", "mqUserResultImpl"}, processor = VisitorProcessor.class) public void addUser(String message) { @@ -40,7 +40,7 @@ public class UserRabbitMqCollectServiceImpl implements IUserMqCollectService { */ @Override @TrackGroup( - group = "RABBIT_MQ_PASSIVE_UPDATE_USER", + group = "RABBIT_MQ_PASSIVE_UPDATE_USER", retryKey = "user", beanNames = {"mqUserConverter", "mqUserResultImpl"}, processor = VisitorProcessor.class) public void updateUser(String message) { diff --git a/common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserRocketMqCollectServiceImpl.java b/common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserRocketMqCollectServiceImpl.java index 66f4081..1951a0f 100644 --- a/common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserRocketMqCollectServiceImpl.java +++ b/common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserRocketMqCollectServiceImpl.java @@ -25,7 +25,7 @@ public class UserRocketMqCollectServiceImpl implements IUserMqCollectService { */ @Override @TrackGroup( - group = "ROCKET_MQ_PASSIVE_INSERT_USER", + group = "ROCKET_MQ_PASSIVE_INSERT_USER", retryKey = "user", beanNames = {"mqUserConverter", "mqUserResultImpl"}, processor = VisitorProcessor.class) public void addUser(String message) { @@ -40,7 +40,7 @@ public class UserRocketMqCollectServiceImpl implements IUserMqCollectService { */ @Override @TrackGroup( - group = "ROCKET_MQ_PASSIVE_UPDATE_USER", + group = "ROCKET_MQ_PASSIVE_UPDATE_USER", retryKey = "user", beanNames = {"mqUserConverter", "mqUserResultImpl"}, processor = VisitorProcessor.class) public void updateUser(String message) { diff --git a/common-collect/src/main/java/com/docus/server/collect/user/ws/impl/UserServerImpl.java b/common-collect/src/main/java/com/docus/server/collect/user/ws/impl/UserServerImpl.java index 73744e9..646dbaf 100644 --- a/common-collect/src/main/java/com/docus/server/collect/user/ws/impl/UserServerImpl.java +++ b/common-collect/src/main/java/com/docus/server/collect/user/ws/impl/UserServerImpl.java @@ -37,7 +37,7 @@ public class UserServerImpl implements IUserServer { */ @Override @TrackGroup( - group = "WEBSERVICE_XML_USER", + group = "WEBSERVICE_XML_USER", retryKey = "user", beanNames = {"wsUserConverter", "wsUserResultImpl"}, processor = VisitorProcessor.class) public String userModify(String message) { diff --git a/common-collect/src/main/java/com/docus/server/collect/web/common/entity/TaskOriginalMessage.java b/common-collect/src/main/java/com/docus/server/collect/web/common/entity/TaskOriginalMessage.java index 590e2e0..19bc5c9 100644 --- a/common-collect/src/main/java/com/docus/server/collect/web/common/entity/TaskOriginalMessage.java +++ b/common-collect/src/main/java/com/docus/server/collect/web/common/entity/TaskOriginalMessage.java @@ -45,6 +45,10 @@ public class TaskOriginalMessage implements Serializable { @TableField("collect_type") private CollectTypeEnum collectType; + @ApiModelProperty(value = "重试标示") + @TableField("retry_key") + private String retryKey; + @ApiModelProperty(value = "原始报文转对象json") @TableField("json_str") private String jsonStr; diff --git a/common-collect/src/main/java/com/docus/server/collect/web/convert/TaskOriginalMessageConverter.java b/common-collect/src/main/java/com/docus/server/collect/web/convert/TaskOriginalMessageConverter.java index 9459977..a3ca0d0 100644 --- a/common-collect/src/main/java/com/docus/server/collect/web/convert/TaskOriginalMessageConverter.java +++ b/common-collect/src/main/java/com/docus/server/collect/web/convert/TaskOriginalMessageConverter.java @@ -14,12 +14,13 @@ public class TaskOriginalMessageConverter { @Resource private IdService idService; - public TaskOriginalMessage toConvertTaskOriginalMessageDO(String json, String xml, CollectTypeEnum collectType) { + public TaskOriginalMessage toConvertTaskOriginalMessageDO(String json, String xml, CollectTypeEnum collectType, String retryKey) { TaskOriginalMessage taskOriginalMessage = new TaskOriginalMessage(); taskOriginalMessage.setId(idService.getDateSeq()); taskOriginalMessage.setName(collectType.name()); taskOriginalMessage.setMemo(collectType.getDesc()); taskOriginalMessage.setCollectType(collectType); + taskOriginalMessage.setRetryKey(retryKey); taskOriginalMessage.setJsonStr(json); taskOriginalMessage.setSource(xml); taskOriginalMessage.setState(StateEnum.OK); diff --git a/common-collect/src/main/java/com/docus/server/collect/web/job/TrackRetryJob.java b/common-collect/src/main/java/com/docus/server/collect/web/job/TrackRetryJob.java new file mode 100644 index 0000000..081170e --- /dev/null +++ b/common-collect/src/main/java/com/docus/server/collect/web/job/TrackRetryJob.java @@ -0,0 +1,49 @@ +package com.docus.server.collect.web.job; + +import com.docus.core.util.json.JSON; +import com.docus.log.executor.TrackRetrySpringExecutor; +import com.docus.log.handler.IJobHandler; +import com.docus.log.handler.MethodJobHandler; +import com.docus.server.collect.web.common.entity.TaskOriginalMessage; +import com.docus.server.collect.web.service.ITaskOriginalMessageService; +import com.docus.server.common.enums.StateEnum; +import com.xxl.job.core.handler.annotation.XxlJob; +import lombok.extern.slf4j.Slf4j; +import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Retryable; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.List; + +/** + * 重试所有被动消息 job + */ +@Component +@Slf4j +public class TrackRetryJob { + + @Resource + private ITaskOriginalMessageService messageService; + + @XxlJob("trackRetryJob") + public void retry() throws Exception { + List messages = messageService.findBy("state", StateEnum.FAIL); + + for (TaskOriginalMessage message : messages) { + doRetry(message); + } + } + + @Async("recordMessage") + @Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5)) + public void doRetry(TaskOriginalMessage message) throws Exception { + IJobHandler jobHandler = TrackRetrySpringExecutor.loadJobHandler(message.getRetryKey()); + String name = ((MethodJobHandler) jobHandler).getMethod().getParameterTypes()[0].getName(); + Object o = JSON.fromJSON(message.getJsonStr(), Class.forName(name)); + jobHandler.execute(o); + + //to write retry log + } +} diff --git a/common-collect/src/main/java/com/docus/server/collect/web/process/JobProcessor.java b/common-collect/src/main/java/com/docus/server/collect/web/process/JobProcessor.java deleted file mode 100644 index 9ef3a5e..0000000 --- a/common-collect/src/main/java/com/docus/server/collect/web/process/JobProcessor.java +++ /dev/null @@ -1,91 +0,0 @@ -package com.docus.server.collect.web.process; - -import com.docus.core.util.Func; -import com.docus.core.util.json.JSON; -import com.docus.log.context.TrackContext; -import com.docus.log.processor.AbstractProcessor; -import com.docus.server.collect.IConverter; -import com.docus.server.collect.IResult; -import com.docus.server.collect.web.enums.CollectTypeEnum; -import com.docus.server.collect.web.service.ITaskOriginalMessageService; -import com.docus.server.common.enums.IIntegerEnum; -import com.docus.server.common.enums.StateEnum; -import com.docus.server.common.util.SpringUtils; -import com.fasterxml.jackson.core.type.TypeReference; -import com.xxl.job.core.context.XxlJobHelper; -import lombok.extern.slf4j.Slf4j; - -import java.util.Map; - -/** - * @author linruifeng - */ -@Slf4j -public class JobProcessor extends AbstractProcessor { - private ITaskOriginalMessageService messageService; - private IConverter converter; - private IResult result; - - /** - * 前置通知 - */ - @Override - public Object beforeProcess(TrackContext context) { - super.beforeProcess(context); - initBeans(context.getBeanNames()); - String message = XxlJobHelper.getJobParam(); - context.setArgs(new Object[]{message}); - if (Func.isEmpty(message)) { - throw new RuntimeException("参数为空"); - } -// String jsonStr = JSON.toJSON(converter.handle(message, context.getGroup())); - Long taskId = messageService.insertTaskOriginalMessage("", message, IIntegerEnum.fromDisplay(CollectTypeEnum.class, context.getGroup())); - Map params = context.getParams(); - params.put("taskId", taskId); -// params.put("jsonStr", jsonStr); - params.put("group", context.getGroup()); - return null; - } - - /** - * 后置和异常通知 - */ - @Override - public Object doProcess(TrackContext context) { - Map params = context.getParams(); - try { - Long taskId = (Long) params.get("taskId"); - String afterReturnResult = (String) params.get("jsonStr"); - params.put("msg", context.getExMessageResult()); - - if (Func.isNotEmpty(afterReturnResult)) { - params.putAll(JSON.fromJSONWithGeneric(afterReturnResult, new TypeReference>() { - })); - } - - if (!context.isError()) { - log.info("=== AOP 后置通知 ==="); - params.put("msg", "操作成功!"); - messageService.updateTaskOriginalMessage(taskId, afterReturnResult, context.getExMessageResult(), StateEnum.OK); - return result.ok(params); - } else { - log.info("=== AOP 异常通知 ==="); - log.error((String) params.get(context.getExMessageResult())); - messageService.updateTaskOriginalMessage(taskId, afterReturnResult, context.getExMessageResult(), StateEnum.FAIL); - return result.fail(params); - } - } catch (Exception e) { - log.error(e.getMessage(), e); - return result.fail(params); - } - } - - /** - * 初始化bean - */ - private void initBeans(String[] beanNames) { -// this.converter = (IConverter) SpringUtils.getBean(beanNames[0]); -// this.result = (IResult) SpringUtils.getBean(beanNames[1]); - this.messageService = SpringUtils.getBean(ITaskOriginalMessageService.class); - } -} diff --git a/common-collect/src/main/java/com/docus/server/collect/web/process/RetryProcessor.java b/common-collect/src/main/java/com/docus/server/collect/web/process/RetryProcessor.java new file mode 100644 index 0000000..4b42011 --- /dev/null +++ b/common-collect/src/main/java/com/docus/server/collect/web/process/RetryProcessor.java @@ -0,0 +1,24 @@ +package com.docus.server.collect.web.process; + +import com.docus.log.context.TrackContext; +import com.docus.log.processor.AbstractProcessor; +import com.docus.server.collect.web.service.ITaskOriginalMessageService; +import lombok.extern.slf4j.Slf4j; + +import javax.annotation.Resource; + +/** + * 重试处理器 + * + * @author linruifeng + */ +@Slf4j +public class RetryProcessor extends AbstractProcessor { + @Resource + private ITaskOriginalMessageService messageService; + + @Override + public Object doProcess(TrackContext context) { + return null; + } +} diff --git a/common-collect/src/main/java/com/docus/server/collect/web/process/VisitorProcessor.java b/common-collect/src/main/java/com/docus/server/collect/web/process/VisitorProcessor.java index 06db14c..52eb2cb 100644 --- a/common-collect/src/main/java/com/docus/server/collect/web/process/VisitorProcessor.java +++ b/common-collect/src/main/java/com/docus/server/collect/web/process/VisitorProcessor.java @@ -33,10 +33,11 @@ public class VisitorProcessor extends AbstractProcessor { super.beforeProcess(context); initBeans(context.getBeanNames()); String message = (String) context.getArgs()[0]; + String retryKey = context.getRetryKey(); if (Func.isEmpty(message)) { throw new RuntimeException("参数为空"); } - Long taskId = messageService.insertTaskOriginalMessage("", message, IIntegerEnum.fromDisplay(CollectTypeEnum.class, context.getGroup())); + Long taskId = messageService.insertTaskOriginalMessage("", message, IIntegerEnum.fromDisplay(CollectTypeEnum.class, context.getGroup()), retryKey); String jsonStr = JSON.toJSON(converter.convert(message, context.getGroup())); messageService.updateTaskOriginalMessage(taskId, jsonStr, context.getExMessageResult(), StateEnum.OK); Map params = context.getParams(); diff --git a/common-collect/src/main/java/com/docus/server/collect/web/service/CollectService.java b/common-collect/src/main/java/com/docus/server/collect/web/service/CollectService.java index 8d5ec49..4b4021d 100644 --- a/common-collect/src/main/java/com/docus/server/collect/web/service/CollectService.java +++ b/common-collect/src/main/java/com/docus/server/collect/web/service/CollectService.java @@ -1,6 +1,7 @@ package com.docus.server.collect.web.service; import com.docus.core.util.Func; +import com.docus.log.annotation.TrackRetry; import com.docus.server.record.common.pojo.dto.TBasicDTO; import com.docus.server.record.service.ITBasicService; import com.docus.server.sys.common.pojo.dto.DeptDTO; @@ -31,6 +32,7 @@ public class CollectService { /*@Async("recordMessage") @Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5))*/ @Transactional(rollbackFor = Exception.class) + @TrackRetry("dept") public void insertOrUpdateDept(DeptDTO deptDTO) { //异步写入归档系统,失败自动重试。 if (checkType(deptDTO.getOperateType(), DEL_TYPE)) { @@ -46,6 +48,7 @@ public class CollectService { * @param userDTO */ @Transactional(rollbackFor = Exception.class) + @TrackRetry("user") public void insertOrUpdateUser(UserDTO userDTO) { // 判断操作类型 if (checkType(userDTO.getOperateType(), DEL_TYPE)) { @@ -60,6 +63,7 @@ public class CollectService { * * @param tBasicDTO */ + @TrackRetry("basic") @Transactional(rollbackFor = Exception.class) public void insertOrUpdateBasic(TBasicDTO tBasicDTO) { Integer num = tBasicService.findByJzh(tBasicDTO.getJzh()); diff --git a/common-collect/src/main/java/com/docus/server/collect/web/service/ITaskOriginalMessageService.java b/common-collect/src/main/java/com/docus/server/collect/web/service/ITaskOriginalMessageService.java index cbd3833..7d8230b 100644 --- a/common-collect/src/main/java/com/docus/server/collect/web/service/ITaskOriginalMessageService.java +++ b/common-collect/src/main/java/com/docus/server/collect/web/service/ITaskOriginalMessageService.java @@ -1,11 +1,13 @@ package com.docus.server.collect.web.service; +import com.docus.server.collect.web.common.entity.TaskOriginalMessage; import com.docus.server.collect.web.enums.CollectTypeEnum; import com.docus.server.common.enums.StateEnum; +import com.docus.server.common.service.IBaseService; -public interface ITaskOriginalMessageService { +public interface ITaskOriginalMessageService extends IBaseService { - Long insertTaskOriginalMessage(String json, String xml, CollectTypeEnum collectType); + Long insertTaskOriginalMessage(String json, String xml, CollectTypeEnum collectType, String retryKey); void updateTaskOriginalMessage(Long id, String json, String exMessageResult, StateEnum stateEnum); } diff --git a/common-collect/src/main/java/com/docus/server/collect/web/service/impl/TaskOriginalMessageServiceImpl.java b/common-collect/src/main/java/com/docus/server/collect/web/service/impl/TaskOriginalMessageServiceImpl.java index 097fc8c..b0138c8 100644 --- a/common-collect/src/main/java/com/docus/server/collect/web/service/impl/TaskOriginalMessageServiceImpl.java +++ b/common-collect/src/main/java/com/docus/server/collect/web/service/impl/TaskOriginalMessageServiceImpl.java @@ -1,21 +1,21 @@ package com.docus.server.collect.web.service.impl; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.docus.core.util.DateUtil; import com.docus.core.util.Func; -import com.docus.server.collect.web.convert.TaskOriginalMessageConverter; import com.docus.server.collect.web.common.entity.TaskOriginalMessage; +import com.docus.server.collect.web.convert.TaskOriginalMessageConverter; import com.docus.server.collect.web.enums.CollectTypeEnum; import com.docus.server.collect.web.mapper.TaskOriginalMessageMapper; import com.docus.server.collect.web.service.ITaskOriginalMessageService; import com.docus.server.common.enums.StateEnum; +import com.docus.server.common.service.impl.BaseServiceImpl; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; @Service -public class TaskOriginalMessageServiceImpl extends ServiceImpl implements ITaskOriginalMessageService { +public class TaskOriginalMessageServiceImpl extends BaseServiceImpl implements ITaskOriginalMessageService { @Resource private TaskOriginalMessageConverter converter; @@ -24,8 +24,8 @@ public class TaskOriginalMessageServiceImpl extends ServiceImpl 500 ? exMessageResult.substring(0, 500) : exMessageResult); + if (Func.notNull(exMessageResult)) { + taskOriginalMessage.setErrorMsg(exMessageResult.length() > 500 ? exMessageResult.substring(0, 500) : exMessageResult); + } super.updateById(taskOriginalMessage); } } diff --git a/docus-api-common/src/main/java/com/docus/server/common/service/IBaseService.java b/docus-api-common/src/main/java/com/docus/server/common/service/IBaseService.java index 2c5ccee..d15892b 100644 --- a/docus-api-common/src/main/java/com/docus/server/common/service/IBaseService.java +++ b/docus-api-common/src/main/java/com/docus/server/common/service/IBaseService.java @@ -4,14 +4,15 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.IService; import com.docus.server.common.service.impl.Sort; +import java.io.Serializable; import java.util.Collection; import java.util.List; public interface IBaseService extends IService { - T findById(String id); + T findById(Serializable id); - List findByIds(Collection ids); + List findByIds(Collection ids); List findBy(String propertyName, Object propertyValue); @@ -41,5 +42,5 @@ public interface IBaseService extends IService { List findAllActive(Sort sort); - int deleteByIdList(List idList); + int deleteByIdList(List idList); } diff --git a/docus-api-common/src/main/java/com/docus/server/common/service/impl/BaseServiceImpl.java b/docus-api-common/src/main/java/com/docus/server/common/service/impl/BaseServiceImpl.java index 4b18f2c..aabcd02 100644 --- a/docus-api-common/src/main/java/com/docus/server/common/service/impl/BaseServiceImpl.java +++ b/docus-api-common/src/main/java/com/docus/server/common/service/impl/BaseServiceImpl.java @@ -10,17 +10,14 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.docus.server.common.service.IBaseService; +import java.io.Serializable; import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; public abstract class BaseServiceImpl, T> extends ServiceImpl implements IBaseService { @Override - public T findById(String id) { + public T findById(Serializable id) { if (id == null) { throw new RuntimeException("param id is required"); } @@ -28,7 +25,7 @@ public abstract class BaseServiceImpl, T> extends Servic } @Override - public List findByIds(Collection ids) { + public List findByIds(Collection ids) { if (ids == null || ids.size() == 0) { return new ArrayList<>(); } @@ -164,7 +161,7 @@ public abstract class BaseServiceImpl, T> extends Servic } @Override - public int deleteByIdList(List idList) { + public int deleteByIdList(List idList) { if (idList == null || idList.size() == 0) { return 0; } diff --git a/docus-api-common/src/main/resources/application.properties b/docus-api-common/src/main/resources/application.properties index d58204b..e69de29 100644 --- a/docus-api-common/src/main/resources/application.properties +++ b/docus-api-common/src/main/resources/application.properties @@ -1 +0,0 @@ -mybatis-plus.typeEnumsPackage=com.docus.server.common.enums; \ No newline at end of file