【采集调度器-后端TCP API】采集调度器下发任务命令 API

segment2.0
linrf 2 years ago
parent bd21350507
commit abd6074895

@ -6,7 +6,7 @@ import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.scheduling.annotation.EnableAsync;
@EnableAsync
@EnableFeignClients(basePackages = ("com.docus.core.excel.feign"))
@EnableFeignClients(basePackages = {"com.docus.core.excel.feign", "com.docus.server.api.taskdistribute"})
@SpringBootApplication(scanBasePackages = {"com.docus"})
public class AppRunBootstrap {
public static void main(String[] args) {

@ -1,21 +1,33 @@
package com.docus.server.common;
import com.docus.core.util.Func;
import com.docus.core.util.ListUtils;
import com.docus.core.util.json.JSON;
import com.docus.server.api.taskdistribute.TaskDistributeApi;
import com.docus.server.common.netty.server.ChannelRepository;
import com.docus.server.dto.scheduling.management.schcollector.task.ReportDownTwoDTO;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.enums.BusyStateEnum;
import com.docus.server.enums.OnlineStateEnum;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Component
@Slf4j
public class SchedulerTask {
@Resource
private ChannelRepository channelRepository;
@Resource
private TaskDistributeApi taskDistributeApi;
//定时任务
// 5 * * * * ? 在每分钟的5秒执行
@ -26,22 +38,98 @@ public class SchedulerTask {
Map<String, NettyTerminatorDTO> ipToTerminatorCacheMap = channelRepository.getIpToTerminatorCacheMap();
Set<String> keySet = ipToTerminatorCacheMap.keySet();
//没有可用的通道
if (Func.isEmpty(ipToTerminatorCacheMap)) {
return;
}
Collection<NettyTerminatorDTO> values = ipToTerminatorCacheMap.values();
List<NettyTerminatorDTO> AllTerminators = Lists.newArrayList();
List<NettyTerminatorDTO> onlyTerminators = Lists.newArrayList();
List<NettyTerminatorDTO> priorityTerminators = Lists.newArrayList();
List<NettyTerminatorDTO> noOnlyTerminators = Lists.newArrayList();
Map<String, NettyTerminatorDTO> onlyCollectorIdMap = Maps.newConcurrentMap();
Map<String, NettyTerminatorDTO> priorityCollectorIdsMap = Maps.newConcurrentMap();
for (NettyTerminatorDTO t : values) {
//是否空闲
BusyStateEnum busyState = t.getBusyState();
//是否在线
OnlineStateEnum onlineState = t.getOnlineState();
//只采集
String onlyCollectorIds = t.getOnlyCollectorIds();
//优先采集
String priorityCollectorIds = t.getPriorityCollectorIds();
//在线且空闲的通道
if (valid(busyState, onlineState)) {
for (String clientIp : keySet) {
NettyTerminatorDTO nettyTerminatorDTO = ipToTerminatorCacheMap.get(clientIp);
//所有空闲,在线的通道
AllTerminators.add(t);
System.out.println(JSON.toJSON(nettyTerminatorDTO));
//有无偏好属性的,
// 只采集的+ 没有优先采集的
if (Func.isNotBlank(onlyCollectorIds) && Func.isBlank(priorityCollectorIds)) {
onlyTerminators.add(t);
String[] split = onlyCollectorIds.split(",");
for (String k : split) {
onlyCollectorIdMap.put(k, t);
}
}
//只采集的+优先采集的
if (Func.isNotBlank(onlyCollectorIds) && Func.isNotBlank(priorityCollectorIds)) {
priorityTerminators.add(t);
String[] split = priorityCollectorIds.split(",");
for (String k : split) {
priorityCollectorIdsMap.put(k, t);
}
}
//没有偏好属性的
if (Func.isBlank(onlyCollectorIds)) {
noOnlyTerminators.add(t);
}
}
System.out.println(JSON.toJSON(t));
}
// 只采集的+ 没有优先采集的A
Map<String, NettyTerminatorDTO> stringNettyTerminatorDTOMap = ListUtils.toMap(onlyTerminators, NettyTerminatorDTO::getTerminatorIp);
//只采集的+优先采集的,A,B
Map<String, NettyTerminatorDTO> stringNettyTerminatorDTOMap1 = ListUtils.toMap(priorityTerminators, NettyTerminatorDTO::getTerminatorIp);
//没有偏好属性的,C
Map<String, NettyTerminatorDTO> stringNettyTerminatorDTOMap2 = ListUtils.toMap(noOnlyTerminators, NettyTerminatorDTO::getTerminatorIp);
//拿到所有偏好属性
if (CollectionUtils.isEmpty(AllTerminators)) {
//:todo 任务平台需要修改发布任务策略
ReportDownTwoDTO task = taskDistributeApi.getTask(String.valueOf(AllTerminators.size()));
}
//todo:执行业务
log.info("定时任务: 执行完毕");
} catch (Exception e) {
log.error("定时任务执行出错", e);
}
}
private boolean valid(BusyStateEnum busyState, OnlineStateEnum onlineState) {
return BusyStateEnum.IDLE.equals(busyState)
&& OnlineStateEnum.ONLINE.equals(onlineState);
}
}

@ -3,6 +3,7 @@ package com.docus.server.common.netty.server;
import com.docus.server.dto.scheduling.management.schterminator.NettyTerminatorDTO;
import com.docus.server.enums.OnlineStateEnum;
import com.docus.server.service.ISchTerminatorService;
import com.docus.server.vo.scheduling.management.schterminator.SchTerminatorVO;
import io.netty.channel.Channel;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
@ -39,15 +40,18 @@ public class ChannelRepository {
//客户端上线
String terminatorIp = nettyTerminatorDTO.getTerminatorIp();
IP_CHANNEL_CACHE_MAP.put(terminatorIp, channel);
//更新数据库终端数据
SchTerminatorVO schTerminatorVO = iSchTerminatorService.saveOrUpdate(terminatorIp, nettyTerminatorDTO.getOnlineState());
nettyTerminatorDTO.setId(schTerminatorVO.getId());
//缓存
IP_CHANNEL_CACHE_MAP.put(terminatorIp, channel);
IP_TERMINATOR_CACHE_MAP.put(terminatorIp, nettyTerminatorDTO);
AttributeKey<String> attributeKey = AttributeKey.valueOf("ip");
channel.attr(attributeKey).set(terminatorIp);
//更新数据库终端数据
iSchTerminatorService.saveOrUpdate(terminatorIp, nettyTerminatorDTO.getOnlineState());
}
public String getClientKey(Channel channel) {

@ -99,7 +99,6 @@ public class NettyBusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
}
NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO();
nettyTerminatorDTO.setId(ctx.channel().id().asShortText());
nettyTerminatorDTO.setTerminatorIp(clientIp);
nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE);
nettyTerminatorDTO.setOnlineState(OnlineStateEnum.OFFLINE);

@ -64,7 +64,6 @@ public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter {
}
NettyTerminatorDTO nettyTerminatorDTO = new NettyTerminatorDTO();
nettyTerminatorDTO.setId(ctx.channel().id().asShortText());
nettyTerminatorDTO.setTerminatorIp(clientIp);
nettyTerminatorDTO.setBusyState(BusyStateEnum.IDLE);
nettyTerminatorDTO.setOnlineState(OnlineStateEnum.ONLINE);

@ -57,9 +57,9 @@ public interface ISchTerminatorService {
/**
*
*
* @param terminatorIp
* @param onlineState
* @return
*/
void saveOrUpdate(String terminatorIp, OnlineStateEnum onlineState);
SchTerminatorVO saveOrUpdate(String terminatorIp, OnlineStateEnum onlineState);
}

@ -55,20 +55,22 @@ public class SchTerminatorServiceImpl implements ISchTerminatorService {
}
@Override
public void saveOrUpdate(String terminatorIp, OnlineStateEnum onlineStateEnum) {
public SchTerminatorVO saveOrUpdate(String terminatorIp, OnlineStateEnum onlineStateEnum) {
SchTerminator schTerminator = iSchTerminatorDao.findOneBy("terminatorIp", terminatorIp);
if (Func.isNull(schTerminator)) {
SchTerminator schTerminatorVO = new SchTerminator();
SchTerminator schTerminatorDO = new SchTerminator();
schTerminatorVO.setId(idService.getDateSeq());
schTerminatorVO.setTerminatorIp(terminatorIp);
schTerminatorVO.setTerminatorName(terminatorIp);
schTerminatorVO.setOnlineState(onlineStateEnum);
schTerminatorVO.setBusyState(BusyStateEnum.IDLE);
schTerminatorDO.setId(idService.getDateSeq());
schTerminatorDO.setTerminatorIp(terminatorIp);
schTerminatorDO.setTerminatorName(terminatorIp);
schTerminatorDO.setOnlineState(onlineStateEnum);
schTerminatorDO.setBusyState(BusyStateEnum.IDLE);
iSchTerminatorDao.save(schTerminatorVO);
iSchTerminatorDao.save(schTerminatorDO);
return SchTerminatorConvert.INSTANCE.convertVO(schTerminatorDO);
} else {
schTerminator.setBusyState(BusyStateEnum.IDLE);
@ -76,7 +78,10 @@ public class SchTerminatorServiceImpl implements ISchTerminatorService {
schTerminator.setUpdateTime(new Date());
iSchTerminatorDao.updateById(schTerminator);
return SchTerminatorConvert.INSTANCE.convertVO(schTerminator);
}
}
/**

@ -0,0 +1,27 @@
package com.docus.server.api.taskdistribute;
import com.docus.server.dto.scheduling.management.schcollector.task.ReportDownTwoDTO;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
/**
*
*
* @author AutoGenerator
* @since 2023-07-15
*/
@Api(value = "任务平台接口", tags = "任务平台接口")
@FeignClient(url = "${taskdistribute.api.url:http://192.168.16.85:9296}", name = "collector-scheduling-management.TaskDistributeApi")
@RequestMapping("/api")
public interface TaskDistributeApi {
@ApiOperation("向客户端发送TCP命令")
@GetMapping("/noviewtask/GetTask")
ReportDownTwoDTO getTask(@RequestParam String collectid);
}

@ -21,8 +21,8 @@ import java.util.Date;
@ApiModel(value = "NettyTerminatorDTO", description = "NettyTerminatorDTO")
public class NettyTerminatorDTO implements Serializable {
@ApiModelProperty(value = "channelId主键")
private String id;
@ApiModelProperty(value = "终端主键")
private Long id;
@ApiModelProperty(value = "终端IP")
private String terminatorIp;

Loading…
Cancel
Save