回测配置指南

本指南详细介绍如何配置 Backtrader 的回测环境,包括经纪商设置、交易成本、数据处理等。

基本配置

1. Cerebro 配置


# 创建回测引擎

cerebro = bt.Cerebro(
    stdstats=True,          # 是否显示标准统计指标
    oldbuysell=False,       # 是否使用旧版买卖信号显示
    oldtrades=False,        # 是否使用旧版交易显示
    exactbars=False,        # 是否使用精确的 bar 计数
    optreturn=True,         # 优化模式下是否返回策略实例
    optdatas=True,          # 优化模式下是否优化数据预加载
    preload=True,          # 是否预加载数据
    runonce=True,          # 是否一次性运行所有数据
    maxcpus=None,          # 最大 CPU 核心数
    writer=False,          # 是否启用交易记录器
    tradehistory=False,    # 是否记录详细交易历史
    oldsync=False,         # 是否使用旧版数据同步
    tz=None,              # 时区设置
    cheat_on_open=False,  # 是否在开盘时作弊

)

```bash

### 2. 经纪商配置

```python

# 设置经纪商

cerebro.broker.setcash(100000.0)  # 设置初始资金

cerebro.broker.setcommission(commission=0.001)  # 设置佣金

cerebro.broker.set_slippage_perc(0.001)  # 设置滑点

# 详细佣金设置

cerebro.broker.setcommission(
    commission=0.001,     # 佣金率
    margin=2000,          # 保证金要求
    mult=10,              # 合约乘数
    commtype=bt.CommInfoBase.COMM_PERC,  # 佣金类型(百分比)
    percabs=True,         # 是否使用绝对百分比
    leverage=1.0,         # 杠杆率

)

```bash

### 3. 数据配置

```python

# 数据预处理设置

data = bt.feeds.PandasData(
    dataname=df,
    fromdate=datetime.datetime(2020, 1, 1),
    todate=datetime.datetime(2023, 12, 31),
    timeframe=bt.TimeFrame.Days,
    compression=1,
    sessionstart=datetime.time(9, 30),
    sessionend=datetime.time(15, 0),
    tz=pytz.timezone('Asia/Shanghai'),
)

# 数据重采样

cerebro.resampledata(
    data,
    timeframe=bt.TimeFrame.Weeks,
    compression=1,
    bar2edge=True,
    adjbartime=True,
    rightedge=True,
)

```bash

## 高级配置

### 1. 订单类型配置

```python

# 设置订单有效期

cerebro.broker.set_ordertimeout(ordertimeout=None)  # None 表示永不过期

# 设置订单类型

cerebro.broker.set_ordertype(ordertype=bt.Order.Market)  # 市价单

cerebro.broker.set_ordertype(ordertype=bt.Order.Limit)   # 限价单

cerebro.broker.set_ordertype(ordertype=bt.Order.Stop)    # 止损单

cerebro.broker.set_ordertype(ordertype=bt.Order.StopLimit)  # 止损限价单

```bash

### 2. 风险控制配置

```python

# 设置风险控制参数

class RiskManager(bt.Sizer):
    params = (
        ('risk_perc', 0.02),  # 每笔交易风险比例
        ('max_pos', 5),       # 最大持仓数量
    )

    def _getsizing(self, comminfo, cash, data, isbuy):
        if len(self.strategy.positions) >= self.p.max_pos:
            return 0

        risk_amount = cash *self.p.risk_perc
        price = data.close[0]
        size = risk_amount / price

        return int(size)

# 添加风险管理器

cerebro.addsizer(RiskManager)

```bash

### 3. 性能优化配置

```python

# 启用多核处理

cerebro.run(
    maxcpus=4,           # 使用 4 个 CPU 核心
    optreturn=False,     # 不返回策略实例
    optdatas=True,       # 优化数据预加载
    preload=True,        # 预加载数据
    runonce=True,        # 一次性运行

)

# 内存优化

class MemoryOptimizedStrategy(bt.Strategy):
    params = (('lookback', 20),)

    def __init__(self):
        self.data_close = self.data.close
        self.sma = bt.indicators.SMA(period=self.p.lookback)

    def next(self):
        if len(self.data) > self.p.lookback:

# 使用缓存的数据
            if self.data_close[0] > self.sma[0]:
                self.buy()

```bash

### 4. 日志配置

```python
import logging

# 配置日志

logging.basicConfig(
    filename='backtest.log',
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)

class LoggedStrategy(bt.Strategy):
    def __init__(self):
        self.logger = logging.getLogger(self.__class__.__name__)

    def next(self):
        self.logger.info(f'当前价格: {self.data.close[0]}')

    def notify_order(self, order):
        if order.status == order.Completed:
            self.logger.info(
                f'订单执行: {order.executed.price}, 数量: {order.executed.size}'
            )

```bash

## 自定义配置

### 1. 自定义经纪商

```python
class CustomBroker(bt.brokers.BackBroker):
    params = (
        ('commission', 0.001),
        ('slip_perc', 0.001),
        ('slip_fixed', 0.00),
        ('slip_open', False),
        ('slip_match', True),
        ('slip_limit', True),
        ('slip_out', False),
    )

    def __init__(self):
        super(CustomBroker, self).__init__()

    def _slip_price(self, price, slip):
        if not self.p.slip_match:
            return price

        if self.p.slip_limit and isinstance(slip, float):
            return price* (1.0 + slip)

        return price + slip

```bash

### 2. 自定义数据源配置

```python
class CustomDataFeed(bt.feeds.GenericCSVData):
    params = (
        ('dtformat', '%Y-%m-%d'),
        ('tmformat', '%H:%M:%S'),
        ('datetime', 0),
        ('time', -1),
        ('open', 1),
        ('high', 2),
        ('low', 3),
        ('close', 4),
        ('volume', 5),
        ('openinterest', -1),
        ('reverse', False),
    )

    def _loadline(self, linetokens):

# 自定义数据加载逻辑
        return super(CustomDataFeed, self)._loadline(linetokens)

```bash

### 3. 自定义指标配置

```python
class CustomIndicator(bt.Indicator):
    params = (
        ('period', 20),
        ('movav', bt.indicators.MovAv.Simple),
        ('subplot', True),
        ('plotname', None),
        ('plotabove', False),
        ('plotlinelabels', True),
        ('plotforce', False),
        ('plotmaster', None),
    )

    plotinfo = dict(
        plot=True,
        subplot=True,
        plotname='Custom Indicator'
    )

    plotlines = dict(
        line=dict(
            _name='CustomLine',
            color='blue',
            ls='-',
            width=1.0
        )
    )

```bash

## 环境配置

### 1. 回测环境

```python

# 设置回测环境

class BacktestEnv:
    def __init__(self):
        self.cerebro = bt.Cerebro()
        self._configure_broker()
        self._configure_data()
        self._configure_analyzers()

    def _configure_broker(self):
        self.cerebro.broker.setcash(100000.0)
        self.cerebro.broker.setcommission(commission=0.001)

    def _configure_data(self):
        self.data = bt.feeds.YahooFinanceData(
            dataname='AAPL',
            fromdate=datetime.datetime(2020, 1, 1),
            todate=datetime.datetime(2023, 12, 31)
        )
        self.cerebro.adddata(self.data)

    def _configure_analyzers(self):
        self.cerebro.addanalyzer(bt.analyzers.SharpeRatio)
        self.cerebro.addanalyzer(bt.analyzers.DrawDown)
        self.cerebro.addanalyzer(bt.analyzers.TradeAnalyzer)

```bash

### 2. 实盘环境

```python

# 设置实盘环境

class LiveEnv:
    def __init__(self):
        self.cerebro = bt.Cerebro()
        self._configure_live_broker()
        self._configure_live_data()
        self._configure_risk_management()

    def _configure_live_broker(self):
        self.broker = CustomBroker()
        self.cerebro.setbroker(self.broker)

    def _configure_live_data(self):
        self.data = CustomDataFeed()
        self.cerebro.adddata(self.data)

    def _configure_risk_management(self):
        self.cerebro.addsizer(RiskManager)

```bash

## 最佳实践

### 1. 配置检查

```python
def validate_config(cerebro):
    """验证配置是否合理"""

# 检查资金设置
    if cerebro.broker.getcash() < 10000:
        raise ValueError("初始资金过低")

# 检查佣金设置
    if cerebro.broker.getcommissioninfo(None).commission > 0.01:
        raise ValueError("佣金设置过高")

# 检查数据设置
    for data in cerebro.datas:
        if data.params.timeframe < bt.TimeFrame.Minutes:
            raise ValueError("时间周期过小")

```bash

### 2. 性能优化

```python
def optimize_performance(cerebro):
    """优化回测性能"""

# 启用预加载
    cerebro.preload(True)

# 启用一次性运行
    cerebro.runonce(True)

# 设置最大 CPU 核心数
    cerebro.maxcpus = multiprocessing.cpu_count() - 1

```bash

### 3. 错误处理

```python
def safe_run(cerebro):
    """安全运行回测"""
    try:
        validate_config(cerebro)
        optimize_performance(cerebro)
        results = cerebro.run()
        return results
    except Exception as e:
        logging.error(f"回测运行错误: {e}")
        return None

```bash

## 常见问题

1. **内存使用过高**
   - 减少数据预加载
   - 使用数据过滤
   - 优化指标计算

1. **回测速度慢**
   - 启用多核处理
   - 减少数据频率
   - 优化策略逻辑

1. **配置冲突**
   - 检查参数兼容性
   - 验证数据同步
   - 确认订单设置

## 下一步

- 学习[策略开发](./strategies.md)
- 了解[数据处理](./data_feeds.md)
- 探索[指标系统](./indicators.md)
- 研究[参数优化](./optimization.md)