【代码优化】IoT: 数据桥梁 config 优化

This commit is contained in:
puhui999 2025-03-09 13:29:12 +08:00
parent ff9267ad75
commit b1d3b73b6d
18 changed files with 325 additions and 259 deletions

View File

@ -15,5 +15,8 @@ public class DictTypeConstants {
public static final String VALIDATE_TYPE = "iot_validate_type";
public static final String DEVICE_STATE = "iot_device_state";
public static final String IOT_DATA_BRIDGE_DIRECTION_ENUM = "iot_data_bridge_direction_enum";
public static final String IOT_DATA_BRIDGE_TYPE_ENUM = "iot_data_bridge_type_enum";
}

View File

@ -13,14 +13,14 @@ import java.util.Arrays;
*/
@RequiredArgsConstructor
@Getter
public enum IotDataBridgDirectionEnum implements ArrayValuable<Integer> {
public enum IotDataBridgeDirectionEnum implements ArrayValuable<Integer> {
INPUT(1), // 输入
OUTPUT(2); // 输出
private final Integer type;
public static final Integer[] ARRAYS = Arrays.stream(values()).map(IotDataBridgDirectionEnum::getType).toArray(Integer[]::new);
public static final Integer[] ARRAYS = Arrays.stream(values()).map(IotDataBridgeDirectionEnum::getType).toArray(Integer[]::new);
@Override
public Integer[] array() {

View File

@ -13,25 +13,26 @@ import java.util.Arrays;
*/
@RequiredArgsConstructor
@Getter
public enum IotDataBridgTypeEnum implements ArrayValuable<Integer> {
public enum IotDataBridgeTypeEnum implements ArrayValuable<Integer> {
HTTP(1),
TCP(2),
WEBSOCKET(3),
HTTP(1, "HTTP"),
TCP(2, "TCP"),
WEBSOCKET(3, "WEBSOCKET"),
MQTT(10),
MQTT(10, "MQTT"),
DATABASE(20),
REDIS_STREAM(21),
DATABASE(20, "DATABASE"),
REDIS_STREAM(21, "REDIS_STREAM"),
ROCKETMQ(30),
RABBITMQ(31),
KAFKA(32)
;
ROCKETMQ(30, "ROCKETMQ"),
RABBITMQ(31, "RABBITMQ"),
KAFKA(32, "KAFKA");
private final Integer type;
public static final Integer[] ARRAYS = Arrays.stream(values()).map(IotDataBridgTypeEnum::getType).toArray(Integer[]::new);
private final String name;
public static final Integer[] ARRAYS = Arrays.stream(values()).map(IotDataBridgeTypeEnum::getType).toArray(Integer[]::new);
@Override
public Integer[] array() {

View File

@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge;
import cn.iocoder.yudao.framework.excel.core.annotations.DictFormat;
import cn.iocoder.yudao.framework.excel.core.convert.DictConvert;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeConfig;
import com.alibaba.excel.annotation.ExcelIgnoreUnannotated;
import com.alibaba.excel.annotation.ExcelProperty;
import io.swagger.v3.oas.annotations.media.Schema;
@ -9,6 +10,10 @@ import lombok.Data;
import java.time.LocalDateTime;
import static cn.iocoder.yudao.module.iot.enums.DictTypeConstants.IOT_DATA_BRIDGE_DIRECTION_ENUM;
import static cn.iocoder.yudao.module.iot.enums.DictTypeConstants.IOT_DATA_BRIDGE_TYPE_ENUM;
import static cn.iocoder.yudao.module.system.enums.DictTypeConstants.COMMON_STATUS;
@Schema(description = "管理后台 - IoT 数据桥梁 Response VO")
@Data
@ExcelIgnoreUnannotated
@ -28,22 +33,22 @@ public class IotDataBridgeRespVO {
@Schema(description = "桥梁状态", requiredMode = Schema.RequiredMode.REQUIRED, example = "2")
@ExcelProperty(value = "桥梁状态", converter = DictConvert.class)
@DictFormat("common_status") // TODO 代码优化建议设置到对应的 DictTypeConstants 枚举类中
@DictFormat(COMMON_STATUS)
private Integer status;
@Schema(description = "桥梁方向", requiredMode = Schema.RequiredMode.REQUIRED)
@ExcelProperty(value = "桥梁方向", converter = DictConvert.class)
@DictFormat("iot_data_bridg_direction_enum") // TODO 代码优化建议设置到对应的 DictTypeConstants 枚举类中
@DictFormat(IOT_DATA_BRIDGE_DIRECTION_ENUM)
private Integer direction;
@Schema(description = "桥梁类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "1")
@ExcelProperty(value = "桥梁类型", converter = DictConvert.class)
@DictFormat("iot_data_bridg_type_enum") // TODO 代码优化建议设置到对应的 DictTypeConstants 枚举类中
@DictFormat(IOT_DATA_BRIDGE_TYPE_ENUM)
private Integer type;
@Schema(description = "桥梁配置")
@ExcelProperty("桥梁配置")
private String config;
private IotDataBridgeConfig config;
@Schema(description = "创建时间", requiredMode = Schema.RequiredMode.REQUIRED)
@ExcelProperty("创建时间")

View File

@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeConfig;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
@ -32,6 +33,7 @@ public class IotDataBridgeSaveReqVO {
private Integer type;
@Schema(description = "桥梁配置")
private String config;
@NotNull(message = "桥梁配置不能为空")
private IotDataBridgeConfig config;
}

View File

@ -0,0 +1,32 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Data;
/**
* 抽象类 IotDataBridgeConfig
*
* 用于表示数据桥梁配置数据的通用类型根据具体的 "type" 字段动态映射到对应的子类
* 提供多态支持适用于不同类型的数据结构序列化和反序列化场景
*
* @author HUIHUI
*/
@Data
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true)
@JsonSubTypes({
@JsonSubTypes.Type(value = IotDataBridgeHttpConfig.class, name = "HTTP"),
@JsonSubTypes.Type(value = IotDataBridgeKafkaMQConfig.class, name = "KAFKA"),
@JsonSubTypes.Type(value = IotDataBridgeMqttConfig.class, name = "MQTT"),
@JsonSubTypes.Type(value = IotDataBridgeRabbitMQConfig.class, name = "RABBITMQ"),
@JsonSubTypes.Type(value = IotDataBridgeRedisStreamMQConfig.class, name = "REDIS_STREAM"),
@JsonSubTypes.Type(value = IotDataBridgeRocketMQConfig.class, name = "ROCKETMQ"),
})
public abstract class IotDataBridgeConfig {
/**
* 配置类型
*/
private String type;
}

View File

@ -0,0 +1,36 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config;
import lombok.Data;
import java.util.Map;
/**
* HTTP 配置
*
* @author HUIHUI
*/
@Data
public class IotDataBridgeHttpConfig extends IotDataBridgeConfig {
/**
* 请求 URL
*/
private String url;
/**
* 请求方法
*/
private String method;
/**
* 请求头
*/
private Map<String, String> headers;
/**
* 请求参数
*/
private Map<String, String> query;
/**
* 请求体
*/
private String body;
}

View File

@ -0,0 +1,35 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config;
import lombok.Data;
/**
* Kafka 配置
*
* @author HUIHUI
*/
@Data
public class IotDataBridgeKafkaMQConfig extends IotDataBridgeConfig {
/**
* Kafka 服务器地址
*/
private String bootstrapServers;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 是否启用 SSL
*/
private Boolean ssl;
/**
* 主题
*/
private String topic;
}

View File

@ -0,0 +1,34 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config;
import lombok.Data;
/**
* MQTT 配置
*
* @author HUIHUI
*/
@Data
public class IotDataBridgeMqttConfig extends IotDataBridgeConfig {
/**
* MQTT 服务器地址
*/
private String url;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 客户端编号
*/
private String clientId;
/**
* 主题
*/
private String topic;
}

View File

@ -0,0 +1,46 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config;
import lombok.Data;
/**
* RabbitMQ 配置
*
* @author HUIHUI
*/
@Data
public class IotDataBridgeRabbitMQConfig extends IotDataBridgeConfig {
/**
* RabbitMQ 服务器地址
*/
private String host;
/**
* 端口
*/
private Integer port;
/**
* 虚拟主机
*/
private String virtualHost;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 交换机名称
*/
private String exchange;
/**
* 路由键
*/
private String routingKey;
/**
* 队列名称
*/
private String queue;
}

View File

@ -0,0 +1,34 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config;
import lombok.Data;
/**
* Redis Stream MQ 配置
*
* @author HUIHUI
*/
@Data
public class IotDataBridgeRedisStreamMQConfig extends IotDataBridgeConfig {
/**
* Redis 服务器地址
*/
private String host;
/**
* 端口
*/
private Integer port;
/**
* 密码
*/
private String password;
/**
* 数据库索引
*/
private Integer database;
/**
* 主题
*/
private String topic;
}

View File

@ -0,0 +1,39 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config;
import lombok.Data;
/**
* RocketMQ 配置
*
* @author HUIHUI
*/
@Data
public class IotDataBridgeRocketMQConfig extends IotDataBridgeConfig {
/**
* RocketMQ 名称服务器地址
*/
private String nameServer;
/**
* 访问密钥
*/
private String accessKey;
/**
* 秘密钥匙
*/
private String secretKey;
/**
* 生产者组
*/
private String group;
/**
* 主题
*/
private String topic;
/**
* 标签
*/
private String tags;
}

View File

@ -1,16 +1,16 @@
package cn.iocoder.yudao.module.iot.dal.dataobject.rule;
import cn.iocoder.yudao.framework.mybatis.core.dataobject.BaseDO;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeConfig;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeDirectionEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum;
import com.baomidou.mybatisplus.annotation.KeySequence;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.*;
import java.util.Map;
/**
* IoT 数据桥梁 DO
*
@ -48,14 +48,14 @@ public class IotDataBridgeDO extends BaseDO {
/**
* 桥梁方向
*
* 枚举 {@link cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgDirectionEnum}
* 枚举 {@link IotDataBridgeDirectionEnum}
*/
private Integer direction;
/**
* 桥梁类型
*
* 枚举 {@link cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum}
* 枚举 {@link IotDataBridgeTypeEnum}
*/
private Integer type;
@ -63,211 +63,6 @@ public class IotDataBridgeDO extends BaseDO {
* 桥梁配置
*/
@TableField(typeHandler = JacksonTypeHandler.class)
private Config config;
/**
* 文件客户端的配置
* 不同实现的客户端需要不同的配置通过子类来定义
*
* @author 芋道源码
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS)
// @JsonTypeInfo 注解的作用Jackson 多态
// 1. 序列化到时数据库时增加 @class 属性
// 2. 反序列化到内存对象时通过 @class 属性可以创建出正确的类型
public interface Config {
}
/**
* HTTP 配置
*/
@Data
public static class HttpConfig implements Config {
/**
* 请求 URL
*/
private String url;
/**
* 请求方法
*/
private String method;
/**
* 请求头
*/
private Map<String, String> headers;
/**
* 请求参数
*/
private Map<String, String> query;
/**
* 请求体
*/
private String body;
}
/**
* MQTT 配置
*/
@Data
public static class MqttConfig implements Config {
/**
* MQTT 服务器地址
*/
private String url;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 客户端编号
*/
private String clientId;
/**
* 主题
*/
private String topic;
}
/**
* RocketMQ 配置
*/
@Data
public static class RocketMQConfig implements Config {
/**
* RocketMQ 名称服务器地址
*/
private String nameServer;
/**
* 访问密钥
*/
private String accessKey;
/**
* 秘密钥匙
*/
private String secretKey;
/**
* 生产者组
*/
private String group;
/**
* 主题
*/
private String topic;
/**
* 标签
*/
private String tags;
}
/**
* Kafka 配置
*/
@Data
public static class KafkaMQConfig implements Config {
/**
* Kafka 服务器地址
*/
private String bootstrapServers;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 是否启用 SSL
*/
private Boolean ssl;
/**
* 主题
*/
private String topic;
}
/**
* RabbitMQ 配置
*/
@Data
public static class RabbitMQConfig implements Config {
/**
* RabbitMQ 服务器地址
*/
private String host;
/**
* 端口
*/
private Integer port;
/**
* 虚拟主机
*/
private String virtualHost;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 交换机名称
*/
private String exchange;
/**
* 路由键
*/
private String routingKey;
/**
* 队列名称
*/
private String queue;
}
/**
* Redis Stream MQ 配置
*/
@Data
public static class RedisStreamMQConfig implements Config {
/**
* Redis 服务器地址
*/
private String host;
/**
* 端口
*/
private Integer port;
/**
* 密码
*/
private String password;
/**
* 数据库索引
*/
private Integer database;
/**
* 主题
*/
private String topic;
}
private IotDataBridgeConfig config;
}

View File

@ -3,8 +3,8 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.framework.common.util.http.HttpUtils;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeHttpConfig;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
@ -25,19 +25,19 @@ import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_
*/
@Component
@Slf4j
public class IotHttpDataBridgeExecute implements IotDataBridgeExecute<IotDataBridgeDO.HttpConfig> {
public class IotHttpDataBridgeExecute implements IotDataBridgeExecute<IotDataBridgeHttpConfig> {
@Resource
private RestTemplate restTemplate;
@Override
public Integer getType() {
return IotDataBridgTypeEnum.HTTP.getType();
return IotDataBridgeTypeEnum.HTTP.getType();
}
@Override
@SuppressWarnings({"unchecked", "deprecation"})
public void execute0(IotDeviceMessage message, IotDataBridgeDO.HttpConfig config) {
public void execute0(IotDeviceMessage message, IotDataBridgeHttpConfig config) {
String url = null;
HttpMethod method = HttpMethod.valueOf(config.getMethod().toUpperCase());
HttpEntity<String> requestEntity = null;

View File

@ -1,7 +1,8 @@
package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeKafkaMQConfig;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
@ -26,17 +27,17 @@ import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class IotKafkaMQDataBridgeExecute extends
AbstractCacheableDataBridgeExecute<IotDataBridgeDO.KafkaMQConfig, KafkaTemplate<String, String>> {
AbstractCacheableDataBridgeExecute<IotDataBridgeKafkaMQConfig, KafkaTemplate<String, String>> {
private static final Duration SEND_TIMEOUT = Duration.ofMillis(10000); // 10 秒超时时间
@Override
public Integer getType() {
return IotDataBridgTypeEnum.KAFKA.getType();
return IotDataBridgeTypeEnum.KAFKA.getType();
}
@Override
public void execute0(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) throws Exception {
public void execute0(IotDeviceMessage message, IotDataBridgeKafkaMQConfig config) throws Exception {
// 1. 获取或创建 KafkaTemplate
KafkaTemplate<String, String> kafkaTemplate = getProducer(config);
@ -47,7 +48,7 @@ public class IotKafkaMQDataBridgeExecute extends
}
@Override
protected KafkaTemplate<String, String> initProducer(IotDataBridgeDO.KafkaMQConfig config) {
protected KafkaTemplate<String, String> initProducer(IotDataBridgeKafkaMQConfig config) {
// 1.1 构建生产者配置
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
@ -82,7 +83,7 @@ public class IotKafkaMQDataBridgeExecute extends
IotKafkaMQDataBridgeExecute action = new IotKafkaMQDataBridgeExecute();
// 2. 创建共享的配置
IotDataBridgeDO.KafkaMQConfig config = new IotDataBridgeDO.KafkaMQConfig();
IotDataBridgeKafkaMQConfig config = new IotDataBridgeKafkaMQConfig();
config.setBootstrapServers("127.0.0.1:9092");
config.setTopic("test-topic");
config.setSsl(false);

View File

@ -1,7 +1,8 @@
package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeRabbitMQConfig;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
@ -22,16 +23,16 @@ import java.time.LocalDateTime;
@Component
@Slf4j
public class IotRabbitMQDataBridgeExecute extends
AbstractCacheableDataBridgeExecute<IotDataBridgeDO.RabbitMQConfig, Channel> {
AbstractCacheableDataBridgeExecute<IotDataBridgeRabbitMQConfig, Channel> {
@Override
public Integer getType() {
return IotDataBridgTypeEnum.RABBITMQ.getType();
return IotDataBridgeTypeEnum.RABBITMQ.getType();
}
@Override
public void execute0(IotDeviceMessage message, IotDataBridgeDO.RabbitMQConfig config) throws Exception {
public void execute0(IotDeviceMessage message, IotDataBridgeRabbitMQConfig config) throws Exception {
// 1. 获取或创建 Channel
Channel channel = getProducer(config);
@ -48,7 +49,7 @@ public class IotRabbitMQDataBridgeExecute extends
@Override
@SuppressWarnings("resource")
protected Channel initProducer(IotDataBridgeDO.RabbitMQConfig config) throws Exception {
protected Channel initProducer(IotDataBridgeRabbitMQConfig config) throws Exception {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(config.getHost());
@ -81,7 +82,7 @@ public class IotRabbitMQDataBridgeExecute extends
IotRabbitMQDataBridgeExecute action = new IotRabbitMQDataBridgeExecute();
// 2. 创建共享的配置
IotDataBridgeDO.RabbitMQConfig config = new IotDataBridgeDO.RabbitMQConfig();
IotDataBridgeRabbitMQConfig config = new IotDataBridgeRabbitMQConfig();
config.setHost("localhost");
config.setPort(5672);
config.setVirtualHost("/");

View File

@ -2,8 +2,9 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeRedisStreamMQConfig;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
@ -30,15 +31,15 @@ import java.time.LocalDateTime;
@Component
@Slf4j
public class IotRedisStreamMQDataBridgeExecute extends
AbstractCacheableDataBridgeExecute<IotDataBridgeDO.RedisStreamMQConfig, RedisTemplate<String, Object>> {
AbstractCacheableDataBridgeExecute<IotDataBridgeRedisStreamMQConfig, RedisTemplate<String, Object>> {
@Override
public Integer getType() {
return IotDataBridgTypeEnum.REDIS_STREAM.getType();
return IotDataBridgeTypeEnum.REDIS_STREAM.getType();
}
@Override
public void execute0(IotDeviceMessage message, IotDataBridgeDO.RedisStreamMQConfig config) throws Exception {
public void execute0(IotDeviceMessage message, IotDataBridgeRedisStreamMQConfig config) throws Exception {
// 1. 获取 RedisTemplate
RedisTemplate<String, Object> redisTemplate = getProducer(config);
@ -50,7 +51,7 @@ public class IotRedisStreamMQDataBridgeExecute extends
}
@Override
protected RedisTemplate<String, Object> initProducer(IotDataBridgeDO.RedisStreamMQConfig config) {
protected RedisTemplate<String, Object> initProducer(IotDataBridgeRedisStreamMQConfig config) {
// 1.1 创建 Redisson 配置
Config redissonConfig = new Config();
SingleServerConfig serverConfig = redissonConfig.useSingleServer()
@ -101,7 +102,7 @@ public class IotRedisStreamMQDataBridgeExecute extends
IotRedisStreamMQDataBridgeExecute action = new IotRedisStreamMQDataBridgeExecute();
// 2. 创建共享的配置
IotDataBridgeDO.RedisStreamMQConfig config = new IotDataBridgeDO.RedisStreamMQConfig();
IotDataBridgeRedisStreamMQConfig config = new IotDataBridgeRedisStreamMQConfig();
config.setHost("127.0.0.1");
config.setPort(6379);
config.setDatabase(0);

View File

@ -1,7 +1,8 @@
package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeRocketMQConfig;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
@ -23,15 +24,15 @@ import java.time.LocalDateTime;
@Component
@Slf4j
public class IotRocketMQDataBridgeExecute extends
AbstractCacheableDataBridgeExecute<IotDataBridgeDO.RocketMQConfig, DefaultMQProducer> {
AbstractCacheableDataBridgeExecute<IotDataBridgeRocketMQConfig, DefaultMQProducer> {
@Override
public Integer getType() {
return IotDataBridgTypeEnum.ROCKETMQ.getType();
return IotDataBridgeTypeEnum.ROCKETMQ.getType();
}
@Override
public void execute0(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) throws Exception {
public void execute0(IotDeviceMessage message, IotDataBridgeRocketMQConfig config) throws Exception {
// 1. 获取或创建 Producer
DefaultMQProducer producer = getProducer(config);
@ -52,7 +53,7 @@ public class IotRocketMQDataBridgeExecute extends
}
@Override
protected DefaultMQProducer initProducer(IotDataBridgeDO.RocketMQConfig config) throws Exception {
protected DefaultMQProducer initProducer(IotDataBridgeRocketMQConfig config) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(config.getGroup());
producer.setNamesrvAddr(config.getNameServer());
producer.start();
@ -70,7 +71,7 @@ public class IotRocketMQDataBridgeExecute extends
IotRocketMQDataBridgeExecute action = new IotRocketMQDataBridgeExecute();
// 2. 创建共享的配置
IotDataBridgeDO.RocketMQConfig config = new IotDataBridgeDO.RocketMQConfig();
IotDataBridgeRocketMQConfig config = new IotDataBridgeRocketMQConfig();
config.setNameServer("127.0.0.1:9876");
config.setGroup("test-group");
config.setTopic("test-topic");