资源加载中... loading...

商品期货量化交易实践系列课程

Author: 雨幕(youquant), Created: 2023-09-12 09:54:22, Updated: 2023-12-18 16:43:42

,作为参数放入这个函数当中,然后使用轮询的方式,获取倒数第二条记录,是完成的k线,添加进入DataFrame当中,进行返回就可以获取到主力和次主力的k线数据。

# 获取主力日级别k线,方便后续因子计算

def getMainData(PERIOD, contractList):

    df = pd.DataFrame(columns=["Instrument", "InstrumentId", "Time", "Open", "High", "Low", "Close", "Volume"])
    
    for codeID in contractList:

        info = exchange.SetContractType(codeID)

        code = re.search(r'([A-Za-z]+)\d+', codeID).group(1)
        records = exchange.GetRecords(PERIOD)
        latest_record = records[-2]  # 获取倒数第二条记录

        curR = {
            "Instrument": code,
            "InstrumentId": codeID,
            "Time": latest_record['Time'],
            "Open": latest_record['Open'],
            "High": latest_record['High'],
            "Low": latest_record['Low'],
            "Close": latest_record['Close'],
            "Volume": latest_record['Volume']
        }

        # 将字典转换为DataFrame,并添加到主DataFrame中
        new_row = pd.DataFrame([curR])
        df = df.append(new_row, ignore_index=True)

    return df

在获取完成主力和次主力k线数据以后,第三个板块就是要计算展期收益率了。展期收益率的公式是这样的,所以我们也使用轮询的方式对上两个板块中获取的数据,针对于不同的合约进行依次的计算。这里需要注意的是,针对于间隔月份的计算,因为同处于一年的主力合约,可以直接使用两个月份之差进行计算,如果跨年,那么次主力的月份就要加上12。

# 计算展期收益率

import math

def calRollover():
    
    mainData = getMainData(PERIOD_D1, getTarList()[0])
    nextData = getMainData(PERIOD_D1, getTarList()[1])

    rolloverList = []  # 用于存储展期收益率的列表

    for i in range(len(mainData)):
        mainLine = mainData.iloc[i]
        nextLine = nextData.iloc[i]

        mainFuture = mainLine['InstrumentId']
        nextFuture = nextLine['InstrumentId']
        
        mainDate = re.findall(r"\d+\d*", mainFuture)
        nextDate = re.findall(r"\d+\d*", nextFuture)

        if int(nextDate[0][-2:]) > int(mainDate[0][-2:]):
            diffMonth = int(nextDate[0][-2:]) - int(mainDate[0][-2:])
        else:
            diffMonth = int(nextDate[0][-2:]) + 12 - int(mainDate[0][-2:])

        rollOver = math.log(mainLine['Close'] / nextLine['Close']) / diffMonth * 12

        rolloverList.append(rollOver)

    rolloverDf = pd.DataFrame({
        "InstrumentId": mainData['InstrumentId'],
        "RollOver": rolloverList
    })

    return rolloverDf

这里因为只有一个因子,所以因子处理和因子合成的板块这里暂缺。

多空组判断groupFactor,这里定义两个参数,factorDf因子的dataframe,factor代表具体使用的因子。这里只有展期收益的因子,根据正负性分为做多组和做空组。

def groupFactor(factorDf, factorName):

    positive_df = factorDf[factorDf[factorName] > 0].sort_values(by=factorName)
    negative_df = factorDf[factorDf[factorName] < 0].sort_values(by=factorName)

    positive_codes = positive_df['InstrumentId'].tolist() # 做多组
    negative_codes = negative_df['InstrumentId'].tolist() # 做空组

    return[positive_codes, negative_codes]

对不同的品种进行分类完毕以后,我们就要进行交易的操作了。这里不仅仅是根据多空组的判断进行做多还有做空的操作。因为不同品种可能在在这两组之间进行切换,所以当品种切换组别的时候,我们进行平仓,在另外方向开仓的处理。

在获取仓位信息以后,在策略起始阶段,判断仓位列表长度为0,直接根据多空组的列表进行相应的操作。

后续判断持仓长度不为0,获取各个品种的持仓type,是多头还是空头,然后判断如果该品种不再做多组的时候,打印信息该品种需要被移动到做空组,首先进行多头的平仓,然后进行空头的开仓。这里为了交易的进行,都是设置了一个比较容易成交的价格。对于空头组的处理也是一致的,如果某品种不再属于空头组,进行空头平仓,然后就像多头开仓。

# 多空组买卖
def trade(pos_group, neg_group):
    posInfo = exchange.GetPosition()

    if len(posInfo) == 0:
        for i in range(len(pos_group)):
            exchange.SetContractType(pos_group[i])
            curPrice = exchange.GetRecords()[-1].Close + 5
            exchange.SetDirection('buy')
            exchange.Buy(curPrice, 1)
        
        for i in range(len(neg_group)):
            exchange.SetContractType(neg_group[i])
            curPrice = exchange.GetRecords()[-1].Close - 5
            exchange.SetDirection('sell')
            exchange.Sell(curPrice, 1)
        Log('多因子策略开始,建仓完成', "#FF0000")
    else:
        
        for i in range(len(posInfo)):
            if posInfo[i].Type == PD_LONG or posInfo[i].Type == PD_LONG_YD:

                if posInfo[i].ContractType not in pos_group:
                    Log('合约', posInfo[i].ContractType, '移仓至持空组')
                    exchange.SetContractType(posInfo[i].ContractType)
                    curPrice = exchange.GetRecords()[-1].Close - 5
                    exchange.SetDirection('closebuy')
                    exchange.Sell(curPrice, 1)

                    curPrice = exchange.GetRecords()[-1].Close - 5
                    exchange.SetDirection('sell')
                    exchange.Sell(curPrice, 1)
            else:
                if posInfo[i].ContractType not in neg_group:
                    Log('合约', posInfo[i].ContractType, '移仓至持多组')
                    exchange.SetContractType(posInfo[i].ContractType)
                    curPrice = exchange.GetRecords()[-1].Close + 5
                    exchange.SetDirection('closesell')
                    exchange.Buy(curPrice, 1)

                    curPrice = exchange.GetRecords()[-1].Close + 5
                    exchange.SetDirection('buy')
                    exchange.Buy(curPrice, 1)

还有最后一个板块,移仓的处理。首先获取主力合约列表,获取字母代码部分,方便我们根据品种的代码找到最新的主力合约,然后获取仓位的信息,如果仓位列表长度不为0,使用轮询的的方式,判断如果该品种不再主力列表当中,同样获取该品种的代码部分,对应索引找到该品种的主力合约。然后就进行平掉旧合约,开仓新合约,这里需要品种的type进行,保持该品种的多头和空头趋势。然后再次获取一下仓位的信息,如果都是主力合约,打印信息进行说明。

我们单因子模型需要的模块函数就定义完成了,然后我们使用ext加上函数名称的格式,定义这些模块函数,这样在我们就可以在策略编写中使用这个模版类库的交易函数了。下面呢,我们就进行一下测试。

# 移仓
def posTrans():

    mainList = getTarList()[0]
    
    codeList = [''.join(filter(str.isalpha, item)) for item in mainList]

    prePos = exchange.GetPosition()

    if len(prePos) != 0:
        for i in range(len(prePos)):
            if prePos[i].ContractType not in mainList:
                mainCode = re.search(r'([A-Za-z]+)\d+', prePos[i].ContractType).group(1)
                index = codeList.index(mainCode)
                mainID = mainList[index]

                Log('旧合约', prePos[i].ContractType, '需要被更换为', mainID)
                if prePos[i].Type == PD_LONG or prePos[i].Type == PD_LONG_YD:
                    # 平掉旧合约
                    exchange.SetContractType(prePos[i].ContractType)
                    curPrice = exchange.GetRecords()[-1].Close - 5
                    exchange.SetDirection('closebuy')
                    exchange.Sell(curPrice, 1)

                    # 开仓新合约
                    exchange.SetContractType(mainID)
                    curPrice = exchange.GetRecords()[-1].Close + 5
                    exchange.SetDirection('buy')
                    exchange.Buy(curPrice, 1)

                else:
                    exchange.SetContractType(prePos[i].ContractType)
                    curPrice = exchange.GetRecords()[-1].Close + 5
                    exchange.SetDirection('closesell')
                    exchange.Buy(curPrice, 1)

                    exchange.SetContractType(mainID)
                    curPrice = exchange.GetRecords()[-1].Close - 5
                    exchange.SetDirection('sell')
                    exchange.Sell(curPrice, 1)

    # 移仓完成后再次判断
    afterPos = exchange.GetPosition()
    if len(prePos) != 0:
        all_in_main_list = all(afterPos[i].ContractType in mainList for i in range(len(afterPos)))
        if all_in_main_list:
            Log('所有合约都是主力合约', "#00FF00")

我们设置一个策略样例进行检验,固定框架安装上,首先第一步进行展期收益率的计算,然后划分做多组和做空组,在进行交易之前,首先进行移仓的处理,确保最新的仓位都是主力合约,接着进行对应的交易操作。这样呢,就是一个单因子模型的主体交易部分。当然为了实盘的运行,我们需要加上策略图表的展示,还有收益的展示。

这里比较特殊的一点是,因为我们想固定天数间隔运行一次策略,而策略的实时收益展示,我们是想及时更新的,所以这里计算了主操作的时间间隔,如果主操作的间隔大于固定周期,进行主策略的运行;而策略的实时展示,是一直在进行的。

设置休眠时间是10天,我们试着运行一下。在回测结果里,可以看到,首先进行建仓的处理,然后每隔10天,进行一下交易的操作,在品种对应组别更换的时候,进行相应的平仓还有相反方向开仓的处理。当接近主力合约更换的月份,我们进行移仓换月的处理。

def main():

    # 记录上次执行的时间
    last_operation_timeBig = 0
    isFirst = True
    initAccount = exchange.GetAccount()

    while True:
        current_timeBig = int(time.time())
        
        # 获取时间间隔(秒)
        if isFirst:
            time_intervalBig = 24 * 60 * 60 * 10
            isFirst = False
        else:
            time_intervalBig = current_timeBig - last_operation_timeBig

        # 执行多空组买卖之前的操作(每隔10天执行一次)
        if time_intervalBig >= 24 * 60 * 60 * 10:
            
            if exchange.IO("status"):
                # 移仓
                ext.posTrans()

                # 计算展期收益率
                callRollover = ext.calRollover()
                
                # 划分组
                positive_codes = ext.groupFactor(callRollover, 'RollOver')[0]
                negative_codes = ext.groupFactor(callRollover, 'RollOver')[1]

                # 多空组买卖
                ext.trade(positive_codes, negative_codes)

                last_operation_timeBig = current_timeBig

            else:
                Sleep(1)
            
        positions = exchange.GetPosition()

        if len(positions) != 0:
            longContract = []
            longPrice = []
            longProfit = []

            shortContract = []
            shortPrice = []
            shortProfit = []

            for i in range(len(positions)):
                if (positions[i]['Type'] == PD_LONG) or (positions[i]['Type'] == PD_LONG_YD):
                    longContract.append(positions[i]['ContractType'])
                    longPrice.append(positions[i]['Price'])
                    longProfit.append(positions[i]['Profit'])
                else:
                    shortContract.append(positions[i]['ContractType'])
                    shortPrice.append(positions[i]['Price'])
                    shortProfit.append(positions[i]['Profit'])

            tblAStatus = {
                "type" : "table",
                "title" : "持多组",
                "cols" : ["合约名称", "持仓均价", "持仓盈亏"],
                "rows" : [] 
            }

            tblBStatus = {
                "type" : "table",
                "title" : "持空组",
                "cols" : ["合约名称", "持仓均价", "持仓盈亏"],
                "rows" : [] 
            }

            for i in range(len(longContract)):
                tblAStatus["rows"].append([longContract[i], longPrice[i], longProfit[i]])

            for i in range(len(shortContract)):
                tblBStatus["rows"].append([shortContract[i], shortPrice[i], shortProfit[i]])

            lastStatus = f"`{json.dumps(tblAStatus)}`\n`{json.dumps(tblBStatus)}`"

            curAccount = exchange.GetAccount()

            curProfit = curAccount.Balance - initAccount.Balance    
            LogProfit(curProfit, "&")

            LogStatus(lastStatus)

        Sleep(1)

本节的课程呢,是一个单因子的示范策略。在后续课程中,我们将继续探讨多因子模型,包括如何选择和组合不同的因子,以构建更强大的模型来分析和预测商品期货市场的走势。了解单因子模型是深入理解多因子模型的第一步,当然后续课程的难度也会逐步的增加,让我们慢慢来。

视频参考链接:

《期货市场的免费午餐?期限结构Carry收益模型分享》

28.高阶中心矩类因子

前面两节课程,我们介绍了多因子模型的概念,以及使用展期收益率进行了单因子模型的构建,初步建立起了多因子模型的框架。从本节课开始,我们将逐步的完善多因子模型,包括多个不同类型因子的引入,因子有效性的判断,因子的处理和合成等。

概念介绍

相对于股票市场,不同的股票对应的是实体的公司,所以有一些固定的因子,比如市盈率,市净率,年度收益等等。可以作为股票的因子直接拿过来使用。对于期货市场,因子的计算大多数的时候需要我们手动的计算和获取。但是即使针对于同一种因子,使用不同的计算方法,所获得的因子有效性也是具有显著区别的。这里的计算方法的差别有很多,比如k线的获取频率,因子的窗口周期,因子的标准化的方法等等。本节课呢,我们就介绍一种比较简单的因子计算方法,由此呢,来看出因子的一些计算差异。

本节课我们要介绍的一种因子类型,是高阶中心矩类因子。它是价格数据的直接生成品,因此计算起来并不复杂。但是作为一种参数类型,学会计算方式,将有助于我们计算其他与其相关的因子类型。另外,上节课我们只是使用了展期收益率一个因子,所以分组判断groupFactor和交易的trade函数设计的不是太完善,不能适用于所有的因子,所以这节课我们也要将这两个函数进行完善。

首先我们介绍一下高阶中心矩类因子:

  • 波动率因子:在商品期货市场中,波动率因子被用来衡量商品价格的波动性。它反映了商品价格变动的幅度,是价格变动序列的二阶中心矩。在实际应用中,波动率因子可以用来预测商品期货价格的变动趋势和风险程度。一般来说,波动率因子越大,意味着价格变动越剧烈,风险也相应较高。因此,在投资决策中,对波动率因子的分析和理解非常重要。一般来说,我们需要根据自己的风险承受能力和投资目标来决定是做多波动率较高的商品期货还是做多波动率较低的商品期货。如果我们希望追求较高的收益,同时也有较高的风险承受能力,那么可以选择做多波动率较高的商品期货。相反情况下,如果希望保持相对稳健的投资策略,那么可以选择做多波动率较低的商品期货。

  • 偏度因子:偏度因子是商品期货收益率序列的三阶中心矩,用于衡量价格变动的不对称性。在商品期货市场中,偏度因子的绝对值越大,意味着价格变动的离散程度越大,价格波动的分布曲线越有可能出现长拖尾现象。如果价格变动表现为右偏,意味着价格下跌的概率大于上涨的概率;反之,如果价格变动表现为左偏,意味着价格上涨的概率大于下跌的概率。所以研究发现偏度和收益率之间往往呈现负相关性,说明偏度因子也可以看作是一种反向指标。

image

  • 峰度因子:峰度因子是商品期货收益率序列的四阶中心矩,用于衡量价格变动的峰态。直观来说,峰度反映了价格变动分布的峰部尖度。正态分布的峰度为3,如果价格变动的分布峰度大于3,称为“尖峰态”;如果峰度小于3,则称为“低峰态”。对于尖峰分布来说,其尾部集中了比正态分布更多的数据量,这些数据往往是过大或过小的离群值。因此,尖峰分布往往伴随着“肥尾”现象。同理,厚峰分布的数据更多地分布在均值附近,其尾部更细。与偏度相比,峰度与波动率的关系更为紧密。波动率衡量的是随机变量偏离均值的离散程度,但并没有表明该离散程度主要归属于随机变量的哪些值。而峰度则衡量了随机变量的离散程度主要是来自峰部还是尾部。和波动率一样,峰度因子也是一个反向指标。

image

总之,高阶中心矩类因子在商品期货市场中具有重要的应用价值。通过对波动率因子、偏度因子和峰度因子的分析和理解,我们可以更好地把握商品期货市场的动态和风险特征,为投资决策提供有力的支持。

计算方式

下面我们来看下计算的公式。高阶中心矩类因子是需要一定时间段内的数据的,所以我们需要设置回看周期参数,计算指定的回看周期内的高阶中心矩类因子的数值。

第一个,波动率。其中$ret_i$为 5 分钟收益率,$𝑖 = 1, 2,3$为5分钟收益率的序列,$\mu_{5\text{min}}$为回看期内所有 5 分钟收益率序列的均值。

$sd_{\text{t}} = \sqrt{\mathbb{E}\left[(ret_{\text{i}} - \mu_{5\text{min}})^2\right]}$

第二个,偏度因子值。在上面公式基础上,除以$\sigma_{5\text{min}}$,它是波动率,然后是3次方。

$skew_{\text{t}} = \mathbb{E}\left[\left(\frac{ret_{i} - \mu_{5\text{min}}}{\sigma_{5\text{min}}}\right)^3\right]$

第三个,和偏度因子值的计算公式几乎一致,只是作为更高阶,这里要改成4次方。

$kurt_{\text{t}} = \mathbb{E}\left[\left(\frac{ret_{i} - \mu_{5\text{min}}}{\sigma_{5\text{min}}}\right)^4\right]$

相对于以往使用价格数据,这里使用收益率作为$ret_i$价格变化的统计。收益率消除了不同期货品种价格水平的影响,使得不同资产之间更容易进行比较。在许多金融模型中,收益率通常被假定为服从正态分布,这简化了数学模型的运算。而价格的分布通常不是正态分布。另外,收益率提供了对资产风险的更直观测量。投资者通常更关心的是他们的投资在一段时间内的表现,而不仅仅是资产的绝对价格水平。

我们来看下使用代码怎么实现。在上节课模版类库的基础上,定义calCMF函数,这个函数里面定义两个参数,PERIOD代表k线周期,backPERIOD代表回看周期。在函数里面,首先获取主力合约列表,使用上节课讲到的getTarList函数。我们对三个指标的计算是使用的收益率数据。这里使用轮询的方式,对每个合约先取k线数据,这里取得k线数据的范围是从-backPERIOD-2到-1,最后一根是倒数第二根,是完成的k线。这样呢,我们获取了backPERIOD+1根k线,然后我们使用最新收盘价除以上一根收盘价,再减去1就是收益率序列,可以获取backPERIOD个收益率数据。这里重要的一点是需要对收益率进行标准化。因为我们假设收益率是符合正态分布的,所以我们使用正态分布标准化。接下来,三个指标的计算就比较简单了。可以使用pandas包的三个函数,std(),skew()和kurt(),获取到各个品种的结果最后进行返回就可以。

# 计算高阶矩阵因子

def calCMF(PERIOD, backPERIOD):
    contractList = getTarList()[0]

    df = pd.DataFrame(columns=["InstrumentId", "Std", "Skew", "Kurt"])

    for codeID in contractList:
        
        info = exchange.SetContractType(codeID)

        records = exchange.GetRecords(PERIOD)[-backPERIOD-2: -1]

        recent_returns = [records[i + 1].Close / records[i].Close - 1 for i in range(len(records) - 1)]

        standardized_returns = (recent_returns - np.mean(recent_returns)) / np.std(recent_returns) # 收益率标准化

        std_dev = pd.Series(standardized_returns).std()
        skewness = pd.Series(standardized_returns).skew()
        kurtosis = pd.Series(standardized_returns).kurt()

        # 将结果添加到 DataFrame
        df = df.append({
            "InstrumentId": codeID,
            "Std": std_dev,
            "Skew": skewness,
            "Kurt": kurtosis
        }, ignore_index=True)

    return df

怎样判断因子的有效性呢,可以使用分层回归法。其实和我们上节课在单品种中使用的,展期收益率为正数进行做多,展期收益率为负数进行做空类似。但是这里根据待测试的因子排序,将期货品种分为N组来进行分组回测,使用固定的周期来进行调仓的操作。 如果情况理想,N组品种的收益率会呈现较好的单调性,单调递增或递减,且每一组的收益差距较大。这样的因子体现为较好的区分度。假如第一组收益最高,最后一组收益最低,那么做多第一组组合并做空最后一组组合,然后比较使用不同因子获得的收益率和夏普比率,来判断不同因子的有效性。

还记得上节课我们是怎样使用groupFactor对展期收益率进行分组的吗,对展期收益率为正的做多,对展期收益率为负的做空,但是有时候两者的数目不是一致的,可能存在很大的差异。另外对于波动率因子和峰度因子都是正数,所以不能按照符号进行区分。所以,我们重新改写下这个groupFactor函数。这里首先将因子进行排序,这里我们分成四组,获取各组的数量。分数排名前25%的为第一组,最后25%的第二组。这里我们使用pandas包的head函数,定义因子前25%为head_group,使用tail函数获取最后25%,最后将两个组的合约代码进行返回。

# 多空组判断

def groupFactor(factorDf, factorName):

    sortDf = factorDf.sort_values(by=factorName, ascending=False)

    # 取前25%和最后25%的数据
    total_count = len(sortDf)
    group_count = int(total_count * 0.25)

    head_group = sortDf.head(group_count)  # 第一组取前25%
    tail_group = sortDf.tail(group_count)  # 第二组取最后25%

    head_codes = head_group['InstrumentId'].tolist()  # 第一组codes
    tail_codes = tail_group['InstrumentId'].tolist()  # 第二组codes

    return [head_codes, tail_codes]

第二方面,对于trade函数,在前面一堂课中,我们对所有品种进行分类,然后进行交易的操作,如果该品种不再是持多组,那么该品种一定移动到了持空组,但是现在由于我们只做前25%和后25%,所以可能很多品种移动到了空闲组,就是不进行任何操作。所以我们也需要重新改写下函数。

在原来的函数基础上,首先获取目前持有多仓和空仓和合约列表,我们首先判断,当某品种已经具有仓位,但是不再是持多组或者持空组,我们第一步进行平仓。下面呢,再根据最新的需要做多或者做空的列表,开需要开的仓位,这里判断当前品种不再持仓当中,但是属于持多组或者持空组,我们进行相应仓位的开仓。

# 多空组买卖
def trade(pos_group, neg_group):
    posInfo = exchange.GetPosition()

    if len(posInfo) == 0:
        for i in range(len(pos_group)):
            exchange.SetContractType(pos_group[i])
            curPrice = exchange.GetRecords()[-1].Close + 5
            exchange.SetDirection('buy')
            exchange.Buy(curPrice, 1)
        
        for i in range(len(neg_group)):
            exchange.SetContractType(neg_group[i])
            curPrice = exchange.GetRecords()[-1].Close - 5
            exchange.SetDirection('sell')
            exchange.Sell(curPrice, 1)
        Log('多因子策略开始,建仓完成', "#FF0000")
    else:

        longList = [pos.ContractType for pos in posInfo if pos.Type == PD_LONG or pos.Type == PD_LONG_YD] # 已经开多仓
        shortList = [pos.ContractType for pos in posInfo if pos.Type == PD_SHORT or pos.Type == PD_SHORT_YD] # 已经开空仓

        # 检查已经开的多仓,如果在需要开多仓的列表中,则保留,否则平仓
        for pos_contract in longList:
            if pos_contract not in pos_group:
                # 进行平仓操作
                Log(pos_contract, '多余多仓进行平仓')
                exchange.SetContractType(pos_contract)
                curPrice = exchange.GetRecords()[-1].Close - 5
                exchange.SetDirection('closebuy')
                exchange.Sell(curPrice, 1)

        # 检查已经开的空仓,如果在需要开空仓的列表中,则保留,否则平仓
        for neg_contract in shortList:
            if neg_contract not in neg_group:
                # 进行平仓操作
                Log(neg_contract, '多余空仓进行平仓')
                exchange.SetContractType(neg_contract)
                curPrice = exchange.GetRecords()[-1].Close + 5
                exchange.SetDirection('closesell')
                exchange.Buy(curPrice, 1)

        # 开需要开的多仓
        for pos_contract in pos_group:
            if pos_contract not in longList:
                # 进行开仓操作
                Log(pos_contract, '需要开多的进行开多仓')
                exchange.SetContractType(pos_contract)
                curPrice = exchange.GetRecords()[-1].Close + 5
                exchange.SetDirection('buy')
                exchange.Buy(curPrice, 1)

        # 开需要开的空仓
        for neg_contract in neg_group:
            if neg_contract not in shortList:
                # 进行开仓操作
                Log(neg_contract, '需要开空的进行开空仓')
                exchange.SetContractType(neg_contract)
                curPrice = exchange.GetRecords()[-1].Close - 5
                exchange.SetDirection('sell')
                exchange.Sell(curPrice, 1)

这样是一种平仓的方法,另外还有一种方法,就是如果持仓品种不再属于做多组或者做空组的时候,我们不是直接进行平仓,而是等到落到相反组别的时候,再进行平仓。这两种平仓方法哪种更好,大家可以根据自己的想法去进行试验。

然后,我们验证各个因子的有效性的时候,在分组判断groupFactor函数中,如果是负向指标,比如偏度和峰度,所以这里positive_codes是groupFactor函数返回的第二个索引,negative_codes是第一个索引。这里我们使用Skew因子做一下层次回归法的因子验证,设置好代码,我们可以进行回测,获取到不同因子的收益率和夏普比率,比较下因子的有效性。

'''backtest
start: 2021-11-02 09:00:00
end: 2022-11-30 15:00:00
period: 1d
basePeriod: 1h
exchanges: [{"eid":"Futures_CTP","currency":"FUTURES","depthDeep":20}]
'''

import json
import time
import numpy as np

def main():

    # 记录上次执行的时间
    last_operation_timeBig = 0
    isFirst = True
    initAccount = exchange.GetAccount()

    while True:
        current_timeBig = int(time.time())
        
        # 获取时间间隔(秒)
        if isFirst:
            time_intervalBig = 24 * 60 * 60 * holdPeriod
            isFirst = False
        else:
            time_intervalBig = current_timeBig - last_operation_timeBig

        # 执行多空组买卖之前的操作(每隔10天执行一次)
        if time_intervalBig >= 24 * 60 * 60 * holdPeriod:
            
            if exchange.IO("status"):

                # 移仓
                ext.posTrans()

                # 计算展期收益率
                resRollover = ext.calRollover()

                # 计算CMF
                resCMF = ext.calCMF(PERIOD_M5, backPeriod)
                
                # 划分组
                positive_codes = ext.groupFactor(resCMF, 'Skew')[1]
                negative_codes = ext.groupFactor(resCMF, 'Skew')[0]

                # 多空组买卖
                ext.trade(positive_codes, negative_codes)

                last_operation_timeBig = current_timeBig

            else:
                Sleep(1)

            
        positions = exchange.GetPosition()

        if len(positions) != 0:
            longContract = []
            longPrice = []
            longProfit = []

            shortContract = []
            shortPrice = []
            shortProfit = []

            for i in range(len(positions)):
                if (positions[i]['Type'] == PD_LONG) or (positions[i]['Type'] == PD_LONG_YD):
                    
                    longContract.append(positions[i]['ContractType'])
                    longPrice.append(positions[i]['Price'])
                    longProfit.append(positions[i]['Profit'])
                else:
                    
                    shortContract.append(positions[i]['ContractType'])
                    shortPrice.append(positions[i]['Price'])
                    shortProfit.append(positions[i]['Profit'])

            tblAStatus = {
                "type" : "table",
                "title" : "持多组",
                "cols" : ["合约名称", "持仓均价", "持仓盈亏"],
                "rows" : [] 
            }

            tblBStatus = {
                "type" : "table",
                "title" : "持空组",
                "cols" : ["合约名称", "持仓均价", "持仓盈亏"],
                "rows" : [] 
            }

            for i in range(len(longContract)):
                tblAStatus["rows"].append([longContract[i], longPrice[i], longProfit[i]])

            for i in range(len(shortContract)):
                tblBStatus["rows"].append([shortContract[i], shortPrice[i], shortProfit[i]])


            lastStatus = f"`{json.dumps(tblAStatus)}`\n`{json.dumps(tblBStatus)}`"

            curAccount = exchange.GetAccount()

            totalProfit = sum(position['Profit'] for position in positions)
            totalMargin = sum(position['Margin'] for position in positions)

            curProfit = curAccount.Balance - initAccount.Balance + totalProfit + totalMargin
            LogProfit(curProfit, "&")

            LogStatus(lastStatus)

        Sleep(1)

另外如果我们想测试参数的最优组合,比如这里的高阶因子类的回看周期,我们想分别定义为300,400和500,然后持仓周期我们也设置为参数,这里定义为1天,10天和30天,测试下最优的参数组合配置。为了比较不同参数组合的效果,我们不需要重新编写函数来呈现具体的结果,还记得前面讲过的函数的调参吗,这里我们设定下参数的范围,然后看下具体参数组合的策略收益。

本节课呢,我们计算了高阶中心类的因子,并且随着因子类型的扩充,我们重新完善了groupFactor和trade函数,使我们的模型更具适用性。另外,现在我们使用的是各个模块函数的方式,所以有些重复变量调用的冗余,这一部分呢,我们将在各个模块函数设计的初步完善的时候,进行统一的封装。在后续的课程中,我们将伴随模型设计的完善,一步步的优化各个函数。这是一个探索的过程,大家有好的想法可以留言评论区,我们也会根据大家的想法进行优化。

视频参考链接:

《数字货币因子模型》 《中信期货-期货多因子系列(一):动量及高阶矩因子在商品期货截面上的运用》

29.因子有效性的检验方法

多因子模型是一种通过多个因子来解释和预测市场变动的模型。在这些因子中,每一个因子都代表着市场的一种特定特征或趋势。然而,并不是所有的因子都是有效的,也就是说,它们对市场的解释和预测能力并不是完全相同的。因此,在进行多因子分析之前,对单因子进行有效性检验就显得尤为重要。

单因子有效性检验的主要目的是确定每个因子是否能够独立地解释市场的变动。如果某个因子无法通过有效性检验,那么它就不能被纳入到多因子模型中。通过这样的筛选过程,我们可以确保最终的模型只包含对市场变动有显著影响的因子,从而提高模型的预测能力和解释能力。

上节课我们使用了分层回归法。本节课,我们首先补充下因子的平稳性检验方法;然后介绍其他两种方法,对商品期货市场的单因子进行有效性检验,分别是t检验和RankIC的方法。

在课前首先声明下哈,因子有效性验证的方法确实很多,并且使用的参数和具体的代码,也都有很大的差异。我们的课程,也是根据专业机构的研报,试着做一些探索性的工作。如果有哪里讲得不足或者欠缺,欢迎大家提出更多意见。

因子平稳性检验

首先,我们来看因子的平稳下检验。因子平稳性检验是在使用因子模型进行资产定价或投资组合管理时的一项重要步骤。平稳性指的是时间序列数据的统计特性在不同时间段内保持稳定。对于因子而言,平稳性是确保因子对资产或投资组合的解释能力和预测能力的关键因素。如果因子的相关性很高,说明这个因子的持续性会很好,强者恒强。关于因子平稳性的检验方法有很多,其中使用最多的是单位根检验,它是检验时间序列数据是否具有单位根(也就是非平稳性)的方法之一。ADF检验和PP检验是常用的单位根检验方法。如果在检验中无法拒绝原假设(存在单位根),那么表明因子可能是非平稳的。

这里我们来看怎样实现,首先需要获取数据。这里为了计算的方便,我们设置合约列表为8个。在策略开头,创建一个空的factor_df,用来保存k线数据,和前面我们讲过的四个因子,包括展期收益率,波动率,偏度和峰度。这里我们设置用来进行因子平稳性检验的日期设置为200天。在while循环里,我们来收集数据,200天的数据,设置循环条件是因子列表长度小于合约列表长度8乘以200。然后,我们使用各个模块函数来获取k线,展期收益率,和高阶中心矩类因子,以合约名称为key进行merge合并,最后将新收集到的数据进行concat纵向合并。

数据收集完成以后,这里我们使用的是ADF方法进行因子平稳性的检验。在多因子模型中,通常是分别对每个品种的因子进行ADF检验,而不是将所有品种的因子合并一起做检验。这是因为每个品种的价格走势和因子可能受到不同的影响,因此需要对每各品种的因子进行独立的平稳性检验。

这里我们对八个品种的四个因子分别做adf检验,首先导入需要的包,然后使用两个轮询不同的因子,和不同的品种,然后打印对应的p值到日志当中。综合来看,RollOver和std中具有较多的品种的p值大于0.05,代表不符合平稳性检验,而Skew和Kurt所有的因子都符合平稳性检验。当然这里我们只是展示一下具体平稳性检验的操作,伴随策略周期的改变,因子的平稳性检验结果可能会发生相应的变化。

'''backtest
start: 2022-11-01 09:00:00
end: 2023-11-01 15:00:00
period: 1d
basePeriod: 1h
exchanges: [{"eid":"Futures_CTP","currency":"FUTURES","depthDeep":20}]
args: [["holdPeriod",1]]
'''

import json
import time
import numpy as np
import matplotlib.pyplot as plt
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller

def main():

    # 记录上次执行的时间
    last_operation_timeBig = 0
    isFirst = True

    # Initialize factor_df
    factor_df = pd.DataFrame(columns=["Instrument", "InstrumentId", "Time", "Open", "High", "Low", "Close", "Volume", "RollOver", "Std", "Skew", "Kurt"])

    while len(factor_df) < 200 * 8:

        current_timeBig = int(time.time())
        
        # 获取时间间隔(秒)
        if isFirst:
            time_intervalBig = 24 * 60 * 60 * holdPeriod
            isFirst = False
        else:
            time_intervalBig = current_timeBig - last_operation_timeBig

        # 执行多空组买卖之前的操作(每隔holdPeriod执行一次)
        if time_intervalBig >= 24 * 60 * 60 * holdPeriod:
            
            if exchange.IO("status"):


                # 获取k线
                rData = ext.getMainData(PERIOD_D1, ext.getTarList()[0])

                # 计算展期收益率
                resRollover = ext.calRollover()

                # 计算CMF
                resCMF = ext.calCMF(PERIOD_M5, backPeriod)

                # 合并数据

                new_df = pd.merge(rData, resRollover, on='InstrumentId', how='inner').merge(resCMF, on='InstrumentId', how='inner')

                factor_df = pd.concat([factor_df, new_df], ignore_index=True)

                # 更新时间

                last_operation_timeBig = current_timeBig
        
        Sleep(60*60*2)

    factors = ['RollOver', 'Std', 'Skew', 'Kurt']

    for factor in factors:
        Log(f"ADF Test for {factor}:")
        for instrument in factor_df['Instrument'].unique():
            subset = factor_df[factor_df['Instrument'] == instrument][factor]
            result = adfuller(subset)
            Log(f"Instrument {instrument}: p-value = {result[1]}")

回归法(T检验)

接下来我们进行因子有效性的检验。回归分析,也称为t检验。t检验是一种常用的统计检验方法,大多数情况下,用于比较两组数据的均值是否存在显著差异,也可以用于在回归分析中,计算自变量的显著性。多因子策略中因子有效性检验的回归法,主要是通过将因子 T 期的因子暴露与 T 1期的收益率进行线性回归,得到的回归系数即为该因子对收益率的影响系数。同时,还能通过t-test,获得该因子对于收益率的显著度水平,从而从量化的角度鉴别该因子的有效性。这里可以一次回归多个因子,或者单独回归。

我们来看怎样使用代码进行实现。这里需要导入进行线性回归的包。

在上一步收集数据的基础上,这里我们需要根据收盘价计算收益率,代表因变量。我们要分品种计算收益率,所以设置groupby为Instrument,然后进行收益率的标准化。基期的收益率是为0的,因为我们想使用T期的因子数据预测T+1的收益率数据,所以需要将收益率进行前移。使用shift(-8),然后删除空值。

接下来我们定义收益率为因变量,提取四个因子定义为自变量,自变量同样进行标准化的处理。这样我们的数据基本就处理完毕了,接下来进行线性回归检验。这里我们设置DataFrame来存储各个因子的显著性的结果。然后使用for循环遍历自变量,将各个因子分别与收益率进行线性回归分析,并记录该因子对应的系数值,t值和p值。这里我们使用状态栏进行展示。结果表明,仅有展期收益率的因子对各品种的收益率具有显著的影响作用。

'''backtest
start: 2022-11-01 09:00:00
end: 2023-11-01 15:00:00
period: 1d
basePeriod: 1h
exchanges: [{"eid":"Futures_CTP","currency":"FUTURES","depthDeep":20}]
args: [["backPeriod",70],["holdPeriod",1]]
'''

import json
import time
import numpy as np
import matplotlib.pyplot as plt
import statsmodels.api as sm

def main():

    # 记录上次执行的时间
    last_operation_timeBig = 0
    isFirst = True
    initAccount = exchange.GetAccount()

    # Initialize factor_df
    factor_df = pd.DataFrame(columns=["Instrument", "InstrumentId", "Time", "Open", "High", "Low", "Close", "Volume", "RollOver", "Std", "Skew", "Kurt"])

    while len(factor_df) < 200*8:

        current_timeBig = int(time.time())
        
        # 获取时间间隔(秒)
        if isFirst:
            time_intervalBig = 24 * 60 * 60 * holdPeriod
            isFirst = False
        else:
            time_intervalBig = current_timeBig - last_operation_timeBig

        # 执行多空组买卖之前的操作(每隔10天执行一次)
        if time_intervalBig >= 24 * 60 * 60 * holdPeriod:
            
            if exchange.IO("status"):

                # 获取k线
                rData = ext.getMainData(PERIOD_D1, ext.getTarList()[0])

                # 计算展期收益率
                resRollover = ext.calRollover()

                # 计算CMF
                resCMF = ext.calCMF(PERIOD_M5, backPeriod)

                # 合并数据

                new_df = pd.merge(rData, resRollover, on='InstrumentId', how='inner').merge(resCMF, on='InstrumentId', how='inner')

                factor_df = pd.concat([factor_df, new_df], ignore_index=True)

                last_operation_timeBig = current_timeBig
        
        Sleep(60*60*2)

    factor_df['Returns'] = factor_df.groupby('Instrument')['Close'].pct_change() # 计算收益率
    factor_df.Returns = factor_df.Returns.apply(lambda x: (x - factor_df.Returns.mean()) / factor_df.Returns.std()) # 收益率标准化
    factor_df['Returns'] = factor_df['Returns'].shift(-8) # 收益率前移
    factor_df = factor_df.dropna() # 去除空值

    # 定义因变量和自变量
    y = factor_df['Returns']  # 收益率
    X = factor_df.iloc[:, 8:12] # 4个因子
    X_standardized = X.apply(lambda col: (col - col.mean()) / col.std(), axis=0) # 因子标准化

    # 创建DataFrame来存储结果
    significant_factors_df = pd.DataFrame(columns=['Factor', 'Coefficient', 'Absolute_t_value', 'P_value'])

    # 遍历自变量
    for ind_var in X_standardized.columns:
        model = sm.OLS(y, X_standardized[ind_var])  # 添加截距项
        results = model.fit()

        # 将结果添加到DataFrame中
        significant_factors_df = significant_factors_df.append({
            'Factor': ind_var,
            'Coefficient': results.params[ind_var],
            'Absolute_t_value': abs(results.tvalues[ind_var]),
            'P_value': results.pvalues[ind_var]
        }, ignore_index=True)

    # 状态栏展示 
    table = {  
        "type" : "table",   
        "title" : "回归分析结果",   
        "cols" : ["Factor", "Coefficient", "Absolute_t_value", "P_value"],   
        "rows" : []  
    }

    for i in range(len(significant_factors_df)):
        table["rows"].append(significant_factors_df.iloc[i].astype(str).tolist())  

    # 显示结果
    LogStatus('`' + json.dumps(table) + '`')

需要注意的是,这里我们使用的是一次性回归所有的时间,我们也可以按照固定周期进行回归,获取到连续的t值,然后统计一下t值的均值是否大于2,代表是否显著;另外还可以统计t值大于2的占比,用来表示t值的稳定性。

这里关于其他因子不显著的原因,有多方面的原因,可是是因为波动率,偏度和峰度受到回看周期参数的影响,另外,这也是由于回归因子验证法固有的缺陷,只能验证因子对于收益率的线性关系。我们可以根据所学的统计学知识进行非线性方程的验证。

RankIC因子验证性方法

接下来我们来介绍RankIC(Rank Information Coefficient)和RankIC_IR(Rank Information Coefficient Information Ratio)因子验证性方法。

RankIC(排名信息系数): RankIC度量了因子值的排名与未来期(通常是下一期)的收益率之间的相关性。它是通过计算因子值的排名和未来期收益率的排名之间的相关系数来定义的。RankIC的范围在-1到1之间,越接近1表示因子值的排名与未来收益率呈正相关,越接近-1表示负相关,而接近0表示几乎没有相关性。

RankIC_IR(排名信息系数信息比率): RankIC_IR是RankIC的信息比率,它衡量了因子预测的相对表现。RankIC_IR是RankIC的均值除以其标准差,因此它提供了一个考虑到因子预测稳定性的度量。通常,高的RankIC_IR值表示因子在预测未来收益方面具有较好的表现。

这些指标的评估有助于确定在多因子模型中使用哪些因子对于预期收益的预测是有效的。

我们来看下怎样使用代码进行计算,这里需要导入rankdata的包。开始的数据处理,因子和目标变量的提取和回归法基本一样。

这里首先定义一个空的dataframe,用来保存各个因子的rankic值。接下来使用轮询,分别计算各个因子和收益率的RankIC值。这里我们不是统一的计算一个RankIC值,而是按照周期计算一个连续的RankIC值,这里我们设置RankIC的周期是800,步长是8,也就是RankIC计算的周期是100天,每隔一天计算一次,保存到rankic_df各个因子的列中。然后我们对各个因子取平均,就是该因子的RankIC均值,使用均值除以标准差,就是RankIC_IR值。

根据返回的结果,一般认为RankIC大于0.03时,因子值跟下一期收益率的相关性较为明显;RankICIR大于0.5时,因子稳定获取超额收益的能力较强。可以看到展期收益率和偏度的RankIC的绝对值大于0.03,并且它的是RankIC_IR值大于0.5,证明这两个因子对预测收益具有一定的预测作用。

'''backtest
start: 2022-11-01 09:00:00
end: 2023-11-01 15:00:00
period: 1d
basePeriod: 1h
exchanges: [{"eid":"Futures_CTP","currency":"FUTURES","depthDeep":20}]
args: [["holdPeriod",1]]
'''

import json
import time
import numpy as np
import matplotlib.pyplot as plt
import statsmodels.api as sm
from scipy.stats import rankdata


def main():

    # 记录上次执行的时间
    last_operation_timeBig = 0
    isFirst = True
    initAccount = exchange.GetAccount()

    # Initialize factor_df
    factor_df = pd.DataFrame(columns=["Instrument", "InstrumentId", "Time", "Open", "High", "Low", "Close", "Volume", "RollOver", "Std", "Skew", "Kurt"])

    while len(factor_df) < 200*8:

        current_timeBig = int(time.time())
        
        # 获取时间间隔(秒)
        if isFirst:
            time_intervalBig = 24 * 60 * 60 * holdPeriod
            isFirst = False
        else:
            time_intervalBig = current_timeBig - last_operation_timeBig

        # 执行多空组买卖之前的操作(每隔持仓周期执行一次)
        if time_intervalBig >= 24 * 60 * 60 * holdPeriod:
            
            if exchange.IO("status"):

                # 获取k线
                rData = ext.getMainData(PERIOD_D1, ext.getTarList()[0])

                # 计算展期收益率
                resRollover = ext.calRollover()

                # 计算CMF
                resCMF = ext.calCMF(PERIOD_M5, backPeriod)

                new_df = pd.merge(rData, resRollover, on='InstrumentId', how='inner').merge(resCMF, on='InstrumentId', how='inner')

                factor_df = pd.concat([factor_df, new_df], ignore_index=True)

                last_operation_timeBig = current_timeBig
        
        Sleep(60*60*2)

    factor_df['Returns'] = factor_df.groupby('Instrument')['Close'].pct_change() 
    factor_df.Returns = factor_df.Returns.apply(lambda x: (x - factor_df.Returns.mean()) / factor_df.Returns.std())
    factor_df['Returns'] = factor_df['Returns'].shift(-8)
    factor_df = factor_df.dropna()

    # 提取因子和目标变量
    factors = factor_df[['RollOver', 'Std', 'Skew', 'Kurt']]
    factors = factors.apply(lambda col: (col - col.mean()) / col.std(), axis=0)
    returns = factor_df['Returns']

    # 保存rankic值
    rankic_df = pd.Dataframe()

    for factor in factors.columns:
        iclist = []
        for i in range(0,len(factors)+1-800,8): # 步长8
            current_factors = factors[factorl.ilocli:i+800]
            current_returns = returns.iloc[i:i+800]
            factor_ranks = rankdata(current_factors)
            returns_ranks = rankdata(current_returns)
            rankic_values = np.corrcoef(factor_ranks,returns_ranks)[0,1]
            iclist.append(rankic_values)
        rankic_df[factorl=iclist
    
    rankic_means = rankic_df.mean()
    rankic_ir = rankic_df.mean()/rankic_df.std()

但是需要注意的是,我们的预测周期是200天,所以拉长范围,因子的有效性会受到一定质疑。并且这些阈值通常是基于经验和实践得出的,并不是普适于所有情况。在具体的应用中,可以根据数据和具体问题的性质来调整这些阈值。同时,这些阈值的解释可能还受到市场环境、交易成本等因素的影响。因此,在使用这些规则时,需要谨慎考虑具体情境,并可以根据实际情况进行调整。

除此之外,还有一些机器学习方法也可以用于单因子检验,如支持向量机(SVM)、决策树(Decision Trees)、随机森林(Random Forests)等。这些方法可以通过学习历史数据来预测未来的市场变动,并可以用于评估因子的有效性。

最后在这里需要给大家说一声抱歉,由于确实期货多因子模型的资料比较少,使用的大部分都是来源于股票类的代码框架,再手动的编写代码移植到期货市场中,所以课程中代码整理和呈现会比较杂乱一点。在后期的课程中,我们也会不断优化,最后会呈现一个清晰明了可复用的实盘模型框架,请大家稍微等待一下。

视频参考链接:

《数字货币因子模型》

《中信期货-金融工程专题报告:期货多因子系列(二),商品期货截面风格因子初探》

30.基本面因子特


更多内容