Java 实现 Spring Boot 集成 EMQX(MQTT 协议)
2022/7/13 1:25:49
本文主要介绍如何用 Java 在 Spring Boot 中集成 EMQX(MQTT 协议),对大家解决编程问题具有一定的参考价值,需要的程序员们可以跟随小编一起学习!
1、application.yml配置
# MQTT connection settings read by MqttProperties (prefix presumably "spring.mqtt" — confirm
# against the @ConfigurationProperties declaration, which is not shown in this article).
# NOTE: the credentials below are sample values; do not commit real secrets to source control.
spring:
  mqtt:
    username: test
    password: qwerty123
    host-url: tcp://172.18.42.34:32016
    # Also used as the suffix of the publish topic (see MqttCustomClient#getPubTopic).
    client-id: /dataProcessingTopicwf
    subscribe-id: /dataProcessing
    # Passed to MqttConnectOptions#setConnectionTimeout.
    timeout: 100000
    # Passed to MqttConnectOptions#setKeepAliveInterval.
    keep-alive-interval: 100
    # Topic filter subscribed to right after the client connects.
    defaultTopic: $queue/+/dataProcessing
2、MqttCustomClient(MQTT 客户端)
package com.catl.mqttutil.mqtt.client; import com.catl.mqttutil.mqtt.callback.PushCallback; import com.catl.mqttutil.mqtt.model.BytesModel; import com.catl.mqttutil.mqtt.properties.MqttProperties; import com.catl.mqttutil.mqtt.utils.MqttUtils; import io.netty.buffer.ByteBuf; import io.netty.util.internal.StringUtil; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Component @Slf4j public class MqttCustomClient { @Autowired private PushCallback pushCallback; @Autowired private MqttProperties mqttProperties; private static MqttClient client; public static MqttClient getClient() { return client; } private static void setClient(MqttClient client) { MqttCustomClient.client = client; } // public static String pubTopic; @PostConstruct public void init() { // ReqHeaderUtil.platformKey = stationProperties.getPlatformkey(); // pubTopic = stationProperties.getPlatformkey() + mqttProperties.getClientId(); this.connect(); } public String getPubTopic(String platformkey) { return platformkey + mqttProperties.getClientId(); } public void connect() { MqttClient client; try { log.info("开始连接mqtt服务端,mqttProperties={}", mqttProperties); client = new MqttClient(mqttProperties.getHostUrl(), "111111111111100", new MemoryPersistence()); MqttCustomClient.setClient(client); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); options.setConnectionTimeout(mqttProperties.getTimeout()); options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval()); options.setAutomaticReconnect(true); client.setCallback(pushCallback); client.connect(options); if (client.isConnected()) { 
client.subscribe(mqttProperties.getDefaultTopic(), 1); } else { IMqttToken token = client.connectWithResult(options); token.waitForCompletion(); client.subscribe(mqttProperties.getDefaultTopic(), 1); } log.info("mqtt连接信息【subTopic=>{}】", mqttProperties.getDefaultTopic()); } catch (Exception e) { if (e.getMessage().contains("已连接") || e.getMessage().contains("connected")) { log.info("mqtt客户端已连接", e); } else { log.error("mqtt客户端连接异常", e); } } } /** * 发布 * * @param qos 连接方式 * @param retained 是否保留 * @param topic 主题 * @param pushMessage 消息体 */ public void publish(int qos, boolean retained, String topic, ByteBuf pushMessage) { MqttMessage message = new MqttMessage(); message.setQos(qos); message.setRetained(retained); byte[] array = pushMessage.array(); message.setPayload(array); MqttTopic mTopic = MqttCustomClient.client.getTopic(topic); if (null == mTopic) { log.error("订阅topic[{}]不存在", topic); } MqttDeliveryToken token; try { token = mTopic.publish(message); token.waitForCompletion(); token.setActionCallback(new IMqttActionListener() { @Override public void onSuccess(IMqttToken iMqttToken) { log.debug("mqtt服务器接收消息(publish) - 成功"); } @Override public void onFailure(IMqttToken iMqttToken, Throwable throwable) { log.debug("EMQX服务器接收消息失败!"); } }); log.error("发布topic[{}]成功", topic); } catch (MqttException e) { log.error("发布topic[{}]异常", topic); } try { Thread.sleep(100000); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 订阅某个主题 * * @param topic 主题 * @param qos 连接方式 */ public void subscribe(String topic, int qos) { log.info("【sub-开始订阅消息】,topic:【{}】,qos:【{}】", topic, qos); try { MqttCustomClient.client.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } /** * @param commandSign 命令标识 * @param vin 车辆vin码 * @param dataBytes 数据 * @description 发布主题消息 * @date 2021/7/19 下午5:03 * @author junliu **/ public void publish(short commandSign, String vin, byte[] dataBytes, String pubTopic) { vin = StringUtil.isNullOrEmpty(vin) ? 
vin : "00000000000000000000"; BytesModel model = new BytesModel(commandSign, vin, dataBytes); System.out.println("model" + model); ByteBuf byteBuf = MqttUtils.toByteBuf(model); System.out.println("byteBuf" + byteBuf); publish(1, false, pubTopic, byteBuf); } /** * @param commandSign 命令标识 * @param dataBytes 数据 * @description 发布主题消息 * @date 2021/7/19 下午5:03 * @author junliu **/ public void publish(short commandSign, byte[] dataBytes, String pubTopic) { this.publish(commandSign, "00000000000000000", dataBytes, pubTopic); } }
3、PushCallback(消息回调)
package com.catl.mqttutil.mqtt.callback; import com.catl.mqttutil.mqtt.client.MqttCustomClient; import com.catl.mqttutil.mqtt.model.BaseModel; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 消费监听类 * * @author yanxinghua * @since 2021/7/19 9:13 */ @Component @Slf4j public class PushCallback implements MqttCallback { @Override public void connectionLost(Throwable throwable) { log.error("mqtt连接断开,错误信息msg-{}", throwable.getMessage()); if (MqttCustomClient.getClient() == null || !MqttCustomClient.getClient().isConnected()) { log.info("mqtt重连中"); } } @Override public void messageArrived(String topic, MqttMessage mqttMessage) { try { log.info(">>>>>>>>>接收消息主题 : {},接收消息Qos :{}", topic, mqttMessage.getQos()); byte[] payload = mqttMessage.getPayload(); ByteBuf byteBuf = Unpooled.wrappedBuffer(payload); // 获取控制类型 byte[] msgIdBytes = new byte[1]; byteBuf.getBytes(2, msgIdBytes, 0, 1); // short msgId = Short.valueOf(ByteBufUtil.hexDump(reverse(msgIdBytes)), 16); // log.warn(">>>>>>>>>【{}消息报文】:{}", msgId, ByteBufUtil.hexDump(payload)); BaseModel model = new BaseModel(byteBuf); model.parseBody(); // SpringUtils.applicationContext.publishEvent(model); } catch (Exception e) { log.error("接受消息解析失败!msg-{}", e); } } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { log.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete()); } }
4、pom.xml
<!-- Spring Integration MQTT support. No version is given here, so it is expected to be
     supplied by the project's dependency management. -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
这篇关于 Java 实现 Spring Boot 集成 EMQX(MQTT 协议)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-01为什么公共事业机构会偏爱 TiDB :TiDB 数据库在某省妇幼健康管理系统的应用
- 2024-04-26敏捷开发:想要快速交付就必须舍弃产品质量?
- 2024-04-26静态代码分析的这些好处,我竟然都不知道?
- 2024-04-26你在测试金字塔的哪一层?(下)
- 2024-04-26快刀斩乱麻,DevOps让代码评审也自动起来
- 2024-04-262024年最好用的10款ER图神器!
- 2024-04-2203-为啥大模型LLM还没能完全替代你?
- 2024-04-2101-大语言模型发展
- 2024-04-17基于SpringWeb MultipartFile文件上传、下载功能
- 2024-04-14个人开发者,Spring Boot 项目如何部署