背景¶
backtrader 已经比较完善了,我想要借鉴量化投资框架中其他项目的优势,继续改进优化 backtrader。
任务¶
阅读研究分析 backtrader 这个项目的源代码,了解这个项目。
阅读研究分析/Users/yunjinqi/Documents/量化交易框架/AI-Strategies-StockMarket
借鉴这个新项目的优点和功能,给 backtrader 优化改进提供新的建议
写需规文档和设计文档放到这个文档的最下面,方便后续借鉴
AI-Strategies-StockMarket 项目简介¶
AI-Strategies-StockMarket 是一个结合 AI 和机器学习的股票交易策略项目,具有以下核心特点:
机器学习: 使用机器学习预测
深度学习: 神经网络模型集成
特征工程: 金融特征工程
模型训练: 模型训练和评估
策略集成: AI 模型与交易策略集成
回测验证: 策略回测验证
重点借鉴方向¶
ML 集成: 机器学习模型集成
特征工程: 金融特征提取
模型训练: 模型训练流程
预测系统: 预测系统设计
策略融合: AI 与传统策略融合
模型评估: 模型性能评估
研究分析¶
AI-Strategies-StockMarket 架构特点总结¶
经过深入研究,AI-Strategies-StockMarket 项目是一个结合传统技术分析和机器学习的量化交易框架。
1. 核心 AI 策略¶
神经网络策略*:
6 层 MLP 架构 (36-128-64-16-8-1)
ReLU 隐藏层激活,tanh 输出激活
L2 正则化 (λ=0.001)
SGD + Nesterov 动量优化
在线学习:每 10 天用最新 15 个样本重新训练
PSO 组合信号策略*:
54 个移动平均交叉信号组合
粒子群优化动态权重分配
每 90 天重新优化(使用最近 100 天数据)
支持指数归一化和 L1 归一化
2. 特征工程¶
50+技术指标*:
价格数据:OHLCV
动量指标:5/10/15 日动量
均线指标:7/14/21 日 SMA 和 EMA
波动率:7/14/21 日标准差
振荡器:RSI (9/14/21), Stochastic (7/14/21)
趋势指标:MACD (12,26)
变化率:ROC (13,21)
智能标签生成*:
基于未来价格模拟
考虑交易成本
可配置止盈/止损阈值
3. 在线学习架构¶
┌─────────────────────────────────────────────────┐
│ 在线学习循环 │
├─────────────────────────────────────────────────┤
│ 预测 → 执行 → 收集新数据 → 更新记忆 → 重新训练 │
│ │
│ FIFO Replay Memory (最后 15 个样本) │
└─────────────────────────────────────────────────┘
```bash
#### 4. 集成模式
- 模型作为策略类属性传递
- 策略直接调用 model.predict()
- 周期性重新训练
- 阈值过滤低置信度预测
### Backtrader 当前 ML 能力分析
#### 优势
- **灵活的架构**: 可通过自定义指标集成 ML 模型
- **SignalStrategy**: 支持外部信号输入
- **统计指标**: OLS 回归、Hurst 指数等
- **Analyzer 框架**: 可扩展自定义分析器
#### 局限性(针对 ML)
1. **无内置 ML 库**: 不包含 TensorFlow、PyTorch、scikit-learn
2. **无特征工程工具**: 缺少金融特征提取模块
3. **无模型管理**: 无模型训练、保存、加载机制
4. **无在线学习支持**: 策略中无增量学习机制
5. **无 ML 专用分析器**: 缺少模型评估指标
- --
## 需求规格文档
### 1. ML 模型集成框架
#### 1.1 功能描述
提供统一的机器学习模型集成接口,支持多种 ML 框架。
#### 1.2 需求规格
| 需求 ID | 需求描述 | 优先级 |
|--------|----------|--------|
| ML-001 | 定义 ML 模型抽象接口 | P0 |
| ML-002 | 支持 scikit-learn 模型集成 | P0 |
| ML-003 | 支持 TensorFlow/Keras 模型集成 | P0 |
| ML-004 | 支持 PyTorch 模型集成 | P1 |
| ML-005 | 支持 XGBoost/LightGBM 模型集成 | P1 |
| ML-006 | 支持模型序列化和加载 | P1 |
#### 1.3 接口设计
```python
class MLModel(ABC):
"""机器学习模型抽象接口"""
@abstractmethod
def fit(self, X, y): pass
@abstractmethod
def predict(self, X): pass
@abstractmethod
def predict_proba(self, X): pass
@abstractmethod
def save(self, path): pass
@abstractmethod
def load(self, path): pass
class MLIndicator(bt.Indicator):
"""ML 预测指标"""
lines = ('prediction', 'probability',)
params = (
('model', None),
('features', None),
('retrain_interval', 0), # 0 表示不重新训练
)
```bash
### 2. 金融特征工程
#### 2.1 功能描述
提供丰富的金融技术指标和特征提取工具。
#### 2.2 需求规格
| 需求 ID | 需求描述 | 优先级 |
|--------|----------|--------|
| FE-001 | 基础价格特征提取 | P0 |
| FE-002 | 动量指标特征 | P0 |
| FE-003 | 均线指标特征 | P0 |
| FE-004 | 波动率指标特征 | P0 |
| FE-005 | 振荡器指标特征 | P1 |
| FE-006 | 趋势指标特征 | P1 |
| FE-007 | 特征标准化工具 | P1 |
| FE-008 | 特征选择工具 | P2 |
#### 2.3 接口设计
```python
class FeatureEngineer:
"""金融特征工程器"""
def __init__(self):
self.features = {}
def add_price_features(self, df):
"""添加基础价格特征"""
def add_momentum_features(self, df, periods=[5, 10, 15]):
"""添加动量特征"""
def add_ma_features(self, df, periods=[7, 14, 21]):
"""添加均线特征"""
def add_volatility_features(self, df, periods=[7, 14, 21]):
"""添加波动率特征"""
def add_oscillator_features(self, df):
"""添加振荡器特征 (RSI, Stochastic)"""
def add_trend_features(self, df):
"""添加趋势特征 (MACD)"""
def get_feature_matrix(self, df):
"""获取特征矩阵"""
```bash
### 3. 模型训练流程
#### 3.1 功能描述
提供完整的模型训练、验证和评估流程。
#### 3.2 需求规格
| 需求 ID | 需求描述 | 优先级 |
|--------|----------|--------|
| TRAIN-001 | 定义训练数据集类 | P0 |
| TRAIN-002 | 实现时间序列交叉验证 | P0 |
| TRAIN-003 | 实现标签生成器 | P0 |
| TRAIN-004 | 实现训练 Pipeline | P1 |
| TRAIN-005 | 支持超参数优化 | P1 |
| TRAIN-006 | 支持早停机制 | P2 |
#### 3.3 接口设计
```python
class LabelGenerator:
"""交易标签生成器"""
def __init__(self, gain_threshold=0.02,
loss_threshold=-0.02,
holding_period=5,
commission=0.001):
self.gain_threshold = gain_threshold
self.loss_threshold = loss_threshold
self.holding_period = holding_period
self.commission = commission
def generate_labels(self, df):
"""
生成交易标签
Returns:
labels: 1 (买入), 0 (卖出)
"""
pass
class TimeSeriesCV:
"""时间序列交叉验证"""
def split(self, X, y, n_splits=5):
"""生成时间序列分割"""
pass
class TrainingPipeline:
"""模型训练 Pipeline"""
def __init__(self, model, feature_engineer, label_generator):
self.model = model
self.feature_engineer = feature_engineer
self.label_generator = label_generator
def train(self, df, start_date, end_date):
"""训练模型"""
pass
def evaluate(self, X_test, y_test):
"""评估模型"""
pass
```bash
### 4. 在线学习系统
#### 4.1 功能描述
支持策略运行时的增量学习和模型更新。
#### 4.2 需求规格
| 需求 ID | 需求描述 | 优先级 |
|--------|----------|--------|
| OL-001 | 实现 Replay Memory | P0 |
| OL-002 | 支持增量训练 | P0 |
| OL-003 | 实现周期性重训练 | P1 |
| OL-004 | 支持模型版本管理 | P2 |
#### 4.3 接口设计
```python
class ReplayMemory:
"""经验回放缓冲区"""
def __init__(self, max_size=100):
self.max_size = max_size
self.memory = []
def add(self, experience):
"""添加经验"""
def sample(self, batch_size):
"""采样经验"""
def get_all(self):
"""获取所有经验"""
class OnlineLearningStrategy(bt.Strategy):
"""在线学习策略"""
params = (
('model', None),
('retrain_interval', 10), # 每 N 天重新训练
('memory_size', 15),
)
def __init__(self):
self.memory = ReplayMemory(self.p.memory_size)
def retrain(self):
"""重新训练模型"""
pass
```bash
### 5. 模型评估指标
#### 5.1 功能描述
提供专门的 ML 模型性能评估分析器。
#### 5.2 需求规格
| 需求 ID | 需求描述 | 优先级 |
|--------|----------|--------|
| EVAL-001 | 预测准确率分析 | P0 |
| EVAL-002 | 混淆矩阵分析 | P0 |
| EVAL-003 | ROC/AUC 分析 | P1 |
| EVAL-004 | 特征重要性分析 | P1 |
| EVAL-005 | 预测置信度分布 | P2 |
#### 5.3 接口设计
```python
class MLAnalyzer(bt.Analyzer):
"""ML 模型性能分析器"""
def __init__(self):
self.predictions = []
self.actuals = []
self.confidences = []
def notify_order(self, order):
"""记录预测结果"""
def get_accuracy(self):
"""计算准确率"""
def get_confusion_matrix(self):
"""获取混淆矩阵"""
def get_roc_auc(self):
"""计算 ROC/AUC"""
```bash
### 6. 策略融合
#### 6.1 功能描述
支持 AI 信号与传统技术指标的融合。
#### 6.2 需求规格
| 需求 ID | 需求描述 | 优先级 |
|--------|----------|--------|
| FUSION-001 | 实现信号加权融合 | P0 |
| FUSION-002 | 支持动态权重调整 | P1 |
| FUSION-003 | 实现置信度过滤 | P1 |
| FUSION-004 | 支持多模型集成 | P2 |
#### 6.3 接口设计
```python
class SignalFusionStrategy(bt.Strategy):
"""信号融合策略"""
params = (
('ml_model', None),
('ml_weight', 0.5),
('ta_weight', 0.5),
('confidence_threshold', 0.6),
)
def get_combined_signal(self):
"""获取融合信号"""
ml_signal = self.p.ml_model.predict_proba(self.features)
ta_signal = self.ta_indicator[0]
# 置信度过滤
if ml_signal < self.p.confidence_threshold:
return 0
# 加权融合
return (ml_signal *self.p.ml_weight +
ta_signal*self.p.ta_weight)
```bash
- --
## 设计文档
### 整体架构设计
#### 1. 目录结构
```bash
backtrader/
├── ml/ # 机器学习模块
│ ├── __init__.py
│ ├── base.py # ML 模型抽象基类
│ ├── models/ # 模型包装器
│ │ ├── __init__.py
│ │ ├── sklearn.py # scikit-learn 包装
│ │ ├── keras.py # Keras 包装
│ │ ├── pytorch.py # PyTorch 包装
│ │ └── xgboost.py # XGBoost/LightGBM 包装
│ ├── indicators/ # ML 指标
│ │ ├── __init__.py
│ │ ├── prediction.py # 预测指标
│ │ └── signal.py # 信号指标
│ └── utils.py # ML 工具函数
│
├── features/ # 特征工程模块
│ ├── __init__.py
│ ├── engineer.py # 特征工程器
│ ├── price.py # 价格特征
│ ├── momentum.py # 动量特征
│ ├── trend.py # 趋势特征
│ ├── volatility.py # 波动率特征
│ ├── oscillator.py # 振荡器特征
│ └── normalization.py # 标准化工具
│
├── training/ # 训练模块
│ ├── __init__.py
│ ├── pipeline.py # 训练流程
│ ├── label.py # 标签生成
│ ├── cv.py # 交叉验证
│ └── optimization.py # 超参数优化
│
├── online/ # 在线学习模块
│ ├── __init__.py
│ ├── memory.py # Replay Memory
│ ├── strategy.py # 在线学习策略基类
│ └── updater.py # 模型更新器
│
└── analyzers/ # 分析器
├── __init__.py
├── ml_metrics.py # ML 指标分析器
└── feature_importance.py # 特征重要性分析器
```bash
### 详细设计
#### 1. ML 模型抽象接口
```python
# ml/base.py
from abc import ABC, abstractmethod
from typing import Any, Union
import numpy as np
class MLModel(ABC):
"""机器学习模型抽象接口"""
def __init__(self, model=None):
self.model = model
self.is_fitted = False
@abstractmethod
def fit(self, X: np.ndarray, y: np.ndarray) -> 'MLModel':
"""训练模型
Args:
X: 特征矩阵 (n_samples, n_features)
y: 标签 (n_samples,)
Returns:
self
"""
pass
@abstractmethod
def predict(self, X: np.ndarray) -> np.ndarray:
"""预测
Args:
X: 特征矩阵
Returns:
预测结果
"""
pass
def predict_proba(self, X: np.ndarray) -> np.ndarray:
"""预测概率
Args:
X: 特征矩阵
Returns:
预测概率
"""
raise NotImplementedError("predict_proba not implemented")
@abstractmethod
def save(self, path: str):
"""保存模型"""
pass
@abstractmethod
def load(self, path: str) -> 'MLModel':
"""加载模型"""
pass
class ScikitLearnModel(MLModel):
"""scikit-learn 模型包装器"""
def __init__(self, model):
super().__init__(model)
def fit(self, X, y):
self.model.fit(X, y)
self.is_fitted = True
return self
def predict(self, X):
return self.model.predict(X)
def predict_proba(self, X):
if hasattr(self.model, 'predict_proba'):
return self.model.predict_proba(X)
return self.model.predict(X)
def save(self, path):
import joblib
joblib.dump(self.model, path)
def load(self, path):
import joblib
self.model = joblib.load(path)
self.is_fitted = True
return self
class KerasModel(MLModel):
"""Keras 模型包装器"""
def __init__(self, model=None, input_shape=None):
super().__init__(model)
self.input_shape = input_shape
def fit(self, X, y, epochs=100, batch_size=32, verbose=0):
import tensorflow as tf
from tensorflow import keras
if self.model is None:
self.model = self._build_model(X.shape[1])
history = self.model.fit(
X, y,
epochs=epochs,
batch_size=batch_size,
verbose=verbose
)
self.is_fitted = True
return self
def _build_model(self, input_dim):
"""构建默认神经网络"""
from tensorflow import keras
from tensorflow.keras import layers
model = keras.Sequential([
layers.Dense(128, activation='relu',
input_dim=input_dim,
kernel_regularizer=keras.regularizers.l2(0.001)),
layers.Dense(64, activation='relu',
kernel_regularizer=keras.regularizers.l2(0.001)),
layers.Dense(16, activation='relu'),
layers.Dense(8, activation='relu'),
layers.Dense(1, activation='tanh')
])
model.compile(
optimizer=keras.optimizers.SGD(lr=0.001, momentum=0.5),
loss='mse'
)
return model
def predict(self, X):
return self.model.predict(X)
def predict_proba(self, X):
pred = self.predict(X)
# 将 tanh 输出转换为概率
return (pred + 1) / 2
def save(self, path):
self.model.save(path)
def load(self, path):
from tensorflow import keras
self.model = keras.models.load_model(path)
self.is_fitted = True
return self
```bash
#### 2. 特征工程器
```python
# features/engineer.py
import pandas as pd
import numpy as np
from typing import List, Dict
class FeatureEngineer:
"""金融特征工程器"""
def __init__(self):
self.feature_names = []
def add_price_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""添加基础价格特征
Features:
- Returns: 日收益率
- Log Returns: 对数收益率
- Gap: 跳空幅度
"""
df = df.copy()
df['returns'] = df['Close'].pct_change()
df['log_returns'] = np.log(df['Close'] / df['Close'].shift(1))
df['gap'] = (df['Open'] - df['Close'].shift(1)) / df['Close'].shift(1)
self._add_feature_names(['returns', 'log_returns', 'gap'])
return df
def add_momentum_features(self, df: pd.DataFrame,
periods: List[int] = [5, 10, 15]) -> pd.DataFrame:
"""添加动量特征
Features:
- Momentum: 价格变化率
- ROC: 变化率
"""
df = df.copy()
for p in periods:
df[f'momentum_{p}'] = df['Close'] - df['Close'].shift(p)
df[f'roc_{p}'] = (df['Close'] - df['Close'].shift(p)) / df['Close'].shift(p)
self._add_feature_names([f'momentum_{p}' for p in periods])
self._add_feature_names([f'roc_{p}' for p in periods])
return df
def add_ma_features(self, df: pd.DataFrame,
periods: List[int] = [7, 14, 21]) -> pd.DataFrame:
"""添加均线特征
Features:
- SMA: 简单移动平均
- EMA: 指数移动平均
- Price vs MA: 价格相对于均线的位置
"""
df = df.copy()
for p in periods:
df[f'sma_{p}'] = df['Close'].rolling(p).mean()
df[f'ema_{p}'] = df['Close'].ewm(span=p).mean()
df[f'price_vs_sma_{p}'] = (df['Close'] - df[f'sma_{p}']) / df[f'sma_{p}']
df[f'price_vs_ema_{p}'] = (df['Close'] - df[f'ema_{p}']) / df[f'ema_{p}']
self._add_feature_names([f'sma_{p}' for p in periods])
self._add_feature_names([f'ema_{p}' for p in periods])
return df
def add_volatility_features(self, df: pd.DataFrame,
periods: List[int] = [7, 14, 21]) -> pd.DataFrame:
"""添加波动率特征
Features:
- StdDev: 滚动标准差
- ATR: 平均真实波幅
- Bollinger Bands: 布林带
"""
df = df.copy()
for p in periods:
df[f'std_{p}'] = df['Close'].rolling(p).std()
df[f'cv_{p}'] = df[f'std_{p}'] / df['Close'].rolling(p).mean() # 变异系数
# ATR
df['tr'] = np.maximum(
df['High'] - df['Low'],
np.maximum(
abs(df['High'] - df['Close'].shift(1)),
abs(df['Low'] - df['Close'].shift(1))
)
)
df['atr_14'] = df['tr'].rolling(14).mean()
# Bollinger Bands
df['bb_middle'] = df['Close'].rolling(20).mean()
df['bb_std'] = df['Close'].rolling(20).std()
df['bb_upper'] = df['bb_middle'] + 2*df['bb_std']
df['bb_lower'] = df['bb_middle'] - 2*df['bb_std']
df['bb_width'] = (df['bb_upper'] - df['bb_lower']) / df['bb_middle']
df['bb_position'] = (df['Close'] - df['bb_lower']) / (df['bb_upper'] - df['bb_lower'])
self._add_feature_names([f'std_{p}' for p in periods])
self._add_feature_names(['atr_14', 'bb_width', 'bb_position'])
return df
def add_oscillator_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""添加振荡器特征
Features:
- RSI: 相对强弱指标
- Stochastic: 随机振荡器
"""
df = df.copy()
# RSI
for p in [9, 14, 21]:
delta = df['Close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(p).mean()
loss = (-delta.where(delta < 0, 0)).rolling(p).mean()
rs = gain / loss
df[f'rsi_{p}'] = 100 - (100 / (1 + rs))
# Stochastic
for p in [7, 14, 21]:
low_min = df['Low'].rolling(p).min()
high_max = df['High'].rolling(p).max()
df[f'stoch_k_{p}'] = 100*(df['Close'] - low_min) / (high_max - low_min)
df[f'stoch_d_{p}'] = df[f'stoch_k_{p}'].rolling(3).mean()
self._add_feature_names([f'rsi_{p}' for p in [9, 14, 21]])
self._add_feature_names([f'stoch_k_{p}' for p in [7, 14, 21]])
return df
def add_trend_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""添加趋势特征
Features:
- MACD: 指数平滑移动平均
- MACD Signal
- MACD Histogram
- ADX: 平均趋向指标
"""
df = df.copy()
# MACD
df['ema_12'] = df['Close'].ewm(span=12).mean()
df['ema_26'] = df['Close'].ewm(span=26).mean()
df['macd'] = df['ema_12'] - df['ema_26']
df['macd_signal'] = df['macd'].ewm(span=9).mean()
df['macd_hist'] = df['macd'] - df['macd_signal']
# +DI/-DI
df['tr'] = np.maximum(
df['High'] - df['Low'],
np.maximum(
abs(df['High'] - df['Close'].shift(1)),
abs(df['Low'] - df['Close'].shift(1))
)
)
df['plus_dm'] = np.where(df['High'] - df['High'].shift(1) > df['Low'].shift(1) - df['Low'],
np.maximum(df['High'] - df['High'].shift(1), 0), 0)
df['minus_dm'] = np.where(df['Low'].shift(1) - df['Low'] > df['High'] - df['High'].shift(1),
np.maximum(df['Low'].shift(1) - df['Low'], 0), 0)
for p in [14]:
df[f'plus_di_{p}'] = 100*(df['plus_dm'].rolling(p).mean() / df['tr'].rolling(p).mean())
df[f'minus_di_{p}'] = 100*(df['minus_dm'].rolling(p).mean() / df['tr'].rolling(p).mean())
self._add_feature_names(['macd', 'macd_signal', 'macd_hist', 'plus_di_14', 'minus_di_14'])
return df
def add_all_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""添加所有特征"""
df = self.add_price_features(df)
df = self.add_momentum_features(df)
df = self.add_ma_features(df)
df = self.add_volatility_features(df)
df = self.add_oscillator_features(df)
df = self.add_trend_features(df)
return df
def get_feature_matrix(self, df: pd.DataFrame) -> np.ndarray:
"""获取特征矩阵
排除 OHLCV 列,只返回特征列
"""
base_cols = ['Open', 'High', 'Low', 'Close', 'Volume']
feature_cols = [c for c in df.columns if c not in base_cols]
self.feature_names = feature_cols
return df[feature_cols].values
def _add_feature_names(self, names: List[str]):
for name in names:
if name not in self.feature_names:
self.feature_names.append(name)
```bash
#### 3. 标签生成器
```python
# training/label.py
import pandas as pd
import numpy as np
class LabelGenerator:
"""交易标签生成器
基于未来价格模拟生成交易标签
"""
def __init__(self,
gain_threshold: float = 0.02,
loss_threshold: float = -0.02,
holding_period: int = 5,
commission: float = 0.001):
"""
Args:
gain_threshold: 止盈阈值(涨幅)
loss_threshold: 止损阈值(跌幅)
holding_period: 持有天数
commission: 交易手续费率
"""
self.gain_threshold = gain_threshold
self.loss_threshold = loss_threshold
self.holding_period = holding_period
self.commission = commission
def generate_labels(self, df: pd.DataFrame) -> np.ndarray:
"""生成交易标签
策略:
1. 模拟未来 n 天的价格变化
2. 如果涨幅超过 gain_threshold,标记为买入(1)
3. 如果跌幅超过 loss_threshold,标记为卖出(0)
4. 考虑交易成本
Args:
df: 包含 OHLCV 数据的 DataFrame
Returns:
labels: 1D 数组,1=买入,0=卖出
"""
n = len(df)
labels = np.zeros(n, dtype=int)
for i in range(n - self.holding_period):
current_price = df['Close'].iloc[i]
future_prices = df['Close'].iloc[i+1:i+1+self.holding_period]
# 计算未来收益(考虑买入成本)
buy_cost = current_price*(1 + self.commission)
# 检查是否达到止盈
max_gain = (future_prices.max() - buy_cost) / buy_cost
if max_gain >= self.gain_threshold:
labels[i] = 1
continue
# 检查是否达到止损
max_loss = (future_prices.min() - buy_cost) / buy_cost
if max_loss <= self.loss_threshold:
labels[i] = 0
continue
# 如果没有达到阈值,根据最终收益判断
final_return = (future_prices.iloc[-1]*(1 - self.commission) - buy_cost) / buy_cost
labels[i] = 1 if final_return > 0 else 0
# 最后 holding_period 天没有未来数据,标记为卖出
labels[-self.holding_period:] = 0
return labels
def generate_regression_labels(self, df: pd.DataFrame) -> np.ndarray:
"""生成回归标签(未来收益率)"""
n = len(df)
labels = np.zeros(n)
for i in range(n - self.holding_period):
current_price = df['Close'].iloc[i]
future_price = df['Close'].iloc[i + self.holding_period]
labels[i] = (future_price - current_price) / current_price
labels[-self.holding_period:] = 0
return labels
```bash
#### 4. 在线学习策略
```python
# online/strategy.py
import backtrader as bt
import numpy as np
from typing import List
from collections import deque
class ReplayMemory:
"""经验回放缓冲区"""
def __init__(self, max_size: int = 100):
self.max_size = max_size
self.experiences: deque = deque(maxlen=max_size)
def add(self, features: np.ndarray, label: int):
"""添加经验"""
self.experiences.append((features, label))
def sample(self, batch_size: int):
"""随机采样"""
import random
return random.sample(list(self.experiences), min(batch_size, len(self.experiences)))
def get_all(self):
"""获取所有经验"""
features = np.array([e[0] for e in self.experiences])
labels = np.array([e[1] for e in self.experiences])
return features, labels
def __len__(self):
return len(self.experiences)
class OnlineLearningStrategy(bt.Strategy):
"""在线学习策略基类
支持策略运行时的增量学习和模型更新
"""
params = (
('model', None), # ML 模型
('feature_cols', None), # 特征列名
('retrain_interval', 10), # 重新训练间隔(天)
('memory_size', 15), # Replay Memory 大小
('buy_threshold', 0.55), # 买入阈值
('sell_threshold', 0.45), # 卖出阈值
('confidence_filter', True), # 是否过滤低置信度预测
)
def __init__(self):
self.memory = ReplayMemory(self.p.memory_size)
self.days_since_retrain = 0
self.last_features = None
def next(self):
"""主策略逻辑"""
# 1. 获取当前特征
features = self._get_features()
if features is None:
return
self.last_features = features
# 2. 模型预测
if hasattr(self.p.model, 'predict_proba'):
proba = self.p.model.predict_proba(features.reshape(1, -1))[0]
prediction = proba[1] # 正类概率
else:
prediction = self.p.model.predict(features.reshape(1, -1))[0]
proba = [1-prediction, prediction]
# 3. 置信度过滤
if self.p.confidence_filter:
if prediction < 0.45 or prediction > 0.55: # 高置信度
# 4. 根据预测进行交易
if prediction >= self.p.buy_threshold and not self.position:
self.buy()
elif prediction <= self.p.sell_threshold and self.position:
self.sell()
# 5. 收集经验(用于未来训练)
# 使用未来价格作为标签(延迟标签)
if len(self) >= self.p.retrain_interval:
future_return = self._get_future_return()
if future_return is not None:
label = 1 if future_return > 0 else 0
self.memory.add(self.last_features, label)
# 6. 周期性重新训练
self.days_since_retrain += 1
if self.days_since_retrain >= self.p.retrain_interval:
self._retrain()
self.days_since_retrain = 0
def _get_features(self) -> np.ndarray:
"""获取当前特征向量
需要在子类中实现,根据 self.p.feature_cols
从数据中提取特征
"""
if self.p.feature_cols is None:
return None
features = []
for col in self.p.feature_cols:
if hasattr(self.data, col):
features.append(getattr(self.data, col)[0])
return np.array(features) if features else None
def _get_future_return(self) -> float:
"""获取未来收益率
用于延迟标签
"""
if len(self.data) < self.p.retrain_interval + 1:
return None
current_price = self.data.close[0]
future_price = self.data.close[-self.p.retrain_interval]
return (future_price - current_price) / current_price
def _retrain(self):
"""重新训练模型"""
if len(self.memory) < 5: # 至少需要 5 个样本
return
X, y = self.memory.get_all()
self.p.model.fit(X, y)
print(f"[{self.datetime.date()}] Model retrained with {len(X)} samples")
class MLPredictionIndicator(bt.Indicator):
"""ML 预测指标
将 ML 模型的预测结果作为指标输出
"""
lines = ('prediction', 'probability', 'signal',)
params = (
('model', None),
('features', None), # 特征数据源列表
('buy_threshold', 0.55),
('sell_threshold', 0.45),
)
def __init__(self):
self.model = self.p.model
def next(self):
"""计算预测"""
# 获取特征
features = self._get_features()
if features is None:
self.lines.prediction[0] = 0
self.lines.probability[0] = 0.5
self.lines.signal[0] = 0
return
# 预测
if hasattr(self.model, 'predict_proba'):
proba = self.model.predict_proba(features.reshape(1, -1))[0]
prediction = proba[1]
else:
prediction = self.model.predict(features.reshape(1, -1))[0]
proba = [1 - prediction, prediction]
self.lines.probability[0] = prediction
# 生成交易信号
if prediction >= self.p.buy_threshold:
self.lines.signal[0] = 1
elif prediction <= self.p.sell_threshold:
self.lines.signal[0] = -1
else:
self.lines.signal[0] = 0
self.lines.prediction[0] = 1 if prediction > 0.5 else 0
def _get_features(self):
"""获取特征向量"""
if self.p.features is None:
return None
features = []
for data_feed in self.p.features:
if hasattr(data_feed, 'close'):
features.append(data_feed.close[0])
elif hasattr(data_feed, 'prediction'):
features.append(data_feed.prediction[0])
return np.array(features) if features else None
```bash
#### 5. ML 分析器
```python
# analyzers/ml_metrics.py
import backtrader as bt
import numpy as np
from sklearn.metrics import (
accuracy_score, precision_score, recall_score,
f1_score, roc_auc_score, confusion_matrix
)
class MLAnalyzer(bt.Analyzer):
"""ML 模型性能分析器
分析 ML 预测的准确性和交易表现
"""
def __init__(self):
self.predictions = []
self.actuals = []
self.confidences = []
self.trade_results = []
def notify_order(self, order):
"""记录订单结果"""
if order.status in [order.Completed]:
# 记录交易结果
pass
def add_prediction(self, prediction, actual, confidence=None):
"""添加预测记录"""
self.predictions.append(prediction)
self.actuals.append(actual)
if confidence is not None:
self.confidences.append(confidence)
def get_accuracy(self):
"""计算准确率"""
if not self.predictions:
return None
return accuracy_score(self.actuals, self.predictions)
def get_precision(self):
"""计算精确率"""
if not self.predictions:
return None
return precision_score(self.actuals, self.predictions, zero_division=0)
def get_recall(self):
"""计算召回率"""
if not self.predictions:
return None
return recall_score(self.actuals, self.predictions, zero_division=0)
def get_f1_score(self):
"""计算 F1 分数"""
if not self.predictions:
return None
return f1_score(self.actuals, self.predictions, zero_division=0)
def get_roc_auc(self):
"""计算 ROC AUC"""
if not self.confidences:
return None
return roc_auc_score(self.actuals, self.confidences)
def get_confusion_matrix(self):
"""获取混淆矩阵"""
if not self.predictions:
return None
return confusion_matrix(self.actuals, self.predictions)
def get_analysis(self):
"""获取完整分析结果"""
return {
'accuracy': self.get_accuracy(),
'precision': self.get_precision(),
'recall': self.get_recall(),
'f1_score': self.get_f1_score(),
'roc_auc': self.get_roc_auc(),
'confusion_matrix': self.get_confusion_matrix(),
'total_predictions': len(self.predictions),
}
```bash
### 使用示例
#### 示例 1: 使用 scikit-learn 模型
```python
import backtrader as bt
from backtrader.ml import ScikitLearnModel
from backtrader.features import FeatureEngineer, LabelGenerator
from backtrader.training import TrainingPipeline
from sklearn.ensemble import RandomForestClassifier
# 1. 准备数据
df = pd.read_csv('stock_data.csv')
df['Date'] = pd.to_datetime(df['Date'])
df.set_index('Date', inplace=True)
# 2. 特征工程
fe = FeatureEngineer()
df = fe.add_all_features(df)
# 3. 生成标签
label_gen = LabelGenerator(gain_threshold=0.02, loss_threshold=-0.02)
labels = label_gen.generate_labels(df)
# 4. 训练模型
model = ScikitLearnModel(RandomForestClassifier(n_estimators=100))
train_size = int(len(df)*0.7)
X_train = df.iloc[:train_size]
y_train = labels[:train_size]
model.fit(fe.get_feature_matrix(X_train), y_train)
# 5. 回测
cerebro = bt.Cerebro()
# 添加数据
data = bt.feeds.PandasData(dataname=df)
cerebro.adddata(data)
# 添加 ML 策略
cerebro.addstrategy(
OnlineLearningStrategy,
model=model,
feature_cols=fe.feature_names,
retrain_interval=10,
buy_threshold=0.6,
sell_threshold=0.4
)
# 运行
result = cerebro.run()
```bash
#### 示例 2: 使用 Keras 神经网络
```python
from backtrader.ml import KerasModel
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
# 构建自定义 Keras 模型
def build_model(input_dim):
model = Sequential([
Dense(128, activation='relu', input_dim=input_dim),
Dropout(0.2),
Dense(64, activation='relu'),
Dropout(0.2),
Dense(32, activation='relu'),
Dense(1, activation='sigmoid')
])
model.compile(
optimizer='adam',
loss='binary_crossentropy',
metrics=['accuracy']
)
return model
# 创建包装器
ml_model = KerasModel(input_shape=(50,)) # 根据特征数量调整
# 训练
ml_model.fit(X_train, y_train, epochs=100, batch_size=32)
# 在策略中使用
cerebro.addstrategy(
OnlineLearningStrategy,
model=ml_model,
feature_cols=fe.feature_names,
retrain_interval=5
)
```bash
#### 示例 3: 信号融合
```python
class FusionStrategy(bt.Strategy):
"""融合 ML 信号和技术指标的策略"""
def __init__(self):
# ML 预测指标
self.ml_pred = MLPredictionIndicator(
model=self.p.model,
features=[self.data],
buy_threshold=0.6,
sell_threshold=0.4
)
# 技术指标
self.sma_short = bt.indicators.SMA(self.data.close, period=20)
self.sma_long = bt.indicators.SMA(self.data.close, period=50)
self.rsi = bt.indicators.RSI(self.data.close, period=14)
def next(self):
# ML 信号
ml_signal = self.ml_pred.signal[0]
# 技术分析信号
ta_signal = 0
if self.sma_short[0] > self.sma_long[0]:
ta_signal += 1
if self.rsi[0] < 30: # 超卖
ta_signal += 1
elif self.rsi[0] > 70: # 超买
ta_signal -= 1
# 融合信号
combined = ml_signal*0.6 + ta_signal* 0.4
if combined > 0.5 and not self.position:
self.buy()
elif combined < -0.5 and self.position:
self.sell()
```bash
### 实施计划
#### 第一阶段 (P0 功能)
1. ML 模型抽象接口
2. scikit-learn 模型包装
3. Keras/TensorFlow 模型包装
4. 基础特征工程器
5. 标签生成器
6. ML 预测指标
#### 第二阶段 (P1 功能)
1. PyTorch 模型包装
2. XGBoost/LightGBM 包装
3. 在线学习策略
4. Replay Memory
5. ML 分析器
6. 时间序列交叉验证
#### 第三阶段 (P2 功能)
1. 特征选择工具
2. 超参数优化
3. 多模型集成
4. 模型版本管理
5. 高级评估指标
- --
## 总结
通过借鉴 AI-Strategies-StockMarket 项目的设计理念,Backtrader 可以扩展以下能力:
1. **统一的 ML 接口**: 支持多种 ML 框架的无缝集成
2. **丰富的特征工程**: 50+金融技术指标的自动提取
3. **智能标签生成**: 基于交易成本和风险阈值的标签系统
4. **在线学习**: 策略运行时的增量学习和模型更新
5. **信号融合**: ML 预测与传统技术指标的有机结合
6. **完整评估**: 模型性能和交易表现的综合分析
这些增强功能将使 Backtrader 能够支持 AI 驱动的量化策略开发,从传统的技术分析扩展到机器学习、深度学习领域,为用户提供更强大的策略研发工具。