原文链接 : http://zeppelin.apache.org/docs/0.7.2/interpreter/scio.html

译文链接 : http://cwiki.apachecn.org/pages/viewpage.action?pageId=10030912

贡献者 : 片刻 ApacheCN Apache中文网

概述

Scio是Google云数据流的Scala DSL 和由SparkScalding启发的 Apache Beam。有关更多信息,请参阅当前的wikiAPI文档。

配置

名称默认值描述
zeppelin.scio.argz—runner = InProcessPipelineRunner
Scio解释者广泛的论据。

文档:https://github.com/spotify/scio/wiki#optionshttps://cloud.google.com/dataflow/pipelines/specifying-exec-params
zeppelin.scio.maxResult1000要显示的最大SC选择结果数

启用Scio解释器

在笔记本中,要启用Scio解释器,请单击Gear图标并选择beambeam.scio)。

使用Scio解释器

在段落中,用于%beam.scio选择Scio解释器。您可以使用与香草Scala REPL和Scio REPL相同的方式。状态(如变量,导入,执行等)在所有Scio段落之间共享。有一个特殊的变量argz,它包含来自Scio解释器设置的参数。最简单的方法是通过标准创建Scio上下文ContextAndArgs

  1. %beam.scio
  2. val (sc, args) = ContextAndArgs(argz)

sc以常规管道/ REPL的方式使用上下文。

示例:

  1. %beam.scio
  2. val (sc, args) = ContextAndArgs(argz)
  3. sc.parallelize(Seq("foo", "foo", "bar")).countByValue.closeAndDisplay()

如果您关闭Scio上下文,请继续创建一个新的ContextAndArgs。请参考Scio wiki更复杂的例子。您可以关闭Scio上下文与Scio REPL相同,并使用Zeppelin显示帮助程序同步关闭并显示结果 - 在下面阅读更多。

进展

一次只能运行一个段落。没有总体进展的概念,因此进度条将显示0

SCollection显示助手

Scio解释器带有显示助手,以方便与Zeppelin笔记本电脑的工作。只需使用closeAndDisplay()SCollection,关闭背景和显示结果。结果数量受限于zeppelin.scio.maxResult(默认为1000)。

支持的SCollection类型:

  • Scio键入的BigQuery
  • Scala的产品(案例 classes, tuples)
  • Google BigQuery的TableRow
  • Apache Avro
  • 所有Scala的 AnyVal
    助手方法

不同的对象有不同的帮助方法。您可以轻松地显示结果SCollectionFuture[Tap]Tap

SCollection 帮手

SCollection closeAndDisplay对于上面列出的类型,具有Zeppelin辅助方法。使用它来同步关闭Scio上下文,一旦可用的拉和显示结果。

Future[Tap] 帮手

Future[Tap] waitAndDisplay对于上面列出的类型,具有Zeppelin辅助方法。使用它来同步等待结果,一旦可用拉和显示结果。

Tap 帮手

Tap display对于上面列出的类型,具有Zeppelin辅助方法。使用它来拉和显示结果。

示例

BigQuery示例:

  1. %beam.scio
  2. @BigQueryType.fromQuery("""|SELECT departure_airport,count(case when departure_delay>0 then 1 else 0 end) as no_of_delays
  3. |FROM [bigquery-samples:airline_ontime_data.flights]
  4. |group by departure_airport
  5. |order by 2 desc
  6. |limit 10""".stripMargin) class Flights
  7.  
  8. val (sc, args) = ContextAndArgs(argz)
  9. sc.bigQuerySelect(Flights.query).closeAndDisplay(Flights.schema)

BigQuery typed 示例:

  1. %beam.scio
  2. @BigQueryType.fromQuery("""|SELECT departure_airport,count(case when departure_delay>0 then 1 else 0 end) as no_of_delays
  3. |FROM [bigquery-samples:airline_ontime_data.flights]
  4. |group by departure_airport
  5. |order by 2 desc
  6. |limit 10""".stripMargin) class Flights
  7.  
  8. val (sc, args) = ContextAndArgs(argz)
  9. sc.typedBigQuery[Flights]().flatMap(_.no_of_delays).mean.closeAndDisplay()

Avro示例:

  1. %beam.scio
  2. import com.spotify.data.ExampleAvro
  3.  
  4. val (sc, args) = ContextAndArgs(argz)
  5. sc.avroFile[ExampleAvro]("gs://<bucket>/tmp/my.avro").take(10).closeAndDisplay()

具有视图模式的Avro示例:

  1. %beam.scio
  2. import com.spotify.data.ExampleAvro
  3. import org.apache.avro.Schema
  4.  
  5. val (sc, args) = ContextAndArgs(argz)
  6. val view = Schema.parse("""{"type":"record","name":"ExampleAvro","namespace":"com.spotify.data","fields":[{"name":"track","type":"string"}, {"name":"artist", "type":"string"}]}""")
  7.  
  8. sc.avroFile[EndSongCleaned]("gs://<bucket>/tmp/my.avro").take(10).closeAndDisplay(view)

Google凭据

Scio Interpreter会尝试从其环境推断出您的Google Cloud凭据,它将进入帐户:

  • argz解释器设置(doc
  • 环境变量(GOOGLE_APPLICATION_CREDENTIALS
  • gcloud配置
    BigQuery宏凭证

目前用于宏扩展的BigQuery项目是使用Google Dataflow的DefaultProjectFactory().create() 推断的。