示例
完整的示例请参考sample。以下是简单的示例:
#!/usr/bin/env python
# coding:utf-8
from pypegasus.pgclient import Pegasus
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
@inlineCallbacks
def basic_test():
# init
c = Pegasus(['127.0.0.1:34601', '127.0.0.1:34602'], 'temp')
suc = yield c.init()
if not suc:
reactor.stop()
print('ERROR: connect pegasus server failed')
return
# set
try:
ret = yield c.set('hkey1', 'skey1', 'value', 0, 500)
print('set ret: ', ret)
except Exception as e:
print(e)
# get
ret = yield c.get('hkey1', 'skey1')
print('get ret: ', ret)
reactor.stop()
if __name__ == "__main__":
reactor.callWhenRunning(basic_test)
reactor.run()