背景¶
backtrader 已经比较完善了,我想要借鉴量化投资框架中其他项目的优势,继续改进优化 backtrader。
任务¶
阅读研究分析 backtrader 这个项目的源代码,了解这个项目。
阅读研究分析/Users/yunjinqi/Documents/量化交易框架/PandoraTrader
借鉴这个新项目的优点和功能,给 backtrader 优化改进提供新的建议
写需规文档和设计文档放到这个文档的最下面,方便后续借鉴
PandoraTrader 项目简介¶
PandoraTrader 是一个 C++实现的高性能量化交易框架,具有以下核心特点:
C++实现: 高性能 C++实现
CTP 接口: 支持 CTP 期货交易接口
多策略: 支持多策略并行运行
风控模块: 内置风险控制模块
行情处理: 高效行情数据处理
订单管理: 完善的订单管理系统
重点借鉴方向¶
高性能: C++性能优化技术
CTP 集成: CTP 接口集成方式
多策略: 多策略管理架构
风控系统: 风险控制模块设计
行情处理: 高效行情处理
订单系统: 订单管理系统设计
框架对比分析¶
架构设计对比¶
| 维度 | backtrader | PandoraTrader |
|——|———–|—————|
| 实现语言| Python | C++ |
|定位| 中低频回测 | 高频交易 |
|接口支持| 多种 | CTP 为主 |
|多策略| Cerebro 多策略 | Agent 系统 |
|风控| 基础 | 多层风控 |
|行情处理| 回测时处理 | 实时异步处理 |
|订单管理| Broker 内置 | 独立订单系统 |
|性能优化| Cython 扩展 | 无锁+TBB |
backtrader 的优势¶
1.易用性: Python 语言,API 简洁直观
灵活性: 易于扩展和定制
社区支持: 大量第三方库和文档
快速开发: 开发效率高,原型验证快
跨平台: 纯 Python,无编译依赖
PandoraTrader 的优势¶
极致性能: C++实现,微秒级延迟
实时交易: 生产级实盘交易系统
风控完善: 多层风控保护机制
CTP 深度集成: 专为 CTP 接口优化
高频优化: 无锁数据结构、内存池等
多策略隔离: Agent 系统实现策略隔离
需求规格文档¶
需求 1: 高性能 Cython 扩展模块¶
需求描述*:
使用 Cython 编写关键路径的高性能扩展模块,提升 backtrader 的执行效率。
功能需求*:
指标计算加速: 将常用技术指标用 Cython 重写
数据处理优化: 优化数据预处理和转换
无锁数据结构: 使用原子操作实现线程安全
内存池: 实现对象池减少内存分配
编译优化: 支持 SSE/AVX 指令集优化
非功能需求*:
性能提升: 关键路径性能提升 5-10 倍
兼容性: 与纯 Python API 完全兼容
可选安装: 独立扩展包,不强制安装
需求 2: CTP 接口增强支持¶
需求描述*:
增强对 CTP 接口的支持,提供更专业的中国期货交易功能。
功能需求*:
CTP 接口适配: 完整的 CTP 行情和交易接口
合约映射: 自动映射 CTP 合约代码
席位管理: 支持多席位登录管理
行情订阅: 高效的行情订阅和过滤
报单类型: 支持 FOK、FAK 等 CTP 特有订单类型
非功能需求*:
稳定性: 保持长时间连接稳定
断线重连: 自动重连机制
日志记录: 完整的接口调用日志
需求 3: 多策略隔离系统¶
需求描述*:
实现策略级别的隔离机制,支持多策略并行运行。
功能需求*:
Agent 系统: 每个策略对应一个 Agent
持仓隔离: 各策略独立管理持仓和资金
订单隔离: 订单归属到对应策略
风险隔离: 单策略风险不影响其他策略
性能监控: 监控各策略资源使用
非功能需求*:
资源控制: 限制单策略资源占用
故障隔离: 策略异常不影响其他策略
需求 4: 风控系统模块¶
需求描述*:
实现多层次的风险控制模块,保护交易安全。
功能需求*:
撤单限制: 限制单位时间撤单次数
报单限制: 限制单位时间报单数量
订单速度限制: 控制下单频率
持仓限制: 实时监控和限制持仓
资金保护: 实时监控资金使用
非功能需求*:
实时性: 风控检查延迟<1ms
可配置: 所有风控参数可配置
需求 5: 高级行情处理¶
需求描述*:
实现高性能的实时行情处理机制。
功能需求*:
异步处理: 独立线程处理行情更新
双缓冲队列: 使用无锁队列缓存行情
数据过滤: 过滤无效和异常行情数据
行情分发: 高效分发到多个策略
增量更新: 只更新变化的行情数据
非功能需求*:
延迟控制: 行情处理延迟<100 微秒
吞吐量: 支持 10 万+tick/秒
需求 6: 订单管理系统增强¶
需求描述*:
增强订单管理功能,支持更复杂的订单场景。
功能需求*:
订单状态机: 完整的订单状态管理
订单队列: 支持订单队列和批量处理
条件单: 支持止损止盈等条件单
OCO 订单: 支持二选一订单
冰山订单: 支持大单拆分
非功能需求*:
订单可靠性: 订单不丢失不重复
状态一致性: 订单状态始终一致
设计文档¶
1. 高性能 Cython 扩展设计¶
1.1 核心数据结构优化¶
# backtrader/ext/cython/core.pyx
# cython: language_level=3
# distutils: language = c++
# distutils: include_dirs = /usr/local/include
from libc.stdint cimport int32_t, int64_t
from libc.stdlib cimport malloc, free
from cython.operator cimport dereference as deref
import numpy as np
cimport numpy as cnp
cdef class FastLineBuffer:
"""高性能行缓冲区
使用 Cython 实现,避免 Python 开销
"""
cdef:
double* _data # 数据指针
int32_t _size # 缓冲区大小
int32_t _len # 当前长度
int32_t _index # 当前索引
int32_t _minperiod # 最小周期
bint _malloced # 是否动态分配
def __cinit__(self, int32_t size=1024):
self._size = size
self._len = 0
self._index = 0
self._minperiod = 1
self._data = <double*>malloc(size * sizeof(double))
if self._data == NULL:
raise MemoryError("Failed to allocate buffer")
self._malloced = True
def __dealloc__(self):
if self._malloced and self._data != NULL:
free(self._data)
cdef inline void push(self, double value) nogil:
"""添加数据(无 GIL)"""
self._data[self._index] = value
self._index = (self._index + 1) % self._size
if self._len < self._size:
self._len += 1
cdef inline double get(self, int32_t index) nogil:
"""获取数据(无 GIL)"""
if index >= self._len or index < 0:
return 0.0
actual_index = self._index - 1 - index
if actual_index < 0:
actual_index += self._size
return self._data[actual_index]
cdef inline double last(self) nogil:
"""获取最新数据(无 GIL)"""
if self._len == 0:
return 0.0
return self.get(0)
cdef inline double[:] get_array(self, int32_t count):
"""获取数组视图(支持切片操作)"""
cdef double[:] result = np.empty(count, dtype=np.float64)
cdef int32_t i
for i in range(min(count, self._len)):
result[i] = self.get(i)
return result
# 高性能指标计算
cdef inline double calc_sma(double* data, int32_t size, int32_t period) nogil:
"""计算 SMA(无 GIL)"""
cdef:
double sum_val = 0.0
int32_t i
for i in range(period):
sum_val += data[size - 1 - i]
return sum_val / period
def fast_sma(double[:] data, int32_t period):
"""快速 SMA 计算
使用 Cython 实现,避免 Python 循环开销
"""
cdef:
int32_t size = data.shape[0]
double[:] result = np.empty(size, dtype=np.float64)
int32_t i
for i in range(period - 1, size):
if i + 1 >= period:
result[i] = calc_sma(&data[0], i + 1, period)
else:
result[i] = np.nan
return np.asarray(result)
```bash
#### 1.2 无锁数据结构
```python
# backtrader/ext/cython/atomic.pyx
# cython: language_level=3
from libc.stdint cimport int32_t, int64_t
from cython.operator cimport preincrement as preinc
import threading
cdef class AtomicInt:
"""原子整数操作"""
cdef:
int32_t _value
object _lock
def __init__(self, int32_t value=0):
self._value = value
self._lock = threading.Lock()
cdef inline int32_t get(self) nogil:
"""获取值(无 GIL)"""
return self._value
cdef inline int32_t increment(self) nogil:
"""原子递增(无 GIL)
注意: 在 Python 中真正的原子操作需要特殊处理
这里使用 GIL 保证线程安全
"""
with self._lock:
self._value += 1
return self._value
cdef inline int32_t add(self, int32_t delta) nogil:
"""原子加法(无 GIL)"""
with self._lock:
self._value += delta
return self._value
cdef class AtomicCounter:
"""高性能计数器
用于统计订单、成交等数量
"""
cdef:
int64_t _count
object _lock
def __init__(self):
self._count = 0
self._lock = threading.Lock()
cdef inline int64_t increment(self) nogil:
"""递增计数"""
with self._lock:
self._count += 1
return self._count
cdef inline void reset(self) nogil:
"""重置计数"""
with self._lock:
self._count = 0
cdef inline int64_t get(self) nogil:
"""获取计数值"""
return self._count
```bash
### 2. CTP 接口增强设计
#### 2.1 CTP 接口适配器
```python
# backtrader/stores/ctpstore.py
from typing import Dict, Optional, List, Any
from backtrader.metabase import MetaSingleton
from backtrader.utils.py3 import with_metaclass
from .ctpapi import (
CTPMdApi, CTPTradeApi,
CTPMarketDataType, CTPOrderType,
CTPOrderStatus, CTPDirection
)
class CTPStore(with_metaclass(MetaSingleton, object)):
"""CTP 接口存储管理器
单例模式,管理 CTP 行情和交易接口
"""
def __init__(self):
self._md_stores: Dict[str, CTPMdApi] = {}
self._trade_store: Optional[CTPTradeApi] = None
self._contracts: Dict[str, Any] = {} # 合约信息缓存
def connect_md(self, broker_id: str, app_id: str, auth_code: str,
md_address: List[str], flow_path: str = "") -> bool:
"""连接行情接口
Args:
broker_id: 券商 ID
app_id: 应用 ID
auth_code: 授权码
md_address: 行情地址列表
flow_path: 流文件路径
Returns:
连接是否成功
"""
md_store = CTPMdApi(
broker_id=broker_id,
app_id=app_id,
auth_code=auth_code,
flow_path=flow_path
)
if md_store.connect(md_address):
self._md_stores[broker_id] = md_store
return True
return False
def connect_trade(self, broker_id: str, app_id: str, auth_code: str,
td_address: List[str], flow_path: str = "") -> bool:
"""连接交易接口"""
trade_store = CTPTradeApi(
broker_id=broker_id,
app_id=app_id,
auth_code=auth_code,
flow_path=flow_path
)
if trade_store.connect(td_address):
self._trade_store = trade_store
return True
return False
def subscribe_market_data(self, instruments: List[str]) -> bool:
"""订阅行情数据"""
if not self._md_stores:
return False
for md_store in self._md_stores.values():
md_store.subscribe(instruments)
return True
def get_tick(self, instrument: str) -> Optional[Dict]:
"""获取最新 tick 数据"""
for md_store in self._md_stores.values():
tick = md_store.get_last_tick(instrument)
if tick:
return tick
return None
def insert_order(self, instrument: str, direction: str, volume: int,
price: float, order_type: str = 'limit') -> Optional[str]:
"""下单
Args:
instrument: 合约代码
direction: 方向 (buy/sell)
volume: 数量
price: 价格
order_type: 订单类型 (limit/fok/fak)
Returns:
订单引用 ID
"""
if not self._trade_store:
return None
return self._trade_store.insert_order(
instrument=instrument,
direction=direction,
volume=volume,
price=price,
order_type=order_type
)
def cancel_order(self, order_ref: str) -> bool:
"""撤单"""
if not self._trade_store:
return False
return self._trade_store.cancel_order(order_ref)
```bash
#### 2.2 CTP 数据源
```python
# backtrader/feeds/ctpfeed.py
from backtrader.feed import DataBase
from backtrader.stores.ctpstore import CTPStore
from ..utils.py3 import date2num
class CTPFeed(DataBase):
"""CTP 实时数据源
支持 CTP 接口的实时行情数据
"""
params = (
('broker_id', ''),
('instrument', ''),
('store', None), # CTPStore 实例
)
datacls = CTPStore # 存储类
def _load(self):
"""加载 CTP 数据"""
if self.p.store is None:
self.p.store = CTPStore()
# 获取合约信息
contract = self.p.store.get_contract(self.p.instrument)
if contract:
self._update_contract_info(contract)
# 订阅行情
self.p.store.subscribe_market_data([self.p.instrument])
# 设置数据线名称
self._name = self.params.instrument
def haslivedata(self):
"""是否有实时数据"""
return True
def live_data(self):
"""处理实时数据"""
tick = self.p.store.get_tick(self.p.instrument)
if tick:
# 更新数据线
self.lines.datetime[0] = date2num(tick['datetime'])
self.lines.open[0] = tick['open']
self.lines.high[0] = tick['high']
self.lines.low[0] = tick['low']
self.lines.close[0] = tick['last_price']
self.lines.volume[0] = tick['volume']
self.lines.openinterest[0] = tick.get('open_interest', 0)
return True
return False
```bash
### 3. 多策略隔离系统设计
#### 3.1 Agent 管理系统
```python
# backtrader/agent/agent_manager.py
from typing import Dict, Optional, List
from collections import defaultdict
import threading
from .agent import Agent
from ..strategy import Strategy
class AgentManager:
"""Agent 管理器
管理多个 Agent(策略实例),实现隔离
"""
def __init__(self):
# Agent 注册表: {instrument: [agents]}
self._agent_map: Dict[str, List[Agent]] = defaultdict(list)
# 独占 Agent: {instrument: agent}
self._monopoly_agents: Dict[str, Optional[Agent]] = {}
# 订单到 Agent 的映射
self._order_to_agent: Dict[str, str] = {}
# 持仓到 Agent 的映射
self._position_to_agent: Dict[str, str] = {}
# 资金分配
self._agent_capital: Dict[str, float] = {}
# 锁
self._lock = threading.RLock()
def register_agent(
self,
agent: Agent,
instruments: List[str],
monopoly: bool = False
) -> bool:
"""注册 Agent
Args:
agent: Agent 实例
instruments: 交易合约列表
monopoly: 是否独占合约
Returns:
注册是否成功
"""
with self._lock:
for instrument in instruments:
if monopoly:
# 独占模式,该合约只能有一个 Agent
if instrument in self._monopoly_agents:
return False
self._monopoly_agents[instrument] = agent
else:
# 共享模式,多个 Agent 可以交易同一合约
self._agent_map[instrument].append(agent)
return True
def unregister_agent(self, agent: Agent):
"""注销 Agent"""
with self._lock:
# 移除独占 Agent
instruments_to_remove = []
for inst, mon_agent in self._monopoly_agents.items():
if mon_agent is agent:
instruments_to_remove.append(inst)
for inst in instruments_to_remove:
del self._monopoly_agents[inst]
# 移除共享 Agent
for agents in self._agent_map.values():
if agent in agents:
agents.remove(agent)
def get_agent_by_order(self, order_ref: str) -> Optional[Agent]:
"""根据订单引用获取 Agent"""
with self._lock:
agent_id = self._order_to_agent.get(order_ref)
if agent_id:
# 遍历查找 Agent 实例
for agents in self._agent_map.values():
for agent in agents:
if agent.agent_id == agent_id:
return agent
for agent in self._monopoly_agents.values():
if agent and agent.agent_id == agent_id:
return agent
return None
def allocate_capital(self, agent: Agent, capital: float):
"""分配资金给 Agent"""
with self._lock:
self._agent_capital[agent.agent_id] = capital
def get_agent_capital(self, agent: Agent) -> float:
"""获取 Agent 资金"""
return self._agent_capital.get(agent.agent_id, 0.0)
def get_agent_position(self, agent: Agent, instrument: str) -> float:
"""获取 Agent 持仓"""
# 计算该 Agent 在指定合约的持仓
return 0.0 # 实现中需要从持仓记录中计算
```bash
#### 3.2 Agent 基类
```python
# backtrader/agent/agent.py
from typing import Optional, Dict, List, Callable
import uuid
from threading import Lock
from ..order import Order
from ..trade import Trade
class Agent:
"""Agent 基类
每个策略实例对应一个 Agent,负责管理该策略的订单、持仓和资金
"""
def __init__(self, strategy):
"""初始化 Agent
Args:
strategy: 策略实例
"""
self.strategy = strategy
self.agent_id = str(uuid.uuid4()) # 唯一 ID
# 订单管理
self._orders: Dict[str, Order] = {}
self._order_lock = Lock()
# 持仓管理
self._positions: Dict[str, float] = {}
# 成交记录
self._trades: List[Trade] = []
# 资金
self._cash = 0.0
self._value = 0.0
# 回调
self._on_order_callback: Optional[Callable] = None
self._on_trade_callback: Optional[Callable] = None
def set_capital(self, cash: float):
"""设置初始资金"""
self._cash = cash
self._value = cash
def get_cash(self) -> float:
"""获取可用资金"""
return self._cash
def get_value(self) -> float:
"""获取总资产"""
return self._value
def get_position(self, instrument: str) -> float:
"""获取持仓"""
return self._positions.get(instrument, 0.0)
def buy(self, instrument: str, volume: int, price: float,
order_type: str = 'limit') -> Optional[str]:
"""买入
Returns:
订单引用 ID
"""
order_ref = f"{self.agent_id}_{len(self._orders)}"
order = Order(
ref=order_ref,
instrument=instrument,
direction='buy',
volume=volume,
price=price,
order_type=order_type,
agent_id=self.agent_id
)
with self._order_lock:
self._orders[order_ref] = order
# 通知下单
if self._on_order_callback:
self._on_order_callback(order)
return order_ref
def sell(self, instrument: str, volume: int, price: float,
order_type: str = 'limit') -> Optional[str]:
"""卖出"""
order_ref = f"{self.agent_id}_{len(self._orders)}"
order = Order(
ref=order_ref,
instrument=instrument,
direction='sell',
volume=volume,
price=price,
order_type=order_type,
agent_id=self.agent_id
)
with self._order_lock:
self._orders[order_ref] = order
if self._on_order_callback:
self._on_order_callback(order)
return order_ref
def cancel_order(self, order_ref: str) -> bool:
"""撤单"""
with self._order_lock:
if order_ref in self._orders:
del self._orders[order_ref]
return True
return False
def on_order_status(self, order_ref: str, status: str):
"""订单状态更新回调"""
with self._order_lock:
order = self._orders.get(order_ref)
if order:
order.status = status
# 如果成交,更新持仓
if status == 'filled':
if order.direction == 'buy':
self._positions[order.instrument] = \
self._positions.get(order.instrument, 0) + order.volume
else:
self._positions[order.instrument] = \
self._positions.get(order.instrument, 0) - order.volume
# 移除已完成订单
if status in ['filled', 'cancelled', 'rejected']:
del self._orders[order_ref]
def on_trade(self, trade: Trade):
"""成交回调"""
self._trades.append(trade)
if self._on_trade_callback:
self._on_trade_callback(trade)
def set_order_callback(self, callback: Callable):
"""设置订单回调"""
self._on_order_callback = callback
def set_trade_callback(self, callback: Callable):
"""设置成交回调"""
self._on_trade_callback = callback
```bash
### 4. 风控系统设计
#### 4.1 风控规则引擎
```python
# backtrader/risk/engine.py
from typing import Dict, List, Optional, Callable
from datetime import datetime, timedelta
import threading
from ..order import Order
class RiskRule:
"""风控规则基类"""
def __init__(self, name: str):
self.name = name
self.enabled = True
def check(self, **kwargs) -> tuple[bool, str]:
"""检查规则
Returns:
(是否通过, 拒绝原因)
"""
raise NotImplementedError
def reset(self):
"""重置规则状态"""
pass
class CancelLimitRule(RiskRule):
"""撤单次数限制规则"""
def __init__(self, max_cancel: int = 480, window: int = 60):
super().__init__('cancel_limit')
self.max_cancel = max_cancel
self.window = window # 时间窗口(秒)
self._cancel_count: Dict[str, List] = {}
self._lock = threading.Lock()
def check(self, agent_id: str, instrument: str = '', **kwargs) -> tuple[bool, str]:
"""检查撤单限制"""
with self._lock:
key = f"{agent_id}_{instrument}"
now = datetime.now()
# 清理过期记录
if key in self._cancel_count:
self._cancel_count[key] = [
t for t in self._cancel_count[key]
if now - t < timedelta(seconds=self.window)
]
# 检查限制
count = len(self._cancel_count.get(key, []))
if count >= self.max_cancel:
return False, f"撤单次数超限: {count}/{self.max_cancel}"
return True, ""
def record_cancel(self, agent_id: str, instrument: str = ''):
"""记录撤单"""
with self._lock:
key = f"{agent_id}_{instrument}"
if key not in self._cancel_count:
self._cancel_count[key] = []
self._cancel_count[key].append(datetime.now())
class OrderSpeedLimitRule(RiskRule):
"""下单速度限制规则"""
def __init__(self, max_per_second: int = 10):
super().__init__('order_speed')
self.max_per_second = max_per_second
self._order_times: Dict[str, List[float]] = {}
self._lock = threading.Lock()
def check(self, agent_id: str, **kwargs) -> tuple[bool, str]:
"""检查下单速度"""
with self._lock:
now = datetime.now().timestamp()
# 清理 1 秒前的记录
if agent_id in self._order_times:
self._order_times[agent_id] = [
t for t in self._order_times[agent_id]
if now - t < 1.0
]
# 检查限制
count = len(self._order_times.get(agent_id, []))
if count >= self.max_per_second:
return False, f"下单速度过快: {count}/{self.max_per_second}/s"
return True, ""
def record_order(self, agent_id: str):
"""记录下单"""
with self._lock:
now = datetime.now().timestamp()
if agent_id not in self._order_times:
self._order_times[agent_id] = []
self._order_times[agent_id].append(now)
class PositionLimitRule(RiskRule):
"""持仓限制规则"""
def __init__(self, max_position: int = 100):
super().__init__('position_limit')
self.max_position = max_position
def check(self, agent_id: str, instrument: str, volume: int,
current_position: int, direction: str, **kwargs) -> tuple[bool, str]:
"""检查持仓限制"""
new_position = current_position
if direction == 'buy':
new_position += volume
else:
new_position -= volume
if abs(new_position) > self.max_position:
return False, f"持仓超限: {abs(new_position)}/{self.max_position}"
return True, ""
class RiskEngine:
"""风控引擎
管理所有风控规则,检查交易请求
"""
def __init__(self):
self._rules: List[RiskRule] = []
self._enabled = True
def add_rule(self, rule: RiskRule):
"""添加风控规则"""
self._rules.append(rule)
def enable(self):
"""启用风控"""
self._enabled = True
def disable(self):
"""禁用风控"""
self._enabled = False
def check_order(self, agent_id: str, instrument: str, volume: int,
price: float, direction: str, current_position: int) -> tuple[bool, str]:
"""检查订单
Returns:
(是否通过, 拒绝原因)
"""
if not self._enabled:
return True, ""
for rule in self._rules:
if rule.enabled:
passed, reason = rule.check(
agent_id=agent_id,
instrument=instrument,
volume=volume,
price=price,
direction=direction,
current_position=current_position
)
if not passed:
return False, f"[{rule.name}] {reason}"
return True, ""
def record_order(self, agent_id: str, instrument: str, order_type: str):
"""记录订单(用于统计)"""
for rule in self._rules:
if hasattr(rule, 'record_order'):
rule.record_order(agent_id, instrument)
if order_type == 'cancel':
for rule in self._rules:
if hasattr(rule, 'record_cancel'):
rule.record_cancel(agent_id, instrument)
```bash
### 5. 高级行情处理设计
#### 5.1 异步行情处理器
```python
# backtrader/data/tick_processor.py
import threading
import queue
import time
from typing import Dict, List, Callable, Optional
from collections import defaultdict
from dataclasses import dataclass
from ..utils.date2num import date2num
@dataclass
class Tick:
"""Tick 数据结构"""
instrument: str
datetime: float
last_price: float
open_price: float
high_price: float
low_price: float
volume: int
open_interest: int = 0
bid_price1: float = 0
ask_price1: float = 0
bid_volume1: int = 0
ask_volume1: int = 0
class TickProcessor:
"""高性能 Tick 处理器
使用独立线程处理行情数据
"""
def __init__(self, queue_size: int = 100000):
# 无界队列(或设置大容量)
self._tick_queue: queue.Queue = queue.Queue(maxsize=queue_size)
# 订阅者: {instrument: [callbacks]}
self._subscribers: Dict[str, List[Callable]] = defaultdict(list)
# 最新行情缓存
self._last_tick: Dict[str, Tick] = {}
# 处理线程
self._thread: Optional[threading.Thread] = None
self._running = False
# 统计
self._processed_count = 0
self._dropped_count = 0
def start(self):
"""启动处理器"""
if self._running:
return
self._running = True
self._thread = threading.Thread(target=self._run_loop, daemon=True)
self._thread.start()
def stop(self):
"""停止处理器"""
self._running = False
if self._thread:
self._thread.join(timeout=5)
def subscribe(self, instrument: str, callback: Callable[[Tick], None]):
"""订阅行情
Args:
instrument: 合约代码
callback: 回调函数
"""
self._subscribers[instrument].append(callback)
def unsubscribe(self, instrument: str, callback: Callable[[Tick], None]):
"""取消订阅"""
if callback in self._subscribers[instrument]:
self._subscribers[instrument].remove(callback)
def push_tick(self, tick: Tick):
"""推送 tick 数据
来自数据源
"""
try:
self._tick_queue.put_nowait(tick)
except queue.Full:
self._dropped_count += 1
def get_last_tick(self, instrument: str) -> Optional[Tick]:
"""获取最新 tick"""
return self._last_tick.get(instrument)
def _run_loop(self):
"""处理循环"""
while self._running:
try:
# 使用超时避免阻塞
tick = self._tick_queue.get(timeout=0.1)
# 更新缓存
self._last_tick[tick.instrument] = tick
# 分发给订阅者
callbacks = self._subscribers.get(tick.instrument, [])
for callback in callbacks:
try:
callback(tick)
except Exception as e:
pass # 记录错误但不中断
self._processed_count += 1
except queue.Empty:
continue
except Exception as e:
# 错误处理
time.sleep(0.01)
def get_stats(self) -> Dict[str, int]:
"""获取统计信息"""
return {
'processed': self._processed_count,
'dropped': self._dropped_count,
'queue_size': self._tick_queue.qsize(),
}
class TickDataFilter:
"""Tick 数据过滤器
过滤无效和异常行情数据
"""
def __init__(self):
self._last_prices: Dict[str, float] = {}
self._max_change_pct = 0.2 # 最大涨跌幅 20%
def validate(self, tick: Tick) -> bool:
"""验证 tick 数据是否有效
Returns:
True 表示有效,False 表示无效
"""
# 检查价格是否为正数
if tick.last_price <= 0:
return False
# 检查 OHLC 逻辑
if tick.high_price < tick.low_price:
return False
if tick.last_price > tick.high_price or tick.last_price < tick.low_price:
return False
# 检查价格跳变
last_price = self._last_prices.get(tick.instrument)
if last_price:
change_pct = abs(tick.last_price - last_price) / last_price
if change_pct > self._max_change_pct:
return False # 价格跳变过大,可能是异常数据
# 更新最新价
self._last_prices[tick.instrument] = tick.last_price
return True
```bash
### 6. 订单管理系统增强
#### 6.1 订单状态机
```python
# backtrader/order/order_manager.py
from enum import Enum
from typing import Dict, List, Optional, Callable
from threading import Lock
from datetime import datetime
import uuid
from .order import Order, OrderStatus, OrderType
from ..trade import Trade
class OrderState(Enum):
"""订单状态"""
CREATED = 'created' # 已创建
SUBMITTED = 'submitted' # 已提交
ACCEPTED = 'accepted' # 已接受
PARTIAL_FILLED = 'partial_filled' # 部分成交
FILLED = 'filled' # 全部成交
CANCELLED = 'cancelled' # 已撤销
REJECTED = 'rejected' # 已拒绝
EXPIRED = 'expired' # 已过期
class OrderStateMachine:
"""订单状态机
管理订单状态转换,确保状态一致性
"""
# 定义合法的状态转换
VALID_TRANSITIONS = {
OrderState.CREATED: [OrderState.SUBMITTED, OrderState.CANCELLED],
OrderState.SUBMITTED: [OrderState.ACCEPTED, OrderState.REJECTED],
OrderState.ACCEPTED: [OrderState.PARTIAL_FILLED, OrderState.FILLED, OrderState.CANCELLED],
OrderState.PARTIAL_FILLED: [OrderState.PARTIAL_FILLED, OrderState.FILLED, OrderState.CANCELLED],
OrderState.FILLED: [],
OrderState.CANCELLED: [],
OrderState.REJECTED: [],
}
def __init__(self, order: Order):
self.order = order
self._state = OrderState.CREATED
self._lock = Lock()
self._state_history: List[tuple[OrderState, datetime]] = []
def get_state(self) -> OrderState:
"""获取当前状态"""
with self._lock:
return self._state
def transition_to(self, new_state: OrderState, reason: str = "") -> bool:
"""状态转换
Args:
new_state: 目标状态
reason: 转换原因
Returns:
转换是否成功
"""
with self._lock:
# 检查转换是否合法
valid_next_states = self.VALID_TRANSITIONS.get(self._state, [])
if new_state not in valid_next_states and new_state != self._state:
return False
# 执行转换
old_state = self._state
self._state = new_state
self._state_history.append((new_state, datetime.now()))
# 通知状态变化
self._notify_state_change(old_state, new_state, reason)
return True
def _notify_state_change(self, old_state: OrderState, new_state: OrderState, reason: str):
"""通知状态变化"""
# 更新订单状态
if hasattr(self.order, 'status'):
self.order.status = new_state.value
# 触发回调
if hasattr(self.order, 'on_status_changed'):
self.order.on_status_changed(old_state, new_state, reason)
class OrderManager:
"""订单管理器
管理所有订单的生命周期
"""
def __init__(self):
# 系统订单 ID 到订单的映射
self._orders: Dict[str, Order] = {}
# 活跃订单
self._active_orders: Dict[str, Order] = {}
# 已完成订单
self._completed_orders: List[Order] = []
# 订单状态机
self._state_machines: Dict[str, OrderStateMachine] = {}
# 成交记录
self._trades: Dict[str, Trade] = {}
# 锁
self._lock = Lock()
# 回调
self._on_order_callback: Optional[Callable] = None
self._on_trade_callback: Optional[Callable] = None
def create_order(self, instrument: str, direction: str, volume: int,
price: float, order_type: OrderType = OrderType.LIMIT,
agent_id: Optional[str] = None) -> Order:
"""创建订单"""
order = Order(
order_id=str(uuid.uuid4()),
instrument=instrument,
direction=direction,
volume=volume,
price=price,
order_type=order_type,
agent_id=agent_id
)
with self._lock:
self._orders[order.order_id] = order
self._active_orders[order.order_id] = order
self._state_machines[order.order_id] = OrderStateMachine(order)
return order
def submit_order(self, order: Order) -> bool:
"""提交订单"""
state_machine = self._state_machines.get(order.order_id)
if not state_machine:
return False
if state_machine.transition_to(OrderState.SUBMITTED, "submit"):
# 触发回调
if self._on_order_callback:
self._on_order_callback(order)
return True
return False
def cancel_order(self, order_id: str) -> bool:
"""撤销订单"""
state_machine = self._state_machines.get(order_id)
if not state_machine:
return False
if state_machine.transition_to(OrderState.CANCELLED, "cancel"):
# 移出活跃订单
with self._lock:
if order_id in self._active_orders:
del self._active_orders[order_id]
self._completed_orders.append(self._orders[order_id])
# 触发回调
if self._on_order_callback:
self._on_order_callback(state_machine.order)
return True
return False
def on_order_accepted(self, order_id: str):
"""订单被接受"""
state_machine = self._state_machines.get(order_id)
if state_machine:
state_machine.transition_to(OrderState.ACCEPTED, "accepted")
if self._on_order_callback:
self._on_order_callback(state_machine.order)
def on_order_filled(self, order_id: str, filled_volume: int, remaining_volume: int):
"""订单成交"""
state_machine = self._state_machines.get(order_id)
if remaining_volume == 0:
# 全部成交
state_machine.transition_to(OrderState.FILLED, "filled")
# 移出活跃订单
with self._lock:
if order_id in self._active_orders:
del self._active_orders[order_id]
self._completed_orders.append(self._orders[order_id])
else:
# 部分成交
state_machine.transition_to(OrderState.PARTIAL_FILLED, "partial_filled")
if self._on_order_callback:
self._on_order_callback(state_machine.order)
def on_order_rejected(self, order_id: str, reason: str):
"""订单被拒绝"""
state_machine = self._state_machines.get(order_id)
if state_machine:
state_machine.transition_to(OrderState.REJECTED, reason)
# 移出活跃订单
with self._lock:
if order_id in self._active_orders:
del self._active_orders[order_id]
self._completed_orders.append(self._orders[order_id])
def get_active_orders(self) -> List[Order]:
"""获取所有活跃订单"""
with self._lock:
return list(self._active_orders.values())
def get_order(self, order_id: str) -> Optional[Order]:
"""获取订单"""
return self._orders.get(order_id)
def set_order_callback(self, callback: Callable):
"""设置订单回调"""
self._on_order_callback = callback
def set_trade_callback(self, callback: Callable):
"""设置成交回调"""
self._on_trade_callback = callback
```bash
#### 6.2 条件单支持
```python
# backtrader/order/conditional_order.py
from enum import Enum
from typing import Optional, Callable
from .order_manager import OrderManager
from ..strategy import Strategy
class ConditionType(Enum):
"""条件类型"""
STOP_LOSS = "stop_loss" # 止损
TAKE_PROFIT = "take_profit" # 止盈
TRAILING_STOP = "trailing_stop" # 移动止损
OCO = "oco" # 二选一
class ConditionalOrder:
"""条件单基类"""
def __init__(self, condition_type: ConditionType, strategy: Strategy,
instrument: str, volume: int):
self.condition_type = condition_type
self.strategy = strategy
self.instrument = instrument
self.volume = volume
self.active = True
self.triggered = False
def check_condition(self, current_price: float) -> bool:
"""检查条件是否触发
Returns:
True 表示条件已触发
"""
raise NotImplementedError
def execute(self):
"""执行条件单(生成实际订单)"""
raise NotImplementedError
class StopLossOrder(ConditionalOrder):
"""止损单"""
def __init__(self, strategy: Strategy, instrument: str, volume: int,
stop_price: float, trail: float = 0):
super().__init__(ConditionType.STOP_LOSS, strategy, instrument, volume)
self.stop_price = stop_price
self.trail = trail # 移动止损价差
self.highest_price = 0 # 记录最高价(用于移动止损)
def check_condition(self, current_price: float) -> bool:
"""检查止损条件"""
# 更新最高价
if current_price > self.highest_price:
self.highest_price = current_price
# 计算止损价
if self.trail > 0:
stop_price = self.highest_price - self.trail
else:
stop_price = self.stop_price
return current_price <= stop_price
def execute(self):
"""执行止损(市价单)"""
return self.strategy.sell(self.instrument, self.volume)
class TakeProfitOrder(ConditionalOrder):
"""止盈单"""
def __init__(self, strategy: Strategy, instrument: str, volume: int,
target_price: float):
super().__init__(ConditionType.TAKE_PROFIT, strategy, instrument, volume)
self.target_price = target_price
def check_condition(self, current_price: float) -> bool:
"""检查止盈条件"""
return current_price >= self.target_price
def execute(self):
"""执行止盈(限价单)"""
return self.strategy.sell(self.instrument, self.volume,
price=self.target_price)
class OCOOrder(ConditionalOrder):
"""二选一订单 (One-Cancels-Other)"""
def __init__(self, strategy: Strategy, instrument: str, volume: int,
order1: ConditionalOrder, order2: ConditionalOrder):
super().__init__(ConditionType.OCO, strategy, instrument, volume)
self.order1 = order1
self.order2 = order2
self.executed_order = None # 已执行的订单
def check_condition(self, current_price: float) -> bool:
"""检查 OCO 条件"""
if self.triggered:
return True
# 检查两个条件
triggered1 = self.order1.check_condition(current_price)
triggered2 = self.order2.check_condition(current_price)
if triggered1 or triggered2:
self.triggered = True
self.executed_order = self.order1 if triggered1 else self.order2
# 取消另一个
if triggered1:
if hasattr(self.order2, 'cancel'):
self.order2.cancel()
else:
if hasattr(self.order1, 'cancel'):
self.order1.cancel()
return True
return False
def execute(self):
"""执行 OCO 订单"""
if self.executed_order:
return self.executed_order.execute()
return None
class ConditionalOrderManager:
"""条件单管理器
管理所有条件单的监控和执行
"""
def __init__(self, strategy: Strategy):
self.strategy = strategy
self._orders: List[ConditionalOrder] = []
self._lock = None
def add_stop_loss(self, instrument: str, volume: int, stop_price: float,
trail: float = 0) -> StopLossOrder:
"""添加止损单"""
stop_loss = StopLossOrder(self.strategy, instrument, volume,
stop_price, trail)
self._orders.append(stop_loss)
return stop_loss
def add_take_profit(self, instrument: str, volume: int,
target_price: float) -> TakeProfitOrder:
"""添加止盈单"""
take_profit = TakeProfitOrder(self.strategy, instrument, volume,
target_price)
self._orders.append(take_profit)
return take_profit
def add_oco(self, instrument: str, volume: int,
order1: ConditionalOrder, order2: ConditionalOrder) -> OCOOrder:
"""添加 OCO 订单"""
oco = OCOOrder(self.strategy, instrument, volume, order1, order2)
self._orders.append(oco)
return oco
def check_conditions(self, tick_data: Dict[str, float]) -> List[ConditionalOrder]:
"""检查所有条件单
Args:
tick_data: {instrument: price} 字典
Returns:
触发的条件单列表
"""
triggered = []
for order in self._orders:
if order.active and not order.triggered:
current_price = tick_data.get(order.instrument)
if current_price and order.check_condition(current_price):
triggered.append(order)
return triggered
def execute_triggered(self, triggered_orders: List[ConditionalOrder]):
"""执行触发的条件单"""
for order in triggered_orders:
try:
order.execute()
order.active = False
except Exception as e:
# 记录错误
pass
def cancel(self, order: ConditionalOrder):
"""取消条件单"""
order.active = False
order.triggered = True # 标记为已处理
```bash
### 7. 实施计划
#### 7.1 实施优先级
1. **高优先级**(第一阶段)
- 订单管理系统增强 - 核心功能
- 风控系统模块 - 交易安全
2.**中优先级**(第二阶段)
- 多策略隔离系统 - 扩展性
- 高级行情处理 - 性能优化
3.**可选优先级** (第三阶段)
- CTP 接口增强 - 期货专用
- 高性能 Cython 扩展 - 极致性能
#### 7.2 向后兼容性保证
所有新功能都是**可选的**,现有代码无需修改即可继续使用:
```python
# 现有用法继续支持
cerebro = bt.Cerebro()
cerebro.adddata(data)
cerebro.addstrategy(MyStrategy)
cerebro.run()
# 新用法
# 使用 Agent 系统
from backtrader.agent import Agent, AgentManager
agent_manager = AgentManager()
agent = Agent(strategy)
agent_manager.register_agent(agent, ['RB2305', 'RB2310'])
# 使用风控系统
from backtrader.risk import RiskEngine, CancelLimitRule, OrderSpeedLimitRule
risk_engine = RiskEngine()
risk_engine.add_rule(CancelLimitRule(max_cancel=480))
risk_engine.add_rule(OrderSpeedLimitRule(max_per_second=10))
# 使用条件单
from backtrader.order.conditional_order import ConditionalOrderManager
cond_manager = ConditionalOrderManager(strategy)
cond_manager.add_stop_loss('RB2305', 10, stop_price=3800)
```bash
#### 7.3 目录结构
```bash
backtrader/
├── __init__.py
├── agent/ # 新增: Agent 模块
│ ├── __init__.py
│ ├── agent.py # Agent 基类
│ └── agent_manager.py # Agent 管理器
├── order/ # 修改: 订单模块增强
│ ├── __init__.py
│ ├── order.py # 订单类
│ ├── order_manager.py # 新增: 订单管理器
│ └── conditional_order.py # 新增: 条件单
├── risk/ # 新增: 风控模块
│ ├── __init__.py
│ ├── engine.py # 风控引擎
│ └── rules.py # 风控规则
├── data/ # 修改: 数据模块
│ ├── __init__.py
│ └── tick_processor.py # 新增: Tick 处理器
├── stores/ # 修改: 存储模块
│ ├── __init__.py
│ ├── ctpstore.py # 新增: CTP 接口
│ └── ctpapi/ # 新增: CTP API 封装
└── ext/ # 新增: Cython 扩展
├── __init__.py
├── core.pyx # 核心数据结构
├── atomic.pyx # 原子操作
└── indicators.pyx # 指标计算
```bash
- --
## 总结
通过借鉴 PandoraTrader 的设计思想,backtrader 可以在保持易用性的同时,获得以下改进:
1. **高性能**: Cython 扩展实现关键路径优化,性能提升 5-10 倍
2. **订单管理**: 完整的订单状态机和条件单支持
3. **风控系统**: 多层风控保护,确保交易安全
4. **多策略隔离**: Agent 系统实现策略级别隔离
5. **实时行情**: 异步 Tick 处理器,支持高频场景
6. **CTP 支持**: 完整的 CTP 接口支持,服务期货交易
这些改进都是**向后兼容**的,用户可以按需使用新功能,不影响现有策略代码。PandoraTrader 作为专业的高频交易系统,其在性能优化、风控系统、订单管理等方面的实践经验对 backtrader 的实盘交易能力提升具有重要参考价值。