package com.ruoyi.common.utils.mqtt; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; /** * MQTT回调处理器 * * <p>处理MQTT客户端的各种回调事件,包括连接丢失、消息到达和消息发送完成。 * 通过Spring事件机制将消息分发到其他组件进行处理。</p> * * @author andy * @version 1.0.0 * @since 2025-06-17 */ @Slf4j @Component @RequiredArgsConstructor public class MqttCallbackHandler implements MqttCallback { private final ApplicationEventPublisher eventPublisher; /** * 连接丢失回调 * 当与MQTT服务器的连接意外断开时调用此方法 * * @param cause 连接丢失原因 */ @Override public void connectionLost(Throwable cause) { log.warn("MQTT连接丢失: {}", cause.getMessage()); // 连接丢失后的处理逻辑 // 可以根据需要添加重连机制,通常由MqttConnectOptions.setAutomaticReconnect处理 } /** * 消息到达回调 * 当从订阅的主题接收到新消息时调用此方法 * * @param topic 消息的主题 * @param message 接收到的消息 */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { String payload = new String(message.getPayload(), StandardCharsets.UTF_8); log.info("收到MQTT消息: 主题={}, 消息内容={}, QoS={}", topic, payload, message.getQos()); // 创建事件对象 com.ruoyi.common.model.MqttMessage mqttMessage = new com.ruoyi.common.model.MqttMessage(); mqttMessage.setTopic(topic); mqttMessage.setPayload(payload); mqttMessage.setQos(message.getQos()); mqttMessage.setRetained(message.isRetained()); // 发布事件,通知其他组件处理此消息 eventPublisher.publishEvent(mqttMessage); } /** * 消息发送完成回调 * 当消息成功发送到服务器后调用此方法 * * @param token 消息传递令牌 */ @Override public void deliveryComplete(IMqttDeliveryToken token) { try { MqttMessage message = token.getMessage(); if (message != null) { String payload = new String(message.getPayload(), StandardCharsets.UTF_8); log.debug("MQTT消息发送完成: {}", payload); } } catch (Exception e) { log.error("处理消息发送完成回调时出错: {}", e.getMessage(), e); } } }