次方量化 全球宏观 每月宏观 公众号 股票开户

【策略】全天候平衡

lindercube 2个月前 (10-28) 阅读数 320 #量化教程

一个直接能够跑的全天候策略【全天候的收益和回撤大概都是10%,感觉没啥意义,没有上到平台】

全天候策略是一种追求在任何经济环境下都能实现稳健收益的资产配置策略,核心是通过分散投资于不同经济周期表现各异的资产,降低单一市场风险。
这个策略的关键在于不预测市场,而是为所有可能的经济场景做好准备,非常适合追求长期稳健、厌恶剧烈波动的投资者。

核心逻辑:基于经济周期的资产匹配

image.png


该策略的底层逻辑是将经济周期划分为四种主要状态,并为每种状态配置对应的核心资产。通过这种方式,确保无论经济处于繁荣还是衰退,组合中都有资产能贡献收益
# -*- coding: utf-8 -*-
"""
修复卖出逻辑问题
"""

import pandas as pd
import numpy as np
import akshare as ak
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

# 设置中文字体
plt.rcParams["font.family"] = ["SimHei", ]
plt.rcParams['axes.unicode_minus'] = False

class RiskParityStrategy:
    def __init__(self, initial_capital=100000, rebalance_period=20, confidence_level=2.58):
        """初始化风险平价策略"""
        self.initial = initial_capital         # 初始资金
        self.rebalance_days = rebalance_period # 再平衡周期(交易日)
        self.confidence = confidence_level    # 置信水平
        
        # 资产配置 (代码更简洁的命名)
        self.assets = {
            'equity': ['510300'],       # 沪深300ETF
            'commodity': ['518880'],    # 黄金ETF
            'commodity2': ['159985'],   # 豆粕ETF(商品类)
            'overseas': ['513100']      # 纳斯达克100ETF
        }
        self.pool = (self.assets['equity'] + self.assets['commodity'] + 
                    self.assets['commodity2'] + self.assets['overseas'])
        
        # 状态变量
        self.cash = initial_capital
        self.positions = {stock: 0 for stock in self.pool}
        self.transaction = {stock: {
            'amount': 0, 'cost': 0, 'buy': 0,
            'sell': 0, 'win': 0, 'loss': 0, 'max_loss': 0,
            'max_date': None, 'profit': 0, 'price': 0
        } for stock in self.pool}
        self.daily_prices = {stock: 0 for stock in self.pool}
        self.portfolio_value = initial_capital
        self.performance = []  # 每日绩效记录
        
        # 回测控制 - 新增交易日计数器
        self.trading_days_count = 0
        self.hold_cycle = rebalance_period
        self.data = {}  # 缓存资产数据
    
    def load_data(self, start_date, end_date):
        """批量加载所有资产数据"""
        print("正在加载历史数据...")
        for category, stocks in self.assets.items():
            for stock in stocks:
                try:
                    df = ak.fund_etf_hist_em(symbol=stock, adjust="qfq")
                    df = df.rename(columns={
                        '日期': 'date', '收盘': 'close'
                    })[['date', 'close']]
                    df['date'] = pd.to_datetime(df['date'])
                    df = df[(df['date'] >= start_date) & (df['date'] <= end_date)]
                    df.sort_values('date', inplace=True)
                    self.data[stock] = df
                    print(f"已加载 {stock} 数据,共 {len(df)} 条记录")
                except Exception as e:
                    print(f"加载 {stock} 数据失败: {e}")
                    self.data[stock] = pd.DataFrame()
    
    def get_returns(self, stock, date, lookback=120):
        """获取指定资产的历史收益率"""
        df = self.data[stock]
        if len(df) < 20:
            return np.array([])
            
        mask = df['date'] <= date
        if not mask.any():
            return np.array([])
            
        end_idx = df[mask].index[-1]
        start_idx = max(0, end_idx - lookback)
        return df.loc[start_idx:end_idx, 'close'].pct_change().dropna().values
    
    def calculate_es(self, stock, date, lookback=120):
        """计算预期损失(ES)"""
        returns = self.get_returns(stock, date, lookback)
        if len(returns) < 20:
            return 0.01  # 数据不足时返回默认风险值
            
        # 处理置信水平
        alpha = 0.01 if self.confidence == 2.58 else 0.05
        
        valid_returns = returns[~np.isnan(returns)]
        valid_returns = valid_returns[valid_returns != 0]
        if len(valid_returns) == 0:
            return 0.01
            
        valid_returns.sort()
        threshold = max(1, int(len(valid_returns) * alpha))
        
        if valid_returns[0] >= 0:
            return 0.01  # 无负收益时返回默认值
        return -np.mean(valid_returns[:threshold])
    
    def calculate_weights(self, date):
        """计算风险平价权重并保留2位小数"""
        es_values = [
            self.calculate_es(self.assets['equity'][0], date),
            self.calculate_es(self.assets['commodity'][0], date),
            self.calculate_es(self.assets['commodity2'][0], date),
            self.calculate_es(self.assets['overseas'][0], date)
        ]
        
        max_es = max(es_values + [0.01])  # 确保不为零
        
        # 计算风险平价配置
        weights = [max_es / es if es > 0 else 0 for es in es_values]
        total = max(sum(weights), 1)  # 避免除零
        
        # 归一化权重并保留2位小数
        norm_weights = [round(w / total, 2) for w in weights]
        
        return {
            self.assets['equity'][0]: norm_weights[0],
            self.assets['commodity'][0]: norm_weights[1],
            self.assets['commodity2'][0]: norm_weights[2],
            self.assets['overseas'][0]: norm_weights[3]
        }
    
    def get_holding_ratio(self):
        """获取当前持仓比例"""
        if self.portfolio_value <= 0:
            return {stock: 0 for stock in self.pool}
            
        holdings = {}
        for stock in self.pool:
            value = self.positions[stock] * self.daily_prices[stock]
            holdings[stock] = value / self.portfolio_value
        return holdings
    
    def update_portfolio_value(self, date):
        """更新组合价值并记录绩效"""
        total = self.cash
        for stock, amount in self.positions.items():
            total += amount * self.daily_prices[stock]
            
        self.portfolio_value = total
        daily_return = (total / self.initial - 1) * 100
        self.performance.append({
            'date': date,
            'value': total,
            'return': daily_return
        })
    
    def trade(self, stock, target_ratio):
        """执行交易逻辑"""
        price = self.daily_prices[stock]
        if price <= 0:
            return
            
        target_value = self.portfolio_value * target_ratio
        current_value = self.positions[stock] * price
        delta = abs(target_value - current_value)
        
        # 交易阈值控制
        if delta / max(target_value, 1) < 0.25 or delta < 1000:
            return
            
        # 执行交易
        if target_value > current_value:
            self._buy(stock, target_value, price)
        else:
            self._sell(stock, target_value, price)
    
    def _buy(self, stock, target_value, price):
        """买入资产(考虑最低1手交易限制)"""
        # 计算可买数量(考虑最低1手=100股)
        shares = int(target_value / price / 100) * 100
        if shares < 100:  # 确保至少买入1手
            return
            
        cost = shares * price
        commission = max(5, cost * 0.0003)  # 佣金,最低5元
        if self.cash < cost + commission:
            return
            
        self.cash -= (cost + commission)
        old_amount = self.positions[stock]
        old_cost = self.transaction[stock]['cost']
        
        new_amount = old_amount + shares
        new_cost = (old_cost * old_amount + cost) / new_amount if new_amount > 0 else 0
        
        self.positions[stock] = new_amount
        self.transaction[stock].update({
            'amount': new_amount,
            'cost': new_cost,
            'buy': self.transaction[stock]['buy'] + 1,
            'price': max(new_cost, self.transaction[stock]['price'])
        })
        print(f"买入 {stock} {shares} 股,价格 {price:.2f},金额 {cost:.2f}")
    
    def _sell(self, stock, target_value, price):
        """卖出资产(修复卖出数量计算逻辑)"""
        current_amount = self.positions[stock]
        if current_amount < 100:  # 持仓不足1手不操作
            return
            
        current_value = current_amount * price
        if target_value >= current_value:
            return
            
        # 计算目标股数,确保是100的整数倍
        target_amount = max(0, int(target_value / price / 100) * 100)
        
        # 计算需卖出的股数
        shares_to_sell = current_amount - target_amount
        
        # 确保卖出数量是100的整数倍且至少卖出1手
        if shares_to_sell < 100:
            return
            
        # 计算交易成本
        sell_value = shares_to_sell * price
        commission = max(5, sell_value * 0.0003)  # 佣金,最低5元
        stamp_duty = sell_value * 0.001  # 印花税
        total_cost = commission + stamp_duty
        
        # 更新现金和持仓
        self.cash += (sell_value - total_cost)
        new_amount = current_amount - shares_to_sell
        
        self.positions[stock] = new_amount
        self.transaction[stock]['amount'] = new_amount
        
        # 计算盈亏
        old_cost = self.transaction[stock]['cost']
        profit = (price - old_cost) * shares_to_sell - total_cost
        self.transaction[stock]['profit'] += profit
        
        # 更新交易统计
        if profit > 0:
            self.transaction[stock]['win'] += 1
        else:
            self.transaction[stock]['loss'] += 1
            if profit < self.transaction[stock]['max_loss']:
                self.transaction[stock]['max_loss'] = profit
                self.transaction[stock]['max_date'] = datetime.now()
                
        self.transaction[stock]['sell'] += 1
        
        # 更新成本基础
        if new_amount > 0:
            new_cost = (old_cost * current_amount - price * shares_to_sell) / new_amount
            self.transaction[stock]['cost'] = new_cost
        else:
            self.transaction[stock]['cost'] = 0
            
        print(f"卖出 {stock} {shares_to_sell} 股,价格 {price:.2f},盈利 {profit:.2f}")
    
    def rebalance(self, date):
        """执行资产再平衡"""
        weights = self.calculate_weights(date)
        # 格式化输出权重,保留2位小数
        formatted_weights = {k: round(v, 2) for k, v in weights.items()}
        print(f"{date.date()} 风险平价权重: {formatted_weights}")
        
        hold_ratio = self.get_holding_ratio()
        trade_list = sorted(
            [(stock, weights[stock] - hold_ratio[stock]) for stock in self.pool],
            key=lambda x: x[1]
        )
        
        # 先处理减仓
        for stock, _ in trade_list:
            self.trade(stock, weights[stock])
            
        # 再处理加仓
        for stock in self.pool:
            self.trade(stock, weights[stock])
            
        self.update_portfolio_value(date)
        # 重置交易日计数器
        self.trading_days_count = 0
    
    def need_rebalance(self):
        """判断是否需要再平衡"""
        return self.trading_days_count >= self.rebalance_days
    
    def run_backtest(self, start_date, end_date):
        """运行回测"""
        print(f"开始风险平价策略回测,周期: {start_date.date()} 至 {end_date.date()}")
        self.load_data(start_date, end_date)
        
        trading_days = pd.bdate_range(start=start_date, end=end_date).tolist()
        for date in trading_days:
            # 增加交易日计数
            self.trading_days_count += 1
            
            # 达到再平衡周期时执行再平衡
            if self.need_rebalance():
                self.rebalance(date)
                
            # 更新当日价格
            for stock in self.pool:
                df = self.data[stock]
                if len(df) == 0:
                    continue
                    
                mask = df['date'] == date
                if mask.any():
                    self.daily_prices[stock] = df[mask]['close'].values[0]
                else:
                    prev_dates = df[df['date'] < date]['date'].sort_values(ascending=False)
                    if not prev_dates.empty:
                        self.daily_prices[stock] = df[df['date'] == prev_dates.iloc[0]]['close'].values[0]
            
            # 更新组合价值
            self.update_portfolio_value(date)
            
            # 打印进度
            if (date - start_date).days % 60 == 0:
                print(f"回测进度: {date.date()}, 组合价值: {self.portfolio_value:.2f}")
        
        self._print_performance()
        self._plot_performance()
    
    def _print_performance(self):
        """打印绩效指标"""
        if not self.performance:
            print("无绩效数据可显示")
            return
            
        # 计算关键指标
        final_value = self.performance[-1]['value']
        total_return = (final_value / self.initial - 1) * 100
        days = (self.performance[-1]['date'] - self.performance[0]['date']).days
        annual_return = ((1 + total_return/100) **(365/days) - 1) * 100
        
        # 计算最大回撤
        df = pd.DataFrame(self.performance)
        df['cummax'] = df['value'].cummax()
        df['drawdown'] = (df['value'] - df['cummax']) / df['cummax'] * 100
        max_drawdown = df['drawdown'].min()
        
        print("\n===== 风险平价策略回测结果 =====")
        print(f"初始资金: {self.initial:.2f}")
        print(f"最终资产: {final_value:.2f}")
        print(f"总收益率: {total_return:.2f}%")
        print(f"年化收益率: {annual_return:.2f}%")
        print(f"最大回撤: {max_drawdown:.2f}%")
        
        # 打印各资产交易统计
        print("\n各资产交易记录:")
        for stock in self.pool:
            t = self.transaction[stock]
            win_rate = t['win'] / (t['win'] + t['loss']) * 100 if (t['win'] + t['loss']) > 0 else 0
            print(f"{stock}: 交易{t['buy']+t['sell']}次, 胜率{win_rate:.2f}%, 盈亏{t['profit']:.2f}")
    
    def _plot_performance(self):
        """绘制绩效图表"""
        if not self.performance:
            print("无绩效数据可绘图")
            return
            
        df = pd.DataFrame(self.performance)
        plt.figure(figsize=(14, 10))
        
        # 绘制净值曲线
        plt.subplot(2, 1, 1)
        plt.plot(df['date'], df['value']/self.initial, label='策略净值', color='blue', linewidth=2)
        
        # 添加沪深300基准
        if '510300' in self.data and len(self.data['510300']) > 0:
            hs300 = self.data['510300']
            hs300 = hs300[(hs300['date'] >= df['date'].min()) & (hs300['date'] <= df['date'].max())]
            hs300['net_value'] = hs300['close'] / hs300['close'].iloc[0]
            plt.plot(hs300['date'], hs300['net_value'], label='沪深300', color='gray', linestyle='--')
        
        plt.title('风险平价策略净值曲线')
        plt.xlabel('日期')
        plt.ylabel('净值')
        plt.grid(True, alpha=0.3)
        plt.legend()
        plt.tight_layout()
        
        # 绘制回撤曲线
        plt.subplot(2, 1, 2)
        df['cummax'] = df['value'].cummax()
        df['drawdown'] = (df['value'] - df['cummax']) / df['cummax'] * 100
        plt.fill_between(df['date'], df['drawdown'], 0, color='red', alpha=0.2)
        plt.title('策略回撤曲线')
        plt.xlabel('日期')
        plt.ylabel('回撤 (%)')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        
        plt.show()

# 运行回测
if __name__ == "__main__":
    strategy = RiskParityStrategy(initial_capital=50000)
    start_date = datetime(2020, 1, 1)
    end_date = datetime(2025, 9, 30)
    strategy.run_backtest(start_date, end_date)


分享到:

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

热门
标签列表