123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- package com.ruoyi.common.config;
- import com.ruoyi.common.utils.mqtt.MqttCallbackHandler;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
- import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
- import org.eclipse.paho.client.mqttv3.MqttException;
- import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import java.util.UUID;
- /**
- * MQTT客户端配置类
- *
- * <p>配置MQTT客户端的连接和回调处理,包括自动重连、证书配置、回调处理等,
- * 创建MQTT连接并初始化相关组件。</p>
- *
- * @author andy
- * @version 1.0.0
- * @since 2025-06-17
- */
- @Slf4j
- @Configuration
- @RequiredArgsConstructor
- public class MqttConfig {
- private final MqttProperties mqttProperties;
- private final MqttCallbackHandler mqttCallbackHandler;
- /**
- * 创建MQTT连接选项
- *
- * @return MQTT连接选项
- */
- @Bean
- public MqttConnectOptions mqttConnectOptions() {
- MqttConnectOptions options = new MqttConnectOptions();
- // 设置是否清除会话
- options.setCleanSession(mqttProperties.isCleanSession());
- // 设置连接超时
- options.setConnectionTimeout(mqttProperties.getTimeout());
- // 设置心跳间隔
- options.setKeepAliveInterval(mqttProperties.getKeepAlive());
- // 设置自动重连
- options.setAutomaticReconnect(mqttProperties.isAutoReconnect());
- // 设置用户名和密码
- if (mqttProperties.getUsername() != null && !mqttProperties.getUsername().isEmpty()) {
- options.setUserName(mqttProperties.getUsername());
- if (mqttProperties.getPassword() != null && !mqttProperties.getPassword().isEmpty()) {
- options.setPassword(mqttProperties.getPassword().toCharArray());
- }
- }
- log.info("MQTT连接选项已配置,服务器地址: {}", mqttProperties.getServerUri());
- return options;
- }
- /**
- * 创建MQTT客户端
- *
- * @return MQTT客户端
- * @throws MqttException MQTT异常
- */
- @Bean
- public MqttAsyncClient mqttClient() throws MqttException {
- // 检查服务器URI
- if (mqttProperties.getServerUri() == null || mqttProperties.getServerUri().isEmpty()) {
- throw new IllegalArgumentException("MQTT服务器URI不能为空");
- }
- // 生成客户端ID
- String clientId = mqttProperties.getClientId();
- if (clientId == null || clientId.isEmpty()) {
- clientId = "spring-mqtt-" + UUID.randomUUID().toString().substring(0, 8);
- log.info("未配置客户端ID,已自动生成: {}", clientId);
- }
- // 创建客户端实例
- MqttAsyncClient client = new MqttAsyncClient(
- mqttProperties.getServerUri(),
- clientId,
- new MemoryPersistence());
- // 设置回调
- client.setCallback(mqttCallbackHandler);
- // 初始化连接
- try {
- client.connect(mqttConnectOptions()).waitForCompletion(mqttProperties.getCommandTimeout() * 1000L);
- log.info("MQTT客户端已连接到服务器: {}", mqttProperties.getServerUri());
- } catch (MqttException e) {
- log.error("MQTT客户端连接失败: {}", e.getMessage(), e);
- throw e;
- }
- return client;
- }
- }
|