VStream Skew Minimization

Aligning streams from different shards in the VStream API

VStream Skew Detection

Motivation

When the VStream API streams from multiple shards, there are multiple sources of events: one PRIMARY or REPLICA tablet for each shard in the provided VGTID. The rate at which events are streamed from these sources can vary with several factors, such as:

  • The replication lag on the source tablets (if a REPLICA tablet is selected as the source for the vstream)
  • The CPU load on the source tablet
  • Possible network partitions or network delays

As a result, events from some shards can be well ahead of events from other shards in the vstream. For example, if a row moves from a faster shard to a slower shard, the DELETE event from the faster shard might appear in the vstream long before the corresponding INSERT from the slower shard. The row would then be “invisible” for the duration of the skew. This can affect the user experience in applications that use vstream events to, for example, refresh a UI.

For most applications where VStream API events feed into change data capture systems for auditing or reporting purposes, these delays may be acceptable. However, for applications that use these events for user-facing functions, this can cause unexpected behavior. See https://github.com/vitessio/vitess/issues/7402 for one such case.

Goal

It is not practically possible to provide exact ordering of events across Vitess shards. The VStream API inherently streams events from each shard independently of the others. However, vstream events (VEvent) do record the binlog event timestamps, which we can use to loosely coordinate the streams. Since binlog timestamps only have one-second granularity, we attempt to align the streams to within a second.

Implementation

The skew minimization feature adds a MinimizeSkew flag that the client can set. This flag enables skew detection between the various streams. Once a skew is detected, events from the streams that are ahead are held back until the lagging streams catch up and the skew falls back to an acceptable level.
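The hold-back step can be sketched in Go with a `sync.Cond`. This is a minimal illustration under stated assumptions, not Vitess's implementation: `skewGate`, `Update`, `HeldBack`, and `Wait` are hypothetical names, and each stream is assumed to already have a clock-adjusted transaction time, in Unix seconds, for the event it is about to forward.

```go
package main

import "sync"

// skewGate is a hypothetical sketch of holding back streams that are ahead.
// Each stream reports the (assumed clock-adjusted) transaction time, in Unix
// seconds, of the event it is about to forward. A stream that is more than
// maxSkew seconds ahead of the slowest stream blocks in Wait.
type skewGate struct {
	mu         sync.Mutex
	cond       *sync.Cond
	maxSkew    int64
	timestamps map[string]int64
}

func newSkewGate(maxSkew int64) *skewGate {
	g := &skewGate{maxSkew: maxSkew, timestamps: make(map[string]int64)}
	g.cond = sync.NewCond(&g.mu)
	return g
}

// Update records the latest timestamp of a stream and wakes every waiting
// stream so that each one re-checks the skew.
func (g *skewGate) Update(stream string, ts int64) {
	g.mu.Lock()
	g.timestamps[stream] = ts
	g.mu.Unlock()
	g.cond.Broadcast()
}

// heldBackLocked reports whether stream is more than maxSkew seconds ahead
// of the slowest stream. The caller must hold g.mu.
func (g *skewGate) heldBackLocked(stream string) bool {
	first := true
	var oldest int64
	for _, ts := range g.timestamps {
		if first || ts < oldest {
			oldest, first = ts, false
		}
	}
	return g.timestamps[stream]-oldest > g.maxSkew
}

// HeldBack is a non-blocking version of the check, useful for logging.
func (g *skewGate) HeldBack(stream string) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.heldBackLocked(stream)
}

// Wait blocks while stream is too far ahead of the slowest stream.
func (g *skewGate) Wait(stream string) {
	g.mu.Lock()
	defer g.mu.Unlock()
	for g.heldBackLocked(stream) {
		g.cond.Wait()
	}
}
```

In this sketch a stream would call `Update` with its event's time and then `Wait` before forwarding the event. Because `Update` broadcasts, every held-back stream re-checks the skew whenever any stream advances. A production version would also need a way out, such as a timeout or context cancellation, so that a stalled laggard cannot block the other streams indefinitely.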

Each vstream event (VEvent) contains two timestamps: one for when the database transaction occurred, and one for the current time on the source tablet when the VEvent was created. The difference between them tells us how old the transaction was when the VEvent was generated. We use this to determine which shard has the most recent event and which one has the oldest. Note that for shards with no activity, the vstreamer sends a heartbeat event every second, and the transaction time of a heartbeat is the same as the current time on the source. (These heartbeats are not forwarded to clients in the vstream, since they are synthetic, internal VReplication events.)
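Assuming, for illustration, that the transaction time is in Unix seconds and the source's current time is in Unix nanoseconds, the age computation and a vtgate-side clock adjustment could look like this. `event`, `secondsInThePast`, and `localizedTimestamp` are hypothetical names; the real VEvent is a protobuf message with many more fields.

```go
package main

import "time"

// event is a hypothetical, simplified stand-in for a VEvent. Assumed units:
// Timestamp is the binlog transaction time in Unix seconds; CurrentTime is
// the source tablet's clock, in Unix nanoseconds, when the event was created.
type event struct {
	Timestamp   int64
	CurrentTime int64
}

// secondsInThePast returns how old the transaction was when the source
// tablet created the event, measured entirely on the source's clock.
// Integer division truncates CurrentTime to whole seconds.
func secondsInThePast(ev event) int64 {
	return ev.CurrentTime/int64(time.Second) - ev.Timestamp
}

// localizedTimestamp re-expresses the transaction time on the vtgate's
// clock: the vtgate's current Unix time minus the event's age. A constant
// offset between the vtgate and source clocks cancels out; the time the
// event spends in transit to the vtgate is ignored.
func localizedTimestamp(ev event, vtgateNowSeconds int64) int64 {
	return vtgateNowSeconds - secondsInThePast(ev)
}
```

For a heartbeat, the transaction time equals the source's current time, so its age comes out as 0. The truncation to whole seconds is one source of the rounding error that the skew threshold has to tolerate.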

If the difference between the fastest and slowest streams is greater than a threshold, we declare that a skew has been detected. MySQL binlogs store the transaction timestamp in whole seconds, and the vtgate serving the vstream also adjusts this time for clock skew between the vtgate and the source tablet’s mysqld server. When the user sets the MinimizeSkew flag, we want to keep the events across shards within the same second, that is, the transaction timestamps of events from different shards should be within 1 second of each other. To account for the rounding of transaction timestamps and for the clock-skew adjustment, we set the threshold to 2 seconds instead of 1, so that the streams don't keep stalling because of cumulative rounding errors.
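A minimal sketch of the detection rule, assuming each stream's latest event time has already been adjusted to the vtgate's clock. `detectSkew` and `maxSkewSeconds` are hypothetical names; only the 2-second threshold and the "greater than" comparison come from the description above.

```go
package main

// maxSkewSeconds is the threshold described above: 1 second of intended
// alignment plus slack for timestamp rounding and clock-skew adjustment.
const maxSkewSeconds = 2

// detectSkew takes the latest clock-adjusted transaction time (Unix seconds)
// of each stream and reports whether the spread between the fastest and the
// slowest stream exceeds maxSkewSeconds. It also returns the laggard, i.e.
// the stream with the oldest timestamp.
func detectSkew(timestamps map[string]int64) (skewed bool, laggard string) {
	if len(timestamps) < 2 {
		return false, ""
	}
	first := true
	var oldest, newest int64
	for stream, ts := range timestamps {
		if first || ts < oldest {
			oldest, laggard = ts, stream
		}
		if first || ts > newest {
			newest = ts
		}
		first = false
	}
	return newest-oldest > maxSkewSeconds, laggard
}
```

The laggard is returned because it is the stream the others are waiting on, so it must never be held back itself.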

Possible Unexpected Behavior

If a shard has no events for a second, a heartbeat is sent for it. On receiving a heartbeat, we reset the skew. This is necessary so that shards with no events do not starve the other shards. As a consequence, the current logic aligns streams only if every stream is receiving events more frequently than heartbeats are sent (once per second).
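To see how a heartbeat resets the skew, here is a hypothetical sketch in which a heartbeat moves an idle stream's position up to "now". It assumes a heartbeat has an age of 0 (its transaction time equals the source's current time) and that heartbeats themselves are never held back; `streamEvent`, `skewState`, and `observe` are illustrative names only.

```go
package main

// streamEvent is a hypothetical, simplified event. IsHeartbeat marks the
// synthetic heartbeat sent after a second without activity; AgeSeconds is
// how far in the past the event's transaction happened.
type streamEvent struct {
	IsHeartbeat bool
	AgeSeconds  int64
}

// skewState tracks the clock-adjusted position (Unix seconds) of each stream.
type skewState struct {
	positions map[string]int64
}

// observe records an event for stream at vtgate time nowSeconds and reports
// whether the stream should be held back.
func (s *skewState) observe(stream string, ev streamEvent, nowSeconds, maxSkew int64) bool {
	age := ev.AgeSeconds
	if ev.IsHeartbeat {
		// A heartbeat's transaction time is the source's current time, so
		// the idle stream's position jumps to "now", clearing any skew it
		// was causing.
		age = 0
	}
	s.positions[stream] = nowSeconds - age
	if ev.IsHeartbeat {
		return false // heartbeats are never held back
	}
	oldest := s.positions[stream]
	for _, p := range s.positions {
		if p < oldest {
			oldest = p
		}
	}
	return s.positions[stream]-oldest > maxSkew
}
```

In this sketch, any shard that emits heartbeats has its position pulled forward to the current time, so it stops holding the other shards back. That is why alignment only holds while every stream receives events more often than once per second.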

This means we cannot guarantee that skew alignment will work as expected under all conditions. This is most likely to happen when streaming from REPLICA tablets with high replication lag, for example due to high write QPS or a network partition.

Thus, we recommend streaming from PRIMARY tablets when using skew minimization. Note, however, that even PRIMARY tablets with skewed loads could potentially trigger such a situation.

API

This is how you would turn on the skew detection and alignment feature in a VStream client:

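  import (
      topodatapb "vitess.io/vitess/go/vt/proto/topodata"
      vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
  )
  ...
  // Ask vtgate to hold back shards that get ahead of the others.
  flags := &vtgatepb.VStreamFlags{MinimizeSkew: true}
  reader, err := conn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
  if err != nil {
      return err
  }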