1. <ul id="0c1fb"></ul>

      <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
      <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区

      RELATEED CONSULTING
      相關(guān)咨詢
      選擇下列產(chǎn)品馬上在線溝通
      服務(wù)時間:8:30-17:00
      你可能遇到了下面的問題
      關(guān)閉右側(cè)工具欄

      新聞中心

      這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
      基SpringIntegration如何實現(xiàn)MQTT客戶端簡單訂閱發(fā)布功能

      這篇文章主要介紹“基Spring Integration如何實現(xiàn)MQTT客戶端簡單訂閱發(fā)布功能”,在日常操作中,相信很多人在基Spring Integration如何實現(xiàn)MQTT客戶端簡單訂閱發(fā)布功能問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”基Spring Integration如何實現(xiàn)MQTT客戶端簡單訂閱發(fā)布功能”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

      公司主營業(yè)務(wù):網(wǎng)站制作、成都網(wǎng)站設(shè)計、移動網(wǎng)站開發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競爭能力。創(chuàng)新互聯(lián)是一支青春激揚(yáng)、勤奮敬業(yè)、活力青春激揚(yáng)、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊。公司秉承以“開放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對我們的高要求,感謝他們從不同領(lǐng)域給我們帶來的挑戰(zhàn),讓我們激情的團(tuán)隊有機(jī)會用頭腦與智慧不斷的給客戶帶來驚喜。創(chuàng)新互聯(lián)推出潼南免費做網(wǎng)站回饋大家。

      1 簡介

      Spring Integration 提供入站(inbound)和出站(outbound)通道適配器,以支持MQTT消息協(xié)議。使用這兩適配器,需要加入依賴:

      
      
          org.springframework.integration
          spring-integration-mqtt
          5.2.1.RELEASE
      
      // Gradle
      compile "org.springframework.integration:spring-integration-mqtt:5.2.1.RELEASE"

      當(dāng)前的MQTT Integration實現(xiàn)使用的是Eclipse Paho MQTT客戶端庫。兩個適配器的配置都是使用DefaultMqttPahoClientFactory實現(xiàn)的。有關(guān)配置選項的更多信息,請參閱Eclipse Paho MQTT文檔定義。

      > 建議配置MqttConnectOptions對象并將其注入工廠(factory),而不是在工廠本身里設(shè)置(不推薦使用)MQTT連接選項。

      2 Inbound(消息驅(qū)動)通道適配器

      入站通道適配器由MqttPahoMessageDrivenChannelAdapter實現(xiàn)。常用的配置項有:

      • 客戶端ID

      • MQTT Broker URL

      • 待訂閱的主題列表

      • 帶訂閱的主題QoS值列表

      • MqttMessageConverter(可選)。默認(rèn)情況下,默認(rèn)的DefaultPaHomeMessageConverter生成一條帶有字符串有效負(fù)載的消息,其頭部內(nèi)容如下:

        • mqtt_topic: 接收消息的主題

        • mqtt_duplicate: 如果消息是重復(fù)的,則為true

        • mqtt_qos: 服務(wù)質(zhì)量,你可以將DefaultPahoMessageConverter聲明為并將payloadAsBytes屬性設(shè)置為true,從而將DefaultPahoMessageConverter返回有效負(fù)載中的原始byte[]

      • 客戶端工廠

      • 發(fā)送超時。僅當(dāng)通道可能阻塞(例如當(dāng)前已滿的有界隊列通道)時才適用。

      • 錯誤通道。下游異常將以錯誤消息的形式發(fā)送到此通道(如果提供)。有效負(fù)載是包含失敗消息和原因的MessagingException

      • 恢復(fù)間隔。它控制適配器在發(fā)生故障后嘗試重新連接的時間間隔。默認(rèn)為10000毫秒(10秒)。

      > 從Spring 4.1版開始,可以省略URL。相反,你可以在DefaultMqttPahoClientFactoryserver URIs屬性中提供服務(wù)器uri。例如,這樣做允許連接到高可用(HA)集群。

      Spring 4.2.2開始,當(dāng)適配器成功訂閱到主題了,MqttSubscribedEvent事件就會被觸發(fā)。當(dāng)連接失敗或者訂閱失敗,MqttConnectionFailedEvent事件會被觸發(fā)。這兩個事件都能夠被一個Bean通過實現(xiàn)ApplicationListener而接收到。另外,名為recoveryInterval的新屬性控制適配器在失敗后嘗試重新連接的時間間隔。默認(rèn)為10000毫秒(10秒)。

      @Component
      public class MQTTSubscribedListener implements ApplicationListener {
          private static final Logger LOGGER = LogManager.getLogger(MQTTSubscribedListener.class);
      
          @Override
          public void onApplicationEvent(MqttSubscribedEvent event) {
              LOGGER.debug("Subscribed Success: " + event.getMessage());
          }
      }

      > 在版本Spring 4.2.3之前,當(dāng)適配器停止時,客戶端總是取消訂閱。這是不正確的,因為如果客戶端QOS大于0,我們需要保持訂閱處于活動狀態(tài),以便在下次啟動時傳遞適配器停止時到達(dá)的消息。這還需要將客戶機(jī)工廠上的cleanSession屬性設(shè)置為false。默認(rèn)為true。從4.2.3版開始,如果cleanSession屬性為false,則適配器不會取消訂閱(默認(rèn)情況下),這個默認(rèn)行為可以通過在工廠上設(shè)置consumerCloseAction屬性來重寫此行為。它可以有以下值:UNSUBSCRIBE_ALWAYSUNSUBSCRIBE_NEVERUNSUBSCRIBE_CLEAN,最后一項(默認(rèn)設(shè)置)僅在cleanSession屬性為true時取消訂閱。若要還原到4.2.3之前的行為,請始終使用“取消訂閱”設(shè)置項。

      > 注意:從Spring 5.0開始,topic、qos和retained屬性映射到.RECEIVED_…h(huán)eaders(MqttHeaders.RECEIVED_topic、MqttHeaders.RECEIVED_qos和MqttHeaders.RECEIVED_retained),以避免意外傳播到(默認(rèn)情況下)使用MqttHeaders.topic、MqttHeaders.qos和MqttHeaders.retained headers的出站消息。

      public MessageHandler handler() {
          return new MessageHandler() {
              @Override
              public void handleMessage(Message message) throws MessagingException {
                  LOGGER.debug("===Received Msg(topic {}): {}", message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC), message.getPayload());
              }
          };
      }

      2.1 在運(yùn)行時添加和刪除主題

      Spring4.1開始,你可以通過編程更改適配器訂閱的主題。Spring Integration提供了addTopic()removeTopic()方法。添加主題時,可以選擇指定QoS值(默認(rèn)是1)。你還可以通過向具有適當(dāng)有效負(fù)載的發(fā)送適當(dāng)?shù)南硇薷闹黝}。示例:

      myMqttAdapter.addTopic('foo', 1)

      停止和啟動適配器對主題列表(topics設(shè)置項)沒有影響(它不會還原到配置中的原始設(shè)置)。這些更改不會保留到應(yīng)用程序上下文的生命周期之外。新的應(yīng)用程序上下文將還原為配置的設(shè)置。

      在適配器停止(或與代理斷開連接)時更改主題列表(topics)將在下次建立連接時生效。

      2.2 使用Java配置配置

      以下Spring Boot應(yīng)用程序顯示了如何使用Java配置配置入站(inbound)適配器的示例:

      @SpringBootApplication
      public class MqttJavaApplication {
      
          public static void main(String[] args) {
              new SpringApplicationBuilder(MqttJavaApplication.class)
                      .web(false)
                      .run(args);
          }
      
          @Bean
          public MessageChannel mqttInputChannel() {
              return new DirectChannel();
          }
      
          @Bean
          public MessageProducer inbound() {
              MqttPahoMessageDrivenChannelAdapter adapter =
                      new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
                                                       "topic1", "topic2");
              adapter.setCompletionTimeout(5000);
              adapter.setConverter(new DefaultPahoMessageConverter());
              adapter.setQos(1);
              adapter.setOutputChannel(mqttInputChannel());
              return adapter;
          }
      
          @Bean
          @ServiceActivator(inputChannel = "mqttInputChannel")
          public MessageHandler handler() {
              return new MessageHandler() {
      
                  @Override
                  public void handleMessage(Message message) throws MessagingException {
                      System.out.println(message.getPayload());
                  }
      
              };
          }
      }

      2.3 使用Java DSL配置

      下面的Spring Boot應(yīng)用程序提供了使用Java DSL配置入站適配器的示例:

      @SpringBootApplication
      public class MqttJavaApplication {
      
          public static void main(String[] args) {
              new SpringApplicationBuilder(MqttJavaApplication.class)
                  .web(false)
                  .run(args);
          }
      
          @Bean
          public IntegrationFlow mqttInbound() {
              return IntegrationFlows.from(
                               new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
                                              "testClient", "topic1", "topic2");)
                      .handle(m -> System.out.println(m.getPayload()))
                      .get();
          }
      
      }

      3 出站通道適配器

      出站通道適配器由MqttPahoMessageHandler實現(xiàn),MqttPahoMessageHandler包裝在ConsumerEndpoint中。為了方便起見,可以使用名稱空間配置它。

      從Spring 4.1開始,適配器支持異步發(fā)送操作,在確認(rèn)交付之前避免阻塞。如果需要,可以發(fā)出應(yīng)用程序事件以使應(yīng)用程序確認(rèn)傳遞。

      以下列表顯示出站通道適配器可用的屬性:

      • MQTT Client ID

      • MQTT Broker URL

      • Converter(MqttMessageConver,可選的),默認(rèn)的DefaultPaHomeMessageConverter可識別以下標(biāo)題:

        • mqtt_topic: 消息將發(fā)送到的主題

        • mqtt_retained: 如果要保留消息,則為true

        • mqtt_qos:消息服務(wù)質(zhì)量

      • 客戶端工廠

      • default-qos,默認(rèn)的服務(wù)質(zhì)量。如果找不到mqtt_qos頭或qos表達(dá)式返回空值,則使用它。如果提供自定義轉(zhuǎn)換器,則不使用它。

      • 用于計算以確定qos的表達(dá)式。缺省值是headers[mqtt_qos]

      • 保留標(biāo)志的默認(rèn)值。如果找不到mqtt_retained頭,則使用它。如果提供了自定義轉(zhuǎn)換器,則不使用它。

      • 要計算以確定保留布爾值的表達(dá)式。默認(rèn)為headers[mqtt_retained]

      • 消息發(fā)送到的默認(rèn)主題(如果找不到mqtt_topic頭,則使用)

      • 要計算以確定目標(biāo)主題的表達(dá)式。默認(rèn)為headers['mqtt_topic']

      • async如果為true,則調(diào)用方不會阻塞。而是在發(fā)送消息時等待傳遞確認(rèn)。默認(rèn)值為false(發(fā)送將阻塞,直到確認(rèn)發(fā)送)

      • async-events,當(dāng)async和async事件(async-events)都為true時,將發(fā)出MqttMessageSentEvent。它包含消息、主題、客戶端庫生成的消息id、clientId和clientInstance(每次連接客戶端時遞增)。當(dāng)客戶端庫確認(rèn)傳遞時,將發(fā)出MqttMessageDeliveredEvent。它包含messageId、clientId和clientInstance,使傳遞與發(fā)送相關(guān)。任何ApplicationListener或事件入站通道適配器都可以接收這些事件。請注意,MqttMessageDeliveredEvent可能在MqttMessageSentEvent之前收到。默認(rèn)值為false

      > 注意,同樣地,從Spring 4.1開始,可以省略URL。相反,可以在DefaultMqttPahoClientFactorserver URIs屬性中提供服務(wù)器uri。例如,這允許連接到高可用(HA)集群。

      3.1 使用Java配置配置

      下面的Spring Boot應(yīng)用程序展示了如何使用Java配置配置出站適配器的示例:

      @SpringBootApplication
      @IntegrationComponentScan
      public class MqttJavaApplication {
      
          public static void main(String[] args) {
              ConfigurableApplicationContext context =
                      new SpringApplicationBuilder(MqttJavaApplication.class)
                              .web(false)
                              .run(args);
              MyGateway gateway = context.getBean(MyGateway.class);
              gateway.sendToMqtt("foo");
          }
      
          @Bean
          public MqttPahoClientFactory mqttClientFactory() {
              DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
              MqttConnectOptions options = new MqttConnectOptions();
              options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
              options.setUserName("username");
              options.setPassword("password".toCharArray());
              factory.setConnectionOptions(options);
              return factory;
          }
      
          @Bean
          @ServiceActivator(inputChannel = "mqttOutboundChannel")
          public MessageHandler mqttOutbound() {
              MqttPahoMessageHandler messageHandler =
                             new MqttPahoMessageHandler("testClient", mqttClientFactory());
              messageHandler.setAsync(true);
              messageHandler.setDefaultTopic("testTopic");
              return messageHandler;
          }
      
          @Bean
          public MessageChannel mqttOutboundChannel() {
              return new DirectChannel();
          }
      
          @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
          public interface MyGateway {
      
              void sendToMqtt(String data);
      
          }
      
      }

      3.2 使用Java DSL配置

      下面的Spring Boot應(yīng)用程序提供了使用Java DSL配置出站適配器的示例:

      @SpringBootApplication
      public class MqttJavaApplication {
      
          public static void main(String[] args) {
              new SpringApplicationBuilder(MqttJavaApplication.class)
                  .web(false)
                  .run(args);
          }
      
         	@Bean
         	public IntegrationFlow mqttOutboundFlow() {
         	    return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
          }
      
      }

      到此,關(guān)于“基Spring Integration如何實現(xiàn)MQTT客戶端簡單訂閱發(fā)布功能”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
      文章題目:基SpringIntegration如何實現(xiàn)MQTT客戶端簡單訂閱發(fā)布功能
      標(biāo)題來源:http://ef60e0e.cn/article/jjjgch.html

      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区
      1. <ul id="0c1fb"></ul>

        <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
        <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

        虞城县| 繁峙县| 久治县| 义乌市| 陵川县| 佛山市| 阿克陶县| 略阳县| 象山县| 竹溪县| 婺源县| 福海县| 固安县| 宜章县| 桐城市| 民勤县| 威海市| 新河县| 迭部县| 桑日县| 潜江市| 邵阳市| 开封市| 凤台县| 河北省| 和政县| 湘潭县| 成武县| 贡嘎县| 维西| 友谊县| 长海县| 鸡泽县| 凌海市| 衡阳县| 西藏| 平舆县| 博白县| 皋兰县| 宁德市| 永仁县|