9.HBase and MapReduce

Apache MapReduce是一种用来分析大量数据的软件框架,是Hadoop最常使用的框架。MapReduce本身超出本文范围。 A good place to get started with MapReduce is http://hadoop.apache.org/docs/r2.6.0/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html。 MapReduce version 2 (MR2)is now part of YARN.

本章讨论下要在HBase中使用MapReduce需要指定的配置步骤。另外,讨论下其他HBase和MapReduce工作之间的相互作用和问题。 Finally, it discusses Cascading, an alternative API for MapReduce。

46.HBase, Mapreduce, and the CLASSPATH

默认情况下,MapReduce job部署在MR集群上,不会与$HBASE_CONF_DIR下的HBase配置或HBase类产生关联。

要使MapReduce工作获得它们需要的,你要将hbase-site.xml放入$HADOOP_HOME/conf目录,并将HBase jar放入$HADOOP_HOME/lib目录,每个集群都要如此配置。或者你可以编辑$HADOOP_HOME/conf/hadoop-env.sh,并把它加入到HADOOP_CLASSPATH 变量中,但并不推荐你这种方法,因为这会影响你根据Hbase referenc的Hadoop安装。这也需要你重启hadoop集群以使Hadoop能够使用Hbase data。

建议的方法是使HBase自己增加他的依赖jars并使用HADOOP_CLASSPATH or -libjars

从0.90.x起,HBase可自动将依赖jars加入到job配置中,只需保证相关jars存在于本地的CLASSPATH。下面的事例运行了绑定的HBase RowCounter MR工作,其中表名叫usertable。如果你没有设置命令中(该命令以$为前缀,并被花括号包围)预期的环境变量,可以使用真实系统路径替代。确保为你的系统使用正确的HBase jar。单引号会使shell执行子命令setting the output of hbase classpath (the command to dump HBase CLASSPATH) to HADOOP_CLASSPATH.This example assumes you use a BASH-compatible shell.

  1. $ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-VERSION.jar rowcounter usertable

当这个命令运行时,内部地, HBase jar会去找它需要的依赖并将它们加入到MR工作配置中。See the source at TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job) for how this is done.

命令 hbase mapredcp 能帮你转储MR需要的CLASSPATH条目,同样jars TableMapReduceUtil#addDependencyJars会加入。你可以将它们一起放入HBase配置目录 HADOOP_CLASSPATH 中。那些不打包依赖或调用TableMapReduceUtil#addDependencyJars的工作,下面的命令结构式必须的:

  1. $ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf hadoop jar MyApp.jar MyJobMainClass -libjars $(${HBASE_HOME}/bin/hbase mapredcp | tr ':' ',') ...

这个案例可能会出错如果你在HBase的build 目录而不是安装目录运行它。可能会有如下错误: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.mapreduce.RowCounter$RowCounterMapper 如果这个发生了,试着将命令改成如下,使它可以使用build环境中target/directory下的HBase jars $ HADOOP_CLASSPATH=${HBASE_BUILD_HOME}/hbase-server/target/hbase-server-VERSION-SNAPSHOT.jar:${HBASE_BUILD_HOME}/bin/hbase classpath${HADOOP_HOME}/bin/hadoop jar ${HBASE_BUILD_HOME}/hbase-server/target/hbase-server-VERSION-SNAPSHOT.jar rowcounter usertable

0.96.1至0.98.4的Hbase MapReduce使用者请注意 某些使用Hbase的MR工作可能无法启动。The symptom is an exception similar to the following:

Exception in thread “main” java.lang.IllegalAccessError: class com.google.protobuf.ZeroCopyLiteralByteString cannot access its superclass com.google.protobuf.LiteralByteString at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:792) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) at java.net.URLClassLoader.access$100(URLClassLoader.java:71) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.hadoop.hbase.protobuf.ProtobufUtil.toScan(ProtobufUtil.java:818) at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.convertScanToString(TableMapReduceUtil.java:433) at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:186) at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:147) at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:270) at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:100) …


This affects both jobs using the -libjars option and “fat jar,” those which package their runtime dependencies in a nested lib folder.

要满足新类加载器的要求,hadoop的类路径必须包含hbase-protocol.jar。See 46.HBase, MapReduce, and the CLASSPATH for current recommendations for resolving classpath errors.



  1. $ HADOOP_CLASSPATH=/path/to/hbase-protocol.jar:/path/to/hbase/conf hadoop jar MyJob.jar MyJobMainClass
  2. $ HADOOP_CLASSPATH=$(hbase mapredcp):/path/to/hbase/conf hadoop jar MyJob.jar MyJobMainClass
  3. $ HADOOP_CLASSPATH=$(hbase classpath) hadoop jar MyJob.jar MyJobMainClass

For jars that do not package their dependencies, the following command structure is necessary:

  1. $ HADOOP_CLASSPATH=$(hbase mapredcp):/etc/hbase/conf hadoop jar MyApp.jar MyJobMainClass -libjars $(hbase mapredcp | tr ':' ',') ...

47.MapReduce Scan Caching MR扫描缓存


  1. 扫描对象的缓存设置。
  2. 配置选项 hbase.client.scanner.caching中指定的缓存设置,可以在hbase-site.xml中手动设置也可以通过辅助方法TableMapReduceUtil.setScannerCaching()设置。



48. Bundled HBase MapReduce Jobs

The HBase JAR also serves as a Driver for some bundled MapReduce jobs. To learn about the bundled MapReduce jobs, run the following command.

  1. $ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-server-VERSION.jar
  2. An example program must be given as the first argument.
  3. Valid program names are:
  4. copytable: Export a table from local cluster to peer cluster
  5. completebulkload: Complete a bulk data load.
  6. export: Write table data to HDFS.
  7. import: Import data written by Export.
  8. importtsv: Import data in TSV format.
  9. rowcounter: Count rows in HBase table

Each of the valid program names are bundled MapReduce jobs. To run one of the jobs, model your command after the following example.

  1. $ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-server-VERSION.jar rowcounter myTable


HBase可以被用作mapreduce的数据源,TableInputFormat 和数据接收器,TableOutputFormat或MultiTableOutputFormat。写Mr作业,读或写HBase,最好是子类TableMapper and/or tableReducer。See the do-nothing pass-through classes IdentityTableMapper and IdentityTableReducer for basic usage. For a more involved example, see RowCounter or review the org.apache.hadoop.hbase.mapreduce.TestTableMapReduce unit test.

如果运行MR job时使用HBase作为数据源或数据接收器,需要在配置中指定源和结果表及列名。

当你从HBase中读时, TableInputFormat从HBase中请求regions表并制成map,或者是map-per-region或是mapreduce.job.maps map,无论哪个是更小的。如果job只有两个map,提高mapreduce.job.maps使其大于regions数量。如果你在每个regionServer节点上都运行了TaskTracker/NodeManager,Maps将运行在相邻的TaskTracker/NodeManager上。当向HBase写入时,可以避免Reduce步骤并从map中写回HBase。当任务不需要分类和整理,这种map发出数据的方法会管用。插入时, Hbase ‘sorts’中没有double-sorting(以及对你的MR集群shuffling data),除非你需要这个。如果不需要Reduce,map可能会发出处理的记录数来报告任务结束,或将Reduce数设为0,并使用TableOutputFormat。如果在你的案例中,有必要启动Reduce,通常你应该使用多个reducers,这样负载可以分布到整个HBase集群中。

HRegionPartitioner,HBase新的分割,可启动与regions同样数量的reducers.如果你的表格很大而上传不会大大改变现有region的数量,HRegionPartitioner很合适。 否则使用默认的分割方法。

50. 批量导入时直接写入HFiles

如果你在导入一张新的表格,可以忽略HBase API直接将内容写入文件系统,格式化成HBase数据文件(HFiles.)导入将运行得更快,也许是一个数量级的更快。

51. RowCounter Example

包括RowCounter的MR job使用TableInputFormat并记录指定表中行的数量。 To run it, use the following command:

  1. $ ./bin/hadoop jar hbase-X.X.X.jar

这将引用HBase MapReduce Driver类。在提供的选择中选择rowcounter。会将rowcounter使用建议打印到标准输出中。指定表名、计数的列以及输出目录。


52.1. 默认的HBase MapReduce分离器

在MapReduce job中使用TableInputFormat从HBase表中请求数据时,分离器将为表的每个region生成一个map任务。因此, 如果表中有100个regions这有100个map任务,不管扫描操作中选择了多少列族。

52.2. 自定义分离器

For those interested in implementing custom splitters, see the method getSplits in TableInputFormatBase. That is where the logic for map-task assignment resides.

53. HBase MapReduce Examples

53.1. HBase MapReduce Read Example


  1. Configuration config = HBaseConfiguration.create();
  2. Job job = new Job(config, "ExampleRead");
  3. job.setJarByClass(MyReadJob.class); // class that contains mapper
  4. Scan scan = new Scan();
  5. scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
  6. scan.setCacheBlocks(false); // don't set to true for MR jobs
  7. // set other scan attrs
  8. ...
  9. TableMapReduceUtil.initTableMapperJob(
  10. tableName, // input HBase table name
  11. scan, // Scan instance to control CF and attribute selection
  12. MyMapper.class, // mapper
  13. null, // mapper output key
  14. null, // mapper output value
  15. job);
  16. job.setOutputFormatClass(NullOutputFormat.class); // because we aren't emitting anything from mapper
  17. boolean b = job.waitForCompletion(true);
  18. if (!b) {
  19. throw new IOException("error with job!");
  20. }

…mapper实例将extend TableMapper…

  1. public static class MyMapper extends TableMapper<Text, Text> {
  2. public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException {
  3. //process data for the row from the Result instance.
  4. }
  5. }

53.2. HBase MapReduce Read/Write Example


  1. Configuration config = HBaseConfiguration.create();
  2. Job job = new Job(config, "ExampleReadWrite");
  3. job.setJarByClass(MyReadWriteJob.class); //包含 mapper的类
  4. Scan scan = new Scan();
  5. scan.setCashing(500); //Scan中默认为1,对MR job来说不太好
  6. scan.setCasheBlocks(false); //对MR JOB不要设置为true
  7. //设置其他scan属性
  8. TableMapReduceUtil.intTableMapperJob(
  9. sourceTable, //input table
  10. scan, //Scan instance to control CF and attribute selection
  11. MyMapper.class, //mapper class
  12. null, //mapper output key
  13. null, //mapper output value
  14. job);
  15. TableMapReduceUtil.initTableReducerJob(
  16. targetTable, //output table
  17. null, //reducer class
  18. job);
  19. job.setNumReduceTasks(0);
  20. boolean b = job.waitForCompletion(true);
  21. if(!b){
  22. throw new IOException("error with job!");
  23. }

这里要解释下TableMapReduceUtil的工作内容,特别是对于reducer.TableOutputFormat,被用作outputFormat类,许多参数被在config上设置(如,TableOutputFormat.OUTPUT_TABLE ),同样设置reducer输出值为ImmutableBytesWritable ,reducer值为Writable.这些可以被programmer设置在job和conf上,但是TableMapReduceUtil试着使事情简单。

下面的mapper例子,将创建一个put操作,并匹配输入结果并发送它。注意这是CopyTable utility所完成的。

  1. public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {
  2. public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
  3. // this example is just copying the data from the source table...
  4. context.write(row, resultToPut(row,value));
  5. }
  6. private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
  7. Put put = new Put(key.get());
  8. for (KeyValue kv : result.raw()) {
  9. put.add(kv);
  10. }
  11. return put;
  12. }
  13. }



53.3. HBase MapReduce Read/Write Example With Multi-Table Output

TODO: example for MultiTableOutputFormat.

53.4. HBase MapReduce Summary to HBase Example


  1. Configuration config = HBaseConfiguration.create();
  2. Job job = new Job(config, "ExampleReadWrite");
  3. job.setJarByClass(MyReadWriteJob.class); //包含 mapper的类
  4. Scan scan = new Scan();
  5. scan.setCashing(500); //Scan中默认为1,对MR job来说不太好
  6. scan.setCasheBlocks(false); //对MR JOB不要设置为true
  7. //设置其他scan属性
  8. TableMapReduceUtil.initTableMapperJob(
  9. sourceTable, // input table
  10. scan, // Scan instance to control CF and attribute selection
  11. MyMapper.class, // mapper class
  12. Text.class, // mapper output key
  13. IntWritable.class, // mapper output value
  14. job);
  15. TableMapReduceUtil.initTableReducerJob(
  16. targetTable, // output table
  17. MyTableReducer.class, // reducer class
  18. job);
  19. job.setNumReduceTasks(1); // at least one, adjust as required
  20. boolean b = job.waitForCompletion(true);
  21. if(!b){
  22. throw new IOException("error with job");
  23. }

在这个例子中将字符串值映射为一列被选为总结值。This value is used as the key to emit from the mapper,and an IntWritable represents an instance counter.

  1. public static class MyMapper extends TableMapper<Text, IntWritable> {
  2. public static final byte[] CF = "cf".getBytes();
  3. public static final byte[] ATTR1 = "attr1".getBytes();
  4. private final IntWritable ONE = new IntWritable(1);
  5. private Text text = new Text();
  6. public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
  7. String val = new String(value.getValue(CF, ATTR1));
  8. text.set(val); // we can only emit Writables...
  9. context.write(text, ONE);
  10. }
  11. }

在Reducer程序中, “ones”被累计(就像其他MR例子中那样),然后发出一个Put。

  1. public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
  2. public static final byte[] CF = "cf".getBytes();
  3. public static final byte[] COUNT = "count".getBytes();
  4. public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  5. int i = 0;
  6. for (IntWritable val : values) {
  7. i += val.get();
  8. }
  9. Put put = new Put(Bytes.toBytes(key.toString()));
  10. put.add(CF, COUNT, Bytes.toBytes(i));
  11. context.write(null, put);
  12. }
  13. }

53.5 HBase MapReduce Summary to File Example


  1. Configuration config = HBaseConfiguration.create();
  2. Job job = new Job(config,"ExampleSummaryToFile");
  3. job.setJarByClass(MySummaryFileJob.class); // class that contains mapper and reducer
  4. Scan scan = new Scan();
  5. scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
  6. scan.setCacheBlocks(false); // don't set to true for MR jobs
  7. // set other scan attrs
  8. TableMapReduceUtil.initTableMapperJob(
  9. sourceTable, // input table
  10. scan, // Scan instance to control CF and attribute selection
  11. MyMapper.class, // mapper class
  12. Text.class, // mapper output key
  13. IntWritable.class, // mapper output value
  14. job);
  15. job.setReducerClass(MyReducer.class); // reducer class
  16. job.setNumReduceTasks(1); // at least one, adjust as required
  17. FileOutputFormat.setOutputPath(job, new Path("/tmp/mr/mySummaryFile")); // adjust directories as required
  18. boolean b = job.waitForCompletion(true);
  19. if (!b) {
  20. throw new IOException("error with job!");
  21. }

如上所述, 这个例子的Mapper和之前的一样。对于Reducer,使用”generic“代替继承TableMapper并发出puts

  1. public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  2. public void reduce(Text key, Iteralbe<IntWritable> values, Context context) throw IOException, InterruptedException{
  3. int i = 0;
  4. for (IntWritable val: values){
  5. i += val.get();
  6. }
  7. context.write(key, new IntWritable(i));
  8. }
  9. }

53.6 HBase MapReduce Summary to HBase Without Reducer


一个HBase目标表有必要为job总结而存在。Table方法incrementColumnValue将使用自增值。从性能角度,每个map任务保持值的Map与它的值一起增加很有意义,当mapper的cleanup方法时,每个key更新一次。然而, 你的mileage可能会有所不同,这取决于要处理的行数和唯一键。


53.7 HBase MapReduce Summary to RDBMS


理解job的reducer数量将影响总结的实现,你必须将这个设计到你的reducer中。特别地,不管是以singleton来运行还是multiple来运行reducer。正确或错误,根据用例。越多的reducer被布置到job中,越多的RDBMS同步连接会被建立,this will scale, but only to a point.(?)

  1. public static class MyRdbmsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  2. private Connection c = null;
  3. public void setup(Context context) {
  4. //创建 DB连接。。。
  5. }
  6. public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  7. //do summarisation
  8. // 本例中Text是键, 但这只是个例子
  9. }
  10. public void cleanup(Context context){
  11. //关闭db连接
  12. }
  13. }

In the end, the summary results are written to your RDBMS table/s.

54. 在一个MR 任务中连接其他 Hbase 表

尽管当前框架内允许一个HBase表作为MR任务的输入,其他HBase表可作为查找表访问,在一个MR job中在Mapper的setup方法中创建一个表的实例。

  1. public class MyMapper extends TableMapper<Text,LongWritable> {
  2. public Talbe myOtherTable;
  3. public void setup(Context context) {
  4. //这里创建连接到集群上并保存下来或使用已存在表的连接
  5. myOtherTable = connection.getTable("myOtherTable");
  6. }
  7. public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException,InterruptedException {
  8. //处理结果
  9. //使用 ‘myOtherTable’查询
  10. }

55. 预测执行

通常建议使用HBase作为数据源的MR jobs关闭预测执行。这个可以通过配置在每个job基础上实现, 或者集群上实现。特别是对那些长时间运行的工作,预测执行将创建重复的map任务,这将你的数据写两遍到HBase中,这可能不是你想要的。

See spec.ex for more information.

56.Cascading 级联


下面是Cascading Flow的例子,将数据”汇入“Hbase集群。同样的hBaseTap API也可以被用来”源“数据。

  1. //从默认文件系统中读取数据
  2. //发出两个字段,:“offset偏移”和“line行”
  3. Tap source = new Hfs(new TextLine(), inputFileLhs);
  4. //在一个HBase集群中存储数据
  5. //接收字段:"num","lower"和"upper"
  6. //will automatically scope incoming fields to their proper familyname, "left" or "right"
  7. Fields keyFields = new Fields("num");
  8. String[] familyNames = {"left", "right"};
  9. Fields[] valueFields = new Fields[] {new Fields( "lower" ), new Fields( "upper" )};
  10. Tap hBaseTap = new HBaseTap( "multitable", new HBaseScheme( keyFields, familyNames, valueFields ), SinkMode.REPLACE );
  11. //分配一个简单的管道将输入解析成字段
  12. //现实应用可能会链接多个管道一起进行更复杂的处理
  13. Pipe parsePipe = new Each("insert", new Fields("line"), new RegexSplitter(new Fields("num", "lower","upper"), " "));
  14. //"plan"一个集群执行流
  15. //这将the source Tap and hBaseTap和parsePipe连接起来
  16. Flow parseFlow = new FlowConnector(properties).connect(source, hBaseTap, parsePipe);
  17. //start the flow, and block until complete
  18. parseFlow.complete();
  19. //打开Hbase表的迭代器来填充数据
  20. TupleEntryIterator iterator = parseFlow.openSink();
  21. while(iterator.hasNext())
  22. {
  23. //将每个tuple从HBase中打印出来
  24. System.out.println("iterator.next()="+ iterator.next());
  25. }
  26. iterator.close();