被动rocket mq 模板实现

segment2.0
linrf 2 years ago
parent 5a48adc153
commit ec0eb5313c

@ -5,10 +5,19 @@ import com.docus.server.collect.web.utils.PeriodTime;
public interface IJob<T> { public interface IJob<T> {
/**
*
*/
void startCollectAll(String taskConfigId); void startCollectAll(String taskConfigId);
/**
*
*/
void startCollectIncrement(String taskConfigId); void startCollectIncrement(String taskConfigId);
/**
*
*/
void get(PeriodTime periodTime, TaskConfig taskConfig); void get(PeriodTime periodTime, TaskConfig taskConfig);
} }

@ -1,4 +1,4 @@
package com.docus.server.collect.basic.mq; package com.docus.server.collect.basic.http.test;
import cn.hutool.http.HttpRequest; import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse; import cn.hutool.http.HttpResponse;

@ -5,8 +5,8 @@ import com.docus.log.annotation.TrackGroup;
import com.docus.log.context.TrackHelper; import com.docus.log.context.TrackHelper;
import com.docus.server.collect.basic.mq.IBasicMqCollectService; import com.docus.server.collect.basic.mq.IBasicMqCollectService;
import com.docus.server.collect.web.processor.VisitorProcessor; import com.docus.server.collect.web.processor.VisitorProcessor;
import com.docus.server.collect.web.service.CollectService;
import com.docus.server.record.pojo.dto.TBasicDTO; import com.docus.server.record.pojo.dto.TBasicDTO;
import com.docus.server.record.service.ITBasicService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -19,7 +19,7 @@ import javax.annotation.Resource;
@Service("basicJmsMqCollectServiceImpl") @Service("basicJmsMqCollectServiceImpl")
public class BasicJmsMqCollectServiceImpl implements IBasicMqCollectService { public class BasicJmsMqCollectServiceImpl implements IBasicMqCollectService {
@Resource @Resource
private ITBasicService tBasicService; private CollectService collectService;
@Override @Override
@TrackGroup( @TrackGroup(
@ -28,7 +28,7 @@ public class BasicJmsMqCollectServiceImpl implements IBasicMqCollectService {
processor = VisitorProcessor.class) processor = VisitorProcessor.class)
public void addTBasic(String message) { public void addTBasic(String message) {
log.info("新增基础数据:{}", message); log.info("新增基础数据:{}", message);
tBasicService.insertTBasic(getValue()); collectService.insertOrUpdateBasic(getValue());
} }
@Override @Override
@ -38,7 +38,7 @@ public class BasicJmsMqCollectServiceImpl implements IBasicMqCollectService {
processor = VisitorProcessor.class) processor = VisitorProcessor.class)
public void updateTBasic(String message) { public void updateTBasic(String message) {
log.info("修改基础数据:{}", message); log.info("修改基础数据:{}", message);
tBasicService.updateTBasic(getValue()); collectService.insertOrUpdateBasic(getValue());
} }
public TBasicDTO getValue() { public TBasicDTO getValue() {

@ -6,7 +6,7 @@ import com.docus.log.context.TrackHelper;
import com.docus.server.collect.basic.mq.IBasicMqCollectService; import com.docus.server.collect.basic.mq.IBasicMqCollectService;
import com.docus.server.collect.web.processor.VisitorProcessor; import com.docus.server.collect.web.processor.VisitorProcessor;
import com.docus.server.collect.web.service.CollectService; import com.docus.server.collect.web.service.CollectService;
import com.docus.server.sys.common.pojo.dto.DeptDTO; import com.docus.server.record.pojo.dto.TBasicDTO;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -33,7 +33,7 @@ public class BasicRabbitMqCollectServiceImpl implements IBasicMqCollectService {
processor = VisitorProcessor.class) processor = VisitorProcessor.class)
public void addTBasic(String message) { public void addTBasic(String message) {
log.info("注册基础数据 队列接收信息:{}", message); log.info("注册基础数据 队列接收信息:{}", message);
collectService.insertOrUpdateDept(getValue()); collectService.insertOrUpdateBasic(getValue());
} }
/** /**
@ -48,10 +48,10 @@ public class BasicRabbitMqCollectServiceImpl implements IBasicMqCollectService {
processor = VisitorProcessor.class) processor = VisitorProcessor.class)
public void updateTBasic(String message) { public void updateTBasic(String message) {
log.info("变更基础数据 队列接收信息:{}", message); log.info("变更基础数据 队列接收信息:{}", message);
collectService.insertOrUpdateDept(getValue()); collectService.insertOrUpdateBasic(getValue());
} }
public DeptDTO getValue() { public TBasicDTO getValue() {
return JSON.fromJSON((String) TrackHelper.getValue("jsonStr"), DeptDTO.class); return JSON.fromJSON((String) TrackHelper.getValue("jsonStr"), TBasicDTO.class);
} }
} }

@ -6,7 +6,7 @@ import com.docus.log.context.TrackHelper;
import com.docus.server.collect.basic.mq.IBasicMqCollectService; import com.docus.server.collect.basic.mq.IBasicMqCollectService;
import com.docus.server.collect.web.processor.VisitorProcessor; import com.docus.server.collect.web.processor.VisitorProcessor;
import com.docus.server.collect.web.service.CollectService; import com.docus.server.collect.web.service.CollectService;
import com.docus.server.sys.common.pojo.dto.DeptDTO; import com.docus.server.record.pojo.dto.TBasicDTO;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -30,7 +30,7 @@ public class BasicRocketMqCollectServiceImpl implements IBasicMqCollectService {
processor = VisitorProcessor.class) processor = VisitorProcessor.class)
public void addTBasic(String message) { public void addTBasic(String message) {
log.info("注册基础数据 队列接收信息:{}", message); log.info("注册基础数据 队列接收信息:{}", message);
collectService.insertOrUpdateDept(getValue()); collectService.insertOrUpdateBasic(getValue());
} }
/** /**
@ -45,10 +45,10 @@ public class BasicRocketMqCollectServiceImpl implements IBasicMqCollectService {
processor = VisitorProcessor.class) processor = VisitorProcessor.class)
public void updateTBasic(String message) { public void updateTBasic(String message) {
log.info("变更基础数据 队列接收信息:{}", message); log.info("变更基础数据 队列接收信息:{}", message);
collectService.insertOrUpdateDept(getValue()); collectService.insertOrUpdateBasic(getValue());
} }
public DeptDTO getValue() { public TBasicDTO getValue() {
return JSON.fromJSON((String) TrackHelper.getValue("jsonStr"), DeptDTO.class); return JSON.fromJSON((String) TrackHelper.getValue("jsonStr"), TBasicDTO.class);
} }
} }

@ -1,6 +1,8 @@
package com.docus.server.collect.web.service; package com.docus.server.collect.web.service;
import com.docus.core.util.Func; import com.docus.core.util.Func;
import com.docus.server.record.pojo.dto.TBasicDTO;
import com.docus.server.record.service.ITBasicService;
import com.docus.server.sys.common.pojo.dto.DeptDTO; import com.docus.server.sys.common.pojo.dto.DeptDTO;
import com.docus.server.sys.common.pojo.dto.UserDTO; import com.docus.server.sys.common.pojo.dto.UserDTO;
import com.docus.server.sys.service.IPowerDeptService; import com.docus.server.sys.service.IPowerDeptService;
@ -17,13 +19,16 @@ public class CollectService {
private IPowerDeptService deptService; private IPowerDeptService deptService;
@Resource @Resource
private IPowerUserService userService; private IPowerUserService userService;
@Resource
private ITBasicService tBasicService;
/** /**
* 3 * 3
*
*/ */
/*@Async("recordMessage") /*@Async("recordMessage")
@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5))*/ @Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5))*/
@Transactional @Transactional(rollbackFor = Exception.class)
public void insertOrUpdateDept(DeptDTO deptDTO) { public void insertOrUpdateDept(DeptDTO deptDTO) {
//异步写入归档系统,失败自动重试。 //异步写入归档系统,失败自动重试。
if (checkType(deptDTO.getOperateType(), DEL_TYPE)) { if (checkType(deptDTO.getOperateType(), DEL_TYPE)) {
@ -33,7 +38,12 @@ public class CollectService {
} }
} }
@Transactional /**
*
*
* @param userDTO
*/
@Transactional(rollbackFor = Exception.class)
public void insertOrUpdateUser(UserDTO userDTO) { public void insertOrUpdateUser(UserDTO userDTO) {
// 判断操作类型 // 判断操作类型
if (checkType(userDTO.getOperateType(), DEL_TYPE)) { if (checkType(userDTO.getOperateType(), DEL_TYPE)) {
@ -43,6 +53,21 @@ public class CollectService {
} }
} }
/**
*
*
* @param tBasicDTO
*/
@Transactional(rollbackFor = Exception.class)
public void insertOrUpdateBasic(TBasicDTO tBasicDTO) {
Integer num = tBasicService.findByJzh(tBasicDTO.getJzh());
if (num > 0) {
tBasicService.updateTBasic(tBasicDTO);
} else {
tBasicService.insertTBasic(tBasicDTO);
}
}
protected boolean checkType(String operateType, String delType) { protected boolean checkType(String operateType, String delType) {
return Func.isNotEmpty(operateType) return Func.isNotEmpty(operateType)

@ -28,4 +28,12 @@ public interface ITBasicService {
* @param basicDTOList * @param basicDTOList
*/ */
void batchSaveBasics(List<TBasicDTO> basicDTOList); void batchSaveBasics(List<TBasicDTO> basicDTOList);
/**
*
*
* @param jzh
* @return
*/
Integer findByJzh(String jzh);
} }

@ -203,6 +203,12 @@ public class TBasicServiceImpl extends ServiceImpl<TBasicMapper, TBasic> impleme
public void batchSaveBasics(List<TBasicDTO> basicDTOList) { public void batchSaveBasics(List<TBasicDTO> basicDTOList) {
basicDTOList.forEach(this::updateTBasic); basicDTOList.forEach(this::updateTBasic);
} }
@Override
public Integer findByJzh(String jzh) {
//判断jzh是否重复
return tBasicMapper.selectOne(jzh);
}
} }

Loading…
Cancel
Save