HBase and Spark
Apache Spark是一个用于分布式处理数据的软件框架,在许多情况下被用来替换MapReduce使用。
spark本身非本文讨论范围,请参考Spark site来获取更多关于Spark的项目。这里主要讨论Hbase和Spark4个方面的交互。
- Basic Spark
-
在Spark DAG 中任意点连接HBase的能力。
- Spark Streaming
-
在Spark streaming应用中连接HBase的能力。
- Spark Bulk Load
-
直接写入HBase HFiles的能力,从而实现批量插入HBase。
- SparkSQL/DataFrames
-
写SparkSQL的能力,即在HBase中绘制表格的能力
83. Basic Spark
本节讨论Spark、HBase最低层次上的整合。所有其他交互都是建立在这里将描述的概念之上。
HBaseContext是所有Spark和HBase整合的基础。HbaseContext获取HBase配置并将它们放入Spark executors中。这使每个Spark Executor在静态位置上都有一个HBase连接。
例如,Spark Executor可以和Region Server在同一节点上或在不同节点上,这里没有共同位置的依赖。每个Spark Excutor都是一个多线程任务的客户端应用。这使任何运行在executor上的Spark任务连接共享连接对象。
This example shows how HBaseContext can be used to do a foreachPartition
on a RDD
in Scala:
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()
…
val hbaseContext = new HBaseContext(sc, config)
rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1"))
it.foreach((putRecord) => {
. val put = new Put(putRecord._1)
. putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
. bufferedMutator.mutate(put)
})
bufferedMutator.flush()
bufferedMutator.close()
})
Here is the same example implemented in Java:
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
try {
List<byte[]> list = new ArrayList<>();
list.add(Bytes.toBytes("1"));
…
list.add(Bytes.toBytes("5"));
JavaRDD<byte[]> rdd = jsc.parallelize(list);
Configuration conf = HBaseConfiguration.create();
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
hbaseContext.foreachPartition(rdd,
new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
public void call(Tuple2<Iterator<byte[]>, Connection> t)
throws Exception {
Table table = t._2().getTable(TableName.valueOf(tableName));
BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
while (t._1().hasNext()) {
byte[] b = t._1().next();
Result r = table.get(new Get(b));
if (r.getExists()) {
mutator.mutate(new Put(b));
}
}
mutator.flush();
mutator.close();
table.close();
}
});
} finally {
jsc.stop();
}
All functionality between Spark and HBase will be supported both in Scala and in Java, with the exception of SparkSQL which will support any language that is supported by Spark. For the remaining of this documentation we will focus on Scala examples for now.
The examples above illustrate how to do a foreachPartition with a connection. A number of other Spark base functions are supported out of the box:
bulkPut
-
大规模并发发送puts到HBase中
bulkDelete
-
大规模并发送deletes到HBase中
bulkGet
-
大规模并发发送gets到HBase来创建新的的RDD
mapPartition
-
执行Connection对象的Spark Map函数来允许全连接至HBase
hBaseRDD
-
简化分布式scan来创建一个RDD
84. Spark Streaming
Spark streaming是建立在Spark顶端的微型批量流处理框架。Hbase和Spark streaming可以很好的结合,Hbase可为HBase streaming提供一下帮助
- 一个用来抓取参考数据或概要数据的地方
- A place to store counts or aggregates in a way that supports Spark Streaming promise of only once processing.
HBase-Spark结合的Spark-streaming方法与普通的Spark结合方法类似,in that the following commands are possible straight off a Spark Streaming DStream.
bulkPut
-
大规模并发发送puts到HBase中
bulkDelete
-
大规模并发送deletes到HBase中
bulkGet
-
大规模并发发送gets到HBase来创建新的的RDD
mapPartition
-
执行Connection对象的Spark Map函数来允许全连接至HBase
hBaseRDD
-
简化分布式scan来创建一个RDD
bulkPut
Example with DStreamsBelow is an example of bulkPut with DStreams. It is very close in feel to the RDD bulk put.
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()
val hbaseContext = new HBaseContext(sc, config)
val ssc = new StreamingContext(sc, Milliseconds(200))
val rdd1 = …
val rdd2 = …
val queue = mutable.QueueRDD[(Array[Byte], Array[(Array[Byte],
Array[Byte], Array[Byte])])]
queue += rdd1
queue += rdd2
val dStream = ssc.queueStream(queue)
dStream.hbaseBulkPut(
hbaseContext,
TableName.valueOf(tableName),
(putRecord) => {
val put = new Put(putRecord._1)
putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
put
})
hbaseBulkPut
函数有三个输入。hbaseContext,持有配置信息在executor中连接到HBase connections。 输入数据的表名。一个将DStream中的记录转为HBase Put对象的函数
85. Bulk Load
有两种方法使用Spark将数据批量加载到HBase中。基础的批量加载功能适用于行具有许多列的情况以及列不固定的情况。
还有一个Spark选项来记录批量加载的。这种情况是为表格每行少于10000列的情况。第二种方法的优势是在Spark shuffle操作时更多的吞吐和更少的负载。
实现工作或多或少类似于MapReduce批量加载流程,分区工具在区域分割的基础上分割行键,并按序将行键送入reducer中。