扩展事件源
了解事件,首先要从 RQAlpha 的事件驱动说起。
RQAlpha 大部分的组件是以 add_listener
的方式进行事件的注册。举例来说:
- 当Bar数据生成,则会触发
EVENT.BAR
事件,那么用户的handle_bar
相关的代码注册了该事件则会立即执行。 - 当订单成交,则会触发
EVENT.TRADE
事件,那么系统的账户模块因为注册了该事件,就可以立即计算成交以后的收益和资金变化。 - 当订单下单,则会触发
EVENT.ORDER_PENDING_NEW
事件,前端风控模块注册了该事件,则可以立即对该订单进行审核,如果不满足风控要求,则直接指定执行order._cancel(some_reason)
来保证有问题的订单不会进入实际下单环节。程序化交易中很多需求,都可以通过注册事件的方式无缝插入到 RQAlpha 中进行扩展。
事件源分类
- SystemEvent: 系统事件源
- POST_SYSTEM_INIT: 系统初始化后触发
- POST_USER_INIT: 策略的
init
函数执行后触发 - POST_SYSTEM_RESTORED: 在实盘时,你可能需要在此事件后根据其他信息源对系统状态进行调整
- MarketEvent: 市场及数据事件源
- POST_UNIVERSE_CHANGED: 策略证券池发生变化后触发
- PRE_BEFORE_TRADING: 执行
before_trading
函数前触发 - BEFORE_TRADING: 该事件会触发策略的
before_trading
函数 - POST_BEFORE_TRADING: 执行
before_trading
函数后触发 - PRE_BAR: 执行
handle_bar
函数前触发 - BAR: 该事件会触发策略的
handle_bar
函数 - POST_BAR: 执行
handle_bar
函数后触发 - PRE_TICK: 执行
handle_tick
前触发 - TICK: 该事件会触发策略的
handle_tick
函数 - POST_TICK: 执行
handle_tick
后触发 - PRE_SCHEDULED: 在
scheduler
执行前触发 - POST_SCHEDULED: 在
scheduler
执行后触发 - PRE_AFTER_TRADING: 执行
after_trading
函数前触发 - AFTER_TRADING: 该事件会触发策略的
after_trading
函数 - POST_AFTER_TRADING: 执行
after_trading
函数后触发 - PRE_SETTLEMENT: 结算前触发该事件
- SETTLEMENT: 触发结算事件
- POST_SETTLEMENT: 结算后触发该事件
- OrderEvent: 交易事件源
- ORDER_PENDING_NEW: 创建订单
- ORDER_CREATION_PASS: 创建订单成功
- ORDER_CREATION_REJECT: 创建订单失败
- ORDER_PENDING_CANCEL: 创建撤单
- ORDER_CANCELLATION_PASS: 撤销订单成功
- ORDER_CANCELLATION_REJECT: 撤销订单失败
- ORDER_UNSOLICITED_UPDATE: 订单状态更新
- TRADE: 成交
事件源的订阅及使用
我们可以订阅需要的事件源,从而在该事件发生时实现指定需求。
下面以最简单的 Mod - ProgressMod 为例,介绍事件源的订阅和使用。
ProgressMod 需要实现的需求非常的简单:在命令行输出目前回测的进度条。首先定义一个 ProgressMod 类,继承与接口类 AbstractMod
- from rqalpha.interface import AbstractMod
- class ProgressMod(AbstractMod):
- def __init__(self):
- pass
- def start_up(self, env, mod_config):
- """
- RQAlpha 在系统启动时会调用此接口;在此接口中,可以通过调用 ``env`` 的相应方法来覆盖系统默认组件。
- :param env: 系统环境
- :type env: :class:`~Environment`
- :param mod_config: 模块配置参数
- """
- pass
- def tear_down(self, success, exception=None):
- """
- RQAlpha 在系统退出前会调用此接口。
- :param code: 退出代码
- :type code: rqalpha.const.EXIT_CODE
- :param exception: 如果在策略执行过程中出现错误,此对象为相应的异常对象
- """
- pass
我们将需求进行分拆:
- 在回测开始时初始化进度条
- 在回测每日交易结束后更新进度条
在回测结束后,终止进度条为了实现以上需求,我们需要注册两个事件:
EVENT.POST_SYSTEM_INIT
系统初始化后EVENT.POST_AFTER_TRADING
交易结束后进度条相关 我们使用click
库来实现,具体 API 这里不详细展开。
接下来,我们在 start_up
函数中进行事件注册,并定义 _init
和 _tick
函数来响应事件。
- from rqalpha.interface import AbstractMod
- class ProgressMod(AbstractMod):
- def __init__(self):
- self._env = None
- def start_up(self, env, mod_config):
- self._env = env
- env.event_bus.add_listener(EVENT.POST_AFTER_TRADING, self._tick)
- env.event_bus.add_listener(EVENT.POST_SYSTEM_INIT, self._init)
- def tear_down(self, success, exception=None):
- pass
- def _init(self, event):
- pass
- def _tick(self, event):
- pass
在 _init
函数中,初始化 progressBar
,进度条的长度为回测的总时长
- def _init(self):
- trading_length = len(self._env.config.base.trading_calendar)
- self.progress_bar = click.progressbar(length=trading_length, show_eta=False)
在 _tick
函数中,更新进度条
- def _tick(self, event):
- self.progress_bar.update(1)
在 tear_down
函数中,终止进度条
- def tear_down(self, success, exception=None):
- self.progress_bar.render_finish()
至此,我们就完成了整个 ProgressMod 的编写
- import click
- from rqalpha.interface import AbstractMod
- from rqalpha.events import EVENT
- class ProgressMod(AbstractMod):
- def __init__(self):
- self._env = None
- self.progress_bar = None
- def start_up(self, env, mod_config):
- self._env = env
- env.event_bus.add_listener(EVENT.POST_AFTER_TRADING, self._tick)
- env.event_bus.add_listener(EVENT.POST_SYSTEM_INIT, self._init)
- def _init(self, event):
- trading_length = len(self._env.config.base.trading_calendar)
- self.progress_bar = click.progressbar(length=trading_length, show_eta=False)
- def _tick(self, event):
- self.progress_bar.update(1)
- def tear_down(self, success, exception=None):
- self.progress_bar.render_finish()
最后,我们添加默认的载入函数 load_mod
,一个完整的进度条的Mod就完成了
- import click
- from rqalpha.interface import AbstractMod
- from rqalpha.events import EVENT
- class ProgressMod(AbstractMod):
- def __init__(self):
- self._env = None
- self.progress_bar = None
- def start_up(self, env, mod_config):
- self._env = env
- env.event_bus.add_listener(EVENT.POST_AFTER_TRADING, self._tick)
- env.event_bus.add_listener(EVENT.POST_SYSTEM_INIT, self._init)
- def _init(self, event):
- trading_length = len(self._env.config.base.trading_calendar)
- self.progress_bar = click.progressbar(length=trading_length, show_eta=False)
- def _tick(self, event):
- self.progress_bar.update(1)
- def tear_down(self, success, exception=None):
- self.progress_bar.render_finish()
- def load_mod():
- return ProgressMod()
事件源的扩展
上一节讲的是如何订阅事件源,那么如何发布事件呢?其实也很简单,只需要通过 publish_event
就可以进行事件的发布。
RQAlpha 整个回测模块是通过 rqalpha_mod_sys_simulation
实现的,其中定义了基于Bar回测的 event_source
和 simulation_broker
, 其中包含了 MarketEvent 和 OrderEvent 大部分事件源的定义和发布。
我们简单来分析一下日线回测 simulation_event_source
中 MaketEvent 相关事件的触发流程。
- class SimulationEventSource(AbstractEventSource):
- ...
- def events(self, start_date, end_date, frequency):
- # 根据起始日期和结束日期,获取所有的交易日,然后再循环获取每一个交易日
- for day in self._env.data_proxy.get_trading_dates(start_date, end_date):
- date = day.to_pydatetime()
- dt_before_trading = date.replace(hour=0, minute=0)
- dt_bar = date.replace(hour=15, minute=0)
- dt_after_trading = date.replace(hour=15, minute=30)
- dt_settlement = date.replace(hour=17, minute=0)
- yield Event(EVENT.BEFORE_TRADING, calendar_dt=dt_before_trading, trading_dt=dt_before_trading)
- yield Event(EVENT.BAR, calendar_dt=dt_bar, trading_dt=dt_bar)
- yield Event(EVENT.AFTER_TRADING, calendar_dt=dt_after_trading, trading_dt=dt_after_trading)
- yield Event(EVENT.SETTLEMENT, calendar_dt=dt_settlement, trading_dt=dt_settlement)
event
函数是一个generator, 在 rqalphamod_sys_simulation 中主要返回 BEFORE_TRADING
, BAR
, AFTER_TRADING
和 SETTLEMENT
事件。RQAlpha 在接受到对应的事件后,会自动的进行相应的 _publish_event 操作,并且会自动 publish 相关的 PRE_ 和 _POST 事件。
而在 simulationbroker
中可以看到,当被调用 _cancel_order 时,会模拟撤单的执行流程,分别触发 ORDER_PENDING_CANCEL
&& ORDER_CANCELLATION_PASS
事件,并将 account
和 order
传递给回调函数,使其可以获取其可能需要到的数据。
- class SimulationBroker(AbstractBroker, Persistable):
- def cancel_order(self, order):
- account = self._get_account_for(order.order_book_id)
- self._env.event_bus.publish_event(Event(EVENT.ORDER_PENDING_CANCEL, account=account, order=order))
- order._mark_cancelled(_("{order_id} order has been cancelled by user.").format(order_id=order.order_id))
- self._env.event_bus.publish_event(Event(EVENT.ORDER_CANCELLATION_PASS, account=account, order=order))
- # account.on_order_cancellation_pass(order)
- try:
- self._open_orders.remove((account, order))
- except ValueError:
- try:
- self._delayed_orders.remove((account, order))
- except ValueError:
- pass
如果想查看详细的事件源相关的内容,建议直接阅读 rqalpha_mod_sys_simulation 源码,您会发现,扩展事件源比想象中要简单。
您也可以基于 rqalpha_mod_sys_simulation 扩展一个自定义的回测引擎,实现您特定的回测需求。