Integrating MapReduce with HBase
2021/5/6 10:26:23
This article walks through integrating MapReduce with HBase. It should be a useful reference for anyone tackling this kind of programming problem; interested developers, follow along!
Contents
- Overview
- Reading from HBase
  - UserRateTopn.java
  - RateBean.java
- Writing to HBase
  - WordCount.java
- Reading and Writing HBase
  - HbaseTopn.java
Overview
- MapReduce is a computation framework.
- HBase is a data storage system.
- MapReduce reads from and writes to each kind of data system through a corresponding InputFormat and OutputFormat.
- For MapReduce, HBase provides TableInputFormat and TableOutputFormat, whose mapper/reducer contracts are sketched below.
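For orientation, here is roughly what the two helper base classes built on those formats look like in the HBase source. This is an abridged sketch of the signatures, not a complete listing: TableMapper pins the mapper's input key/value to the rowkey and the row, while TableReducer pins the reducer's output value to a Mutation such as Put.

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// KEYIN/VALUEIN are fixed: every map() call receives (rowkey, row)
abstract class TableMapper<KEYOUT, VALUEOUT>
        extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> { }

// VALUEOUT is fixed: every reduce() must emit a Mutation (e.g. Put, Delete)
abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
        extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> { }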
Reading from HBase
UserRateTopn.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;
import java.util.ArrayList;

public class UserRateTopn {

    // Extends HBase's TableMapper, whose only job is to fix KEYIN/VALUEIN
    // to ImmutableBytesWritable (rowkey) and Result (row)
    public static class TopnMapper extends TableMapper<Text, RateBean> {
        final byte[] F = "f1".getBytes();
        RateBean b = new RateBean();
        Text k = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            // Count unparseable rows instead of failing the whole job
            Counter counter = context.getCounter("DirtyRead", "error_line");
            try {
                byte[] a = value.getValue(F, "a".getBytes());
                byte[] g = value.getValue(F, "g".getBytes());
                byte[] n = value.getValue(F, "n".getBytes());
                byte[] s = value.getValue(F, "s".getBytes());
                byte[] r = value.getValue(F, "r".getBytes());

                byte[] rowkeyBytes = key.get();
                String rk = new String(rowkeyBytes);
                // rowkey layout: first 6 chars are the movieId, last 6 chars are the uid
                String movieId = rk.substring(0, 6);
                String uid = rk.substring(rk.length() - 6);

                k.set(uid);
                b.set(uid, Integer.parseInt(new String(a)), new String(g), movieId, new String(n), new String(s), Integer.parseInt(new String(r)));
                context.write(k, b);
            } catch (Exception e) {
                counter.increment(1);
            }
        }
    }

    public static class TopnReducer extends Reducer<Text, RateBean, RateBean, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<RateBean> values, Context context) throws IOException, InterruptedException {
            ArrayList<RateBean> lst = new ArrayList<>();
            // Copy each bean into the list buffer (Hadoop reuses the object behind the iterator)
            for (RateBean bean : values) {
                RateBean newBean = new RateBean();
                newBean.set(bean.getUid(), bean.getAge(), bean.getGender(), bean.getMovieId(), bean.getMovieName(), bean.getStyle(), bean.getRate());
                lst.add(newBean);
            }

            // Sort by rate, descending
            lst.sort((o1, o2) -> o2.getRate() - o1.getRate());

            int topn = 5;
            for (int i = 0; i < Math.min(topn, lst.size()); i++) {
                context.write(lst.get(i), NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:2181");
        conf.set("mapred.jar", "hbasemr-1.0-SNAPSHOT.jar");

        Job job = Job.getInstance(conf);
        job.setJarByClass(UserRateTopn.class);

        // Sets the Mapper to the HBase-provided mapper and the InputFormat to HBase's TableInputFormat
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob("movie", scan, TopnMapper.class, Text.class, RateBean.class, job);

        job.setReducerClass(TopnReducer.class);
        job.setOutputKeyClass(RateBean.class);
        job.setOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(TextOutputFormat.class); // the default; can be omitted
        FileOutputFormat.setOutputPath(job, new Path("d:/out"));

        job.waitForCompletion(true);
    }
}
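The job above scans a table named movie whose rowkeys begin with a 6-character movieId and end with a 6-character uid, with the rated fields in family f1. As a hedged sketch of seeding one such row with the plain HBase client (the sample values, and the assumption that the rowkey is simply movieId followed by uid, are mine, but they satisfy the mapper's substring logic):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

public class SeedMovie {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("movie"))) {
            // rowkey: first 6 chars = movieId, last 6 chars = uid (matches the mapper's substrings)
            Put put = new Put("000001100001".getBytes());
            put.addColumn("f1".getBytes(), "a".getBytes(), "25".getBytes());          // age
            put.addColumn("f1".getBytes(), "g".getBytes(), "M".getBytes());           // gender
            put.addColumn("f1".getBytes(), "n".getBytes(), "The Matrix".getBytes());  // movie name
            put.addColumn("f1".getBytes(), "s".getBytes(), "sci-fi".getBytes());      // style/genre
            put.addColumn("f1".getBytes(), "r".getBytes(), "5".getBytes());           // rate
            table.put(put);
        }
    }
}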
RateBean.java
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class RateBean implements Writable {
    private String uid;
    private int age;
    private String gender;
    private String movieId;
    private String movieName;
    private String style;
    private int rate;

    public void set(String uid, int age, String gender, String movieId, String movieName, String style, int rate) {
        this.uid = uid;
        this.age = age;
        this.gender = gender;
        this.movieId = movieId;
        this.movieName = movieName;
        this.style = style;
        this.rate = rate;
    }

    public String getUid() { return uid; }
    public void setUid(String uid) { this.uid = uid; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
    public String getGender() { return gender; }
    public void setGender(String gender) { this.gender = gender; }
    public String getMovieId() { return movieId; }
    public void setMovieId(String movieId) { this.movieId = movieId; }
    public String getMovieName() { return movieName; }
    public void setMovieName(String movieName) { this.movieName = movieName; }
    public String getStyle() { return style; }
    public void setStyle(String style) { this.style = style; }
    public int getRate() { return rate; }
    public void setRate(int rate) { this.rate = rate; }

    @Override
    public String toString() {
        return "RateBean{" +
                "uid='" + uid + '\'' +
                ", age=" + age +
                ", gender='" + gender + '\'' +
                ", movieId='" + movieId + '\'' +
                ", movieName='" + movieName + '\'' +
                ", style='" + style + '\'' +
                ", rate=" + rate +
                '}';
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.uid);
        out.writeInt(this.age);
        out.writeUTF(this.gender);
        out.writeUTF(this.movieId);
        out.writeUTF(this.movieName);
        out.writeUTF(this.style);
        out.writeInt(this.rate);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.uid = in.readUTF();
        this.age = in.readInt();
        this.gender = in.readUTF();
        this.movieId = in.readUTF();
        this.movieName = in.readUTF();
        this.style = in.readUTF();
        this.rate = in.readInt();
    }
}
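RateBean travels between mapper and reducer through Hadoop's Writable serialization, so write() and readFields() must visit the fields in exactly the same order. A minimal standalone round-trip check (the sample values are hypothetical) looks like this:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RateBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        RateBean in = new RateBean();
        in.set("100001", 25, "M", "000001", "The Matrix", "sci-fi", 5);

        // Serialize with write(), then rebuild a fresh bean with readFields()
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bos));

        RateBean out = new RateBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(out); // prints the same field values as `in`
    }
}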
Writing to HBase
WordCount.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class WordCount {

    public static class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] words = value.toString().split(" ");
            for (String word : words) {
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }

    public static class WcReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            // Word as the rowkey, count in column f:cnt; TableOutputFormat ignores the output key
            Put put = new Put(key.toString().getBytes());
            put.addColumn("f".getBytes(), "cnt".getBytes(), Bytes.toBytes(count));
            context.write(null, put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:2181");

        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCount.class);

        job.setMapperClass(WcMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Use HBase's utility to set the OutputFormat to TableOutputFormat and name the target table
        TableMapReduceUtil.initTableReducerJob("wordcount", WcReducer.class, job);

        // Set the file input path
        FileInputFormat.setInputPaths(job, new Path("data/input/"));

        job.waitForCompletion(true);
    }
}
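Note that TableOutputFormat writes into an existing table, so the wordcount table with column family f must be created before the job runs. A sketch of doing that with the HBase 2.x admin API (assuming the same ZooKeeper quorum as above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class CreateWordCountTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            TableName name = TableName.valueOf("wordcount");
            // Create the target table with family "f" if it does not exist yet
            if (!admin.tableExists(name)) {
                admin.createTable(TableDescriptorBuilder.newBuilder(name)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f"))
                        .build());
            }
        }
    }
}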
Reading and Writing HBase
HbaseTopn.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;
import java.util.ArrayList;

public class HbaseTopn {

    public static class TopnMapper extends TableMapper<Text, RateBean> {
        final byte[] F = "f1".getBytes();
        Text k = new Text();
        RateBean b = new RateBean();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            Counter counter = context.getCounter("DirtyRead", "error_line");
            try {
                byte[] a = value.getValue(F, "a".getBytes());
                byte[] g = value.getValue(F, "g".getBytes());
                byte[] n = value.getValue(F, "n".getBytes());
                byte[] r = value.getValue(F, "r".getBytes());
                byte[] s = value.getValue(F, "s".getBytes());

                byte[] rkBytes = key.get();
                String rk = new String(rkBytes);
                String movieId = rk.substring(0, 6);
                String uid = rk.substring(rk.length() - 6);

                k.set(uid);
                b.set(uid, Integer.parseInt(new String(a)), new String(g), movieId, new String(n), new String(s), Integer.parseInt(new String(r)));
                context.write(k, b);
            } catch (Exception e) {
                counter.increment(1);
            }
        }
    }

    public static class TopnReducer extends TableReducer<Text, RateBean, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<RateBean> values, Context context) throws IOException, InterruptedException {
            ArrayList<RateBean> lst = new ArrayList<>();
            for (RateBean b : values) {
                RateBean newBean = new RateBean();
                newBean.set(b.getUid(), b.getAge(), b.getGender(), b.getMovieId(), b.getMovieName(), b.getStyle(), b.getRate());
                lst.add(newBean);
            }

            lst.sort((o1, o2) -> o2.getRate() - o1.getRate());

            int topn = 5;
            for (int i = 0; i < Math.min(topn, lst.size()); i++) {
                // Use uid + movieId as the rowkey and the bean's toString() as the value
                RateBean rateBean = lst.get(i);
                String uid = rateBean.getUid();
                String movieId = rateBean.getMovieId();
                Put put = new Put((uid + " " + movieId).getBytes());
                put.addColumn("f".getBytes(), "c".getBytes(), rateBean.toString().getBytes());
                context.write(null, put);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:2181");
        conf.set("mapreduce.map.tasks", "5"); // a hint only: with TableInputFormat the map count follows the table's regions

        Job job = Job.getInstance(conf);
        job.setJarByClass(HbaseTopn.class);

        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob("movie", scan, TopnMapper.class, Text.class, RateBean.class, job);

        job.setNumReduceTasks(3);
        TableMapReduceUtil.initTableReducerJob("topn", TopnReducer.class, job);

        job.waitForCompletion(true);
    }
}
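Once this job finishes, the results land in the topn table rather than in files. A quick way to inspect them is a plain client-side scan, sketched here with the standard HBase client:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class ScanTopn {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("topn"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            // Each row: rowkey = "uid movieId", column f:c = the bean's toString()
            for (Result r : scanner) {
                byte[] v = r.getValue("f".getBytes(), "c".getBytes());
                System.out.println(new String(r.getRow()) + " => " + new String(v));
            }
        }
    }
}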
That concludes this article on integrating MapReduce with HBase. We hope it has been helpful, and thank you for supporting 为之网!