package com.ruoyi.common.config;

import com.ruoyi.common.utils.mqtt.MqttCallbackHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.UUID;

/**
 * MQTT client configuration.
 *
 * <p>Builds the {@link MqttConnectOptions} from {@link MqttProperties} (clean session, connection
 * timeout, keep-alive, automatic reconnect, optional credentials) and creates an
 * {@link MqttAsyncClient} that has the callback handler installed and is connected before it is
 * exposed as a bean.
 *
 * @author andy
 * @version 1.0.0
 * @since 2025-06-17
 */
@Slf4j
@Configuration
@RequiredArgsConstructor
public class MqttConfig {

    private final MqttProperties mqttProperties;
    private final MqttCallbackHandler mqttCallbackHandler;

    /**
     * Creates the MQTT connection options from the configured properties.
     *
     * <p>Credentials are optional: the user name is applied only when it is non-empty, and the
     * password is applied only when a user name is present and the password itself is non-empty.
     *
     * @return the populated connection options
     */
    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(mqttProperties.isCleanSession());
        options.setConnectionTimeout(mqttProperties.getTimeout());
        options.setKeepAliveInterval(mqttProperties.getKeepAlive());
        options.setAutomaticReconnect(mqttProperties.isAutoReconnect());

        String username = mqttProperties.getUsername();
        if (!isNullOrEmpty(username)) {
            options.setUserName(username);
            // A password is only ever sent together with a user name.
            String password = mqttProperties.getPassword();
            if (!isNullOrEmpty(password)) {
                options.setPassword(password.toCharArray());
            }
        }

        log.info("MQTT连接选项已配置,服务器地址: {}", mqttProperties.getServerUri());
        return options;
    }

    /**
     * Creates the MQTT client, installs the callback handler and connects to the broker.
     *
     * <p>The method blocks until the connect attempt completes or the configured command timeout
     * (interpreted as seconds and converted to milliseconds) elapses. If connecting fails, the
     * half-initialised client is closed on a best-effort basis before the original exception is
     * rethrown, so a failed startup does not leave the client's resources behind.
     *
     * @return the connected MQTT client
     * @throws MqttException if the client cannot be created or the connection attempt fails
     * @throws IllegalArgumentException if no server URI is configured
     */
    @Bean
    public MqttAsyncClient mqttClient() throws MqttException {
        String serverUri = mqttProperties.getServerUri();
        if (isNullOrEmpty(serverUri)) {
            throw new IllegalArgumentException("MQTT服务器URI不能为空");
        }

        // Fall back to a short random client id when none is configured.
        String clientId = mqttProperties.getClientId();
        if (isNullOrEmpty(clientId)) {
            clientId = "spring-mqtt-" + UUID.randomUUID().toString().substring(0, 8);
            log.info("未配置客户端ID,已自动生成: {}", clientId);
        }

        MqttAsyncClient client = new MqttAsyncClient(serverUri, clientId, new MemoryPersistence());
        // The callback is registered before connecting so it is in place for the first events.
        client.setCallback(mqttCallbackHandler);

        long timeoutMillis = mqttProperties.getCommandTimeout() * 1000L;
        try {
            client.connect(mqttConnectOptions()).waitForCompletion(timeoutMillis);
            log.info("MQTT客户端已连接到服务器: {}", mqttProperties.getServerUri());
        } catch (MqttException e) {
            // Release the unused client first so that any close failure shows up as a suppressed
            // exception in the error log below and on the rethrown exception.
            closeAfterFailedConnect(client, e);
            log.error("MQTT客户端连接失败: {}", e.getMessage(), e);
            throw e;
        }
        return client;
    }

    /**
     * Best-effort release of a client whose initial connect failed.
     *
     * <p>A failure to close never replaces the original connect failure; it is attached to
     * {@code cause} as a suppressed exception instead. NOTE(review): if the connect attempt is
     * still in progress (for example after a wait timeout) the client may reject {@code close()};
     * that case is recorded as suppressed rather than retried.
     *
     * @param client the client to close
     * @param cause the connect failure that is about to be rethrown
     */
    private static void closeAfterFailedConnect(MqttAsyncClient client, MqttException cause) {
        try {
            client.close();
        } catch (MqttException closeFailure) {
            cause.addSuppressed(closeFailure);
        }
    }

    /**
     * Tells whether a configuration value is missing.
     *
     * @param value the value to check, may be {@code null}
     * @return {@code true} if {@code value} is {@code null} or has length zero
     */
    private static boolean isNullOrEmpty(String value) {
        return value == null || value.isEmpty();
    }
}