VStream
Change event streams
Vitess Gateways (vtgate) provide a VStream service that allows clients to subscribe to a change event stream for a set of tables.
Use Cases
- Change Data Capture (CDC): VStream can be used to capture changes to a table and send them to a downstream system. This is useful for building real-time data pipelines.
Overview
VStream supports copying the current contents of a table, since you will often not have binary logs going back to the creation of the table, and then streaming new changes to the table from that point on. It also supports resuming this initial copy phase if it is interrupted for any reason.
Events in the stream are MySQL row based binary log events — with extended metadata — and can be processed by event bridges which support Vitess such as Debezium. Other products such as AirByte can also be used with custom Vitess connectors.
We recommend Debezium as it has native Vitess support and has been used in production environments by many Vitess users.
API Details
VStream is a gRPC method that is part of the vtgate service and is accessible via a vtgate process's --grpc_port.
RPC Parameters
Context
Type Context
Required
Default none
In addition to the typical Context usage, it can contain a custom key-value pair where the key is 1 and the value is a CallerID. This value is then passed along to tablets to identify the originating client for the request. It is not meant to be secure, but primarily informational. The client can provide whatever info they want in the CallerID fields and they will be trusted by the servers, as this information is primarily used to aid in monitoring and debugging. The vtgate propagates the value to the source vttablet processes and the tablets may use this information for various monitoring, metrics, and logging purposes. It can, however, also be used for other purposes such as denying the client access to tables during a migration (MoveTables or Reshard).
TabletType
Type TabletType
Required
Default UNKNOWN (you must specify a valid type)
The tablet type to use when selecting stream source tablets.
VGtid
Type VGtid
Required
The keyspace, shard, and GTID position list to start streaming from. If no ShardGtid.Gtid value is provided then a table copy phase will be initiated for the tables matched by the provided filter on the given shard.
If the ShardGtid.Shard value is omitted, this means that all shards in the keyspace specified in the ShardGtid.Keyspace value are included. Additionally, if the ShardGtid.Keyspace value has a / prefix, you can use regular expressions such as /.* to include all keyspaces.
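As a rough illustration of these three rules, the sketch below summarizes how a given ShardGtid would be interpreted. The ShardGtid struct here is a hypothetical, trimmed-down mirror of the fields discussed above (not the real protobuf type), and Describe is an illustrative helper that does not exist in Vitess.

```go
package main

import (
	"fmt"
	"strings"
)

// ShardGtid is a hypothetical, trimmed-down mirror of the fields
// discussed above. It is not the real protobuf type.
type ShardGtid struct {
	Keyspace string
	Shard    string
	Gtid     string
}

// Describe summarizes how a ShardGtid would be interpreted according
// to the rules in this section.
func Describe(sg ShardGtid) string {
	keyspaces := "keyspace " + sg.Keyspace
	if strings.HasPrefix(sg.Keyspace, "/") {
		// A leading "/" marks the rest of the value as a regular expression.
		keyspaces = "keyspaces matching regex " + strings.TrimPrefix(sg.Keyspace, "/")
	}
	shards := "shard " + sg.Shard
	if sg.Shard == "" {
		shards = "all shards"
	}
	start := "stream from " + sg.Gtid
	if sg.Gtid == "" {
		// No position given: a table copy phase runs first.
		start = "copy tables, then stream"
	}
	return fmt.Sprintf("%s, %s: %s", keyspaces, shards, start)
}

func main() {
	fmt.Println(Describe(ShardGtid{Keyspace: "ks", Shard: "-80", Gtid: "MySQL56/89f66ef2-863a-11ed-9bdf-3d270fd3f552:1-30219"}))
	fmt.Println(Describe(ShardGtid{Keyspace: "ks"}))
	fmt.Println(Describe(ShardGtid{Keyspace: "/.*"}))
}
```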
Filter
Type Filter
Required
The tables which you want to subscribe to change events from — in the given keyspace(s) and shard(s) contained in the provided VGtid — and any query predicates to use when filtering the rows for which change events will be generated.
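To make the idea of table matching concrete, here is a minimal sketch of how a rule might be compared against table names. It assumes that a Match value with a / prefix is treated as a regular expression and that any other value is an exact table name, which is what the "/.*" and "t1" values used elsewhere on this page suggest. The Rule struct is a simplified, hypothetical mirror of a filter rule, the Matches helper is illustrative, and the predicate in the Filter query is an invented example.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// Rule is a hypothetical, simplified mirror of a filter rule.
type Rule struct {
	Match  string // table name, or a regex when prefixed with "/"
	Filter string // optional query selecting the rows of interest
}

// Matches reports whether the rule applies to the given table.
// Assumption: a "/" prefix marks Match as a regular expression.
func (r Rule) Matches(table string) (bool, error) {
	if strings.HasPrefix(r.Match, "/") {
		re, err := regexp.Compile(strings.TrimPrefix(r.Match, "/"))
		if err != nil {
			return false, err
		}
		return re.MatchString(table), nil
	}
	return r.Match == table, nil
}

func main() {
	rules := []Rule{
		// Illustrative predicate: only rows with id1 > 4 produce events.
		{Match: "t1", Filter: "select * from t1 where id1 > 4"},
		{Match: "/order.*"},
	}
	for _, table := range []string{"t1", "orders", "customer"} {
		for _, r := range rules {
			if ok, err := r.Matches(table); err == nil && ok {
				fmt.Printf("%s matched by %q\n", table, r.Match)
			}
		}
	}
}
```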
VStreamFlags
MinimizeSkew
Type bool
Default false
When enabled, the vtgate, which aggregates the streams coming from each of the shards involved, will keep the events in the stream roughly time aligned, using the event timestamps to ensure the maximum time skew between the source tablet shard streams is kept under 10 minutes. When it detects skew between the source streams, it will pause sending the client more events and allow the lagging shard(s) to catch up.
There is no strict ordering of events across shards and the client will need to examine the event timestamps.
HeartbeatInterval
Type unsigned integer
Default 0 (none)
How frequently, in seconds, to send heartbeat events to the client when there are no other events in the stream to send.
StopOnReshard
Type bool
Default false
When enabled, the vtgate will send a reshard event to the client along with an EOF error in the VStreamReader.Recv response and stop sending any further events.
Cells
Type string
Default “”
If specified, these cells (comma-separated list) are used when selecting stream source tablets. When no value is specified, the vtgate will default to looking for source tablets within its own local cell.
CellPreference
Type string
Default “”
If specified, this determines which cells to give preference to during tablet selection. By default, preferlocalwithalias is used in order to give preference to the caller's local cell and then any alias its cell belongs to. If onlyspecified is given, then only tablets within the specified Cells field value will be considered.
TabletOrder
Type string
Default “”
This replaces the in_order hint (e.g. "in_order:REPLICA,PRIMARY") previously used to specify tablet type order during source tablet selection.
RPC Response
The VStream gRPC returns a VStreamReader and a non-nil error if the stream could not be initialized. You would call the Recv method on that VStreamReader in a for loop and responses will be sent when available. Each response consists of the following two values:
- An array of VEvent objects: the new messages to process in the stream
- An error: if non-nil, it indicates that the stream has been closed (EOF) or that an error occurred
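The receive loop described above might look roughly like the following. To keep the sketch self-contained, it uses a hypothetical Reader interface and VEvent struct that only mirror the shape of the Recv call, plus a fake reader that replays canned batches; the event type strings are illustrative. Treating io.EOF as a clean end of stream is an assumption based on the EOF behavior described on this page.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// VEvent is a hypothetical stand-in for the real event type.
type VEvent struct {
	Type string
}

// Reader mirrors the shape of the Recv method described above.
type Reader interface {
	Recv() ([]*VEvent, error)
}

// Drain calls Recv until the stream ends. A clean end of stream
// (io.EOF) returns nil; any other error is wrapped and returned.
func Drain(r Reader, handle func(*VEvent)) error {
	for {
		events, err := r.Recv()
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return fmt.Errorf("vstream recv: %w", err)
		}
		for _, ev := range events {
			handle(ev)
		}
	}
}

// fakeReader replays canned batches and then reports io.EOF.
type fakeReader struct {
	batches [][]*VEvent
}

func (f *fakeReader) Recv() ([]*VEvent, error) {
	if len(f.batches) == 0 {
		return nil, io.EOF
	}
	next := f.batches[0]
	f.batches = f.batches[1:]
	return next, nil
}

func main() {
	r := &fakeReader{batches: [][]*VEvent{
		{{Type: "BEGIN"}, {Type: "ROW"}, {Type: "COMMIT"}},
		{{Type: "HEARTBEAT"}},
	}}
	count := 0
	err := Drain(r, func(ev *VEvent) {
		count++
		fmt.Println(ev.Type)
	})
	fmt.Println(count, err)
}
```

With StopOnReshard enabled, the same loop would end shortly after the reshard event is handled, which gives the client a natural point to reconnect with an updated VGtid.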
API Types
Example Usage
You can find a full example Go client here.
Below is a snippet showing how to use the VStream API in Go:
gconn, err := vtgateconn.Dial(ctx, grpcAddress)
if err != nil {
	t.Fatal(err)
}
defer gconn.Close()

// lastPK is id1=4
lastPK := sqltypes.Result{
	Fields: []*query.Field{{Name: "id1", Type: query.Type_INT64}},
	Rows:   [][]sqltypes.Value{{sqltypes.NewInt64(4)}},
}
tableLastPK := []*binlogdatapb.TableLastPK{{
	TableName: "t1",
	Lastpk:    sqltypes.ResultToProto3(&lastPK),
}}

// One ShardGtid per shard: the position to stream from and the last
// primary key copied for each table.
var shardGtids []*binlogdatapb.ShardGtid
var vgtid = &binlogdatapb.VGtid{}
shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{
	Keyspace: "ks",
	Shard:    "-80",
	Gtid:     "MySQL56/89f66ef2-863a-11ed-9bdf-3d270fd3f552:1-30219",
	TablePKs: tableLastPK,
})
shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{
	Keyspace: "ks",
	Shard:    "80-",
	Gtid:     "MySQL56/2174b383-5441-11e8-b90a-c80aa9429562:1-29516,24da167-0c0c-11e8-8442-00059a3c7b00:1-19",
	TablePKs: tableLastPK,
})
vgtid.ShardGtids = shardGtids

// Subscribe to all rows of table t1.
filter := &binlogdatapb.Filter{
	Rules: []*binlogdatapb.Rule{{
		Match:  "t1",
		Filter: "select * from t1",
	}},
}
flags := &vtgatepb.VStreamFlags{}
reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
var evs []*binlogdatapb.VEvent
for {
	e, err := reader.Recv()
	...
Copy All Tables From All Shards in the ks Keyspace
Below is a snippet in Go that demonstrates how to copy from all shards by omitting ShardGtid.Shard:
vgtid := &binlogdatapb.VGtid{
	ShardGtids: []*binlogdatapb.ShardGtid{{
		Keyspace: "ks",
		Gtid:     "",
	}},
}
filter := &binlogdatapb.Filter{
	Rules: []*binlogdatapb.Rule{{
		Match: "/.*",
	}},
}
flags := &vtgatepb.VStreamFlags{}
reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
Copy All Tables From All Shards in All Keyspaces
Below is a snippet in Go that demonstrates how to copy from all keyspaces by specifying /.* as the value for ShardGtid.Keyspace:
vgtid := &binlogdatapb.VGtid{
	ShardGtids: []*binlogdatapb.ShardGtid{{
		Keyspace: "/.*",
		Gtid:     "",
	}},
}
filter := &binlogdatapb.Filter{
	Rules: []*binlogdatapb.Rule{{
		Match: "/.*",
	}},
}
flags := &vtgatepb.VStreamFlags{}
reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
Copying from all keyspaces can generate a significant amount of load and potentially impact production traffic. Therefore, please exercise caution when using regular expressions in production.
Debugging
There is also an SQL interface that can be used for testing and debugging from a vtgate. Here's an example:
$ mysql --quick <vtgate params>
mysql> SET WORKLOAD=OLAP;
mysql> VSTREAM * FROM commerce.corder\G
*************** 1. row ***************
op: +
order_id: 1
customer_id: 1
sku: NULL
price: 10
************** 2. row ***************
op: *
order_id: 1
customer_id: 1
sku: NULL
price: 7
************** 3. row ***************
op: -
order_id: 1
customer_id: 1
sku: NULL
price: 7
…
Monitoring
VTGates publish vstream metrics listed here.
More Reading
- VStream Copy
- VStream API and Resharding
- VStream Skew Minimization
- Debezium Connector for Vitess
- Blog posts