Phase 4: 桥接与实盘交易¶
周期: 2 周 | 优先级: 🟡 中 | 风险: 中
1. 目标¶
实现 Channel 到 LineSeries 的可选桥接,以及实盘 WebSocket 数据接入。
1.1 核心目标¶
✅ ChannelBridge 实现(可选)
✅ LiveEventQueue 实现
✅ CCXT WebSocket 集成
✅ 实盘数据验证
2. 实施内容¶
2.1 ChannelBridge 实现(4 天)¶
文件*:
backtrader/channels/bridge.py
from backtrader import LineSeries
class ChannelBridge(LineSeries):
"""Channel 到 LineSeries 的桥接(可选)
警告:
- 不支持 runonce 模式
- 性能开销大(200 倍)
- 仅用于需要指标系统的场景
"""
lines = ('value',)
params = (
('channel', None),
('field', 'price'), # tick.price, ob.mid_price 等
)
def __init__(self):
if self.p.channel is None:
raise ValueError("channel parameter required")
self._channel = self.p.channel
self._field = self.p.field
# 禁用 runonce
self._runonce = False
def _load(self):
"""从 Channel 加载数据到 LineSeries"""
# 获取最新事件
latest = self._channel.latest
if latest is None:
return False
# 提取字段值
value = self._extract_field(latest)
# 写入 line
self.lines.value[0] = value
return True
def _extract_field(self, event):
"""提取事件字段"""
if hasattr(event, self._field):
return getattr(event, self._field)
# 支持嵌套字段,如 'best_bid.price'
parts = self._field.split('.')
obj = event
for part in parts:
obj = getattr(obj, part)
return obj
# 使用示例
class BridgeStrategy(bt.Strategy):
def __init__(self):
# 创建桥接
self.tick_price = ChannelBridge(
channel=self._channels[('tick', 'BTC/USDT')],
field='price'
)
# 可以使用指标
self.sma = bt.indicators.SMA(self.tick_price, period=20)
def next(self):
print(f"Tick Price: {self.tick_price[0]}, SMA: {self.sma[0]}")
```bash
- *测试**: `tests/phase4/test_channel_bridge.py`
```python
def test_channel_bridge_basic():
"""测试基本桥接功能"""
channel = TickChannel('BTC/USDT')
bridge = ChannelBridge(channel=channel, field='price')
# 推送 tick
channel.push(TickEvent(timestamp=100.0, price=50000, volume=1.0, direction='buy'))
# 加载到 bridge
bridge._load()
assert bridge.lines.value[0] == 50000
def test_bridge_with_indicator():
"""测试桥接与指标"""
cerebro = bt.Cerebro()
cerebro.add_channel(TickChannel, symbol='BTC/USDT', dataname='...')
class BridgeTestStrategy(bt.Strategy):
def __init__(self):
self.bridge = ChannelBridge(
channel=self._channels[('tick', 'BTC/USDT')],
field='price'
)
self.sma = bt.indicators.SMA(self.bridge, period=5)
cerebro.addstrategy(BridgeTestStrategy)
results = cerebro.run(mode=bt.RunMode.TICK)
# 验证 SMA 计算正确
# ...
```bash
- --
### 2.2 LiveEventQueue 实现(3 天)
- *文件**: `backtrader/channels/live_queue.py`
```python
import queue
import threading
from backtrader.channel import Event, EventPriority
class LiveEventQueue:
"""实盘事件队列 - 实时数据"""
def __init__(self, max_queue_size=10000):
self._queue = queue.PriorityQueue(maxsize=max_queue_size)
self._sequence = 0
self._lock = threading.Lock()
self._stopped = False
def push(self, event):
"""推送事件(线程安全)"""
with self._lock:
if self._stopped:
return
# 包装为优先级事件
priority_event = (
event.timestamp,
event.priority,
self._sequence,
event
)
self._sequence += 1
try:
self._queue.put(priority_event, block=False)
except queue.Full:
# 队列满,丢弃最旧事件
self._queue.get()
self._queue.put(priority_event)
def pop(self, timeout=1.0):
"""弹出事件(阻塞)"""
try:
_, _, _, event = self._queue.get(timeout=timeout)
return event
except queue.Empty:
return None
def stop(self):
"""停止队列"""
self._stopped = True
```bash
- --
### 2.3 CCXT WebSocket 集成(5 天)
- *文件**: `backtrader/feeds/ccxt_live_tick.py`
```python
import ccxt.pro as ccxtpro
import asyncio
from backtrader.channels.tick import TickChannel, TickEvent
from backtrader.channels.live_queue import LiveEventQueue
class CCXTLiveTickFeed:
"""CCXT 实盘 Tick 数据源"""
def __init__(self, exchange_id, symbol, event_queue):
self.exchange_id = exchange_id
self.symbol = symbol
self.event_queue = event_queue
self.exchange = None
self._running = False
async def start(self):
"""启动 WebSocket 连接"""
exchange_class = getattr(ccxtpro, self.exchange_id)
self.exchange = exchange_class()
self._running = True
try:
while self._running:
trades = await self.exchange.watch_trades(self.symbol)
for trade in trades:
event = TickEvent(
timestamp=trade['timestamp'] / 1000.0,
price=trade['price'],
volume=trade['amount'],
direction='buy' if trade['side'] == 'buy' else 'sell',
trade_id=trade['id'],
symbol=self.symbol
)
# 推送到事件队列
self.event_queue.push(Event(
timestamp=event.timestamp,
priority=EventPriority.TICK,
channel_type='tick',
channel_name=self.symbol,
data=event
))
finally:
await self.exchange.close()
def stop(self):
"""停止 WebSocket"""
self._running = False
# 使用示例
def run_live_trading():
cerebro = bt.Cerebro()
# 创建实盘事件队列
live_queue = LiveEventQueue()
# 启动 CCXT WebSocket
feed = CCXTLiveTickFeed('binance', 'BTC/USDT', live_queue)
# 在后台线程运行 WebSocket
def run_ws():
asyncio.run(feed.start())
ws_thread = threading.Thread(target=run_ws, daemon=True)
ws_thread.start()
# 配置 Cerebro 使用实盘队列
cerebro._event_queue = live_queue
# 运行策略
cerebro.addstrategy(LiveStrategy)
cerebro.run(mode=bt.RunMode.TICK)
# 停止
feed.stop()
```bash
- *测试**: `tests/phase4/test_live_trading.py`
```python
def test_live_event_queue():
"""测试实盘事件队列"""
queue = LiveEventQueue()
# 推送事件
event1 = Event(timestamp=100.0, priority=30, channel_type='tick')
event2 = Event(timestamp=100.0, priority=20, channel_type='orderbook')
queue.push(event1)
queue.push(event2)
# 弹出事件(按优先级)
e1 = queue.pop(timeout=1.0)
assert e1.priority == 20
e2 = queue.pop(timeout=1.0)
assert e2.priority == 30
def test_ccxt_websocket_integration():
"""测试 CCXT WebSocket 集成(模拟)"""
# 使用 mock 避免真实连接
# ...
```bash
- --
### 2.4 实盘数据验证(2 天)
- *文件**: `backtrader/channels/live_validator.py`
```python
class LiveDataValidator:
"""实盘数据验证器"""
def __init__(self):
self._last_timestamps = {}
self._anomaly_count = {}
def validate(self, event):
"""验证实盘数据"""
key = (event.channel_type, event.channel_name)
# 1. 时间戳检查
last_ts = self._last_timestamps.get(key, 0)
if event.timestamp < last_ts:
self._record_anomaly(key, 'out_of_order')
return False
# 2. 时间跳跃检查(>1 小时视为异常)
if last_ts > 0 and event.timestamp - last_ts > 3600:
self._record_anomaly(key, 'time_jump')
# 警告但不拒绝
# 3. 数据合理性检查
if event.channel_type == 'tick':
if event.data.price <= 0 or event.data.volume < 0:
self._record_anomaly(key, 'invalid_data')
return False
self._last_timestamps[key] = event.timestamp
return True
def _record_anomaly(self, key, anomaly_type):
"""记录异常"""
if key not in self._anomaly_count:
self._anomaly_count[key] = {}
self._anomaly_count[key][anomaly_type] = \
self._anomaly_count[key].get(anomaly_type, 0) + 1
def get_anomaly_report(self):
"""获取异常报告"""
return self._anomaly_count
```bash
- --
## 3. 交付物
### 3.1 代码
- [ ] `backtrader/channels/bridge.py` - ChannelBridge
- [ ] `backtrader/channels/live_queue.py` - LiveEventQueue
- [ ] `backtrader/feeds/ccxt_live_tick.py` - CCXT 集成
- [ ] `backtrader/channels/live_validator.py` - 实盘验证
### 3.2 测试
- [ ] `tests/phase4/test_channel_bridge.py`
- [ ] `tests/phase4/test_live_trading.py`
- [ ] `tests/phase4/test_live_validator.py`
### 3.3 文档
- [ ] Phase 4 完成报告
- [ ] 实盘交易指南
- [ ] ChannelBridge 使用说明
- --
## 4. 验收标准
### 4.1 功能验收
- [ ] ChannelBridge 正常工作
- [ ] LiveEventQueue 线程安全
- [ ] CCXT WebSocket 稳定连接
- [ ] 实盘数据验证有效
### 4.2 实盘验证
- [ ] WebSocket 连接稳定(>1 小时)
- [ ] 数据延迟 < 100ms
- [ ] 异常处理正确
- [ ] 自动重连正常
### 4.3 回归验证
- [ ] 回归测试 100%通过
- [ ] 向后兼容性保持
- --
## 5. 时间表
| 任务 | 工作量 |
|------|--------|
| ChannelBridge | 4 天 |
| LiveEventQueue | 3 天 |
| CCXT 集成 | 5 天 |
| 实盘验证 | 2 天 |
- *总计**: 14 天(2 周)
- --
## 6. 风险与应对
| 风险 | 概率 | 影响 | 应对措施 |
|------|------|------|---------|
| WebSocket 不稳定 | 中 | 高 | 自动重连+心跳检测 |
| 数据延迟过高 | 中 | 中 | 监控+告警 |
| Bridge 性能问题 | 低 | 中 | 文档说明限制 |
- --
## 7. 下一步
Phase 4 完成后,进入 Phase 5:文档与示例。