Twitter 连接器
Twitter Streaming API 提供了访问 Twitter 的 tweets 流的能力。 Flink Streaming 通过一个内置的 TwitterSource
类来创建到 tweets 流的连接。 使用 Twitter 连接器,需要在工程中添加下面的依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-twitter_2.11</artifactId>
<version>1.12.0</version>
</dependency>
注意:当前的二进制发行版还没有这些连接器。集群执行请参考这里.
认证
使用 Twitter 流,用户需要先注册自己的程序,获取认证相关的必要信息。过程如下:
获取认证信息
首先,需要一个 Twitter 账号。可以通过 twitter.com/signup 免费注册, 或者在 Twitter 的 Application Management 登录,然后点击 “Create New App” 按钮来注册应用,填写应用程序相关表格并且接受条款。选择应用程序之后,可以在 “API Keys” 标签页看到 API key 和 API secret(对应于TwitterSource
中的twitter-source.consumerKey
和 twitter-source.consumerSecret
)。 请保管好这些信息并且不要将其发布到public的仓库。
使用
和其他的连接器不同的是,TwitterSource
没有任何其他依赖。下面的示例代码就可以优雅的运行:
Properties props = new Properties();
props.setProperty(TwitterSource.CONSUMER_KEY, "");
props.setProperty(TwitterSource.CONSUMER_SECRET, "");
props.setProperty(TwitterSource.TOKEN, "");
props.setProperty(TwitterSource.TOKEN_SECRET, "");
DataStream<String> streamSource = env.addSource(new TwitterSource(props));
val props = new Properties()
props.setProperty(TwitterSource.CONSUMER_KEY, "")
props.setProperty(TwitterSource.CONSUMER_SECRET, "")
props.setProperty(TwitterSource.TOKEN, "")
props.setProperty(TwitterSource.TOKEN_SECRET, "")
val streamSource = env.addSource(new TwitterSource(props))
TwitterSource
会发出包含了JSON object的字符串,这样的字符串表示一个Tweet.
flink-examples-streaming
中的 TwitterExample
类是使用 TwitterSource
的完整示范。
TwitterSource
默认使用 StatusesSampleEndpoint
。StatusesSampleEndpoint
会返回一个 Tweets 的随机抽样。用户可以通过实现 TwitterSource.EndpointInitializer
接口来自定义 endpoint。