From 0c2e7cb8cc37398314aa2f37e159752e2b20cd5f Mon Sep 17 00:00:00 2001 From: linrf Date: Thu, 8 Jun 2023 18:04:39 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A2=AB=E5=8A=A8rocket=20mq=20=E6=A8=A1?= =?UTF-8?q?=E6=9D=BF=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../basic/mq/JmsIbmMqBasicConsumer.java | 37 +++++++++++++ .../collect/basic/mq/MqBasicConverter.java | 52 ++++++++++++++++++ .../collect/basic/mq/MqBasicResultImpl.java | 20 +++++++ .../basic/mq/RocketMqBasicConsumer.java | 29 ++++++++++ ...istener.java => JmsIbmMqDeptConsumer.java} | 30 +--------- .../collect/dept/mq/RocketMqDeptConsumer.java | 29 ++++++++++ .../collect/user/mq/JmsIbmMqUserConsumer.java | 38 +++++++++++++ .../collect/user/mq/RocketMqUserConsumer.java | 29 ++++++++++ collect-sdry/src/main/resources/bootstrap.yml | 7 ++- common-collect/pom.xml | 1 - .../basic/mq/IBasicMqCollectService.java | 19 +++++++ .../mq/impl/BasicJmsMqCollectServiceImpl.java | 47 ++++++++++++++++ .../impl/BasicRocketMqCollectServiceImpl.java | 54 ++++++++++++++++++ ....java => DeptJmsMqCollectServiceImpl.java} | 10 ++-- .../impl/DeptRocketMqCollectServiceImpl.java | 55 +++++++++++++++++++ .../collect/user/http/UserRestController.java | 15 ++++- .../server/collect/user/mq/RmqProvdier.java | 24 ++++++++ ....java => UserJmsMqCollectServiceImpl.java} | 10 ++-- .../impl/UserRocketMqCollectServiceImpl.java | 54 ++++++++++++++++++ .../collect/web/enums/CollectTypeEnum.java | 19 +++++-- pom.xml | 6 ++ 21 files changed, 538 insertions(+), 47 deletions(-) create mode 100644 collect-sdry/src/main/java/com/docus/server/collect/basic/mq/JmsIbmMqBasicConsumer.java create mode 100644 collect-sdry/src/main/java/com/docus/server/collect/basic/mq/MqBasicConverter.java create mode 100644 collect-sdry/src/main/java/com/docus/server/collect/basic/mq/MqBasicResultImpl.java create mode 100644 collect-sdry/src/main/java/com/docus/server/collect/basic/mq/RocketMqBasicConsumer.java rename collect-sdry/src/main/java/com/docus/server/collect/dept/mq/{JmsIbmListener.java => JmsIbmMqDeptConsumer.java} (52%) create mode 100644 collect-sdry/src/main/java/com/docus/server/collect/dept/mq/RocketMqDeptConsumer.java create mode 100644 collect-sdry/src/main/java/com/docus/server/collect/user/mq/JmsIbmMqUserConsumer.java create mode 100644 collect-sdry/src/main/java/com/docus/server/collect/user/mq/RocketMqUserConsumer.java create mode 100644 common-collect/src/main/java/com/docus/server/collect/basic/mq/IBasicMqCollectService.java create mode 100644 common-collect/src/main/java/com/docus/server/collect/basic/mq/impl/BasicJmsMqCollectServiceImpl.java create mode 100644 common-collect/src/main/java/com/docus/server/collect/basic/mq/impl/BasicRocketMqCollectServiceImpl.java rename common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/{DeptMqCollectServiceImpl.java => DeptJmsMqCollectServiceImpl.java} (88%) create mode 100644 common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptRocketMqCollectServiceImpl.java create mode 100644 common-collect/src/main/java/com/docus/server/collect/user/mq/RmqProvdier.java rename common-collect/src/main/java/com/docus/server/collect/user/mq/impl/{UserMqCollectServiceImpl.java => UserJmsMqCollectServiceImpl.java} (88%) create mode 100644 common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserRocketMqCollectServiceImpl.java diff --git a/collect-sdry/src/main/java/com/docus/server/collect/basic/mq/JmsIbmMqBasicConsumer.java b/collect-sdry/src/main/java/com/docus/server/collect/basic/mq/JmsIbmMqBasicConsumer.java new file mode 100644 index 0000000..7e06803 --- /dev/null +++ b/collect-sdry/src/main/java/com/docus/server/collect/basic/mq/JmsIbmMqBasicConsumer.java @@ -0,0 +1,37 @@ +package com.docus.server.collect.basic.mq; + +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * @author jiashi + * 梅州中医医院 MQ 监听 + */ +@Component +public class JmsIbmMqBasicConsumer { + @Resource(name = "basicJmsMqCollectServiceImpl") + private IBasicMqCollectService basicMqCollectService; + + /** + * 订阅队列(注册科室) + * + * @param message 科室信息 + */ +// @JmsIbmListener(destination = "TJ_createDepartment") + public void addBasic(String message) { + basicMqCollectService.addTBasic(message); + } + + + /** + * 订阅队列(变更科室) + * + * @param message 科室信息 + */ +// @JmsIbmListener(destination = "TJ_updateDepartment") + public void updateBasic(String message) { + basicMqCollectService.updateTBasic(message); + } + +} diff --git a/collect-sdry/src/main/java/com/docus/server/collect/basic/mq/MqBasicConverter.java b/collect-sdry/src/main/java/com/docus/server/collect/basic/mq/MqBasicConverter.java new file mode 100644 index 0000000..b95dfff --- /dev/null +++ b/collect-sdry/src/main/java/com/docus/server/collect/basic/mq/MqBasicConverter.java @@ -0,0 +1,52 @@ +package com.docus.server.collect.basic.mq; + +import com.docus.server.collect.IConverter; +import com.docus.server.sys.common.pojo.dto.DeptDTO; +import org.springframework.stereotype.Component; + +import java.util.UUID; + +@Component("mqBasicConverter") +public class MqBasicConverter implements IConverter { + + @Override + public DeptDTO convert(String message, String group) { + //TODO: 根据协议进行解析。 +// XmlUtil xmlParseUtil = XmlUtil.of(message); +// Node msgIdNode = xmlParseUtil.getNode("/PRVS_IN000002UV01/id/@extension"); +// Node receiverNode = xmlParseUtil.getNode("/PRVS_IN000002UV01/receiver/device/id/item/@extension"); +// Node operateTypeNode = xmlParseUtil.getNode("/PRVS_IN000002UV01/controlActProcess/subject/registrationRequest/subject1/valueSet/valueSetItems/@operateType"); +// Node deptCodeNode = xmlParseUtil.getNode("/PRVS_IN000002UV01/controlActProcess/subject/registrationRequest/subject1/valueSet/valueSetItems/DEPT_CODE/@value"); +// Node deptNameNode = xmlParseUtil.getNode("/PRVS_IN000002UV01/controlActProcess/subject/registrationRequest/subject1/valueSet/valueSetItems/DEPT_NAME/@value"); +// Node authorIdNode = xmlParseUtil.getNode("/PRVS_IN000002UV01/controlActProcess/subject/registrationRequest/author/assignedEntity/id/item/@extension"); +// Node authorNameNode = xmlParseUtil.getNode("/PRVS_IN000002UV01/controlActProcess/subject/registrationRequest/author/assignedEntity/assignedPerson/name/item/part/@value"); +// +// DeptDTO deptDto = new DeptDTO(); +// deptDto.setMessageId(msgIdNode.getNodeValue()); +// deptDto.setReceiver(receiverNode.getNodeValue()); +// deptDto.setOperateType(operateTypeNode.getNodeValue()); +// deptDto.setDeptCode(deptCodeNode.getNodeValue()); +// deptDto.setDeptName(deptNameNode.getNodeValue()); +// deptDto.setAuthorId(authorIdNode.getNodeValue()); +// deptDto.setAuthorName(authorNameNode.getNodeValue()); +// deptDto.setSource(message);//原始xml报文 +// +// deptDto.setParams(ParamsUtils.addParam("msg", "操作成功!") +// .addParam("msgId", deptDto.getMessageId()) +// .addParam("receiver", deptDto.getReceiver()) +// .param()); + + + DeptDTO deptDto = new DeptDTO(); + deptDto.setMessageId(UUID.randomUUID().toString()); + deptDto.setReceiver("1"); + deptDto.setOperateType("2"); + deptDto.setDeptCode("3"); + deptDto.setDeptName("4"); + deptDto.setAuthorId("5"); + deptDto.setAuthorName("6"); + deptDto.setSource(message);//原始xml报文 + + return deptDto; + } +} diff --git a/collect-sdry/src/main/java/com/docus/server/collect/basic/mq/MqBasicResultImpl.java b/collect-sdry/src/main/java/com/docus/server/collect/basic/mq/MqBasicResultImpl.java new file mode 100644 index 0000000..48ccd40 --- /dev/null +++ b/collect-sdry/src/main/java/com/docus/server/collect/basic/mq/MqBasicResultImpl.java @@ -0,0 +1,20 @@ +package com.docus.server.collect.basic.mq; + +import com.docus.server.collect.IMqResult; +import org.springframework.stereotype.Component; + +import java.util.Map; + +@Component("mqBasicResultImpl") +public class MqBasicResultImpl implements IMqResult { + + @Override + public Void ok(Map params) { + return null; + } + + @Override + public Void fail(Map params) { + return null; + } +} diff --git a/collect-sdry/src/main/java/com/docus/server/collect/basic/mq/RocketMqBasicConsumer.java b/collect-sdry/src/main/java/com/docus/server/collect/basic/mq/RocketMqBasicConsumer.java new file mode 100644 index 0000000..4a46eb3 --- /dev/null +++ b/collect-sdry/src/main/java/com/docus/server/collect/basic/mq/RocketMqBasicConsumer.java @@ -0,0 +1,29 @@ +package com.docus.server.collect.basic.mq; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +@Component +@Slf4j +@RocketMQMessageListener(topic = "ta-cipher-persist", consumerGroup = "ta-cipher-encode") +public class RocketMqBasicConsumer implements RocketMQListener { + + @Resource(name = "basicRocketMqCollectServiceImpl") + private IBasicMqCollectService basicMqCollectService; + + @Override + public void onMessage(String message) { + try { + log.info("RocketMQ message:{}", message); + basicMqCollectService.addTBasic(message); + } catch (Exception e) { + log.error("errorMessage:{}", message); + // 抛出异常会重新消费消息 + throw new RuntimeException("Message processing failed", e); + } + } +} \ No newline at end of file diff --git a/collect-sdry/src/main/java/com/docus/server/collect/dept/mq/JmsIbmListener.java b/collect-sdry/src/main/java/com/docus/server/collect/dept/mq/JmsIbmMqDeptConsumer.java similarity index 52% rename from collect-sdry/src/main/java/com/docus/server/collect/dept/mq/JmsIbmListener.java rename to collect-sdry/src/main/java/com/docus/server/collect/dept/mq/JmsIbmMqDeptConsumer.java index 84bb30e..2dfde11 100644 --- a/collect-sdry/src/main/java/com/docus/server/collect/dept/mq/JmsIbmListener.java +++ b/collect-sdry/src/main/java/com/docus/server/collect/dept/mq/JmsIbmMqDeptConsumer.java @@ -1,6 +1,5 @@ package com.docus.server.collect.dept.mq; -import com.docus.server.collect.user.mq.IUserMqCollectService; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -10,11 +9,9 @@ import javax.annotation.Resource; * 梅州中医医院 MQ 监听 */ @Component -public class JmsIbmListener { - @Resource +public class JmsIbmMqDeptConsumer { + @Resource(name = "deptJmsMqCollectServiceImpl") private IDeptMqCollectService deptMqCollectService; - @Resource - private IUserMqCollectService userMqCollectService; /** * 订阅队列(注册科室) @@ -37,27 +34,4 @@ public class JmsIbmListener { deptMqCollectService.updateDept(message); } - - /** - * 订阅队列(注册人员) - * - * @param message 人员信息 - */ -// @JmsIbmListener(destination = "TJ_createPractitioner") - public void addUser(String message) { - userMqCollectService.addUser(message); - } - - - /** - * 订阅队列(变更人员) - * - * @param message 人员信息 - */ -// @JmsIbmListener(destination = "TJ_updatePractitioner") - public void updateUser(String message) { - userMqCollectService.updateUser(message); - } - - } diff --git a/collect-sdry/src/main/java/com/docus/server/collect/dept/mq/RocketMqDeptConsumer.java b/collect-sdry/src/main/java/com/docus/server/collect/dept/mq/RocketMqDeptConsumer.java new file mode 100644 index 0000000..cef266c --- /dev/null +++ b/collect-sdry/src/main/java/com/docus/server/collect/dept/mq/RocketMqDeptConsumer.java @@ -0,0 +1,29 @@ +package com.docus.server.collect.dept.mq; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +@Component +@Slf4j +@RocketMQMessageListener(topic = "ta-cipher-persist", consumerGroup = "ta-cipher-encode") +public class RocketMqDeptConsumer implements RocketMQListener { + + @Resource(name = "deptJmsMqCollectServiceImpl") + private IDeptMqCollectService deptMqCollectService; + + @Override + public void onMessage(String message) { + try { + log.info("RocketMQ message:{}", message); + deptMqCollectService.addDept(message); + } catch (Exception e) { + log.error("errorMessage:{}", message); + // 抛出异常会重新消费消息 + throw new RuntimeException("Message processing failed", e); + } + } +} \ No newline at end of file diff --git a/collect-sdry/src/main/java/com/docus/server/collect/user/mq/JmsIbmMqUserConsumer.java b/collect-sdry/src/main/java/com/docus/server/collect/user/mq/JmsIbmMqUserConsumer.java new file mode 100644 index 0000000..34e58e8 --- /dev/null +++ b/collect-sdry/src/main/java/com/docus/server/collect/user/mq/JmsIbmMqUserConsumer.java @@ -0,0 +1,38 @@ +package com.docus.server.collect.user.mq; + +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * @author jiashi + * 梅州中医医院 MQ 监听 + */ +@Component +public class JmsIbmMqUserConsumer { + @Resource(name = "userJmsMqCollectServiceImpl") + private IUserMqCollectService userMqCollectService; + + /** + * 订阅队列(注册人员) + * + * @param message 人员信息 + */ +// @JmsIbmListener(destination = "TJ_createPractitioner") + public void addUser(String message) { + userMqCollectService.addUser(message); + } + + + /** + * 订阅队列(变更人员) + * + * @param message 人员信息 + */ +// @JmsIbmListener(destination = "TJ_updatePractitioner") + public void updateUser(String message) { + userMqCollectService.updateUser(message); + } + + +} diff --git a/collect-sdry/src/main/java/com/docus/server/collect/user/mq/RocketMqUserConsumer.java b/collect-sdry/src/main/java/com/docus/server/collect/user/mq/RocketMqUserConsumer.java new file mode 100644 index 0000000..7f397f7 --- /dev/null +++ b/collect-sdry/src/main/java/com/docus/server/collect/user/mq/RocketMqUserConsumer.java @@ -0,0 +1,29 @@ +package com.docus.server.collect.user.mq; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +@Component +@Slf4j +@RocketMQMessageListener(topic = "ta-cipher-persist", consumerGroup = "ta-cipher-encode") +public class RocketMqUserConsumer implements RocketMQListener { + + @Resource(name = "userRocketMqCollectServiceImpl") + private IUserMqCollectService userMqCollectService; + + @Override + public void onMessage(String message) { + try { + log.info("RocketMQ message:{}", message); + userMqCollectService.addUser(message); + } catch (Exception e) { + log.error("errorMessage:{}", message); + // 抛出异常会重新消费消息 + throw new RuntimeException("Message processing failed", e); + } + } +} \ No newline at end of file diff --git a/collect-sdry/src/main/resources/bootstrap.yml b/collect-sdry/src/main/resources/bootstrap.yml index f025b73..f8eff22 100644 --- a/collect-sdry/src/main/resources/bootstrap.yml +++ b/collect-sdry/src/main/resources/bootstrap.yml @@ -80,4 +80,9 @@ xxl: # replica: # queueManager: MqManager02 # channel: chl_server02 -# connName: 10.222.23.131(1414) \ No newline at end of file +# connName: 10.222.23.131(1414) + +rocketmq: + name-server: 127.0.01:9876 + producer: + group: ta-cipher-encode \ No newline at end of file diff --git a/common-collect/pom.xml b/common-collect/pom.xml index d433381..104df68 100644 --- a/common-collect/pom.xml +++ b/common-collect/pom.xml @@ -36,6 +36,5 @@ org.springframework.retry spring-retry - diff --git a/common-collect/src/main/java/com/docus/server/collect/basic/mq/IBasicMqCollectService.java b/common-collect/src/main/java/com/docus/server/collect/basic/mq/IBasicMqCollectService.java new file mode 100644 index 0000000..41d283a --- /dev/null +++ b/common-collect/src/main/java/com/docus/server/collect/basic/mq/IBasicMqCollectService.java @@ -0,0 +1,19 @@ +package com.docus.server.collect.basic.mq; + +public interface IBasicMqCollectService { + + /** + * 订阅队列(基础数据) + * + * @param message 基础数据 + */ + void addTBasic(String message); + + /** + * 订阅队列(变更基础数据) + * + * @param message 基础数据 + */ + void updateTBasic(String message); + +} diff --git a/common-collect/src/main/java/com/docus/server/collect/basic/mq/impl/BasicJmsMqCollectServiceImpl.java b/common-collect/src/main/java/com/docus/server/collect/basic/mq/impl/BasicJmsMqCollectServiceImpl.java new file mode 100644 index 0000000..2ec6b2f --- /dev/null +++ b/common-collect/src/main/java/com/docus/server/collect/basic/mq/impl/BasicJmsMqCollectServiceImpl.java @@ -0,0 +1,47 @@ +package com.docus.server.collect.basic.mq.impl; + +import com.docus.core.util.json.JSON; +import com.docus.log.annotation.TrackGroup; +import com.docus.log.context.TrackHelper; +import com.docus.server.collect.basic.mq.IBasicMqCollectService; +import com.docus.server.collect.web.processor.VisitorProcessor; +import com.docus.server.record.pojo.dto.TBasicDTO; +import com.docus.server.record.service.ITBasicService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +/** + * 每家mq 都不一样,这部分无法预先写好写入,需要根据每家独立编写。 + */ +@Slf4j +@Service("basicJmsMqCollectServiceImpl") +public class BasicJmsMqCollectServiceImpl implements IBasicMqCollectService { + @Resource + private ITBasicService tBasicService; + + @Override + @TrackGroup( + group = "JMS_MQ_PASSIVE_INSERT_BASIC", + beanNames = {"mqBasicConverter", "mqBasicResultImpl"}, + processor = VisitorProcessor.class) + public void addTBasic(String message) { + log.info("新增基础数据:{}", message); + tBasicService.insertTBasic(getValue()); + } + + @Override + @TrackGroup( + group = "JMS_MQ_PASSIVE_UPDATE_BASIC", + beanNames = {"mqBasicConverter", "mqBasicResultImpl"}, + processor = VisitorProcessor.class) + public void updateTBasic(String message) { + log.info("修改基础数据:{}", message); + tBasicService.updateTBasic(getValue()); + } + + public TBasicDTO getValue() { + return JSON.fromJSON((String) TrackHelper.getValue("jsonStr"), TBasicDTO.class); + } +} diff --git a/common-collect/src/main/java/com/docus/server/collect/basic/mq/impl/BasicRocketMqCollectServiceImpl.java b/common-collect/src/main/java/com/docus/server/collect/basic/mq/impl/BasicRocketMqCollectServiceImpl.java new file mode 100644 index 0000000..a61027d --- /dev/null +++ b/common-collect/src/main/java/com/docus/server/collect/basic/mq/impl/BasicRocketMqCollectServiceImpl.java @@ -0,0 +1,54 @@ +package com.docus.server.collect.basic.mq.impl; + +import com.docus.core.util.json.JSON; +import com.docus.log.annotation.TrackGroup; +import com.docus.log.context.TrackHelper; +import com.docus.server.collect.basic.mq.IBasicMqCollectService; +import com.docus.server.collect.web.processor.VisitorProcessor; +import com.docus.server.collect.web.service.CollectService; +import com.docus.server.sys.common.pojo.dto.DeptDTO; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +@Service("basicRocketMqCollectServiceImpl") +@Slf4j +public class BasicRocketMqCollectServiceImpl implements IBasicMqCollectService { + @Resource + private CollectService collectService; + + /** + * 订阅队列(基础数据) + * + * @param message 基础数据 + */ + @Override + @TrackGroup( + group = "ROCKET_MQ_PASSIVE_INSERT_DEPT", + beanNames = {"mqBasicConverter", "mqBasicResultImpl"}, + processor = VisitorProcessor.class) + public void addTBasic(String message) { + log.info("注册基础数据 队列接收信息:{}", message); + collectService.insertOrUpdateDept(getValue()); + } + + /** + * 订阅队列(变更基础数据) + * + * @param message 基础数据 + */ + @Override + @TrackGroup( + group = "ROCKET_MQ_PASSIVE_UPDATE_DEPT", + beanNames = {"mqBasicConverter", "mqBasicResultImpl"}, + processor = VisitorProcessor.class) + public void updateTBasic(String message) { + log.info("变更基础数据 队列接收信息:{}", message); + collectService.insertOrUpdateDept(getValue()); + } + + public DeptDTO getValue() { + return JSON.fromJSON((String) TrackHelper.getValue("jsonStr"), DeptDTO.class); + } +} \ No newline at end of file diff --git a/common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptMqCollectServiceImpl.java b/common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptJmsMqCollectServiceImpl.java similarity index 88% rename from common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptMqCollectServiceImpl.java rename to common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptJmsMqCollectServiceImpl.java index 91bd3ce..2ea1f42 100644 --- a/common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptMqCollectServiceImpl.java +++ b/common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptJmsMqCollectServiceImpl.java @@ -4,9 +4,9 @@ import com.docus.core.util.json.JSON; import com.docus.log.annotation.TrackGroup; import com.docus.log.context.TrackHelper; import com.docus.server.collect.dept.mq.IDeptMqCollectService; +import com.docus.server.collect.web.processor.VisitorProcessor; import com.docus.server.collect.web.service.CollectService; import com.docus.server.sys.common.pojo.dto.DeptDTO; -import com.docus.server.collect.web.processor.VisitorProcessor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -16,8 +16,8 @@ import javax.annotation.Resource; * 每家mq 都不一样,这部分无法预先写好写入,需要根据每家独立编写。 */ @Slf4j -@Service -public class DeptMqCollectServiceImpl implements IDeptMqCollectService { +@Service("deptJmsMqCollectServiceImpl") +public class DeptJmsMqCollectServiceImpl implements IDeptMqCollectService { @Resource private CollectService collectService; @@ -28,7 +28,7 @@ public class DeptMqCollectServiceImpl implements IDeptMqCollectService { */ @Override @TrackGroup( - group = "MQ_PASSIVE_INSERT_DEPT", + group = "JMS_MQ_PASSIVE_INSERT_DEPT", beanNames = {"mqDeptConverter", "mqDeptResultImpl"}, processor = VisitorProcessor.class) public void addDept(String message) { @@ -44,7 +44,7 @@ public class DeptMqCollectServiceImpl implements IDeptMqCollectService { */ @Override @TrackGroup( - group = "MQ_PASSIVE_UPDATE_DEPT", + group = "JMS_MQ_PASSIVE_UPDATE_DEPT", beanNames = {"mqDeptConverter", "mqDeptResultImpl"}, processor = VisitorProcessor.class) public void updateDept(String message) { diff --git a/common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptRocketMqCollectServiceImpl.java b/common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptRocketMqCollectServiceImpl.java new file mode 100644 index 0000000..115e740 --- /dev/null +++ b/common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptRocketMqCollectServiceImpl.java @@ -0,0 +1,55 @@ +package com.docus.server.collect.dept.mq.impl; + +import com.docus.core.util.json.JSON; +import com.docus.log.annotation.TrackGroup; +import com.docus.log.context.TrackHelper; +import com.docus.server.collect.dept.mq.IDeptMqCollectService; +import com.docus.server.collect.web.processor.VisitorProcessor; +import com.docus.server.collect.web.service.CollectService; +import com.docus.server.sys.common.pojo.dto.DeptDTO; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +@Service("deptRocketMqCollectServiceImpl") +@Slf4j +public class DeptRocketMqCollectServiceImpl implements IDeptMqCollectService { + @Resource + private CollectService collectService; + + /** + * 订阅队列(注册科室) + * + * @param message 科室信息 + */ + @Override + @TrackGroup( + group = "ROCKET_MQ_PASSIVE_INSERT_DEPT", + beanNames = {"mqDeptConverter", "mqDeptResultImpl"}, + processor = VisitorProcessor.class) + public void addDept(String message) { + log.info("注册科室 队列接收信息:{}", message); + collectService.insertOrUpdateDept(getValue()); + } + + + /** + * 订阅队列(变更科室) + * + * @param message 科室信息 + */ + @Override + @TrackGroup( + group = "ROCKET_MQ_PASSIVE_UPDATE_DEPT", + beanNames = {"mqDeptConverter", "mqDeptResultImpl"}, + processor = VisitorProcessor.class) + public void updateDept(String message) { + log.info("变更科室 队列接收信息:{}", message); + collectService.insertOrUpdateDept(getValue()); + } + + public DeptDTO getValue() { + return JSON.fromJSON((String) TrackHelper.getValue("jsonStr"), DeptDTO.class); + } +} \ No newline at end of file diff --git a/common-collect/src/main/java/com/docus/server/collect/user/http/UserRestController.java b/common-collect/src/main/java/com/docus/server/collect/user/http/UserRestController.java index 380cd75..dbe0dc8 100644 --- a/common-collect/src/main/java/com/docus/server/collect/user/http/UserRestController.java +++ b/common-collect/src/main/java/com/docus/server/collect/user/http/UserRestController.java @@ -6,19 +6,22 @@ import com.docus.log.annotation.TrackGroup; import com.docus.log.context.TrackHelper; import com.docus.server.collect.IConverter; import com.docus.server.collect.IHttpResult; +import com.docus.server.collect.user.mq.RmqProvdier; import com.docus.server.collect.web.enums.CollectTypeEnum; +import com.docus.server.collect.web.processor.VisitorProcessor; import com.docus.server.collect.web.service.CollectService; import com.docus.server.sys.common.pojo.dto.UserDTO; -import com.docus.server.collect.web.processor.VisitorProcessor; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; +import javax.inject.Inject; /** @@ -37,6 +40,8 @@ import javax.annotation.Resource; public class UserRestController { @Resource private CollectService collectService; + @Inject + private RmqProvdier rmqProvdier; /** * 被动http 用户信息接收,进行操作 @@ -55,4 +60,12 @@ public class UserRestController { collectService.insertOrUpdateUser(JSON.fromJSON((String) TrackHelper.getValue("jsonStr"), UserDTO.class)); return null; } + + /** + * 测试rocket mq + */ + @GetMapping("/test") + public void test() { + rmqProvdier.send("hello world"); + } } diff --git a/common-collect/src/main/java/com/docus/server/collect/user/mq/RmqProvdier.java b/common-collect/src/main/java/com/docus/server/collect/user/mq/RmqProvdier.java new file mode 100644 index 0000000..42773bc --- /dev/null +++ b/common-collect/src/main/java/com/docus/server/collect/user/mq/RmqProvdier.java @@ -0,0 +1,24 @@ +package com.docus.server.collect.user.mq; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class RmqProvdier { + @Autowired + private RocketMQTemplate rocketMQTemplate; + + public boolean send(String message) { + try { + // 发送消息 + rocketMQTemplate.convertAndSend("ta-cipher-persist", message); + } catch (Exception e) { + log.error("send message:{}", message, e); + return false; + } + return true; + } +} \ No newline at end of file diff --git a/common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserMqCollectServiceImpl.java b/common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserJmsMqCollectServiceImpl.java similarity index 88% rename from common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserMqCollectServiceImpl.java rename to common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserJmsMqCollectServiceImpl.java index ae3e199..3a17294 100644 --- a/common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserMqCollectServiceImpl.java +++ b/common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserJmsMqCollectServiceImpl.java @@ -4,9 +4,9 @@ import com.docus.core.util.json.JSON; import com.docus.log.annotation.TrackGroup; import com.docus.log.context.TrackHelper; import com.docus.server.collect.user.mq.IUserMqCollectService; +import com.docus.server.collect.web.processor.VisitorProcessor; import com.docus.server.collect.web.service.CollectService; import com.docus.server.sys.common.pojo.dto.UserDTO; -import com.docus.server.collect.web.processor.VisitorProcessor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -16,8 +16,8 @@ import javax.annotation.Resource; * 每家mq 都不一样,这部分无法预先写好写入,需要根据每家独立编写。 */ @Slf4j -@Service -public class UserMqCollectServiceImpl implements IUserMqCollectService { +@Service("userJmsMqCollectServiceImpl") +public class UserJmsMqCollectServiceImpl implements IUserMqCollectService { @Resource private CollectService collectService; @@ -29,7 +29,7 @@ public class UserMqCollectServiceImpl implements IUserMqCollectService { */ @Override @TrackGroup( - group = "MQ_PASSIVE_INSERT_DEPT", + group = "JMS_MQ_PASSIVE_INSERT_USER", beanNames = {"mqUserConverter", "mqUserResultImpl"}, processor = VisitorProcessor.class) public void addUser(String message) { @@ -44,7 +44,7 @@ public class UserMqCollectServiceImpl implements IUserMqCollectService { */ @Override @TrackGroup( - group = "MQ_PASSIVE_UPDATE_USER", + group = "JMS_MQ_PASSIVE_UPDATE_USER", beanNames = {"mqUserConverter", "mqUserResultImpl"}, processor = VisitorProcessor.class) public void updateUser(String message) { diff --git a/common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserRocketMqCollectServiceImpl.java b/common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserRocketMqCollectServiceImpl.java new file mode 100644 index 0000000..946f460 --- /dev/null +++ b/common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserRocketMqCollectServiceImpl.java @@ -0,0 +1,54 @@ +package com.docus.server.collect.user.mq.impl; + +import com.docus.core.util.json.JSON; +import com.docus.log.annotation.TrackGroup; +import com.docus.log.context.TrackHelper; +import com.docus.server.collect.user.mq.IUserMqCollectService; +import com.docus.server.collect.web.processor.VisitorProcessor; +import com.docus.server.collect.web.service.CollectService; +import com.docus.server.sys.common.pojo.dto.UserDTO; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +@Service("userRocketMqCollectServiceImpl") +@Slf4j +public class UserRocketMqCollectServiceImpl implements IUserMqCollectService { + @Resource + private CollectService collectService; + + /** + * 订阅队列(注册人员) + * + * @param message 人员信息 + */ + @Override + @TrackGroup( + group = "ROCKET_MQ_PASSIVE_INSERT_USER", + beanNames = {"mqUserConverter", "mqUserResultImpl"}, + processor = VisitorProcessor.class) + public void addUser(String message) { + log.info("人员注册 队列接收信息:{}", message); + collectService.insertOrUpdateUser(getValue()); + } + + /** + * 订阅队列(变更人员) + * + * @param message 人员信息 + */ + @Override + @TrackGroup( + group = "ROCKET_MQ_PASSIVE_UPDATE_USER", + beanNames = {"mqUserConverter", "mqUserResultImpl"}, + processor = VisitorProcessor.class) + public void updateUser(String message) { + log.info("人员变更 队列接收信息:{}", message); + collectService.insertOrUpdateUser(getValue()); + } + + public UserDTO getValue() { + return JSON.fromJSON((String) TrackHelper.getValue("jsonStr"), UserDTO.class); + } +} \ No newline at end of file diff --git a/common-collect/src/main/java/com/docus/server/collect/web/enums/CollectTypeEnum.java b/common-collect/src/main/java/com/docus/server/collect/web/enums/CollectTypeEnum.java index 784dc9c..ca42ca1 100644 --- a/common-collect/src/main/java/com/docus/server/collect/web/enums/CollectTypeEnum.java +++ b/common-collect/src/main/java/com/docus/server/collect/web/enums/CollectTypeEnum.java @@ -16,12 +16,19 @@ public enum CollectTypeEnum implements IIntegerEnum { HTTP_PASSIVE_DEPT(9, "HTTP_PASSIVE_DEPT", "新增/修改-科室信息"), HTTP_PASSIVE_BASIC(10, "HTTP_PASSIVE_BASIC", "新增/修改-基础数据"), - MQ_PASSIVE_INSERT_USER(11, "MQ_PASSIVE_USER", "新增-用户信息"), - MQ_PASSIVE_UPDATE_USER(12, "MQ_PASSIVE_USER", "修改-用户信息"), - MQ_PASSIVE_INSERT_DEPT(13, "MQ_PASSIVE_DEPT", "新增-科室信息"), - MQ_PASSIVE_UPDATE_DEPT(14, "MQ_PASSIVE_DEPT", "修改-科室信息"), - MQ_PASSIVE_INSERT_BASIC(15, "MQ_PASSIVE_BASIC", "新增-基础数据"), - MQ_PASSIVE_UPDATE_BASIC(16, "MQ_PASSIVE_BASIC", "修改-基础数据"), + JMS_MQ_PASSIVE_INSERT_USER(11, "JMS_MQ_PASSIVE_INSERT_USER", "新增-用户信息"), + JMS_MQ_PASSIVE_UPDATE_USER(12, "JMS_MQ_PASSIVE_UPDATE_USER", "修改-用户信息"), + JMS_MQ_PASSIVE_INSERT_DEPT(13, "JMS_MQ_PASSIVE_INSERT_DEPT", "新增-科室信息"), + JMS_MQ_PASSIVE_UPDATE_DEPT(14, "JMS_MQ_PASSIVE_UPDATE_DEPT", "修改-科室信息"), + JMS_MQ_PASSIVE_INSERT_BASIC(15, "JMS_MQ_PASSIVE_INSERT_BASIC", "新增-基础数据"), + JMS_MQ_PASSIVE_UPDATE_BASIC(16, "JMS_MQ_PASSIVE_UPDATE_BASIC", "修改-基础数据"), + + ROCKET_MQ_PASSIVE_INSERT_USER(17, "ROCKET_MQ_PASSIVE_INSERT_USER", "新增-用户信息"), + ROCKET_MQ_PASSIVE_UPDATE_USER(18, "ROCKET_MQ_PASSIVE_UPDATE_USER", "修改-用户信息"), + ROCKET_MQ_PASSIVE_INSERT_DEPT(19, "ROCKET_MQ_PASSIVE_INSERT_DEPT", "新增-科室信息"), + ROCKET_MQ_PASSIVE_UPDATE_DEPT(20, "ROCKET_MQ_PASSIVE_UPDATE_DEPT", "修改-科室信息"), + ROCKET_MQ_PASSIVE_INSERT_BASIC(21, "ROCKET_MQ_PASSIVE_INSERT_BASIC", "新增-基础数据"), + ROCKET_MQ_PASSIVE_UPDATE_BASIC(22, "ROCKET_MQ_PASSIVE_UPDATE_BASIC", "修改-基础数据"), ; private Integer value; diff --git a/pom.xml b/pom.xml index 520f9cc..2ef3e46 100644 --- a/pom.xml +++ b/pom.xml @@ -209,6 +209,12 @@ netty-all 4.1.87.Final + + + org.apache.rocketmq + rocketmq-spring-boot-starter + 2.2.3 +