

    A Simple Example of Reading and Writing HBase Data with MapReduce

    Date: January 29, 2015    Author: zhjw    Source: Internet

    This example uses the classic word count problem. The words to be counted are stored in the HBase table word; the MapReduce job reads its input from the word table and, once counting finishes, writes the results into the HBase table stat.

     

      1. Create a Hadoop project in Eclipse, then add the following jars from the HBase release package (see the note after the list on shipping them to the cluster):

    hbase-0.94.13.jar
    zookeeper-3.4.5.jar
    protobuf-java-2.4.0a.jar
    guava-11.0.2.jar
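
      Listing these jars in Eclipse only solves compilation; at runtime the cluster nodes need them on the classpath as well. The initTableMapperJob call used in step 3 ships its dependency jars automatically, but the same can be done explicitly. A minimal sketch, assuming HBase 0.94's TableMapReduceUtil (the class name ShipJars is mine, not from the original post):

    package cn.luxh.app;
    
    import java.io.IOException;
    
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapreduce.Job;
    
    public class ShipJars {
        
        public static void main(String[] args) throws IOException {
            Job job = new Job(HBaseConfiguration.create(), "wordstat");
            // Adds the HBase jar and its dependencies (zookeeper, protobuf, guava, ...)
            // from the local classpath to the job's distributed cache, so every task
            // node receives them without manual copying.
            TableMapReduceUtil.addDependencyJars(job);
        }
    }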

     

      2. Create the tables in HBase and load the test data

    package cn.luxh.app;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    
    public class InitData {
        
        public static void main(String[] args) throws IOException {
            // Create the word table with a single column family: content
            HBaseUtil.createTable("word","content");
            
            // Get a handle to the word table
            HTable htable = HBaseUtil.getHTable("word");
            // Buffer puts on the client side; they are sent in flushCommits() below
            htable.setAutoFlush(false);
            
            // Create the test data
            List<Put> puts = new ArrayList<Put>();
            
            Put put1 = HBaseUtil.getPut("1","content",null,"The Apache Hadoop software library is a framework");
            Put put2 = HBaseUtil.getPut("2","content",null,"The common utilities that support the other Hadoop modules");
            Put put3 = HBaseUtil.getPut("3","content",null,"Hadoop by reading the documentation");
            Put put4 = HBaseUtil.getPut("4","content",null,"Hadoop from the release page");
            Put put5 = HBaseUtil.getPut("5","content",null,"Hadoop on the mailing list");
            
            puts.add(put1);
            puts.add(put2);
            puts.add(put3);
            puts.add(put4);
            puts.add(put5);
            
            // Submit the buffered test data
            htable.put(puts);
            htable.flushCommits();
            htable.close();
            
            // Create the stat table with a single column family: result
            HBaseUtil.createTable("stat","result");
        }
    }

      1) For the HBaseUtil helper class used in the code, see: http://www.cnblogs.com/luxh/archive/2013/04/16/3025172.html
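
      In case that link becomes unavailable: judging from how it is called above, HBaseUtil only needs three small static helpers. Here is a minimal sketch against the 0.94-era client API; this is my reconstruction from the call sites, not the author's original code:

    package cn.luxh.app;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    
    public class HBaseUtil {
        
        private static final Configuration conf = HBaseConfiguration.create();
        
        // Create a table with the given column families, doing nothing if it already exists
        public static void createTable(String tableName, String... columnFamilies) throws IOException {
            HBaseAdmin admin = new HBaseAdmin(conf);
            try {
                if (admin.tableExists(tableName)) {
                    return;
                }
                HTableDescriptor desc = new HTableDescriptor(tableName);
                for (String family : columnFamilies) {
                    desc.addFamily(new HColumnDescriptor(family));
                }
                admin.createTable(desc);
            } finally {
                admin.close();
            }
        }
        
        // Open a handle to an existing table
        public static HTable getHTable(String tableName) throws IOException {
            return new HTable(conf, tableName);
        }
        
        // Build a Put for one row; a null qualifier stores the value under the empty qualifier
        public static Put getPut(String rowKey, String family, String qualifier, String value) {
            Put put = new Put(Bytes.toBytes(rowKey));
            byte[] q = (qualifier == null) ? null : Bytes.toBytes(qualifier);
            put.add(Bytes.toBytes(family), q, Bytes.toBytes(value));
            return put;
        }
    }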

      2) After running the program above, check in HBase that both tables were created:

    hbase(main):012:0> list
    TABLE 
    stat 
    word 
    2 row(s) in 0.4730 seconds

      3) View the test data in the word table:

    hbase(main):005:0> scan 'word'
    ROW                    COLUMN+CELL                                                     
     1                     column=content:, timestamp=1385447676510, value=The Apache Hadoo
                           p software library is a framework                               
     2                     column=content:, timestamp=1385447676510, value=The common utili
                           ties that support the other Hadoop modules                      
     3                     column=content:, timestamp=1385447676510, value=Hadoop by readin
                           g the documentation                                             
     4                     column=content:, timestamp=1385447676510, value=Hadoop from the 
                           release page                                                    
     5                     column=content:, timestamp=1385447676510, value=Hadoop on the ma
                           iling list                                                      
    5 row(s) in 5.7810 seconds

     

      3. The MapReduce program

    package cn.luxh.app;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    public class WordStat {
        
        /**
         * TableMapper<Text,IntWritable>: Text is the mapper's output key type,
         * IntWritable its output value type.
         */
        public static class MyMapper extends TableMapper<Text,IntWritable>{
            
            private static IntWritable one = new IntWritable(1);
            private static Text word = new Text();
            
            @Override
            protected void map(ImmutableBytesWritable key, Result value,
                    Context context)
                    throws IOException, InterruptedException {
                // The table has only one column family and each row holds a single cell,
                // so just read the row's first (and only) value directly
                String words = Bytes.toString(value.list().get(0).getValue());
                StringTokenizer st = new StringTokenizer(words); 
                while (st.hasMoreTokens()) {
                     String s = st.nextToken();
                     word.set(s);
                     context.write(word, one);
                }
            }
        }
        
        /**
         * TableReducer<Text,IntWritable,ImmutableBytesWritable>: Text is the input key type,
         * IntWritable the input value type, and ImmutableBytesWritable the output key type.
         */
        public static class MyReducer extends TableReducer<Text,IntWritable,ImmutableBytesWritable>{
            
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values,
                    Context context)
                    throws IOException, InterruptedException {
                
                int sum = 0;
                for(IntWritable val:values) {
                    sum+=val.get();
                }
                // Write one row per word, using the word itself as the row key
                Put put = new Put(Bytes.toBytes(key.toString()));
                // Under the column family result, add a qualifier num holding the word's count.
                // String.valueOf(sum) converts the count to a string first; storing the raw int
                // bytes would show up in the shell as \x00\x00\x00..., while the string form,
                // converted to bytes and stored in HBase, stays readable.
                put.add(Bytes.toBytes("result"), Bytes.toBytes("num"), Bytes.toBytes(String.valueOf(sum)));
                context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())),put);
            }
        }
        
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            
            Configuration conf = HBaseConfiguration.create();
            Job job = new Job(conf,"wordstat");
            job.setJarByClass(WordStat.class);
            
            
            Scan scan = new Scan();
            // Restrict the scan to the content column family
            scan.addColumn(Bytes.toBytes("content"),null);
            // The mapper reads its input from the word table
            TableMapReduceUtil.initTableMapperJob("word", scan, MyMapper.class, Text.class, IntWritable.class, job);
            // The reducer writes its output to the stat table
            TableMapReduceUtil.initTableReducerJob("stat", MyReducer.class, job);
            System.exit(job.waitForCompletion(true)?0:1);
        }
    }
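
      The reducer stores String.valueOf(sum) rather than the raw int bytes. A tiny standalone check (the class name BytesDemo is mine, not from the original post) shows the difference between the two encodings as the shell would print them:

    package cn.luxh.app;
    
    import org.apache.hadoop.hbase.util.Bytes;
    
    public class BytesDemo {
        
        public static void main(String[] args) {
            // Raw int encoding: four bytes, printed by the HBase shell as \x00\x00\x00\x05
            System.out.println(Bytes.toStringBinary(Bytes.toBytes(5)));
            // String encoding: the single character '5', printed simply as 5
            System.out.println(Bytes.toStringBinary(Bytes.toBytes(String.valueOf(5))));
        }
    }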

      Wait for the job to finish, then inspect the stat table:

    hbase(main):014:0> scan 'stat'
    ROW                    COLUMN+CELL                                                     
     Apache                column=result:num, timestamp=1385449492309, value=1             
     Hadoop                column=result:num, timestamp=1385449492309, value=5             
     The                   column=result:num, timestamp=1385449492309, value=2             
     a                     column=result:num, timestamp=1385449492309, value=1             
     by                    column=result:num, timestamp=1385449492309, value=1             
     common                column=result:num, timestamp=1385449492309, value=1             
     documentation         column=result:num, timestamp=1385449492309, value=1             
     framework             column=result:num, timestamp=1385449492309, value=1             
     from                  column=result:num, timestamp=1385449492309, value=1             
     is                    column=result:num, timestamp=1385449492309, value=1             
     library               column=result:num, timestamp=1385449492309, value=1             
     list                  column=result:num, timestamp=1385449492309, value=1             
     mailing               column=result:num, timestamp=1385449492309, value=1             
     modules               column=result:num, timestamp=1385449492309, value=1             
     on                    column=result:num, timestamp=1385449492309, value=1             
     other                 column=result:num, timestamp=1385449492309, value=1             
     page                  column=result:num, timestamp=1385449492309, value=1             
     reading               column=result:num, timestamp=1385449492309, value=1             
     release               column=result:num, timestamp=1385449492309, value=1             
     software              column=result:num, timestamp=1385449492309, value=1             
     support               column=result:num, timestamp=1385449492309, value=1             
     that                  column=result:num, timestamp=1385449492309, value=1             
     the                   column=result:num, timestamp=1385449492309, value=4             
     utilities             column=result:num, timestamp=1385449492309, value=1             
    24 row(s) in 0.7970 seconds
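
      The result can also be checked programmatically instead of through the shell. A minimal read-back sketch using the same client API (the class name CheckStat and the choice of the word Hadoop are mine, not from the original post):

    package cn.luxh.app;
    
    import java.io.IOException;
    
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;
    
    public class CheckStat {
        
        public static void main(String[] args) throws IOException {
            HTable stat = new HTable(HBaseConfiguration.create(), "stat");
            try {
                // Fetch the row whose key is the word "Hadoop"
                Get get = new Get(Bytes.toBytes("Hadoop"));
                Result result = stat.get(get);
                byte[] raw = result.getValue(Bytes.toBytes("result"), Bytes.toBytes("num"));
                System.out.println("Hadoop: " + Bytes.toString(raw)); // expected: 5
            } finally {
                stat.close();
            }
        }
    }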

     
