Cluster Architecture

The Cluster architecture of ArangoDB is a CP master/master model with nosingle point of failure.

With “CP” in terms of the CAP theoremwe mean that in the presence of anetwork partition, the database prefers internal consistency overavailability. With “master/master” we mean that clients can send theirrequests to an arbitrary node, and experience the same view on thedatabase regardless. “No single point of failure” means that the clustercan continue to serve requests, even if one machine fails completely.

In this way, ArangoDB has been designed as a distributed multi-modeldatabase. This section gives a short outline on the Cluster architecture andhow the above features and capabilities are achieved.

Structure of an ArangoDB Cluster

An ArangoDB Cluster consists of a number of ArangoDB instanceswhich talk to each other over the network. They play different roles,which will be explained in detail below.

The current configurationof the Cluster is held in the Agency, which is a highly-availableresilient key/value store based on an odd number of ArangoDB instancesrunning Raft Consensus Protocol.

For the various instances in an ArangoDB Cluster there are three distinctroles:

  • Agents
  • Coordinators
  • DBServers.In the following sections we will shed light on each of them.

ArangoDB Cluster

Agents

One or multiple Agents form the Agency in an ArangoDB Cluster. TheAgency is the central place to store the configuration in a Cluster. Itperforms leader elections and provides other synchronization services forthe whole Cluster. Without the Agency none of the other components canoperate.

While generally invisible to the outside the Agency is the heart of theCluster. As such, fault tolerance is of course a must have for theAgency. To achieve that the Agents are using the Raft ConsensusAlgorithm. The algorithm formally guaranteesconflict free configuration management within the ArangoDB Cluster.

At its core the Agency manages a big configuration tree. It supportstransactional read and write operations on this tree, and other serverscan subscribe to HTTP callbacks for all changes to the tree.

Coordinators

Coordinators should be accessible from the outside. These are the onesthe clients talk to. They will coordinate cluster tasks likeexecuting queries and running Foxx services. They know where thedata is stored and will optimize where to run user supplied queries orparts thereof. Coordinators are stateless and can thus easily be shut downand restarted as needed.

DBServers

DBservers are the ones where the data is actually hosted. Theyhost shards of data and using synchronous replication a DBServer mayeither be leader or follower for a shard.

They should not be accessed from the outside but indirectly through theCoordinators. They may also execute queries in part or as a whole whenasked by a Coordinator.

See Sharding below for more information.

Many sensible configurations

This architecture is very flexible and thus allows many configurations,which are suitable for different usage scenarios:

  • The default configuration is to run exactly one Coordinator andone DBServer on each machine. This achieves the classicalmaster/master setup, since there is a perfect symmetry between thedifferent nodes, clients can equally well talk to any one of theCoordinators and all expose the same view to the data store. _Agents_can run on separate, less powerful machines.
  • One can deploy more Coordinators than DBservers. This is a sensibleapproach if one needs a lot of CPU power for the Foxx services,because they run on the Coordinators.
  • One can deploy more DBServers than Coordinators if more data capacityis needed and the query performance is the lesser bottleneck
  • One can deploy a Coordinator on each machine where an applicationserver (e.g. a node.js server) runs, and the Agents and DBServers_on a separate set of machines elsewhere. This avoids a network hopbetween the application server and the database and thus decreaseslatency. Essentially, this moves some of the database distributionlogic to the machine where the client runs.As you can see, the _Coordinator layer can be scaled and deployed independentlyfrom the DBServer layer.

It is a best practice and a recommended approach to run Agent instanceson different machines than DBServer instances.

When deploying using the tool Starterthis can be achieved by using the options —cluster.start-dbserver=false and—cluster.start-coordinator=false on the first three machines where the Starter_is started, if the desired _Agency__size is 3, or on the first 5 machinesif the desired Agency__size is 5.

The different instances that form a Cluster are supposed to be run in the sameData Center (DC), with reliable and high-speed network connection betweenall the machines participating to the Cluster.

Multi-datacenter Clusters, where the entire structure and content of a Cluster locatedin a specific DC is replicated to others Clusters located in different DCs, arepossible as well. See Datacenter to datacenter replication(DC2DC) for further details.

Cluster ID

Every non-Agency ArangoDB instance in a Cluster is assigned a uniqueID during its startup. Using its ID a node is identifiablethroughout the Cluster. All cluster operations will communicatevia this ID.

Sharding

Using the roles outlined above an ArangoDB Cluster is able to distributedata in so called shards across multiple DBServers. From the outsidethis process is fully transparent and as such we achieve the goals ofwhat other systems call “master-master replication”.

In an ArangoDB Cluster you talk to any Coordinator and whenever you read or write datait will automatically figure out where the data is stored (read) or tobe stored (write). The information about the shards is shared across theCoordinators using the Agency.

ArangoDB organizes its collection data in shards. Shardingallows to use multiple machines to run a cluster of ArangoDBinstances that together constitute a single database. This enablesyou to store much more data, since ArangoDB distributes the dataautomatically to the different servers. In many situations one canalso reap a benefit in data throughput, again because the load canbe distributed to multiple machines.

Shards are configured per collection so multiple shards of data formthe collection as a whole. To determine in which shard the data is tobe stored ArangoDB performs a hash across the values. By default thishash is being created from the document _key.

For further information, please refer to theCluster Administration section.

Synchronous replication

In an ArangoDB Cluster, the replication among the data stored by the _DBServers_is synchronous.

Synchronous replication works on a per-shard basis. Using the option replicationFactor,one configures for each collection how many copies of each shard are kept in the Cluster.

If a collection has a replication factor of 1, its data is notreplicated to other DBServers. This exposes you to a risk of data loss, ifthe machine running the DBServer with the only copy of the data fails permanently.

The replication factor has to be set to a value equals or higher than 2to achieve minimal data redundancy via the synchronous replication.

An equal-or-higher-than 2 replication factor has to be set explicitlywhen the collection is created, or can be set later at run time if you forgotto set it at creation time.

When using a Cluster, please make sure all the collections that are important(and should not be lost in any case) have a replication factor equal or higherthan 2.

At any given time, one of the copies is declared to be the leader andall other replicas are followers. Internally, write operations for this shard_are always sent to the _DBServer which happens to hold the leader copy,which in turn replicates the changes to all followers before the operationis considered to be done and reported back to the Coordinator.Internally, read operations are all served by the DBServer holding the leader copy,this allows to provide snapshot semantics for complex transactions.

Using synchronous replication alone will guarantee consistency and high availabilityat the cost of reduced performance: write requests will have a higher latency(due to every write-request having to be executed on the followers) andread requests will not scale out as only the leader is being asked.

In a Cluster, synchronous replication will be managed by the Coordinators for the client. The data will always be stored on the DBServers.

The following example will give you an idea of how synchronous operationhas been implemented in ArangoDB Cluster:

  • Connect to a Coordinator via arangosh
  • Create a collection

127.0.0.1:8530@_system> db._create(“test”, {“replicationFactor”: 2})

  • The Coordinator will figure out a leader and one follower and createone shard (as this is the default)
  • Insert data

127.0.0.1:8530@_system> db.test.insert({“foo”: “bar”})

  • The Coordinator will write the data to the leader, which in turn willreplicate it to the follower.
  • Only when both were successful the result is reported to be successful:
  1. {
  2. "_id" : "test/7987",
  3. "_key" : "7987",
  4. "_rev" : "7987"
  5. }

Obviously, synchronous replication comes at the cost of an increased latency forwrite operations, simply because there is one more network hop within theCluster for every request. Therefore the user can set the _replicationFactor_to 1, which means that only one copy of each shard is kept, therebyswitching off synchronous replication. This is a suitable setting forless important or easily recoverable data for which low latency writeoperations matter.

Automatic failover

Failure of a follower

If a DBServer that holds a follower copy of a shard fails, then the leader_can no longer synchronize its changes to that _follower. After a short timeout(3 seconds), the leader gives up on the follower and declares it to beout of sync.

One of the following two cases can happen:

a) If another DBServer (that does not hold a replica for this shard already) is available in the Cluster, a new follower will automatically be created on this other DBServer (so the replication factor constraint is satisfied again).

b) If no other DBServer (that does not hold a replica for this shard already) is available, the service continues with one follower less than the number prescribed by the replication factor.

If the old DBServer with the follower copy comes back, one of the followingtwo cases can happen:

a) If previously we were in case a), the DBServer recognizes that there is a new follower that was elected in the meantime, so it will no longer be a follower for that shard.

b) If previously we were in case b), the DBServer automatically resynchronizes its data with the leader. The replication factor constraint is now satisfied again and order is restored.

Failure of a leader

If a DBServer that holds a leader copy of a shard fails, then the leader_can no longer serve any requests. It will no longer send a heartbeat tothe _Agency. Therefore, a supervision process running in the Raft__leader_of the Agency, can take the necessary action (after 15 seconds of missingheartbeats), namely to promote one of the _DBServers that hold in-syncreplicas of the shard to leader for that shard. This involves areconfiguration in the Agency and leads to the fact that Coordinators_now contact a different _DBServer for requests to this shard. Serviceresumes. The other surviving replicas automatically resynchronize theirdata with the new leader.

In addition to the above, one of the following two cases cases can happen:

a) If another DBServer (that does not hold a replica for this shard already) is available in the Cluster, a new follower will automatically be created on this other DBServer (so the replication factor constraint is satisfied again).b) If no other DBServer (that does not hold a replica for this shard already) is available the service continues with one follower less than the number prescribed by the replication factor.

When the DBServer with the original leader copy comes back, it recognizesthat a new leader was elected in the meantime, and one of the followingtwo cases can happen:

a) If previously we were in case a), since also a new follower was created and the replication factor constraint is satisfied, the DBServer will no longer be a follower for that shard.b) If previously we were in case b), the DBServer notices that it now holds a follower__replica of that shard and it resynchronizes its data with the new leader. The replication factor constraint is now satisfied again, and order is restored.

The following example will give you an idea of how _failover_has been implemented in ArangoDB Cluster:

  • The leader of a shard (let’s name it DBServer001) is going down.
  • A Coordinator is asked to return a document:

127.0.0.1:8530@_system> db.test.document(“100069”)

  • The Coordinator determines which server is responsible for this documentand finds DBServer001
  • The Coordinator tries to contact DBServer001 and timeouts because it isnot reachable.
  • After a short while the supervision (running in parallel on the Agency)will see that heartbeats from DBServer001 are not coming in
  • The supervision promotes one of the followers (say DBServer002), thatis in sync, to be leader and makes DBServer001 a follower.
  • As the Coordinator continues trying to fetch the document it will see thatthe leader changed to DBServer002
  • The Coordinator tries to contact the new leader (DBServer002) and returnsthe result:
  1. {
  2. "_key" : "100069",
  3. "_id" : "test/100069",
  4. "_rev" : "513",
  5. "foo" : "bar"
  6. }
  • After a while the supervision declares DBServer001 to be completely dead.
  • A new follower is determined from the pool of DBservers.
  • The new follower syncs its data from the leader and order is restored.Please note that there may still be timeouts. Depending on when exactlythe request has been done (in regard to the supervision) and dependingon the time needed to reconfigure the Cluster the Coordinator might failwith a timeout error.

Shard movement and resynchronization

All shard data synchronizations are done in an incremental way, such thatresynchronizations are quick. This technology allows to move shards(follower and leader ones) between DBServers without service interruptions.Therefore, an ArangoDB Cluster can move all the data on a specific DBServer_to other _DBServers and then shut down that server in a controlled way.This allows to scale down an ArangoDB Cluster without service interruption,loss of fault tolerance or data loss. Furthermore, one can re-balance thedistribution of the shards, either manually or automatically.

All these operations can be triggered via a REST/JSON API or via thegraphical web UI. All fail-over operations are completely handled withinthe ArangoDB Cluster.

Microservices and zero administation

The design and capabilities of ArangoDB are geared towards usage inmodern microservice architectures of applications. With theFoxx services it is very easy to deploy a datacentric microservice within an ArangoDB Cluster.

In addition, one can deploy multiple instances of ArangoDB within thesame project. One part of the project might need a scalable documentstore, another might need a graph database, and yet another might needthe full power of a multi-model database actually mixing the variousdata models. There are enormous efficiency benefits to be reaped bybeing able to use a single technology for various roles in a project.

To simplify life of the devops in such a scenario we try as much aspossible to use a zero administration approach for ArangoDB. A runningArangoDB Cluster is resilient against failures and essentially repairsitself in case of temporary failures.

Deployment

An ArangoDB Cluster can be deployed in several ways, e.g. by manuallystarting all the needed instances, by using the tool Starter, inDocker, in Mesos or DC/OS, and in Kubernetes.

See the Cluster Deploymentchapter for instructions.