您的当前位置:首页正文

在Springboot中使用MQTT收发不同主题的消息

2020-04-12 来源:钮旅网
在SprintBoot使用MQTT

2018.4.7

在Sprintboot中使用MQTT分为三步:

第一步定义一个收发数据的类给程序的其他类使用。

public class XRemoteDevice {

private static XRemoteDevice instance = new XRemoteDevice();

public static XRemoteDevice getInstance() { return instance; }

public void receive(Object topic, Object message) { System.out.println(\"Topic: \" + topic); System.out.println(\"Payload\" + message); }

public boolean send(String topic, String message) { try {

XServerContext.getGateway().sendToMqtt(message, topic); }

catch(Exception e) {

XLogger.getInstance().debug(\"Error When Sending \" + e); return false; }

return true; } }

/** * Created by Lenovo on 2018/4/4. */ @MessagingGateway(defaultRequestChannel = \"mqttOutboundChannel\") public interface MyGateway {

void sendToMqtt(String data,@Header(MqttHeaders.TOPIC) String topic); }

@Component

public class XServerContext implements ApplicationContextAware { private static ApplicationContext applicationContext = null;

@Override

public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

if (XServerContext.applicationContext == null) {

XServerContext.applicationContext = applicationContext; } }

public static MyGateway getGateway() {

return applicationContext.getBean(MyGateway.class); } }

第二步在pom文件中引入MQTT。

org.springframework.boot

spring-boot-starter-integration

org.springframework.integration spring-integration-stream

org.springframework.integration spring-integration-mqtt

第三步在springboot主程序main函数类中定义MQTT的BEAN

@Bean

public MqttPahoClientFactory mqttClientFactory() {

DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setServerURIs(config.getServer()); factory.setUserName(config.getName()); factory.setPassword(config.getPassword());

return factory; }

//////////////////////////////////////////////////////////////////////////////////////// // The bean function (inbound) //////////////////////////////////////////////////////////////////////////////////////// @Bean

public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean

public MessageProducer inbound() {

MqttPahoMessageDrivenChannelAdapter adapter =

new MqttPahoMessageDrivenChannelAdapter(config.getClientId(), mqttClientFactory()); adapter.setCompletionTimeout(5000);

adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setOutputChannel(mqttInputChannel());

for(String s : config.getTopics()) { adapter.addTopic(s, config.getQos()); }

return adapter; } @Bean

@ServiceActivator(inputChannel = \"mqttInputChannel\") public MessageHandler handler() { return new MessageHandler() {

@Override

public void handleMessage(Message message) throws MessagingException { XRemoteDevice.getInstance().receive(message.getHeaders().get(\"mqtt_topic\"), message.getPayload()); } }; }

//////////////////////////////////////////////////////////////////////////////////////// // The bean function (outbound) //////////////////////////////////////////////////////////////////////////////////////// @Bean

@ServiceActivator(inputChannel = \"mqttOutboundChannel\") public MessageHandler mqttOutbound() {

MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(config.getClientId(), mqttClientFactory());

messageHandler.setAsync(true);

messageHandler.setDefaultTopic(\"testTopic\"); return messageHandler; } @Bean

public MessageChannel mqttOutboundChannel() { return new DirectChannel(); }

因篇幅问题不能全部显示,请点此查看更多更全内容