The Pulsar Python client library is a wrapper over the existing C++ client library and exposes all of the same features. You can find the source code for the Pulsar Python client in the python subdirectory of the C++ client source code.
All the methods in producer, consumer, and reader of a Python client are thread-safe.
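Because the methods are thread-safe, a single producer can be shared by several worker threads. The following is a minimal sketch of that pattern, not part of the client library: `send_from_threads` is a hypothetical helper, and `producer` is assumed to be any object with a blocking `send(bytes)` method, such as the producer returned by `client.create_producer`.

```python
from concurrent.futures import ThreadPoolExecutor


def send_from_threads(producer, payloads, workers=4):
    """Send every payload through one shared producer from a thread pool.

    `producer` is assumed to expose a blocking send(bytes) method.
    Returns the number of payloads sent.
    """
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # list() waits for all sends and re-raises the first failure, if any.
        results = list(pool.map(producer.send, payloads))
    return len(results)
```

Sharing one producer avoids opening a separate connection per thread. Messages sent from different threads are not guaranteed to arrive in any particular relative order.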
pdoc-generated API docs for the Python client are available here.
Installation
You can install the pulsar-client library either from PyPI, using pip, or by building the library from source.
Install using pip
To install the pulsar-client library as a pre-built package using the pip package manager:
$ pip install pulsar-client==2.6.1
Installation via PyPI is available for the following Python versions:
Platform | Supported Python versions |
---|---|
macOS 10.13 (High Sierra), 10.14 (Mojave) | 2.7, 3.7 |
Linux | 2.7, 3.4, 3.5, 3.6, 3.7 |
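Before running pip, you can check whether your interpreter is one of the versions listed above. This is a small sketch that hard-codes the table; `SUPPORTED`, its keys, and `has_prebuilt_package` are hypothetical names used only for illustration.

```python
import sys

# Versions copied from the table above; the keys are hypothetical labels.
SUPPORTED = {
    "macos": {(2, 7), (3, 7)},
    "linux": {(2, 7), (3, 4), (3, 5), (3, 6), (3, 7)},
}


def has_prebuilt_package(platform, version=None):
    """Return True if the table lists `version` (major, minor) for `platform`."""
    if version is None:
        # Default to the interpreter running this code.
        version = sys.version_info[:2]
    return tuple(version) in SUPPORTED.get(platform, set())
```

If the check returns False for your platform, building from source is the remaining option.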
Install from source
To install the pulsar-client library by building from source, follow the instructions to compile the Pulsar C++ client library. Doing so also builds the Python binding for the library.
To install the built Python bindings:
$ git clone https://github.com/apache/pulsar
$ cd pulsar/pulsar-client-cpp/python
$ sudo python setup.py install
API reference
The complete Python API reference is available at api/python.
Examples
You can find a variety of Python code examples for the pulsar-client library.
Producer example
The following example creates a Python producer for the my-topic topic and sends 10 messages on that topic:
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('my-topic')
for i in range(10):
    producer.send(('Hello-%d' % i).encode('utf-8'))
client.close()
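If sending fails partway through, the example above never reaches `client.close()`. One way to guard against that is a `try`/`finally` block, sketched below. `send_greetings` is a hypothetical helper; it relies only on the `create_producer`, `send`, and `close` calls used in the example.

```python
def send_greetings(client, topic, count=10):
    """Create a producer on `topic`, send `count` messages, always close the client."""
    try:
        producer = client.create_producer(topic)
        for i in range(count):
            producer.send(('Hello-%d' % i).encode('utf-8'))
        return count
    finally:
        # Runs on success and on failure, so the connection is not leaked.
        client.close()
```

Assuming a broker at the address used above, a call would look like `send_greetings(pulsar.Client('pulsar://localhost:6650'), 'my-topic')`.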
Consumer example
The following example creates a consumer with the my-subscription subscription name on the my-topic topic, receives incoming messages, prints the content and ID of each message that arrives, and acknowledges each message to the Pulsar broker.
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic', 'my-subscription')
while True:
    msg = consumer.receive()
    try:
        print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
        # Acknowledge successful processing of the message
        consumer.acknowledge(msg)
    except Exception:
        # Message failed to be processed
        consumer.negative_acknowledge(msg)
client.close()
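The loop above runs forever, so `client.close()` is never reached. When you only want to process a bounded number of messages, a variant with a receive timeout may be more practical. The sketch below is one possible shape: `consume_batch` and `handler` are hypothetical names, and the code assumes that `receive(timeout_millis=...)` raises an exception when no message arrives in time.

```python
def consume_batch(consumer, handler, max_messages=10, timeout_millis=1000):
    """Receive up to `max_messages`; ack those `handler` accepts, nack the rest.

    Returns (acked, nacked). Stops early when receive() raises, which is
    assumed here to mean that the timeout expired.
    """
    acked = nacked = 0
    for _ in range(max_messages):
        try:
            msg = consumer.receive(timeout_millis=timeout_millis)
        except Exception:
            break  # assumption: no message arrived within the timeout
        try:
            handler(msg)
        except Exception:
            # Processing failed: ask the broker to redeliver the message.
            consumer.negative_acknowledge(msg)
            nacked += 1
        else:
            consumer.acknowledge(msg)
            acked += 1
    return acked, nacked
```

Keeping the receive call and the handler in separate `try` blocks means that a timeout is not mistaken for a processing failure.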
This example shows how to configure negative acknowledgement.
from pulsar import Client, schema
client = Client('pulsar://localhost:6650')
consumer = client.subscribe('negative_acks','test',schema=schema.StringSchema())
producer = client.create_producer('negative_acks',schema=schema.StringSchema())
for i in range(10):
    print('send msg "hello-%d"' % i)
    producer.send_async('hello-%d' % i, callback=None)
producer.flush()
for i in range(10):
    msg = consumer.receive()
    consumer.negative_acknowledge(msg)
    print('receive and nack msg "%s"' % msg.data())
for i in range(10):
    msg = consumer.receive()
    consumer.acknowledge(msg)
    print('receive and ack msg "%s"' % msg.data())
try:
    # No more messages expected; wait at most 100 ms
    msg = consumer.receive(100)
except Exception:
    print("no more msg")
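How quickly a negatively acknowledged message is redelivered depends on the consumer's redelivery delay. The sketch below passes a shorter delay at subscription time. It assumes that `client.subscribe` accepts a `negative_ack_redelivery_delay_ms` keyword argument, which you should confirm against the API reference for your client version; `subscribe_with_redelivery_delay` is a hypothetical helper.

```python
def subscribe_with_redelivery_delay(client, topic, subscription, delay_ms=1000):
    """Subscribe so that negatively acknowledged messages return after `delay_ms`."""
    return client.subscribe(
        topic,
        subscription,
        # Assumed keyword; check your client version's subscribe() signature.
        negative_ack_redelivery_delay_ms=delay_ms,
    )
```

A short delay is convenient in tests like the one above, while a longer one gives a failing downstream dependency time to recover.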
Reader interface example
You can use the Pulsar Python API to work with the Pulsar reader interface. Here's an example:
# The MessageId is taken from a previously fetched message
msg_id = msg.message_id()
reader = client.create_reader('my-topic', msg_id)
while True:
    msg = reader.read_next()
    print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
    # No acknowledgment
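The reader loop above blocks indefinitely once it catches up with the topic. If you only want the messages that are already there, you can poll for availability first. This sketch assumes that the reader exposes a `has_message_available()` method in your client version; `read_available` is a hypothetical helper.

```python
def read_available(reader, limit=100):
    """Return the payloads of messages currently available, up to `limit`."""
    payloads = []
    # Assumed API: has_message_available() reports whether read_next()
    # would return immediately.
    while len(payloads) < limit and reader.has_message_available():
        msg = reader.read_next()
        payloads.append(msg.data())
    return payloads
```

To read a topic from the beginning rather than from a saved position, the reader could be created with `pulsar.MessageId.earliest` as its start message ID.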
Multi-topic subscriptions
In addition to subscribing a consumer to a single Pulsar topic, you can also subscribe to multiple topics simultaneously. To use multi-topic subscriptions, you can supply a regular expression (regex) or a list of topics. If you select topics via regex, all topics must be in the same Pulsar namespace.
The following is an example.
import re
# '.*' matches any suffix; a bare '-*' would only match repeated hyphens
consumer = client.subscribe(re.compile('persistent://public/default/topic-.*'), 'my-subscription')
while True:
    msg = consumer.receive()
    try:
        print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
        # Acknowledge successful processing of the message
        consumer.acknowledge(msg)
    except Exception:
        # Message failed to be processed
        consumer.negative_acknowledge(msg)
client.close()
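It can help to check locally which topic names a pattern selects before subscribing. The sketch below uses only the standard `re` module and assumes that the pattern must match the full topic name. It illustrates regex semantics, not how the client or broker performs the match. `matching_topics` and the topic names are hypothetical.

```python
import re


def matching_topics(pattern, topics):
    """Return the topic names that `pattern` matches in full."""
    compiled = re.compile(pattern)
    return [t for t in topics if compiled.fullmatch(t)]


topics = [
    'persistent://public/default/topic-1',
    'persistent://public/default/topic-orders',
    'persistent://public/default/other',
    'persistent://public/other-ns/topic-1',
]
# Selects only the two topic-* names in the public/default namespace.
selected = matching_topics('persistent://public/default/topic-.*', topics)
print(selected)
```

Note that in regex syntax `-*` means "zero or more hyphens", so a pattern ending in `topic-*` does not match `topic-1` under full-match semantics; `topic-.*` does.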
Schema
Declare and validate schema
You can declare a schema by passing a class that inherits from pulsar.schema.Record and defines the fields as class variables. For example:
from pulsar.schema import *
class Example(Record):
    a = String()
    b = Integer()
    c = Boolean()
With this simple schema definition, you can create producer, consumer, and reader instances that refer to it.
producer = client.create_producer(
    topic='my-topic',
    schema=AvroSchema(Example))
producer.send(Example(a='Hello', b=1))
When the producer is created, the Pulsar broker validates that the existing topic schema is indeed of the "Avro" type and that the format is compatible with the schema definition of the Example class.
If there is a mismatch, an exception is raised during producer creation.
Once a producer is created with a certain schema definition, it only accepts objects that are instances of the declared schema class.
Similarly, a consumer or reader returns an object that is an instance of the schema record class, rather than raw bytes:
consumer = client.subscribe(
    topic='my-topic',
    subscription_name='my-subscription',
    schema=AvroSchema(Example))
while True:
    msg = consumer.receive()
    ex = msg.value()
    try:
        print("Received message a={} b={} c={}".format(ex.a, ex.b, ex.c))
        # Acknowledge successful processing of the message
        consumer.acknowledge(msg)
    except Exception:
        # Message failed to be processed
        consumer.negative_acknowledge(msg)
Supported schema types
You can use different built-in schema types in Pulsar. All the definitions are in the pulsar.schema package.
Schema | Notes |
---|---|
BytesSchema | Gets the raw payload as a bytes object. No serialization or deserialization is performed. This is the default schema mode. |
StringSchema | Encodes and decodes the payload as a UTF-8 string. Uses str objects. |
JsonSchema | Requires a record definition. Serializes the record into a standard JSON payload. |
AvroSchema | Requires a record definition. Serializes the record in Avro format. |
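To make the difference between these modes concrete, the sketch below mimics the string and JSON encodings with the standard library only. These helper functions are hypothetical stand-ins for illustration. They are not the pulsar.schema classes, they do not reproduce the library's exact byte output, and Avro is left out because it needs a third-party library.

```python
import json


def encode_string(value):
    """StringSchema-like step: str -> UTF-8 bytes."""
    return value.encode('utf-8')


def decode_string(payload):
    """Inverse step: UTF-8 bytes -> str."""
    return payload.decode('utf-8')


def encode_json(record):
    """JsonSchema-like step: a dict of field values -> JSON bytes."""
    return json.dumps(record).encode('utf-8')


def decode_json(payload):
    """Inverse step: JSON bytes -> dict of field values."""
    return json.loads(payload.decode('utf-8'))
```

With the bytes mode, by contrast, the application performs these conversions itself, as the producer example does with `.encode('utf-8')`.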
Schema definition reference
The schema definition is done through a class that inherits from pulsar.schema.Record.
This class has a number of fields, each of which can be either a pulsar.schema.Field type or another nested Record. All the fields are specified in the pulsar.schema package. The fields match the Avro field types.
Field type | Python type | Notes |
---|---|---|
Boolean | bool | |
Integer | int | |
Long | int | |
Float | float | |
Double | float | |
Bytes | bytes | |
String | str | |
Array | list | You must specify the type of the items. |
Map | dict | The key is always a String. You must specify the value type. |
Additionally, any Python Enum type can be used as a valid field type.
Field parameters
When adding a field, you can use these parameters in the constructor.
Argument | Default | Notes |
---|---|---|
default | None | Sets a default value for the field. For example: a = Integer(default=5) |
required | False | Marks the field as "required". It is set in the schema accordingly. |
Schema definition examples
Simple definition
class Example(Record):
    a = String()
    b = Integer()
    c = Array(String())
    i = Map(String())
Using enums
from enum import Enum
class Color(Enum):
    red = 1
    green = 2
    blue = 3
class Example(Record):
    name = String()
    color = Color
Complex types
class MySubRecord(Record):
    x = Integer()
    y = Long()
    z = String()
class Example(Record):
    a = String()
    sub = MySubRecord()