How to debug Pulsar connectors

本指南解释如何在本地运行或集群模式中调试连接器,并提供调试检查列表。 为了更好地演示如何调试 Pulsar 连接器,在这里以Mongo sink 连接器作为一个例子。

部署一个 Mongo sink 环境

  1. 启动一个 Mongo 服务。

    1. docker pull mongo:4
    2. docker run -d -p 27017:27017 --name pulsar-mongo -v $PWD/data:/data/db mongo:4
  2. 创建数据库和集合(Collection)。

    1. docker exec -it pulsar-mongo /bin/bash
    2. mongo
    3. > use pulsar
    4. > db.createCollection('messages')
    5. > exit
  3. 启动 Pulsar 单机模式。

    1. docker pull apachepulsar/pulsar:2.4.0
    2. docker run -d -it -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --link pulsar-mongo --name pulsar-mongo-standalone apachepulsar/pulsar:2.4.0 bin/pulsar standalone
  4. 使用 mongo-sink-config.yaml 文件配置 Mongo sink。

    1. configs:
    2. mongoUri: "mongodb://pulsar-mongo:27017"
    3. database: "pulsar"
    4. collection: "messages"
    5. batchSize: 2
    6. batchTimeMs: 500
    1. docker cp mongo-sink-config.yaml pulsar-mongo-standalone:/pulsar/
  5. 下载 Mongo sink nar 包。

    1. docker exec -it pulsar-mongo-standalone /bin/bash
    2. curl -O http://apache.01link.hk/pulsar/pulsar-2.4.0/connectors/pulsar-io-mongo-2.4.0.nar

在本地运行模式下调试

使用 localrun 命令以本地模式启动 Mongo sink。

Tip

关于 localrun 命令的更多信息,请参阅 localrun

  1. ./bin/pulsar-admin sinks localrun \
  2. --archive pulsar-io-mongo-2.4.0.nar \
  3. --tenant public --namespace default \
  4. --inputs test-mongo \
  5. --name pulsar-mongo-sink \
  6. --sink-config-file mongo-sink-config.yaml \
  7. --parallelism 1

使用连接器日志

Use one of the following methods to get a connector log in localrun mode:

  • After executing the localrun command, the log is automatically printed on the console.

  • The log is located at:

    1. logs/functions/tenant/namespace/function-name/function-name-instance-id.log

    示例

    The path of the Mongo sink connector is:

    1. logs/functions/public/default/pulsar-mongo-sink/pulsar-mongo-sink-0.log

为了清楚地解释日志信息,此处将大块信息分解成小块,并为每个块添加描述。

  • 此日志信息显示解压后 nar 包的存储路径。

    1. 08:21:54.132 [main] INFO org.apache.pulsar.common.nar.NarClassLoader - Created class loader with paths: [file:/tmp/pulsar-nar/pulsar-io-mongo-2.4.0.nar-unpacked/, file:/tmp/pulsar-nar/pulsar-io-mongo-2.4.0.nar-unpacked/META-INF/bundled-dependencies/,

    Tip

    If class cannot be found exception is thrown, check whether the nar file is decompressed in the folder file:/tmp/pulsar-nar/pulsar-io-mongo-2.4.0.nar-unpacked/META-INF/bundled-dependencies/ or not.

  • This piece of log information illustrates the basic information about the Mongo sink connector, such as tenant, namespace, name, parallelism, resources, and so on, which can be used to check whether the Mongo sink connector is configured correctly or not.

    1. 08:21:55.390 [main] INFO org.apache.pulsar.functions.runtime.ThreadRuntime - ThreadContainer starting function with instance config InstanceConfig(instanceId=0, functionId=853d60a1-0c48-44d5-9a5c-6917386476b2, functionVersion=c2ce1458-b69e-4175-88c0-a0a856a2be8c, functionDetails=tenant: "public"
    2. namespace: "default"
    3. name: "pulsar-mongo-sink"
    4. className: "org.apache.pulsar.functions.api.utils.IdentityFunction"
    5. autoAck: true
    6. parallelism: 1
    7. source {
    8. typeClassName: "[B"
    9. inputSpecs {
    10. key: "test-mongo"
    11. value {
    12. }
    13. }
    14. cleanupSubscription: true
    15. }
    16. sink {
    17. className: "org.apache.pulsar.io.mongodb.MongoSink"
    18. configs: "{\"mongoUri\":\"mongodb://pulsar-mongo:27017\",\"database\":\"pulsar\",\"collection\":\"messages\",\"batchSize\":2,\"batchTimeMs\":500}"
    19. typeClassName: "[B"
    20. }
    21. resources {
    22. cpu: 1.0
    23. ram: 1073741824
    24. disk: 10737418240
    25. }
    26. componentType: SINK
    27. , maxBufferedTuples=1024, functionAuthenticationSpec=null, port=38459, clusterName=local)
  • 此日志信息显示与 Mongo 连接和配置信息的状态。

    1. 08:21:56.231 [cluster-ClusterId{value='5d6396a3c9e77c0569ff00eb', description='null'}-pulsar-mongo:27017] INFO org.mongodb.driver.connection - Opened connection [connectionId{localValue:1, serverValue:8}] to pulsar-mongo:27017
    2. 08:21:56.326 [cluster-ClusterId{value='5d6396a3c9e77c0569ff00eb', description='null'}-pulsar-mongo:27017] INFO org.mongodb.driver.cluster - Monitor thread successfully connected to server with description ServerDescription{address=pulsar-mongo:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[4, 2, 0]}, minWireVersion=0, maxWireVersion=8, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=89058800}
  • 该日志信息说明了消费者和客户端配置,包括主题名称、订阅名称、订阅类型等等。

    1. 08:21:56.719 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {
    2. "topicNames" : [ "test-mongo" ],
    3. "topicsPattern" : null,
    4. "subscriptionName" : "public/default/pulsar-mongo-sink",
    5. "subscriptionType" : "Shared",
    6. "receiverQueueSize" : 1000,
    7. "acknowledgementsGroupTimeMicros" : 100000,
    8. "negativeAckRedeliveryDelayMicros" : 60000000,
    9. "maxTotalReceiverQueueSizeAcrossPartitions" : 50000,
    10. "consumerName" : null,
    11. "ackTimeoutMillis" : 0,
    12. "tickDurationMillis" : 1000,
    13. "priorityLevel" : 0,
    14. "cryptoFailureAction" : "CONSUME",
    15. "properties" : {
    16. "application" : "pulsar-sink",
    17. "id" : "public/default/pulsar-mongo-sink",
    18. "instance_id" : "0"
    19. },
    20. "readCompacted" : false,
    21. "subscriptionInitialPosition" : "Latest",
    22. "patternAutoDiscoveryPeriod" : 1,
    23. "regexSubscriptionMode" : "PersistentOnly",
    24. "deadLetterPolicy" : null,
    25. "autoUpdatePartitions" : true,
    26. "replicateSubscriptionState" : false,
    27. "resetIncludeHead" : false
    28. }
    29. 08:21:56.726 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {
    30. "serviceUrl" : "pulsar://localhost:6650",
    31. "authPluginClassName" : null,
    32. "authParams" : null,
    33. "operationTimeoutMs" : 30000,
    34. "statsIntervalSeconds" : 60,
    35. "numIoThreads" : 1,
    36. "numListenerThreads" : 1,
    37. "connectionsPerBroker" : 1,
    38. "useTcpNoDelay" : true,
    39. "useTls" : false,
    40. "tlsTrustCertsFilePath" : null,
    41. "tlsAllowInsecureConnection" : false,
    42. "tlsHostnameVerificationEnable" : false,
    43. "concurrentLookupRequest" : 5000,
    44. "maxLookupRequest" : 50000,
    45. "maxNumberOfRejectedRequestPerConnection" : 50,
    46. "keepAliveIntervalSeconds" : 30,
    47. "connectionTimeoutMs" : 10000,
    48. "requestTimeoutMs" : 60000,
    49. "defaultBackoffIntervalNanos" : 100000000,
    50. "maxBackoffIntervalNanos" : 30000000000
    51. }

在集群模式中调试

You can use the following methods to debug a connector in cluster mode:

使用连接器日志

在集群模式下,多个连接器可以运行在一个 worker 上。 要找到指定连接器的日志路径,请使用 workerId 来定位连接器日志。

使用管理命令行工具(admin CLI)

Pulsar admin CLI helps you debug Pulsar connectors with the following subcommands:

创建 Mongo sink

  1. ./bin/pulsar-admin sinks create \
  2. --archive pulsar-io-mongo-2.4.0.nar \
  3. --tenant public \
  4. --namespace default \
  5. --inputs test-mongo \
  6. --name pulsar-mongo-sink \
  7. --sink-config-file mongo-sink-config.yaml \
  8. --parallelism 1

get

使用 get 命令,获取 Mongo sink 连接器的基本信息,例如租户、命名空间、名称、并行度等等。

  1. ./bin/pulsar-admin sinks get --tenant public --namespace default --name pulsar-mongo-sink
  2. {
  3. "tenant": "public",
  4. "namespace": "default",
  5. "name": "pulsar-mongo-sink",
  6. "className": "org.apache.pulsar.io.mongodb.MongoSink",
  7. "inputSpecs": {
  8. "test-mongo": {
  9. "isRegexPattern": false
  10. }
  11. },
  12. "configs": {
  13. "mongoUri": "mongodb://pulsar-mongo:27017",
  14. "database": "pulsar",
  15. "collection": "messages",
  16. "batchSize": 2.0,
  17. "batchTimeMs": 500.0
  18. },
  19. "parallelism": 1,
  20. "processingGuarantees": "ATLEAST_ONCE",
  21. "retainOrdering": false,
  22. "autoAck": true
  23. }

Tip

更多 get 命令信息,请参阅 get 章节。

status

使用 status 命令获取 Mongo sink 连接器的当前状态,例如实例数量、正在运行实例的数量、instanceId、workerId 等。

  1. ./bin/pulsar-admin sinks status
  2. --tenant public \
  3. --namespace default \
  4. --name pulsar-mongo-sink
  5. {
  6. "numInstances" : 1,
  7. "numRunning" : 1,
  8. "instances" : [ {
  9. "instanceId" : 0,
  10. "status" : {
  11. "running" : true,
  12. "error" : "",
  13. "numRestarts" : 0,
  14. "numReadFromPulsar" : 0,
  15. "numSystemExceptions" : 0,
  16. "latestSystemExceptions" : [ ],
  17. "numSinkExceptions" : 0,
  18. "latestSinkExceptions" : [ ],
  19. "numWrittenToSink" : 0,
  20. "lastReceivedTime" : 0,
  21. "workerId" : "c-standalone-fw-5d202832fd18-8080"
  22. }
  23. } ]
  24. }

Tip

关于 status 命令的更多信息,可参阅 status

如果一个 worker 上运行着多个连接器,workerId可以找到指定的连接器所运行的 worker。

topics stats

使用 topics stats 命令获取主题及其关联的生产者、消费者的统计信息,如主题是否已收到消息,或者是否有消息积压,或可用权限以及其它关键信息。 All rates are computed over a 1-minute window and are relative to the last completed 1-minute period.

  1. ./bin/pulsar-admin topics stats test-mongo
  2. {
  3. "msgRateIn" : 0.0,
  4. "msgThroughputIn" : 0.0,
  5. "msgRateOut" : 0.0,
  6. "msgThroughputOut" : 0.0,
  7. "averageMsgSize" : 0.0,
  8. "storageSize" : 1,
  9. "publishers" : [ ],
  10. "subscriptions" : {
  11. "public/default/pulsar-mongo-sink" : {
  12. "msgRateOut" : 0.0,
  13. "msgThroughputOut" : 0.0,
  14. "msgRateRedeliver" : 0.0,
  15. "msgBacklog" : 0,
  16. "blockedSubscriptionOnUnackedMsgs" : false,
  17. "msgDelayed" : 0,
  18. "unackedMessages" : 0,
  19. "type" : "Shared",
  20. "msgRateExpired" : 0.0,
  21. "consumers" : [ {
  22. "msgRateOut" : 0.0,
  23. "msgThroughputOut" : 0.0,
  24. "msgRateRedeliver" : 0.0,
  25. "consumerName" : "dffdd",
  26. "availablePermits" : 999,
  27. "unackedMessages" : 0,
  28. "blockedConsumerOnUnackedMsgs" : false,
  29. "metadata" : {
  30. "instance_id" : "0",
  31. "application" : "pulsar-sink",
  32. "id" : "public/default/pulsar-mongo-sink"
  33. },
  34. "connectedSince" : "2019-08-26T08:48:07.582Z",
  35. "clientVersion" : "2.4.0",
  36. "address" : "/172.17.0.3:57790"
  37. } ],
  38. "isReplicated" : false
  39. }
  40. },
  41. "replication" : { },
  42. "deduplicationStatus" : "Disabled"
  43. }

Tip

关于 topic stats 命令的更多信息,请参阅 topic stats

检查清单

此清单列出了调试连接器时要检查的主要事项。 该清单提醒我们应该注意什么,以确保彻底检查,并作为评估工具获取连接器状态。

  • Pulsar 是否成功启动?

  • 外部服务运行正常吗?

  • nar 包是否完整?

  • 连接器配置文件正确吗?

  • 在本地运行模式下,运行连接器并检查控制台输出的信息(连接器日志)。

  • 在集群模式中:

    • 使用 get命令获取基本信息。

    • 使用 status 命令获取当前状态。

    • 使用 topics stats 命令获取特定主题及其关联的生产者和消费者的统计信息。

    • 检查连接器日志。

  • 进入外部系统并验证结果。