PXF HDFS连接器支持SequenceFile格式的二进制数据。本节描述如何使用PXF读写HDFS SequenceFile数据,包括如何创建引用HDFS文件的外部表,查询和向外部表写入数据。
先决条件
在尝试从HDFS读取或向HDFS写入数据之前,请确保已满足PXF Hadoop先决条件。
创建外部表
PXF HDFS连接器 hdfs:SequenceFile
配置文件支持读写HDFS中SequenceFile格式的二进制数据。 当您将记录插入可写外部表中时,您插入的数据块将写入指定目录中的一个或多个文件。
注意: 使用可写配置创建的外部表仅INSERT操作。 如果要查询插入的数据,则必须单独创建一个引用相关HDFS目录的可读外部表。
使用以下语法创建一个引用HDFS目录的Greenplum数据库外部表:
CREATE [WRITABLE] EXTERNAL TABLE <table_name>
( <column_name> <data_type> [, ...] | LIKE <other_table> )
LOCATION ('pxf://<path-to-hdfs-dir>
?PROFILE=hdfs:SequenceFile[&SERVER=<server_name>][&<custom-option>=<value>[...]]')
FORMAT 'CUSTOM' (<formatting-properties>)
[DISTRIBUTED BY (<column_name> [, ... ] ) | DISTRIBUTED RANDOMLY];
CREATE EXTERNAL TABLE命令中使用的特定关键字和值见下表中描述。
关键字 | 值 |
---|---|
<path‑to‑hdfs‑dir> | HDFS数据存储中目录或文件的绝对路径 |
PROFILE | PROFILE 关键字必须指定为 hdfs:SequenceFile . |
SERVER=<server_name> | PXF用于访问数据的命名服务器配置。可选的; 如果未指定,PXF将使用default 服务器。 |
<custom‑option> | <custom-option> 描述见下方. |
FORMAT | 使用 FORMAT ’CUSTOM ’ 时指定 (FORMATTER=’pxfwritable_export’) (写入) 或 (FORMATTER=’pxfwritable_import’) (读取). |
DISTRIBUTED BY | 如果您计划将现有Greenplum数据库表中的数据加载到可写外部表,考虑在可写外部表上使用与Greenplum表相同的分布策略或<字段名>。 这样做可以避免数据加载操作中segment节点间额外的数据移动。 |
SequenceFile 格式数据可以选择采用record或block压缩。 PXF hdfs:SequenceFile
配置支持以下压缩编解码器:
- org.apache.hadoop.io.compress.DefaultCodec
- org.apache.hadoop.io.compress.BZip2Codec
使用 hdfs:SequenceFile
配置文件写入SequenceFile格式数据时,则必须提供Java类的名称,以用于序列化/反序列化二进制数据。 这个类必须为数据模式中引用的每种数据类型提供读写方法。
您可以通过 CREATE EXTERNAL TABLE
LOCATION
子句的自定义选项指定压缩编解码器和Java 序列化类。 hdfs:SequenceFile
配置文件支持以下自定义选项:
选项 | 值描述 |
---|---|
COMPRESSION_CODEC | 压缩编码器类名称。 如果此选项未提供,Greenplum 数据库不执行数据压缩。 支持的压缩编解码器包括:org.apache.hadoop.io.compress.DefaultCodec org.apache.hadoop.io.compress.BZip2Codec org.apache.hadoop.io.compress.GzipCodec |
COMPRESSION_TYPE | 采用的压缩类型; 支持的值有 RECORD (默认) 或 BLOCK . |
DATA-SCHEMA | 编写器序列化/反序列化类的名称。 此类所在的jar文件必须位于PXF类路径中。 hdfs:SequenceFile 配置文件需要此选项,并且没有默认值。 |
THREAD-SAFE | 确定表查询是否可以在多线程模式下运行的布尔值。 默认为 TRUE 。 将此选项设置为 FALSE 以处理单个线程中所有非线程安全操作 (例如,压缩)的请求。 |
读写二进制数据
当您要读写HDFS中 SequenceFile 格式的数据,请使用HDFS 连接器 hdfs:SequenceFile
配置文件。 这种类型的文件由二进制键/值对组成。 SequenceFile 格式是MapReduce作业之间常见的数据传输格式。
示例: 将二进制数据写入HDFS
在此示例中,您将创建一个名为 PxfExample_CustomWritable
的Java类,该类将对先前示例中使用的示例模式中的字段进行序列化/反序列化。然后,您将使用此类访问通过hdfs:SequenceFile
配置文件创建的可写外部表,该表使用默认的PXF服务器。
执行以下步骤来创建Java类和可写外部表。
准备创建示例Java类:
$ mkdir -p pxfex/com/example/pxf/hdfs/writable/dataschema
$ cd pxfex/com/example/pxf/hdfs/writable/dataschema
$ vi PxfExample_CustomWritable.java
将以下文本复制并黏贴到
PxfExample_CustomWritable.java
文件中:package com.example.pxf.hdfs.writable.dataschema;
import org.apache.hadoop.io.*;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Field;
/**
* PxfExample_CustomWritable class - used to serialize and deserialize data with
* text, int, and float data types
*/
public class PxfExample_CustomWritable implements Writable {
public String st1, st2;
public int int1;
public float ft;
public PxfExample_CustomWritable() {
st1 = new String("");
st2 = new String("");
int1 = 0;
ft = 0.f;
}
public PxfExample_CustomWritable(int i1, int i2, int i3) {
st1 = new String("short_string___" + i1);
st2 = new String("short_string___" + i1);
int1 = i2;
ft = i1 * 10.f * 2.3f;
}
String GetSt1() {
return st1;
}
String GetSt2() {
return st2;
}
int GetInt1() {
return int1;
}
float GetFt() {
return ft;
}
@Override
public void write(DataOutput out) throws IOException {
Text txt = new Text();
txt.set(st1);
txt.write(out);
txt.set(st2);
txt.write(out);
IntWritable intw = new IntWritable();
intw.set(int1);
intw.write(out);
FloatWritable fw = new FloatWritable();
fw.set(ft);
fw.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
Text txt = new Text();
txt.readFields(in);
st1 = txt.toString();
txt.readFields(in);
st2 = txt.toString();
IntWritable intw = new IntWritable();
intw.readFields(in);
int1 = intw.get();
FloatWritable fw = new FloatWritable();
fw.readFields(in);
ft = fw.get();
}
public void printFieldTypes() {
Class myClass = this.getClass();
Field[] fields = myClass.getDeclaredFields();
for (int i = 0; i < fields.length; i++) {
System.out.println(fields[i].getType().getName());
}
}
}
编译并为
PxfExample_CustomWritable
创建Java类JAR文件。 为您的Hadoop发行版提供一个包含hadoop-common.jar
文件的类路径。 例如, 如果您安装了Hortonworks Data Platform Hadoop客户端:$ javac -classpath /usr/hdp/current/hadoop-client/hadoop-common.jar PxfExample_CustomWritable.java
$ cd ../../../../../../
$ jar cf pxfex-customwritable.jar com
$ cp pxfex-customwritable.jar /tmp/
(您的Hadoop库类路径可能不同)
将
pxfex-customwritable.jar
文件复制到Greenplum 数据库master节点。 例如:$ scp pxfex-customwritable.jar gpadmin@gpmaster:/home/gpadmin
登录到您的Greenplum数据库master节点:
$ ssh gpadmin@<gpmaster>
将
pxfex-customwritable.jar
JAR文件复制到用户运行时类库目录中,并记下位置。 例如, 如果PXF_CONF=/usr/local/greenplum-pxf
:gpadmin@gpmaster$ cp /home/gpadmin/pxfex-customwritable.jar /usr/local/greenplum-pxf/lib/pxfex-customwritable.jar
将PXF配置同步到Greenplum数据库集群。 例如:
gpadmin@gpmaster$ $GPHOME/pxf/bin/pxf cluster sync
如重启PXF中所述,在每个Greenplum数据库segment主机上重启PXF。
使用PXF
hdfs:SequenceFile
配置文件创建Greenplum数据库可写外部表。 在DATA-SCHEMA
自定义选项中标识您在上面创建的序列化/反序列化Java类。 创建可写外部表时,使用 BZip2
压缩并指定BLOCK
压缩模式。postgres=# CREATE WRITABLE EXTERNAL TABLE pxf_tbl_seqfile (location text, month text, number_of_orders integer, total_sales real)
LOCATION ('pxf://data/pxf_examples/pxf_seqfile?PROFILE=hdfs:SequenceFile&DATA-SCHEMA=com.example.pxf.hdfs.writable.dataschema.PxfExample_CustomWritable&COMPRESSION_TYPE=BLOCK&COMPRESSION_CODEC=org.apache.hadoop.io.compress.BZip2Codec')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
注意
'CUSTOM'
FORMAT
格式化属性指定为内置的 pxfwritable_export
格式化程序。通过直接插入
pxf_tbl_seqfile
表将一些数据写入pxf_seqfile
HDFS目录中。 例如:postgres=# INSERT INTO pxf_tbl_seqfile VALUES ( 'Frankfurt', 'Mar', 777, 3956.98 );
postgres=# INSERT INTO pxf_tbl_seqfile VALUES ( 'Cleveland', 'Oct', 3812, 96645.37 );
回想一下,Greenplum 数据库不支持直接查询一个可写外部表。要读取
pxf_seqfile
中的数据,请创建一个引用此HDFS目录的可读外部表:postgres=# CREATE EXTERNAL TABLE read_pxf_tbl_seqfile (location text, month text, number_of_orders integer, total_sales real)
LOCATION ('pxf://data/pxf_examples/pxf_seqfile?PROFILE=hdfs:SequenceFile&DATA-SCHEMA=com.example.pxf.hdfs.writable.dataschema.PxfExample_CustomWritable')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');
当您通过
hdfs:SequenceFile
配置文件读取HDFS数据是必须指定DATA-SCHEMA
自定义选项。 您无需提供压缩相关的选项。 查询可读外部表
read_pxf_tbl_seqfile
:gpadmin=# SELECT * FROM read_pxf_tbl_seqfile ORDER BY total_sales;
location | month | number_of_orders | total_sales
-----------+-------+------------------+-------------
Frankfurt | Mar | 777 | 3956.98
Cleveland | Oct | 3812 | 96645.4
(2 rows)
读取记录键
当Greenplum数据库外部表引用SequenceFile或其他以键-值对存储行的数据格式时, 您可以通过使用 recordkey
关键字作为字段名称,在Greenplum查询中访问记录键的值。
recordkey
字段类型必须与记录键类型相对应,就像其他字段必须与HDFS数据匹配一样。.
您可以将 recordkey
定义为以下任何Hadoop类型:
- BooleanWritable
- ByteWritable
- DoubleWritable
- FloatWritable
- IntWritable
- LongWritable
- Text
如果没有为行定义记录键, Greenplum数据库将返回处理该行的segment的id。
示例: 使用记录键
创建一个可读外部表,以访问您在 示例: 将二进制数据写入HDFS 创建的可写外部表 pxf_tbl_seqfile
的记录键。 本例中将 recordkey
定义为 int8
类型。
postgres=# CREATE EXTERNAL TABLE read_pxf_tbl_seqfile_recordkey(recordkey int8, location text, month text, number_of_orders integer, total_sales real)
LOCATION ('pxf://data/pxf_examples/pxf_seqfile?PROFILE=hdfs:SequenceFile&DATA-SCHEMA=com.example.pxf.hdfs.writable.dataschema.PxfExample_CustomWritable')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');
gpadmin=# SELECT * FROM read_pxf_tbl_seqfile_recordkey;
recordkey | location | month | number_of_orders | total_sales
-----------+-------------+-------+------------------+-------------
2 | Frankfurt | Mar | 777 | 3956.98
1 | Cleveland | Oct | 3812 | 96645.4
(2 rows)
当您将行插入到可写外部表时,您没有定义记录键, 因此 recordkey
标识为处理该行的segment。