java实现springboot集成exmq(mqtt协议)

2022/7/13 1:25:49

本文主要是介绍java实现springboot集成exmq(mqtt协议),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

1、application.yml配置

spring:
  mqtt:
    username: test
    password: qwerty123
    host-url: tcp://172.18.42.34:32016
    client-id: /dataProcessingTopicwf
    subscribe-id: /dataProcessing
    timeout: 100000
    keep-alive-interval: 100
    defaultTopic: $queue/+/dataProcessing

2、mqttclient

package com.catl.mqttutil.mqtt.client;

import com.catl.mqttutil.mqtt.callback.PushCallback;
import com.catl.mqttutil.mqtt.model.BytesModel;
import com.catl.mqttutil.mqtt.properties.MqttProperties;
import com.catl.mqttutil.mqtt.utils.MqttUtils;
import io.netty.buffer.ByteBuf;
import io.netty.util.internal.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
@Slf4j
public class MqttCustomClient {

    @Autowired
    private PushCallback pushCallback;

    @Autowired
    private MqttProperties mqttProperties;

    private static MqttClient client;

    public static MqttClient getClient() {
        return client;
    }

    private static void setClient(MqttClient client) {
        MqttCustomClient.client = client;
    }

//    public static String pubTopic;

    @PostConstruct
    public void init() {
//        ReqHeaderUtil.platformKey = stationProperties.getPlatformkey();
//        pubTopic = stationProperties.getPlatformkey() + mqttProperties.getClientId();
        this.connect();
    }

    public String getPubTopic(String platformkey) {
        return platformkey + mqttProperties.getClientId();
    }

    public void connect() {
        MqttClient client;
        try {
            log.info("开始连接mqtt服务端,mqttProperties={}", mqttProperties);
            client = new MqttClient(mqttProperties.getHostUrl(), "111111111111100", new MemoryPersistence());
            MqttCustomClient.setClient(client);
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
            options.setAutomaticReconnect(true);
            client.setCallback(pushCallback);
            client.connect(options);
            if (client.isConnected()) {
                client.subscribe(mqttProperties.getDefaultTopic(), 1);
            } else {
                IMqttToken token = client.connectWithResult(options);
                token.waitForCompletion();
                client.subscribe(mqttProperties.getDefaultTopic(), 1);
            }
            log.info("mqtt连接信息【subTopic=>{}】", mqttProperties.getDefaultTopic());
        } catch (Exception e) {
            if (e.getMessage().contains("已连接") || e.getMessage().contains("connected")) {
                log.info("mqtt客户端已连接", e);
            } else {
                log.error("mqtt客户端连接异常", e);
            }
        }
    }

    /**
     * 发布
     *
     * @param qos         连接方式
     * @param retained    是否保留
     * @param topic       主题
     * @param pushMessage 消息体
     */
    public void publish(int qos, boolean retained, String topic, ByteBuf pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        byte[] array = pushMessage.array();
        message.setPayload(array);
        MqttTopic mTopic = MqttCustomClient.client.getTopic(topic);
        if (null == mTopic) {
            log.error("订阅topic[{}]不存在", topic);
        }
        MqttDeliveryToken token;
        try {
            token = mTopic.publish(message);
            token.waitForCompletion();
            token.setActionCallback(new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken iMqttToken) {
                    log.debug("mqtt服务器接收消息(publish) - 成功");
                }
                @Override
                public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
                    log.debug("EMQX服务器接收消息失败!");
                }
            });
            log.error("发布topic[{}]成功", topic);
        } catch (MqttException e) {
            log.error("发布topic[{}]异常", topic);
        }
        try {
            Thread.sleep(100000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 订阅某个主题
     *
     * @param topic 主题
     * @param qos   连接方式
     */
    public void subscribe(String topic, int qos) {
        log.info("【sub-开始订阅消息】,topic:【{}】,qos:【{}】", topic, qos);
        try {
            MqttCustomClient.client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }


    /**
     * @param commandSign 命令标识
     * @param vin         车辆vin码
     * @param dataBytes   数据
     * @description 发布主题消息
     * @date 2021/7/19 下午5:03
     * @author junliu
     **/
    public void publish(short commandSign, String vin, byte[] dataBytes, String pubTopic) {
        vin = StringUtil.isNullOrEmpty(vin) ? vin : "00000000000000000000";
        BytesModel model = new BytesModel(commandSign, vin, dataBytes);
        System.out.println("model" + model);
        ByteBuf byteBuf = MqttUtils.toByteBuf(model);
        System.out.println("byteBuf" + byteBuf);
        publish(1, false, pubTopic, byteBuf);
    }

    /**
     * @param commandSign 命令标识
     * @param dataBytes   数据
     * @description 发布主题消息
     * @date 2021/7/19 下午5:03
     * @author junliu
     **/
    public void publish(short commandSign, byte[] dataBytes, String pubTopic) {
        this.publish(commandSign, "00000000000000000", dataBytes, pubTopic);
    }

}

3、pushcallback

package com.catl.mqttutil.mqtt.callback;


import com.catl.mqttutil.mqtt.client.MqttCustomClient;
import com.catl.mqttutil.mqtt.model.BaseModel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;




/**
 * 消费监听类
 *
 * @author yanxinghua
 * @since 2021/7/19 9:13
 */
@Component
@Slf4j
public class PushCallback implements MqttCallback {

    @Override
    public void connectionLost(Throwable throwable) {
        log.error("mqtt连接断开,错误信息msg-{}", throwable.getMessage());
        if (MqttCustomClient.getClient() == null || !MqttCustomClient.getClient().isConnected()) {
            log.info("mqtt重连中");
        }
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        try {
            log.info(">>>>>>>>>接收消息主题 : {},接收消息Qos :{}", topic, mqttMessage.getQos());
            byte[] payload = mqttMessage.getPayload();
            ByteBuf byteBuf = Unpooled.wrappedBuffer(payload);
            // 获取控制类型
            byte[] msgIdBytes = new byte[1];
            byteBuf.getBytes(2, msgIdBytes, 0, 1);
//            short msgId = Short.valueOf(ByteBufUtil.hexDump(reverse(msgIdBytes)), 16);
//            log.warn(">>>>>>>>>【{}消息报文】:{}", msgId, ByteBufUtil.hexDump(payload));
            BaseModel model = new BaseModel(byteBuf);
            model.parseBody();
//            SpringUtils.applicationContext.publishEvent(model);
        } catch (Exception e) {
            log.error("接受消息解析失败!msg-{}", e);
        }
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
    }

}

4、pom.xml

<dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-mqtt</artifactId>
</dependency>


这篇关于java实现springboot集成exmq(mqtt协议)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程