策略开发
多合约组合策略模板提供完整的信号生成和委托管理功能,用户可以基于该模板自行开发策略。新策略可以放在用户运行的文件内(推荐),如在c:\users\administrator.vntrader目录下创建strategies文件夹;可以放在根目录下vnpy\app\portfolio_strategy\strategies文件夹内。 注意:策略文件命名是以下划线模式,如pair_trading_strategy.py;而策略类命名采用的是驼峰式,如PairTradingStrategy。
下面通过PairTradingStrategy策略示例,来展示策略开发的具体步骤:
参数设置
定义策略参数并且初始化策略变量。策略参数为策略类的公有属性,用户可以通过创建新的实例来调用或者改变策略参数。
如针对rb和hc品种,用户可以创建基于PairTradingStrategy的策略示例,如rb_hc_PairTradingStrategy,boll_window可以由20改成30。
创建策略实例的方法有效地实现了一个策略跑多个品种,并且其策略参数可以通过品种的特征进行调整。
price_add = 5
boll_window = 20
boll_dev = 2
fixed_size = 1
leg1_ratio = 1
leg2_ratio = 1
leg1_symbol = ""
leg2_symbol = ""
current_spread = 0.0
boll_mid = 0.0
boll_down = 0.0
boll_up = 0.0
类的初始化
初始化:
通过super( )的方法继承StrategyTemplate,在__init__( )函数传入Strategy引擎、策略名称、vt_symbols、参数设置。
调用K线生成模块:通过时间切片来把Tick数据合成1分钟K线数据。
如果需要,可以调用K线时间序列管理模块:基于K线数据,来生成相应的技术指标。
def __init__(
self,
strategy_engine: StrategyEngine,
strategy_name: str,
vt_symbols: List[str],
setting: dict
):
""""""
super().__init__(strategy_engine, strategy_name, vt_symbols, setting)
self.bgs: Dict[str, BarGenerator] = {}
self.targets: Dict[str, int] = {}
self.last_tick_time: datetime = None
self.spread_count: int = 0
self.spread_data: np.array = np.zeros(100)
# Obtain contract info
self.leg1_symbol, self.leg2_symbol = vt_symbols
def on_bar(bar: BarData):
""""""
pass
for vt_symbol in self.vt_symbols:
self.targets[vt_symbol] = 0
self.bgs[vt_symbol] = BarGenerator(on_bar)
策略的初始化、启动、停止
通过“多合约组合策略”组件的相关功能按钮实现。
注意:函数load_bar(1),代表策略初始化需要载入1个交易日的历史数据。该历史数据只能是K线数据(on_tick只有实盘中会调用,回测不支持)。在策略初始化时候,会调用K线时间序列管理器计算并缓存相关的计算指标,但是并不触发交易。
def on_init(self):
"""
Callback when strategy is inited.
"""
self.write_log("策略初始化")
self.load_bars(1)
def on_start(self):
"""
Callback when strategy is started.
"""
self.write_log("策略启动")
def on_stop(self):
"""
Callback when strategy is stopped.
"""
self.write_log("策略停止")
Tick数据回报
策略订阅某品种合约行情,交易所会推送Tick数据到该策略上。
由于是基于1分钟K线来生成交易信号的,故收到Tick数据后,需要用到K线生成模块里面的update_tick函数,通过时间切片的方法,聚合成1分钟K线数据,并且推送到on_bars函数。
def on_tick(self, tick: TickData):
"""
Callback of new tick data update.
"""
if (
self.last_tick_time
and self.last_tick_time.minute != tick.datetime.minute
):
bars = {}
for vt_symbol, bg in self.bgs.items():
bars[vt_symbol] = bg.generate()
self.on_bars(bars)
bg: BarGenerator = self.bgs[tick.vt_symbol]
bg.update_tick(tick)
self.last_tick_time = tick.datetime
K线数据回报
在回调函数on_bars中,同时收到所有合约的K线行情推送,并实现核心交易逻辑。调用buy/sell/short/cover/cancel_order等函数来发送交易请求。
信号的生成,由3部分组成:
清空未成交委托:为了防止之前下的单子在上一个5分钟没有成交,但是下一个5分钟可能已经调整了价格,就用cancel_all()方法立刻撤销之前未成交的所有委托,保证策略在当前这5分钟开始时的整个状态是清晰和唯一的。
时间过滤:这里有用(bar.datetime.minute+1) % 5的逻辑来达到每5分钟推送一次的逻辑。
指标计算:基于最新的5分钟K线数据来计算相应计算指标,如价差的布林带通道上下轨。
信号计算:通过持仓的判断以及结合布林带通道上下轨,在通道突破点以及离场点设置目标仓位。
仓位管理:基于目标仓位和现在持仓的差别来计算本次下单的数量并挂出委托(buy/sell),同时设置离场点(short/cover)。
def on_bars(self, bars: Dict[str, BarData]):
""""""
self.cancel_all()
# Return if one leg data is missing
if self.leg1_symbol not in bars or self.leg2_symbol not in bars:
return
# Calculate current spread
leg1_bar = bars[self.leg1_symbol]
leg2_bar = bars[self.leg2_symbol]
# Filter time only run every 5 minutes
if (leg1_bar.datetime.minute + 1) % 5:
return
self.current_spread = (
leg1_bar.close_price * self.leg1_ratio - leg2_bar.close_price * self.leg2_ratio
)
# Update to spread array
self.spread_data[:-1] = self.spread_data[1:]
self.spread_data[-1] = self.current_spread
self.spread_count += 1
if self.spread_count <= self.boll_window:
return
# Calculate boll value
buf: np.array = self.spread_data[-self.boll_window:]
std = buf.std()
self.boll_mid = buf.mean()
self.boll_up = self.boll_mid + self.boll_dev * std
self.boll_down = self.boll_mid - self.boll_dev * std
# Calculate new target position
leg1_pos = self.get_pos(self.leg1_symbol)
if not leg1_pos:
if self.current_spread >= self.boll_up:
self.targets[self.leg1_symbol] = -1
self.targets[self.leg2_symbol] = 1
elif self.current_spread <= self.boll_down:
self.targets[self.leg1_symbol] = 1
self.targets[self.leg2_symbol] = -1
elif leg1_pos > 0:
if self.current_spread >= self.boll_mid:
self.targets[self.leg1_symbol] = 0
self.targets[self.leg2_symbol] = 0
else:
if self.current_spread <= self.boll_mid:
self.targets[self.leg1_symbol] = 0
self.targets[self.leg2_symbol] = 0
# Execute orders
for vt_symbol in self.vt_symbols:
target_pos = self.targets[vt_symbol]
current_pos = self.get_pos(vt_symbol)
pos_diff = target_pos - current_pos
volume = abs(pos_diff)
bar = bars[vt_symbol]
if pos_diff > 0:
price = bar.close_price + self.price_add
if current_pos < 0:
self.cover(vt_symbol, price, volume)
else:
self.buy(vt_symbol, price, volume)
elif pos_diff < 0:
price = bar.close_price - self.price_add
if current_pos > 0:
self.sell(vt_symbol, price, volume)
else:
self.short(vt_symbol, price, volume)
self.put_event()
委托回报、成交回报、停止单回报
在于组合策略中需要对多合约同时下单交易,在回测时无法判断某一段K线内部每个合约委托成交的先后时间顺序,因此无法提供on_order和on_trade获取委托成交推送,而只能在每次on_bars回调时通过get_pos和get_order来进行相关的状态查询。
同时组合策略模块只支持限价单交易,不提供停止单功能(StopOrder)。
def send_order(
self,
vt_symbol: str,
direction: Direction,
offset: Offset,
price: float,
volume: float,
lock: bool = False
) -> List[str]:
"""
Send a new order.
"""
if self.trading:
vt_orderids = self.strategy_engine.send_order(
self, vt_symbol, direction, offset, price, volume, lock
)
return vt_orderids
else:
return []