Quellcode durchsuchen

add(master):
1.新增mqtt调用相关代码

andywu vor 1 Woche
Ursprung
Commit
ab073739ad

+ 11 - 3
pom.xml

@@ -3,7 +3,7 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
-	
+
     <groupId>com.ruoyi</groupId>
     <artifactId>ruoyi</artifactId>
     <version>3.8.9</version>
@@ -11,7 +11,7 @@
     <name>boman</name>
     <url>https://www.baidu.com/</url>
     <description>物业管理系统</description>
-    
+
     <properties>
         <ruoyi.version>3.8.9</ruoyi.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -35,6 +35,7 @@
         <logback.version>1.2.13</logback.version>
         <spring-security.version>5.7.12</spring-security.version>
         <spring-framework.version>5.3.39</spring-framework.version>
+        <lombok.version>1.18.30</lombok.version>
     </properties>
 
     <!-- 依赖声明 -->
@@ -169,6 +170,13 @@
                 <version>${fastjson.version}</version>
             </dependency>
 
+            <!-- Lombok -->
+            <dependency>
+                <groupId>org.projectlombok</groupId>
+                <artifactId>lombok</artifactId>
+                <version>${lombok.version}</version>
+            </dependency>
+
             <!-- Token生成与解析-->
             <dependency>
                 <groupId>io.jsonwebtoken</groupId>
@@ -271,4 +279,4 @@
         </pluginRepository>
     </pluginRepositories>
 
-</project>
+</project>

+ 7 - 1
ruoyi-admin/pom.xml

@@ -77,6 +77,12 @@
             <artifactId>ruoyi-generator</artifactId>
         </dependency>
 
+        <!-- lombok-->
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+
     </dependencies>
 
     <build>
@@ -109,4 +115,4 @@
         <finalName>${project.artifactId}</finalName>
     </build>
 
-</project>
+</project>

+ 227 - 0
ruoyi-admin/src/main/java/com/ruoyi/web/controller/mqtt/MqttController.java

@@ -0,0 +1,227 @@
+package com.ruoyi.web.controller.mqtt;
+
+import com.ruoyi.common.model.MqttMessage;
+import com.ruoyi.common.utils.mqtt.MqttService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * MQTT控制器
+ *
+ * <p>提供MQTT相关操作的REST API接口,包括消息发布、主题订阅和连接管理等,
+ * 方便通过HTTP请求操作MQTT客户端。</p>
+ *
+ * @author andy
+ * @version 1.0.0
+ * @since 2025-06-17
+ */
+@Slf4j
+@RestController
+@RequiredArgsConstructor
+@RequestMapping("/mqtt")
+public class MqttController {
+
+    private final MqttService mqttService;
+
+    /**
+     * 获取MQTT连接状态
+     *
+     * @return 连接状态信息
+     */
+    @GetMapping("/status")
+    public ResponseEntity<Map<String, Object>> getStatus() {
+        Map<String, Object> status = new HashMap<>();
+        status.put("connected", mqttService.isConnected());
+        status.put("timestamp", System.currentTimeMillis());
+
+        return ResponseEntity.ok(status);
+    }
+
+    /**
+     * 重新连接MQTT服务器
+     *
+     * @return 重连结果
+     */
+    @PostMapping("/reconnect")
+    public ResponseEntity<Map<String, Object>> reconnect() {
+        Map<String, Object> result = new HashMap<>();
+
+        try {
+            CompletableFuture<Void> future = mqttService.reconnect();
+            future.get(10, TimeUnit.SECONDS);
+
+            result.put("success", true);
+            result.put("message", "已成功重新连接到MQTT服务器");
+            return ResponseEntity.ok(result);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            log.error("重连MQTT服务器失败: {}", e.getMessage(), e);
+
+            result.put("success", false);
+            result.put("message", "重连MQTT服务器失败: " + e.getMessage());
+            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(result);
+        }
+    }
+
+    /**
+     * 发布MQTT消息
+     *
+     * @param message MQTT消息对象
+     * @return 发布结果
+     */
+    @PostMapping("/publish")
+    public ResponseEntity<Map<String, Object>> publishMessage(@RequestBody MqttMessage message) {
+        Map<String, Object> result = new HashMap<>();
+
+        if (message.getTopic() == null || message.getTopic().isEmpty()) {
+            result.put("success", false);
+            result.put("message", "消息主题不能为空");
+            return ResponseEntity.badRequest().body(result);
+        }
+
+        if (message.getPayload() == null) {
+            result.put("success", false);
+            result.put("message", "消息内容不能为空");
+            return ResponseEntity.badRequest().body(result);
+        }
+
+        try {
+            CompletableFuture<Void> future = mqttService.publish(message);
+            future.get(10, TimeUnit.SECONDS);
+
+            result.put("success", true);
+            result.put("message", "消息发布成功");
+            result.put("topic", message.getTopic());
+            return ResponseEntity.ok(result);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            log.error("发布消息失败: {}", e.getMessage(), e);
+
+            result.put("success", false);
+            result.put("message", "发布消息失败: " + e.getMessage());
+            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(result);
+        }
+    }
+
+    /**
+     * 简化版发布消息接口
+     *
+     * @param topic 消息主题
+     * @param payload 消息内容
+     * @return 发布结果
+     */
+    @RequestMapping("/publish/simple")
+    public ResponseEntity<Map<String, Object>> publishSimpleMessage(
+            @RequestParam String topic,
+            @RequestParam String payload) {
+
+        MqttMessage message = new MqttMessage(topic, payload);
+        return publishMessage(message);
+    }
+
+    /**
+     * 订阅主题
+     *
+     * @param topic 要订阅的主题
+     * @return 订阅结果
+     */
+    @RequestMapping("/subscribe")
+    public ResponseEntity<Map<String, Object>> subscribeTopic(@RequestParam String topic) {
+        Map<String, Object> result = new HashMap<>();
+
+        if (topic == null || topic.isEmpty()) {
+            result.put("success", false);
+            result.put("message", "订阅主题不能为空");
+            return ResponseEntity.badRequest().body(result);
+        }
+
+        try {
+            String subscribedTopic = mqttService.subscribe(topic);
+
+            result.put("success", true);
+            result.put("message", "主题订阅成功");
+            result.put("topic", subscribedTopic);
+            return ResponseEntity.ok(result);
+        } catch (MqttException e) {
+            log.error("订阅主题失败: {}", e.getMessage(), e);
+
+            result.put("success", false);
+            result.put("message", "订阅主题失败: " + e.getMessage());
+            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(result);
+        }
+    }
+
+    /**
+     * 批量订阅主题
+     *
+     * @param topics 要订阅的主题列表
+     * @return 订阅结果
+     */
+    @PostMapping("/subscribe/batch")
+    public ResponseEntity<Map<String, Object>> subscribeTopics(@RequestBody List<String> topics) {
+        Map<String, Object> result = new HashMap<>();
+
+        if (topics == null || topics.isEmpty()) {
+            result.put("success", false);
+            result.put("message", "订阅主题列表不能为空");
+            return ResponseEntity.badRequest().body(result);
+        }
+
+        try {
+            List<String> subscribedTopics = mqttService.subscribe(topics);
+
+            result.put("success", true);
+            result.put("message", "批量主题订阅成功");
+            result.put("topics", subscribedTopics);
+            return ResponseEntity.ok(result);
+        } catch (MqttException e) {
+            log.error("批量订阅主题失败: {}", e.getMessage(), e);
+
+            result.put("success", false);
+            result.put("message", "批量订阅主题失败: " + e.getMessage());
+            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(result);
+        }
+    }
+
+    /**
+     * 取消订阅主题
+     *
+     * @param topic 要取消订阅的主题
+     * @return 取消订阅结果
+     */
+    @PostMapping("/unsubscribe")
+    public ResponseEntity<Map<String, Object>> unsubscribeTopic(@RequestParam String topic) {
+        Map<String, Object> result = new HashMap<>();
+
+        if (topic == null || topic.isEmpty()) {
+            result.put("success", false);
+            result.put("message", "取消订阅的主题不能为空");
+            return ResponseEntity.badRequest().body(result);
+        }
+
+        try {
+            mqttService.unsubscribe(topic);
+
+            result.put("success", true);
+            result.put("message", "主题取消订阅成功");
+            result.put("topic", topic);
+            return ResponseEntity.ok(result);
+        } catch (MqttException e) {
+            log.error("取消订阅主题失败: {}", e.getMessage(), e);
+
+            result.put("success", false);
+            result.put("message", "取消订阅主题失败: " + e.getMessage());
+            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(result);
+        }
+    }
+}

+ 16 - 1
ruoyi-admin/src/main/resources/application.yml

@@ -72,4 +72,19 @@ xss:
   # 排除链接(多个用逗号分隔)
   excludes: /system/notice,/new/news,/new/news/put
   # 匹配链接
-  urlPatterns: /system/*,/monitor/*,/tool/*
+  urlPatterns: /system/*,/monitor/*,/tool/*
+
+# MQTT配置
+mqtt:
+  server-uri: tcp://13.229.167.76:1883
+  #  server-uri: tcp://127.0.0.1:1883
+  client-id: JavaClient
+  username:
+  password:
+  default-topic: test/topic
+  default-qos: 1
+  timeout: 30
+  keep-alive: 60
+  clean-session: true
+  auto-reconnect: true
+  command-timeout: 10

+ 37 - 3
ruoyi-common/pom.xml

@@ -15,6 +15,12 @@
         common通用工具
     </description>
 
+    <properties>
+        <paho.mqtt.version>1.2.5</paho.mqtt.version>
+        <moquette.version>0.16</moquette.version>
+        <lombok.version>1.18.30</lombok.version>
+    </properties>
+
     <dependencies>
 
         <!-- Spring框架基本的核心工具 -->
@@ -52,19 +58,25 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>
-  
+
         <!-- JSON工具类 -->
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
         </dependency>
-        
+
         <!-- 阿里JSON解析器 -->
         <dependency>
             <groupId>com.alibaba.fastjson2</groupId>
             <artifactId>fastjson2</artifactId>
         </dependency>
 
+        <!-- Lombok -->
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+
         <!-- io常用工具类 -->
         <dependency>
             <groupId>commons-io</groupId>
@@ -119,6 +131,28 @@
             <artifactId>javax.servlet-api</artifactId>
         </dependency>
 
+        <!-- === MQTT 相关依赖 === -->
+        <!-- Moquette MQTT broker -->
+        <dependency>
+            <groupId>io.moquette</groupId>
+            <artifactId>moquette-broker</artifactId>
+            <version>${moquette.version}</version>
+        </dependency>
+
+        <!-- Eclipse Paho MQTT Client (MQTT 3.x) -->
+        <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+            <version>${paho.mqtt.version}</version>
+        </dependency>
+
+        <!-- Eclipse Paho MQTT Client (MQTT 5.0) -->
+        <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
+            <version>${paho.mqtt.version}</version>
+        </dependency>
+
     </dependencies>
 
-</project>
+</project>

+ 107 - 0
ruoyi-common/src/main/java/com/ruoyi/common/config/MqttConfig.java

@@ -0,0 +1,107 @@
+package com.ruoyi.common.config;
+
+import com.ruoyi.common.utils.mqtt.MqttCallbackHandler;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.UUID;
+
+/**
+ * MQTT客户端配置类
+ *
+ * <p>配置MQTT客户端的连接和回调处理,包括自动重连、证书配置、回调处理等,
+ * 创建MQTT连接并初始化相关组件。</p>
+ *
+ * @author andy
+ * @version 1.0.0
+ * @since 2025-06-17
+ */
+@Slf4j
+@Configuration
+@RequiredArgsConstructor
+public class MqttConfig {
+
+    private final MqttProperties mqttProperties;
+    private final MqttCallbackHandler mqttCallbackHandler;
+
+    /**
+     * 创建MQTT连接选项
+     *
+     * @return MQTT连接选项
+     */
+    @Bean
+    public MqttConnectOptions mqttConnectOptions() {
+        MqttConnectOptions options = new MqttConnectOptions();
+
+        // 设置是否清除会话
+        options.setCleanSession(mqttProperties.isCleanSession());
+
+        // 设置连接超时
+        options.setConnectionTimeout(mqttProperties.getTimeout());
+
+        // 设置心跳间隔
+        options.setKeepAliveInterval(mqttProperties.getKeepAlive());
+
+        // 设置自动重连
+        options.setAutomaticReconnect(mqttProperties.isAutoReconnect());
+
+        // 设置用户名和密码
+        if (mqttProperties.getUsername() != null && !mqttProperties.getUsername().isEmpty()) {
+            options.setUserName(mqttProperties.getUsername());
+
+            if (mqttProperties.getPassword() != null && !mqttProperties.getPassword().isEmpty()) {
+                options.setPassword(mqttProperties.getPassword().toCharArray());
+            }
+        }
+
+        log.info("MQTT连接选项已配置,服务器地址: {}", mqttProperties.getServerUri());
+        return options;
+    }
+
+    /**
+     * 创建MQTT客户端
+     *
+     * @return MQTT客户端
+     * @throws MqttException MQTT异常
+     */
+    @Bean
+    public MqttAsyncClient mqttClient() throws MqttException {
+        // 检查服务器URI
+        if (mqttProperties.getServerUri() == null || mqttProperties.getServerUri().isEmpty()) {
+            throw new IllegalArgumentException("MQTT服务器URI不能为空");
+        }
+
+        // 生成客户端ID
+        String clientId = mqttProperties.getClientId();
+        if (clientId == null || clientId.isEmpty()) {
+            clientId = "spring-mqtt-" + UUID.randomUUID().toString().substring(0, 8);
+            log.info("未配置客户端ID,已自动生成: {}", clientId);
+        }
+
+        // 创建客户端实例
+        MqttAsyncClient client = new MqttAsyncClient(
+                mqttProperties.getServerUri(),
+                clientId,
+                new MemoryPersistence());
+
+        // 设置回调
+        client.setCallback(mqttCallbackHandler);
+
+        // 初始化连接
+        try {
+            client.connect(mqttConnectOptions()).waitForCompletion(mqttProperties.getCommandTimeout() * 1000L);
+            log.info("MQTT客户端已连接到服务器: {}", mqttProperties.getServerUri());
+        } catch (MqttException e) {
+            log.error("MQTT客户端连接失败: {}", e.getMessage(), e);
+            throw e;
+        }
+
+        return client;
+    }
+}

+ 76 - 0
ruoyi-common/src/main/java/com/ruoyi/common/config/MqttProperties.java

@@ -0,0 +1,76 @@
+package com.ruoyi.common.config;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * MQTT连接属性配置类
+ *
+ * <p>用于从application.yml或application.properties文件中加载MQTT相关的配置项,
+ * 支持自定义MQTT服务器地址、客户端ID、用户名密码等各种连接参数。</p>
+ *
+ * @author andy
+ * @version 1.0.0
+ * @since 2025-06-17
+ */
+@Data
+@Configuration
+@ConfigurationProperties(prefix = "mqtt")
+public class MqttProperties {
+
+    /**
+     * 服务器地址URI,例如: tcp://localhost:1883
+     */
+    private String serverUri;
+
+    /**
+     * 客户端ID
+     */
+    private String clientId;
+
+    /**
+     * 用户名
+     */
+    private String username;
+
+    /**
+     * 密码
+     */
+    private String password;
+
+    /**
+     * 超时时间,单位秒
+     */
+    private int timeout = 30;
+
+    /**
+     * 保活时间,单位秒
+     */
+    private int keepAlive = 60;
+
+    /**
+     * 是否自动重连
+     */
+    private boolean autoReconnect = true;
+
+    /**
+     * 是否清除会话
+     */
+    private boolean cleanSession = true;
+
+    /**
+     * 默认QoS级别
+     */
+    private int defaultQos = 1;
+
+    /**
+     * 默认主题
+     */
+    private String defaultTopic = "test/topic";
+
+    /**
+     * 消息发布/订阅超时,单位秒
+     */
+    private int commandTimeout = 10;
+}

+ 83 - 0
ruoyi-common/src/main/java/com/ruoyi/common/model/MqttMessage.java

@@ -0,0 +1,83 @@
+package com.ruoyi.common.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+import java.util.Map;
+
+/**
+ * MQTT消息模型类
+ *
+ * <p>封装MQTT消息的各种属性,包括主题、内容、QoS等,
+ * 可用于消息发布和接收场景。</p>
+ *
+ * @author andy
+ * @version 1.0.0
+ * @since 2025-06-17
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class MqttMessage {
+
+    /**
+     * 消息主题
+     */
+    private String topic;
+
+    /**
+     * 消息内容
+     */
+    private String payload;
+
+    /**
+     * 服务质量
+     * <p>0 - 最多发送一次,不保证送达</p>
+     * <p>1 - 至少发送一次,确保送达但可能重复</p>
+     * <p>2 - 确保仅送达一次</p>
+     */
+    private int qos = 1;
+
+    /**
+     * 是否为保留消息
+     */
+    private boolean retained = false;
+
+    /**
+     * 消息发送/接收时间
+     */
+    private LocalDateTime timestamp = LocalDateTime.now();
+
+    /**
+     * 消息自定义属性
+     */
+    private Map<String, Object> properties;
+
+    /**
+     * 带主题和内容的构造函数
+     *
+     * @param topic 消息主题
+     * @param payload 消息内容
+     */
+    public MqttMessage(String topic, String payload) {
+        this.topic = topic;
+        this.payload = payload;
+    }
+
+    /**
+     * 带主题、内容、QoS和保留标志的构造函数
+     *
+     * @param topic 消息主题
+     * @param payload 消息内容
+     * @param qos 服务质量
+     * @param retained 是否保留
+     */
+    public MqttMessage(String topic, String payload, int qos, boolean retained) {
+        this.topic = topic;
+        this.payload = payload;
+        this.qos = qos;
+        this.retained = retained;
+    }
+}

+ 84 - 0
ruoyi-common/src/main/java/com/ruoyi/common/utils/mqtt/MqttCallbackHandler.java

@@ -0,0 +1,84 @@
+package com.ruoyi.common.utils.mqtt;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.stereotype.Component;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * MQTT回调处理器
+ *
+ * <p>处理MQTT客户端的各种回调事件,包括连接丢失、消息到达和消息发送完成。
+ * 通过Spring事件机制将消息分发到其他组件进行处理。</p>
+ *
+ * @author andy
+ * @version 1.0.0
+ * @since 2025-06-17
+ */
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class MqttCallbackHandler implements MqttCallback {
+
+    private final ApplicationEventPublisher eventPublisher;
+
+    /**
+     * 连接丢失回调
+     * 当与MQTT服务器的连接意外断开时调用此方法
+     *
+     * @param cause 连接丢失原因
+     */
+    @Override
+    public void connectionLost(Throwable cause) {
+        log.warn("MQTT连接丢失: {}", cause.getMessage());
+        // 连接丢失后的处理逻辑
+        // 可以根据需要添加重连机制,通常由MqttConnectOptions.setAutomaticReconnect处理
+    }
+
+    /**
+     * 消息到达回调
+     * 当从订阅的主题接收到新消息时调用此方法
+     *
+     * @param topic 消息的主题
+     * @param message 接收到的消息
+     */
+    @Override
+    public void messageArrived(String topic, MqttMessage message) throws Exception {
+        String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
+        log.info("收到MQTT消息: 主题={}, 消息内容={}, QoS={}", topic, payload, message.getQos());
+
+        // 创建事件对象
+        com.ruoyi.common.model.MqttMessage mqttMessage = new com.ruoyi.common.model.MqttMessage();
+        mqttMessage.setTopic(topic);
+        mqttMessage.setPayload(payload);
+        mqttMessage.setQos(message.getQos());
+        mqttMessage.setRetained(message.isRetained());
+
+        // 发布事件,通知其他组件处理此消息
+        eventPublisher.publishEvent(mqttMessage);
+    }
+
+    /**
+     * 消息发送完成回调
+     * 当消息成功发送到服务器后调用此方法
+     *
+     * @param token 消息传递令牌
+     */
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken token) {
+        try {
+            MqttMessage message = token.getMessage();
+            if (message != null) {
+                String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
+                log.debug("MQTT消息发送完成: {}", payload);
+            }
+        } catch (Exception e) {
+            log.error("处理消息发送完成回调时出错: {}", e.getMessage(), e);
+        }
+    }
+}

+ 61 - 0
ruoyi-common/src/main/java/com/ruoyi/common/utils/mqtt/MqttMessageListener.java

@@ -0,0 +1,61 @@
+package com.ruoyi.common.utils.mqtt;
+
+import com.ruoyi.common.model.MqttMessage;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * MQTT消息监听器
+ *
+ * <p>通过Spring事件机制接收MQTT消息,可以添加自定义的业务逻辑来处理接收到的消息。
+ * 可以创建多个不同的监听器来处理不同主题或类型的消息。</p>
+ *
+ * @author andy
+ * @version 1.0.0
+ * @since 2025-06-17
+ */
+@Slf4j
+@Component
+public class MqttMessageListener {
+
+    /**
+     * 处理接收到的MQTT消息
+     *
+     * @param message MQTT消息对象
+     */
+    @EventListener
+    public void handleMqttMessage(MqttMessage message) {
+        log.info("收到MQTT消息事件: 主题={}, 消息内容={}", message.getTopic(), message.getPayload());
+
+        // TODO: 添加自定义的业务逻辑来处理消息
+        // 例如:解析JSON消息内容,更新数据库,触发其他操作等
+
+        // 可以根据主题进行不同的处理
+        if (message.getTopic().startsWith("device/")) {
+            handleDeviceMessage(message);
+        } else if (message.getTopic().startsWith("alert/")) {
+            handleAlertMessage(message);
+        }
+    }
+
+    /**
+     * 处理设备相关消息
+     *
+     * @param message MQTT消息
+     */
+    private void handleDeviceMessage(MqttMessage message) {
+        // 设备消息处理逻辑
+        log.debug("处理设备消息: {}", message.getPayload());
+    }
+
+    /**
+     * 处理告警相关消息
+     *
+     * @param message MQTT消息
+     */
+    private void handleAlertMessage(MqttMessage message) {
+        // 告警消息处理逻辑
+        log.debug("处理告警消息: {}", message.getPayload());
+    }
+}

+ 271 - 0
ruoyi-common/src/main/java/com/ruoyi/common/utils/mqtt/MqttService.java

@@ -0,0 +1,271 @@
+package com.ruoyi.common.utils.mqtt;
+
+
+import com.ruoyi.common.config.MqttProperties;
+import com.ruoyi.common.model.MqttMessage;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.*;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * MQTT服务类
+ *
+ * <p>提供MQTT消息发布和订阅功能的核心服务,封装了底层MQTT客户端操作,
+ * 提供简单易用的API供应用程序使用。</p>
+ *
+ * @author andy
+ * @version 1.0.0
+ * @since 2025-06-17
+ */
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class MqttService {
+
+    private final MqttAsyncClient mqttClient;
+    private final MqttProperties mqttProperties;
+
+    /**
+     * 初始化方法
+     * 在服务启动时订阅默认主题
+     */
+    @PostConstruct
+    public void init() {
+        // 如果配置了默认主题,则自动订阅
+        if (mqttProperties.getDefaultTopic() != null && !mqttProperties.getDefaultTopic().isEmpty()) {
+            try {
+                subscribe(mqttProperties.getDefaultTopic());
+                log.info("已自动订阅默认主题: {}", mqttProperties.getDefaultTopic());
+            } catch (MqttException e) {
+                log.error("自动订阅默认主题失败: {}", e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * 销毁方法
+     * 在服务关闭时断开MQTT连接
+     */
+    @PreDestroy
+    public void destroy() {
+        try {
+            if (mqttClient != null && mqttClient.isConnected()) {
+                mqttClient.disconnect().waitForCompletion(mqttProperties.getCommandTimeout() * 1000L);
+                log.info("MQTT客户端已断开连接");
+            }
+        } catch (MqttException e) {
+            log.error("断开MQTT连接时出错: {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 检查客户端连接状态
+     *
+     * @return 是否已连接
+     */
+    public boolean isConnected() {
+        return mqttClient != null && mqttClient.isConnected();
+    }
+
+    /**
+     * 重新连接到MQTT服务器
+     *
+     * @return 重连操作的CompletableFuture
+     */
+    public CompletableFuture<Void> reconnect() {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        try {
+            if (mqttClient == null) {
+                future.completeExceptionally(new IllegalStateException("MQTT客户端未初始化"));
+                return future;
+            }
+
+            if (mqttClient.isConnected()) {
+                log.info("MQTT客户端已经连接,无需重连");
+                future.complete(null);
+                return future;
+            }
+
+            mqttClient.connect(new MqttConnectOptions())
+                    .setActionCallback(new IMqttActionListener() {
+                        @Override
+                        public void onSuccess(IMqttToken token) {
+                            log.info("MQTT客户端重连成功");
+                            future.complete(null);
+                        }
+
+                        @Override
+                        public void onFailure(IMqttToken token, Throwable exception) {
+                            log.error("MQTT客户端重连失败: {}", exception.getMessage(), exception);
+                            future.completeExceptionally(exception);
+                        }
+                    });
+        } catch (MqttException e) {
+            log.error("MQTT重连过程中出错: {}", e.getMessage(), e);
+            future.completeExceptionally(e);
+        }
+
+        return future;
+    }
+
+    /**
+     * 发布消息
+     *
+     * @param message MQTT消息对象
+     * @return 发布操作的CompletableFuture
+     */
+    public CompletableFuture<Void> publish(MqttMessage message) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        try {
+            if (!isConnected()) {
+                throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
+            }
+
+            // 创建Paho消息对象
+            org.eclipse.paho.client.mqttv3.MqttMessage mqttMessage =
+                    new org.eclipse.paho.client.mqttv3.MqttMessage(message.getPayload().getBytes(StandardCharsets.UTF_8));
+            mqttMessage.setQos(message.getQos());
+            mqttMessage.setRetained(message.isRetained());
+
+            // 发布消息
+            mqttClient.publish(message.getTopic(), mqttMessage)
+                    .setActionCallback(new IMqttActionListener() {
+                        @Override
+                        public void onSuccess(IMqttToken token) {
+                            log.debug("消息发布成功: 主题={}, 内容={}", message.getTopic(), message.getPayload());
+                            future.complete(null);
+                        }
+
+                        @Override
+                        public void onFailure(IMqttToken token, Throwable exception) {
+                            log.error("消息发布失败: {}", exception.getMessage(), exception);
+                            future.completeExceptionally(exception);
+                        }
+                    });
+        } catch (Exception e) {
+            log.error("发布消息时出错: {}", e.getMessage(), e);
+            future.completeExceptionally(e);
+        }
+
+        return future;
+    }
+
+    /**
+     * 使用单独参数发布消息
+     *
+     * @param topic 消息主题
+     * @param payload 消息内容
+     * @return 发布操作的CompletableFuture
+     */
+    public CompletableFuture<Void> publish(String topic, String payload) {
+        MqttMessage message = new MqttMessage(topic, payload, mqttProperties.getDefaultQos(), false);
+        return publish(message);
+    }
+
+    /**
+     * 使用完整参数发布消息
+     *
+     * @param topic 消息主题
+     * @param payload 消息内容
+     * @param qos 服务质量
+     * @param retained 是否保留
+     * @return 发布操作的CompletableFuture
+     */
+    public CompletableFuture<Void> publish(String topic, String payload, int qos, boolean retained) {
+        MqttMessage message = new MqttMessage(topic, payload, qos, retained);
+        return publish(message);
+    }
+
+    /**
+     * 订阅主题
+     *
+     * @param topic 要订阅的主题
+     * @return 订阅成功的主题
+     * @throws MqttException MQTT异常
+     */
+    public String subscribe(String topic) throws MqttException {
+        return subscribe(topic, mqttProperties.getDefaultQos());
+    }
+
+    /**
+     * 使用指定QoS订阅主题
+     *
+     * @param topic 要订阅的主题
+     * @param qos 服务质量
+     * @return 订阅成功的主题
+     * @throws MqttException MQTT异常
+     */
+    public String subscribe(String topic, int qos) throws MqttException {
+        if (!isConnected()) {
+            throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
+        }
+
+        mqttClient.subscribe(topic, qos).waitForCompletion(mqttProperties.getCommandTimeout() * 1000L);
+        log.info("已订阅主题: {}, QoS: {}", topic, qos);
+        return topic;
+    }
+
+    /**
+     * 批量订阅主题
+     *
+     * @param topics 要订阅的主题列表
+     * @return 订阅成功的主题列表
+     * @throws MqttException MQTT异常
+     */
+    public List<String> subscribe(List<String> topics) throws MqttException {
+        if (!isConnected()) {
+            throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
+        }
+
+        String[] topicArray = topics.toArray(new String[0]);
+        int[] qosArray = new int[topics.size()];
+
+        // 默认使用相同的QoS级别
+        Arrays.fill(qosArray, mqttProperties.getDefaultQos());
+
+        mqttClient.subscribe(topicArray, qosArray).waitForCompletion(mqttProperties.getCommandTimeout() * 1000L);
+        log.info("已批量订阅主题: {}", topics);
+        return topics;
+    }
+
+    /**
+     * 取消订阅主题
+     *
+     * @param topic 要取消订阅的主题
+     * @throws MqttException MQTT异常
+     */
+    public void unsubscribe(String topic) throws MqttException {
+        if (!isConnected()) {
+            throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
+        }
+
+        mqttClient.unsubscribe(topic).waitForCompletion(mqttProperties.getCommandTimeout() * 1000L);
+        log.info("已取消订阅主题: {}", topic);
+    }
+
+    /**
+     * 批量取消订阅主题
+     *
+     * @param topics 要取消订阅的主题列表
+     * @throws MqttException MQTT异常
+     */
+    public void unsubscribe(List<String> topics) throws MqttException {
+        if (!isConnected()) {
+            throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
+        }
+
+        String[] topicArray = topics.toArray(new String[0]);
+        mqttClient.unsubscribe(topicArray).waitForCompletion(mqttProperties.getCommandTimeout() * 1000L);
+        log.info("已批量取消订阅主题: {}", topics);
+    }
+}

+ 1 - 1
ruoyi-system/pom.xml

@@ -25,4 +25,4 @@
 
     </dependencies>
 
-</project>
+</project>