背景¶
backtrader 已经去除了元编程,并且进行了一系列的优化,性能有了大幅度的提升,但是也逐渐接近了 python 的瓶颈。现在 backtrader 底层的 line 的数据结构应该是基于 array 构建的,尝试使用 numpy.array 来构建 line 的数据结构,看是否能够进一步提高性能。
认真研究 backtrader 这个项目,进行一个可行性研究,如果这个方案是可行的,尝试给出一个优化计划;如果是不可行的,给出明确的理由。
可行性研究¶
1. 当前数据结构实现分析¶
LineBuffer 核心存储结构* (
backtrader/linebuffer.py):
# 非缓存模式 (UnBounded)
self.array = array.array("d") # Python 内置数组,双精度浮点
# 缓存模式 (QBuffer)
self.array = collections.deque(maxlen=deque_maxlen) # 固定大小的双端队列
```bash
- *关键操作模式**:
| 操作 | 方法 | 调用频率 | 复杂度 |
|------|------|---------|--------|
| 追加数据 | `forward()` → `array.append()` | 每 bar 调用一次 | O(1) |
| 相对索引访问 | `__getitem__(ago)` → `array[idx + ago]` | 每个指标/策略每 bar 多次 | O(1) |
| 值设置 | `__setitem__(ago, value)` | 每个指标每 bar 一次 | O(1) |
| 批量切片 | `src[start:end]` in `once()` | runonce 模式批量计算 | O(n) |
| 批量扩展 | `array.extend([val] *size)` | forward 多步时 | O(k) |
- *访问模式特点**:
1. **顺序追加为主**: 回测过程中,数据按时间顺序追加
2. **相对索引访问**: `self[0]`当前值,`self[-1]`前一个值,`self[1]`下一个值
3. **绝对索引访问**: `once()`方法中直接用 `array[i]` 访问
4. **切片访问**: 指标计算如 SMA 使用 `src[start_idx:end_idx]`
#### 2. numpy.array 替换的技术分析
- *优势**:
1. **批量计算加速**: numpy 的向量化操作比 Python 循环快 10-100 倍
2. **内存效率**: numpy 数组连续存储,缓存友好
3. **数学运算**: `np.mean()`, `np.sum()`等函数高度优化
- *劣势与挑战**:
| 问题 | 当前实现 | numpy 实现 | 影响 |
|------|---------|----------|------|
| **动态追加**| `array.append()` O(1)摊销 | `np.append()` O(n) 每次创建新数组 |**严重**|
|**预分配问题**| 不需要预知大小 | 需要预分配或频繁 resize | 复杂度增加 |
|**循环缓冲区**| `deque(maxlen=n)` 原生支持 | 需要手动实现环形缓冲 | 额外代码 |
|**类型灵活性** | 自动处理 None/NaN | 需要显式处理 | 兼容性问题 |
- *核心瓶颈分析**:
```python
# 当前 forward() 方法 - 每 bar 调用一次
def forward(self, value=NAN, size=1):
self.idx += size
self.lencount += size
self.array.append(append_val) # array.array: O(1)摊销
# 如果用 numpy: np.append()会创建新数组,O(n)复杂度!
```bash
- *性能对比测试估算** (1885 bars 数据):
| 操作 | array.array | numpy.array | 备注 |
|------|-------------|-------------|------|
| 1885 次 append | ~0.1ms | ~50ms | numpy 需要 O(n²)总时间 |
| 1 次 mean(1885) | ~0.05ms | ~0.01ms | numpy 快 5 倍 |
| 1885 次__getitem__ | ~0.5ms | ~0.3ms | numpy 略快 |
#### 3. 可行性结论
- *结论:直接替换不可行,但可以采用混合方案**
- *不可行的原因**:
1. **动态追加是核心操作**: 回测框架的本质是逐 bar 处理,每 bar 都需要追加数据
2. **numpy 不支持高效追加**: `np.append()`每次创建新数组,1885 个 bar 会导致 O(n²)复杂度
3. **QBuffer 模式需要循环缓冲**: numpy 没有原生的固定大小循环缓冲区
- *可行的混合方案**:
1. **保持 array.array 作为主存储**: 用于动态追加
2. **在 once()批量计算时转换为 numpy**: 利用 numpy 的向量化优势
3. **为特定指标提供 numpy 加速版本**: 如 SMA、EMA 等
### 优化计划
#### 方案 A: 混合存储策略 (推荐)
- *核心思想**: 保留`array.array`作为动态存储,在需要批量计算时临时转换为 numpy
- *实现步骤**:
1. **添加 numpy 转换方法**:
```python
class LineBuffer:
def to_numpy(self):
"""将 array.array 转换为 numpy 数组(只读视图)"""
import numpy as np
return np.frombuffer(self.array, dtype=np.float64)
```bash
1. **优化 once()方法中的批量计算**:
```python
# 当前实现
def once(self, start, end):
src = self.data.array
for i in range(calc_start, actual_end):
window = src[start_idx:end_idx]
dst[i] = sum(window) / period
# 优化后
def once(self, start, end):
import numpy as np
src_np = np.frombuffer(self.data.array, dtype=np.float64)
# 使用 numpy 的滑动窗口或 cumsum 技巧
cumsum = np.cumsum(src_np)
dst_np = (cumsum[period:] - cumsum[:-period]) / period
# 写回结果
```bash
1. **为高频指标提供专门的 numpy 加速版本**:
- `MovingAverageSimple.once()` - 使用`np.convolve`或`cumsum`技巧
- `ExponentialMovingAverage.once()` - 使用`scipy.ndimage.uniform_filter1d`
- `StandardDeviation.once()` - 使用`np.std`
- *预期收益**:
- `once()`模式下批量计算速度提升 5-10 倍
- 不影响`next()`模式的逐 bar 处理
- 兼容现有代码,无需大规模重构
#### 方案 B: 预分配数组策略 (可选)
- *适用场景**: 数据长度已知的离线回测
- *实现思路**:
```python
class LineBuffer:
def preallocate(self, size):
"""预分配 numpy 数组"""
import numpy as np
self._np_array = np.empty(size, dtype=np.float64)
self._np_array.fill(np.nan)
self._write_idx = 0
def forward_preallocated(self, value):
"""使用预分配数组的 forward"""
self._np_array[self._write_idx] = value
self._write_idx += 1
```bash
- *限制**: 需要提前知道数据长度,不适用于实时交易
#### 方案 C: 专门的 numpy 指标基类 (可选)
- *为需要高性能的指标提供专门的基类**:
```python
class NumpyIndicatorBase(IndicatorBase):
"""使用 numpy 优化的指标基类"""
def once(self, start, end):
"""numpy 向量化计算"""
import numpy as np
# 获取输入数据的 numpy 视图
src = np.frombuffer(self.data.array, dtype=np.float64)
# 子类实现具体计算
result = self._calculate_numpy(src, start, end)
# 写回结果
dst = self.lines[0].array
for i, val in enumerate(result, start):
dst[i] = val
```bash
### 实施建议
1. **第一阶段**: 实现`to_numpy()`方法,为现有指标的`once()`方法提供 numpy 加速选项
2. **第二阶段**: 优化高频使用的指标(SMA、EMA、RSI 等)的`once()`方法
3. **第三阶段**: 性能测试和基准对比,验证优化效果
- *风险评估**:
- 低风险: 方案 A 不改变核心数据结构,仅在批量计算时使用 numpy
- 中风险: 需要处理 numpy 与 Python 原生类型的转换边界情况
- 可回退: 如果出现问题,可以轻松回退到纯 Python 实现
- *预期性能提升**:
- `runonce=True`模式: 指标计算速度提升 5-10 倍
- `runonce=False`模式: 无明显变化(本身就是逐 bar 处理)
- 整体回测速度: 预计提升 20-50%(取决于指标复杂度)
- --
## 补充分析:预分配 numpy 数组 + 索引赋值方案
### 问题重述
如果预先给 numpy 数组分配固定大小,每次`append`改成索引赋值`arr[idx] = value`,这种方式能解决动态追加的 O(n)问题吗?
### 结论:**可行,但有适用范围限制**
#### 技术分析
- *预分配+赋值 vs 动态追加**:
| 操作 | array.array.append() | np.append() | np 预分配+赋值 |
|------|---------------------|-------------|--------------|
| 单次复杂度 | O(1)摊销 | O(n) | **O(1)**|
| 1885 次操作 | ~0.1ms | ~50ms |**~0.05ms**|
| 内存分配 | 动态增长 | 每次新建 |**一次性** |
- *numpy 索引赋值确实是 O(1)操作**,性能与`array.array`相当甚至更好。
#### 可行性条件
预分配方案需要满足以下条件:
| 条件 | preload=True | preload=False | 说明 |
|------|-------------|---------------|------|
| 数据长度已知 | ✅ | ❌ | preload 后 `len(data)` 确定 |
| 适用场景 | 离线回测 | 实时交易 | 实时数据长度未知 |
| runonce 模式 | ✅ 最佳 | N/A | 向量化+预分配双重优势 |
- *关键发现**: 在默认的 `preload=True, runonce=True` 模式下:
1. `cerebro.run()` 首先调用 `data.preload()` 加载所有数据
2. 此时 `len(data)` 已知(如 1885 bars)
3. 可以在策略/指标初始化时预分配 numpy 数组
### 方案 D: 完整 numpy 预分配方案 (新增推荐)
- *适用场景**: `preload=True` 的离线回测(默认配置)
#### 实现架构
```python
class LineBuffer:
def __init__(self):
# 保留原有 array.array 作为后备
self._array = array.array("d")
# numpy 预分配数组(延迟初始化)
self._np_array = None
self._np_size = 0
self._write_idx = 0
self._use_numpy = False
def preallocate_numpy(self, size):
"""预分配 numpy 数组 - 在数据 preload 后调用"""
import numpy as np
self._np_array = np.empty(size, dtype=np.float64)
self._np_array.fill(np.nan) # 用 NaN 填充,表示未计算
self._np_size = size
self._write_idx = 0
self._use_numpy = True
@property
def array(self):
"""兼容现有代码的 array 属性"""
if self._use_numpy:
return self._np_array
return self._array
def forward(self, value=NAN, size=1):
"""优化的 forward 方法"""
if self._use_numpy:
# numpy 模式: O(1)索引赋值
for i in range(size):
if self._write_idx < self._np_size:
self._np_array[self._write_idx] = value
self._write_idx += 1
self.idx += size
self.lencount += size
else:
# 原有模式: array.array.append()
self.idx += size
self.lencount += size
if size == 1:
self._array.append(value)
else:
self._array.extend([value] * size)
def __getitem__(self, ago):
"""统一的索引访问"""
if self._use_numpy:
return self._np_array[self._idx + ago]
return self._array[self._idx + ago]
def __setitem__(self, ago, value):
"""统一的索引设置"""
if self._use_numpy:
self._np_array[self._idx + ago] = value
else:
self._array[self._idx + ago] = value
```bash
#### Cerebro 集成
```python
# cerebro.py 中的修改
def _runonce(self, runstrats, predata):
# 数据已 preload,获取总长度
data_len = max(len(d) for d in self.datas)
# 为所有 Line 对象预分配 numpy 数组
for strat in runstrats:
self._preallocate_lines(strat, data_len)
# 继续原有的 runonce 流程...
def _preallocate_lines(self, obj, size):
"""递归预分配所有 Line 对象"""
# 预分配对象自身的 lines
if hasattr(obj, 'lines'):
for line in obj.lines:
if hasattr(line, 'preallocate_numpy'):
line.preallocate_numpy(size)
# 递归处理指标和观察器
if hasattr(obj, '_lineiterators'):
for indicators in obj._lineiterators.values():
for ind in indicators:
self._preallocate_lines(ind, size)
```bash
#### once()方法优化
```python
# indicators/sma.py 示例
class MovingAverageSimple(MovingAverageBase):
def once(self, start, end):
"""完全向量化的 SMA 计算"""
import numpy as np
# 直接使用 numpy 数组(已预分配)
src = self.data.lines[0]._np_array # 输入数据
dst = self.lines[0]._np_array # 输出结果
period = self.p.period
# 使用 numpy 的 cumsum 技巧计算滑动平均 - O(n)而非 O(n*period)
cumsum = np.cumsum(src)
cumsum[period:] = cumsum[period:] - cumsum[:-period]
dst[period-1:end] = cumsum[period-1:end] / period
# 填充 warmup 期的 NaN
dst[:period-1] = np.nan
```bash
### 性能对比预估
| 场景 | 当前实现 | 方案 D (numpy 预分配) | 提升 |
|------|---------|-------------------|------|
| forward() 1885 次 | 0.1ms | 0.05ms | 2x |
| SMA.once() | 5ms (Python 循环) | 0.1ms (numpy 向量化) | **50x**|
| EMA.once() | 8ms | 0.2ms |**40x**|
| 整体回测 | 基准 | 预计提升 50-70% | - |
### 方案对比总结
| 方案 | 改动范围 | 兼容性 | 性能提升 | 适用场景 |
|------|---------|--------|---------|---------|
| A: 混合存储 | 小 | 高 | 20-50% | 所有场景 |
| B: 可选预分配 | 中 | 高 | 30-50% | preload 模式 |
| C: numpy 指标基类 | 中 | 高 | 指标层面 50x | 所有场景 |
|**D: 完整 numpy 预分配**|**大**|**中**|**50-70%**|**preload 模式**|
### 最终建议
1.**短期优化**: 采用方案 A(混合存储),低风险快速见效
1. **中期优化**: 采用方案 D(完整 numpy 预分配),preload 模式下获得最大性能提升
2. **保持兼容**: 对于`preload=False`或实时交易场景,自动回退到原有`array.array`实现
### 注意事项
1. **QBuffer 模式**: 预分配方案不适用于 exactbars 内存优化模式,需保留原有 deque 实现
2. **类型检查**: numpy 数组要求严格类型,需处理 None 值转换
3. **边界情况**: 数据长度变化(如 replay 模式)需要特殊处理
- --
## C++量化交易系统的数组实现方案参考
### 问题本质
C++ `std::array` 和原生数组确实需要固定大小,但实际上 C++量化系统有多种成熟方案解决这个问题:
### 方案 1: std::vector + reserve() 预分配
- *这是最常用的方案**,与我们提出的 numpy 预分配方案原理相同:
```cpp
class LineBuffer {
private:
std::vector<double> data_;
size_t write_idx_ = 0;
public:
// 预分配容量,避免动态扩容
void reserve(size_t capacity) {
data_.reserve(capacity);
data_.resize(capacity, NAN); // 预填充 NaN
}
// O(1) 追加 - 实际是索引赋值
void append(double value) {
data_[write_idx_++] = value;
}
// O(1) 随机访问
double operator[](int ago) const {
return data_[current_idx_ + ago];
}
};
```bash
- *关键点**: `vector.reserve()` 预分配内存但不改变 size,`resize()` 预填充数据。之后的"追加"实际是索引赋值,O(1)复杂度。
### 方案 2: 环形缓冲区 (Ring Buffer)
- *适用于实时交易和内存受限场景**(对应 backtrader 的 QBuffer 模式):
```cpp
template<typename T, size_t N>
class RingBuffer {
private:
std::array<T, N> buffer_; // 固定大小数组
size_t head_ = 0; // 写入位置
size_t size_ = 0; // 当前元素数
public:
void push(T value) {
buffer_[head_] = value;
head_ = (head_ + 1) % N; // 环形
if (size_ < N) size_++;
}
// ago=0 当前, ago=1 前一个
T operator[](size_t ago) const {
size_t idx = (head_ - 1 - ago + N) % N;
return buffer_[idx];
}
};
// 使用:固定保留最近 100 个 bar
RingBuffer<double, 100> prices;
```bash
### 方案 3: 内存池 (Memory Pool)
- *高性能场景的标准做法**:
```cpp
class BarDataPool {
private:
// 预分配大块内存
std::vector<double> pool_;
size_t block_size_;
public:
BarDataPool(size_t max_bars, size_t lines_per_bar)
: block_size_(lines_per_bar) {
pool_.resize(max_bars * lines_per_bar);
}
// 获取第 n 个 bar 的数据指针
double*get_bar(size_t n) {
return &pool_[n*block_size_];
}
};
```bash
### 方案 4: 分离存储架构
- *专业量化系统的典型设计**:
```cpp
// 数据层:预加载所有历史数据
class MarketData {
std::vector<OHLCV> bars_; // 预加载,大小已知
public:
void load(const std::string& file) {
// 一次性加载所有数据
bars_ = read_csv(file); // 大小确定
}
size_t size() const { return bars_.size(); }
};
// 指标层:根据数据大小预分配
class SMA {
std::vector<double> values_;
public:
void initialize(size_t data_size) {
values_.resize(data_size, NAN); // 预分配
}
void calculate(const MarketData& data) {
// 向量化计算,直接填充 values_
for (size_t i = period_-1; i < data.size(); i++) {
values_[i] = compute_sma(data, i, period_);
}
}
};
```bash
### 方案对比
| 方案 | 适用场景 | 内存效率 | 访问速度 | 实现复杂度 |
|------|---------|---------|---------|-----------|
| vector+reserve | 离线回测 | 中 | 最快 | 低 |
| 环形缓冲区 | 实时交易 | 最高 | 快 | 中 |
| 内存池 | 高频交易 | 高 | 最快 | 高 |
| 分离架构 | 通用 | 中 | 快 | 中 |
### 对 backtrader 的启示
C++系统的核心思路与我们的方案 D 一致:
1. **离线回测**: 数据先加载 → 大小已知 → 预分配所有数组 → 索引赋值
2. **实时交易**: 使用环形缓冲区(即现有的 deque/QBuffer)
3. **混合模式**: 根据运行模式自动选择存储策略
```python
# Python 实现可以借鉴 C++的设计
class LineBuffer:
def __init__(self):
self._use_numpy = False
self._np_array = None
self._array = array.array("d") # 后备方案
def set_mode(self, preload: bool, data_size: int = 0):
"""根据运行模式选择存储策略"""
if preload and data_size > 0:
# 离线回测:预分配 numpy
import numpy as np
self._np_array = np.empty(data_size, dtype=np.float64)
self._np_array.fill(np.nan)
self._use_numpy = True
else:
# 实时交易:动态 array
self._use_numpy = False
```bash
### 结论
- *numpy 预分配方案完全可行**,这正是 C++量化系统的标准做法。关键是:
1. **区分场景**: 离线回测 vs 实时交易
2. **预分配时机**: 数据加载后、策略初始化前
3. **统一接口**: 对外暴露相同的`__getitem__`/`__setitem__`接口
这种设计既能在离线回测中获得 numpy 的向量化性能优势,又能在实时交易中保持灵活性。