diff --git a/collect-sdry/src/main/java/com/docus/server/collect/basic/mq/RabbitMqBasicConsumer.java b/collect-sdry/src/main/java/com/docus/server/collect/basic/mq/RabbitMqBasicConsumer.java new file mode 100644 index 0000000..60dff5c --- /dev/null +++ b/collect-sdry/src/main/java/com/docus/server/collect/basic/mq/RabbitMqBasicConsumer.java @@ -0,0 +1,56 @@ +package com.docus.server.collect.basic.mq; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.Exchange; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * 基于注解的绑定交换器、队列、路由键设置 + * 1. Queue配置:value=队列名称、durable=是否持久化(默认true)、exclusive=排他队列只在当前connection可用(默认false)、autoDelete=如无消息是否自动删除(默认false) + * 2. Exchange配置:value=交换器名称、type=类型(默认direct)、durable=是否持久化(默认true)、autoDelete=如无消息是否自动删除(默认false) + * 3. QueueBinding配置:key=路由键(string数组,支持* # 匹配),*必须匹配一个单词,#匹配0个或N个单词,用.分隔 + * 4. RabbitListener配置: bindings=Queue配置+Exchange配置+QueueBinding配置 + * 注:如果代码创建交换器等且配置绑定关系,注解只需监听队列即可,如:@RabbitListener(queues = "direct.queue") + *

+ *

+ * rabbitmq basic consumer + * + * @author linruifeng + * @see com.docus.server.collect.basic.mq.impl.BasicRabbitMqCollectServiceImpl#addTBasic(String) + * @see com.docus.server.collect.user.mq.test.RabbitMQProducer#topicMQ03() + */ +@Component +@Slf4j +public class RabbitMqBasicConsumer { + @Resource(name = "basicRabbitMqCollectServiceImpl") + private IBasicMqCollectService basicMqCollectService; + + /** + * topic n:1 类型 交换器队列 消费(普通会员注册提醒) + */ + @RabbitListener( + bindings = {@QueueBinding(value = @Queue(value = "topic.queue.01"), + exchange = @Exchange(value = "topic.exchange", type = "topic"), + key = {"*.reg.msg"})}) + public void getTopicMessage01(String message) { + log.info("rabbitmq message:{}", message); + basicMqCollectService.addTBasic(message); + } + + /** + * topic n:1 类型 交换器队列 消费(超级会员注册提醒) + */ + @RabbitListener(bindings = { + @QueueBinding(value = @Queue(value = "topic.queue.02"), + exchange = @Exchange(value = "topic.exchange", type = "topic"), + key = {"*.*.reg.msg.#"})}) + public void getTopicMessage02(String message) { + log.info("rabbitmq message:{}", message); + basicMqCollectService.updateTBasic(message); + } +} diff --git a/collect-sdry/src/main/java/com/docus/server/collect/basic/mq/RocketMqBasicConsumer.java b/collect-sdry/src/main/java/com/docus/server/collect/basic/mq/RocketMqBasicConsumer.java index 4a46eb3..cc07646 100644 --- a/collect-sdry/src/main/java/com/docus/server/collect/basic/mq/RocketMqBasicConsumer.java +++ b/collect-sdry/src/main/java/com/docus/server/collect/basic/mq/RocketMqBasicConsumer.java @@ -1,15 +1,21 @@ package com.docus.server.collect.basic.mq; import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; +/** + * rocketmq 基础数据消费者 + * + * @see com.docus.server.collect.user.mq.test.RocketMQProvdier + * @see com.docus.server.collect.basic.mq.impl.BasicRocketMqCollectServiceImpl#addTBasic(String) + */ @Component @Slf4j -@RocketMQMessageListener(topic = "ta-cipher-persist", consumerGroup = "ta-cipher-encode") +//启用rockertmq consumer,打开下面的注解监听 +//@RocketMQMessageListener(topic = "ta-cipher-persist", consumerGroup = "ta-cipher-encode") public class RocketMqBasicConsumer implements RocketMQListener { @Resource(name = "basicRocketMqCollectServiceImpl") @@ -26,4 +32,4 @@ public class RocketMqBasicConsumer implements RocketMQListener { throw new RuntimeException("Message processing failed", e); } } -} \ No newline at end of file +} diff --git a/collect-sdry/src/main/java/com/docus/server/collect/dept/mq/RabbitMqDeptConsumer.java b/collect-sdry/src/main/java/com/docus/server/collect/dept/mq/RabbitMqDeptConsumer.java new file mode 100644 index 0000000..e932b36 --- /dev/null +++ b/collect-sdry/src/main/java/com/docus/server/collect/dept/mq/RabbitMqDeptConsumer.java @@ -0,0 +1,55 @@ +package com.docus.server.collect.dept.mq; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.Exchange; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * 基于注解的绑定交换器、队列、路由键设置 + * 1. Queue配置:value=队列名称、durable=是否持久化(默认true)、exclusive=排他队列只在当前connection可用(默认false)、autoDelete=如无消息是否自动删除(默认false) + * 2. Exchange配置:value=交换器名称、type=类型(默认direct)、durable=是否持久化(默认true)、autoDelete=如无消息是否自动删除(默认false) + * 3. QueueBinding配置:key=路由键(string数组,支持* # 匹配),*必须匹配一个单词,#匹配0个或N个单词,用.分隔 + * 4. RabbitListener配置: bindings=Queue配置+Exchange配置+QueueBinding配置 + * 注:如果代码创建交换器等且配置绑定关系,注解只需监听队列即可,如:@RabbitListener(queues = "direct.queue") + *

+ * rabbitmq 被动接收用户信息 + * + * @author linruifeng + * @see com.docus.server.collect.user.mq.impl.UserRabbitMqCollectServiceImpl#addUser(String) + * @see com.docus.server.collect.user.mq.test.RabbitMQProducer#topicMQ03() + */ +@Component +@Slf4j +public class RabbitMqDeptConsumer { + @Resource(name = "deptRabbitMqCollectServiceImpl") + private IDeptMqCollectService deptMqCollectService; + + /** + * topic n:1 类型 交换器队列 消费(普通会员注册提醒) + */ + @RabbitListener( + bindings = {@QueueBinding(value = @Queue(value = "topic.queue.01"), + exchange = @Exchange(value = "topic.exchange", type = "topic"), + key = {"*.reg.msg"})}) + public void getTopicMessage01(String message) { + log.info("rabbitmq message:{}", message); + deptMqCollectService.addDept(message); + } + + /** + * topic n:1 类型 交换器队列 消费(超级会员注册提醒) + */ + @RabbitListener(bindings = { + @QueueBinding(value = @Queue(value = "topic.queue.02"), + exchange = @Exchange(value = "topic.exchange", type = "topic"), + key = {"*.*.reg.msg.#"})}) + public void getTopicMessage02(String message) { + log.info("rabbitmq message:{}", message); + deptMqCollectService.updateDept(message); + } +} diff --git a/collect-sdry/src/main/java/com/docus/server/collect/dept/mq/RocketMqDeptConsumer.java b/collect-sdry/src/main/java/com/docus/server/collect/dept/mq/RocketMqDeptConsumer.java index cef266c..7d53599 100644 --- a/collect-sdry/src/main/java/com/docus/server/collect/dept/mq/RocketMqDeptConsumer.java +++ b/collect-sdry/src/main/java/com/docus/server/collect/dept/mq/RocketMqDeptConsumer.java @@ -1,15 +1,21 @@ package com.docus.server.collect.dept.mq; import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; +/** + * rocketmq 科室数据消费者 + * + * @see com.docus.server.collect.user.mq.RocketMqUserConsumer#send(String) + * @see com.docus.server.collect.user.mq.test.MQTestRestController + */ @Component @Slf4j -@RocketMQMessageListener(topic = "ta-cipher-persist", consumerGroup = "ta-cipher-encode") +//启用rockertmq consumer,打开下面的注解监听 +//@RocketMQMessageListener(topic = "ta-cipher-persist", consumerGroup = "ta-cipher-encode") public class RocketMqDeptConsumer implements RocketMQListener { @Resource(name = "deptJmsMqCollectServiceImpl") @@ -26,4 +32,4 @@ public class RocketMqDeptConsumer implements RocketMQListener { throw new RuntimeException("Message processing failed", e); } } -} \ No newline at end of file +} diff --git a/collect-sdry/src/main/java/com/docus/server/collect/user/mq/RabbitMqUserConsumer.java b/collect-sdry/src/main/java/com/docus/server/collect/user/mq/RabbitMqUserConsumer.java new file mode 100644 index 0000000..4d54128 --- /dev/null +++ b/collect-sdry/src/main/java/com/docus/server/collect/user/mq/RabbitMqUserConsumer.java @@ -0,0 +1,55 @@ +package com.docus.server.collect.user.mq; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.Exchange; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * 基于注解的绑定交换器、队列、路由键设置 + * 1. Queue配置:value=队列名称、durable=是否持久化(默认true)、exclusive=排他队列只在当前connection可用(默认false)、autoDelete=如无消息是否自动删除(默认false) + * 2. Exchange配置:value=交换器名称、type=类型(默认direct)、durable=是否持久化(默认true)、autoDelete=如无消息是否自动删除(默认false) + * 3. QueueBinding配置:key=路由键(string数组,支持* # 匹配),*必须匹配一个单词,#匹配0个或N个单词,用.分隔 + * 4. RabbitListener配置: bindings=Queue配置+Exchange配置+QueueBinding配置 + * 注:如果代码创建交换器等且配置绑定关系,注解只需监听队列即可,如:@RabbitListener(queues = "direct.queue") + *

+ * rabbitmq 被动接收用户信息 + * + * @author linruifeng + * @see com.docus.server.collect.user.mq.impl.UserRabbitMqCollectServiceImpl#addUser(String) + * @see com.docus.server.collect.user.mq.test.RabbitMQProducer#topicMQ03() + */ +@Component +@Slf4j +public class RabbitMqUserConsumer { + @Resource(name = "userRabbitMqCollectServiceImpl") + private IUserMqCollectService userMqCollectService; + + /** + * topic n:1 类型 交换器队列 消费(普通会员注册提醒) + */ + @RabbitListener( + bindings = {@QueueBinding(value = @Queue(value = "topic.queue.01"), + exchange = @Exchange(value = "topic.exchange", type = "topic"), + key = {"*.reg.msg"})}) + public void getTopicMessage01(String message) { + log.info("rabbitmq message:{}", message); + userMqCollectService.addUser(message); + } + + /** + * topic n:1 类型 交换器队列 消费(超级会员注册提醒) + */ + @RabbitListener(bindings = { + @QueueBinding(value = @Queue(value = "topic.queue.02"), + exchange = @Exchange(value = "topic.exchange", type = "topic"), + key = {"*.*.reg.msg.#"})}) + public void getTopicMessage02(String message) { + log.info("rabbitmq message:{}", message); + userMqCollectService.updateUser(message); + } +} diff --git a/collect-sdry/src/main/java/com/docus/server/collect/user/mq/RocketMqUserConsumer.java b/collect-sdry/src/main/java/com/docus/server/collect/user/mq/RocketMqUserConsumer.java index 7f397f7..55eab49 100644 --- a/collect-sdry/src/main/java/com/docus/server/collect/user/mq/RocketMqUserConsumer.java +++ b/collect-sdry/src/main/java/com/docus/server/collect/user/mq/RocketMqUserConsumer.java @@ -1,15 +1,18 @@ package com.docus.server.collect.user.mq; import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; +/** + * rocketmq 被动接收用户信息 + */ @Component @Slf4j -@RocketMQMessageListener(topic = "ta-cipher-persist", consumerGroup = "ta-cipher-encode") +//启用rockertmq consumer,打开下面的注解监听 +//@RocketMQMessageListener(topic = "ta-cipher-persist", consumerGroup = "ta-cipher-encode") public class RocketMqUserConsumer implements RocketMQListener { @Resource(name = "userRocketMqCollectServiceImpl") @@ -18,7 +21,7 @@ public class RocketMqUserConsumer implements RocketMQListener { @Override public void onMessage(String message) { try { - log.info("RocketMQ message:{}", message); + log.info("rocketmq message:{}", message); userMqCollectService.addUser(message); } catch (Exception e) { log.error("errorMessage:{}", message); @@ -26,4 +29,4 @@ public class RocketMqUserConsumer implements RocketMQListener { throw new RuntimeException("Message processing failed", e); } } -} \ No newline at end of file +} diff --git a/collect-sdry/src/main/java/com/docus/server/collect/user/mq/test/MQTestRestController.java b/collect-sdry/src/main/java/com/docus/server/collect/user/mq/test/MQTestRestController.java new file mode 100644 index 0000000..cba76e5 --- /dev/null +++ b/collect-sdry/src/main/java/com/docus/server/collect/user/mq/test/MQTestRestController.java @@ -0,0 +1,37 @@ +package com.docus.server.collect.user.mq.test; + +import com.docus.server.collect.IConverter; +import com.docus.server.collect.IHttpResult; +import com.docus.server.collect.web.enums.CollectTypeEnum; +import com.docus.server.collect.web.processor.VisitorProcessor; +import io.swagger.annotations.Api; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + + +/** + * @author wen yongbin + * @date 2023年2月25日21:56:33 + * @see CollectTypeEnum 枚举 + * @see IConverter 通用转化器 + * @see IHttpResult 通用返回结果 + * @see VisitorProcessor 通用处理器 + */ + +@Api(value = "用户管理接口", tags = "用户管理接口") +@Slf4j +@RestController +@RequestMapping("/api/rest/user") +public class MQTestRestController { +// @Inject +// private RmqProvdier rmqProvdier; + +// /** +// * 测试rocket mq +// */ +// @GetMapping("/test") +// public void test() { +// rmqProvdier.send("hello world"); +// } +} diff --git a/collect-sdry/src/main/java/com/docus/server/collect/user/mq/test/RabbitMQProducer.java b/collect-sdry/src/main/java/com/docus/server/collect/user/mq/test/RabbitMQProducer.java new file mode 100644 index 0000000..d57326c --- /dev/null +++ b/collect-sdry/src/main/java/com/docus/server/collect/user/mq/test/RabbitMQProducer.java @@ -0,0 +1,60 @@ +package com.docus.server.collect.user.mq.test; + +import com.docus.server.collect.web.domain.TaskConfig; +import com.docus.server.collect.web.service.impl.TaskConfigServiceImpl; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; + +import javax.inject.Inject; +import java.util.List; + +/** + * @Description 生产者 + * @Author jxb + * @Date 2019-03-09 09:43:47 + */ +@RestController +@RequestMapping("mqProducer") +public class RabbitMQProducer { + + @Autowired + public RabbitTemplate rabbitTemplate; + @Inject + private TaskConfigServiceImpl userService; + + /** + * @Description topic n:1 类型 交换器队列 生产(3个) + * @Author jxb + * @Date 2019-03-09 09:56:45 + */ + @RequestMapping(value = "/topicMQ01", method = {RequestMethod.GET}) + public List topicMQ01() { + List users = userService.list(null); + for (TaskConfig user : users) { + rabbitTemplate.convertAndSend("topic.exchange", "jd.reg.msg", user.getName()); + } + return users; + } + + @RequestMapping(value = "/topicMQ02", method = {RequestMethod.GET}) + public List topicMQ02() { + List users = userService.list(null); + for (TaskConfig user : users) { + rabbitTemplate.convertAndSend("topic.exchange", "tm.reg.msg", user.getName()); + } + return users; + } + + @RequestMapping(value = "/topicMQ03", method = {RequestMethod.GET}) + public List topicMQ03() { + List users = userService.list(null); + for (TaskConfig user : users) { + rabbitTemplate.convertAndSend("topic.exchange", "super.fzb.reg.msg", user.getName()); + } + return users; + } + +} diff --git a/collect-sdry/src/main/java/com/docus/server/collect/user/mq/test/RocketMQProvdier.java b/collect-sdry/src/main/java/com/docus/server/collect/user/mq/test/RocketMQProvdier.java new file mode 100644 index 0000000..2d410c8 --- /dev/null +++ b/collect-sdry/src/main/java/com/docus/server/collect/user/mq/test/RocketMQProvdier.java @@ -0,0 +1,28 @@ +package com.docus.server.collect.user.mq.test; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +// +//import lombok.extern.slf4j.Slf4j; +//import org.apache.rocketmq.spring.core.RocketMQTemplate; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.stereotype.Component; +// +@Component +@Slf4j +public class RocketMQProvdier { +// @Autowired +// private RocketMQTemplate rocketMQTemplate; +// +// public boolean send(String message) { +// try { +// // 发送消息 +// rocketMQTemplate.convertAndSend("ta-cipher-persist", message); +// } catch (Exception e) { +// log.error("send message:{}", message, e); +// return false; +// } +// return true; +// } +} diff --git a/collect-sdry/src/main/resources/bootstrap.yml b/collect-sdry/src/main/resources/bootstrap.yml index f8eff22..df8ace3 100644 --- a/collect-sdry/src/main/resources/bootstrap.yml +++ b/collect-sdry/src/main/resources/bootstrap.yml @@ -12,21 +12,24 @@ spring: datasource: master: url: jdbc:log4jdbc:mysql://db.docus.cn:3306/docus_archivefile?autoReconnect=true&allowMultiQueries=true&useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai - username: docus - password: docus702 + username: root + password: root driver-class-name: net.sf.log4jdbc.sql.jdbcapi.DriverSpy type: com.alibaba.druid.pool.DruidDataSource his: - url: jdbc:log4jdbc:mysql://db.docus.cn:3306/his?autoReconnect=true&allowMultiQueries=true&useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai - username: docus - password: docus702 + url: jdbc:log4jdbc:mysql://127.0.0.1:3306/his?autoReconnect=true&allowMultiQueries=true&useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai + username: root + password: root driver-class-name: net.sf.log4jdbc.sql.jdbcapi.DriverSpy type: com.alibaba.druid.pool.DruidDataSource - - + # rabbitmq: + # host: 127.0.0.1 + # port: 5672 + # username: guest + # password: guest redis: host: redis.docus.cn - password: JSdocus@702 + # password: JSdocus@702 cloud: nacos: discovery: @@ -41,20 +44,21 @@ spring: docus: dbtype: mysql - user: - # 用户默认密码 - defpwd: fd29cd53ec12616e5f36b77d4afffbff mybatis-plus: configuration: map-underscore-to-camel-case: true call-setters-on-nulls: true + jdbc-type-for-null: null log-impl: org.apache.ibatis.logging.stdout.StdOutImpl global-config: db-config: + update-strategy: ignored field-strategy: NOT_EMPTY db-type: MYSQL + mapper-locations: classpath*:/mapper/*Mapper.xml type-enums-package: com.docus.server.collect.web.enums + xxl: job: accessToken: @@ -68,21 +72,16 @@ xxl: logretentiondays: 30 logpath: D:/xxl-job/inspection -# -#ibm: -# mq: -# queueManager: MqManager01 -# channel: chl_server01 -# connName: 10.222.23.130(1414) -# ackUrl: 10.222.23.159:7800/ack -# domain: CDR -# key: cdr-0012-adef -# replica: -# queueManager: MqManager02 -# channel: chl_server02 -# connName: 10.222.23.131(1414) +api: + soft-delete-column-name=state + soft-delete-column-value=1 +logging: + level: + com: + baomidou: + mybatisplus: DEBUG -rocketmq: - name-server: 127.0.01:9876 - producer: - group: ta-cipher-encode \ No newline at end of file +#rocketmq: +# name-server: 127.0.01:9876 +# producer: +# group: ta-cipher-encode diff --git a/common-collect/src/main/java/com/docus/server/collect/basic/mq/IBasicMqCollectService.java b/common-collect/src/main/java/com/docus/server/collect/basic/mq/IBasicMqCollectService.java index 41d283a..ee6b1f4 100644 --- a/common-collect/src/main/java/com/docus/server/collect/basic/mq/IBasicMqCollectService.java +++ b/common-collect/src/main/java/com/docus/server/collect/basic/mq/IBasicMqCollectService.java @@ -1,5 +1,8 @@ package com.docus.server.collect.basic.mq; +/** + * @author linruifeng + */ public interface IBasicMqCollectService { /** diff --git a/common-collect/src/main/java/com/docus/server/collect/basic/mq/impl/BasicRabbitMqCollectServiceImpl.java b/common-collect/src/main/java/com/docus/server/collect/basic/mq/impl/BasicRabbitMqCollectServiceImpl.java new file mode 100644 index 0000000..1d62dba --- /dev/null +++ b/common-collect/src/main/java/com/docus/server/collect/basic/mq/impl/BasicRabbitMqCollectServiceImpl.java @@ -0,0 +1,57 @@ +package com.docus.server.collect.basic.mq.impl; + +import com.docus.core.util.json.JSON; +import com.docus.log.annotation.TrackGroup; +import com.docus.log.context.TrackHelper; +import com.docus.server.collect.basic.mq.IBasicMqCollectService; +import com.docus.server.collect.web.processor.VisitorProcessor; +import com.docus.server.collect.web.service.CollectService; +import com.docus.server.sys.common.pojo.dto.DeptDTO; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +/** + * @author linruifeng + */ +@Service("basicRabbitMqCollectServiceImpl") +@Slf4j +public class BasicRabbitMqCollectServiceImpl implements IBasicMqCollectService { + @Resource + private CollectService collectService; + + /** + * 订阅队列(基础数据) + * + * @param message 基础数据 + */ + @Override + @TrackGroup( + group = "RABBIT_MQ_PASSIVE_INSERT_BASIC", + beanNames = {"mqBasicConverter", "mqBasicResultImpl"}, + processor = VisitorProcessor.class) + public void addTBasic(String message) { + log.info("注册基础数据 队列接收信息:{}", message); + collectService.insertOrUpdateDept(getValue()); + } + + /** + * 订阅队列(变更基础数据) + * + * @param message 基础数据 + */ + @Override + @TrackGroup( + group = "RABBIT_MQ_PASSIVE_UPDATE_BASIC", + beanNames = {"mqBasicConverter", "mqBasicResultImpl"}, + processor = VisitorProcessor.class) + public void updateTBasic(String message) { + log.info("变更基础数据 队列接收信息:{}", message); + collectService.insertOrUpdateDept(getValue()); + } + + public DeptDTO getValue() { + return JSON.fromJSON((String) TrackHelper.getValue("jsonStr"), DeptDTO.class); + } +} diff --git a/common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptRabbitMqCollectServiceImpl.java b/common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptRabbitMqCollectServiceImpl.java new file mode 100644 index 0000000..a77d308 --- /dev/null +++ b/common-collect/src/main/java/com/docus/server/collect/dept/mq/impl/DeptRabbitMqCollectServiceImpl.java @@ -0,0 +1,55 @@ +package com.docus.server.collect.dept.mq.impl; + +import com.docus.core.util.json.JSON; +import com.docus.log.annotation.TrackGroup; +import com.docus.log.context.TrackHelper; +import com.docus.server.collect.dept.mq.IDeptMqCollectService; +import com.docus.server.collect.web.processor.VisitorProcessor; +import com.docus.server.collect.web.service.CollectService; +import com.docus.server.sys.common.pojo.dto.DeptDTO; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +@Service("deptRabbitMqCollectServiceImpl") +@Slf4j +public class DeptRabbitMqCollectServiceImpl implements IDeptMqCollectService { + @Resource + private CollectService collectService; + + /** + * 订阅队列(注册科室) + * + * @param message 科室信息 + */ + @Override + @TrackGroup( + group = "RABBIT_MQ_PASSIVE_INSERT_DEPT", + beanNames = {"mqDeptConverter", "mqDeptResultImpl"}, + processor = VisitorProcessor.class) + public void addDept(String message) { + log.info("注册科室 队列接收信息:{}", message); + collectService.insertOrUpdateDept(getValue()); + } + + + /** + * 订阅队列(变更科室) + * + * @param message 科室信息 + */ + @Override + @TrackGroup( + group = "RABBIT_MQ_PASSIVE_UPDATE_DEPT", + beanNames = {"mqDeptConverter", "mqDeptResultImpl"}, + processor = VisitorProcessor.class) + public void updateDept(String message) { + log.info("变更科室 队列接收信息:{}", message); + collectService.insertOrUpdateDept(getValue()); + } + + public DeptDTO getValue() { + return JSON.fromJSON((String) TrackHelper.getValue("jsonStr"), DeptDTO.class); + } +} diff --git a/common-collect/src/main/java/com/docus/server/collect/user/http/UserRestController.java b/common-collect/src/main/java/com/docus/server/collect/user/http/UserRestController.java index dbe0dc8..aa50cd3 100644 --- a/common-collect/src/main/java/com/docus/server/collect/user/http/UserRestController.java +++ b/common-collect/src/main/java/com/docus/server/collect/user/http/UserRestController.java @@ -6,7 +6,6 @@ import com.docus.log.annotation.TrackGroup; import com.docus.log.context.TrackHelper; import com.docus.server.collect.IConverter; import com.docus.server.collect.IHttpResult; -import com.docus.server.collect.user.mq.RmqProvdier; import com.docus.server.collect.web.enums.CollectTypeEnum; import com.docus.server.collect.web.processor.VisitorProcessor; import com.docus.server.collect.web.service.CollectService; @@ -14,14 +13,12 @@ import com.docus.server.sys.common.pojo.dto.UserDTO; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; -import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; -import javax.inject.Inject; /** @@ -40,8 +37,6 @@ import javax.inject.Inject; public class UserRestController { @Resource private CollectService collectService; - @Inject - private RmqProvdier rmqProvdier; /** * 被动http 用户信息接收,进行操作 @@ -60,12 +55,4 @@ public class UserRestController { collectService.insertOrUpdateUser(JSON.fromJSON((String) TrackHelper.getValue("jsonStr"), UserDTO.class)); return null; } - - /** - * 测试rocket mq - */ - @GetMapping("/test") - public void test() { - rmqProvdier.send("hello world"); - } } diff --git a/common-collect/src/main/java/com/docus/server/collect/user/mq/RmqProvdier.java b/common-collect/src/main/java/com/docus/server/collect/user/mq/RmqProvdier.java deleted file mode 100644 index 42773bc..0000000 --- a/common-collect/src/main/java/com/docus/server/collect/user/mq/RmqProvdier.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.docus.server.collect.user.mq; - -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.spring.core.RocketMQTemplate; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -@Component -@Slf4j -public class RmqProvdier { - @Autowired - private RocketMQTemplate rocketMQTemplate; - - public boolean send(String message) { - try { - // 发送消息 - rocketMQTemplate.convertAndSend("ta-cipher-persist", message); - } catch (Exception e) { - log.error("send message:{}", message, e); - return false; - } - return true; - } -} \ No newline at end of file diff --git a/common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserRabbitMqCollectServiceImpl.java b/common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserRabbitMqCollectServiceImpl.java new file mode 100644 index 0000000..e5ca814 --- /dev/null +++ b/common-collect/src/main/java/com/docus/server/collect/user/mq/impl/UserRabbitMqCollectServiceImpl.java @@ -0,0 +1,54 @@ +package com.docus.server.collect.user.mq.impl; + +import com.docus.core.util.json.JSON; +import com.docus.log.annotation.TrackGroup; +import com.docus.log.context.TrackHelper; +import com.docus.server.collect.user.mq.IUserMqCollectService; +import com.docus.server.collect.web.processor.VisitorProcessor; +import com.docus.server.collect.web.service.CollectService; +import com.docus.server.sys.common.pojo.dto.UserDTO; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +@Service("userRabbitMqCollectServiceImpl") +@Slf4j +public class UserRabbitMqCollectServiceImpl implements IUserMqCollectService { + @Resource + private CollectService collectService; + + /** + * 订阅队列(注册人员) + * + * @param message 人员信息 + */ + @Override + @TrackGroup( + group = "RABBIT_MQ_PASSIVE_INSERT_USER", + beanNames = {"mqUserConverter", "mqUserResultImpl"}, + processor = VisitorProcessor.class) + public void addUser(String message) { + log.info("人员注册 队列接收信息:{}", message); + collectService.insertOrUpdateUser(getValue()); + } + + /** + * 订阅队列(变更人员) + * + * @param message 人员信息 + */ + @Override + @TrackGroup( + group = "RABBIT_MQ_PASSIVE_UPDATE_USER", + beanNames = {"mqUserConverter", "mqUserResultImpl"}, + processor = VisitorProcessor.class) + public void updateUser(String message) { + log.info("人员变更 队列接收信息:{}", message); + collectService.insertOrUpdateUser(getValue()); + } + + public UserDTO getValue() { + return JSON.fromJSON((String) TrackHelper.getValue("jsonStr"), UserDTO.class); + } +} diff --git a/common-collect/src/main/java/com/docus/server/collect/web/enums/CollectTypeEnum.java b/common-collect/src/main/java/com/docus/server/collect/web/enums/CollectTypeEnum.java index ca42ca1..08414c9 100644 --- a/common-collect/src/main/java/com/docus/server/collect/web/enums/CollectTypeEnum.java +++ b/common-collect/src/main/java/com/docus/server/collect/web/enums/CollectTypeEnum.java @@ -2,6 +2,9 @@ package com.docus.server.collect.web.enums; public enum CollectTypeEnum implements IIntegerEnum { + /** + * WS + */ WEBSERVICE_XML_DEPT(0, "WEBSERVICE_XML_DEPT", "新增/修改-科室信息"), WEBSERVICE_XML_USER(1, "WEBSERVICE_XML_USER", "新增/修改-用户信息"), WEBSERVICE_XML_INSERT_BASIC(2, "WEBSERVICE_XML_BASIC", "新增/修改-基础数据"), @@ -12,10 +15,16 @@ public enum CollectTypeEnum implements IIntegerEnum { WEBSERVICE_XML_INSERT_INSPECTION_REPORT(6, "WEBSERVICE_XML_INSERT_INSPECTION_REPORT", "新增-检查报告的信息"), WEBSERVICE_XML_UPDATE_INSPECTION_REPORT(7, "WEBSERVICE_XML_UPDATE_INSPECTION_REPORT", "更新-检查报告的信息"), + /** + * HTTP + */ HTTP_PASSIVE_USER(8, "HTTP_PASSIVE_USER", "新增/修改-用户信息"), HTTP_PASSIVE_DEPT(9, "HTTP_PASSIVE_DEPT", "新增/修改-科室信息"), HTTP_PASSIVE_BASIC(10, "HTTP_PASSIVE_BASIC", "新增/修改-基础数据"), + /** + * MQ + */ JMS_MQ_PASSIVE_INSERT_USER(11, "JMS_MQ_PASSIVE_INSERT_USER", "新增-用户信息"), JMS_MQ_PASSIVE_UPDATE_USER(12, "JMS_MQ_PASSIVE_UPDATE_USER", "修改-用户信息"), JMS_MQ_PASSIVE_INSERT_DEPT(13, "JMS_MQ_PASSIVE_INSERT_DEPT", "新增-科室信息"), @@ -29,6 +38,13 @@ public enum CollectTypeEnum implements IIntegerEnum { ROCKET_MQ_PASSIVE_UPDATE_DEPT(20, "ROCKET_MQ_PASSIVE_UPDATE_DEPT", "修改-科室信息"), ROCKET_MQ_PASSIVE_INSERT_BASIC(21, "ROCKET_MQ_PASSIVE_INSERT_BASIC", "新增-基础数据"), ROCKET_MQ_PASSIVE_UPDATE_BASIC(22, "ROCKET_MQ_PASSIVE_UPDATE_BASIC", "修改-基础数据"), + + RABBIT_MQ_PASSIVE_INSERT_USER(23, "RABBIT_MQ_PASSIVE_INSERT_USER", "新增-用户信息"), + RABBIT_MQ_PASSIVE_UPDATE_USER(24, "RABBIT_MQ_PASSIVE_UPDATE_USER", "修改-用户信息"), + RABBIT_MQ_PASSIVE_INSERT_DEPT(25, "RABBIT_MQ_PASSIVE_INSERT_DEPT", "新增-科室信息"), + RABBIT_MQ_PASSIVE_UPDATE_DEPT(26, "RABBIT_MQ_PASSIVE_UPDATE_DEPT", "修改-科室信息"), + RABBIT_MQ_PASSIVE_INSERT_BASIC(27, "RABBIT_MQ_PASSIVE_INSERT_BASIC", "新增-基础数据"), + RABBIT_MQ_PASSIVE_UPDATE_BASIC(28, "RABBIT_MQ_PASSIVE_UPDATE_BASIC", "修改-基础数据"), ; private Integer value; diff --git a/common-collect/src/main/java/com/docus/server/collect/web/processor/VisitorProcessor.java b/common-collect/src/main/java/com/docus/server/collect/web/processor/VisitorProcessor.java index e3f02c0..620ae8c 100644 --- a/common-collect/src/main/java/com/docus/server/collect/web/processor/VisitorProcessor.java +++ b/common-collect/src/main/java/com/docus/server/collect/web/processor/VisitorProcessor.java @@ -41,7 +41,7 @@ public class VisitorProcessor extends AbstractProcessor { Map params = context.getParams(); params.put("taskId", taskId); params.put("jsonStr", jsonStr); - params.put("methodName", context.getMethodName()); + params.put("group", context.getGroup()); return null; } @@ -67,6 +67,7 @@ public class VisitorProcessor extends AbstractProcessor { return result.ok(params); } else { log.info("=== AOP 异常通知 ==="); + log.error((String) params.get(context.getExMessageResult())); messageService.updateTaskOriginalMessage(taskId, afterReturnResult, StateEnum.FAIL); return result.fail(params); } diff --git a/pom.xml b/pom.xml index 2ef3e46..7822d49 100644 --- a/pom.xml +++ b/pom.xml @@ -209,12 +209,17 @@ netty-all 4.1.87.Final - + org.apache.rocketmq rocketmq-spring-boot-starter 2.2.3 + + + org.springframework.boot + spring-boot-starter-amqp +