迭代 137 - 优化日志功能

需求背景

为了完善 backtrader 的实盘交易功能,希望能够完善日志功能,在回测或实盘交易时形成完善的日志记录,并在底层实现,不需要每个策略都去单独实现。

功能需求

希望能够自动保存以下日志文件:

  1. order.log- 记录所有的下单信息,包含完整的 order 对象信息

2.trade.log- 记录所有的交易信息,包含完整的 trade 对象信息 3.position.log- 记录所有的持仓变化信息 4.current_position.yaml - 记录当前的持仓快照信息


现有架构分析

已有的日志实现

| 文件 | 类 | 说明 |

|——|—–|——|

| backtrader/utils/log_message.py | SpdLogManager | 基于 spdlog 的日志管理器,支持日志轮转 |

| backtrader/strategy.py | BtApiStrategy | 带日志的策略子类,需要用户继承 |

| backtrader/brokers/cryptobroker.py | CryptoBroker | 独立实现日志 |

| backtrader/stores/cryptostore.py | CryptoStore | 独立实现日志 |

| backtrader/feeds/cryptofeed.py | CryptoFeed | 独立实现日志 |

现有问题

  1. 基础Strategy类没有内置日志功能

  2. 各组件日志配置分散,代码重复

  3. 没有自动记录关键事件(订单、交易、持仓等)

  4. 用户需要在每个策略中手动实现日志


方案设计

方案一:Cerebro 层面统一日志管理器

  • 核心思路*:在Cerebro中创建统一的日志管理器,所有组件共享

  • 实现要点*:


# cerebro.py

class Cerebro:
    params = (

# 新增日志相关参数
        ('log_enabled', True),
        ('log_dir', './logs'),
        ('log_level', 'INFO'),
        ('log_to_console', True),
        ('log_orders', True),
        ('log_trades', True),
        ('log_positions', True),
        ...
    )

    def __init__(self):
        self._logger_manager = None  # 延迟初始化

    @property
    def logger(self):
        """获取全局日志器"""
        if self._logger_manager is None:
            self._logger_manager = CerebroLogManager(self.p)
        return self._logger_manager

    def log_order(self, order):
        """记录订单"""
        self.logger.log_order(order)

    def log_trade(self, trade):
        """记录交易"""
        self.logger.log_trade(trade)

```bash

- *优点**
- 统一配置入口一处配置全局生效
- 所有组件可通过 `self.cerebro.logger` 访问
- 便于管理日志文件和格式
- 可在 Cerebro  run 循环中自动注入日志

- *缺点**
- 需要确保所有组件都能访问 cerebro 实例
- 对现有架构改动较大

- --

### 方案二:Strategy 基类集成日志功能

- *核心思路**在基础`Strategy`类中添加日志功能并自动记录关键事件

- *实现要点**

```python

# strategy.py

class Strategy(StrategyBase):
    params = (
        ('log_enabled', True),
        ('log_dir', './logs'),
        ('log_file', None),  # None 则使用策略类名
        ('log_level', 'INFO'),
        ('log_orders', True),
        ('log_trades', True),
        ('log_positions', True),
        ...
    )

    def __init__(self):
        super().__init__()
        if self.p.log_enabled:
            self._init_loggers()

    def _init_loggers(self):
        """初始化各类日志器"""
        log_dir = self.p.log_dir
        self._order_logger = SpdLogManager(f"{log_dir}/order.log").create_logger()
        self._trade_logger = SpdLogManager(f"{log_dir}/trade.log").create_logger()
        self._position_logger = SpdLogManager(f"{log_dir}/position.log").create_logger()

    def log(self, msg, level='info'):
        """通用日志方法"""
        timestamp = self.datetime.datetime()
        getattr(self._logger, level)(f"[{timestamp}] {msg}")

# 重写通知方法,自动记录
    def _notify_order(self, order):
        if self.p.log_orders:
            self._log_order(order)
        self.notify_order(order)

    def _notify_trade(self, trade):
        if self.p.log_trades:
            self._log_trade(trade)
        self.notify_trade(trade)

    def _log_order(self, order):
        """记录订单信息"""
        log_data = {
            'datetime': str(self.datetime.datetime()),
            'ref': order.ref,
            'type': 'Buy' if order.isbuy() else 'Sell',
            'status': order.getstatusname(),
            'size': order.size,
            'price': order.price,
            'executed_price': order.executed.price if order.executed else None,
            'executed_size': order.executed.size if order.executed else None,
            'commission': order.executed.comm if order.executed else None,
        }
        self._order_logger.info(json.dumps(log_data))

    def _log_trade(self, trade):
        """记录交易信息"""
        log_data = {
            'datetime': str(self.datetime.datetime()),
            'ref': trade.ref,
            'data': trade.data._name,
            'size': trade.size,
            'price': trade.price,
            'value': trade.value,
            'pnl': trade.pnl,
            'pnlcomm': trade.pnlcomm,
            'commission': trade.commission,
            'isclosed': trade.isclosed,
        }
        self._trade_logger.info(json.dumps(log_data))

```bash

- *优点**
- 最小改动兼容现有代码
- 策略自动获得日志能力
- 可选择性开启/关闭
- 日志格式统一便于后续分析

- *缺点**
- 仅覆盖策略层面Broker 等其他组件需单独处理

- --

### 方案三:专用日志 Observer(推荐)

- *核心思路**创建专门的 Observer 来记录所有交易事件符合 backtrader 设计哲学

- *实现要点**

```python

# observers/trade_logger.py

class TradeLogger(Observer):
    """自动记录所有交易活动的 Observer

    自动记录:

    - 订单创建、执行、取消等状态变化
    - 交易开仓、平仓、盈亏
    - 持仓变化
    - 当前持仓快照

    """

    _stclock = True  # 使用策略时钟
    lines = ()  # 不需要数据线

    params = (
        ('log_dir', './logs'),
        ('log_orders', True),
        ('log_trades', True),
        ('log_positions', True),
        ('log_position_snapshot', True),
        ('snapshot_file', 'current_position.yaml'),
        ('log_format', 'json'),  # json 或 text
        ('log_to_console', False),
    )

    def __init__(self):
        super().__init__()
        self._init_loggers()

    def _init_loggers(self):
        """初始化各类日志器"""
        import os
        os.makedirs(self.p.log_dir, exist_ok=True)

        if self.p.log_orders:
            self._order_logger = SpdLogManager(
                file_name=f"{self.p.log_dir}/order.log",
                print_info=self.p.log_to_console
            ).create_logger()

        if self.p.log_trades:
            self._trade_logger = SpdLogManager(
                file_name=f"{self.p.log_dir}/trade.log",
                print_info=self.p.log_to_console
            ).create_logger()

        if self.p.log_positions:
            self._position_logger = SpdLogManager(
                file_name=f"{self.p.log_dir}/position.log",
                print_info=self.p.log_to_console
            ).create_logger()

    def notify_order(self, order):
        """记录订单状态变化"""
        if not self.p.log_orders:
            return

        log_data = self._format_order(order)
        if self.p.log_format == 'json':
            self._order_logger.info(json.dumps(log_data, ensure_ascii=False))
        else:
            self._order_logger.info(self._format_order_text(order))

    def notify_trade(self, trade):
        """记录交易信息"""
        if not self.p.log_trades:
            return

        log_data = self._format_trade(trade)
        if self.p.log_format == 'json':
            self._trade_logger.info(json.dumps(log_data, ensure_ascii=False))
        else:
            self._trade_logger.info(self._format_trade_text(trade))

    def next(self):
        """每个 bar 记录持仓变化"""
        if self.p.log_positions:
            self._log_positions()

        if self.p.log_position_snapshot:
            self._save_position_snapshot()

    def _log_positions(self):
        """记录持仓变化"""
        for data in self._owner.datas:
            position = self._owner.getposition(data)
            if position.size != 0:
                log_data = {
                    'datetime': str(self._owner.datetime.datetime()),
                    'data': data._name,
                    'size': position.size,
                    'price': position.price,
                    'value': position.size *data.close[0],
                }
                self._position_logger.info(json.dumps(log_data, ensure_ascii=False))

    def _save_position_snapshot(self):
        """保存当前持仓快照到 YAML 文件"""
        import yaml

        snapshot = {
            'datetime': str(self._owner.datetime.datetime()),
            'positions': {}
        }

        for data in self._owner.datas:
            position = self._owner.getposition(data)
            if position.size != 0:
                snapshot['positions'][data._name] = {
                    'size': position.size,
                    'price': position.price,
                    'value': position.size*data.close[0],
                }

        snapshot_path = f"{self.p.log_dir}/{self.p.snapshot_file}"
        with open(snapshot_path, 'w', encoding='utf-8') as f:
            yaml.dump(snapshot, f, allow_unicode=True, default_flow_style=False)

    def _format_order(self, order):
        """格式化订单数据"""
        return {
            'datetime': str(self._owner.datetime.datetime()),
            'ref': order.ref,
            'type': 'Buy' if order.isbuy() else 'Sell',
            'status': order.getstatusname(),
            'size': order.size,
            'price': order.price,
            'executed_price': order.executed.price if order.executed.size else None,
            'executed_size': order.executed.size,
            'executed_value': order.executed.value,
            'commission': order.executed.comm,
            'data': order.data._name if order.data else None,
        }

    def _format_trade(self, trade):
        """格式化交易数据"""
        return {
            'datetime': str(self._owner.datetime.datetime()),
            'ref': trade.ref,
            'data': trade.data._name,
            'size': trade.size,
            'price': trade.price,
            'value': trade.value,
            'pnl': trade.pnl,
            'pnlcomm': trade.pnlcomm,
            'commission': trade.commission,
            'isclosed': trade.isclosed,
            'isopen': trade.isopen,
            'baropen': trade.baropen,
            'barclose': trade.barclose if trade.isclosed else None,
            'barlen': trade.barlen,
        }

    def stop(self):
        """策略结束时的清理工作"""

# 保存最终持仓快照
        if self.p.log_position_snapshot:
            self._save_position_snapshot()

```bash

- *使用方式**

```python
import backtrader as bt

cerebro = bt.Cerebro()

# 添加日志 Observer - 一行代码即可启用完整日志功能

cerebro.addobserver(bt.observers.TradeLogger,
                    log_dir='./logs',
                    log_orders=True,
                    log_trades=True,
                    log_positions=True)

cerebro.addstrategy(MyStrategy)
cerebro.run()

```bash

- *优点**
- **完全非侵入式**不修改现有代码
- **符合 backtrader 设计模式**Observer 本就用于监控
- **灵活配置**按需添加和配置
- **可复用**任何策略都可使用
- **易于扩展**可创建多个 Observer 记录不同内容

- *缺点**
- 需要用户显式添加 Observer但只需一行代码
- Observer 的通知机制可能需要增强以接收 order/trade 通知

- --

### 方案四:日志 Mixin + 全局日志注册中心

- *核心思路**创建 Mixin 类和全局日志注册中心提供最大灵活性

- *实现要点**

```python

# utils/logging.py

class LoggerRegistry:
    """全局日志注册中心 - 单例模式"""
    _instance = None
    _loggers = {}
    _config = {
        'log_dir': './logs',
        'log_level': 'INFO',
        'log_to_console': True,
    }

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @classmethod
    def configure(cls, **kwargs):
        """配置全局日志参数"""
        cls._config.update(kwargs)

    @classmethod
    def get_logger(cls, name):
        """获取或创建日志器"""
        if name not in cls._loggers:
            log_file = f"{cls._config['log_dir']}/{name}.log"
            cls._loggers[name] = SpdLogManager(
                file_name=log_file,
                print_info=cls._config['log_to_console']
            ).create_logger()
        return cls._loggers[name]


class LoggingMixin:
    """可混入任何组件的日志 Mixin"""

    _log_name = None  # 子类可覆盖

    @property
    def logger(self):
        """延迟初始化日志器"""
        if not hasattr(self, '_logger') or self._logger is None:
            name = self._log_name or self.__class__.__name__
            self._logger = LoggerRegistry.get_logger(name)
        return self._logger

    def log(self, msg, level='info'):
        """记录日志"""
        getattr(self.logger, level)(msg)

    def log_info(self, msg):
        self.logger.info(msg)

    def log_warning(self, msg):
        self.logger.warning(msg)

    def log_error(self, msg):
        self.logger.error(msg)

    def log_debug(self, msg):
        self.logger.debug(msg)

```bash

- *使用方式**

```python
from backtrader.utils.logging import LoggerRegistry, LoggingMixin

# 全局配置(可选)

LoggerRegistry.configure(log_dir='./my_logs', log_to_console=True)

# 在策略中使用

class MyStrategy(bt.Strategy, LoggingMixin):
    _log_name = 'my_strategy'  # 可选,默认使用类名

    def next(self):
        self.log(f"Close: {self.data.close[0]}")
        self.log_info(f"Position: {self.position.size}")

```bash

- *优点**
- 高度灵活任何组件都可使用
- 统一的日志配置管理
- 不强制修改基类
- 可与其他方案组合使用

- *缺点**
- 需要用户主动混入和配置
- 不是完全自动化

- --

## 推荐方案

### 推荐:方案三(日志 Observer)+ 方案二(Strategy 基类增强)组合

- *理由**
1. **方案三作为主要实现**Observer  backtrader 的标准扩展方式非侵入式一行代码即可启用
2. **方案二作为补充** Strategy 基类中提供简单的`log()`方法方便用户记录自定义信息

- *实现计划**

1. **创建 `observers/trade_logger.py`**
   - 实现`TradeLogger` Observer
   - 自动记录 ordertradeposition
   - 支持 JSON 和文本两种格式
   - 支持 YAML 持仓快照

1. **增强 `Strategy` 基类**
   - 添加可选的`log()`方法
   - 添加日志相关参数
   - 保持向后兼容

1. **更新 `observers/__init__.py`**
   - 导出`TradeLogger`

1. **编写使用文档和示例**

- --

## 日志文件格式设计

### order.log 格式(JSON)

```json
{"datetime": "2024-01-15 09:30:00", "ref": 1, "type": "Buy", "status": "Submitted", "size": 100, "price": 10.5, "data": "AAPL"}
{"datetime": "2024-01-15 09:30:01", "ref": 1, "type": "Buy", "status": "Completed", "size": 100, "price": 10.5, "executed_price": 10.52, "executed_size": 100, "commission": 1.05, "data": "AAPL"}

```bash

### trade.log 格式(JSON)

```json
{"datetime": "2024-01-15 09:30:01", "ref": 1, "data": "AAPL", "size": 100, "price": 10.52, "value": 1052.0, "pnl": 0, "pnlcomm": -1.05, "commission": 1.05, "isclosed": false, "isopen": true}
{"datetime": "2024-01-15 10:00:00", "ref": 1, "data": "AAPL", "size": 0, "price": 10.80, "value": 0, "pnl": 28.0, "pnlcomm": 25.9, "commission": 2.10, "isclosed": true, "isopen": false}

```bash

### position.log 格式(JSON)

```json
{"datetime": "2024-01-15 09:30:01", "data": "AAPL", "size": 100, "price": 10.52, "value": 1052.0}
{"datetime": "2024-01-15 09:35:00", "data": "AAPL", "size": 100, "price": 10.52, "value": 1055.0}

```bash

### current_position.yaml 格式

```yaml
datetime: '2024-01-15 15:00:00'
positions:
  AAPL:
    size: 100
    price: 10.52
    value: 1080.0
  GOOGL:
    size: 50
    price: 150.25
    value: 7600.0

### indicator.log 格式(JSON)

```bash
{"datetime": "2024-01-15 09:30:01", "sma_20": 10.45, "rsi_14": 65.2, "macd": 0.15, "macd_signal": 0.12}
{"datetime": "2024-01-15 09:35:00", "sma_20": 10.48, "rsi_14": 68.5, "macd": 0.18, "macd_signal": 0.14}

### signal.log 格式(JSON)

```json
{"datetime": "2024-01-15 09:30:01", "action": "buy", "size": 100, "price": 10.52, "reason": "sma_crossover"}
{"datetime": "2024-01-15 10:00:00", "action": "sell", "size": 100, "price": 10.80, "reason": "take_profit"}

- --

## 确认的需求

1. **日志轮转**使用 SpdLogManager 已有的日期轮转功能
2. **MySQL 支持**支持写入 MySQL但默认参数不写入需要显式启用
3. **position.log**每个 bar 都记录
4. **indicator.log**自动记录策略中所有指标值
5. **signal.log**自动记录策略中的买卖信号
6. **自动化**指标和信号的记录都是自动的不需要每个策略单独实现

- --

## 最终实现方案

采用 **方案三日志 Observer+ 方案二Strategy 基类增强** 组合实现

### 日志文件清单

| 日志文件 | 说明 | 记录频率 |

|---------|------|---------|

| order.log | 订单状态变化 | 事件触发 |

| trade.log | 交易开仓/平仓 | 事件触发 |

| position.log | 持仓信息 | 每个 bar |

| indicator.log | 指标值 | 每个 bar |

| signal.log | 买卖信号 | 事件触发 |

| current_position.yaml | 当前持仓快照 | 每个 bar |

### MySQL 配置参数

```bash
params = (

# MySQL 配置 - 默认不启用
    ('mysql_enabled', False),
    ('mysql_host', 'localhost'),
    ('mysql_port', 3306),
    ('mysql_user', 'root'),
    ('mysql_password', ''),
    ('mysql_database', 'backtrader'),
)

### 自动指标收集机制

通过遍历策略的 `_indicators` 属性自动收集所有指标值无需用户手动配置

### 自动信号收集机制

通过监控策略的买卖操作自动记录信号包括

- 买入信号时机和价格
- 卖出信号时机和价格
- 信号来源如指标交叉等