Design Doc: Asynchronous SGD
Motivation
Parameter server (PS) based distributed training uses data parallelism to speed up training.
We have implemented synchronous SGD in ElasticDL. When PS accumulates grads_to_wait
gradients from workers, PS averages these gradients and updates the model with the averaged gradients. PS also maintains model_version
which equals to the number of model updates. Each worker has a local copy of the model. Before a minibatch training step starts, if there is a new model_version
on PS, the worker will get the new model from PS to replace the local model. After computing the gradients with a minibatch, the worker reports the gradients to PS together with the local model version. When PS receives gradients from a worker, it only accepts the gradients with a model version same as the current PS model_version
.
This Synchronous SGD ensures model consistency in the price of wasted and blocked computation.
- Wasted computation: when a worker reports gradients with an outdated model version to PS, PS will reject these gradients. The worker have to get the current model from PS, reuse the minibatch data to train the model again.
- Blocked computation: PS has to use a lock for model update with gradients and model read by workers to ensure model consistency.
Asynchronous SGD can avoid the wasted and blocked computation mentioned above with a relaxed model consistency.
- PS will accept all gradients from workers.
- PS does not use locks and supports concurrent model reads and updates.
Asynchronous SGD
Let us recall how workers train the model in synchronous SGD. Below is the pseudocode:
for minibatch in training_data:
accepted = False
while not accepted:
local_model,model_version = get_model_from_ps()
gradients = compute_gradient(local_model, minibatch)
accepted = report_gradient_to_ps(gradients, model_version)
In asynchronous SGD, each worker is training the model in nearly the same way as synchronous SGD. The only difference is that the worker does not need to retrain any minibatch data as PS accepts all gradients.
for minibatch in training_data:
local_model, model_version = get_model_from_ps()
gradients = compute_gradient(local_model, minibatch)
report_gradient_to_ps(gradients, model_version)
PS does not need locks in GetModel
and ReportGradient
GRPC services for asynchronous SGD.
def GetModel():
pb_model = Model()
for variable in pb_model:
assign_value(variable, current_variable_value_in_PS)
return pb_model, PS_model_version
def ReportGradient(gradients, version):
grad_var = zip(gradients, model_variables)
optimizer.apply_gradient(grad_var)
PS_model_version.atomic_add(1)
Relaxed Model Consistency
PS can processes multiple GRPC calls GetModel
and ReportGradients
concurrently. Thus, there are two kinds of relaxed model consistency.
- In
GetModel
, during the variable assign loop, there may beReportGradient
GRPC service running and updating the variables. Thus, variables inlocal_model
in workers may contain values from different model versions.model_version
fromget_model_from_ps
is just a proximate model version. - There may be multiple
ReportGradient
running concurrently. Different model variables may apply these gradients in different orders.
Also, the concurrent updates to variables in ReportGradient
may cause some gradients are not applied, as the updates can be overwritten by other concurrent running updates. TensorFlow optimizers have an argument use_locking.
If use_locking is True
, TensorFlow will use a lock to prevent concurrent updates to variables.
Staleness in Asynchronous SGD
In ReportGradient
, the argument version
may be smaller than PS_model_version
. Staleness value is the difference between PS_model_version
and version
:
staleness = PS_model_version - version
According to some researches, this staleness affects the training convergence, and large staleness may result in poor training accuracy. The deeper the model, the more impact of the staleness. Some optimizers such as SGD and Adagrad are more robust to staleness, some optimizers such as other with momentum are very bad with staleness.
Staleness-aware asychronous SGD proposes a method to modulate learning rate by the staleness. If the staleness is not 0, this method modulates the learning rate used in the optimizer as:
if staleness > 0:
learning_rate_used = learning_rate / staleness
else:
learning_rate_used = learning_rate
Stale Synchronous Parallel (SSP)
In the pseudocode for the asynchronous SGD worker, the worker pulls model from PS in every minibatch step. Stale synchronous parallel (SSP) method uses the strategy that the fastest worker can exceed the slowest one within a predefined staleness threshold. SSP can reduce the number of get_model_from_ps
calls. The worker training process is:
get_model_frequency = predefined_staleness_threshold
local_model, model_version = get_model_from_ps()
local_update_count = 0
for minibatch in training_data:
gradients = compute_gradient(local_model, minibatch)
report_gradient_to_ps(gradients, model_version)
local_update_count += 1
if local_update_count >= get_model_frequency:
local_model, model_version = get_model_from_ps()
local_update_count = 0
else:
apply_gradient(local_model, gradients)
Although the original SSP method uses this strategy in synchronized SGD, we can also adopt SSP strategy in asynchronized SGD to reduce get_model_from_ps
calls. Note that in ElasticDL, local models only have non-embedding variables. So in apply_gradient(local_model, gradients)
, ElasticDL workers only update non-embedding variables. Also, worker can run report_gradient_to_ps
concurrently with apply_gradient(local_model, gradients)
when it does not need to get_model_from_ps
.
Support Asynchronous SGD in ElasticDL
Change in PS
- No need to use locks in
GetModel
and_update_model
in server.py. - No need to accumulate gradients in
ReportGradient
in server.py.ReportGradient
calls_update_model
directly. - Users decide if disabling concurrent variable update by set
use_locking
argument in the optimizer. - To support Staleness-aware asychronous SGD, PS need to modulate the learning rate in the optimizer with the staleness value. PS may have multiple threads running concurrently for model updates with a same optimizer instance. Thus, we cannot modify the learning rate in the optimizer instance. We may modify the learning rate as a callable method, and use a thread local storage
threading.local()
to store the staleness. The callable method uses the staleness value to modulate the learning rate. The optimizer will call this callable method when it reads the learning rate hyperparameter.
Change in Worker
- No need to retrain with the minibatch data.
- To support SSP strategy, the worker pulls the model from PS in every
get_model_frequency
minibatch step. Also, the worker needs to update the local model with the computed gradients. model pull/updates do not include embedding variables, as we directly access the embedding vectors in the embedding service.
Add Arguments for elasticdl.train
--use_async, default=False, help="True for asynchronous SGD, False for synchronous SGD"
--lr_staleness_modulation, default=False, help="If True, master will modulate learning rate with staleness in asynchronous SGD"
--get_model_frequency, default=1, help="worker will get_model from PS every these steps."