123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- package com.ruoyi.common.utils.mqtt;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
- import org.eclipse.paho.client.mqttv3.MqttCallback;
- import org.eclipse.paho.client.mqttv3.MqttMessage;
- import org.springframework.context.ApplicationEventPublisher;
- import org.springframework.stereotype.Component;
- import java.nio.charset.StandardCharsets;
- /**
- * MQTT回调处理器
- *
- * <p>处理MQTT客户端的各种回调事件,包括连接丢失、消息到达和消息发送完成。
- * 通过Spring事件机制将消息分发到其他组件进行处理。</p>
- *
- * @author andy
- * @version 1.0.0
- * @since 2025-06-17
- */
- @Slf4j
- @Component
- @RequiredArgsConstructor
- public class MqttCallbackHandler implements MqttCallback {
- private final ApplicationEventPublisher eventPublisher;
- /**
- * 连接丢失回调
- * 当与MQTT服务器的连接意外断开时调用此方法
- *
- * @param cause 连接丢失原因
- */
- @Override
- public void connectionLost(Throwable cause) {
- log.warn("MQTT连接丢失: {}", cause.getMessage());
- // 连接丢失后的处理逻辑
- // 可以根据需要添加重连机制,通常由MqttConnectOptions.setAutomaticReconnect处理
- }
- /**
- * 消息到达回调
- * 当从订阅的主题接收到新消息时调用此方法
- *
- * @param topic 消息的主题
- * @param message 接收到的消息
- */
- @Override
- public void messageArrived(String topic, MqttMessage message) throws Exception {
- String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
- log.info("收到MQTT消息: 主题={}, 消息内容={}, QoS={}", topic, payload, message.getQos());
- // 创建事件对象
- com.ruoyi.common.model.MqttMessage mqttMessage = new com.ruoyi.common.model.MqttMessage();
- mqttMessage.setTopic(topic);
- mqttMessage.setPayload(payload);
- mqttMessage.setQos(message.getQos());
- mqttMessage.setRetained(message.isRetained());
- // 发布事件,通知其他组件处理此消息
- eventPublisher.publishEvent(mqttMessage);
- }
- /**
- * 消息发送完成回调
- * 当消息成功发送到服务器后调用此方法
- *
- * @param token 消息传递令牌
- */
- @Override
- public void deliveryComplete(IMqttDeliveryToken token) {
- try {
- MqttMessage message = token.getMessage();
- if (message != null) {
- String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
- log.debug("MQTT消息发送完成: {}", payload);
- }
- } catch (Exception e) {
- log.error("处理消息发送完成回调时出错: {}", e.getMessage(), e);
- }
- }
- }
|