feat:肇庆提交,任务延迟生成

master
wyb 3 weeks ago
parent 4bc453269f
commit ffb7399202

@ -1,5 +1,6 @@
[{
"method": "WS_RECORD_SUBMIT",
"delaySeconds": 60,
"collectorIds": ["2", "3", "4", "5", "6", "8"]
},
{

@ -3,18 +3,26 @@ package com.docus.server.message.busservice.impl;
import com.docus.core.util.DateUtil;
import com.docus.core.util.Func;
import com.docus.infrastructure.core.exception.BaseException;
import com.docus.infrastructure.redis.service.IdService;
import com.docus.infrastructure.web.api.CommonResult;
import com.docus.infrastructure.web.api.ResultCode;
import com.docus.server.message.busservice.ZqBusinessService;
import com.docus.server.message.config.CollectTaskConfig;
import com.docus.server.message.converters.ZqWsNurseSubmitMessageConvert;
import com.docus.server.message.converters.ZqWsRecordSubmitMessageConvert;
import com.docus.server.message.dto.*;
import com.docus.server.message.dto.Message;
import com.docus.server.message.dto.MessageResponse;
import com.docus.server.message.dto.TbasicQrPreciseCondition;
import com.docus.server.message.dto.ZqRecordSubmitDTO;
import com.docus.server.message.dto.ZqWsNurseSubmitDTO;
import com.docus.server.message.entity.DelayedMessages;
import com.docus.server.message.enums.TaskWait;
import com.docus.server.message.feign.dto.CompensateTasRequest;
import com.docus.server.message.feign.dto.HospitalSubmitNodeLogAddDTO;
import com.docus.server.message.feign.enums.HospitalSubmitNodeEnum;
import com.docus.server.message.feign.service.CollectTaskService;
import com.docus.server.message.feign.service.HospitalSubmitNodeServiceApi;
import com.docus.server.message.mapper.DelayedMessagesMapper;
import com.docus.server.message.mapper.TBasicMapper;
import com.docus.server.message.validate.ZqRecordSubmitValidate;
import com.docus.server.message.validate.ZqWsNurseSubmitValidate;
@ -22,7 +30,10 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.List;
import java.util.Objects;
/**
*
@ -43,6 +54,12 @@ public class ZqBusinessServiceImpl implements ZqBusinessService {
@Resource
private CollectTaskService collectTaskService;
@Resource
private DelayedMessagesMapper delayedMessagesMapper;
@Resource
private IdService idService;
@Override
public MessageResponse recordSubmitHandle(Message message) {
try {
@ -111,7 +128,15 @@ public class ZqBusinessServiceImpl implements ZqBusinessService {
hospitalSubmitNodeServiceApi.addLog(nodeLogAddDTO);
tBasicMapper.insertOrUpdateNurseSubmitTime(patientIds.get(0), Func.parseDate(submitDate, DateUtil.PATTERN_DATETIME));
compensateTask("护理提交",patientIds, taskConfig.getCollectorIds());
int delaySeconds = taskConfig.getDelaySeconds();
List<String> collectorIds = taskConfig.getCollectorIds();
if (delaySeconds > 0) {
addDelayTask(patientIds.get(0), collectorIds, delaySeconds);
} else {
compensateTask("护理提交", patientIds, taskConfig.getCollectorIds());
}
return new MessageResponse(ResultCode.SUCCESS.getCode(), success());
} catch (BaseException baseException) {
log.error(baseException.getMessage(), baseException);
@ -128,6 +153,37 @@ public class ZqBusinessServiceImpl implements ZqBusinessService {
}
}
private void addDelayTask(String patientId, List<String> collectorIds, int delaySeconds) {
LocalDateTime nextExecuteDateTime = LocalDateTime.now().plusSeconds(delaySeconds);
String formatNextExecuteDateTime = Func.formatDateTime(nextExecuteDateTime);
Date nextExecuteDate = Func.parseDate(formatNextExecuteDateTime, DateUtil.PATTERN_DATETIME);
for (String collectorId : collectorIds) {
String key = patientId + "-" + collectorId;
DelayedMessages messageCondition = new DelayedMessages();
messageCondition.setKeyword(key);
messageCondition.setMessageType(TaskWait.SUBMIT_TASK_WAIT_DELAY_MSG_TYPE);
DelayedMessages delayedMessages = delayedMessagesMapper.findMessage(messageCondition);
if (Objects.isNull(delayedMessages)) {
delayedMessages = new DelayedMessages();
delayedMessages.setId(idService.getDateSeq());
delayedMessages.setMessageType(TaskWait.SUBMIT_TASK_WAIT_DELAY_MSG_TYPE);
delayedMessages.setKeyword(key);
delayedMessages.setMessage(key);
delayedMessages.setDelaySeconds(delaySeconds);
delayedMessages.setNextExecutionTime(nextExecuteDate);
delayedMessages.setCreateTime(new Date());
delayedMessages.setRetries(0);
delayedMessages.setErrorMessage("");
delayedMessagesMapper.insert(delayedMessages);
} else {
delayedMessages.setDelaySeconds(delaySeconds);
delayedMessages.setNextExecutionTime(nextExecuteDate);
delayedMessagesMapper.updateById(delayedMessages);
}
}
}
private void verifyPatientIds(List<String> patientIds) {
if (Func.isEmpty(patientIds)) {
throw new BaseException("系统无此患者!");

@ -38,6 +38,7 @@ public class CollectTaskConfig {
public static class TaskConfig {
private String method;
private List<String> collectorIds;
private int delaySeconds;
private List<String> babyCollectorIds;
private String haveBabyQuery;
}

@ -0,0 +1,59 @@
package com.docus.server.message.config;
import com.docus.server.message.feign.service.CollectTaskService;
import com.docus.server.message.mapper.DelayedMessagesMapper;
import com.docus.server.message.scheduler.JobScheduler;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* jobservicedaomapper
*
* @author YongBin Wen
* @date 2024/1/23 14:13
*/
@Component
public class JobAdminConfig implements InitializingBean, DisposableBean {
private static JobAdminConfig jobAdminConfig = null;
public static JobAdminConfig getJobAdminConfig() {
return jobAdminConfig;
}
// ---------------------- JobScheduler ----------------------
private JobScheduler xxlJobScheduler;
@Override
public void afterPropertiesSet() throws Exception {
jobAdminConfig = this;
xxlJobScheduler = new JobScheduler();
xxlJobScheduler.init();
}
@Override
public void destroy() throws Exception {
xxlJobScheduler.destroy();
}
// dao service
@Resource
private DelayedMessagesMapper delayedMessagesMapper;
@Resource
private CollectTaskService collectTaskService;
public DelayedMessagesMapper getDelayedMessagesMapper() {
return delayedMessagesMapper;
}
public CollectTaskService getCollectTaskService() {
return collectTaskService;
}
}

@ -0,0 +1,58 @@
package com.docus.server.message.entity;
import lombok.Data;
import java.util.Date;
/**
*
* @author wyb
*/
@Data
public class DelayedMessages {
/**
* ID
*/
private Long id;
/**
*
*/
private String messageType;
/**
*
*/
private String keyword;
/**
*
*/
private String message;
/**
*
*/
private Date nextExecutionTime;
/**
*
*/
private Integer delaySeconds;
/**
*
*/
private Date createTime;
/**
*
*/
private Integer retries;
/**
*
*/
private String errorMessage;
}

@ -0,0 +1,9 @@
package com.docus.server.message.enums;
/**
* @author YongBin Wen
* @date 2025/6/3 0003 15:44
*/
public interface TaskWait {
String SUBMIT_TASK_WAIT_DELAY_MSG_TYPE = "SUBMIT_TASK_WAIT_DELAY_MSG_TYPE";
}

@ -0,0 +1,25 @@
package com.docus.server.message.mapper;
import com.docus.server.message.entity.DelayedMessages;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* mapper
* @author wyb
*/
@Mapper
public interface DelayedMessagesMapper {
int delById(@Param("id") Long id);
int updateById(@Param("message") DelayedMessages message);
int insert(@Param("message") DelayedMessages message);
List<DelayedMessages> findExecutableMessages(@Param("condition") DelayedMessages condition,@Param("currentDateTime") String formatCurrentDateTime);
DelayedMessages findMessage(@Param("condition") DelayedMessages condition);
}

@ -0,0 +1,26 @@
package com.docus.server.message.scheduler;
import com.docus.server.message.thread.DelayTaskIssuanceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author YongBin Wen
* @date 2024/1/23 14:04
*/
public class JobScheduler {
private static final Logger logger = LoggerFactory.getLogger(JobScheduler.class);
public void init() throws Exception {
DelayTaskIssuanceHelper.getInstance().start();
logger.info(">>>>>>>>> init job admin success.");
}
public void destroy() throws Exception {
DelayTaskIssuanceHelper.getInstance().toStop();
}
}

@ -0,0 +1,110 @@
package com.docus.server.message.thread;
import com.docus.core.util.DateUtil;
import com.docus.core.util.Func;
import com.docus.infrastructure.web.api.CommonResult;
import com.docus.server.message.config.JobAdminConfig;
import com.docus.server.message.entity.DelayedMessages;
import com.docus.server.message.enums.TaskWait;
import com.docus.server.message.feign.dto.CompensateTasRequest;
import com.docus.server.message.feign.service.CollectTaskService;
import com.docus.server.message.mapper.DelayedMessagesMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @author YongBin Wen
* @date 2024/1/23 10:59
*/
public class DelayTaskIssuanceHelper {
private static Logger logger = LoggerFactory.getLogger(DelayTaskIssuanceHelper.class);
private static DelayTaskIssuanceHelper instance = new DelayTaskIssuanceHelper();
public static DelayTaskIssuanceHelper getInstance() {
return instance;
}
private Thread delayTaskIssuanceThread;
private volatile boolean toStop = false;
public void start() {
delayTaskIssuanceThread = new Thread(() -> {
final String messageType = TaskWait.SUBMIT_TASK_WAIT_DELAY_MSG_TYPE;
DelayedMessages condition = new DelayedMessages();
condition.setMessageType(messageType);
DelayedMessagesMapper delayedMessagesMapper = JobAdminConfig.getJobAdminConfig().getDelayedMessagesMapper();
CollectTaskService collectTaskService = JobAdminConfig.getJobAdminConfig().getCollectTaskService();
while (!toStop) {
try {
LocalDateTime currentDateTime = LocalDateTime.now();
String formatCurrentDateTime = Func.formatDateTime(currentDateTime);
// 数据量暂时不大,使用
List<DelayedMessages> delayedMessages = delayedMessagesMapper.findExecutableMessages(condition, formatCurrentDateTime);
if (Func.isNotEmpty(delayedMessages)) {
for (DelayedMessages message : delayedMessages) {
try {
String messageContent = message.getMessage();
String[] split = messageContent.split("-");
String patientId = split[0];
String collectorId = split[1];
List<String> patientIds = Collections.singletonList(patientId);
List<String> collectorIds = Collections.singletonList(collectorId);
logger.info("提交延迟,补偿任务 patientIds:{} collectId:{}", patientIds, collectorIds);
CompensateTasRequest dto = new CompensateTasRequest();
dto.setPatientIds(patientIds);
dto.setCollectorIds(collectorIds);
dto.setPriority(3);
CommonResult<String> result = collectTaskService.compensateTask(dto);
logger.info("提交延迟,补偿任务结果:{}",Func.toJson(result));
delayedMessagesMapper.delById(message.getId());
} catch (Exception ex) {
if (!toStop) {
logger.error(">>>>>>>>>>> Delay Task Issuance thread error:" + ex.getMessage() + " ,delayMessage:" + Func.toJson(message), ex);
int retries = message.getRetries() + 1;
Integer delaySeconds = message.getDelaySeconds();
LocalDateTime nexExecuteLocalDateTime = LocalDateTime.now().plusSeconds(delaySeconds);
String dateTime = Func.formatDateTime(nexExecuteLocalDateTime);
Date nexExecuteTime = Func.parseDate(dateTime, DateUtil.PATTERN_DATETIME);
message.setRetries(retries);
message.setErrorMessage(ex.getMessage());
message.setNextExecutionTime(nexExecuteTime);
delayedMessagesMapper.updateById(message);
}
}
}
}
TimeUnit.SECONDS.sleep(10);
} catch (Exception ex) {
if (!toStop) {
logger.error(">>>>>>>>>>> Delay Task Issuance thread error:" + ex.getMessage(), ex);
}
}
}
logger.info(">>>>>>>>>>> Delay Task Issuance thread stop");
});
delayTaskIssuanceThread.setDaemon(true);
delayTaskIssuanceThread.setName("delayTaskIssuanceHelper");
delayTaskIssuanceThread.start();
}
public void toStop() {
toStop = false;
delayTaskIssuanceThread.interrupt();
try {
delayTaskIssuanceThread.join();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}

@ -0,0 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.docus.server.message.mapper.DelayedMessagesMapper">
<insert id="insert">
INSERT INTO
`docus_medicalrecord`.`delayed_messages` (`id`, `message_type`, `keyword`, `message`, `next_execution_time`, `delay_seconds`, `create_time`, `retries`, `error_message`)
VALUES (#{message.id}, #{message.messageType}, #{message.keyword}, #{message.message}, #{message.nextExecutionTime}, #{message.delaySeconds}, #{message.createTime}, 0, #{message.errorMessage})
</insert>
<update id="updateById">
UPDATE `docus_medicalrecord`.`delayed_messages`
SET
`message_type` = #{message.messageType},
`keyword` = #{message.keyword},
`message` = #{message.message},
`next_execution_time` = #{message.nextExecutionTime},
`delay_seconds` = #{message.delaySeconds},
`create_time` = #{message.createTime},
`retries` = #{message.retries},
`error_message` = #{message.errorMessage}
WHERE `id` = #{message.id}
</update>
<delete id="delById">
DELETE FROM `docus_medicalrecord`.`delayed_messages` WHERE `id`=#{id}
</delete>
<select id="findExecutableMessages" resultType="com.docus.server.message.entity.DelayedMessages">
select
*
from
`docus_medicalrecord`.`delayed_messages`
where
`next_execution_time` <![CDATA[<=]]> #{currentDateTime}
<if test="condition.keyword != null and condition.keyword != '' ">
and `keyword` = #{condition.keyword}
</if>
<if test="condition.messageType != null and condition.messageType != '' ">
and `message_type` = #{condition.messageType}
</if>
</select>
<select id="findMessage" resultType="com.docus.server.message.entity.DelayedMessages">
select
*
from
`docus_medicalrecord`.`delayed_messages`
where
1=1
<if test="condition.keyword != null and condition.keyword != '' ">
and `keyword` = #{condition.keyword}
</if>
<if test="condition.messageType != null and condition.messageType != '' ">
and `message_type` = #{condition.messageType}
</if>
</select>
</mapper>
Loading…
Cancel
Save