回测研究
backtesting.py定义了回测引擎,下面主要介绍相关功能函数,以及回测引擎应用示例:
加载策略
把CTA策略逻辑,对应合约品种,以及参数设置(可在策略文件外修改)载入到回测引擎中。
- def add_strategy(self, strategy_class: type, setting: dict):
- """"""
- self.strategy_class = strategy_class
- self.strategy = strategy_class(
- self, strategy_class.__name__, self.vt_symbol, setting
- )
载入历史数据
负责载入对应品种的历史数据,大概有4个步骤:
- 根据数据类型不同,分成K线模式和Tick模式;
- 通过select().where()方法,有条件地从数据库中选取数据,其筛选标准包括:vt_symbol、 回测开始日期、回测结束日期、K线周期(K线模式下);
- order_by(DbBarData.datetime)表示需要按照时间顺序载入数据;
- 载入数据是以迭代方式进行的,数据最终存入self.history_data。
- def load_data(self):
- """"""
- self.output("开始加载历史数据")
- if self.mode == BacktestingMode.BAR:
- s = (
- DbBarData.select()
- .where(
- (DbBarData.vt_symbol == self.vt_symbol)
- & (DbBarData.interval == self.interval)
- & (DbBarData.datetime >= self.start)
- & (DbBarData.datetime <= self.end)
- )
- .order_by(DbBarData.datetime)
- )
- self.history_data = [db_bar.to_bar() for db_bar in s]
- else:
- s = (
- DbTickData.select()
- .where(
- (DbTickData.vt_symbol == self.vt_symbol)
- & (DbTickData.datetime >= self.start)
- & (DbTickData.datetime <= self.end)
- )
- .order_by(DbTickData.datetime)
- )
- self.history_data = [db_tick.to_tick() for db_tick in s]
- self.output(f"历史数据加载完成,数据量:{len(self.history_data)}")
撮合成交
载入CTA策略以及相关历史数据后,策略会根据最新的数据来计算相关指标。若符合条件会生成交易信号,发出具体委托(buy/sell/short/cover),并且在下一根K线成交。
根据委托类型的不同,回测引擎提供2种撮合成交机制来尽量模仿真实交易环节:
- 限价单撮合成交:(以买入方向为例)先确定是否发生成交,成交标准为委托价>= 下一根K线的最低价;然后确定成交价格,成交价格为委托价与下一根K线开盘价的最小值。
- 停止单撮合成交:(以买入方向为例)先确定是否发生成交,成交标准为委托价<= 下一根K线的最高价;然后确定成交价格,成交价格为委托价与下一根K线开盘价的最大值。
下面展示在引擎中限价单撮合成交的流程:
- 确定会撮合成交的价格;
- 遍历限价单字典中的所有限价单,推送委托进入未成交队列的更新状态;
- 判断成交状态,若出现成交,推送成交数据和委托数据;
- 从字典中删除已成交的限价单。
- def cross_limit_order(self):
- """
- Cross limit order with last bar/tick data.
- """
- if self.mode == BacktestingMode.BAR:
- long_cross_price = self.bar.low_price
- short_cross_price = self.bar.high_price
- long_best_price = self.bar.open_price
- short_best_price = self.bar.open_price
- else:
- long_cross_price = self.tick.ask_price_1
- short_cross_price = self.tick.bid_price_1
- long_best_price = long_cross_price
- short_best_price = short_cross_price
- for order in list(self.active_limit_orders.values()):
- # Push order update with status "not traded" (pending)
- if order.status == Status.SUBMITTING:
- order.status = Status.NOTTRADED
- self.strategy.on_order(order)
- # Check whether limit orders can be filled.
- long_cross = (
- order.direction == Direction.LONG
- and order.price >= long_cross_price
- and long_cross_price > 0
- )
- short_cross = (
- order.direction == Direction.SHORT
- and order.price <= short_cross_price
- and short_cross_price > 0
- )
- if not long_cross and not short_cross:
- continue
- # Push order udpate with status "all traded" (filled).
- order.traded = order.volume
- order.status = Status.ALLTRADED
- self.strategy.on_order(order)
- self.active_limit_orders.pop(order.vt_orderid)
- # Push trade update
- self.trade_count += 1
- if long_cross:
- trade_price = min(order.price, long_best_price)
- pos_change = order.volume
- else:
- trade_price = max(order.price, short_best_price)
- pos_change = -order.volume
- trade = TradeData(
- symbol=order.symbol,
- exchange=order.exchange,
- orderid=order.orderid,
- tradeid=str(self.trade_count),
- direction=order.direction,
- offset=order.offset,
- price=trade_price,
- volume=order.volume,
- time=self.datetime.strftime("%H:%M:%S"),
- gateway_name=self.gateway_name,
- )
- trade.datetime = self.datetime
- self.strategy.pos += pos_change
- self.strategy.on_trade(trade)
- self.trades[trade.vt_tradeid] = trade
计算策略盈亏情况
基于收盘价、当日持仓量、合约规模、滑点、手续费率等计算总盈亏与净盈亏,并且其计算结果以DataFrame格式输出,完成基于逐日盯市盈亏统计。
下面展示盈亏情况的计算过程
- 浮动盈亏 = 持仓量 (当日收盘价 - 昨日收盘价) 合约规模
- 实际盈亏 = 持仓变化量 (当时收盘价 - 开仓成交价) 合约规模
- 总盈亏 = 浮动盈亏 + 实际盈亏
- 净盈亏 = 总盈亏 - 总手续费 - 总滑点
- def calculate_pnl(
- self,
- pre_close: float,
- start_pos: float,
- size: int,
- rate: float,
- slippage: float,
- ):
- """"""
- self.pre_close = pre_close
- # Holding pnl is the pnl from holding position at day start
- self.start_pos = start_pos
- self.end_pos = start_pos
- self.holding_pnl = self.start_pos * \
- (self.close_price - self.pre_close) * size
- # Trading pnl is the pnl from new trade during the day
- self.trade_count = len(self.trades)
- for trade in self.trades:
- if trade.direction == Direction.LONG:
- pos_change = trade.volume
- else:
- pos_change = -trade.volume
- turnover = trade.price * trade.volume * size
- self.trading_pnl += pos_change * \
- (self.close_price - trade.price) * size
- self.end_pos += pos_change
- self.turnover += turnover
- self.commission += turnover * rate
- self.slippage += trade.volume * size * slippage
- # Net pnl takes account of commission and slippage cost
- self.total_pnl = self.trading_pnl + self.holding_pnl
- self.net_pnl = self.total_pnl - self.commission - self.slippage
计算策略统计指标
calculate_statistics函数是基于逐日盯市盈亏情况(DateFrame格式)来计算衍生指标,如最大回撤、年化收益、盈亏比、夏普比率等。
- df["balance"] = df["net_pnl"].cumsum() + self.capital
- df["return"] = np.log(df["balance"] / df["balance"].shift(1)).fillna(0)
- df["highlevel"] = (
- df["balance"].rolling(
- min_periods=1, window=len(df), center=False).max()
- )
- df["drawdown"] = df["balance"] - df["highlevel"]
- df["ddpercent"] = df["drawdown"] / df["highlevel"] * 100
- # Calculate statistics value
- start_date = df.index[0]
- end_date = df.index[-1]
- total_days = len(df)
- profit_days = len(df[df["net_pnl"] > 0])
- loss_days = len(df[df["net_pnl"] < 0])
- end_balance = df["balance"].iloc[-1]
- max_drawdown = df["drawdown"].min()
- max_ddpercent = df["ddpercent"].min()
- total_net_pnl = df["net_pnl"].sum()
- daily_net_pnl = total_net_pnl / total_days
- total_commission = df["commission"].sum()
- daily_commission = total_commission / total_days
- total_slippage = df["slippage"].sum()
- daily_slippage = total_slippage / total_days
- total_turnover = df["turnover"].sum()
- daily_turnover = total_turnover / total_days
- total_trade_count = df["trade_count"].sum()
- daily_trade_count = total_trade_count / total_days
- total_return = (end_balance / self.capital - 1) * 100
- annual_return = total_return / total_days * 240
- daily_return = df["return"].mean() * 100
- return_std = df["return"].std() * 100
- if return_std:
- sharpe_ratio = daily_return / return_std * np.sqrt(240)
- else:
- sharpe_ratio = 0
统计指标绘图
通过matplotlib绘制4幅图:
- 资金曲线图
- 资金回撤图
- 每日盈亏图
- 每日盈亏分布图
- def show_chart(self, df: DataFrame = None):
- """"""
- if not df:
- df = self.daily_df
- if df is None:
- return
- plt.figure(figsize=(10, 16))
- balance_plot = plt.subplot(4, 1, 1)
- balance_plot.set_title("Balance")
- df["balance"].plot(legend=True)
- drawdown_plot = plt.subplot(4, 1, 2)
- drawdown_plot.set_title("Drawdown")
- drawdown_plot.fill_between(range(len(df)), df["drawdown"].values)
- pnl_plot = plt.subplot(4, 1, 3)
- pnl_plot.set_title("Daily Pnl")
- df["net_pnl"].plot(kind="bar", legend=False, grid=False, xticks=[])
- distribution_plot = plt.subplot(4, 1, 4)
- distribution_plot.set_title("Daily Pnl Distribution")
- df["net_pnl"].hist(bins=50)
- plt.show()
回测引擎使用示例
- 导入回测引擎和CTA策略
- 设置回测相关参数,如:品种、K线周期、回测开始和结束日期、手续费、滑点、合约规模、起始资金
- 载入策略和数据到引擎中,运行回测。
- 计算基于逐日统计盈利情况,计算统计指标,统计指标绘图。
- from vnpy.app.cta_strategy.backtesting import BacktestingEngine
- from vnpy.app.cta_strategy.strategies.boll_channel_strategy import (
- BollChannelStrategy,
- )
- from datetime import datetime
- engine = BacktestingEngine()
- engine.set_parameters(
- vt_symbol="IF88.CFFEX",
- interval="1m",
- start=datetime(2018, 1, 1),
- end=datetime(2019, 1, 1),
- rate=3.0/10000,
- slippage=0.2,
- size=300,
- pricetick=0.2,
- capital=1_000_000,
- )
- engine.add_strategy(AtrRsiStrategy, {})
- engine.load_data()
- engine.run_backtesting()
- df = engine.calculate_result()
- engine.calculate_statistics()
- engine.show_chart()