背景¶
backtrader 已经比较完善了,我想要借鉴量化投资框架中其他项目的优势,继续改进优化 backtrader。
任务¶
阅读研究分析 backtrader 这个项目的源代码,了解这个项目。
阅读研究分析/Users/yunjinqi/Documents/量化交易框架/CryptoTradingBot
借鉴这个新项目的优点和功能,给 backtrader 优化改进提供新的建议
写需规文档和设计文档放到这个文档的最下面,方便后续借鉴
CryptoTradingBot 项目简介¶
CryptoTradingBot 是一个加密货币自动交易机器人,具有以下核心特点:
自动交易: 自动化交易执行
多交易所: 支持多个加密货币交易所
策略系统: 内置多种交易策略
风险管理: 内置风险控制
通知系统: 交易通知功能
Web 界面: Web 管理界面
重点借鉴方向¶
自动化: 自动交易框架设计
多交易所: 多交易所适配
通知系统: 通知推送机制
Web 界面: Web 管理界面
风控系统: 风险管理模块
策略管理: 策略管理机制
一、项目对比分析¶
1.1 架构设计对比¶
| 特性 | Backtrader | CryptoTradingBot (K) |
|——|———–|———————|
| 核心架构| Line 系统 + Cerebro 引擎 | C++ + TypeScript (AngularJS) |
|应用场景| 回测为主,实盘需扩展 | 实盘交易为主 |
|目标市场| 股票、期货、加密货币 | 专注加密货币 |
|部署方式| 本地脚本 | 服务器持续运行 |
|数据存储| 内存/文件 | SQLite (WAL 模式) |
|用户界面| matplotlib 绘图 | Web UI (AngularJS) |
|通信方式| 单线程执行 | WebSocket 实时通信 |
|策略类型 | 趋势、套利等 | 做市(Market Making)为主 |
1.2 CryptoTradingBot 的核心优势¶
1.2.1 实时 Web UI¶
CryptoTradingBot 提供了完整的 Web 界面:
实时市场数据展示
订单管理面板
持仓监控
参数实时调整
图表可视化
技术栈*:
前端:AngularJS + RxJS (响应式编程)
后端:C++内置 HTTP 服务器
通信:WebSocket 实时推送
1.2.2 做市策略引擎¶
K 的做市引擎非常完善,支持多种报价模式:
// 报价模式
mode: {
Join, // 加入最优买卖价
Top, // 跳到订单簿顶部
Mid, // 围绕中间价报价
Inverse, // 反向模式
HamelinRat, // 跟随大订单
Depth // 深度模式
}
// 安全模式
safety: {
PingPong, // 乒乓交易
Boomerang, // 回旋镖
AK47 // 多发模式
}
```bash
- *优势**:
- 参数化报价策略
- 风险控制(pDiv, apr)
- 自动仓位管理
- 多种安全模式
#### 1.2.3 SQLite 持久化
```cpp
// 每个交易对使用独立的数据库
/var/lib/K/db/K-COINBASE-BTC-USD.db*
```bash
- *优势**:
- WAL 模式提高并发性能
- 数据库与代码分离
- 支持内存数据库选项
- 易于备份和迁移
#### 1.2.4 多实例管理
```bash
# 通过配置文件管理多个实例
cp etc/K.sh.dist X.sh && chmod +x X.sh
K=X.sh make start
```bash
- *优势**:
- 单机多实例
- 配置文件隔离
- Matryoshka 嵌套 UI
- 统一管理命令
#### 1.2.5 多交易所支持
支持的交易所:
- Coinbase (REST + WebSocket + FIX)
- Binance (REST + WebSocket)
- Kraken (REST + WebSocket)
- KuCoin, Bitfinex, Gate.io, HitBTC, Poloniex
- *统一 API 设计**:
- REST + WebSocket 双通道
- 统一的订单接口
- 统一的数据格式
#### 1.2.6 风险控制系统
```typescript
// 目标仓位管理
tbp: number; // Target Base Position
pDiv: number; // Position Divergence
pDivMin: number; // Minimum Divergence
pDivMode: string; // Divergence Mode
// 积极仓位再平衡
apr: {
Off, // 不启用
Size, // 激进调整大小
SizeWidth // 激进调整大小和价差
}
// 超级机会
sop: {
Size, // 扩大订单大小
Trades, // 增加交易频率
tradesSize // 同时调整大小和频率
}
```bash
#### 1.2.7 实时监控指标
K 跟踪的指标包括:
- Fair Value (公允价值)
- EWMA (指数加权移动平均)
- STDEV (标准差)
- Target Base Position
- Wallet Balance
- Open Orders
- Trade Statistics
- Profit/Loss
### 1.3 可借鉴的具体设计
#### 1.3.1 Web UI 架构
虽然 Backtrader 有 matplotlib 绘图,但缺乏实时交互 UI:
- 可以借鉴 K 的 Web UI 设计
- 使用 Flask/FastAPI + WebSocket
- 实时参数调整能力
#### 1.3.2 做市策略
Backtrader 缺乏做市策略:
- 可以借鉴 K 的报价模式
- 参数化做市引擎
- 风险控制机制
#### 1.3.3 数据持久化
Backtrader 的数据存储较为简单:
- 可以借鉴 K 的 SQLite 设计
- WAL 模式提高性能
- 状态保存和恢复
#### 1.3.4 实时数据推送
Backtrader 主要基于 bar 数据:
- 可以借鉴 K 的 WebSocket 设计
- 支持 tick 级别数据
- 实时事件通知
#### 1.3.5 多实例架构
Backtrader 通常单实例运行:
- 可以借鉴 K 的多实例管理
- 配置文件隔离
- 统一的生命周期管理
- --
## 二、需求文档
### 2.1 优化目标
借鉴 CryptoTradingBot 的实盘交易能力,增强 Backtrader:
1. **Web UI 界面**: 实时监控和管理界面
2. **做市策略引擎**: 参数化做市策略
3. **WebSocket 支持**: 实时数据推送
4. **SQLite 持久化**: 状态保存和恢复
5. **风险管理模块**: 仓位和风险控制
6. **多实例管理**: 支持多策略并行运行
### 2.2 详细需求
#### 需求 1: Web UI 界面
- *描述**: 实时监控和管理界面
- *功能点**:
- 策略参数实时调整
- 订单管理和监控
- 持仓和余额显示
- 实时图表展示
- 交易历史查询
- *验收标准**:
- 提供 Web UI
- 支持参数热更新
- 实时数据刷新
- 响应式设计
#### 需求 2: 做市策略引擎
- *描述**: 参数化做市策略
- *功能点**:
- 多种报价模式(Join/Top/Mid 等)
- 动态价差调整
- 仓位管理
- 风险控制
- *验收标准**:
- 提供 MarketMaking 策略类
- 支持至少 3 种报价模式
- 可配置参数
- 回测和实盘支持
#### 需求 3: WebSocket 支持
- *描述**: 实时数据推送
- *功能点**:
- WebSocket 服务器
- 实时行情推送
- 订单状态推送
- 策略状态推送
- *验收标准**:
- WebSocket 接口可用
- 推送延迟<100ms
- 支持多客户端连接
#### 需求 4: SQLite 持久化
- *描述**: 策略状态持久化
- *功能点**:
- SQLite 数据库存储
- WAL 模式
- 自动保存
- 状态恢复
- *验收标准**:
- 数据库自动创建
- 状态保存和恢复
- 性能影响<5%
#### 需求 5: 风险管理模块
- *描述**: 仓位和风险控制
- *功能点**:
- 目标仓位管理
- 仓位偏离控制
- 自动再平衡
- 止损止盈
- *验收标准**:
- 提供 RiskManager 类
- 可配置风险参数
- 自动触发风险控制
#### 需求 6: 多实例管理
- *描述**: 多策略并行运行
- *功能点**:
- 配置文件管理
- 实例生命周期管理
- 资源隔离
- 统一监控
- *验收标准**:
- 支持多实例
- 配置文件隔离
- 统一启停命令
- --
## 三、设计文档
### 3.1 Web UI 架构设计
#### 3.1.1 后端 API (FastAPI + WebSocket)
```python
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
import pydantic
from typing import Dict, List
import json
class BacktraderServer:
"""Backtrader Web 服务器
提供 REST API 和 WebSocket 接口
"""
def __init__(self, cerebro, host='0.0.0.0', port=3000):
self.app = FastAPI()
self.cerebro = cerebro
self.host = host
self.port = port
self.websocket_clients: List[WebSocket] = []
# 静态文件
self.app.mount("/static", StaticFiles(directory="ui/static"), name="static")
# 路由
self._setup_routes()
def _setup_routes(self):
"""设置路由"""
@self.app.get("/")
async def index():
with open("ui/index.html") as f:
return HTMLResponse(f.read())
@self.app.get("/api/strategy")
async def get_strategy():
"""获取策略状态"""
return {
"status": self.cerebro._state,
"params": self._get_strategy_params(),
"positions": self._get_positions(),
"orders": self._get_orders(),
}
@self.app.post("/api/strategy/params")
async def update_params(params: Dict):
"""更新策略参数"""
return self._update_strategy_params(params)
@self.app.get("/api/trades")
async def get_trades(limit: int = 100):
"""获取交易历史"""
return self._get_trades(limit)
@self.app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket 连接"""
await websocket.accept()
self.websocket_clients.append(websocket)
try:
while True:
# 保持连接,接收客户端消息
data = await websocket.receive_text()
# 处理客户端请求
await self._handle_ws_message(websocket, data)
except Exception as e:
print(f"WebSocket error: {e}")
finally:
self.websocket_clients.remove(websocket)
async def broadcast(self, event: str, data: Dict):
"""广播消息到所有客户端
Args:
event: 事件类型
data: 事件数据
"""
message = json.dumps({"event": event, "data": data})
# 移除已断开的客户端
self.websocket_clients = [
ws for ws in self.websocket_clients
if not ws.client_state.DISCONNECTED
]
for client in self.websocket_clients:
try:
await client.send_text(message)
except:
pass
def _get_strategy_params(self) -> Dict:
"""获取策略参数"""
params = {}
for strat in self.cerebro._strats:
params[strat.__class__.__name__] = strat.params._getitems()
return params
def _get_positions(self) -> List[Dict]:
"""获取持仓"""
positions = []
for datafeed in self.cerebro._datas:
pos = self.cerebro.broker.getposition(datafeed)
positions.append({
"symbol": getattr(datafeed, '_name', 'unknown'),
"size": pos.size,
"price": pos.price,
})
return positions
def _get_orders(self) -> List[Dict]:
"""获取订单"""
orders = []
for order in self.cerebro.broker.orders:
orders.append({
"ref": order.ref,
"type": order.ordtype,
"size": order.size,
"price": order.price,
"status": order.status,
})
return orders
def _get_trades(self, limit: int) -> List[Dict]:
"""获取交易历史"""
# 从数据库或内存获取
return []
def _update_strategy_params(self, params: Dict):
"""更新策略参数"""
for strat in self.cerebro._strats:
for key, value in params.items():
if hasattr(strat.params, key):
setattr(strat.params, key, value)
return {"success": True}
async def _handle_ws_message(self, ws: WebSocket, message: str):
"""处理 WebSocket 消息"""
data = json.loads(message)
action = data.get("action")
if action == "subscribe":
# 订阅特定事件
pass
elif action == "unsubscribe":
# 取消订阅
pass
def run(self):
"""运行服务器"""
import uvicorn
uvicorn.run(self.app, host=self.host, port=self.port)
```bash
#### 3.1.2 前端 UI (简洁版)
```html
<!-- ui/index.html -->
<!DOCTYPE html>
<html>
<head>
<title>Backtrader Web UI</title>
<script src="<https://cdn.jsdelivr.net/npm/vue@3>"></script>
<script src="<https://cdn.jsdelivr.net/npm/chart.js>"></script>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.container { max-width: 1200px; margin: 0 auto; }
.grid { display: grid; grid-template-columns: 1fr 1fr; gap: 20px; }
.card { border: 1px solid #ddd; padding: 15px; border-radius: 5px; }
.status { padding: 10px; border-radius: 5px; margin-bottom: 10px; }
.status.running { background: #d4edda; }
.status.stopped { background: #f8d7da; }
table { width: 100%; border-collapse: collapse; }
th, td { padding: 8px; text-align: left; border-bottom: 1px solid #ddd; }
button { padding: 10px 20px; margin: 5px; }
</style>
</head>
<body>
<div id="app">
<div class="container">
<h1>Backtrader Strategy Manager</h1>
<div :class="['status', strategy.status]">
<h3>{{ strategy.status === 'running' ? '运行中' : '已停止' }}</h3>
</div>
<div class="grid">
<!-- 参数面板 -->
<div class="card">
<h3>策略参数</h3>
<div v-for="(value, key) in strategy.params" :key="key">
<label>{{ key }}:</label>
<input v-model.number="strategy.params[key]" type="number" step="0.01">
</div>
<button @click="updateParams">应用参数</button>
</div>
<!-- 持仓面板 -->
<div class="card">
<h3>持仓</h3>
<table>
<tr>
<th>品种</th>
<th>数量</th>
<th>价格</th>
</tr>
<tr v-for="pos in positions" :key="pos.symbol">
<td>{{ pos.symbol }}</td>
<td>{{ pos.size }}</td>
<td>{{ pos.price }}</td>
</tr>
</table>
</div>
<!-- 订单面板 -->
<div class="card">
<h3>活跃订单</h3>
<table>
<tr>
<th>订单 ID</th>
<th>类型</th>
<th>数量</th>
<th>价格</th>
<th>状态</th>
</tr>
<tr v-for="order in orders" :key="order.ref">
<td>{{ order.ref }}</td>
<td>{{ order.type }}</td>
<td>{{ order.size }}</td>
<td>{{ order.price }}</td>
<td>{{ order.status }}</td>
</tr>
</table>
</div>
<!-- 交易历史 -->
<div class="card">
<h3>交易历史</h3>
<table>
<tr>
<th>时间</th>
<th>品种</th>
<th>方向</th>
<th>数量</th>
<th>价格</th>
</tr>
<tr v-for="trade in trades" :key="trade.id">
<td>{{ trade.time }}</td>
<td>{{ trade.symbol }}</td>
<td>{{ trade.side }}</td>
<td>{{ trade.size }}</td>
<td>{{ trade.price }}</td>
</tr>
</table>
</div>
</div>
<!-- 图表 -->
<div class="card">
<h3>权益曲线</h3>
<canvas id="equity-chart"></canvas>
</div>
</div>
</div>
<script>
const app = Vue.createApp({
data() {
return {
strategy: {
status: 'stopped',
params: {}
},
positions: [],
orders: [],
trades: [],
ws: null
};
},
mounted() {
this.connectWS();
this.loadData();
},
methods: {
connectWS() {
this.ws = new WebSocket('ws://localhost:3000/ws');
this.ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
this.handleWS(msg);
};
},
handleWS(msg) {
switch(msg.event) {
case 'strategy':
this.strategy = { ...this.strategy, ...msg.data };
break;
case 'positions':
this.positions = msg.data;
break;
case 'orders':
this.orders = msg.data;
break;
case 'trade':
this.trades.unshift(msg.data);
break;
}
},
async loadData() {
const res = await fetch('/api/strategy');
const data = await res.json();
this.strategy = data;
this.positions = data.positions;
this.orders = data.orders;
},
async updateParams() {
await fetch('/api/strategy/params', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(this.strategy.params)
});
}
}
});
app.mount('#app');
</script>
</body>
</html>
```bash
### 3.2 做市策略引擎设计
#### 3.2.1 MarketMaking 策略
```python
import backtrader as bt
from enum import Enum
from typing import Optional
from dataclasses import dataclass
class QuoteMode(Enum):
"""报价模式"""
JOIN = "join" # 加入最优价
TOP = "top" # 跳到订单簿顶部
MID = "mid" # 围绕中间价
INVERSE = "inverse" # 反向模式
class SafetyMode(Enum):
"""安全模式"""
NONE = "none"
PING_PONG = "ping_pong"
BOOMERANG = "boomerang"
AK47 = "ak47"
@dataclass
class MarketMakerParams:
"""做市策略参数"""
# 报价参数
mode: QuoteMode = QuoteMode.MID
width: float = 0.001 # 报价宽度(比例)
width_ping: float = 0.001 # Ping 宽度
width_pong: float = 0.002 # Pong 宽度
# 订单大小
bid_size: float = 0.1 # 买单大小
ask_size: float = 0.1 # 卖单大小
max_size: float = 1.0 # 最大单笔大小
# 风险控制
target_position: float = 0.5 # 目标仓位(0-1)
position_divergence: float = 0.3 # 仓位偏离容忍度
aggressive_rebalance: bool = False # 激进再平衡
# 超级机会
super_opportunity: bool = False
sop_width_mult: float = 2.0
sop_size_mult: float = 1.5
class MarketMakingStrategy(bt.Strategy):
"""做市策略
同时在买卖双方挂单,赚取买卖价差
"""
params = (
('mode', 'mid'),
('width', 0.001),
('width_ping', 0.001),
('width_pong', 0.002),
('bid_size', 0.1),
('ask_size', 0.1),
('target_position', 0.5),
('position_divergence', 0.3),
('aggressive_rebalance', False),
('safety', 'none'),
)
def __init__(self):
# 计算目标仓位(绝对值)
self._target_base_value = None
self._last_ping_side = None
self._ping_order = None
self._pong_orders = []
# 指标
self.fair_value = bt.indicators.MidPrice(self.data)
self.ewma_short = bt.indicators.EMA(self.data.close, period=60)
self.ewma_long = bt.indicators.EMA(self.data.close, period=300)
def next(self):
"""每根 bar 执行"""
# 计算当前仓位
current_position = self.get_position_ratio()
# 计算公允价值
fv = self._calculate_fair_value()
# 检查仓位偏离
if not self._check_position_limits(current_position):
# 超出限制,停止报价或激进再平衡
if self.p.aggressive_rebalance:
self._aggressive_rebalance(current_position, fv)
return
# 计算报价
bid_price, ask_price = self._calculate_quotes(fv)
# 检查是否有未成交订单
self._manage_orders(bid_price, ask_price)
def _calculate_fair_value(self) -> float:
"""计算公允价值
根据不同模式计算:
- BBO: (best_bid + best_ask) / 2
- Mid: (current_bar_open + current_bar_close) / 2
- EWMA: 使用移动平均
"""
if self.p.mode == 'mid':
return (self.data.open[0] + self.data.close[0]) / 2
else:
return self.fair_value[0]
def _calculate_quotes(self, fair_value: float) -> tuple:
"""计算买卖报价
Args:
fair_value: 公允价值
Returns:
(bid_price, ask_price)
"""
half_width = fair_value *self.p.width / 2
if self.p.mode == 'join':
# 加入当前最优价
best_bid = self._get_best_bid()
best_ask = self._get_best_ask()
if best_bid and best_ask:
if best_ask - best_bid < fair_value*self.p.width:
bid_price = best_bid
ask_price = best_ask
else:
bid_price = fair_value - half_width
ask_price = fair_value + half_width
else:
bid_price = fair_value - half_width
ask_price = fair_value + half_width
elif self.p.mode == 'top':
# 跳到订单簿顶部
bid_price = fair_value - half_width
ask_price = fair_value + half_width
# 尝试改进价格
best_bid = self._get_best_bid()
if best_bid and bid_price > best_bid:
bid_price = best_bid # 或稍微更高
else: # mid
bid_price = fair_value - half_width
ask_price = fair_value + half_width
return bid_price, ask_price
def _get_best_bid(self) -> Optional[float]:
"""获取最优买价(需数据源支持)"""
# 如果数据源有 level2 数据,返回最优买价
# 否则使用上一根 bar 的最低价
if len(self.data) > 1:
return self.data.low[-1]
return None
def _get_best_ask(self) -> Optional[float]:
"""获取最优卖价"""
if len(self.data) > 1:
return self.data.high[-1]
return None
def _check_position_limits(self, current_pos: float) -> bool:
"""检查仓位是否超出限制
Args:
current_pos: 当前仓位比例 (0-1)
Returns:
True 表示在限制内
"""
lower = self.p.target_position - self.p.position_divergence
upper = self.p.target_position + self.p.position_divergence
return lower <= current_pos <= upper
def _aggressive_rebalance(self, current_pos: float, fair_value: float):
"""激进再平衡
Args:
current_pos: 当前仓位
fair_value: 公允价值
"""
# 计算需要调整的数量
target_value = self.get_target_value()
current_value = self.broker.getvalue()
if current_pos > self.p.target_position:
# 持仓过多,卖出
excess_ratio = current_pos - self.p.target_position
size = self.broker.cash*excess_ratio / fair_value
self.sell(size=size)
else:
# 持仓不足,买入
deficit_ratio = self.p.target_position - current_pos
size = self.broker.cash*deficit_ratio / fair_value
self.buy(size=size)
def _manage_orders(self, bid_price: float, ask_price: float):
"""管理订单
取消旧订单,下发新订单
Args:
bid_price: 买价
ask_price: 卖价
"""
# 取消所有未成交订单
for order in self.broker.orders:
if order.status == bt.Order.Submitted or order.status == bt.Order.Accepted:
self.cancel(order)
# 下新单
if self.p.safety == 'ping_pong':
self._ping_pong_quotes(bid_price, ask_price)
else:
# 普通做市:双边挂单
self.buy(price=bid_price, size=self.p.bid_size)
self.sell(price=ask_price, size=self.p.ask_size)
def _ping_pong_quotes(self, bid_price: float, ask_price: float):
"""Ping-pong 报价
先有一边成交(Ping),然后在另一边挂更好的价格等待成交(Pong)
Args:
bid_price: 当前买价
ask_price: 当前卖价
"""
if self._ping_order is None or self._ping_order.status in (
bt.Order.Completed, bt.Order.Cancelled
):
# 没有 Ping 订单或已成交,下发新 Ping
if self._last_ping_side != 'buy':
# 上次是卖,这次买作为 Ping
self._ping_order = self.buy(price=ask_price, size=self.p.bid_size)
self._last_ping_side = 'buy'
else:
# 上次是买,这次卖作为 Ping
self._ping_order = self.sell(price=bid_price, size=self.p.ask_size)
self._last_ping_side = 'sell'
else:
# 有未成交的 Ping,挂 Pong
if self._last_ping_side == 'buy':
# 买单是 Ping,在更高价位挂卖单作为 Pong
pong_price = ask_price*(1 + self.p.width_pong)
self.sell(price=pong_price, size=self.p.ask_size)
else:
# 卖单是 Ping,在更低价位挂买单作为 Pong
pong_price = bid_price*(1 - self.p.width_pong)
self.buy(price=pong_price, size=self.p.bid_size)
def get_position_ratio(self) -> float:
"""获取当前仓位比例
Returns:
仓位比例,0 表示全现金,1 表示满仓
"""
total_value = self.broker.getvalue()
if total_value == 0:
return 0
position_value = 0
for datafeed in self.datas:
pos = self.broker.getposition(datafeed)
position_value += pos.size*pos.price
return position_value / total_value
def get_target_value(self) -> float:
"""获取目标仓位价值"""
total_value = self.broker.getvalue()
return total_value*self.p.target_position
def notify_order(self, order):
"""订单状态变化通知"""
if order.status == bt.Order.Completed:
print(f"Order completed: {order.ordtype} {order.size} @ {order.price}")
# 更新 Ping 订单状态
if self.p.safety == 'ping_pong' and order.ref == self._ping_order.ref:
self._ping_order = None
```bash
### 3.3 WebSocket 支持设计
```python
from fastapi import WebSocket
import json
from typing import Set, Dict
import asyncio
class WebSocketManager:
"""WebSocket 连接管理器"""
def __init__(self):
self.active_connections: Set[WebSocket] = set()
self._subscriptions: Dict[WebSocket, Set[str]] = {}
async def connect(self, websocket: WebSocket):
"""接受新连接"""
await websocket.accept()
self.active_connections.add(websocket)
self._subscriptions[websocket] = set()
def disconnect(self, websocket: WebSocket):
"""断开连接"""
self.active_connections.discard(websocket)
self._subscriptions.pop(websocket, None)
def subscribe(self, websocket: WebSocket, channel: str):
"""订阅频道"""
if websocket in self._subscriptions:
self._subscriptions[websocket].add(channel)
def unsubscribe(self, websocket: WebSocket, channel: str):
"""取消订阅"""
if websocket in self._subscriptions:
self._subscriptions[websocket].discard(channel)
async def broadcast(self, channel: str, message: Dict):
"""广播消息到订阅者"""
for connection in self.active_connections:
if connection in self._subscriptions and channel in self._subscriptions[connection]:
try:
await connection.send_text(json.dumps({
"channel": channel,
"data": message
}))
except:
self.disconnect(connection)
async def send_personal(self, message: str, websocket: WebSocket):
"""发送个人消息"""
try:
await websocket.send_text(message)
except:
self.disconnect(websocket)
# 在 BacktraderServer 中使用
class BacktraderWithWS(BacktraderServer):
"""带 WebSocket 的 Backtrader 服务器"""
def __init__(self,*args, **kwargs):
super().__init__(*args, **kwargs)
self.ws_manager = WebSocketManager()
self._setup_ws_routes()
def _setup_ws_routes(self):
"""设置 WebSocket 路由"""
@self.app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await self.ws_manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
msg = json.loads(data)
if msg["action"] == "subscribe":
for channel in msg.get("channels", []):
self.ws_manager.subscribe(websocket, channel)
elif msg["action"] == "unsubscribe":
for channel in msg.get("channels", []):
self.ws_manager.unsubscribe(websocket, channel)
except Exception as e:
print(f"WebSocket error: {e}")
finally:
self.ws_manager.disconnect(websocket)
async def broadcast_bar(self, bar_data: Dict):
"""广播新 bar 数据"""
await self.ws_manager.broadcast("bars", bar_data)
async def broadcast_order(self, order_data: Dict):
"""广播订单更新"""
await self.ws_manager.broadcast("orders", order_data)
async def broadcast_trade(self, trade_data: Dict):
"""广播成交数据"""
await self.ws_manager.broadcast("trades", trade_data)
# 在 Cerebro 中使用
class WSCerebro(bt.Cerebro):
"""支持 WebSocket 的 Cerebro"""
def __init__(self, ws_server=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.ws_server = ws_server
# Hook into next
self._original_run = self.run
def run(self):
"""运行并推送数据到 WebSocket"""
# 启动 WebSocket 服务器
if self.ws_server:
import asyncio
from threading import Thread
def run_ws():
asyncio.run(self.ws_server.run())
ws_thread = Thread(target=run_ws, daemon=True)
ws_thread.start()
# 原有 run 逻辑
return self._original_run()
```bash
### 3.4 SQLite 持久化设计
```python
import sqlite3
import json
from datetime import datetime
from typing import Any, Dict, List, Optional
import threading
class BacktraderDB:
"""Backtrader 数据库管理器
使用 WAL 模式,支持并发读写
"""
def __init__(self, db_path: str = "backtrader.db"):
"""初始化数据库
Args:
db_path: 数据库文件路径
"""
self.db_path = db_path
self._local = threading.local()
self._init_db()
def _get_conn(self) -> sqlite3.Connection:
"""获取线程本地连接"""
if not hasattr(self._local, 'conn'):
self._local.conn = sqlite3.connect(
self.db_path,
check_same_thread=False
)
self._local.conn.row_factory = sqlite3.Row
return self._local.conn
def _init_db(self):
"""初始化数据库表"""
conn = self._get_conn()
cursor = conn.cursor()
# 策略状态表
cursor.execute("""
CREATE TABLE IF NOT EXISTS strategy_state (
id INTEGER PRIMARY KEY AUTOINCREMENT,
strategy_name TEXT NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
state JSON NOT NULL,
UNIQUE(strategy_name, timestamp)
)
""")
# 订单表
cursor.execute("""
CREATE TABLE IF NOT EXISTS orders (
ref INTEGER PRIMARY KEY,
strategy_name TEXT NOT NULL,
symbol TEXT NOT NULL,
ordertype TEXT NOT NULL,
size REAL NOT NULL,
price REAL,
status TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
completed_at DATETIME
)
""")
# 交易表
cursor.execute("""
CREATE TABLE IF NOT EXISTS trades (
id INTEGER PRIMARY KEY AUTOINCREMENT,
order_ref INTEGER NOT NULL,
strategy_name TEXT NOT NULL,
symbol TEXT NOT NULL,
side TEXT NOT NULL,
size REAL NOT NULL,
price REAL NOT NULL,
commission REAL DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (order_ref) REFERENCES orders(ref)
)
""")
# 持仓快照表
cursor.execute("""
CREATE TABLE IF NOT EXISTS position_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
strategy_name TEXT NOT NULL,
symbol TEXT NOT NULL,
size REAL NOT NULL,
price REAL NOT NULL,
value REAL NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
# 指标值表
cursor.execute("""
CREATE TABLE IF NOT EXISTS indicator_values (
id INTEGER PRIMARY KEY AUTOINCREMENT,
strategy_name TEXT NOT NULL,
indicator_name TEXT NOT NULL,
timestamp DATETIME NOT NULL,
value REAL NOT NULL,
UNIQUE(strategy_name, indicator_name, timestamp)
)
""")
# 启用 WAL 模式
cursor.execute("PRAGMA journal_mode=WAL")
cursor.execute("PRAGMA synchronous=NORMAL")
conn.commit()
def save_strategy_state(self, strategy_name: str, state: Dict[str, Any]):
"""保存策略状态
Args:
strategy_name: 策略名称
state: 状态字典
"""
conn = self._get_conn()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO strategy_state (strategy_name, state)
VALUES (?, ?)
""", (strategy_name, json.dumps(state)))
conn.commit()
def load_strategy_state(
self,
strategy_name: str,
limit: int = 1
) -> Optional[Dict]:
"""加载最新策略状态
Args:
strategy_name: 策略名称
limit: 加载最近几条
Returns:
状态字典
"""
conn = self._get_conn()
cursor = conn.cursor()
cursor.execute("""
SELECT state FROM strategy_state
WHERE strategy_name = ?
ORDER BY timestamp DESC
LIMIT ?
""", (strategy_name, limit))
rows = cursor.fetchall()
if rows:
return json.loads(rows[0]["state"])
return None
def save_order(self, order: bt.Order, strategy_name: str):
"""保存订单
Args:
order: 订单对象
strategy_name: 策略名称
"""
conn = self._get_conn()
cursor = conn.cursor()
cursor.execute("""
INSERT OR REPLACE INTO orders
(ref, strategy_name, symbol, ordertype, size, price, status)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
order.ref,
strategy_name,
order.data._name if hasattr(order, 'data') else 'unknown',
str(order.ordtype),
float(order.size),
float(order.price) if order.price else None,
order.getstatusname()
))
conn.commit()
def save_trade(
self,
order_ref: int,
strategy_name: str,
symbol: str,
side: str,
size: float,
price: float,
commission: float = 0
):
"""保存交易
Args:
order_ref: 订单引用
strategy_name: 策略名称
symbol: 品种
side: 方向
size: 数量
price: 价格
commission: 手续费
"""
conn = self._get_conn()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO trades
(order_ref, strategy_name, symbol, side, size, price, commission)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (order_ref, strategy_name, symbol, side, size, price, commission))
conn.commit()
def save_position(
self,
strategy_name: str,
symbol: str,
size: float,
price: float,
value: float
):
"""保存持仓快照
Args:
strategy_name: 策略名称
symbol: 品种
size: 持仓数量
price: 持仓价格
value: 持仓价值
"""
conn = self._get_conn()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO position_snapshots
(strategy_name, symbol, size, price, value)
VALUES (?, ?, ?, ?, ?)
""", (strategy_name, symbol, size, price, value))
conn.commit()
def save_indicator_value(
self,
strategy_name: str,
indicator_name: str,
timestamp: datetime,
value: float
):
"""保存指标值
Args:
strategy_name: 策略名称
indicator_name: 指标名称
timestamp: 时间戳
value: 指标值
"""
conn = self._get_conn()
cursor = conn.cursor()
cursor.execute("""
INSERT OR REPLACE INTO indicator_values
(strategy_name, indicator_name, timestamp, value)
VALUES (?, ?, ?, ?)
""", (strategy_name, indicator_name, timestamp, value))
conn.commit()
def get_trades(
self,
strategy_name: Optional[str] = None,
limit: int = 100
) -> List[Dict]:
"""获取交易记录
Args:
strategy_name: 策略名称,None 表示全部
limit: 返回条数
Returns:
交易列表
"""
conn = self._get_conn()
cursor = conn.cursor()
if strategy_name:
cursor.execute("""
SELECT *FROM trades
WHERE strategy_name = ?
ORDER BY created_at DESC
LIMIT ?
""", (strategy_name, limit))
else:
cursor.execute("""
SELECT*FROM trades
ORDER BY created_at DESC
LIMIT ?
""", (limit,))
return [dict(row) for row in cursor.fetchall()]
def cleanup_old_records(self, days: int = 30):
"""清理旧记录
Args:
days: 保留天数
"""
conn = self._get_conn()
cursor = conn.cursor()
cutoff = datetime.now() - pd.Timedelta(days=days)
cursor.execute("""
DELETE FROM strategy_state WHERE timestamp < ?
""", (cutoff,))
cursor.execute("""
DELETE FROM indicator_values WHERE timestamp < ?
""", (cutoff,))
# 订单和交易记录保留
conn.commit()
def close(self):
"""关闭数据库连接"""
if hasattr(self._local, 'conn'):
self._local.conn.close()
```bash
### 3.5 风险管理模块设计
```python
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional
class RiskAction(Enum):
"""风险动作"""
NONE = "none" # 无动作
STOP_NEW_ORDERS = "stop" # 停止新订单
REDUCE_SIZE = "reduce" # 减少订单大小
CLOSE_POSITION = "close" # 平仓
EMERGENCY_EXIT = "emergency" # 紧急退出
@dataclass
class RiskLimit:
"""风险限制"""
name: str
value: float
action: RiskAction
class RiskManager:
"""风险管理器
监控策略风险,触发风控动作
"""
def __init__(self, strategy: bt.Strategy):
"""初始化风险管理器
Args:
strategy: 策略实例
"""
self.strategy = strategy
self.limits: List[RiskLimit] = []
self._actions: Dict[RiskAction, Callable] = {
RiskAction.NONE: lambda: None,
RiskAction.STOP_NEW_ORDERS: self._stop_new_orders,
RiskAction.REDUCE_SIZE: self._reduce_size,
RiskAction.CLOSE_POSITION: self._close_position,
RiskAction.EMERGENCY_EXIT: self._emergency_exit,
}
# 状态跟踪
self._drawdown_peak = 0
self._daily_loss = 0
self._consecutive_losses = 0
def add_limit(self, name: str, value: float, action: RiskAction):
"""添加风险限制
Args:
name: 限制名称
value: 限制值
action: 触发动作
"""
self.limits.append(RiskLimit(name, value, action))
def check_risks(self) -> Optional[RiskAction]:
"""检查所有风险限制
Returns:
需要执行的动作,None 表示无需动作
"""
for limit in self.limits:
if self._check_limit(limit):
print(f"Risk limit triggered: {limit.name}")
return limit.action
return None
def _check_limit(self, limit: RiskLimit) -> bool:
"""检查单个限制
Args:
limit: 风险限制
Returns:
True 表示触发
"""
if limit.name == "max_drawdown":
return self._check_drawdown(limit.value)
elif limit.name == "daily_loss_limit":
return self._check_daily_loss(limit.value)
elif limit.name == "position_limit":
return self._check_position_limit(limit.value)
elif limit.name == "consecutive_losses":
return self._check_consecutive_losses(limit.value)
elif limit.name == "correlation_limit":
return self._check_correlation(limit.value)
return False
def _check_drawdown(self, max_dd: float) -> bool:
"""检查回撤"""
# 计算当前回撤
current_value = self.strategy.broker.getvalue()
if current_value > self._drawdown_peak:
self._drawdown_peak = current_value
drawdown = (self._drawdown_peak - current_value) / self._drawdown_peak
return drawdown >= max_dd
def _check_daily_loss(self, limit: float) -> bool:
"""检查每日亏损"""
# 简化实现,实际需要按日期统计
return self._daily_loss >= limit
def _check_position_limit(self, limit: float) -> bool:
"""检查持仓限制"""
for datafeed in self.strategy.datas:
pos = self.strategy.broker.getposition(datafeed)
if abs(pos.size)* pos.price > limit:
return True
return False
def _check_consecutive_losses(self, limit: int) -> bool:
"""检查连续亏损"""
return self._consecutive_losses >= limit
def _check_correlation(self, limit: float) -> bool:
"""检查相关性(暂不实现)"""
return False
def _stop_new_orders(self):
"""停止新订单"""
print("Stopping new orders due to risk limit")
def _reduce_size(self):
"""减少订单大小"""
print("Reducing order size due to risk limit")
# 可以通过修改策略参数实现
def _close_position(self):
"""平仓"""
print("Closing position due to risk limit")
for datafeed in self.strategy.datas:
pos = self.strategy.broker.getposition(datafeed)
if pos.size > 0:
self.strategy.sell(data=datafeed, size=pos.size)
elif pos.size < 0:
self.strategy.buy(data=datafeed, size=-pos.size)
def _emergency_exit(self):
"""紧急退出"""
print("Emergency exit triggered")
self._close_position()
# 可以添加停止策略运行的逻辑
class RiskAwareStrategy(bt.Strategy):
"""支持风险管理的策略基类"""
def __init__(self):
super().__init__()
self.risk_mgr = RiskManager(self)
# 添加默认风险限制
self.risk_mgr.add_limit("max_drawdown", 0.1, RiskAction.REDUCE_SIZE)
self.risk_mgr.add_limit("daily_loss_limit", 0.05, RiskAction.STOP_NEW_ORDERS)
self.risk_mgr.add_limit("consecutive_losses", 5, RiskAction.STOP_NEW_ORDERS)
def next(self):
"""每 bar 检查风险"""
action = self.risk_mgr.check_risks()
if action:
# 执行风险动作
self.risk_mgr._actions[action]()
else:
# 正常策略逻辑
self.run_strategy()
def run_strategy(self):
"""策略主逻辑(子类覆盖)"""
pass
def notify_trade(self, trade):
"""交易成交通知"""
if trade.pnl < 0:
self.risk_mgr._consecutive_losses += 1
self.risk_mgr._daily_loss += abs(trade.pnl)
else:
self.risk_mgr._consecutive_losses = 0
```bash
### 3.6 多实例管理设计
```python
import os
import signal
import subprocess
from typing import Dict, List
import psutil
import yaml
class InstanceManager:
"""实例管理器
管理多个 Backtrader 实例
"""
def __init__(self, config_dir: str = "./configs"):
"""初始化实例管理器
Args:
config_dir: 配置文件目录
"""
self.config_dir = config_dir
self.instances: Dict[str, subprocess.Popen] = {}
def list_instances(self) -> List[str]:
"""列出所有实例
Returns:
实例名称列表
"""
configs = []
for filename in os.listdir(self.config_dir):
if filename.endswith('.yml') and not filename.startswith('_'):
configs.append(filename[:-4])
return configs
def start_instance(self, name: str, config: Dict = None) -> bool:
"""启动实例
Args:
name: 实例名称
config: 配置字典
Returns:
是否成功启动
"""
if name in self.instances:
print(f"Instance {name} is already running")
return False
config_file = os.path.join(self.config_dir, f"{name}.yml")
if config:
# 保存配置
with open(config_file, 'w') as f:
yaml.dump(config, f)
# 启动进程
cmd = ["python", "run_strategy.py", "--config", config_file]
try:
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
start_new_session=True
)
self.instances[name] = proc
print(f"Instance {name} started with PID {proc.pid}")
return True
except Exception as e:
print(f"Failed to start instance {name}: {e}")
return False
def stop_instance(self, name: str) -> bool:
"""停止实例
Args:
name: 实例名称
Returns:
是否成功停止
"""
if name not in self.instances:
print(f"Instance {name} is not running")
return False
proc = self.instances[name]
try:
proc.send_signal(signal.SIGTERM)
proc.wait(timeout=10)
del self.instances[name]
print(f"Instance {name} stopped")
return True
except subprocess.TimeoutExpired:
proc.kill()
del self.instances[name]
print(f"Instance {name} killed")
return True
except Exception as e:
print(f"Failed to stop instance {name}: {e}")
return False
def restart_instance(self, name: str) -> bool:
"""重启实例
Args:
name: 实例名称
Returns:
是否成功重启
"""
self.stop_instance(name)
return self.start_instance(name)
def start_all(self):
"""启动所有实例"""
for name in self.list_instances():
self.start_instance(name)
def stop_all(self):
"""停止所有实例"""
for name in list(self.instances.keys()):
self.stop_instance(name)
def status(self) -> Dict[str, str]:
"""获取所有实例状态
Returns:
实例状态字典
"""
status = {}
for name in self.list_instances():
if name in self.instances:
proc = self.instances[name]
if proc.poll() is None:
status[name] = "running"
else:
status[name] = "stopped"
del self.instances[name]
else:
status[name] = "stopped"
return status
# 使用示例
if __name__ == "__main__":
import click
@click.group()
def cli():
"""Instance management CLI"""
pass
@click.command()
def list():
"""List all instances"""
mgr = InstanceManager()
for name in mgr.list_instances():
print(f" {name}")
@click.command()
@click.argument("name")
def start(name):
"""Start an instance"""
mgr = InstanceManager()
mgr.start_instance(name)
@click.command()
@click.argument("name")
def stop(name):
"""Stop an instance"""
mgr = InstanceManager()
mgr.stop_instance(name)
@click.command()
def startall():
"""Start all instances"""
mgr = InstanceManager()
mgr.start_all()
@click.command()
def stopall():
"""Stop all instances"""
mgr = InstanceManager()
mgr.stop_all()
@click.command()
def status():
"""Show instance status"""
mgr = InstanceManager()
for name, st in mgr.status().items():
print(f" {name}: {st}")
cli.add_command(list)
cli.add_command(start)
cli.add_command(stop)
cli.add_command(startall)
cli.add_command(stopall)
cli.add_command(status)
cli()
```bash
### 3.7 实现优先级
| 优先级 | 功能 | 复杂度 | 收益 |
|--------|------|--------|------|
| P0 | SQLite 持久化 | 中 | 高 |
| P0 | 风险管理模块 | 中 | 高 |
| P1 | WebSocket 支持 | 高 | 中 |
| P1 | 做市策略引擎 | 中 | 中 |
| P2 | Web UI 界面 | 高 | 中 |
| P2 | 多实例管理 | 低 | 低 |
### 3.8 兼容性保证
所有新功能通过以下方式保证兼容性:
1. 新增类不修改核心 API
2. 通过继承选择性启用新功能
3. 默认行为完全保持不变
4. 提供独立安装选项
- --
## 四、使用示例
### 4.1 完整的做市策略示例
```python
import backtrader as bt
from backtrader.extensions import MarketMakingStrategy, RiskAwareStrategy
class CryptoMarketMaker(MarketMakingStrategy, RiskAwareStrategy):
"""加密货币做市策略
结合做市和风险管理
"""
params = (
('mode', 'mid'),
('width', 0.001),
('bid_size', 0.01),
('ask_size', 0.01),
('target_position', 0.5),
('max_drawdown', 0.05),
)
def __init__(self):
MarketMakingStrategy.__init__(self)
RiskAwareStrategy.__init__(self)
# 配置风险限制
self.risk_mgr.add_limit(
"max_drawdown",
self.p.max_drawdown,
RiskAction.STOP_NEW_ORDERS
)
# 运行策略
cerebro = bt.Cerebro()
# 添加数据
data = bt.feeds.CCXXT(
exchange='binance',
symbol='BTC/USDT',
timeframe=bt.TimeFrame.Minutes
)
cerebro.adddata(data)
# 添加策略
cerebro.addstrategy(
CryptoMarketMaker,
mode='mid',
width=0.001,
bid_size=0.01,
ask_size=0.01,
)
# 添加数据库
db = BacktraderDB("crypto_market_maker.db")
# 运行
result = cerebro.run()
```bash
### 4.2 带 Web UI 的完整示例
```python
from backtrader.extensions import BacktraderServer
# 创建 Cerebro
cerebro = bt.Cerebro()
# 添加数据和策略...
# ...
# 创建并启动服务器
server = BacktraderServer(cerebro, host='0.0.0.0', port=3000)
# 在后台运行服务器
import threading
server_thread = threading.Thread(target=server.run, daemon=True)
server_thread.start()
# 运行策略
cerebro.run()
```bash
- --
## 五、总结
通过借鉴 CryptoTradingBot 的优秀设计,Backtrader 可以获得:
1. **专业的 Web UI**: 实时监控和管理界面
2. **做市策略引擎**: 参数化做市策略支持
3. **WebSocket 实时通信**: 低延迟数据推送
4. **SQLite 持久化**: 状态保存和恢复
5. **风险管理模块**: 多维度风险控制
6. **多实例管理**: 策略并行运行
这些改进使 Backtrader 从回测框架扩展为完整的实盘交易系统,特别适合加密货币市场的自动化交易需求。