背景¶
backtrader 已经比较完善了,我想要借鉴量化投资框架中其他项目的优势,继续改进优化 backtrader。
任务¶
阅读研究分析 backtrader 这个项目的源代码,了解这个项目。
阅读研究分析/Users/yunjinqi/Documents/量化交易框架/aat
借鉴这个新项目的优点和功能,给 backtrader 优化改进提供新的建议
写需规文档和设计文档放到这个文档的最下面,方便后续借鉴
aat 项目简介¶
aat (Asynchronous Algorithmic Trading) 是一个异步算法交易框架,具有以下核心特点:
异步架构: 基于 asyncio 的异步设计
C++核心: 性能关键部分使用 C++实现
多交易所: 支持多个交易所和市场
WebSocket: 原生 WebSocket 支持
策略类型: 支持多种策略类型
风险管理: 内置风险管理模块
重点借鉴方向¶
异步设计: asyncio 异步架构模式
C++加速: 性能关键模块的 C++实现
Exchange 抽象: 交易所统一抽象
OrderBook: 订单簿管理
StrategyManager: 策略管理器设计
RiskManager: 风险管理模块
架构对比分析¶
Backtrader 核心特点¶
优势:*
成熟的 Line 系统: 基于循环缓冲区的高效时间序列数据管理
完整的回测引擎: Cerebro 统一管理策略、数据、经纪商、分析器
丰富的技术指标: 60+内置技术指标
性能优化: 支持向量化(once 模式)和事件驱动(next 模式)双执行模式
Cython 加速: 关键路径使用 Cython 优化
多市场支持: 支持股票、期货、加密货币等多种市场
Python 优先: 纯 Python 实现,易于扩展和定制
局限:*
同步架构: 主要是同步设计,异步支持有限
事件驱动简单: 基于定时器的 bar-by-bar 处理
实时交易支持弱: 虽支持 live 模式,但设计主要面向回测
订单簿简陋: 缺少完整的订单簿模拟
交易所抽象不统一: 各交易所实现差异大
风险管理简单: 缺少系统化的风险管理引擎
策略模式分离: 回测和实盘策略需要不同代码
AAT 核心特点¶
优势:*
完全异步架构: 基于 asyncio 的真正异步设计
事件驱动: 完整的事件系统,支持多种事件类型
统一策略接口: 同一策略代码可用于回测和实盘
四引擎架构: Trading/Risk/Execution/Backtest 分离
完整订单簿: 支持复杂订单类型和订单标志
交易所抽象: 统一的_MarketData 和_OrderEntry 接口
WebSocket 原生支持: 内置 WebSocket 实时数据流
管理器模式: StrategyManager/OrderManager/PortfolioManager/RiskManager
uvloop 优化: 使用 uvloop 加速事件循环
局限:*
生态较小: 相比 backtrader 社区规模小
学习曲线: 异步编程和事件驱动模式需要学习
文档较少: 英文文档为主,示例有限
指标库少: 技术指标不如 backtrader 丰富
需求规格文档¶
1. 异步事件驱动架构 (优先级: 高)¶
需求描述:*
引入基于 asyncio 的异步事件驱动架构,支持并发处理多个数据源和策略。
功能需求:*
异步事件循环: 基于 asyncio 的事件循环
事件类型定义: TRADE/OPEN/CANCEL/CHANGE/FILL/DATA 等事件
异步策略回调: onTrade/onOrder/onFill 等异步方法
事件多路复用: 合并多个数据源的事件流
uvloop 支持: 可选使用 uvloop 加速
非功能需求:*
保持与现有同步 API 兼容
支持混合模式(部分策略异步,部分同步)
性能不低于现有实现
2. 统一交易所抽象 (优先级: 高)¶
需求描述:*
定义统一的交易所抽象接口,分离行情数据接口和交易接口。
功能需求:*
_MarketData 接口: 行情订阅、tick 数据、订单簿查询
_OrderEntry 接口: 账户查询、下单、撤单
Exchange 基类: 组合上述两个接口
多交易所支持: 统一接口支持多个交易所
WebSocket 集成: 原生支持 WebSocket 连接
非功能需求:*
接口设计简洁易用
支持交易所特定功能扩展
3. 完整订单簿系统 (优先级: 中)¶
需求描述:*
实现完整的订单簿模拟,支持复杂订单类型和订单标志。
功能需求:*
订单类型: 市价单、限价单、止损市价单、止损限价单
订单标志: FOK (Fill-or-Kill)、AON (All-or-None)、IOC (Immediate-or-Cancel)
价格水平: 按价格组织订单队列
订单撮合: 价格优先、时间优先原则
部分成交: 支持订单部分成交
订单修改: 支持订单修改和取消
非功能需求:*
O(1)价格水平访问
高效的订单插入和删除
可选 C++加速
4. 统一策略接口 (优先级: 中)¶
需求描述:*
实现统一的策略接口,同一策略代码可用于回测和实盘。
功能需求:*
统一 Strategy 基类: 回测和实盘共用
异步事件处理: onTrade/onOrder/onFill 回调
事件订阅: 订阅特定类型事件
订单管理: 统一的订单提交和管理接口
投资组合查询: 实时查询持仓和 P&L
非功能需求:*
零代码修改切换模式
行为一致性
5. 风险管理引擎 (优先级: 中)¶
需求描述:*
引入独立的风险管理引擎,在订单执行前进行风险检查。
功能需求:*
预交易检查: 订单提交前验证
持仓限制: 单品种/总持仓限制
资金限制: 单笔/总资金使用限制
风险规则: 可配置的风险规则
实时监控: 持续监控风险指标
订单拦截: 可修改或拒绝风险订单
非功能需求:*
不影响正常订单执行速度
规则配置灵活
6. 管理器模式重构 (优先级: 低)¶
需求描述:*
引入管理器模式,分离关注点。
功能需求:*
StrategyManager: 策略与引擎之间的协调器
OrderManager: 订单生命周期管理
PortfolioManager: 持仓和 P&L 跟踪
RiskManager: 风险检查和监控
EventManager: 事件分发和订阅
非功能需求:*
保持 API 兼容性
渐进式重构
设计文档¶
1. 异步事件驱动架构设计¶
1.1 架构设计¶
┌─────────────────────────────────────────────────────────────┐
│ EventLoop (asyncio) │
├─────────────────────────────────────────────────────────────┤
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ Strategy 1 │ │ Strategy 2 │ │ Strategy N │ │
│ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ StrategyManager │ │
│ │ - Event Multiplexing │ │
│ │ - Strategy Coordination │ │
│ └───────────────────┬─────────────────────────────────┘ │
│ │ │
│ ┌───────────────────┼─────────────────────────────────┐ │
│ │ ▼ │ │
│ │ ┌───────────────────────────────────────────────┐ │ │
│ │ │ TradingEngine │ │ │
│ │ ├─────────────┬─────────────┬─────────────────┤ │ │
│ │ │ RiskEngine │ExecEngine │ BacktestEngine │ │ │
│ │ └─────────────┴─────────────┴─────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────────┼─────────────────────────────────┐ │
│ │ ▼ │ │
│ │ ┌───────────────────────────────────────────────┐ │ │
│ │ │ Exchange (抽象层) │ │ │
│ │ ├───────────────────┬───────────────────────────┤ │ │
│ │ │ _MarketData │ _OrderEntry │ │ │
│ │ │ - subscribe() │ - newOrder() │ │ │
│ │ │ - tick() │ - cancelOrder() │ │ │
│ │ │ - book() │ - accounts() │ │ │
│ │ └───────────────────┴───────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
```bash
#### 1.2 事件系统设计
```python
# backtrader/async_engine/event.py
from enum import Enum
from dataclasses import dataclass
from typing import Any, Optional
from datetime import datetime
class EventType(Enum):
"""事件类型枚举"""
HEARTBEAT = "heartbeat"
TRADE = "trade" # 交易事件
OPEN = "open" # 订单开启事件
CANCEL = "cancel" # 订单取消事件
CHANGE = "change" # 订单修改事件
FILL = "fill" # 订单成交事件
DATA = "data" # 市场数据事件
HALT = "halt" # 暂停事件
CONTINUE = "continue" # 继续事件
ERROR = "error" # 错误事件
START = "start" # 启动事件
EXIT = "exit" # 退出事件
@dataclass
class Event:
"""事件基类"""
type: EventType
timestamp: datetime
data: Any = None
target: Optional[str] = None # 目标策略 ID
exchange: Optional[str] = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.now()
# 事件处理器类型
from typing import Callable, Awaitable
EventHandler = Callable[[Event], Awaitable[None]]
```bash
#### 1.3 异步策略基类设计
```python
# backtrader/async_strategy/async_base.py
import asyncio
from abc import ABC, abstractmethod
from typing import Optional
class AsyncStrategy(ABC):
"""
异步策略基类
特点:
1. 同一代码可用于回测和实盘
2. 完全异步的事件处理
3. 统一的订单管理接口
"""
def __init__(self):
self._engine = None
self._context = None
self._event_queue = asyncio.Queue()
@property
def engine(self):
"""获取交易引擎引用"""
return self._engine
@property
def portfolio(self):
"""获取投资组合"""
return self._context.portfolio if self._context else None
# ========== 必须实现的方法 ==========
@abstractmethod
async def on_trade(self, event: Event) -> None:
"""
处理交易事件
Args:
event: 包含交易数据的事件
"""
raise NotImplementedError
# ========== 可选实现的方法 ==========
async def on_order(self, event: Event) -> None:
"""处理订单事件(OPEN/CANCEL/CHANGE/FILL)"""
pass
async def on_fill(self, event: Event) -> None:
"""处理成交事件"""
pass
async def on_data(self, event: Event) -> None:
"""处理市场数据事件"""
pass
async def on_error(self, event: Event) -> None:
"""处理错误事件"""
pass
async def on_start(self) -> None:
"""策略启动时的初始化"""
pass
async def on_stop(self) -> None:
"""策略停止时的清理"""
pass
# ========== 订单管理方法 ==========
async def buy(self, instrument: str, volume: float,
price: Optional[float] = None,
order_type: OrderType = OrderType.MARKET,
flags: OrderFlag = OrderFlag.NONE) -> Optional[Order]:
"""
发送买入订单
Args:
instrument: 交易品种
volume: 交易数量
price: 限价单价格(市价单可不传)
order_type: 订单类型
flags: 订单标志
Returns:
创建的订单对象
"""
order = Order(
instrument=instrument,
side=OrderSide.BUY,
volume=volume,
price=price,
order_type=order_type,
flags=flags
)
return await self._engine.submit_order(order, self)
async def sell(self, instrument: str, volume: float,
price: Optional[float] = None,
order_type: OrderType = OrderType.MARKET,
flags: OrderFlag = OrderFlag.NONE) -> Optional[Order]:
"""发送卖出订单"""
order = Order(
instrument=instrument,
side=OrderSide.SELL,
volume=volume,
price=price,
order_type=order_type,
flags=flags
)
return await self._engine.submit_order(order, self)
async def cancel_order(self, order: Order) -> bool:
"""取消订单"""
return await self._engine.cancel_order(order)
async def cancel_all(self, instrument: Optional[str] = None) -> int:
"""取消所有订单"""
return await self._engine.cancel_all_orders(self, instrument)
# ========== 查询方法 ==========
def get_position(self, instrument: str) -> Optional[Position]:
"""获取持仓"""
if self.portfolio:
return self.portfolio.get_position(instrument)
return None
def get_cash(self) -> float:
"""获取可用资金"""
if self.portfolio:
return self.portfolio.cash
return 0.0
def get_value(self) -> float:
"""获取总资产"""
if self.portfolio:
return self.portfolio.total_value
return 0.0
```bash
### 2. 统一交易所抽象设计
#### 2.1 接口定义
```python
# backtrader/exchange/base.py
from abc import ABC, abstractmethod
from typing import List, AsyncIterator, Optional
class _MarketData(ABC):
"""
行情数据接口
定义获取市场数据的抽象接口
"""
@abstractmethod
async def connect(self) -> None:
"""建立连接"""
pass
@abstractmethod
async def disconnect(self) -> None:
"""断开连接"""
pass
@abstractmethod
def instruments(self) -> List[str]:
"""获取支持的交易品种列表"""
pass
@abstractmethod
async def subscribe(self, instrument: str) -> None:
"""订阅特定品种的数据"""
pass
@abstractmethod
async def unsubscribe(self, instrument: str) -> None:
"""取消订阅"""
pass
@abstractmethod
async def tick(self) -> AsyncIterator[Event]:
"""
获取实时数据流
Yields:
市场数据事件
"""
pass
@abstractmethod
async def book(self, instrument: str) -> Optional[OrderBook]:
"""获取订单簿快照"""
pass
class _OrderEntry(ABC):
"""
订单接口
定义订单操作的抽象接口
"""
@abstractmethod
async def accounts(self) -> List[str]:
"""获取账户列表"""
pass
@abstractmethod
async def balance(self, account: str) -> dict:
"""获取账户余额"""
pass
@abstractmethod
async def new_order(self, order: Order) -> bool:
"""
提交新订单
Returns:
是否成功提交
"""
pass
@abstractmethod
async def cancel_order(self, order: Order) -> bool:
"""取消订单"""
pass
@abstractmethod
async def cancel_all(self, instrument: Optional[str] = None) -> int:
"""取消所有订单"""
pass
@abstractmethod
async def get_orders(self, instrument: Optional[str] = None) -> List[Order]:
"""获取活动订单列表"""
pass
class Exchange(_MarketData, _OrderEntry):
"""
交易所基类
组合行情数据和订单接口
"""
def __init__(self, name: str, config: dict = None):
self.name = name
self.config = config or {}
self._connected = False
@property
def is_connected(self) -> bool:
return self._connected
async def connect(self) -> None:
"""建立连接"""
await super().connect()
self._connected = True
async def disconnect(self) -> None:
"""断开连接"""
self._connected = False
await super().disconnect()
```bash
#### 2.2 具体交易所实现示例
```python
# backtrader/exchange/exchanges/ccxt_exchange.py
import ccxt.async_support as ccxt
class CCXTExchange(Exchange):
"""
基于 CCXT 的交易所实现
支持所有 CCXT 支持的交易所
"""
def __init__(self, exchange_id: str, config: dict = None):
super().__init__(exchange_id, config)
self._exchange_class = getattr(ccxt, exchange_id)
self._exchange: ccxt.Exchange = None
async def connect(self) -> None:
"""建立连接"""
self._exchange = self._exchange_class(self.config)
await self._exchange.load_markets()
await super().connect()
async def disconnect(self) -> None:
"""断开连接"""
if self._exchange:
await self._exchange.close()
await super().disconnect()
def instruments(self) -> List[str]:
"""获取支持的交易品种"""
if self._exchange:
return list(self._exchange.markets.keys())
return []
async def subscribe(self, instrument: str) -> None:
"""订阅特定品种"""
# CCXT 使用 polling 方式,不需要显式订阅
pass
async def tick(self) -> AsyncIterator[Event]:
"""获取实时数据流"""
while self._connected:
for instrument in self.instruments():
try:
ticker = await self._exchange.fetch_ticker(instrument)
yield Event(
type=EventType.DATA,
data={'ticker': ticker, 'instrument': instrument}
)
except Exception as e:
yield Event(
type=EventType.ERROR,
data={'error': str(e)}
)
await asyncio.sleep(1) # 轮询间隔
async def book(self, instrument: str) -> Optional[OrderBook]:
"""获取订单簿"""
if self._exchange:
orderbook = await self._exchange.fetch_order_book(instrument)
return OrderBook.from_ccxt(orderbook)
return None
async def accounts(self) -> List[str]:
"""获取账户列表"""
# CCXT 通常只有一个账户
return ['default']
async def balance(self, account: str) -> dict:
"""获取账户余额"""
if self._exchange:
balance = await self._exchange.fetch_balance()
return balance
return {}
async def new_order(self, order: Order) -> bool:
"""提交新订单"""
if self._exchange:
try:
await self._exchange.create_order(
order.instrument,
order.order_type.value.lower(),
order.side.value.lower(),
order.volume,
order.price
)
return True
except Exception:
return False
return False
async def cancel_order(self, order: Order) -> bool:
"""取消订单"""
if self._exchange and order.exchange_id:
try:
await self._exchange.cancel_order(order.exchange_id)
return True
except Exception:
return False
return False
async def cancel_all(self, instrument: Optional[str] = None) -> int:
"""取消所有订单"""
if self._exchange:
try:
await self._exchange.cancel_all_orders(symbol=instrument)
return 0 # CCXT 不返回取消数量
except Exception:
return -1
return 0
```bash
### 3. 订单簿系统设计
#### 3.1 订单类型和标志
```python
# backtrader/order/order.py
from enum import Enum
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
class OrderSide(Enum):
BUY = "buy"
SELL = "sell"
class OrderType(Enum):
MARKET = "market"
LIMIT = "limit"
STOP_MARKET = "stop_market"
STOP_LIMIT = "stop_limit"
class OrderFlag(Enum):
NONE = "none"
FILL_OR_KILL = "fok" # 全部成交或立即取消
ALL_OR_NONE = "aon" # 全部成交(可等待)
IMMEDIATE_OR_CANCEL = "ioc" # 立即成交可部分
class OrderStatus(Enum):
PENDING = "pending" # 待提交
OPEN = "open" # 已提交
FILLED = "filled" # 已成交
PARTIALLY_FILLED = "partially_filled" # 部分成交
CANCELLED = "cancelled" # 已取消
REJECTED = "rejected" # 被拒绝
@dataclass
class Order:
"""订单对象"""
id: str # 订单 ID
instrument: str # 交易品种
side: OrderSide # 买卖方向
volume: float # 数量
price: Optional[float] # 价格(限价单)
stop_price: Optional[float] = None # 止损价
order_type: OrderType = OrderType.MARKET
flags: OrderFlag = OrderFlag.NONE
status: OrderStatus = OrderStatus.PENDING
filled_volume: float = 0 # 已成交数量
exchange_id: Optional[str] = None # 交易所订单 ID
created_at: datetime = None
updated_at: datetime = None
def __post_init__(self):
if self.created_at is None:
self.created_at = datetime.now()
if self.updated_at is None:
self.updated_at = datetime.now()
@property
def remaining_volume(self) -> float:
"""剩余数量"""
return self.volume - self.filled_volume
@property
def is_filled(self) -> bool:
"""是否已完全成交"""
return self.filled_volume >= self.volume
@property
def is_active(self) -> bool:
"""是否活动"""
return self.status in (OrderStatus.OPEN, OrderStatus.PARTIALLY_FILLED)
def fill(self, volume: float, price: float) -> None:
"""成交"""
fill_volume = min(volume, self.remaining_volume)
self.filled_volume += fill_volume
self.updated_at = datetime.now()
if self.filled_volume >= self.volume:
self.status = OrderStatus.FILLED
else:
self.status = OrderStatus.PARTIALLY_FILLED
```bash
#### 3.2 订单簿实现
```python
# backtrader/order/order_book.py
from collections import defaultdict, deque
from typing import Optional, List
from sortedcontainers import SortedDict
class PriceLevel:
"""
价格水平
同一价格的所有订单按时间顺序排队
"""
def __init__(self, price: float):
self.price = price
self.orders = deque() # FIFO 队列
self.total_volume = 0.0
def add(self, order: Order) -> None:
"""添加订单"""
self.orders.append(order)
self.total_volume += order.volume
def remove(self, order: Order) -> None:
"""移除订单"""
if order in self.orders:
self.orders.remove(order)
self.total_volume -= order.volume
def match(self, volume: float) -> tuple[List[Order], float]:
"""
撮合订单
Returns:
(成交订单列表, 剩余数量)
"""
filled_orders = []
remaining = volume
while remaining > 0 and self.orders:
order = self.orders[0]
if order.remaining_volume <= remaining:
# 完全成交
filled_orders.append(order)
remaining -= order.remaining_volume
self.orders.popleft()
else:
# 部分成交
order.fill(remaining, self.price)
filled_orders.append(order)
remaining = 0
self.total_volume = sum(o.remaining_volume for o in self.orders)
return filled_orders, remaining
def __len__(self):
return len(self.orders)
def __bool__(self):
return len(self.orders) > 0
class OrderBook:
"""
订单簿
支持市价单、限价单、止损单
"""
def __init__(self, instrument: str):
self.instrument = instrument
# 买单: 价格从高到低排序 (使用负数作为 key 实现降序)
self._buy_levels: SortedDict = SortedDict()
# 卖单: 价格从低到高排序
self._sell_levels: SortedDict = SortedDict()
# 止损买单
self._stop_buy_levels: SortedDict = SortedDict()
# 止损卖单
self._stop_sell_levels: SortedDict = SortedDict()
# 价格到价格水平的映射
self._price_levels: dict = {}
def add_order(self, order: Order) -> bool:
"""添加订单到订单簿"""
if order.order_type == OrderType.MARKET:
# 市价单不加入订单簿,立即撮合
return self._match_market(order)
elif order.order_type == OrderType.LIMIT:
return self._add_limit_order(order)
elif order.order_type == OrderType.STOP_MARKET:
return self._add_stop_order(order)
elif order.order_type == OrderType.STOP_LIMIT:
return self._add_stop_limit_order(order)
return False
def _add_limit_order(self, order: Order) -> bool:
"""添加限价单"""
levels = self._buy_levels if order.side == OrderSide.BUY else self._sell_levels
# 创建或获取价格水平
if order.price not in levels:
price_level = PriceLevel(order.price)
levels[order.price] = price_level
self._price_levels[order.price] = price_level
price_level = levels[order.price]
price_level.add(order)
# 尝试撮合
self._try_match(order)
return True
def _match_market(self, order: Order) -> bool:
"""撮合市价单"""
if order.side == OrderSide.BUY:
levels = self._sell_levels
else:
levels = self._buy_levels
remaining = order.volume
filled_orders = []
prices_to_remove = []
# 遍历价格水平
for price in list(levels.keys()):
if remaining <= 0:
break
price_level = levels[price]
orders, remaining = price_level.match(remaining)
filled_orders.extend(orders)
if not price_level:
prices_to_remove.append(price)
# 清理空价格水平
for price in prices_to_remove:
del levels[price]
del self._price_levels[price]
return len(filled_orders) > 0 or remaining == order.volume
def _try_match(self, order: Order) -> None:
"""尝试撮合限价单"""
if order.side == OrderSide.BUY:
# 买单与卖单撮合
while order.remaining_volume > 0 and self._sell_levels:
best_ask = next(iter(self._sell_levels)) # 最低卖价
if best_ask > order.price:
break # 价格不匹配
price_level = self._sell_levels[best_ask]
orders, remaining = price_level.match(order.remaining_volume)
for o in orders:
o.fill(min(o.remaining_volume, order.remaining_volume), best_ask)
order.fill(o.filled_volume, best_ask)
if not price_level:
del self._sell_levels[best_ask]
else:
# 卖单与买单撮合
while order.remaining_volume > 0 and self._buy_levels:
best_bid = next(iter(self._buy_levels)) # 最高买价
if best_bid < order.price:
break # 价格不匹配
price_level = self._buy_levels[best_bid]
orders, remaining = price_level.match(order.remaining_volume)
for o in orders:
o.fill(min(o.remaining_volume, order.remaining_volume), best_bid)
order.fill(o.filled_volume, best_bid)
if not price_level:
del self._buy_levels[best_bid]
def cancel_order(self, order: Order) -> bool:
"""取消订单"""
# 找到订单所在的价格水平
if order.price and order.price in self._price_levels:
price_level = self._price_levels[order.price]
price_level.remove(order)
if not price_level:
# 清理空价格水平
if order.side == OrderSide.BUY:
del self._buy_levels[order.price]
else:
del self._sell_levels[order.price]
del self._price_levels[order.price]
return True
return False
@property
def best_bid(self) -> Optional[float]:
"""最优买价"""
if self._buy_levels:
return next(iter(self._buy_levels))
return None
@property
def best_ask(self) -> Optional[float]:
"""最优卖价"""
if self._sell_levels:
return next(iter(self._sell_levels))
return None
@property
def spread(self) -> Optional[float]:
"""买卖价差"""
if self.best_bid and self.best_ask:
return self.best_ask - self.best_bid
return None
def get_depth(self, levels: int = 5) -> dict:
"""
获取订单簿深度
Returns:
{'bids': [(price, volume), ...], 'asks': [(price, volume), ...]}
"""
bids = []
asks = []
for i, (price, level) in enumerate(self._buy_levels.items()):
if i >= levels:
break
bids.append((price, level.total_volume))
for i, (price, level) in enumerate(self._sell_levels.items()):
if i >= levels:
break
asks.append((price, level.total_volume))
return {'bids': bids, 'asks': asks}
```bash
### 4. 风险管理引擎设计
```python
# backtrader/risk/risk_manager.py
from dataclasses import dataclass
from typing import Optional, List
@dataclass
class RiskRule:
"""风险规则"""
name: str
max_position_size: Optional[float] = None # 单品种最大持仓
max_total_position: Optional[float] = None # 总持仓限制
max_order_size: Optional[float] = None # 单笔最大订单
max_daily_loss: Optional[float] = None # 日最大亏损
max_drawdown: Optional[float] = None # 最大回撤
class RiskManager:
"""
风险管理引擎
在订单执行前进行风险检查
"""
def __init__(self, portfolio=None):
self.portfolio = portfolio
self.rules: dict = {} # {strategy_id: RiskRule}
self._daily_pnl: float = 0
self._peak_value: float = 0
def set_rule(self, strategy_id: str, rule: RiskRule) -> None:
"""设置策略的风险规则"""
self.rules[strategy_id] = rule
def get_rule(self, strategy_id: str) -> RiskRule:
"""获取策略的风险规则"""
return self.rules.get(strategy_id, RiskRule(name="default"))
async def check_order(self, order: Order, strategy_id: str) -> tuple[bool, str]:
"""
检查订单是否符合风险规则
Returns:
(是否通过, 拒绝原因)
"""
rule = self.get_rule(strategy_id)
# 检查单笔订单大小
if rule.max_order_size and order.volume > rule.max_order_size:
return False, f"订单超过最大限制: {order.volume} > {rule.max_order_size}"
# 检查持仓限制
if rule.max_position_size and self.portfolio:
current_position = self.portfolio.get_position(order.instrument)
new_size = abs(current_position.size + order.volume) if current_position else order.volume
if new_size > rule.max_position_size:
return False, f"持仓超过限制: {new_size} > {rule.max_position_size}"
# 检查总持仓
if rule.max_total_position and self.portfolio:
total_position = sum(abs(p.size) for p in self.portfolio.positions.values())
if total_position + order.volume > rule.max_total_position:
return False, f"总持仓超过限制: {total_position + order.volume} > {rule.max_total_position}"
# 检查日内亏损
if rule.max_daily_loss and self._daily_pnl < -rule.max_daily_loss:
return False, f"日内亏损超限: {self._daily_pnl} < -{rule.max_daily_loss}"
# 检查最大回撤
if rule.max_drawdown and self.portfolio:
current_value = self.portfolio.total_value
if current_value > self._peak_value:
self._peak_value = current_value
drawdown = (self._peak_value - current_value) / self._peak_value
if drawdown > rule.max_drawdown:
return False, f"回撤超限: {drawdown:.2%} > {rule.max_drawdown:.2%}"
return True, ""
async def modify_order(self, order: Order, strategy_id: str) -> Order:
"""
修改订单以符合风险规则
Returns:
修改后的订单
"""
rule = self.get_rule(strategy_id)
modified = Order(**order.__dict__)
# 调整订单大小
if rule.max_order_size and modified.volume > rule.max_order_size:
modified.volume = rule.max_order_size
# 调整持仓
if rule.max_position_size and self.portfolio:
current = self.portfolio.get_position(modified.instrument)
if current:
max_add = rule.max_position_size - abs(current.size)
if modified.volume > max_add:
modified.volume = max_add
return modified
def update_pnl(self, pnl: float) -> None:
"""更新盈亏"""
self._daily_pnl += pnl
def reset_daily(self) -> None:
"""重置日内统计"""
self._daily_pnl = 0
```bash
### 5. 交易引擎设计
```python
# backtrader/async_engine/engine.py
import asyncio
from typing import List, Optional, AsyncIterator
from aiostream import stream
class TradingEngine:
"""
交易引擎
协调策略、风险管理、执行和交易所
"""
def __init__(self):
self.strategies: List[AsyncStrategy] = []
self.exchanges: List[Exchange] = []
self.risk_manager: RiskManager = RiskManager()
self.portfolio_manager = PortfolioManager()
self._running = False
def add_strategy(self, strategy: AsyncStrategy) -> None:
"""添加策略"""
strategy._engine = self
strategy._context = StrategyContext(
portfolio=self.portfolio_manager.create_portfolio(strategy)
)
self.strategies.append(strategy)
def add_exchange(self, exchange: Exchange) -> None:
"""添加交易所"""
self.exchanges.append(exchange)
async def run(self) -> None:
"""启动交易引擎"""
self._running = True
# 启动所有策略
for strategy in self.strategies:
await strategy.on_start()
# 连接所有交易所
for exchange in self.exchanges:
await exchange.connect()
# 创建事件流
exchange_streams = [exchange.tick() for exchange in self.exchanges]
# 主事件循环
async with stream.merge(*exchange_streams).stream() as stream:
async for event in stream:
if not self._running:
break
# 分发事件到所有策略
await self._dispatch_event(event)
# 清理
for exchange in self.exchanges:
await exchange.disconnect()
for strategy in self.strategies:
await strategy.on_stop()
async def _dispatch_event(self, event: Event) -> None:
"""分发事件到策略"""
for strategy in self.strategies:
# 如果事件有指定目标,只发送给目标策略
if event.target and strategy.id != event.target:
continue
# 根据事件类型调用相应方法
if event.type == EventType.TRADE:
await strategy.on_trade(event)
elif event.type == EventType.FILL:
await strategy.on_fill(event)
self.portfolio_manager.update_position(event.data)
elif event.type in (EventType.OPEN, EventType.CANCEL, EventType.CHANGE):
await strategy.on_order(event)
elif event.type == EventType.DATA:
await strategy.on_data(event)
elif event.type == EventType.ERROR:
await strategy.on_error(event)
async def submit_order(self, order: Order, strategy: AsyncStrategy) -> Optional[Order]:
"""提交订单"""
# 风险检查
passed, reason = await self.risk_manager.check_order(order, strategy.id)
if not passed:
# 发送拒绝事件
await strategy.on_order(Event(
type=EventType.ERROR,
data={'reason': reason, 'order': order}
))
return None
# 可能的订单修改
order = await self.risk_manager.modify_order(order, strategy.id)
# 提交到交易所
for exchange in self.exchanges:
if await exchange.new_order(order):
return order
return None
async def cancel_order(self, order: Order) -> bool:
"""取消订单"""
for exchange in self.exchanges:
if await exchange.cancel_order(order):
return True
return False
def stop(self) -> None:
"""停止引擎"""
self._running = False
```bash
### 6. 使用示例
#### 6.1 定义异步策略
```python
import backtrader as bt
class MyAsyncStrategy(bt.AsyncStrategy):
"""简单的移动平均异步策略"""
async def on_start(self):
"""初始化"""
self.fast_prices = []
self.slow_prices = []
print(f"策略 {self.id} 启动")
async def on_trade(self, event):
"""处理交易事件"""
data = event.data
price = data.get('price')
# 更新价格缓存
self.fast_prices.append(price)
self.slow_prices.append(price)
if len(self.fast_prices) > 10:
self.fast_prices.pop(0)
if len(self.slow_prices) > 30:
self.slow_prices.pop(0)
# 计算移动平均
if len(self.fast_prices) >= 10 and len(self.slow_prices) >= 30:
fast_ma = sum(self.fast_prices) / len(self.fast_prices)
slow_ma = sum(self.slow_prices) / len(self.slow_prices)
position = self.get_position(event.instrument)
# 金叉买入
if fast_ma > slow_ma and (not position or position.size <= 0):
await self.buy(
instrument=event.instrument,
volume=1.0
)
# 死叉卖出
elif fast_ma < slow_ma and position and position.size > 0:
await self.sell(
instrument=event.instrument,
volume=position.size
)
async def on_fill(self, event):
"""处理成交事件"""
print(f"订单成交: {event.data}")
async def on_error(self, event):
"""处理错误"""
print(f"错误: {event.data.get('reason')}")
```bash
#### 6.2 运行回测
```python
# 创建引擎
engine = TradingEngine()
# 添加策略
strategy = MyAsyncStrategy()
engine.add_strategy(strategy)
# 添加回测交易所
backtest_exchange = BacktestExchange(
datafeed=bt.feeds.PandasData(dataname=df)
)
engine.add_exchange(backtest_exchange)
# 设置风险规则
engine.risk_manager.set_rule(strategy.id, RiskRule(
name="my_rule",
max_position_size=100,
max_order_size=10,
max_daily_loss=1000
))
# 运行
await engine.run()
```bash
#### 6.3 运行实盘
```python
# 创建引擎
engine = TradingEngine()
# 添加策略 (使用相同的策略类!)
strategy = MyAsyncStrategy()
engine.add_strategy(strategy)
# 添加实盘交易所
exchange = CCXTExchange(
exchange_id='binance',
config={'apiKey': 'xxx', 'secret': 'yyy'}
)
engine.add_exchange(exchange)
# 运行 (其余代码相同)
await engine.run()
```bash
### 7. 实施路线图
#### 阶段 1: 基础异步框架 (2-3 周)
- [ ] 实现事件系统
- [ ] 实现异步策略基类
- [ ] 实现基础交易引擎
- [ ] 添加回测支持
#### 阶段 2: 交易所抽象 (2 周)
- [ ] 实现交易所接口
- [ ] 实现 CCXT 集成
- [ ] 实现 WebSocket 支持
- [ ] 添加回测交易所
#### 阶段 3: 订单簿系统 (2 周)
- [ ] 实现订单类型和标志
- [ ] 实现价格水平管理
- [ ] 实现订单撮合逻辑
- [ ] 添加订单簿查询
#### 阶段 4: 风险管理 (1 周)
- [ ] 实现风险规则
- [ ] 实现风险检查
- [ ] 实现订单修改
#### 阶段 5: 测试和文档 (2 周)
- [ ] 单元测试
- [ ] 集成测试
- [ ] 文档编写
- [ ] 示例代码
- --
## 附录: 关键文件路径
### Backtrader 关键文件
- `cerebro.py`: 核心引擎
- `strategy.py`: Strategy 基类
- `linebuffer.py`: Line 缓冲区
- `indicator.py`: Indicator 基类
- `broker.py`: Broker 基类
### AAT 关键文件
- `aat/strategy/strategy.py`: 策略基类
- `aat/exchange/exchange.py`: 交易所抽象
- `aat/core/order_book/order_book/order_book.py`: 订单簿
- `aat/engine/core/trading_engine.py`: 交易引擎
- `aat/engine/dispatch/risk/risk.py`: 风险管理
- `aat/config/enums.py`: 枚举定义