Twitter 连接器

Twitter Streaming API 提供了访问 Twitter 的 tweets 流的能力。 Flink Streaming 通过一个内置的 TwitterSource 类来创建到 tweets 流的连接。 使用 Twitter 连接器,需要在工程中添加下面的依赖:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-twitter_2.11</artifactId>
  4. <version>1.12.0</version>
  5. </dependency>

注意:当前的二进制发行版还没有这些连接器。集群执行请参考这里.

认证

使用 Twitter 流,用户需要先注册自己的程序,获取认证相关的必要信息。过程如下:

获取认证信息

首先,需要一个 Twitter 账号。可以通过 twitter.com/signup 免费注册, 或者在 Twitter 的 Application Management 登录,然后点击 “Create New App” 按钮来注册应用,填写应用程序相关表格并且接受条款。选择应用程序之后,可以在 “API Keys” 标签页看到 API key 和 API secret(对应于TwitterSource中的twitter-source.consumerKeytwitter-source.consumerSecret )。 请保管好这些信息并且不要将其发布到public的仓库。

使用

和其他的连接器不同的是,TwitterSource 没有任何其他依赖。下面的示例代码就可以优雅的运行:

  1. Properties props = new Properties();
  2. props.setProperty(TwitterSource.CONSUMER_KEY, "");
  3. props.setProperty(TwitterSource.CONSUMER_SECRET, "");
  4. props.setProperty(TwitterSource.TOKEN, "");
  5. props.setProperty(TwitterSource.TOKEN_SECRET, "");
  6. DataStream<String> streamSource = env.addSource(new TwitterSource(props));
  1. val props = new Properties()
  2. props.setProperty(TwitterSource.CONSUMER_KEY, "")
  3. props.setProperty(TwitterSource.CONSUMER_SECRET, "")
  4. props.setProperty(TwitterSource.TOKEN, "")
  5. props.setProperty(TwitterSource.TOKEN_SECRET, "")
  6. val streamSource = env.addSource(new TwitterSource(props))

TwitterSource 会发出包含了JSON object的字符串,这样的字符串表示一个Tweet.

flink-examples-streaming 中的 TwitterExample 类是使用 TwitterSource 的完整示范。

TwitterSource 默认使用 StatusesSampleEndpointStatusesSampleEndpoint 会返回一个 Tweets 的随机抽样。用户可以通过实现 TwitterSource.EndpointInitializer 接口来自定义 endpoint。