How-To: Enable the transactional outbox pattern

Commit a single transaction across a state store and pub/sub message broker

The transactional outbox pattern is a well known design pattern for sending notifications regarding changes in an application’s state. The transactional outbox pattern uses a single transaction that spans across the database and the message broker delivering the notification.

Developers are faced with many difficult technical challenges when trying to implement this pattern on their own, which often involves writing error-prone central coordination managers that, at most, support a combination of one or two databases and message brokers.

For example, you can use the outbox pattern to:

  1. Write a new user record to an account database.
  2. Send a notification message that the account was successfully created.

With Dapr’s outbox support, you can notify subscribers when an application’s state is created or updated when calling Dapr’s transactions API.

The diagram below is an overview of how the outbox feature works:

  1. Service A saves/updates state to the state store using a transaction.
  2. A message is written to the broker under the same transaction. When the message is successfully delivered to the message broker, the transaction completes, ensuring the state and message are transacted together.
  3. The message broker delivers the message topic to any subscribers - in this case, Service B.

Diagram showing the steps of the outbox pattern

Requirements

The outbox feature can be used with using any transactional state store supported by Dapr. All pub/sub brokers are supported with the outbox feature.

Learn more about the transactional methods you can use.

Note

Message brokers that work with the competing consumer pattern (for example, Apache Kafka) are encouraged to reduce the chances of duplicate events.

Enable the outbox pattern

To enable the outbox feature, add the following required and optional fields on a state store component:

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: mysql-outbox
  5. spec:
  6. type: state.mysql
  7. version: v1
  8. metadata:
  9. - name: connectionString
  10. value: "<CONNECTION STRING>"
  11. - name: outboxPublishPubsub # Required
  12. value: "mypubsub"
  13. - name: outboxPublishTopic # Required
  14. value: "newOrder"
  15. - name: outboxPubsub # Optional
  16. value: "myOutboxPubsub"
  17. - name: outboxDiscardWhenMissingState #Optional. Defaults to false
  18. value: false

Metadata fields

NameRequiredDefault ValueDescription
outboxPublishPubsubYesN/ASets the name of the pub/sub component to deliver the notifications when publishing state changes
outboxPublishTopicYesN/ASets the topic that receives the state changes on the pub/sub configured with outboxPublishPubsub. The message body will be a state transaction item for an insert or update operation
outboxPubsubNooutboxPublishPubsubSets the pub/sub component used by Dapr to coordinate the state and pub/sub transactions. If not set, the pub/sub component configured with outboxPublishPubsub is used. This is useful if you want to separate the pub/sub component used to send the notification state changes from the one used to coordinate the transaction
outboxDiscardWhenMissingStateNofalseBy setting outboxDiscardWhenMissingState to true, Dapr discards the transaction if it cannot find the state in the database and does not retry. This setting can be useful if the state store data has been deleted for any reason before Dapr was able to deliver the message and you would like Dapr to drop the items from the pub/sub and stop retrying to fetch the state

Additional configurations

Combining outbox and non-outbox messages on the same state store

If you want to use the same state store for sending both outbox and non-outbox messages, simply define two state store components that connect to the same state store, where one has the outbox feature and the other does not.

MySQL state store without outbox

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: mysql
  5. spec:
  6. type: state.mysql
  7. version: v1
  8. metadata:
  9. - name: connectionString
  10. value: "<CONNECTION STRING>"

MySQL state store with outbox

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: mysql-outbox
  5. spec:
  6. type: state.mysql
  7. version: v1
  8. metadata:
  9. - name: connectionString
  10. value: "<CONNECTION STRING>"
  11. - name: outboxPublishPubsub # Required
  12. value: "mypubsub"
  13. - name: outboxPublishTopic # Required
  14. value: "newOrder"

Shape the outbox pattern message

You can override the outbox pattern message published to the pub/sub broker by setting another transaction that is not be saved to the database and is explicitly mentioned as a projection. This transaction is added a metadata key named outbox.projection with a value set to true. When added to the state array saved in a transaction, this payload is ignored when the state is written and the data is used as the payload sent to the upstream subscriber.

To use correctly, the key values must match between the operation on the state store and the message projection. If the keys do not match, the whole transaction fails.

If you have two or more outbox.projection enabled state items for the same key, the first one defined is used and the others are ignored.

Learn more about default and custom CloudEvent messages.

In the following Python SDK example of a state transaction, the value of "2" is saved to the database, but the value of "3" is published to the end-user topic.

  1. DAPR_STORE_NAME = "statestore"
  2. async def main():
  3. client = DaprClient()
  4. # Define the first state operation to save the value "2"
  5. op1 = StateItem(
  6. key="key1",
  7. value=b"2"
  8. )
  9. # Define the second state operation to publish the value "3" with metadata
  10. op2 = StateItem(
  11. key="key1",
  12. value=b"3",
  13. options=StateOptions(
  14. metadata={
  15. "outbox.projection": "true"
  16. }
  17. )
  18. )
  19. # Create the list of state operations
  20. ops = [op1, op2]
  21. # Execute the state transaction
  22. await client.state.transaction(DAPR_STORE_NAME, operations=ops)
  23. print("State transaction executed.")

By setting the metadata item "outbox.projection" to "true" and making sure the key values match (key1):

  • The first operation is written to the state store and no message is written to the message broker.
  • The second operation value is published to the configured pub/sub topic.

In the following JavaScript SDK example of a state transaction, the value of "2" is saved to the database, but the value of "3" is published to the end-user topic.

  1. const { DaprClient, StateOperationType } = require('@dapr/dapr');
  2. const DAPR_STORE_NAME = "statestore";
  3. async function main() {
  4. const client = new DaprClient();
  5. // Define the first state operation to save the value "2"
  6. const op1 = {
  7. operation: StateOperationType.UPSERT,
  8. request: {
  9. key: "key1",
  10. value: "2"
  11. }
  12. };
  13. // Define the second state operation to publish the value "3" with metadata
  14. const op2 = {
  15. operation: StateOperationType.UPSERT,
  16. request: {
  17. key: "key1",
  18. value: "3",
  19. metadata: {
  20. "outbox.projection": "true"
  21. }
  22. }
  23. };
  24. // Create the list of state operations
  25. const ops = [op1, op2];
  26. // Execute the state transaction
  27. await client.state.transaction(DAPR_STORE_NAME, ops);
  28. console.log("State transaction executed.");
  29. }
  30. main().catch(err => {
  31. console.error(err);
  32. });

By setting the metadata item "outbox.projection" to "true" and making sure the key values match (key1):

  • The first operation is written to the state store and no message is written to the message broker.
  • The second operation value is published to the configured pub/sub topic.

In the following .NET SDK example of a state transaction, the value of "2" is saved to the database, but the value of "3" is published to the end-user topic.

  1. public class Program
  2. {
  3. private const string DAPR_STORE_NAME = "statestore";
  4. public static async Task Main(string[] args)
  5. {
  6. var client = new DaprClientBuilder().Build();
  7. // Define the first state operation to save the value "2"
  8. var op1 = new StateTransactionRequest(
  9. key: "key1",
  10. value: Encoding.UTF8.GetBytes("2"),
  11. operationType: StateOperationType.Upsert
  12. );
  13. // Define the second state operation to publish the value "3" with metadata
  14. var metadata = new Dictionary<string, string>
  15. {
  16. { "outbox.projection", "true" }
  17. };
  18. var op2 = new StateTransactionRequest(
  19. key: "key1",
  20. value: Encoding.UTF8.GetBytes("3"),
  21. operationType: StateOperationType.Upsert,
  22. metadata: metadata
  23. );
  24. // Create the list of state operations
  25. var ops = new List<StateTransactionRequest> { op1, op2 };
  26. // Execute the state transaction
  27. await client.ExecuteStateTransactionAsync(DAPR_STORE_NAME, ops);
  28. Console.WriteLine("State transaction executed.");
  29. }
  30. }

By setting the metadata item "outbox.projection" to "true" and making sure the key values match (key1):

  • The first operation is written to the state store and no message is written to the message broker.
  • The second operation value is published to the configured pub/sub topic.

In the following Java SDK example of a state transaction, the value of "2" is saved to the database, but the value of "3" is published to the end-user topic.

  1. public class Main {
  2. private static final String DAPR_STORE_NAME = "statestore";
  3. public static void main(String[] args) {
  4. try (DaprClient client = new DaprClientBuilder().build()) {
  5. // Define the first state operation to save the value "2"
  6. StateOperation<String> op1 = new StateOperation<>(
  7. StateOperationType.UPSERT,
  8. "key1",
  9. "2"
  10. );
  11. // Define the second state operation to publish the value "3" with metadata
  12. Map<String, String> metadata = new HashMap<>();
  13. metadata.put("outbox.projection", "true");
  14. StateOperation<String> op2 = new StateOperation<>(
  15. StateOperationType.UPSERT,
  16. "key1",
  17. "3",
  18. metadata
  19. );
  20. // Create the list of state operations
  21. List<StateOperation<?>> ops = new ArrayList<>();
  22. ops.add(op1);
  23. ops.add(op2);
  24. // Execute the state transaction
  25. client.executeStateTransaction(DAPR_STORE_NAME, ops).block();
  26. System.out.println("State transaction executed.");
  27. } catch (Exception e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. }

By setting the metadata item "outbox.projection" to "true" and making sure the key values match (key1):

  • The first operation is written to the state store and no message is written to the message broker.
  • The second operation value is published to the configured pub/sub topic.

In the following Go SDK example of a state transaction, the value of "2" is saved to the database, but the value of "3" is published to the end-user topic.

  1. ops := make([]*dapr.StateOperation, 0)
  2. op1 := &dapr.StateOperation{
  3. Type: dapr.StateOperationTypeUpsert,
  4. Item: &dapr.SetStateItem{
  5. Key: "key1",
  6. Value: []byte("2"),
  7. },
  8. }
  9. op2 := &dapr.StateOperation{
  10. Type: dapr.StateOperationTypeUpsert,
  11. Item: &dapr.SetStateItem{
  12. Key: "key1",
  13. Value: []byte("3"),
  14. // Override the data payload saved to the database
  15. Metadata: map[string]string{
  16. "outbox.projection": "true",
  17. },
  18. },
  19. }
  20. ops = append(ops, op1, op2)
  21. meta := map[string]string{}
  22. err := testClient.ExecuteStateTransaction(ctx, store, meta, ops)

By setting the metadata item "outbox.projection" to "true" and making sure the key values match (key1):

  • The first operation is written to the state store and no message is written to the message broker.
  • The second operation value is published to the configured pub/sub topic.

You can pass the message override using the following HTTP request:

  1. curl -X POST http://localhost:3500/v1.0/state/starwars/transaction \
  2. -H "Content-Type: application/json" \
  3. -d '{
  4. "operations": [
  5. {
  6. "operation": "upsert",
  7. "request": {
  8. "key": "order1",
  9. "value": {
  10. "orderId": "7hf8374s",
  11. "type": "book",
  12. "name": "The name of the wind"
  13. }
  14. }
  15. },
  16. {
  17. "operation": "upsert",
  18. "request": {
  19. "key": "order1",
  20. "value": {
  21. "orderId": "7hf8374s"
  22. },
  23. "metadata": {
  24. "outbox.projection": "true"
  25. },
  26. "contentType": "application/json"
  27. }
  28. }
  29. ]
  30. }'

By setting the metadata item "outbox.projection" to "true" and making sure the key values match (key1):

  • The first operation is written to the state store and no message is written to the message broker.
  • The second operation value is published to the configured pub/sub topic.

Override Dapr-generated CloudEvent fields

You can override the Dapr-generated CloudEvent fields on the published outbox event with custom CloudEvent metadata.

  1. async def execute_state_transaction():
  2. async with DaprClient() as client:
  3. # Define state operations
  4. ops = []
  5. op1 = {
  6. 'operation': 'upsert',
  7. 'request': {
  8. 'key': 'key1',
  9. 'value': b'2', # Convert string to byte array
  10. 'metadata': {
  11. 'cloudevent.id': 'unique-business-process-id',
  12. 'cloudevent.source': 'CustomersApp',
  13. 'cloudevent.type': 'CustomerCreated',
  14. 'cloudevent.subject': '123',
  15. 'my-custom-ce-field': 'abc'
  16. }
  17. }
  18. }
  19. ops.append(op1)
  20. # Execute state transaction
  21. store_name = 'your-state-store-name'
  22. try:
  23. await client.execute_state_transaction(store_name, ops)
  24. print('State transaction executed.')
  25. except Exception as e:
  26. print('Error executing state transaction:', e)
  27. # Run the async function
  28. if __name__ == "__main__":
  29. asyncio.run(execute_state_transaction())
  1. const { DaprClient } = require('dapr-client');
  2. async function executeStateTransaction() {
  3. // Initialize Dapr client
  4. const daprClient = new DaprClient();
  5. // Define state operations
  6. const ops = [];
  7. const op1 = {
  8. operationType: 'upsert',
  9. request: {
  10. key: 'key1',
  11. value: Buffer.from('2'),
  12. metadata: {
  13. 'id': 'unique-business-process-id',
  14. 'source': 'CustomersApp',
  15. 'type': 'CustomerCreated',
  16. 'subject': '123',
  17. 'my-custom-ce-field': 'abc'
  18. }
  19. }
  20. };
  21. ops.push(op1);
  22. // Execute state transaction
  23. const storeName = 'your-state-store-name';
  24. const metadata = {};
  25. }
  26. executeStateTransaction();
  1. public class StateOperationExample
  2. {
  3. public async Task ExecuteStateTransactionAsync()
  4. {
  5. var daprClient = new DaprClientBuilder().Build();
  6. // Define the value "2" as a string and serialize it to a byte array
  7. var value = "2";
  8. var valueBytes = JsonSerializer.SerializeToUtf8Bytes(value);
  9. // Define the first state operation to save the value "2" with metadata
  10. // Override Cloudevent metadata
  11. var metadata = new Dictionary<string, string>
  12. {
  13. { "cloudevent.id", "unique-business-process-id" },
  14. { "cloudevent.source", "CustomersApp" },
  15. { "cloudevent.type", "CustomerCreated" },
  16. { "cloudevent.subject", "123" },
  17. { "my-custom-ce-field", "abc" }
  18. };
  19. var op1 = new StateTransactionRequest(
  20. key: "key1",
  21. value: valueBytes,
  22. operationType: StateOperationType.Upsert,
  23. metadata: metadata
  24. );
  25. // Create the list of state operations
  26. var ops = new List<StateTransactionRequest> { op1 };
  27. // Execute the state transaction
  28. var storeName = "your-state-store-name";
  29. await daprClient.ExecuteStateTransactionAsync(storeName, ops);
  30. Console.WriteLine("State transaction executed.");
  31. }
  32. public static async Task Main(string[] args)
  33. {
  34. var example = new StateOperationExample();
  35. await example.ExecuteStateTransactionAsync();
  36. }
  37. }
  1. public class StateOperationExample {
  2. public static void main(String[] args) {
  3. executeStateTransaction();
  4. }
  5. public static void executeStateTransaction() {
  6. // Build Dapr client
  7. try (DaprClient daprClient = new DaprClientBuilder().build()) {
  8. // Define the value "2"
  9. String value = "2";
  10. // Override CloudEvent metadata
  11. Map<String, String> metadata = new HashMap<>();
  12. metadata.put("cloudevent.id", "unique-business-process-id");
  13. metadata.put("cloudevent.source", "CustomersApp");
  14. metadata.put("cloudevent.type", "CustomerCreated");
  15. metadata.put("cloudevent.subject", "123");
  16. metadata.put("my-custom-ce-field", "abc");
  17. // Define state operations
  18. List<StateOperation<?>> ops = new ArrayList<>();
  19. StateOperation<String> op1 = new StateOperation<>(
  20. StateOperationType.UPSERT,
  21. "key1",
  22. value,
  23. metadata
  24. );
  25. ops.add(op1);
  26. // Execute state transaction
  27. String storeName = "your-state-store-name";
  28. daprClient.executeStateTransaction(storeName, ops).block();
  29. System.out.println("State transaction executed.");
  30. } catch (Exception e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. }
  1. func main() {
  2. // Create a Dapr client
  3. client, err := dapr.NewClient()
  4. if err != nil {
  5. log.Fatalf("failed to create Dapr client: %v", err)
  6. }
  7. defer client.Close()
  8. ctx := context.Background()
  9. store := "your-state-store-name"
  10. // Define state operations
  11. ops := make([]*dapr.StateOperation, 0)
  12. op1 := &dapr.StateOperation{
  13. Type: dapr.StateOperationTypeUpsert,
  14. Item: &dapr.SetStateItem{
  15. Key: "key1",
  16. Value: []byte("2"),
  17. // Override Cloudevent metadata
  18. Metadata: map[string]string{
  19. "cloudevent.id": "unique-business-process-id",
  20. "cloudevent.source": "CustomersApp",
  21. "cloudevent.type": "CustomerCreated",
  22. "cloudevent.subject": "123",
  23. "my-custom-ce-field": "abc",
  24. },
  25. },
  26. }
  27. ops = append(ops, op1)
  28. // Metadata for the transaction (if any)
  29. meta := map[string]string{}
  30. // Execute state transaction
  31. err = client.ExecuteStateTransaction(ctx, store, meta, ops)
  32. if err != nil {
  33. log.Fatalf("failed to execute state transaction: %v", err)
  34. }
  35. log.Println("State transaction executed.")
  36. }
  1. curl -X POST http://localhost:3500/v1.0/state/starwars/transaction \
  2. -H "Content-Type: application/json" \
  3. -d '{
  4. "operations": [
  5. {
  6. "operation": "upsert",
  7. "request": {
  8. "key": "key1",
  9. "value": "2"
  10. }
  11. },
  12. ],
  13. "metadata": {
  14. "id": "unique-business-process-id",
  15. "source": "CustomersApp",
  16. "type": "CustomerCreated",
  17. "subject": "123",
  18. "my-custom-ce-field": "abc",
  19. }
  20. }'

Note

The data CloudEvent field is reserved for Dapr’s use only, and is non-customizable.

Demo

Watch this video for an overview of the outbox pattern:

Last modified October 11, 2024: Fixed typo (#4389) (fe17926)