替换已有数据源

基础数据

通过 $ rqalpha update-bundle 下载的数据有如下文件:

  1. $ cd ~/.rqalpha/bundle & tree -A -d -L 1
  2.  
  3. .
  4. ├── adjusted_dividends.bcolz
  5. ├── funds.bcolz
  6. ├── futures.bcolz
  7. ├── indexes.bcolz
  8. ├── original_dividends.bcolz
  9. ├── st_stock_days.bcolz
  10. ├── stocks.bcolz
  11. ├── suspended_days.bcolz
  12. ├── trading_dates.bcolz
  13. └── yield_curve.bcolz

目前基础数据,比如 Instruments, st_stocks, suspended_days, trading_dates 都是全量数据,并且可以通过 $ rqalpha update-bundle 每天更新,因此没有相应的显式接口可以对其进行替换。

您如果想要替换,可以使用如下两种方式:

  • 写脚本将自有数据源按照相同的格式生成对应的文件,并进行文件替换。
  • 实现 AbstractDataSource 对应的接口,您可以继承 BaseDataSource 并 override 对应的接口即可完成替换。

行情数据 - 五十行代码接入 tushare 行情数据

RQAlpha 支持自定义扩展数据源。得益于 RQAlpha 的 mod 机制,我们可以很方便的替换或者扩展默认的数据接口。

RQAlpha 将提供给用户的数据 API 和回测所需的基础数据抽象成了若干个函数,这些函数被封于 DataSource 类中,并将在需要的时候被调用。简单的说,我们只需要在自己定义的 mod 中扩展或重写默认的 DataSource 类,就可以替换掉默认的数据源,接入自有数据。

DataSource 类的完整文档,请参阅 基本概念。下面将用一个简单的例子,为大家介绍如何用五十行左右的代码将默认的行情数据替换为 TuShare 的行情数据。

TushareKDataMod 的作用是使用 tushare 提供的k线数据替换 data_bundle 中的行情数据,由于目前 tushare 仅仅开放了日线、周线和月线的历史数据,所以该 mod 仍然只能提供日回测的功能,若未来 tushare 开放了60分钟或5分钟线的历史数据,只需进行简单修改,便可通过该 mod 使 RQAlpha 实现5分钟回测。

开工前,首先熟悉一下用到的 tushare 的k线接口,接口如下:

  1. get_k_data(code, ktype='D', autype='qfq', index=False, start=None, end=None)

如上文所说,我们要做的主要就是扩展或重写默认的 DataSource 类。在此处,我们选择建立一个新的 DataSource 类,该类继承于默认的 BaseDataSource 类。

这样做的好处在于我们不必重写 DataSource 需要实现的所有函数,而可以只实现与我们想替换的数据源相关的函数,其他数据的获取直接甩锅给父类 BaseDataSource

与行情数据密切相关的主要有以下三个函数:

  • current_snapshot(instrument, frequency, dt)
  • get_bar(instrument, dt, frequency)
  • history_bars(instrument, bar_count, frequency, fields, dt, skip_suspended=True)
  • available_data_range(frequency)经过查看 DataProxy 类的源代码,可以发现,提供日级别数据的 DataSource 类不需要实现 current_snapshot 函数,所以我们只关注后三个函数的实现。

get_barhistory_bars 函数实现的主要功能都是传入 instrument 对象,从 tushare 获取指定时间或时间段的 bar 数据。我们把这一过程抽象为一个函数:

  1. class TushareKDataSource(BaseDataSource):
  2.  
  3. ...
  4.  
  5. @staticmethod
  6. def get_tushare_k_data(instrument, start_dt, end_dt):
  7.  
  8. # 首先获取 order_book_id 并将其转换为 tushare 所能识别的 code
  9. order_book_id = instrument.order_book_id
  10. code = order_book_id.split(".")[0]
  11.  
  12. # tushare 行情数据目前仅支持股票和指数,并通过 index 参数进行区分
  13. if instrument.type == 'CS':
  14. index = False
  15. elif instrument.type == 'INDX':
  16. index = True
  17. else:
  18. return None
  19.  
  20. # 调用 tushare 函数,注意 datetime 需要转为指定格式的 str
  21. return ts.get_k_data(code, index=index, start=start_dt.strftime('%Y-%m-%d'), end=end_dt.strftime('%Y-%m-%d'))

现在实现 get_bar 函数:

  1. class TushareKDataSource(BaseDataSource):
  2.  
  3. ...
  4.  
  5. def get_bar(self, instrument, dt, frequency):
  6.  
  7. # tushare k线数据暂时只能支持日级别的回测,其他情况甩锅给默认数据源
  8. if frequency != '1d':
  9. return super(TushareKDataSource, self).get_bar(instrument, dt, frequency)
  10.  
  11. # 调用上边写好的函数获取k线数据
  12. bar_data = self.get_tushare_k_data(instrument, dt, dt)
  13.  
  14. # 遇到获取不到数据的情况,同样甩锅;若有返回值,注意转换格式。
  15. if bar_data is None or bar_data.empty:
  16. return super(TushareKDataSource, self).get_bar(instrument, dt, frequency)
  17. else:
  18. return bar_data.iloc[0].to_dict()

然后是硬骨头 history_bars 函数:

  1. class TushareKDataSource(BaseDataSource):
  2.  
  3. ...
  4.  
  5. def history_bars(self, instrument, bar_count, frequency, fields, dt, skip_suspended=True):
  6. # tushare 的k线数据未对停牌日期做补齐,所以遇到不跳过停牌日期的情况我们先甩锅。有兴趣的开发者欢迎提交代码补齐停牌日数据。
  7. if frequency != '1d' or not skip_suspended:
  8. return super(TushareKDataSource, self).history_bars(instrument, bar_count, frequency, fields, dt, skip_suspended)
  9.  
  10. # 参数只提供了截止日期和天数,我们需要自己找到开始日期
  11. # 获取交易日列表,并拿到截止日期在列表中的索引,之后再算出开始日期的索引
  12. start_dt_loc = self.get_trading_calendar().get_loc(dt.replace(hour=0, minute=0, second=0, microsecond=0)) - bar_count + 1
  13. # 根据索引拿到开始日期
  14. start_dt = self.get_trading_calendar()[start_dt_loc]
  15.  
  16. # 调用上边写好的函数获取k线数据
  17. bar_data = self.get_tushare_k_data(instrument, start_dt, dt)
  18.  
  19. if bar_data is None or bar_data.empty:
  20. return super(TushareKDataSource, self).get_bar(instrument, dt, frequency)
  21. else:
  22. # 注意传入的 fields 参数可能会有不同的数据类型
  23. if isinstance(fields, six.string_types):
  24. fields = [fields]
  25. fields = [field for field in fields if field in bar_data.columns]
  26.  
  27. # 这样转换格式会导致返回值的格式与默认 DataSource 中该方法的返回值格式略有不同。欢迎有兴趣的开发者提交代码进行修改。
  28. return bar_data[fields].as_matrix()

最后是 available_data_range 函数

  1. class TushareKDataSource(BaseDataSource):
  2.  
  3. ...
  4.  
  5. def available_data_range(self, frequency):
  6. return date(2005, 1, 1), date.today() - relativedelta(days=1)

把以上几个函数组合起来,并加入构造函数,就完成了我们重写的 DataSource 类。完整代码如下:

  1. import six
  2. import tushare as ts
  3. from datetime import date
  4. from dateutil.relativedelta import relativedelta
  5. from rqalpha.data.base_data_source import BaseDataSource
  6.  
  7.  
  8. class TushareKDataSource(BaseDataSource):
  9. def __init__(self, path):
  10. super(TushareKDataSource, self).__init__(path)
  11.  
  12. @staticmethod
  13. def get_tushare_k_data(instrument, start_dt, end_dt):
  14. order_book_id = instrument.order_book_id
  15. code = order_book_id.split(".")[0]
  16.  
  17. if instrument.type == 'CS':
  18. index = False
  19. elif instrument.type == 'INDX':
  20. index = True
  21. else:
  22. return None
  23.  
  24. return ts.get_k_data(code, index=index, start=start_dt.strftime('%Y-%m-%d'), end=end_dt.strftime('%Y-%m-%d'))
  25.  
  26. def get_bar(self, instrument, dt, frequency):
  27. if frequency != '1d':
  28. return super(TushareKDataSource, self).get_bar(instrument, dt, frequency)
  29.  
  30. bar_data = self.get_tushare_k_data(instrument, dt, dt)
  31.  
  32. if bar_data is None or bar_data.empty:
  33. return super(TushareKDataSource, self).get_bar(instrument, dt, frequency)
  34. else:
  35. return bar_data.iloc[0].to_dict()
  36.  
  37. def history_bars(self, instrument, bar_count, frequency, fields, dt, skip_suspended=True):
  38. if frequency != '1d' or not skip_suspended:
  39. return super(TushareKDataSource, self).history_bars(instrument, bar_count, frequency, fields, dt, skip_suspended)
  40.  
  41. start_dt_loc = self.get_trading_calendar().get_loc(dt.replace(hour=0, minute=0, second=0, microsecond=0)) - bar_count + 1
  42. start_dt = self.get_trading_calendar()[start_dt_loc]
  43.  
  44. bar_data = self.get_tushare_k_data(instrument, start_dt, dt)
  45.  
  46. if bar_data is None or bar_data.empty:
  47. return super(TushareKDataSource, self).get_bar(instrument, dt, frequency)
  48. else:
  49. if isinstance(fields, six.string_types):
  50. fields = [fields]
  51. fields = [field for field in fields if field in bar_data.columns]
  52.  
  53. return bar_data[fields].as_matrix()
  54.  
  55. def available_data_range(self, frequency):
  56. return date(2005, 1, 1), date.today() - relativedelta(days=1)

到目前为止,我们的主要工作已经完成了。想要将我们刚刚写好的 DataSource 类投入使用,还需要将其放入一个 mod 来被 RQAlpha 加载。

mod 的实现如下:

  1. from rqalpha.interface import AbstractMod
  2.  
  3. from .data_source import TushareKDataSource
  4.  
  5.  
  6. class TushareKDataMode(AbstractMod):
  7. def __init__(self):
  8. pass
  9.  
  10. def start_up(self, env, mod_config):
  11. # 设置 data_source 为 TushareKDataSource 类的对象
  12. env.set_data_source(TushareKDataSource(env.config.base.data_bundle_path))
  13.  
  14. def tear_down(self, code, exception=None):
  15. pass

最后的最后,添加 load_mod 函数,该函数将被 RQAlpha 调用以加载我们刚刚写好的 mod 。

  1. from .mod import TushareKDataMode
  2.  
  3.  
  4. def load_mod():
  5. return TushareKDataMode()

至此,我们已经完成了外部行情数据的接入,剩下要做的就是在 RQAlpha 启动时传入的配置信息中开启以上 mod。

该 mod 只是一个简单的 demo,仍存在一些问题,例如调用 tushare 接口速度较慢,频繁调用会消耗大量时间。如能将多次调用合并,或是将接口的调用改为异步,相信能够大幅提升回测速度。