Mirroring
Agent mirrors can be used interchangeably when processing a search query. Manticore instance (can be multiple) hosting the distributed table where the mirrored agents are defined keeps track of mirror status (alive or dead) and response times, and does automatic failover and load balancing based on that.
Agent mirrors
agent = node1|node2|node3:9312:shard2
The above example declares that ‘node1:9312’, ‘node2:9312’, and ‘node3:9312’ all have a table called shard2, and can be used as interchangeable mirrors. If any single of those servers go down, the queries will be distributed between the other two. When it gets back up, master will detect that and begin routing queries to all three nodes again.
Mirror may also include individual table list, as:
agent = node1:9312:node1shard2|node2:9312:node2shard2
This works essentially the same as the previous example, but different table names will be used when querying different severs: node1shard2 when querying node1:9312, and node2shard when querying node2:9312.
By default, all queries are routed to the best of the mirrors. The best one is picked based on the recent statistics, as controlled by the ha_period_karma config directive. Master stores a number of metrics (total query count, error count, response time, etc) recently observed for every agent. It groups those by time spans, and karma is that time span length. The best agent mirror is then determined dynamically based on the last 2 such time spans. Specific algorithm that will be used to pick a mirror can be configured ha_strategy directive.
The karma period is in seconds and defaults to 60 seconds. Master stores up to 15 karma spans with per-agent statistics for instrumentation purposes (see SHOW AGENT STATUS
statement). However, only the last 2 spans out of those are ever used for HA/LB logic.
When there are no queries, master sends a regular ping command every ha_ping_interval milliseconds in order to have some statistics and at least check, whether the remote host is still alive. ha_ping_interval defaults to 1000 msec. Setting it to 0 disables pings and statistics will only be accumulated based on actual queries.
Example:
# sharding table over 4 servers total
# in just 2 shards but with 2 failover mirrors for each shard
# node1, node2 carry shard1 as local
# node3, node4 carry shard2 as local
# config on node1, node2
agent = node3:9312|node4:9312:shard2
# config on node3, node4
agent = node1:9312|node2:9312:shard1
Load balancing
Load balancing is turned on by default for any distributed table using mirroring. By default queries are distributed randomly among the mirrors. To change this behaviour you can use ha_strategy.
ha_strategy
ha_strategy = {random|nodeads|noerrors|roundrobin}
Agent mirror selection strategy for load balancing. Optional, default is random
.
The strategy used for mirror selection, or in other words, choosing a specific agent mirror in a distributed table. Essentially, this directive controls how exactly master does the load balancing between the configured mirror agent nodes. The following strategies are implemented:
Simple random balancing
The default balancing mode. Simple linear random distribution among the mirrors. That is, equal selection probability are assigned to every mirror. Kind of similar to round-robin (RR), but unlike RR, does not impose a strict selection order.
- Example
Example
ha_strategy = random
Adaptive randomized balancing
The default simple random strategy does not take mirror status, error rate and, most importantly, actual response latencies into account. So to accommodate for heterogeneous clusters and/or temporary spikes in agent node load, we have a group of balancing strategies that dynamically adjusts the probabilities based on the actual query latencies observed by the master.
The adaptive strategies based on latency-weighted probabilities basically work as follows:
- latency stats are accumulated in blocks of ha_period_karma seconds;
- once per karma period latency-weighted probabilities get recomputed;
- once per request (including ping requests) “dead or alive” flag is adjusted.
Currently, we begin with equal probabilities (or percentages, for brevity), and on every step, scale them by the inverse of the latencies observed during the last “karma” period, and then renormalize them. For example, if during the first 60 seconds after the master startup 4 mirrors had latencies of 10, 5, 30, and 3 msec/query respectively, the first adjustment step would go as follow:
- initial percentages: 0.25, 0.25, 0.25, 0.25;
- observed latencies: 10 ms, 5 ms, 30 ms, 3 ms;
- inverse latencies: 0.1, 0.2, 0.0333, 0.333;
- scaled percentages: 0.025, 0.05, 0.008333, 0.0833;
- renormalized percentages: 0.15, 0.30, 0.05, 0.50.
Meaning that the 1st mirror would have a 15% chance of being chosen during the next karma period, the 2nd one a 30% chance, the 3rd one (slowest at 30 ms) only a 5% chance, and the 4th and the fastest one (at 3 ms) a 50% chance. Then, after that period, the second adjustment step would update those chances again, and so on.
The rationale here is, once the observed latencies stabilize, the latency weighted probabilities stabilize as well. So all these adjustment iterations are supposed to converge at a point where the average latencies are (roughly) equal over all mirrors.
nodeads
Latency-weighted probabilities, but dead mirrors are excluded from the selection. “Dead” mirror is defined as a mirror that resulted in multiple hard errors (eg. network failure, or no answer, etc) in a row.
- Example
Example
ha_strategy = nodeads
noerrors
Latency-weighted probabilities, but mirrors with worse errors/success ratio are excluded from the selection.
- Example
Example
ha_strategy = noerrors
Round-robin balancing
Simple round-robin selection, that is, selecting the 1st mirror in the list, then the 2nd one, then the 3rd one, etc, and then repeating the process once the last mirror in the list is reached. Unlike with the randomized strategies, RR imposes a strict querying order (1, 2, 3, …, N-1, N, 1, 2, 3, … and so on) and guarantees that no two subsequent queries will be sent to the same mirror.
- Example
Example
ha_strategy = roundrobin
Instance-wide options
ha_period_karma
ha_period_karma = 2m
ha_period_karma
defines agent mirror statistics window size, in seconds (or time suffixed). Optional, default is 60.
For a distributed table with agent mirrors in it, server tracks several different per-mirror counters. These counters are then used for failover and balancing. (Server picks the best mirror to use based on the counters.) Counters are accumulated in blocks of ha_period_karma
seconds.
After beginning a new block, master may still use the accumulated values from the previous one, until the new one is half full. Thus, any previous history stops affecting the mirror choice after 1.5 times ha_period_karma seconds at most.
Despite that at most 2 blocks are used for mirror selection, up to 15 last blocks are actually stored, for instrumentation purposes. They can be inspected using SHOW AGENT STATUS
statement.
ha_ping_interval
ha_ping_interval = 3s
ha_ping_interval
defines interval between agent mirror pings, in milliseconds (or time suffixed). Optional, default is 1000.
For a distributed table with agent mirrors in it, server sends all mirrors a ping command during the idle periods. This is to track the current agent status (alive or dead, network roundtrip, etc). The interval between such pings is defined by this directive.
To disable pings, set ha_ping_interval to 0.
Setting up replication
Write transaction (any result of INSERT
, REPLACE
, DELETE
, TRUNCATE
, UPDATE
, COMMIT
) can be replicated to other cluster nodes before the transaction is fully applied on the current node. Currently replication is supported for percolate
and rt
tables in Linux an MacOS. Manticore Search packages for Windows do not provide replication support.
Manticore’s replication is based on Galera library and features the following:
- true multi-master - read and write to any node at any time
- virtually synchronous replication - no slave lag, no data is lost after a node crash
- hot standby - no downtime during failover (since there is no failover)
- tightly coupled - all the nodes hold the same state. No diverged data between nodes allowed
- automatic node provisioning - no need to manually back up the database and restore it on a new node
- easy to use and deploy
- detection and automatic eviction of unreliable nodes
- certification based replication
To use replication in Manticore Search:
- data_dir option should be set in section “searchd” of the configuration file. Replication is not supported in the plain mode
- there should be either:
- a listen directive specified (without specifying a protocol) containing an IP address accessible by other nodes
- or node_address with an accessible IP address
- optionally you can set unique values for server_id on each cluster node. If no value is set, the node will try to use the MAC address (or a random number if that fails) to generate the
server_id
.
If there is no replication
listen directive set Manticore will use the first two free ports in the range of 200 ports after the default protocol listening port for each created cluster. To set replication ports manually the listen directive (of replication
type) port range should be defined and the address/port range pairs should not intersect between different nodes on the same server. As a rule of thumb, the port range should specify no less than two ports per cluster.
Replication cluster
Replication cluster is a set of nodes among which a write transaction gets replicated. Replication is configured on per-table basis, meaning that one table can be assigned to only one cluster. There is no restriction on how many tables a cluster can have. All transactions such as INSERT
, REPLACE
, DELETE
, TRUNCATE
in any percolate or real-time table belonging to a cluster are replicated to all the other nodes in the cluster. Replication is multi-master, so writes to any particular node or to multiple nodes simultaneously work equally well.
In most cases you create cluster with CREATE CLUSTER <cluster name>
and join cluster with JOIN CLUSTER <cluster name> at 'host:port'
, but in rare cases you may want to fine-tune the behaviour of CREATE/JOIN CLUSTER
. The options are:
name
Specifies cluster name. Should be unique.
path
Data directory for a write-set cache replication and incoming tables from other nodes. Should be unique among the other clusters in the node. Default is data_dir. Should be specified in the form of a path to an existing directory relative to the data_dir.
nodes
List of address:port pairs for all the nodes in the cluster (comma separated). Node’s API interface should be used for this option. It can contain the current node’s address too. This list is used to join a node to the cluster and rejoin it after restart.
options
Other options that are passed over directly to Galera replication plugin as described here Galera Documentation Parameters
Write statements
For SQL interface all write statements such as INSERT
, REPLACE
, DELETE
, TRUNCATE
, UPDATE
that change the content of a cluster’s table should use cluster_name:index_name
expression in place of a table name to make sure the change is propagated to all replicas in the cluster. An error will be triggered otherwise.
All write statements for HTTP interface to a cluster’s table should set cluster
property along with table
name. An error will be triggered otherwise.
Auto ID generated for a table in a cluster should be valid as soon as server_id is not misconfigured.
- SQL
- JSON
- PHP
- Python
- Javascript
- Java
SQL JSON PHP Python Javascript Java
INSERT INTO posts:weekly_index VALUES ( 'iphone case' )
TRUNCATE RTINDEX click_query:weekly_index
UPDATE INTO posts:rt_tags SET tags=(101, 302, 304) WHERE MATCH ('use') AND id IN (1,101,201)
DELETE FROM clicks:rt WHERE MATCH ('dumy') AND gid>206
POST /insert -d '
{
"cluster":"posts",
"index":"weekly_index",
"doc":
{
"title" : "iphone case",
"price" : 19.85
}
}'
POST /delete -d '
{
"cluster":"posts",
"index": "weekly_index",
"id":1
}'
$index->addDocuments([
1, ['title' => 'iphone case', 'price' => 19.85]
]);
$index->deleteDocument(1);
indexApi.insert({"cluster":"posts","index":"weekly_index","doc":{"title":"iphone case","price":19.85}})
indexApi.delete({"cluster":"posts","index":"weekly_index","id":1})
res = await indexApi.insert({"cluster":"posts","index":"weekly_index","doc":{"title":"iphone case","price":19.85}});
res = await indexApi.delete({"cluster":"posts","index":"weekly_index","id":1});
InsertDocumentRequest newdoc = new InsertDocumentRequest();
HashMap<String,Object> doc = new HashMap<String,Object>(){{
put("title","Crossbody Bag with Tassel");
put("price",19.85);
}};
newdoc.index("weekly_index").cluster("posts").id(1L).setDoc(doc);
sqlresult = indexApi.insert(newdoc);
DeleteDocumentRequest deleteRequest = new DeleteDocumentRequest();
deleteRequest.index("weekly_index").cluster("posts").setId(1L);
indexApi.delete(deleteRequest);
Read statements
Read statements such as SELECT
, CALL PQ
, DESCRIBE
can use either regular table names not prepended with a cluster name or cluster_name:index_name
. In this case cluster_name
is just ignored.
In HTTP endpoint json/search
you can specify cluster
property if you like, but can also omit it.
- SQL
- JSON
SQL JSON
SELECT * FROM weekly_index
CALL PQ('posts:weekly_index', 'document is here')
POST /search -d '
{
"cluster":"posts",
"index":"weekly_index",
"query":{"match":{"title":"keyword"}}
}'
POST /search -d '
{
"index":"weekly_index",
"query":{"match":{"title":"keyword"}}
}'
Cluster parameters
Replication plugin options can be changed using SET
statement (see the example).
See Galera Documentation Parameters for a list of available options.
- SQL
- JSON
SQL JSON
SET CLUSTER click_query GLOBAL 'pc.bootstrap' = 1
POST /cli -d "
SET CLUSTER click_query GLOBAL 'pc.bootstrap' = 1
"
Cluster with diverged nodes
Sometimes replicated nodes can diverge from each other. The state of all the nodes might turn into non-primary
due to a network split between nodes, a cluster crash, or if the replication plugin hits an exception when determining the primary component
. Then it’s necessary to select a node and promote it to the primary component
.
To determine which node needs to be a reference, compare the last_committed
cluster status variable value on all nodes. If all the servers are already running there’s no need to start the cluster again. You just need to promote the most advanced node to the primary component
with SET
statement (see the example).
All other nodes will reconnect to the node and resync their data based on this node.
- SQL
- JSON
SQL JSON
SET CLUSTER posts GLOBAL 'pc.bootstrap' = 1
POST /cli -d "
SET CLUSTER posts GLOBAL 'pc.bootstrap' = 1
"
Replication and cluster
To use replication define one listen port for SphinxAPI protocol and one listen for replication address and port range in the config. Define data_dir folder for incoming tables.
- ini
ini
searchd {
listen = 9312
listen = 192.168.1.101:9360-9370:replication
data_dir = /var/lib/manticore/
...
}
Create a cluster at the server that has local tables that need to be replicated
- SQL
- JSON
- PHP
- Python
- Javascript
- Java
SQL JSON PHP Python Javascript Java
CREATE CLUSTER posts
POST /cli -d "
CREATE CLUSTER posts
"
$params = [
'cluster' => 'posts'
]
];
$response = $client->cluster()->create($params);
utilsApi.sql('CREATE CLUSTER posts')
res = await utilsApi.sql('CREATE CLUSTER posts');
utilsApi.sql("CREATE CLUSTER posts");
Add these local tables to cluster
- SQL
- JSON
- PHP
- Python
- Javascript
- Java
SQL JSON PHP Python Javascript Java
ALTER CLUSTER posts ADD pq_title
ALTER CLUSTER posts ADD pq_clicks
POST /cli -d "
ALTER CLUSTER posts ADD pq_title
"
POST /cli -d "
ALTER CLUSTER posts ADD pq_clicks
"
$params = [
'cluster' => 'posts',
'body' => [
'operation' => 'add',
'index' => 'pq_title'
]
];
$response = $client->cluster()->alter($params);
$params = [
'cluster' => 'posts',
'body' => [
'operation' => 'add',
'index' => 'pq_clicks'
]
];
$response = $client->cluster()->alter($params);
utilsApi.sql('ALTER CLUSTER posts ADD pq_title')
utilsApi.sql('ALTER CLUSTER posts ADD pq_clicks')
res = await utilsApi.sql('ALTER CLUSTER posts ADD pq_title');
res = await utilsApi.sql('ALTER CLUSTER posts ADD pq_clicks');
utilsApi.sql("ALTER CLUSTER posts ADD pq_title");
utilsApi.sql("ALTER CLUSTER posts ADD pq_clicks");
All other nodes that want replica of cluster’s tables should join cluster as
- SQL
- JSON
- PHP
- Python
- Javascript
- Java
SQL JSON PHP Python Javascript Java
JOIN CLUSTER posts AT '192.168.1.101:9312'
POST /cli -d "
JOIN CLUSTER posts AT '192.168.1.101:9312'
"
$params = [
'cluster' => 'posts',
'body' => [
'192.168.1.101:9312'
]
];
$response = $client->cluster->join($params);
utilsApi.sql('JOIN CLUSTER posts AT \'192.168.1.101:9312\'')
res = await utilsApi.sql('JOIN CLUSTER posts AT \'192.168.1.101:9312\'');
utilsApi.sql("JOIN CLUSTER posts AT '192.168.1.101:9312'");
When running queries for SQL prepend the table name with the cluster name posts:
or use cluster
property for HTTP request object.
- SQL
- JSON
- PHP
- Python
- Javascript
- Java
SQL JSON PHP Python Javascript Java
INSERT INTO posts:pq_title VALUES ( 3, 'test me' )
POST /insert -d '
{
"cluster":"posts",
"index":"pq_title",
"id": 3
"doc":
{
"title" : "test me"
}
}'
$index->addDocuments([
3, ['title' => 'test me']
]);
indexApi.insert({"cluster":"posts","index":"pq_title","id":3"doc":{"title":"test me"}})
res = await indexApi.insert({"cluster":"posts","index":"pq_title","id":3"doc":{"title":"test me"}});
InsertDocumentRequest newdoc = new InsertDocumentRequest();
HashMap<String,Object> doc = new HashMap<String,Object>(){{
put("title","test me");
}};
newdoc.index("pq_title").cluster("posts").id(3L).setDoc(doc);
sqlresult = indexApi.insert(newdoc);
Now all such queries that modify tables in the cluster are replicated to all nodes in the cluster.