背景

backtrader 已经比较完善了,我想要借鉴量化投资框架中其他项目的优势,继续改进优化 backtrader。

任务

  1. 阅读研究分析 backtrader 这个项目的源代码,了解这个项目。

  2. 阅读研究分析/Users/yunjinqi/Documents/量化交易框架/quant-strategies

  3. 借鉴这个新项目的优点和功能,给 backtrader 优化改进提供新的建议

  4. 写需规文档和设计文档放到这个文档的最下面,方便后续借鉴

quant-strategies 项目简介

quant-strategies 是量化交易策略合集,具有以下核心特点:

  • 策略合集: 多种量化策略

  • 学术策略: 学术论文策略实现

  • 因子策略: 因子投资策略

  • 动量策略: 动量类策略

  • 均值回归: 均值回归策略

  • 机器学习: ML 策略

重点借鉴方向

  1. 策略分类: 策略分类体系

  2. 学术实现: 学术策略实现

  3. 因子构建: 因子构建方法

  4. 回测框架: 回测框架设计

  5. 策略评价: 策略评价方法

  6. 代码组织: 代码组织方式


项目对比分析

Backtrader vs Quant-Strategies

| 维度 | Backtrader | Quant-Strategies |

|——|———–|——————|

| 核心定位| 通用回测框架 | 策略合集+回测框架 |

|策略组织| 用户自行管理 | 策略工厂模式统一管理 |

|策略分类| 无内置分类 | 技术/情绪/对冲/ML 分类 |

|学术策略| 需自行实现 | 部分学术策略已实现 |

|情绪指标| 无内置 | 市场情绪策略 |

|对冲系统| 基础支持 | 多账户对冲系统 |

|风险管理| 基础止损 | ATR/动态回撤/波动率自适应 |

|回测引擎| Cerebro | 封装的 BacktestEngine |

|评价体系| Analyzer 分散 | 统一评价报告 |

|强化学习| 无 | FinRL 框架集成 |

Backtrader 可借鉴的优势

1.策略工厂模式:统一管理和注册策略

  1. 策略分类体系:按类型组织策略

  2. 市场情绪策略:多维度情绪指标构建

  3. 多账户对冲:ETF+期货对冲系统

  4. 增强的风险管理:ATR 自适应、动态止损

  5. 统一评价报告:完整的策略绩效报告

  6. 强化学习集成:RL 策略框架


功能需求文档

FR-01 策略工厂 [高优先级]

  • 描述*: 建立策略注册和管理机制

  • 需求*:

  • FR-01.1 定义 StrategyRegistry 注册中心

  • FR-01.2 策略自动发现和注册

  • FR-01.3 策略元数据管理(名称、描述、分类)

  • FR-01.4 策略参数元信息获取

  • FR-01.5 策略实例化和配置

  • 验收标准*:

  • 通过策略名称获取策略类

  • 支持按分类查询策略

  • 自动提取策略参数信息

FR-02 策略分类体系 [中优先级]

  • 描述*: 建立策略分类和组织系统

  • 需求*:

  • FR-02.1 定义策略分类枚举

  • FR-02.2 策略标签系统

  • FR-02.3 按分类浏览策略

  • FR-02.4 策略搜索功能

  • 验收标准*:

  • 支持 5+种策略分类

  • 可按分类和标签筛选

  • 搜索结果准确

FR-03 市场情绪策略 [高优先级]

  • 描述*: 实现市场情绪驱动的交易策略

  • 需求*:

  • FR-03.1 情绪指标计算(涨跌比、换手率、波动率)

  • FR-03.2 多维度情绪合成

  • FR-03.3 动态情绪阈值

  • FR-03.4 情绪状态识别(恐慌/贪婪/中性)

  • FR-03.5 情绪驱动的仓位管理

  • 验收标准*:

  • 支持 5+种情绪指标

  • 情绪状态识别准确率>60%

  • 支持历史情绪分析

FR-04 双动量策略 [中优先级]

  • 描述*: 实现绝对动量和相对动量策略

  • 需求*:

  • FR-04.1 绝对动量计算

  • FR-04.2 相对动量计算

  • FR-04.3 动量合成得分

  • FR-04.4 动量轮动逻辑

  • FR-04.5 动量失效检测

  • 验收标准*:

  • 支持多时间周期动量

  • 动量轮动信号准确

  • 支持多资产轮动

FR-05 多账户对冲系统 [中优先级]

  • 描述*: 实现多账户协同和对冲功能

  • 需求*:

  • FR-05.1 多账户管理

  • FR-05.2 账户间资金分配

  • FR-05.3 对冲比例计算

  • FR-05.4 对冲信号触发

  • FR-05.5 对冲效果评估

  • 验收标准*:

  • 支持 2+个独立账户

  • 对冲比例自动计算

  • 对冲效果可量化

FR-06 增强风险管理 [高优先级]

  • 描述*: 实现动态风险管理系统

  • 需求*:

  • FR-06.1 ATR 自适应止损

  • FR-06.2 动态回撤控制

  • FR-06.3 波动率自适应仓位

  • FR-06.4 跟踪止损

  • FR-06.5 风险预算管理

  • 验收标准*:

  • 支持 5+种风险控制方法

  • 止损触发准确

  • 仓位调整及时

FR-07 策略评价报告 [高优先级]

  • 描述*: 生成统一的策略绩效报告

  • 需求*:

  • FR-07.1 收益指标(总收益、年化收益)

  • FR-07.2 风险指标(夏普、最大回撤、波动率)

  • FR-07.3 交易指标(胜率、盈亏比、SQN)

  • FR-07.4 图表生成(净值曲线、回撤图)

  • FR-07.5 报告导出(HTML/PDF/Excel)

  • 验收标准*:

  • 包含 15+项评价指标

  • 图表美观清晰

  • 支持多格式导出

FR-08 强化学习策略框架 [低优先级]

  • 描述*: 集成强化学习交易策略

  • 需求*:

  • FR-08.1 状态空间构建

  • FR-08.2 动作空间定义

  • FR-08.3 奖励函数设计

  • FR-08.4 模型训练接口

  • FR-08.5 模型推理集成

  • 验收标准*:

  • 支持 2+种 RL 算法

  • 状态特征可配置

  • 训练结果可复现

FR-09 情绪指标库 [中优先级]

  • 描述*: 建立市场情绪指标计算库

  • 需求*:

  • FR-09.1 涨跌比指标(ADR)

  • FR-09.2 涨停跌停比

  • FR-09.3 新高新低比

  • FR-09.4 市场广度指标

  • FR-09.5 投资者恐慌指数(VIX 风格)

  • 验收标准*:

  • 支持 10+种情绪指标

  • 计算性能优化

  • 支持实时计算

FR-10 回测引擎封装 [高优先级]

  • 描述*: 封装简化版的回测引擎

  • 需求*:

  • FR-10.1 一键回测接口

  • FR-10.2 参数配置简化

  • FR-10.3 数据源自动适配

  • FR-10.4 结果自动分析

  • FR-10.5 批量回测支持

  • 验收标准*:

  • 单次回测<5 行代码

  • 支持 100+参数组合批量回测

  • 结果格式统一


设计文档

1. 策略工厂设计

1.1 策略注册中心

from typing import Dict, Type, List, Optional, Any
from dataclasses import dataclass, field
from enum import Enum
import backtrader as bt

class StrategyCategory(Enum):
    """策略分类"""
    TREND_FOLLOWING = "趋势跟踪"      # 趋势跟踪类策略
    MEAN_REVERSION = "均值回归"      # 均值回归类策略
    MOMENTUM = "动量策略"            # 动量类策略
    SENTIMENT = "情绪驱动"           # 情绪驱动策略
    HEDGING = "对冲策略"             # 对冲策略
    ML = "机器学习"                  # 机器学习策略
    ARBITRAGE = "套利策略"           # 套利策略
    MARKET_MAKING = "做市策略"       # 做市策略

@dataclass
class StrategyMetadata:
    """策略元数据"""
    name: str                          # 策略名称
    description: str                   # 策略描述
    category: StrategyCategory         # 策略分类
    author: str = ""                   # 作者
    version: str = "1.0.0"             # 版本
    tags: List[str] = field(default_factory=list)  # 标签
    parameters: Dict[str, Any] = field(default_factory=dict)  # 参数说明

class StrategyRegistry:
    """策略注册中心"""

    _strategies: Dict[str, Type[bt.Strategy]] = {}
    _metadata: Dict[str, StrategyMetadata] = {}

    @classmethod
    def register(cls, strategy_class: Type[bt.Strategy],
                metadata: StrategyMetadata):
        """注册策略"""
        cls._strategies[metadata.name] = strategy_class
        cls._metadata[metadata.name] = metadata

    @classmethod
    def get(cls, name: str) -> Optional[Type[bt.Strategy]]:
        """获取策略类"""
        return cls._strategies.get(name)

    @classmethod
    def get_metadata(cls, name: str) -> Optional[StrategyMetadata]:
        """获取策略元数据"""
        return cls._metadata.get(name)

    @classmethod
    def list_strategies(cls, category: StrategyCategory = None) -> List[str]:
        """列出策略"""
        if category is None:
            return list(cls._strategies.keys())
        return [
            name for name, meta in cls._metadata.items()
            if meta.category == category
        ]

    @classmethod
    def get_parameters(cls, name: str) -> Dict[str, Any]:
        """获取策略参数"""
        strategy_class = cls.get(name)
        if strategy_class is None:
            return {}

        params = getattr(strategy_class, 'params', ())
        return {
            name: {
                'default': value,
                'doc': f'Parameter {name}'
            }
            for name, value in params._getpairs()
        }

# 装饰器注册

def register_strategy(metadata: StrategyMetadata):
    """策略注册装饰器"""
    def decorator(cls: Type[bt.Strategy]):
        StrategyRegistry.register(cls, metadata)
        return cls
    return decorator

```bash

#### 1.2 策略工厂

```python
class StrategyFactory:
    """策略工厂"""

    def __init__(self, registry: StrategyRegistry = None):
        self.registry = registry or StrategyRegistry

    def create_strategy(self, name: str, **kwargs) -> bt.Strategy:
        """创建策略实例"""
        strategy_class = self.registry.get(name)
        if strategy_class is None:
            raise ValueError(f"Strategy {name} not found")

# 验证参数
        params = self.registry.get_parameters(name)
        for key in kwargs:
            if key not in params:
                raise ValueError(f"Unknown parameter: {key}")

        return strategy_class(**kwargs)

    def create_strategy_with_config(self, name: str,
                                   config: Dict[str, Any]) -> bt.Strategy:
        """从配置创建策略"""
        return self.create_strategy(name, **config)

    def get_strategy_info(self, name: str) -> Dict[str, Any]:
        """获取策略完整信息"""
        metadata = self.registry.get_metadata(name)
        parameters = self.registry.get_parameters(name)

        return {
            'name': metadata.name if metadata else name,
            'description': metadata.description if metadata else '',
            'category': metadata.category.value if metadata else '',
            'parameters': parameters
        }

    def search_strategies(self, keyword: str) -> List[str]:
        """搜索策略"""
        results = []
        keyword = keyword.lower()

        for name, meta in self.registry._metadata.items():
            if (keyword in name.lower() or
                keyword in meta.description.lower() or
                any(keyword in tag.lower() for tag in meta.tags)):
                results.append(name)

        return results

```bash

### 2. 市场情绪策略设计

#### 2.1 情绪指标计算器

```python
import pandas as pd
import numpy as np
from typing import Dict, List, Optional

class SentimentCalculator:
    """市场情绪指标计算器"""

    @staticmethod
    def advance_decline_ratio(up_count: int, down_count: int) -> float:
        """
        计算涨跌比(ADR)

        Args:
            up_count: 上涨股票数量
            down_count: 下跌股票数量

        Returns:
            涨跌比值
        """
        if down_count == 0:
            return up_count if up_count > 0 else 1.0
        return up_count / down_count

    @staticmethod
    def limit_up_down_ratio(limit_up: int, limit_down: int) -> float:
        """计算涨跌停比"""
        if limit_down == 0:
            return limit_up if limit_up > 0 else 1.0
        return limit_up / limit_down

    @staticmethod
    def new_high_low_ratio(new_high: int, new_low: int) -> float:
        """计算新高新低比"""
        if new_low == 0:
            return new_high if new_high > 0 else 1.0
        return new_high / new_low

    @staticmethod
    def market_breadth(participation_rate: float) -> float:
        """
        市场广度指标

        Args:
            participation_rate: 参与度(上涨股票数/总股票数)

        Returns:
            市场广度值
        """
        return participation_rate *100

    @staticmethod
    def volatility_sentiment(volatility: float,
                            vol_ma: float,
                            vol_threshold: float = 25.0) -> str:
        """
        基于波动率的情绪状态

        Args:
            volatility: 当前波动率
            vol_ma: 波动率均值
            vol_threshold: 高波动阈值

        Returns:
            情绪状态: 'extreme_fear', 'fear', 'neutral', 'greed', 'extreme_greed'
        """
        if volatility > vol_threshold:
            return 'extreme_fear'
        elif volatility > vol_ma*1.5:
            return 'fear'
        elif volatility < vol_ma*0.5:
            return 'extreme_greed'
        elif volatility < vol_ma*0.8:
            return 'greed'
        else:
            return 'neutral'

    @staticmethod
    def composite_sentiment(indicators: Dict[str, float],
                           weights: Dict[str, float] = None) -> float:
        """
        综合情绪指数

        Args:
            indicators: 各情绪指标值
            weights: 指标权重

        Returns:
            综合情绪指数 (0-100, 50 为中性)
        """
        if weights is None:
            weights = {
                'adr': 0.3,
                'limit_ratio': 0.2,
                'new_high_low': 0.2,
                'breadth': 0.15,
                'volatility': 0.15
            }

# 归一化处理
        normalized = {}
        if 'adr' in indicators:

# ADR > 3 为极度贪婪
            normalized['adr'] = min(indicators['adr'] / 3.0, 1.0)*100
        if 'limit_ratio' in indicators:

# 涨停比 > 2 为极度贪婪
            normalized['limit_ratio'] = min(indicators['limit_ratio'] / 2.0, 1.0)*100
        if 'new_high_low' in indicators:

# 新高新低比 > 3 为极度贪婪
            normalized['new_high_low'] = min(indicators['new_high_low'] / 3.0, 1.0)*100
        if 'breadth' in indicators:

# 直接使用百分比
            normalized['breadth'] = indicators['breadth']
        if 'volatility' in indicators:

# 波动率反向(低波动=贪婪)
            normalized['volatility'] = max(100 - indicators['volatility'], 0)

# 加权合成
        composite = sum(normalized.get(k, 50)*w
                       for k, w in weights.items())

        return composite

class MarketSentimentIndicator(bt.Indicator):
    """市场情绪技术指标"""

    lines = ('sentiment', 'adr', 'limit_ratio', 'breadth')
    params = (
        ('adr_window', 5),
        ('sentiment_threshold_low', 30),
        ('sentiment_threshold_high', 70),
    )

    plotinfo = dict(subplot=True)

    def __init__(self):

# 计算各子指标
        self.calculator = SentimentCalculator()

# 这里简化处理,实际需要从市场数据计算

# 使用价格动量作为情绪的代理指标
        momentum = bt.indicators.ROC(self.data.close, period=self.p.adr_window)

# 将动量转换为情绪值 (0-100)
        self.lines.sentiment = 50 + momentum / 2

    def next(self):

# 情绪状态
        if self.lines.sentiment[0] < self.p.sentiment_threshold_low:
            self.state = 'fear'
        elif self.lines.sentiment[0] > self.p.sentiment_threshold_high:
            self.state = 'greed'
        else:
            self.state = 'neutral'

```bash

#### 2.2 市场情绪策略

```python
@register_strategy(StrategyMetadata(
    name="市场情绪策略",
    description="基于市场情绪指标的动态交易策略",
    category=StrategyCategory.SENTIMENT,
    tags=["情绪", "自适应", "多因子"],
    parameters={
        'sentiment_core': {'default': 40.0, 'doc': '核心情绪阈值'},
        'sentiment_secondary': {'default': 60.0, 'doc': '次级情绪阈值'},
        'momentum_short': {'default': 10, 'doc': '短期动量周期'},
        'momentum_long': {'default': 60, 'doc': '长期动量周期'},
        'vol_threshold': {'default': 20.0, 'doc': '高波动阈值'},
    }
))
class MarketSentimentStrategy(bt.Strategy):
    """市场情绪策略"""

    params = (

# 情绪参数
        ('sentiment_core', 40.0),       # 核心情绪阈值
        ('sentiment_secondary', 60.0),  # 次级情绪阈值

# 动量参数
        ('momentum_short', 10),
        ('momentum_long', 60),

# 波动率参数
        ('vol_window', 20),
        ('vol_threshold', 20.0),
        ('garch_vol_threshold', 25.0),

# 交易参数
        ('position_size', 0.95),
        ('risk_per_trade', 0.02),
        ('trail_percent', 2.0),
    )

    def __init__(self):

# 情绪指标
        self.sentiment = MarketSentimentIndicator(self.data)

# 动量指标
        self.mom_short = bt.indicators.Momentum(
            self.data.close, period=self.p.momentum_short
        )
        self.mom_long = bt.indicators.Momentum(
            self.data.close, period=self.p.momentum_long
        )

# 波动率指标
        self.volatility = bt.indicators.StandardDeviation(
            self.data.close, period=self.p.vol_window
        )

# 趋势确认
        self.ema_short = bt.indicators.EMA(self.data, period=20)
        self.ema_long = bt.indicators.EMA(self.data, period=60)
        self.trend = self.ema_short - self.ema_long

# ATR 止损
        self.atr = bt.indicators.ATR(self.data, period=14)

# 订单追踪
        self.order = None
        self.entry_price = None
        self.stop_price = None

    def next(self):
        """主交易逻辑"""

# 等待未完成订单
        if self.order:
            return

# 没有持仓时寻找入场机会
        if not self.position:
            self._check_entry()
        else:
            self._check_exit()

    def _check_entry(self):
        """检查入场条件"""
        sentiment = self.sentiment.lines.sentiment[0]
        vol = self.volatility.lines.volatility[0]

# 恐慌情绪 + 趋势向上 + 动量转正
        if (sentiment < self.p.sentiment_core and
            self.trend[0] > 0 and
            self.mom_short[0] > 0):

            self._enter_long()

# 极度贪婪 + 趋势向下 + 动量转负
        elif (sentiment > self.p.sentiment_secondary and
              self.trend[0] < 0 and
              self.mom_short[0] < 0):

            self._enter_short()

    def _check_exit(self):
        """检查出场条件"""
        sentiment = self.sentiment.lines.sentiment[0]
        current_price = self.data.close[0]

# 更新跟踪止损
        if self.entry_price:
            if self.position.size > 0:  # 多头
                self.stop_price = max(
                    self.stop_price or 0,
                    current_price*(1 - self.p.trail_percent / 100)
                )
            else:  # 空头
                self.stop_price = min(
                    self.stop_price or float('inf'),
                    current_price*(1 + self.p.trail_percent / 100)
                )

# 情绪反转出场
        if self.position.size > 0:  # 多头持仓
            if sentiment > self.p.sentiment_secondary:
                self._close_position("情绪反转")
            elif current_price <= self.stop_price:
                self._close_position("跟踪止损")
        else:  # 空头持仓
            if sentiment < self.p.sentiment_core:
                self._close_position("情绪反转")
            elif current_price >= self.stop_price:
                self._close_position("跟踪止损")

    def _enter_long(self):
        """开多仓"""

# 根据 ATR 计算仓位
        atr_value = self.atr.lines.atr[0]
        risk_amount = self.broker.getvalue()*self.p.risk_per_trade
        size = int(risk_amount / (atr_value*2))

        self.order = self.buy(size=size)
        self.entry_price = self.data.close[0]
        self.stop_price = self.entry_price*(1 - self.p.trail_percent / 100)

    def _enter_short(self):
        """开空仓"""
        self.order = self.sell(size=self.broker.getvalue()*self.p.position_size /
                              self.data.close[0])
        self.entry_price = self.data.close[0]
        self.stop_price = self.entry_price*(1 + self.p.trail_percent / 100)

    def _close_position(self, reason: str):
        """平仓"""
        self.order = self.close()
        print(f"[{self.datas[0].datetime.date(0)}] 平仓: {reason}")

    def notify_order(self, order):
        """订单状态通知"""
        if order.status in [order.Completed]:
            self.order = None

```bash

### 3. 双动量策略设计

```python
@register_strategy(StrategyMetadata(
    name="双动量轮动策略",
    description="基于绝对动量和相对动量的资产轮动策略",
    category=StrategyCategory.MOMENTUM,
    tags=["动量", "轮动", "多资产"],
))
class DualMomentumStrategy(bt.Strategy):
    """双动量策略"""

    params = (
        ('momentum_short', 12),   # 短期动量周期
        ('momentum_long', 126),   # 长期动量周期
        ('lookback', 252),        # 回望期
        ('top_n', 3),             # 选择 Top N 资产
        ('cash', 10000.0),
        ('commission', 0.001),
    )

    def __init__(self):

# 为每个数据流计算动量
        self.inds = {}
        for d in self.datas:
            self.inds[d] = {}

# 短期动量
            self.inds[d]['mom_short'] = bt.indicators.Momentum(
                d.close, period=self.p.momentum_short
            )

# 长期动量
            self.inds[d]['mom_long'] = bt.indicators.Momentum(
                d.close, period=self.p.momentum_long
            )

# 动量得分
            self.inds[d]['momentum_score'] = (
                self.inds[d]['mom_short'] /
                (abs(self.inds[d]['mom_long']) + 1e-6)
            )

# 绝对动量(正收益)
            self.inds[d]['abs_momentum'] = (
                d.close / d.close(-self.p.lookback) - 1
            )

        self.rebalance_days = 20  # 每月调仓
        self.days_since_rebalance = 0

    def next(self):
        """主逻辑"""
        self.days_since_rebalance += 1

# 调仓日
        if self.days_since_rebalance >= self.rebalance_days:
            self._rebalance()
            self.days_since_rebalance = 0

    def _rebalance(self):
        """执行调仓"""

# 获取所有资产的动量得分
        scores = []
        for d in self.datas:
            if len(d) < self.p.lookback:
                continue

            abs_mom = self.inds[d]['abs_momentum'][0]
            mom_score = self.inds[d]['momentum_score'][0]

# 绝对动量必须为正
            if abs_mom > 0:
                scores.append((d, mom_score))

# 按动量得分排序,选择 Top N
        scores.sort(key=lambda x: x[1], reverse=True)
        selected = scores[:self.p.top_n] if scores else []

# 平仓不在选择中的资产
        for d in self.datas:
            if d not in [s[0] for s in selected]:
                self.close(data=d)

# 等权配置选中的资产
        if selected:
            weight = 1.0 / len(selected)
            for d, _ in selected:
                target_value = self.broker.getvalue()*weight
                target_size = int(target_value / d.close[0])
                self.order_target_size(data=d, target=target_size)

```bash

### 4. 多账户对冲系统设计

```python
from typing import Dict, List
from dataclasses import dataclass

@dataclass
class AccountInfo:
    """账户信息"""
    name: str
    account_type: str  # 'stock', 'futures', 'etf'
    initial_cash: float
    cash: float = 0.0
    value: float = 0.0

class MultiAccountManager:
    """多账户管理器"""

    def __init__(self):
        self.accounts: Dict[str, AccountInfo] = {}
        self.hedge_ratio: float = 0.5  # 对冲比例
        self.hedge_threshold: float = 0.1  # 对冲触发阈值

    def add_account(self, name: str, account_type: str,
                   initial_cash: float):
        """添加账户"""
        self.accounts[name] = AccountInfo(
            name=name,
            account_type=account_type,
            initial_cash=initial_cash
        )

    def get_account(self, name: str) -> AccountInfo:
        """获取账户"""
        return self.accounts.get(name)

    def calculate_hedge_ratio(self, spot_account: str,
                             futures_account: str) -> float:
        """计算对冲比例"""
        spot = self.get_account(spot_account)
        futures = self.get_account(futures_account)

        if not spot or not futures:
            return 0.0

        spot_value = spot.value
        futures_value = futures.value

# 目标对冲比例
        target_hedge = self.hedge_ratio*spot_value

# 当前对冲比例
        current_hedge = abs(futures_value)

# 计算调整比例
        if spot_value > 0:
            return target_hedge / spot_value
        return 0.0

class HedgingStrategy(bt.Strategy):
    """对冲策略基类"""

    params = (
        ('hedge_account', None),      # 对冲账户名称
        ('hedge_ratio', 0.5),         # 对冲比例
        ('rebalance_threshold', 0.1), # 再平衡阈值
    )

    def __init__(self):
        self.hedge_manager = MultiAccountManager()
        self.hedge_positions: Dict[str, float] = {}

    def next(self):
        """主逻辑"""

# 获取现货账户总价值
        total_value = self.broker.getvalue()

# 计算目标对冲仓位
        hedge_value = total_value*self.p.hedge_ratio

# 当前对冲偏离
        current_hedge = sum(abs(pos)*price
                           for pos, price in self.hedge_positions.items())

        deviation = abs(current_hedge - hedge_value) / total_value

# 超过阈值时调整
        if deviation > self.p.rebalance_threshold:
            self._rebalance_hedge(hedge_value)

    def _rebalance_hedge(self, target_value: float):
        """再平衡对冲仓位"""

# 子类实现具体逻辑
        pass

@register_strategy(StrategyMetadata(
    name="双均线对冲策略",
    description="双均线策略与期货对冲结合",
    category=StrategyCategory.HEDGING,
    tags=["对冲", "双均线"],
))
class DualMAHedgingStrategy(HedgingStrategy):
    """双均线对冲策略"""

    params = (
        ('fast_period', 5),
        ('slow_period', 20),
        ('hedge_ratio', 0.5),
        ('hedge_symbol', 'IF300'),  # 对冲期货品种
    )

    def __init__(self):
        super().__init__()

# 双均线
        self.fast_ma = bt.indicators.SMA(self.data, period=self.p.fast_period)
        self.slow_ma = bt.indicators.SMA(self.data, period=self.p.slow_period)
        self.crossover = bt.indicators.CrossOver(self.fast_ma, self.slow_ma)

    def next(self):

# 金叉开多
        if self.crossover[0] > 0:
            self._enter_hedge('short')  # 对冲空单

# 死叉平仓
        elif self.crossover[0] < 0:
            self._exit_hedge()

    def _enter_hedge(self, direction: str):
        """开对冲仓位"""
        hedge_size = self.broker.getvalue()*self.p.hedge_ratio

# 实际实现需要连接期货账户
        print(f"开对冲仓: {direction}, 数量: {hedge_size}")

    def _exit_hedge(self):
        """平对冲仓位"""
        print("平对冲仓")

```bash

### 5. 增强风险管理设计

```python
class EnhancedRiskManager:
    """增强风险管理器"""

    def __init__(self, strategy: bt.Strategy):
        self.strategy = strategy
        self.peak_value = strategy.broker.getvalue()
        self.max_drawdown = 0.15

    def check_drawdown(self) -> bool:
        """检查回撤是否超限"""
        current_value = self.strategy.broker.getvalue()

# 更新峰值
        if current_value > self.peak_value:
            self.peak_value = current_value
            return False

# 计算回撤
        drawdown = (self.peak_value - current_value) / self.peak_value

# 超过最大回撤,停止交易
        if drawdown > self.max_drawdown:
            return True

        return False

    def calculate_position_size(self, entry_price: float,
                               stop_price: float) -> int:
        """基于 ATR 的仓位计算"""
        atr = self.strategy.atr[0]
        account_value = self.strategy.broker.getvalue()
        risk_ratio = 0.02  # 单笔风险 2%

# 风险金额
        risk_amount = account_value*risk_ratio

# 止损距离
        stop_distance = abs(entry_price - stop_price)
        if stop_distance == 0:
            stop_distance = atr*2

# 计算仓位
        size = int(risk_amount / stop_distance)

        return size

class TrailingStop(bt.Indicator):
    """跟踪止损指标"""

    lines = ('stop', 'direction')
    params = (('trail_percent', 2.0),)

    plotinfo = dict(subplot=False)

    def __init__(self):
        self.direction = 1  # 1 for long, -1 for short
        self.high_since_entry = self.data.close[0]
        self.low_since_entry = self.data.close[0]

    def next(self):
        price = self.data.close[0]

# 更新极值
        self.high_since_entry = max(self.high_since_entry, price)
        self.low_since_entry = min(self.low_since_entry, price)

# 根据方向计算止损
        if self.direction == 1:  # 多头
            self.lines.stop[0] = price*(1 - self.p.trail_percent / 100)
        else:  # 空头
            self.lines.stop[0] = price*(1 + self.p.trail_percent / 100)

class VolatilityAdjustedSizer(bt.Sizer):
    """波动率调整仓位器"""

    params = (('vol_period', 20), ('target_vol', 0.15))

    def _getsizing(self, data, broker):

# 计算目标仓位
        vol = bt.indicators.StandardDeviation(
            data, period=self.p.vol_period
        )

        target_size = (self.p.target_vol / (vol[0] + 1e-6))

# 限制仓位范围
        target_size = min(max(target_size, 0.3), 1.0)

        account_value = broker.getvalue()
        return int(account_value*target_size / data.close[0])

```bash

### 6. 策略评价报告设计

```python
from typing import Dict, Any, Optional
import pandas as pd
from datetime import datetime

class StrategyReport:
    """策略评价报告生成器"""

    def __init__(self, strategy_results):
        self.results = strategy_results
        self.strat = strategy_results[0]

    def generate_metrics(self) -> Dict[str, Any]:
        """生成评价指标"""

# 获取各个 analyzer 的结果
        sharpe = self.strat.analyzers.sharpe.get_analysis()
        drawdown = self.strat.analyzers.drawdown.get_analysis()
        returns = self.strat.analyzers.returns.get_analysis()
        trades = self.strat.analyzers.trades.get_analysis()
        sqn = self.strat.analyzers.sqn.get_analysis()

        return {

# 收益指标
            'total_return': returns.get('rtot', 0),
            'annualized_return': returns.get('rnorm', 0)*100,
            'avg_monthly_return': returns.get('ravg', 0)*100,

# 风险指标
            'sharpe_ratio': sharpe.get('sharperatio', 0),
            'max_drawdown': drawdown.get('max', {}).get('drawdown', 0),
            'max_drawdown_duration': drawdown.get('max', {}).get('len', 0),
            'annual_volatility': returns.get('rnorm', 0),

# 交易指标
            'total_trades': trades.get('total', {}).get('total', 0),
            'win_rate': self._calculate_win_rate(trades),
            'profit_factor': self._calculate_profit_factor(trades),
            'avg_trade_return': self._calculate_avg_trade(trades),
            'sqn': sqn.get('sqn', 0),
        }

    def _calculate_win_rate(self, trades: Dict) -> float:
        """计算胜率"""
        total = trades.get('total', {}).get('total', 0)
        won = trades.get('won', {}).get('total', 0)
        if total == 0:
            return 0.0
        return won / total* 100

    def _calculate_profit_factor(self, trades: Dict) -> float:
        """计算盈亏比"""
        won = trades.get('won', {}).get('pnl', {}).get('total', 0)
        lost = trades.get('lost', {}).get('pnl', {}).get('total', 0)
        if lost == 0:
            return float('inf') if won > 0 else 0.0
        return abs(won / lost)

    def _calculate_avg_trade(self, trades: Dict) -> float:
        """计算平均交易收益"""
        total = trades.get('total', {}).get('total', 0)
        pnl_total = trades.get('total', {}).get('pnl', {}).get('total', 0)
        if total == 0:
            return 0.0
        return pnl_total / total

    def generate_dataframe(self) -> pd.DataFrame:
        """生成结果 DataFrame"""

# 获取净值曲线
        values = self.strat.stats.broker.value
        dates = self.strat.stats.broker.value_idx

        df = pd.DataFrame({
            'date': dates,
            'portfolio_value': values,
            'returns': pd.Series(values).pct_change()
        })

        return df

    def generate_html_report(self, output_path: str):
        """生成 HTML 报告"""
        metrics = self.generate_metrics()

        html = f"""
        <!DOCTYPE html>
        <html>
        <head>
            <title>策略回测报告</title>
            <style>
                body {{ font-family: Arial, sans-serif; margin: 20px; }}
                .metric {{ display: inline-block; margin: 10px; padding: 15px;
                          background: #f0f0f0; border-radius: 5px; min-width: 150px; }}
                .metric-value {{ font-size: 24px; font-weight: bold; }}
                .metric-label {{ color: #666; font-size: 14px; }}
                h1 {{ color: #333; }}
                h2 {{ color: #666; border-bottom: 2px solid #ddd; padding-bottom: 10px; }}
            </style>
        </head>
        <body>
            <h1>策略回测报告</h1>
            <p>生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>

            <h2>收益指标</h2>
            <div class="metric">
                <div class="metric-value">{metrics['total_return']:.2%}</div>
                <div class="metric-label">总收益率</div>
            </div>
            <div class="metric">
                <div class="metric-value">{metrics['annualized_return']:.2f}%</div>
                <div class="metric-label">年化收益率</div>
            </div>

            <h2>风险指标</h2>
            <div class="metric">
                <div class="metric-value">{metrics['sharpe_ratio']:.2f}</div>
                <div class="metric-label">夏普比率</div>
            </div>
            <div class="metric">
                <div class="metric-value">{metrics['max_drawdown']:.2%}</div>
                <div class="metric-label">最大回撤</div>
            </div>

            <h2>交易指标</h2>
            <div class="metric">
                <div class="metric-value">{metrics['total_trades']}</div>
                <div class="metric-label">总交易次数</div>
            </div>
            <div class="metric">
                <div class="metric-value">{metrics['win_rate']:.1f}%</div>
                <div class="metric-label">胜率</div>
            </div>
            <div class="metric">
                <div class="metric-value">{metrics['profit_factor']:.2f}</div>
                <div class="metric-label">盈亏比</div>
            </div>
            <div class="metric">
                <div class="metric-value">{metrics['sqn']:.2f}</div>
                <div class="metric-label">系统质量指数</div>
            </div>
        </body>
        </html>
        """

        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(html)

```bash

### 7. 回测引擎封装设计

```python
class BacktestEngine:
    """简化版回测引擎"""

    def __init__(self, initial_cash: float = 100000.0,
                 commission: float = 0.00025):
        self.cerebro = bt.Cerebro()
        self.cerebro.broker.setcash(initial_cash)
        self.cerebro.broker.setcommission(commission)

# 默认分析器
        self._setup_default_analyzers()

    def _setup_default_analyzers(self):
        """设置默认分析器"""
        self.cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')
        self.cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
        self.cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')
        self.cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trades')
        self.cerebro.addanalyzer(bt.analyzers.SQN, _name='sqn')

    def add_data(self, data, name: str = None):
        """添加数据"""
        self.cerebro.adddata(data, name=name)

    def add_strategy(self, strategy_class, **kwargs):
        """添加策略"""
        self.cerebro.addstrategy(strategy_class, **kwargs)

    def run(self) -> 'StrategyReport':
        """运行回测"""
        print(f'初始资金: {self.cerebro.broker.getvalue():.2f}')

        results = self.cerebro.run()

        print(f'最终资金: {self.cerebro.broker.getvalue():.2f}')
        print(f'收益率: {(self.cerebro.broker.getvalue() /
                           self.cerebro.broker.startingcash - 1) * 100:.2f}%')

        return StrategyReport(results)

    def plot(self, **kwargs):
        """绘图"""
        self.cerebro.plot(**kwargs)

# 快速回测函数

def quick_backtest(strategy_name: str,
                  data_feeds: List[bt.feed.Feed],
                  strategy_params: Dict = None,
                  initial_cash: float = 100000.0) -> StrategyReport:
    """
    快速回测函数

    Args:
        strategy_name: 策略名称
        data_feeds: 数据源列表
        strategy_params: 策略参数
        initial_cash: 初始资金

    Returns:
        策略报告
    """

# 获取策略类
    strategy_class = StrategyRegistry.get(strategy_name)
    if strategy_class is None:
        raise ValueError(f"策略 {strategy_name} 未找到")

# 创建回测引擎
    engine = BacktestEngine(initial_cash=initial_cash)

# 添加数据
    for feed in data_feeds:
        engine.add_data(feed)

# 添加策略
    params = strategy_params or {}
    engine.add_strategy(strategy_class, **params)

# 运行回测
    return engine.run()

```bash

### 8. 强化学习策略框架设计

```python
import gym
from gym import spaces
import numpy as np

class TradingEnv(gym.Env):
    """交易环境"""

    metadata = {'render.modes': ['human']}

    def __init__(self, data: pd.DataFrame, initial_balance: float = 10000):
        super().__init__()

        self.data = data
        self.initial_balance = initial_balance
        self.current_step = 0

# 动作空间: 0=持有, 1=买入, 2=卖出
        self.action_space = spaces.Discrete(3)

# 状态空间: [价格特征, 技术指标, 持仓状态]
        n_features = 10
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(n_features,), dtype=np.float32
        )

# 账户状态
        self.balance = initial_balance
        self.shares = 0
        self.total_shares_bought = 0
        self.total_shares_sold = 0

    def step(self, action):
        """执行动作"""

# 获取当前价格
        price = self.data.iloc[self.current_step]['close']

# 执行动作
        if action == 1:  # 买入
            shares_to_buy = int(self.balance / price)
            self.balance -= shares_to_buy *price
            self.shares += shares_to_buy
            self.total_shares_bought += shares_to_buy
        elif action == 2:  # 卖出
            self.balance += self.shares*price
            self.shares = 0
            self.total_shares_sold += self.shares

# 移动到下一步
        self.current_step += 1
        done = self.current_step >= len(self.data) - 1

# 计算奖励
        current_price = self.data.iloc[self.current_step]['close']
        portfolio_value = self.balance + self.shares*current_price
        reward = (portfolio_value - self.initial_balance) / self.initial_balance

# 获取新状态
        obs = self._get_observation()

        return obs, reward, done, {}

    def reset(self):
        """重置环境"""
        self.current_step = 0
        self.balance = self.initial_balance
        self.shares = 0
        return self._get_observation()

    def _get_observation(self) -> np.ndarray:
        """获取观察状态"""
        row = self.data.iloc[self.current_step]

# 价格特征
        price_norm = row['close'] / self.data['close'].max()

# 技术指标
        sma_short = row['close'] / self.data['close'].rolling(20).mean().iloc[self.current_step]
        sma_long = row['close'] / self.data['close'].rolling(60).mean().iloc[self.current_step]

# 持仓状态
        position_ratio = (self.shares* row['close']) / self.initial_balance

        return np.array([
            price_norm,
            sma_short,
            sma_long,
            position_ratio,

# ...更多特征
        ], dtype=np.float32)

class RLTradingStrategy(bt.Strategy):
    """强化学习交易策略"""

    params = (
        ('model_path', None),
        ('state_window', 30),
    )

    def __init__(self):
        self.model = None
        self.state_window = self.p.state_window

# 加载模型
        if self.p.model_path:
            self._load_model()

    def _load_model(self):
        """加载训练好的模型"""

# 使用 stable-baselines3 或其他 RL 库
        from stable_baselines3 import PPO
        self.model = PPO.load(self.p.model_path)

    def _get_state(self) -> np.ndarray:
        """构建状态空间"""

# 收集最近 N 根 bar 的数据
        states = []
        for i in range(-self.state_window, 0):
            states.append([
                self.data.open[i],
                self.data.high[i],
                self.data.low[i],
                self.data.close[i],
                self.data.volume[i],
            ])

# 添加技术指标
        states[-1].extend([
            self.inds['ema_short'][0],
            self.inds['ema_long'][0],
            self.inds['rsi'][0],
            self.inds['macd'][0],
        ])

# 添加账户状态
        position = self.position.size / self.broker.getvalue() if self.position else 0
        states[-1].append(position)

        return np.array(states, dtype=np.float32).flatten()

    def next(self):
        """主逻辑"""
        if self.model is None:
            return

# 获取当前状态
        state = self._get_state()

# 模型预测
        action, _ = self.model.predict(state)

# 执行动作
        if action == 1 and not self.position:
            self.buy()
        elif action == 2 and self.position:
            self.sell()

```bash

- --

## 实施计划

### 第一阶段:策略工厂(1 周)

1. 实现 StrategyRegistry 注册中心
2. 实现 StrategyFactory 工厂类
3. 实现策略元数据管理
4. 单元测试

### 第二阶段:市场情绪策略(2 周)

1. 实现 SentimentCalculator
2. 实现 MarketSentimentIndicator
3. 实现 MarketSentimentStrategy
4. 回测验证

### 第三阶段:风险管理增强(1 周)

1. 实现 EnhancedRiskManager
2. 实现 TrailingStop 指标
3. 实现 VolatilityAdjustedSizer
4. 集成测试

### 第四阶段:多账户对冲(2 周)

1. 实现 MultiAccountManager
2. 实现 HedgingStrategy 基类
3. 实现 DualMAHedgingStrategy
4. 对冲效果评估

### 第五阶段:报告和回测引擎(1 周)

1. 实现 StrategyReport
2. 实现 BacktestEngine 封装
3. HTML 报告生成
4. 集成测试

### 第六阶段:强化学习框架(2 周)

1. 实现 TradingEnv 环境
2. 实现 RLTradingStrategy
3. FinRL 集成
4. 示例和文档

- --

## API 兼容性保证

1. **策略继承自 bt.Strategy**所有策略保持原有接口
2. **指标继承自 bt.Indicator**所有指标保持原有接口
3. **分析器使用原生接口**不改变 analyzer 使用方式
4. **新增功能独立模块**不影响现有代码

- --

## 使用示例

### 示例 1:使用策略工厂

```python
from backtrader.strategy_factory import StrategyRegistry, StrategyFactory

# 列出所有策略

strategies = StrategyRegistry.list_strategies()
print("可用策略:", strategies)

# 按分类查询

trend_strategies = StrategyRegistry.list_strategies(
    StrategyCategory.TREND_FOLLOWING
)

# 获取策略信息

factory = StrategyFactory()
info = factory.get_strategy_info("市场情绪策略")
print(info)

# 创建策略

strategy = factory.create_strategy(
    "市场情绪策略",
    sentiment_core=35.0,
    momentum_short=12
)

```bash

### 示例 2:快速回测

```python
from backtrader.backtest_engine import quick_backtest

# 加载数据

data = bt.feeds.PandasData(dataname=df)

# 一行代码回测

report = quick_backtest(
    strategy_name="市场情绪策略",
    data_feeds=[data],
    strategy_params={'sentiment_core': 40.0},
    initial_cash=100000
)

# 生成报告

metrics = report.generate_metrics()
print(f"夏普比率: {metrics['sharpe_ratio']:.2f}")
report.generate_html_report('report.html')

```bash

### 示例 3:情绪策略使用

```python
cerebro = bt.Cerebro()

# 添加数据

cerebro.adddata(data)

# 添加情绪策略

cerebro.addstrategy(
    MarketSentimentStrategy,
    sentiment_core=40.0,
    sentiment_secondary=60.0,
    momentum_short=10,
    momentum_long=60,
    trail_percent=2.0
)

# 运行

results = cerebro.run()

```bash

### 示例 4:多账户对冲

```python

# 现货账户

cerebro_spot = bt.Cerebro()
cerebro_spot.addstrategy(
    DualMAHedgingStrategy,
    fast_period=5,
    slow_period=20,
    hedge_ratio=0.5
)

# 期货账户(对冲)

cerebro_futures = bt.Cerebro()

# ... 添加期货数据

# 运行对冲策略

# ...