被动rocket mq 模板实现
parent
7cbce8eab1
commit
0c2e7cb8cc
@ -0,0 +1,20 @@
|
|||||||
|
package com.docus.server.collect.basic.mq;
|
||||||
|
|
||||||
|
import com.docus.server.collect.IMqResult;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
@Component("mqBasicResultImpl")
|
||||||
|
public class MqBasicResultImpl implements IMqResult {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Void ok(Map<String, Object> params) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Void fail(Map<String, Object> params) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,29 @@
|
|||||||
|
package com.docus.server.collect.basic.mq;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||||
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@Slf4j
|
||||||
|
@RocketMQMessageListener(topic = "ta-cipher-persist", consumerGroup = "ta-cipher-encode")
|
||||||
|
public class RocketMqBasicConsumer implements RocketMQListener<String> {
|
||||||
|
|
||||||
|
@Resource(name = "basicRocketMqCollectServiceImpl")
|
||||||
|
private IBasicMqCollectService basicMqCollectService;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onMessage(String message) {
|
||||||
|
try {
|
||||||
|
log.info("RocketMQ message:{}", message);
|
||||||
|
basicMqCollectService.addTBasic(message);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("errorMessage:{}", message);
|
||||||
|
// 抛出异常会重新消费消息
|
||||||
|
throw new RuntimeException("Message processing failed", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,29 @@
|
|||||||
|
package com.docus.server.collect.dept.mq;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||||
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@Slf4j
|
||||||
|
@RocketMQMessageListener(topic = "ta-cipher-persist", consumerGroup = "ta-cipher-encode")
|
||||||
|
public class RocketMqDeptConsumer implements RocketMQListener<String> {
|
||||||
|
|
||||||
|
@Resource(name = "deptJmsMqCollectServiceImpl")
|
||||||
|
private IDeptMqCollectService deptMqCollectService;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onMessage(String message) {
|
||||||
|
try {
|
||||||
|
log.info("RocketMQ message:{}", message);
|
||||||
|
deptMqCollectService.addDept(message);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("errorMessage:{}", message);
|
||||||
|
// 抛出异常会重新消费消息
|
||||||
|
throw new RuntimeException("Message processing failed", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,29 @@
|
|||||||
|
package com.docus.server.collect.user.mq;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||||
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@Slf4j
|
||||||
|
@RocketMQMessageListener(topic = "ta-cipher-persist", consumerGroup = "ta-cipher-encode")
|
||||||
|
public class RocketMqUserConsumer implements RocketMQListener<String> {
|
||||||
|
|
||||||
|
@Resource(name = "userRocketMqCollectServiceImpl")
|
||||||
|
private IUserMqCollectService userMqCollectService;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onMessage(String message) {
|
||||||
|
try {
|
||||||
|
log.info("RocketMQ message:{}", message);
|
||||||
|
userMqCollectService.addUser(message);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("errorMessage:{}", message);
|
||||||
|
// 抛出异常会重新消费消息
|
||||||
|
throw new RuntimeException("Message processing failed", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,24 @@
|
|||||||
|
package com.docus.server.collect.user.mq;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@Slf4j
|
||||||
|
public class RmqProvdier {
|
||||||
|
@Autowired
|
||||||
|
private RocketMQTemplate rocketMQTemplate;
|
||||||
|
|
||||||
|
public boolean send(String message) {
|
||||||
|
try {
|
||||||
|
// 发送消息
|
||||||
|
rocketMQTemplate.convertAndSend("ta-cipher-persist", message);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("send message:{}", message, e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue