背景¶
backtrader 已经比较完善了,我想要借鉴量化投资框架中其他项目的优势,继续改进优化 backtrader。
任务¶
阅读研究分析 backtrader 这个项目的源代码,了解这个项目。
阅读研究分析/Users/yunjinqi/Documents/量化交易框架/easytrader
借鉴这个新项目的优点和功能,给 backtrader 优化改进提供新的建议
写需规文档和设计文档放到这个文档的最下面,方便后续借鉴
easytrader 项目简介¶
easytrader 是一个 A 股自动化交易框架,通过客户端自动化实现交易,具有以下核心特点:
客户端自动化: 通过模拟操作实现自动交易
多券商支持: 支持多家券商客户端
简洁 API: 简洁的交易 API 接口
实时查询: 实时查询持仓和余额
交易执行: 自动化下单执行
跟单功能: 支持跟单交易
重点借鉴方向¶
交易接口: 统一的交易接口抽象
多券商: 多券商适配器模式
持仓查询: 持仓和资金查询
订单管理: 订单提交和状态管理
错误处理: 交易错误处理机制
日志系统: 交易日志记录
项目对比分析¶
Backtrader vs easytrader 架构对比¶
| 维度 | Backtrader | easytrader |
|——|————|————|
| 应用场景| 回测和实盘交易框架 | A 股客户端自动化交易 |
|交易方式| 通过 API 接口 | 通过 GUI 自动化 |
|券商支持| IB/OANDA 等国际券商 | 国内多家券商客户端 |
|接口抽象| Broker 基类 | IClientTrader/WebTrader |
|持仓查询| 通过 Broker 获取 | 通过 GUI 控件获取 |
|订单管理| Order/Trade 类 | 弹窗+状态查询 |
|错误处理| 基础异常处理 | TradeError/NotLoginError |
|日志系统| 基础日志 | 详细的性能监控日志 |
|跟单功能| 无 | 支持多平台跟单 |
|配置管理| 参数系统 | 配置类+窗口 ID 映射 |
|数据获取策略| Line 系统 | Copy/Xls/Dbf 策略 |
|性能监控| 基础分析器 | perf_clock 装饰器 |
easytrader 可借鉴的核心优势¶
1. 多券商适配器模式¶
统一接口抽象: IClientTrader 和 WebTrader 定义统一接口
配置驱动: 每个券商独立的配置类
工厂模式: api.use()根据券商名创建对应交易器
窗口 ID 映射: 通过配置映射 GUI 控件 ID
2. 策略模式的数据获取¶
可插拔策略: Copy、Xls、Dbf 多种数据获取方式
自动降级: 策略失败时自动尝试其他方式
性能优化: 选择最优的数据获取方式
3. 错误处理机制¶
自定义异常类型: TradeError、NotLoginError
弹窗处理: 自动识别和响应交易确认弹窗
登录状态检查: 自动检查和重新登录
4. 性能监控¶
装饰器模式: perf_clock 记录操作耗时
详细日志: 记录每个交易操作的执行时间
性能分析: 方便定位性能瓶颈
5. 跟单功能¶
多平台支持: 聚米、米筐、雪球等平台
多用户跟踪: 支持跟踪多个用户的交易
定时轮询: 定时检查交易信号并执行
6. 配置管理¶
配置类继承: CommonConfig 定义通用配置
窗口控件映射: 通过 ID 定位 GUI 元素
路径配置: 默认可执行文件路径配置
需求文档¶
需求概述¶
借鉴 easytrader 项目的设计理念,为 backtrader 添加以下功能模块,提升实盘交易能力和国内券商支持:
功能需求¶
FR1: 多券商适配器¶
FR1.1 券商接口抽象*
需求描述: 建立统一的券商交易接口抽象
优先级: 高
验收标准:
定义 BrokerAdapter 接口
定义标准交易方法(买入、卖出、查询)
定义事件回调接口
FR1.2 配置驱动的适配器*
需求描述: 通过配置实现不同券商的适配
优先级: 高
验收标准:
定义 BrokerConfig 配置基类
支持券商特定配置
支持配置文件加载
FR1.3 工厂模式创建*
需求描述: 使用工厂模式创建券商适配器
优先级: 中
验收标准:
实现 BrokerFactory 工厂类
支持按券商名称创建适配器
支持自动检测已安装客户端
FR2: 数据获取策略¶
FR2.1 策略接口定义*
需求描述: 定义数据获取策略接口
优先级: 高
验收标准:
定义 DataFetchStrategy 接口
定义策略优先级
支持策略降级
FR2.2 多策略实现*
需求描述: 实现多种数据获取策略
优先级: 中
验收标准:
实现 APICopy 策略(API 复制)
实现 FileExport 策略(文件导出)
实现 DirectQuery 策略(直接查询)
FR2.3 策略管理器*
需求描述: 管理和调度数据获取策略
优先级: 中
验收标准:
实现 StrategyManager
支持策略优先级排序
支持自动降级重试
FR3: 错误处理增强¶
FR3.1 自定义异常类型*
需求描述: 定义交易相关的异常类型
优先级: 高
验收标准:
定义 TradeError 异常基类
定义 NotLoginError 异常
定义 OrderRejectError 异常
定义 NetworkError 异常
FR3.2 弹窗处理机制*
需求描述: 实现弹窗自动处理
优先级: 中
验收标准:
支持弹窗检测
支持弹窗自动关闭
支持弹窗内容识别
FR3.3 登录状态管理*
需求描述: 实现登录状态检查和自动重连
优先级: 高
验收标准:
支持登录状态检测
支持自动重新登录
支持登录失败重试
FR4: 性能监控¶
FR4.1 性能装饰器*
需求描述: 实现性能监控装饰器
优先级: 中
验收标准:
实现 perf_clock 装饰器
记录函数执行时间
输出性能日志
FR4.2 交易统计*
需求描述: 统计交易操作性能
优先级: 低
验收标准:
统计订单提交耗时
统计数据查询耗时
生成性能报告
FR4.3 性能预警*
需求描述: 性能异常预警
优先级: 低
验收标准:
支持性能阈值设置
超时自动告警
记录性能异常
FR5: 跟单功能¶
FR5.1 跟单基类*
需求描述: 定义跟单功能基类
优先级: 中
验收标准:
定义 FollowerBase 基类
定义跟单接口
支持多策略跟踪
FR5.2 信号源适配器*
需求描述: 实现信号源适配器
优先级: 中
验收标准:
支持聚米跟单
支持米筐跟单
支持雪球跟单
FR5.3 信号过滤*
需求描述: 实现信号过滤机制
优先级: 低
验收标准:
支持股票代码过滤
支持交易方向过滤
支持金额过滤
FR6: 日志系统增强¶
FR6.1 结构化日志*
需求描述: 实现结构化交易日志
优先级: 中
验收标准:
支持 JSON 格式日志
支持日志文件分离
支持日志按日期轮转
FR6.2 交易审计*
需求描述: 记录完整交易审计日志
优先级: 中
验收标准:
记录所有订单操作
记录所有查询操作
支持日志查询和回放
FR6.3 敏感信息保护*
需求描述: 保护日志中的敏感信息
优先级: 中
验收标准:
自动脱敏密码
自动脱敏账号
可配置脱敏规则
非功能需求¶
NFR1: 性能¶
订单提交响应时间 < 500ms
持仓查询响应时间 < 1s
登录状态检查 < 100ms
NFR2: 可靠性¶
自动重连成功率 > 95%
订单提交成功率 > 99%
异常恢复时间 < 5s
NFR3: 兼容性¶
支持 Windows 7/10/11
支持主流券商客户端
保持与现有 backtrader API 兼容
设计文档¶
整体架构设计¶
新增模块结构¶
backtrader/
├── backtrader/
│ ├── brokers/ # 增强:券商适配器
│ │ ├── __init__.py
│ │ ├── base.py # 券商适配器基类
│ │ ├── factory.py # 券商工厂
│ │ ├── config/ # 券商配置
│ │ │ ├── __init__.py
│ │ │ ├── base.py # 配置基类
│ │ │ ├── ths.py # 同花顺配置
│ │ │ ├── ht.py # 华泰配置
│ │ │ └── yh.py # 银河配置
│ │ ├── ths.py # 同花顺适配器
│ │ ├── ht.py # 华泰适配器
│ │ └── yh.py # 银河适配器
│ ├── data/ # 增强:数据获取策略
│ │ ├── __init__.py
│ │ ├── strategies/ # 数据获取策略
│ │ │ ├── __init__.py
│ │ │ ├── base.py # 策略基类
│ │ │ ├── copy.py # 复制策略
│ │ │ ├── file.py # 文件策略
│ │ │ └── api.py # API 策略
│ │ └── manager.py # 策略管理器
│ ├── exceptions/ # 新增:异常定义
│ │ ├── __init__.py
│ │ ├── trade.py # 交易异常
│ │ ├── login.py # 登录异常
│ │ └── network.py # 网络异常
│ ├── monitor/ # 新增:性能监控
│ │ ├── __init__.py
│ │ ├── perf.py # 性能监控装饰器
│ │ ├── stats.py # 性能统计
│ │ └── alert.py # 性能预警
│ ├── follower/ # 新增:跟单功能
│ │ ├── __init__.py
│ │ ├── base.py # 跟单基类
│ │ ├── signal.py # 信号源适配器
│ │ ├── filter.py # 信号过滤器
│ │ ├── joinquant.py # 聚米跟单
│ │ ├── rq.py # 米筐跟单
│ │ └── xq.py # 雪球跟单
│ ├── ui/ # 新增:UI 自动化
│ │ ├── __init__.py
│ │ ├── dialog.py # 弹窗处理
│ │ ├── window.py # 窗口管理
│ │ └── clipboard.py # 剪贴板操作
│ └── utils/
│ └── logging/ # 增强:日志系统
│ ├── __init__.py
│ ├── logger.py # 日志配置
│ ├── audit.py # 审计日志
│ └── mask.py # 敏感信息脱敏
```bash
### 详细设计
#### 1. 券商适配器
- *1.1 适配器基类**
```python
# backtrader/brokers/base.py
from abc import ABC, abstractmethod
from typing import Dict, List, Optional
from dataclasses import dataclass
@dataclass
class OrderResult:
"""订单结果"""
order_id: str
status: str
message: str = ""
timestamp: float = None
@dataclass
class Position:
"""持仓信息"""
symbol: str
volume: int
available: int
price: float
cost: float
profit: float = 0.0
@dataclass
class Balance:
"""资金信息"""
total: float
available: float
market_value: float
frozen: float = 0.0
class BrokerAdapter(ABC):
"""券商适配器基类"""
def __init__(self, config):
self.config = config
self._logged_in = False
self._session = None
@property
@abstractmethod
def balance(self) -> Balance:
"""获取资金信息"""
pass
@property
@abstractmethod
def position(self) -> Dict[str, Position]:
"""获取持仓信息"""
pass
@abstractmethod
def login(self, user: str, password: str, **kwargs) -> bool:
"""登录"""
pass
@abstractmethod
def logout(self) -> bool:
"""登出"""
pass
@abstractmethod
def buy(self, symbol: str, price: float, volume: int, **kwargs) -> OrderResult:
"""买入"""
pass
@abstractmethod
def sell(self, symbol: str, price: float, volume: int, **kwargs) -> OrderResult:
"""卖出"""
pass
@abstractmethod
def cancel_order(self, order_id: str) -> bool:
"""撤单"""
pass
@abstractmethod
def query_orders(self, **kwargs) -> List[Dict]:
"""查询委托"""
pass
@abstractmethod
def query_deals(self, **kwargs) -> List[Dict]:
"""查询成交"""
pass
def is_logged_in(self) -> bool:
"""检查登录状态"""
return self._logged_in
def keep_alive(self) -> bool:
"""保持连接"""
if not self.is_logged_in():
return False
# 子类实现具体保活逻辑
return True
```bash
- *1.2 工厂类**
```python
# backtrader/brokers/factory.py
from typing import Dict, Type, Optional
from .base import BrokerAdapter
from .config.base import BrokerConfig
from .ths import THSBroker
from .ht import HTBroker
from .yh import YHBroker
class BrokerFactory:
"""券商工厂"""
_brokers: Dict[str, Type[BrokerAdapter]] = {
'ths': THSBroker,
'tonghuashun': THSBroker,
'ht': HTBroker,
'huatai': HTBroker,
'yh': YHBroker,
'yinhe': YHBroker,
}
_configs: Dict[str, Type[BrokerConfig]] = {
'ths': THSConfig,
'tonghuashun': THSConfig,
'ht': HTConfig,
'huatai': HTConfig,
'yh': YHConfig,
'yinhe': YHConfig,
}
@classmethod
def register(cls, name: str, broker_class: Type[BrokerAdapter],
config_class: Type[BrokerConfig] = None):
"""注册券商"""
cls._brokers[name.lower()] = broker_class
if config_class:
cls._configs[name.lower()] = config_class
@classmethod
def create(cls, broker: str, config: Optional[BrokerConfig] = None,
auto_detect: bool = False) -> BrokerAdapter:
"""创建券商适配器"""
broker = broker.lower()
if broker not in cls._brokers:
raise ValueError(f"Unknown broker: {broker}")
broker_class = cls._brokers[broker]
# 自动检测配置
if config is None:
config_class = cls._configs.get(broker)
if config_class:
config = config_class()
else:
config = BrokerConfig()
# 自动检测客户端
if auto_detect:
config.detect_executable()
return broker_class(config)
@classmethod
def list_available(cls) -> List[str]:
"""列出可用的券商"""
return list(cls._brokers.keys())
def use(broker: str, **kwargs) -> BrokerAdapter:
"""便捷函数:创建券商适配器"""
return BrokerFactory.create(broker, **kwargs)
```bash
- *1.3 配置基类**
```python
# backtrader/brokers/config/base.py
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, Optional
import win32gui
import win32con
class BrokerConfig(ABC):
"""券商配置基类"""
# 默认可执行文件路径
DEFAULT_EXE_PATH: str = None
# 主窗口标题
WINDOW_TITLE: str = None
# 交易控件 ID
TRADE_SECURITY_CTRL_ID: int = 1032
TRADE_PRICE_CTRL_ID: int = 1033
TRADE_AMOUNT_CTRL_ID: int = 1034
TRADE_BUY_BTN_ID: int = 1005
TRADE_SELL_BTN_ID: int = 1006
# 查询控件 ID
BALANCE_CTRL_GROUP: Dict[str, int] = None
POSITION_CTRL_ID: int = 1054
# 菜单路径
MENU_PATH_POSITION: List[str] = None
MENU_PATH_BALANCE: List[str] = None
def __init__(self, exe_path: str = None):
self.exe_path = exe_path or self.DEFAULT_EXE_PATH
self.window_title = self.WINDOW_TITLE
def detect_executable(self) -> bool:
"""检测可执行文件"""
if self.exe_path and Path(self.exe_path).exists():
return True
# 常见安装路径检测
common_paths = [
Path("C:/") / "Program Files" / "券商软件",
Path("C:/") / "Program Files (x86)" / "券商软件",
]
for path in common_paths:
if path.exists():
for exe in path.rglob("*.exe"):
if self._is_valid_executable(exe):
self.exe_path = str(exe)
return True
return False
def _is_valid_executable(self, exe_path: Path) -> bool:
"""判断是否为有效的可执行文件"""
# 子类实现具体逻辑
return False
def find_window(self) -> Optional[int]:
"""查找主窗口句柄"""
if not self.window_title:
return None
def callback(hwnd, windows):
if win32gui.IsWindowVisible(hwnd) and self.window_title in win32gui.GetWindowText(hwnd):
windows.append(hwnd)
return True
windows = []
win32gui.EnumWindows(callback, windows)
return windows[0] if windows else None
class THSConfig(BrokerConfig):
"""同花顺配置"""
DEFAULT_EXE_PATH = r"C:\同花顺软件\同花顺\TdxW.exe"
WINDOW_TITLE = "同花顺"
class HTConfig(BrokerConfig):
"""华泰配置"""
DEFAULT_EXE_PATH = r"C:\华泰证券\涨乐财富通\HTClient.exe"
WINDOW_TITLE = "涨乐财富通"
class YHConfig(BrokerConfig):
"""银河配置"""
DEFAULT_EXE_PATH = r"C:\双子星-中国银河证券\Binarystar.exe"
WINDOW_TITLE = "中国银河证券"
```bash
#### 2. 数据获取策略
- *2.1 策略基类**
```python
# backtrader/data/strategies/base.py
from abc import ABC, abstractmethod
from typing import Any, Optional
from enum import Enum
class FetchPriority(Enum):
"""策略优先级"""
HIGH = 1
MEDIUM = 2
LOW = 3
class DataFetchStrategy(ABC):
"""数据获取策略基类"""
name: str = None
priority: FetchPriority = FetchPriority.MEDIUM
def __init__(self, context):
self.context = context
@abstractmethod
def fetch(self, **kwargs) -> Optional[Any]:
"""获取数据"""
pass
@abstractmethod
def is_available(self) -> bool:
"""检查策略是否可用"""
pass
def on_success(self, data):
"""成功回调"""
pass
def on_failure(self, error):
"""失败回调"""
pass
```bash
- *2.2 具体策略实现**
```python
# backtrader/data/strategies/copy.py
import win32clipboard
import win32con
from .base import DataFetchStrategy, FetchPriority
class CopyStrategy(DataFetchStrategy):
"""剪贴板复制策略"""
name = "copy"
priority = FetchPriority.HIGH
def fetch(self, window=None, grid_ctrl_id=None, **kwargs):
"""通过剪贴板复制获取数据"""
try:
# 选中文本
if window and grid_ctrl_id:
grid = window[grid_ctrl_id]
grid.type_keys("^A^C", set_foreground=False)
# 获取剪贴板数据
win32clipboard.OpenClipboard()
try:
data = win32clipboard.GetClipboardData(win32con.CF_UNICODETEXT)
finally:
win32clipboard.CloseClipboard()
return self._parse_data(data)
except Exception as e:
self.on_failure(e)
return None
def is_available(self) -> bool:
"""检查剪贴板是否可用"""
try:
win32clipboard.OpenClipboard()
win32clipboard.CloseClipboard()
return True
except:
return False
def _parse_data(self, data: str):
"""解析数据"""
# 解析剪贴板文本数据
lines = data.strip().split('\n')
return [line.split('\t') for line in lines]
# backtrader/data/strategies/file.py
from .base import DataFetchStrategy, FetchPriority
import pandas as pd
from pathlib import Path
import time
class FileExportStrategy(DataFetchStrategy):
"""文件导出策略"""
name = "file"
priority = FetchPriority.MEDIUM
def fetch(self, window=None, export_btn_id=None, **kwargs):
"""通过文件导出获取数据"""
temp_file = Path("temp_export.csv")
try:
# 点击导出按钮
if window and export_btn_id:
window[export_btn_id].click()
# 等待文件生成
for _ in range(10):
if temp_file.exists():
break
time.sleep(0.5)
# 读取数据
if temp_file.exists():
data = pd.read_csv(temp_file)
temp_file.unlink() # 删除临时文件
return data.to_dict('records')
except Exception as e:
self.on_failure(e)
finally:
if temp_file.exists():
temp_file.unlink()
return None
def is_available(self) -> bool:
"""检查文件操作是否可用"""
return True
# backtrader/data/strategies/api.py
from .base import DataFetchStrategy, FetchPriority
class DirectApiStrategy(DataFetchStrategy):
"""直接 API 策略"""
name = "api"
priority = FetchPriority.HIGH
def fetch(self, api_obj=None, method=None, **kwargs):
"""通过直接 API 调用获取数据"""
try:
if api_obj and method:
data = getattr(api_obj, method)(**kwargs)
return self._parse_data(data)
except Exception as e:
self.on_failure(e)
return None
def is_available(self) -> bool:
"""检查 API 是否可用"""
return self.context.api is not None
def _parse_data(self, data):
"""解析数据"""
return data
```bash
- *2.3 策略管理器**
```python
# backtrader/data/manager.py
from typing import List, Optional, Any
from .strategies.base import DataFetchStrategy, FetchPriority
class StrategyManager:
"""数据获取策略管理器"""
def __init__(self):
self._strategies: List[DataFetchStrategy] = []
self._last_success: Optional[str] = None
def register(self, strategy: DataFetchStrategy):
"""注册策略"""
self._strategies.append(strategy)
# 按优先级排序
self._strategies.sort(key=lambda s: s.priority.value)
def unregister(self, strategy_name: str):
"""取消注册策略"""
self._strategies = [s for s in self._strategies if s.name != strategy_name]
def fetch(self, **kwargs) -> Optional[Any]:
"""使用策略获取数据"""
# 优先使用上次成功的策略
if self._last_success:
for strategy in self._strategies:
if strategy.name == self._last_success and strategy.is_available():
data = strategy.fetch(**kwargs)
if data is not None:
return data
# 按优先级尝试所有策略
for strategy in self._strategies:
if not strategy.is_available():
continue
try:
data = strategy.fetch(**kwargs)
if data is not None:
self._last_success = strategy.name
strategy.on_success(data)
return data
except Exception as e:
strategy.on_failure(e)
continue
return None
@property
def strategies(self) -> List[DataFetchStrategy]:
"""获取所有策略"""
return list(self._strategies)
```bash
#### 3. 异常处理
- *3.1 异常定义**
```python
# backtrader/exceptions/__init__.py
class BacktraderError(Exception):
"""Backtrader 基础异常"""
pass
# backtrader/exceptions/trade.py
class TradeError(BacktraderError):
"""交易错误基类"""
def __init__(self, message: str, code: str = None, order_id: str = None):
super().__init__(message)
self.code = code
self.order_id = order_id
class OrderRejectError(TradeError):
"""订单被拒绝"""
pass
class OrderTimeoutError(TradeError):
"""订单超时"""
pass
# backtrader/exceptions/login.py
class LoginError(BacktraderError):
"""登录错误基类"""
pass
class NotLoginError(LoginError):
"""未登录异常"""
pass
class LoginFailedError(LoginError):
"""登录失败异常"""
def __init__(self, message: str, retry: int = 0):
super().__init__(message)
self.retry = retry
# backtrader/exceptions/network.py
class NetworkError(BacktraderError):
"""网络错误"""
pass
class ConnectionError(NetworkError):
"""连接错误"""
pass
class TimeoutError(NetworkError):
"""超时错误"""
pass
```bash
#### 4. 性能监控
- *4.1 性能装饰器**
```python
# backtrader/monitor/perf.py
import time
import logging
from functools import wraps
from typing import Callable
logger = logging.getLogger("backtrader.perf")
def perf_clock(func: Callable) -> Callable:
"""性能监控装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
elapsed = time.time() - start_time
logger.info(f"{func.__name__} executed in {elapsed:.4f}s")
return result
except Exception as e:
elapsed = time.time() - start_time
logger.error(f"{func.__name__} failed after {elapsed:.4f}s: {e}")
raise
return wrapper
def perf_async(func: Callable) -> Callable:
"""异步性能监控装饰器"""
@wraps(func)
async def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = await func(*args, **kwargs)
elapsed = time.time() - start_time
logger.info(f"{func.__name__} executed in {elapsed:.4f}s")
return result
except Exception as e:
elapsed = time.time() - start_time
logger.error(f"{func.__name__} failed after {elapsed:.4f}s: {e}")
raise
return wrapper
class PerformanceMonitor:
"""性能监控器"""
def __init__(self):
self._records: dict = {}
def record(self, name: str, duration: float):
"""记录性能"""
if name not in self._records:
self._records[name] = []
self._records[name].append(duration)
def get_stats(self, name: str) -> dict:
"""获取统计信息"""
if name not in self._records:
return {}
records = self._records[name]
return {
'count': len(records),
'total': sum(records),
'avg': sum(records) / len(records),
'min': min(records),
'max': max(records),
}
def get_all_stats(self) -> dict:
"""获取所有统计"""
return {name: self.get_stats(name) for name in self._records}
def reset(self):
"""重置记录"""
self._records.clear()
# 全局性能监控器
perf_monitor = PerformanceMonitor()
```bash
- *4.2 性能预警**
```python
# backtrader/monitor/alert.py
import logging
from typing import Callable, Optional
logger = logging.getLogger("backtrader.alert")
class PerformanceAlert:
"""性能预警"""
def __init__(self, threshold: float = 1.0,
on_alert: Optional[Callable] = None):
self.threshold = threshold
self.on_alert = on_alert or self._default_alert
def check(self, name: str, duration: float):
"""检查是否需要预警"""
if duration > self.threshold:
self.on_alert(name, duration)
def _default_alert(self, name: str, duration: float):
"""默认预警处理"""
logger.warning(f"Performance alert: {name} took {duration:.4f}s "
f"(threshold: {self.threshold:.4f}s)")
```bash
#### 5. 跟单功能
- *5.1 跟单基类**
```python
# backtrader/follower/base.py
from abc import ABC, abstractmethod
from typing import List, Dict, Callable
import time
import threading
class FollowerBase(ABC):
"""跟单基类"""
def __init__(self, broker_adapter):
self.broker = broker_adapter
self._running = False
self._thread = None
@abstractmethod
def login(self, user: str, password: str, **kwargs) -> bool:
"""登录信号源"""
pass
@abstractmethod
def fetch_signals(self, **kwargs) -> List[Dict]:
"""获取交易信号"""
pass
def follow(self, users: List[str] = None,
strategies: List[str] = None,
track_interval: int = 5,
signal_filter: Callable = None):
"""启动跟单"""
self._running = True
self._thread = threading.Thread(
target=self._follow_loop,
args=(users, strategies, track_interval, signal_filter),
daemon=True
)
self._thread.start()
def stop(self):
"""停止跟单"""
self._running = False
if self._thread:
self._thread.join()
def _follow_loop(self, users: List[str], strategies: List[str],
interval: int, signal_filter: Callable):
"""跟单主循环"""
last_signals = {}
while self._running:
try:
signals = self.fetch_signals(users=users, strategies=strategies)
for signal in signals:
signal_id = self._get_signal_id(signal)
# 跳过已处理的信号
if signal_id in last_signals:
continue
# 应用过滤器
if signal_filter and not signal_filter(signal):
continue
# 执行交易
self._execute_signal(signal)
last_signals[signal_id] = True
time.sleep(interval)
except Exception as e:
logger.error(f"Follow error: {e}")
def _get_signal_id(self, signal: Dict) -> str:
"""获取信号 ID"""
return f"{signal.get('user')}_{signal.get('time')}_{signal.get('symbol')}"
def _execute_signal(self, signal: Dict):
"""执行交易信号"""
action = signal.get('action')
symbol = signal.get('symbol')
price = signal.get('price')
amount = signal.get('amount')
if action == 'buy':
self.broker.buy(symbol, price, amount)
elif action == 'sell':
self.broker.sell(symbol, price, amount)
```bash
- *5.2 信号过滤器**
```python
# backtrader/follower/filter.py
from typing import Dict, List, Callable
class SignalFilter:
"""信号过滤器"""
def __init__(self):
self._filters: List[Callable] = []
def add_filter(self, filter_func: Callable):
"""添加过滤器"""
self._filters.append(filter_func)
def remove_filter(self, filter_func: Callable):
"""移除过滤器"""
if filter_func in self._filters:
self._filters.remove(filter_func)
def apply(self, signal: Dict) -> bool:
"""应用所有过滤器,返回 True 表示通过"""
for filter_func in self._filters:
if not filter_func(signal):
return False
return True
# 预定义过滤器
def symbol_filter(allowed_symbols: List[str]):
"""股票代码过滤"""
def filter_func(signal: Dict) -> bool:
return signal.get('symbol') in allowed_symbols
return filter_func
def amount_filter(min_amount: int = 100, max_amount: int = 100000):
"""金额过滤"""
def filter_func(signal: Dict) -> bool:
amount = signal.get('amount', 0)
return min_amount <= amount <= max_amount
return filter_func
def direction_filter(allowed_directions: List[str]):
"""交易方向过滤"""
def filter_func(signal: Dict) -> bool:
return signal.get('action') in allowed_directions
return filter_func
```bash
#### 6. UI 自动化
- *6.1 弹窗处理**
```python
# backtrader/ui/dialog.py
import win32gui
import win32con
from typing import Optional, List
class DialogHandler:
"""弹窗处理器"""
# 常见弹窗标题
WARNING_TITLES = ["提示", "警告", "确认", "提示信息"]
CONFIRM_TITLES = ["委托确认", "交易确认"]
def __init__(self):
self._handled: List[int] = []
def check_pop_dialog(self) -> Optional[dict]:
"""检查是否有弹窗"""
def callback(hwnd, dialogs):
if win32gui.IsWindowVisible(hwnd):
title = win32gui.GetWindowText(hwnd)
# 检查是否是目标弹窗
if self._is_target_dialog(title):
dialogs.append({
'hwnd': hwnd,
'title': title,
'text': self._get_dialog_text(hwnd)
})
return True
dialogs = []
win32gui.EnumWindows(callback, dialogs)
return dialogs[0] if dialogs else None
def close_dialog(self, hwnd: int, button: str = "确定"):
"""关闭弹窗"""
try:
# 查找按钮
btn_hwnd = self._find_button(hwnd, button)
if btn_hwnd:
win32gui.PostMessage(btn_hwnd, win32con.BM_CLICK, 0, 0)
self._handled.append(hwnd)
return True
except Exception as e:
logger.error(f"Close dialog failed: {e}")
return False
def confirm_dialog(self, hwnd: int):
"""确认弹窗"""
return self.close_dialog(hwnd, "确定")
def cancel_dialog(self, hwnd: int):
"""取消弹窗"""
return self.close_dialog(hwnd, "取消")
def _is_target_dialog(self, title: str) -> bool:
"""判断是否是目标弹窗"""
all_titles = self.WARNING_TITLES + self.CONFIRM_TITLES
return any(t in title for t in all_titles)
def _get_dialog_text(self, hwnd: int) -> str:
"""获取弹窗文本"""
# 获取弹窗中的文本内容
pass
def _find_button(self, hwnd: int, text: str) -> Optional[int]:
"""查找按钮"""
def callback(btn_hwnd, buttons):
if text in win32gui.GetWindowText(btn_hwnd):
buttons.append(btn_hwnd)
return True
buttons = []
win32gui.EnumChildWindows(hwnd, callback, buttons)
return buttons[0] if buttons else None
```bash
- *6.2 窗口管理**
```python
# backtrader/ui/window.py
import win32gui
import win32con
from typing import Optional
class WindowManager:
"""窗口管理器"""
@staticmethod
def find_window(title: str = None, class_name: str = None) -> Optional[int]:
"""查找窗口"""
def callback(hwnd, windows):
if win32gui.IsWindowVisible(hwnd):
match = True
if title:
match = match and title in win32gui.GetWindowText(hwnd)
if class_name:
match = match and class_name in win32gui.GetClassName(hwnd)
if match:
windows.append(hwnd)
return True
windows = []
win32gui.EnumWindows(callback, windows)
return windows[0] if windows else None
@staticmethod
def set_foreground(hwnd: int):
"""设置窗口前置"""
try:
if win32gui.IsIconic(hwnd): # 最小化
win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
win32gui.SetForegroundWindow(hwnd)
except Exception as e:
logger.error(f"Set foreground failed: {e}")
@staticmethod
def is_minimized(hwnd: int) -> bool:
"""检查是否最小化"""
return win32gui.IsIconic(hwnd)
@staticmethod
def get_window_rect(hwnd: int) -> tuple:
"""获取窗口位置"""
return win32gui.GetWindowRect(hwnd)
```bash
#### 7. 日志系统
- *7.1 审计日志**
```python
# backtrader/utils/logging/audit.py
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, Any
from .mask import mask_sensitive_info
class AuditLogger:
"""审计日志记录器"""
def __init__(self, log_dir: str = "logs/audit"):
self.log_dir = Path(log_dir)
self.log_dir.mkdir(parents=True, exist_ok=True)
self.logger = logging.getLogger("backtrader.audit")
def log_order(self, order: Dict[str, Any]):
"""记录订单"""
self._write_log("order", order)
def log_query(self, query_type: str, result: Dict[str, Any]):
"""记录查询"""
self._write_log("query", {
'type': query_type,
'result': result
})
def log_login(self, user: str, success: bool, error: str = None):
"""记录登录"""
self._write_log("login", {
'user': mask_sensitive_info(user),
'success': success,
'error': error
})
def log_error(self, error: Exception, context: Dict[str, Any] = None):
"""记录错误"""
self._write_log("error", {
'type': type(error).__name__,
'message': str(error),
'context': context or {}
})
def _write_log(self, log_type: str, data: Dict[str, Any]):
"""写入日志"""
# 脱敏处理
data = mask_sensitive_info(data)
# 添加时间戳
data['timestamp'] = datetime.now().isoformat()
data['type'] = log_type
# 按日期分文件
date_str = datetime.now().strftime("%Y%m%d")
log_file = self.log_dir / f"{date_str}.jsonl"
# 追加写入
with open(log_file, 'a', encoding='utf-8') as f:
f.write(json.dumps(data, ensure_ascii=False) + '\n')
# 全局审计日志记录器
audit_logger = AuditLogger()
```bash
- *7.2 敏感信息脱敏**
```python
# backtrader/utils/logging/mask.py
import re
from typing import Any, Dict
# 默认脱敏规则
DEFAULT_MASK_RULES = [
('password', '***'),
('passwd', '***'),
('pwd', '***'),
('account', '***'),
('user', '***'),
]
def mask_value(value: str, mask: str = '***') -> str:
"""脱敏单个值"""
if not value:
return value
# 保留部分信息
if len(value) <= 4:
return mask
return value[:2] + mask + value[-2:]
def mask_sensitive_info(data: Any, rules: list = None) -> Any:
"""脱敏敏感信息"""
rules = rules or DEFAULT_MASK_RULES
if isinstance(data, dict):
return {k: _mask_dict_value(k, v, rules) for k, v in data.items()}
elif isinstance(data, list):
return [mask_sensitive_info(item, rules) for item in data]
else:
return data
def _mask_dict_value(key: str, value: Any, rules: list) -> Any:
"""脱敏字典值"""
# 检查是否需要脱敏
for pattern, mask in rules:
if pattern in key.lower():
if isinstance(value, str):
return mask_value(value, mask)
return mask
# 递归处理嵌套结构
return mask_sensitive_info(value, rules)
```bash
### 实现计划
#### 第一阶段:券商适配器(优先级:高)
1. 实现 BrokerAdapter 基类
2. 实现 BrokerFactory 工厂类
3. 实现配置管理系统
4. 实现华泰/同花顺/银河适配器
#### 第二阶段:数据获取策略(优先级:中)
1. 实现 DataFetchStrategy 基类
2. 实现 Copy/File/API 策略
3. 实现 StrategyManager 管理器
#### 第三阶段:异常处理(优先级:高)
1. 实现自定义异常类型
2. 实现弹窗处理器
3. 实现登录状态管理
#### 第四阶段:性能监控(优先级:中)
1. 实现 perf_clock 装饰器
2. 实现性能监控器
3. 实现性能预警
#### 第五阶段:跟单功能(优先级:中)
1. 实现 FollowerBase 基类
2. 实现信号过滤器
3. 实现聚米/米筐跟单
#### 第六阶段:UI 自动化(优先级:低)
1. 实现弹窗处理器
2. 实现窗口管理器
3. 实现剪贴板操作
#### 第七阶段:日志系统(优先级:中)
1. 实现审计日志
2. 实现敏感信息脱敏
3. 实现日志查询和回放
### API 兼容性保证
所有新增功能保持与现有 backtrader API 的兼容性:
1. **向后兼容**: 现有代码无需修改即可运行
2. **可选启用**: 新功能通过选择使用
3. **渐进增强**: 用户可以选择使用新功能或保持原有方式
```python
# 示例:传统方式(保持不变)
import backtrader as bt
cerebro = bt.Cerebro()
cerebro.setbroker(bt.brokers.BackBroker())
result = cerebro.run()
# 示例:新方式(可选)
from backtrader.brokers.factory import use
# 创建券商适配器
broker = use('ht') # 华泰证券
broker.login('user', 'password')
# 使用适配器
cerebro = bt.Cerebro()
cerebro.set_broker(broker)
result = cerebro.run()
```bash
### 使用示例
- *券商适配器使用示例:**
```python
from backtrader.brokers.factory import use
# 创建华泰证券适配器
broker = use('ht')
broker.login('username', 'password')
# 查询资金
balance = broker.balance
print(f"总资产: {balance.total}, 可用: {balance.available}")
# 查询持仓
positions = broker.position
for symbol, pos in positions.items():
print(f"{symbol}: {pos.volume}股")
# 买入
result = broker.buy('600000', 10.0, 100)
print(f"订单 ID: {result.order_id}")
# 卖出
result = broker.sell('600000', 10.5, 100)
```bash
- *跟单功能使用示例:**
```python
from backtrader.follower.joinquant import JoinQuantFollower
from backtrader.brokers.factory import use
from backtrader.follower.filter import symbol_filter, amount_filter
# 创建券商适配器
broker = use('ht')
broker.login('user', 'password')
# 创建跟单器
follower = JoinQuantFollower(broker)
follower.login('jq_user', 'jq_password')
# 创建过滤器
signal_filter = symbol_filter(['600000', '000001'])
# 启动跟单
follower.follow(
users=['target_user'],
strategies=['target_strategy'],
track_interval=5,
signal_filter=signal_filter
)
```bash
- *性能监控使用示例:**
```python
from backtrader.monitor.perf import perf_clock, perf_monitor
class MyBroker:
@perf_clock
def buy(self, symbol, price, amount):
# 交易逻辑
pass
# 查看性能统计
stats = perf_monitor.get_all_stats()
for name, stat in stats.items():
print(f"{name}: 平均 {stat['avg']:.4f}s")
```bash
### 测试策略
1. **单元测试**: 每个新增模块的单元测试覆盖率 > 80%
2. **集成测试**: 与现有功能的集成测试
3. **模拟测试**: 使用模拟客户端测试 GUI 自动化
4. **性能测试**: 性能监控开销 < 5%
5. **兼容性测试**: 确保现有代码无需修改即可运行