Pulsar adaptor for Apache Kafka

Pulsar 为使用 Apache Kafka Java 客户端 API 编写的应用程序提供了一个简单的解决方案。

使用 Pulsar Kafka 兼容封装器

In an existing application, change the regular Kafka client dependency and replace it with the Pulsar Kafka wrapper. Remove the following dependency in pom.xml:

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>0.10.2.1</version>
  5. </dependency>

然后在 Pulsar Kafka 封装器中加入下面依赖:

  1. <dependency>
  2. <groupId>org.apache.pulsar</groupId>
  3. <artifactId>pulsar-client-kafka</artifactId>
  4. <version>2.9.2</version>
  5. </dependency>

借助新的依赖,现有代码无需任何改动就能使用。 需要修改配置信息,并确保将 producer 和 consumer 接入 Pulsar 并使用特定的 Pulsar topic,而不是接入 Kafka。

同时使用 Pulsar Kafka 兼容封装器和已有 Kafka 客户端

从 Kafka 迁移到 Pulsar 的过程中,应用程序可能需要同时使用原始 Kafka 客户端和 Pulsar Kafka 封装器。 这种情况下,可以考虑使用透明的 Pulsar Kafka 封装器。

  1. <dependency>
  2. <groupId>org.apache.pulsar</groupId>
  3. <artifactId>pulsar-client-kafka-original</artifactId>
  4. <version>2.9.2</version>
  5. </dependency>

当使用这个依赖时,应使用 org.apache.kafka.clients.producer.PulsarKafkaProducer 而不是 org.apache.kafka.clients.producer.KafkaProducer 创建 producer,使用 org.apache.kafka.clients.producer.PulsarKafkaConsumer 创建 consumer。

生产者示例

  1. // 主题应为常规 Pulsar topic
  2. String topic = "persistent://public/default/my-topic";
  3. Properties props = new Properties();
  4. // 指向一个 Pulsar 服务
  5. props.put("bootstrap.servers", "pulsar://localhost:6650");
  6. props.put("key.serializer", IntegerSerializer.class.getName());
  7. props.put("value.serializer", StringSerializer.class.getName());
  8. Producer<Integer, String> producer = new KafkaProducer<>(props);
  9. for (int i = 0; i < 10; i++) {
  10. producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i));
  11. log.info("Message {} sent successfully", i);
  12. }
  13. producer.close();

消费者示例

  1. String topic = "persistent://public/default/my-topic";
  2. Properties props = new Properties();
  3. // 指向一个 Pulsar 服务
  4. props.put("bootstrap.servers", "pulsar://localhost:6650");
  5. props.put("group.id", "my-subscription-name");
  6. props.put("enable.auto.commit", "false");
  7. props.put("key.deserializer", IntegerDeserializer.class.getName());
  8. props.put("value.deserializer", StringDeserializer.class.getName());
  9. Consumer<Integer, String> consumer = new KafkaConsumer<>(props);
  10. consumer.subscribe(Arrays.asList(topic));
  11. while (true) {
  12. ConsumerRecords<Integer, String> records = consumer.poll(100);
  13. records.forEach(record -> {
  14. log.info("Received record: {}", record);
  15. });
  16. // 提交最近的 offset
  17. consumer.commitSync();
  18. }

完整示例

这里你可以找到完整的生产者和消费者示例。

兼容性列表

目前,Pulsar Kafka 封装器支持 Kafka API 提供的多数操作。

生产者(Producer)

API:

Producer 方法支持备注
Future<RecordMetadata> send(ProducerRecord<K, V> record)
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
void flush()
List<PartitionInfo> partitionsFor(String topic)
Map<MetricName, ? extends Metric> metrics()
void close()
void close(long timeout, TimeUnit unit)

属性:

配置属性支持备注
acks忽略在命名空间级别配置的持久性和节点写入
auto.offset.reset如果你没有给出一个特定的设置,它将使用一个默认值earliest
batch.size忽略
bootstrap.servers
buffer.memory忽略
client.id忽略
compression.typeAllows gzip and lz4. No snappy.
connections.max.idle.ms仅支持最多2 147 483 647 000 (Integrer.MAX_VALUE * 1000)毫秒的闲置时间
interceptor.classes
key.serializer
linger.ms批处理时控制提交时间
max.block.ms忽略
max.in.flight.requests.per.connection忽略Pulsar中即使有多个请求也会保证顺序
max.request.size忽略
metric.reporters忽略
metrics.num.samples忽略
metrics.sample.window.ms忽略
partitioner.class
receive.buffer.bytes忽略
reconnect.backoff.ms忽略
request.timeout.ms忽略
retries忽略Pulsar client retries with exponential backoff until the send timeout expires.
send.buffer.bytes忽略
timeout.ms
value.serializer

消费者(Consumer)

下表为消费者 API 列表。

Consumer 方法支持备注
Set<TopicPartition> assignment()
Set<String> subscription()
void subscribe(Collection<String> topics)
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback)
void assign(Collection<TopicPartition> partitions)
void subscribe(Pattern pattern, ConsumerRebalanceListener callback)
void unsubscribe()
ConsumerRecords<K, V> poll(long timeoutMillis)
void commitSync()
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
void commitAsync()
void commitAsync(OffsetCommitCallback callback)
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
void seek(TopicPartition partition, long offset)
void seekToBeginning(Collection<TopicPartition> partitions)
void seekToEnd(Collection<TopicPartition> partitions)
long position(TopicPartition partition)
OffsetAndMetadata committed(TopicPartition partition)
Map<MetricName, ? extends Metric> metrics()
List<PartitionInfo> partitionsFor(String topic)
Map<String, List<PartitionInfo>> listTopics()
Set<TopicPartition> paused()
void pause(Collection<TopicPartition> partitions)
void resume(Collection<TopicPartition> partitions)
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions)
Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions)
void close()
void close(long timeout, TimeUnit unit)
void wakeup()

属性:

配置属性支持备注
group.id到 Pulsar 订阅名称的映射
max.poll.records
max.poll.interval.ms忽略broker “推送”的消息
session.timeout.ms忽略
heartbeat.interval.ms忽略
bootstrap.servers需要指向单个的 Pulsar 服务 URL
enable.auto.commit
auto.commit.interval.ms忽略自动提交后,立即向 broker 发送 ack。
partition.assignment.strategy忽略
auto.offset.reset仅支持最早和最新消息。
fetch.min.bytes忽略
fetch.max.bytes忽略
fetch.max.wait.ms忽略
interceptor.classes
metadata.max.age.ms忽略
max.partition.fetch.bytes忽略
send.buffer.bytes忽略
receive.buffer.bytes忽略
client.id忽略

自定义 Pulsar 配置

可以通过 Kafka 的属性直接配置 Pulsar 的身份认证。

Pulsar client properties

配置属性默认值备注
pulsar.authentication.classConfigure to auth provider. For example, org.apache.pulsar.client.impl.auth.AuthenticationTls.
pulsar.authentication.params.map表示认证插件(Authentication-Plugin)的参数的映射。
pulsar.authentication.params.string表示 Authentication-Plugin 参数的字符串,例如 key1:val1,key2:val2
pulsar.use.tlsfalse启用 TLS 传输加密。
pulsar.tls.trust.certs.file.pathTLS 证书文件的路径。
pulsar.tls.allow.insecure.connectionfalse接受 broker 的自签名证书。
pulsar.operation.timeout.ms30000常规操作超时时间。
pulsar.stats.interval.seconds60Pulsar 客户端数据统计信息打印间隔时间。
pulsar.num.io.threads1要使用的 Netty IO 线程数量。
pulsar.connections.per.broker1每个 broker 的最大连接数。
pulsar.use.tcp.nodelaytrueTCP无延时。
pulsar.concurrent.lookup.requests50000并发查询 topic 的最大连接数。
pulsar.max.number.rejected.request.per.connection50强制关闭连接的错误阈值。
pulsar.keepalive.interval.ms30000每个 client-broker 连接的 keep-alive 时间间隔。

Pulsar producer properties

配置属性默认值备注
pulsar.producer.name指定 producer 名称。
pulsar.producer.initial.sequence.id为 producer 指定序列 id 的基线。
pulsar.producer.max.pending.messages1000设置等待接收 broker 确认的消息队列的最大值。
pulsar.producer.max.pending.messages.across.partitions50000设置所有分区挂起消息的最大值。
pulsar.producer.batching.enabledtrue控制是否启用 producer 的自动批量消息功能。
pulsar.producer.batching.max.messages1000批消息中消息数量的最大值。
pulsar.block.if.producer.queue.full如果队列已满,则指定区块生产者。

Pulsar consumer Properties

配置属性默认值备注
pulsar.consumer.name指定 consumer 名称。
pulsar.consumer.receiver.queue.size1000设置消费者接收队列的大小。
pulsar.consumer.acknowledgments.group.time.millis100设置消费者向 broker 发送确认的最大分组时间。
pulsar.consumer.total.receiver.queue.size.across.partitions50000设置跨分区总接收队列的最大大小。
pulsar.consumer.subscription.topics.modePersistentOnly设置消费者订阅主题的模式。