Python SDK for Portable Plugin
By using Python SDK for portable plugins, user can develop portable plugins with python language. The Python SDK provides APIs for the source, sink and function interfaces. Additionally, it provides a plugin start function as the execution entry point to define the plugin and its symbols.
To run python plugin, there are two prerequisites in the runtime environment:
- Install Python 3.x environment.
- Install nng and ekuiper package by
pip install nng ekuiper
.
By default, the eKuiper portable plugin runtime will run python script with python userscript.py
. If users have multiple python instance or an alternative python executable command, they can specify the python command in the configuration file.
Development
The process is the same: develop the symbols and then develop the main program. Python SDK provides the similar source, sink and function interfaces in python language.
Source interface:
class Source(object):
"""abstract class for eKuiper source plugin"""
@abstractmethod
def configure(self, datasource: str, conf: dict):
"""configure with the string datasource and conf map and raise error if any"""
pass
@abstractmethod
def open(self, ctx: Context):
"""run continuously and send out the data or error with ctx"""
pass
@abstractmethod
def close(self, ctx: Context):
"""stop running and clean up"""
pass
Sink interface:
class Sink(object):
"""abstract class for eKuiper sink plugin"""
@abstractmethod
def configure(self, conf: dict):
"""configure with conf map and raise error if any"""
pass
@abstractmethod
def open(self, ctx: Context):
"""open connection and wait to receive data"""
pass
@abstractmethod
def collect(self, ctx: Context, data: Any):
"""callback to deal with received data"""
pass
@abstractmethod
def close(self, ctx: Context):
"""stop running and clean up"""
pass
Function interface:
class Function(object):
"""abstract class for eKuiper function plugin"""
@abstractmethod
def validate(self, args: List[Any]):
"""callback to validate against ast args, return a string error or empty string"""
pass
@abstractmethod
def exec(self, args: List[Any], ctx: Context) -> Any:
"""callback to do execution, return result"""
pass
@abstractmethod
def is_aggregate(self):
"""callback to check if function is for aggregation, return bool"""
pass
Users need to create their own source, sink and function by implement these abstract classes. Then create the main program and declare the instantiation functions for these extensions like below:
if __name__ == '__main__':
c = PluginConfig("pysam", {"pyjson": lambda: PyJson()}, {"print": lambda: PrintSink()},
{"revert": lambda: revertIns})
plugin.start(c)
For the full example, please check the python sdk example.
Package
As python is an interpretive language, we don’t need to build an executable for it. Just specify the main program python file in the plugin json file is ok. For detail, please check packaing.
Deployment requirements
Running python script requires the python environment. Make sure python 3.x are installed in the target environment. If using docker image, we recommend to use tags like lfedge/ekuiper:<tag>-slim-python
which have both eKuiper and python environment.