,所以我们将这几个函数进行打包,定义成为getFactor函数,里面进行了各个因子的原始数据的获取,然后进行组合为dataframe的格式,最后进行返回,代表实时更新的因子数据。
因子原始数据获取完成以后,接下来我们进行因子的计算,在计算之前,我们进行缺失值的处理,我们在个别时候并不能获取所有品种的完整天数的数据,所以我们需要处理缺失值,使用前向填充的方法填充缺失值;下面进行基本面因子的计算,包括基差动量和年度仓单变化率。最后我们来处理因变量收益率,将收益率进行前移。这样我们的数据集处理完毕,可以理解为前面的因子数据都是自变量,最后的收益率是因变量。我们每日使用历史的自变量和因变量进行模型的拟合,可以认为是训练集;而最新的当日数据,因变量为空,是我们要测试的样本,可以认为是测试集,这样我们的初始因子计算就完成了。
接着我们来进行因子的处理,这里我们进行的因子处理包括标准化,使用norm_factor函数,参数为具体的因子factor,正交化orthogonalize_factors函数,参数是因子的dataframe,正交化这里选择的方法是信息损失最小的对称正交法,最后对所有的因子变量进行处理,使用proFactor函数。
因子处理完毕,下面一步进行因子的合成。这里我们选择因子合成的方法是历史收益率因子合成法,具体的参数包括因子dataframe,移动窗口周期,步长周期,和经过因子有效性检验的因子列表,在这个函数里我们进行合成因子的计算。定义好因子合成方法函数以后,编写getComposite函数,用来获取合成的因子,最后进行返回。这样我们的因子合成的函数就定义完成。
接下来是交易部分的函数了。具体的包括移仓,我们在前面的课程中有讲到过,当主力合约更新的时候,我们需要进行移仓换月。下面是多空组判断的函数,这里我们在前面课程的基础上,加了一个约束条件。原始的函数是根据合成的因子大小进行排序,取前25%进行开多,后25%进行平空。但是由于特殊的情况,合成的因子可能都为正值或者负值,所以可能只需要一个方向的操作,不需要另一个方向的操作。这里在判断头部组head_group获取合约名称的时候,返回合成因子分数大于0的合约名称;尾部组tail_group也一样,返回合成因子分数小于0的合约名称。
最后我们根据合成因子判断获取的做多或者做空合约的列表,进行多空组买卖了。这里在前面讲过的函数基础上,我们进行了一些细微部分的修改。我们先前的平仓逻辑是,如果某品种不在实时更新的做多或者做空列表的时候,我们直接进行平仓,可是这种交易的逻辑会造成频繁的开平仓。所以我们改变一下平仓的条件,首先获取持多仓和持空仓的列表,然后判断当一个方向的持仓合约,在另一个方向判断需要进行相反仓位的操作的时候,我们再进行平仓,这样就可以减少频繁开平仓的情况。当然这种情况下也会出现弊端,可能会持有一个亏损的仓位很长的时间。所以我们需要进行止损的操作,这里增加一个列表,判断实时出现亏损的仓位,我们直接进行平仓,避免判断失误的合约损失扩大。以上就是多因子模型中需要使用的各种需要的方法函数,可以发现都是我们前些课程讲过的各种方法函数,只是这里我们定义为模版类库的函数格式,这样在具体的实盘中我们就可以直接拿过来使用,减少代码的冗余,更加集中于实盘策略的实现。
import pandas as pd
import re
import datetime
import akshare as ak
import json
import time
import numpy as np
import warnings
import math
import statsmodels.api as sm
warnings.filterwarnings("ignore")
# 构建目标合约列表
commodity_contracts = {
# "hc": ['01', '05', '10'], # 热轧卷板
"rb": ['01', '05', '10'], # 螺纹钢
"sp": ['01', '05', '09'], # 纸浆
# "pg": ['01', '05', '09'], # LPG
"FG": ['01', '05', '09'], # 玻璃
"MA": ['01', '05', '09'], # 甲醇
"SA": ['01', '05', '09'], # 纯碱
# "pg": ['01', '05', '09'], # LPG
"TA": ['01', '05', '09'], # PTA
# "UR": ['01', '05', '09'], # 尿素
"eg": ['01', '05', '09'], # 乙二醇
# "eb": ['01', '05', '09'], # 苯乙烯
# "fu": ['01', '05', '09'], # 燃油
# "bu": ['01', '05', '09'], # 沥青
# "l": ['01', '05', '09'], # 聚乙烯
# "SF": ['01', '05', '09'], # 硅铁
# "p": ['01', '05', '09'], # 棕榈油
"pp": ['01', '05', '09'], # 聚丙烯
# "v": ['01', '05', '09'], # 聚氯乙烯
# "i": ['01', '05', '09'] # 铁矿石
}
# 获取主力/次主力合约代码
def getTarList():
mainList = [] # 主力合约列表
nextList = [] # 次主力合约列表
for commodity, monthList in commodity_contracts.items():
curTime = _D()
curYear = curTime[2:4]
curMonth = curTime[5:7]
if int(curMonth) == 12:
if commodity.isupper():
mainID = commodity + str(int(curYear[1])+1) + str(monthList[1])
nextID = commodity + str(int(curYear[1])+1) + str(monthList[2])
else:
mainID = commodity + str(int(curYear)+1) + str(monthList[1])
nextID = commodity + str(int(curYear)+1) + str(monthList[2])
elif int(curMonth) >= int(monthList[0]) and int(curMonth) < int(monthList[1]) - 1:
if commodity.isupper():
mainID = commodity + str(curYear[1]) + str(monthList[1])
nextID = commodity + str(curYear[1]) + str(monthList[2])
else:
mainID = commodity + str(curYear) + str(monthList[1])
nextID = commodity + str(curYear) + str(monthList[2])
elif int(curMonth) >= int(monthList[1]) - 1 and int(curMonth) < int(monthList[2]) - 1:
if commodity.isupper():
mainID = commodity + str(curYear[1]) + str(monthList[2])
nextID = commodity + str(int(curYear[1])+1) + str(monthList[0])
else:
mainID = commodity + str(curYear) + str(monthList[2])
nextID = commodity + str(int(curYear)+1) + str(monthList[0])
elif int(curMonth) < 12:
if commodity.isupper():
mainID = commodity + str(int(curYear[1])+1) + str(monthList[0])
nextID = commodity + str(int(curYear[1])+1) + str(monthList[1])
else:
mainID = commodity + str(int(curYear)+1) + str(monthList[0])
nextID = commodity + str(int(curYear)+1) + str(monthList[1])
mainList.append(mainID)
nextList.append(nextID)
return [mainList, nextList]
# 获取主力日级别k线,方便后续因子计算
def getMainData(symbol):
info = _C(exchange.SetContractType, symbol)
codeID = re.search(r'([A-Za-z]+)\d+', symbol).group(1)
records = exchange.GetRecords()
records.pop() # 删除最新未完成的k线
records_list = [] # Use a list to store individual records
for r in records:
curR = {
"Instrument": codeID,
"InstrumentId": symbol,
"Time": r['Time'],
"Open": r['Open'],
"High": r['High'],
"Low": r['Low'],
"Close": r['Close'],
"Volume": r['Volume']
}
records_list.append(curR)
# Convert the list of dictionaries to a DataFrame
recordsDf = pd.DataFrame(records_list)
return recordsDf
# 计算高阶矩阵因子
def calCMF(symbol, backPeriod):
info = exchange.SetContractType(symbol)
codeID = re.search(r'([A-Za-z]+)\d+', symbol).group(1)
records = exchange.GetRecords(PERIOD_M5)[-backPeriod - 2 : -1]
recent_returns = [records[i + 1].Close / records[i].Close - 1 for i in range(len(records) - 1)]
std_dev = pd.Series(recent_returns).std()
skewness = pd.Series(recent_returns).skew()
kurtosis = pd.Series(recent_returns).kurt()
return [std_dev, skewness, kurtosis]
# 获取动量因子
def momFactor(factorDf, factor, momPeriod):
factorDf['movingValue'] = factorDf[factor].rolling(window=momPeriod, min_periods = 1).mean()
factorDf['momValue'] = (factorDf[factor] - factorDf['movingValue']) / factorDf['movingValue']
return factorDf.iloc[-1, factorDf.columns.get_loc('momValue')]
# 计算展期收益率
def calRollover(mainContract, nextContract):
_C(exchange.SetContractType, mainContract)
mainRecords = exchange.GetRecords()[-1]
_C(exchange.SetContractType, nextContract)
nextRecords = exchange.GetRecords()[-1]
mainDate = re.findall(r"\d+\d*", mainContract)
nextDate = re.findall(r"\d+\d*", nextContract)
if int(nextDate[0][-2:]) > int(mainDate[0][-2:]):
diffMonth = int(nextDate[0][-2:]) - int(mainDate[0][-2:])
else:
diffMonth = int(nextDate[0][-2:]) + 12 - int(mainDate[0][-2:])
rollOver = math.log(mainRecords['Close'] / nextRecords['Close']) / diffMonth * 12
return rollOver
def getBasis(date, symbol):
max_retries = 10
for i in range(max_retries):
try:
basisDf = ak.futures_spot_price_daily(start_day=date, end_day=date, vars_list=[symbol])
if basisDf is None or basisDf.empty:
raise ValueError("No data available")
return [basisDf.spot_price[0], basisDf.dominant_contract_price[0]]
except (ValueError) as e:
if i < max_retries - 1: # 不在最后一次循环中抛出异常
Log(f"Failed to get data, retrying ({i+1}/{max_retries}). Error: {e}")
else:
Log(f"Failed to get data after {max_retries} attempts. Error: {e}")
return [None, None] # 如果没有成功获取数据,返回两个空值的列表
def getReceipt(date, symbol):
max_retries = 10
for i in range(max_retries):
try:
receiptDf = ak.get_receipt(start_day = date, end_day = date, vars_list=[symbol])
if receiptDf is None or receiptDf.empty:
raise ValueError("No data available")
return [receiptDf.receipt[0], receiptDf.receipt_chg[0]]
except (ValueError) as e:
if i < max_retries - 1: # 不在最后一次循环中抛出异常
Log(f"Failed to get data, retrying ({i+1}/{max_retries}). Error: {e}")
else:
Log(f"Failed to get data after {max_retries} attempts. Error: {e}")
return [None, None] # 如果没有成功获取数据,返回两个空值的列表
# 因子获取
def getFactor(symbol, nextsymbol, cmfBackPeriod, pricePeriod, volumePeriod):
# 获取k线数据
rData = getMainData(symbol)
# 获取展期收益率
rollover = calRollover(symbol, nextsymbol)
# 获取高阶矩类因子
cmfFactor = calCMF(symbol, cmfBackPeriod)
# 获取动量因子
priceMom = momFactor(rData, 'Close', pricePeriod)
volumeMom = momFactor(rData, 'Volume', volumePeriod)
# 获取基本面时间
timePre = _D(int(rData.Time.tolist()[-1])/1000)[0:10]
formatted_date = str(timePre.replace("-", ""))
# 获取基本面数据
upperSymbol = ''.join(filter(str.isalpha, symbol)).upper()
basisData = getBasis(formatted_date, upperSymbol)
receiptData = getReceipt(formatted_date, upperSymbol)
# 获取当前时间
timeNow = _D()[0:10]
dateNow = str(timeNow.replace("-", ""))
# 数据保存
curFactor = {
"Instrument": symbol,
"InstrumentId": upperSymbol,
"Time": dateNow,
"Close": rData.iloc[-1,rData.columns.get_loc('Close')],
"RollOver": rollover,
"Std": cmfFactor[0],
"Skew": cmfFactor[1],
"Kurt": cmfFactor[2],
"PriceMom": priceMom,
"VolumeMom": volumeMom,
"SpotPrice": basisData[0],
"ContractPrice": basisData[1],
"Receipt": receiptData[0],
"ReceiptCng": receiptData[1]
}
# 将字典转换为DataFrame,并添加到主DataFrame中
new_factor = pd.DataFrame([curFactor])
return new_factor
# 因子计算
def calFactor(factorData):
# 处理缺失值
dateList = factorData.Time.unique()
varList = factorData.InstrumentId.unique()
# 创建包含所有可能组合的 DataFrame
all_combinations = pd.MultiIndex.from_product([varList, dateList], names=['InstrumentId', 'Time'])
all_combinations = pd.DataFrame(index=all_combinations).reset_index()
merged_df = pd.merge(all_combinations, factorData, how='left', left_on=['InstrumentId', 'Time'], right_on=['InstrumentId', 'Time'])
merged_df = merged_df.sort_values(by=['InstrumentId','Time']) # 分组排序
# 填补缺失值
fillna_df = merged_df.fillna(method='ffill')
# 计算基差基本面因子
fillna_df['basis'] = fillna_df.SpotPrice - fillna_df.ContractPrice
fillna_df['basisrate'] = (fillna_df.SpotPrice - fillna_df.ContractPrice) / fillna_df.SpotPrice
# 设置不同周期K
periodList = [5, 22, 63, 126,243]
# 计算基差动量
for basisT in periodList:
rolling_window = fillna_df.groupby('InstrumentId')['basis'].rolling(window=basisT, min_periods=basisT).mean().reset_index(level=0, drop=True)
fillna_df['br_' + str(basisT)] = (fillna_df.basis - rolling_window) / rolling_window
# 仓单数据处理
fillna_df['wr_day'] = np.where((fillna_df['Receipt'] == fillna_df['ReceiptCng']) & (fillna_df['Receipt'] != 0), 1,
np.where((fillna_df['Receipt'] == fillna_df['ReceiptCng']) & (fillna_df['Receipt'] == 0), 0,
fillna_df['ReceiptCng'] / (fillna_df['Receipt'] - fillna_df['ReceiptCng'])))
receipt_rolling_averages = []
# 遍历每个 'var' 分组
for var_group, var_group_df in fillna_df.groupby('InstrumentId'):
# 遍历每一天
for dateI in range(len(var_group_df)):
# 计算当前日期前 -306 到 -180 范围内的滑动均值
if dateI >= 306:
rolling_average = var_group_df['Receipt'].iloc[dateI - 306: dateI - 180].mean()
else:
rolling_average = None # 对于前306天内的数据,无法计算范围内的滑动均值
receipt_rolling_averages.append(rolling_average)
fillna_df = fillna_df.sort_values(by=['InstrumentId','Time'])
fillna_df['receipt_rolling_averages'] = receipt_rolling_averages
# 计算年度仓单变化率
for receiptT in periodList:
receipt_rolling_window = fillna_df.groupby('InstrumentId')['Receipt'].rolling(window=receiptT, min_periods=receiptT).mean().reset_index(level=0, drop=True)
fillna_df['wr_' + str(receiptT)] = receipt_rolling_window / fillna_df['receipt_rolling_averages'] - 1
finalDf = fillna_df.dropna()
## 计算收益率
finalDf['Returns'] = finalDf.groupby('InstrumentId')['Close'].pct_change()
finalDf = finalDf.sort_values(by=['Time','InstrumentId']) # 分组排序
# 收益率前移
finalDf['Returns'] = finalDf['Returns'].shift(-8)
return finalDf
# 因子处理
def norm_factor(factor):
median_val = factor.median()
mad_val = (factor - factor.mean()).abs().mean()
factor_clip = np.clip(factor, median_val - 3 * mad_val, median_val + 3 * mad_val)
factor_norm = (factor_clip - factor_clip.mean()) / factor_clip.std()
return factor_norm
def orthogonalize_factors(factors_standardize):
# 计算协方差矩阵
M = (factors_standardize.shape[0] - 1) * np.cov(factors_standardize.T.astype(float))
# 特征值分解
D, U = np.linalg.eig(M)
# 转换为矩阵
U = np.mat(U)
# 对特征值取倒数并开方
d = np.mat(np.diag(D**(-0.5)))
# 获取过度矩阵S
S = U * d * U.T
# 获取对称正交矩阵
factors_orthogonal_mat = np.mat(factors_standardize) * S
# 转为DataFrame
factors_orthogonal = pd.DataFrame(factors_orthogonal_mat, columns=factors_standardize.columns, index=factors_standardize.index)
return factors_orthogonal
oriFeatures = ['RollOver','Std','Skew',
'Kurt', 'basisrate', 'br_5', 'br_22', 'br_63', 'br_126', 'br_243',
'wr_day', 'wr_5', 'wr_22', 'wr_63', 'wr_126', 'wr_243', 'PriceMom',
'VolumeMom']
allCol = ['RollOver','Std','Skew',
'Kurt', 'basisrate', 'br_5', 'br_22', 'br_63', 'br_126', 'br_243',
'wr_day', 'wr_5', 'wr_22', 'wr_63', 'wr_126', 'wr_243', 'PriceMom',
'VolumeMom','Returns']
# 因子处理
def proFactor(df):
df[oriFeatures] = orthogonalize_factors(df[oriFeatures].apply(norm_factor))
return df
selFeatures = ['RollOver','Std','Skew',
'Kurt', 'basisrate', 'br_5', 'br_22', 'br_63', 'br_126', 'br_243',
'wr_day', 'wr_5', 'wr_22', 'wr_63', 'wr_126', 'wr_243', 'PriceMom',
'VolumeMom']
# 合成因子
def history_linear_composite(df, window_size, step_size, selFeatures):
composite_factors = []
cumulative_coefficients = np.zeros(len(selFeatures)) # 因子收益率累加列表
cumulative_count = 0 # 次数统计
for i in range(window_size, len(df), step_size):
window_data = df.iloc[(i - window_size):i, :]
X = window_data[selFeatures]
y = window_data['Returns']
X = sm.add_constant(X)
model = sm.OLS(y, X).fit()
coefficients = model.params[1:]
p_values = model.pvalues[1:]
coefficients = np.where(p_values < 0.1, coefficients, 0)
cumulative_coefficients += coefficients
cumulative_count += 1
coefficientsMean = cumulative_coefficients / cumulative_count # 计算均值
for k in range(i, i + step_size):
composite_factor_value = np.dot(df.loc[k, selFeatures], coefficientsMean)
composite_factors.append(composite_factor_value)
return composite_factors
def getComposite(df, selFea):
df = df.sort_values(by=['Time','InstrumentId']) # 分组排序
df = df.reset_index(drop = True)
window_size = 63 * len(df['InstrumentId'].unique())
step_size = 1 * len(df['InstrumentId'].unique())
composite_factors = history_linear_composite(df, window_size, step_size, selFea)
df['HeCompositeFactor'] = [np.nan] * window_size + composite_factors
return df
# 移仓
def posTrans():
mainList = getTarList()[0]
codeList = [''.join(filter(str.isalpha, item)) for item in mainList]
prePos = exchange.GetPosition()
if len(prePos) != 0:
for i in range(len(prePos)):
if prePos[i].ContractType not in mainList:
mainCode = re.search(r'([A-Za-z]+)\d+', prePos[i].ContractType).group(1)
index = codeList.index(mainCode)
mainID = mainList[index]
Log('旧合约', prePos[i].ContractType, '需要被更换为', mainID)
if prePos[i].Type == PD_LONG or prePos[i].Type == PD_LONG_YD:
# 平掉旧合约
exchange.SetContractType(prePos[i].ContractType)
curPrice = exchange.GetRecords()[-1].Close - 4
exchange.SetDirection('closebuy')
exchange.Sell(curPrice, 1)
# 开仓新合约
exchange.SetContractType(mainID)
curPrice = exchange.GetRecords()[-1].Close + 4
exchange.SetDirection('buy')
exchange.Buy(curPrice, 1)
else:
exchange.SetContractType(prePos[i].ContractType)
curPrice = exchange.GetRecords()[-1].Close + 4
exchange.SetDirection('closesell')
exchange.Buy(curPrice, 1)
exchange.SetContractType(mainID)
curPrice = exchange.GetRecords()[-1].Close - 4
exchange.SetDirection('sell')
exchange.Sell(curPrice, 1)
# 移仓完成后再次判断
afterPos = exchange.GetPosition()
if len(prePos) != 0:
all_in_main_list = all(afterPos[i].ContractType in mainList for i in range(len(afterPos)))
if all_in_main_list:
Log('所有合约都是主力合约', "#00FF00")
# 多空组判断
def groupFactor(factorDf, factorName):
sortDf = factorDf.sort_values(by=factorName, ascending=False)
# 取前25%和最后25%的数据
total_count = len(sortDf)
group_count = int(total_count * 0.25)
head_group = sortDf.head(group_count) # 第一组取前25%
tail_group = sortDf.tail(group_count) # 第二组取最后25%
head_codes = head_group[head_group.HeCompositeFactor > 0]['Instrument'].tolist() # 第一组codes
tail_codes = tail_group[tail_group.HeCompositeFactor < 0]['Instrument'].tolist() # 第二组codes
return [head_codes, tail_codes]
# 多空组买卖
def trade(pos_group, neg_group):
posInfo = exchange.GetPosition()
if len(posInfo) == 0:
for i in range(len(pos_group)):
exchange.SetContractType(pos_group[i])
curPrice = exchange.GetRecords()[-1].Close + 4
exchange.SetDirection('buy')
exchange.Buy(curPrice, 1)
for i in range(len(neg_group)):
exchange.SetContractType(neg_group[i])
curPrice = exchange.GetRecords()[-1].Close - 4
exchange.SetDirection('sell')
exchange.Sell(curPrice, 1)
Log('多因子策略开始,建仓完成', "#FF0000")
else:
longList = [pos.ContractType for pos in posInfo if pos.Type == PD_LONG or pos.Type == PD_LONG_YD] # 已经开多仓
shortList = [pos.ContractType for pos in posInfo if pos.Type == PD_SHORT or pos.Type == PD_SHORT_YD] # 已经开空仓
lossList = [pos.ContractType for pos in posInfo if pos.Profit <= 0] # 亏损仓位
Log(lossList,'亏损仓位')
# 检查已经开的多仓,如果在需要开多仓的列表中,则保留,否则平仓
for pos_contract in longList:
if pos_contract in neg_group or pos_contract in lossList:
#if pos_contract not in pos_group:
# 进行平仓操作
Log(pos_contract, '多余多仓进行平仓')
exchange.SetContractType(pos_contract)
curPrice = exchange.GetRecords()[-1].Close - 4
exchange.SetDirection('closebuy')
exchange.Sell(curPrice, 1)
# 检查已经开的空仓,如果在需要开空仓的列表中,则保留,否则平仓
for neg_contract in shortList:
if neg_contract in pos_group or neg_contract in lossList:
#if neg_contract not in neg_group:
# 进行平仓操作
Log(neg_contract, '多余空仓进行平仓')
exchange.SetContractType(neg_contract)
curPrice = exchange.GetRecords()[-1].Close + 4
exchange.SetDirection('closesell')
exchange.Buy(curPrice, 1)
# 开需要开的多仓
for pos_contract in pos_group:
if pos_contract not in longList:
# 进行开仓操作
Log(pos_contract, '需要开多的进行开多仓')
exchange.SetContractType(pos_contract)
curPrice = exchange.GetRecords()[-1].Close + 4
exchange.SetDirection('buy')
exchange.Buy(curPrice, 1)
# 开需要开的空仓
for neg_contract in neg_group:
if neg_contract not in shortList:
# 进行开仓操作
Log(neg_contract, '需要开空的进行开空仓')
exchange.SetContractType(neg_contract)
curPrice = exchange.GetRecords()[-1].Close - 4
exchange.SetDirection('sell')
exchange.Sell(curPrice, 1)
ext.getTarList = getTarList
ext.posTrans = posTrans
ext.groupFactor = groupFactor
ext.trade = trade
ext.getFactor = getFactor
ext.calFactor = calFactor
ext.proFactor = proFactor
ext.getComposite = getComposite
其实综合来看,学过数据分析的朋友,是不是对上述的数据处理逻辑很熟悉,包括数据的获取,特征计算和处理,数据的清洗,还有模型的预测,以及最后信号的判断。只不过这里的数据是具体合约的时间流的数据,相信各位小伙伴也有数据分析的大拿,可以在此基础上进行更多数据探索和模型分析的工作,欢迎大家留言,一起展开探讨。下节课我们将讲解具体实盘部分的设置,我们下节课再见。
上节课我们讲述了多因子模版类库中的各个模块函数,本节课我们就要结合实盘,运用各个模版类库中的函数进行一个实盘级多因子模型的搭建。多因子模型是需要一定的数据量才能开展模型训练的,要我们开启实盘等待收集足够的数据肯定是不合适的,所以我们可以利用回测系统进行因子收据的收集。然后在实盘中建立数据库导入我们收集完成的数据,然后伴随每日的更新,不断地将数据保存到数据库当中。这样呢,我们就可以实时的利用数据库收集的数据,进行一些因子有效性和模型检验的工作,不断优化我们的多因子模型。
这里首先为大家讲解一下怎样利用回测系统收集实盘的数据,也是为了验证我们模版类库中的因子数据的模版函数。这个代码编写并不复杂,首先我们要确定的是回测的时间,这里我们定义收集数据的时间是最近的三年,从2021年年初至今三年的时间,在这三年中,我们收集每一个交易日的因子数据。接着来定义策略的参数,这里我们分为两组,因子获取参数,里面包括高阶因子类,动量和数据获取的周期;因子组合参数组包括历史收益率窗口周期。
'''backtest
start: 2021-01-01 09:00:00
end: 2023-12-07 15:00:44
period: 1d
basePeriod: 1h
exchanges: [{"eid":"Futures_CTP","currency":"FUTURES","depthDeep":20}]
'''
import pandas as pd
import pymysql
def main():
# 创建 MySQL 连接对象
cnx = pymysql.connect(
host='IP', # 数据库主机地址
user='账户名', # 数据库用户名
password='密码'
)
# 创建 MySQL 游标对象
cursor = cnx.cursor()
# 创建数据库
cursor.execute("CREATE DATABASE IF NOT EXISTS tt_database")
cursor.execute("USE tt_database")
# 创建数据表
create_table_query = """
CREATE TABLE IF NOT EXISTS tt_table (
Instrument VARCHAR(255),
InstrumentId VARCHAR(255),
Time BIGINT,
Close FLOAT,
RollOver FLOAT,
Std FLOAT,
Skew FLOAT,
Kurt FLOAT,
PriceMom FLOAT,
VolumeMom FLOAT,
SpotPrice FLOAT,
ContractPrice FLOAT,
Receipt FLOAT,
ReceiptCng FLOAT
);
"""
cursor.execute(create_table_query)
# 记录上次执行的时间
last_operation_timeBig = 0
isFirst = True
while True:
current_timeBig = int(time.time())
# 获取时间间隔(秒)
if isFirst:
time_intervalBig = 24 * 60 * 60 * dataPeriod
isFirst = False
else:
time_intervalBig = current_timeBig - last_operation_timeBig
if time_intervalBig >= 24 * 60 * 60 * dataPeriod:
# 更新因子数据
symbolList = ext.getTarList()
contractLen = len(symbolList[0])
for k in range(contractLen):
if not ext.IsTrading(symbolList[0][k]):
continue
new_factor = ext.getFactor(symbolList[0][k], symbolList[1][k], cmfBackPeriod, pricePeriod, volumePeriod)
## 数据导入
insert_data_query = """
INSERT INTO tt_table
(Instrument, InstrumentId, Time, Close, RollOver, Std, Skew, Kurt, PriceMom, VolumeMom, SpotPrice, ContractPrice, Receipt, ReceiptCng)
VALUES
(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(insert_data_query, (
new_factor['Instrument'][0], new_factor['InstrumentId'][0], new_factor['Time'][0], new_factor['Close'][0],
new_factor['RollOver'][0], new_factor['Std'][0], new_factor['Skew'][0], new_factor['Kurt'][0], new_factor['PriceMom'][0],
new_factor['VolumeMom'][0], new_factor['SpotPrice'][0], new_factor['ContractPrice'][0], new_factor['Receipt'][0],
new_factor['ReceiptCng'][0]
))
cnx.commit()
last_operation_timeBig = current_timeBig
Sleep(60*60*10)
然后在主函数里,定义每隔一个数据获取周期,开启一次数据的收集。通过获取合约列表的长度,然后使用模版类库的getFactor函数,依次获取每个订阅合约的因子数据。这里增加了一个条件,如果该合约不在交易时间,直接跳过。这样就可以获取到原始的因子数据。
这些原始的数据当然需要进行保存,所以使用我们以前讲过的mysql数据保存方法。在策略开始打开mysql,创建数据库和数据表。然后在while循环里,每当新的一条数据更新,我们导入到数据库里面,不要忘了使用commit确定导入的操作。
这样我们就可以保存三年的原始数据到数据库里面,不仅可以在实盘里直接可以使用,也可以在实盘之外,利用我们前面课程讲过的因子有效性的验证方法,确定各因子的有效性然后导入模型,实现动态的优化模型。
下面我们来看实盘部分的设置。首先我们来看这里的参数设置,这里需要增加一组参数,代表持仓的周期holdPeriod。我们可以每日获取因子数据,但是调仓的时间可以自己确定。
import pandas as pd
import pymysql
import json
import time
import warnings
warnings.filterwarnings("ignore")
def main():
# 记录上次执行的时间
last_operation_time = 0
last_trade_time = 0
isFirst = True
if exchange.IO("status"):
Log('已连接CTP')
else:
Sleep(1000)
# 获取初始账户信息
initAccount = exchange.GetAccount()
while True:
if exchange.IO("status"):
current_sectime = int(time.time()) # 获取当前秒级别时间
cur_time = int(_D()[0:10].replace("-", ""))# 获取当前日级别时间
# 获取时间间隔(秒)
if isFirst:
time_intervalOpe = 24 * 60 * 60 * dataPeriod
time_intervalTra = 24 * 60 * 60 * holdPeriod
isFirst = False
else:
time_intervalOpe = current_sectime - last_operation_time
time_intervalTra = current_sectime - last_trade_time
# 数据库收集数据
if time_intervalOpe >= 24 * 60 * 60 * dataPeriod:
# 移仓
ext.posTrans()
# 创建 MySQL 连接对象
cnx = pymysql.connect(
host='???', # 数据库主机地址
user='???', # 数据库用户名
password='???'
)
# 创建 MySQL 游标对象
cursor = cnx.cursor()
cursor.execute("USE tt_database")
# 读取因子数据
select_data_query = "SELECT * FROM tt_table"
cursor.execute(select_data_query)
rows = cursor.fetchall()
column_names = [desc[0] for desc in cursor.description]
factordf = pd.DataFrame(rows, columns=column_names) # 获取数据库df
# 判断是否需要更新
if cur_time not in factordf.Time.tolist():
Log('每日更新数据#00ff00')
symbolList = ext.getTarList()
contractLen = len(symbolList[0])
for k in range(contractLen):
if not ext.IsTrading(symbolList[0][k]):
Log('非交易时间#00ff00')
continue
new_factor = ext.getFactor(symbolList[0][k], symbolList[1][k], cmfBackPeriod, pricePeriod, volumePeriod)
## 数据导入
insert_data_query = """
INSERT INTO tt_table
(Instrument, InstrumentId, Time, Close, RollOver, Std, Skew, Kurt, PriceMom, VolumeMom, SpotPrice, ContractPrice, Receipt, ReceiptCng)
VALUES
(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(insert_data_query, (
new_factor['Instrument'][0], new_factor['InstrumentId'][0], new_factor['Time'][0], new_factor['Close'][0],
new_factor['RollOver'][0], new_factor['Std'][0], new_factor['Skew'][0], new_factor['Kurt'][0], new_factor['PriceMom'][0],
new_factor['VolumeMom'][0], new_factor['SpotPrice'][0], new_factor['ContractPrice'][0], new_factor['Receipt'][0],
new_factor['ReceiptCng'][0]
))
cnx.commit()
cursor.close()
cnx.close()
last_operation_time = current_sectime ## 更新数据库保存时间
# 因子计算进行交易
if time_intervalTra >= 24 * 60 * 60 * holdPeriod:
# 创建 MySQL 连接对象
cnx = pymysql.connect(
host='???', # 数据库主机地址
user='???', # 数据库用户名
password='???'
)
# 创建 MySQL 游标对象
cursor = cnx.cursor()
cursor.execute("USE tt_database")
Log('调仓时间#ff0000')
## 因子计算
caldf = ext.calFactor(factordf)
Log('因子计算完成#ff0000')
## 因子处理
prodf = ext.proFactor(caldf)
Log('因子处理完成#ff0000')
## 因子合成
selFea = ['RollOver', 'Skew', 'br_63', 'wr_day', 'VolumeMom', 'PriceMom']
comdf = ext.getComposite(prodf, selFea)
finaldf = comdf[comdf.Time == cur_time]
Log('因子合成完成#ff0000')
## 多空组判断
positive_codes = ext.groupFactor(finaldf, 'HeCompositeFactor')[0]
negative_codes = ext.groupFactor(finaldf, 'HeCompositeFactor')[1]
Log('多空组判断完成#ff0000')
Log('做多组:', positive_codes)
Log('做空组:', negative_codes)
## 交易操作
ext.trade(positive_codes, negative_codes)
cursor.close()
cnx.close()
last_trade_time = current_sectime ## 更新交易操作时间
# 持仓状态展示
positions = _C(exchange.GetPosition)
if len(positions) != 0:
longContract = []
longPrice = []
longProfit = []
shortContract = []
shortPrice = []
shortProfit = []
for i in range(len(positions)):
if (positions[i]['Type'] == PD_LONG) or (positions[i]['Type'] == PD_LONG_YD):
longContract.append(positions[i]['ContractType'])
longPrice.append(positions[i]['Price'])
longProfit.append(positions[i]['Profit'])
else:
shortContract.append(positions[i]['ContractType'])
shortPrice.append(positions[i]['Price'])
shortProfit.append(positions[i]['Profit'])
tblAStatus = {
"type" : "table",
"title" : "持多组",
"cols" : ["合约名称", "持仓均价", "持仓盈亏"],
"rows" : []
}
tblBStatus = {
"type" : "table",
"title" : "持空组",
"cols" : ["合约名称", "持仓均价", "持仓盈亏"],
"rows" : []
}
for i in range(len(longContract)):
tblAStatus["rows"].append([longContract[i], longPrice[i], longProfit[i]])
for i in range(len(shortContract)):
tblBStatus["rows"].append([shortContract[i], shortPrice[i], shortProfit[i]])
lastStatus = f"`{json.dumps(tblAStatus)}`\n`{json.dumps(tblBStatus)}`\n当前时间: {cur_time}"
curAccount = _C(exchange.GetAccount)
totalProfit = sum(position['Profit'] for position in positions)
totalMargin = sum(position['Margin'] for position in positions)
curProfit = curAccount.Balance - initAccount.Balance + totalProfit + totalMargin
LogProfit(curProfit, "&")
LogStatus(lastStatus)
Sleep(1000 * 60 * 60)
回到代码部分,在策略开始导入策略运行需要的库,然后连接数据库;因为数据记录和交易的操作时间周期是不一致的,这里我们分别记录上次操作进行的时间,包括数据导入操作,last_operation_time,和last_trade_time交易的操作。然后定义isFirst代表是否第一次运行策略。这里还需要获取账户信息,方便后续的收益计算。
接着进入while循环,在判断连接CTP条件下,执行我们的多因子策略。这里我们获取两个时间戳,current_sectime当前秒级别时间,cur_time当前日级别时间。然后获取时间间隔,判断应该进行数据导入还是交易的操作。在策略开始的时候,我们是想直接运行的,所以初始的时间间隔定义为需要的时间间隔,然后伴随策略运行,两个时间间隔定义为当前时间减去上一次各自操作进行的时间。这样我们就可以根据两个时间循环进行相应的数据导入和交易的操作。
下面我们来定义具体的操作。第一个我们来进行数据更新的操作,和课程前面讲到的一致。这里定义如果数据导入的时间间隔,大于设置的数据收集的参数间隔,我们首先进行移仓的操作,然后读取数据库的数据,转换为dataframe,接着判断当前日级别的时间戳在数据库中是否已经存在,如果不存在进行更新导入。最后更新数据库保存时间。
第二个我们来进行因子计算的交易。如果时间间隔大于调仓的间隔,首先我们获取更新完成的数据,然后我们使用更新完成的数据,依次使用模版类库的函数进行因子计算,因子处理,因子合成,因子合成这里的特征,我们可以根据因子有效性的验证结果进行选择,然后我们可以获取到当日的合成因子数据。我们就是要利用这个合成因子使用模版类库的groupFactor函数,进行多空组的判断,在这个模型中,合成因子和收益率是正向的关系,所以设置第一组为做多组,第二组为做空组,如果计算出来的合成因子和收益率是负向的关系,这里做多组和做空组的索引是不一致的。最后根据两个合约列表进行相应的交易操作。交易操作的时间不要忘了更新。
当然,作为一个实盘,持仓状态的显示和收益的显示也是必须的,这些在前面的课程中我们都讲到过,这里不加以赘述了。
最后的验证时间来到了,我们建立一个实盘。这里选择编写好的多因子策略,选择托管者和仿真交易所。我们点击实盘运行。我们看下实盘的运行,可以看到实时的收益变化,以及具体做多组和做空组持仓均价和持仓的盈亏。
在日志信息了可以看到具体的策略操作。我们看到,策略首先判断持仓的合约是否都是主力合约,如果不是主力合约需要进行移仓换月的操作,接着策略收集今日更新的数据,导入到数据库里面,这样数据更新的操作就完成;然后利用更新完成的数据,依次进行因子计算,因子处理,因子合成,多空组判断的操作,接着利用多空组判断的合约列表,进行亏损仓位和多余仓位的平仓,和需求仓位的开仓操作。这样我们的多因子策略就可以实时运行起来。
以上呢,就是在优宽平台实现多因子策略的具体操作。这一系列的课程持续了一个月的时间,在这期间也遇到了很多的困难和挑战,非常感谢大家耐心的等待和评论区的鼓励。当然,这个模型并不是十分完善的,可以优化的细节还有很多,比如因子处理计算的各种策略参数和计算方法,大家可以根据自己的所学进行更加细化的研究。如果大家有问题,可以留言评论区,我们将会热心解答,大家一起加油,一起进步!
实盘公开地址:
最近有小伙伴在评论区留言,说希望讲解一下“多周期共振系统”。我们的回答当然是立即安排。首先给不了解这个策略的小伙伴们介绍一下。
商品期货的多周期均线共振系统策略是一种综合性的交易策略,它基于不同周期的均线共振现象来制定交易计划。这种策略的核心思想是利用不同周期的均线在同一时间点上形成的共振状态,来判断市场走势的强势或弱势,并以此为依据进行交易。
期货交易界老话。大级别上涨,小级别不配合,涨不动;小级别上涨,大级别不配合,涨不多的规律,只有多个周期的均线系统形成统一方向的趋势共振时,才可以形成持久的趋势行情,这也是多周期共振的理论认知。我们可以从图像的角度了解多周期共振,具体包括均线的多头排列和空头排列。
这里使用到了不同级别的k线,包括大周期,中周期,小周期,和不同滑动窗口的均线,包括快线,中线,和慢线。当不同级别的不同滑动周期的均线,均呈现金叉的信号向上扩散,则形成均线多头排列,均线多头排列代表上涨行情,可以视为多头开仓的信号;
与之相反的,当不同级别的不同滑动周期的均线,均呈现死叉的信号,往下扩散,则形成均线空头排列,均线空头排列代表下跌行情,可以视为空头开仓信号。
当多个周期均线都形成多头排列或者空头排列时,就形成了均线多周期共振。均线多周期共振表示当前多个周期的均线走势是相同的。均线在某种意义上代表着资金的走向,同时均线对价格的支撑或阻力作用也较为明显。由此呢,我们可以制定多周期均值共振的交易策略。
具体来说,该策略的应用包括以下步骤:
选择合适的周期参数:首先,投资者需要明确自己的交易习惯,选择适合自己的周期参数。长线交易可以选择月线、周线和日线三个周期共振;中线交易可以选择周线、日线和60分钟三个周期共振;短线交易可以选择日线、60分钟和15分钟三个周期共振。
判断共振状态:当不同周期的均线在同一时间点附近出现交叉或聚集时,就形成了均线共振。例如,月线、周线和日线的均线方向一致向上,表明市场处于多头趋势。此时,投资者可以制定相应的多头策略。
制定交易策略:根据共振状态和市场走势,制定相应的交易策略。在多头趋势中,投资者可以选择买入或持有期货合约;在空头趋势中,投资者可以选择卖出或观望期货合约。同时,投资者还需要注意不同周期的信号冲突和共振状态的变化,及时调整自己的交易策略。
观察进场信号:在确定了交易策略后,投资者需要密切关注市场走势,等待合适的进场信号。例如,在多头趋势中,投资者可以等待价格回调到重要的支撑位附近时,发出买入信号时再进场。
设定止损止盈:进场后,投资者需要设定合理的止损止盈位。止损位可以设置在重要的支撑或压力位附近,止盈位则可以参考目标盈利位或移动平均线等指标来确定。
跟踪持仓:进场后,投资者需要密切关注市场走势和持仓情况。如果市场走势与预期不符,投资者需要及时调整自己的交易策略或平仓离场。
其实有没有觉得很熟悉,这种策略很类似于我们前面讲过的剥头皮策略。那个策略当中,我们使用大小两个周期的均线共振判断趋势,然后根据小周期的回踩确定入场的信号。但是多周期均线共振使用两条以上的均线,更多的均线周期代表更多的趋势信号,我们就可以利用这些信号进行更多的操作。
本节课呢,我们就来编写一个,更加灵活的多周期均线共振策略。这个策略不仅可以适用于多种的k线周期;另外,我们还可以使用量化的方式定义多头排列和空头排列的程度。下面我们来具体讲解一下。
”`JavaScript function main() { var trendBigSignal = 0 //大周期趋势信号 var trendMidSignal = 0 //中周期趋势信号 var trendSmallSignal = 0 //小周期趋势信号 var prefinalSignal = 0 //先前趋势信号 var finalSignal = 0 //实时趋势信号 var p = $.NewPositionManager() //交易类库函数
while(true){
if(exchange.IO('status')){
// 订阅合约
var info = exchange.SetContractType(symbol)
// 大周期趋势判断
var bigR = exchange.GetRecords(1 * 60 * bigPeriod)
var popbigR = JSON.parse(JSON.stringify(bigR))
popbigR.pop()
if(bigR.length < bigSlowMA){
continue
}
var fastMABig = TA.MA(popbigR, bigFastMA)
var midMABig = TA.MA(popbigR, bigMidMA)
var slowMABig = TA.MA(popbigR, bigSlowMA)
if(fastMABig[fastMABig.length - 1] - midMABig[midMABig.length - 1] > diffBig && midMABig[midMABig.length - 1] - slowMABig[slowMABig.length - 1] > diffBig){
trendBigSignal = 1
}else if(fastMABig[fastMABig.length - 1] - midMABig[midMABig.length - 1] < -diffBig && midMABig[midMABig.length - 1] - slowMABig[slowMABig.length - 1] < -diffBig){
trendBigSignal = -1
}else{
trendBigSignal = 0
}
// 中周期趋势判断
var midR = exchange.GetRecords(1 * 60 * midPeriod)
var popmidR = JSON.parse(JSON.stringify(midR))
popmidR.pop()
var fastMAMid = TA.MA(popmidR, midFastMA)
var midMAMid = TA.MA(popmidR, midMidMA)
var slowMAMid = TA.MA(popmidR, midSlowMA)
if(fastMAMid[fastMAMid.length - 1] - midMAMid[midMAMid.length - 1] > diffMid && midMAMid[midMAMid.length - 1] - slowMAMid[slowMAMid.length - 1] > diffMid){
trendMidSignal = 1
}else if(fastMAMid[fastMAMid.length - 1] - midMAMid[midMAMid.length - 1] < -diffMid && midMAMid[midMAMid.length - 1] - slowMAMid[slowMAMid.length - 1] < -diffMid){
trendMidSignal = -1
}else{
trendMidSignal = 0
}
// 小周期趋势判断
var smallR = exchange.GetRecords(1 * 60 * smallPeriod)
var popsmallR = JSON.parse(JSON.stringify(smallR))
popsmallR.pop()
var fastMASmall = TA.MA(popsmallR, smallFastMA)
var midMASmall = TA.MA(popsmallR, smallMidMA)
var slowMASmall = TA.MA(popsmallR, smallSlowMA)
if(fastMASmall[fastMASmall.length - 1] - midMASmall[midMASmall.length - 1] > diffSmall && midMASmall[midMASmall.length - 1] - slowMASmall[slowMASmall.length - 1] > diffSmall){
trendSmallSignal = 1
}else if(fastMASmall[fastMASmall.length - 1] - midMASmall[midMASmall.length - 1] < -diffSmall && midMASmall[midMASmall.length - 1] - slowMASmall[slowMASmall.length - 1] < -diffSmall){
trendSmallSignal = -1
}else{
trendSmallSignal = 0
}
var finalSignal = trendBigSignal == 1 && trendMidSignal== 1 && trendSmallSignal== 1 ? 1 : trendBigSignal == -1 && trendMidSignal== -1 && trendSmallSignal== -1 ? -1 :0
var posInfo = exchange.GetPosition()
// 下单
if(posInfo.length == 0){
if(finalSignal == 1){
p.OpenLong(symbol, 1)
Log('开多仓')
}
if(finalSignal == -1){
p.OpenShort(symbol, 1)
Log('开空仓')
}
}
//止盈止损平仓
if(posInfo.length != 0){
profitLevel = info.VolumeMultiple * info.PriceTick * stopProfit //盈利线
lossLevel = -info.VolumeMultiple * info.PriceTick * stopLoss //亏损线
if(posInfo[0].Type % 2 == 0 && posInfo[0].Profit > profitLevel){
Log(posInfo[0].Profit)
p.Cover(symbol)
Log('平盈利多仓')
}
if(posInfo[0].Type % 2 == 0 && posInfo[0].Profit < lossLevel ){
Log(posInfo[0].Profit)
p.Cover(symbol)
Log('平亏损多仓')
}
if(posInfo[0].Type % 2 == 1 && posInfo[0].Profit > profitLevel){
Log(posInfo[0].Profit)
p.Cover(symbol)