// MqttConfig.java (3.5 KB)

  1. package com.ruoyi.common.config;
  2. import com.ruoyi.common.utils.mqtt.MqttCallbackHandler;
  3. import lombok.RequiredArgsConstructor;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
  6. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  7. import org.eclipse.paho.client.mqttv3.MqttException;
  8. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. import java.util.UUID;
  12. /**
  13. * MQTT客户端配置类
  14. *
  15. * <p>配置MQTT客户端的连接和回调处理,包括自动重连、证书配置、回调处理等,
  16. * 创建MQTT连接并初始化相关组件。</p>
  17. *
  18. * @author andy
  19. * @version 1.0.0
  20. * @since 2025-06-17
  21. */
  22. @Slf4j
  23. @Configuration
  24. @RequiredArgsConstructor
  25. public class MqttConfig {
  26. private final MqttProperties mqttProperties;
  27. private final MqttCallbackHandler mqttCallbackHandler;
  28. /**
  29. * 创建MQTT连接选项
  30. *
  31. * @return MQTT连接选项
  32. */
  33. @Bean
  34. public MqttConnectOptions mqttConnectOptions() {
  35. MqttConnectOptions options = new MqttConnectOptions();
  36. // 设置是否清除会话
  37. options.setCleanSession(mqttProperties.isCleanSession());
  38. // 设置连接超时
  39. options.setConnectionTimeout(mqttProperties.getTimeout());
  40. // 设置心跳间隔
  41. options.setKeepAliveInterval(mqttProperties.getKeepAlive());
  42. // 设置自动重连
  43. options.setAutomaticReconnect(mqttProperties.isAutoReconnect());
  44. // 设置用户名和密码
  45. if (mqttProperties.getUsername() != null && !mqttProperties.getUsername().isEmpty()) {
  46. options.setUserName(mqttProperties.getUsername());
  47. if (mqttProperties.getPassword() != null && !mqttProperties.getPassword().isEmpty()) {
  48. options.setPassword(mqttProperties.getPassword().toCharArray());
  49. }
  50. }
  51. log.info("MQTT连接选项已配置,服务器地址: {}", mqttProperties.getServerUri());
  52. return options;
  53. }
  54. /**
  55. * 创建MQTT客户端
  56. *
  57. * @return MQTT客户端
  58. * @throws MqttException MQTT异常
  59. */
  60. @Bean
  61. public MqttAsyncClient mqttClient() throws MqttException {
  62. // 检查服务器URI
  63. if (mqttProperties.getServerUri() == null || mqttProperties.getServerUri().isEmpty()) {
  64. throw new IllegalArgumentException("MQTT服务器URI不能为空");
  65. }
  66. // 生成客户端ID
  67. String clientId = mqttProperties.getClientId();
  68. if (clientId == null || clientId.isEmpty()) {
  69. clientId = "spring-mqtt-" + UUID.randomUUID().toString().substring(0, 8);
  70. log.info("未配置客户端ID,已自动生成: {}", clientId);
  71. }
  72. // 创建客户端实例
  73. MqttAsyncClient client = new MqttAsyncClient(
  74. mqttProperties.getServerUri(),
  75. clientId,
  76. new MemoryPersistence());
  77. // 设置回调
  78. client.setCallback(mqttCallbackHandler);
  79. // 初始化连接
  80. try {
  81. client.connect(mqttConnectOptions()).waitForCompletion(mqttProperties.getCommandTimeout() * 1000L);
  82. log.info("MQTT客户端已连接到服务器: {}", mqttProperties.getServerUri());
  83. } catch (MqttException e) {
  84. log.error("MQTT客户端连接失败: {}", e.getMessage(), e);
  85. throw e;
  86. }
  87. return client;
  88. }
  89. }