背景

backtrader 已经比较完善了,我想要借鉴量化投资框架中其他项目的优势,继续改进优化 backtrader。

任务

  1. 阅读研究分析 backtrader 这个项目的源代码,了解这个项目。

  2. 阅读研究分析/Users/yunjinqi/Documents/量化交易框架/rqalpha

  3. 借鉴这个新项目的优点和功能,给 backtrader 优化改进提供新的建议

  4. 写需规文档和设计文档放到这个文档的最下面,方便后续借鉴

rqalpha 项目简介

rqalpha 是由 Ricequant 开发的 Python 量化回测框架,具有以下核心特点:

  • 模块化设计: 通过 Mod 机制实现功能扩展

  • 事件驱动: 基于事件的回测引擎

  • API 兼容: 与 Ricequant 平台 API 兼容

  • 多维度分析: 丰富的回测分析指标

  • 配置管理: 灵活的 YAML 配置系统

  • 命令行工具: 完善的 CLI 命令行工具

重点借鉴方向

  1. Mod 扩展机制: 插件式的模块扩展系统

  2. ExecutionContext: 执行上下文管理

  3. Environment: 全局环境变量管理

  4. 配置系统: 基于 YAML 的配置管理

  5. 事件总线: EventBus 事件通信机制

  6. 持仓管理: Position 和 Portfolio 管理


框架对比分析

架构设计对比

| 维度 | backtrader | rqalpha |

|——|———–|———|

| 核心引擎| Cerebro (集中式) | Environment (依赖注入) |

|扩展机制| 继承 + 组合 | Mod 插件系统 |

|配置管理| 代码级参数 | YAML 多层级配置 |

|事件通信| 回调方法 | EventBus 发布-订阅 |

|上下文管理| 隐式状态 | ExecutionContext 栈式管理 |

|持仓管理| Broker 内置 | 独立 Position/Portfolio 模块 |

|执行阶段| prenext/next/nextstart | 枚举定义的 EXECUTION_PHASE |

backtrader 的优势

1.简洁性: API 设计更直观,学习曲线平缓

  1. 性能: LineBuffer 高效内存管理,支持 runonce 向量化模式

  2. 灵活性: 支持多时间周期、多数据源的复杂策略

  3. 指标库: 内置 60+技术指标,覆盖全面

rqalpha 的优势

  1. 模块化: Mod 机制实现真正的插件化,核心功能可替换

  2. 配置驱动: 支持 YAML 配置,便于策略参数管理

  3. 阶段约束: ExecutionContext 确保 API 在正确阶段调用

  4. 事件解耦: EventBus 实现松耦合的组件通信

  5. 持仓精细化: 支持今昨仓分离、FIFO 队列管理


需求规格文档

需求 1: 插件化扩展系统 (Mod System)

  • 需求描述*:

引入类似于 rqalpha 的 Mod 插件机制,允许用户通过插件方式扩展或替换 backtrader 的核心功能组件,而无需修改核心代码。

  • 功能需求*:

  1. Mod 接口定义: 定义 AbstractMod 基类,包含 start_up 和 tear_down 生命周期方法

  2. Mod 加载器: 支持从配置文件动态加载插件

  3. 组件替换: 允许插件替换 Broker、DataFeed、Analyzer 等核心组件

  4. 优先级控制: 支持插件启动顺序控制

  5. 依赖管理: 支持插件间的依赖关系声明

  • 非功能需求*:

  • 向后兼容: 现有策略代码无需修改

  • 性能影响最小: 插件机制不应显著影响回测性能

  • 错误隔离: 插件异常不应导致核心框架崩溃

需求 2: 执行上下文管理 (ExecutionContext)

  • 需求描述*:

实现执行上下文管理机制,确保策略 API 在正确的执行阶段被调用,并提供更好的错误提示。

  • 功能需求*:

  1. 执行阶段枚举: 定义 INIT、BEFORE_TRADING、ON_BAR、AFTER_TRADING 等阶段

  2. 上下文栈: 支持嵌套上下文管理

  3. 阶段检查装饰器: 装饰器确保方法只在特定阶段可调用

  4. 友好错误提示: 当 API 在错误阶段调用时,给出清晰提示

  • 非功能需求*:

  • 性能开销小: 上下文检查不应显著影响执行速度

  • 可选功能: 允许用户禁用阶段检查以提高性能

需求 3: YAML 配置系统

  • 需求描述*:

支持通过 YAML 文件配置回测参数,包括数据源、策略参数、佣金设置等。

  • 功能需求*:

  1. 多层级配置: 默认配置 < 用户配置 < 项目配置 < 命令行配置

  2. 配置验证: 类型检查和范围验证

  3. 策略配置: 支持在策略文件中定义配置

  4. 环境变量: 支持环境变量替换

  • 非功能需求*:

  • 向后兼容: 保持现有代码配置方式

  • 配置简化: 常用配置应有合理的默认值

需求 4: 事件总线系统

  • 需求描述*:

实现发布-订阅模式的事件总线,用于组件间解耦通信。

  • 功能需求*:

  1. 事件定义: 预定义 ORDER_PENDING、ORDER_FILLED、TRADE 等事件类型

  2. 监听器注册: 支持同步/异步监听器

  3. 优先级支持: 支持监听器优先级排序

  4. 事件过滤: 支持条件过滤

  • 非功能需求*:

  • 性能优化: 事件分发应尽可能高效

  • 向后兼容: 现有 notify_*回调机制继续支持

需求 5: 增强持仓管理

  • 需求描述*:

改进持仓管理功能,支持更精细的持仓控制和更丰富的持仓信息。

  • 功能需求*:

  1. FIFO 队列: 支持先进先出的持仓平仓逻辑

  2. 今昨仓分离: 期货策略支持今仓昨仓分别管理

  3. 持仓详情: 记录每笔交易的开仓价、平仓价、盈亏

  4. 持仓代理: 统一管理多空持仓

  • 非功能需求*:

  • 向后兼容: 现有持仓查询 API 保持不变

  • 可选功能: 高级功能可按需启用


设计文档

1. Mod 扩展系统设计

1.1 核心类设计


# backtrader/mod.py

from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Type
from collections import OrderedDict

class ModConfig:
    """Mod 配置类"""
    def __init__(self, name: str, enabled: bool = True, priority: int = 100,
                 dependencies: Optional[List[str]] = None):
        self.name = name
        self.enabled = enabled
        self.priority = priority
        self.dependencies = dependencies or []

class AbstractMod(ABC):
    """Mod 插件基类"""

    def __init__(self, config: ModConfig):
        self.config = config
        self.env = None

    @abstractmethod
    def start_up(self, env, mod_config: Dict):
        """插件启动时调用

        Args:
            env: Environment 实例
            mod_config: 插件配置字典
        """
        pass

    def tear_down(self, code: int = 0, exception: Optional[Exception] = None):
        """插件清理时调用

        Args:
            code: 退出码
            exception: 异常信息(如果有)
        """
        pass

class ModHandler:
    """Mod 插件管理器"""

    def __init__(self):
        self._env = None
        self._mod_configs: List[ModConfig] = []
        self._mod_instances: OrderedDict = OrderedDict()

    def set_env(self, env):
        """设置 Environment 实例"""
        self._env = env

    def load_from_config(self, config_dict: Dict):
        """从配置加载 Mod

        Args:
            config_dict: 配置字典,格式: {mod_name: {enabled, priority, config}}
        """
        for name, mod_conf in config_dict.items():
            if not mod_conf.get('enabled', True):
                continue
            self._mod_configs.append(ModConfig(
                name=name,
                enabled=mod_conf.get('enabled', True),
                priority=mod_conf.get('priority', 100),
                dependencies=mod_conf.get('dependencies', [])
            ))

    def start_up(self):
        """启动所有已加载的 Mod

        按优先级排序后启动,处理依赖关系
        """

# 拓扑排序处理依赖
        sorted_mods = self._topological_sort()

# 按优先级排序
        sorted_mods.sort(key=lambda x: x.priority, reverse=True)

        for mod_config in sorted_mods:
            mod_instance = self._import_and_create_mod(mod_config)
            if mod_instance:
                mod_config_dict = self._get_mod_config(mod_config.name)
                mod_instance.start_up(self._env, mod_config_dict)
                self._mod_instances[mod_config.name] = mod_instance

    def tear_down(self, code: int = 0, exception: Optional[Exception] = None):
        """清理所有 Mod,按启动逆序"""
        for mod_instance in reversed(self._mod_instances.values()):
            try:
                mod_instance.tear_down(code, exception)
            except Exception as e:

# 记录错误但继续清理其他插件
                pass

    def _import_and_create_mod(self, mod_config: ModConfig) -> Optional[AbstractMod]:
        """动态导入并创建 Mod 实例"""
        try:

# 支持系统模块和自定义模块
            import importlib
            if '.' in mod_config.name:

# 自定义模块: package.mod.ClassName
                module_path, class_name = mod_config.name.rsplit('.', 1)
                module = importlib.import_module(module_path)
                mod_class = getattr(module, class_name)
            else:

# 系统模块: backtrader.mods.{name}
                from backtrader.mods import import_mod
                mod_class = import_mod(mod_config.name)

            return mod_class(mod_config)
        except Exception as e:
            return None

    def _topological_sort(self) -> List[ModConfig]:
        """处理依赖关系的拓扑排序"""

# 实现依赖解析
        pass

```bash

#### 1.2 与 Cerebro 集成

```python

# backtrader/cerebro.py 添加

class Cerebro:
    def __init__(self):

# ... 现有代码 ...
        self._mod_handler = ModHandler()
        self._mod_handler.set_env(self)  # 或创建独立的 Environment

    def addmod(self, mod_name: str, **kwargs):
        """添加 Mod 插件"""
        mod_config = ModConfig(mod_name, **kwargs)
        self._mod_handler._mod_configs.append(mod_config)
        return self

    def run(self):

# 启动 Mod
        self._mod_handler.start_up()

        try:

# 现有 run 逻辑
            results = self._run()
        finally:

# 清理 Mod
            self._mod_handler.tear_down()

        return results

```bash

### 2. ExecutionContext 设计

#### 2.1 核心类设计

```python

# backtrader/execution_context.py

from enum import Enum
from contextlib import contextmanager
from functools import wraps
from typing import Optional, List, Callable

class ExecutionPhase(Enum):
    """执行阶段枚举"""
    ON_INIT = "on_init"              # 策略初始化
    BEFORE_TRADING = "before_trading"  # 交易前
    ON_BAR = "on_bar"                # K 线处理
    AFTER_TRADING = "after_trading"  # 交易后
    SCHEDULED = "scheduled"          # 定时任务
    FINALIZE = "finalize"            # 结束

class ExecutionContext:
    """执行上下文管理器"""

    _stack = []  # 类级别的上下文栈

    def __init__(self, phase: ExecutionPhase):
        self.phase = phase
        self._prev_context = None

    def __enter__(self):
        self._prev_context = self.current()
        ExecutionContext._stack.append(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if ExecutionContext._stack and ExecutionContext._stack[-1] is self:
            ExecutionContext._stack.pop()
        return False

    @classmethod
    def current(cls) -> Optional['ExecutionContext']:
        """获取当前上下文"""
        if cls._stack:
            return cls._stack[-1]
        return None

    @classmethod
    def get_phase(cls) -> ExecutionPhase:
        """获取当前执行阶段"""
        ctx = cls.current()
        return ctx.phase if ctx else None

    @classmethod
    def enforce_phase(cls, *allowed_phases: ExecutionPhase) -> Callable:
        """阶段检查装饰器

        确保被装饰的方法只在允许的阶段执行
        """
        def decorator(func: Callable) -> Callable:
            @wraps(func)
            def wrapper(*args, **kwargs):
                current_phase = cls.get_phase()
                if current_phase not in allowed_phases:
                    raise RuntimeError(
                        f"{func.__name__} can only be called in phases "
                        f"{[p.value for p in allowed_phases]}, "
                        f"but current phase is {current_phase.value if current_phase else 'None'}"
                    )
                return func(*args, **kwargs)
            return wrapper
        return decorator

# 使用示例

@ExecutionContext.enforce_phase(ExecutionPhase.ON_BAR, ExecutionPhase.SCHEDULED)
def order_target_percent(symbol, percent):
    """只在 ON_BAR 或 SCHEDULED 阶段允许下单"""
    pass

```bash

#### 2.2 与 Cerebro 集成

```python

# backtrader/cerebro.py 修改

def _run(self):

# 初始化阶段
    with ExecutionContext(ExecutionPhase.ON_INIT):
        self._stratrunstart()

# 主循环
    while not runstop:

# 交易前
        with ExecutionContext(ExecutionPhase.BEFORE_TRADING):
            self._pre_trading()

# K 线处理
        with ExecutionContext(ExecutionPhase.ON_BAR):
            self._next()

# 交易后
        with ExecutionContext(ExecutionPhase.AFTER_TRADING):
            self._post_trading()

# 结束
    with ExecutionContext(ExecutionPhase.FINALIZE):
        self._stratstop()

```bash

### 3. YAML 配置系统设计

#### 3.1 配置结构

```yaml

# config.yml 示例

base:
  start_date: 2020-01-01
  end_date: 2023-12-31
  cash: 1000000
  commission: 0.001

data:

  - name: stock_data

    type: csv
    path: ./data/stock.csv
    timeframe: day

  - name: future_data

    type: pandas
    symbol: RB2305

strategy:
  name: MyStrategy
  params:
    period: 20
    threshold: 0.02

mods:
  sys_risk:
    enabled: true
    priority: 100
    config:
      max_drawdown: 0.2

  sys_analyzer:
    enabled: true
    config:
      metrics: [sharpe, max_drawdown, annual_return]

```bash

#### 3.2 配置加载器

```python

# backtrader/config.py

import yaml
from pathlib import Path
from typing import Dict, Any, Optional
from copy import deepcopy

class Config:
    """配置管理器"""

    DEFAULT_CONFIG = {
        'base': {
            'start_date': None,
            'end_date': None,
            'cash': 10000,
            'commission': 0.0,
        },
        'data': [],
        'strategy': {
            'name': None,
            'params': {},
        },
        'mods': {},
    }

    def __init__(self):
        self._config = deepcopy(self.DEFAULT_CONFIG)

    def load_from_yaml(self, path: str):
        """从 YAML 文件加载配置"""
        config_path = Path(path)
        if not config_path.exists():
            raise FileNotFoundError(f"Config file not found: {path}")

        with open(config_path, 'r', encoding='utf-8') as f:
            user_config = yaml.safe_load(f) or {}

        self._deep_update(user_config, self._config)

    def load_from_dict(self, config: Dict[str, Any]):
        """从字典加载配置"""
        self._deep_update(config, self._config)

    def get(self, key: str, default: Any = None) -> Any:
        """获取配置值,支持点分隔路径"""
        keys = key.split('.')
        value = self._config
        for k in keys:
            if isinstance(value, dict):
                value = value.get(k)
            else:
                return default
        return value if value is not None else default

    def _deep_update(self, source: Dict, target: Dict):
        """递归更新字典"""
        for key, value in source.items():
            if key in target and isinstance(target[key], dict) and isinstance(value, dict):
                self._deep_update(value, target[key])
            else:
                target[key] = value

# 使用

def run_from_config(config_path: str):
    config = Config()
    config.load_from_yaml(config_path)

    cerebro = bt.Cerebro()
    cerebro.broker.setcash(config.get('base.cash'))
    cerebro.broker.setcommission(config.get('base.commission'))

# 加载数据
    for data_config in config.get('data', []):
        data = load_data(data_config)
        cerebro.adddata(data)

# 添加策略
    strategy_config = config.get('strategy')
    StrategyClass = get_strategy(strategy_config['name'])
    cerebro.addstrategy(StrategyClass, **strategy_config.get('params', {}))

    return cerebro.run()

```bash

### 4. EventBus 设计

#### 4.1 核心类设计

```python

# backtrader/events.py

from enum import Enum
from typing import Callable, Dict, List, Any, Optional
from collections import defaultdict
from dataclasses import dataclass

class EventType(Enum):
    """事件类型枚举"""

# 系统事件
    PRE_SYSTEM_INIT = "pre_system_init"
    POST_SYSTEM_INIT = "post_system_init"
    PRE_BAR = "pre_bar"
    BAR = "bar"
    POST_BAR = "post_bar"
    FINALIZE = "finalize"

# 订单事件
    ORDER_PENDING = "order_pending"
    ORDER_ACCEPTED = "order_accepted"
    ORDER_REJECTED = "order_rejected"
    ORDER_COMPLETED = "order_completed"
    ORDER_CANCELED = "order_canceled"

# 成交事件
    TRADE_OPENED = "trade_opened"
    TRADE_CLOSED = "trade_closed"

@dataclass
class Event:
    """事件对象"""
    type: EventType
    data: Dict[str, Any]

    def __getitem__(self, key: str) -> Any:
        return self.data.get(key)

class EventBus:
    """事件总线"""

    def __init__(self):
        self._listeners: Dict[EventType, List[Callable]] = defaultdict(list)
        self._system_listeners: Dict[EventType, List[Callable]] = defaultdict(list)

    def subscribe(self, event_type: EventType, listener: Callable,
                 priority: int = 100, is_system: bool = False):
        """订阅事件

        Args:
            event_type: 事件类型
            listener: 监听器函数
            priority: 优先级,越大越先执行
            is_system: 是否为系统监听器
        """
        listener_dict = self._system_listeners if is_system else self._listeners
        listener_dict[event_type].append((priority, listener))

# 按优先级排序
        listener_dict[event_type].sort(key=lambda x: x[0], reverse=True)

    def unsubscribe(self, event_type: EventType, listener: Callable):
        """取消订阅"""
        self._listeners[event_type] = [
            (p, l) for p, l in self._listeners[event_type] if l != listener
        ]
        self._system_listeners[event_type] = [
            (p, l) for p, l in self._system_listeners[event_type] if l != listener
        ]

    def publish(self, event: Event):
        """发布事件

        先执行系统监听器,支持返回 True 中断传播
        再执行用户监听器
        """

# 系统监听器
        for priority, listener in self._system_listeners[event.type]:
            try:
                result = listener(event)
                if result is True:  # 中断传播
                    return
            except Exception as e:
                pass

# 用户监听器
        for priority, listener in self._listeners[event.type]:
            try:
                listener(event)
            except Exception as e:
                pass

    def clear(self):
        """清空所有监听器"""
        self._listeners.clear()
        self._system_listeners.clear()

```bash

#### 4.2 与 Cerebro 集成

```python

# backtrader/cerebro.py 添加

class Cerebro:
    def __init__(self):

# ... 现有代码 ...
        self._event_bus = EventBus()

    @property
    def event_bus(self) -> EventBus:
        return self._event_bus

    def _run(self):

# 发布系统初始化事件
        self._event_bus.publish(Event(EventType.POST_SYSTEM_INIT, {'cerebro': self}))

# 在主循环中发布 BAR 事件
        while not runstop:
            self._event_bus.publish(Event(EventType.PRE_BAR, {'datetime': dt}))
            self._next()
            self._event_bus.publish(Event(EventType.POST_BAR, {'datetime': dt}))

# 在 Broker 中发布订单事件

class BrokerBack(BrokerBase):
    def accept(self, order):

# 接受订单
        self._cerebro.event_bus.publish(
            Event(EventType.ORDER_ACCEPTED, {'order': order})
        )

    def next(self):

# 检查订单状态变化并发布事件
        if order.iscompleted():
            self._cerebro.event_bus.publish(
                Event(EventType.ORDER_COMPLETED, {'order': order})
            )

```bash

### 5. 增强持仓管理设计

#### 5.1 核心类设计

```python

# backtrader/position.py

from collections import deque
from datetime import datetime
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, field

@dataclass
class PositionItem:
    """持仓项,记录单次开仓信息"""
    entry_date: datetime
    entry_price: float
    quantity: float
    commission: float = 0.0

    @property
    def value(self) -> float:
        """持仓价值"""
        return self.entry_price *self.quantity

class PositionQueue:
    """FIFO 持仓队列"""

    def __init__(self):
        self._queue: deque[PositionItem] = deque()
        self._total_quantity = 0.0

    def add_position(self, item: PositionItem):
        """添加开仓"""
        self._queue.append(item)
        self._total_quantity += item.quantity

    def reduce_position(self, quantity: float, exit_price: float,
                        exit_date: datetime) -> List[Dict[str, Any]]:
        """平仓,按 FIFO 顺序

        Returns:
            已平仓的交易列表
        """
        closed_trades = []
        remaining = abs(quantity)

        while remaining > 0 and self._queue:
            item = self._queue[0]

            if item.quantity <= remaining:

# 完全平掉这个持仓项
                remaining -= item.quantity
                self._queue.popleft()
                self._total_quantity -= item.quantity

                closed_trades.append({
                    'entry_date': item.entry_date,
                    'exit_date': exit_date,
                    'entry_price': item.entry_price,
                    'exit_price': exit_price,
                    'quantity': item.quantity,
                    'commission': item.commission,
                    'pnl': (exit_price - item.entry_price)*item.quantity - item.commission,
                })
            else:

# 部分平仓
                item.quantity -= remaining
                self._total_quantity -= remaining

                closed_trades.append({
                    'entry_date': item.entry_date,
                    'exit_date': exit_date,
                    'entry_price': item.entry_price,
                    'exit_price': exit_price,
                    'quantity': remaining,
                    'commission': item.commission*(remaining / (remaining + item.quantity)),
                    'pnl': (exit_price - item.entry_price)*remaining,
                })
                remaining = 0

        return closed_trades

    @property
    def quantity(self) -> float:
        return self._total_quantity

    @property
    def items(self) -> List[PositionItem]:
        return list(self._queue)

    @property
    def avg_price(self) -> float:
        """平均持仓成本"""
        if not self._queue or self._total_quantity == 0:
            return 0.0
        total_value = sum(item.value for item in self._queue)
        return total_value / self._total_quantity

class EnhancedPosition:
    """增强的持仓管理类"""

    def __init__(self):
        self._long_queue = PositionQueue()  # 多头持仓
        self._short_queue = PositionQueue()  # 空头持仓
        self._last_price = 0.0

    def apply_trade(self, quantity: float, price: float, date: datetime,
                    commission: float = 0.0):
        """应用成交"""
        if quantity > 0:

# 开多或平空
            if self._short_queue.quantity > 0:

# 先平空
                close_qty = min(quantity, self._short_queue.quantity)
                closed_trades = self._short_queue.reduce_position(

                    - close_qty, price, date

                )

# 剩余开多
                remaining = quantity - close_qty
                if remaining > 0:
                    self._long_queue.add_position(
                        PositionItem(date, price, remaining, commission)
                    )
            else:

# 直接开多
                self._long_queue.add_position(
                    PositionItem(date, price, quantity, commission)
                )
        else:

# 开空或平多
            abs_qty = abs(quantity)
            if self._long_queue.quantity > 0:

# 先平多
                close_qty = min(abs_qty, self._long_queue.quantity)
                closed_trades = self._long_queue.reduce_position(

                    - close_qty, price, date

                )

# 剩余开空
                remaining = abs_qty - close_qty
                if remaining > 0:
                    self._short_queue.add_position(
                        PositionItem(date, price, remaining, commission)
                    )
            else:

# 直接开空
                self._short_queue.add_position(
                    PositionItem(date, price, abs_qty, commission)
                )

        self._last_price = price
        return closed_trades

    @property
    def net_quantity(self) -> float:
        """净持仓"""
        return self._long_queue.quantity - self._short_queue.quantity

    @property
    def market_value(self) -> float:
        """市值"""
        if self._last_price == 0:
            return 0.0
        return self.net_quantity*self._last_price

    @property
    def unrealized_pnl(self) -> float:
        """浮动盈亏"""
        long_pnl = (self._last_price - self._long_queue.avg_price)*self._long_queue.quantity
        short_pnl = (self._short_queue.avg_price - self._last_price)*self._short_queue.quantity
        return long_pnl + short_pnl

    @property
    def realized_pnl(self) -> float:
        """已实现盈亏"""

# 需要跟踪历史平仓记录
        pass

    @property
    def long_avg_price(self) -> float:
        return self._long_queue.avg_price

    @property
    def short_avg_price(self) -> float:
        return self._short_queue.avg_price

    @property
    def closable_long(self) -> float:
        """可平多头数量"""
        return self._long_queue.quantity

    @property
    def closable_short(self) -> float:
        """可平空头数量"""
        return self._short_queue.quantity

    def get_position_detail(self) -> Dict[str, Any]:
        """获取持仓详情"""
        return {
            'net_quantity': self.net_quantity,
            'long_quantity': self._long_queue.quantity,
            'short_quantity': self._short_queue.quantity,
            'long_avg_price': self.long_avg_price,
            'short_avg_price': self.short_avg_price,
            'last_price': self._last_price,
            'market_value': self.market_value,
            'unrealized_pnl': self.unrealized_pnl,
            'long_items': len(self._long_queue.items),
            'short_items': len(self._short_queue.items),
        }

```bash

#### 5.2 期货今昨仓管理

```python

# backtrader/position_future.py

class FuturePositionQueue(PositionQueue):
    """期货持仓队列,支持今昨仓分离"""

    def __init__(self):
        super().__init__()
        self._today_queue: deque[PositionItem] = deque()  # 今仓
        self._yesterday_queue: deque[PositionItem] = deque()  # 昨仓

    def add_position(self, item: PositionItem, is_today: bool = True):
        """添加持仓"""
        if is_today:
            self._today_queue.append(item)
        else:
            self._yesterday_queue.append(item)
        self._total_quantity += item.quantity

    def reduce_position(self, quantity: float, exit_price: float,
                        exit_date: datetime, close_today: bool = False) -> List[Dict]:
        """平仓

        Args:
            quantity: 平仓数量
            exit_price: 平仓价格
            exit_date: 平仓日期
            close_today: 是否优先平今仓 (期货交易所规则)
        """
        closed_trades = []
        remaining = abs(quantity)

# 根据 close_today 决定平仓顺序
        if close_today:
            queues = [self._today_queue, self._yesterday_queue]
        else:
            queues = [self._yesterday_queue, self._today_queue]

        for queue in queues:
            while remaining > 0 and queue:
                item = queue[0]

                if item.quantity <= remaining:
                    remaining -= item.quantity
                    queue.popleft()
                    self._total_quantity -= item.quantity

                    closed_trades.append({
                        'entry_date': item.entry_date,
                        'exit_date': exit_date,
                        'entry_price': item.entry_price,
                        'exit_price': exit_price,
                        'quantity': item.quantity,
                        'is_today': queue is self._today_queue,
                        'pnl': (exit_price - item.entry_price)*item.quantity,
                    })
                else:
                    item.quantity -= remaining
                    self._total_quantity -= remaining

                    closed_trades.append({
                        'entry_date': item.entry_date,
                        'exit_date': exit_date,
                        'entry_price': item.entry_price,
                        'exit_price': exit_price,
                        'quantity': remaining,
                        'is_today': queue is self._today_queue,
                        'pnl': (exit_price - item.entry_price)* remaining,
                    })
                    remaining = 0

        return closed_trades

    @property
    def today_quantity(self) -> float:
        return sum(item.quantity for item in self._today_queue)

    @property
    def yesterday_quantity(self) -> float:
        return sum(item.quantity for item in self._yesterday_queue)

```bash

### 6. 实施计划

#### 6.1 实施优先级

1. **高优先级**(第一阶段)
   - YAML 配置系统 - 立即可用提升用户体验
   - EventBus 基础实现 - 为其他功能提供基础设施

2.**中优先级**(第二阶段)

   - ExecutionContext - 增强错误提示
   - Mod 扩展系统 - 实现插件化

3.**可选优先级** (第三阶段)

   - 增强持仓管理 - 高级用户需求
   - 今昨仓管理 - 期货策略专用

#### 6.2 向后兼容性保证

所有新功能都是**可选的**现有代码无需修改即可继续使用

```python

# 现有用法继续支持

cerebro = bt.Cerebro()
cerebro.addstrategy(MyStrategy, period=20)
cerebro.run()

# 新用法

cerebro = bt.Cerebro()
cerebro.load_config('config.yml')  # 可选

cerebro.addmod(RiskMod)  # 可选

cerebro.run()

```bash

#### 6.3 目录结构

```bash
backtrader/
├── __init__.py
├── cerebro.py          # 核心引擎 (修改)

├── events.py           # 新增: EventBus

├── execution_context.py # 新增: ExecutionContext

├── config.py           # 新增: 配置系统

├── mod.py              # 新增: Mod 系统

├── position.py         # 新增: 增强持仓管理

├── position_future.py  # 新增: 期货持仓

└── mods/               # 新增: 内置 Mod 目录
    ├── __init__.py
    ├── risk.py         # 风控 Mod
    ├── analyzer.py     # 分析器 Mod
    └── ...

```bash

- --

## 总结

通过借鉴 rqalpha 的设计思想backtrader 可以在保持简洁性的同时获得以下改进

1. **更好的可扩展性**: Mod 插件系统使功能扩展更灵活
2. **更清晰的配置管理**: YAML 配置降低使用门槛
3. **更强的类型安全**: ExecutionContext 确保 API 正确使用
4. **更松的耦合**: EventBus 实现组件间解耦
5. **更精细的持仓控制**: 支持 FIFO 和今昨仓管理

这些改进都是**向后兼容**用户可以按需使用新功能不影响现有策略代码