# Design Overview for ElasticDL on SQLFlow
## Overview

This document describes the design for integrating ElasticDL with SQLFlow, which allows users to submit ElasticDL training and prediction jobs through extended SQL statements.
## User Interface

### Training Job Submission

The following extended SQL statement submits an ElasticDL training job:
```sql
SELECT
    c1, c2, c3, c4, c5 as class
FROM training_data
TRAIN ElasticDLKerasClassifier
WITH
    model.optimizer = "optimizer",
    model.loss = "loss",
    model.eval_metrics_fn = "eval_metrics_fn",
    model.num_classes = 3,
    model.dataset_fn = "dataset_fn",
    train.shuffle = 120,
    train.epoch = 2,
    train.grads_to_wait = 2,
    train.tensorboard_log_dir = "",
    train.checkpoint_steps = 0,
    train.checkpoint_dir = "",
    train.keep_checkpoint_max = 0,
    eval.steps = 0,
    eval.start_delay_secs = 100,
    eval.throttle_secs = 0,
    eval.checkpoint_dir_for_init = "",
    engine.master_resource_request = "cpu=400m,memory=1024Mi",
    engine.master_resource_limit = "cpu=1,memory=2048Mi",
    engine.worker_resource_request = "cpu=400m,memory=2048Mi",
    engine.worker_resource_limit = "cpu=1,memory=3072Mi",
    engine.num_workers = 2,
    engine.volume = "",
    engine.image_pull_policy = "Never",
    engine.restart_policy = "Never",
    engine.extra_pypi_index = "",
    engine.namespace = "default",
    engine.minibatch_size = 64,
    engine.master_pod_priority = "",
    engine.cluster_spec = "",
    engine.num_minibatches_per_task = 2,
    engine.docker_image_repository = "",
    engine.envs = ""
COLUMN
    c1,
    c2,
    c3,
    c4
LABEL class
INTO trained_elasticdl_keras_classifier;
```
### Prediction Job Submission

The following extended SQL statement submits an ElasticDL prediction job:
```sql
SELECT
    c1, c2, c3, c4
FROM prediction_data
PREDICT prediction_results_table
WITH
    model.num_classes = 10,
    model.dataset_fn = "dataset_fn",
    predict.checkpoint_dir_for_init = "v1/",
    engine.master_resource_request = "cpu=400m,memory=1024Mi",
    engine.master_resource_limit = "cpu=1,memory=2048Mi",
    engine.worker_resource_request = "cpu=400m,memory=2048Mi",
    engine.worker_resource_limit = "cpu=1,memory=3072Mi",
    engine.num_workers = 2,
    engine.volume = "",
    engine.image_pull_policy = "Never",
    engine.restart_policy = "Never",
    engine.extra_pypi_index = "",
    engine.namespace = "default",
    engine.minibatch_size = 64,
    engine.master_pod_priority = "",
    engine.cluster_spec = "",
    engine.num_minibatches_per_task = 2,
    engine.docker_image_repository = "",
    engine.envs = ""
USING trained_elasticdl_keras_classifier;
```
## Implementation

### Mapping Extended SQL

The components of the extended SQL defined by SQLFlow are mapped to an `elasticDLFiller` struct that looks like the following:
```go
type elasticDLFiller struct {
    IsTraining         bool
    TrainInputTable    string
    EvalInputTable     string
    PredictInputTable  string
    PredictOutputTable string
    PredictInputModel  string
    OutputShape        int
    InputShape         int
    ModelDir           string
    LabelColName       string
    FeaturesList       string
    TrainClause        *resolvedTrainClause
    PredictClause      *resolvedPredictClause
}
```
This `elasticDLFiller` struct is used to fill a pre-defined template that generates the model definition components required by ElasticDL, such as the model definition based on the `tf.keras` API, `loss`, `optimizer`, and `dataset_fn`.

For example, the `dataset_fn` is generated using the `FeaturesList`, `LabelColName`, `InputShape`, `IsTraining`, and `TrainClause` fields in the `elasticDLFiller` struct:
```python
import tensorflow as tf

# Mode is ElasticDL's job mode enum (the import path may differ across versions).
from elasticdl.python.common.constants import Mode


def dataset_fn(dataset, mode, metadata):
    def _parse_data(record):
        def _get_features_without_labels(
            record, label_col_ind, features_shape
        ):
            # Drop the label column and keep the remaining columns as features.
            features = [
                record[:label_col_ind],
                record[label_col_ind + 1 :],
            ]
            features = tf.concat(features, -1)
            return tf.reshape(features, features_shape)

        record = tf.strings.to_number(record, tf.float32)
        # The values below are filled in from the elasticDLFiller struct
        # based on the training statement above: InputShape = 4 (columns
        # c1..c4) and LabelColName = "class".
        features_shape = (4, 1)
        labels_shape = (1,)
        label_col_name = "class"
        if mode != Mode.PREDICTION:
            if label_col_name not in metadata.column_names:
                raise ValueError(
                    "Missing the label column '%s' in the retrieved "
                    "table." % label_col_name
                )
            label_col_ind = metadata.column_names.index(label_col_name)
            labels = tf.reshape(record[label_col_ind], labels_shape)
            return (
                _get_features_without_labels(
                    record, label_col_ind, features_shape
                ),
                labels,
            )
        return tf.reshape(record, features_shape)

    dataset = dataset.map(_parse_data)
    # "true" and the buffer size are the rendered values of the shuffle
    # flag and TrainClause.ShuffleBufferSize (train.shuffle = 120).
    if mode != Mode.PREDICTION and "true" == "true":
        dataset = dataset.shuffle(buffer_size=120)
    return dataset
```
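For reference, below is a minimal sketch of how such a template could be rendered with Go's `text/template` package. The `datasetFnFiller` type and the template fragment are illustrative only, not the actual SQLFlow template.

```go
package main

import (
    "os"
    "text/template"
)

// datasetFnFiller is an illustrative subset of the elasticDLFiller
// fields consumed by the template fragment below.
type datasetFnFiller struct {
    InputShape   int
    LabelColName string
}

// A toy fragment of the dataset_fn template; the real template is much larger.
const datasetFnTmpl = `        features_shape = ({{.InputShape}}, 1)
        labels_shape = (1,)
        label_col_name = "{{.LabelColName}}"
`

func main() {
    tmpl := template.Must(template.New("dataset_fn").Parse(datasetFnTmpl))
    filler := datasetFnFiller{InputShape: 4, LabelColName: "class"}
    // Render the fragment into the generated model definition (stdout here).
    if err := tmpl.Execute(os.Stdout, filler); err != nil {
        panic(err)
    }
}
```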
Some fields used to generate the above `dataset_fn` are obtained directly from the extended SQL statement. For example, `FeaturesList` is obtained from the `SELECT` clause, `LabelColName` from the `LABEL` clause, and `TrainClause.ShuffleBufferSize` from `train.shuffle` in the `WITH` clause. Other fields are obtained indirectly. For example, `InputShape` is inferred from `FeaturesList`, as sketched below.
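A minimal sketch of this inference, assuming the feature columns from the `COLUMN` clause are available as a slice (the helper name `inferInputShape` is hypothetical):

```go
package main

import "fmt"

// inferInputShape is a hypothetical helper: each entry in the COLUMN
// clause contributes one feature, so the input shape is simply the
// number of selected feature columns.
func inferInputShape(featureColumns []string) int {
    return len(featureColumns)
}

func main() {
    // For the training statement above, this prints 4, which fills
    // features_shape = (4, 1) in the generated dataset_fn.
    fmt.Println(inferInputShape([]string{"c1", "c2", "c3", "c4"}))
}
```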
Note that the template currently hard-codes the type of each column to `tf.float32` in the generated `dataset_fn`; we should infer this information from the database instead. We also hard-coded other components of the model definition, such as `loss` and `optimizer`; these should be derived from the model zoo instead. A sketch of what such type inference could look like follows.
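One possible shape of the type inference, mapping database column types to TensorFlow dtypes. The type names and the mapping below are assumptions for illustration, not the actual SQLFlow behavior.

```go
package main

import (
    "fmt"
    "strings"
)

// tfDType sketches how a column's database type could be mapped to a
// TensorFlow dtype instead of hard-coding tf.float32.
func tfDType(sqlType string) string {
    switch strings.ToUpper(sqlType) {
    case "TINYINT", "SMALLINT", "INT", "BIGINT":
        return "tf.int64"
    case "FLOAT", "DOUBLE", "DECIMAL":
        return "tf.float32"
    default:
        return "tf.string"
    }
}

func main() {
    fmt.Println(tfDType("double")) // prints tf.float32
}
```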
### Generate ElasticDL Command

Once we have generated the components of the model definition, we can generate the ElasticDL command that submits the job. Below is an example:
```bash
elasticdl train \
    --image_base=elasticdl:ci \
    --model_zoo=<model-zoo> \
    --model_def=<path-to-generated-model-def> \
    --loss=<loss-function-name> \
    --eval_metrics_fn=<eval-metrics-function-name> \
    --training_data=<training-input-table> \
    --validation_data=<validation-input-table> \
    --num_epochs=2 \
    --master_resource_request="cpu=400m,memory=1024Mi" \
    --master_resource_limit="cpu=1,memory=2048Mi" \
    --worker_resource_request="cpu=400m,memory=2048Mi" \
    --worker_resource_limit="cpu=1,memory=3072Mi" \
    --minibatch_size=64 \
    --num_minibatches_per_task=10 \
    --num_workers=2 \
    --checkpoint_steps=10 \
    --evaluation_steps=15 \
    --grads_to_wait=2 \
    --job_name=test-iris \
    --log_level=INFO \
    --image_pull_policy=Never \
    --output=<model-output> \
    --envs=<env-vars> \
    --data_reader_params=<data-reader-params>
```
In the command, `--model_def` is the path to the model definition file we generated earlier. Additional arguments related to the model definition, such as `--loss` and `--eval_metrics_fn`, are obtained from parameters whose names start with `model.`.
The rest of the arguments are derived from the extended SQL, for example:

- `--model_zoo` is obtained from the `TRAIN` clause.
- `--training_data` is obtained from the `FROM` clause.
- `--num_epochs` is obtained from `train.epoch` in the `WITH` clause.
ElasticDL engine specific arguments such as `--grads_to_wait` and `--num_workers` are obtained from parameters whose names start with `engine.`, as sketched below.
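A minimal sketch of this prefix-based mapping (the same approach works for `model.`-prefixed parameters); the function name and the attribute map are illustrative only:

```go
package main

import (
    "fmt"
    "sort"
    "strings"
)

// flagsWithPrefix turns WITH attributes such as engine.num_workers = 2
// into command-line flags such as --num_workers=2.
func flagsWithPrefix(attrs map[string]string, prefix string) []string {
    var flags []string
    for k, v := range attrs {
        if strings.HasPrefix(k, prefix) {
            flags = append(flags, fmt.Sprintf("--%s=%s", strings.TrimPrefix(k, prefix), v))
        }
    }
    sort.Strings(flags) // deterministic order for the generated command
    return flags
}

func main() {
    attrs := map[string]string{
        "engine.num_workers":    "2",
        "engine.minibatch_size": "64",
        "train.epoch":           "2", // not engine-prefixed; skipped
    }
    fmt.Println(flagsWithPrefix(attrs, "engine."))
    // Output: [--minibatch_size=64 --num_workers=2]
}
```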
In order to integrate with the different databases we support, we pass additional information to the ElasticDL command. For example, we pass the necessary environment variables, such as the access ID and key of the ODPS account, via `--envs`. In addition, we pass the list of column names that we want to read from ODPS via `--data_reader_params`. An illustrative sketch of how these two values might be assembled follows.
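For illustration only: the environment variable names and the `data_reader_params` format below are assumptions, not the exact strings used by SQLFlow or ElasticDL.

```go
package main

import (
    "fmt"
    "strings"
)

func main() {
    // Credentials for the ODPS account, passed through --envs as
    // comma-separated KEY=VALUE pairs (variable names assumed here).
    envs := strings.Join([]string{
        "ODPS_ACCESS_ID=<access-id>",
        "ODPS_ACCESS_KEY=<access-key>",
    }, ",")

    // Column names to read from the ODPS table, passed through
    // --data_reader_params (format assumed here).
    columns := []string{"c1", "c2", "c3", "c4", "class"}
    readerParams := fmt.Sprintf(`columns=["%s"]`, strings.Join(columns, `","`))

    fmt.Printf("--envs=%s --data_reader_params='%s'\n", envs, readerParams)
}
```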
## Future Work

- Support the `tf.feature_column` API via the `COLUMN` clause.
- Support evaluation jobs. Evaluation on a separate evaluation table is not supported yet in SQLFlow. Please check out #675 for details.
- Switch to using the intermediate representation for ElasticDL code generation. For details, please see #1075.
- Support synchronous calls in the high-level API. For details, please see #1285.
- Unify the model zoos between SQLFlow and ElasticDL and support submitting an ElasticDL job for a model defined in the model zoo. Please see #22 and #1063 for details.
- Currently the only database ElasticDL supports is ODPS. However, we should expose the necessary abstractions so that ElasticDL can fully leverage SQLFlow's functionality to read from and write to different SQL databases.
- Support prediction jobs and add integration tests on Travis CI.
- Currently we have hard-coded the type of each column to `tf.float32` in the generated `dataset_fn`. We should infer this information from the database instead.