【策略】全天候平衡
一个直接能够跑的全天候策略【全天候的收益和回撤大概都是10%,感觉没啥意义,没有上到平台】
核心逻辑:基于经济周期的资产匹配

# -*- coding: utf-8 -*-
"""
修复卖出逻辑问题
"""
import pandas as pd
import numpy as np
import akshare as ak
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')
# 设置中文字体
plt.rcParams["font.family"] = ["SimHei", ]
plt.rcParams['axes.unicode_minus'] = False
class RiskParityStrategy:
def __init__(self, initial_capital=100000, rebalance_period=20, confidence_level=2.58):
"""初始化风险平价策略"""
self.initial = initial_capital # 初始资金
self.rebalance_days = rebalance_period # 再平衡周期(交易日)
self.confidence = confidence_level # 置信水平
# 资产配置 (代码更简洁的命名)
self.assets = {
'equity': ['510300'], # 沪深300ETF
'commodity': ['518880'], # 黄金ETF
'commodity2': ['159985'], # 豆粕ETF(商品类)
'overseas': ['513100'] # 纳斯达克100ETF
}
self.pool = (self.assets['equity'] + self.assets['commodity'] +
self.assets['commodity2'] + self.assets['overseas'])
# 状态变量
self.cash = initial_capital
self.positions = {stock: 0 for stock in self.pool}
self.transaction = {stock: {
'amount': 0, 'cost': 0, 'buy': 0,
'sell': 0, 'win': 0, 'loss': 0, 'max_loss': 0,
'max_date': None, 'profit': 0, 'price': 0
} for stock in self.pool}
self.daily_prices = {stock: 0 for stock in self.pool}
self.portfolio_value = initial_capital
self.performance = [] # 每日绩效记录
# 回测控制 - 新增交易日计数器
self.trading_days_count = 0
self.hold_cycle = rebalance_period
self.data = {} # 缓存资产数据
def load_data(self, start_date, end_date):
"""批量加载所有资产数据"""
print("正在加载历史数据...")
for category, stocks in self.assets.items():
for stock in stocks:
try:
df = ak.fund_etf_hist_em(symbol=stock, adjust="qfq")
df = df.rename(columns={
'日期': 'date', '收盘': 'close'
})[['date', 'close']]
df['date'] = pd.to_datetime(df['date'])
df = df[(df['date'] >= start_date) & (df['date'] <= end_date)]
df.sort_values('date', inplace=True)
self.data[stock] = df
print(f"已加载 {stock} 数据,共 {len(df)} 条记录")
except Exception as e:
print(f"加载 {stock} 数据失败: {e}")
self.data[stock] = pd.DataFrame()
def get_returns(self, stock, date, lookback=120):
"""获取指定资产的历史收益率"""
df = self.data[stock]
if len(df) < 20:
return np.array([])
mask = df['date'] <= date
if not mask.any():
return np.array([])
end_idx = df[mask].index[-1]
start_idx = max(0, end_idx - lookback)
return df.loc[start_idx:end_idx, 'close'].pct_change().dropna().values
def calculate_es(self, stock, date, lookback=120):
"""计算预期损失(ES)"""
returns = self.get_returns(stock, date, lookback)
if len(returns) < 20:
return 0.01 # 数据不足时返回默认风险值
# 处理置信水平
alpha = 0.01 if self.confidence == 2.58 else 0.05
valid_returns = returns[~np.isnan(returns)]
valid_returns = valid_returns[valid_returns != 0]
if len(valid_returns) == 0:
return 0.01
valid_returns.sort()
threshold = max(1, int(len(valid_returns) * alpha))
if valid_returns[0] >= 0:
return 0.01 # 无负收益时返回默认值
return -np.mean(valid_returns[:threshold])
def calculate_weights(self, date):
"""计算风险平价权重并保留2位小数"""
es_values = [
self.calculate_es(self.assets['equity'][0], date),
self.calculate_es(self.assets['commodity'][0], date),
self.calculate_es(self.assets['commodity2'][0], date),
self.calculate_es(self.assets['overseas'][0], date)
]
max_es = max(es_values + [0.01]) # 确保不为零
# 计算风险平价配置
weights = [max_es / es if es > 0 else 0 for es in es_values]
total = max(sum(weights), 1) # 避免除零
# 归一化权重并保留2位小数
norm_weights = [round(w / total, 2) for w in weights]
return {
self.assets['equity'][0]: norm_weights[0],
self.assets['commodity'][0]: norm_weights[1],
self.assets['commodity2'][0]: norm_weights[2],
self.assets['overseas'][0]: norm_weights[3]
}
def get_holding_ratio(self):
"""获取当前持仓比例"""
if self.portfolio_value <= 0:
return {stock: 0 for stock in self.pool}
holdings = {}
for stock in self.pool:
value = self.positions[stock] * self.daily_prices[stock]
holdings[stock] = value / self.portfolio_value
return holdings
def update_portfolio_value(self, date):
"""更新组合价值并记录绩效"""
total = self.cash
for stock, amount in self.positions.items():
total += amount * self.daily_prices[stock]
self.portfolio_value = total
daily_return = (total / self.initial - 1) * 100
self.performance.append({
'date': date,
'value': total,
'return': daily_return
})
def trade(self, stock, target_ratio):
"""执行交易逻辑"""
price = self.daily_prices[stock]
if price <= 0:
return
target_value = self.portfolio_value * target_ratio
current_value = self.positions[stock] * price
delta = abs(target_value - current_value)
# 交易阈值控制
if delta / max(target_value, 1) < 0.25 or delta < 1000:
return
# 执行交易
if target_value > current_value:
self._buy(stock, target_value, price)
else:
self._sell(stock, target_value, price)
def _buy(self, stock, target_value, price):
"""买入资产(考虑最低1手交易限制)"""
# 计算可买数量(考虑最低1手=100股)
shares = int(target_value / price / 100) * 100
if shares < 100: # 确保至少买入1手
return
cost = shares * price
commission = max(5, cost * 0.0003) # 佣金,最低5元
if self.cash < cost + commission:
return
self.cash -= (cost + commission)
old_amount = self.positions[stock]
old_cost = self.transaction[stock]['cost']
new_amount = old_amount + shares
new_cost = (old_cost * old_amount + cost) / new_amount if new_amount > 0 else 0
self.positions[stock] = new_amount
self.transaction[stock].update({
'amount': new_amount,
'cost': new_cost,
'buy': self.transaction[stock]['buy'] + 1,
'price': max(new_cost, self.transaction[stock]['price'])
})
print(f"买入 {stock} {shares} 股,价格 {price:.2f},金额 {cost:.2f}")
def _sell(self, stock, target_value, price):
"""卖出资产(修复卖出数量计算逻辑)"""
current_amount = self.positions[stock]
if current_amount < 100: # 持仓不足1手不操作
return
current_value = current_amount * price
if target_value >= current_value:
return
# 计算目标股数,确保是100的整数倍
target_amount = max(0, int(target_value / price / 100) * 100)
# 计算需卖出的股数
shares_to_sell = current_amount - target_amount
# 确保卖出数量是100的整数倍且至少卖出1手
if shares_to_sell < 100:
return
# 计算交易成本
sell_value = shares_to_sell * price
commission = max(5, sell_value * 0.0003) # 佣金,最低5元
stamp_duty = sell_value * 0.001 # 印花税
total_cost = commission + stamp_duty
# 更新现金和持仓
self.cash += (sell_value - total_cost)
new_amount = current_amount - shares_to_sell
self.positions[stock] = new_amount
self.transaction[stock]['amount'] = new_amount
# 计算盈亏
old_cost = self.transaction[stock]['cost']
profit = (price - old_cost) * shares_to_sell - total_cost
self.transaction[stock]['profit'] += profit
# 更新交易统计
if profit > 0:
self.transaction[stock]['win'] += 1
else:
self.transaction[stock]['loss'] += 1
if profit < self.transaction[stock]['max_loss']:
self.transaction[stock]['max_loss'] = profit
self.transaction[stock]['max_date'] = datetime.now()
self.transaction[stock]['sell'] += 1
# 更新成本基础
if new_amount > 0:
new_cost = (old_cost * current_amount - price * shares_to_sell) / new_amount
self.transaction[stock]['cost'] = new_cost
else:
self.transaction[stock]['cost'] = 0
print(f"卖出 {stock} {shares_to_sell} 股,价格 {price:.2f},盈利 {profit:.2f}")
def rebalance(self, date):
"""执行资产再平衡"""
weights = self.calculate_weights(date)
# 格式化输出权重,保留2位小数
formatted_weights = {k: round(v, 2) for k, v in weights.items()}
print(f"{date.date()} 风险平价权重: {formatted_weights}")
hold_ratio = self.get_holding_ratio()
trade_list = sorted(
[(stock, weights[stock] - hold_ratio[stock]) for stock in self.pool],
key=lambda x: x[1]
)
# 先处理减仓
for stock, _ in trade_list:
self.trade(stock, weights[stock])
# 再处理加仓
for stock in self.pool:
self.trade(stock, weights[stock])
self.update_portfolio_value(date)
# 重置交易日计数器
self.trading_days_count = 0
def need_rebalance(self):
"""判断是否需要再平衡"""
return self.trading_days_count >= self.rebalance_days
def run_backtest(self, start_date, end_date):
"""运行回测"""
print(f"开始风险平价策略回测,周期: {start_date.date()} 至 {end_date.date()}")
self.load_data(start_date, end_date)
trading_days = pd.bdate_range(start=start_date, end=end_date).tolist()
for date in trading_days:
# 增加交易日计数
self.trading_days_count += 1
# 达到再平衡周期时执行再平衡
if self.need_rebalance():
self.rebalance(date)
# 更新当日价格
for stock in self.pool:
df = self.data[stock]
if len(df) == 0:
continue
mask = df['date'] == date
if mask.any():
self.daily_prices[stock] = df[mask]['close'].values[0]
else:
prev_dates = df[df['date'] < date]['date'].sort_values(ascending=False)
if not prev_dates.empty:
self.daily_prices[stock] = df[df['date'] == prev_dates.iloc[0]]['close'].values[0]
# 更新组合价值
self.update_portfolio_value(date)
# 打印进度
if (date - start_date).days % 60 == 0:
print(f"回测进度: {date.date()}, 组合价值: {self.portfolio_value:.2f}")
self._print_performance()
self._plot_performance()
def _print_performance(self):
"""打印绩效指标"""
if not self.performance:
print("无绩效数据可显示")
return
# 计算关键指标
final_value = self.performance[-1]['value']
total_return = (final_value / self.initial - 1) * 100
days = (self.performance[-1]['date'] - self.performance[0]['date']).days
annual_return = ((1 + total_return/100) **(365/days) - 1) * 100
# 计算最大回撤
df = pd.DataFrame(self.performance)
df['cummax'] = df['value'].cummax()
df['drawdown'] = (df['value'] - df['cummax']) / df['cummax'] * 100
max_drawdown = df['drawdown'].min()
print("\n===== 风险平价策略回测结果 =====")
print(f"初始资金: {self.initial:.2f}")
print(f"最终资产: {final_value:.2f}")
print(f"总收益率: {total_return:.2f}%")
print(f"年化收益率: {annual_return:.2f}%")
print(f"最大回撤: {max_drawdown:.2f}%")
# 打印各资产交易统计
print("\n各资产交易记录:")
for stock in self.pool:
t = self.transaction[stock]
win_rate = t['win'] / (t['win'] + t['loss']) * 100 if (t['win'] + t['loss']) > 0 else 0
print(f"{stock}: 交易{t['buy']+t['sell']}次, 胜率{win_rate:.2f}%, 盈亏{t['profit']:.2f}")
def _plot_performance(self):
"""绘制绩效图表"""
if not self.performance:
print("无绩效数据可绘图")
return
df = pd.DataFrame(self.performance)
plt.figure(figsize=(14, 10))
# 绘制净值曲线
plt.subplot(2, 1, 1)
plt.plot(df['date'], df['value']/self.initial, label='策略净值', color='blue', linewidth=2)
# 添加沪深300基准
if '510300' in self.data and len(self.data['510300']) > 0:
hs300 = self.data['510300']
hs300 = hs300[(hs300['date'] >= df['date'].min()) & (hs300['date'] <= df['date'].max())]
hs300['net_value'] = hs300['close'] / hs300['close'].iloc[0]
plt.plot(hs300['date'], hs300['net_value'], label='沪深300', color='gray', linestyle='--')
plt.title('风险平价策略净值曲线')
plt.xlabel('日期')
plt.ylabel('净值')
plt.grid(True, alpha=0.3)
plt.legend()
plt.tight_layout()
# 绘制回撤曲线
plt.subplot(2, 1, 2)
df['cummax'] = df['value'].cummax()
df['drawdown'] = (df['value'] - df['cummax']) / df['cummax'] * 100
plt.fill_between(df['date'], df['drawdown'], 0, color='red', alpha=0.2)
plt.title('策略回撤曲线')
plt.xlabel('日期')
plt.ylabel('回撤 (%)')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 运行回测
if __name__ == "__main__":
strategy = RiskParityStrategy(initial_capital=50000)
start_date = datetime(2020, 1, 1)
end_date = datetime(2025, 9, 30)
strategy.run_backtest(start_date, end_date)
上一篇:3种ETF动量策略有什么区别 下一篇:QMT服务器和云服务器怎么选择
次方量化-技术博客
发表评论:
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。