From ffb739920233a9b3b0bfa89e2ea1f5ae4f9ee865 Mon Sep 17 00:00:00 2001 From: wyb <1977763549@qq.com> Date: Thu, 3 Jul 2025 09:57:30 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E8=82=87=E5=BA=86=E6=8F=90=E4=BA=A4?= =?UTF-8?q?=EF=BC=8C=E4=BB=BB=E5=8A=A1=E5=BB=B6=E8=BF=9F=E7=94=9F=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- data-config/collect-task-config.json | 1 + .../impl/ZqBusinessServiceImpl.java | 60 +++++++++- .../message/config/CollectTaskConfig.java | 1 + .../server/message/config/JobAdminConfig.java | 59 ++++++++++ .../message/entity/DelayedMessages.java | 58 +++++++++ .../docus/server/message/enums/TaskWait.java | 9 ++ .../message/mapper/DelayedMessagesMapper.java | 25 ++++ .../message/scheduler/JobScheduler.java | 26 +++++ .../thread/DelayTaskIssuanceHelper.java | 110 ++++++++++++++++++ .../mapper/DelayedMessagesMapper.xml | 56 +++++++++ 10 files changed, 403 insertions(+), 2 deletions(-) create mode 100644 src/main/java/com/docus/server/message/config/JobAdminConfig.java create mode 100644 src/main/java/com/docus/server/message/entity/DelayedMessages.java create mode 100644 src/main/java/com/docus/server/message/enums/TaskWait.java create mode 100644 src/main/java/com/docus/server/message/mapper/DelayedMessagesMapper.java create mode 100644 src/main/java/com/docus/server/message/scheduler/JobScheduler.java create mode 100644 src/main/java/com/docus/server/message/thread/DelayTaskIssuanceHelper.java create mode 100644 src/main/resources/mapper/DelayedMessagesMapper.xml diff --git a/data-config/collect-task-config.json b/data-config/collect-task-config.json index 4ea4c6a..fc715ee 100644 --- a/data-config/collect-task-config.json +++ b/data-config/collect-task-config.json @@ -1,5 +1,6 @@ [{ "method": "WS_RECORD_SUBMIT", + "delaySeconds": 60, "collectorIds": ["2", "3", "4", "5", "6", "8"] }, { diff --git a/src/main/java/com/docus/server/message/busservice/impl/ZqBusinessServiceImpl.java b/src/main/java/com/docus/server/message/busservice/impl/ZqBusinessServiceImpl.java index 1393484..4ea44c5 100644 --- a/src/main/java/com/docus/server/message/busservice/impl/ZqBusinessServiceImpl.java +++ b/src/main/java/com/docus/server/message/busservice/impl/ZqBusinessServiceImpl.java @@ -3,18 +3,26 @@ package com.docus.server.message.busservice.impl; import com.docus.core.util.DateUtil; import com.docus.core.util.Func; import com.docus.infrastructure.core.exception.BaseException; +import com.docus.infrastructure.redis.service.IdService; import com.docus.infrastructure.web.api.CommonResult; import com.docus.infrastructure.web.api.ResultCode; import com.docus.server.message.busservice.ZqBusinessService; import com.docus.server.message.config.CollectTaskConfig; import com.docus.server.message.converters.ZqWsNurseSubmitMessageConvert; import com.docus.server.message.converters.ZqWsRecordSubmitMessageConvert; -import com.docus.server.message.dto.*; +import com.docus.server.message.dto.Message; +import com.docus.server.message.dto.MessageResponse; +import com.docus.server.message.dto.TbasicQrPreciseCondition; +import com.docus.server.message.dto.ZqRecordSubmitDTO; +import com.docus.server.message.dto.ZqWsNurseSubmitDTO; +import com.docus.server.message.entity.DelayedMessages; +import com.docus.server.message.enums.TaskWait; import com.docus.server.message.feign.dto.CompensateTasRequest; import com.docus.server.message.feign.dto.HospitalSubmitNodeLogAddDTO; import com.docus.server.message.feign.enums.HospitalSubmitNodeEnum; import com.docus.server.message.feign.service.CollectTaskService; import com.docus.server.message.feign.service.HospitalSubmitNodeServiceApi; +import com.docus.server.message.mapper.DelayedMessagesMapper; import com.docus.server.message.mapper.TBasicMapper; import com.docus.server.message.validate.ZqRecordSubmitValidate; import com.docus.server.message.validate.ZqWsNurseSubmitValidate; @@ -22,7 +30,10 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import java.time.LocalDateTime; +import java.util.Date; import java.util.List; +import java.util.Objects; /** * 肇庆业务实现类 @@ -43,6 +54,12 @@ public class ZqBusinessServiceImpl implements ZqBusinessService { @Resource private CollectTaskService collectTaskService; + @Resource + private DelayedMessagesMapper delayedMessagesMapper; + + @Resource + private IdService idService; + @Override public MessageResponse recordSubmitHandle(Message message) { try { @@ -111,7 +128,15 @@ public class ZqBusinessServiceImpl implements ZqBusinessService { hospitalSubmitNodeServiceApi.addLog(nodeLogAddDTO); tBasicMapper.insertOrUpdateNurseSubmitTime(patientIds.get(0), Func.parseDate(submitDate, DateUtil.PATTERN_DATETIME)); - compensateTask("护理提交",patientIds, taskConfig.getCollectorIds()); + + int delaySeconds = taskConfig.getDelaySeconds(); + List collectorIds = taskConfig.getCollectorIds(); + if (delaySeconds > 0) { + addDelayTask(patientIds.get(0), collectorIds, delaySeconds); + } else { + compensateTask("护理提交", patientIds, taskConfig.getCollectorIds()); + } + return new MessageResponse(ResultCode.SUCCESS.getCode(), success()); } catch (BaseException baseException) { log.error(baseException.getMessage(), baseException); @@ -128,6 +153,37 @@ public class ZqBusinessServiceImpl implements ZqBusinessService { } } + private void addDelayTask(String patientId, List collectorIds, int delaySeconds) { + LocalDateTime nextExecuteDateTime = LocalDateTime.now().plusSeconds(delaySeconds); + String formatNextExecuteDateTime = Func.formatDateTime(nextExecuteDateTime); + Date nextExecuteDate = Func.parseDate(formatNextExecuteDateTime, DateUtil.PATTERN_DATETIME); + + for (String collectorId : collectorIds) { + String key = patientId + "-" + collectorId; + DelayedMessages messageCondition = new DelayedMessages(); + messageCondition.setKeyword(key); + messageCondition.setMessageType(TaskWait.SUBMIT_TASK_WAIT_DELAY_MSG_TYPE); + DelayedMessages delayedMessages = delayedMessagesMapper.findMessage(messageCondition); + if (Objects.isNull(delayedMessages)) { + delayedMessages = new DelayedMessages(); + delayedMessages.setId(idService.getDateSeq()); + delayedMessages.setMessageType(TaskWait.SUBMIT_TASK_WAIT_DELAY_MSG_TYPE); + delayedMessages.setKeyword(key); + delayedMessages.setMessage(key); + delayedMessages.setDelaySeconds(delaySeconds); + delayedMessages.setNextExecutionTime(nextExecuteDate); + delayedMessages.setCreateTime(new Date()); + delayedMessages.setRetries(0); + delayedMessages.setErrorMessage(""); + delayedMessagesMapper.insert(delayedMessages); + } else { + delayedMessages.setDelaySeconds(delaySeconds); + delayedMessages.setNextExecutionTime(nextExecuteDate); + delayedMessagesMapper.updateById(delayedMessages); + } + } + } + private void verifyPatientIds(List patientIds) { if (Func.isEmpty(patientIds)) { throw new BaseException("系统无此患者!"); diff --git a/src/main/java/com/docus/server/message/config/CollectTaskConfig.java b/src/main/java/com/docus/server/message/config/CollectTaskConfig.java index 5bc19a1..805f52f 100644 --- a/src/main/java/com/docus/server/message/config/CollectTaskConfig.java +++ b/src/main/java/com/docus/server/message/config/CollectTaskConfig.java @@ -38,6 +38,7 @@ public class CollectTaskConfig { public static class TaskConfig { private String method; private List collectorIds; + private int delaySeconds; private List babyCollectorIds; private String haveBabyQuery; } diff --git a/src/main/java/com/docus/server/message/config/JobAdminConfig.java b/src/main/java/com/docus/server/message/config/JobAdminConfig.java new file mode 100644 index 0000000..05cb68b --- /dev/null +++ b/src/main/java/com/docus/server/message/config/JobAdminConfig.java @@ -0,0 +1,59 @@ +package com.docus.server.message.config; + + +import com.docus.server.message.feign.service.CollectTaskService; +import com.docus.server.message.mapper.DelayedMessagesMapper; +import com.docus.server.message.scheduler.JobScheduler; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * job执行需要的一些必须得service,dao,mapper和配置等管理,从这里获取,保证依赖都有注入 + * + * @author YongBin Wen + * @date 2024/1/23 14:13 + */ +@Component +public class JobAdminConfig implements InitializingBean, DisposableBean { + + private static JobAdminConfig jobAdminConfig = null; + + public static JobAdminConfig getJobAdminConfig() { + return jobAdminConfig; + } + + // ---------------------- JobScheduler ---------------------- + + private JobScheduler xxlJobScheduler; + + @Override + public void afterPropertiesSet() throws Exception { + jobAdminConfig = this; + + xxlJobScheduler = new JobScheduler(); + xxlJobScheduler.init(); + } + + @Override + public void destroy() throws Exception { + xxlJobScheduler.destroy(); + } + + // dao service + + @Resource + private DelayedMessagesMapper delayedMessagesMapper; + + @Resource + private CollectTaskService collectTaskService; + + public DelayedMessagesMapper getDelayedMessagesMapper() { + return delayedMessagesMapper; + } + public CollectTaskService getCollectTaskService() { + return collectTaskService; + } +} diff --git a/src/main/java/com/docus/server/message/entity/DelayedMessages.java b/src/main/java/com/docus/server/message/entity/DelayedMessages.java new file mode 100644 index 0000000..9628914 --- /dev/null +++ b/src/main/java/com/docus/server/message/entity/DelayedMessages.java @@ -0,0 +1,58 @@ +package com.docus.server.message.entity; + +import lombok.Data; + +import java.util.Date; + +/** + * 延迟消息类 + * @author wyb + */ +@Data +public class DelayedMessages { + /** + * 消息ID + */ + private Long id; + + /** + * 消息类型 + */ + private String messageType; + + /** + * 消息检索关键词 + */ + private String keyword; + + /** + * 消息内容 + */ + private String message; + + /** + * 下次执行时间 + */ + private Date nextExecutionTime; + + /** + * 延迟时间(秒) + */ + private Integer delaySeconds; + + /** + * 创建时间 + */ + private Date createTime; + + /** + * 已重试次数 + */ + private Integer retries; + + /** + * 错误信息 + */ + private String errorMessage; + +} \ No newline at end of file diff --git a/src/main/java/com/docus/server/message/enums/TaskWait.java b/src/main/java/com/docus/server/message/enums/TaskWait.java new file mode 100644 index 0000000..7bb1e4c --- /dev/null +++ b/src/main/java/com/docus/server/message/enums/TaskWait.java @@ -0,0 +1,9 @@ +package com.docus.server.message.enums; + +/** + * @author YongBin Wen + * @date 2025/6/3 0003 15:44 + */ +public interface TaskWait { + String SUBMIT_TASK_WAIT_DELAY_MSG_TYPE = "SUBMIT_TASK_WAIT_DELAY_MSG_TYPE"; +} diff --git a/src/main/java/com/docus/server/message/mapper/DelayedMessagesMapper.java b/src/main/java/com/docus/server/message/mapper/DelayedMessagesMapper.java new file mode 100644 index 0000000..ae6ba7c --- /dev/null +++ b/src/main/java/com/docus/server/message/mapper/DelayedMessagesMapper.java @@ -0,0 +1,25 @@ +package com.docus.server.message.mapper; + +import com.docus.server.message.entity.DelayedMessages; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; + +import java.util.List; + +/** + * 延迟消息 mapper + * @author wyb + */ +@Mapper +public interface DelayedMessagesMapper { + + int delById(@Param("id") Long id); + + int updateById(@Param("message") DelayedMessages message); + + int insert(@Param("message") DelayedMessages message); + + List findExecutableMessages(@Param("condition") DelayedMessages condition,@Param("currentDateTime") String formatCurrentDateTime); + + DelayedMessages findMessage(@Param("condition") DelayedMessages condition); +} diff --git a/src/main/java/com/docus/server/message/scheduler/JobScheduler.java b/src/main/java/com/docus/server/message/scheduler/JobScheduler.java new file mode 100644 index 0000000..49e44c6 --- /dev/null +++ b/src/main/java/com/docus/server/message/scheduler/JobScheduler.java @@ -0,0 +1,26 @@ +package com.docus.server.message.scheduler; + +import com.docus.server.message.thread.DelayTaskIssuanceHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author YongBin Wen + * @date 2024/1/23 14:04 + */ + +public class JobScheduler { + + private static final Logger logger = LoggerFactory.getLogger(JobScheduler.class); + + public void init() throws Exception { + DelayTaskIssuanceHelper.getInstance().start(); + + logger.info(">>>>>>>>> init job admin success."); + } + + + public void destroy() throws Exception { + DelayTaskIssuanceHelper.getInstance().toStop(); + } +} diff --git a/src/main/java/com/docus/server/message/thread/DelayTaskIssuanceHelper.java b/src/main/java/com/docus/server/message/thread/DelayTaskIssuanceHelper.java new file mode 100644 index 0000000..84a2599 --- /dev/null +++ b/src/main/java/com/docus/server/message/thread/DelayTaskIssuanceHelper.java @@ -0,0 +1,110 @@ +package com.docus.server.message.thread; + +import com.docus.core.util.DateUtil; +import com.docus.core.util.Func; +import com.docus.infrastructure.web.api.CommonResult; +import com.docus.server.message.config.JobAdminConfig; +import com.docus.server.message.entity.DelayedMessages; +import com.docus.server.message.enums.TaskWait; +import com.docus.server.message.feign.dto.CompensateTasRequest; +import com.docus.server.message.feign.service.CollectTaskService; +import com.docus.server.message.mapper.DelayedMessagesMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.LocalDateTime; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * @author YongBin Wen + * @date 2024/1/23 10:59 + */ + +public class DelayTaskIssuanceHelper { + private static Logger logger = LoggerFactory.getLogger(DelayTaskIssuanceHelper.class); + private static DelayTaskIssuanceHelper instance = new DelayTaskIssuanceHelper(); + + public static DelayTaskIssuanceHelper getInstance() { + return instance; + } + + private Thread delayTaskIssuanceThread; + private volatile boolean toStop = false; + + public void start() { + delayTaskIssuanceThread = new Thread(() -> { + final String messageType = TaskWait.SUBMIT_TASK_WAIT_DELAY_MSG_TYPE; + DelayedMessages condition = new DelayedMessages(); + condition.setMessageType(messageType); + DelayedMessagesMapper delayedMessagesMapper = JobAdminConfig.getJobAdminConfig().getDelayedMessagesMapper(); + CollectTaskService collectTaskService = JobAdminConfig.getJobAdminConfig().getCollectTaskService(); + while (!toStop) { + try { + LocalDateTime currentDateTime = LocalDateTime.now(); + String formatCurrentDateTime = Func.formatDateTime(currentDateTime); + // 数据量暂时不大,使用 + List delayedMessages = delayedMessagesMapper.findExecutableMessages(condition, formatCurrentDateTime); + if (Func.isNotEmpty(delayedMessages)) { + for (DelayedMessages message : delayedMessages) { + try { + String messageContent = message.getMessage(); + String[] split = messageContent.split("-"); + String patientId = split[0]; + String collectorId = split[1]; + List patientIds = Collections.singletonList(patientId); + List collectorIds = Collections.singletonList(collectorId); + + logger.info("提交延迟,补偿任务 patientIds:{} collectId:{}", patientIds, collectorIds); + CompensateTasRequest dto = new CompensateTasRequest(); + dto.setPatientIds(patientIds); + dto.setCollectorIds(collectorIds); + dto.setPriority(3); + CommonResult result = collectTaskService.compensateTask(dto); + logger.info("提交延迟,补偿任务结果:{}",Func.toJson(result)); + delayedMessagesMapper.delById(message.getId()); + } catch (Exception ex) { + if (!toStop) { + logger.error(">>>>>>>>>>> Delay Task Issuance thread error:" + ex.getMessage() + " ,delayMessage:" + Func.toJson(message), ex); + int retries = message.getRetries() + 1; + Integer delaySeconds = message.getDelaySeconds(); + LocalDateTime nexExecuteLocalDateTime = LocalDateTime.now().plusSeconds(delaySeconds); + String dateTime = Func.formatDateTime(nexExecuteLocalDateTime); + Date nexExecuteTime = Func.parseDate(dateTime, DateUtil.PATTERN_DATETIME); + + message.setRetries(retries); + message.setErrorMessage(ex.getMessage()); + message.setNextExecutionTime(nexExecuteTime); + delayedMessagesMapper.updateById(message); + } + } + } + } + + TimeUnit.SECONDS.sleep(10); + } catch (Exception ex) { + if (!toStop) { + logger.error(">>>>>>>>>>> Delay Task Issuance thread error:" + ex.getMessage(), ex); + } + } + + } + logger.info(">>>>>>>>>>> Delay Task Issuance thread stop"); + }); + delayTaskIssuanceThread.setDaemon(true); + delayTaskIssuanceThread.setName("delayTaskIssuanceHelper"); + delayTaskIssuanceThread.start(); + } + + public void toStop() { + toStop = false; + delayTaskIssuanceThread.interrupt(); + try { + delayTaskIssuanceThread.join(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } +} diff --git a/src/main/resources/mapper/DelayedMessagesMapper.xml b/src/main/resources/mapper/DelayedMessagesMapper.xml new file mode 100644 index 0000000..8e01f02 --- /dev/null +++ b/src/main/resources/mapper/DelayedMessagesMapper.xml @@ -0,0 +1,56 @@ + + + + + INSERT INTO + `docus_medicalrecord`.`delayed_messages` (`id`, `message_type`, `keyword`, `message`, `next_execution_time`, `delay_seconds`, `create_time`, `retries`, `error_message`) + VALUES (#{message.id}, #{message.messageType}, #{message.keyword}, #{message.message}, #{message.nextExecutionTime}, #{message.delaySeconds}, #{message.createTime}, 0, #{message.errorMessage}) + + + UPDATE `docus_medicalrecord`.`delayed_messages` + SET + `message_type` = #{message.messageType}, + `keyword` = #{message.keyword}, + `message` = #{message.message}, + `next_execution_time` = #{message.nextExecutionTime}, + `delay_seconds` = #{message.delaySeconds}, + `create_time` = #{message.createTime}, + `retries` = #{message.retries}, + `error_message` = #{message.errorMessage} + WHERE `id` = #{message.id} + + + + DELETE FROM `docus_medicalrecord`.`delayed_messages` WHERE `id`=#{id} + + + +