Integrating MapReduce with HBase

This article shows how to integrate MapReduce with HBase: reading from HBase, writing to HBase, and doing both in a single job. It should be a useful reference for anyone facing the same problem.

Contents
  • Overview
  • Reading from HBase
    • UserRateTopn.java
    • RateBean.java
  • Writing to HBase
    • WordCount.java
  • Reading and writing HBase
    • HbaseTopn.java


Overview

  • MapReduce is a computation framework.

  • HBase is a data storage system.

  • MapReduce reads and writes external data systems through the corresponding InputFormat and OutputFormat implementations.

  • For MapReduce, HBase provides TableInputFormat and TableOutputFormat (see the sketch below for how they are wired into a job).
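
The sections below spell this out with full listings; the compact sketch here only shows how the two pieces are wired into a job via TableMapReduceUtil. It is not part of the original article: the class name HBaseMrSkeleton, the table names "source_table"/"target_table" and the column f:q are placeholder assumptions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

public class HBaseMrSkeleton {

    // TableMapper fixes KEYIN/VALUEIN to ImmutableBytesWritable (rowkey) and Result (the row)
    public static class MyMapper extends TableMapper<Text, Text> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            context.write(new Text(key.get()), new Text(""));  // placeholder logic
        }
    }

    // TableReducer fixes the output value type to a Mutation (Put/Delete)
    public static class MyReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Put put = new Put(key.toString().getBytes());
            put.addColumn("f".getBytes(), "q".getBytes(), "v".getBytes());  // placeholder cell
            context.write(null, put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();  // picks up hbase-site.xml / ZooKeeper settings
        Job job = Job.getInstance(conf);
        job.setJarByClass(HBaseMrSkeleton.class);

        // initTableMapperJob sets TableInputFormat and points it at the source table
        TableMapReduceUtil.initTableMapperJob("source_table", new Scan(), MyMapper.class, Text.class, Text.class, job);
        // initTableReducerJob sets TableOutputFormat and points it at the target table
        TableMapReduceUtil.initTableReducerJob("target_table", MyReducer.class, job);

        job.waitForCompletion(true);
    }
}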

Reading from HBase

UserRateTopn.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;
import java.util.ArrayList;

public class UserRateTopn {

    // Extend HBase's TableMapper; its only purpose is to fix KEYIN/VALUEIN to ImmutableBytesWritable and Result
    public static class TopnMapper extends TableMapper<Text, RateBean> {

        final byte[] F = "f1".getBytes();
        RateBean b = new RateBean();
        Text k = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {

            // Counter for rows that fail to parse (dirty reads)
            Counter counter = context.getCounter("DirtyRead", "error_line");

            try {
                byte[] a = value.getValue(F, "a".getBytes());
                byte[] g = value.getValue(F, "g".getBytes());
                byte[] n = value.getValue(F, "n".getBytes());
                byte[] s = value.getValue(F, "s".getBytes());
                byte[] r = value.getValue(F, "r".getBytes());

                byte[] rowkeyBytes = key.get();
                String rk = new String(rowkeyBytes);

                // rowkey layout: first 6 chars = movieId, last 6 chars = uid
                String movieId = rk.substring(0, 6);
                String uid = rk.substring(rk.length() - 6);

                k.set(uid);
                b.set(uid, Integer.parseInt(new String(a)), new String(g), movieId, new String(n), new String(s), Integer.parseInt(new String(r)));

                context.write(k, b);
            }catch (Exception e) {
                counter.increment(1);
            }
        }
    }

    public static class TopnReducer extends Reducer<Text, RateBean, RateBean, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<RateBean> values, Context context) throws IOException, InterruptedException {

            ArrayList<RateBean> lst = new ArrayList<>();

            // Copy each bean into a list; Hadoop reuses the value object during iteration, so a fresh copy is required
            for (RateBean bean : values) {
                RateBean newBean = new RateBean();
                newBean.set(bean.getUid(), bean.getAge(), bean.getGender(), bean.getMovieId(), bean.getMovieName(), bean.getStyle(), bean.getRate());
                lst.add(newBean);
            }

            // Sort by rating in descending order
            lst.sort((o1, o2) -> o2.getRate() - o1.getRate());

            int topn = 5;
            for(int i = 0; i < Math.min(topn, lst.size()); i++) {
                context.write(lst.get(i), NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
//        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:2181");
        conf.set("mapred.jar", "hbasemr-1.0-SNAPSHOT.jar");

        Job job = Job.getInstance(conf);

        job.setJarByClass(UserRateTopn.class);

        // Set the mapper and configure HBase's TableInputFormat as the job's InputFormat, reading from the "movie" table
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob("movie", scan, TopnMapper.class, Text.class, RateBean.class, job);

        job.setReducerClass(TopnReducer.class);
        job.setOutputKeyClass(RateBean.class);
        job.setOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(TextOutputFormat.class); // TextOutputFormat is the default, so this line is optional
        FileOutputFormat.setOutputPath(job, new Path("d:/out"));

        job.waitForCompletion(true);
    }
}
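
The main method above hands a bare new Scan() to initTableMapperJob, which ships every column of every row to the mappers. A possible refinement (not from the original article) is to narrow the Scan to the f1 family and tune how it reads; the lines below would replace the Scan/initTableMapperJob lines in main above, using the same imports, table name and classes:

        Scan scan = new Scan();
        scan.addFamily("f1".getBytes());   // only ship the family the mapper actually reads
        scan.setCaching(500);              // rows fetched per RPC; tune to the row size
        scan.setCacheBlocks(false);        // recommended for full-table MapReduce scans
        TableMapReduceUtil.initTableMapperJob("movie", scan, TopnMapper.class, Text.class, RateBean.class, job);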

RateBean.java

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class RateBean implements Writable {

    private String uid;
    private int age;
    private String gender;

    private String movieId;
    private String movieName;
    private String style;

    private int rate;

    public void set(String uid, int age, String gender, String movieId, String movieName, String style, int rate) {
        this.uid = uid;
        this.age = age;
        this.gender = gender;
        this.movieId = movieId;
        this.movieName = movieName;
        this.style = style;
        this.rate = rate;
    }

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getGender() {
        return gender;
    }

    public void setGender(String gender) {
        this.gender = gender;
    }

    public String getMovieId() {
        return movieId;
    }

    public void setMovieId(String movieId) {
        this.movieId = movieId;
    }

    public String getMovieName() {
        return movieName;
    }

    public void setMovieName(String movieName) {
        this.movieName = movieName;
    }

    public String getStyle() {
        return style;
    }

    public void setStyle(String style) {
        this.style = style;
    }

    public int getRate() {
        return rate;
    }

    public void setRate(int rate) {
        this.rate = rate;
    }

    @Override
    public String toString() {
        return "RateBean{" +
                "uid='" + uid + '\'' +
                ", age=" + age +
                ", gender='" + gender + '\'' +
                ", movieId='" + movieId + '\'' +
                ", movieName='" + movieName + '\'' +
                ", style='" + style + '\'' +
                ", rate=" + rate +
                '}';
    }

    @Override
    public void write(DataOutput out) throws IOException {

        out.writeUTF(this.uid);
        out.writeInt(this.age);
        out.writeUTF(this.gender);
        out.writeUTF(this.movieId);
        out.writeUTF(this.movieName);
        out.writeUTF(this.style);
        out.writeInt(this.rate);
    }

    @Override
    public void readFields(DataInput in) throws IOException {

        this.uid = in.readUTF();
        this.age = in.readInt();
        this.gender = in.readUTF();
        this.movieId = in.readUTF();
        this.movieName = in.readUTF();
        this.style = in.readUTF();
        this.rate = in.readInt();

    }
}
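
RateBean travels between the map and reduce phases through its write/readFields methods, so the field order in readFields must mirror write exactly. Below is a quick local check of that contract; it is not from the original article, and the sample values are made up.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class RateBeanRoundTrip {

    public static void main(String[] args) throws Exception {
        RateBean in = new RateBean();
        in.set("000001", 25, "M", "000123", "SomeMovie", "sci-fi", 5);  // made-up sample values

        // Serialize with write(), then rebuild a fresh bean with readFields()
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bos));

        RateBean out = new RateBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

        System.out.println(out);  // prints the same field values that were written
    }
}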

Writing to HBase

WordCount.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class WordCount {

    public static class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] words = value.toString().split(" ");
            for (String word : words) {
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }

    public static class WcReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }

            Put put = new Put(key.toString().getBytes());
            put.addColumn("f".getBytes(), "cnt".getBytes(), Bytes.toBytes(count));

            context.write(null, put);
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:2181");

        Job job = Job.getInstance(conf);

        job.setJarByClass(WordCount.class);

        job.setMapperClass(WcMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Use HBase's helper to set TableOutputFormat as the job's OutputFormat and name the target table
        TableMapReduceUtil.initTableReducerJob("wordcount", WcReducer.class, job);

        // Input path of the text files to be counted
        FileInputFormat.setInputPaths(job, new Path("data/input/"));

        job.waitForCompletion(true);
    }
}
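
TableOutputFormat writes into an existing table, so the "wordcount" table with column family "f" has to exist before the job runs. One way to create it from code; this snippet is not part of the original article and assumes the HBase 2.x client API and the same ZooKeeper quorum as above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class CreateWordcountTable {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:2181");

        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            TableName table = TableName.valueOf("wordcount");
            if (!admin.tableExists(table)) {
                // one column family "f", matching the Put built in WcReducer
                admin.createTable(TableDescriptorBuilder.newBuilder(table)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f"))
                        .build());
            }
        }
    }
}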

Reading and writing HBase

HbaseTopn.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;
import java.util.ArrayList;

public class HbaseTopn {

    public static class TopnMapper extends TableMapper<Text, RateBean> {

        final byte[] F = "f1".getBytes();
        Text k = new Text();
        RateBean b = new RateBean();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            Counter counter = context.getCounter("DirtyRead", "error_line");  // counts rows that fail to parse
            try {
                byte[] a = value.getValue(F, "a".getBytes());
                byte[] g = value.getValue(F, "g".getBytes());
                byte[] n = value.getValue(F, "n".getBytes());
                byte[] r = value.getValue(F, "r".getBytes());
                byte[] s = value.getValue(F, "s".getBytes());

                byte[] rkBytes = key.get();
                String rk = new String(rkBytes);

                String movieId = rk.substring(0, 6);
                String uid = rk.substring(rk.length() - 6);

                k.set(uid);
                b.set(uid, Integer.parseInt(new String(a)), new String(g), movieId, new String(n), new String(s), Integer.parseInt(new String(r)));

                context.write(k, b);
            } catch (Exception e) {
                counter.increment(1);
            }
        }
    }

    public static class TopnReducer extends TableReducer<Text, RateBean, ImmutableBytesWritable> {

        @Override
        protected void reduce(Text key, Iterable<RateBean> values, Context context) throws IOException, InterruptedException {

            ArrayList<RateBean> lst = new ArrayList<>();

            for (RateBean b : values) {
                RateBean newBean = new RateBean();
                newBean.set(b.getUid(), b.getAge(), b.getGender(), b.getMovieId(), b.getMovieName(),b.getStyle(),b.getRate());

                lst.add(newBean);
            }

            lst.sort((o1, o2) -> o2.getRate() - o1.getRate());

            int topn = 5;
            for (int i = 0; i < Math.min(topn, lst.size()); i++) {

                // Use uid + movieId as the rowkey and the bean's toString() as the cell value
                RateBean rateBean = lst.get(i);
                String uid = rateBean.getUid();
                String movieId = rateBean.getMovieId();

                Put put = new Put((uid + " " + movieId).getBytes());
                put.addColumn("f".getBytes(), "c".getBytes(), rateBean.toString().getBytes());

                context.write(null, put);
            }
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:2181");
        conf.set("mapreduce.map.tasks", "5");

        Job job = Job.getInstance(conf);

        job.setJarByClass(HbaseTopn.class);

        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob("movie", scan, TopnMapper.class, Text.class, RateBean.class, job);

        job.setNumReduceTasks(3);
        TableMapReduceUtil.initTableReducerJob("topn", TopnReducer.class, job);

        job.waitForCompletion(true);

    }
}
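
After the job finishes, the "topn" table should hold one row per (uid, movieId) pair written by the reducer. A small client-side check; this is not from the original article and assumes the same ZooKeeper quorum and the f:c column used above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class VerifyTopn {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:2181");

        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("topn"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result r : scanner) {
                String rowkey = new String(r.getRow());
                String value = new String(r.getValue("f".getBytes(), "c".getBytes()));
                System.out.println(rowkey + " -> " + value);  // rowkey is "uid movieId", value is RateBean.toString()
            }
        }
    }
}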

