被动rabbitmq模板封装

segment2.0
beeajax 2 years ago
parent 0c2e7cb8cc
commit 16e1390263

@ -0,0 +1,56 @@
package com.docus.server.collect.basic.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
*
* 1. Queuevalue=durable=(true)exclusive=connection(false)autoDelete=(false)
* 2. Exchangevalue=type=(direct)durable=(true)autoDelete=(false)
* 3. QueueBindingkey=(string* # )*#0N.
* 4. RabbitListener bindings=Queue+Exchange+QueueBinding
* @RabbitListener(queues = "direct.queue")
* <p>
* <p>
* rabbitmq basic consumer
*
* @author linruifeng
* @see com.docus.server.collect.basic.mq.impl.BasicRabbitMqCollectServiceImpl#addTBasic(String)
* @see com.docus.server.collect.user.mq.test.RabbitMQProducer#topicMQ03()
*/
@Component
@Slf4j
public class RabbitMqBasicConsumer {
@Resource(name = "basicRabbitMqCollectServiceImpl")
private IBasicMqCollectService basicMqCollectService;
/**
* topic n:1 ()
*/
@RabbitListener(
bindings = {@QueueBinding(value = @Queue(value = "topic.queue.01"),
exchange = @Exchange(value = "topic.exchange", type = "topic"),
key = {"*.reg.msg"})})
public void getTopicMessage01(String message) {
log.info("rabbitmq message:{}", message);
basicMqCollectService.addTBasic(message);
}
/**
* topic n:1 ()
*/
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = "topic.queue.02"),
exchange = @Exchange(value = "topic.exchange", type = "topic"),
key = {"*.*.reg.msg.#"})})
public void getTopicMessage02(String message) {
log.info("rabbitmq message:{}", message);
basicMqCollectService.updateTBasic(message);
}
}

@ -1,15 +1,21 @@
package com.docus.server.collect.basic.mq; package com.docus.server.collect.basic.mq;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
/**
* rocketmq
*
* @see com.docus.server.collect.user.mq.test.RocketMQProvdier
* @see com.docus.server.collect.basic.mq.impl.BasicRocketMqCollectServiceImpl#addTBasic(String)
*/
@Component @Component
@Slf4j @Slf4j
@RocketMQMessageListener(topic = "ta-cipher-persist", consumerGroup = "ta-cipher-encode") //启用rockertmq consumer打开下面的注解监听
//@RocketMQMessageListener(topic = "ta-cipher-persist", consumerGroup = "ta-cipher-encode")
public class RocketMqBasicConsumer implements RocketMQListener<String> { public class RocketMqBasicConsumer implements RocketMQListener<String> {
@Resource(name = "basicRocketMqCollectServiceImpl") @Resource(name = "basicRocketMqCollectServiceImpl")

@ -0,0 +1,55 @@
package com.docus.server.collect.dept.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
*
* 1. Queuevalue=durable=(true)exclusive=connection(false)autoDelete=(false)
* 2. Exchangevalue=type=(direct)durable=(true)autoDelete=(false)
* 3. QueueBindingkey=(string* # )*#0N.
* 4. RabbitListener bindings=Queue+Exchange+QueueBinding
* @RabbitListener(queues = "direct.queue")
* <p>
* rabbitmq
*
* @author linruifeng
* @see com.docus.server.collect.user.mq.impl.UserRabbitMqCollectServiceImpl#addUser(String)
* @see com.docus.server.collect.user.mq.test.RabbitMQProducer#topicMQ03()
*/
@Component
@Slf4j
public class RabbitMqDeptConsumer {
@Resource(name = "deptRabbitMqCollectServiceImpl")
private IDeptMqCollectService deptMqCollectService;
/**
* topic n:1 ()
*/
@RabbitListener(
bindings = {@QueueBinding(value = @Queue(value = "topic.queue.01"),
exchange = @Exchange(value = "topic.exchange", type = "topic"),
key = {"*.reg.msg"})})
public void getTopicMessage01(String message) {
log.info("rabbitmq message:{}", message);
deptMqCollectService.addDept(message);
}
/**
* topic n:1 ()
*/
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = "topic.queue.02"),
exchange = @Exchange(value = "topic.exchange", type = "topic"),
key = {"*.*.reg.msg.#"})})
public void getTopicMessage02(String message) {
log.info("rabbitmq message:{}", message);
deptMqCollectService.updateDept(message);
}
}

@ -1,15 +1,21 @@
package com.docus.server.collect.dept.mq; package com.docus.server.collect.dept.mq;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
/**
* rocketmq
*
* @see com.docus.server.collect.user.mq.RocketMqUserConsumer#send(String)
* @see com.docus.server.collect.user.mq.test.MQTestRestController
*/
@Component @Component
@Slf4j @Slf4j
@RocketMQMessageListener(topic = "ta-cipher-persist", consumerGroup = "ta-cipher-encode") //启用rockertmq consumer打开下面的注解监听
//@RocketMQMessageListener(topic = "ta-cipher-persist", consumerGroup = "ta-cipher-encode")
public class RocketMqDeptConsumer implements RocketMQListener<String> { public class RocketMqDeptConsumer implements RocketMQListener<String> {
@Resource(name = "deptJmsMqCollectServiceImpl") @Resource(name = "deptJmsMqCollectServiceImpl")

@ -0,0 +1,55 @@
package com.docus.server.collect.user.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
*
* 1. Queuevalue=durable=(true)exclusive=connection(false)autoDelete=(false)
* 2. Exchangevalue=type=(direct)durable=(true)autoDelete=(false)
* 3. QueueBindingkey=(string* # )*#0N.
* 4. RabbitListener bindings=Queue+Exchange+QueueBinding
* @RabbitListener(queues = "direct.queue")
* <p>
* rabbitmq
*
* @author linruifeng
* @see com.docus.server.collect.user.mq.impl.UserRabbitMqCollectServiceImpl#addUser(String)
* @see com.docus.server.collect.user.mq.test.RabbitMQProducer#topicMQ03()
*/
@Component
@Slf4j
public class RabbitMqUserConsumer {
@Resource(name = "userRabbitMqCollectServiceImpl")
private IUserMqCollectService userMqCollectService;
/**
* topic n:1 ()
*/
@RabbitListener(
bindings = {@QueueBinding(value = @Queue(value = "topic.queue.01"),
exchange = @Exchange(value = "topic.exchange", type = "topic"),
key = {"*.reg.msg"})})
public void getTopicMessage01(String message) {
log.info("rabbitmq message:{}", message);
userMqCollectService.addUser(message);
}
/**
* topic n:1 ()
*/
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = "topic.queue.02"),
exchange = @Exchange(value = "topic.exchange", type = "topic"),
key = {"*.*.reg.msg.#"})})
public void getTopicMessage02(String message) {
log.info("rabbitmq message:{}", message);
userMqCollectService.updateUser(message);
}
}

@ -1,15 +1,18 @@
package com.docus.server.collect.user.mq; package com.docus.server.collect.user.mq;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
/**
* rocketmq
*/
@Component @Component
@Slf4j @Slf4j
@RocketMQMessageListener(topic = "ta-cipher-persist", consumerGroup = "ta-cipher-encode") //启用rockertmq consumer打开下面的注解监听
//@RocketMQMessageListener(topic = "ta-cipher-persist", consumerGroup = "ta-cipher-encode")
public class RocketMqUserConsumer implements RocketMQListener<String> { public class RocketMqUserConsumer implements RocketMQListener<String> {
@Resource(name = "userRocketMqCollectServiceImpl") @Resource(name = "userRocketMqCollectServiceImpl")
@ -18,7 +21,7 @@ public class RocketMqUserConsumer implements RocketMQListener<String> {
@Override @Override
public void onMessage(String message) { public void onMessage(String message) {
try { try {
log.info("RocketMQ message:{}", message); log.info("rocketmq message:{}", message);
userMqCollectService.addUser(message); userMqCollectService.addUser(message);
} catch (Exception e) { } catch (Exception e) {
log.error("errorMessage:{}", message); log.error("errorMessage:{}", message);

@ -0,0 +1,37 @@
package com.docus.server.collect.user.mq.test;
import com.docus.server.collect.IConverter;
import com.docus.server.collect.IHttpResult;
import com.docus.server.collect.web.enums.CollectTypeEnum;
import com.docus.server.collect.web.processor.VisitorProcessor;
import io.swagger.annotations.Api;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author wen yongbin
* @date 202322521:56:33
* @see CollectTypeEnum
* @see IConverter
* @see IHttpResult
* @see VisitorProcessor
*/
@Api(value = "用户管理接口", tags = "用户管理接口")
@Slf4j
@RestController
@RequestMapping("/api/rest/user")
public class MQTestRestController {
// @Inject
// private RmqProvdier rmqProvdier;
// /**
// * 测试rocket mq
// */
// @GetMapping("/test")
// public void test() {
// rmqProvdier.send("hello world");
// }
}

@ -0,0 +1,60 @@
package com.docus.server.collect.user.mq.test;
import com.docus.server.collect.web.domain.TaskConfig;
import com.docus.server.collect.web.service.impl.TaskConfigServiceImpl;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import javax.inject.Inject;
import java.util.List;
/**
* @Description
* @Author jxb
* @Date 2019-03-09 09:43:47
*/
@RestController
@RequestMapping("mqProducer")
public class RabbitMQProducer {
@Autowired
public RabbitTemplate rabbitTemplate;
@Inject
private TaskConfigServiceImpl userService;
/**
* @Description topic n:1 (3)
* @Author jxb
* @Date 2019-03-09 09:56:45
*/
@RequestMapping(value = "/topicMQ01", method = {RequestMethod.GET})
public List<TaskConfig> topicMQ01() {
List<TaskConfig> users = userService.list(null);
for (TaskConfig user : users) {
rabbitTemplate.convertAndSend("topic.exchange", "jd.reg.msg", user.getName());
}
return users;
}
@RequestMapping(value = "/topicMQ02", method = {RequestMethod.GET})
public List<TaskConfig> topicMQ02() {
List<TaskConfig> users = userService.list(null);
for (TaskConfig user : users) {
rabbitTemplate.convertAndSend("topic.exchange", "tm.reg.msg", user.getName());
}
return users;
}
@RequestMapping(value = "/topicMQ03", method = {RequestMethod.GET})
public List<TaskConfig> topicMQ03() {
List<TaskConfig> users = userService.list(null);
for (TaskConfig user : users) {
rabbitTemplate.convertAndSend("topic.exchange", "super.fzb.reg.msg", user.getName());
}
return users;
}
}

@ -0,0 +1,28 @@
package com.docus.server.collect.user.mq.test;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
//
//import lombok.extern.slf4j.Slf4j;
//import org.apache.rocketmq.spring.core.RocketMQTemplate;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.stereotype.Component;
//
@Component
@Slf4j
public class RocketMQProvdier {
// @Autowired
// private RocketMQTemplate rocketMQTemplate;
//
// public boolean send(String message) {
// try {
// // 发送消息
// rocketMQTemplate.convertAndSend("ta-cipher-persist", message);
// } catch (Exception e) {
// log.error("send message:{}", message, e);
// return false;
// }
// return true;
// }
}

@ -12,21 +12,24 @@ spring:
datasource: datasource:
master: master:
url: jdbc:log4jdbc:mysql://db.docus.cn:3306/docus_archivefile?autoReconnect=true&allowMultiQueries=true&useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai url: jdbc:log4jdbc:mysql://db.docus.cn:3306/docus_archivefile?autoReconnect=true&allowMultiQueries=true&useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
username: docus username: root
password: docus702 password: root
driver-class-name: net.sf.log4jdbc.sql.jdbcapi.DriverSpy driver-class-name: net.sf.log4jdbc.sql.jdbcapi.DriverSpy
type: com.alibaba.druid.pool.DruidDataSource type: com.alibaba.druid.pool.DruidDataSource
his: his:
url: jdbc:log4jdbc:mysql://db.docus.cn:3306/his?autoReconnect=true&allowMultiQueries=true&useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai url: jdbc:log4jdbc:mysql://127.0.0.1:3306/his?autoReconnect=true&allowMultiQueries=true&useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
username: docus username: root
password: docus702 password: root
driver-class-name: net.sf.log4jdbc.sql.jdbcapi.DriverSpy driver-class-name: net.sf.log4jdbc.sql.jdbcapi.DriverSpy
type: com.alibaba.druid.pool.DruidDataSource type: com.alibaba.druid.pool.DruidDataSource
# rabbitmq:
# host: 127.0.0.1
# port: 5672
# username: guest
# password: guest
redis: redis:
host: redis.docus.cn host: redis.docus.cn
password: JSdocus@702 # password: JSdocus@702
cloud: cloud:
nacos: nacos:
discovery: discovery:
@ -41,20 +44,21 @@ spring:
docus: docus:
dbtype: mysql dbtype: mysql
user:
# 用户默认密码
defpwd: fd29cd53ec12616e5f36b77d4afffbff
mybatis-plus: mybatis-plus:
configuration: configuration:
map-underscore-to-camel-case: true map-underscore-to-camel-case: true
call-setters-on-nulls: true call-setters-on-nulls: true
jdbc-type-for-null: null
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
global-config: global-config:
db-config: db-config:
update-strategy: ignored
field-strategy: NOT_EMPTY field-strategy: NOT_EMPTY
db-type: MYSQL db-type: MYSQL
mapper-locations: classpath*:/mapper/*Mapper.xml
type-enums-package: com.docus.server.collect.web.enums type-enums-package: com.docus.server.collect.web.enums
xxl: xxl:
job: job:
accessToken: accessToken:
@ -68,21 +72,16 @@ xxl:
logretentiondays: 30 logretentiondays: 30
logpath: D:/xxl-job/inspection logpath: D:/xxl-job/inspection
# api:
#ibm: soft-delete-column-name=state
# mq: soft-delete-column-value=1
# queueManager: MqManager01 logging:
# channel: chl_server01 level:
# connName: 10.222.23.130(1414) com:
# ackUrl: 10.222.23.159:7800/ack baomidou:
# domain: CDR mybatisplus: DEBUG
# key: cdr-0012-adef
# replica:
# queueManager: MqManager02
# channel: chl_server02
# connName: 10.222.23.131(1414)
rocketmq: #rocketmq:
name-server: 127.0.01:9876 # name-server: 127.0.01:9876
producer: # producer:
group: ta-cipher-encode # group: ta-cipher-encode

@ -1,5 +1,8 @@
package com.docus.server.collect.basic.mq; package com.docus.server.collect.basic.mq;
/**
* @author linruifeng
*/
public interface IBasicMqCollectService { public interface IBasicMqCollectService {
/** /**

@ -0,0 +1,57 @@
package com.docus.server.collect.basic.mq.impl;
import com.docus.core.util.json.JSON;
import com.docus.log.annotation.TrackGroup;
import com.docus.log.context.TrackHelper;
import com.docus.server.collect.basic.mq.IBasicMqCollectService;
import com.docus.server.collect.web.processor.VisitorProcessor;
import com.docus.server.collect.web.service.CollectService;
import com.docus.server.sys.common.pojo.dto.DeptDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* @author linruifeng
*/
@Service("basicRabbitMqCollectServiceImpl")
@Slf4j
public class BasicRabbitMqCollectServiceImpl implements IBasicMqCollectService {
@Resource
private CollectService collectService;
/**
*
*
* @param message
*/
@Override
@TrackGroup(
group = "RABBIT_MQ_PASSIVE_INSERT_BASIC",
beanNames = {"mqBasicConverter", "mqBasicResultImpl"},
processor = VisitorProcessor.class)
public void addTBasic(String message) {
log.info("注册基础数据 队列接收信息:{}", message);
collectService.insertOrUpdateDept(getValue());
}
/**
*
*
* @param message
*/
@Override
@TrackGroup(
group = "RABBIT_MQ_PASSIVE_UPDATE_BASIC",
beanNames = {"mqBasicConverter", "mqBasicResultImpl"},
processor = VisitorProcessor.class)
public void updateTBasic(String message) {
log.info("变更基础数据 队列接收信息:{}", message);
collectService.insertOrUpdateDept(getValue());
}
public DeptDTO getValue() {
return JSON.fromJSON((String) TrackHelper.getValue("jsonStr"), DeptDTO.class);
}
}

@ -0,0 +1,55 @@
package com.docus.server.collect.dept.mq.impl;
import com.docus.core.util.json.JSON;
import com.docus.log.annotation.TrackGroup;
import com.docus.log.context.TrackHelper;
import com.docus.server.collect.dept.mq.IDeptMqCollectService;
import com.docus.server.collect.web.processor.VisitorProcessor;
import com.docus.server.collect.web.service.CollectService;
import com.docus.server.sys.common.pojo.dto.DeptDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service("deptRabbitMqCollectServiceImpl")
@Slf4j
public class DeptRabbitMqCollectServiceImpl implements IDeptMqCollectService {
@Resource
private CollectService collectService;
/**
*
*
* @param message
*/
@Override
@TrackGroup(
group = "RABBIT_MQ_PASSIVE_INSERT_DEPT",
beanNames = {"mqDeptConverter", "mqDeptResultImpl"},
processor = VisitorProcessor.class)
public void addDept(String message) {
log.info("注册科室 队列接收信息:{}", message);
collectService.insertOrUpdateDept(getValue());
}
/**
*
*
* @param message
*/
@Override
@TrackGroup(
group = "RABBIT_MQ_PASSIVE_UPDATE_DEPT",
beanNames = {"mqDeptConverter", "mqDeptResultImpl"},
processor = VisitorProcessor.class)
public void updateDept(String message) {
log.info("变更科室 队列接收信息:{}", message);
collectService.insertOrUpdateDept(getValue());
}
public DeptDTO getValue() {
return JSON.fromJSON((String) TrackHelper.getValue("jsonStr"), DeptDTO.class);
}
}

@ -6,7 +6,6 @@ import com.docus.log.annotation.TrackGroup;
import com.docus.log.context.TrackHelper; import com.docus.log.context.TrackHelper;
import com.docus.server.collect.IConverter; import com.docus.server.collect.IConverter;
import com.docus.server.collect.IHttpResult; import com.docus.server.collect.IHttpResult;
import com.docus.server.collect.user.mq.RmqProvdier;
import com.docus.server.collect.web.enums.CollectTypeEnum; import com.docus.server.collect.web.enums.CollectTypeEnum;
import com.docus.server.collect.web.processor.VisitorProcessor; import com.docus.server.collect.web.processor.VisitorProcessor;
import com.docus.server.collect.web.service.CollectService; import com.docus.server.collect.web.service.CollectService;
@ -14,14 +13,12 @@ import com.docus.server.sys.common.pojo.dto.UserDTO;
import io.swagger.annotations.Api; import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; import javax.annotation.Resource;
import javax.inject.Inject;
/** /**
@ -40,8 +37,6 @@ import javax.inject.Inject;
public class UserRestController { public class UserRestController {
@Resource @Resource
private CollectService collectService; private CollectService collectService;
@Inject
private RmqProvdier rmqProvdier;
/** /**
* http * http
@ -60,12 +55,4 @@ public class UserRestController {
collectService.insertOrUpdateUser(JSON.fromJSON((String) TrackHelper.getValue("jsonStr"), UserDTO.class)); collectService.insertOrUpdateUser(JSON.fromJSON((String) TrackHelper.getValue("jsonStr"), UserDTO.class));
return null; return null;
} }
/**
* rocket mq
*/
@GetMapping("/test")
public void test() {
rmqProvdier.send("hello world");
}
} }

@ -1,24 +0,0 @@
package com.docus.server.collect.user.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class RmqProvdier {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public boolean send(String message) {
try {
// 发送消息
rocketMQTemplate.convertAndSend("ta-cipher-persist", message);
} catch (Exception e) {
log.error("send message:{}", message, e);
return false;
}
return true;
}
}

@ -0,0 +1,54 @@
package com.docus.server.collect.user.mq.impl;
import com.docus.core.util.json.JSON;
import com.docus.log.annotation.TrackGroup;
import com.docus.log.context.TrackHelper;
import com.docus.server.collect.user.mq.IUserMqCollectService;
import com.docus.server.collect.web.processor.VisitorProcessor;
import com.docus.server.collect.web.service.CollectService;
import com.docus.server.sys.common.pojo.dto.UserDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service("userRabbitMqCollectServiceImpl")
@Slf4j
public class UserRabbitMqCollectServiceImpl implements IUserMqCollectService {
@Resource
private CollectService collectService;
/**
*
*
* @param message
*/
@Override
@TrackGroup(
group = "RABBIT_MQ_PASSIVE_INSERT_USER",
beanNames = {"mqUserConverter", "mqUserResultImpl"},
processor = VisitorProcessor.class)
public void addUser(String message) {
log.info("人员注册 队列接收信息:{}", message);
collectService.insertOrUpdateUser(getValue());
}
/**
*
*
* @param message
*/
@Override
@TrackGroup(
group = "RABBIT_MQ_PASSIVE_UPDATE_USER",
beanNames = {"mqUserConverter", "mqUserResultImpl"},
processor = VisitorProcessor.class)
public void updateUser(String message) {
log.info("人员变更 队列接收信息:{}", message);
collectService.insertOrUpdateUser(getValue());
}
public UserDTO getValue() {
return JSON.fromJSON((String) TrackHelper.getValue("jsonStr"), UserDTO.class);
}
}

@ -2,6 +2,9 @@ package com.docus.server.collect.web.enums;
public enum CollectTypeEnum implements IIntegerEnum { public enum CollectTypeEnum implements IIntegerEnum {
/**
* WS
*/
WEBSERVICE_XML_DEPT(0, "WEBSERVICE_XML_DEPT", "新增/修改-科室信息"), WEBSERVICE_XML_DEPT(0, "WEBSERVICE_XML_DEPT", "新增/修改-科室信息"),
WEBSERVICE_XML_USER(1, "WEBSERVICE_XML_USER", "新增/修改-用户信息"), WEBSERVICE_XML_USER(1, "WEBSERVICE_XML_USER", "新增/修改-用户信息"),
WEBSERVICE_XML_INSERT_BASIC(2, "WEBSERVICE_XML_BASIC", "新增/修改-基础数据"), WEBSERVICE_XML_INSERT_BASIC(2, "WEBSERVICE_XML_BASIC", "新增/修改-基础数据"),
@ -12,10 +15,16 @@ public enum CollectTypeEnum implements IIntegerEnum {
WEBSERVICE_XML_INSERT_INSPECTION_REPORT(6, "WEBSERVICE_XML_INSERT_INSPECTION_REPORT", "新增-检查报告的信息"), WEBSERVICE_XML_INSERT_INSPECTION_REPORT(6, "WEBSERVICE_XML_INSERT_INSPECTION_REPORT", "新增-检查报告的信息"),
WEBSERVICE_XML_UPDATE_INSPECTION_REPORT(7, "WEBSERVICE_XML_UPDATE_INSPECTION_REPORT", "更新-检查报告的信息"), WEBSERVICE_XML_UPDATE_INSPECTION_REPORT(7, "WEBSERVICE_XML_UPDATE_INSPECTION_REPORT", "更新-检查报告的信息"),
/**
* HTTP
*/
HTTP_PASSIVE_USER(8, "HTTP_PASSIVE_USER", "新增/修改-用户信息"), HTTP_PASSIVE_USER(8, "HTTP_PASSIVE_USER", "新增/修改-用户信息"),
HTTP_PASSIVE_DEPT(9, "HTTP_PASSIVE_DEPT", "新增/修改-科室信息"), HTTP_PASSIVE_DEPT(9, "HTTP_PASSIVE_DEPT", "新增/修改-科室信息"),
HTTP_PASSIVE_BASIC(10, "HTTP_PASSIVE_BASIC", "新增/修改-基础数据"), HTTP_PASSIVE_BASIC(10, "HTTP_PASSIVE_BASIC", "新增/修改-基础数据"),
/**
* MQ
*/
JMS_MQ_PASSIVE_INSERT_USER(11, "JMS_MQ_PASSIVE_INSERT_USER", "新增-用户信息"), JMS_MQ_PASSIVE_INSERT_USER(11, "JMS_MQ_PASSIVE_INSERT_USER", "新增-用户信息"),
JMS_MQ_PASSIVE_UPDATE_USER(12, "JMS_MQ_PASSIVE_UPDATE_USER", "修改-用户信息"), JMS_MQ_PASSIVE_UPDATE_USER(12, "JMS_MQ_PASSIVE_UPDATE_USER", "修改-用户信息"),
JMS_MQ_PASSIVE_INSERT_DEPT(13, "JMS_MQ_PASSIVE_INSERT_DEPT", "新增-科室信息"), JMS_MQ_PASSIVE_INSERT_DEPT(13, "JMS_MQ_PASSIVE_INSERT_DEPT", "新增-科室信息"),
@ -29,6 +38,13 @@ public enum CollectTypeEnum implements IIntegerEnum {
ROCKET_MQ_PASSIVE_UPDATE_DEPT(20, "ROCKET_MQ_PASSIVE_UPDATE_DEPT", "修改-科室信息"), ROCKET_MQ_PASSIVE_UPDATE_DEPT(20, "ROCKET_MQ_PASSIVE_UPDATE_DEPT", "修改-科室信息"),
ROCKET_MQ_PASSIVE_INSERT_BASIC(21, "ROCKET_MQ_PASSIVE_INSERT_BASIC", "新增-基础数据"), ROCKET_MQ_PASSIVE_INSERT_BASIC(21, "ROCKET_MQ_PASSIVE_INSERT_BASIC", "新增-基础数据"),
ROCKET_MQ_PASSIVE_UPDATE_BASIC(22, "ROCKET_MQ_PASSIVE_UPDATE_BASIC", "修改-基础数据"), ROCKET_MQ_PASSIVE_UPDATE_BASIC(22, "ROCKET_MQ_PASSIVE_UPDATE_BASIC", "修改-基础数据"),
RABBIT_MQ_PASSIVE_INSERT_USER(23, "RABBIT_MQ_PASSIVE_INSERT_USER", "新增-用户信息"),
RABBIT_MQ_PASSIVE_UPDATE_USER(24, "RABBIT_MQ_PASSIVE_UPDATE_USER", "修改-用户信息"),
RABBIT_MQ_PASSIVE_INSERT_DEPT(25, "RABBIT_MQ_PASSIVE_INSERT_DEPT", "新增-科室信息"),
RABBIT_MQ_PASSIVE_UPDATE_DEPT(26, "RABBIT_MQ_PASSIVE_UPDATE_DEPT", "修改-科室信息"),
RABBIT_MQ_PASSIVE_INSERT_BASIC(27, "RABBIT_MQ_PASSIVE_INSERT_BASIC", "新增-基础数据"),
RABBIT_MQ_PASSIVE_UPDATE_BASIC(28, "RABBIT_MQ_PASSIVE_UPDATE_BASIC", "修改-基础数据"),
; ;
private Integer value; private Integer value;

@ -41,7 +41,7 @@ public class VisitorProcessor extends AbstractProcessor {
Map<String, Object> params = context.getParams(); Map<String, Object> params = context.getParams();
params.put("taskId", taskId); params.put("taskId", taskId);
params.put("jsonStr", jsonStr); params.put("jsonStr", jsonStr);
params.put("methodName", context.getMethodName()); params.put("group", context.getGroup());
return null; return null;
} }
@ -67,6 +67,7 @@ public class VisitorProcessor extends AbstractProcessor {
return result.ok(params); return result.ok(params);
} else { } else {
log.info("=== AOP 异常通知 ==="); log.info("=== AOP 异常通知 ===");
log.error((String) params.get(context.getExMessageResult()));
messageService.updateTaskOriginalMessage(taskId, afterReturnResult, StateEnum.FAIL); messageService.updateTaskOriginalMessage(taskId, afterReturnResult, StateEnum.FAIL);
return result.fail(params); return result.fail(params);
} }

@ -209,12 +209,17 @@
<artifactId>netty-all</artifactId> <artifactId>netty-all</artifactId>
<version>4.1.87.Final</version> <version>4.1.87.Final</version>
</dependency> </dependency>
<!--rocketmq-->
<dependency> <dependency>
<groupId>org.apache.rocketmq</groupId> <groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId> <artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version> <version>2.2.3</version>
</dependency> </dependency>
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies> </dependencies>
<profiles> <profiles>

Loading…
Cancel
Save