Phase 2: 回测引擎与 Broker

周期: 3 周 | 优先级: 🔴 最高 | 风险: 高


1. 目标

实现 Tick 级回测引擎的核心逻辑,包括三种运行模式和两种新 Broker。

1.1 核心目标

  • ✅ 实现 TickBroker(纯 Tick 撮合)

  • ✅ 实现 MixBroker(Tick+Bar 混合撮合)

  • ✅ 实现三种运行模式(BAR/TICK/MIXED)

  • ✅ 实现批处理通知机制

  • ✅ 确保撮合准确性

1.2 成功标准

| 指标 | 目标 | 测量方法 |

|——|——|———|

| 撮合准确性 | 100% | 对比验证 |

| 1 天 Tick 回测 | < 10 秒 | 性能测试 |

| 内存使用 | < 200MB | memory_profiler |

| 回归测试 | 100%通过 | 1020/1020 |


2. 实施内容

2.1 TickBroker 实现(5 天)

2.1.1 TickBroker 核心逻辑

  • 文件*: backtrader/brokers/tickbroker.py

import backtrader as bt
from backtrader.brokers import BackBroker
from backtrader.order import Order

class TickBroker(BackBroker):
    """Tick 级订单撮合 Broker"""

    params = (
        ('slippage_model', 'fixed'),      # 'fixed', 'percentage', 'tick'
        ('slippage_value', 0.0),          # 滑点值
        ('partial_fill', True),           # 允许部分成交
        ('market_impact', False),         # 市场冲击模型
        ('impact_factor', 0.0001),        # 冲击系数
    )

    def __init__(self):
        super().__init__()
        self._tick_channels = {}  # {symbol: TickChannel}
        self._pending_orders = []  # 待撮合订单
        self._matched_orders = set()  # 本 timestamp 已撮合订单

    def register_tick_channel(self, symbol, channel):
        """注册 Tick 通道"""
        self._tick_channels[symbol] = channel

    def match_tick(self, tick):
        """Tick 撮合逻辑

        Args:
            tick: TickEvent 实例
        """
        symbol = tick.symbol

# 获取该 symbol 的待撮合订单
        orders_to_match = [
            o for o in self._pending_orders
            if self._get_order_symbol(o) == symbol
            and o.ref not in self._matched_orders
        ]

        for order in orders_to_match:
            if self._try_match_order(order, tick):
                self._matched_orders.add(order.ref)

    def _try_match_order(self, order, tick):
        """尝试撮合订单

        Returns:
            bool: 是否成功撮合
        """

# 1. 检查订单方向与 tick 方向
        if order.isbuy() and tick.direction == 'sell':

# 买单与卖方成交 tick 匹配
            match_price = tick.price
        elif order.issell() and tick.direction == 'buy':

# 卖单与买方成交 tick 匹配
            match_price = tick.price
        else:
            return False

# 2. 检查价格条件
        if not self._check_price_condition(order, match_price):
            return False

# 3. 计算滑点
        exec_price = self._apply_slippage(order, match_price)

# 4. 计算成交量
        if self.p.partial_fill:

# 部分成交:取订单剩余量与 tick 量的最小值
            fill_size = min(order.executed.remsize, tick.volume)
        else:

# 全量成交:只有 tick 量足够时才成交
            if tick.volume >= order.executed.remsize:
                fill_size = order.executed.remsize
            else:
                return False

# 5. 执行成交
        self._execute_order(order, exec_price, fill_size, tick.timestamp)

        return True

    def _check_price_condition(self, order, price):
        """检查价格条件"""
        if order.exectype == Order.Market:
            return True

        elif order.exectype == Order.Limit:
            if order.isbuy():
                return price <= order.price
            else:
                return price >= order.price

        elif order.exectype == Order.Stop:
            if order.isbuy():
                return price >= order.price
            else:
                return price <= order.price

        elif order.exectype == Order.StopLimit:

# 先检查是否触发
            if order.triggered:

# 已触发,按 limit 检查
                if order.isbuy():
                    return price <= order.pricelimit
                else:
                    return price >= order.pricelimit
            else:

# 检查是否触发
                if order.isbuy():
                    if price >= order.price:
                        order.triggered = True
                        return price <= order.pricelimit
                else:
                    if price <= order.price:
                        order.triggered = True
                        return price >= order.pricelimit
                return False

        return False

    def _apply_slippage(self, order, price):
        """应用滑点"""
        if self.p.slippage_model == 'fixed':
            if order.isbuy():
                return price + self.p.slippage_value
            else:
                return price - self.p.slippage_value

        elif self.p.slippage_model == 'percentage':
            if order.isbuy():
                return price *(1 + self.p.slippage_value)
            else:
                return price*(1 - self.p.slippage_value)

        elif self.p.slippage_model == 'tick':

# 假设 tick_size 从数据中获取
            tick_size = getattr(order.data, 'tick_size', 0.01)
            if order.isbuy():
                return price + self.p.slippage_value*tick_size
            else:
                return price - self.p.slippage_value*tick_size

        return price

    def _execute_order(self, order, price, size, timestamp):
        """执行订单成交"""

# 更新订单执行信息
        order.execute(
            dt=timestamp,
            size=size,
            price=price,
            closed=size,
            closedvalue=size*price,
            closedcomm=self._getcommission(order, size, price),
            opened=0,
            openedvalue=0,
            openedcomm=0,
            margin=0,
            pnl=0,
            psize=0,
            pprice=0
        )

# 更新持仓
        self._update_position(order, size, price)

# 检查订单是否完全成交
        if order.executed.remsize == 0:
            order.completed()
            self._pending_orders.remove(order)
        else:
            order.partial()

# 加入通知队列
        self.notifs.append(order.clone())

    def _update_position(self, order, size, price):
        """更新持仓"""

# ... 持仓更新逻辑 ...
        pass

    def submit(self, order):
        """提交订单"""
        order.submit()
        self._pending_orders.append(order)
        self.notifs.append(order.clone())
        return order

    def finalize_timestamp(self):
        """时间戳结束时清理"""
        self._matched_orders.clear()

```bash

- *测试**: `tests/phase2/test_tick_broker.py`

```python
def test_tick_broker_market_order():
    """测试市价单撮合"""
    broker = TickBroker()

# 创建订单
    order = Order(
        owner=None,
        data=None,
        size=1.0,
        price=None,
        exectype=Order.Market,
        isbuy=True
    )

    broker.submit(order)

# 模拟 tick
    tick = TickEvent(
        timestamp=100.0,
        price=50000,
        volume=2.0,
        direction='sell',
        symbol='BTC/USDT'
    )

    broker.match_tick(tick)

# 验证成交
    assert order.status == Order.Completed
    assert order.executed.price == 50000
    assert order.executed.size == 1.0

def test_tick_broker_limit_order():
    """测试限价单撮合"""
    broker = TickBroker()

# 买单:限价 50000
    order = Order(
        size=1.0,
        price=50000,
        exectype=Order.Limit,
        isbuy=True
    )
    broker.submit(order)

# tick 价格 50001,不成交
    tick1 = TickEvent(timestamp=100.0, price=50001, volume=2.0, direction='sell')
    broker.match_tick(tick1)
    assert order.status == Order.Submitted

# tick 价格 49999,成交
    tick2 = TickEvent(timestamp=101.0, price=49999, volume=2.0, direction='sell')
    broker.match_tick(tick2)
    assert order.status == Order.Completed
    assert order.executed.price == 49999

def test_tick_broker_partial_fill():
    """测试部分成交"""
    broker = TickBroker(partial_fill=True)

    order = Order(size=10.0, exectype=Order.Market, isbuy=True)
    broker.submit(order)

# 第一个 tick:成交 3
    tick1 = TickEvent(timestamp=100.0, price=50000, volume=3.0, direction='sell')
    broker.match_tick(tick1)
    assert order.status == Order.Partial
    assert order.executed.size == 3.0

# 第二个 tick:成交 7,完全成交
    tick2 = TickEvent(timestamp=101.0, price=50001, volume=7.0, direction='sell')
    broker.match_tick(tick2)
    assert order.status == Order.Completed
    assert order.executed.size == 10.0

```bash

- --

### 2.2 MixBroker 实现(6 天)

- *文件**: `backtrader/brokers/mixbroker.py`

```python
class MixBroker(TickBroker):
    """混合撮合 Broker(Tick 优先 + Bar 兜底)"""

    params = (
        ('tick_priority', True),          # Tick 撮合优先
        ('bar_fallback', True),           # Bar 兜底撮合
        ('tick_timeout', 300.0),          # Tick 撮合超时(秒)
    )

    def __init__(self):
        super().__init__()
        self._order_submit_time = {}  # {order.ref: timestamp}
        self._tick_matched_orders = set()  # 已被 tick 撮合的订单

    def match_tick(self, tick):
        """Tick 撮合 - 记录已撮合订单"""
        symbol = tick.symbol

        orders_to_match = [
            o for o in self._pending_orders
            if self._get_order_symbol(o) == symbol
            and o.ref not in self._matched_orders
            and o.ref not in self._tick_matched_orders
        ]

        for order in orders_to_match:
            if self._try_match_order(order, tick):
                self._matched_orders.add(order.ref)
                self._tick_matched_orders.add(order.ref)

    def finalize_bar(self, data, bar_timestamp):
        """Bar 结束时的兜底撮合

        Args:
            data: DataBase 实例
            bar_timestamp: bar 的时间戳
        """
        if not self.p.bar_fallback:
            return

        symbol = getattr(data, '_name', 'default')

# 获取未被 tick 撮合的订单
        orders_to_match = [
            o for o in self._pending_orders
            if self._get_order_symbol(o) == symbol
            and o.ref not in self._tick_matched_orders
        ]

        for order in orders_to_match:

# 检查是否超时
            submit_time = self._order_submit_time.get(order.ref, 0)
            if bar_timestamp - submit_time > self.p.tick_timeout:

# 超时,使用 bar 撮合
                self._match_with_bar(order, data, bar_timestamp)
            else:

# 未超时,继续等待 tick
                pass

    def _match_with_bar(self, order, data, timestamp):
        """使用 Bar 数据撮合订单"""

# 使用 bar 的 OHLC 价格撮合
        if order.exectype == Order.Market:

# 市价单:使用 close 价格
            exec_price = data.close[0]

        elif order.exectype == Order.Limit:

# 限价单:检查是否在 bar 范围内
            if order.isbuy():
                if data.low[0] <= order.price:
                    exec_price = min(order.price, data.open[0])
                else:
                    return  # 未触发
            else:
                if data.high[0] >= order.price:
                    exec_price = max(order.price, data.open[0])
                else:
                    return

        elif order.exectype == Order.Stop:

# 止损单
            if order.isbuy():
                if data.high[0] >= order.price:
                    exec_price = max(order.price, data.open[0])
                else:
                    return
            else:
                if data.low[0] <= order.price:
                    exec_price = min(order.price, data.open[0])
                else:
                    return

        else:
            return

# 应用滑点
        exec_price = self._apply_slippage(order, exec_price)

# 执行成交(全量)
        fill_size = order.executed.remsize
        self._execute_order(order, exec_price, fill_size, timestamp)

    def submit(self, order):
        """提交订单 - 记录提交时间"""
        self._order_submit_time[order.ref] = self._current_timestamp
        return super().submit(order)

```bash

- *测试**: `tests/phase2/test_mix_broker.py`

```python
def test_mix_broker_tick_priority():
    """测试 Tick 优先撮合"""
    broker = MixBroker()

    order = Order(size=1.0, exectype=Order.Market, isbuy=True)
    broker.submit(order)

# Tick 撮合
    tick = TickEvent(timestamp=100.0, price=50000, volume=2.0, direction='sell')
    broker.match_tick(tick)

    assert order.status == Order.Completed
    assert order.executed.price == 50000

# Bar 兜底不应该再次撮合
    data = MockData(open=50100, high=50200, low=49900, close=50050)
    broker.finalize_bar(data, 100.5)

# 验证没有重复成交
    assert order.executed.size == 1.0

def test_mix_broker_bar_fallback():
    """测试 Bar 兜底撮合"""
    broker = MixBroker(tick_timeout=5.0)
    broker._current_timestamp = 100.0

    order = Order(size=1.0, exectype=Order.Market, isbuy=True)
    broker.submit(order)

# 没有 tick 撮合

# ...

# 超时后 bar 兜底
    data = MockData(open=50000, high=50100, low=49900, close=50050)
    broker.finalize_bar(data, 106.0)  # 超过 5 秒

    assert order.status == Order.Completed
    assert order.executed.price == 50050  # close 价格

```bash

- --

### 2.3 三种运行模式实现(5 天)

- *文件**: `backtrader/cerebro.py`

```python
from enum import Enum

class RunMode(Enum):
    """运行模式"""
    BAR = 'bar'        # 纯 K 线模式(向后兼容)
    TICK = 'tick'      # 纯 Tick 模式
    MIXED = 'mixed'    # 混合模式(Bar 主时钟 + Tick 辅助)

class Cerebro:
    """三种运行模式实现"""

    def _run_bar_mode(self, runstrats):
        """纯 K 线模式 - 完全向后兼容"""

# 使用现有的_runonce 或_runnext 逻辑
        if self.p.preload and self.p.runonce:
            return self._runonce(runstrats)
        else:
            return self._runnext(runstrats)

    def _run_tick_mode(self, runstrats):
        """纯 Tick 模式 - 事件驱动"""

# 初始化事件队列
        self._init_event_queue(runstrats)

# 设置 TickBroker
        if not isinstance(self._broker, TickBroker):
            self._broker = TickBroker()

# 注册 Tick 通道到 Broker
        for strat in runstrats:
            if hasattr(strat, '_channels'):
                for (ch_type, symbol), ch in strat._channels.items():
                    if ch_type == 'tick':
                        self._broker.register_tick_channel(symbol, ch)

# 事件循环
        while True:
            try:
                event = self._event_queue.pop()
            except StopIteration:
                break

            self._current_timestamp = event.timestamp

# 1. Broker 撮合(如果是 tick 事件)
            if event.channel_type == 'tick':
                self._broker.match_tick(event.data)

# 2. 分发事件到策略
            for strat in runstrats:
                self._safe_strategy_call(strat, '_dispatch_event', event)

# 3. 批量分发通知
            self._deliver_notifications(runstrats)

# 4. Broker 清理
            self._broker.finalize_timestamp()

        return runstrats

    def _run_mixed_mode(self, runstrats):
        """混合模式 - Bar 主时钟 + Tick 辅助"""

# 初始化事件队列
        self._init_event_queue(runstrats)

# 设置 MixBroker
        if not isinstance(self._broker, MixBroker):
            self._broker = MixBroker()

# 注册通道
        for strat in runstrats:
            if hasattr(strat, '_channels'):
                for (ch_type, symbol), ch in strat._channels.items():
                    if ch_type == 'tick':
                        self._broker.register_tick_channel(symbol, ch)

# 事件循环
        current_bar_ts = None
        bar_events = []  # 当前 bar 内的子事件

        while True:
            try:
                event = self._event_queue.pop()
            except StopIteration:
                break

            self._current_timestamp = event.timestamp

# 检查是否是新 bar
            if event.event_type == 'bar_close':

# 处理上一个 bar 的所有子事件
                for sub_event in bar_events:
                    if sub_event.channel_type == 'tick':
                        self._broker.match_tick(sub_event.data)

                    for strat in runstrats:
                        self._safe_strategy_call(strat, '_dispatch_event', sub_event)

# Bar 兜底撮合
                data = self._bar_by_name.get(event.channel_name)
                if data:
                    self._broker.finalize_bar(data, event.timestamp)

# 触发策略 next()
                for strat in runstrats:
                    self._safe_strategy_call(strat, 'next')

# 批量通知
                self._deliver_notifications(runstrats)

# 清理
                bar_events.clear()
                current_bar_ts = event.timestamp

            else:

# 子事件(tick/orderbook/funding)
                bar_events.append(event)

        return runstrats

```bash

- *测试**: `tests/phase2/test_run_modes.py`

```python
def test_bar_mode_backward_compatible():
    """测试 BAR 模式向后兼容"""
    cerebro = bt.Cerebro()
    cerebro.adddata(bt.feeds.GenericCSVData(dataname='...'))
    cerebro.addstrategy(SimpleStrategy)

    results = cerebro.run()  # 默认 BAR 模式

# 验证使用 BackBroker
    assert isinstance(cerebro._broker, bt.brokers.BackBroker)

# 验证结果与历史一致

# ...

def test_tick_mode():
    """测试 TICK 模式"""
    cerebro = bt.Cerebro()
    cerebro.add_channel(TickChannel, symbol='BTC/USDT', dataname='...')
    cerebro.addstrategy(TickStrategy)

    results = cerebro.run(mode=bt.RunMode.TICK)

# 验证使用 TickBroker
    assert isinstance(cerebro._broker, bt.brokers.TickBroker)

# 验证策略收到 tick 回调
    strat = results[0]
    assert strat.tick_count > 0

def test_mixed_mode():
    """测试 MIXED 模式"""
    cerebro = bt.Cerebro()
    cerebro.adddata(bt.feeds.GenericCSVData(dataname='...'))
    cerebro.add_channel(TickChannel, symbol='BTC/USDT', dataname='...')
    cerebro.addstrategy(MixedStrategy)

    results = cerebro.run(mode=bt.RunMode.MIXED)

# 验证使用 MixBroker
    assert isinstance(cerebro._broker, bt.brokers.MixBroker)

# 验证策略收到 bar 和 tick 回调
    strat = results[0]
    assert strat.bar_count > 0
    assert strat.tick_count > 0

```bash

- --

### 2.4 批处理通知机制(3 天)

已在 Phase 1 实现此处进行集成测试

- *测试**: `tests/phase2/test_batch_notifications.py`

```python
def test_notifications_within_same_timestamp():
    """测试同 timestamp 内通知批处理"""

    class NotificationTestStrategy(bt.Strategy):
        def __init__(self):
            self.notifications = []

        def notify_order(self, order):
            self.notifications.append({
                'timestamp': self._current_timestamp,
                'status': order.status,
                'ref': order.ref
            })

# ... 设置导致同 timestamp 多个通知的场景 ...

    results = cerebro.run(mode=bt.RunMode.TICK)
    strat = results[0]

# 验证通知在 timestamp 结束后批量分发

# 验证通知顺序按优先级排序

# ...

```bash

- --

## 3. 交付物

### 3.1 代码

- [ ] `backtrader/brokers/tickbroker.py` - TickBroker
- [ ] `backtrader/brokers/mixbroker.py` - MixBroker
- [ ] `backtrader/cerebro.py` - 三种运行模式
- [ ] `backtrader/__init__.py` - 导出 RunMode

### 3.2 测试

- [ ] `tests/phase2/test_tick_broker.py`
- [ ] `tests/phase2/test_mix_broker.py`
- [ ] `tests/phase2/test_run_modes.py`
- [ ] `tests/phase2/test_batch_notifications.py`
- [ ] `tests/phase2/test_accuracy.py` - 准确性验证

### 3.3 文档

- [ ] Phase 2 完成报告
- [ ] Broker 使用指南
- [ ] 运行模式选择指南

- --

## 4. 验收标准

### 4.1 功能验收

- [ ] TickBroker 正确撮合各类订单
- [ ] MixBroker 无重复/遗漏撮合
- [ ] 三种模式正常切换
- [ ] 批处理通知正确

### 4.2 准确性验收

- [ ] TickBroker 与手工计算一致
- [ ] MixBroker  TickBroker 一致 tick 场景
- [ ] 订单审计无遗漏

### 4.3 性能验收

- [ ] 1  Tick 回测 < 10 
- [ ] 内存使用 < 200MB
- [ ] 回归测试 100%通过

- --

## 5. 时间表

| 任务 | 工作量 | 开始 | 结束 |

|------|--------|------|------|

| TickBroker 实现 | 5  | Day 1 | Day 5 |

| MixBroker 实现 | 6  | Day 6 | Day 11 |

| 三种运行模式 | 5  | Day 12 | Day 16 |

| 批处理通知集成 | 3  | Day 17 | Day 19 |

| 准确性验证 | 2  | Day 20 | Day 21 |

- --

## 6. 风险与应对

| 风险 | 概率 | 影响 | 应对措施 |

|------|------|------|---------|

| MixBroker 撮合 bug |  |  | 详细状态机设计+完善测试 |

| 性能不达标 |  |  | 性能分析+优化 |

| 回归问题 |  |  | 持续回归测试 |

- --

## 7. 下一步

Phase 2 完成后进入 Phase 3OrderBook 深度撮合