Pulsar adaptor for Apache Kafka

Pulsar provides an easy option for applications that are written with the Apache Kafka Java client API.

Using the Pulsar Kafka compatibility wrapper

In an existing application, replace the regular Kafka client dependency with the Pulsar Kafka wrapper. Remove the following dependency from pom.xml:

  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.1</version>
  </dependency>

Then include this dependency for the Pulsar Kafka wrapper:

  <dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client-kafka</artifactId>
    <version>2.7.0</version>
  </dependency>

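With the new dependency, the existing code works without any changes. Only the configuration needs to be updated: make sure the producers and consumers point to a Pulsar service rather than Kafka, and that they use a Pulsar topic.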
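The configuration change can be sketched with plain java.util.Properties. The class PulsarWrapperConfig and its two methods are hypothetical helpers, not part of the wrapper: one builds producer properties whose bootstrap.servers is a Pulsar service URL, and the other checks that a topic has the fully qualified persistent://tenant/namespace/topic form used in the examples below. The sketch assumes only the pulsar:// and pulsar+ssl:// URL schemes.

```java
import java.util.Properties;

// Hypothetical helpers for illustration only; not part of the Pulsar Kafka wrapper.
final class PulsarWrapperConfig {

    // Builds producer properties whose bootstrap.servers is a Pulsar service URL.
    static Properties producerProps(String serviceUrl) {
        if (!serviceUrl.startsWith("pulsar://") && !serviceUrl.startsWith("pulsar+ssl://")) {
            throw new IllegalArgumentException("Not a Pulsar service URL: " + serviceUrl);
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", serviceUrl);
        // Same strings that IntegerSerializer/StringSerializer.class.getName() return
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Accepts only persistent://tenant/namespace/topic with three non-empty parts.
    static boolean isFullyQualifiedTopic(String topic) {
        String prefix = "persistent://";
        if (!topic.startsWith(prefix)) {
            return false;
        }
        // Limit -1 keeps trailing empty strings, so "a/b/c/" is rejected
        String[] parts = topic.substring(prefix.length()).split("/", -1);
        if (parts.length != 3) {
            return false;
        }
        for (String part : parts) {
            if (part.isEmpty()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Properties props = producerProps("pulsar://localhost:6650");
        System.out.println(props.getProperty("bootstrap.servers")); // pulsar://localhost:6650
        System.out.println(isFullyQualifiedTopic("persistent://public/default/my-topic")); // true
        System.out.println(isFullyQualifiedTopic("my-topic")); // false
    }
}
```

The topic check is deliberately stricter than Pulsar itself, which also accepts short names such as my-topic and non-persistent topics.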

Using the Pulsar Kafka compatibility wrapper together with the existing Kafka client

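When migrating from Kafka to Pulsar, an application might need to use the original Kafka client and the Pulsar Kafka wrapper together during the migration. In that case, consider using the unshaded Pulsar Kafka client wrapper: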

  <dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client-kafka-original</artifactId>
    <version>2.7.0</version>
  </dependency>

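When you use this dependency, construct producers with org.apache.kafka.clients.producer.PulsarKafkaProducer instead of org.apache.kafka.clients.producer.KafkaProducer, and construct consumers with org.apache.kafka.clients.consumer.PulsarKafkaConsumer instead of org.apache.kafka.clients.consumer.KafkaConsumer.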

Producer example

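  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.Producer;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.common.serialization.IntegerSerializer;
  import org.apache.kafka.common.serialization.StringSerializer;

  // Topics need to be regular Pulsar topics
  String topic = "persistent://public/default/my-topic";
  Properties props = new Properties();
  // Point to a Pulsar service
  props.put("bootstrap.servers", "pulsar://localhost:6650");
  props.put("key.serializer", IntegerSerializer.class.getName());
  props.put("value.serializer", StringSerializer.class.getName());
  Producer<Integer, String> producer = new KafkaProducer<>(props);
  for (int i = 0; i < 10; i++) {
    // send() is asynchronous: the returned Future completes once the message is acknowledged
    producer.send(new ProducerRecord<>(topic, i, "hello-" + i));
    // log is assumed to be an SLF4J Logger field in the enclosing class
    log.info("Message {} queued for sending", i);
  }
  producer.close();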

Consumer example

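  import java.util.Arrays;
  import java.util.Properties;
  import org.apache.kafka.clients.consumer.Consumer;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.serialization.IntegerDeserializer;
  import org.apache.kafka.common.serialization.StringDeserializer;

  String topic = "persistent://public/default/my-topic";
  Properties props = new Properties();
  // Point to a Pulsar service
  props.put("bootstrap.servers", "pulsar://localhost:6650");
  // group.id maps to the Pulsar subscription name
  props.put("group.id", "my-subscription-name");
  props.put("enable.auto.commit", "false");
  props.put("key.deserializer", IntegerDeserializer.class.getName());
  props.put("value.deserializer", StringDeserializer.class.getName());
  Consumer<Integer, String> consumer = new KafkaConsumer<>(props);
  consumer.subscribe(Arrays.asList(topic));
  while (true) {
    // Wait up to 100 ms for new records
    ConsumerRecords<Integer, String> records = consumer.poll(100);
    // log is assumed to be an SLF4J Logger field in the enclosing class
    records.forEach(record -> log.info("Received record: {}", record));
    // Commit the offsets of the records returned by the last poll
    consumer.commitSync();
  }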

Complete examples

Complete producer and consumer examples are available in the Apache Pulsar source repository.

Compatibility matrix

Currently, the Pulsar Kafka wrapper supports most of the operations offered by the Kafka API.

Producer

APIs:

Producer method | Supported
Future<RecordMetadata> send(ProducerRecord<K, V> record) | Yes
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) | Yes
void flush() | Yes
List<PartitionInfo> partitionsFor(String topic) | No
Map<MetricName, ? extends Metric> metrics() | No
void close() | Yes
void close(long timeout, TimeUnit unit) | Yes

Properties:

Producer property | Supported | Notes
acks | Ignored | Durability and quorum writes are configured at the namespace level.
auto.offset.reset | Yes | Defaults to latest if no value is specified.
batch.size | Ignored
bootstrap.servers | Yes
buffer.memory | Ignored
client.id | Ignored
compression.type | Yes | Allows gzip and lz4. No snappy.
connections.max.idle.ms | Yes | Only supports up to 2,147,483,647,000 (Integer.MAX_VALUE * 1000) ms of idle time.
interceptor.classes | Yes
key.serializer | Yes
linger.ms | Yes | Controls the group commit time when batching messages.
max.block.ms | Ignored
max.in.flight.requests.per.connection | Ignored | Pulsar preserves ordering even with multiple requests in flight.
max.request.size | Ignored
metric.reporters | Ignored
metrics.num.samples | Ignored
metrics.sample.window.ms | Ignored
partitioner.class | Yes
receive.buffer.bytes | Ignored
reconnect.backoff.ms | Ignored
request.timeout.ms | Ignored
retries | Ignored | The Pulsar client retries with exponential backoff until the send timeout expires.
send.buffer.bytes | Ignored
timeout.ms | Yes
value.serializer | Yes
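The connections.max.idle.ms limit is exactly Integer.MAX_VALUE seconds. One plausible reading, which is an assumption here and not something the wrapper documents, is that the millisecond value is converted to whole seconds and stored in a Java int. The hypothetical IdleTimeLimit class below shows that arithmetic and rejects values beyond the limit; the real wrapper may clamp or handle overflow differently.

```java
// Hypothetical sketch: ASSUMES the idle time is stored as whole seconds in an int.
final class IdleTimeLimit {

    // Largest millisecond value whose whole seconds still fit in an int.
    // The 1000L literal forces long arithmetic, so the product does not overflow.
    static final long MAX_IDLE_MS = Integer.MAX_VALUE * 1000L;

    // Converts a connections.max.idle.ms value to seconds, rejecting out-of-range input.
    static int toSeconds(long idleMs) {
        if (idleMs < 0 || idleMs > MAX_IDLE_MS) {
            throw new IllegalArgumentException("connections.max.idle.ms out of range: " + idleMs);
        }
        return (int) (idleMs / 1000);
    }

    public static void main(String[] args) {
        System.out.println(MAX_IDLE_MS);                 // 2147483647000
        System.out.println(toSeconds(540_000L));         // 540 (nine minutes)
        System.out.println(MAX_IDLE_MS / 86_400_000L);   // 24855 days, roughly 68 years
    }
}
```

In practice the limit is far above any realistic idle timeout, so it only matters if a configuration generator emits very large values.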

Consumer

The following table lists the consumer APIs.

Consumer method | Supported
Set<TopicPartition> assignment() | No
Set<String> subscription() | Yes
void subscribe(Collection<String> topics) | Yes
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) | No
void assign(Collection<TopicPartition> partitions) | No
void subscribe(Pattern pattern, ConsumerRebalanceListener callback) | No
void unsubscribe() | Yes
ConsumerRecords<K, V> poll(long timeoutMillis) | Yes
void commitSync() | Yes
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) | Yes
void commitAsync() | Yes
void commitAsync(OffsetCommitCallback callback) | Yes
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) | Yes
void seek(TopicPartition partition, long offset) | Yes
void seekToBeginning(Collection<TopicPartition> partitions) | Yes
void seekToEnd(Collection<TopicPartition> partitions) | Yes
long position(TopicPartition partition) | Yes
OffsetAndMetadata committed(TopicPartition partition) | Yes
Map<MetricName, ? extends Metric> metrics() | No
List<PartitionInfo> partitionsFor(String topic) | No
Map<String, List<PartitionInfo>> listTopics() | No
Set<TopicPartition> paused() | No
void pause(Collection<TopicPartition> partitions) | No
void resume(Collection<TopicPartition> partitions) | No
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) | No
Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) | No
Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) | No
void close() | Yes
void close(long timeout, TimeUnit unit) | Yes
void wakeup() | No

Properties:

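Consumer property | Supported | Notes
group.id | Yes | Maps to a Pulsar subscription name.
max.poll.records | Yes
max.poll.interval.ms | Ignored | Messages are "pushed" from the broker.
session.timeout.ms | Ignored
heartbeat.interval.ms | Ignored
bootstrap.servers | Yes | Points to a single Pulsar service URL.
enable.auto.commit | Yes
auto.commit.interval.ms | Ignored | With auto-commit, acks are sent to the broker immediately.
partition.assignment.strategy | Ignored
auto.offset.reset | Yes | Only earliest and latest are supported.
fetch.min.bytes | Ignored
fetch.max.bytes | Ignored
fetch.max.wait.ms | Ignored
interceptor.classes | Yes
metadata.max.age.ms | Ignored
max.partition.fetch.bytes | Ignored
send.buffer.bytes | Ignored
receive.buffer.bytes | Ignored
client.id | Ignored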
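The group.id and auto.offset.reset rows describe a mapping onto Pulsar concepts: the consumer group becomes a subscription name, and the reset policy chooses where a new subscription starts. The sketch below illustrates that mapping with stdlib types only. ConsumerMappingSketch, SubscriptionSettings and InitialPosition are hypothetical names, not the wrapper's internals, and the fallback to latest when auto.offset.reset is unset is an assumption borrowed from Kafka's own default.

```java
import java.util.Properties;

// Hypothetical illustration of the consumer property mapping; not the wrapper's own code.
final class ConsumerMappingSketch {

    enum InitialPosition { EARLIEST, LATEST }

    // What the Pulsar side needs: a subscription name and a starting position.
    static final class SubscriptionSettings {
        final String subscriptionName;
        final InitialPosition initialPosition;

        SubscriptionSettings(String subscriptionName, InitialPosition initialPosition) {
            this.subscriptionName = subscriptionName;
            this.initialPosition = initialPosition;
        }
    }

    static SubscriptionSettings fromKafkaProps(Properties props) {
        String groupId = props.getProperty("group.id");
        if (groupId == null || groupId.isEmpty()) {
            throw new IllegalArgumentException("group.id is required; it becomes the subscription name");
        }
        // Assumption: fall back to "latest" when auto.offset.reset is not set
        String reset = props.getProperty("auto.offset.reset", "latest");
        InitialPosition position;
        switch (reset) {
            case "earliest":
                position = InitialPosition.EARLIEST;
                break;
            case "latest":
                position = InitialPosition.LATEST;
                break;
            default:
                // Kafka also accepts "none", but the wrapper supports only earliest and latest
                throw new IllegalArgumentException("Unsupported auto.offset.reset: " + reset);
        }
        return new SubscriptionSettings(groupId, position);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("group.id", "my-subscription-name");
        props.put("auto.offset.reset", "earliest");
        SubscriptionSettings s = fromKafkaProps(props);
        System.out.println(s.subscriptionName + " starts at " + s.initialPosition);
    }
}
```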

Customize Pulsar configurations

You can configure the Pulsar authentication provider directly from the Kafka properties.
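As a sketch of what this looks like, the Properties below combine a TLS service URL with Pulsar TLS authentication, using the pulsar.* keys from the client properties table. The certificate paths are placeholders, the class name TlsAuthPropsSketch is hypothetical, and the tlsCertFile/tlsKeyFile parameter names are those of Pulsar's AuthenticationTls plugin; check them against the Pulsar version you run.

```java
import java.util.Properties;

// Sketch: Kafka-style client properties that also carry Pulsar TLS authentication settings.
// Paths and the broker address are placeholders.
final class TlsAuthPropsSketch {

    static Properties build(String certFile, String keyFile, String trustCertsFile) {
        Properties props = new Properties();
        // pulsar+ssl:// on 6651, Pulsar's default TLS port; adjust for your cluster
        props.put("bootstrap.servers", "pulsar+ssl://localhost:6651");
        props.put("pulsar.use.tls", "true");
        props.put("pulsar.tls.trust.certs.file.path", trustCertsFile);
        props.put("pulsar.authentication.class",
                "org.apache.pulsar.client.impl.auth.AuthenticationTls");
        // Parameters as a single key1:val1,key2:val2 string
        props.put("pulsar.authentication.params.string",
                "tlsCertFile:" + certFile + ",tlsKeyFile:" + keyFile);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("/path/to/client-cert.pem", "/path/to/client-key.pem",
                "/path/to/ca.cert.pem");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Add the usual serializer or deserializer settings to the same Properties object and pass it to the producer or consumer constructor as in the examples earlier in this page.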

Pulsar client properties

Config property | Default | Notes
pulsar.authentication.class |  | Configures the authentication provider, for example org.apache.pulsar.client.impl.auth.AuthenticationTls.
pulsar.authentication.params.map |  | Map of parameters for the authentication plugin.
pulsar.authentication.params.string |  | String of parameters for the authentication plugin, for example key1:val1,key2:val2.
pulsar.use.tls | false | Enables TLS transport encryption.
pulsar.tls.trust.certs.file.path |  | Path to the TLS trust certificate store.
pulsar.tls.allow.insecure.connection | false | Accepts self-signed certificates from brokers.
pulsar.operation.timeout.ms | 30000 | General operations timeout.
pulsar.stats.interval.seconds | 60 | Interval for printing Pulsar client library stats.
pulsar.num.io.threads | 1 | Number of Netty I/O threads to use.
pulsar.connections.per.broker | 1 | Maximum number of connections to each broker.
pulsar.use.tcp.nodelay | true | Enables TCP no-delay.
pulsar.concurrent.lookup.requests | 50000 | Maximum number of concurrent topic lookups.
pulsar.max.number.rejected.request.per.connection | 50 | Error threshold after which a connection is forcibly closed.
pulsar.keepalive.interval.ms | 30000 | Keep-alive interval for each client-broker connection.
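The pulsar.authentication.params.string format (key1:val1,key2:val2) can be illustrated with a small parser. AuthParamsSketch is a hypothetical helper, not Pulsar's own parsing code, which may treat whitespace or separators differently. The design choice worth noting is splitting each pair on its first colon only, so that values such as file:///path URLs keep their own colons.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for the key1:val1,key2:val2 parameter format; not Pulsar's implementation.
final class AuthParamsSketch {

    // Parses pairs in order; each pair is split on its FIRST ':' so values may contain ':'.
    static Map<String, String> parse(String params) {
        Map<String, String> result = new LinkedHashMap<>();
        if (params == null || params.isBlank()) {
            return result;
        }
        for (String pair : params.split(",")) {
            int idx = pair.indexOf(':');
            if (idx <= 0) {
                throw new IllegalArgumentException("Malformed pair: " + pair);
            }
            result.put(pair.substring(0, idx).trim(), pair.substring(idx + 1).trim());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("key1:val1,key2:val2")); // {key1=val1, key2=val2}
        System.out.println(parse("tlsCertFile:file:///certs/c.pem").get("tlsCertFile")); // file:///certs/c.pem
    }
}
```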

Pulsar producer properties

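Config property | Default | Notes
pulsar.producer.name |  | Specifies the producer name.
pulsar.producer.initial.sequence.id |  | Specifies the baseline for the producer's sequence IDs.
pulsar.producer.max.pending.messages | 1000 | Maximum size of the queue of messages waiting for an acknowledgment from the broker.
pulsar.producer.max.pending.messages.across.partitions | 50000 | Maximum number of pending messages across all partitions.
pulsar.producer.batching.enabled | true | Whether to batch messages automatically.
pulsar.producer.batching.max.messages | 1000 | Maximum number of messages in a batch.
pulsar.block.if.producer.queue.full |  | Whether the producer blocks when its pending message queue is full.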
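The two pending-message limits interact on partitioned topics. As far as we understand the Pulsar Java client, a partitioned producer caps each partition's pending queue at the smaller of pulsar.producer.max.pending.messages and pulsar.producer.max.pending.messages.across.partitions divided by the number of partitions. The sketch below reproduces only that arithmetic; PendingLimitSketch is a hypothetical name, and the formula is an assumption to verify against your client version.

```java
// Hypothetical sketch of the per-partition pending-message cap; verify against your Pulsar client.
final class PendingLimitSketch {

    static int perPartitionLimit(int maxPending, int maxPendingAcrossPartitions, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Integer division: the across-partitions budget is shared evenly between partitions
        return Math.min(maxPending, maxPendingAcrossPartitions / numPartitions);
    }

    public static void main(String[] args) {
        // Documented defaults: 1000 per producer, 50000 across partitions
        System.out.println(perPartitionLimit(1000, 50000, 10));  // 1000 (50000 / 10 = 5000 is larger)
        System.out.println(perPartitionLimit(1000, 50000, 100)); // 500  (50000 / 100 = 500 is smaller)
    }
}
```

With the defaults, the across-partitions limit only starts to bind once a topic has more than 50 partitions.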

Pulsar consumer properties

Config property | Default | Notes
pulsar.consumer.name |  | Specifies the consumer name.
pulsar.consumer.receiver.queue.size | 1000 | Size of the consumer receiver queue.
pulsar.consumer.acknowledgments.group.time.millis | 100 | Maximum time the consumer groups acknowledgments before sending them to the broker.
pulsar.consumer.total.receiver.queue.size.across.partitions | 50000 | Maximum total size of the receiver queues across partitions.
pulsar.consumer.subscription.topics.mode | PersistentOnly | Subscription topic mode for consumers.
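The acknowledgment grouping setting trades a little delay for fewer broker round trips: acknowledgments are collected and sent together instead of one request per message. The sketch below is a simplified, hypothetical model of that idea (AckGroupingSketch is not a Pulsar class). The real client flushes on a background timer, whereas this sketch checks the window only when an acknowledgment arrives and takes timestamps as arguments so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of acknowledgment grouping; not Pulsar's implementation.
final class AckGroupingSketch {
    private final long groupTimeMillis;
    private final List<Long> pending = new ArrayList<>();
    // Each inner list stands for one acknowledgment request sent to the broker
    private final List<List<Long>> sentBatches = new ArrayList<>();
    private long windowStartMillis = -1;

    AckGroupingSketch(long groupTimeMillis) {
        this.groupTimeMillis = groupTimeMillis;
    }

    // Records an ack at the given time; sends the group once the window has elapsed.
    void acknowledge(long messageId, long nowMillis) {
        if (windowStartMillis < 0) {
            windowStartMillis = nowMillis;
        }
        pending.add(messageId);
        if (nowMillis - windowStartMillis >= groupTimeMillis) {
            flush();
        }
    }

    // Sends whatever is pending as a single batch and starts a new window.
    void flush() {
        if (!pending.isEmpty()) {
            sentBatches.add(new ArrayList<>(pending));
            pending.clear();
        }
        windowStartMillis = -1;
    }

    List<List<Long>> sentBatches() {
        return sentBatches;
    }

    public static void main(String[] args) {
        AckGroupingSketch tracker = new AckGroupingSketch(100); // documented default: 100 ms
        tracker.acknowledge(1, 0);
        tracker.acknowledge(2, 50);
        tracker.acknowledge(3, 100); // window elapsed: 1, 2 and 3 go out together
        tracker.acknowledge(4, 120);
        tracker.flush();             // e.g. on close: 4 goes out alone
        System.out.println(tracker.sentBatches()); // [[1, 2, 3], [4]]
    }
}
```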