Phase 1: 核心基础设施

周期: 3 周 | 优先级: 🔴 高 | 风险: 中


1. 目标

实现 Tick 级架构的核心基础设施,包括完整的 Channel 体系、Strategy 回调机制和优化的通知系统。

1.1 核心目标

  • ✅ 实现完整的 Channel 体系(Tick/OrderBook/Funding)

  • ✅ 实现 Strategy 新增回调机制

  • ✅ 实现优先级通知队列

  • ✅ 实现 Channel 共享模式

  • ✅ 集成到 Cerebro

1.2 成功标准

| 指标 | 目标 | 测量方法 |

|——|——|———|

| 功能完整性 | 100% | 所有 Channel 正常工作 |

| 回调触发 | 100%准确 | 集成测试 |

| 回归测试 | 100%通过 | 1020/1020 |

| 代码覆盖率 | >= 80% | pytest-cov |


2. 实施内容

2.1 OrderBookChannel 实现(3 天)

2.1.1 OrderBook 数据结构

  • 文件*: backtrader/channels/orderbook.py

from dataclasses import dataclass
from typing import List, Tuple
from backtrader.channel import DataChannel, DataValidationResult

@dataclass
class OrderBookSnapshot:
    """订单簿快照"""
    timestamp: float
    symbol: str
    bids: List[Tuple[float, float]]  # [(price, qty), ...] 降序
    asks: List[Tuple[float, float]]  # [(price, qty), ...] 升序

    @property
    def best_bid(self) -> Tuple[float, float]:
        """最优买价"""
        return self.bids[0] if self.bids else (0.0, 0.0)

    @property
    def best_ask(self) -> Tuple[float, float]:
        """最优卖价"""
        return self.asks[0] if self.asks else (0.0, 0.0)

    @property
    def spread(self) -> float:
        """买卖价差"""
        return self.best_ask[0] - self.best_bid[0]

    @property
    def mid_price(self) -> float:
        """中间价"""
        return (self.best_ask[0] + self.best_bid[0]) / 2

    def get_depth(self, side: str, levels: int = 5) -> List[Tuple[float, float]]:
        """获取指定档位深度"""
        if side == 'bid':
            return self.bids[:levels]
        elif side == 'ask':
            return self.asks[:levels]
        return []

    def total_volume(self, side: str, levels: int = 5) -> float:
        """计算总挂单量"""
        depth = self.get_depth(side, levels)
        return sum(qty for _, qty in depth)

class OrderBookChannel(DataChannel):
    """订单簿数据通道"""

    channel_type = 'orderbook'

    def __init__(self, symbol, maxlen=1000,
                 max_levels=20, **kwargs):
        super().__init__(symbol, maxlen, **kwargs)
        self.max_levels = max_levels

    def _validate_event(self, event: OrderBookSnapshot) -> DataValidationResult:
        """验证订单簿数据"""

# 1. 检查 bids 降序
        if len(event.bids) > 1:
            for i in range(len(event.bids) - 1):
                if event.bids[i][0] < event.bids[i+1][0]:
                    return DataValidationResult(
                        valid=False,
                        error=f"Bids not in descending order at index {i}"
                    )

# 2. 检查 asks 升序
        if len(event.asks) > 1:
            for i in range(len(event.asks) - 1):
                if event.asks[i][0] > event.asks[i+1][0]:
                    return DataValidationResult(
                        valid=False,
                        error=f"Asks not in ascending order at index {i}"
                    )

# 3. 检查价格合理性
        if event.bids and event.asks:
            if event.best_bid[0] >= event.best_ask[0]:
                return DataValidationResult(
                    valid=False,
                    error=f"Bid {event.best_bid[0]} >= Ask {event.best_ask[0]}"
                )

# 4. 检查数量为正
        for price, qty in event.bids + event.asks:
            if price <= 0 or qty <= 0:
                return DataValidationResult(
                    valid=False,
                    error=f"Invalid price/qty: {price}/{qty}"
                )

# 5. 时间戳检查
        if self._last_timestamp is not None:
            if event.timestamp < self._last_timestamp:

# 尝试修复
                fixed_event = OrderBookSnapshot(
                    timestamp=self._last_timestamp + 0.001,
                    symbol=event.symbol,
                    bids=event.bids,
                    asks=event.asks
                )
                return DataValidationResult(
                    valid=False,
                    error=f"Out-of-order timestamp",
                    fixed=True,
                    original_value=event,
                    fixed_value=fixed_event
                )

        self._last_timestamp = event.timestamp
        return DataValidationResult(valid=True)

    def load(self):
        """从 JSONL 文件加载订单簿数据"""
        import json

        if 'dataname' not in self.params:
            return

        with open(self.params['dataname']) as f:
            for line in f:
                data = json.loads(line)
                event = OrderBookSnapshot(
                    timestamp=float(data['timestamp']),
                    symbol=self.symbol,
                    bids=[(float(p), float(q)) for p, q in data['bids']],
                    asks=[(float(p), float(q)) for p, q in data['asks']]
                )
                yield event

```bash

- *测试**: `tests/phase1/test_orderbook_channel.py`

```python
def test_orderbook_validation():
    """测试订单簿验证"""
    channel = OrderBookChannel('BTC/USDT', validate=True)

# 正常订单簿
    ob = OrderBookSnapshot(
        timestamp=100.0,
        symbol='BTC/USDT',
        bids=[(50000, 1.0), (49999, 2.0)],
        asks=[(50001, 1.0), (50002, 2.0)]
    )
    channel.push(ob)
    assert len(channel._buffer) == 1

# 错误:bid >= ask
    bad_ob = OrderBookSnapshot(
        timestamp=101.0,
        symbol='BTC/USDT',
        bids=[(50002, 1.0)],
        asks=[(50001, 1.0)]
    )
    channel.push(bad_ob)
    assert len(channel._buffer) == 1  # 未添加
    assert len(channel._validation_errors) == 1

def test_orderbook_properties():
    """测试订单簿属性"""
    ob = OrderBookSnapshot(
        timestamp=100.0,
        symbol='BTC/USDT',
        bids=[(50000, 1.0), (49999, 2.0), (49998, 3.0)],
        asks=[(50001, 1.5), (50002, 2.5), (50003, 3.5)]
    )

    assert ob.best_bid == (50000, 1.0)
    assert ob.best_ask == (50001, 1.5)
    assert ob.spread == 1.0
    assert ob.mid_price == 50000.5
    assert ob.total_volume('bid', 3) == 6.0
    assert ob.total_volume('ask', 3) == 7.5

```bash

- --

### 2.2 FundingRateChannel 实现(2 天)

- *文件**: `backtrader/channels/funding.py`

```python
from dataclasses import dataclass
from backtrader.channel import DataChannel, DataValidationResult

@dataclass
class FundingEvent:
    """资金费率事件"""
    timestamp: float
    symbol: str
    rate: float                    # 当前费率
    mark_price: float              # 标记价格
    next_funding_time: float       # 下次结算时间
    predicted_rate: float = 0.0    # 预测费率

class FundingRateChannel(DataChannel):
    """资金费率数据通道"""

    channel_type = 'funding_rate'

    def _validate_event(self, event: FundingEvent) -> DataValidationResult:
        """验证资金费率数据"""

# 1. 费率范围检查(通常在-0.75%到 0.75%之间)
        if abs(event.rate) > 0.0075:
            return DataValidationResult(
                valid=False,
                error=f"Funding rate {event.rate} out of range"
            )

# 2. 标记价格检查
        if event.mark_price <= 0:
            return DataValidationResult(
                valid=False,
                error=f"Invalid mark price: {event.mark_price}"
            )

# 3. 下次结算时间检查
        if event.next_funding_time <= event.timestamp:
            return DataValidationResult(
                valid=False,
                error=f"Next funding time in the past"
            )

# 4. 时间戳检查
        if self._last_timestamp is not None:
            if event.timestamp < self._last_timestamp:
                fixed_event = FundingEvent(
                    timestamp=self._last_timestamp + 0.001,
                    symbol=event.symbol,
                    rate=event.rate,
                    mark_price=event.mark_price,
                    next_funding_time=event.next_funding_time,
                    predicted_rate=event.predicted_rate
                )
                return DataValidationResult(
                    valid=False,
                    error=f"Out-of-order timestamp",
                    fixed=True,
                    original_value=event,
                    fixed_value=fixed_event
                )

        self._last_timestamp = event.timestamp
        return DataValidationResult(valid=True)

    def load(self):
        """从 CSV 加载资金费率数据"""
        import csv

        if 'dataname' not in self.params:
            return

        with open(self.params['dataname']) as f:
            reader = csv.DictReader(f)
            for row in reader:
                event = FundingEvent(
                    timestamp=float(row['timestamp']),
                    symbol=self.symbol,
                    rate=float(row['rate']),
                    mark_price=float(row['mark_price']),
                    next_funding_time=float(row['next_funding_time']),
                    predicted_rate=float(row.get('predicted_rate', 0.0))
                )
                yield event

```bash

- --

### 2.3 Strategy 回调机制(4 天)

#### 2.3.1 Strategy 基类扩展

- *文件**: `backtrader/strategy.py`

```python
class StrategyBase(LineIterator):
    """扩展 Strategy 支持 Channel 回调"""

    def __init__(self):
        super().__init__()
        self._channels = {}  # {(channel_type, symbol): channel}
        self._bar_by_name = {}  # {data_name: data}

# === 新增 Channel 回调 ===

    def on_tick(self, channel, tick):
        """Tick 到达时调用

        Args:
            channel: TickChannel 实例
            tick: TickEvent 实例
        """
        pass

    def on_orderbook(self, channel, orderbook):
        """OrderBook 更新时调用

        Args:
            channel: OrderBookChannel 实例
            orderbook: OrderBookSnapshot 实例
        """
        pass

    def on_bar(self, data, bar):
        """Bar 到达时调用(TICK/MIXED 模式)

        Args:
            data: DataBase 实例
            bar: 当前 bar 数据

        注意:

        - 每个 bar_close 事件触发一次
        - 与 on_tick/on_orderbook 处于同一时间序列
        - next()在主时钟 bar_close 后触发

        """
        pass

    def on_funding(self, channel, funding):
        """FundingRate 更新时调用

        Args:
            channel: FundingRateChannel 实例
            funding: FundingEvent 实例
        """
        pass

    def on_channel(self, channel, event):
        """通用 Channel 回调(自定义类型)

        Args:
            channel: DataChannel 实例
            event: 自定义事件
        """
        pass

    def _dispatch_event(self, event):
        """分发事件到对应回调

        Args:
            event: Event 实例(来自 StreamingEventQueue)
        """
        channel = self._channels.get(
            (event.channel_type, event.channel_name)
        )

        if channel is None:
            return

# 根据 channel 类型调用对应回调
        if event.channel_type == 'tick':
            self.on_tick(channel, event.data)
        elif event.channel_type == 'orderbook':
            self.on_orderbook(channel, event.data)
        elif event.channel_type == 'funding_rate':
            self.on_funding(channel, event.data)
        else:
            self.on_channel(channel, event.data)

```bash

- *测试**: `tests/phase1/test_strategy_callbacks.py`

```python
class TestStrategy(bt.Strategy):
    def __init__(self):
        self.tick_count = 0
        self.ob_count = 0
        self.funding_count = 0
        self.bar_count = 0

    def on_tick(self, channel, tick):
        self.tick_count += 1

    def on_orderbook(self, channel, orderbook):
        self.ob_count += 1

    def on_funding(self, channel, funding):
        self.funding_count += 1

    def on_bar(self, data, bar):
        self.bar_count += 1

def test_strategy_callbacks():
    """测试策略回调触发"""
    cerebro = bt.Cerebro()

# 添加 Channels
    cerebro.add_channel(TickChannel, symbol='BTC/USDT', dataname='...')
    cerebro.add_channel(OrderBookChannel, symbol='BTC/USDT', dataname='...')
    cerebro.add_channel(FundingRateChannel, symbol='BTC/USDT', dataname='...')

# 添加策略
    cerebro.addstrategy(TestStrategy)

# 运行
    results = cerebro.run(mode=bt.RunMode.TICK)
    strat = results[0]

# 验证回调被触发
    assert strat.tick_count > 0
    assert strat.ob_count > 0
    assert strat.funding_count > 0

```bash

- --

### 2.4 优先级通知队列(3 天)

- *文件**: `backtrader/cerebro.py`

```python
from enum import IntEnum
from dataclasses import dataclass, field
import heapq

class NotificationPriority(IntEnum):
    """通知优先级"""
    REJECTED = 10      # 拒单最先
    CANCELLED = 20     # 撤单
    MARGIN_CALL = 30   # 保证金不足
    PARTIAL = 40       # 部分成交
    COMPLETED = 50     # 完全成交
    SUBMITTED = 60     # 已提交

@dataclass(order=True)
class Notification:
    """优先级通知"""
    priority: int
    sequence: int = field(compare=False)
    order: Any = field(compare=False)
    timestamp: float = field(compare=False)

class Cerebro:
    """增加优先级通知队列"""

    def __init__(self, *args, **kwargs):

# ... 现有初始化 ...
        self._notification_queue = []
        self._notification_sequence = 0

    def _get_notification_priority(self, order) -> int:
        """确定通知优先级"""
        if order.status == Order.Rejected:
            return NotificationPriority.REJECTED
        elif order.status == Order.Cancelled:
            return NotificationPriority.CANCELLED
        elif order.status == Order.Margin:
            return NotificationPriority.MARGIN_CALL
        elif order.status == Order.Partial:
            return NotificationPriority.PARTIAL
        elif order.status == Order.Completed:
            return NotificationPriority.COMPLETED
        else:
            return NotificationPriority.SUBMITTED

    def _deliver_notifications(self, runstrats):
        """按优先级分发通知"""
        notifications = []

# 收集所有通知
        while True:
            order = self._broker.get_notification()
            if order is None:
                break

            priority = self._get_notification_priority(order)

            heapq.heappush(notifications, Notification(
                priority=priority,
                sequence=self._notification_sequence,
                order=order,
                timestamp=self._current_timestamp
            ))
            self._notification_sequence += 1

# 按优先级排序分发
        while notifications:
            notif = heapq.heappop(notifications)
            owner = notif.order.owner or runstrats[0]
            owner._addnotification(notif.order, quicknotify=True)

```bash

- *测试**: `tests/phase1/test_notification_priority.py`

```python
def test_notification_priority_order():
    """测试通知优先级顺序"""

    class PriorityTestStrategy(bt.Strategy):
        def __init__(self):
            self.notifications = []

        def notify_order(self, order):
            self.notifications.append({
                'status': order.status,
                'ref': order.ref
            })

    cerebro = bt.Cerebro()

# ... 设置导致多种订单状态的场景 ...

    results = cerebro.run()
    strat = results[0]

# 验证通知顺序:拒单 -> 撤单 -> 保证金 -> 部分成交 -> 完全成交

# ...

```bash

- --

### 2.5 Channel 共享模式(4 天)

- *文件**: `backtrader/channel.py`

```python
from enum import Enum

class ChannelSharingMode(Enum):
    """Channel 共享模式"""
    EXCLUSIVE = 'exclusive'          # 每个策略独立 Channel
    SHARED_READONLY = 'shared_ro'    # 共享只读 Channel
    SHARED_ISOLATED = 'shared_isolated'  # 共享数据,隔离状态
    SHARED_FULL = 'shared_full'      # 完全共享(含状态)

class DataChannel:
    """增强的 Channel 基类"""

    def __init__(self, symbol, sharing_mode=ChannelSharingMode.SHARED_READONLY, **kwargs):
        self.symbol = symbol
        self.sharing_mode = sharing_mode
        self._buffer = deque(maxlen=kwargs.get('maxlen', 10000))
        self._event_count = 0

# 策略隔离状态
        self._strategy_states = {}  # {strategy_id: state_dict}

    def get_state(self, strategy_id):
        """获取策略专属状态"""
        if self.sharing_mode == ChannelSharingMode.SHARED_ISOLATED:
            if strategy_id not in self._strategy_states:
                self._strategy_states[strategy_id] = {}
            return self._strategy_states[strategy_id]
        else:
            return {}

    def push(self, event):
        """推送事件 - 只读模式检查"""
        if self.sharing_mode == ChannelSharingMode.SHARED_READONLY:

# 只读模式:只能读取,不能修改
            raise RuntimeError("Cannot push to read-only shared channel")

# ... 验证和添加逻辑 ...

```bash

- *文件**: `backtrader/cerebro.py`

```python
class Cerebro:
    """优化的 Channel 初始化"""

    def add_channel(self, channel_cls, symbol=None,
                   sharing_mode=ChannelSharingMode.SHARED_READONLY, **kwargs):
        """注册数据通道 - 支持共享模式"""
        self._channels.append((channel_cls, symbol, kwargs, sharing_mode))
        return self

    def _init_channels(self, runstrats):
        """根据共享模式初始化 Channel"""
        for ch_cls, symbol, kwargs, mode in self._channels:
            if symbol is None and self.datas:
                symbol = getattr(self.datas[0], '_name', 'default')

            if mode == ChannelSharingMode.EXCLUSIVE:

# 每个策略独立实例
                for strat in runstrats:
                    ch = ch_cls(symbol=symbol, sharing_mode=mode, **kwargs)
                    self._register_channel_to_strategy(strat, ch)

            elif mode in (ChannelSharingMode.SHARED_READONLY,
                         ChannelSharingMode.SHARED_ISOLATED):

# 共享数据,隔离状态
                ch = ch_cls(symbol=symbol, sharing_mode=mode, **kwargs)
                for strat in runstrats:
                    self._register_channel_to_strategy(strat, ch)

            elif mode == ChannelSharingMode.SHARED_FULL:

# 完全共享(含状态)
                ch = ch_cls(symbol=symbol, sharing_mode=mode, **kwargs)
                for strat in runstrats:
                    self._register_channel_to_strategy(strat, ch)

    def _register_channel_to_strategy(self, strat, channel):
        """注册 Channel 到策略"""
        if not hasattr(strat, '_channels'):
            strat._channels = {}
        strat._channels[(channel.channel_type, channel.symbol)] = channel

```bash

- *测试**: `tests/phase1/test_channel_sharing.py`

```python
def test_exclusive_mode():
    """测试独占模式"""
    cerebro = bt.Cerebro()
    cerebro.add_channel(
        TickChannel,
        symbol='BTC/USDT',
        sharing_mode=ChannelSharingMode.EXCLUSIVE,
        dataname='...'
    )

    cerebro.addstrategy(Strategy1)
    cerebro.addstrategy(Strategy2)

    results = cerebro.run()

# 验证每个策略有独立的 Channel 实例
    ch1 = results[0]._channels[('tick', 'BTC/USDT')]
    ch2 = results[1]._channels[('tick', 'BTC/USDT')]
    assert ch1 is not ch2

def test_shared_isolated_mode():
    """测试共享隔离模式"""

# ... 验证共享数据但状态隔离 ...

```bash

- --

### 2.6 Cerebro 集成(5 天)

- *文件**: `backtrader/cerebro.py`

```python
class Cerebro:
    """完整的 Cerebro 集成"""

    def __init__(self, **kwargs):

# ... 现有初始化 ...
        self._channels = []           # [(cls, symbol, kwargs, sharing_mode)]
        self._channel_instances = {}  # {(type, symbol): channel}
        self._event_queue = None
        self._run_mode = None

    def add_channel(self, channel_cls, symbol=None,
                   sharing_mode=ChannelSharingMode.SHARED_READONLY, **kwargs):
        """注册数据通道"""
        self._channels.append((channel_cls, symbol, kwargs, sharing_mode))
        return self

    def run(self, mode=None, **kwargs):
        """运行回测 - 支持模式选择"""

# ... 现有逻辑 ...

# 模式推断
        if mode is None:
            self._run_mode = self._infer_run_mode()
        else:
            self._run_mode = mode

# 初始化 Channels
        self._init_channels(runstrats)

# 根据模式运行
        if self._run_mode == RunMode.BAR:
            return self._run_bar_mode(runstrats)
        elif self._run_mode == RunMode.TICK:
            return self._run_tick_mode(runstrats)
        elif self._run_mode == RunMode.MIXED:
            return self._run_mixed_mode(runstrats)

    def _infer_run_mode(self):
        """推断运行模式"""
        if not self._channels:
            return RunMode.BAR

# 如果有 Channel 但也有 datas,默认 MIXED
        if self.datas:
            return RunMode.MIXED

# 只有 Channel,无 datas,使用 TICK
        return RunMode.TICK

    def _init_event_queue(self, runstrats):
        """初始化事件队列"""

# 收集所有 Channel 实例
        all_channels = []
        for strat in runstrats:
            if hasattr(strat, '_channels'):
                all_channels.extend(strat._channels.values())

# 去重
        unique_channels = list(set(all_channels))

        self._event_queue = StreamingEventQueue(
            channels=unique_channels,
            bars=self.datas,
            preload_window=self.p.get('preload_window', 300.0),
            max_memory_mb=self.p.get('max_memory_mb', 200),
            adaptive=self.p.get('adaptive_window', True)
        )

# bar 名称映射
        self._bar_by_name = {
            getattr(d, '_name', f'data{i}'): d
            for i, d in enumerate(self.datas)
        }
        for strat in runstrats:
            strat._bar_by_name = self._bar_by_name

```bash

- --

## 3. 交付物

### 3.1 代码

- [ ] `backtrader/channels/orderbook.py` - OrderBookChannel
- [ ] `backtrader/channels/funding.py` - FundingRateChannel
- [ ] `backtrader/strategy.py` - Strategy 回调扩展
- [ ] `backtrader/cerebro.py` - 优先级通知 + Channel 集成
- [ ] `backtrader/channel.py` - Channel 共享模式

### 3.2 测试

- [ ] `tests/phase1/test_orderbook_channel.py`
- [ ] `tests/phase1/test_funding_channel.py`
- [ ] `tests/phase1/test_strategy_callbacks.py`
- [ ] `tests/phase1/test_notification_priority.py`
- [ ] `tests/phase1/test_channel_sharing.py`
- [ ] `tests/phase1/test_cerebro_integration.py`

### 3.3 文档

- [ ] Phase 1 完成报告
- [ ] API 文档更新
- [ ] 使用示例

- --

## 4. 验收标准

### 4.1 功能验收

- [ ] OrderBookChannel 正确加载和验证数据
- [ ] FundingRateChannel 正确加载和验证数据
- [ ] Strategy 回调正确触发
- [ ] 通知按优先级排序
- [ ] 4 种共享模式正常工作

### 4.2 集成验收

- [ ]  Channel 协作正常
- [ ] 事件顺序正确
- [ ] 通知机制无遗漏

### 4.3 回归验收

- [ ] 1020 个现有测试全部通过
- [ ] 向后兼容性 100%

- --

## 5. 时间表

| 任务 | 工作量 | 开始 | 结束 |

|------|--------|------|------|

| OrderBookChannel | 3  | Day 1 | Day 3 |

| FundingRateChannel | 2  | Day 4 | Day 5 |

| Strategy 回调机制 | 4  | Day 6 | Day 9 |

| 优先级通知队列 | 3  | Day 10 | Day 12 |

| Channel 共享模式 | 4  | Day 13 | Day 16 |

| Cerebro 集成 | 5  | Day 17 | Day 21 |

- --

## 6. 风险与应对

| 风险 | 概率 | 影响 | 应对措施 |

|------|------|------|---------|

| 回调机制复杂 |  |  | 详细设计+单元测试 |

| 共享模式 bug |  |  | 完善测试用例 |

| 回归问题 |  |  | 持续回归测试 |

- --

## 7. 下一步

Phase 1 完成后进入 Phase 2回测引擎与 Broker 实现