Manage transactions
tip
This page only shows some frequently used operations.
For the latest and complete information about
Pulsar admin
, including commands, flags, descriptions, and more, see Pulsar admin docs.For the latest and complete information about
REST API
, including parameters, responses, samples, and more, see REST API doc.For the latest and complete information about
Java admin API
, including classes, methods, descriptions, and more, see Java admin API doc.
Transaction resources
GetSlowTransactions
In the production environment, there may be some long-lasting transactions that have never been completed. You can get these slow transactions that have survived over a certain time under a coordinator or all coordinators in the following ways.
- pulsar-admin
- REST API
- Java
pulsar-admin transactions slow-transactions -c 1 -t 1s
GET /admin/v3/transactions/slowTransactions/:timeout/getSlowTransactions
admin.transactions().getSlowTransactionsByCoordinatorId(coordinatorId, timeout, timeUnit)
//Or get slow transactions from all coordinators
admin.transactions().getSlowTransactions(timeout, timeUnit)
The following is an example of the returned values.
{
"(0,3)": {
"txnId": "(0,3)",
"status": "OPEN",
"openTimestamp": 1658120122474,
"timeoutAt": 300000,
"producedPartitions": {},
"ackedPartitions": {}
},
"(0,2)": {
"txnId": "(0,2)",
"status": "OPEN",
"openTimestamp": 1658120122471,
"timeoutAt": 300000,
"producedPartitions": {},
"ackedPartitions": {}
},
"(0,5)": {
"txnId": "(0,5)",
"status": "OPEN",
"openTimestamp": 1658120122478,
"timeoutAt": 300000,
"producedPartitions": {},
"ackedPartitions": {}
},
"(0,4)": {
"txnId": "(0,4)",
"status": "OPEN",
"openTimestamp": 1658120122476,
"timeoutAt": 300000,
"producedPartitions": {},
"ackedPartitions": {}
},
"(0,7)": {
"txnId": "(0,7)",
"status": "OPEN",
"openTimestamp": 1658120122482,
"timeoutAt": 300000,
"producedPartitions": {},
"ackedPartitions": {}
},
"(0,10)": {
"txnId": "(0,10)",
"status": "OPEN",
"openTimestamp": 1658120122488,
"timeoutAt": 300000,
"producedPartitions": {},
"ackedPartitions": {}
},
"(0,6)": {
"txnId": "(0,6)",
"status": "OPEN",
"openTimestamp": 1658120122480,
"timeoutAt": 300000,
"producedPartitions": {},
"ackedPartitions": {}
},
"(0,9)": {
"txnId": "(0,9)",
"status": "OPEN",
"openTimestamp": 1658120122486,
"timeoutAt": 300000,
"producedPartitions": {},
"ackedPartitions": {}
},
"(0,8)": {
"txnId": "(0,8)",
"status": "OPEN",
"openTimestamp": 1658120122484,
"timeoutAt": 300000,
"producedPartitions": {},
"ackedPartitions": {}
},
"(0,11)": {
"txnId": "(0,11)",
"status": "OPEN",
"openTimestamp": 1658120122490,
"timeoutAt": 300000,
"producedPartitions": {},
"ackedPartitions": {}
}
}
ScaleTransactionCoordinators
When the performance of transactions reaches a bottleneck due to the insufficient number of transaction coordinators, you can scale the number of the transaction coordinators in the following ways.
- pulsar-admin
- REST API
- Java
pulsar-admin transactions scale-transactionCoordinators -r 17
GET /admin/v3/transactions/transactionCoordinator/:replicas/scaleTransactionCoordinators
admin.transactions().scaleTransactionCoordinators(replicas);
Transaction stats
Get transaction metadata
The transaction metadata that can be retrieved include:
- txnId: The ID of this transaction.
- status: The status of this transaction.
- openTimestamp: The open time of this transaction.
- timeoutAt: The timeout of this transaction.
- producedPartitions: The partitions or topics that messages have been sent to with this transaction.
- ackedPartitions: The partitions or topics where messages have been acknowledged with this transaction.
Use one of the following ways to get your transaction metadata.
- pulsar-admin
- REST API
- Java
pulsar-admin transactions transaction-metadata -m 1 -l 1
GET /admin/v3/transactions/transactionMetadata/:mostSigBits/:leastSigBits/getTransactionMetadata
admin.transactions().getTransactionMetadata(txnID);
The following is an example of the returned values.
{
"txnId" : "(1,18)",
"status" : "ABORTING",
"openTimestamp" : 1656592983374,
"timeoutAt" : 5000,
"producedPartitions" : {
"my-topic" : {
"startPosition" : "127:4959",
"aborted" : true
}
},
"ackedPartitions" : {
"my-topic" : {
"mysubName" : {
"cumulativeAckPosition" : null
}
}
}
}
Get transaction stats in transaction pending ack
The transaction stats in transaction pending ack that can be retrieved include:
- cumulativeAckPosition: The position that this transaction cumulatively acknowledges in this subscription.
Use one of the following ways to get transaction stats in pending ack:
- pulsar-admin
- REST API
- Java
pulsar-admin transactions transaction-in-pending-ack-stats -m 1 -l 1 -t my-topic -s mysubname
admin.transactions().getTransactionInPendingAckStats(txnID, topic, subname);
The following is an example of the returned value.
{
"cumulativeAckPosition" : "137:49959"
}
Get transaction stats in transaction buffer
The transaction stats in the transaction buffer that can be retrieved include:
- startPosition: The start position of this transaction in the transaction buffer.
- aborted: The flag of whether this transaction has been aborted.
Use one of the following ways to get transaction stats in transaction buffer:
- pulsar-admin
- REST API
- Java
pulsar-admin transactions transaction-in-buffer-stats -m 1 -l 1 -t my-topic
admin.transactions().getTransactionInBufferStatsAsync(txnID, topic);
The following is an example of the returned values.
{
"startPosition" : "137:49759",
"aborted" : false
}
Transaction coordinator stats
The transaction coordinator (TC) is a module inside a Pulsar broker. It maintains the entire life cycle of transactions and handles transaction timeout.
Get coordinator stats
The transaction coordinator stats that can be retrieved include:
- state: The state of this transaction coordinator.
- leastSigBit:s The sequence ID of this transaction coordinator.
- lowWaterMark: The low watermark of this transaction coordinator.
- ongoingTxnSize: The total number of ongoing transactions in this transaction coordinator.
- recoverStartTime: The start timestamp of transaction coordinator recovery.
0L
means no startup. - recoverEndTime: The end timestamp of transaction coordinator recovery.
0L
means no startup.
Use one of the following ways to get transaction coordinator stats:
- pulsar-admin
- REST API
- Java
pulsar-admin transactions coordinator-stats -c 1
GET /admin/v3/transactions/coordinatorStats/getCoordinatorStats
admin.transactions().getCoordinatorStatsById(coordinatorId);
//Or get all coordinator stats.
admin.transactions().getCoordinatorStats();
The following is an example of the returned values.
{
"state" : "Ready",
"leastSigBits" : 1,
"lowWaterMark" : 0,
"ongoingTxnSize" : 0,
"recoverStartTime" : 1657021892377,
"recoverEndTime" : 1657021892378
}
Get coordinator internal stats
The coordinator’s internal stats that can be retrieved include:
- transactionLogStats: The stats of the transaction coordinator log.
- managedLedgerName: The name of the managed ledger where the transaction coordinator log is stored.
- managedLedgerInternalStats: The internal stats of the managed ledger where the transaction coordinator log is stored. See
[managedLedgerInternalStats](admin-api-topics.md#get-internal-stats)
for more details.
Use one of the following ways to get coordinator’s internal stats:
- pulsar-admin
- REST API
- Java
pulsar-admin transactions coordinator-internal-stats -c 1 -m
GET /admin/v3/transactions/coordinatorInternalStats/:coordinatorId/getCoordinatorInternalStats
admin.transactions().getCoordinatorInternalStats(coordinatorId, metadata);
The following is an example of the returned values.
{
"transactionLogStats" : {
"managedLedgerName" : "pulsar/system/persistent/__transaction_log_1",
"managedLedgerInternalStats" : {
"entriesAddedCounter" : 3,
"numberOfEntries" : 3,
"totalSize" : 63,
"currentLedgerEntries" : 3,
"currentLedgerSize" : 63,
"lastLedgerCreatedTimestamp" : "2022-06-30T18:18:05.88+08:00",
"waitingCursorsCount" : 0,
"pendingAddEntriesCount" : 0,
"lastConfirmedEntry" : "13:2",
"state" : "LedgerOpened",
"ledgers" : [ {
"ledgerId" : 13,
"entries" : 0,
"size" : 0,
"offloaded" : false,
"metadata" : "LedgerMetadata{formatVersion=3, ensembleSize=1, writeQuorumSize=1, ackQuorumSize=1, state=CLOSED, length=63, lastEntryId=2, digestType=CRC32C, password=OMITTED, ensembles={0=[10.20.240.119:3181]}, customMetadata={component=base64:bWFuYWdlZC1sZWRnZXI=, pulsar/managed-ledger=base64:cHVsc2FyL3N5c3RlbS9wZXJzaXN0ZW50L19fdHJhbnNhY3Rpb25fbG9nXzE=, application=base64:cHVsc2Fy}}",
"underReplicated" : false
} ],
"cursors" : {
"transaction.subscription" : {
"markDeletePosition" : "13:2",
"readPosition" : "13:3",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 3,
"cursorLedger" : 22,
"cursorLedgerLastEntry" : 1,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2022-06-30T18:18:05.932+08:00",
"state" : "Open",
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"subscriptionHavePendingRead" : false,
"subscriptionHavePendingReplayRead" : false,
"properties" : { }
}
}
}
}
}
Transaction pending ack stats
Pending ack maintains message acknowledgments within a transaction before a transaction completes. If a message is in the pending acknowledge state, the message cannot be acknowledged by other transactions until the message is removed from the pending acknowledge state.
Get transaction pending ack stats
The transaction pending ack state stats that can be retrieved include:
- state: The state of this transaction coordinator.
- lowWaterMark: The low watermark of this transaction coordinator.
- ongoingTxnSize: The total number of ongoing transactions in this transaction coordinator.
- recoverStartTime: The start timestamp of transaction pendingAck recovery.
0L
means no startup. - recoverEndTime: The end timestamp of transaction pendingAck recovery.
0L
means no startup.
Use one of the following ways to get transaction pending ack stats:
- pulsar-admin
- REST API
- Java
pulsar-admin.transactions()s pending-ack-stats -t my-topic -s mysubName -l
GET /admin/v3/transactions/pendingAckStats/:tenant/:namespace:/:topic:/:subName/getPendingAckStats
admin.transactions().getPendingAckStats(topic, subName, lowWaterMarks)
The following is an example of the returned values.
{
"state" : "Ready",
"lowWaterMarks" : {
"1" : 0
},
"ongoingTxnSize" : 1,
"recoverStartTime" : 1657021899202,
"recoverEndTime" : 1657021899203
}
Get transaction pending ack internal stats
The transaction pending ack internal stats that can be retrieved include:
- transactionLogStats: The stats of the transaction pending ack log.
- managedLedgerName: The name of the managed ledger where the transaction pending ack log is stored.
- managedLedgerInternalStats: The internal stats of the managed ledger where the transaction coordinator log is stored. See
[managedLedgerInternalStats](admin-api-topics.md#get-internal-stats)
for more details.
Use one of the following ways to get transaction pending ack internal stats:
- pulsar-admin
- REST API
- Java
pulsar-admin transactions pending-ack-internal-stats -t my-topic -s mysubName -m
admin.transactions().getPendingAckInternalStats(topic, subName, boolean metadata);
The following is an example of the returned values.
{
"pendingAckLogStats" : {
"managedLedgerName" : "public/default/persistent/my-topic-mysubName__transaction_pending_ack",
"managedLedgerInternalStats" : {
"entriesAddedCounter" : 2247,
"numberOfEntries" : 2247,
"totalSize" : 37212,
"currentLedgerEntries" : 104,
"currentLedgerSize" : 1732,
"lastLedgerCreatedTimestamp" : "2022-06-30T19:02:09.746+08:00",
"waitingCursorsCount" : 0,
"pendingAddEntriesCount" : 52,
"lastConfirmedEntry" : "64:51",
"state" : "LedgerOpened",
"ledgers" : [ {
"ledgerId" : 56,
"entries" : 2195,
"size" : 36346,
"offloaded" : false,
"metadata" : "LedgerMetadata{formatVersion=3, ensembleSize=1, writeQuorumSize=1, ackQuorumSize=1, state=CLOSED, length=36346, lastEntryId=2194, digestType=CRC32C, password=OMITTED, ensembles={0=[10.20.240.119:3181]}, customMetadata={component=base64:bWFuYWdlZC1sZWRnZXI=, pulsar/managed-ledger=base64:cHVibGljL2RlZmF1bHQvcGVyc2lzdGVudC9teS10b3BpYy1teXN1Yk5hbWVfX3RyYW5zYWN0aW9uX3BlbmRpbmdfYWNr, application=base64:cHVsc2Fy}}",
"underReplicated" : false
}, {
"ledgerId" : 64,
"entries" : 0,
"size" : 0,
"offloaded" : false,
"metadata" : "LedgerMetadata{formatVersion=3, ensembleSize=1, writeQuorumSize=1, ackQuorumSize=1, state=CLOSED, length=866, lastEntryId=51, digestType=CRC32C, password=OMITTED, ensembles={0=[10.20.240.119:3181]}, customMetadata={component=base64:bWFuYWdlZC1sZWRnZXI=, pulsar/managed-ledger=base64:cHVibGljL2RlZmF1bHQvcGVyc2lzdGVudC9teS10b3BpYy1teXN1Yk5hbWVfX3RyYW5zYWN0aW9uX3BlbmRpbmdfYWNr, application=base64:cHVsc2Fy}}",
"underReplicated" : false
} ],
"cursors" : {
"__pending_ack_state" : {
"markDeletePosition" : "56:-1",
"readPosition" : "56:0",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 0,
"cursorLedger" : 57,
"cursorLedgerLastEntry" : 0,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2022-06-30T18:55:26.842+08:00",
"state" : "Open",
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"subscriptionHavePendingRead" : false,
"subscriptionHavePendingReplayRead" : false,
"properties" : { }
}
}
}
}
}
Get position stats in pending ack
The position stats in pending ack include:
- PendingAck The position is in pending ack stats.
- MarkDelete The position is already acknowledged.
- NotInPendingAck The position is not acknowledged within a transaction.
- PendingAckNotReady The pending ack has not been initialized.
- InvalidPosition The position is invalid, for example, batch index > batch size.
If you want to know whether the position has been acknowledged, you can use one of the following ways to get position stats pending ack:
- pulsar-admin
- REST API
- Java
pulsar-admin transactions position-stats-in-pending-ack -t my-topic -s mysubName -l 15 -e 6
admin.transactions().getPositionStatsInPendingAckAsync(topic, subName, ledgerId, entryId, lowWaterMarks);
The following is an example of the returned values.
{
"State" : "MarkDelete"
}
Transaction buffer stats
Transaction buffer handles messages produced to a topic partition within a transaction. The messages in the transaction buffer are not visible to consumers until the transactions are committed. The messages in the transaction buffer are discarded when the transactions are aborted.
Get transaction buffer stats
The transaction buffer stats that can be retrieved include:
- state: The state of this transaction buffer.
- maxReadPosition: The maximum read position of this transaction buffer.
- lastSnapshotTimestamps: The last snapshot timestamp of this transaction buffer.
- lowWaterMarks (Optional): The low watermark details of the transaction buffer.
- ongoingTxnSize: The total number of ongoing transactions in this transaction buffer.
- recoverStartTime: The start timestamp of transaction buffer recovery.
0L
means no startup. - recoverEndTime: The end timestamp of transaction buffer recovery.
0L
means no startup.
Use one of the following ways to get transaction buffer stats:
- pulsar-admin
- REST API
- Java
pulsar-admin transactions transaction-buffer-stats -t my-topic -l
admin.transactions().getTransactionBufferStats(topic, lowWaterMarks);
The following is an example of the returned values.
{
"state" : "Ready",
"maxReadPosition" : "38:101",
"lastSnapshotTimestamps" : 1657021903534,
"lowWaterMarks" : {
"1" : -1,
"2" : -1
},
"ongoingTxnSize" : 0,
"recoverStartTime" : 1657021892850,
"recoverEndTime" : 1657021893372
}