Manage transactions

Transactions - 图1tip

This page only shows some frequently used operations.

  • For the latest and complete information about Pulsar admin, including commands, flags, descriptions, and more, see Pulsar admin docs.

  • For the latest and complete information about REST API, including parameters, responses, samples, and more, see REST API doc.

  • For the latest and complete information about Java admin API, including classes, methods, descriptions, and more, see Java admin API doc.

Transaction resources

GetSlowTransactions

In the production environment, there may be some long-lasting transactions that have never been completed. You can get these slow transactions that have survived over a certain time under a coordinator or all coordinators in the following ways.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin transactions slow-transactions -c 1 -t 1s

GET /admin/v3/transactions/slowTransactions/:timeout/getSlowTransactions

  1. admin.transactions().getSlowTransactionsByCoordinatorId(coordinatorId, timeout, timeUnit)
  2. //Or get slow transactions from all coordinators
  3. admin.transactions().getSlowTransactions(timeout, timeUnit)

The following is an example of the returned values.

  1. {
  2. "(0,3)": {
  3. "txnId": "(0,3)",
  4. "status": "OPEN",
  5. "openTimestamp": 1658120122474,
  6. "timeoutAt": 300000,
  7. "producedPartitions": {},
  8. "ackedPartitions": {}
  9. },
  10. "(0,2)": {
  11. "txnId": "(0,2)",
  12. "status": "OPEN",
  13. "openTimestamp": 1658120122471,
  14. "timeoutAt": 300000,
  15. "producedPartitions": {},
  16. "ackedPartitions": {}
  17. },
  18. "(0,5)": {
  19. "txnId": "(0,5)",
  20. "status": "OPEN",
  21. "openTimestamp": 1658120122478,
  22. "timeoutAt": 300000,
  23. "producedPartitions": {},
  24. "ackedPartitions": {}
  25. },
  26. "(0,4)": {
  27. "txnId": "(0,4)",
  28. "status": "OPEN",
  29. "openTimestamp": 1658120122476,
  30. "timeoutAt": 300000,
  31. "producedPartitions": {},
  32. "ackedPartitions": {}
  33. },
  34. "(0,7)": {
  35. "txnId": "(0,7)",
  36. "status": "OPEN",
  37. "openTimestamp": 1658120122482,
  38. "timeoutAt": 300000,
  39. "producedPartitions": {},
  40. "ackedPartitions": {}
  41. },
  42. "(0,10)": {
  43. "txnId": "(0,10)",
  44. "status": "OPEN",
  45. "openTimestamp": 1658120122488,
  46. "timeoutAt": 300000,
  47. "producedPartitions": {},
  48. "ackedPartitions": {}
  49. },
  50. "(0,6)": {
  51. "txnId": "(0,6)",
  52. "status": "OPEN",
  53. "openTimestamp": 1658120122480,
  54. "timeoutAt": 300000,
  55. "producedPartitions": {},
  56. "ackedPartitions": {}
  57. },
  58. "(0,9)": {
  59. "txnId": "(0,9)",
  60. "status": "OPEN",
  61. "openTimestamp": 1658120122486,
  62. "timeoutAt": 300000,
  63. "producedPartitions": {},
  64. "ackedPartitions": {}
  65. },
  66. "(0,8)": {
  67. "txnId": "(0,8)",
  68. "status": "OPEN",
  69. "openTimestamp": 1658120122484,
  70. "timeoutAt": 300000,
  71. "producedPartitions": {},
  72. "ackedPartitions": {}
  73. },
  74. "(0,11)": {
  75. "txnId": "(0,11)",
  76. "status": "OPEN",
  77. "openTimestamp": 1658120122490,
  78. "timeoutAt": 300000,
  79. "producedPartitions": {},
  80. "ackedPartitions": {}
  81. }
  82. }

ScaleTransactionCoordinators

When the performance of transactions reaches a bottleneck due to the insufficient number of transaction coordinators, you can scale the number of the transaction coordinators in the following ways.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin transactions scale-transactionCoordinators -r 17

GET /admin/v3/transactions/transactionCoordinator/:replicas/scaleTransactionCoordinators

  1. admin.transactions().scaleTransactionCoordinators(replicas);

Transaction stats

Get transaction metadata

The transaction metadata that can be retrieved include:

  • txnId: The ID of this transaction.
  • status: The status of this transaction.
  • openTimestamp: The open time of this transaction.
  • timeoutAt: The timeout of this transaction.
  • producedPartitions: The partitions or topics that messages have been sent to with this transaction.
  • ackedPartitions: The partitions or topics where messages have been acknowledged with this transaction.

Use one of the following ways to get your transaction metadata.

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin transactions transaction-metadata -m 1 -l 1

GET /admin/v3/transactions/transactionMetadata/:mostSigBits/:leastSigBits/getTransactionMetadata

  1. admin.transactions().getTransactionMetadata(txnID);

The following is an example of the returned values.

  1. {
  2. "txnId" : "(1,18)",
  3. "status" : "ABORTING",
  4. "openTimestamp" : 1656592983374,
  5. "timeoutAt" : 5000,
  6. "producedPartitions" : {
  7. "my-topic" : {
  8. "startPosition" : "127:4959",
  9. "aborted" : true
  10. }
  11. },
  12. "ackedPartitions" : {
  13. "my-topic" : {
  14. "mysubName" : {
  15. "cumulativeAckPosition" : null
  16. }
  17. }
  18. }
  19. }

Get transaction stats in transaction pending ack

The transaction stats in transaction pending ack that can be retrieved include:

  • cumulativeAckPosition: The position that this transaction cumulatively acknowledges in this subscription.

Use one of the following ways to get transaction stats in pending ack:

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin transactions transaction-in-pending-ack-stats -m 1 -l 1 -t my-topic -s mysubname

GET /admin/v3/transactions/transactionInPendingAckStats/:tenant/:namespace/:topic/:subName/:mostSigBits/:leastSigBits/getTransactionInPendingAckStats

  1. admin.transactions().getTransactionInPendingAckStats(txnID, topic, subname);

The following is an example of the returned value.

  1. {
  2. "cumulativeAckPosition" : "137:49959"
  3. }

Get transaction stats in transaction buffer

The transaction stats in the transaction buffer that can be retrieved include:

  • startPosition: The start position of this transaction in the transaction buffer.
  • aborted: The flag of whether this transaction has been aborted.

Use one of the following ways to get transaction stats in transaction buffer:

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin transactions transaction-in-buffer-stats -m 1 -l 1 -t my-topic

GET /admin/v3/transactions/transactionInBufferStats/:tenant/:namespace/:topic/:mostSigBits/:leastSigBits/getTransactionInBufferStats

  1. admin.transactions().getTransactionInBufferStatsAsync(txnID, topic);

The following is an example of the returned values.

  1. {
  2. "startPosition" : "137:49759",
  3. "aborted" : false
  4. }

Transaction coordinator stats

The transaction coordinator (TC) is a module inside a Pulsar broker. It maintains the entire life cycle of transactions and handles transaction timeout.

Get coordinator stats

The transaction coordinator stats that can be retrieved include:

  • state: The state of this transaction coordinator.
  • leastSigBit:s The sequence ID of this transaction coordinator.
  • lowWaterMark: The low watermark of this transaction coordinator.
  • ongoingTxnSize: The total number of ongoing transactions in this transaction coordinator.
  • recoverStartTime: The start timestamp of transaction coordinator recovery. 0L means no startup.
  • recoverEndTime: The end timestamp of transaction coordinator recovery. 0L means no startup.

Use one of the following ways to get transaction coordinator stats:

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin transactions coordinator-stats -c 1

GET /admin/v3/transactions/coordinatorStats/getCoordinatorStats

  1. admin.transactions().getCoordinatorStatsById(coordinatorId);
  2. //Or get all coordinator stats.
  3. admin.transactions().getCoordinatorStats();

The following is an example of the returned values.

  1. {
  2. "state" : "Ready",
  3. "leastSigBits" : 1,
  4. "lowWaterMark" : 0,
  5. "ongoingTxnSize" : 0,
  6. "recoverStartTime" : 1657021892377,
  7. "recoverEndTime" : 1657021892378
  8. }

Get coordinator internal stats

The coordinator’s internal stats that can be retrieved include:

  • transactionLogStats: The stats of the transaction coordinator log.
  • managedLedgerName: The name of the managed ledger where the transaction coordinator log is stored.
  • managedLedgerInternalStats: The internal stats of the managed ledger where the transaction coordinator log is stored. See [managedLedgerInternalStats](/docs/3.0.x/admin-api-topics#get-internal-stats) for more details.

Use one of the following ways to get coordinator’s internal stats:

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin transactions coordinator-internal-stats -c 1 -m

GET /admin/v3/transactions/coordinatorInternalStats/:coordinatorId/getCoordinatorInternalStats

  1. admin.transactions().getCoordinatorInternalStats(coordinatorId, metadata);

The following is an example of the returned values.

  1. {
  2. "transactionLogStats" : {
  3. "managedLedgerName" : "pulsar/system/persistent/__transaction_log_1",
  4. "managedLedgerInternalStats" : {
  5. "entriesAddedCounter" : 3,
  6. "numberOfEntries" : 3,
  7. "totalSize" : 63,
  8. "currentLedgerEntries" : 3,
  9. "currentLedgerSize" : 63,
  10. "lastLedgerCreatedTimestamp" : "2022-06-30T18:18:05.88+08:00",
  11. "waitingCursorsCount" : 0,
  12. "pendingAddEntriesCount" : 0,
  13. "lastConfirmedEntry" : "13:2",
  14. "state" : "LedgerOpened",
  15. "ledgers" : [ {
  16. "ledgerId" : 13,
  17. "entries" : 0,
  18. "size" : 0,
  19. "offloaded" : false,
  20. "metadata" : "LedgerMetadata{formatVersion=3, ensembleSize=1, writeQuorumSize=1, ackQuorumSize=1, state=CLOSED, length=63, lastEntryId=2, digestType=CRC32C, password=OMITTED, ensembles={0=[10.20.240.119:3181]}, customMetadata={component=base64:bWFuYWdlZC1sZWRnZXI=, pulsar/managed-ledger=base64:cHVsc2FyL3N5c3RlbS9wZXJzaXN0ZW50L19fdHJhbnNhY3Rpb25fbG9nXzE=, application=base64:cHVsc2Fy}}",
  21. "underReplicated" : false
  22. } ],
  23. "cursors" : {
  24. "transaction.subscription" : {
  25. "markDeletePosition" : "13:2",
  26. "readPosition" : "13:3",
  27. "waitingReadOp" : false,
  28. "pendingReadOps" : 0,
  29. "messagesConsumedCounter" : 3,
  30. "cursorLedger" : 22,
  31. "cursorLedgerLastEntry" : 1,
  32. "individuallyDeletedMessages" : "[]",
  33. "lastLedgerSwitchTimestamp" : "2022-06-30T18:18:05.932+08:00",
  34. "state" : "Open",
  35. "numberOfEntriesSinceFirstNotAckedMessage" : 1,
  36. "totalNonContiguousDeletedMessagesRange" : 0,
  37. "subscriptionHavePendingRead" : false,
  38. "subscriptionHavePendingReplayRead" : false,
  39. "properties" : { }
  40. }
  41. }
  42. }
  43. }
  44. }

Transaction pending ack stats

Pending ack maintains message acknowledgments within a transaction before a transaction completes. If a message is in the pending acknowledge state, the message cannot be acknowledged by other transactions until the message is removed from the pending acknowledge state.

Get transaction pending ack stats

The transaction pending ack state stats that can be retrieved include:

  • state: The state of this transaction coordinator.
  • lowWaterMark: The low watermark of this transaction coordinator.
  • ongoingTxnSize: The total number of ongoing transactions in this transaction coordinator.
  • recoverStartTime: The start timestamp of transaction pendingAck recovery. 0L means no startup.
  • recoverEndTime: The end timestamp of transaction pendingAck recovery. 0L means no startup.

Use one of the following ways to get transaction pending ack stats:

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin.transactions()s pending-ack-stats -t my-topic -s mysubName -l

GET /admin/v3/transactions/pendingAckStats/:tenant/:namespace:/:topic:/:subName/getPendingAckStats

  1. admin.transactions().getPendingAckStats(topic, subName, lowWaterMarks)

The following is an example of the returned values.

  1. {
  2. "state" : "Ready",
  3. "lowWaterMarks" : {
  4. "1" : 0
  5. },
  6. "ongoingTxnSize" : 1,
  7. "recoverStartTime" : 1657021899202,
  8. "recoverEndTime" : 1657021899203
  9. }

Get transaction pending ack internal stats

The transaction pending ack internal stats that can be retrieved include:

  • transactionLogStats: The stats of the transaction pending ack log.
  • managedLedgerName: The name of the managed ledger where the transaction pending ack log is stored.
  • managedLedgerInternalStats: The internal stats of the managed ledger where the transaction coordinator log is stored. See [managedLedgerInternalStats](/docs/3.0.x/admin-api-topics#get-internal-stats) for more details.

Use one of the following ways to get transaction pending ack internal stats:

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin transactions pending-ack-internal-stats -t my-topic -s mysubName -m

GET /admin/v3/transactions/pendingAckInternalStats/:tenant/:namespace:/:topic:/:subName/getPendingAckInternalStats

  1. admin.transactions().getPendingAckInternalStats(topic, subName, boolean metadata);

The following is an example of the returned values.

  1. {
  2. "pendingAckLogStats" : {
  3. "managedLedgerName" : "public/default/persistent/my-topic-mysubName__transaction_pending_ack",
  4. "managedLedgerInternalStats" : {
  5. "entriesAddedCounter" : 2247,
  6. "numberOfEntries" : 2247,
  7. "totalSize" : 37212,
  8. "currentLedgerEntries" : 104,
  9. "currentLedgerSize" : 1732,
  10. "lastLedgerCreatedTimestamp" : "2022-06-30T19:02:09.746+08:00",
  11. "waitingCursorsCount" : 0,
  12. "pendingAddEntriesCount" : 52,
  13. "lastConfirmedEntry" : "64:51",
  14. "state" : "LedgerOpened",
  15. "ledgers" : [ {
  16. "ledgerId" : 56,
  17. "entries" : 2195,
  18. "size" : 36346,
  19. "offloaded" : false,
  20. "metadata" : "LedgerMetadata{formatVersion=3, ensembleSize=1, writeQuorumSize=1, ackQuorumSize=1, state=CLOSED, length=36346, lastEntryId=2194, digestType=CRC32C, password=OMITTED, ensembles={0=[10.20.240.119:3181]}, customMetadata={component=base64:bWFuYWdlZC1sZWRnZXI=, pulsar/managed-ledger=base64:cHVibGljL2RlZmF1bHQvcGVyc2lzdGVudC9teS10b3BpYy1teXN1Yk5hbWVfX3RyYW5zYWN0aW9uX3BlbmRpbmdfYWNr, application=base64:cHVsc2Fy}}",
  21. "underReplicated" : false
  22. }, {
  23. "ledgerId" : 64,
  24. "entries" : 0,
  25. "size" : 0,
  26. "offloaded" : false,
  27. "metadata" : "LedgerMetadata{formatVersion=3, ensembleSize=1, writeQuorumSize=1, ackQuorumSize=1, state=CLOSED, length=866, lastEntryId=51, digestType=CRC32C, password=OMITTED, ensembles={0=[10.20.240.119:3181]}, customMetadata={component=base64:bWFuYWdlZC1sZWRnZXI=, pulsar/managed-ledger=base64:cHVibGljL2RlZmF1bHQvcGVyc2lzdGVudC9teS10b3BpYy1teXN1Yk5hbWVfX3RyYW5zYWN0aW9uX3BlbmRpbmdfYWNr, application=base64:cHVsc2Fy}}",
  28. "underReplicated" : false
  29. } ],
  30. "cursors" : {
  31. "__pending_ack_state" : {
  32. "markDeletePosition" : "56:-1",
  33. "readPosition" : "56:0",
  34. "waitingReadOp" : false,
  35. "pendingReadOps" : 0,
  36. "messagesConsumedCounter" : 0,
  37. "cursorLedger" : 57,
  38. "cursorLedgerLastEntry" : 0,
  39. "individuallyDeletedMessages" : "[]",
  40. "lastLedgerSwitchTimestamp" : "2022-06-30T18:55:26.842+08:00",
  41. "state" : "Open",
  42. "numberOfEntriesSinceFirstNotAckedMessage" : 1,
  43. "totalNonContiguousDeletedMessagesRange" : 0,
  44. "subscriptionHavePendingRead" : false,
  45. "subscriptionHavePendingReplayRead" : false,
  46. "properties" : { }
  47. }
  48. }
  49. }
  50. }
  51. }

Get position stats in pending ack

The position stats in pending ack include:

  • PendingAck The position is in pending ack stats.
  • MarkDelete The position is already acknowledged.
  • NotInPendingAck The position is not acknowledged within a transaction.
  • PendingAckNotReady The pending ack has not been initialized.
  • InvalidPosition The position is invalid, for example, batch index > batch size.

If you want to know whether the position has been acknowledged, you can use one of the following ways to get position stats pending ack:

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin transactions position-stats-in-pending-ack -t my-topic -s mysubName -l 15 -e 6

GET /admin/v3/transactions/pendingAckStats /:tenant/:namespace:/:topic:/:subName/:ledgerId/:entryId?batchIndex=batchIndex/getPositionStatsInPendingAck

  1. admin.transactions().getPositionStatsInPendingAckAsync(topic, subName, ledgerId, entryId, lowWaterMarks);

The following is an example of the returned values.

  1. {
  2. "State" : "MarkDelete"
  3. }

Transaction buffer stats

Transaction buffer handles messages produced to a topic partition within a transaction. The messages in the transaction buffer are not visible to consumers until the transactions are committed. The messages in the transaction buffer are discarded when the transactions are aborted.

Get transaction buffer stats

The transaction buffer stats that can be retrieved include:

  • state: The state of this transaction buffer.
  • maxReadPosition: The maximum read position of this transaction buffer.
  • lastSnapshotTimestamps: The last snapshot timestamp of this transaction buffer.
  • lowWaterMarks (Optional): The low watermark details of the transaction buffer.
  • ongoingTxnSize: The total number of ongoing transactions in this transaction buffer.
  • recoverStartTime: The start timestamp of transaction buffer recovery. 0L means no startup.
  • recoverEndTime: The end timestamp of transaction buffer recovery. 0L means no startup.

Use one of the following ways to get transaction buffer stats:

  • pulsar-admin
  • REST API
  • Java
  1. pulsar-admin transactions transaction-buffer-stats -t my-topic -l

GET /admin/v3/transactions/transactionBufferStats/:tenant/:namespace:/:topic:/:subName/getTransactionBufferStats

  1. admin.transactions().getTransactionBufferStats(topic, lowWaterMarks);

The following is an example of the returned values.

  1. {
  2. "state" : "Ready",
  3. "maxReadPosition" : "38:101",
  4. "lastSnapshotTimestamps" : 1657021903534,
  5. "lowWaterMarks" : {
  6. "1" : -1,
  7. "2" : -1
  8. },
  9. "ongoingTxnSize" : 0,
  10. "recoverStartTime" : 1657021892850,
  11. "recoverEndTime" : 1657021893372
  12. }