diff --git a/src/main/java/com/docus/server/report/config/JobAdminConfig.java b/src/main/java/com/docus/server/report/config/JobAdminConfig.java new file mode 100644 index 0000000..05561fe --- /dev/null +++ b/src/main/java/com/docus/server/report/config/JobAdminConfig.java @@ -0,0 +1,60 @@ +package com.docus.server.report.config; + +import com.docus.server.report.listener.NisReportDownloadWaitHandler; +import com.docus.server.report.mapper.DelayedMessagesMapper; +import com.docus.server.report.scheduler.JobScheduler; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * job执行需要的一些必须得service,dao,mapper和配置等管理,从这里获取,保证依赖都有注入 + * + * @author YongBin Wen + * @date 2024/1/23 14:13 + */ +@Component +public class JobAdminConfig implements InitializingBean, DisposableBean { + + private static JobAdminConfig jobAdminConfig = null; + + public static JobAdminConfig getJobAdminConfig() { + return jobAdminConfig; + } + + // ---------------------- JobScheduler ---------------------- + + private JobScheduler xxlJobScheduler; + + @Override + public void afterPropertiesSet() throws Exception { + jobAdminConfig = this; + + xxlJobScheduler = new JobScheduler(); + xxlJobScheduler.init(); + } + + @Override + public void destroy() throws Exception { + xxlJobScheduler.destroy(); + } + + // dao service + + @Resource + private NisReportDownloadWaitHandler nisReportDownloadWaitHandler; + + + @Resource + private DelayedMessagesMapper delayedMessagesMapper; + + public NisReportDownloadWaitHandler getNisReportDownloadWaitHandler() { + return nisReportDownloadWaitHandler; + } + + public DelayedMessagesMapper getDelayedMessagesMapper() { + return delayedMessagesMapper; + } +} diff --git a/src/main/java/com/docus/server/report/consts/ReportDownloadWait.java b/src/main/java/com/docus/server/report/consts/ReportDownloadWait.java index c96ee9e..0e4a406 100644 --- a/src/main/java/com/docus/server/report/consts/ReportDownloadWait.java +++ b/src/main/java/com/docus/server/report/consts/ReportDownloadWait.java @@ -12,4 +12,9 @@ public interface ReportDownloadWait { * 等待报告下载完成的redisKey,docus:collect:report:download:wait:{采集器id}:{病案主键} */ String REPORT_DOWNLOAD_WAIT_KEY = "docus:collect:report:download:wait:%s:%s"; + + /** + * 等待护理报告下载完成的延迟消息处理类型 + */ + String NIS_REPORT_DOWNLOAD_WAIT_DELAY_MSG_TYPE = "docus:collect:report:download:wait:nis"; } diff --git a/src/main/java/com/docus/server/report/entity/DelayedMessages.java b/src/main/java/com/docus/server/report/entity/DelayedMessages.java new file mode 100644 index 0000000..2555dca --- /dev/null +++ b/src/main/java/com/docus/server/report/entity/DelayedMessages.java @@ -0,0 +1,58 @@ +package com.docus.server.report.entity; + +import lombok.Data; + +import java.util.Date; + +/** + * 延迟消息类 + * @author wyb + */ +@Data +public class DelayedMessages { + /** + * 消息ID + */ + private Long id; + + /** + * 消息类型 + */ + private String messageType; + + /** + * 消息检索关键词 + */ + private String keyword; + + /** + * 消息内容 + */ + private String message; + + /** + * 下次执行时间 + */ + private Date nextExecutionTime; + + /** + * 延迟时间(秒) + */ + private Integer delaySeconds; + + /** + * 创建时间 + */ + private Date createTime; + + /** + * 已重试次数 + */ + private Integer retries; + + /** + * 错误信息 + */ + private String errorMessage; + +} \ No newline at end of file diff --git a/src/main/java/com/docus/server/report/job/ReportJob.java b/src/main/java/com/docus/server/report/job/ReportJob.java index d8229ae..85f16e3 100644 --- a/src/main/java/com/docus/server/report/job/ReportJob.java +++ b/src/main/java/com/docus/server/report/job/ReportJob.java @@ -1,6 +1,7 @@ package com.docus.server.report.job; import com.alibaba.fastjson.JSONObject; +import com.docus.core.util.DateUtil; import com.docus.core.util.Func; import com.docus.infrastructure.core.exception.BaseException; import com.docus.infrastructure.redis.service.IdService; @@ -20,8 +21,10 @@ import com.docus.server.report.config.ZdAssortConfig; import com.docus.server.report.consts.ReportDownloadWait; import com.docus.server.report.entity.AfJobTime; import com.docus.server.report.entity.AfReportRecord; +import com.docus.server.report.entity.DelayedMessages; import com.docus.server.report.mapper.AfJobTimeMapper; import com.docus.server.report.mapper.AfReportRecordMapper; +import com.docus.server.report.mapper.DelayedMessagesMapper; import com.docus.server.report.service.ReportService; import com.docus.server.report.service.ShunDePeopleBusinessService; import com.docus.server.report.util.TableJsonRead; @@ -73,6 +76,8 @@ public class ReportJob { private ShunDePeopleBusinessService shunDePeopleBusinessService; @Resource RedisOps redisOps; + @Resource + private DelayedMessagesMapper delayedMessagesMapper; @Value("${docus.report.waittime}") private int waittime; @@ -285,9 +290,40 @@ public class ReportJob { if (babyTask) { return; } - // 设置一个redisKey,过期被监听处理 - String redisKey = String.format(ReportDownloadWait.REPORT_DOWNLOAD_WAIT_KEY, reportQueryNurseInsertSugarCollectorId, patientId); - redisOps.setEx(redisKey, "0", waittime * 60); + + sendNisDelayVerifyMessage(patientId); + } + } + /** + * 保存延迟验证护理消息 + */ + public void sendNisDelayVerifyMessage(String patientId) { + int delaySeconds = waittime * 60; + String msgType = ReportDownloadWait.NIS_REPORT_DOWNLOAD_WAIT_DELAY_MSG_TYPE; + DelayedMessages condition = new DelayedMessages(); + condition.setMessageType(msgType); + condition.setKeyword(patientId); + DelayedMessages message = delayedMessagesMapper.findMessage(condition); + + LocalDateTime nextExecuteDateTime = LocalDateTime.now().plusSeconds(delaySeconds); + String formatNextExecuteDateTime = Func.formatDateTime(nextExecuteDateTime); + Date nextExecuteDate = Func.parseDate(formatNextExecuteDateTime, DateUtil.PATTERN_DATETIME); + if(Objects.isNull(message)){ + message=new DelayedMessages(); + message.setId(idService.getDateSeq()); + message.setMessageType(msgType); + message.setKeyword(patientId); + message.setMessage(patientId); + message.setDelaySeconds(delaySeconds); + message.setNextExecutionTime(nextExecuteDate); + message.setCreateTime(new Date()); + message.setRetries(0); + message.setErrorMessage(""); + delayedMessagesMapper.insert(message); + }else { + message.setDelaySeconds(delaySeconds); + message.setNextExecutionTime(nextExecuteDate); + delayedMessagesMapper.updateById(message); } } diff --git a/src/main/java/com/docus/server/report/listener/NisReportDownloadWaitHandler.java b/src/main/java/com/docus/server/report/listener/NisReportDownloadWaitHandler.java index 6d4a6dd..db37255 100644 --- a/src/main/java/com/docus/server/report/listener/NisReportDownloadWaitHandler.java +++ b/src/main/java/com/docus/server/report/listener/NisReportDownloadWaitHandler.java @@ -10,7 +10,6 @@ import com.docus.server.collection.entity.TBasic; import com.docus.server.collection.entity.TBasicExtend; import com.docus.server.collection.mapper.TBasicMapper; import com.docus.server.common.util.RedisKeyExpirationHandler; -import com.docus.server.common.util.RedisKeyExpirationListener; import com.docus.server.report.api.MedicalRecordService; import com.docus.server.report.api.request.CqcAuditRequest; import com.docus.server.report.config.SdRyReportQueryConfig; @@ -22,7 +21,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; -import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; @@ -34,8 +32,6 @@ import java.util.Objects; @Component @Slf4j public class NisReportDownloadWaitHandler extends RedisKeyExpirationHandler { - @Autowired - private RedisKeyExpirationListener redisKeyExpirationListener; @Autowired private RedisOps redisOps; @Autowired @@ -49,10 +45,9 @@ public class NisReportDownloadWaitHandler extends RedisKeyExpirationHandler { @Resource private MedicalRecordService medicalRecordService; - @PostConstruct - public void registerRedisKeyExpireListener() { - redisKeyExpirationListener.register(this); - } + + + public static void main(String[] args) { String collectId = "3"; @@ -71,10 +66,14 @@ public class NisReportDownloadWaitHandler extends RedisKeyExpirationHandler { return; } String patientId = getPatientId(expireRedisKey); + verify(patientId); + } + + public void verify(String patientId) { log.info("病案主键:{} 护理采集数量校对开始!", patientId); boolean verifyNisFileCountResult = verifyNisFileCountAndPushErr(patientId); if (verifyNisFileCountResult) { - tBasicMapper.updateNursCollectState(patientId,1); + tBasicMapper.updateNursCollectState(patientId, 1); nisQualityControl(patientId); } } @@ -166,4 +165,5 @@ public class NisReportDownloadWaitHandler extends RedisKeyExpirationHandler { return expireRedisKey.replace(nisKeyPrefix, ""); } + } diff --git a/src/main/java/com/docus/server/report/mapper/DelayedMessagesMapper.java b/src/main/java/com/docus/server/report/mapper/DelayedMessagesMapper.java new file mode 100644 index 0000000..66878b4 --- /dev/null +++ b/src/main/java/com/docus/server/report/mapper/DelayedMessagesMapper.java @@ -0,0 +1,23 @@ +package com.docus.server.report.mapper; + +import com.docus.server.report.entity.DelayedMessages; +import org.apache.ibatis.annotations.Param; + +import java.util.List; + +/** + * 延迟消息 mapper + * @author wyb + */ +public interface DelayedMessagesMapper { + + int delById(@Param("id") Long id); + + int updateById(@Param("message") DelayedMessages message); + + int insert(@Param("message") DelayedMessages message); + + List findExecutableMessages(@Param("condition") DelayedMessages condition,@Param("currentDateTime") String formatCurrentDateTime); + + DelayedMessages findMessage(@Param("condition") DelayedMessages condition); +} diff --git a/src/main/java/com/docus/server/report/scheduler/JobScheduler.java b/src/main/java/com/docus/server/report/scheduler/JobScheduler.java new file mode 100644 index 0000000..62e8bef --- /dev/null +++ b/src/main/java/com/docus/server/report/scheduler/JobScheduler.java @@ -0,0 +1,26 @@ +package com.docus.server.report.scheduler; + +import com.docus.server.report.thread.VerifyNisReportHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author YongBin Wen + * @date 2024/1/23 14:04 + */ + +public class JobScheduler { + + private static final Logger logger = LoggerFactory.getLogger(JobScheduler.class); + + public void init() throws Exception { + VerifyNisReportHelper.getInstance().start(); + + logger.info(">>>>>>>>> init job admin success."); + } + + + public void destroy() throws Exception { + VerifyNisReportHelper.getInstance().toStop(); + } +} diff --git a/src/main/java/com/docus/server/report/thread/VerifyNisReportHelper.java b/src/main/java/com/docus/server/report/thread/VerifyNisReportHelper.java new file mode 100644 index 0000000..1ef6502 --- /dev/null +++ b/src/main/java/com/docus/server/report/thread/VerifyNisReportHelper.java @@ -0,0 +1,95 @@ +package com.docus.server.report.thread; + +import com.docus.core.util.DateUtil; +import com.docus.core.util.Func; +import com.docus.server.report.config.JobAdminConfig; +import com.docus.server.report.consts.ReportDownloadWait; +import com.docus.server.report.entity.DelayedMessages; +import com.docus.server.report.listener.NisReportDownloadWaitHandler; +import com.docus.server.report.mapper.DelayedMessagesMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.LocalDateTime; +import java.util.Date; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * @author YongBin Wen + * @date 2024/1/23 10:59 + */ + +public class VerifyNisReportHelper { + private static Logger logger = LoggerFactory.getLogger(VerifyNisReportHelper.class); + private static VerifyNisReportHelper instance = new VerifyNisReportHelper(); + + public static VerifyNisReportHelper getInstance() { + return instance; + } + + private Thread nisVerifyThread; + private volatile boolean toStop = false; + + public void start() { + nisVerifyThread = new Thread(() -> { + final String messageType = ReportDownloadWait.NIS_REPORT_DOWNLOAD_WAIT_DELAY_MSG_TYPE; + DelayedMessages condition = new DelayedMessages(); + condition.setMessageType(messageType); + DelayedMessagesMapper delayedMessagesMapper = JobAdminConfig.getJobAdminConfig().getDelayedMessagesMapper(); + NisReportDownloadWaitHandler handler = JobAdminConfig.getJobAdminConfig().getNisReportDownloadWaitHandler(); + while (!toStop) { + try { + LocalDateTime currentDateTime = LocalDateTime.now(); + String formatCurrentDateTime = Func.formatDateTime(currentDateTime); + // 数据量暂时不大,使用 + List delayedMessages = delayedMessagesMapper.findExecutableMessages(condition, formatCurrentDateTime); + if (Func.isNotEmpty(delayedMessages)) { + for (DelayedMessages message : delayedMessages) { + try { + String patientId = message.getMessage(); + handler.verify(patientId); + delayedMessagesMapper.delById(message.getId()); + } catch (Exception ex) { + if (!toStop) { + logger.error(">>>>>>>>>>> verify nis report thread error:" + ex.getMessage() + " ,delayMessage:" + Func.toJson(message), ex); + int retries = message.getRetries() + 1; + Integer delaySeconds = message.getDelaySeconds(); + LocalDateTime nexExecuteLocalDateTime = LocalDateTime.now().plusSeconds(delaySeconds); + String dateTime = Func.formatDateTime(nexExecuteLocalDateTime); + Date nexExecuteTime = Func.parseDate(dateTime, DateUtil.PATTERN_DATETIME); + + message.setRetries(retries); + message.setErrorMessage(ex.getMessage()); + message.setNextExecutionTime(nexExecuteTime); + delayedMessagesMapper.updateById(message); + } + } + } + } + + TimeUnit.SECONDS.sleep(10); + } catch (Exception ex) { + if (!toStop) { + logger.error(">>>>>>>>>>> verify nis report thread error:" + ex.getMessage(), ex); + } + } + + } + logger.info(">>>>>>>>>>> verify nis report thread stop"); + }); + nisVerifyThread.setDaemon(true); + nisVerifyThread.setName("verifyNisReportHelper"); + nisVerifyThread.start(); + } + + public void toStop() { + toStop = false; + nisVerifyThread.interrupt(); + try { + nisVerifyThread.join(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } +} diff --git a/src/main/resources/bootstrap.yml b/src/main/resources/bootstrap.yml index a021cfb..95a98fd 100644 --- a/src/main/resources/bootstrap.yml +++ b/src/main/resources/bootstrap.yml @@ -41,7 +41,7 @@ spring: shared-configs: - comm.${spring.cloud.nacos.config.file-extension} profiles: - active: dev + active: test sdry: # 顺德人医查询检查、检验报告之前查患者交叉索引的wsdl配置 diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index 2c982f3..552aec8 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -26,26 +26,6 @@ 30 - - - - [%d{yyyy-MM-dd' 'HH:mm:ss.sss}] [%C] [%t] [%X{traceId}] [%L] [%-5p] %m%n - utf-8 - - - - - ${log.path}external%d.%i.log - - 500MB - - 30 - - - - - diff --git a/src/main/resources/mapper/DelayedMessagesMapper.xml b/src/main/resources/mapper/DelayedMessagesMapper.xml new file mode 100644 index 0000000..249e219 --- /dev/null +++ b/src/main/resources/mapper/DelayedMessagesMapper.xml @@ -0,0 +1,56 @@ + + + + + INSERT INTO + `docus_medicalrecord`.`delayed_messages` (`id`, `message_type`, `keyword`, `message`, `next_execution_time`, `delay_seconds`, `create_time`, `retries`, `error_message`) + VALUES (#{message.id}, #{message.messageType}, #{message.keyword}, #{message.message}, #{message.nextExecutionTime}, #{message.delaySeconds}, #{message.createTime}, 0, #{message.errorMessage}) + + + UPDATE `docus_medicalrecord`.`delayed_messages` + SET + `message_type` = #{message.messageType}, + `keyword` = #{message.keyword}, + `message` = #{message.message}, + `next_execution_time` = #{message.nextExecutionTime}, + `delay_seconds` = #{message.delaySeconds}, + `create_time` = #{message.createTime}, + `retries` = #{message.retries}, + `error_message` = #{message.errorMessage} + WHERE `id` = #{message.id} + + + + DELETE FROM `docus_medicalrecord`.`delayed_messages` WHERE `id`=#{id} + + + +