PSModel
PSModel is Angel’s core abstract class. It encapsulates details of context and client of remote parameter server (PS) and provides frequently-use interfaces for accessing and updating remote matrices and vectors, allows algorithm engineers to operate on distributed matrices and vectors on the PS as if operating on local objects. PSModel is a mutable model object that can be updated iteratively in essence.
Functionality
PSModel has three core classes: MatrixContext,MatrixClient,TaskContext, with which you can make any operation on remote PS possible.
For developing machine-learning algorithms on Angel, we recommend creating PSModel inside MLModel and working on top of it. PSModel has the following five major interfaces:
-
- getRow
- getRows
- get(func: GetFunc)
-
- increment
- update(func: UpdateFunc)
-
- syncClock
- clock
- flush
-
- setLoadPath
- setNeedSave
- setSavePath
-
- setAverage
- setHogwild
- setOplogType
- setRowType
With decent ways of initializing PSModel, setting PSModel’s behavioral properties and calling PSModel methods, algorithm engineers can operate on remote distributed model (martrix or vector). As the core abstract class, PSModel allows one to program machine-learning algorithms in distributed fashion without caring about low-level details.
Core Interfaces
0. constructor
constructor
- Definition:
def apply[K <: TVector](modelName: String, row: Int, col: Int, blockRow: Int = -1, blockCol: Int = -1)(implicit ctx:TaskContext)
- Parameters:
- modelName: String, model name
- row: Int, number of rows of matrix
- col: Int, number of columns of matrix
- blockRow: Int, number of rows of one block (matrix slice)
- blockCol: Int, number of columns of one block
- ctx: TaskContext, context of the PSModel Task
- a PSModel object needs to be bound to a task since PSModel runs on the worker; also to support the BSP and SSP protocols
- implicit conversion is used: as long as a ctx object exists in the container of PSModel, the ctx object will be automatically injected into the PSModel without explicit calls
- Definition:
1. pull-type
getRow
- Definition:
def getRow(rowId: Int): K
- Functionality: retrieve a specified row from the matrix. Under different sync protocols, this method has different procedures. Angel supports three sync protocols, namely, BSP,SSP and ASP
- under BSP and SSP, this method first checks whether the specified row is in local cache and whether its clock is current under the sync protocol in use, if false, the method requests the specified row from the PS; on the PS side, if the row’s clock is not current under the sync protocol (with its specific staleness criterion), the method waits until it becomes current
- under ASP, the method directly requests the specified row from the PS without checking clock
- Parameters:
- rowId: Int
- Return value: the specified row (vector)
- Definition:
getRows
- Definition:
def getRows(rowIndexes:Array[Int]): List[K]
- Functionality: retrieve specified rows from the matrix. This method works in similar fashion under different sync protocols as the
getRow
method - Parameters:
- rowIndexes: Array[Int]
- Return value: list of the specified row vectors; the list is ordered, consistent with the rowIndexes array
- Definition:
getRowsFlow
- Definition:
def getRowsFlow(rowIndex: RowIndex, batchNum: Int): GetRowsResult
- Functionality: retrieve specified rows from the matrix in a flow; the method returns immediately, allowing simultaneous computing and row retrieving; the method has similar procedures as in
getRow
under the BSP/SSP/ASP protocols - Parameters:
- rowIndex: RowIndex, set of the row indices
- batchNum: Int, number of rows that RPC requests every time, defining the granularity of the flow; if set to -1, system will decide the value
- Return value:一a row vector queue; top-layer applications can get the row vectors that are already retrieved from the queue
- Definition:
get
- Definition:
def get(func: GetFunc): GetResult
- Functionality: use
psf get
function to get matrix elements or their statistics. Different from getRow/getRows/getRowsFlow, this method only supports ASP - Parameters:
- func: GetFunc, get-type psf, where psf is an extension interface for Angel PS
- Return value: GetResult returned by
psf get
- Definition:
2. push-type
increment
- Definition:
def increment(delta: TVector)
- Functionality: incrementally update a row of the matrix. This method works under ASP; it caches the delta vector to local, and only directly pushes to the PS when executing
flush
orclock
- Parameters: delta: TVector, delta (update) vector, which has the same size as a row vector
- Return value: none
- Definition:
increment
- Definition:
def increment(deltas: List[TVector])
- Functionality: incrementally update some rows of the matrix. This method works under ASP; it caches the delta vectors to local, and only directly pushes to the PS when executing
flush
orclock
- Parameters: deltas: List[TVector], list of delta vectors, where each delta vector has the same size as a row vector
- Return value: none
- Definition:
update
- Definition:
def update(func: UpdaterFunc): Future[VoidResult]
- Functionality: use
psf update
function to update the parameter matrix. Unlike theincrement
method, this method directly pushes delta to PS - Parameters: func: GetFunc
psf update
function. For a detailed introduction to psf function, please refer to psFunc Developing Guide. Users can customize the psf update function. Angel also provides an update lib of commonly-used functions. Unlike theincrement
methods, this method immediately pushes the delta to the PS - Return value: Future[VoidResult], return value of the psf update function; the application can decide whether to wait for update
- Definition:
3. sync-type
syncClock
- Definition:
def syncClock():
- Functionality: simplified version of clock; encapsulates clock().get(). We recommend calling this method unless there is a specific requrement for waiting
- Parameters: none
- Definition:
clock
- Definition:
def clock(): Future[VoidResult]
- Functionality: merge all delta vectors in local cache (created by
increment
) and send to the PS, then update the matrix clock - Parameters: none
- Return value: Future[VoidResult],
clock
operation result; the application can choose whether to wait forclock
operation to complete
- Definition:
flush
- Definition:
def flush(): Future[VoidResult]
- Functionality: merge all delta vectors in local cache (created by
increment
) and send to the PS - Parameters: none
- Return value: Future[VoidResult],
flush
operation result; the application can choose whether to wait forflush
to complete
- Definition:
4. io-type
setLoadPath
- Definition:
def setLoadPath(path: String)
- Functionality: set path for loading matrix; this method is used for incremental learning or under the
predict
mode —- PS loads the matrix of parameters from file for initialization - Parameters: path: String, existing save path of the matrix parameters
- Return value: none
- Definition:
setSavePath
- Definition:
def setSavePath(path: String)
- Functionality: set path for saving matrix; under the
train
mode, when training is done, the matrix on PS needs to be saved in the file system - Parameters: path: String, existing save path
- Return value: none
- Definition:
5. behavior-type
setAverage
- Definition:
def setAverage(aver: Boolean)
- Functionality: set whether to divide the update parameters by the total number of tasks; this method is used in the
increment
method, but not affecting the update functions - Parameters: aver: Boolean, if set to true, divide the parameters by the total number of tasks
- Return value: none
- Definition:
setHogwild
- Definition:
def setHogwild(hogwild: Boolean)
- Functionality: set whether to use the hogwild mechanism to store and update the local matrix; when there are more than 1 worker tasks, hogwild can save memory; default is true
- Parameters: aver: Boolean, if set to true, use the hogwild mechanism
- Return value: none
- Definition:
setRowType
- Definition:
def setRowType(rowType: MLProtos.RowType)
- Functionality: set type and storage method for matrix’s row vector, can be based on the model’s characteristics such as its sparsity. Currently, Angel supports int, float and double row element, and sparse and dense models
Parameters:
rowType: MLProtos.RowType, currently support
- T_DOUBLE_SPARSE: sparse, double type
- T_DOUBLE_DENSE: dense, double type
- T_INT_SPARSE: sparse, int type
- T_INT_DENSE: dense, int type
- T_FLOAT_SPARSE: sparse, float type
- T_FLOAT_DENSE: dense, float type
T_INT_ARBITRARY: int type
Users can choose the most memory-efficient setting based on their own use cases.
- Return value: none
- Definition:
setOplogType
- Definition:
def setOplogType(oplogType: String)
- Functionality: set storage method for model delta; when using
increment
, Angel caches delta in local by creating a local matrix of the same size as the matrix to be updated - Parameters:
- oplogType: String, currently support
- DENSE_DOUBE: use a dense, double-type matrix for storing the model delta, used when the model matrix to be updated is double-type
- DENSE_INT: use a dense, int-type matrix for storing the model delta, used when the model matrix is int-type
- LIL_INT: use a sparse, int-type matrix for storing the model delta, used when the model matrix is int-type, and delta is sparse
- DENSE_FLOAT: use a dense, float-type matrix for storing the model delta, used when the model matrix is float-type
- oplogType: String, currently support
- Return value: none
- Definition:
- setAttribute
- Definition:
def setAttribute(key: String, value: String)
- Functionality: Angel’s self-defined extension for matrix parameters
- Parameters:
- key: String, parameter name
- value: String, parameter value
- Return value: none
- Definition: