Merge branch 'feature/iot' of https://gitee.com/zhijiantianya/ruoyi-vue-pro into feature/iot

 Conflicts:
	yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/tdengine/IotDeviceLogDataMapper.java
	yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceLogDataService.java
	yudao-server/src/main/resources/application-local.yaml
This commit is contained in:
alwayssuper 2025-01-10 20:46:43 +08:00
commit f1d887d0e0
75 changed files with 1721 additions and 622 deletions

1
.gitignore vendored
View File

@ -51,3 +51,4 @@ rebel.xml
application-my.yaml
/yudao-ui-app/unpackage/
**/.DS_Store

View File

View File

@ -1,2 +0,0 @@
http-plugin
http-plugin@0.0.1

View File

@ -67,6 +67,7 @@
<bizlog-sdk.version>3.0.6</bizlog-sdk.version>
<mqtt.version>1.2.5</mqtt.version>
<pf4j-spring.version>0.9.0</pf4j-spring.version>
<vertx.version>4.4.0</vertx.version>
<!-- 三方云服务相关 -->
<okio.version>3.5.0</okio.version>
<okhttp3.version>4.11.0</okhttp3.version>
@ -613,6 +614,19 @@
<version>${pf4j-spring.version}</version>
</dependency>
<!-- Vert.x 核心依赖 -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>${vertx.version}</version>
</dependency>
<!-- Vert.x Web 模块 -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
<version>${vertx.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@ -33,6 +33,13 @@
</exclusion>
</exclusions>
</dependency>
<!-- 参数校验 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>

View File

@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.api;
import java.util.HashMap;
import java.util.Map;
// TODO 芋艿纠结下
/**
* 服务注册表 - 插架模块使用无法使用 Spring 注入
*/

View File

@ -1,17 +1,20 @@
package cn.iocoder.yudao.module.iot.api.device;
import cn.iocoder.yudao.module.iot.api.device.dto.DeviceDataCreateReqDTO;
import jakarta.validation.Valid;
/**
* 设备数据 API
*
* @author haohao
*/
public interface DeviceDataApi {
/**
* 保存设备数据
*
* @param productKey 产品 key
* @param deviceName 设备名称
* @param message 消息
* @param createDTO 设备数据
*/
void saveDeviceData(String productKey, String deviceName, String message);
void saveDeviceData(@Valid DeviceDataCreateReqDTO createDTO);
}

View File

@ -0,0 +1,31 @@
package cn.iocoder.yudao.module.iot.api.device.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import jakarta.validation.constraints.NotNull;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class DeviceDataCreateReqDTO {
/**
* 产品标识
*/
@NotNull(message = "产品标识不能为空")
private String productKey;
/**
* 设备名称
*/
@NotNull(message = "设备名称不能为空")
private String deviceName;
/**
* 消息
*/
@NotNull(message = "消息不能为空")
private String message;
}

View File

@ -13,8 +13,8 @@ import java.util.Arrays;
@Getter
public enum IotPluginDeployTypeEnum implements IntArrayValuable {
UPLOAD(0, "上传 jar"), // TODO @haohaoUPLOAD ALONE 感觉有点冲突前者是部署方式后者是运行方式这个后续再讨论下哈
ALONE(1, "独立运行");
JAR(0, "JAR 部署"),
STANDALONE(1, "独立部署");
public static final int[] ARRAYS = Arrays.stream(values()).mapToInt(IotPluginDeployTypeEnum::getDeployType).toArray();
@ -48,4 +48,5 @@ public enum IotPluginDeployTypeEnum implements IntArrayValuable {
public int[] array() {
return ARRAYS;
}
}

View File

@ -0,0 +1,39 @@
package cn.iocoder.yudao.module.iot.mqttrpc.common;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
// TODO @芋艿要不要加个 mqtt 值了的前缀
/**
* MQTT RPC 请求
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RpcRequest {
/**
* 方法名
*/
private String method;
/**
* 参数
*/
// TODO @haohaoobject 对象会不会不好序列化
private Object[] params;
/**
* 关联 ID
*/
private String correlationId;
/**
* 回复地址
*/
private String replyTo;
}

View File

@ -0,0 +1,33 @@
package cn.iocoder.yudao.module.iot.mqttrpc.common;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* MQTT RPC 响应
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RpcResponse {
/**
* 关联 ID
*/
private String correlationId;
/**
* 结果
*/
// TODO @haohaoobject 对象会不会不好反序列化
private Object result;
/**
* 错误
*/
private String error;
}

View File

@ -0,0 +1,19 @@
package cn.iocoder.yudao.module.iot.mqttrpc.common;
import cn.hutool.json.JSONUtil;
/**
* 序列化工具类
*
*/
public class SerializationUtils {
public static String serialize(Object obj) {
return JSONUtil.toJsonStr(obj);
}
public static <T> T deserialize(String json, Class<T> clazz) {
return JSONUtil.toBean(json, clazz);
}
}

View File

@ -64,6 +64,16 @@
<artifactId>yudao-spring-boot-starter-excel</artifactId>
</dependency>
<!-- Vert.x 核心依赖 -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
</dependency>
<!-- Vert.x Web 模块 -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
</dependency>
<!-- MQTT -->
<dependency>
<groupId>org.eclipse.paho</groupId>

View File

@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.api.device;
import cn.iocoder.yudao.module.iot.api.device.dto.DeviceDataCreateReqDTO;
import cn.iocoder.yudao.module.iot.service.device.IotDevicePropertyDataService;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;
@ -17,8 +18,8 @@ public class DeviceDataApiImpl implements DeviceDataApi {
private IotDevicePropertyDataService deviceDataService;
@Override
public void saveDeviceData(String productKey, String deviceName, String message) {
deviceDataService.saveDeviceData(productKey, deviceName, message);
public void saveDeviceData(DeviceDataCreateReqDTO createDTO) {
deviceDataService.saveDeviceData(createDTO);
}
}

View File

@ -3,19 +3,15 @@ package cn.iocoder.yudao.module.iot.controller.admin.device;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.device.IotDeviceSaveReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.deviceData.IotDeviceDataPageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.deviceData.IotDeviceDataRespVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.deviceData.IotDeviceDataSimulatorSaveReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.deviceData.*;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDataDO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.deviceData.IotTimeDataRespVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceLogDO;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceLogDataService;
import cn.iocoder.yudao.module.iot.service.device.IotDevicePropertyDataService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.Resource;
import jakarta.validation.Valid;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
@ -36,6 +32,9 @@ public class IotDeviceDataController {
@Resource
private IotDeviceLogDataService iotDeviceLogDataService;
@Resource // TODO @superservice 之间不用空行原因是这样更简洁空行主要是为了间隔提升可读性
private IotDeviceLogDataService deviceLogDataService;
// TODO @浩浩这里的 /latest-list包括方法名
@GetMapping("/latest")
@Operation(summary = "获取设备属性最新数据")
@ -52,12 +51,22 @@ public class IotDeviceDataController {
return success(BeanUtils.toBean(list, IotTimeDataRespVO.class));
}
// TODO:数据权限
@PostMapping("/simulator")
@Operation(summary = "模拟设备")
public CommonResult<Boolean> simulatorDevice(@Valid @RequestBody IotDeviceDataSimulatorSaveReqVO simulatorReqVO) {
//TODO:先生成一下日志 后续完善模拟设备代码逻辑
//TODO:先生成一下设备日志 后续完善模拟设备代码逻辑
// TODO @super应该 deviceDataService 里面有个 simulatorDevice然后里面去 insert 日志
iotDeviceLogDataService.createDeviceLog(simulatorReqVO);
return success(true);
}
// TODO:数据权限
@GetMapping("/log/page")
@Operation(summary = "获得设备日志分页")
public CommonResult<PageResult<IotDeviceLogRespVO>> getDeviceLogPage(@Valid IotDeviceLogPageReqVO pageReqVO) {
PageResult<IotDeviceLogDO> pageResult = deviceLogDataService.getDeviceLogPage(pageReqVO);
return success(BeanUtils.toBean(pageResult, IotDeviceLogRespVO.class));
}
}

View File

@ -14,7 +14,7 @@ public class IotDeviceSaveReqVO {
private Long id;
@Schema(description = "设备编号", requiredMode = Schema.RequiredMode.AUTO, example = "177")
@Size(max = 50, message = "设备编号长度不能超过50个字符")
@Size(max = 50, message = "设备编号长度不能超过 50 个字符")
private String deviceKey;
@Schema(description = "设备名称", requiredMode = Schema.RequiredMode.REQUIRED, example = "王五")

View File

@ -4,24 +4,26 @@ import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotEmpty;
import lombok.Data;
import java.time.LocalDateTime;
// TODO super SaveReqVO => ReqVO
@Schema(description = "管理后台 - IoT 模拟设备数据 Request VO")
@Data
public class IotDeviceDataSimulatorSaveReqVO {
@Schema(description = "消息ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "msg123")
// TODO @super感觉后端随机更合适
@Schema(description = "消息 ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "msg123")
private String id;
// TODO @super不用传递 productKey因为 deviceKey 可以推导出来
@Schema(description = "产品ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "product123")
@NotEmpty(message = "产品ID不能为空")
private String productKey;
// TODO @super中文写作规范中英文之间要有空格例如说设备 IDps这里应该是设备标识
@Schema(description = "设备ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "device123")
@NotEmpty(message = "设备ID不能为空")
private String deviceKey;
// TODO @supertypesubType是不是不用传递因为模拟只有属性
@Schema(description = "消息/日志类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "property")
@NotEmpty(message = "消息类型不能为空")
private String type;
@ -34,7 +36,8 @@ public class IotDeviceDataSimulatorSaveReqVO {
@NotEmpty(message = "数据内容不能为空")
private String content;
// TODO @芋艿需要讨论下reportTime 到底以那个为准
@Schema(description = "上报时间", requiredMode = Schema.RequiredMode.REQUIRED)
private LocalDateTime reportTime;
private Long reportTime;
}

View File

@ -0,0 +1,33 @@
package cn.iocoder.yudao.module.iot.controller.admin.device.vo.deviceData;
import cn.iocoder.yudao.framework.common.pojo.PageParam;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotEmpty;
import lombok.Data;
import org.springframework.format.annotation.DateTimeFormat;
import java.time.LocalDateTime;
import static cn.iocoder.yudao.framework.common.util.date.DateUtils.FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND;
@Schema(description = "管理后台 - IoT 设备日志分页查询 Request VO")
@Data
public class IotDeviceLogPageReqVO extends PageParam {
@Schema(description = "设备标识", requiredMode = Schema.RequiredMode.REQUIRED, example = "device123")
@NotEmpty(message = "设备标识不能为空")
private String deviceKey;
// TODO @super对应的枚举类
@Schema(description = "消息类型", example = "property")
private String type;
@Schema(description = "标识符", example = "temperature")
// TODO @super对应的枚举类
private String subType;
@Schema(description = "创建时间")
@DateTimeFormat(pattern = FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND)
private LocalDateTime[] createTime;
}

View File

@ -0,0 +1,36 @@
package cn.iocoder.yudao.module.iot.controller.admin.device.vo.deviceData;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
@Schema(description = "管理后台 - IoT 设备日志 Response VO")
@Data
public class IotDeviceLogRespVO {
@Schema(description = "日志编号", requiredMode = Schema.RequiredMode.REQUIRED, example = "1024")
private String id;
@Schema(description = "产品标识", requiredMode = Schema.RequiredMode.REQUIRED, example = "product123")
private String productKey;
@Schema(description = "设备标识", requiredMode = Schema.RequiredMode.REQUIRED, example = "device123")
private String deviceKey;
@Schema(description = "消息类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "property")
private String type;
@Schema(description = "标识符", requiredMode = Schema.RequiredMode.REQUIRED, example = "temperature")
private String subType;
@Schema(description = "日志内容", requiredMode = Schema.RequiredMode.REQUIRED)
private String content;
@Schema(description = "上报时间", requiredMode = Schema.RequiredMode.REQUIRED)
private LocalDateTime reportTime;
@Schema(description = "记录时间戳", requiredMode = Schema.RequiredMode.REQUIRED)
private LocalDateTime ts;
}

View File

@ -9,7 +9,7 @@ import org.springframework.web.multipart.MultipartFile;
@Data
public class PluginInfoImportReqVO {
@Schema(description = "主键ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "11546")
@Schema(description = "主键 ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "11546")
private Long id;
@Schema(description = "插件文件", requiredMode = Schema.RequiredMode.REQUIRED)

View File

@ -1,6 +1,8 @@
package cn.iocoder.yudao.module.iot.controller.admin.plugin.vo;
import cn.iocoder.yudao.framework.common.pojo.PageParam;
import cn.iocoder.yudao.framework.common.validation.InEnum;
import cn.iocoder.yudao.module.iot.enums.plugin.IotPluginStatusEnum;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
@ -11,7 +13,8 @@ public class PluginInfoPageReqVO extends PageParam {
@Schema(description = "插件名称", example = "http")
private String name;
@Schema(description = "状态")
@Schema(description = "状态", example = "1")
@InEnum(IotPluginStatusEnum.class)
private Integer status;
}

View File

@ -1,7 +1,5 @@
package cn.iocoder.yudao.module.iot.controller.admin.plugin.vo;
import com.alibaba.excel.annotation.ExcelIgnoreUnannotated;
import com.alibaba.excel.annotation.ExcelProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
@ -9,63 +7,48 @@ import java.time.LocalDateTime;
@Schema(description = "管理后台 - IoT 插件信息 Response VO")
@Data
@ExcelIgnoreUnannotated
public class PluginInfoRespVO {
@Schema(description = "主键 ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "11546")
@ExcelProperty("主键 ID")
private Long id;
@Schema(description = "插件包标识符", requiredMode = Schema.RequiredMode.REQUIRED, example = "24627")
@ExcelProperty("插件包标识符")
private String pluginKey;
@Schema(description = "插件名称", requiredMode = Schema.RequiredMode.REQUIRED, example = "赵六")
@ExcelProperty("插件名称")
private String name;
@Schema(description = "描述", example = "你猜")
@ExcelProperty("描述")
private String description;
@Schema(description = "部署方式", requiredMode = Schema.RequiredMode.REQUIRED, example = "2")
@ExcelProperty("部署方式")
private Integer deployType;
@Schema(description = "插件包文件名", requiredMode = Schema.RequiredMode.REQUIRED)
@ExcelProperty("插件包文件名")
private String fileName;
@Schema(description = "插件版本", requiredMode = Schema.RequiredMode.REQUIRED)
@ExcelProperty("插件版本")
private String version;
@Schema(description = "插件类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "2")
@ExcelProperty("插件类型")
private Integer type;
@Schema(description = "设备插件协议类型")
@ExcelProperty("设备插件协议类型")
private String protocol;
@Schema(description = "状态", requiredMode = Schema.RequiredMode.REQUIRED)
@ExcelProperty("状态")
private Integer status;
@Schema(description = "插件配置项描述信息")
@ExcelProperty("插件配置项描述信息")
private String configSchema;
@Schema(description = "插件配置信息")
@ExcelProperty("插件配置信息")
private String config;
@Schema(description = "插件脚本")
@ExcelProperty("插件脚本")
private String script;
@Schema(description = "创建时间", requiredMode = Schema.RequiredMode.REQUIRED)
@ExcelProperty("创建时间")
private LocalDateTime createTime;
}

View File

@ -1,12 +1,18 @@
package cn.iocoder.yudao.module.iot.controller.admin.plugin.vo;
import cn.iocoder.yudao.framework.common.validation.InEnum;
import cn.iocoder.yudao.module.iot.enums.plugin.IotPluginStatusEnum;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.Data;
@Schema(description = "管理后台 - IoT 插件信息新增/修改 Request VO")
@Data
public class PluginInfoSaveReqVO {
// TODO @haohao新增的字段有点多每个都需要哇
// TODO @haohao一些枚举字段需要加枚举校验例如说deployTypestatustype
@Schema(description = "主键ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "11546")
private Long id;
@ -35,6 +41,7 @@ public class PluginInfoSaveReqVO {
private String protocol;
@Schema(description = "状态", requiredMode = Schema.RequiredMode.REQUIRED)
@InEnum(IotPluginStatusEnum.class)
private Integer status;
@Schema(description = "插件配置项描述信息")

View File

@ -74,6 +74,7 @@ public class IotThingModelController {
return success(IotThingModelConvert.INSTANCE.convertList(list));
}
// TODO @puhui @supergetThingModelListByProductId getThingModelListByProductId 可以融合么
@GetMapping("/list")
@Operation(summary = "获得产品物模型列表")
@PreAuthorize("@ss.hasPermission('iot:thing-model:query')")

View File

@ -6,11 +6,10 @@ import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
@Schema(description = "管理后台 - IoT 产品物模型List Request VO")
@Data
public class IotThingModelListReqVO {
@Schema(description = "功能标识")
private String identifier;
@ -21,7 +20,8 @@ public class IotThingModelListReqVO {
@InEnum(IotThingModelTypeEnum.class)
private Integer type;
@Schema(description = "产品ID", requiredMode = Schema.RequiredMode.REQUIRED)
@NotNull(message = "产品ID不能为空")
@Schema(description = "产品 ID", requiredMode = Schema.RequiredMode.REQUIRED)
@NotNull(message = "产品 ID 不能为空")
private Long productId;
}

View File

@ -1,16 +1,15 @@
package cn.iocoder.yudao.module.iot.dal.dataobject.device;
import cn.hutool.core.date.DateTime;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
* IoT 设备日志数据 DO
*
* 目前使用 TDengine 存储
*
* @author alwayssuper
*/
@Data
@ -18,45 +17,52 @@ import java.time.LocalDateTime;
@NoArgsConstructor
@AllArgsConstructor
public class IotDeviceLogDO {
// TODO @芋艿消息 ID 的生成逻辑
/**
* 消息ID
* 消息 ID
*/
private String id;
// TODO @super关联要 @下
/**
* 产品ID
* 产品标识
*/
private String productKey;
// TODO @super关联要 @下
/**
* 设备ID
* 设备标识
*/
private String deviceKey;
// TODO @super枚举类
/**
* 消息/日志类型
* 日志类型
*/
private String type;
// TODO @super枚举类
/**
* 标识符用于标识具体的属性事件或服务
*/
private String subType;
/**
* 数据内容存储具体的消息数据内容通常是JSON格式
* 数据内容
*
* 存储具体的消息数据内容通常是 JSON 格式
*/
private String content;
/**
* 上报时间戳
*/
private DateTime reportTime;
private Long reportTime;
/**
* 时序时间
*/
private DateTime ts;
private Long ts;
}

View File

@ -33,19 +33,22 @@ public class PluginInstanceDO extends BaseDO {
*/
private String mainId;
/**
* 插件id
* 插件 ID
* <p>
* 关联 {@link PluginInfoDO#getId()}
*/
private Long pluginId;
/**
* 插件主程序所在ip
* 插件主程序所在 IP
*/
private String ip;
/**
* 插件主程序端口
*/
private Integer port;
// TODO @haohao字段改成 heartbeatTimeLocalDateTime
/**
* 心跳时间心路时间超过 30 秒需要剔除
*/

View File

@ -6,7 +6,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
// TODO @芋艿纠结下字段
@Deprecated
@Deprecated // TODO @super看看啥时候删除下哈
/**
* TD 物模型消息日志的数据库
*/
@ -25,7 +25,7 @@ public class ThingModelMessageDO {
/**
* 系统扩展参数
*
*
* 例如设备状态系统时间固件版本等系统级信息
*/
private Object system;

View File

@ -4,6 +4,9 @@ import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX;
import cn.iocoder.yudao.framework.mybatis.core.mapper.BaseMapperX;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugininfo.PluginInfoDO;
import java.util.List;
import org.apache.ibatis.annotations.Mapper;
import cn.iocoder.yudao.module.iot.controller.admin.plugin.vo.*;
@ -22,4 +25,10 @@ public interface PluginInfoMapper extends BaseMapperX<PluginInfoDO> {
.orderByDesc(PluginInfoDO::getId));
}
default List<PluginInfoDO> selectListByStatus(Integer status) {
return selectList(new LambdaQueryWrapperX<PluginInfoDO>()
.eq(PluginInfoDO::getStatus, status)
.orderByAsc(PluginInfoDO::getId));
}
}

View File

@ -21,6 +21,7 @@ public interface PluginInstanceMapper extends BaseMapperX<PluginInstanceDO> {
.eq(PluginInstanceDO::getPluginId, pluginId));
}
// TODO @haohao这个还需要么相关不用的 VO 可以删除
default PageResult<PluginInstanceDO> selectPage(PluginInstancePageReqVO reqVO) {
return selectPage(reqVO, new LambdaQueryWrapperX<PluginInstanceDO>()
.eqIfPresent(PluginInstanceDO::getMainId, reqVO.getMainId())

View File

@ -1,15 +1,18 @@
package cn.iocoder.yudao.module.iot.dal.tdengine;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.deviceData.IotDeviceLogPageReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceLogDO;
import cn.iocoder.yudao.module.iot.framework.tdengine.core.annotation.TDengineDS;
import com.baomidou.mybatisplus.annotation.InterceptorIgnore;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* IoT 设备日志 Mapper
* IOT 设备日志数据 Mapper 接口
*
* @author alwayssuper
* 基于 TDengine 实现设备日志的存储
*/
@Mapper
@TDengineDS
@ -18,22 +21,58 @@ public interface IotDeviceLogDataMapper {
/**
* 创建设备日志超级表
<<<<<<< HEAD
* 初始化只创建一次
*/
void createDeviceLogSTable();
=======
*
* 注意初始化时只需创建一次
*/
void createDeviceLogSTable();
// TODO @super是不是删除哈
>>>>>>> deab8c1cc6bb7864d9c40e0c369f649f6f9bfa41
/**
* 创建设备日志子表
*
* @param deviceKey 设备标识
*/
<<<<<<< HEAD
void createDeviceLogTable( @Param("deviceKey") String deviceKey);
/**
* 插入设备日志数据
*
=======
void createDeviceLogTable(@Param("deviceKey") String deviceKey);
// TODO @super单个参数不用加 @Param
/**
* 插入设备日志数据
*
* 如果子表不存在会自动创建子表
*
>>>>>>> deab8c1cc6bb7864d9c40e0c369f649f6f9bfa41
* @param log 设备日志数据
*/
void insert(@Param("log") IotDeviceLogDO log);
/**
* 获得设备日志分页
*
* @param reqVO 分页查询条件
* @return 设备日志列表
*/
List<IotDeviceLogDO> selectPage(@Param("reqVO") IotDeviceLogPageReqVO reqVO);
/**
* 获得设备日志总数
*
* @param reqVO 查询条件
* @return 日志总数
*/
Long selectCount(@Param("reqVO") IotDeviceLogPageReqVO reqVO);
}

View File

@ -1,10 +1,6 @@
package cn.iocoder.yudao.module.iot.dal.tdengine;
import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnore;
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.ThingModelMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.ThingModelMessageDO;
import cn.iocoder.yudao.module.iot.framework.tdengine.core.annotation.TDengineDS;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.annotation.InterceptorIgnore;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
@ -13,7 +9,7 @@ import org.apache.ibatis.annotations.Param;
* 处理 TD 中物模型消息日志的操作
*/
@Mapper
@Deprecated
@Deprecated // TODO super什么时候删除下哈
@TDengineDS
@InterceptorIgnore(tenantLine = "true") // 避免 SQL 解析因为 JSqlParser TDengine SQL 解析会报错
public interface TdThingModelMessageMapper {

View File

@ -17,7 +17,7 @@ import org.springframework.stereotype.Component;
* @author ahh
*/
@Slf4j
@Component
//@Component
public class EmqxCallback implements MqttCallbackExtended {
@Lazy

View File

@ -19,7 +19,7 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Data
@Component
//@Component
public class EmqxClient {
@Resource

View File

@ -12,8 +12,8 @@ import org.springframework.stereotype.Component;
* @author ahh
*/
@Data
@Component
@ConfigurationProperties("iot.emq")
//@Component
//@ConfigurationProperties("iot.emq")
public class MqttConfig {
/**

View File

@ -1,12 +1,12 @@
package cn.iocoder.yudao.module.iot.emq.service;
import cn.iocoder.yudao.module.iot.api.device.dto.DeviceDataCreateReqDTO;
import cn.iocoder.yudao.module.iot.service.device.IotDevicePropertyDataService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
// TODO @芋艿在瞅瞅
@ -16,7 +16,7 @@ import org.springframework.stereotype.Service;
* @author ahh
*/
@Slf4j
@Service
// @Service
public class EmqxServiceImpl implements EmqxService {
@Resource
@ -34,7 +34,12 @@ public class EmqxServiceImpl implements EmqxService {
String productKey = topic.split("/")[2];
String deviceName = topic.split("/")[3];
String message = new String(mqttMessage.getPayload());
iotDeviceDataService.saveDeviceData(productKey, deviceName, message);
DeviceDataCreateReqDTO createDTO = DeviceDataCreateReqDTO.builder()
.productKey(productKey)
.deviceName(deviceName)
.message(message)
.build();
iotDeviceDataService.saveDeviceData(createDTO);
}
}

View File

@ -13,7 +13,7 @@ import org.springframework.stereotype.Component;
*
* @author ahh
*/
@Component
//@Component
public class EmqxStart implements ApplicationRunner {
@Resource

View File

@ -0,0 +1,51 @@
package cn.iocoder.yudao.module.iot.framework.plugin;
import java.util.List;
import javax.annotation.Resource;
import org.pf4j.spring.SpringPluginManager;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
import cn.iocoder.yudao.module.iot.service.plugin.PluginInfoService;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugininfo.PluginInfoDO;
import cn.iocoder.yudao.module.iot.enums.plugin.IotPluginStatusEnum;
@Component
@Slf4j
public class PluginStart implements ApplicationRunner {
@Resource
private PluginInfoService pluginInfoService;
@Resource
private SpringPluginManager pluginManager;
@Override
public void run(ApplicationArguments args) {
TenantUtils.executeIgnore(() -> { // 1. 忽略租户上下文执行
List<PluginInfoDO> pluginInfoList = pluginInfoService
.getPluginInfoListByStatus(IotPluginStatusEnum.RUNNING.getStatus()); // 2. 获取运行中的插件列表
if (CollUtil.isEmpty(pluginInfoList)) { // 3. 检查插件列表是否为空
log.info("[run] 没有需要启动的插件"); // 4. 日志记录没有插件需要启动
return;
}
pluginInfoList.forEach(pluginInfo -> { // 5. 使用lambda表达式遍历插件列表
try {
log.info("[run][启动插件] pluginKey = {}", pluginInfo.getPluginKey()); // 6. 日志记录插件启动信息
pluginManager.startPlugin(pluginInfo.getPluginKey()); // 7. 启动插件
} catch (Exception e) {
log.error("[run][启动插件失败] pluginKey = {}", pluginInfo.getPluginKey(), e); // 8. 记录启动失败的日志
}
});
});
}
}

View File

@ -31,7 +31,13 @@ public class UnifiedConfiguration {
@DependsOn(SERVICE_REGISTRY_INITIALIZED_MARKER)
public SpringPluginManager pluginManager() {
log.info("[init][实例化 SpringPluginManager]");
SpringPluginManager springPluginManager = new SpringPluginManager();
SpringPluginManager springPluginManager = new SpringPluginManager() {
@Override
public void startPlugins() {
// 禁用插件启动避免插件启动时启动所有插件
log.info("[init][禁用默认启动所有插件]");
}
};
springPluginManager.addPluginStateListener(new CustomPluginStateListener());
return springPluginManager;
}

View File

@ -16,7 +16,7 @@ import org.springframework.core.annotation.Order;
@Slf4j
@RequiredArgsConstructor
@Configuration
@Order(Integer.MAX_VALUE) // 保证在最后执行
@Order
public class TDengineTableInitConfiguration implements ApplicationRunner {
private final IotDeviceLogDataService deviceLogService;
@ -26,15 +26,18 @@ public class TDengineTableInitConfiguration implements ApplicationRunner {
try {
// 初始化设备日志表
deviceLogService.initTDengineSTable();
log.info("初始化 设备日志表 TDengine 表结构成功");
// TODO @super这个日志是不是不用打不然重复啦
log.info("[run]初始化 设备日志表 TDengine 表结构成功");
} catch (Exception ex) {
// TODO @super初始化失败打印 error 日志退出系统不然跑起来就初始啦
if (ex.getMessage().contains("Table already exists")) {
log.info("TDengine 设备日志超级表已存在,跳过创建");
return;
}else{
} else{
log.error("初始化 设备日志表 TDengine 表结构失败", ex);
}
throw ex;
}
}
}

View File

@ -1,6 +1,5 @@
package cn.iocoder.yudao.module.iot.job.plugin;
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
import cn.iocoder.yudao.module.iot.service.plugin.PluginInstanceService;
import org.springframework.scheduling.annotation.Scheduled;
@ -23,7 +22,8 @@ public class PluginInstancesJob {
@Scheduled(initialDelay = 60, fixedRate = 60, timeUnit = TimeUnit.SECONDS)
public void updatePluginInstances() {
TenantUtils.executeIgnore(() -> {
pluginInstanceService.updatePluginInstances();
pluginInstanceService.reportPluginInstances();
});
}
}

View File

@ -0,0 +1,40 @@
package cn.iocoder.yudao.module.iot.mqttrpc.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Data
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttConfig {
/**
* MQTT 代理地址
*/
private String broker;
/**
* MQTT 用户名
*/
private String username;
/**
* MQTT 密码
*/
private String password;
/**
* MQTT 客户端 ID
*/
private String clientId;
/**
* MQTT 请求主题
*/
private String requestTopic;
/**
* MQTT 响应主题前缀
*/
private String responseTopicPrefix;
}

View File

@ -0,0 +1,100 @@
package cn.iocoder.yudao.module.iot.mqttrpc.server;
import cn.hutool.core.lang.UUID;
import cn.iocoder.yudao.module.iot.mqttrpc.common.RpcRequest;
import cn.iocoder.yudao.module.iot.mqttrpc.common.RpcResponse;
import cn.iocoder.yudao.module.iot.mqttrpc.common.SerializationUtils;
import cn.iocoder.yudao.module.iot.mqttrpc.config.MqttConfig;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.HashMap;
import java.util.Map;
// TODO @芋艿server 逻辑再瞅瞅
// TODO @haohao如果只写在 iot biz 那么后续 server => client 貌似不方便微信再讨论下~
@Service
@Slf4j
public class RpcServer {
private final MqttConfig mqttConfig;
private final MqttClient mqttClient;
private final Map<String, MethodInvoker> methodRegistry = new HashMap<>();
public RpcServer(MqttConfig mqttConfig) throws MqttException {
this.mqttConfig = mqttConfig;
this.mqttClient = new MqttClient(mqttConfig.getBroker(), "rpc-server-" + UUID.randomUUID(), new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(true);
options.setUserName(mqttConfig.getUsername());
options.setPassword(mqttConfig.getPassword().toCharArray());
this.mqttClient.connect(options);
}
@PostConstruct
public void init() throws MqttException {
mqttClient.subscribe(mqttConfig.getRequestTopic(), this::handleRequest);
log.info("RPC Server subscribed to topic: {}", mqttConfig.getRequestTopic());
}
private void handleRequest(String topic, MqttMessage message) {
RpcRequest request = SerializationUtils.deserialize(new String(message.getPayload()), RpcRequest.class);
RpcResponse response = new RpcResponse();
response.setCorrelationId(request.getCorrelationId());
try {
MethodInvoker invoker = methodRegistry.get(request.getMethod());
if (invoker == null) {
throw new NoSuchMethodException("Unknown method: " + request.getMethod());
}
Object result = invoker.invoke(request.getParams());
response.setResult(result);
} catch (Exception e) {
response.setError(e.getMessage());
log.error("Error processing RPC request: {}", e.getMessage(), e);
}
String replyPayload = SerializationUtils.serialize(response);
MqttMessage replyMessage = new MqttMessage(replyPayload.getBytes());
replyMessage.setQos(1);
try {
mqttClient.publish(request.getReplyTo(), replyMessage);
log.info("Published response to {}", request.getReplyTo());
} catch (MqttException e) {
log.error("Failed to publish response: {}", e.getMessage(), e);
}
}
/**
* 注册可调用的方法
*
* @param methodName 方法名称
* @param invoker 方法调用器
*/
public void registerMethod(String methodName, MethodInvoker invoker) {
methodRegistry.put(methodName, invoker);
log.info("Registered method: {}", methodName);
}
@PreDestroy
public void cleanup() throws MqttException {
mqttClient.disconnect();
log.info("RPC Server disconnected");
}
/**
* 方法调用器接口
*/
@FunctionalInterface
public interface MethodInvoker {
Object invoke(Object[] params) throws Exception;
}
}

View File

@ -1,6 +1,9 @@
package cn.iocoder.yudao.module.iot.service.device;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.deviceData.IotDeviceDataSimulatorSaveReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.deviceData.IotDeviceLogPageReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceLogDO;
/**
* IoT 设备日志数据 Service 接口
@ -10,13 +13,27 @@ import cn.iocoder.yudao.module.iot.controller.admin.device.vo.deviceData.IotDevi
public interface IotDeviceLogDataService {
/**
* 初始化 TDengine
* 初始化 TDengine 超级表
*
*系统启动时会自动初始化一次
*/
void initTDengineSTable();
/**
* 模拟设备创建设备日志
* @param simulatorReqVO 模拟设备信息
* 插入设备日志
*
* 当该设备第一次插入日志时自动创建该设备的设备日志子表
*
* @param simulatorReqVO 设备日志模拟数据
*/
void createDeviceLog(IotDeviceDataSimulatorSaveReqVO simulatorReqVO);
/**
* 获得设备日志分页
*
* @param pageReqVO 分页查询
* @return 设备日志分页
*/
PageResult<IotDeviceLogDO> getDeviceLogPage(IotDeviceLogPageReqVO pageReqVO);
}

View File

@ -1,8 +1,9 @@
package cn.iocoder.yudao.module.iot.service.device;
import cn.hutool.core.date.DateTime;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.deviceData.IotDeviceDataSimulatorSaveReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.deviceData.IotDeviceLogPageReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceLogDO;
import cn.iocoder.yudao.module.iot.dal.tdengine.IotDeviceLogDataMapper;
import jakarta.annotation.Resource;
@ -10,7 +11,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;
import java.time.LocalDateTime;
import java.util.List;
/**
* IoT 设备日志数据 Service 实现了
@ -19,27 +20,43 @@ import java.time.LocalDateTime;
*/
@Service
@Slf4j
@Validated
public class IotDeviceLogDataServiceImpl implements IotDeviceLogDataService{
@Resource
private IotDeviceLogDataMapper iotDeviceLogDataMapper;
// TODO @super方法名defineDeviceLog未来有可能别人使用别的记录日志例如说 es 之类的
@Override
public void initTDengineSTable() {
try {
// 创建设备日志超级表
iotDeviceLogDataMapper.createDeviceLogSTable();
log.info("创建设备日志超级表成功");
} catch (Exception ex) {
throw ex;
}
// TODO @super改成不存在才创建
iotDeviceLogDataMapper.createDeviceLogSTable();
}
@Override
public void createDeviceLog(IotDeviceDataSimulatorSaveReqVO simulatorReqVO) {
// 1. 转换请求对象为 DO
IotDeviceLogDO iotDeviceLogDO = BeanUtils.toBean(simulatorReqVO, IotDeviceLogDO.class);
iotDeviceLogDO.setTs(DateTime.now());
// 2. 处理时间字段
// TODO @super一次性的字段不用单独给个变量
long currentTime = System.currentTimeMillis();
// 2.1 设置时序时间为当前时间
iotDeviceLogDO.setTs(currentTime); // TODO @superTS在SQL中直接NOW 咱们的TS数据获取是走哪一种 now()
// 3. 插入数据
// TODO @super不要直接调用对方的 IotDeviceLogDataMapper通过 service
iotDeviceLogDataMapper.insert(iotDeviceLogDO);
}
// TODO @super iotDeviceLogDataService
@Override
public PageResult<IotDeviceLogDO> getDeviceLogPage(IotDeviceLogPageReqVO pageReqVO) {
// 查询数据
List<IotDeviceLogDO> list = iotDeviceLogDataMapper.selectPage(pageReqVO);
Long total = iotDeviceLogDataMapper.selectCount(pageReqVO);
// 构造分页结果
return new PageResult<>(list, total);
}
}

View File

@ -1,6 +1,7 @@
package cn.iocoder.yudao.module.iot.service.device;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.module.iot.api.device.dto.DeviceDataCreateReqDTO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.deviceData.IotDeviceDataPageReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDataDO;
import jakarta.validation.Valid;
@ -25,12 +26,9 @@ public interface IotDevicePropertyDataService {
/**
* 保存设备数据
*
* @param productKey 产品 key
* @param deviceName 设备名称
* @param message 消息
* <p>参见 <a href="https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services?spm=a2c4g.11186623.0.0.3a3335aeUdzkz2#concept-mvc-4tw-y2b">JSON 格式</a>
* @param createDTO 设备数据
*/
void saveDeviceData(String productKey, String deviceName, String message);
void saveDeviceData(DeviceDataCreateReqDTO createDTO);
/**
* 获得设备属性最新数据

View File

@ -6,6 +6,7 @@ import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.module.iot.api.device.dto.DeviceDataCreateReqDTO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.deviceData.IotDeviceDataPageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.thingmodel.model.dataType.ThingModelDateOrTextDataSpecs;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
@ -14,10 +15,9 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.SelectVisualDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.ThingModelMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.thingmodel.IotThingModelDO;
import cn.iocoder.yudao.module.iot.dal.tdengine.IotDevicePropertyDataMapper;
import cn.iocoder.yudao.module.iot.dal.redis.deviceData.DeviceDataRedisDAO;
import cn.iocoder.yudao.module.iot.dal.tdengine.IotDevicePropertyDataMapper;
import cn.iocoder.yudao.module.iot.dal.tdengine.TdEngineDMLMapper;
import cn.iocoder.yudao.module.iot.dal.tdengine.TdThingModelMessageMapper;
import cn.iocoder.yudao.module.iot.enums.IotConstants;
import cn.iocoder.yudao.module.iot.enums.thingmodel.IotDataSpecsDataTypeEnum;
import cn.iocoder.yudao.module.iot.enums.thingmodel.IotThingModelTypeEnum;
@ -57,7 +57,7 @@ public class IotDevicePropertyDataServiceImpl implements IotDevicePropertyDataSe
.put(IotDataSpecsDataTypeEnum.FLOAT.getDataType(), TDengineTableField.TYPE_FLOAT)
.put(IotDataSpecsDataTypeEnum.DOUBLE.getDataType(), TDengineTableField.TYPE_DOUBLE)
.put(IotDataSpecsDataTypeEnum.ENUM.getDataType(), TDengineTableField.TYPE_TINYINT) // TODO 芋艿为什么要映射为 TINYINT 的说明
.put( IotDataSpecsDataTypeEnum.BOOL.getDataType(), TDengineTableField.TYPE_TINYINT) // TODO 芋艿为什么要映射为 TINYINT 的说明
.put(IotDataSpecsDataTypeEnum.BOOL.getDataType(), TDengineTableField.TYPE_TINYINT) // TODO 芋艿为什么要映射为 TINYINT 的说明
.put(IotDataSpecsDataTypeEnum.TEXT.getDataType(), TDengineTableField.TYPE_NCHAR)
.put(IotDataSpecsDataTypeEnum.DATE.getDataType(), TDengineTableField.TYPE_TIMESTAMP)
.put(IotDataSpecsDataTypeEnum.STRUCT.getDataType(), TDengineTableField.TYPE_NCHAR) // TODO 芋艿怎么映射
@ -110,7 +110,6 @@ public class IotDevicePropertyDataServiceImpl implements IotDevicePropertyDataSe
return;
}
newFields.add(0, new TDengineTableField(TDengineTableField.FIELD_TS, TDengineTableField.TYPE_TIMESTAMP));
// 2.1.1 创建产品超级表
devicePropertyDataMapper.createProductPropertySTable(product.getProductKey(), newFields);
return;
}
@ -131,20 +130,20 @@ public class IotDevicePropertyDataServiceImpl implements IotDevicePropertyDataSe
}
@Override
public void saveDeviceData(String productKey, String deviceName, String message) {
public void saveDeviceData(DeviceDataCreateReqDTO createDTO) {
// 1. 根据产品 key 和设备名称获得设备信息
IotDeviceDO device = deviceService.getDeviceByProductKeyAndDeviceName(productKey, deviceName);
IotDeviceDO device = deviceService.getDeviceByProductKeyAndDeviceName(createDTO.getProductKey(), createDTO.getDeviceName());
// 2. 解析消息保存数据
JSONObject jsonObject = new JSONObject(message);
log.info("[saveDeviceData][productKey({}) deviceName({}) data({})]", productKey, deviceName, jsonObject);
JSONObject jsonObject = new JSONObject(createDTO.getMessage());
log.info("[saveDeviceData][productKey({}) deviceName({}) data({})]", createDTO.getProductKey(), createDTO.getDeviceName(), jsonObject);
ThingModelMessage thingModelMessage = ThingModelMessage.builder()
.id(jsonObject.getStr("id"))
.sys(jsonObject.get("sys"))
.method(jsonObject.getStr("method"))
.params(jsonObject.get("params"))
.time(jsonObject.getLong("time") == null ? System.currentTimeMillis() : jsonObject.getLong("time"))
.productKey(productKey)
.deviceName(deviceName)
.productKey(createDTO.getProductKey())
.deviceName(createDTO.getDeviceName())
.deviceKey(device.getDeviceKey())
.build();
thingModelMessageService.saveThingModelMessage(device, thingModelMessage);

View File

@ -0,0 +1,43 @@
package cn.iocoder.yudao.module.iot.service.plugin;
import cn.iocoder.yudao.module.iot.mqttrpc.server.RpcServer;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
@Service
@RequiredArgsConstructor
public class ExampleService {
private final RpcServer rpcServer;
@PostConstruct
public void registerMethods() {
rpcServer.registerMethod("add", params -> {
if (params.length != 2) {
throw new IllegalArgumentException("add方法需要两个参数");
}
int a = ((Number) params[0]).intValue();
int b = ((Number) params[1]).intValue();
return add(a, b);
});
rpcServer.registerMethod("concat", params -> {
if (params.length != 2) {
throw new IllegalArgumentException("concat方法需要两个参数");
}
String str1 = params[0].toString();
String str2 = params[1].toString();
return concat(str1, str2);
});
}
private int add(int a, int b) {
return a + b;
}
private String concat(String a, String b) {
return a + b;
}
}

View File

@ -4,6 +4,7 @@ import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.module.iot.controller.admin.plugin.vo.PluginInfoPageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.plugin.vo.PluginInfoSaveReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugininfo.PluginInfoDO;
import cn.iocoder.yudao.module.iot.enums.plugin.IotPluginStatusEnum;
import jakarta.validation.Valid;
import org.springframework.web.multipart.MultipartFile;
@ -66,7 +67,7 @@ public interface PluginInfoService {
* 更新插件的状态
*
* @param id 插件id
* @param status 状态
* @param status 状态 {@link IotPluginStatusEnum}
*/
void updatePluginStatus(Long id, Integer status);
@ -76,4 +77,12 @@ public interface PluginInfoService {
* @return 插件信息列表
*/
List<PluginInfoDO> getPluginInfoList();
}
/**
* 根据状态获得插件信息列表
*
* @param status 状态 {@link IotPluginStatusEnum}
* @return 插件信息列表
*/
List<PluginInfoDO> getPluginInfoListByStatus(Integer status);
}

View File

@ -9,25 +9,16 @@ import cn.iocoder.yudao.module.iot.dal.mysql.plugin.PluginInfoMapper;
import cn.iocoder.yudao.module.iot.enums.plugin.IotPluginStatusEnum;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.pf4j.PluginDescriptor;
import org.pf4j.PluginState;
import org.pf4j.PluginWrapper;
import org.pf4j.spring.SpringPluginManager;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.multipart.MultipartFile;
import java.io.File;
import java.io.IOException;
import java.nio.file.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.*;
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.PLUGIN_INFO_DELETE_FAILED_RUNNING;
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.PLUGIN_INFO_NOT_EXISTS;
/**
* IoT 插件信息 Service 实现类
@ -41,18 +32,17 @@ public class PluginInfoServiceImpl implements PluginInfoService {
@Resource
private PluginInfoMapper pluginInfoMapper;
@Resource
private PluginInstanceService pluginInstanceService;
@Resource
private SpringPluginManager pluginManager;
@Value("${pf4j.pluginsDir}")
private String pluginsDir;
@Override
public Long createPluginInfo(PluginInfoSaveReqVO createReqVO) {
// 插入
PluginInfoDO pluginInfo = BeanUtils.toBean(createReqVO, PluginInfoDO.class);
pluginInfoMapper.insert(pluginInfo);
// 返回
return pluginInfo.getId();
}
@ -67,41 +57,21 @@ public class PluginInfoServiceImpl implements PluginInfoService {
@Override
public void deletePluginInfo(Long id) {
// 校验存在
// 1.1 校验存在
PluginInfoDO pluginInfoDO = validatePluginInfoExists(id);
// 停止插件
// 1.2 停止插件
if (IotPluginStatusEnum.RUNNING.getStatus().equals(pluginInfoDO.getStatus())) {
throw exception(PLUGIN_INFO_DELETE_FAILED_RUNNING);
}
// 卸载插件
PluginWrapper plugin = pluginManager.getPlugin(pluginInfoDO.getPluginKey());
if (plugin != null) {
// 查询插件是否是启动状态
if (plugin.getPluginState().equals(PluginState.STARTED)) {
// 停止插件
pluginManager.stopPlugin(plugin.getPluginId());
}
// 卸载插件
pluginManager.unloadPlugin(plugin.getPluginId());
}
// 2. 卸载插件
pluginInstanceService.stopAndUnloadPlugin(pluginInfoDO.getPluginKey());
// 删除
// 3. 删除插件文件
pluginInstanceService.deletePluginFile(pluginInfoDO);
// 4. 删除插件信息
pluginInfoMapper.deleteById(id);
// 删除插件文件
Executors.newSingleThreadExecutor().submit(() -> {
try {
TimeUnit.SECONDS.sleep(1); // 等待 1 避免插件未卸载完毕
File file = new File(pluginsDir, pluginInfoDO.getFileName());
if (file.exists() && !file.delete()) {
log.error("[deletePluginInfo][删除插件文件({}) 失败]", pluginInfoDO.getFileName());
}
} catch (InterruptedException e) {
log.error("[deletePluginInfo][删除插件文件({}) 失败]", pluginInfoDO.getFileName(), e);
}
});
}
private PluginInfoDO validatePluginInfoExists(Long id) {
@ -127,99 +97,37 @@ public class PluginInfoServiceImpl implements PluginInfoService {
// 1. 校验插件信息是否存在
PluginInfoDO pluginInfoDo = validatePluginInfoExists(id);
// 2. 获取插件标识
String pluginKey = pluginInfoDo.getPluginKey();
// 2. 停止并卸载旧的插件
pluginInstanceService.stopAndUnloadPlugin(pluginInfoDo.getPluginKey());
// 3. 停止并卸载旧的插
stopAndUnloadPlugin(pluginKey);
// 3 上传新的插件文件更新插件启用状态文
String pluginKeyNew = pluginInstanceService.uploadAndLoadNewPlugin(file);
// 4. 上传新的插件文件
String pluginKeyNew = uploadAndLoadNewPlugin(file);
// 5. 更新插件启用状态文件
updatePluginStatusFile(pluginKeyNew, false);
// 6. 更新插件信息
// 4. 更新插件信息
updatePluginInfo(pluginInfoDo, pluginKeyNew, file);
}
// 停止并卸载旧的插件
private void stopAndUnloadPlugin(String pluginKey) {
PluginWrapper plugin = pluginManager.getPlugin(pluginKey);
if (plugin != null) {
if (plugin.getPluginState().equals(PluginState.STARTED)) {
pluginManager.stopPlugin(pluginKey); // 停止插件
}
pluginManager.unloadPlugin(pluginKey); // 卸载插件
}
}
// 上传并加载新的插件文件
private String uploadAndLoadNewPlugin(MultipartFile file) {
Path pluginsPath = Paths.get(pluginsDir);
try {
if (!Files.exists(pluginsPath)) {
Files.createDirectories(pluginsPath); // 创建插件目录
}
String filename = file.getOriginalFilename();
if (filename != null) {
Path jarPath = pluginsPath.resolve(filename);
Files.copy(file.getInputStream(), jarPath, StandardCopyOption.REPLACE_EXISTING); // 保存上传的 JAR 文件
return pluginManager.loadPlugin(jarPath.toAbsolutePath()); // 加载插件
} else {
throw exception(PLUGIN_INSTALL_FAILED);
}
} catch (Exception e) {
throw exception(PLUGIN_INSTALL_FAILED);
}
}
// 更新插件状态文件
private void updatePluginStatusFile(String pluginKeyNew, boolean isEnabled) {
Path enabledFilePath = Paths.get(pluginsDir, "enabled.txt");
Path disabledFilePath = Paths.get(pluginsDir, "disabled.txt");
Path targetFilePath = isEnabled ? enabledFilePath : disabledFilePath;
Path oppositeFilePath = isEnabled ? disabledFilePath : enabledFilePath;
try {
PluginWrapper pluginWrapper = pluginManager.getPlugin(pluginKeyNew);
if (pluginWrapper == null) {
throw exception(PLUGIN_INSTALL_FAILED);
}
String pluginInfo = pluginKeyNew + "@" + pluginWrapper.getDescriptor().getVersion();
List<String> targetLines = Files.exists(targetFilePath) ? Files.readAllLines(targetFilePath)
: new ArrayList<>();
List<String> oppositeLines = Files.exists(oppositeFilePath) ? Files.readAllLines(oppositeFilePath)
: new ArrayList<>();
if (!targetLines.contains(pluginInfo)) {
targetLines.add(pluginInfo);
Files.write(targetFilePath, targetLines, StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING);
}
if (oppositeLines.contains(pluginInfo)) {
oppositeLines.remove(pluginInfo);
Files.write(oppositeFilePath, oppositeLines, StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING);
}
} catch (IOException e) {
throw exception(PLUGIN_INSTALL_FAILED);
}
}
// 更新插件信息
/**
* 更新插件信息
*
* @param pluginInfoDo 插件信息
* @param pluginKeyNew 插件标识符
* @param file 文件
*/
private void updatePluginInfo(PluginInfoDO pluginInfoDo, String pluginKeyNew, MultipartFile file) {
pluginInfoDo.setPluginKey(pluginKeyNew);
pluginInfoDo.setStatus(IotPluginStatusEnum.STOPPED.getStatus());
pluginInfoDo.setFileName(file.getOriginalFilename());
pluginInfoDo.setScript("");
// 创建新的插件信息对象并链式设置属性
PluginInfoDO updatedPluginInfo = new PluginInfoDO()
.setId(pluginInfoDo.getId())
.setPluginKey(pluginKeyNew)
.setStatus(IotPluginStatusEnum.STOPPED.getStatus())
.setFileName(file.getOriginalFilename())
.setScript("")
.setConfigSchema(pluginManager.getPlugin(pluginKeyNew).getDescriptor().getPluginDescription())
.setVersion(pluginManager.getPlugin(pluginKeyNew).getDescriptor().getVersion())
.setDescription(pluginManager.getPlugin(pluginKeyNew).getDescriptor().getPluginDescription());
PluginDescriptor pluginDescriptor = pluginManager.getPlugin(pluginKeyNew).getDescriptor();
pluginInfoDo.setConfigSchema(pluginDescriptor.getPluginDescription());
pluginInfoDo.setVersion(pluginDescriptor.getVersion());
pluginInfoDo.setDescription(pluginDescriptor.getPluginDescription());
pluginInfoMapper.updateById(pluginInfoDo);
// 执行更新
pluginInfoMapper.updateById(updatedPluginInfo);
}
@Override
@ -227,44 +135,24 @@ public class PluginInfoServiceImpl implements PluginInfoService {
// 1. 校验插件信息是否存在
PluginInfoDO pluginInfoDo = validatePluginInfoExists(id);
// 2. 校验插件状态是否有效
if (!IotPluginStatusEnum.contains(status)) {
throw exception(PLUGIN_STATUS_INVALID);
}
// 2. 更新插件状态
pluginInstanceService.updatePluginStatus(pluginInfoDo, status);
// 3. 获取插件标识和插件实例
String pluginKey = pluginInfoDo.getPluginKey();
PluginWrapper plugin = pluginManager.getPlugin(pluginKey);
// 4. 根据状态更新插件
if (plugin != null) {
// 4.1 如果目标状态是运行且插件未启动则启动插件
if (status.equals(IotPluginStatusEnum.RUNNING.getStatus())
&& plugin.getPluginState() != PluginState.STARTED) {
pluginManager.startPlugin(pluginKey);
updatePluginStatusFile(pluginKey, true); // 更新插件状态文件为启用
}
// 4.2 如果目标状态是停止且插件已启动则停止插件
else if (status.equals(IotPluginStatusEnum.STOPPED.getStatus())
&& plugin.getPluginState() == PluginState.STARTED) {
pluginManager.stopPlugin(pluginKey);
updatePluginStatusFile(pluginKey, false); // 更新插件状态文件为禁用
}
} else {
// 5. 插件不存在且状态为停止抛出异常
if (IotPluginStatusEnum.STOPPED.getStatus().equals(pluginInfoDo.getStatus())) {
throw exception(PLUGIN_STATUS_INVALID);
}
}
// 6. 更新数据库中的插件状态
pluginInfoDo.setStatus(status);
pluginInfoMapper.updateById(pluginInfoDo);
// 3. 更新数据库中的插件状态
PluginInfoDO updatedPluginInfo = new PluginInfoDO();
updatedPluginInfo.setId(id);
updatedPluginInfo.setStatus(status);
pluginInfoMapper.updateById(updatedPluginInfo);
}
@Override
public List<PluginInfoDO> getPluginInfoList() {
return pluginInfoMapper.selectList(null);
return pluginInfoMapper.selectList();
}
@Override
public List<PluginInfoDO> getPluginInfoListByStatus(Integer status) {
return pluginInfoMapper.selectListByStatus(status);
}
}

View File

@ -1,5 +1,8 @@
package cn.iocoder.yudao.module.iot.service.plugin;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugininfo.PluginInfoDO;
import org.springframework.web.multipart.MultipartFile;
/**
* IoT 插件实例 Service 接口
*
@ -8,8 +11,38 @@ package cn.iocoder.yudao.module.iot.service.plugin;
public interface PluginInstanceService {
/**
* 更新IoT 插件实例
* 上报插件实例心跳
*/
void updatePluginInstances();
void reportPluginInstances();
/**
* 停止并卸载插件
*
* @param pluginKey 插件标识符
*/
void stopAndUnloadPlugin(String pluginKey);
/**
* 删除插件文件
*
* @param pluginInfoDo 插件信息
*/
void deletePluginFile(PluginInfoDO pluginInfoDo);
/**
* 上传并加载新的插件文件
*
* @param file 插件文件
* @return 插件标识符
*/
String uploadAndLoadNewPlugin(MultipartFile file);
/**
* 更新插件状态
*
* @param pluginInfoDo 插件信息
* @param status 新状态
*/
void updatePluginStatus(PluginInfoDO pluginInfoDo, Integer status);
}

View File

@ -1,19 +1,34 @@
package cn.iocoder.yudao.module.iot.service.plugin;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.net.NetUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugininfo.PluginInfoDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.plugininstance.PluginInstanceDO;
import cn.iocoder.yudao.module.iot.dal.mysql.plugin.PluginInfoMapper;
import cn.iocoder.yudao.module.iot.dal.mysql.plugin.PluginInstanceMapper;
import cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants;
import cn.iocoder.yudao.module.iot.enums.plugin.IotPluginStatusEnum;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.pf4j.PluginState;
import org.pf4j.PluginWrapper;
import org.pf4j.spring.SpringPluginManager;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.multipart.MultipartFile;
import java.io.File;
import java.io.IOException;
import java.nio.file.*;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
/**
* IoT 插件实例 Service 实现类
@ -25,76 +40,149 @@ import java.util.List;
@Slf4j
public class PluginInstanceServiceImpl implements PluginInstanceService {
/**
* 主程序id
*/
// TODO @haohao这个可以后续确认下有没更合适的标识例如说 mac 地址之类的
// 简化的 UUID + mac 地址 会不会好一些一台机子有可能会部署多个插件
// 那就 mac@uuid
public static final String MAIN_ID = IdUtil.fastSimpleUUID();
@Resource
private PluginInfoService pluginInfoService;
private PluginInfoMapper pluginInfoMapper;
@Resource
private PluginInstanceMapper pluginInstanceMapper;
@Resource
private SpringPluginManager pluginManager;
@Value("${pf4j.pluginsDir}")
private String pluginsDir;
@Value("${server.port:48080}")
private int port;
@Override
public void stopAndUnloadPlugin(String pluginKey) {
PluginWrapper plugin = pluginManager.getPlugin(pluginKey);
// TODO @haohao改成 if return 会更简洁一点
if (plugin != null) {
if (plugin.getPluginState().equals(PluginState.STARTED)) {
pluginManager.stopPlugin(pluginKey); // 停止插件
log.info("已停止插件: {}", pluginKey);
}
pluginManager.unloadPlugin(pluginKey); // 卸载插件
log.info("已卸载插件: {}", pluginKey);
} else {
log.warn("插件不存在或已卸载: {}", pluginKey);
}
}
@Override
public void updatePluginInstances() {
// 1. 查询 pf4j 插件列表
public void deletePluginFile(PluginInfoDO pluginInfoDO) {
File file = new File(pluginsDir, pluginInfoDO.getFileName());
// TODO @haohao改成 if return 会更简洁一点
if (file.exists()) {
try {
TimeUnit.SECONDS.sleep(1); // 等待 1 避免插件未卸载完毕
if (!file.delete()) {
log.error("[deletePluginInfo][删除插件文件({}) 失败]", pluginInfoDO.getFileName());
}
} catch (InterruptedException e) {
log.error("[deletePluginInfo][删除插件文件({}) 失败]", pluginInfoDO.getFileName(), e);
}
}
}
@Override
public String uploadAndLoadNewPlugin(MultipartFile file) {
String pluginKeyNew;
// TODO @haohao多节点是不是要上传 s3 之类的存储器然后定时去加载
Path pluginsPath = Paths.get(pluginsDir);
try {
FileUtil.mkdir(pluginsPath.toFile()); // 创建插件目录
String filename = file.getOriginalFilename();
if (filename != null) {
Path jarPath = pluginsPath.resolve(filename);
Files.copy(file.getInputStream(), jarPath, StandardCopyOption.REPLACE_EXISTING); // 保存上传的 JAR 文件
pluginKeyNew = pluginManager.loadPlugin(jarPath.toAbsolutePath()); // 加载插件
log.info("已加载插件: {}", pluginKeyNew);
} else {
throw exception(ErrorCodeConstants.PLUGIN_INSTALL_FAILED);
}
} catch (IOException e) {
log.error("[uploadAndLoadNewPlugin][上传插件文件失败]", e);
throw exception(ErrorCodeConstants.PLUGIN_INSTALL_FAILED, e);
} catch (Exception e) {
log.error("[uploadAndLoadNewPlugin][加载插件失败]", e);
throw exception(ErrorCodeConstants.PLUGIN_INSTALL_FAILED, e);
}
return pluginKeyNew;
}
@Override
public void updatePluginStatus(PluginInfoDO pluginInfoDo, Integer status) {
String pluginKey = pluginInfoDo.getPluginKey();
PluginWrapper plugin = pluginManager.getPlugin(pluginKey);
// TODO @haohao改成 if return 会更简洁一点
if (plugin != null) {
// 启动插件
if (status.equals(IotPluginStatusEnum.RUNNING.getStatus())
&& plugin.getPluginState() != PluginState.STARTED) {
pluginManager.startPlugin(pluginKey);
log.info("已启动插件: {}", pluginKey);
}
// 停止插件
else if (status.equals(IotPluginStatusEnum.STOPPED.getStatus())
&& plugin.getPluginState() == PluginState.STARTED) {
pluginManager.stopPlugin(pluginKey);
log.info("已停止插件: {}", pluginKey);
}
} else {
// 插件不存在且状态为停止抛出异常
if (IotPluginStatusEnum.STOPPED.getStatus().equals(pluginInfoDo.getStatus())) {
throw exception(ErrorCodeConstants.PLUGIN_STATUS_INVALID);
}
}
}
@Override
public void reportPluginInstances() {
// 1.1 获取 pf4j 插件列表
List<PluginWrapper> plugins = pluginManager.getPlugins();
// 2. 查询插件信息列表
List<PluginInfoDO> pluginInfos = pluginInfoService.getPluginInfoList();
// 1.2 获取插件信息列表并转换为 Map 以便快速查找
List<PluginInfoDO> pluginInfos = pluginInfoMapper.selectList();
Map<String, PluginInfoDO> pluginInfoMap = pluginInfos.stream()
.collect(Collectors.toMap(PluginInfoDO::getPluginKey, Function.identity()));
// 动态获取主程序的 IP 和端口
String mainIp = getLocalIpAddress();
// 1.3 获取本机 IP MAC 地址
String ip = NetUtil.getLocalhostStr();
String mac = NetUtil.getLocalMacAddress();
String mainId = MAIN_ID + "-" + mac;
// 3. 遍历插件列表并保存为插件实例
// 2. 遍历插件列表并保存为插件实例
for (PluginWrapper plugin : plugins) {
String pluginKey = plugin.getPluginId();
PluginInfoDO pluginInfo = pluginInfos.stream()
.filter(pluginInfoDO -> pluginInfoDO.getPluginKey().equals(pluginKey))
.findFirst()
.orElse(null);
// 4. 如果插件信息不存在则跳过
// 2.1 查找插件信息
PluginInfoDO pluginInfo = pluginInfoMap.get(pluginKey);
if (pluginInfo == null) {
log.error("插件信息不存在pluginKey = {}", pluginKey);
continue;
}
// 5. 查询插件实例
PluginInstanceDO pluginInstance = pluginInstanceMapper.selectByMainIdAndPluginId(MAIN_ID, pluginInfo.getId());
// 6. 如果插件实例不存在则创建
// 2.2 情况一如果插件实例不存在则创建
PluginInstanceDO pluginInstance = pluginInstanceMapper.selectByMainIdAndPluginId(mainId,
pluginInfo.getId());
if (pluginInstance == null) {
pluginInstance = new PluginInstanceDO();
pluginInstance.setPluginId(pluginInfo.getId());
pluginInstance.setMainId(MAIN_ID);
pluginInstance.setIp(mainIp);
pluginInstance.setPort(port);
pluginInstance.setHeartbeatAt(System.currentTimeMillis());
// 4.4 如果插件实例不存在则创建
pluginInstance = PluginInstanceDO.builder().pluginId(pluginInfo.getId()).mainId(MAIN_ID + "-" + mac)
.ip(ip).port(port).heartbeatAt(System.currentTimeMillis()).build();
pluginInstanceMapper.insert(pluginInstance);
} else {
// 7. 如果插件实例存在则更新
// 2.2 情况二如果存在则更新 heartbeatAt
// TODO @haohao这里最好 new update避免并发更新虽然目前没有
pluginInstance.setHeartbeatAt(System.currentTimeMillis());
pluginInstanceMapper.updateById(pluginInstance);
}
}
}
private String getLocalIpAddress() {
try {
List<String> ipList = NetUtil.localIpv4s().stream()
.filter(ip -> !ip.startsWith("0.0") && !ip.startsWith("127.") && !ip.startsWith("169.254") && !ip.startsWith("255.255.255.255"))
.toList();
return ipList.isEmpty() ? "127.0.0.1" : ipList.get(0);
} catch (Exception e) {
log.error("获取本地IP地址失败", e);
return "127.0.0.1"; // 默认值
}
}
}

View File

@ -9,7 +9,6 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO;
import cn.iocoder.yudao.module.iot.dal.mysql.product.IotProductMapper;
import cn.iocoder.yudao.module.iot.enums.product.IotProductStatusEnum;
import cn.iocoder.yudao.module.iot.service.device.IotDevicePropertyDataService;
import cn.iocoder.yudao.module.iot.service.tdengine.IotThingModelMessageService;
import com.baomidou.dynamic.datasource.annotation.DSTransactional;
import jakarta.annotation.Resource;
import org.springframework.context.annotation.Lazy;
@ -116,14 +115,15 @@ public class IotProductServiceImpl implements IotProductService {
public void updateProductStatus(Long id, Integer status) {
// 1. 校验存在
validateProductExists(id);
// 2. 更新
IotProductDO updateObj = IotProductDO.builder().id(id).status(status).build();
// 3. 产品是发布状态
if (Objects.equals(status, IotProductStatusEnum.PUBLISHED.getStatus())) {
// 3.1 创建产品超级表数据模型
devicePropertyDataService.defineDevicePropertyData(id);
// 2. 产品是发布状态
if (Objects.equals(status, IotProductStatusEnum.PUBLISHED.getStatus())) {
// 创建产品超级表数据模型
devicePropertyDataService.defineDevicePropertyData(id);
}
// 3. 更新
IotProductDO updateObj = IotProductDO.builder().id(id).status(status).build();
productMapper.updateById(updateObj);
}

View File

@ -7,20 +7,20 @@ import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnore;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.device.IotDeviceStatusUpdateReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDataDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.*;
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.FieldParser;
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.TdFieldDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.TdTableDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.ThingModelMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.thingmodel.IotThingModelDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO;
import cn.iocoder.yudao.module.iot.dal.redis.deviceData.DeviceDataRedisDAO;
import cn.iocoder.yudao.module.iot.dal.tdengine.TdEngineDDLMapper;
import cn.iocoder.yudao.module.iot.dal.tdengine.TdEngineDMLMapper;
import cn.iocoder.yudao.module.iot.dal.tdengine.TdThingModelMessageMapper;
import cn.iocoder.yudao.module.iot.enums.IotConstants;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStatusEnum;
import cn.iocoder.yudao.module.iot.enums.thingmodel.IotThingModelTypeEnum;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.service.thingmodel.IotThingModelService;
import cn.iocoder.yudao.module.iot.service.product.IotProductService;
import cn.iocoder.yudao.module.iot.util.IotTdDatabaseUtils;
import cn.iocoder.yudao.module.iot.service.thingmodel.IotThingModelService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
@ -61,13 +61,9 @@ public class IotThingModelMessageServiceImpl implements IotThingModelMessageServ
@Resource
private TdEngineDMLMapper tdEngineDMLMapper;
@Resource
private TdThingModelMessageMapper tdThingModelMessageMapper;
@Resource
private DeviceDataRedisDAO deviceDataRedisDAO;
// TODO @haohao这个方法可以考虑加下 1. 2. 3. 更有层次感
@Override
@TenantIgnore

View File

@ -6,20 +6,20 @@
<!-- 创建设备日志超级表 初始化只创建一次-->
<update id="createDeviceLogSTable">
CREATE STABLE device_log(
ts TIMESTAMP,
id NCHAR(50),
product_key NCHAR(50),
type NCHAR(50),
subType NCHAR(50),
content NCHAR(1024),
report_time TIMESTAMP
)TAGS (
device_key NCHAR(50)
)
CREATE STABLE device_log (
ts TIMESTAMP,
id NCHAR(50),
product_key NCHAR(50),
type NCHAR(50),
<!-- TODO @super下划线 sub_type -->
subType NCHAR(50),
content NCHAR(1024),
report_time TIMESTAMP
) TAGS (
device_key NCHAR(50)
)
</update>
<!-- 创建设备日志子表 讨论TDengine 在子表不存在的情况下 可在数据插入时 自动建表 要不要去掉创建子表的逻辑 由第一次插入数据时自动创建-->
<update id="createDeviceLogTable">
CREATE TABLE device_log_${deviceKey} USING device_log TAGS('${deviceKey}')
@ -41,4 +41,38 @@
)
</insert>
<select id="selectPage" resultType="cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceLogDO">
SELECT ts, id, device_key, product_key, type, subType, content, report_time
FROM device_log_${reqVO.deviceKey}
<where>
<if test="reqVO.type != null and reqVO.type != ''">
AND type = #{reqVO.type}
</if>
<if test="reqVO.subType != null and reqVO.subType != ''">
AND subType = #{reqVO.subType}
</if>
<if test="reqVO.createTime != null">
AND ts BETWEEN #{reqVO.createTime[0]} AND #{reqVO.createTime[1]}
</if>
</where>
ORDER BY ts DESC
LIMIT #{reqVO.pageSize} OFFSET #{reqVO.pageNo}
</select>
<select id="selectCount" resultType="Long">
SELECT COUNT(*)
FROM device_log_${reqVO.deviceKey}
<where>
<if test="reqVO.type != null and reqVO.type != ''">
AND type = #{reqVO.type}
</if>
<if test="reqVO.subType != null and reqVO.subType != ''">
AND subType = #{reqVO.subType}
</if>
<if test="reqVO.createTime != null">
AND ts BETWEEN #{reqVO.createTime[0]} AND #{reqVO.createTime[1]}
</if>
</where>
</select>
</mapper>

View File

@ -2,11 +2,6 @@
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- <modelVersion>4.0.0</modelVersion>-->
<!-- <groupId>cn.iocoder.boot</groupId>-->
<!-- <artifactId>yudao-module-iot-plugin</artifactId>-->
<!-- <version>0.0.1</version>-->
<!-- <packaging>pom</packaging>-->
<parent>
<artifactId>yudao-module-iot</artifactId>
<groupId>cn.iocoder.boot</groupId>
@ -15,6 +10,7 @@
<modules>
<module>yudao-module-iot-demo-plugin</module>
<module>yudao-module-iot-http-plugin</module>
<module>yudao-module-iot-mqtt-plugin</module>
</modules>
<modelVersion>4.0.0</modelVersion>

View File

@ -0,0 +1,81 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<artifactId>yudao-module-iot-plugin</artifactId>
<groupId>cn.iocoder.boot</groupId>
<version>2.2.0-snapshot</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>yudao-module-iot-http-plugin</artifactId>
<name>${project.artifactId}</name>
<version>2.2.0-snapshot</version>
<description>物联网 插件模块 - http 插件</description>
<build>
<plugins>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifestEntries>
<Plugin-Id>${plugin.id}</Plugin-Id>
<Plugin-Class>${plugin.class}</Plugin-Class>
<Plugin-Version>${plugin.version}</Plugin-Version>
<Plugin-Provider>${plugin.provider}</Plugin-Provider>
<Plugin-Description>${plugin.description}</Plugin-Description>
<Plugin-Dependencies>${plugin.dependencies}</Plugin-Dependencies>
</manifestEntries>
</archive>
</configuration>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>shaded</shadedClassifierName>
<transformers>
<transformer>
<mainClass>cn.iocoder.yudao.module.iot.HttpPluginSpringbootApplication</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.pf4j</groupId>
<artifactId>pf4j-spring</artifactId>
<version>0.9.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.34</version>
<scope>provided</scope>
</dependency>
</dependencies>
<properties>
<plugin.class>cn.iocoder.yudao.module.iot.plugin.HttpVertxPlugin</plugin.class>
<plugin.version>0.0.1</plugin.version>
<plugin.id>http-plugin</plugin.id>
<plugin.description>http-plugin-0.0.1</plugin.description>
<plugin.provider>ahh</plugin.provider>
</properties>
</project>

View File

@ -1,5 +1,5 @@
plugin.id=http-plugin
plugin.class=cn.iocoder.yudao.module.iot.plugin.HttpPlugin
plugin.class=cn.iocoder.yudao.module.iot.plugin.HttpVertxPlugin
plugin.version=0.0.1
plugin.provider=ahh
plugin.dependencies=

View File

@ -21,7 +21,7 @@
<properties>
<!-- 插件相关 -->
<plugin.id>http-plugin</plugin.id>
<plugin.class>cn.iocoder.yudao.module.iot.plugin.HttpPlugin</plugin.class>
<plugin.class>cn.iocoder.yudao.module.iot.plugin.HttpVertxPlugin</plugin.class>
<plugin.version>0.0.1</plugin.version>
<plugin.provider>ahh</plugin.provider>
<plugin.description>http-plugin-0.0.1</plugin.description>
@ -30,27 +30,6 @@
<build>
<plugins>
<!-- DOESN'T WORK WITH MAVEN 3 (I defined the plugin metadata in properties section)
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>properties-maven-plugin</artifactId>
<version>1.0-alpha-2</version>
<executions>
<execution>
<phase>initialize</phase>
<goals>
<goal>read-project-properties</goal>
</goals>
<configuration>
<files>
<file>plugin.properties</file>
</files>
</configuration>
</execution>
</executions>
</plugin>
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
@ -118,6 +97,29 @@
<skip>true</skip>
</configuration>
</plugin>
<!-- <plugin>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-shade-plugin</artifactId>-->
<!-- <version>3.4.1</version>-->
<!-- <executions>-->
<!-- <execution>-->
<!-- <phase>package</phase>-->
<!-- <goals>-->
<!-- <goal>shade</goal>-->
<!-- </goals>-->
<!-- <configuration>-->
<!-- <shadedArtifactAttached>true</shadedArtifactAttached>-->
<!-- <shadedClassifierName>shaded</shadedClassifierName>-->
<!-- <transformers>-->
<!-- <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">-->
<!-- <mainClass>cn.iocoder.yudao.module.iot.HttpPluginSpringbootApplication</mainClass>-->
<!-- </transformer>-->
<!-- </transformers>-->
<!-- </configuration>-->
<!-- </execution>-->
<!-- </executions>-->
<!-- </plugin>-->
</plugins>
</build>
@ -145,10 +147,20 @@
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<!-- Vert.x 核心依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.63.Final</version> <!-- 版本可根据需要调整 -->
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
</dependency>
<!-- Vert.x Web 模块 -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
</dependency>
<!-- MQTT -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,11 @@
package cn.iocoder.yudao.module.iot;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class HttpPluginSpringbootApplication {
public static void main(String[] args) {
SpringApplication.run(HttpPluginSpringbootApplication.class, args);
}
}

View File

@ -0,0 +1,38 @@
package cn.iocoder.yudao.module.iot.controller;
import cn.iocoder.yudao.module.iot.mqttrpc.client.RpcClient;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.concurrent.CompletableFuture;
// TODO 芋艿后续 review
/**
* 插件实例 RPC 接口
*
* @author 芋道源码
*/
@RestController
@RequestMapping("/rpc")
@RequiredArgsConstructor
public class RpcController {
@Resource
private RpcClient rpcClient;
@PostMapping("/add")
public CompletableFuture<Object> add(@RequestParam int a, @RequestParam int b) throws Exception {
return rpcClient.call("add", new Object[]{a, b}, 10);
}
@PostMapping("/concat")
public CompletableFuture<Object> concat(@RequestParam String str1, @RequestParam String str2) throws Exception {
return rpcClient.call("concat", new Object[]{str1, str2}, 10);
}
}

View File

@ -0,0 +1,93 @@
package cn.iocoder.yudao.module.iot.mqttrpc.client;
import cn.iocoder.yudao.module.iot.mqttrpc.common.RpcRequest;
import cn.iocoder.yudao.module.iot.mqttrpc.common.RpcResponse;
import cn.iocoder.yudao.module.iot.mqttrpc.common.SerializationUtils;
import cn.iocoder.yudao.module.iot.mqttrpc.config.MqttConfig;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.UUID;
import java.util.concurrent.*;
// TODO @芋艿需要考虑怎么公用
@Service
@Slf4j
public class RpcClient {
private final MqttConfig mqttConfig;
private final MqttClient mqttClient;
private final ConcurrentMap<String, CompletableFuture<RpcResponse>> pendingRequests = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
public RpcClient(MqttConfig mqttConfig) throws MqttException {
this.mqttConfig = mqttConfig;
this.mqttClient = new MqttClient(mqttConfig.getBroker(), mqttConfig.getClientId(), new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(true);
options.setUserName(mqttConfig.getUsername());
options.setPassword(mqttConfig.getPassword().toCharArray());
this.mqttClient.connect(options);
}
@PostConstruct
public void init() throws MqttException {
mqttClient.subscribe(mqttConfig.getResponseTopicPrefix() + "#", this::handleResponse);
log.info("RPC Client subscribed to topics: {}", mqttConfig.getResponseTopicPrefix() + "#");
}
private void handleResponse(String topic, MqttMessage message) {
String correlationId = topic.substring(mqttConfig.getResponseTopicPrefix().length());
RpcResponse response = SerializationUtils.deserialize(new String(message.getPayload()), RpcResponse.class);
CompletableFuture<RpcResponse> future = pendingRequests.remove(correlationId);
if (future != null) {
if (response.getError() != null) {
future.completeExceptionally(new RuntimeException(response.getError()));
} else {
future.complete(response);
}
} else {
log.warn("Received response for unknown correlationId: {}", correlationId);
}
}
public CompletableFuture<Object> call(String method, Object[] params, int timeoutSeconds) throws MqttException {
String correlationId = UUID.randomUUID().toString();
String replyTo = mqttConfig.getResponseTopicPrefix() + correlationId;
RpcRequest request = new RpcRequest(method, params, correlationId, replyTo);
String payload = SerializationUtils.serialize(request);
MqttMessage message = new MqttMessage(payload.getBytes());
message.setQos(1);
mqttClient.publish(mqttConfig.getRequestTopic(), message);
CompletableFuture<RpcResponse> futureResponse = new CompletableFuture<>();
pendingRequests.put(correlationId, futureResponse);
// 设置超时
scheduler.schedule(() -> {
CompletableFuture<RpcResponse> removed = pendingRequests.remove(correlationId);
if (removed != null) {
removed.completeExceptionally(new TimeoutException("RPC call timed out"));
}
}, timeoutSeconds, TimeUnit.SECONDS);
// 返回最终的结果
return futureResponse.thenApply(RpcResponse::getResult);
}
@PreDestroy
public void cleanup() throws MqttException {
mqttClient.disconnect();
scheduler.shutdown();
log.info("RPC Client disconnected");
}
}

View File

@ -0,0 +1,41 @@
package cn.iocoder.yudao.module.iot.mqttrpc.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Data
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttConfig {
/**
* MQTT 代理地址
*/
private String broker;
/**
* MQTT 用户名
*/
private String username;
/**
* MQTT 密码
*/
private String password;
/**
* MQTT 客户端 ID
*/
private String clientId;
/**
* MQTT 请求主题
*/
private String requestTopic;
/**
* MQTT 响应主题前缀
*/
private String responseTopicPrefix;
}

View File

@ -1,147 +0,0 @@
package cn.iocoder.yudao.module.iot.plugin;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
/**
* 基于 Netty HTTP 处理器用于接收设备上报的数据并调用主程序的 DeviceDataApi 接口进行处理
*
* 1. 请求格式JSON 格式地址为 POST /sys/{productKey}/{deviceName}/thing/event/property/post
* 2. 返回结果JSON 格式包含统一的 codedataidmessagemethodversion 字段
*/
public class HttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private final DeviceDataApi deviceDataApi;
public HttpHandler(DeviceDataApi deviceDataApi) {
this.deviceDataApi = deviceDataApi;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
// 期望的路径格式: /sys/{productKey}/{deviceName}/thing/event/property/post
// 使用 "/" 拆分路径
String uri = request.uri();
String[] parts = uri.split("/");
/*
拆分结果示例:
parts[0] = ""
parts[1] = "sys"
parts[2] = productKey
parts[3] = deviceName
parts[4] = "thing"
parts[5] = "event"
parts[6] = "property"
parts[7] = "post"
*/
boolean isCorrectPath = parts.length == 8
&& "sys".equals(parts[1])
&& "thing".equals(parts[4])
&& "event".equals(parts[5])
&& "property".equals(parts[6])
&& "post".equals(parts[7]);
if (!isCorrectPath) {
writeResponse(ctx, HttpResponseStatus.NOT_FOUND, "Not Found");
return;
}
String productKey = parts[2];
String deviceName = parts[3];
// 从请求中获取原始数据尝试解析请求数据为 JSON 对象
String requestBody = request.content().toString(CharsetUtil.UTF_8);
JSONObject jsonData;
try {
jsonData = JSONUtil.parseObj(requestBody);
} catch (Exception e) {
JSONObject res = createResponseJson(
400,
new JSONObject(),
null,
"请求数据不是合法的 JSON 格式: " + e.getMessage(),
"thing.event.property.post",
"1.0"
);
writeResponse(ctx, HttpResponseStatus.BAD_REQUEST, res.toString());
return;
}
String id = jsonData.getStr("id", null);
try {
// 调用主程序的接口保存数据
deviceDataApi.saveDeviceData(productKey, deviceName, jsonData.toString());
// 构造成功响应内容
JSONObject successRes = createResponseJson(
200,
new JSONObject(),
id,
"success",
"thing.event.property.post",
"1.0"
);
writeResponse(ctx, HttpResponseStatus.OK, successRes.toString());
} catch (Exception e) {
JSONObject errorRes = createResponseJson(
500,
new JSONObject(),
id,
"The format of result is error!",
"thing.event.property.post",
"1.0"
);
writeResponse(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR, errorRes.toString());
}
}
/**
* 创建标准化的响应 JSON 对象
*
* @param code 响应状态码业务层面的
* @param data 返回的数据对象JSON
* @param id 请求的 id可选
* @param message 返回的提示信息
* @param method 返回的 method 标识
* @param version 返回的版本号
* @return 构造好的 JSON 对象
*/
private JSONObject createResponseJson(int code, JSONObject data, String id, String message, String method, String version) {
JSONObject res = new JSONObject();
res.set("code", code);
res.set("data", data != null ? data : new JSONObject());
res.set("id", id);
res.set("message", message);
res.set("method", method);
res.set("version", version);
return res;
}
/**
* 向客户端返回 HTTP 响应的辅助方法
*
* @param ctx 通道上下文
* @param status HTTP 响应状态码网络层面的
* @param content 响应内容JSON 字符串或其他文本
*/
private void writeResponse(ChannelHandlerContext ctx, HttpResponseStatus status, String content) {
// 设置响应头为 JSON 类型和正确的编码
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
status,
Unpooled.copiedBuffer(content, CharsetUtil.UTF_8)
);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=UTF-8");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
// 发送响应并在发送完成后关闭连接
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
}

View File

@ -1,94 +0,0 @@
package cn.iocoder.yudao.module.iot.plugin;
import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi;
import cn.iocoder.yudao.module.iot.api.ServiceRegistry;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import lombok.extern.slf4j.Slf4j;
import org.pf4j.PluginWrapper;
import org.pf4j.Plugin;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class HttpPlugin extends Plugin {
private static final int PORT = 8092;
private ExecutorService executorService;
private DeviceDataApi deviceDataApi;
public HttpPlugin(PluginWrapper wrapper) {
super(wrapper);
// 初始化线程池
this.executorService = Executors.newSingleThreadExecutor();
}
@Override
public void start() {
log.info("HttpPlugin.start()");
// 重新初始化线程池确保它是活跃的
if (executorService.isShutdown() || executorService.isTerminated()) {
executorService = Executors.newSingleThreadExecutor();
}
// ServiceRegistry 中获取主程序暴露的 DeviceDataApi 接口实例
deviceDataApi = ServiceRegistry.getService(DeviceDataApi.class);
if (deviceDataApi == null) {
log.error("未能从 ServiceRegistry 获取 DeviceDataApi 实例,请确保主程序已正确注册!");
return;
}
// 异步启动 Netty 服务器
executorService.submit(this::startHttpServer);
}
@Override
public void stop() {
log.info("HttpPlugin.stop()");
// 停止线程池
executorService.shutdownNow();
}
/**
* 启动 HTTP 服务
*/
private void startHttpServer() {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<>() {
@Override
protected void initChannel(Channel channel) {
channel.pipeline().addLast(new HttpServerCodec());
channel.pipeline().addLast(new HttpObjectAggregator(65536));
// 将从 ServiceRegistry 获取的 deviceDataApi 传入处理器
channel.pipeline().addLast(new HttpHandler(deviceDataApi));
}
});
// 绑定端口并启动服务器
ChannelFuture future = bootstrap.bind(PORT).sync();
log.info("HTTP 服务器启动成功,端口为: {}", PORT);
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("HTTP 服务启动被中断", e);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

View File

@ -0,0 +1,105 @@
package cn.iocoder.yudao.module.iot.plugin;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi;
import cn.iocoder.yudao.module.iot.api.device.dto.DeviceDataCreateReqDTO;
import io.vertx.core.Handler;
import io.vertx.ext.web.RequestBody;
import io.vertx.ext.web.RoutingContext;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HttpVertxHandler implements Handler<RoutingContext> {
private final DeviceDataApi deviceDataApi;
public HttpVertxHandler(DeviceDataApi deviceDataApi) {
this.deviceDataApi = deviceDataApi;
}
@Override
public void handle(RoutingContext ctx) {
String productKey = ctx.pathParam("productKey");
String deviceName = ctx.pathParam("deviceName");
RequestBody requestBody = ctx.body();
JSONObject jsonData;
try {
jsonData = JSONUtil.parseObj(requestBody.asJsonObject());
} catch (Exception e) {
JSONObject res = createResponseJson(
400,
new JSONObject(),
null,
"请求数据不是合法的 JSON 格式: " + e.getMessage(),
"thing.event.property.post",
"1.0");
ctx.response()
.setStatusCode(400)
.putHeader("Content-Type", "application/json; charset=UTF-8")
.end(res.toString());
return;
}
String id = jsonData.getStr("id", null);
try {
// 调用主程序的接口保存数据
DeviceDataCreateReqDTO createDTO = DeviceDataCreateReqDTO.builder()
.productKey(productKey)
.deviceName(deviceName)
.message(jsonData.toString())
.build();
deviceDataApi.saveDeviceData(createDTO);
// 构造成功响应内容
JSONObject successRes = createResponseJson(
200,
new JSONObject(),
id,
"success",
"thing.event.property.post",
"1.0");
ctx.response()
.setStatusCode(200)
.putHeader("Content-Type", "application/json; charset=UTF-8")
.end(successRes.toString());
} catch (Exception e) {
JSONObject errorRes = createResponseJson(
500,
new JSONObject(),
id,
"The format of result is error!",
"thing.event.property.post",
"1.0");
ctx.response()
.setStatusCode(500)
.putHeader("Content-Type", "application/json; charset=UTF-8")
.end(errorRes.toString());
}
}
/**
* 创建标准化的响应 JSON 对象
*
* @param code 响应状态码业务层面的
* @param data 返回的数据对象JSON
* @param id 请求的 id可选
* @param message 返回的提示信息
* @param method 返回的 method 标识
* @param version 返回的版本号
* @return 构造好的 JSON 对象
*/
private JSONObject createResponseJson(int code, JSONObject data, String id, String message, String method,
String version) {
JSONObject res = new JSONObject();
res.set("code", code);
res.set("data", data != null ? data : new JSONObject());
res.set("id", id);
res.set("message", message);
res.set("method", method);
res.set("version", version);
return res;
}
}

View File

@ -0,0 +1,82 @@
package cn.iocoder.yudao.module.iot.plugin;
import cn.iocoder.yudao.module.iot.api.ServiceRegistry;
import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi;
import io.vertx.core.Vertx;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import org.pf4j.PluginWrapper;
import org.pf4j.spring.SpringPlugin;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HttpVertxPlugin extends SpringPlugin {
private static final int PORT = 8092;
private Vertx vertx;
private DeviceDataApi deviceDataApi;
public HttpVertxPlugin(PluginWrapper wrapper) {
super(wrapper);
}
@Override
public void start() {
log.info("HttpVertxPlugin.start()");
// 获取 DeviceDataApi 实例
deviceDataApi = ServiceRegistry.getService(DeviceDataApi.class);
if (deviceDataApi == null) {
log.error("未能从 ServiceRegistry 获取 DeviceDataApi 实例,请确保主程序已正确注册!");
return;
}
// 初始化 Vert.x
vertx = Vertx.vertx();
Router router = Router.router(vertx);
// 处理 Body
router.route().handler(BodyHandler.create());
// 设置路由
router.post("/sys/:productKey/:deviceName/thing/event/property/post")
.handler(new HttpVertxHandler(deviceDataApi));
// 启动 HTTP 服务器
vertx.createHttpServer()
.requestHandler(router)
.listen(PORT, http -> {
if (http.succeeded()) {
log.info("HTTP 服务器启动成功,端口为: {}", PORT);
} else {
log.error("HTTP 服务器启动失败", http.cause());
}
});
}
@Override
public void stop() {
log.info("HttpVertxPlugin.stop()");
if (vertx != null) {
vertx.close(ar -> {
if (ar.succeeded()) {
log.info("Vert.x 关闭成功");
} else {
log.error("Vert.x 关闭失败", ar.cause());
}
});
}
}
@Override
protected ApplicationContext createApplicationContext() {
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext();
applicationContext.setClassLoader(getWrapper().getPluginClassLoader());
applicationContext.refresh();
return applicationContext;
}
}

View File

@ -0,0 +1,15 @@
server:
port: 8092
spring:
application:
name: yudao-module-iot-http-plugin
# MQTT-RPC 配置
mqtt:
broker: tcp://chaojiniu.top:1883
username: haohao
password: ahh@123456
clientId: mqtt-rpc-client-${random.int}
requestTopic: rpc/request
responseTopicPrefix: rpc/response/

View File

@ -0,0 +1,6 @@
plugin.id=mqtt-plugin
plugin.class=cn.iocoder.yudao.module.iot.plugin.MqttPlugin
plugin.version=0.0.1
plugin.provider=ahh
plugin.dependencies=
plugin.description=mqtt-plugin-0.0.1

View File

@ -0,0 +1,154 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="
http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>yudao-module-iot-plugin</artifactId>
<groupId>cn.iocoder.boot</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>yudao-module-iot-mqtt-plugin</artifactId>
<name>${project.artifactId}</name>
<description>
物联网 插件模块 - mqtt 插件
</description>
<properties>
<!-- 插件相关 -->
<plugin.id>mqtt-plugin</plugin.id>
<plugin.class>cn.iocoder.yudao.module.iot.plugin.MqttPlugin</plugin.class>
<plugin.version>0.0.1</plugin.version>
<plugin.provider>ahh</plugin.provider>
<plugin.description>mqtt-plugin-0.0.1</plugin.description>
<plugin.dependencies/>
</properties>
<build>
<plugins>
<!-- DOESN'T WORK WITH MAVEN 3 (I defined the plugin metadata in properties section)
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>properties-maven-plugin</artifactId>
<version>1.0-alpha-2</version>
<executions>
<execution>
<phase>initialize</phase>
<goals>
<goal>read-project-properties</goal>
</goals>
<configuration>
<files>
<file>plugin.properties</file>
</files>
</configuration>
</execution>
</executions>
</plugin>
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.6</version>
<executions>
<execution>
<id>unzip jar file</id>
<phase>package</phase>
<configuration>
<target>
<unzip src="target/${project.artifactId}-${project.version}.${project.packaging}"
dest="target/plugin-classes"/>
</target>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.3</version>
<configuration>
<descriptors>
<descriptor>
src/main/assembly/assembly.xml
</descriptor>
</descriptors>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>attached</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifestEntries>
<Plugin-Id>${plugin.id}</Plugin-Id>
<Plugin-Class>${plugin.class}</Plugin-Class>
<Plugin-Version>${plugin.version}</Plugin-Version>
<Plugin-Provider>${plugin.provider}</Plugin-Provider>
<Plugin-Description>${plugin.description}</Plugin-Description>
<Plugin-Dependencies>${plugin.dependencies}</Plugin-Dependencies>
</manifestEntries>
</archive>
</configuration>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!-- 其他依赖项 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- PF4J Spring 集成 -->
<dependency>
<groupId>org.pf4j</groupId>
<artifactId>pf4j-spring</artifactId>
<scope>provided</scope>
</dependency>
<!-- 项目依赖 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-api</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<!-- MQTT -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,31 @@
<assembly>
<id>plugin</id>
<formats>
<format>zip</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<scope>runtime</scope>
<outputDirectory>lib</outputDirectory>
<includes>
<include>*:jar:*</include>
</includes>
</dependencySet>
</dependencySets>
<!--
<fileSets>
<fileSet>
<directory>target/classes</directory>
<outputDirectory>classes</outputDirectory>
</fileSet>
</fileSets>
-->
<fileSets>
<fileSet>
<directory>target/plugin-classes</directory>
<outputDirectory>classes</outputDirectory>
</fileSet>
</fileSets>
</assembly>

View File

@ -0,0 +1,45 @@
package cn.iocoder.yudao.module.iot.plugin;
import cn.iocoder.yudao.module.iot.api.ServiceRegistry;
import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi;
import lombok.extern.slf4j.Slf4j;
import org.pf4j.Plugin;
import org.pf4j.PluginWrapper;
import javax.annotation.Resource;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class MqttPlugin extends Plugin {
private ExecutorService executorService;
@Resource
private DeviceDataApi deviceDataApi;
public MqttPlugin(PluginWrapper wrapper) {
super(wrapper);
this.executorService = Executors.newSingleThreadExecutor();
}
@Override
public void start() {
log.info("MqttPlugin.start()");
if (executorService.isShutdown() || executorService.isTerminated()) {
executorService = Executors.newSingleThreadExecutor();
}
deviceDataApi = ServiceRegistry.getService(DeviceDataApi.class);
if (deviceDataApi == null) {
log.error("未能从 ServiceRegistry 获取 DeviceDataApi 实例,请确保主程序已正确注册!");
return;
}
}
@Override
public void stop() {
log.info("MqttPlugin.stop()");
}
}

View File

@ -212,4 +212,9 @@ iot:
# 保持连接
keepalive: 60
# 清除会话(设置为false,断开连接,重连后使用原来的会话 保留订阅的主题,能接收离线期间的消息)
clearSession: true
clearSession: true
# 插件配置
pf4j:
pluginsDir: ${user.home}/plugins # 插件目录

View File

@ -183,10 +183,8 @@ logging:
cn.iocoder.yudao.module.crm.dal.mysql: debug
cn.iocoder.yudao.module.erp.dal.mysql: debug
cn.iocoder.yudao.module.iot.dal.mysql: debug
cn.iocoder.yudao.module.iot.dal.tdengine: DEBUG
cn.iocoder.yudao.module.ai.dal.mysql: debug
org.springframework.context.support.PostProcessorRegistrationDelegate: ERROR # TODO 芋艿先禁用Spring Boot 3.X 存在部分错误的 WARN 提示
com.taosdata: DEBUG # TDengine 的日志级别
debug: false
@ -283,3 +281,16 @@ iot:
# 清除会话(设置为false,断开连接,重连后使用原来的会话 保留订阅的主题,能接收离线期间的消息)
clearSession: true
# MQTT-RPC 配置
mqtt:
broker: tcp://127.0.0.1:1883
username: root
password: 123456
clientId: mqtt-rpc-server-${random.int}
requestTopic: rpc/request
responseTopicPrefix: rpc/response/
# 插件配置
pf4j:
pluginsDir: /Users/anhaohao/code/gitee/ruoyi-vue-pro/plugins # 插件目录