1. <ul id="0c1fb"></ul>

      <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
      <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区

      RELATEED CONSULTING
      相關(guān)咨詢
      選擇下列產(chǎn)品馬上在線溝通
      服務(wù)時(shí)間:8:30-17:00
      你可能遇到了下面的問(wèn)題
      關(guān)閉右側(cè)工具欄

      新聞中心

      這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
      storm操作zookeeper的方法是什么

      這篇文章主要介紹“storm操作zookeeper的方法是什么”,在日常操作中,相信很多人在storm操作zookeeper的方法是什么問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”storm操作zookeeper的方法是什么”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!

      網(wǎng)站建設(shè)哪家好,找創(chuàng)新互聯(lián)!專注于網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)站建設(shè)、微信開發(fā)、小程序開發(fā)、集團(tuán)企業(yè)網(wǎng)站建設(shè)等服務(wù)項(xiàng)目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了烏拉特后免費(fèi)建站歡迎大家使用!

      storm操作zookeeper的主要函數(shù)都定義在命名空間backtype.storm.cluster中(即cluster.clj文件中)。 backtype.storm.cluster定義了兩個(gè)重要protocol:ClusterState和StormClusterState。
      clojure中的protocol可以看成java中的接口,封裝了一組方法。ClusterState協(xié)議中封裝了一組與zookeeper進(jìn)行交互的基礎(chǔ)函數(shù),如獲取子節(jié)點(diǎn)函數(shù),獲取子節(jié)點(diǎn)數(shù)據(jù)函數(shù)等,ClusterState協(xié)議定義如下:

      ClusterState協(xié)議

      (defprotocol ClusterState
          (set-ephemeral-node [this path data])
          (delete-node [this path])
          (create-sequential [this path data])
          ;; if node does not exist, create persistent with this data
          (set-data [this path data])
          (get-data [this path watch?])
          (get-version [this path watch?])
          (get-data-with-version [this path watch?])
          (get-children [this path watch?])
          (mkdirs [this path])
          (close [this])
          (register [this callback])
          (unregister [this id]))

      StormClusterState協(xié)議封裝了一組storm與zookeeper進(jìn)行交互的函數(shù),可以將StormClusterState協(xié)議中的函數(shù)看成ClusterState協(xié)議中函數(shù)的"組合"。StormClusterState協(xié)議定義如下:

      StormClusterState協(xié)議

      (defprotocol StormClusterState
        (assignments [this callback])
        (assignment-info [this storm-id callback])
        (assignment-info-with-version [this storm-id callback])
        (assignment-version [this storm-id callback])
        (active-storms [this])
        (storm-base [this storm-id callback])
        (get-worker-heartbeat [this storm-id node port])
        (executor-beats [this storm-id executor->node+port])
        (supervisors [this callback])
        (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
        (setup-heartbeats! [this storm-id])
        (teardown-heartbeats! [this storm-id])
        (teardown-topology-errors! [this storm-id])
        (heartbeat-storms [this])
        (error-topologies [this])
        (worker-heartbeat! [this storm-id node port info])
        (remove-worker-heartbeat! [this storm-id node port])
        (supervisor-heartbeat! [this supervisor-id info])
        (activate-storm! [this storm-id storm-base])
        (update-storm! [this storm-id new-elems])
        (remove-storm-base! [this storm-id])
        (set-assignment! [this storm-id info])
        (remove-storm! [this storm-id])
        (report-error [this storm-id task-id node port error])
        (errors [this storm-id task-id])
        (disconnect [this]))

      命名空間backtype.storm.cluster除了定義ClusterState和StormClusterState這兩個(gè)重要協(xié)議外,還定義了兩個(gè)重要函數(shù):mk-distributed-cluster-state和mk-storm-cluster-state。
      mk-distributed-cluster-state函數(shù)如下:
      該函數(shù)返回一個(gè)實(shí)現(xiàn)了ClusterState協(xié)議的對(duì)象,通過(guò)這個(gè)對(duì)象就可以與zookeeper進(jìn)行交互了。

      mk-distributed-cluster-state函數(shù)

      (defn mk-distributed-cluster-state
      ;; conf綁定了storm.yaml中的配置信息,是一個(gè)map對(duì)象
      [conf]
      ;; zk綁定一個(gè)zk client,Storm使用CuratorFramework與Zookeeper進(jìn)行交互 
      (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT)                       :auth-conf conf)]
          ;; 創(chuàng)建storm集群在zookeeper上的根目錄,默認(rèn)值為/storm
          (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))
          (.close zk))
      ;; callbacks綁定回調(diào)函數(shù)集合,是一個(gè)map對(duì)象
      (let [callbacks (atom {})
          ;; active標(biāo)示zookeeper集群狀態(tài) 
          active (atom true)
          ;; zk重新綁定新的zk client,該zk client設(shè)置了watcher,這樣當(dāng)zookeeper集群的狀態(tài)發(fā)生變化時(shí),zk server會(huì)給zk client發(fā)送相應(yīng)的event,zk client設(shè)置的watcher會(huì)調(diào)用callbacks中相應(yīng)回調(diào)函數(shù)來(lái)處理event
          ;; 啟動(dòng)nimbus時(shí),callbacks是一個(gè)空集合,所以nimbus端收到event后不會(huì)調(diào)用任何回調(diào)函數(shù);但是啟動(dòng)supervisor時(shí),callbacks中注冊(cè)了回調(diào)函數(shù),所以當(dāng)supervisor收到zk server發(fā)送的event后,會(huì)調(diào)用相應(yīng)的回調(diào)函數(shù)
          ;; mk-client函數(shù)定義在zookeeper.clj文件中,請(qǐng)參見(jiàn)其定義部分
          zk (zk/mk-client conf
                           (conf STORM-ZOOKEEPER-SERVERS)
                           (conf STORM-ZOOKEEPER-PORT)
                           :auth-conf conf
                           :root (conf STORM-ZOOKEEPER-ROOT)
                           ;; :watcher綁定一個(gè)函數(shù),指定zk client的默認(rèn)watcher函數(shù),state標(biāo)示當(dāng)前zk client的狀態(tài);type標(biāo)示事件類型;path標(biāo)示zookeeper上產(chǎn)生該事件的znode
                           ;; 該watcher函數(shù)主要功能就是執(zhí)行callbacks集合中的函數(shù),callbacks集合中的函數(shù)是在mk-storm-cluster-state函數(shù)中通過(guò)調(diào)用ClusterState的register函數(shù)添加的
                           :watcher (fn [state type path]
                                      (when @active
                                        (when-not (= :connected state)
                                          (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
                                        (when-not (= :none type)
                                          (doseq [callback (vals @callbacks)]
                                            (callback type path))))))]
      ;; reify相當(dāng)于java中的implements,這里表示實(shí)現(xiàn)一個(gè)協(xié)議
      (reify
       ClusterState
       ;; register函數(shù)用于將回調(diào)函數(shù)加入callbacks中,key是一個(gè)32位的標(biāo)識(shí)
       (register
         [this callback]
         (let [id (uuid)]
           (swap! callbacks assoc id callback)
           id))
       ;; unregister函數(shù)用于將指定key的回調(diào)函數(shù)從callbacks中刪除
       (unregister
         [this id]
         (swap! callbacks dissoc id))
       ;; 在zookeeper上添加一個(gè)臨時(shí)節(jié)點(diǎn)
       (set-ephemeral-node
         [this path data]
         (zk/mkdirs zk (parent-path path))
         (if (zk/exists zk path false)
           (try-cause
             (zk/set-data zk path data) ; should verify that it's ephemeral
             (catch KeeperException$NoNodeException e
               (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
               (zk/create-node zk path data :ephemeral)
               ))
           (zk/create-node zk path data :ephemeral)))
       ;; 在zookeeper上添加一個(gè)順序節(jié)點(diǎn)
       (create-sequential
         [this path data]
         (zk/create-node zk path data :sequential))
       ;; 修改某個(gè)節(jié)點(diǎn)數(shù)據(jù)
       (set-data
         [this path data]
         ;; note: this does not turn off any existing watches
         (if (zk/exists zk path false)
           (zk/set-data zk path data)
           (do
             (zk/mkdirs zk (parent-path path))
             (zk/create-node zk path data :persistent))))
       ;; 刪除指定節(jié)點(diǎn)
       (delete-node
         [this path]
         (zk/delete-recursive zk path))
       ;; 獲取指定節(jié)點(diǎn)數(shù)據(jù)。path標(biāo)示節(jié)點(diǎn)路徑;watch?是一個(gè)布爾類型值,表示是否需要對(duì)該節(jié)點(diǎn)進(jìn)行"觀察",如果watch?=true,當(dāng)調(diào)用set-data函數(shù)修改該節(jié)點(diǎn)數(shù)據(jù)后,
       ;; 會(huì)給zk client發(fā)送一個(gè)事件,zk client接收事件后,會(huì)調(diào)用創(chuàng)建zk client時(shí)指定的默認(rèn)watcher函數(shù)(即:watcher綁定的函數(shù))
       (get-data
         [this path watch?]
         (zk/get-data zk path watch?))
       ;; 與get-data函數(shù)的區(qū)別就是獲取指定節(jié)點(diǎn)數(shù)據(jù)的同時(shí),獲取節(jié)點(diǎn)數(shù)據(jù)的version,version表示節(jié)點(diǎn)數(shù)據(jù)修改的次數(shù)
       (get-data-with-version
         [this path watch?]
         (zk/get-data-with-version zk path watch?))
       ;; 獲取指定節(jié)點(diǎn)的version,watch?的含義與get-data函數(shù)中的watch?相同
       (get-version 
         [this path watch?]
         (zk/get-version zk path watch?))
       ;; 獲取指定節(jié)點(diǎn)的子節(jié)點(diǎn)列表,watch?的含義與get-data函數(shù)中的watch?相同
       (get-children
         [this path watch?]
         (zk/get-children zk path watch?))
       ;; 在zookeeper上創(chuàng)建一個(gè)節(jié)點(diǎn)
       (mkdirs
         [this path]
         (zk/mkdirs zk path))
       ;; 關(guān)閉zk client
       (close
         [this]
         (reset! active false)
         (.close zk)))))

      mk-storm-cluster-state函數(shù)定義如下:
      mk-storm-cluster-state函數(shù)非常重要,該函數(shù)返回一個(gè)實(shí)現(xiàn)了StormClusterState協(xié)議的實(shí)例,通過(guò)該實(shí)例storm就可以更加方便與zookeeper進(jìn)行交互。
      在啟動(dòng)nimbus和supervisor的函數(shù)中均調(diào)用了mk-storm-cluster-state函數(shù)。關(guān)于nimbus和supervisor的啟動(dòng)將在之后的文章中介紹。

      mk-storm-cluster-state函數(shù)

      (defn mk-storm-cluster-state
        [cluster-state-spec]
        ;; satisfies?謂詞相當(dāng)于java中的instanceof,判斷cluster-state-spec是不是ClusterState實(shí)例
        (let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
                                  [false cluster-state-spec]
                                  [true (mk-distributed-cluster-state cluster-state-spec)])
          ;; 綁定topology id->回調(diào)函數(shù)的map,當(dāng)/assignments/{topology id}數(shù)據(jù)發(fā)生變化時(shí),zk client執(zhí)行assignment-info-callback中topology id所對(duì)應(yīng)的回調(diào)函數(shù)
          assignment-info-callback (atom {})
          ;; assignment-info-with-version-callback與assignment-info-callback類似
          assignment-info-with-version-callback (atom {})
          ;; assignment-version-callback與assignments-callback類似
          assignment-version-callback (atom {})
          ;; 當(dāng)/supervisors標(biāo)示的znode的子節(jié)點(diǎn)發(fā)生變化時(shí),zk client執(zhí)行supervisors-callback指向的函數(shù)
          supervisors-callback (atom nil)
          ;; 當(dāng)/assignments標(biāo)示的znode的子節(jié)點(diǎn)發(fā)生變化時(shí),zk client執(zhí)行assignments-callback指向的函數(shù)
          assignments-callback (atom nil)
          ;; 當(dāng)/storms/{topology id}標(biāo)示的znode的數(shù)據(jù)發(fā)生變化時(shí),zk client執(zhí)行storm-base-callback中topology id所對(duì)應(yīng)的回調(diào)函數(shù)
          storm-base-callback (atom {})
          ;; register函數(shù)將"回調(diào)函數(shù)(fn ...)"添加到cluster-state的callbacks集合中,并返回標(biāo)示該回調(diào)函數(shù)的uuid
          state-id (register
                     cluster-state
                     ;; 定義"回調(diào)函數(shù)",type標(biāo)示事件類型,path標(biāo)示znode
                     (fn [type path]
                       ;; subtree綁定路徑前綴如"assignments"、"storms"、"supervisors"等,args存放topology id
                       (let [[subtree & args] (tokenize-path path)]
                         ;; condp相當(dāng)于java中的switch
                         (condp = subtree
                           ;; 當(dāng)subtree="assignments"時(shí),如果args為空,說(shuō)明是/assignments的子節(jié)點(diǎn)發(fā)生變化,執(zhí)行assignments-callback指向的回調(diào)函數(shù),否則
               ;; 說(shuō)明/assignments/{topology id}標(biāo)示的節(jié)點(diǎn)數(shù)據(jù)發(fā)生變化,執(zhí)行assignment-info-callback指向的回調(diào)函數(shù)
                           ASSIGNMENTS-ROOT (if (empty? args)
                                              (issue-callback! assignments-callback)
                                              (issue-map-callback! assignment-info-callback (first args)))
                           ;; 當(dāng)subtree="supervisors"時(shí),說(shuō)明是/supervisors的子節(jié)點(diǎn)發(fā)生變化,執(zhí)行supervisors-callback指向的回調(diào)函數(shù)
                           SUPERVISORS-ROOT (issue-callback! supervisors-callback)
                           ;; 當(dāng)subtree="storms"時(shí),說(shuō)明是/storms/{topology id}標(biāo)示的節(jié)點(diǎn)數(shù)據(jù)發(fā)生變化,執(zhí)行storm-base-callback指向的回調(diào)函數(shù)
                           STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
                           ;; this should never happen
                           (exit-process! 30 "Unknown callback for subtree " subtree args)))))]
      ;; 在zookeeper上創(chuàng)建storm運(yùn)行topology所必需的znode
      (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
        (mkdirs cluster-state p))
      ;; 返回一個(gè)實(shí)現(xiàn)StormClusterState協(xié)議的實(shí)例
      (reify
        StormClusterState
        ;; 獲取/assignments的子節(jié)點(diǎn)列表,如果callback不為空,將其賦值給assignments-callback,并對(duì)/assignments添加"節(jié)點(diǎn)觀察"
        (assignments
          [this callback]
          (when callback
            (reset! assignments-callback callback))
          (get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))
        ;; 獲取/assignments/{storm-id}節(jié)點(diǎn)數(shù)據(jù),即storm-id的分配信息,如果callback不為空,將其添加到assignment-info-callback中,并對(duì)/assignments/{storm-id}添加"數(shù)據(jù)觀察"
        (assignment-info
          [this storm-id callback]
          (when callback
            (swap! assignment-info-callback assoc storm-id callback))
          (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback))))
        ;; 獲取/assignments/{storm-id}節(jié)點(diǎn)數(shù)據(jù)包括version信息,如果callback不為空,將其添加到assignment-info-with-version-callback中,并對(duì)/assignments/{storm-id}添加"數(shù)據(jù)觀察"
        (assignment-info-with-version 
          [this storm-id callback]
          (when callback
            (swap! assignment-info-with-version-callback assoc storm-id callback))
          (let [{data :data version :version} 
                (get-data-with-version cluster-state (assignment-path storm-id) (not-nil? callback))]
          {:data (maybe-deserialize data)
           :version version}))
        ;; 獲取/assignments/{storm-id}節(jié)點(diǎn)數(shù)據(jù)的version信息,如果callback不為空,將其添加到assignment-version-callback中,并對(duì)/assignments/{storm-id}添加"數(shù)據(jù)觀察"
        (assignment-version 
          [this storm-id callback]
          (when callback
            (swap! assignment-version-callback assoc storm-id callback))
          (get-version cluster-state (assignment-path storm-id) (not-nil? callback)))
        ;; 獲取storm集群中正在運(yùn)行的topology id即/storms的子節(jié)點(diǎn)列表
        (active-storms
          [this]
          (get-children cluster-state STORMS-SUBTREE false))
        ;; 獲取storm集群中所有有心跳的topology id即/workerbeats的子節(jié)點(diǎn)列表
        (heartbeat-storms
          [this]
          (get-children cluster-state WORKERBEATS-SUBTREE false))
        ;; 獲取所有有錯(cuò)誤的topology id即/errors的子節(jié)點(diǎn)列表
        (error-topologies
          [this]
          (get-children cluster-state ERRORS-SUBTREE false))
        ;; 獲取指定storm-id進(jìn)程的心跳信息,即/workerbeats/{storm-id}/{node-port}節(jié)點(diǎn)數(shù)據(jù)
        (get-worker-heartbeat
          [this storm-id node port]
          (-> cluster-state
              (get-data (workerbeat-path storm-id node port) false)
              maybe-deserialize))
        ;; 獲取指定進(jìn)程中所有線程的心跳信息
        (executor-beats
          [this storm-id executor->node+port]
          ;; need to take executor->node+port in explicitly so that we don't run into a situation where a
          ;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
          ;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
          ;; we avoid situations like that
          (let [node+port->executors (reverse-map executor->node+port)
                all-heartbeats (for [[[node port] executors] node+port->executors]
                                 (->> (get-worker-heartbeat this storm-id node port)
                                      (convert-executor-beats executors)
                                      ))]
            (apply merge all-heartbeats)))
        ;; 獲取/supervisors的子節(jié)點(diǎn)列表,如果callback不為空,將其賦值給supervisors-callback,并對(duì)/supervisors添加"節(jié)點(diǎn)觀察" 
        (supervisors
          [this callback]
          (when callback
            (reset! supervisors-callback callback))
          (get-children cluster-state SUPERVISORS-SUBTREE (not-nil? callback)))
        ;; 獲取/supervisors/{supervisor-id}節(jié)點(diǎn)數(shù)據(jù),即supervisor的心跳信息
        (supervisor-info
          [this supervisor-id]
          (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false)))
        ;; 設(shè)置進(jìn)程心跳信息
        (worker-heartbeat!
          [this storm-id node port info]
          (set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info)))
        ;; 刪除進(jìn)程心跳信息
        (remove-worker-heartbeat!
          [this storm-id node port]
          (delete-node cluster-state (workerbeat-path storm-id node port)))
        ;; 創(chuàng)建指定storm-id的topology的用于存放心跳信息的節(jié)點(diǎn)
        (setup-heartbeats!
          [this storm-id]
          (mkdirs cluster-state (workerbeat-storm-root storm-id)))
        ;; 刪除指定storm-id的topology的心跳信息節(jié)點(diǎn)
        (teardown-heartbeats!
          [this storm-id]
          (try-cause
            (delete-node cluster-state (workerbeat-storm-root storm-id))
            (catch KeeperException e
              (log-warn-error e "Could not teardown heartbeats for " storm-id))))
        ;; 刪除指定storm-id的topology的錯(cuò)誤信息節(jié)點(diǎn)
        (teardown-topology-errors!
          [this storm-id]
          (try-cause
            (delete-node cluster-state (error-storm-root storm-id))
            (catch KeeperException e
              (log-warn-error e "Could not teardown errors for " storm-id))))
        ;; 創(chuàng)建臨時(shí)節(jié)點(diǎn)存放supervisor的心跳信息
        (supervisor-heartbeat!
          [this supervisor-id info]
          (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info)))
        ;; 創(chuàng)建/storms/{storm-id}節(jié)點(diǎn)
        (activate-storm!
          [this storm-id storm-base]
          (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base)))
        ;; 更新topology對(duì)應(yīng)的StormBase對(duì)象,即更新/storm/{storm-id}節(jié)點(diǎn)
        (update-storm!
          [this storm-id new-elems]
          ;; base綁定storm-id在zookeeper上的StormBase對(duì)象
          (let [base (storm-base this storm-id nil)
                ;; executors綁定component名稱->組件并行度的map
                executors (:component->executors base)
                ;; new-elems綁定合并后的組件并行度map,update函數(shù)將組件新并行度map合并到舊map中
                new-elems (update new-elems :component->executors (partial merge executors))]
            ;; 更新StormBase對(duì)象中的組件并行度map,并寫入zookeeper的/storms/{storm-id}節(jié)點(diǎn)
            (set-data cluster-state (storm-path storm-id)
                      (-> base
                          (merge new-elems)
                          Utils/serialize))))
        ;; 獲取storm-id的StormBase對(duì)象,即讀取/storms/{storm-id}節(jié)點(diǎn)數(shù)據(jù),如果callback不為空,將其賦值給storm-base-callback,并為/storms/{storm-id}節(jié)點(diǎn)添加"數(shù)據(jù)觀察"
        (storm-base
          [this storm-id callback]
          (when callback
            (swap! storm-base-callback assoc storm-id callback))
          (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback))))
        ;; 刪除storm-id的StormBase對(duì)象,即刪除/storms/{storm-id}節(jié)點(diǎn)
        (remove-storm-base!
          [this storm-id]
          (delete-node cluster-state (storm-path storm-id)))
        ;; 更新storm-id的分配信息,即更新/assignments/{storm-id}節(jié)點(diǎn)數(shù)據(jù)
        (set-assignment!
          [this storm-id info]
          (set-data cluster-state (assignment-path storm-id) (Utils/serialize info)))
        ;; 刪除storm-id的分配信息,同時(shí)刪除其StormBase信息,即刪除/assignments/{storm-id}節(jié)點(diǎn)和/storms/{storm-id}節(jié)點(diǎn)
        (remove-storm!
          [this storm-id]
          (delete-node cluster-state (assignment-path storm-id))
          (remove-storm-base! this storm-id))
        ;; 將組件異常信息寫入zookeeper
        (report-error
          [this storm-id component-id node port error]
          ;; path綁定"/errors/{storm-id}/{component-id}"
          (let [path (error-path storm-id component-id)
                ;; data綁定異常信息,包括異常時(shí)間、異常堆棧信息、主機(jī)和端口
                data {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port}
                ;; 創(chuàng)建/errors/{storm-id}/{component-id}節(jié)點(diǎn)
                _ (mkdirs cluster-state path)
                ;; 創(chuàng)建/errors/{storm-id}/{component-id}的子順序節(jié)點(diǎn),并寫入異常信息
                _ (create-sequential cluster-state (str path "/e") (Utils/serialize data))
                ;; to-kill綁定除去順序節(jié)點(diǎn)編號(hào)最大的前10個(gè)節(jié)點(diǎn)的剩余節(jié)點(diǎn)的集合
                to-kill (->> (get-children cluster-state path false)
                             (sort-by parse-error-path)
                             reverse
                             (drop 10))]
            ;; 刪除to-kill中包含的節(jié)點(diǎn)
            (doseq [k to-kill]
              (delete-node cluster-state (str path "/" k)))))
        ;; 得到給定的storm-id component-id下的異常信息
        (errors
          [this storm-id component-id]
          (let [path (error-path storm-id component-id)
                _ (mkdirs cluster-state path)
                children (get-children cluster-state path false)
                errors (dofor [c children]
                              (let [data (-> (get-data cluster-state (str path "/" c) false)
                                             maybe-deserialize)]
                                (when data
                                  (struct TaskError (:error data) (:time-secs data) (:host data) (:port data))
                                  )))
                ]
            (->> (filter not-nil? errors)
                 (sort-by (comp - :time-secs)))))
        ;; 關(guān)閉連接,在關(guān)閉連接前,將回調(diào)函數(shù)從cluster-state的callbacks中刪除
        (disconnect
          [this]
          (unregister cluster-state state-id)
          (when solo?
            (close cluster-state))))))

      zookeeper.clj中mk-client函數(shù)

      mk-client函數(shù)創(chuàng)建一個(gè)CuratorFramework實(shí)例,為該實(shí)例注冊(cè)了CuratorListener,當(dāng)一個(gè)后臺(tái)操作完成或者指定的watch被觸發(fā)時(shí)將會(huì)執(zhí)行CuratorListener中的eventReceived()。eventReceived中調(diào)用的wacher函數(shù)就是mk-distributed-cluster-state中:watcher綁定的函數(shù)。

      (defnk mk-client
        [conf servers port
         :root ""
         :watcher default-watcher
         :auth-conf nil]
        (let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))]
          (.. fk
              (getCuratorListenable)
              (addListener
                (reify CuratorListener
                  (^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e]
                     (when (= (.getType e) CuratorEventType/WATCHED)
                       (let [^WatchedEvent event (.getWatchedEvent e)]
                         (watcher (zk-keeper-states (.getState event))
                                  (zk-event-types (.getType event))
                                  (.getPath event))))))))
          (.start fk)
          fk))

      以上就是storm與zookeeper進(jìn)行交互的源碼分析,我覺(jué)得最重要的部分就是如何給zk client添加"wacher",storm的很多功能都是通過(guò)zookeeper的wacher機(jī)制實(shí)現(xiàn)的,如"分配信息領(lǐng)取"。添加"wacher"大概分為以下幾個(gè)步驟:

      1. mk-distributed-cluster-state函數(shù)創(chuàng)建了一個(gè)zk client,并通過(guò):watcher給該zk client指定了"wacher"函數(shù),這個(gè)"wacher"函數(shù)只是簡(jiǎn)單調(diào)用ClusterState的callbacks集合中的函數(shù),這樣這個(gè)"wacher"函數(shù)執(zhí)行 哪些函數(shù)將由ClusterState實(shí)例決定

      2. ClusterState實(shí)例提供register函數(shù)來(lái)更新callbacks集合,ClusterState實(shí)例被傳遞給了mk-storm-cluster-state函數(shù),在mk-storm-cluster-state中調(diào)用register添加了一個(gè)函數(shù)(fn [type path] ... ),這個(gè)函數(shù)實(shí)現(xiàn)了"watcher"函數(shù)的全部邏輯

      3. mk-storm-cluster-state中注冊(cè)的函數(shù)執(zhí)行的具體內(nèi)容由StormClusterState實(shí)例決定,對(duì)zookeeper節(jié)點(diǎn)添加"觀察"也是通過(guò)StormClusterState實(shí)例實(shí)現(xiàn)的,這樣我們就可以通過(guò)StormClusterState實(shí)例對(duì)我們感興趣的節(jié)點(diǎn)添加"觀察"和"回調(diào)函數(shù)",當(dāng)節(jié)點(diǎn)或節(jié)點(diǎn)數(shù)據(jù)發(fā)生變化后,zk server就會(huì)給zk client發(fā)送"通知",zk client中的"wather"函數(shù)將被調(diào)用,進(jìn)而我們注冊(cè)的"回到函數(shù)"將被執(zhí)行。

      到此,關(guān)于“storm操作zookeeper的方法是什么”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!


      本文標(biāo)題:storm操作zookeeper的方法是什么
      本文URL:http://ef60e0e.cn/article/jehpss.html
      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区
      1. <ul id="0c1fb"></ul>

        <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
        <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

        卢湾区| 苏尼特右旗| 安康市| 呼图壁县| 马公市| 陇南市| 泸西县| 衡水市| 高青县| 长海县| 怀仁县| 绥阳县| 荣成市| 邢台县| 台北市| 阿拉善右旗| 德庆县| 仙居县| 正蓝旗| 安多县| 太白县| 汤原县| 当阳市| 兴宁市| 新宁县| 泰宁县| 错那县| 天台县| 三门县| 赫章县| 凤台县| 禹州市| 彭阳县| 社会| 汕头市| 漾濞| 德江县| 彰武县| 丘北县| 贵溪市| 蓬溪县|