Input
定义所有的数据源(Source),用于Pipeline.read()方法
实现一个Source需要实现四个接口:
- 有一个input_format属性,是一个flume::Loader
- 有一个objector属性,是一个Objector
- 有一个uris属性,返回一个uri列表
- 有一个transform_from_node方法,把一个Node变换成一个PType
- 有一个get_size方法,计算本文件读出数据量有多少。可以返回-1(?)表示未知大小。
class bigflow.input.FileBase
(path, options*)
基类:object
用于Pipeline.read()方法读取文件的基类
参数: | *path — 读取文件的path,必须均为str或unicode类型 |
---|
-
获得所有读取文件在文件系统中的大小
返回: 文件大小,以字节为单位 返回类型: int
class bigflow.input.SchemaTextFile
(path, options*)
读取文本文件生成支持字段操作的SchemaPCollection
参数: |
|
---|
Example
>>> open("input-data", "w").write("XiaoA\t20\nXiaoB\t21\n")
>>> persons = _pipeline.read(input.SchemaTextFile("input-data", columns = ['name', 'age']))
>>> persons.get()
[{'age': '20', 'name': 'XiaoA'}, {'age': '21', 'name': 'XiaoB'}]
>>> open("input-data", "w").write("XiaoA\t20\nXiaoB\t21\n")
>>> persons = _pipeline.read(input.SchemaTextFile("input-data", columns = [('name', str), ('age', int)]))
>>> persons.get()
[{'age': 20, 'name': 'XiaoA'}, {'age': 21, 'name': 'XiaoB'}]
>>> open("temp_data.txt", "w").write("1\t2.0\tbiflow\n10\t20.10\tinf")
>>> data = p.read(input.SchemaTextFile("./temp_data.txt", columns=3))
>>> data.get()
[('1', '2.0', 'biflow'), ('10', '20.1', 'inf')]
>>> open("temp_data.txt", "w").write("1\t2.0\tbiflow\n10\t20.10\tinf")
>>> data = p.read(input.SchemaTextFile("./temp_data.txt", columns=[int, float, str]))
>>> data.get()
[(1, 2.0, 'biflow'), (10, 20.1, 'inf')]
class bigflow.input.SequenceFile
(path, options*)
表示读取SequenceFile的数据源,SequenceFile的(Key, Value)必须均为BytesWritable,并由用户自行解析
参数: |
|
---|
Example
>>> from bigflow import serde
>>> StrSerde = serde.StrSerde
>>> lines = _pipeline.read(
input.SequenceFile('path', key_serde=StrSerde(), value_serde=StrSerde()))
>>> lines.get()
[("key1", "value1"), ("key2", "value2")]
>>> import mytest_proto_pb2
>>> msg_type = mytest_proto_pb2.MyTestPbType
>>> _pipeline.add_file("mytest_proto_pb2.py", "mytest_proto_pb2.py")
>>> pbs = _pipeline.read(input.SequenceFile('path2', serde=serde.ProtobufSerde(msg_type)))
>>> pbs.get() # 如果未设置key_serde/value_serde,则key会被丢弃。
>>> [<mytest_proto_pb2.MyTestPbType at 0x7fa9e262a870>,
<mytest_proto_pb2.MyTestPbType at 0x7fa9e262a870>]
有时,Pb包在本地没有,例如,py文件在hdfs,则可以使用下边的方法:
>>> _pipeline.add_archive("hdfs:///proto.tar.gz", "proto") #add_archive暂时不支持本地模式
>>> def get_pb_msg_creator(module_name, class_name):
... import importlib
... return lambda: importlib.import_module(module_name).__dict__[class_name]()
>>> pbs = _pipeline.read(input.SequenceFile('path2', serde=serde.ProtobufSerde(get_pb_msg_creator("proto.mytest_proto_pb2", "MyTestPbType"))))
>>> pbs.get()
>>> [<mytest_proto_pb2.MyTestPbType at 0x7fa9e262a870>,
<mytest_proto_pb2.MyTestPbType at 0x7fa9e262a870>]
如果需要自定义Serde,参见:bigflow.serde.Serde
。
-
通过kv_deserializer反序列化读取的(Key, Value)
kv_deserializer的期望签名为:
kv_deserializer(key: str, value: str) => object
transform_from_node
(load_node, pipeline)内部接口
class bigflow.input.SequenceFileStream
(path, options*)
表示读取SequenceFile的无穷数据源,SequenceFile的(Key, Value)必须均为BytesWritable,并由用户自行解析
参数: |
|
---|
注解
- 如果path中含有子目录,则以子目录作为数据源;如果path中没有子目录,则以path作为数据源
- 目录中有效的文件名为从0开始的正整数;如果文件不存在,会一直等待该文件
- 目录中的文件不允许被修改,添加需要保证原子(可以先写成其他文件名,然后进行mv)
-
通过kv_deserializer反序列化读取的(Key, Value)
kv_deserializer的期望签名为:
kv_deserializer(key: str, value: str) => object
transform_from_node
(load_node, pipeline)内部方法
class bigflow.input.TextFile
(path, options*)
表示读取的文本文件的数据源
参数: | *path — 读取文件的path,必须均为str类型 |
---|
读取文件数据示例::
>>> lines1 = _pipeline.read(input.TextFile('hdfs:///my_hdfs_dir/'))
>>> lines2 = _pipeline.read(input.TextFile('hdfs://host:port/my_hdfs_file'))
>>> lines3 = _pipeline.read(input.TextFile('hdfs:///multi_path1', 'hdfs:///multi_path2'))
>>> lines4 = _pipeline.read(input.TextFile('./local_file_by_rel_path/'))
>>> lines5 = _pipeline.read(input.TextFile('/home/work/local_file_by_abs_path/'))
>>> lines6 = _pipeline.read(input.TextFile(*['hdfs:///multi_path1', 'hdfs:///multi_path2']))
**options: 其中关键参数:
combine_multi_file: 是否可以合并多个文件到一个mapper中处理。默认为True。
partitioned: 默认为False,如果置为True,则返回的数据集为一个ptable,
ptable的key是split_info,value这个split上的全部数据所组成的pcollection。:
>>> f1 = open('data1.txt', 'w')
>>> f1.write('1 2 1')
>>> f1.close()
>>> f2 = open('data2.txt', 'w')
>>> f2.write('1 2 2')
>>> f2.close()
>>> table = _pipeline.read(input.TextFile('./data1.txt', './data2.txt', partitioned = True))
>>> def wordcount(p):
return p.flat_map(lambda line: line.split()) \
.group_by(lambda word: word) \
.apply_values(transforms.count)
>>> table.apply_values(wordcount).get()
{'/home/data1.txt': {'1': 2, '2': 1}, '/home/data2.txt': {'1': 1, '2', 2}}
# 需要注意,MR模式下,key可能不是这个格式的,切分方法也不一定是按照文件来的。
transform_from_node
(load_node, pipeline)内部接口
class bigflow.input.TextFileStream
(path, options*)
表示读取的文本文件的无穷数据源。
参数: | *path — 读取文件目录的path,必须均为str类型 |
---|
读取文件数据示例::
>>> lines1 = _pipeline.read(input.TextFileStream('hdfs:///my_hdfs_dir/'))
>>> lines2 = _pipeline.read(input.TextFileStream('hdfs://host:port/my_hdfs_dir/'))
>>> lines3 = _pipeline.read(input.TextFileStream('hdfs:///multi_path1', 'hdfs:///multi_path2'))
>>> lines4 = _pipeline.read(input.TextFileStream('./local_file_by_rel_path/'))
>>> lines5 = _pipeline.read(input.TextFileStream('/home/work/local_file_by_abs_path/'))
>>> lines6 = _pipeline.read(input.TextFileStream(*['hdfs:///multi_path1', 'hdfs:///multi_path2']))
**options: 可选的参数。
[Hint] max_record_num_per_round: 用于指定每轮订阅的日志条数,默认值为1000
[Hint] timeout_per_round: 用于指定每轮订阅的超时时间(单位为s),默认为10s
注解
- 如果path中含有子目录,则以子目录作为数据源;如果path中没有子目录,则以path作为数据源
- 目录中有效的文件名为从0开始的正整数;如果文件不存在,会一直等待该文件
- 目录中的文件不允许被修改,添加需要保证原子(可以先写成其他文件名,然后进行mv)
class bigflow.input.UserInputBase
基类:object
用户输入抽象基类
用户需要按以下方法重写split/load函数
Eg.
class LocalFileInput(UserInputBase):
def __init__(self, dir):
self._dir = dir
def split(self):
return [os.path.join(self._dir, filename) for filename in os.listdir(self._dir)]
def load(split):
with open(split) as f:
for line in f.readline():
yield line.strip()
用户可以重写post_process以实现一些后处理, post_process有一个传入参数,是一个PTable,这个PTable的key是split string, value是这个split上的数据。 默认post_process方法是`bigflow.transforms.flatten_values`.
-
User can override this method to set the serde
-
user can override this method to calculate the size of the input data
-
Load data from a split. The return value will be flattened into a PCollection.
-
User can override post_process method to do some post_process.
-
splits urls as some splits. User should override this method.
bigflow.input.user_define_format
(user_input_base)
return a FileBase object from a UserInputBase