1. <ul id="0c1fb"></ul>

      <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
      <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区

      RELATEED CONSULTING
      相關(guān)咨詢
      選擇下列產(chǎn)品馬上在線溝通
      服務(wù)時間:8:30-17:00
      你可能遇到了下面的問題
      關(guān)閉右側(cè)工具欄

      新聞中心

      這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
      RocketMQ怎么在springBoot中使用

      今天就跟大家聊聊有關(guān)RocketMQ怎么在springBoot中使用,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

      成都創(chuàng)新互聯(lián)公司是一家專注于成都做網(wǎng)站、成都網(wǎng)站設(shè)計(jì)、成都外貿(mào)網(wǎng)站建設(shè)與策劃設(shè)計(jì),五常網(wǎng)站建設(shè)哪家好?成都創(chuàng)新互聯(lián)公司做網(wǎng)站,專注于網(wǎng)站建設(shè)十載,網(wǎng)設(shè)計(jì)領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:五常等地區(qū)。五常做網(wǎng)站價格咨詢:18982081108

      pom 配置:    

      
       org.springframework.boot
       spring-boot-starter-parent
       1.5.10.RELEASE
      
      
        org.apache.rocketmq
        rocketmq-client
        4.2.0
      

      application.properties  配置:

      # 消費(fèi)者的組名
      apache.rocketmq.consumer.PushConsumer=PushConsumer
      # 生產(chǎn)者的組名
      apache.rocketmq.producer.producerGroup=Producer
      # NameServer地址
      apache.rocketmq.namesrvAddr=localhost:9876

      java代碼:

      生產(chǎn)者

      package test.config.rocketmq;
      
      import org.apache.rocketmq.client.producer.DefaultMQProducer;
      import org.apache.rocketmq.client.producer.SendResult;
      import org.apache.rocketmq.common.message.Message;
      import org.apache.rocketmq.remoting.common.RemotingHelper;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.stereotype.Component;
      import org.springframework.util.StopWatch;
      import javax.annotation.PostConstruct;
      
      @Component
      public class RocketMQClient {
        /**
         * 生產(chǎn)者的組名
         */
        @Value("${apache.rocketmq.producer.producerGroup}")
        private String producerGroup;
      
        /**
         * NameServer 地址
         */
        @Value("${apache.rocketmq.namesrvAddr}")
        private String namesrvAddr;
      
        @PostConstruct
        public void defaultMQProducer() {
          //生產(chǎn)者的組名
          DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
          //指定NameServer地址,多個地址以 ; 隔開
          producer.setNamesrvAddr(namesrvAddr);
          producer.setVipChannelEnabled(false);
          try {
            /**
             * Producer對象在使用之前必須要調(diào)用start初始化,初始化一次即可
             * 注意:切記不可以在每次發(fā)送消息時,都調(diào)用start方法
             */
            producer.start();
      
            //創(chuàng)建一個消息實(shí)例,包含 topic、tag 和 消息體
            //如下:topic 為 "TopicTest",tag 為 "push"
            Message message = new Message("TopicTest", "push", "發(fā)送消息----zhisheng-----".getBytes(RemotingHelper.DEFAULT_CHARSET));
      
            StopWatch stop = new StopWatch();
            stop.start();
      
            for (int i = 0; i < 1; i++) {
              SendResult result = producer.send(message);
              System.out.println("發(fā)送響應(yīng):MsgId:" + result.getMsgId() + ",發(fā)送狀態(tài):" + result.getSendStatus());
            }
            stop.stop();
            System.out.println("----------------發(fā)送一萬條消息耗時:" + stop.getTotalTimeMillis());
          } catch (Exception e) {
            e.printStackTrace();
          } finally {
            producer.shutdown();
          }
        }
      }

      消費(fèi)者: 

      import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
      import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
      import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
      import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
      import org.apache.rocketmq.common.message.MessageExt;
      import org.apache.rocketmq.remoting.common.RemotingHelper;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.stereotype.Component;
      
      import javax.annotation.PostConstruct;
      
      
      @Component
      public class RocketMQServer {
        /**
         * 消費(fèi)者的組名
         */
        @Value("${apache.rocketmq.consumer.PushConsumer}")
        private String consumerGroup;
      
        /**
         * NameServer 地址
         */
        @Value("${apache.rocketmq.namesrvAddr}")
        private String namesrvAddr;
      
        @PostConstruct
        public void defaultMQPushConsumer() {
          //消費(fèi)者的組名
          DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
      
          //指定NameServer地址,多個地址以 ; 隔開
          consumer.setNamesrvAddr(namesrvAddr);
          consumer.setVipChannelEnabled(false);
          try {
            //訂閱PushTopic下Tag為push的消息
            consumer.subscribe("TopicTest", "push");
      
            //設(shè)置Consumer第一次啟動是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi)
            //如果非第一次啟動,那么按照上次消費(fèi)的位置繼續(xù)消費(fèi)
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
              try {
                for (MessageExt messageExt : list) {
      
                  System.out.println("messageExt: " + messageExt);//輸出消息內(nèi)容
      
                  String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
      
                  System.out.println("消費(fèi)響應(yīng):msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody);//輸出消息內(nèi)容
                }
              } catch (Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試
              }
              return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費(fèi)成功
            });
            consumer.start();
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      }

      掉坑總結(jié):

      1.rocketMQ啟動時,命令不是  mqbroker -n 127.0.0.1:9876

               正確應(yīng)該是:mqbroker -n 127.0.0.1:9876 butiautoCreateTopicEnable=true

               否則會拋出:No route info of this topic, TopicTest

      2.客戶端連接時拋出異常

              org.apache.rocketmq.client.exception.MQClientException: 

              Send [3] times, still failed, cost [3180]ms, Topic: TopicTest, BrokersSent: \

              [WIN-93CGO0S5G25, WIN-93CGO0S5G25, WIN-93CGO0S5G25]

      解決方式兩種

      1.producer.setVipChannelEnabled(false); 生產(chǎn)者和消費(fèi)者添加這行代買。

      2.降rocketmq版本,降成3.2.6

      關(guān)于spring.rocketmq.name-server的坑

      看下圖:

      RocketMQ怎么在springBoot中使用

      注意:

      如果你是SpringBoot2.0+的框架,或者是JDK10。

      你需要將你自己的項(xiàng)目配置文件中的,spring.rocketmq.name-server改成

      spring.rocketmq.nameServer。注意是nameServer。

      不然就會報(bào)各種稀奇古怪的bug。

      關(guān)于啟動報(bào)內(nèi)存不足的錯

      在安裝啟動Name Server和Broker的時候,一定要修改配置文件,不然內(nèi)存會爆炸。

      Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory 

      RocketMQ怎么在springBoot中使用

      看完上述內(nèi)容,你們對RocketMQ怎么在springBoot中使用有進(jìn)一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。


      新聞名稱:RocketMQ怎么在springBoot中使用
      網(wǎng)頁地址:http://ef60e0e.cn/article/jjjsgh.html
      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区
      1. <ul id="0c1fb"></ul>

        <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
        <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

        连江县| 扶余县| 米泉市| 临江市| 阳城县| 黄骅市| 马尔康县| 增城市| 龙胜| 北流市| 土默特左旗| 依安县| 昔阳县| 金山区| 即墨市| 嘉荫县| 禄劝| 浙江省| 洱源县| 深水埗区| 托里县| 铅山县| 浦县| 大英县| 夏邑县| 惠来县| 紫阳县| 迭部县| 五台县| 兴国县| 图片| 微山县| 钦州市| 新源县| 枞阳县| 平原县| 黑山县| 昌江| 保康县| 兴隆县| 宣恩县|