1. <ul id="0c1fb"></ul>

      <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
      <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区

      RELATEED CONSULTING
      相關(guān)咨詢
      選擇下列產(chǎn)品馬上在線溝通
      服務(wù)時(shí)間:8:30-17:00
      你可能遇到了下面的問題
      關(guān)閉右側(cè)工具欄

      新聞中心

      這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
      6.sparkcore之鍵值對操作

      ??鍵值對RDD(pair RDD)是spark中許多操作所需要的常見數(shù)據(jù)類型,通常用來進(jìn)行聚合計(jì)算。

      目前創(chuàng)新互聯(lián)已為上千余家的企業(yè)提供了網(wǎng)站建設(shè)、域名、網(wǎng)站空間、綿陽服務(wù)器托管、企業(yè)網(wǎng)站設(shè)計(jì)、萬柏林網(wǎng)站維護(hù)等服務(wù),公司將堅(jiān)持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長,共同發(fā)展。

      創(chuàng)建Pair RDD

      ??spark有多種方式可以創(chuàng)建pair RDD。比如:很多存儲鍵值對的數(shù)據(jù)格式在讀取時(shí)直接返回pair RDD;通過map()算子將普通的RDD轉(zhuǎn)為pair RDD。

      scala

      # 使用第一個(gè)單詞作為鍵創(chuàng)建一個(gè)pair RDD
      val pairs = lines.map(x => (x.split(" ")(0), x))

      java

      # 使用第一個(gè)單詞作為鍵創(chuàng)建一個(gè)pair RDD
      # jdk1.8后也支持lambda表達(dá)式方式
      PairFunction keyData = new PairFunction() {
        public Tuple2 call(String x) {
          return new Tuple2(x.split(" ")[0], x);
        }
      };
      JavaPairRDD pairs = lines.mapToPair(keyData);

      python

      # 使用第一個(gè)單詞作為鍵創(chuàng)建一個(gè)pair RDD
      pairs = lines.map(lambda x: (x.split(" ")[0], x))

      ??從一個(gè)內(nèi)存中的數(shù)據(jù)集創(chuàng)建pair RDD時(shí),scala和python只需要對這個(gè)二元組集合調(diào)用SparkContext的parallelize()方法即可;而java需要使用SparkContext.parallelizePairs()方法。

      pair RDD轉(zhuǎn)化操作

      轉(zhuǎn)化操作總覽

      針對單個(gè)Pair RDD的轉(zhuǎn)化操作
      函數(shù)名作用示例
      reduceByKey(func) 合并具有相同鍵的值 rdd.reduceByKey((x, y) => x + y)
      groupByKey() 對具有相同鍵的值進(jìn)行分組 rdd.groupByKey()
      combineByKey(createCombiner,mergeValue,mergeCombiners,partitioner) 使用不同的返回類型合并具有相同鍵的值 rdd.combineByKey(v => (v, 1), (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
      mapValues(func) 對pair RDD中的每個(gè)值應(yīng)用一個(gè)函數(shù)而不改變鍵 rdd.mapValues(x => x + 1)
      flatMapValues(func) 對pair RDD中的每個(gè)值應(yīng)用一個(gè)返回迭代器的函數(shù),生成對應(yīng)原鍵的鍵值對記錄 rdd.flatMapValues(x => (x to 5))
      keys() 返回一個(gè)僅包含鍵的RDD rdd.keys
      values() 返回一個(gè)僅包含值得RDD rdd.values
      sortByKey() 返回一個(gè)根據(jù)鍵排序的RDD rdd.sortByKey()
      針對兩個(gè)Pair RDD的轉(zhuǎn)化操作
      函數(shù)名作用示例
      subtractByKey 刪除RDD中鍵與other RDD中鍵相同的元素 rdd.subtractByKey(other)
      join 對兩個(gè)RDD進(jìn)行內(nèi)連接 rdd.join(other)
      leftOuterJoin 對兩個(gè)RDD進(jìn)行連接操作,確保第二個(gè)RDD的鍵必須存在(左外連接) rdd.leftOuterJoin(other)
      rightOuterJoin 對兩個(gè)RDD進(jìn)行連接操作,確保第一個(gè)RDD的鍵必須存在(右外連接) rdd.rightOuterJoin(other)
      cogroup 將兩個(gè)RDD中擁有相同鍵的數(shù)據(jù)分組在一起 rdd.cogroup(other)

      聚合

      • 使用mapValues()和reduceByKey()計(jì)算每個(gè)鍵對應(yīng)值的均值。
      scala
      rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      python
      rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
      • 使用flatMap()、map()和reduceByKey()計(jì)算單詞統(tǒng)計(jì)
      scala
      val input = sc.textFile("s3://...")
      val words = input.flatMap(x => x.split(" "))
      val result = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
      java
      JavaRDD input = sc.textFile("s3://...");
      JavaRDD words = input.flatMap(new FlatMapFunction() {
         public Iterable call(String x) {
              return Arrays.asList(x.split(" "));
         }
      });
      JavaPairRDD result = words.mapToPair(new PairFunction() {
        public Tuple2 call(String x) {
          return new Tuple2(x, 1);
        }
      }).reduceByKey(
          new Function2() {
              public Integer call(Integer a, Integer b) {
                  return a + b;
              }
          }
      )
      python
      rdd = sc.textFile("s3://...")
      words = rdd.flatMap(lambda x: x.split(" "))
      result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
      • 使用combineByKey()返回與輸入數(shù)據(jù)不同類型的返回值,求每個(gè)鍵對應(yīng)的平均值
        執(zhí)行原理
        1.combineByKey()作用于rdd的每個(gè)分區(qū)。
        2.如果訪問的元素在分區(qū)中第一次出現(xiàn),就使用createCombiner()方法創(chuàng)建那個(gè)鍵對應(yīng)累加器的初始值。
        3.如果訪問的元素在當(dāng)前分區(qū)已經(jīng)出現(xiàn)過,就使用mergeValue()方法將該鍵的累加器對應(yīng)的當(dāng)前值和新值合并。
        4.如果有兩個(gè)或多個(gè)分區(qū)都有對應(yīng)同一個(gè)鍵的累加器時(shí),就使用mergeCombiners()方法將各個(gè)分區(qū)的結(jié)果進(jìn)行合并。
      scala
      val result = rdd.combineByKey(v => (v, 1), (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)).map{case (key, value) => (key, value._1 / value._2.toFloat)}
      java
      public static class AvgCount implements Serializable {
          public int total_;
          public int num_;
          public AvgCount(int total, int num) {
              total_ = total;
              num_ = num;
          }
          public float avg() {
              return total_/(float)num_;
          }
      }
      
      Function createAcc = new Function() {
          public AvgCount call(Integer x) {
              return new AvgCount(x, 1);
          }
      };
      
      Function2 addAndCount = new Function2() {
          public AvgCount call(AvgCount a, Integer x) {
              a.total_ += x;
              a.num_ += 1;
              return a;
          }
      };
      
      Function2 combine = new Function2() {
          public AvgCount call(AvgCount a, AvgCount b) {
              a.total_ += b.total_;
              a.num_ += b.num_;
              return a;
          }
      };
      
      AvgCount initial = new AvgCount(0, 0);
      JavaPairRDD avgCounts = input.combineByKey(createAcc, addAndCount, combine);
      Map countMap = avgCounts.collectAsMap();
      for (Entry entry : countMap.entrySet()) {
          System.out.println(entry.getKey() + ":" + entry.getValue().avg());
      }
      python
      sumCount = input.combineByKey((lambda x: (x, 1)), (lambda x, y: (x[0] + y, x[1] + 1)), (lambda x, y: (x[0] + y[0], x[1] + y[1])))
      sumCount.map(lambda key, xy: (key, xy[0]/xy[1])).collectAsMap()

      分組

      ??對于單個(gè)RDD數(shù)據(jù)進(jìn)行分組時(shí),使用groupByKey()。如果先使用groupByKey(),再使用reduce()或fold()時(shí),可能使用一種根據(jù)鍵進(jìn)行聚合的函數(shù)更高效。比如,rdd.reduceByKey(func)與rdd.groupByKey().mapValues(value => value.reduce(func))等價(jià),但前者更高效,因?yàn)楸苊饬藶槊總€(gè)鍵存放值列表的步驟。

      ??對多個(gè)共享同一個(gè)鍵的RDD進(jìn)行分組時(shí),使用cogroup()。cogroup方法會得到結(jié)果RDD類型為[(K, (Iterable[V], Iterable[W]))]。

      連接

      ??將一組有鍵的數(shù)據(jù)與另一組有鍵的數(shù)據(jù)連接使用是對鍵值對數(shù)據(jù)執(zhí)行的常用操作。連接方式主要有:內(nèi)連接、左外連接、右外連接。

      val storeAddress = sc.parallelize(Seq((Store("Ritual"), "1026 Valencia St"), (Store("Philz"), "748 Van Ness Ave"), (Store("Philz"), "3101 24th St"), (Store("Starbucks"), "Seattle")))
      val storeRating = sc.parallelize(Seq(Store("Ritual"), 4.9), (Store("Philz"), 4.8)))
      # 內(nèi)連接
      storeAddress.join(storeRating)
      #左外連接
      storeAddress.leftOuterJoin(storeRating)
      #右外連接
      storeAddress.rightOuterJoin(storeRating)

      排序

      ??將數(shù)據(jù)排序輸出是很常見的場景。sortByKey()函數(shù)接收一個(gè)叫做ascending的參數(shù),表示是否讓結(jié)果升序排序(默認(rèn)true)。有時(shí),也可以提供自定義比較函數(shù)。比如,以字符串順序?qū)φ麛?shù)進(jìn)行自定義排序。

      scala
      implicit val sortIntegersByString = new Ordering[Int] {
          override def compare(a: Int, b: Int) = a.toString.compare(b.toString)
      }
      rdd.sortByKey()
      java
      class IntegerComparator implements Comparator {
          public int compare(Integer a, Integer b) {
              return String.valueOf(a).compareTo(String.valueOf(b))
          }
      }
      rdd.sortByKey(new IntegerComparator());
      python
      rdd.sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x: str(x))

      Pair RDD行動操作

      ??和轉(zhuǎn)化操作一樣,所有基礎(chǔ)RDD支持的行動操作也都在pair RDD上可用。另外,Pair RDD提供了一些額外的行動操作。

      函數(shù)作用示例
      countByKey 對每個(gè)鍵對應(yīng)的元素分別計(jì)數(shù) rdd.countByKey()
      collectAsMap 將結(jié)果以映射表的形式返回 rdd.collectAsMap()
      lookup(key) 返回指定鍵對應(yīng)的所有值 rdd.lookup(3)

      忠于技術(shù),熱愛分享。歡迎關(guān)注公眾號:java大數(shù)據(jù)編程,了解更多技術(shù)內(nèi)容。

      6.spark core之鍵值對操作


      文章標(biāo)題:6.sparkcore之鍵值對操作
      瀏覽路徑:http://ef60e0e.cn/article/jpgdsj.html
      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区
      1. <ul id="0c1fb"></ul>

        <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
        <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

        伊宁县| 多伦县| 沾益县| 章丘市| 德兴市| 建昌县| 沈丘县| 太保市| 昌图县| 兴安盟| 广宗县| 麻阳| 巫山县| 南充市| 柳江县| 冷水江市| 宜都市| 保亭| 定南县| 内丘县| 永新县| 永仁县| 彩票| 瑞丽市| 永川市| 会泽县| 南康市| 温泉县| 咸宁市| 乌海市| 丰镇市| 海阳市| 冕宁县| 安图县| 松江区| 大洼县| 台州市| 姚安县| 蓬莱市| 宝山区| 罗定市|