Life of a Stream
How VReplication replicates data
Introduction
When a VReplication workflow runs, data is copied from source to target shards. Each target PRIMARY
tablet runs one vreplication stream (vstream
) for each source shard that the target’s keyrange overlaps with.
The diagram below outlines how one such stream operates. VReplication can be asked to start from a specific GTID or from the start. When starting from a GTID the replication mode is used where it streams events from the binlog.
Full Table Copy
If the entire table data is requested then the simple streaming done by the replication mode can create an avalanche of events (think 100s of millions of rows). Moreover, and more importantly, it is highly likely that necesasry older binlogs are no longer available.
So a copy/catchup mode is initiated first: data in the tables are copied over in a consistent manner using bulk inserts. Once we have copied enough data so that we are close enough to the current position (when replication lag is low) it switches over to, and forever stays in, the replication mode. All future replication is done only by streaming binlog events.
While we may have multiple database sources in a workflow each vstream
has just one source and one target. The source is always a vttablet
(and hence one mysqld
instance). The target could be another vttablet
(when resharding) or a streaming gRPC response (for vtgate vstream API clients).
Transformation and Filtering
Note that for all steps the data selected from the source will only be from the tables specified in the Match field of the Rule specification of the VReplication workflow. Furthermore, if a Filter is specified for a table it will be applied before being sent to the target. Columns may also be transformed based on the Filter’s SELECT
clause.
Source and Sink
Each stream has two actors: the target initiates streaming by making gRPC calls to the source tablet and the source tablet sources the data by connecting to its underlying mysqld
server as a replica (while replicating) or using SQL queries (in the copy phase) and streams it to the target. The target takes appropriate action: in case of resharding it will convert the events into CRUD SQL statements and apply them to the target database. In case of vtgate vstream API clients the events are forwarded by vtgate
to the client.
Note that the target always pulls data. If the source pushes data, there are chances of buffer overruns if the target is not able to process them in time. For example, in resharding workflows we need to convert the events to SQL INSERT
statements and execute them on the target’s mysqld instance, which are usually much slower than just selecting data on the source.
Modes, in Detail
Replicate
This is the easiest to understand. The source stream just acts like a MySQL replica and processes events as they are received. Events, after any necessary filtering and transformation, are sent to the target. Replication runs continuously with short sleeps when there are no more events to source. Periodic heartbeats are sent to the target to signal liveness. You will see this reflected with the Running
state for the workflow.
Initialize
Initialize is called at the start of the copy phase. For each table to be copied an entry is created in the internal _vt.copy_state
table with a null primary key (PK). As each table copy is completed the related entries are deleted and when there are no more entries for this workflow the copy phase is considered complete and the workflow moves into the replication mode which you will see reflected with the Running
state for the workflow.
Copy
Copy works on one table at a time. The source selects a set of rows from the table, for primary keys greater than the ones copied so far, using a consistent snapshot. This results in a stream of rows to be sent to the target which generates a bulk INSERT
for these rows. You will see this reflected with the Copying
state for the workflow.
However, there are a couple of factors which complicate our story:
- Each copy selects all rows until the current position of the binlog, but,
- Since transactions continue to be applied (presuming the database is online) the GTID position is continuously moving forward
Consider this example:
We have two tables X
and Y
. Each table has 20 rows and we copy 10 rows at a time. (The queries below simplified for readability).
The queries for the copy phase of X
will be:
T1: select * from X where pk > 0 limit 10; GTID: 100, Last PK 10
send rows to target
T2: select * from X where pk > 10 limit 10; GTID: 110, Last PK 20
send rows to target
There is a gotcha here: onsider that there are 10 new transactions or GTIDs between times T1 and T2. Some of these can potentially modify the rows returned from the query at T1. Hence if we just return the rows from T2 (which have only rows from PK 11 to 20) then we will have an inconsistent state on the target: the updates to rows with PK between 1 and 10 will not be present.
This means that we need to first stream the events between GTIDs 100 and 110 for primary keys between 1 and 10 first and then do the second select:
T1: select * from X where pk > 0 limit 10; GTID: 100, Last PK 10
send rows to target
T2: replicate from 100 to current position (110 from previous example),
only pass events for pks 1 to 10 of X
T3: select * from X where pk > 10 limit 10; GTID: 112, Last PK 20
send rows to target
Another gotcha: note that at time T3 when we selected the PKs from 11 to 20 the GTID position could have moved further! This could be due to transactions that were applied between T2 and T3. So if we just applied the rows from T3 we would still have an inconsistent state, if transactions 111 and 112 affected the rows from pks 1 to 10.
This leads us to the following flow:
T1: select * from X where pk > 0 limit 10; GTID: 100, Last PK 10
send rows to target
T2: replicate from 100 to current position (110 from previous example),
only pass events for pks 1 to 10
T3: select * from X where pk > 10 limit 10; GTID: 112, Last PK 20
T4: replicate from 111 to 112
only pass events for pks 1 to 10
T5: Send rows for pks 11 to 20 to target
This flow actually works and is the one used in Vitess VReplication!
The transactions to be applied at T1 can take a long time (due to the bulk inserts). T3 (which is just a snapshot) is quick. So the position can diverge much more at T2 than at T4. Hence, we call step T2 “Catchup” and step T4 “Fast Forward”.
Catchup
As detailed above the catchup phase runs between copy phase cycles (time limited by the vreplication_copy_phase_max_duration flag). During the copy phase the GTID position can move significantly ahead. So we run a catchup and fast-forward phase until we come close to the current position — i.e. the replication lag is small. At that point we execute another Copy cycle.
Fast Forward
During the copy phase we first take a snapshot. Then we fast-forward: we replicate from the gtid position where we stopped the Catchup to the position of the new snapshot.
Finally once we have finished copying all the tables we proceed to the replicate or Running
phase until our job is done: for example if we have resharded and switched over the reads and writes to the new shards or when the vstream API client closes its connection.