VReplication Operator’s Guide

VReplication is a core component of vitess that can be used to compose many features. It can be used for the following use cases:

  • Resharding: Legacy workflows of vertical and horizontal resharding. New workflows of resharding from an unsharded to a sharded keyspace and vice-versa. Resharding from an unsharded to an unsharded keyspace using a different vindex than the source keyspace.
  • Materialized Views: You can specify a materialization rule that creates a view of the source table into a target keyspace. This materialization can use a different primary vindex than the source. It can also materialize a subset of the source columns, or add new expressions from the source. This view will be kept up-to-date in real time. One can also materialize reference tables onto all shards and have vitess perform efficient local joins with those materialized tables.
  • Realtime rollups; The materialization expression can include aggregation expressions in which case, vitess will create a rolled up version of the source table which can be used for realtime analytics.
  • Backfilling lookup vindexes: VReplication can be used to backfill a newly created lookup vindex. Workflows can be built to manage the switching from a backfill mode to the vindex itself keeping it up-to-date.
  • Schema deployment: We can use VReplication to recreate the workflow performed by gh-ost and thereby support zero-downtime schema deployments in vitess natively.
  • Data migration: VReplication can be setup to migrate data from an existing system into vitess. The replication could also be reversed after a cutover giving you the option to rollback a migration if something went wrong.
  • Change notification: The streamer component of VReplication can be used for the application or a systems operator to subscribe to change notification and use it to keep downstream systems up-to-date with the source.

The VReplication feature itself is a fairly low level one that is expected to be used as a building block for the above use cases. However, it’s still possible to directly issue commands to do some of the activities.

Feature description

VReplication works as a stream or combination of streams. Each stream establishes a replication from a source keyspace/shard into a target keyspace/shard.

A given stream can replicate multiple tables. For each table, you can specify a select statement that represents both the transformation rule and the filtering rule. The select expressions specify the transformation, and the where clause specifies the filtering.

The select expressions can be any non-aggregate mysql expression, or they can also be count or sum as aggregate expressions. Aggregate expressions combined with the corresponding group by clauses will allow you to materialize real-time rollups of the source table, which can be used for analytics. The target table can have a different name from the source.

For a sharded system like vitess, multiple VReplication streams may be needed to achieve the necessary goals. This is because there will be multiple source shards as well as destination shards, and the relationship between them may not be one to one.

VReplication performs the following essential functions:

  • Copy data from the source to the destination table in a consistent fashion. For large data, this copy can be long-running. It can be interrupted and resumed. If interrupted, VReplication can keep the copied portion up-to-date with respect to the source, and it can resume the copy process at a point that’s consistent with the current replication position.
  • After copying is finished, it can continuously replicate the data from the source to destination.
  • The copying rule can be expressed as a select statement. The statement should be simple enough that the materialized table can be kept up-to-date from the data coming from the binlog. For example, joins are not supported.
  • Correctness verification (to be implemented): VReplication can verify that the target table is an exact representation of the select statement from the source by capturing consistent snapshots of the source and target and comparing them against each other. This step can be done without the need to create special snapshot replicas.
  • Journaling (to be implemented): If there is any kind of traffic cut-over where we start writing to a different table than we used to before, VReplication will save the current binlog positions into a journal table. This can be used by other streams to resume replication from the new source.
  • Routing rules: Although this feature is itself not a direct functionality of VReplication, it works hand in hand with it. It allows you to specify sophisticated rules about where to route queries depending on the type of workflow being performed. For example, it can be used to control the cut-over during resharding. In the case of materialized views, it can be used to establish equivalence of tables, which will allow VTGate to compute the most optimal plans given the available options.

VReplicationExec

The VReplicationExec command is used to manage vreplication streams. The commands are issued as SQL statements. For example, a select can be used to see the current list of streams. An insert can be used to create one, etc. By design, the metadata for vreplication streams are stored in a vreplication table in the vt database. VReplication uses the ‘pull’ model. This means that a stream is created on the target side, and the target pulls the data by finding the appropriate source.

The table schema is as follows:

  1. CREATE TABLE _vt.vreplication (
  2. id INT AUTO_INCREMENT,
  3. workflow VARBINARY(1000),
  4. source VARBINARY(10000) NOT NULL,
  5. pos VARBINARY(10000) NOT NULL,
  6. stop_pos VARBINARY(10000) DEFAULT NULL,
  7. max_tps BIGINT(20) NOT NULL,
  8. max_replication_lag BIGINT(20) NOT NULL,
  9. cell VARBINARY(1000) DEFAULT NULL,
  10. tablet_types VARBINARY(100) DEFAULT NULL,
  11. time_updated BIGINT(20) NOT NULL,
  12. transaction_timestamp BIGINT(20) NOT NULL,
  13. state VARBINARY(100) NOT NULL,
  14. message VARBINARY(1000) DEFAULT NULL,
  15. db_name VARBINARY(255) NOT NULL,
  16. PRIMARY KEY (id)
  17. )

The fields are explained in the following section.

This is the syntax of the command:

  1. VReplicationExec [-json] <tablet alias> <sql command>

Here’s an example of the command to list all existing streams for a given tablet.

  1. lvtctl.sh VReplicationExec 'tablet-100' 'select * from _vt.vreplication'

Creating a stream

It’s generally easier to send the VReplication command programmatically instead of a bash script. This is because of the number of nested encodings involved:

  • One of the arguments is an SQL statement, which can contain quoted strings as values.
  • One of the strings in the SQL statement is a string encoded protobuf, which can contain quotes.
  • One of the parameters within the protobuf is an SQL select expression for the materialized view.

However, you can use vreplgen.go to generate a fully escaped bash command.

Alternately, you can use a python program. Here’s an example:

  1. cmd = [
  2. './lvtctl.sh',
  3. 'VReplicationExec',
  4. 'test-200',
  5. """insert into _vt.vreplication
  6. (db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values
  7. ('vt_keyspace', 'keyspace:"lookup" shard:"0" filter:<rules:<match:"uproduct" filter:"select * from product" > >', '', 99999, 99999, 'master', 0, 0, 'Running')""",
  8. ]

The first argument to the command is the master tablet id of the target keyspace/shard.

The second argument is the SQL command. To start a new stream, you need an insert statement. The parameters are as follows:

  • db_name: This name must match the name of the mysql database. In the future, this will not be required, and will be automatically filled in by the vttablet.
  • source: The protobuf representation of the stream source, explained below.
  • pos: For a brand new stream, this should be empty. To start from a specific position, a flavor-encoded position must be specified. A typical position would look like this MySQL56/ac6c45eb-71c2-11e9-92ea-0a580a1c1026:1-1296.
  • max_tps: 99999, reserved.
  • max_replication_lag: 99999, reserved.
  • tablet_types: specifies a comma separated list of tablet types to replicate from. If empty, the default tablet type specified by the -vreplication_tablet_type command line flag is used.
  • time_updated: 0, reserved.
  • transaction_timestamp: 0, reserved.
  • state: ‘Running’ or ‘Stopped’.
  • cell: is an optional parameter that specifies the cell from which the stream can be sourced.

The source field

The source field is a proto-encoding of the following structure:

  1. message BinlogSource {
  2. // the source keyspace
  3. string keyspace = 1;
  4. // the source shard
  5. string shard = 2;
  6. // list of filtering rules
  7. Filter filter = 6;
  8. // what to do if a DDL is encountered
  9. OnDDLAction on_ddl = 7;
  10. }
  11. message Filter {
  12. repeated Rule rules = 1;
  13. }
  14. message Rule {
  15. // match can be a table name or a regular expression
  16. // delineated by '/' and '/'.
  17. string match = 1;
  18. // filter can be an empty string or keyrange if the match
  19. // is a regular expression. Otherwise, it must be a select
  20. // query.
  21. string filter = 2;
  22. }
  23. enum OnDDLAction {
  24. IGNORE = 0;
  25. STOP = 1;
  26. EXEC = 2;
  27. EXEC_IGNORE = 3;
  28. }

Here are some examples of proto encodings:

  1. keyspace:"lookup" shard:"0" filter:<rules:<match:"uproduct" filter:"select * from product" > >

Meaning: replicate all columns and rows of product from lookup/0.product into the uproduct table in target keyspace.

  1. keyspace:"user" shard:"-80" filter:<rules:<match:"morder" filter:"select * from uorder where in_keyrange(mname, \\'unicode_loose_md5\\', \\'-80\\')" > >

The double-backslash for the strings inside the select will first be escaped by the python script, which will cause the expression to internally be \'unicode_loose_md5\'. Since the entire source is surrounded by single quotes when being sent as a value inside the outer insert statement, the single \ will escape the single quotes that follow. The final value in the source will therefore be:

  1. keyspace:"user" shard:"-80" filter:<rules:<match:"morder" filter:"select * from uorder where in_keyrange(mname, 'unicode_loose_md5', '-80')" > >

Meaning: replicate all columns of user/-80.uorder where unicode_loose_md5(mname) is within -80 keyrange, into morder.

This particular stream generally wouldn’t make sense in isolation. This would typically be one of four streams that combine together to create a materialized view of uorder from the user keyspace into the target (merchant) keyspace, but sharded by using mname as the primary vindex. The vindex used would be unicode_loose_md5 which should also match the primary vindex of other tables in the target keyspace.

  1. keyspace:"user" shard:"-80" filter:<rules:<match:"sales" filter:"select pid, count(*) as kount, sum(price) as amount from uorder group by pid" > >

Meaning: create a materialized view of user/-80.uorder into sales of the target keyspace using the expression: select pid, count(*) as kount, sum(price) as amount from uorder group by pid.

This represents only one stream from source shard -80. Presumably, there will be one more for the other -80 shard.

The ‘select’ features

The select statement has the following features (and restrictions):

  • The Select expressions can be any deterministic mysql expression. Subqueries are not supported. Among aggregate expressions, only count(*) and sum(col) are supported.
  • The where clause can only contain the in_keyrange construct. It has two forms:
    • in_keyrange('-80'): The row’s source keyrange matched against -80.
    • in_keyrange(col, 'hash', '-80'): The keyrange is computed using hash(col) and matched against -80.
  • group by: can be specified if using aggregations. The group by expressions are expected to cover the non-aggregated columns just like regular SQL requires.
  • No other constructs like order by, limit, joins, etc. are allowed.

The pos field

For starting a brand new vreplication stream, the pos field must be empty. The empty string signifies that there’s no starting point for the vreplication. This causes VReplication to copy the contents of the source table first, and then start the replication.

For large tables, this is done in chunks. After each chunk is copied, replication is resumed until it’s caught up. VReplication ensures that only changes that affect existing rows are applied. Following this another chunk is copied, and so on, until all tables are completed. After that, replication runs indefinitely.

It’s a shared row

The vreplication row is shared between the operator and Vreplication itself. Once the row is created, the VReplication stream updates various fields of the row to save and report on its own status. For example, the pos field is continuously updated as it makes forward progress.

While copying, the state field is updated as Init or Copying.

Updating a stream

You can change any field of the stream by issuing a VReplicationExec with an update statement. You are required to specify the id of the row you intend to update. You can only update one row at a time.

Typically, you can update the row and change the state to Stopping to stop a stream, or to Running to restart a stopped stream.

You can also update the row to set a stop_pos, which will make the replication stop once it reaches the specified position.

Deleting a stream

You can delete a stream by issuing a delete statement. This will stop the replication and delete the row. This statement is destructive. All data about the replication state will be permanently deleted.

Other properties of VReplication

Fast replay

VReplication has the capability to batch transactions if the send rate of the source exceeds the replay rate of the destination. This allows it to catch up very quickly when there is a backlog. Load tests have shown a 3-20X improvement over traditional MySQL replication depending on the workload.

Accurate lag tracking

The source vttablet sends its current time along with every event. This allows the target to correct for clock skew while estimating replication lag. Additionally, the source starts sending heartbeats if there is nothing to send. If the target receives no events from the source at all, it knows that it’s definitely lagged and starts reporting itself accordingly.

Self-replication

VReplication allows you to set the source keyspace/shard to be the same as the target. This is especially useful for performing schema rollouts: you can create the target table with the intended schema and vreplicate from the source table to the new target. Once caught up, you can cutover to write to the target table. In this situation, an apply on the target generates a binlog event that will be picked up by the source and sent to the target. Typically, it will be an empty transaction. In such cases, the target does not generally apply these transactions, because such an application will generate yet another event. However, there are situations where one needs to apply empty transactions, especially if it’s a required stopping point. VReplication can differentiate between these situations and apply events only as needed.

Deadlocks and lock wait timeouts

It is possible that multiple streams can conflict with each other and cause deadlocks or lock waits. When such things happen, VReplication silently retries such transactions without reporting an error. It does increment a counter so that the frequency of such occurrences can be tracked.

Automatic retries

If any other error is encountered, the replication is retried after a short wait. Each time, the stream searches from the full list of available sources and picks one at random.

on_ddl

The source specification allows you to specify a value for on_ddl. The values can be as follows:

  • IGNORE: Ignore all DDLs.
  • STOP: Stop when DDL is encountered. This allows you to make any necessary changes to the target. Once changes are made, updating the state to Running will cause VReplication to continue from just after the point where it encountered the DDL.
  • EXEC: Apply the DDL, but stop if an error is encountered while applying it.
  • EXEC_IGNORE: Apply the DDL, but ignore any errors and continue replicating.

Failover continuation

If a failover is performed on the target keyspace/shard, the new master will automatically resume VReplication from where the previous master left off.

Monitoring and troubleshooting

VTTablet /debug/status

The first place to look at is the /debug/status page of the target master vttablet. The bottom of the page shows the status of all the VReplication streams.

Typically, if there is a problem, the Last Message column will display the error. Sometimes, it’s possible that the stream cannot find a source. If so, the Source Tablet would be empty.

VTTablet logfile

If the errors are not clear or if they keep disappearing, the VTTablet logfile will contain information about what it’s been doing with each stream.

VReplicationExec select

The current status of the streams can also be fetched by issuing a VReplicationExec command with select * from _vt.vreplication.

Monitoring variables

VReplication also reports the following variables that can be scraped by monitoring tools like prometheus:

  • VReplicationStreamCount: Number of vreplication streams.
  • VReplicationSecondsBehindMasterMax: Max vreplication seconds behind master.
  • VReplicationSecondsBehindMaster: VReplication seconds behind master per stream.
  • VReplicationSource: The source for each VReplication stream.
  • VReplicationSourceTablet: The source tablet for each VReplication stream.

Thresholds and alerts can be set to draw attention to potential problems.