Scala REPL

Flink附带了一个集成的交互式Scala Shell。它可以在本地设置和群集设置中使用。

要将shell与集成的Flink集群一起使用,只需执行:

  1. bin/start-scala-shell.sh local

在二进制Flink目录的根目录中。要在群集上运行Shell,请参阅下面的“设置”部分。

用法

shell支持Batch和Streaming。启动后会自动预先绑定两个不同的ExecutionEnvironments。使用“benv”和“senv”分别访问Batch和Streaming环境。

DataSet API

以下示例将在Scala shell中执行wordcount程序:

  1. Scala-Flink> val text = benv.fromElements(
  2. "To be, or not to be,--that is the question:--",
  3. "Whether 'tis nobler in the mind to suffer",
  4. "The slings and arrows of outrageous fortune",
  5. "Or to take arms against a sea of troubles,")
  6. Scala-Flink> val counts = text
  7. .flatMap { _.toLowerCase.split("\\W+") }
  8. .map { (_, 1) }.groupBy(0).sum(1)
  9. Scala-Flink> counts.print()

print()命令将自动将指定的任务发送到JobManager执行,并在终端中显示计算结果。

可以将结果写入文件。但是,在这种情况下,您需要调用execute,以运行您的程序:

  1. Scala-Flink> benv.execute("MyProgram")

DataStream API

与上面的批处理程序类似,我们可以通过DataStream API执行流程序:

  1. Scala-Flink> val textStreaming = senv.fromElements(
  2. "To be, or not to be,--that is the question:--",
  3. "Whether 'tis nobler in the mind to suffer",
  4. "The slings and arrows of outrageous fortune",
  5. "Or to take arms against a sea of troubles,")
  6. Scala-Flink> val countsStreaming = textStreaming
  7. .flatMap { _.toLowerCase.split("\\W+") }
  8. .map { (_, 1) }.keyBy(0).sum(1)
  9. Scala-Flink> countsStreaming.print()
  10. Scala-Flink> senv.execute("Streaming Wordcount")

请注意,在Streaming情况下,打印 算子操作不会直接触发执行。

Flink Shell附带命令历史记录和自动完成函数。

添加外部依赖项

可以将外部类路径添加到Scala-shell。当调用execute时,它们将与shell程序一起自动发送到JobManager。

使用参数-a <path/to/jar.jar>—addclasspath <path/to/jar.jar>加载其他类。

  1. bin/start-scala-shell.sh [local | remote <host> <port> | yarn] --addclasspath <path/to/jar.jar>

建立

要了解Scala Shell提供的选项,请使用

  1. bin/start-scala-shell.sh --help

本地

要将shell与集成的Flink集群一起使用,只需执行:

  1. bin/start-scala-shell.sh local

远程

要将其与正在运行的集群一起使用,请使用关键字启动scala shell,remote并为JobManager提供以下主机和端口:

  1. bin/start-scala-shell.sh remote <hostname> <portnumber>

YARNScala Shell集群

shell可以将Flink集群部署到YARN,YARN专门由shell使用。YARN容器的数量可以通过参数控制-n <arg>shell在YARN上部署新的Flink集群并连接集群。您还可以为YARN群集指定选项,例如JobManager的内存,YARN应用程序的名称等。

例如,要使用两个TaskManagers为Scala Shell启动Yarn集群,请使用以下命令:

  1. bin/start-scala-shell.sh yarn -n 2

对于所有其他选项,请参阅底部的完整参考。

YARN Session

如果您之前使用Flink Yarn会话部署了Flink集群,则Scala shell可以使用以下命令与其连接:

  1. bin/start-scala-shell.sh yarn

完整参考

  1. Flink Scala Shell
  2. Usage: start-scala-shell.sh [local|remote|yarn] [options] <args>...
  3. Command: local [options]
  4. Starts Flink scala shell with a local Flink cluster
  5. -a <path/to/jar> | --addclasspath <path/to/jar>
  6. Specifies additional jars to be used in Flink
  7. Command: remote [options] <host> <port>
  8. Starts Flink scala shell connecting to a remote cluster
  9. <host>
  10. Remote host name as string
  11. <port>
  12. Remote port as integer
  13. -a <path/to/jar> | --addclasspath <path/to/jar>
  14. Specifies additional jars to be used in Flink
  15. Command: yarn [options]
  16. Starts Flink scala shell connecting to a yarn cluster
  17. -n arg | --container arg
  18. Number of YARN container to allocate (= Number of TaskManagers)
  19. -jm arg | --jobManagerMemory arg
  20. Memory for JobManager container with optional unit (default: MB)
  21. -nm <value> | --name <value>
  22. Set a custom name for the application on YARN
  23. -qu <arg> | --queue <arg>
  24. Specifies YARN queue
  25. -s <arg> | --slots <arg>
  26. Number of slots per TaskManager
  27. -tm <arg> | --taskManagerMemory <arg>
  28. Memory per TaskManager container with optional unit (default: MB)
  29. -a <path/to/jar> | --addclasspath <path/to/jar>
  30. Specifies additional jars to be used in Flink
  31. --configDir <value>
  32. The configuration directory.
  33. -h | --help
  34. Prints this usage text