1. <ul id="0c1fb"></ul>

      <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
      <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区

      RELATEED CONSULTING
      相關(guān)咨詢
      選擇下列產(chǎn)品馬上在線溝通
      服務(wù)時間:8:30-17:00
      你可能遇到了下面的問題
      關(guān)閉右側(cè)工具欄

      新聞中心

      這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
      MapReduce如何實現(xiàn)自定義排序-創(chuàng)新互聯(lián)

      MapReduce概念

      成都創(chuàng)新互聯(lián)公司專注于桐梓企業(yè)網(wǎng)站建設(shè),響應(yīng)式網(wǎng)站建設(shè),商城網(wǎng)站開發(fā)。桐梓網(wǎng)站建設(shè)公司,為桐梓等地區(qū)提供建站服務(wù)。全流程按需定制設(shè)計,專業(yè)設(shè)計,全程項目跟蹤,成都創(chuàng)新互聯(lián)公司專業(yè)和態(tài)度為您提供的服務(wù)

      是一種編程模型,用于大規(guī)模數(shù)據(jù)集(大于1TB)的并行運算。概念"Map(映射)"和"Reduce(歸約)",和它們的主要思想,都是從函數(shù)式編程語言里借來的,還有從矢量編程語言里借來的特性。它極大地方便了編程人員在不會分布式并行編程的情況下,將自己的程序運行在分布式系統(tǒng)上。 當(dāng)前的軟件實現(xiàn)是指定一個Map(映射)函數(shù),用來把一組鍵值對映射成一組新的鍵值對,指定并發(fā)的Reduce(歸約)函數(shù),用來保證所有映射的鍵值對中的每一個共享相同的鍵組。

      MapReduce提供了以下的主要功能:

      1)數(shù)據(jù)劃分和計算任務(wù)調(diào)度:

      系統(tǒng)自動將一個作業(yè)(Job)待處理的大數(shù)據(jù)劃分為很多個數(shù)據(jù)塊,每個數(shù)據(jù)塊對應(yīng)于一個計算任務(wù)(Task),并自動 調(diào)度計算節(jié)點來處理相應(yīng)的數(shù)據(jù)塊。作業(yè)和任務(wù)調(diào)度功能主要負責(zé)分配和調(diào)度計算節(jié)點(Map節(jié)點或Reduce節(jié)點),同時負責(zé)監(jiān)控這些節(jié)點的執(zhí)行狀態(tài),并 負責(zé)Map節(jié)點執(zhí)行的同步控制。

      2)數(shù)據(jù)/代碼互定位:

      為了減少數(shù)據(jù)通信,一個基本原則是本地化數(shù)據(jù)處理,即一個計算節(jié)點盡可能處理其本地磁盤上所分布存儲的數(shù)據(jù),這實現(xiàn)了代碼向 數(shù)據(jù)的遷移;當(dāng)無法進行這種本地化數(shù)據(jù)處理時,再尋找其他可用節(jié)點并將數(shù)據(jù)從網(wǎng)絡(luò)上傳送給該節(jié)點(數(shù)據(jù)向代碼遷移),但將盡可能從數(shù)據(jù)所在的本地機架上尋 找可用節(jié)點以減少通信延遲。

      3)系統(tǒng)優(yōu)化:

      為了減少數(shù)據(jù)通信開銷,中間結(jié)果數(shù)據(jù)進入Reduce節(jié)點前會進行一定的合并處理;一個Reduce節(jié)點所處理的數(shù)據(jù)可能會來自多個 Map節(jié)點,為了避免Reduce計算階段發(fā)生數(shù)據(jù)相關(guān)性,Map節(jié)點輸出的中間結(jié)果需使用一定的策略進行適當(dāng)?shù)膭澐痔幚恚WC相關(guān)性數(shù)據(jù)發(fā)送到同一個 Reduce節(jié)點;此外,系統(tǒng)還進行一些計算性能優(yōu)化處理,如對最慢的計算任務(wù)采用多備份執(zhí)行、選最快完成者作為結(jié)果。

      4)出錯檢測和恢復(fù):

      以低端商用服務(wù)器構(gòu)成的大規(guī)模MapReduce計算集群中,節(jié)點硬件(主機、磁盤、內(nèi)存等)出錯和軟件出錯是常態(tài),因此 MapReduce需要能檢測并隔離出錯節(jié)點,并調(diào)度分配新的節(jié)點接管出錯節(jié)點的計算任務(wù)。同時,系統(tǒng)還將維護數(shù)據(jù)存儲的可靠性,用多備份冗余存儲機制提 高數(shù)據(jù)存儲的可靠性,并能及時檢測和恢復(fù)出錯的數(shù)據(jù)。

      測試文本:

      tom 20 8000
      nancy 22 8000
      ketty 22 9000
      stone 19 10000
      green 19 11000
      white 39 29000
      socrates 30 40000

      ???MapReduce中,根據(jù)key進行分區(qū)、排序、分組
      MapReduce會按照基本類型對應(yīng)的key進行排序,如int類型的IntWritable,long類型的LongWritable,Text類型,默認升序排序
      ???為什么要自定義排序規(guī)則?現(xiàn)有需求,需要自定義key類型,并自定義key的排序規(guī)則,如按照人的salary降序排序,若相同,則再按age升序排序
      以Text類型為例:
      MapReduce如何實現(xiàn)自定義排序
      MapReduce如何實現(xiàn)自定義排序
      MapReduce如何實現(xiàn)自定義排序
      MapReduce如何實現(xiàn)自定義排序
      Text類實現(xiàn)了WritableComparable接口,并且有write()readFields()compare()方法
      readFields()方法:用來反序列化操作
      write()方法:用來序列化操作
      所以要想自定義類型用來排序需要有以上的方法
      自定義類代碼

      import org.apache.hadoop.io.WritableComparable;
      import java.io.DataInput;
      import java.io.DataOutput;
      import java.io.IOException;
      public class Person implements WritableComparable {
         private String name;
         private int age;
         private int salary;
         public Person() {
         }
         public Person(String name, int age, int salary) {
           //super();
           this.name = name;
           this.age = age;
           this.salary = salary;
         }
         public String getName() {
           return name;
         }
         public void setName(String name) {
           this.name = name;
         }
         public int getAge() {
           return age;
         }
         public void setAge(int age) {
           this.age = age;
         }
         public int getSalary() {
           return salary;
         }
         public void setSalary(int salary) {
           this.salary = salary;
         }
         @Override
         public String toString() {
           return this.salary + "  " + this.age + "   " + this.name;
         }
         //先比較salary,高的排序在前;若相同,age小的在前
         public int compareTo(Person o) {
           int compareResult1= this.salary - o.salary;
           if(compareResult1 != 0) {
             return -compareResult1;
           } else {
             return this.age - o.age;
           }
         }
         //序列化,將NewKey轉(zhuǎn)化成使用流傳送的二進制
         public void write(DataOutput dataOutput) throws IOException {
           dataOutput.writeUTF(name);
           dataOutput.writeInt(age);
           dataOutput.writeInt(salary);
         }
         //使用in讀字段的順序,要與write方法中寫的順序保持一致
         public void readFields(DataInput dataInput) throws IOException {
           //read string
           this.name = dataInput.readUTF();
           this.age = dataInput.readInt();
           this.salary = dataInput.readInt();
         }
      
      }

      MapReuduce程序:

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      import java.io.IOException;
      import java.net.URI;
      public class  SecondarySort {
         public static void main(String[] args) throws Exception {
           System.setProperty("HADOOP_USER_NAME","hadoop2.7");
           Configuration configuration = new Configuration();
           //設(shè)置本地運行的mapreduce程序 jar包
           configuration.set("mapreduce.job.jar","C:\\Users\\tanglei1\\IdeaProjects\\Hadooptang\\target\\com.kaikeba.hadoop-1.0-SNAPSHOT.jar");
           Job job = Job.getInstance(configuration, SecondarySort.class.getSimpleName());
           FileSystem fileSystem = FileSystem.get(URI.create(args[1]), configuration);
           if (fileSystem.exists(new Path(args[1]))) {
             fileSystem.delete(new Path(args[1]), true);
           }
           FileInputFormat.setInputPaths(job, new Path(args[0]));
           job.setMapperClass(MyMap.class);
           job.setMapOutputKeyClass(Person.class);
           job.setMapOutputValueClass(NullWritable.class);
           //設(shè)置reduce的個數(shù)
           job.setNumReduceTasks(1);
           job.setReducerClass(MyReduce.class);
           job.setOutputKeyClass(Person.class);
           job.setOutputValueClass(NullWritable.class);
           FileOutputFormat.setOutputPath(job, new Path(args[1]));
           job.waitForCompletion(true);
         }
         public static class MyMap extends
             Mapper {
           //LongWritable:輸入?yún)?shù)鍵類型,Text:輸入?yún)?shù)值類型
           //Persion:輸出參數(shù)鍵類型,NullWritable:輸出參數(shù)值類型
           @Override
           //map的輸出值是鍵值對,NullWritable說關(guān)心V的值
           protected void map(LongWritable key, Text value,
               Context context)
               throws IOException, InterruptedException {
             //LongWritable key:輸入?yún)?shù)鍵值對的鍵,Text value:輸入?yún)?shù)鍵值對的值
             //獲得一行數(shù)據(jù),輸入?yún)?shù)的鍵(距首行的位置),Hadoop讀取數(shù)據(jù)的時候逐行讀取文本
             //fields:代表著文本一行的的數(shù)據(jù)
             String[] fields = value.toString().split(" ");
             // 本列中文本一行數(shù)據(jù):nancy 22 8000
             String name = fields[0];
             //字符串轉(zhuǎn)換成int
             int age = Integer.parseInt(fields[1]);
             int salary = Integer.parseInt(fields[2]);
             //在自定義類中進行比較
             Person person = new Person(name, age, salary);
             context.write(person, NullWritable.get());
           }
         }
         public static class MyReduce extends
             Reducer {
           @Override
           protected void reduce(Person key, Iterable values, Context context) throws IOException, InterruptedException {
             context.write(key, NullWritable.get());
           }
         }
      }

      運行結(jié)果:

      40000  30   socrates
      29000  39   white
      11000  19   green
      10000  19   stone
      9000  22   ketty
      8000  20   tom
      8000  22   nancy

      另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)cdcxhl.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機、免備案服務(wù)器”等云主機租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。


      本文名稱:MapReduce如何實現(xiàn)自定義排序-創(chuàng)新互聯(lián)
      本文地址:http://ef60e0e.cn/article/jsshc.html
      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区
      1. <ul id="0c1fb"></ul>

        <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
        <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

        西昌市| 习水县| 白玉县| 西昌市| 昭苏县| 洞口县| 嘉义县| 龙胜| 辽阳市| 富蕴县| 永仁县| 额敏县| 应城市| 广平县| 蒙自县| 桦甸市| 漳浦县| 油尖旺区| 澄迈县| 彰化县| 潼关县| 那坡县| 铜陵市| 如东县| 涪陵区| 含山县| 新河县| 乾安县| 武川县| 云阳县| 武功县| 长白| 安岳县| 遂溪县| 牙克石市| 临武县| 炉霍县| 嘉义市| 延寿县| 漠河县| 阿尔山市|