Pulsar adaptor for Apache Kafka
Pulsar provides an easy option for applications that are currently written using the Apache Kafka Java client API.
Using the Pulsar Kafka compatible wrapper
In an existing application, replace the regular Kafka client dependency with the Pulsar Kafka wrapper. Remove the following dependency from pom.xml:
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.2.1</version>
</dependency>
Then include this dependency for the Pulsar Kafka wrapper:
<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-client-kafka</artifactId>
  <version>2.7.0</version>
</dependency>
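With the new dependency, the existing code works without any changes. You only need to change the configuration: make sure the producers and consumers point to a Pulsar service rather than Kafka, and that they use Pulsar topics.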
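As a minimal sketch of that configuration change, the helper below builds Kafka-style producer properties for the wrapper and rejects a service address that still looks like a Kafka broker list. The class WrapperConfig and its methods are hypothetical and not part of the wrapper. The sketch assumes that Pulsar service URLs use the pulsar:// or pulsar+ssl:// scheme, and that your team always writes full topic names such as persistent://public/default/my-topic.

```java
import java.util.Properties;

// Hypothetical helper, not part of the Pulsar Kafka wrapper: builds
// Kafka-style producer properties that target a Pulsar service.
final class WrapperConfig {
    private WrapperConfig() {}

    // Assumption: Pulsar service URLs use the pulsar:// or pulsar+ssl:// scheme.
    static boolean isPulsarServiceUrl(String url) {
        return url.startsWith("pulsar://") || url.startsWith("pulsar+ssl://");
    }

    // Assumption: topics are always written as full Pulsar names,
    // domain://tenant/namespace/topic, with a persistent or non-persistent domain.
    static boolean isFullPulsarTopic(String topic) {
        int sep = topic.indexOf("://");
        if (sep < 0) {
            return false;
        }
        String domain = topic.substring(0, sep);
        String[] parts = topic.substring(sep + 3).split("/", -1);
        boolean knownDomain = domain.equals("persistent") || domain.equals("non-persistent");
        return knownDomain && parts.length == 3
                && !parts[0].isEmpty() && !parts[1].isEmpty() && !parts[2].isEmpty();
    }

    static Properties producerProps(String serviceUrl) {
        if (!isPulsarServiceUrl(serviceUrl)) {
            throw new IllegalArgumentException("Not a Pulsar service URL: " + serviceUrl);
        }
        Properties props = new Properties();
        // bootstrap.servers carries the Pulsar service URL, not a Kafka broker list.
        props.put("bootstrap.servers", serviceUrl);
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

Pulsar itself also accepts short topic names. The stricter check only enforces a naming convention, so drop it if you rely on short names.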
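Using the Pulsar Kafka compatible wrapper together with the existing Kafka client
When migrating from Kafka to Pulsar, an application might need to use the original Kafka client and the Pulsar Kafka wrapper together during the migration. In that case, consider using the unshaded Pulsar Kafka client wrapper: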
<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-client-kafka-original</artifactId>
  <version>2.7.0</version>
</dependency>
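When using this dependency, construct producers using org.apache.kafka.clients.producer.PulsarKafkaProducer
instead of org.apache.kafka.clients.producer.KafkaProducer,
and construct consumers using org.apache.kafka.clients.consumer.PulsarKafkaConsumer
instead of org.apache.kafka.clients.consumer.KafkaConsumer.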
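During a migration, it can help to decide in one place which producer class a given configuration should use. The sketch below only picks a class name from the bootstrap.servers value, so it compiles without the Kafka or Pulsar jars. The ProducerChoice class and its URL-scheme rule are hypothetical. In a real application you would construct KafkaProducer or PulsarKafkaProducer directly rather than by name.

```java
import java.util.Properties;

// Hypothetical migration helper: chooses which producer implementation to
// construct for a given configuration. Only class names are returned so the
// sketch runs with the JDK alone.
final class ProducerChoice {
    static final String KAFKA_PRODUCER =
            "org.apache.kafka.clients.producer.KafkaProducer";
    static final String PULSAR_KAFKA_PRODUCER =
            "org.apache.kafka.clients.producer.PulsarKafkaProducer";

    private ProducerChoice() {}

    // Assumption: a bootstrap address with a pulsar:// or pulsar+ssl://
    // scheme targets Pulsar; anything else is treated as a Kafka broker list.
    static String producerClassFor(Properties props) {
        String servers = props.getProperty("bootstrap.servers", "");
        boolean pulsar = servers.startsWith("pulsar://") || servers.startsWith("pulsar+ssl://");
        return pulsar ? PULSAR_KAFKA_PRODUCER : KAFKA_PRODUCER;
    }
}
```

Because the choice depends only on configuration, you can move a service from Kafka to Pulsar by changing bootstrap.servers, without touching the code that sends records.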
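Producer example
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Topic needs to be a regular Pulsar topic
String topic = "persistent://public/default/my-topic";

Properties props = new Properties();
// Point to a Pulsar service
props.put("bootstrap.servers", "pulsar://localhost:6650");
props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

Producer<Integer, String> producer = new KafkaProducer<>(props);

for (int i = 0; i < 10; i++) {
    // send() is asynchronous; log is a logger defined by the surrounding class
    producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i));
    log.info("Message {} sent successfully", i);
}

producer.close();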
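Consumer example
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

String topic = "persistent://public/default/my-topic";

Properties props = new Properties();
// Point to a Pulsar service
props.put("bootstrap.servers", "pulsar://localhost:6650");
// The group ID is used as the Pulsar subscription name
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", IntegerDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());

Consumer<Integer, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));

while (true) {
    // Wait up to 100 ms for new records
    ConsumerRecords<Integer, String> records = consumer.poll(100);
    records.forEach(record -> {
        log.info("Received record: {}", record);
    });

    // Commit the last offset
    consumer.commitSync();
}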
Complete examples
You can find the complete producer and consumer examples here.
Compatibility matrix
Currently, the Pulsar Kafka wrapper supports most of the operations offered by the Kafka API.
Producer
APIs:
Producer Method | Supported | Notes |
---|---|---|
Future<RecordMetadata> send(ProducerRecord<K, V> record) | Yes | |
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) | Yes | |
void flush() | Yes | |
List<PartitionInfo> partitionsFor(String topic) | No | |
Map<MetricName, ? extends Metric> metrics() | No | |
void close() | Yes | |
void close(long timeout, TimeUnit unit) | Yes | |
Properties:
Config property | Supported | Notes |
---|---|---|
acks | Ignored | Durability and quorum writes are configured at the namespace level |
auto.offset.reset | Yes | Defaults to latest if no specific setting is given. |
batch.size | Ignored | |
bootstrap.servers | Yes | |
buffer.memory | Ignored | |
client.id | Ignored | |
compression.type | Yes | Allows gzip and lz4. No snappy. |
connections.max.idle.ms | Yes | Only supports idle times of up to 2,147,483,647,000 (Integer.MAX_VALUE * 1000) milliseconds |
interceptor.classes | Yes | |
key.serializer | Yes | |
linger.ms | Yes | Controls the group commit time when batching messages |
max.block.ms | Ignored | |
max.in.flight.requests.per.connection | Ignored | Pulsar maintains ordering even with multiple requests in flight |
max.request.size | Ignored | |
metric.reporters | Ignored | |
metrics.num.samples | Ignored | |
metrics.sample.window.ms | Ignored | |
partitioner.class | Yes | |
receive.buffer.bytes | Ignored | |
reconnect.backoff.ms | Ignored | |
request.timeout.ms | Ignored | |
retries | Ignored | The Pulsar client retries with exponential backoff until the send timeout expires. |
send.buffer.bytes | Ignored | |
timeout.ms | Yes | |
value.serializer | Yes | |
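Before switching an existing producer configuration over, it can help to flag the settings that this table marks as ignored or restricted. The following is a hypothetical pre-flight check: the ProducerConfigCheck name and its warning texts are illustrative. The ignored-key list is copied from the table. The idle-time limit is Integer.MAX_VALUE * 1000 ms, which is 2,147,483,647,000 ms or about 68 years, so only unusually large values are affected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical pre-flight check for producer properties passed to the
// Pulsar Kafka wrapper, based on the compatibility table.
final class ProducerConfigCheck {
    private ProducerConfigCheck() {}

    // Keys the table marks as "Ignored" for producers.
    static final Set<String> IGNORED = new TreeSet<>(Arrays.asList(
            "acks", "batch.size", "buffer.memory", "client.id", "max.block.ms",
            "max.in.flight.requests.per.connection", "max.request.size",
            "metric.reporters", "metrics.num.samples", "metrics.sample.window.ms",
            "receive.buffer.bytes", "reconnect.backoff.ms", "request.timeout.ms",
            "retries", "send.buffer.bytes"));

    // Integer.MAX_VALUE * 1000 ms; the long literal 1000L avoids int overflow.
    static final long MAX_IDLE_MS = Integer.MAX_VALUE * 1000L;

    static List<String> warnings(Properties props) {
        List<String> out = new ArrayList<>();
        // Iterate in sorted order so the warnings are deterministic.
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            if (IGNORED.contains(key)) {
                out.add(key + " is ignored by the wrapper");
            }
        }
        if ("snappy".equals(props.getProperty("compression.type"))) {
            out.add("compression.type=snappy is not supported; use gzip or lz4");
        }
        String idle = props.getProperty("connections.max.idle.ms");
        if (idle != null && Long.parseLong(idle) > MAX_IDLE_MS) {
            out.add("connections.max.idle.ms exceeds " + MAX_IDLE_MS + " ms");
        }
        return out;
    }
}
```

A check like this only reports problems. Whether to fail the deployment or just log the warnings is left to the application.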
Consumer
The following table lists the consumer APIs.
Consumer Method | Supported | Notes |
---|---|---|
Set<TopicPartition> assignment() | No | |
Set<String> subscription() | Yes | |
void subscribe(Collection<String> topics) | Yes | |
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) | No | |
void assign(Collection<TopicPartition> partitions) | No | |
void subscribe(Pattern pattern, ConsumerRebalanceListener callback) | No | |
void unsubscribe() | Yes | |
ConsumerRecords<K, V> poll(long timeoutMillis) | Yes | |
void commitSync() | Yes | |
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) | Yes | |
void commitAsync() | Yes | |
void commitAsync(OffsetCommitCallback callback) | Yes | |
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) | Yes | |
void seek(TopicPartition partition, long offset) | Yes | |
void seekToBeginning(Collection<TopicPartition> partitions) | Yes | |
void seekToEnd(Collection<TopicPartition> partitions) | Yes | |
long position(TopicPartition partition) | Yes | |
OffsetAndMetadata committed(TopicPartition partition) | Yes | |
Map<MetricName, ? extends Metric> metrics() | No | |
List<PartitionInfo> partitionsFor(String topic) | No | |
Map<String, List<PartitionInfo>> listTopics() | No | |
Set<TopicPartition> paused() | No | |
void pause(Collection<TopicPartition> partitions) | No | |
void resume(Collection<TopicPartition> partitions) | No | |
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) | No | |
Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) | No | |
Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) | No | |
void close() | Yes | |
void close(long timeout, TimeUnit unit) | Yes | |
void wakeup() | No | |
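Since wakeup() is marked as unsupported, you cannot stop a poll loop from another thread the usual Kafka way, by calling wakeup() and catching WakeupException. One alternative is a stop flag that the loop checks between short polls. To keep the sketch runnable with only the JDK, the consumer is hidden behind a hypothetical RecordSource interface. With the wrapper, its methods would delegate to poll(long), commitSync() and close(), which the table lists as supported.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical stand-in for the parts of a Kafka consumer the loop uses.
interface RecordSource {
    List<String> poll(long timeoutMillis);
    void commitSync();
    void close();
}

final class StoppablePollLoop {
    private final AtomicBoolean running = new AtomicBoolean(true);

    // Call from any thread instead of consumer.wakeup().
    void stop() {
        running.set(false);
    }

    // Polls with a short timeout, so a stop request is noticed after at most
    // one more poll; the source is closed on the polling thread.
    int run(RecordSource source, Consumer<String> handler) {
        int handled = 0;
        try {
            while (running.get()) {
                List<String> records = source.poll(100);
                for (String record : records) {
                    handler.accept(record);
                    handled++;
                }
                source.commitSync();
            }
        } finally {
            source.close();
        }
        return handled;
    }
}
```

The trade-off is latency: a shutdown request waits for the current poll timeout to expire, so keep that timeout short.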
Properties:
Config property | Supported | Notes |
---|---|---|
group.id | Yes | Maps to a Pulsar subscription name |
max.poll.records | Yes | |
max.poll.interval.ms | Ignored | Messages are "pushed" from the broker |
session.timeout.ms | Ignored | |
heartbeat.interval.ms | Ignored | |
bootstrap.servers | Yes | Needs to point to a Pulsar service URL |
enable.auto.commit | Yes | |
auto.commit.interval.ms | Ignored | With auto-commit, acks are sent immediately to the broker |
partition.assignment.strategy | Ignored | |
auto.offset.reset | Yes | Only earliest and latest are supported |
fetch.min.bytes | Ignored | |
fetch.max.bytes | Ignored | |
fetch.max.wait.ms | Ignored | |
interceptor.classes | Yes | |
metadata.max.age.ms | Ignored | |
max.partition.fetch.bytes | Ignored | |
send.buffer.bytes | Ignored | |
receive.buffer.bytes | Ignored | |
client.id | Ignored | |
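Before you point an existing consumer configuration at Pulsar, you can check the rules in this table up front. In this hypothetical sketch (ConsumerConfigCheck is an illustrative name), group.id is treated as required because it becomes the Pulsar subscription name. Treating it as required is an assumption, based on Pulsar consumers needing a subscription name. The sketch also restricts auto.offset.reset to earliest and latest, so Kafka's third value, none, is reported.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for consumer properties, following the
// compatibility table: group.id maps to a Pulsar subscription name and
// auto.offset.reset only accepts earliest or latest.
final class ConsumerConfigCheck {
    private ConsumerConfigCheck() {}

    static List<String> problems(Properties props) {
        List<String> out = new ArrayList<>();
        String group = props.getProperty("group.id");
        if (group == null || group.isEmpty()) {
            // Assumption: a subscription name is always needed on the Pulsar side.
            out.add("group.id is required: it becomes the Pulsar subscription name");
        }
        String reset = props.getProperty("auto.offset.reset");
        if (reset != null && !reset.equals("earliest") && !reset.equals("latest")) {
            out.add("auto.offset.reset=" + reset + " is not supported; use earliest or latest");
        }
        return out;
    }
}
```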
Customize Pulsar configurations
You can configure Pulsar-specific settings, such as the authentication provider, directly through the Kafka properties.
Pulsar client properties
Config property | Default | Notes |
---|---|---|
pulsar.authentication.class | | Configures the authentication provider, for example, org.apache.pulsar.client.impl.auth.AuthenticationTls. |
pulsar.authentication.params.map | | Map that represents the parameters for the authentication plugin. |
pulsar.authentication.params.string | | String that represents the parameters for the authentication plugin, for example, key1:val1,key2:val2. |
pulsar.use.tls | false | Enable TLS transport encryption. |
pulsar.tls.trust.certs.file.path | | Path to the TLS trust certificate store. |
pulsar.tls.allow.insecure.connection | false | Accept self-signed certificates from brokers. |
pulsar.operation.timeout.ms | 30000 | General operations timeout. |
pulsar.stats.interval.seconds | 60 | Interval for printing Pulsar client library statistics. |
pulsar.num.io.threads | 1 | The number of Netty IO threads to use. |
pulsar.connections.per.broker | 1 | The maximum number of connections to each broker. |
pulsar.use.tcp.nodelay | true | TCP no-delay. |
pulsar.concurrent.lookup.requests | 50000 | The maximum number of concurrent topic lookups. |
pulsar.max.number.rejected.request.per.connection | 50 | The threshold of errors to forcefully close a connection. |
pulsar.keepalive.interval.ms | 30000 | Keep alive interval for each client-broker-connection. |
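Because these settings travel in the same Properties object as the Kafka ones, enabling TLS transport and TLS authentication means adding a few pulsar.* keys. The sketch below makes several assumptions: a broker reachable at pulsar+ssl://localhost:6651, and placeholder certificate paths. The tlsCertFile and tlsKeyFile parameter names are those commonly used with AuthenticationTls, but verify them against your Pulsar version. The small parser only illustrates the key1:val1,key2:val2 format of pulsar.authentication.params.string; it is not the wrapper's own parsing code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical example of Pulsar TLS settings carried in Kafka-style properties.
final class PulsarTlsProps {
    private PulsarTlsProps() {}

    static Properties tlsClientProps() {
        Properties props = new Properties();
        // Assumed TLS endpoint: pulsar+ssl:// scheme on port 6651.
        props.put("bootstrap.servers", "pulsar+ssl://localhost:6651");
        // Values are strings so that getProperty() can read them back.
        props.put("pulsar.use.tls", "true");
        props.put("pulsar.tls.allow.insecure.connection", "false");
        // Placeholder paths; replace with real certificate locations.
        props.put("pulsar.tls.trust.certs.file.path", "/path/to/ca.cert.pem");
        props.put("pulsar.authentication.class",
                "org.apache.pulsar.client.impl.auth.AuthenticationTls");
        props.put("pulsar.authentication.params.string",
                "tlsCertFile:/path/to/client.cert.pem,tlsKeyFile:/path/to/client.key-pk8.pem");
        return props;
    }

    // Illustrative parser for "key1:val1,key2:val2". Splitting on the first
    // ':' only keeps values that themselves contain ':' intact.
    static Map<String, String> parseParams(String params) {
        Map<String, String> out = new LinkedHashMap<>();
        for (String pair : params.split(",")) {
            int colon = pair.indexOf(':');
            if (colon > 0) {
                out.put(pair.substring(0, colon).trim(), pair.substring(colon + 1).trim());
            }
        }
        return out;
    }
}
```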
Pulsar producer properties
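Config property | Default | Notes |
---|---|---|
pulsar.producer.name | | Specify the producer name. |
pulsar.producer.initial.sequence.id | | Specify the baseline for the sequence ID of this producer. |
pulsar.producer.max.pending.messages | 1000 | Set the maximum size of the queue of messages waiting for an acknowledgment from the broker. |
pulsar.producer.max.pending.messages.across.partitions | 50000 | Set the maximum number of pending messages across all partitions. |
pulsar.producer.batching.enabled | true | Specify whether to automatically batch messages. |
pulsar.producer.batching.max.messages | 1000 | Set the maximum number of messages in a batch. |
pulsar.block.if.producer.queue.full | | Specify whether the producer blocks when its pending-message queue is full. |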
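The two pending-message limits interact on partitioned topics. As we understand the Pulsar Java client, the across-partitions limit lowers the per-partition limit when the per-partition values would add up to more than it. The sketch below assumes an even split, so each partition gets the smaller of pulsar.producer.max.pending.messages and the across-partitions value divided by the partition count. PendingLimits is a hypothetical helper that only does this arithmetic; verify the rule against your client version.

```java
// Hypothetical helper: effective per-partition pending-message limit,
// assuming the across-partitions limit is split evenly over partitions
// and can only lower the per-partition setting.
final class PendingLimits {
    private PendingLimits() {}

    static int perPartitionLimit(int maxPending, int maxPendingAcrossPartitions, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        return Math.min(maxPending, maxPendingAcrossPartitions / numPartitions);
    }
}
```

With the defaults (1000 and 50000), a topic with 10 partitions keeps 1000 per partition (min(1000, 5000)). A topic with 100 partitions drops to 500 per partition (min(1000, 500)). When a partition reaches its limit, pulsar.block.if.producer.queue.full decides whether the send blocks.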
Pulsar consumer properties
Config property | Default | Notes |
---|---|---|
pulsar.consumer.name | | Specify the consumer name. |
pulsar.consumer.receiver.queue.size | 1000 | Set the size of the consumer receiver queue. |
pulsar.consumer.acknowledgments.group.time.millis | 100 | Set the maximum amount of group time for consumers to send the acknowledgments to the broker. |
pulsar.consumer.total.receiver.queue.size.across.partitions | 50000 | Set the maximum size of the total receiver queue across partitions. |
pulsar.consumer.subscription.topics.mode | PersistentOnly | Set the subscription topic mode for consumers. |
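For consumers on partitioned topics, the receiver queue settings combine in a similar way. As we understand the Pulsar Java client, the total across partitions caps how many messages the broker can push to the consumer at once by lowering each partition's queue size. The hypothetical helper below assumes an even split, where each partition queue is the smaller of pulsar.consumer.receiver.queue.size and the total divided by the partition count. It also multiplies back out to show the resulting cap.

```java
// Hypothetical helper: per-partition receiver queue size and the resulting
// number of messages the partition queues can hold at once, assuming the
// total is split evenly and can only lower the per-partition size.
final class ReceiverQueueSizing {
    private ReceiverQueueSizing() {}

    static int perPartitionQueue(int receiverQueueSize, int totalAcrossPartitions, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        return Math.min(receiverQueueSize, totalAcrossPartitions / numPartitions);
    }

    static long maxQueued(int receiverQueueSize, int totalAcrossPartitions, int numPartitions) {
        return (long) perPartitionQueue(receiverQueueSize, totalAcrossPartitions, numPartitions)
                * numPartitions;
    }
}
```

With the defaults (1000 and 50000), 20 partitions keep a queue of 1000 each, or 20000 messages in total. With 200 partitions, each queue shrinks to 250, which keeps the total at 50000.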