2018.4.7
在Sprintboot中使用MQTT分为三步:
第一步定义一个收发数据的类给程序的其他类使用。
public class XRemoteDevice {
private static XRemoteDevice instance = new XRemoteDevice();
public static XRemoteDevice getInstance() { return instance; }
public void receive(Object topic, Object message) { System.out.println(\"Topic: \" + topic); System.out.println(\"Payload\" + message); }
public boolean send(String topic, String message) { try {
XServerContext.getGateway().sendToMqtt(message, topic); }
catch(Exception e) {
XLogger.getInstance().debug(\"Error When Sending \" + e); return false; }
return true; } }
/** * Created by Lenovo on 2018/4/4. */ @MessagingGateway(defaultRequestChannel = \"mqttOutboundChannel\") public interface MyGateway {
void sendToMqtt(String data,@Header(MqttHeaders.TOPIC) String topic); }
@Component
public class XServerContext implements ApplicationContextAware { private static ApplicationContext applicationContext = null;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if (XServerContext.applicationContext == null) {
XServerContext.applicationContext = applicationContext; } }
public static MyGateway getGateway() {
return applicationContext.getBean(MyGateway.class); } }
第二步在pom文件中引入MQTT。
第三步在springboot主程序main函数类中定义MQTT的BEAN
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setServerURIs(config.getServer()); factory.setUserName(config.getName()); factory.setPassword(config.getPassword());
return factory; }
//////////////////////////////////////////////////////////////////////////////////////// // The bean function (inbound) //////////////////////////////////////////////////////////////////////////////////////// @Bean
public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(config.getClientId(), mqttClientFactory()); adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setOutputChannel(mqttInputChannel());
for(String s : config.getTopics()) { adapter.addTopic(s, config.getQos()); }
return adapter; } @Bean
@ServiceActivator(inputChannel = \"mqttInputChannel\") public MessageHandler handler() { return new MessageHandler() {
@Override
public void handleMessage(Message> message) throws MessagingException { XRemoteDevice.getInstance().receive(message.getHeaders().get(\"mqtt_topic\"), message.getPayload()); } }; }
//////////////////////////////////////////////////////////////////////////////////////// // The bean function (outbound) //////////////////////////////////////////////////////////////////////////////////////// @Bean
@ServiceActivator(inputChannel = \"mqttOutboundChannel\") public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(config.getClientId(), mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(\"testTopic\"); return messageHandler; } @Bean
public MessageChannel mqttOutboundChannel() { return new DirectChannel(); }
因篇幅问题不能全部显示,请点此查看更多更全内容