Pulsar adaptor for Apache Kafka
Pulsar provides an easy option for applications that are currently written using the Apache Kafka Java client API.
Using the Pulsar Kafka compatibility wrapper
In an existing application, replace the regular Kafka client dependency with the Pulsar Kafka wrapper. Remove the following dependency from pom.xml:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
Then include this dependency for the Pulsar Kafka wrapper:
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-kafka</artifactId>
<version>2.7.2</version>
</dependency>
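With the new dependency, the existing code works without any changes. You only need to adjust the configuration so that producers and consumers connect to Pulsar rather than Kafka and use a specific Pulsar topic.
Using the Pulsar Kafka compatibility wrapper together with the existing Kafka client
During a migration from Kafka to Pulsar, an application might need to use the original Kafka client and the Pulsar Kafka wrapper at the same time. In that case, consider using the unshaded Pulsar Kafka client wrapper: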
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-kafka-original</artifactId>
<version>2.7.2</version>
</dependency>
When using this dependency, construct producers with org.apache.kafka.clients.producer.PulsarKafkaProducer instead of org.apache.kafka.clients.producer.KafkaProducer, and construct consumers with org.apache.kafka.clients.consumer.PulsarKafkaConsumer.
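While both clients live side by side, it can help to keep the choice of producer class and service address in one place. The following is only a sketch under stated assumptions: `MigrationTargets` and its `Target` enum are hypothetical names, and `localhost:9092` is an assumed Kafka address. The class names are stored as strings so the snippet runs without either client library on the classpath.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of Pulsar or Kafka.
final class MigrationTargets {

    enum Target {
        // Code paths still on Kafka keep the original client class.
        KAFKA("org.apache.kafka.clients.producer.KafkaProducer", "localhost:9092"),
        // Migrated code paths use the wrapper class from pulsar-client-kafka-original.
        PULSAR("org.apache.kafka.clients.producer.PulsarKafkaProducer", "pulsar://localhost:6650");

        final String producerClass;
        final String bootstrapServers;

        Target(String producerClass, String bootstrapServers) {
            this.producerClass = producerClass;
            this.bootstrapServers = bootstrapServers;
        }
    }

    // Builds producer properties for the chosen target; only the address differs.
    static Properties producerProps(Target target) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", target.bootstrapServers);
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        for (Target t : Target.values()) {
            System.out.println(t + " -> " + t.producerClass
                    + " @ " + producerProps(t).getProperty("bootstrap.servers"));
        }
    }
}
```

In a real application the resulting properties would be passed to the constructor of the class named in `producerClass`.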
Producer example
// Topic should be a regular Pulsar topic
String topic = "persistent://public/default/my-topic";
Properties props = new Properties();
// Point to a Pulsar service
props.put("bootstrap.servers", "pulsar://localhost:6650");
props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
Producer<Integer, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i));
log.info("Message {} sent successfully", i);
}
producer.close();
Consumer example
String topic = "persistent://public/default/my-topic";
Properties props = new Properties();
// Point to a Pulsar service
props.put("bootstrap.servers", "pulsar://localhost:6650");
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", IntegerDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
Consumer<Integer, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<Integer, String> records = consumer.poll(100);
records.forEach(record -> {
log.info("Received record: {}", record);
});
// Commit the last offset
consumer.commitSync();
}
Complete examples
See the complete producer and consumer examples.
Compatibility matrix
Currently, the Pulsar Kafka wrapper supports most of the operations offered by the Kafka API.
Producer
APIs:
Producer method | Supported | Notes |
---|---|---|
Future<RecordMetadata> send(ProducerRecord<K, V> record) | Yes | |
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) | Yes | |
void flush() | Yes | |
List<PartitionInfo> partitionsFor(String topic) | No | |
Map<MetricName, ? extends Metric> metrics() | No | |
void close() | Yes | |
void close(long timeout, TimeUnit unit) | Yes | |
Properties:
Config property | Supported | Notes |
---|---|---|
acks | Ignored | Durability and quorum writes are configured at the namespace level. |
auto.offset.reset | Yes | Defaults to latest if the user does not provide a specific setting. |
batch.size | Ignored | |
bootstrap.servers | Yes | |
buffer.memory | Ignored | |
client.id | Ignored | |
compression.type | Yes | Allows gzip and lz4. No snappy. |
connections.max.idle.ms | Yes | Only supports up to 2,147,483,647,000 (Integer.MAX_VALUE * 1000) ms of idle time. |
interceptor.classes | Yes | |
key.serializer | Yes | |
linger.ms | Yes | Controls the group commit time when batching messages. |
max.block.ms | Ignored | |
max.in.flight.requests.per.connection | Ignored | In Pulsar, ordering is maintained even with multiple requests in flight. |
max.request.size | Ignored | |
metric.reporters | Ignored | |
metrics.num.samples | Ignored | |
metrics.sample.window.ms | Ignored | |
partitioner.class | Yes | |
receive.buffer.bytes | Ignored | |
reconnect.backoff.ms | Ignored | |
request.timeout.ms | Ignored | |
retries | Ignored | The Pulsar client retries with exponential backoff until the send timeout expires. |
send.buffer.bytes | Ignored | |
timeout.ms | Yes | |
value.serializer | Yes | |
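Two rows in the table above carry concrete limits: `compression.type` does not accept snappy, and `connections.max.idle.ms` is capped at Integer.MAX_VALUE * 1000 milliseconds. A minimal sketch of a pre-flight check for those two limits follows. `ProducerSettingsCheck` is a hypothetical helper written for this page, and how the wrapper itself reacts to out-of-range values is not specified here.

```java
import java.util.Properties;

// Hypothetical pre-flight check for illustration; not part of the wrapper.
final class ProducerSettingsCheck {

    // Upper bound from the table: Integer.MAX_VALUE * 1000 milliseconds.
    // The 1000L literal forces long arithmetic so the product does not overflow.
    static final long MAX_IDLE_MS = Integer.MAX_VALUE * 1000L;

    // Throws IllegalArgumentException if a setting falls outside the documented limits.
    static void check(Properties props) {
        String compression = props.getProperty("compression.type");
        if ("snappy".equals(compression)) {
            throw new IllegalArgumentException("compression.type snappy is not supported");
        }
        String idle = props.getProperty("connections.max.idle.ms");
        if (idle != null && Long.parseLong(idle) > MAX_IDLE_MS) {
            throw new IllegalArgumentException(
                    "connections.max.idle.ms exceeds " + MAX_IDLE_MS + " ms");
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("compression.type", "lz4");
        props.setProperty("linger.ms", "5");
        props.setProperty("connections.max.idle.ms", "540000");
        check(props); // passes: lz4 is allowed and the idle time is within the cap
        System.out.println("Maximum idle time in ms: " + MAX_IDLE_MS);
    }
}
```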
Consumer
The following table lists consumer APIs.
Consumer method | Supported | Notes |
---|---|---|
Set<TopicPartition> assignment() | No | |
Set<String> subscription() | Yes | |
void subscribe(Collection<String> topics) | Yes | |
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) | No | |
void assign(Collection<TopicPartition> partitions) | No | |
void subscribe(Pattern pattern, ConsumerRebalanceListener callback) | No | |
void unsubscribe() | Yes | |
ConsumerRecords<K, V> poll(long timeoutMillis) | Yes | |
void commitSync() | Yes | |
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) | Yes | |
void commitAsync() | Yes | |
void commitAsync(OffsetCommitCallback callback) | Yes | |
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) | Yes | |
void seek(TopicPartition partition, long offset) | Yes | |
void seekToBeginning(Collection<TopicPartition> partitions) | Yes | |
void seekToEnd(Collection<TopicPartition> partitions) | Yes | |
long position(TopicPartition partition) | Yes | |
OffsetAndMetadata committed(TopicPartition partition) | Yes | |
Map<MetricName, ? extends Metric> metrics() | No | |
List<PartitionInfo> partitionsFor(String topic) | No | |
Map<String, List<PartitionInfo>> listTopics() | No | |
Set<TopicPartition> paused() | No | |
void pause(Collection<TopicPartition> partitions) | No | |
void resume(Collection<TopicPartition> partitions) | No | |
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) | No | |
Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) | No | |
Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) | No | |
void close() | Yes | |
void close(long timeout, TimeUnit unit) | Yes | |
void wakeup() | No | |
Properties:
Config property | Supported | Notes |
---|---|---|
group.id | Yes | Maps to a Pulsar subscription name. |
max.poll.records | Yes | |
max.poll.interval.ms | Ignored | Messages are "pushed" from the broker. |
session.timeout.ms | Ignored | |
heartbeat.interval.ms | Ignored | |
bootstrap.servers | Yes | Needs to point to a Pulsar service URL. |
enable.auto.commit | Yes | |
auto.commit.interval.ms | Ignored | With auto-commit, acks are sent to the broker immediately. |
partition.assignment.strategy | Ignored | |
auto.offset.reset | Yes | Only supports earliest and latest. |
fetch.min.bytes | Ignored | |
fetch.max.bytes | Ignored | |
fetch.max.wait.ms | Ignored | |
interceptor.classes | Yes | |
metadata.max.age.ms | Ignored | |
max.partition.fetch.bytes | Ignored | |
send.buffer.bytes | Ignored | |
receive.buffer.bytes | Ignored | |
client.id | Ignored | |
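When porting an existing Kafka consumer configuration, it can be useful to see which of its settings no longer have any effect. The sketch below lists the keys that the table above marks as ignored. `IgnoredConsumerSettings` is a hypothetical helper, and its key set is copied by hand from the table, so it would need updating if the table changes.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration; the key list mirrors the table above.
final class IgnoredConsumerSettings {

    static final Set<String> IGNORED = new HashSet<>(Arrays.asList(
            "max.poll.interval.ms", "session.timeout.ms", "heartbeat.interval.ms",
            "auto.commit.interval.ms", "partition.assignment.strategy",
            "fetch.min.bytes", "fetch.max.bytes", "fetch.max.wait.ms",
            "metadata.max.age.ms", "max.partition.fetch.bytes",
            "send.buffer.bytes", "receive.buffer.bytes", "client.id"));

    // Returns, in sorted order, the configured keys that the wrapper ignores.
    static Set<String> ignoredKeys(Properties props) {
        Set<String> result = new TreeSet<>(props.stringPropertyNames());
        result.retainAll(IGNORED);
        return result;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("group.id", "my-subscription-name");
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("session.timeout.ms", "10000");
        props.setProperty("fetch.min.bytes", "1024");
        // prints [fetch.min.bytes, session.timeout.ms]
        System.out.println(ignoredKeys(props));
    }
}
```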
Customize Pulsar configurations
You can configure the Pulsar authentication provider directly from the Kafka properties.
Pulsar client properties
Config property | Default | Notes |
---|---|---|
pulsar.authentication.class | | Configure the auth provider. For example, org.apache.pulsar.client.impl.auth.AuthenticationTls. |
pulsar.authentication.params.map | | Map which represents parameters for the Authentication-Plugin. |
pulsar.authentication.params.string | | String which represents parameters for the Authentication-Plugin, for example, key1:val1,key2:val2. |
pulsar.use.tls | false | Enable TLS transport encryption. |
pulsar.tls.trust.certs.file.path | | Path for the TLS trust certificate store. |
pulsar.tls.allow.insecure.connection | false | Accept self-signed certificates from brokers. |
pulsar.operation.timeout.ms | 30000 | General operations timeout. |
pulsar.stats.interval.seconds | 60 | Pulsar client lib stats printing interval. |
pulsar.num.io.threads | 1 | The number of Netty IO threads to use. |
pulsar.connections.per.broker | 1 | The maximum number of connections to each broker. |
pulsar.use.tcp.nodelay | true | TCP no-delay. |
pulsar.concurrent.lookup.requests | 50000 | The maximum number of concurrent topic lookups. |
pulsar.max.number.rejected.request.per.connection | 50 | The threshold of errors to forcefully close a connection. |
pulsar.keepalive.interval.ms | 30000 | Keep alive interval for each client-broker connection. |
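As a rough illustration of the client properties above, the sketch below assembles a TLS-authenticated configuration in a plain `java.util.Properties` object. Several details are assumptions rather than facts from this page: the `PulsarTlsProperties` helper is hypothetical, the `pulsar+ssl://localhost:6651` address assumes a broker with a TLS listener on that port, the certificate paths are placeholders, and `tlsCertFile` and `tlsKeyFile` are the parameter names expected by `AuthenticationTls`.

```java
import java.util.Properties;

// Hypothetical helper for illustration; paths and service URL are placeholders.
final class PulsarTlsProperties {

    static Properties tlsProps(String trustCertsPath, String clientCertPath, String clientKeyPath) {
        Properties props = new Properties();
        // Assumed TLS service URL; adjust to the broker's actual TLS listener.
        props.setProperty("bootstrap.servers", "pulsar+ssl://localhost:6651");
        props.setProperty("pulsar.use.tls", "true");
        props.setProperty("pulsar.tls.trust.certs.file.path", trustCertsPath);
        props.setProperty("pulsar.authentication.class",
                "org.apache.pulsar.client.impl.auth.AuthenticationTls");
        // Parameters use the key1:val1,key2:val2 format described in the table.
        props.setProperty("pulsar.authentication.params.string",
                "tlsCertFile:" + clientCertPath + ",tlsKeyFile:" + clientKeyPath);
        return props;
    }

    public static void main(String[] args) {
        Properties props = tlsProps("/path/to/ca.cert.pem",
                "/path/to/client.cert.pem", "/path/to/client.key-pk8.pem");
        props.stringPropertyNames().stream().sorted()
                .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```

The same object can also carry the usual Kafka settings (serializers, `group.id`, and so on) before it is handed to the producer or consumer constructor.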
Pulsar producer properties
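Config property | Default | Notes |
---|---|---|
pulsar.producer.name | | Specify the producer name. |
pulsar.producer.initial.sequence.id | | Specify the baseline for the sequence ID of this producer. |
pulsar.producer.max.pending.messages | 1000 | Set the maximum size of the queue of messages waiting for an acknowledgment from the broker. |
pulsar.producer.max.pending.messages.across.partitions | 50000 | Set the maximum number of messages pending acknowledgment across all partitions. |
pulsar.producer.batching.enabled | true | Control whether automatic batching of messages is enabled for the producer. |
pulsar.producer.batching.max.messages | 1000 | The maximum number of messages in a batch. |
pulsar.block.if.producer.queue.full | | Specify whether the producer blocks when its queue is full. |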
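The producer properties above sit alongside the regular Kafka settings in the same `Properties` object. The following sketch shows one possible combination. The helper name `PulsarProducerTuning`, the producer name `orders-producer`, and the numeric values are illustrative assumptions, not recommendations.

```java
import java.util.Properties;

// Hypothetical helper for illustration; all values are examples only.
final class PulsarProducerTuning {

    static Properties tunedProducerProps() {
        Properties props = new Properties();
        // Regular Kafka settings that the wrapper honors.
        props.setProperty("bootstrap.servers", "pulsar://localhost:6650");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("linger.ms", "5");
        // Pulsar-specific producer settings from the table above.
        props.setProperty("pulsar.producer.name", "orders-producer");
        props.setProperty("pulsar.producer.batching.enabled", "true");
        props.setProperty("pulsar.producer.batching.max.messages", "500");
        props.setProperty("pulsar.producer.max.pending.messages", "2000");
        props.setProperty("pulsar.block.if.producer.queue.full", "true");
        return props;
    }

    public static void main(String[] args) {
        Properties props = tunedProducerProps();
        System.out.println("Configured keys: " + props.size());
    }
}
```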
Pulsar consumer properties
Config property | Default | Notes |
---|---|---|
pulsar.consumer.name | | Specify the consumer name. |
pulsar.consumer.receiver.queue.size | 1000 | Set the size of the consumer receiver queue. |
pulsar.consumer.acknowledgments.group.time.millis | 100 | Set the maximum amount of time that the consumer groups acknowledgments before sending them to the broker. |
pulsar.consumer.total.receiver.queue.size.across.partitions | 50000 | Set the maximum size of the total receiver queue across partitions. |
pulsar.consumer.subscription.topics.mode | PersistentOnly | Set the subscription topic mode for consumers. |
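Because all of these settings are plain string properties, they can be kept in a standard `.properties` file instead of being set in code. The sketch below parses such content from an in-memory string with `Properties.load`. The `PulsarConsumerConfigFile` helper, the consumer name `audit-consumer`, and the chosen values are assumptions for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for illustration; values are examples only.
final class PulsarConsumerConfigFile {

    // Content that could equally live in a consumer.properties file.
    static final String CONFIG = String.join("\n",
            "bootstrap.servers=pulsar://localhost:6650",
            "group.id=my-subscription-name",
            "enable.auto.commit=false",
            "pulsar.consumer.name=audit-consumer",
            "pulsar.consumer.receiver.queue.size=500",
            "pulsar.consumer.acknowledgments.group.time.millis=50");

    // Parses properties-format text into a Properties object.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(CONFIG);
        System.out.println("Subscription: " + props.getProperty("group.id"));
        System.out.println("Receiver queue size: "
                + props.getProperty("pulsar.consumer.receiver.queue.size"));
    }
}
```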