依旧叠甲:本文基于前面三篇文章高频因子的获取和预测算法的基础上,这次我们要谈的是高频交易实践策略,对于策略表现大家请不要太过于在意,因为今天我们的重点是如何搭建一个基于盘口高频因子的交易策略。一起来看看我们是如何构建这个高频策略框架的吧!
引言
随着金融市场的不断发展和技术的日益成熟,高频交易作为一种快速反应市场波动并在短时间内获取利润的交易策略,受到了越来越多投资者的关注。本文将介绍一种基于行情数据的高频交易算法实现,通过盘口数据数据,特征挖掘,方向判断和具体操作的执行,演示了一个简单但具有代表性的实时交易策略,展示了其逻辑和实现细节。
策略逻辑解析
本文所介绍的高频交易策略基于行情数据,通过对交易所获取的实时数据进行分析和处理,以期望在极短的时间内获取利润。该策略主要包含以下几个核心部分:
数据处理:首先,从交易所获取实时的行情数据,包括买卖价格、成交量、持仓量等信息。然后,对这些数据进行处理,提取相关特征,如买卖价差、订单不平衡、交易压力指标等。
模型计算:利用预先设定好的模型参数(根据线性回归模拟计算出来的权重),对处理后的特征数据进行计算,得出一个数值作为交易信号。这个数值反映了当前市场情况下的预期盈利概率。
交易执行:根据计算出的交易信号,执行相应的交易操作。如果信号为正数,则执行买入操作;如果信号为负数,则执行卖出操作。
盈利统计:对每次交易的盈亏情况进行记录和统计。统计结果可以反映策略的盈利能力和稳定性,为进一步优化和调整提供参考。
策略实现
上述逻辑的实现主要通过与交易所的接口进行数据交互,实时获取行情数据,并根据预先设定的策略逻辑进行交易操作。整个过程实现了对市场变化的快速响应和交易决策的自动化执行。
前面的信号计算我们已经在前面三篇文章中有了详细的介绍。这里重点我们来了解一下基于事件的返回机制。
exchange.IO("wait", Timeout)
,当前交易所有任何品种更新行情信息或订单成交时才返回,可带第二个参数(毫秒数)指定超时,超时返回空值,正常返回EventTick/OrderEvent
结构,结合exchange.IO("mode", 0)
函数使用,这样配合使用就可以使程序在有最新行情时进行响应,执行程序逻辑(使用exchange.IO("mode", 0
)并不影响exchange.IO("wait")
,目的是为了在程序中使用exchange.GetTicker()
等函数调用时不阻塞)。如果Timeout
参数设置-1该函数设置为立即返回,在没有新事件时返回空值,Timeout参数设置0为阻塞等待最新事件,如同不设置Timeout
参数。需要注意的是在使用exchange.IO("wait")
时,必须至少已经订阅了一个当前处于交易状态的合约(已经交割的过期合约,不会再有行情数据),否则会阻塞在该函数(由于没有任何行情、订单更新)。只支持商品期货实盘。
EventTick:{Event:"tick", Index:交易所索引, Nano:事件纳秒级时间, Symbol:合约名称, Ticker:行情数据}。
OrderTick:{Event:"order", Index:交易所索引, Nano:事件纳秒级时间, Order:订单信息}。
在这个策略当中,我们使用这种基于事件的返回机制进行我们高频策略结构的搭建。根据正常返回Event
的事件,如果返回数据事件,我们进行tick数据获取,信号计算和开仓挂单操作,
e = exchange.IO("wait", -1)
if e:
if e.Event == "tick":
## tick数据获取
## 信号计算
## 开仓挂单操作
## 开仓撤单操作
## 平仓挂单操作
## 平仓撤单操作
if e.Event == "order":
## 判断挂单是否成功,定义交易状态
好的,让我们结合代码逐步详细解释挂单开仓、撤单和平仓操作的实现细节:
挂单开仓的细节解释:
挂单条件判断(tick事件): 在代码中,挂单开仓的条件是在tradeLock == 0
(交易锁释放)的情况下,并且满足特定的交易信号条件。交易信号是通过process_preTicker()
函数计算得出,存储在calSignal
变量中。如果calSignal
大于2,则表示买入信号;如果小于-2,则表示卖出信号。
执行挂单操作(tick事件): 当满足挂单条件时,在代码中将tradeLock
设置为1进行加锁,并调用交易所的相关函数执行挂单操作。买入操作使用exchange.Buy()
函数,卖出操作使用exchange.Sell()
函数。这时候为提高成交成功率,使用上浮(开多)或者下降(开空)限价单的处理,并记录当前的挂单ID(makeID)和时间maker_time
。
等待开仓挂单成交(order事件): 挂单提交后,系统返回order
事件,如果order.Status
为0,代表挂单还未成交,定义ret_stats = 'fail_make'
;如果order.Status
为1,代表挂单已经成交,定义ret_stats = 'succ_make'
,这个时候可以获取持仓信息,得到持仓的价位。
开仓挂单失败处理(tick事件):
在ret_stats = 'fail_make'
的情况下,记录当前时间,如果时间间隔大于10秒钟,使用exchange.CancelOrder(makeID)
进行撤单处理,并重新还原tradeLock
变量。
接下来我们解释一下开仓成功以后的平仓操作。
平仓的细节解释:
在判断开仓成功以后(ret_stats == 'succ_make'
和tradeLock
等于1或者-1),接下来根据预先设定的止盈止损条件(盈利大于4点或者亏损大于8点),进行相应的平仓处理。这时候为了提供交易成功率,也进行滑点的处理,并记录下平仓挂单的ID(coverID)和时间cover_time
,和修改交易锁tradeLock
的值为±2(平仓挂单锁)。
根据e.Order
返回的事件,在abs(tradeLock) == 2
已开仓情况下,判断平仓挂单是否失败order.Status == 0
或者成功order.Status == 1
,然后定义ret_stats
为相应的状态。在判断挂单平仓成功以后,重新还原ret_stats
和tradeLock
变量,并记录相应的止盈和止损次数,以及具体的盈利曲线。
如果平仓挂单失败ret_stats == 'fail_cover'
,并且等待时间也超过10秒,进行撤单处理,并将ret_stats
定义为succ_make
,重新进行平仓挂单的处理,直到平仓完成。
通过以上代码和解释,我们可以详细了解平仓操作的实现细节及其在实时交易策略中的应用。这些操作的准确执行对于实现有效的交易策略至关重要。
结论
本文介绍了一种基于行情数据的高频交易策略实现,通过数据处理、模型计算、交易执行和盈利统计等步骤,展示了一个简单但完整的交易系统。该系统通过对市场情况的实时监测和分析,以及快速的交易决策和执行,实现了对市场波动的快速响应和利润的获取。然而,需要注意的是,该模型需要优化的地方有很多,比如信号的准确性,平仓的方法等等,希望大家可以在此基础上进行更多的探索和优化;在同时,高频交易策略存在着较高的风险和复杂性,投资者在使用时应谨慎评估自身的风险承受能力和交易经验,以确保交易安全和稳健性。
import pandas as pd
import numpy as np
import time
import json
import warnings
warnings.filterwarnings("ignore")
import pandas_market_calendars as mcal
from datetime import datetime
import pytz
# 定义全局变量
coefficients = [-0.014983989606652334, 4.064105076454074, 0.00985323811339531, -0.3147353115951898, -1.2419290384499666,
0.30308389898177635, -1.2773231034219934, -0.26860187772172717, -0.5629756545156142, 0.0009211082557665554,
0.08155321455902173]
# 处理行情数据的函数
def process_preTicker(preTicker):
# 创建一个空的 DataFrame 列表
dfs = []
# 遍历每条行情数据
for item in preTicker:
r = item
# 提取行情数据中的相关信息
data = {
'ask_price1': r['Info'].get('AskPrice1', np.nan),
'ask_volume1': r['Info'].get('AskVolume1', np.nan),
'ask_price2': r['Info'].get('AskPrice2', np.nan),
'ask_volume2': r['Info'].get('AskVolume2', np.nan),
'ask_price3': r['Info'].get('AskPrice3', np.nan),
'ask_volume3': r['Info'].get('AskVolume3', np.nan),
'ask_price4': r['Info'].get('AskPrice4', np.nan),
'ask_volume4': r['Info'].get('AskVolume4', np.nan),
'ask_price5': r['Info'].get('AskPrice5', np.nan),
'ask_volume5': r['Info'].get('AskVolume5', np.nan),
'bid_price1': r['Info'].get('BidPrice1', np.nan),
'bid_volume1': r['Info'].get('BidVolume1', np.nan),
'bid_price2': r['Info'].get('BidPrice2', np.nan),
'bid_volume2': r['Info'].get('BidVolume2', np.nan),
'bid_price3': r['Info'].get('BidPrice3', np.nan),
'bid_volume3': r['Info'].get('BidVolume3', np.nan),
'bid_price4': r['Info'].get('BidPrice4', np.nan),
'bid_volume4': r['Info'].get('BidVolume4', np.nan),
'bid_price5': r['Info'].get('BidPrice5', np.nan),
'bid_volume5': r['Info'].get('BidVolume5', np.nan),
'open_price': r.get('Open', np.nan),
'high_price': r.get('High', np.nan),
'low_price': r.get('Low', np.nan),
'last_price': r.get('Last', np.nan),
'volume': r.get('Volume', np.nan),
'open_interest': r.get('OpenInterest', np.nan),
'sell_price': r.get('Sell', np.nan),
'buy_price': r.get('Buy', np.nan),
'timestamp': r.get('Time', np.nan)
}
# 将提取的信息构建成 DataFrame 并加入列表
df = pd.DataFrame(data, index=[0])
dfs.append(df)
# 将列表中的 DataFrame 合并为一个 DataFrame
final_df = pd.concat(dfs, ignore_index=True)
# 添加买卖价差特征
final_df['spread'] = final_df['bid_price1'] - final_df['ask_price1']
# 添加深度不平衡特征
for i in range(1, 6):
bid_col = f'bid_volume{i}'
ask_col = f'ask_volume{i}'
qr_col = f'QR{i}'
final_df[qr_col] = (final_df[bid_col] - final_df[ask_col]) / (final_df[bid_col] + final_df[ask_col])
final_df[['QR1', 'QR2', 'QR3', 'QR4', 'QR5']] = final_df[['QR1', 'QR2', 'QR3', 'QR4', 'QR5']].fillna(0)
# 添加订单不平衡特征
bid_volumes = final_df[[f'bid_volume{i}' for i in range(1, 6)]]
ask_volumes = final_df[[f'ask_volume{i}' for i in range(1, 6)]]
final_df['bid_sizes'] = bid_volumes.sum(axis=1)
final_df['ask_sizes'] = ask_volumes.sum(axis=1)
final_df['order_imbalance'] = (final_df['bid_sizes'] - final_df['ask_sizes']) / (final_df['bid_sizes'] + final_df['ask_sizes'])
# 添加交易量订单流不平衡特征
final_df['deltaVask'] = np.where(final_df['ask_price1'] > final_df['ask_price1'].shift(1), 0,
np.where(final_df['ask_price1'] == final_df['ask_price1'].shift(1),
final_df['ask_volume1'] - final_df['ask_volume1'].shift(1),
final_df['ask_volume1']))
final_df['deltaVbid'] = np.where(final_df['bid_price1'] < final_df['bid_price1'].shift(1), 0,
np.where(final_df['bid_price1'] == final_df['bid_price1'].shift(1),
final_df['bid_volume1'] - final_df['bid_volume1'].shift(1),
final_df['bid_volume1']))
final_df['VOI'] = final_df['deltaVbid'] - final_df['deltaVask']
# 添加买卖压力指标特征
n1, n2 = 1, 5
_ = np.arange(n1, n2 + 1)
bench_prices = (final_df['ask_price1'] + final_df['bid_price1']) / 2
def unit_calc(bench_price):
bid_d = [bench_price / (bench_price - final_df["bid_price%s" % s]) for s in _]
bid_denominator = sum(bid_d)
bid_weights = [d / bid_denominator for d in bid_d]
press_buy = sum([final_df["bid_volume%s" % s] * w for s, w in zip(_, bid_weights)])
ask_d = [bench_price / (final_df['ask_price%s' % s] - bench_price) for s in _]
ask_denominator = sum(ask_d)
ask_weights = [d / ask_denominator for d in ask_d]
press_sell = sum([final_df['ask_volume%s' % s] * w for s, w in zip(_, ask_weights)])
return (np.log(press_buy) - np.log(press_sell)).replace([-np.inf, np.inf], np.nan)
result = unit_calc(bench_prices)
final_df['price_weighted_pressure'] = result
# 添加成交量增加值特征
final_df['volume_add'] = final_df.volume.diff()
# 添加持仓量增加值特征
final_df['interest_add'] = final_df.open_interest.diff()
# 提取用于计算结果的特征列
X = final_df[['spread', 'order_imbalance', 'VOI', 'QR1', 'QR2', 'QR3', 'QR4', 'QR5',
'price_weighted_pressure', 'volume_add', 'interest_add']]
# 提取特征列的值
row_values = X.iloc[1,:].values
# 计算结果
result = np.dot(coefficients, row_values)
return result
def is_trading_time():
# 获取上海期货交易所(SHF)的市场日历
shf = mcal.get_calendar('XSHG')
# 获取当前时间,加上东八区时区信息
current_time = datetime.now(pytz.timezone('Asia/Shanghai'))
# 获取今天的交易时间
schedule = shf.schedule(start_date=current_time.date(), end_date=current_time.date())
# 判断当前是否是交易日
if not schedule.empty:
trading_day = True
else:
trading_day = False
# 判断当前时间是否在交易时间范围内
trading_time = (
(9 <= current_time.hour < 11) or
(current_time.hour == 11 and current_time.minute <= 30) or
(current_time.hour == 13 and current_time.minute >= 30) or
(current_time.hour == 14) or
(21 <= current_time.hour < 23)
)
return trading_day and trading_time
# 主函数
def main():
while not exchange.IO("status"):
Sleep(1000)
# 获取初始账户信息
initAccount = _C(exchange.GetAccount)
preProfit = 0
winCount = 0
lossCount = 0
ret_stats = 'start'
calSignal = 0
_C(exchange.SetContractType, "sp2405")
preTicker = []
tradeLock = 0
while True:
if exchange.IO("status") and is_trading_time():
e = exchange.IO("wait", -1)
if e:
if e.Event == "tick":
curPrice = e['Ticker'].Last
# 重置preTicker
date_object = datetime.strptime(_D(e['Ticker'].Time / 1000), "%Y-%m-%d %H:%M:%S")
# 获取小时和分钟
hour = date_object.hour
minute = date_object.minute
second = date_object.second
# 每日开盘重置preTicker
if hour == 21 and minute == 0 and second == 0:
preTicker = []
# 计算信号
if not preTicker:
preTicker.append(e['Ticker'])
else:
preTicker.append(e['Ticker'])
if tradeLock == 0:
calSignal = process_preTicker(preTicker)
preTicker.pop(0)
## 开仓处理
if tradeLock == 0 and calSignal > 2:
Log(tradeLock, calSignal, ret_stats, "开始挂多单", '#FF0000')
tradeLock = 1
exchange.SetDirection('buy')
long_maker_price = e['Ticker'].Last + 2
makeID = exchange.Buy(long_maker_price, 1)
maker_time = time.time()
elif tradeLock == 0 and calSignal < -2:
Log(tradeLock, calSignal, ret_stats, "开始挂空单", '#00FF00')
tradeLock = -1
exchange.SetDirection('sell')
short_maker_price = e['Ticker'].Last - 2
makeID = exchange.Sell(short_maker_price, 1)
maker_time = time.time()
## 开仓失败撤单处理
if ret_stats == 'fail_make':
wait_time = time.time()
if wait_time - maker_time > 20:
Log("取消挂单订单", '#0000FF')
exchange.CancelOrder(makeID)
tradeLock = 0
## 开仓后平仓挂单处理
if ret_stats == 'succ_make':
if (tradeLock == 1 and (curPrice - posPrice >= 4 or curPrice - posPrice <= -8)) or \
(tradeLock == -1 and (curPrice - posPrice <= -4 or curPrice - posPrice >= 8)):
Log("平仓", '#FF0000')
close_order_type = 'closebuy_today' if tradeLock == 1 else 'closesell_today'
exchange.SetDirection(close_order_type)
coverID = exchange.Sell(curPrice - 2, 1) if tradeLock == 1 else exchange.Buy(curPrice + 2, 1)
cover_time = time.time()
tradeLock = 2 if tradeLock == 1 else -2
## 平仓失败撤单处理
if ret_stats == 'fail_cover':
wait_cover_time = time.time()
if wait_cover_time - cover_time > 20:
Log("取消平仓挂单", '#FF0000')
exchange.CancelOrder(coverID)
ret_stats = 'succ_make'
if e.Event == "order":
order = e.Order
Log('order事件', tradeLock, order.Status)
# 开仓挂单状态判断
if abs(tradeLock) == 1:
if order.Status == 0:
Log('挂单失败', "#FF0000")
ret_stats = 'fail_make'
if order.Status == 1:
Log('挂单成功', "#00FF00")
ret_stats = 'succ_make'
posInfo = _C(exchange.GetPosition)
posPrice = posInfo[0].Price
Log('持仓价格:', posPrice)
# 平仓挂单状态判断
elif abs(tradeLock) == 2:
if order.Status == 0:
Log('平仓失败', "#00FF00")
ret_stats = 'fail_cover'
if order.Status == 1:
Log('平仓成功', "#FF0000")
ret_stats = 'start'
tradeLock = 0
curAccount = _C(exchange.GetAccount)
## 盈利可视化展示
curProfit = curAccount.Info.Balance - initAccount.Info.Balance
LogProfit(curProfit, "&")
if curProfit > preProfit:
winCount += 1
else:
lossCount += 1
tblAStatus = {
"type" : "table",
"title" : "盈亏统计",
"cols" : ["止盈次数", "止损次数", "总体盈利"],
"rows" : []
}
tblAStatus["rows"].append([winCount, lossCount, curProfit])
lastStatus = f"`{json.dumps(tblAStatus)}`"
preProfit = curProfit
LogStatus(lastStatus)
Sleep(10)