MqttCallbackHandler.java 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package com.ruoyi.common.utils.mqtt;
  2. import lombok.RequiredArgsConstructor;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  5. import org.eclipse.paho.client.mqttv3.MqttCallback;
  6. import org.eclipse.paho.client.mqttv3.MqttMessage;
  7. import org.springframework.context.ApplicationEventPublisher;
  8. import org.springframework.stereotype.Component;
  9. import java.nio.charset.StandardCharsets;
  10. /**
  11. * MQTT回调处理器
  12. *
  13. * <p>处理MQTT客户端的各种回调事件,包括连接丢失、消息到达和消息发送完成。
  14. * 通过Spring事件机制将消息分发到其他组件进行处理。</p>
  15. *
  16. * @author andy
  17. * @version 1.0.0
  18. * @since 2025-06-17
  19. */
  20. @Slf4j
  21. @Component
  22. @RequiredArgsConstructor
  23. public class MqttCallbackHandler implements MqttCallback {
  24. private final ApplicationEventPublisher eventPublisher;
  25. /**
  26. * 连接丢失回调
  27. * 当与MQTT服务器的连接意外断开时调用此方法
  28. *
  29. * @param cause 连接丢失原因
  30. */
  31. @Override
  32. public void connectionLost(Throwable cause) {
  33. log.warn("MQTT连接丢失: {}", cause.getMessage());
  34. // 连接丢失后的处理逻辑
  35. // 可以根据需要添加重连机制,通常由MqttConnectOptions.setAutomaticReconnect处理
  36. }
  37. /**
  38. * 消息到达回调
  39. * 当从订阅的主题接收到新消息时调用此方法
  40. *
  41. * @param topic 消息的主题
  42. * @param message 接收到的消息
  43. */
  44. @Override
  45. public void messageArrived(String topic, MqttMessage message) throws Exception {
  46. String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
  47. log.info("收到MQTT消息: 主题={}, 消息内容={}, QoS={}", topic, payload, message.getQos());
  48. // 创建事件对象
  49. com.ruoyi.common.model.MqttMessage mqttMessage = new com.ruoyi.common.model.MqttMessage();
  50. mqttMessage.setTopic(topic);
  51. mqttMessage.setPayload(payload);
  52. mqttMessage.setQos(message.getQos());
  53. mqttMessage.setRetained(message.isRetained());
  54. // 发布事件,通知其他组件处理此消息
  55. eventPublisher.publishEvent(mqttMessage);
  56. }
  57. /**
  58. * 消息发送完成回调
  59. * 当消息成功发送到服务器后调用此方法
  60. *
  61. * @param token 消息传递令牌
  62. */
  63. @Override
  64. public void deliveryComplete(IMqttDeliveryToken token) {
  65. try {
  66. MqttMessage message = token.getMessage();
  67. if (message != null) {
  68. String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
  69. log.debug("MQTT消息发送完成: {}", payload);
  70. }
  71. } catch (Exception e) {
  72. log.error("处理消息发送完成回调时出错: {}", e.getMessage(), e);
  73. }
  74. }
  75. }