# ElasticDL on SQLFlow

## Overview

This is a design doc on the integration of ElasticDL with SQLFlow.

## User Interface

### Training Job Submission
```sql
SELECT
    c1, c2, c3, c4, c5 as class
FROM training_data
TO TRAIN ElasticDLKerasClassifier
WITH
    optimizer = "optimizer",
    loss = "loss",
    eval_metrics = "eval_metrics_fn",
    num_classes = 10
COLUMN
    c1,
    DENSE(c2, 10),
    BUCKET(c3, [0, 10, 100]),
    c4
LABEL class
INTO trained_elasticdl_keras_classifier;
```
### Prediction Job Submission
```sql
SELECT
    c1, c2, c3, c4
FROM prediction_data
TO PREDICT prediction_results_table
WITH
    num_classes = 10
USING trained_elasticdl_keras_classifier;
```
### Run-time Configurations
Users can provide run-time configurations to the ElasticDL job via additional parameters prefixed with `runtime.` within the `WITH` clause, for example:
```sql
SELECT
    c1, c2, c3, c4, c5 as class
FROM training_data
TO TRAIN ElasticDLKerasClassifier
WITH
    optimizer = "optimizer",
    loss = "loss",
    eval_metrics = "eval_metrics_fn",
    num_classes = 10,
    runtime.num_epochs = 2,
    runtime.master_resource_request = "cpu=400m,memory=1024Mi",
    runtime.master_resource_limit = "cpu=400m,memory=1024Mi",
    runtime.worker_resource_request = "cpu=400m,memory=2048Mi",
    runtime.worker_resource_limit = "cpu=1,memory=3072Mi",
    runtime.num_minibatches_per_task = 10,
    runtime.num_workers = 2
COLUMN
    c1, c2, c3, c4
LABEL class
INTO trained_elasticdl_keras_classifier;
```
## Implementation Details

### Training Job

Steps:
- Based on `SELECT ... FROM ...`, read the ODPS table and write it to RecordIO files, including both features and labels. These files will be stored in Kubernetes Persistent Volumes. In the future, we will support reading the ODPS table directly without having to convert it to RecordIO files. (A sketch of the record serialization appears after this list.)
- Generate the model definition file (e.g. `cifar10_functional_api.py`) that will be used in the `TO TRAIN` clause, which includes:
  - In the model definition function, e.g. `custom_model()`, we need to configure the model's input and output shapes correctly in `inputs = tf.keras.layers.Input(shape=<input_shape>)` (only when the model is defined using `tf.keras` functional APIs) and `outputs = tf.keras.layers.Dense(<num_classes>)` (based on `COLUMN ... LABEL ...`). For this MVP, users can provide `<input_shape>` and `<num_classes>` in the `WITH` clause, which will then get passed to the model constructor `custom_model(input_shape, num_classes)` via the `--params` argument in the ElasticDL high-level API. In the future, this will be inferred from the ODPS table. (A sketch of such a `custom_model()` appears after this list.)
  - Pass additional parameters from the `WITH` clause to `custom_model()`'s instantiation, such as `optimizer` and `loss`.
  - Skip support for feature transformation functions such as `DENSE` or `BUCKET` in the `COLUMN` clause for now, as this requires additional design details and discussions on the use of feature column APIs.
  - Pass column names, shapes, and types for features and labels to `dataset_fn`'s feature description that will be used in `tf.io.parse_single_example()`. For this MVP, column names can be obtained from `SELECT ... LABEL ...`. Each feature column will be of shape `[1]` and of type `tf.float32`, while the label column is of shape `[1]` and of type `tf.int64` for classification problems or `tf.float32` for regression problems. In the future, this will be inferred from the ODPS table. (A helper that derives this feature description is sketched after this list.) An example `dataset_fn()` looks like the following:
    ```python
    import tensorflow as tf

    # Mode constants (TRAINING, EVALUATION, PREDICTION) come from ElasticDL.
    from elasticdl.python.common.constants import Mode


    def dataset_fn(dataset, mode):
        def _parse_data(record):
            if mode == Mode.PREDICTION:
                feature_description = {
                    "f1": tf.io.FixedLenFeature([1], tf.float32),
                    "f2": tf.io.FixedLenFeature([1], tf.float32),
                }
            else:
                feature_description = {
                    "f1": tf.io.FixedLenFeature([1], tf.float32),
                    "f2": tf.io.FixedLenFeature([1], tf.float32),
                    "label": tf.io.FixedLenFeature([1], tf.int64),
                }
            r = tf.io.parse_single_example(record, feature_description)
            features = {
                "f1": tf.math.divide(tf.cast(r["f1"], tf.float32), 255.0),
                "f2": tf.math.divide(tf.cast(r["f2"], tf.float32), 255.0),
            }
            if mode == Mode.PREDICTION:
                return features
            else:
                return features, tf.cast(r["label"], tf.int32)

        dataset = dataset.map(_parse_data)
        if mode != Mode.PREDICTION:
            dataset = dataset.shuffle(buffer_size=1024)
        return dataset
    ```
  - Pass the `INTO` clause to the `--outputs` argument in the ElasticDL high-level API.
- Submit the ElasticDL training job via the generated ElasticDL high-level API or CLI. Below is an example:
  ```bash
  elasticdl train \
      --image_base=elasticdl:ci \
      --model_zoo=model_zoo \
      --model_def=ElasticDLKerasClassifier \
      --training_data=training_table_name \
      --evaluation_data=evaluation_table_name \
      --num_epochs=2 \
      --master_resource_request="cpu=400m,memory=1024Mi" \
      --master_resource_limit="cpu=1,memory=2048Mi" \
      --worker_resource_request="cpu=400m,memory=2048Mi" \
      --worker_resource_limit="cpu=1,memory=3072Mi" \
      --minibatch_size=64 \
      --num_minibatches_per_task=10 \
      --num_workers=2 \
      --checkpoint_steps=10 \
      --evaluation_steps=15 \
      --grads_to_wait=2 \
      --job_name=test-mnist \
      --log_level=INFO \
      --image_pull_policy=Never \
      --output=model_output
  ```
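To make the first step concrete: the records written to the RecordIO files are serialized `tf.train.Example` protos, which is what `tf.io.parse_single_example()` in `dataset_fn()` expects to parse. Below is a minimal, hypothetical sketch of serializing one table row under the MVP typing rules; reading from ODPS and writing the RecordIO chunks themselves are omitted, and `row_to_example` is an assumed helper name, not an existing API.

```python
import tensorflow as tf


def row_to_example(row, feature_names, label_name=None):
    """Serialize one table row into a tf.train.Example whose schema
    matches the feature_description used in dataset_fn() above."""
    feature = {
        name: tf.train.Feature(
            float_list=tf.train.FloatList(value=[float(row[name])])
        )
        for name in feature_names
    }
    if label_name is not None:  # training/evaluation data carries a label
        feature[label_name] = tf.train.Feature(
            int64_list=tf.train.Int64List(value=[int(row[label_name])])
        )
    example = tf.train.Example(features=tf.train.Features(feature=feature))
    return example.SerializeToString()


# For example:
# row_to_example({"f1": 0.5, "f2": 1.0, "label": 3}, ["f1", "f2"], "label")
```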
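For the model definition step, a generated `custom_model()` using the `tf.keras` functional API might look like the following sketch. The hidden layer is illustrative only; the constructor arguments correspond to the `<input_shape>` and `<num_classes>` values passed via `--params`.

```python
import tensorflow as tf


def custom_model(input_shape=(10,), num_classes=10):
    """Functional-API model whose input and output shapes are driven by
    the WITH-clause parameters input_shape and num_classes."""
    inputs = tf.keras.layers.Input(shape=input_shape)
    # Illustrative hidden layer; the real generated model would differ.
    x = tf.keras.layers.Dense(64, activation="relu")(inputs)
    outputs = tf.keras.layers.Dense(num_classes)(x)
    return tf.keras.Model(inputs=inputs, outputs=outputs)
```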
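The MVP typing rules above (features of shape `[1]` and type `tf.float32`; label of shape `[1]` and type `tf.int64` for classification or `tf.float32` for regression) could be captured by a small helper that builds `dataset_fn`'s feature description. A hypothetical sketch:

```python
import tensorflow as tf


def make_feature_description(feature_columns, label_column=None,
                             classification=True):
    """Build the feature_description dict passed to
    tf.io.parse_single_example(), following the MVP typing rules."""
    description = {
        name: tf.io.FixedLenFeature([1], tf.float32)
        for name in feature_columns
    }
    if label_column is not None:  # omitted in prediction mode
        label_dtype = tf.int64 if classification else tf.float32
        description[label_column] = tf.io.FixedLenFeature([1], label_dtype)
    return description
```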
### Prediction Job
This is similar to training except that the prediction results will be written back to the ODPS table named in the `PREDICT` clause. An additional `PredictionOutputsProcessor` class will be generated in the model definition file for writing the prediction results to ODPS:
```python
import os


class PredictionOutputsProcessor(BasePredictionOutputsProcessor):
    def __init__(self):
        # <prediction_results_table> and <num_classes> are placeholders
        # filled in by SQLFlow during code generation.
        self.odps_writer = ODPSWriter(
            os.environ[ODPSConfig.PROJECT_NAME],
            os.environ[ODPSConfig.ACCESS_ID],
            os.environ[ODPSConfig.ACCESS_KEY],
            os.environ[ODPSConfig.ENDPOINT],
            <prediction_results_table>,
            columns=["f" + str(i) for i in range(<num_classes>)],
            column_types=["double" for _ in range(<num_classes>)],
        )

    def process(self, predictions, worker_id):
        self.odps_writer.from_iterator(...)
```
where an `ODPSWriter` is instantiated with the necessary information on ODPS access and on the prediction output columns. `<prediction_results_table>` above is inferred from the `PREDICT` clause and `<num_classes>` is provided in the `WITH` clause.
The `USING` clause contains the name of the trained model to be used to make predictions.
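Since `PredictionOutputsProcessor` is generated code, one way to produce it is plain string templating, substituting the results table from the `PREDICT` clause and `num_classes` from the `WITH` clause. The following is a hypothetical sketch, not SQLFlow's actual code generator (the `process()` body is omitted for brevity):

```python
from string import Template

PROCESSOR_TEMPLATE = Template('''
class PredictionOutputsProcessor(BasePredictionOutputsProcessor):
    def __init__(self):
        self.odps_writer = ODPSWriter(
            os.environ[ODPSConfig.PROJECT_NAME],
            os.environ[ODPSConfig.ACCESS_ID],
            os.environ[ODPSConfig.ACCESS_KEY],
            os.environ[ODPSConfig.ENDPOINT],
            "$results_table",
            columns=["f" + str(i) for i in range($num_classes)],
            column_types=["double" for _ in range($num_classes)],
        )
''')


def gen_processor_source(results_table, num_classes):
    # results_table comes from the PREDICT clause, num_classes from WITH.
    return PROCESSOR_TEMPLATE.substitute(
        results_table=results_table, num_classes=num_classes
    )
```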
### Differentiate Run-time Configurations
We need to differentiate between the run-time configuration parameters (e.g. `num_workers`, `num_epochs`, etc.) and the model construction parameters (e.g. `optimizer`, `loss`, `num_classes`, etc.). In this MVP, we can add different prefixes to different types of parameters, such as adding `runtime.` to run-time configuration parameters.