Pulsar

概述

Apache Pulsar是一个分布式、开源的 pub-sub 消息传递和流平台,用于实时工作负载,每天管理数千亿个事件。

版本

抽取节点版本
PulsarPulsar:> = 2.8.x

依赖项

  1. <dependency>
  2. <groupId>org.apache.inlong</groupId>
  3. <artifactId>sort-connector-pulsar</artifactId>
  4. <version>2.1.0-SNAPSHOT</version>
  5. </dependency>

用法

SQL API 用法

Step.1 准备好 sql 客户端

SQL Client用于编写用于在 Pulsar 中操作数据的 SQL 查询,您可以使用添加-addclasspath包的选项sort-connector-pulsar-{{INLONG_VERSION}}.jar

例子

  1. ./bin/sql-client.sh embedded --jar sort-connector-pulsar_{{INLONG_VERSION}}.jar

注意如果你把我们连接器的JAR包放在下面$FLINK_HOME/lib,不用--jar再用指定连接器的包了。

Step.2 从Pulsar读取数据

  1. CREATE TABLE pulsar (
  2. `physical_1` STRING,
  3. `physical_2` INT,
  4. `eventTime` TIMESTAMP(3) METADATA,
  5. `properties` MAP<STRING, STRING> METADATA ,
  6. `topic` STRING METADATA VIRTUAL,
  7. `sequenceId` BIGINT METADATA VIRTUAL,
  8. `key` STRING ,
  9. `physical_3` BOOLEAN
  10. ) WITH (
  11. 'connector' = 'pulsar-inlong',
  12. 'topic' = 'persistent://public/default/topic82547611',
  13. 'key.format' = 'raw',
  14. 'key.fields' = 'key',
  15. 'value.format' = 'avro',
  16. 'service-url' = 'pulsar://localhost:6650',
  17. 'admin-url' = 'http://localhost:8080',
  18. 'scan.startup.mode' = 'earliest'
  19. )
  20. INSERT INTO `sink_table`
  21. SELECT
  22. `physical_1` AS `physical_1`,
  23. `physical_2` AS `physical_2`
  24. FROM `pulsar`
  25. INSERT INTO pulsar
  26. VALUES
  27. ('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k11', 'v11', 'k12', 'v12'], 'key1', TRUE),
  28. ('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', MAP['k21', 'v21', 'k22', 'v22'], 'key2', FALSE),
  29. ('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k31', 'v31', 'k32', 'v32'], 'key3', TRUE)

Inlong Dashboard 用法

TODO

InLong Manager Client 方式

TODO

Pulsar Extract 节点参数

ParameterRequiredDefault valueTypeDescription
connector必需(none)String设置连接器类型。可用的选项是pulsar-inlong
topic可选(none)String设置输入或输出主题,多个和连接主题使用半逗号。选择一个主题模式。Set the input or output topic, use half comma for multiple and concatenate topics. Choose one with the topic-pattern.
topic-pattern可选(none)String使用正则获取匹配的主题。
service-url必需(none)String设置 Pulsar 代理服务地址。
admin-url可选(none)String设置 Pulsar 管理服务地址。不传入该参数时,启动模式只支持earliestlatest,并且无法更新 Pulsar Topic 的 offset。
scan.startup.mode可选latestString配置 Source 的启动模式。可用选项为earliestlatestexternal-subscriptionspecific-offsets
scan.startup.specific-offsets可选(none)String指定参数时需要该specific-offsets参数。
scan.startup.sub-name可选(none)String指定参数时需要该external-subscription参数。
discovery topic interval可选(none)Long设置分区发现的时间间隔,单位为毫秒。
sink.message-router可选key-hashString设置将消息写入 Pulsar 分区的路由方式。可用选项为key-hashround-robincustom MessageRouter
sink.semantic可选at-least-onceStringSink 写入消息的保证级别。可用选项为at-least-onceexactly-oncenone
properties可选emptyMap设置 Pulsar 的可选配置,格式为properties.key=’value’. 有关详细信息,请参阅配置参数
key.format可选(none)String为 Pulsar 消息设置基于键的序列化格式。可用选项有No formatoptional rawAvroJSON等。
key.fields可选(none)String序列化Key时要使用的SQL定义字段,多个半逗号,连接。
key.fields-prefix可选(none)String为 key 格式的所有字段定义自定义前缀,以避免与 value 格式的字段名称冲突。默认情况下,前缀为空。如果定义了自定义前缀,key.fields则使用表架构和。
format or value.format必需(none)String使用前缀设置名称。当以键格式构造数据类型时,前缀被移除,并且在键格式中使用非前缀名称。Pulsar 消息值序列化格式,支持 JSON、Avro 等。更多信息请参见 Flink 格式。
value.fields-include可选ALLEnumPulsar 消息值包含字段策略、可选的 ALL 和 EXCEPT_KEY。
inlong.metric.labels可选(none)Stringinlong metric 的标签值,该值的构成为groupId={groupId}&streamId={streamId}&nodeId={nodeId}

可用元数据

METADATA 标志用于读取和写入 Pulsar 消息中的元数据。支持列表如下。

注意 R/W 列定义元数据字段是否可读 (R) 和/或可写 (W)。只读列必须声明为 VIRTUAL 以在 INSERT INTO 操作期间排除它们。

关键字数据类型描述读/写
topicSTRING NOT NULLPulsar 消息的主题名称R
messageIdBYTES NOT NULLPulsar 消息的消息 IDR
sequenceIdBIGINT NOT NULLPulsar 消息的序列 IDR
publishTimeTIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULLPulsar 消息的发布时间R
eventTimeTIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULLPulsar 消息的生成时间R/W
propertiesMAP<STRING, STRING> NOT NULLPulsar 消息的扩展信息R/W

数据类型映射

Pulsar 将消息键和值存储为字节,因此 Pulsar 没有 schema 或数据类型。Pulsar 消息按格式进行反序列化和序列化,例如 csv、json、avro。因此,数据类型映射由特定格式确定。有关格式详细信息,请参阅格式页面。