diff --git a/collector-scheduling-management/src/main/java/com/docus/server/controller/CommMsgController.java b/collector-scheduling-management/src/main/java/com/docus/server/controller/CommMsgController.java index 1d803ee..437f03c 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/controller/CommMsgController.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/controller/CommMsgController.java @@ -1,19 +1,11 @@ package com.docus.server.controller; -import com.docus.core.util.DateUtil; -import com.docus.core.util.json.JSON; import com.docus.server.api.scheduling.management.CommMsgApi; -import com.docus.server.common.netty.CommMsg; -import com.docus.server.common.netty.server.ChannelRepository; -import com.docus.server.convert.CommMsgConvert; import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.util.CharsetUtil; +import com.docus.server.service.ICommMsgService; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; -import java.util.Date; /** * 通用消息体 TCP API @@ -24,17 +16,10 @@ import java.util.Date; @RestController public class CommMsgController implements CommMsgApi { @Resource - private ChannelRepository channelRepository; + private ICommMsgService iCommMsgService; @Override public void clientCommand(CommMsgDTO commMsgDTO) { - Channel channel = channelRepository.get(commMsgDTO.getTerminatorIp()); - - CommMsg commMsg = CommMsgConvert.INSTANCE.convertDO(commMsgDTO); - commMsg.setMessageTime(DateUtil.formatDateTime(new Date())); - - if (channel != null) { - channel.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(commMsg), CharsetUtil.UTF_8)); - } + iCommMsgService.clientCommand(commMsgDTO); } } diff --git a/collector-scheduling-management/src/main/java/com/docus/server/convert/SchCollectorConfigConvert.java b/collector-scheduling-management/src/main/java/com/docus/server/convert/SchCollectorConfigConvert.java index 30c0d2f..e7ccd72 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/convert/SchCollectorConfigConvert.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/convert/SchCollectorConfigConvert.java @@ -5,10 +5,13 @@ import com.docus.server.dto.scheduling.management.schcollectorconfig.AddSchColle import com.docus.server.dto.scheduling.management.schcollectorconfig.EditSchCollectorConfigDTO; import com.docus.server.entity.scheduling.management.SchCollectorConfig; import com.docus.server.vo.scheduling.management.schcollectorconfig.SchCollectorConfigVO; +import com.docus.server.vo.scheduling.management.schcollectorconfig.TcpSchCollectorConfigVO; import org.mapstruct.Mapper; import org.mapstruct.Mappings; import org.mapstruct.factory.Mappers; +import java.util.List; + /** * 采集器配置 服务转换器 * @@ -32,5 +35,10 @@ public interface SchCollectorConfigConvert { @Mappings({}) PageResult convertVO(PageResult pageResult); + @Mappings({}) + List convertTcpVOList(List publicConfig); + + @Mappings({}) + TcpSchCollectorConfigVO convertTcpVO(SchCollectorConfig schCollectorConfig); } diff --git a/collector-scheduling-management/src/main/java/com/docus/server/service/ICommMsgService.java b/collector-scheduling-management/src/main/java/com/docus/server/service/ICommMsgService.java new file mode 100644 index 0000000..615eee7 --- /dev/null +++ b/collector-scheduling-management/src/main/java/com/docus/server/service/ICommMsgService.java @@ -0,0 +1,21 @@ +package com.docus.server.service; + +import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO; + +public interface ICommMsgService { + + /** + * 向客户端发送TCP命令 + * + * @param commMsgDTO 消息体 + */ + void clientCommand(CommMsgDTO commMsgDTO); + + /** + * 向所有客户端发送TCP命令 + * + * @param commMsgDTO 消息体 + */ + void clientsCommand(CommMsgDTO commMsgDTO); + +} diff --git a/collector-scheduling-management/src/main/java/com/docus/server/service/ISchCollectorConfigService.java b/collector-scheduling-management/src/main/java/com/docus/server/service/ISchCollectorConfigService.java index f208251..23398eb 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/service/ISchCollectorConfigService.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/service/ISchCollectorConfigService.java @@ -53,4 +53,9 @@ public interface ISchCollectorConfigService { * @return 分页列表 */ PageResult search(SearchDTO searchDTO); + + /** + * 更新采集器配置 + */ + void updateCollectorConfig(); } diff --git a/collector-scheduling-management/src/main/java/com/docus/server/service/impl/CommMsgServiceImpl.java b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/CommMsgServiceImpl.java new file mode 100644 index 0000000..88399e3 --- /dev/null +++ b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/CommMsgServiceImpl.java @@ -0,0 +1,58 @@ +package com.docus.server.service.impl; + +import com.docus.core.util.DateUtil; +import com.docus.core.util.json.JSON; +import com.docus.server.common.netty.CommMsg; +import com.docus.server.common.netty.server.ChannelRepository; +import com.docus.server.convert.CommMsgConvert; +import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO; +import com.docus.server.service.ICommMsgService; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.util.CharsetUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.Date; +import java.util.Map; +import java.util.Set; + +@Service +@Slf4j +public class CommMsgServiceImpl implements ICommMsgService { + + @Resource + private ChannelRepository channelRepository; + + @Override + public void clientCommand(CommMsgDTO commMsgDTO) { + Channel channel = channelRepository.get(commMsgDTO.getTerminatorIp()); + + CommMsg commMsg = CommMsgConvert.INSTANCE.convertDO(commMsgDTO); + commMsg.setMessageTime(DateUtil.formatDateTime(new Date())); + + if (channel != null) { + channel.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(commMsg), CharsetUtil.UTF_8)); + } + } + + @Override + public void clientsCommand(CommMsgDTO commMsgDTO) { + + Map channelMap = channelRepository.getAll(); + Set keySet = channelMap.keySet(); + + for (String clientIp : keySet) { + Channel channel = channelMap.get(clientIp); + + CommMsg commMsg = CommMsgConvert.INSTANCE.convertDO(commMsgDTO); + commMsg.setMessageTime(DateUtil.formatDateTime(new Date())); + + if (channel != null) { + channel.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSON(commMsg), CharsetUtil.UTF_8)); + } + + } + } +} diff --git a/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchCollectorConfigServiceImpl.java b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchCollectorConfigServiceImpl.java index 823818d..d9d9c3f 100644 --- a/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchCollectorConfigServiceImpl.java +++ b/collector-scheduling-management/src/main/java/com/docus/server/service/impl/SchCollectorConfigServiceImpl.java @@ -1,21 +1,27 @@ package com.docus.server.service.impl; import com.docus.core.util.ListUtils; +import com.docus.core.util.json.JSON; import com.docus.core.util.property.Setters; import com.docus.infrastructure.redis.service.IdService; import com.docus.infrastructure.web.request.SearchDTO; import com.docus.infrastructure.web.response.PageResult; +import com.docus.server.common.MsgConstants; import com.docus.server.convert.SchCollectorConfigConvert; import com.docus.server.dto.scheduling.management.schcollectorconfig.AddSchCollectorConfigDTO; import com.docus.server.dto.scheduling.management.schcollectorconfig.DeleteSchCollectorConfigDTO; import com.docus.server.dto.scheduling.management.schcollectorconfig.EditSchCollectorConfigDTO; +import com.docus.server.dto.scheduling.management.schterminator.CommMsgDTO; import com.docus.server.entity.scheduling.management.SchCollectorConfig; import com.docus.server.entity.scheduling.management.SchSystemParams; import com.docus.server.enums.ConfigTypeEnum; import com.docus.server.infrastructure.dao.ISchCollectorConfigDao; import com.docus.server.infrastructure.dao.ISchSystemParamsDao; +import com.docus.server.service.ICommMsgService; import com.docus.server.service.ISchCollectorConfigService; import com.docus.server.vo.scheduling.management.schcollectorconfig.SchCollectorConfigVO; +import com.docus.server.vo.scheduling.management.schcollectorconfig.TcpSchCollectorConfigContentVO; +import com.docus.server.vo.scheduling.management.schcollectorconfig.TcpSchCollectorConfigVO; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; @@ -37,6 +43,8 @@ public class SchCollectorConfigServiceImpl implements ISchCollectorConfigService private ISchSystemParamsDao iSchSystemParamsDao; @Resource private IdService idService; + @Resource + private ICommMsgService iCommMsgService; /** * 按主键查询 @@ -130,5 +138,29 @@ public class SchCollectorConfigServiceImpl implements ISchCollectorConfigService public int delete(DeleteSchCollectorConfigDTO deleteSchCollectorConfigDTO) { return iSchCollectorConfigDao.delete(deleteSchCollectorConfigDTO.getIds()); } + + /** + * 更新采集器配置文件 + */ + @Override + public void updateCollectorConfig() { + List publicConfig = iSchCollectorConfigDao.findBy("configType", ConfigTypeEnum.PUBLIC_CONFIG); + List privateConfig = iSchCollectorConfigDao.findBy("configType", ConfigTypeEnum.PRIVATE_CONFIG); + + List publicConfigVO = SchCollectorConfigConvert.INSTANCE.convertTcpVOList(publicConfig); + List privateConfigVO = SchCollectorConfigConvert.INSTANCE.convertTcpVOList(privateConfig); + + TcpSchCollectorConfigContentVO tcpSchCollectorConfigVO = TcpSchCollectorConfigContentVO.builder() + .publicConfigList(publicConfigVO) + .privateConfigList(privateConfigVO) + .build(); + + CommMsgDTO commMsgDTO = CommMsgDTO.builder() + .content(JSON.toJSON(tcpSchCollectorConfigVO)) + .messageType(MsgConstants.UPDATE_COLLECTOR_CONFIG) + .build(); + + iCommMsgService.clientsCommand(commMsgDTO); + } } diff --git a/docus-client-interface/src/main/java/com/docus/server/vo/scheduling.management/schcollectorconfig/TcpSchCollectorConfigContentVO.java b/docus-client-interface/src/main/java/com/docus/server/vo/scheduling.management/schcollectorconfig/TcpSchCollectorConfigContentVO.java new file mode 100644 index 0000000..73bc2cc --- /dev/null +++ b/docus-client-interface/src/main/java/com/docus/server/vo/scheduling.management/schcollectorconfig/TcpSchCollectorConfigContentVO.java @@ -0,0 +1,29 @@ +package com.docus.server.vo.scheduling.management.schcollectorconfig; + +import io.swagger.annotations.ApiModel; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; +import java.util.List; + +/** + * 采集器配置 VO + * + * @author AutoGenerator + * @since 2023-07-15 + */ +@Builder +@NoArgsConstructor +@AllArgsConstructor +@Data +@ApiModel(value = "TcpSchCollectorConfigContentVO对象", description = "采集器配置") +public class TcpSchCollectorConfigContentVO implements Serializable { + + List publicConfigList; + + List privateConfigList; + +} diff --git a/docus-client-interface/src/main/java/com/docus/server/vo/scheduling.management/schcollectorconfig/TcpSchCollectorConfigVO.java b/docus-client-interface/src/main/java/com/docus/server/vo/scheduling.management/schcollectorconfig/TcpSchCollectorConfigVO.java new file mode 100644 index 0000000..a5983a9 --- /dev/null +++ b/docus-client-interface/src/main/java/com/docus/server/vo/scheduling.management/schcollectorconfig/TcpSchCollectorConfigVO.java @@ -0,0 +1,31 @@ +package com.docus.server.vo.scheduling.management.schcollectorconfig; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * 采集器配置 VO + * + * @author AutoGenerator + * @since 2023-07-15 + */ +@Builder +@NoArgsConstructor +@AllArgsConstructor +@Data +@ApiModel(value = "TcpSchCollectorConfigVO对象", description = "采集器配置") +public class TcpSchCollectorConfigVO implements Serializable { + + @ApiModelProperty(value = "采集器Id") + private Long collectorId; + + @ApiModelProperty(value = "配置参数") + private String configJson; + +}