BasePipeline
Pipeline基类定义
class bigflow.pipeline.pipeline_base.PipelineBase
(**options)
基类:object
Pipeline基类
add_archive
(file_path, resource_path)向Pipeline添加一个压缩文件,使得该文件在运行期自动被解包
参数: -
save the ptype cache node id for use
add_directory
(dir_path, resource_path=’’, is_only_python_files_accepted=False)向Pipeline添加一个目录,使得该目录下的文件/子目录能够在运行期被访问
参数: 注解
如 resource_path 未提供, 且调用了多次 add_directory, 各个目录下的文件或者子目录如果存在重 名, 则为未定义行为. 有可能在添加时就出错, 也有可能在远端启动出错. 为避免该情况, 可以为每个要添 加的 dir_path 设置惟一的 resource_path.
-
向Pipeline添加一个egg文件,使得该文件会在运行期自动被添加到PYTHONPATH中
参数: file_path (str) — egg文件路径 add_file
(source, resource_path=None, executable=False, from_bytes=False)向Pipeline添加单个文件,使得该文件能够在运行期被访问
参数: >> pipeline.add_file('/path/to/local/or/hdfs/file', './target_path_a')
>> def remote_filter(self):
.. # This is a dumb example, don't do this in production,
.. import os
.. return os.path.isfile('./target_path_a')
>> pc = pipeline.parallelize([1,2,3])
>> filtered_pc = pc.filter(remote_filter)
>> print pc.diff(filtered_pc).count().get()
.. 0
-
立刻运行Pipeline并等待作业提交完成
Raises: BigflowRuntimeException
— 若运行期出错抛出此异常 -
获得job config
返回: 用户不应当修改此Pipeline job config 返回类型: JobConfig -
返回该Pipeline的默认序列化/反序列化器
返回: 序列化/反序列化器,用户不应当修改此objector 返回类型: Objector -
将一个P类型表示的数据汇聚为内存变量,相当于调用pvalue.get()。改方法隐式调用pvalue.cache() 并立即触发Pipeline.run()
参数: pvalue (PType) — P类型实例 返回: 内存变量 返回类型: object -
得到表示该Pipeline的唯一ID
返回: Pipeline ID 返回类型: int parallelize
(dataset, **options)将一段内存变量映射为一个P类型实例
参数: - dataset (object) — 任意类型的内存变量
- options — serde: 设置dataset的serde对象
返回: 表示该内存变量的P类型
返回类型: -
将外部存储的数据映射为一个PCollection,并在运行时读取数据
参数: source (Source) — 表示外部存储的Source实例 返回: 读取结果 返回类型: PCollection -
将所有counter清零
Raises: error.BigflowRuntimeException
— 此方法不允许在Bigflow变换
的用户自定义方法(UDF)中调用,否则抛出此异常 -
将一个counter清零, 若 name 中不包含 group 部分, 则默认将 Flume group 下面对应的 counter 清零
参数: name (str) — counter名称,其说明请参考 counter模块
Raises: error.BigflowRuntimeException
— 此方法不允许在Bigflow变换
的用户自定义方法(UDF)中调用,否则抛出此异常 -
立刻运行Pipeline并等待结束
Raises: BigflowRuntimeException
— 若运行期出错抛出此异常 -
向Pipeline设置一个结束钩子,使得Pipeline能够在进程结束前依次执行 :param name: 钩子名称 :type name: str :param fn: 方法名称 :type fn: callable
-
向Pipeline设置一个初始化钩子,使得Pipeline能够在进程启动时依次执行(按钩子名字排序) :param name: 钩子名称, 用户应使用 str 作为钩子名字, 同时钩子应使用 ascii 码中字符. :type name: str :param fn: 方法名称 :type fn: callable
-
将一个PCollection映射为外部存储数据,并在运行期写到该外部存储
参数: - pcollection (PCollection) — 要写出的PCollection
- target (Target) — 表示外部存储的Target实例