BasePipeline

Pipeline基类定义

class bigflow.pipeline.pipeline_base.PipelineBase(**options)

基类:object

Pipeline基类

  • add_archive(file_path, resource_path)

    向Pipeline添加一个压缩文件,使得该文件在运行期自动被解包

    参数:
    • file_path (str) — 文件路径,目前仅支持HDFS
    • resource_path (str) — 运行期访问该文件解压后的路径
  • add_cache_id(cache_id)

    save the ptype cache node id for use

  • add_directory(dir_path, resource_path=’’, is_only_python_files_accepted=False)

    向Pipeline添加一个目录,使得该目录下的文件/子目录能够在运行期被访问

    参数:
    • dir_path (str) — 需要添加的本地目录路径
    • resource_path (str) — 计算引擎运行时访问该目录的路径, 应是相对路径. 如未提供, 则 dir_path 下的所有 文件和子目录将会在远端的当前目录.
    • is_only_python_files_accepted (bool) — 是否仅添加目录下的.py/.pyc/.egg文件,默认为False

    注解

    如 resource_path 未提供, 且调用了多次 add_directory, 各个目录下的文件或者子目录如果存在重 名, 则为未定义行为. 有可能在添加时就出错, 也有可能在远端启动出错. 为避免该情况, 可以为每个要添 加的 dir_path 设置惟一的 resource_path.

  • add_egg_file(file_path)

    向Pipeline添加一个egg文件,使得该文件会在运行期自动被添加到PYTHONPATH中

    参数:file_path (str) — egg文件路径
  • add_file(source, resource_path=None, executable=False, from_bytes=False)

    向Pipeline添加单个文件,使得该文件能够在运行期被访问

    参数:
    • source (str) — 需要添加的文件路径或者比特数据流
    • resource_path (str) — 计算引擎运行时访问该文件的本地路径, 应是相对路径. 也即在远端, source 将会被映射 成该 resource_path 路径, 用户程序可以直接用该路径访问.
    • executable (bool) — 若为True,则该文件在运行期会被添加可执行属性
    • from_bytes (bool) — 若为True, 则表示远端文件的内容即为 source 的比特数据流, 默认为 False, source 应为一个文件路径
    1. >> pipeline.add_file('/path/to/local/or/hdfs/file', './target_path_a')
    2. >> def remote_filter(self):
    3. .. # This is a dumb example, don't do this in production,
    4. .. import os
    5. .. return os.path.isfile('./target_path_a')
    6. >> pc = pipeline.parallelize([1,2,3])
    7. >> filtered_pc = pc.filter(remote_filter)
    8. >> print pc.diff(filtered_pc).count().get()
    9. .. 0
  • async_run()

    立刻运行Pipeline并等待作业提交完成

    Raises:BigflowRuntimeException — 若运行期出错抛出此异常
  • config()

    获得job config

    返回:用户不应当修改此Pipeline job config
    返回类型:JobConfig
  • default_objector()

    返回该Pipeline的默认序列化/反序列化器

    返回:序列化/反序列化器,用户不应当修改此objector
    返回类型:Objector
  • get(pvalue)

    将一个P类型表示的数据汇聚为内存变量,相当于调用pvalue.get()。改方法隐式调用pvalue.cache() 并立即触发Pipeline.run()

    参数:pvalue (PType) — P类型实例
    返回:内存变量
    返回类型:object
  • id()

    得到表示该Pipeline的唯一ID

    返回:Pipeline ID
    返回类型:int
  • parallelize(dataset, **options)

    将一段内存变量映射为一个P类型实例

    参数:
    • dataset (object) — 任意类型的内存变量
    • options — serde: 设置dataset的serde对象
    返回:

    表示该内存变量的P类型

    返回类型:

    PType

  • plan(cache_id)

  • read(source)

    将外部存储的数据映射为一个PCollection,并在运行时读取数据

    参数:source (Source) — 表示外部存储的Source实例
    返回:读取结果
    返回类型:PCollection
  • reset_all_counters()

    将所有counter清零

    Raises:error.BigflowRuntimeException — 此方法不允许在 Bigflow变换 的用户自定义方法(UDF)中调用,否则抛出此异常
  • reset_counter(name)

    将一个counter清零, 若 name 中不包含 group 部分, 则默认将 Flume group 下面对应的 counter 清零

    参数:name (str) — counter名称,其说明请参考 counter模块
    Raises:error.BigflowRuntimeException — 此方法不允许在 Bigflow变换 的用户自定义方法(UDF)中调用,否则抛出此异常
  • run()

    立刻运行Pipeline并等待结束

    Raises:BigflowRuntimeException — 若运行期出错抛出此异常
  • set_fini_hook(name, fn)

    向Pipeline设置一个结束钩子,使得Pipeline能够在进程结束前依次执行 :param name: 钩子名称 :type name: str :param fn: 方法名称 :type fn: callable

  • set_init_hook(name, fn)

    向Pipeline设置一个初始化钩子,使得Pipeline能够在进程启动时依次执行(按钩子名字排序) :param name: 钩子名称, 用户应使用 str 作为钩子名字, 同时钩子应使用 ascii 码中字符. :type name: str :param fn: 方法名称 :type fn: callable

  • write(pcollection, target)

    将一个PCollection映射为外部存储数据,并在运行期写到该外部存储

    参数:
    • pcollection (PCollection) — 要写出的PCollection
    • target (Target) — 表示外部存储的Target实例